From d52efbf112b2c9867a091a39afbb0a6d079157d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A6=82=E6=A2=A6=E6=8A=80=E6=9C=AF?= <596392912@qq.com> Date: Wed, 8 Sep 2021 17:33:33 +0800 Subject: [PATCH] =?UTF-8?q?:sparkles:=20=E6=B7=BB=E5=8A=A0=20mica-mqtt-bro?= =?UTF-8?q?ker=EF=BC=8C=E5=BE=85=E5=AE=8C=E5=96=84=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cluster/RedisMqttMessageReceiver.java | 31 ++++++++++++----- .../broker/cluster/RedisMqttMessageStore.java | 4 +-- .../AbstractMqttMessageDispatcher.java | 17 ++++++++-- .../core/server/http/api/MqttHttpApi.java | 34 ++++++++++++------- .../core/server/http/core/MqttWebServer.java | 2 +- .../server/session/IMqttSessionManager.java | 2 +- .../session/InMemoryMqttSessionManager.java | 6 ++-- .../support/DefaultMqttServerProcessor.java | 2 +- 8 files changed, 66 insertions(+), 32 deletions(-) diff --git a/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageReceiver.java b/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageReceiver.java index 42e7ae7..0126abe 100644 --- a/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageReceiver.java +++ b/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageReceiver.java @@ -16,9 +16,11 @@ package net.dreamlu.iot.mqtt.broker.cluster; +import net.dreamlu.iot.mqtt.codec.MqttMessageType; import net.dreamlu.iot.mqtt.codec.MqttQoS; import net.dreamlu.iot.mqtt.core.server.MqttServer; import net.dreamlu.iot.mqtt.core.server.model.Message; +import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager; import net.dreamlu.mica.core.utils.JsonUtil; import net.dreamlu.mica.core.utils.StringUtil; import net.dreamlu.mica.redis.cache.MicaRedisCache; @@ -40,6 +42,7 @@ public class RedisMqttMessageReceiver implements MessageListener, InitializingBe private final RedisTemplate redisTemplate; private final String channel; private final MqttServer mqttServer; + private final IMqttSessionManager sessionManager; public RedisMqttMessageReceiver(MicaRedisCache redisCache, String channel, @@ -47,6 +50,7 @@ public class RedisMqttMessageReceiver implements MessageListener, InitializingBe this.redisTemplate = redisCache.getRedisTemplate(); this.channel = Objects.requireNonNull(channel, "Redis pub/sub channel is null."); this.mqttServer = mqttServer; + this.sessionManager = mqttServer.getServerCreator().getSessionManager(); } @Override @@ -57,14 +61,25 @@ public class RedisMqttMessageReceiver implements MessageListener, InitializingBe if (mqttMessage == null) { return; } - String clientId = mqttMessage.getClientId(); - String topic = mqttMessage.getTopic(); - MqttQoS mqttQoS = MqttQoS.valueOf(mqttMessage.getQos()); - boolean retain = mqttMessage.isRetain(); - if (StringUtil.isBlank(clientId)) { - mqttServer.publishAll(topic, ByteBuffer.wrap(mqttMessage.getPayload()), mqttQoS, retain); - } else { - mqttServer.publish(clientId, topic, ByteBuffer.wrap(mqttMessage.getPayload()), mqttQoS, retain); + messageProcessing(mqttMessage); + } + + public void messageProcessing(Message message) { + MqttMessageType messageType = MqttMessageType.valueOf(message.getMessageType()); + if (MqttMessageType.PUBLISH == messageType) { + String clientId = message.getClientId(); + String topic = message.getTopic(); + MqttQoS mqttQoS = MqttQoS.valueOf(message.getQos()); + boolean retain = message.isRetain(); + if (StringUtil.isBlank(clientId)) { + mqttServer.publishAll(topic, ByteBuffer.wrap(message.getPayload()), mqttQoS, retain); + } else { + mqttServer.publish(clientId, topic, ByteBuffer.wrap(message.getPayload()), mqttQoS, retain); + } + } else if (MqttMessageType.SUBSCRIBE == messageType) { + sessionManager.addSubscribe(message.getTopic(), message.getClientId(), message.getQos()); + } else if (MqttMessageType.UNSUBSCRIBE == messageType) { + sessionManager.removeSubscribe(message.getTopic(), message.getClientId()); } } diff --git a/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageStore.java b/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageStore.java index 276be30..1bb577e 100644 --- a/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageStore.java +++ b/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageStore.java @@ -61,7 +61,7 @@ public class RedisMqttMessageStore implements IMqttMessageStore { } @Override - public Message getRetainMessage(String topic) { - return null; + public Message getRetainMessage(String topicFilter) { + return redisCache.get(RedisKeys.MESSAGE_STORE_RETAIN.getKey(topicFilter)); } } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/dispatcher/AbstractMqttMessageDispatcher.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/dispatcher/AbstractMqttMessageDispatcher.java index ec286b2..ef07ae3 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/dispatcher/AbstractMqttMessageDispatcher.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/dispatcher/AbstractMqttMessageDispatcher.java @@ -16,9 +16,11 @@ package net.dreamlu.iot.mqtt.core.server.dispatcher; +import net.dreamlu.iot.mqtt.codec.MqttMessageType; import net.dreamlu.iot.mqtt.codec.MqttQoS; import net.dreamlu.iot.mqtt.core.server.MqttServer; import net.dreamlu.iot.mqtt.core.server.model.Message; +import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager; import org.tio.core.ChannelContext; import org.tio.core.Tio; import org.tio.server.ServerTioConfig; @@ -33,9 +35,11 @@ import java.util.Objects; */ public abstract class AbstractMqttMessageDispatcher implements IMqttMessageDispatcher { protected MqttServer mqttServer; + protected IMqttSessionManager sessionManager; public void config(MqttServer mqttServer) { this.mqttServer = mqttServer; + this.sessionManager = mqttServer.getServerCreator().getSessionManager(); } /** @@ -59,9 +63,16 @@ public abstract class AbstractMqttMessageDispatcher implements IMqttMessageDispa public boolean send(Message message) { Objects.requireNonNull(mqttServer, "MqttServer require not Null."); // 1. 先发送到本服务 - ByteBuffer payload = ByteBuffer.wrap(message.getPayload()); - MqttQoS qoS = MqttQoS.valueOf(message.getQos()); - mqttServer.publishAll(message.getTopic(), payload, qoS); + MqttMessageType messageType = MqttMessageType.valueOf(message.getMessageType()); + if (MqttMessageType.PUBLISH == messageType) { + ByteBuffer payload = ByteBuffer.wrap(message.getPayload()); + MqttQoS qoS = MqttQoS.valueOf(message.getQos()); + mqttServer.publishAll(message.getTopic(), payload, qoS, message.isRetain()); + } else if (MqttMessageType.SUBSCRIBE == messageType) { + sessionManager.addSubscribe(message.getTopic(), message.getClientId(), message.getQos()); + } else if (MqttMessageType.UNSUBSCRIBE == messageType) { + sessionManager.removeSubscribe(message.getTopic(), message.getClientId()); + } return sendAll(message); } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/MqttHttpApi.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/MqttHttpApi.java index ed89d2c..b250899 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/MqttHttpApi.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/MqttHttpApi.java @@ -18,7 +18,6 @@ package net.dreamlu.iot.mqtt.core.server.http.api; import com.alibaba.fastjson.JSON; import net.dreamlu.iot.mqtt.codec.MqttMessageType; -import net.dreamlu.iot.mqtt.codec.MqttQoS; import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher; import net.dreamlu.iot.mqtt.core.server.http.api.code.ResultCode; import net.dreamlu.iot.mqtt.core.server.http.api.form.BaseForm; @@ -27,7 +26,6 @@ import net.dreamlu.iot.mqtt.core.server.http.api.form.SubscribeForm; import net.dreamlu.iot.mqtt.core.server.http.api.result.Result; import net.dreamlu.iot.mqtt.core.server.http.handler.MqttHttpRoutes; import net.dreamlu.iot.mqtt.core.server.model.Message; -import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager; import net.dreamlu.iot.mqtt.core.util.PayloadEncode; import org.tio.http.common.HttpRequest; import org.tio.http.common.HttpResponse; @@ -45,12 +43,9 @@ import java.util.function.Function; */ public class MqttHttpApi { private final IMqttMessageDispatcher messageDispatcher; - private final IMqttSessionManager sessionManager; - public MqttHttpApi(IMqttMessageDispatcher messageDispatcher, - IMqttSessionManager sessionManager) { + public MqttHttpApi(IMqttMessageDispatcher messageDispatcher) { this.messageDispatcher = messageDispatcher; - this.sessionManager = sessionManager; } /** @@ -74,7 +69,7 @@ public class MqttHttpApi { if (validResponse != null) { return validResponse; } - send(form); + sendPublish(form); return Result.ok(response); } @@ -105,12 +100,12 @@ public class MqttHttpApi { } // 批量发送 for (PublishForm form : formList) { - send(form); + sendPublish(form); } return Result.ok(response); } - private void send(PublishForm form) { + private void sendPublish(PublishForm form) { String payload = form.getPayload(); Message message = new Message(); message.setMessageType(MqttMessageType.PUBLISH.value()); @@ -151,7 +146,7 @@ public class MqttHttpApi { return Result.fail(response, ResultCode.E101); } // 接口手动添加的订阅关系,可用来调试,不建议其他场景使用 - sessionManager.addSubscribe(form.getTopic(), form.getClientId(), MqttQoS.valueOf(qos)); + sendSubscribe(form); return Result.ok(response); } @@ -187,7 +182,7 @@ public class MqttHttpApi { // 批量处理 for (SubscribeForm form : formList) { // 接口手动添加的订阅关系,可用来调试,不建议其他场景使用 - sessionManager.addSubscribe(form.getTopic(), form.getClientId(), MqttQoS.valueOf(form.getQos())); + sendSubscribe(form); } return Result.ok(response); } @@ -214,7 +209,7 @@ public class MqttHttpApi { return validResponse; } // 接口手动取消的订阅关系,可用来调试,不建议其他场景使用 - sessionManager.removeSubscribe(form.getTopic(), form.getClientId()); + sendSubscribe(form); return Result.ok(response); } @@ -246,11 +241,24 @@ public class MqttHttpApi { // 批量处理 for (BaseForm form : formList) { // 接口手动添加的订阅关系,可用来调试,不建议其他场景使用 - sessionManager.removeSubscribe(form.getTopic(), form.getClientId()); + sendSubscribe(form); } return Result.ok(response); } + private void sendSubscribe(BaseForm form) { + Message message = new Message(); + message.setClientId(form.getClientId()); + message.setTopic(form.getTopic()); + if (form instanceof SubscribeForm) { + message.setQos(((SubscribeForm) form).getQos()); + message.setMessageType(MqttMessageType.SUBSCRIBE.value()); + } else { + message.setMessageType(MqttMessageType.UNSUBSCRIBE.value()); + } + messageDispatcher.send(message); + } + /** * 读取表单 * diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServer.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServer.java index c9e78ef..8f0d3b3 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServer.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServer.java @@ -317,7 +317,7 @@ public class MqttWebServer { System.setProperty(TIO_SYSTEM_TIMER_PERIOD, "50"); } // 2.2 http 路由配置 - MqttHttpApi httpApi = new MqttHttpApi(serverCreator.getMessageDispatcher(), serverCreator.getSessionManager()); + MqttHttpApi httpApi = new MqttHttpApi(serverCreator.getMessageDispatcher()); httpApi.register(); // 2.3 认证配置 String username = serverCreator.getHttpBasicUsername(); diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/IMqttSessionManager.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/IMqttSessionManager.java index cbfd083..53d2b4f 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/IMqttSessionManager.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/IMqttSessionManager.java @@ -37,7 +37,7 @@ public interface IMqttSessionManager { * @param clientId 客户端 Id * @param mqttQoS MqttQoS */ - void addSubscribe(String topicFilter, String clientId, MqttQoS mqttQoS); + void addSubscribe(String topicFilter, String clientId, int mqttQoS); /** * 删除订阅 diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/InMemoryMqttSessionManager.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/InMemoryMqttSessionManager.java index bc75f83..8ede758 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/InMemoryMqttSessionManager.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/InMemoryMqttSessionManager.java @@ -51,12 +51,12 @@ public class InMemoryMqttSessionManager implements IMqttSessionManager { private final ConcurrentMap> pendingQos2PublishStore = new ConcurrentHashMap<>(); @Override - public void addSubscribe(String topicFilter, String clientId, MqttQoS mqttQoS) { + public void addSubscribe(String topicFilter, String clientId, int mqttQoS) { Map data = subscribeStore.computeIfAbsent(topicFilter, (key) -> new ConcurrentHashMap<>(16)); // 如果不存在或者老的订阅 qos 比较小也重新设置 Integer existingQos = data.get(clientId); - if (existingQos == null || existingQos < mqttQoS.value()) { - data.put(clientId, mqttQoS.value()); + if (existingQos == null || existingQos < mqttQoS) { + data.put(clientId, mqttQoS); } } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java index 8cd843b..2214cdd 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java @@ -268,7 +268,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor { MqttQoS mqttQoS = subscription.qualityOfService(); mqttQosList.add(mqttQoS); topicList.add(topicName); - sessionManager.addSubscribe(topicName, clientId, mqttQoS); + sessionManager.addSubscribe(topicName, clientId, mqttQoS.value()); } logger.info("Subscribe - clientId:{} TopicFilters:{} mqttQoS:{} messageId:{}", clientId, topicList, mqttQosList, messageId); // 3. 返回 ack -- GitLab