提交 2e722326 编写于 作者: 如梦技术's avatar 如梦技术 🐛

mqtt 服务端代码优化。

上级 b6ece9e3
......@@ -234,8 +234,16 @@ public final class MqttServer {
public boolean stop() {
boolean result = this.tioServer.stop();
logger.info("MqttServer stop result:{}", result);
sessionManager.clean();
subscribeManager.clean();
try {
sessionManager.clean();
} catch (Throwable e) {
logger.error("Mqtt server stop session clean error.", e);
}
try {
subscribeManager.clean();
} catch (Throwable e) {
logger.error("Mqtt server stop subscribe clean error.", e);
}
this.executor.shutdown();
return result;
}
......
......@@ -37,15 +37,18 @@ public class MqttServerAioListener extends DefaultAioListener {
private static final Logger logger = LoggerFactory.getLogger(MqttServerAioListener.class);
private final IMqttMessageStore messageStore;
private final IMqttSessionManager sessionManager;
private final IMqttServerSubscribeManager subscribeManager;
private final IMqttMessageDispatcher messageDispatcher;
private final IMqttConnectStatusListener clientStatusListener;
public MqttServerAioListener(IMqttMessageStore messageStore,
IMqttSessionManager sessionManager,
IMqttServerSubscribeManager subscribeManager,
IMqttMessageDispatcher messageDispatcher,
IMqttConnectStatusListener clientStatusListener) {
this.messageStore = messageStore;
this.sessionManager = sessionManager;
this.subscribeManager = subscribeManager;
this.messageDispatcher = messageDispatcher;
this.clientStatusListener = clientStatusListener;
}
......@@ -68,10 +71,9 @@ public class MqttServerAioListener extends DefaultAioListener {
// 1. 对于异常断开连接,处理遗嘱消息
sendWillMessage(context, clientId);
// 2. 释放资源
sessionManager.remove(clientId);
Tio.unbindBsId(context);
cleanUp(context, clientId);
// 3. 下线事件
clientStatusListener.offline(clientId);
notify(clientId);
}
private void sendWillMessage(ChannelContext context, String clientId) {
......@@ -80,18 +82,40 @@ public class MqttServerAioListener extends DefaultAioListener {
if (normalDisconnectMark != null) {
return;
}
// 2. 获取遗嘱消息
Message willMessage = messageStore.getWillMessage(clientId);
if (willMessage == null) {
return;
}
// 3. 遗嘱消息发送
// 2. 发送遗嘱消息
try {
Message willMessage = messageStore.getWillMessage(clientId);
if (willMessage == null) {
return;
}
boolean result = messageDispatcher.send(willMessage);
logger.info("Mqtt server send willMessage result:{}.", result);
logger.info("Mqtt server clientId:{} send willMessage result:{}.", clientId, result);
// 4. 清理遗嘱消息
messageStore.clearWillMessage(clientId);
} catch (Throwable throwable) {
logger.error("Mqtt server clientId:{} send willMessage error.", clientId, throwable);
}
}
private void cleanUp(ChannelContext context, String clientId) {
try {
sessionManager.remove(clientId);
} catch (Throwable throwable) {
logger.error("Mqtt server send willMessage error.", throwable);
logger.error("Mqtt server clientId:{} session clean error.", clientId, throwable);
}
try {
subscribeManager.remove(clientId);
} catch (Throwable throwable) {
logger.error("Mqtt server clientId:{} subscribe clean error.", clientId, throwable);
}
Tio.unbindBsId(context);
}
private void notify(String clientId) {
try {
clientStatusListener.offline(clientId);
} catch (Throwable throwable) {
logger.error("Mqtt server clientId:{} offline notify error.", clientId, throwable);
}
}
}
......@@ -287,7 +287,7 @@ public class MqttServerCreator {
ServerAioHandler handler = new MqttServerAioHandler(this.bufferAllocator, serverProcessor);
// 2. t-io 监听
ServerAioListener listener = new MqttServerAioListener(
this.messageStore, this.sessionManager, this.messageDispatcher, this.connectStatusListener);
this.messageStore, this.sessionManager, this.subscribeManager, this.messageDispatcher, this.connectStatusListener);
// 2. t-io 配置
ServerTioConfig tioConfig = new ServerTioConfig(this.name, handler, listener);
// 4. 设置 t-io 心跳 timeout
......
......@@ -39,6 +39,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* mqtt broker 处理器
......@@ -93,9 +94,14 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
// 3. 绑定 clientId
Tio.bindBsId(context, clientId);
MqttConnectVariableHeader variableHeader = mqttMessage.variableHeader();
// 4. 心跳超时时间,当然这个值如果小于全局配置(默认:120s),定时检查的时间间隔还是以全局为准,只是在判断时用此值
int keepAliveSeconds = variableHeader.keepAliveTimeSeconds();
if (keepAliveSeconds > 0) {
context.setHeartbeatTimeout(TimeUnit.SECONDS.toMillis(keepAliveSeconds));
}
// TODO session 处理,先默认全部连接关闭时清除
// boolean cleanSession = variableHeader.isCleanSession();
// 4. 存储遗嘱消息
// 5. 存储遗嘱消息
boolean willFlag = variableHeader.isWillFlag();
if (willFlag) {
Message willMessage = new Message();
......@@ -105,9 +111,9 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
willMessage.setRetain(variableHeader.isWillRetain());
messageStore.addWillMessage(clientId, willMessage);
}
// 5. 返回 ack
// 6. 返回 ack
connAckByReturnCode(clientId, context, MqttConnectReturnCode.CONNECTION_ACCEPTED);
// 6. 在线状态
// 7. 在线状态
clientStatusListener.online(clientId);
}
......
......@@ -38,6 +38,7 @@ public class MqttBenchmark {
MqttClient client = MqttClient.create()
.username("admin")
.password("123456")
.readBufferSize(512)
.connect();
// 3. 订阅服务端消息
client.subQos0("/#", (topic, payload) -> {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册