提交 6ee3d65b 编写于 作者: 浅梦2013's avatar 浅梦2013

代码优化。

上级 f8e3b77d
......@@ -31,7 +31,6 @@ import org.tio.core.ssl.SslConfig;
import org.tio.utils.hutool.StrUtil;
import org.tio.utils.thread.pool.DefaultThreadFactory;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.function.Consumer;
......
......@@ -46,12 +46,11 @@ public final class MqttServer {
private final ScheduledThreadPoolExecutor executor;
MqttServer(TioServer tioServer,
IMqttSessionManager sessionManager,
IMqttServerSubscribeManager subscribeManager,
MqttServerCreator serverCreator,
ScheduledThreadPoolExecutor executor) {
this.tioServer = tioServer;
this.sessionManager = sessionManager;
this.subscribeManager = subscribeManager;
this.sessionManager = serverCreator.getSessionManager();
this.subscribeManager = serverCreator.getSubscribeManager();
this.executor = executor;
}
......
......@@ -39,12 +39,10 @@ public class MqttServerAioHandler implements ServerAioHandler {
private final ByteBufferAllocator allocator;
private final MqttServerProcessor processor;
public MqttServerAioHandler(int maxBytesInMessage,
ByteBufferAllocator bufferAllocator,
MqttServerProcessor processor) {
this.mqttDecoder = new MqttDecoder(maxBytesInMessage);
public MqttServerAioHandler(MqttServerCreator serverCreator, MqttServerProcessor processor) {
this.mqttDecoder = new MqttDecoder(serverCreator.getMaxBytesInMessage());
this.mqttEncoder = MqttEncoder.INSTANCE;
this.allocator = bufferAllocator;
this.allocator = serverCreator.getBufferAllocator();
this.processor = processor;
}
......
......@@ -39,18 +39,14 @@ public class MqttServerAioListener extends DefaultAioListener {
private final IMqttSessionManager sessionManager;
private final IMqttServerSubscribeManager subscribeManager;
private final IMqttMessageDispatcher messageDispatcher;
private final IMqttConnectStatusListener clientStatusListener;
private final IMqttConnectStatusListener connectStatusListener;
public MqttServerAioListener(IMqttMessageStore messageStore,
IMqttSessionManager sessionManager,
IMqttServerSubscribeManager subscribeManager,
IMqttMessageDispatcher messageDispatcher,
IMqttConnectStatusListener clientStatusListener) {
this.messageStore = messageStore;
this.sessionManager = sessionManager;
this.subscribeManager = subscribeManager;
this.messageDispatcher = messageDispatcher;
this.clientStatusListener = clientStatusListener;
public MqttServerAioListener(MqttServerCreator serverCreator) {
this.messageStore = serverCreator.getMessageStore();
this.sessionManager = serverCreator.getSessionManager();
this.subscribeManager = serverCreator.getSubscribeManager();
this.messageDispatcher = serverCreator.getMessageDispatcher();
this.connectStatusListener = serverCreator.getConnectStatusListener();
}
@Override
......@@ -113,7 +109,7 @@ public class MqttServerAioListener extends DefaultAioListener {
private void notify(String clientId) {
try {
clientStatusListener.offline(clientId);
connectStatusListener.offline(clientId);
} catch (Throwable throwable) {
logger.error("Mqtt server clientId:{} offline notify error.", clientId, throwable);
}
......
......@@ -305,13 +305,11 @@ public class MqttServerCreator {
this.connectStatusListener = new DefaultMqttConnectStatusListener();
}
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, DefaultThreadFactory.getInstance("MqttServer"));
DefaultMqttServerProcessor serverProcessor = new DefaultMqttServerProcessor(this.messageStore, this.sessionManager,
this.authHandler, this.subscribeManager, this.messageDispatcher, this.connectStatusListener, this.messageListener, executor);
DefaultMqttServerProcessor serverProcessor = new DefaultMqttServerProcessor(this, executor);
// 1. 处理消息
ServerAioHandler handler = new MqttServerAioHandler(this.maxBytesInMessage, this.bufferAllocator, serverProcessor);
ServerAioHandler handler = new MqttServerAioHandler(this, serverProcessor);
// 2. t-io 监听
ServerAioListener listener = new MqttServerAioListener(this.messageStore, this.sessionManager, this.subscribeManager,
this.messageDispatcher, this.connectStatusListener);
ServerAioListener listener = new MqttServerAioListener(this);
// 2. t-io 配置
ServerTioConfig tioConfig = new ServerTioConfig(this.name, handler, listener);
// 4. 设置 t-io 心跳 timeout
......@@ -338,7 +336,7 @@ public class MqttServerCreator {
} catch (IOException e) {
throw new IllegalStateException("Mica mqtt server start fail.", e);
}
MqttServer mqttServer = new MqttServer(tioServer, this.sessionManager, this.subscribeManager, executor);
MqttServer mqttServer = new MqttServer(tioServer, this, executor);
messageDispatcher.config(mqttServer);
return mqttServer;
}
......
......@@ -19,10 +19,7 @@ package net.dreamlu.iot.mqtt.core.server.support;
import net.dreamlu.iot.mqtt.codec.*;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.server.IMqttServerAuthHandler;
import net.dreamlu.iot.mqtt.core.server.IMqttServerSubscribeManager;
import net.dreamlu.iot.mqtt.core.server.MqttConst;
import net.dreamlu.iot.mqtt.core.server.MqttServerProcessor;
import net.dreamlu.iot.mqtt.core.server.*;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
......@@ -53,25 +50,18 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
private final IMqttServerAuthHandler authHandler;
private final IMqttServerSubscribeManager subscribeManager;
private final IMqttMessageDispatcher messageDispatcher;
private final IMqttConnectStatusListener clientStatusListener;
private final IMqttConnectStatusListener connectStatusListener;
private final IMqttMessageListener messageListener;
private final ScheduledThreadPoolExecutor executor;
public DefaultMqttServerProcessor(IMqttMessageStore mqttMessageStore,
IMqttSessionManager sessionManager,
IMqttServerAuthHandler authHandler,
IMqttServerSubscribeManager subscribeManager,
IMqttMessageDispatcher messageDispatcher,
IMqttConnectStatusListener clientStatusListener,
IMqttMessageListener messageListener,
ScheduledThreadPoolExecutor executor) {
this.messageStore = mqttMessageStore;
this.sessionManager = sessionManager;
this.authHandler = authHandler;
this.subscribeManager = subscribeManager;
this.messageDispatcher = messageDispatcher;
this.clientStatusListener = clientStatusListener;
this.messageListener = messageListener;
public DefaultMqttServerProcessor(MqttServerCreator serverCreator, ScheduledThreadPoolExecutor executor) {
this.messageStore = serverCreator.getMessageStore();
this.sessionManager = serverCreator.getSessionManager();
this.authHandler = serverCreator.getAuthHandler();
this.subscribeManager = serverCreator.getSubscribeManager();
this.messageDispatcher = serverCreator.getMessageDispatcher();
this.connectStatusListener = serverCreator.getConnectStatusListener();
this.messageListener = serverCreator.getMessageListener();
this.executor = executor;
}
......@@ -114,7 +104,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
// 6. 返回 ack
connAckByReturnCode(clientId, context, MqttConnectReturnCode.CONNECTION_ACCEPTED);
// 7. 在线状态
clientStatusListener.online(clientId);
connectStatusListener.online(clientId);
}
private void connAckByReturnCode(String clientId, ChannelContext context, MqttConnectReturnCode returnCode) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册