Skip to content

Commit

Permalink
✨ 添加私服版客户端全局订阅功能。
Browse files Browse the repository at this point in the history
  • Loading branch information
li-xunhuan committed Feb 18, 2024
1 parent f3f3440 commit 8f7d7e3
Show file tree
Hide file tree
Showing 9 changed files with 207 additions and 86 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 ([email protected] & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.dreamlu.iot.mqtt.client;

import net.dreamlu.iot.mqtt.core.client.MqttClient;
import org.tio.utils.buffer.ByteBufferUtil;

/**
* 客户端全局订阅测试
*
* @author L.cm
*/
public class MqttClientGlobalTest {

public static void main(String[] args) {
// 初始化 mqtt 客户端
MqttClient.create()
.ip("127.0.0.1")
.port(1883)
.username("admin")
.password("123456")
// 全局订阅的 topic
.globalSubscribe("/test", "/test/123")
// 全局监听,也会监听到服务端 http api 订阅的数据
.globalMessageListener((context, topic, message, payload) -> {
System.out.println("topic:\t" + topic);
System.out.println("payload:\t" + ByteBufferUtil.toString(payload));
})
// .debug()
.connectSync();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

Expand All @@ -39,17 +40,19 @@
*/
public class DefaultMqttClientProcessor implements IMqttClientProcessor {
private static final Logger logger = LoggerFactory.getLogger(DefaultMqttClientProcessor.class);
private final int reSubscribeBatchSize;
private final MqttClientCreator mqttClientCreator;
private final IMqttClientSession clientSession;
private final IMqttClientConnectListener connectListener;
private final IMqttClientGlobalMessageListener globalMessageListener;
private final IMqttClientMessageIdGenerator messageIdGenerator;
private final TimerTaskService taskService;
private final ExecutorService executor;

public DefaultMqttClientProcessor(MqttClientCreator mqttClientCreator) {
this.reSubscribeBatchSize = mqttClientCreator.getReSubscribeBatchSize();
this.mqttClientCreator = mqttClientCreator;
this.clientSession = mqttClientCreator.getClientSession();
this.connectListener = mqttClientCreator.getConnectListener();
this.globalMessageListener = mqttClientCreator.getGlobalMessageListener();
this.messageIdGenerator = mqttClientCreator.getMessageIdGenerator();
this.taskService = mqttClientCreator.getTaskService();
this.executor = mqttClientCreator.getMqttExecutor();
Expand Down Expand Up @@ -118,13 +121,20 @@ private void publishConnectEvent(ChannelContext context) {
* @param context ChannelContext
*/
private void reSendSubscription(ChannelContext context) {
// 0. 全局订阅
Set<MqttTopicSubscription> globalSubscribe = mqttClientCreator.getGlobalSubscribe();
if (globalSubscribe != null && !globalSubscribe.isEmpty()) {
globalReSendSubscription(context, globalSubscribe);
}
List<MqttClientSubscription> reSubscriptionList = clientSession.getAndCleanSubscription();
// 1. 判断是否为空
if (reSubscriptionList.isEmpty()) {
return;
}
// 2. 订阅的数量
int subscribedSize = reSubscriptionList.size();
// 重新订阅批次大小
int reSubscribeBatchSize = mqttClientCreator.getReSubscribeBatchSize();
if (subscribedSize <= reSubscribeBatchSize) {
reSendSubscription(context, reSubscriptionList);
} else {
Expand All @@ -135,6 +145,22 @@ private void reSendSubscription(ChannelContext context) {
}
}

/**
* 全局订阅,不需要存储 session
*
* @param context ChannelContext
* @param globalReSubscriptionList globalReSubscriptionList
*/
private void globalReSendSubscription(ChannelContext context, Set<MqttTopicSubscription> globalReSubscriptionList) {
int messageId = messageIdGenerator.getId();
MqttSubscribeMessage message = MqttMessageBuilders.subscribe()
.addSubscriptions(globalReSubscriptionList)
.messageId(messageId)
.build();
boolean result = Tio.send(context, message);
logger.info("MQTT globalReSubscriptionList:{} messageId:{} resubscribing result:{}", globalReSubscriptionList, messageId, result);
}

/**
* 批量重新订阅
*
Expand Down Expand Up @@ -328,11 +354,26 @@ public void processPubComp(MqttMessage message) {
* @param message MqttPublishMessage
*/
private void invokeListenerForPublish(ChannelContext context, String topicName, MqttPublishMessage message) {
final byte[] payload = message.payload();
// 全局消息监听器
if (globalMessageListener != null) {
executor.submit(() -> {
try {
globalMessageListener.onMessage(context, topicName, message, payload);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
});
}
// topic 订阅监听
List<MqttClientSubscription> subscriptionList = clientSession.getMatchedSubscription(topicName);
if (subscriptionList.isEmpty()) {
logger.warn("Mqtt message to accept topic:{} subscriptionList is empty.", topicName);
if (globalMessageListener == null || mqttClientCreator.isDebug()) {
logger.warn("Mqtt message to accept topic:{} subscriptionList is empty.", topicName);
} else {
logger.debug("Mqtt message to accept topic:{} subscriptionList is empty.", topicName);
}
} else {
final byte[] payload = message.payload();
subscriptionList.forEach(subscription -> {
IMqttClientMessageListener listener = subscription.getListener();
executor.submit(() -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 ([email protected] & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.dreamlu.iot.mqtt.core.client;

import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import org.tio.core.ChannelContext;

/**
* mqtt 全局消息处理
*
* @author L.cm
*/
public interface IMqttClientGlobalMessageListener {

/**
* 监听到消息
*
* @param context ChannelContext
* @param topic topic
* @param message MqttPublishMessage
* @param payload payload
*/
void onMessage(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload);

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@

package net.dreamlu.iot.mqtt.core.client;

import net.dreamlu.iot.mqtt.codec.MqttConnectReasonCode;
import net.dreamlu.iot.mqtt.codec.MqttConstant;
import net.dreamlu.iot.mqtt.codec.MqttProperties;
import net.dreamlu.iot.mqtt.codec.MqttVersion;
import net.dreamlu.iot.mqtt.codec.*;
import org.tio.client.ReconnConf;
import org.tio.client.TioClient;
import org.tio.client.TioClientConfig;
Expand All @@ -36,8 +33,10 @@
import org.tio.utils.timer.TimerTaskService;

import java.io.InputStream;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
* mqtt 客户端构造器
Expand Down Expand Up @@ -145,6 +144,14 @@ public final class MqttClientCreator {
* 连接监听器
*/
private IMqttClientConnectListener connectListener;
/**
* 全局订阅
*/
private Set<MqttTopicSubscription> globalSubscribe;
/**
* 全局消息监听器
*/
private IMqttClientGlobalMessageListener globalMessageListener;
/**
* 客户端 session
*/
Expand Down Expand Up @@ -274,6 +281,14 @@ public IMqttClientConnectListener getConnectListener() {
return connectListener;
}

public Set<MqttTopicSubscription> getGlobalSubscribe() {
return globalSubscribe;
}

public IMqttClientGlobalMessageListener getGlobalMessageListener() {
return globalMessageListener;
}

public IMqttClientSession getClientSession() {
return clientSession;
}
Expand Down Expand Up @@ -447,6 +462,34 @@ public MqttClientCreator connectListener(IMqttClientConnectListener connectListe
return this;
}

public MqttClientCreator globalSubscribe(String... topics) {
Objects.requireNonNull(topics, "globalSubscribe topics is null.");
List<MqttTopicSubscription> subscriptionList = Arrays.stream(topics)
.map(MqttTopicSubscription::new)
.collect(Collectors.toList());
return globalSubscribe(subscriptionList);
}

public MqttClientCreator globalSubscribe(MqttTopicSubscription... topics) {
Objects.requireNonNull(topics, "globalSubscribe topics is null.");
return globalSubscribe(Arrays.asList(topics));
}

public MqttClientCreator globalSubscribe(List<MqttTopicSubscription> topicList) {
Objects.requireNonNull(topicList, "globalSubscribe topicList is null.");
if (this.globalSubscribe == null) {
this.globalSubscribe = new HashSet<>(topicList);
} else {
this.globalSubscribe.addAll(topicList);
}
return this;
}

public MqttClientCreator globalMessageListener(IMqttClientGlobalMessageListener globalMessageListener) {
this.globalMessageListener = globalMessageListener;
return this;
}

public MqttClientCreator clientSession(IMqttClientSession clientSession) {
this.clientSession = clientSession;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ public final class MqttTopicSubscription {
private final String topicFilter;
private final MqttSubscriptionOption option;

public MqttTopicSubscription(String topicFilter) {
this(topicFilter, MqttQoS.AT_MOST_ONCE);
}

public MqttTopicSubscription(String topicFilter, MqttQoS qualityOfService) {
this.topicFilter = topicFilter;
this.option = MqttSubscriptionOption.onlyFromQos(qualityOfService);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,23 @@ public class MqttClientTemplate implements SmartInitializingSingleton, Disposabl
public static final String DEFAULT_CLIENT_TEMPLATE_BEAN = "mqttClientTemplate";
private final MqttClientCreator clientCreator;
private final ObjectProvider<IMqttClientConnectListener> clientConnectListenerObjectProvider;
private final ObjectProvider<IMqttClientGlobalMessageListener> globalMessageListenerObjectProvider;
private final ObjectProvider<MqttClientCustomizer> customizersObjectProvider;
private final List<MqttClientSubscription> tempSubscriptionList;
private MqttClient client;

public MqttClientTemplate(MqttClientCreator clientCreator,
ObjectProvider<IMqttClientConnectListener> clientConnectListenerObjectProvider) {
this(clientCreator, clientConnectListenerObjectProvider, null);
this(clientCreator, clientConnectListenerObjectProvider, null, null);
}

public MqttClientTemplate(MqttClientCreator clientCreator,
ObjectProvider<IMqttClientConnectListener> clientConnectListenerObjectProvider,
ObjectProvider<IMqttClientGlobalMessageListener> globalMessageListenerObjectProvider,
ObjectProvider<MqttClientCustomizer> customizersObjectProvider) {
this.clientCreator = clientCreator;
this.clientConnectListenerObjectProvider = clientConnectListenerObjectProvider;
this.globalMessageListenerObjectProvider = globalMessageListenerObjectProvider;
this.customizersObjectProvider = customizersObjectProvider;
this.tempSubscriptionList = new ArrayList<>();
}
Expand Down Expand Up @@ -286,6 +289,10 @@ void addSubscriptionList(String[] topicFilters, MqttQoS qos, IMqttClientMessageL
public void afterSingletonsInstantiated() {
// 配置客户端连接监听器
clientConnectListenerObjectProvider.ifAvailable(clientCreator::connectListener);
// 全局监听器
if (globalMessageListenerObjectProvider != null) {
globalMessageListenerObjectProvider.ifAvailable(clientCreator::globalMessageListener);
}
// 自定义处理
if (customizersObjectProvider != null) {
customizersObjectProvider.ifAvailable(customizer -> customizer.customize(clientCreator));
Expand Down
Loading

0 comments on commit 8f7d7e3

Please sign in to comment.