提交 d52efbf1 编写于 作者: 如梦技术's avatar 如梦技术 🐛

添加 mica-mqtt-broker,待完善。

上级 6aadba08
...@@ -16,9 +16,11 @@ ...@@ -16,9 +16,11 @@
package net.dreamlu.iot.mqtt.broker.cluster; package net.dreamlu.iot.mqtt.broker.cluster;
import net.dreamlu.iot.mqtt.codec.MqttMessageType;
import net.dreamlu.iot.mqtt.codec.MqttQoS; import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.server.MqttServer; import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.model.Message; import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import net.dreamlu.mica.core.utils.JsonUtil; import net.dreamlu.mica.core.utils.JsonUtil;
import net.dreamlu.mica.core.utils.StringUtil; import net.dreamlu.mica.core.utils.StringUtil;
import net.dreamlu.mica.redis.cache.MicaRedisCache; import net.dreamlu.mica.redis.cache.MicaRedisCache;
...@@ -40,6 +42,7 @@ public class RedisMqttMessageReceiver implements MessageListener, InitializingBe ...@@ -40,6 +42,7 @@ public class RedisMqttMessageReceiver implements MessageListener, InitializingBe
private final RedisTemplate<String, Object> redisTemplate; private final RedisTemplate<String, Object> redisTemplate;
private final String channel; private final String channel;
private final MqttServer mqttServer; private final MqttServer mqttServer;
private final IMqttSessionManager sessionManager;
public RedisMqttMessageReceiver(MicaRedisCache redisCache, public RedisMqttMessageReceiver(MicaRedisCache redisCache,
String channel, String channel,
...@@ -47,6 +50,7 @@ public class RedisMqttMessageReceiver implements MessageListener, InitializingBe ...@@ -47,6 +50,7 @@ public class RedisMqttMessageReceiver implements MessageListener, InitializingBe
this.redisTemplate = redisCache.getRedisTemplate(); this.redisTemplate = redisCache.getRedisTemplate();
this.channel = Objects.requireNonNull(channel, "Redis pub/sub channel is null."); this.channel = Objects.requireNonNull(channel, "Redis pub/sub channel is null.");
this.mqttServer = mqttServer; this.mqttServer = mqttServer;
this.sessionManager = mqttServer.getServerCreator().getSessionManager();
} }
@Override @Override
...@@ -57,14 +61,25 @@ public class RedisMqttMessageReceiver implements MessageListener, InitializingBe ...@@ -57,14 +61,25 @@ public class RedisMqttMessageReceiver implements MessageListener, InitializingBe
if (mqttMessage == null) { if (mqttMessage == null) {
return; return;
} }
String clientId = mqttMessage.getClientId(); messageProcessing(mqttMessage);
String topic = mqttMessage.getTopic(); }
MqttQoS mqttQoS = MqttQoS.valueOf(mqttMessage.getQos());
boolean retain = mqttMessage.isRetain(); public void messageProcessing(Message message) {
MqttMessageType messageType = MqttMessageType.valueOf(message.getMessageType());
if (MqttMessageType.PUBLISH == messageType) {
String clientId = message.getClientId();
String topic = message.getTopic();
MqttQoS mqttQoS = MqttQoS.valueOf(message.getQos());
boolean retain = message.isRetain();
if (StringUtil.isBlank(clientId)) { if (StringUtil.isBlank(clientId)) {
mqttServer.publishAll(topic, ByteBuffer.wrap(mqttMessage.getPayload()), mqttQoS, retain); mqttServer.publishAll(topic, ByteBuffer.wrap(message.getPayload()), mqttQoS, retain);
} else { } else {
mqttServer.publish(clientId, topic, ByteBuffer.wrap(mqttMessage.getPayload()), mqttQoS, retain); mqttServer.publish(clientId, topic, ByteBuffer.wrap(message.getPayload()), mqttQoS, retain);
}
} else if (MqttMessageType.SUBSCRIBE == messageType) {
sessionManager.addSubscribe(message.getTopic(), message.getClientId(), message.getQos());
} else if (MqttMessageType.UNSUBSCRIBE == messageType) {
sessionManager.removeSubscribe(message.getTopic(), message.getClientId());
} }
} }
......
...@@ -61,7 +61,7 @@ public class RedisMqttMessageStore implements IMqttMessageStore { ...@@ -61,7 +61,7 @@ public class RedisMqttMessageStore implements IMqttMessageStore {
} }
@Override @Override
public Message getRetainMessage(String topic) { public Message getRetainMessage(String topicFilter) {
return null; return redisCache.get(RedisKeys.MESSAGE_STORE_RETAIN.getKey(topicFilter));
} }
} }
...@@ -16,9 +16,11 @@ ...@@ -16,9 +16,11 @@
package net.dreamlu.iot.mqtt.core.server.dispatcher; package net.dreamlu.iot.mqtt.core.server.dispatcher;
import net.dreamlu.iot.mqtt.codec.MqttMessageType;
import net.dreamlu.iot.mqtt.codec.MqttQoS; import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.server.MqttServer; import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.model.Message; import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import org.tio.core.ChannelContext; import org.tio.core.ChannelContext;
import org.tio.core.Tio; import org.tio.core.Tio;
import org.tio.server.ServerTioConfig; import org.tio.server.ServerTioConfig;
...@@ -33,9 +35,11 @@ import java.util.Objects; ...@@ -33,9 +35,11 @@ import java.util.Objects;
*/ */
public abstract class AbstractMqttMessageDispatcher implements IMqttMessageDispatcher { public abstract class AbstractMqttMessageDispatcher implements IMqttMessageDispatcher {
protected MqttServer mqttServer; protected MqttServer mqttServer;
protected IMqttSessionManager sessionManager;
public void config(MqttServer mqttServer) { public void config(MqttServer mqttServer) {
this.mqttServer = mqttServer; this.mqttServer = mqttServer;
this.sessionManager = mqttServer.getServerCreator().getSessionManager();
} }
/** /**
...@@ -59,9 +63,16 @@ public abstract class AbstractMqttMessageDispatcher implements IMqttMessageDispa ...@@ -59,9 +63,16 @@ public abstract class AbstractMqttMessageDispatcher implements IMqttMessageDispa
public boolean send(Message message) { public boolean send(Message message) {
Objects.requireNonNull(mqttServer, "MqttServer require not Null."); Objects.requireNonNull(mqttServer, "MqttServer require not Null.");
// 1. 先发送到本服务 // 1. 先发送到本服务
MqttMessageType messageType = MqttMessageType.valueOf(message.getMessageType());
if (MqttMessageType.PUBLISH == messageType) {
ByteBuffer payload = ByteBuffer.wrap(message.getPayload()); ByteBuffer payload = ByteBuffer.wrap(message.getPayload());
MqttQoS qoS = MqttQoS.valueOf(message.getQos()); MqttQoS qoS = MqttQoS.valueOf(message.getQos());
mqttServer.publishAll(message.getTopic(), payload, qoS); mqttServer.publishAll(message.getTopic(), payload, qoS, message.isRetain());
} else if (MqttMessageType.SUBSCRIBE == messageType) {
sessionManager.addSubscribe(message.getTopic(), message.getClientId(), message.getQos());
} else if (MqttMessageType.UNSUBSCRIBE == messageType) {
sessionManager.removeSubscribe(message.getTopic(), message.getClientId());
}
return sendAll(message); return sendAll(message);
} }
......
...@@ -18,7 +18,6 @@ package net.dreamlu.iot.mqtt.core.server.http.api; ...@@ -18,7 +18,6 @@ package net.dreamlu.iot.mqtt.core.server.http.api;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import net.dreamlu.iot.mqtt.codec.MqttMessageType; import net.dreamlu.iot.mqtt.codec.MqttMessageType;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher; import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.http.api.code.ResultCode; import net.dreamlu.iot.mqtt.core.server.http.api.code.ResultCode;
import net.dreamlu.iot.mqtt.core.server.http.api.form.BaseForm; import net.dreamlu.iot.mqtt.core.server.http.api.form.BaseForm;
...@@ -27,7 +26,6 @@ import net.dreamlu.iot.mqtt.core.server.http.api.form.SubscribeForm; ...@@ -27,7 +26,6 @@ import net.dreamlu.iot.mqtt.core.server.http.api.form.SubscribeForm;
import net.dreamlu.iot.mqtt.core.server.http.api.result.Result; import net.dreamlu.iot.mqtt.core.server.http.api.result.Result;
import net.dreamlu.iot.mqtt.core.server.http.handler.MqttHttpRoutes; import net.dreamlu.iot.mqtt.core.server.http.handler.MqttHttpRoutes;
import net.dreamlu.iot.mqtt.core.server.model.Message; import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import net.dreamlu.iot.mqtt.core.util.PayloadEncode; import net.dreamlu.iot.mqtt.core.util.PayloadEncode;
import org.tio.http.common.HttpRequest; import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse; import org.tio.http.common.HttpResponse;
...@@ -45,12 +43,9 @@ import java.util.function.Function; ...@@ -45,12 +43,9 @@ import java.util.function.Function;
*/ */
public class MqttHttpApi { public class MqttHttpApi {
private final IMqttMessageDispatcher messageDispatcher; private final IMqttMessageDispatcher messageDispatcher;
private final IMqttSessionManager sessionManager;
public MqttHttpApi(IMqttMessageDispatcher messageDispatcher, public MqttHttpApi(IMqttMessageDispatcher messageDispatcher) {
IMqttSessionManager sessionManager) {
this.messageDispatcher = messageDispatcher; this.messageDispatcher = messageDispatcher;
this.sessionManager = sessionManager;
} }
/** /**
...@@ -74,7 +69,7 @@ public class MqttHttpApi { ...@@ -74,7 +69,7 @@ public class MqttHttpApi {
if (validResponse != null) { if (validResponse != null) {
return validResponse; return validResponse;
} }
send(form); sendPublish(form);
return Result.ok(response); return Result.ok(response);
} }
...@@ -105,12 +100,12 @@ public class MqttHttpApi { ...@@ -105,12 +100,12 @@ public class MqttHttpApi {
} }
// 批量发送 // 批量发送
for (PublishForm form : formList) { for (PublishForm form : formList) {
send(form); sendPublish(form);
} }
return Result.ok(response); return Result.ok(response);
} }
private void send(PublishForm form) { private void sendPublish(PublishForm form) {
String payload = form.getPayload(); String payload = form.getPayload();
Message message = new Message(); Message message = new Message();
message.setMessageType(MqttMessageType.PUBLISH.value()); message.setMessageType(MqttMessageType.PUBLISH.value());
...@@ -151,7 +146,7 @@ public class MqttHttpApi { ...@@ -151,7 +146,7 @@ public class MqttHttpApi {
return Result.fail(response, ResultCode.E101); return Result.fail(response, ResultCode.E101);
} }
// 接口手动添加的订阅关系,可用来调试,不建议其他场景使用 // 接口手动添加的订阅关系,可用来调试,不建议其他场景使用
sessionManager.addSubscribe(form.getTopic(), form.getClientId(), MqttQoS.valueOf(qos)); sendSubscribe(form);
return Result.ok(response); return Result.ok(response);
} }
...@@ -187,7 +182,7 @@ public class MqttHttpApi { ...@@ -187,7 +182,7 @@ public class MqttHttpApi {
// 批量处理 // 批量处理
for (SubscribeForm form : formList) { for (SubscribeForm form : formList) {
// 接口手动添加的订阅关系,可用来调试,不建议其他场景使用 // 接口手动添加的订阅关系,可用来调试,不建议其他场景使用
sessionManager.addSubscribe(form.getTopic(), form.getClientId(), MqttQoS.valueOf(form.getQos())); sendSubscribe(form);
} }
return Result.ok(response); return Result.ok(response);
} }
...@@ -214,7 +209,7 @@ public class MqttHttpApi { ...@@ -214,7 +209,7 @@ public class MqttHttpApi {
return validResponse; return validResponse;
} }
// 接口手动取消的订阅关系,可用来调试,不建议其他场景使用 // 接口手动取消的订阅关系,可用来调试,不建议其他场景使用
sessionManager.removeSubscribe(form.getTopic(), form.getClientId()); sendSubscribe(form);
return Result.ok(response); return Result.ok(response);
} }
...@@ -246,11 +241,24 @@ public class MqttHttpApi { ...@@ -246,11 +241,24 @@ public class MqttHttpApi {
// 批量处理 // 批量处理
for (BaseForm form : formList) { for (BaseForm form : formList) {
// 接口手动添加的订阅关系,可用来调试,不建议其他场景使用 // 接口手动添加的订阅关系,可用来调试,不建议其他场景使用
sessionManager.removeSubscribe(form.getTopic(), form.getClientId()); sendSubscribe(form);
} }
return Result.ok(response); return Result.ok(response);
} }
private void sendSubscribe(BaseForm form) {
Message message = new Message();
message.setClientId(form.getClientId());
message.setTopic(form.getTopic());
if (form instanceof SubscribeForm) {
message.setQos(((SubscribeForm) form).getQos());
message.setMessageType(MqttMessageType.SUBSCRIBE.value());
} else {
message.setMessageType(MqttMessageType.UNSUBSCRIBE.value());
}
messageDispatcher.send(message);
}
/** /**
* 读取表单 * 读取表单
* *
......
...@@ -317,7 +317,7 @@ public class MqttWebServer { ...@@ -317,7 +317,7 @@ public class MqttWebServer {
System.setProperty(TIO_SYSTEM_TIMER_PERIOD, "50"); System.setProperty(TIO_SYSTEM_TIMER_PERIOD, "50");
} }
// 2.2 http 路由配置 // 2.2 http 路由配置
MqttHttpApi httpApi = new MqttHttpApi(serverCreator.getMessageDispatcher(), serverCreator.getSessionManager()); MqttHttpApi httpApi = new MqttHttpApi(serverCreator.getMessageDispatcher());
httpApi.register(); httpApi.register();
// 2.3 认证配置 // 2.3 认证配置
String username = serverCreator.getHttpBasicUsername(); String username = serverCreator.getHttpBasicUsername();
......
...@@ -37,7 +37,7 @@ public interface IMqttSessionManager { ...@@ -37,7 +37,7 @@ public interface IMqttSessionManager {
* @param clientId 客户端 Id * @param clientId 客户端 Id
* @param mqttQoS MqttQoS * @param mqttQoS MqttQoS
*/ */
void addSubscribe(String topicFilter, String clientId, MqttQoS mqttQoS); void addSubscribe(String topicFilter, String clientId, int mqttQoS);
/** /**
* 删除订阅 * 删除订阅
......
...@@ -51,12 +51,12 @@ public class InMemoryMqttSessionManager implements IMqttSessionManager { ...@@ -51,12 +51,12 @@ public class InMemoryMqttSessionManager implements IMqttSessionManager {
private final ConcurrentMap<String, Map<Integer, MqttPendingQos2Publish>> pendingQos2PublishStore = new ConcurrentHashMap<>(); private final ConcurrentMap<String, Map<Integer, MqttPendingQos2Publish>> pendingQos2PublishStore = new ConcurrentHashMap<>();
@Override @Override
public void addSubscribe(String topicFilter, String clientId, MqttQoS mqttQoS) { public void addSubscribe(String topicFilter, String clientId, int mqttQoS) {
Map<String, Integer> data = subscribeStore.computeIfAbsent(topicFilter, (key) -> new ConcurrentHashMap<>(16)); Map<String, Integer> data = subscribeStore.computeIfAbsent(topicFilter, (key) -> new ConcurrentHashMap<>(16));
// 如果不存在或者老的订阅 qos 比较小也重新设置 // 如果不存在或者老的订阅 qos 比较小也重新设置
Integer existingQos = data.get(clientId); Integer existingQos = data.get(clientId);
if (existingQos == null || existingQos < mqttQoS.value()) { if (existingQos == null || existingQos < mqttQoS) {
data.put(clientId, mqttQoS.value()); data.put(clientId, mqttQoS);
} }
} }
......
...@@ -268,7 +268,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor { ...@@ -268,7 +268,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
MqttQoS mqttQoS = subscription.qualityOfService(); MqttQoS mqttQoS = subscription.qualityOfService();
mqttQosList.add(mqttQoS); mqttQosList.add(mqttQoS);
topicList.add(topicName); topicList.add(topicName);
sessionManager.addSubscribe(topicName, clientId, mqttQoS); sessionManager.addSubscribe(topicName, clientId, mqttQoS.value());
} }
logger.info("Subscribe - clientId:{} TopicFilters:{} mqttQoS:{} messageId:{}", clientId, topicList, mqttQosList, messageId); logger.info("Subscribe - clientId:{} TopicFilters:{} mqttQoS:{} messageId:{}", clientId, topicList, mqttQosList, messageId);
// 3. 返回 ack // 3. 返回 ack
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册