diff --git a/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageReceiver.java b/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageReceiver.java index 42e7ae79c2fcefb8e4a23bdd575876c0a40684fa..0126abec734e1cc7c9281f2cf0d3e47c17fef09a 100644 --- a/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageReceiver.java +++ b/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageReceiver.java @@ -16,9 +16,11 @@ package net.dreamlu.iot.mqtt.broker.cluster; +import net.dreamlu.iot.mqtt.codec.MqttMessageType; import net.dreamlu.iot.mqtt.codec.MqttQoS; import net.dreamlu.iot.mqtt.core.server.MqttServer; import net.dreamlu.iot.mqtt.core.server.model.Message; +import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager; import net.dreamlu.mica.core.utils.JsonUtil; import net.dreamlu.mica.core.utils.StringUtil; import net.dreamlu.mica.redis.cache.MicaRedisCache; @@ -40,6 +42,7 @@ public class RedisMqttMessageReceiver implements MessageListener, InitializingBe private final RedisTemplate redisTemplate; private final String channel; private final MqttServer mqttServer; + private final IMqttSessionManager sessionManager; public RedisMqttMessageReceiver(MicaRedisCache redisCache, String channel, @@ -47,6 +50,7 @@ public class RedisMqttMessageReceiver implements MessageListener, InitializingBe this.redisTemplate = redisCache.getRedisTemplate(); this.channel = Objects.requireNonNull(channel, "Redis pub/sub channel is null."); this.mqttServer = mqttServer; + this.sessionManager = mqttServer.getServerCreator().getSessionManager(); } @Override @@ -57,14 +61,25 @@ public class RedisMqttMessageReceiver implements MessageListener, InitializingBe if (mqttMessage == null) { return; } - String clientId = mqttMessage.getClientId(); - String topic = mqttMessage.getTopic(); - MqttQoS mqttQoS = MqttQoS.valueOf(mqttMessage.getQos()); - boolean retain = mqttMessage.isRetain(); - if (StringUtil.isBlank(clientId)) { - mqttServer.publishAll(topic, ByteBuffer.wrap(mqttMessage.getPayload()), mqttQoS, retain); - } else { - mqttServer.publish(clientId, topic, ByteBuffer.wrap(mqttMessage.getPayload()), mqttQoS, retain); + messageProcessing(mqttMessage); + } + + public void messageProcessing(Message message) { + MqttMessageType messageType = MqttMessageType.valueOf(message.getMessageType()); + if (MqttMessageType.PUBLISH == messageType) { + String clientId = message.getClientId(); + String topic = message.getTopic(); + MqttQoS mqttQoS = MqttQoS.valueOf(message.getQos()); + boolean retain = message.isRetain(); + if (StringUtil.isBlank(clientId)) { + mqttServer.publishAll(topic, ByteBuffer.wrap(message.getPayload()), mqttQoS, retain); + } else { + mqttServer.publish(clientId, topic, ByteBuffer.wrap(message.getPayload()), mqttQoS, retain); + } + } else if (MqttMessageType.SUBSCRIBE == messageType) { + sessionManager.addSubscribe(message.getTopic(), message.getClientId(), message.getQos()); + } else if (MqttMessageType.UNSUBSCRIBE == messageType) { + sessionManager.removeSubscribe(message.getTopic(), message.getClientId()); } } diff --git a/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageStore.java b/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageStore.java index 276be30527e232ea957604fa293f19fa6875a22c..1bb577ec4d32b9b4a7e739f0c7f820e5a25bf8de 100644 --- a/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageStore.java +++ b/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageStore.java @@ -61,7 +61,7 @@ public class RedisMqttMessageStore implements IMqttMessageStore { } @Override - public Message getRetainMessage(String topic) { - return null; + public Message getRetainMessage(String topicFilter) { + return redisCache.get(RedisKeys.MESSAGE_STORE_RETAIN.getKey(topicFilter)); } } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/dispatcher/AbstractMqttMessageDispatcher.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/dispatcher/AbstractMqttMessageDispatcher.java index ec286b26a987a9fc172485bffa6931e4c43b71c6..ef07ae3eecca830ce1eda586e278028a5866e51c 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/dispatcher/AbstractMqttMessageDispatcher.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/dispatcher/AbstractMqttMessageDispatcher.java @@ -16,9 +16,11 @@ package net.dreamlu.iot.mqtt.core.server.dispatcher; +import net.dreamlu.iot.mqtt.codec.MqttMessageType; import net.dreamlu.iot.mqtt.codec.MqttQoS; import net.dreamlu.iot.mqtt.core.server.MqttServer; import net.dreamlu.iot.mqtt.core.server.model.Message; +import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager; import org.tio.core.ChannelContext; import org.tio.core.Tio; import org.tio.server.ServerTioConfig; @@ -33,9 +35,11 @@ import java.util.Objects; */ public abstract class AbstractMqttMessageDispatcher implements IMqttMessageDispatcher { protected MqttServer mqttServer; + protected IMqttSessionManager sessionManager; public void config(MqttServer mqttServer) { this.mqttServer = mqttServer; + this.sessionManager = mqttServer.getServerCreator().getSessionManager(); } /** @@ -59,9 +63,16 @@ public abstract class AbstractMqttMessageDispatcher implements IMqttMessageDispa public boolean send(Message message) { Objects.requireNonNull(mqttServer, "MqttServer require not Null."); // 1. 先发送到本服务 - ByteBuffer payload = ByteBuffer.wrap(message.getPayload()); - MqttQoS qoS = MqttQoS.valueOf(message.getQos()); - mqttServer.publishAll(message.getTopic(), payload, qoS); + MqttMessageType messageType = MqttMessageType.valueOf(message.getMessageType()); + if (MqttMessageType.PUBLISH == messageType) { + ByteBuffer payload = ByteBuffer.wrap(message.getPayload()); + MqttQoS qoS = MqttQoS.valueOf(message.getQos()); + mqttServer.publishAll(message.getTopic(), payload, qoS, message.isRetain()); + } else if (MqttMessageType.SUBSCRIBE == messageType) { + sessionManager.addSubscribe(message.getTopic(), message.getClientId(), message.getQos()); + } else if (MqttMessageType.UNSUBSCRIBE == messageType) { + sessionManager.removeSubscribe(message.getTopic(), message.getClientId()); + } return sendAll(message); } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/MqttHttpApi.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/MqttHttpApi.java index ed89d2c347a7f70bd5a7c4e4833041b95b8c9a8a..b25089990f32200e98a7c9d78255111d6743f59a 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/MqttHttpApi.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/MqttHttpApi.java @@ -18,7 +18,6 @@ package net.dreamlu.iot.mqtt.core.server.http.api; import com.alibaba.fastjson.JSON; import net.dreamlu.iot.mqtt.codec.MqttMessageType; -import net.dreamlu.iot.mqtt.codec.MqttQoS; import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher; import net.dreamlu.iot.mqtt.core.server.http.api.code.ResultCode; import net.dreamlu.iot.mqtt.core.server.http.api.form.BaseForm; @@ -27,7 +26,6 @@ import net.dreamlu.iot.mqtt.core.server.http.api.form.SubscribeForm; import net.dreamlu.iot.mqtt.core.server.http.api.result.Result; import net.dreamlu.iot.mqtt.core.server.http.handler.MqttHttpRoutes; import net.dreamlu.iot.mqtt.core.server.model.Message; -import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager; import net.dreamlu.iot.mqtt.core.util.PayloadEncode; import org.tio.http.common.HttpRequest; import org.tio.http.common.HttpResponse; @@ -45,12 +43,9 @@ import java.util.function.Function; */ public class MqttHttpApi { private final IMqttMessageDispatcher messageDispatcher; - private final IMqttSessionManager sessionManager; - public MqttHttpApi(IMqttMessageDispatcher messageDispatcher, - IMqttSessionManager sessionManager) { + public MqttHttpApi(IMqttMessageDispatcher messageDispatcher) { this.messageDispatcher = messageDispatcher; - this.sessionManager = sessionManager; } /** @@ -74,7 +69,7 @@ public class MqttHttpApi { if (validResponse != null) { return validResponse; } - send(form); + sendPublish(form); return Result.ok(response); } @@ -105,12 +100,12 @@ public class MqttHttpApi { } // 批量发送 for (PublishForm form : formList) { - send(form); + sendPublish(form); } return Result.ok(response); } - private void send(PublishForm form) { + private void sendPublish(PublishForm form) { String payload = form.getPayload(); Message message = new Message(); message.setMessageType(MqttMessageType.PUBLISH.value()); @@ -151,7 +146,7 @@ public class MqttHttpApi { return Result.fail(response, ResultCode.E101); } // 接口手动添加的订阅关系,可用来调试,不建议其他场景使用 - sessionManager.addSubscribe(form.getTopic(), form.getClientId(), MqttQoS.valueOf(qos)); + sendSubscribe(form); return Result.ok(response); } @@ -187,7 +182,7 @@ public class MqttHttpApi { // 批量处理 for (SubscribeForm form : formList) { // 接口手动添加的订阅关系,可用来调试,不建议其他场景使用 - sessionManager.addSubscribe(form.getTopic(), form.getClientId(), MqttQoS.valueOf(form.getQos())); + sendSubscribe(form); } return Result.ok(response); } @@ -214,7 +209,7 @@ public class MqttHttpApi { return validResponse; } // 接口手动取消的订阅关系,可用来调试,不建议其他场景使用 - sessionManager.removeSubscribe(form.getTopic(), form.getClientId()); + sendSubscribe(form); return Result.ok(response); } @@ -246,11 +241,24 @@ public class MqttHttpApi { // 批量处理 for (BaseForm form : formList) { // 接口手动添加的订阅关系,可用来调试,不建议其他场景使用 - sessionManager.removeSubscribe(form.getTopic(), form.getClientId()); + sendSubscribe(form); } return Result.ok(response); } + private void sendSubscribe(BaseForm form) { + Message message = new Message(); + message.setClientId(form.getClientId()); + message.setTopic(form.getTopic()); + if (form instanceof SubscribeForm) { + message.setQos(((SubscribeForm) form).getQos()); + message.setMessageType(MqttMessageType.SUBSCRIBE.value()); + } else { + message.setMessageType(MqttMessageType.UNSUBSCRIBE.value()); + } + messageDispatcher.send(message); + } + /** * 读取表单 * diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServer.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServer.java index c9e78efc29da660da724623e7505a6204a72ab7e..8f0d3b3ecb46dbc40bd2e0d57c46431c4d8399bb 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServer.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServer.java @@ -317,7 +317,7 @@ public class MqttWebServer { System.setProperty(TIO_SYSTEM_TIMER_PERIOD, "50"); } // 2.2 http 路由配置 - MqttHttpApi httpApi = new MqttHttpApi(serverCreator.getMessageDispatcher(), serverCreator.getSessionManager()); + MqttHttpApi httpApi = new MqttHttpApi(serverCreator.getMessageDispatcher()); httpApi.register(); // 2.3 认证配置 String username = serverCreator.getHttpBasicUsername(); diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/IMqttSessionManager.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/IMqttSessionManager.java index cbfd08389f487948d5304a8d256558c62a245288..53d2b4fc1a3ec58935e1850fd480d748f478f67c 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/IMqttSessionManager.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/IMqttSessionManager.java @@ -37,7 +37,7 @@ public interface IMqttSessionManager { * @param clientId 客户端 Id * @param mqttQoS MqttQoS */ - void addSubscribe(String topicFilter, String clientId, MqttQoS mqttQoS); + void addSubscribe(String topicFilter, String clientId, int mqttQoS); /** * 删除订阅 diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/InMemoryMqttSessionManager.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/InMemoryMqttSessionManager.java index bc75f8376ff939b407ef8959bd61ad8440bbd723..8ede758a9b8df30107a33dc9b38f3a07a20b5db9 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/InMemoryMqttSessionManager.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/InMemoryMqttSessionManager.java @@ -51,12 +51,12 @@ public class InMemoryMqttSessionManager implements IMqttSessionManager { private final ConcurrentMap> pendingQos2PublishStore = new ConcurrentHashMap<>(); @Override - public void addSubscribe(String topicFilter, String clientId, MqttQoS mqttQoS) { + public void addSubscribe(String topicFilter, String clientId, int mqttQoS) { Map data = subscribeStore.computeIfAbsent(topicFilter, (key) -> new ConcurrentHashMap<>(16)); // 如果不存在或者老的订阅 qos 比较小也重新设置 Integer existingQos = data.get(clientId); - if (existingQos == null || existingQos < mqttQoS.value()) { - data.put(clientId, mqttQoS.value()); + if (existingQos == null || existingQos < mqttQoS) { + data.put(clientId, mqttQoS); } } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java index 8cd843b47b7c3a08628139b9f6339c3ea21eff23..2214cdd8cc6ef00782eac8ee014bda265b8660f5 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java @@ -268,7 +268,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor { MqttQoS mqttQoS = subscription.qualityOfService(); mqttQosList.add(mqttQoS); topicList.add(topicName); - sessionManager.addSubscribe(topicName, clientId, mqttQoS); + sessionManager.addSubscribe(topicName, clientId, mqttQoS.value()); } logger.info("Subscribe - clientId:{} TopicFilters:{} mqttQoS:{} messageId:{}", clientId, topicList, mqttQosList, messageId); // 3. 返回 ack