diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServer.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServer.java index a22256d04a5f8f94db7a49f565bfa053ac088879..578d443ae28111d806691b3b6e252278b4f13816 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServer.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServer.java @@ -20,6 +20,7 @@ import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders; import net.dreamlu.iot.mqtt.codec.MqttPublishMessage; import net.dreamlu.iot.mqtt.codec.MqttQoS; import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish; +import net.dreamlu.iot.mqtt.core.server.http.core.MqttWebServer; import net.dreamlu.iot.mqtt.core.server.model.Subscribe; import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager; import org.slf4j.Logger; @@ -29,6 +30,7 @@ import org.tio.core.Tio; import org.tio.server.ServerTioConfig; import org.tio.server.TioServer; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -41,14 +43,18 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; public final class MqttServer { private static final Logger logger = LoggerFactory.getLogger(MqttServer.class); private final TioServer tioServer; + private final MqttWebServer webServer; + private final MqttServerCreator serverCreator; private final IMqttSessionManager sessionManager; private final ScheduledThreadPoolExecutor executor; - private TioServer tioWsServer; - public MqttServer(TioServer tioServer, + MqttServer(TioServer tioServer, + MqttWebServer webServer, MqttServerCreator serverCreator, ScheduledThreadPoolExecutor executor) { this.tioServer = tioServer; + this.webServer = webServer; + this.serverCreator = serverCreator; this.sessionManager = serverCreator.getSessionManager(); this.executor = executor; } @@ -238,20 +244,29 @@ public final class MqttServer { return true; } - /** - * 绑定 websocket 服务 - * - * @param tioWsServer TioServer - */ - public void setTioWsServer(TioServer tioWsServer) { - this.tioWsServer = tioWsServer; + public MqttServer start() { + // 1. 启动 mqtt tcp + try { + tioServer.start(this.serverCreator.getIp(), this.serverCreator.getPort()); + } catch (IOException e) { + throw new IllegalStateException("Mica mqtt tcp server start fail.", e); + } + // 2. 启动 mqtt web + if (webServer != null) { + try { + webServer.start(); + } catch (IOException e) { + throw new IllegalStateException("Mica mqtt http/websocket server start fail.", e); + } + } + return this; } public boolean stop() { boolean result = this.tioServer.stop(); logger.info("Mqtt tcp server stop result:{}", result); - if (tioWsServer != null) { - result &= tioWsServer.stop(); + if (webServer != null) { + result &= webServer.stop(); logger.info("Mqtt websocket server stop result:{}", result); } try { diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerAioListener.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerAioListener.java index 29181075a6d404686db74f5adfc3fc7d8ff2a376..28bfbaa46564005ad249fbfc1264faaf48650cf1 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerAioListener.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerAioListener.java @@ -76,7 +76,7 @@ public class MqttServerAioListener extends DefaultAioListener { // 6. 解绑 clientId Tio.unbindBsId(context); // 7. 下线事件 - notify(clientId); + notify(context, clientId); } private void sendWillMessage(ChannelContext context, String clientId) { @@ -108,9 +108,9 @@ public class MqttServerAioListener extends DefaultAioListener { } } - private void notify(String clientId) { + private void notify(ChannelContext context, String clientId) { try { - connectStatusListener.offline(clientId); + connectStatusListener.offline(context, clientId); } catch (Throwable throwable) { logger.error("Mqtt server clientId:{} offline notify error.", clientId, throwable); } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerCreator.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerCreator.java index 79d978b37158d3d994abcaf5f731aa3069ab2932..e2e23bba7b6bbcab2c85c5663aeacfe2a3585d74 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerCreator.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerCreator.java @@ -22,6 +22,7 @@ import net.dreamlu.iot.mqtt.core.server.dispatcher.AbstractMqttMessageDispatcher import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher; import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener; import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener; +import net.dreamlu.iot.mqtt.core.server.http.core.MqttWebServer; import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager; import net.dreamlu.iot.mqtt.core.server.session.InMemoryMqttSessionManager; import net.dreamlu.iot.mqtt.core.server.store.IMqttMessageStore; @@ -37,6 +38,7 @@ import org.tio.server.ServerTioConfig; import org.tio.server.TioServer; import org.tio.server.intf.ServerAioHandler; import org.tio.server.intf.ServerAioListener; +import org.tio.utils.hutool.StrUtil; import org.tio.utils.thread.pool.DefaultThreadFactory; import org.tio.websocket.common.WsTioUuid; import org.tio.websocket.server.WsServerAioHandler; @@ -124,14 +126,26 @@ public class MqttServerCreator { * mqtt 3.1 会校验此参数 */ private int maxClientIdLength = MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH; + /** + * http、websocket 端口,默认:8083 + */ + private int webPort = 8083; /** * 开启 websocket 服务,默认:true */ private boolean websocketEnable = true; /** - * websocket 端口,默认:8083 + * 开启 http 服务,默认:true + */ + private boolean httpEnable = false; + /** + * http Basic 认证账号 + */ + private String httpBasicUsername; + /** + * http Basic 认证密码 */ - private int websocketPort = 8083; + private String httpBasicPassword; public String getName() { return name; @@ -302,6 +316,15 @@ public class MqttServerCreator { return this; } + public int getWebPort() { + return webPort; + } + + public MqttServerCreator webPort(int webPort) { + this.webPort = webPort; + return this; + } + public boolean isWebsocketEnable() { return websocketEnable; } @@ -311,16 +334,33 @@ public class MqttServerCreator { return this; } - public int getWebsocketPort() { - return websocketPort; + public boolean isHttpEnable() { + return httpEnable; } - public MqttServerCreator websocketPort(int websocketPort) { - this.websocketPort = websocketPort; + public MqttServerCreator httpEnable(boolean httpEnable) { + this.httpEnable = httpEnable; return this; } - public MqttServer start() { + public String getHttpBasicUsername() { + return httpBasicUsername; + } + + public MqttServerCreator httpBasicAuth(String username, String password) { + if (StrUtil.isBlank(username) || StrUtil.isBlank(password)) { + throw new IllegalArgumentException("Mqtt http basic auth username or password is blank."); + } + this.httpBasicUsername = username; + this.httpBasicPassword = password; + return this; + } + + public String getHttpBasicPassword() { + return httpBasicPassword; + } + + public MqttServer build() { Objects.requireNonNull(this.messageListener, "Mqtt Server message listener cannot be null."); if (this.authHandler == null) { this.authHandler = new DefaultMqttServerAuthHandler(); @@ -343,7 +383,7 @@ public class MqttServerCreator { ServerAioHandler handler = new MqttServerAioHandler(this, serverProcessor); // 2. t-io 监听 ServerAioListener listener = new MqttServerAioListener(this); - // 2. t-io 配置 + // 3. t-io 配置 ServerTioConfig tioConfig = new ServerTioConfig(this.name, handler, listener); // 4. 设置 t-io 心跳 timeout if (this.heartbeatTimeout != null) { @@ -363,37 +403,13 @@ public class MqttServerCreator { TioServer tioServer = new TioServer(tioConfig); // 6. 不校验版本号,社区版设置无效 tioServer.setCheckLastVersion(false); - MqttServer mqttServer = new MqttServer(tioServer, this, executor); - // 7. 如果是默认的消息转发器,设置 mqttServer + // 7. 配置 mqtt http/websocket server + MqttWebServer webServer = MqttWebServer.config(this, tioConfig); + MqttServer mqttServer = new MqttServer(tioServer, webServer, this, executor); + // 8. 如果是默认的消息转发器,设置 mqttServer if (this.messageDispatcher instanceof AbstractMqttMessageDispatcher) { ((AbstractMqttMessageDispatcher) this.messageDispatcher).config(mqttServer); } - // 8. 启动 mqtt tcp - try { - tioServer.start(this.ip, this.port); - } catch (IOException e) { - throw new IllegalStateException("Mica mqtt tcp server start fail.", e); - } - // 9. 启动 mqtt websocket server - if (this.websocketEnable) { - WsServerConfig wsServerConfig = new WsServerConfig(this.websocketPort, false); - IWsMsgHandler mqttWsMsgHandler = new MqttWsMsgHandler(handler); - WsServerAioHandler wsServerAioHandler = new WsServerAioHandler(wsServerConfig, mqttWsMsgHandler); - WsServerAioListener wsServerAioListener = new WsServerAioListener(); - ServerTioConfig wsTioConfig = new ServerTioConfig(this.name + "-Websocket", wsServerAioHandler, wsServerAioListener); - wsTioConfig.setHeartbeatTimeout(0); - wsTioConfig.setTioUuid(new WsTioUuid()); - wsTioConfig.setReadBufferSize(1024 * 30); - TioServer tioWsServer = new TioServer(wsTioConfig); - mqttServer.setTioWsServer(tioWsServer); - wsTioConfig.share(tioConfig); - wsTioConfig.groupStat = tioConfig.groupStat; - try { - tioWsServer.start(this.ip, wsServerConfig.getBindPort()); - } catch (IOException e) { - throw new IllegalStateException("Mica mqtt websocket server start fail.", e); - } - } return mqttServer; } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/event/IMqttConnectStatusListener.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/event/IMqttConnectStatusListener.java index 83d8630d62d3f0f7d3b8ca06288000d7607f7895..fe0768f53aaee412771130f0cdb0c7b032912c43 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/event/IMqttConnectStatusListener.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/event/IMqttConnectStatusListener.java @@ -16,6 +16,8 @@ package net.dreamlu.iot.mqtt.core.server.event; +import org.tio.core.ChannelContext; + /** * mqtt 链接状态事件 * @@ -26,15 +28,17 @@ public interface IMqttConnectStatusListener { /** * 设备上线(连接成功) * + * @param context ChannelContext * @param clientId clientId */ - void online(String clientId); + void online(ChannelContext context, String clientId); /** * 设备离线 * + * @param context ChannelContext * @param clientId clientId */ - void offline(String clientId); + void offline(ChannelContext context, String clientId); } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/event/IMqttMessageListener.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/event/IMqttMessageListener.java index 0e926a6a468e1d4abb61e456abc03fd8cd59f701..4176fa12470efb5a667056b3f9c35e6d028b164a 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/event/IMqttMessageListener.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/event/IMqttMessageListener.java @@ -17,6 +17,7 @@ package net.dreamlu.iot.mqtt.core.server.event; import net.dreamlu.iot.mqtt.codec.MqttQoS; +import org.tio.core.ChannelContext; import java.nio.ByteBuffer; @@ -31,11 +32,12 @@ public interface IMqttMessageListener { /** * 监听到消息 * + * @param context ChannelContext * @param clientId clientId * @param topic topic * @param mqttQoS MqttQoS * @param payload payload */ - void onMessage(String clientId, String topic, MqttQoS mqttQoS, ByteBuffer payload); + void onMessage(ChannelContext context, String clientId, String topic, MqttQoS mqttQoS, ByteBuffer payload); } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServer.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServer.java index e6f572541edb38adaec827319b3f98bff6b68744..4b01c9f97fd6b4366dec83e64f5d4e984f1f8c4e 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServer.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServer.java @@ -195,7 +195,12 @@ package net.dreamlu.iot.mqtt.core.server.http.core; import net.dreamlu.iot.mqtt.core.server.MqttServerCreator; +import net.dreamlu.iot.mqtt.core.server.http.api.MqttHttpApi; +import net.dreamlu.iot.mqtt.core.server.http.api.auth.BasicAuthFilter; import net.dreamlu.iot.mqtt.core.server.http.handler.MqttHttpRequestHandler; +import net.dreamlu.iot.mqtt.core.server.http.handler.MqttHttpRoutes; +import net.dreamlu.iot.mqtt.core.server.websocket.MqttWsMsgHandler; +import org.tio.core.intf.AioHandler; import org.tio.http.common.HttpConfig; import org.tio.http.common.HttpUuid; import org.tio.http.common.handler.HttpRequestHandler; @@ -207,6 +212,7 @@ import org.tio.utils.thread.pool.SynThreadPoolExecutor; import org.tio.websocket.server.handler.IWsMsgHandler; import java.io.IOException; +import java.util.Objects; import java.util.concurrent.ThreadPoolExecutor; /** @@ -265,13 +271,13 @@ public class MqttWebServer { return serverTioConfig; } + public TioServer getTioServer() { + return tioServer; + } + private void init(MqttServerCreator serverCreator, IWsMsgHandler wsMsgHandler, SynThreadPoolExecutor tioExecutor, ThreadPoolExecutor groupExecutor) { - String systemTimerPeriod = System.getProperty(TIO_SYSTEM_TIMER_PERIOD); - if (StrUtil.isBlank(systemTimerPeriod)) { - System.setProperty(TIO_SYSTEM_TIMER_PERIOD, "50"); - } - HttpConfig httpConfig = new HttpConfig(serverCreator.getWebsocketPort(), false); + HttpConfig httpConfig = new HttpConfig(serverCreator.getWebPort(), false); httpConfig.setBindIp(serverCreator.getIp()); httpConfig.setName(serverCreator.getName() + "-HTTP/Websocket"); httpConfig.setCheckHost(false); @@ -289,12 +295,50 @@ public class MqttWebServer { tioServer.start(this.httpConfig.getBindIp(), this.httpConfig.getBindPort()); } - public void stop() { - tioServer.stop(); + public boolean stop() { + return tioServer.stop(); } - public TioServer getTioServer() { - return tioServer; + /** + * 配置 web 服务 + * + * @param serverCreator MqttServerCreator + * @param mqttServerConfig ServerTioConfig + * @return MqttWebServer + */ + public static MqttWebServer config(MqttServerCreator serverCreator, ServerTioConfig mqttServerConfig) { + // 1. 判断是否开启 + boolean httpEnable = serverCreator.isHttpEnable(); + boolean websocketEnable = serverCreator.isWebsocketEnable(); + if (!httpEnable && !websocketEnable) { + return null; + } + // 2. 如果开启 mqtt http api + if (httpEnable) { + // 2.1 http 特有的配置 + String systemTimerPeriod = System.getProperty(TIO_SYSTEM_TIMER_PERIOD); + if (StrUtil.isBlank(systemTimerPeriod)) { + System.setProperty(TIO_SYSTEM_TIMER_PERIOD, "50"); + } + // 2.2 http 路由配置 + MqttHttpApi httpApi = new MqttHttpApi(serverCreator.getMessageDispatcher(), serverCreator.getSessionManager()); + httpApi.register(); + // 2.3 认证配置 + String username = serverCreator.getHttpBasicUsername(); + String password = serverCreator.getHttpBasicPassword(); + if (Objects.nonNull(username) && Objects.nonNull(password)) { + MqttHttpRoutes.addFilter(new BasicAuthFilter(username, password)); + } + } + // 3. 初始化处理器 + AioHandler mqttAioHandler = mqttServerConfig.getAioHandler(); + IWsMsgHandler mqttWsMsgHandler = new MqttWsMsgHandler(serverCreator, mqttAioHandler); + MqttWebServer httpServerStarter = new MqttWebServer(serverCreator, mqttWsMsgHandler); + ServerTioConfig httpIioConfig = httpServerStarter.getServerTioConfig(); + // 4. tcp + websocket mqtt 共享公共配置 + httpIioConfig.share(mqttServerConfig); + httpIioConfig.groupStat = mqttServerConfig.groupStat; + return httpServerStarter; } } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttConnectStatusListener.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttConnectStatusListener.java index 5057f46ba406957f9379cbe04a76b2ea59e8d796..1ede79ebc8d19b525dea511027d6630eb7838ea1 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttConnectStatusListener.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttConnectStatusListener.java @@ -19,6 +19,7 @@ package net.dreamlu.iot.mqtt.core.server.support; import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.tio.core.ChannelContext; /** * 默认的链接状态监听 @@ -29,12 +30,12 @@ public class DefaultMqttConnectStatusListener implements IMqttConnectStatusListe private static final Logger logger = LoggerFactory.getLogger(DefaultMqttConnectStatusListener.class); @Override - public void online(String clientId) { + public void online(ChannelContext context, String clientId) { logger.info("Mqtt clientId:{} online.", clientId); } @Override - public void offline(String clientId) { + public void offline(ChannelContext context, String clientId) { logger.info("Mqtt clientId:{} offline.", clientId); } } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java index 0b96d0188cb0261af65bed4460859a4cf43734fd..e51a9c8dea9708bde227cb3867889fc5c5802d49 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java @@ -118,7 +118,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor { // 8. 返回 ack connAckByReturnCode(clientId, context, MqttConnectReturnCode.CONNECTION_ACCEPTED); // 9. 在线状态 - connectStatusListener.online(clientId); + connectStatusListener.online(context, clientId); } private void connAckByReturnCode(String clientId, ChannelContext context, MqttConnectReturnCode returnCode) { @@ -141,10 +141,10 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor { logger.debug("Publish - clientId:{} topicName:{} mqttQoS:{} packetId:{}", clientId, topicName, mqttQoS, packetId); switch (mqttQoS) { case AT_MOST_ONCE: - invokeListenerForPublish(clientId, mqttQoS, topicName, message, fixedHeader.isRetain()); + invokeListenerForPublish(context, clientId, mqttQoS, topicName, message, fixedHeader.isRetain()); break; case AT_LEAST_ONCE: - invokeListenerForPublish(clientId, mqttQoS, topicName, message, fixedHeader.isRetain()); + invokeListenerForPublish(context, clientId, mqttQoS, topicName, message, fixedHeader.isRetain()); if (packetId != -1) { MqttMessage messageAck = MqttMessageBuilders.pubAck() .packetId(packetId) @@ -215,7 +215,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor { MqttFixedHeader incomingFixedHeader = incomingPublish.fixedHeader(); MqttQoS mqttQoS = incomingFixedHeader.qosLevel(); boolean retain = incomingFixedHeader.isRetain(); - invokeListenerForPublish(clientId, mqttQoS, topicName, incomingPublish, retain); + invokeListenerForPublish(context, clientId, mqttQoS, topicName, incomingPublish, retain); pendingQos2Publish.onPubRelReceived(); sessionManager.removePendingQos2Publish(clientId, messageId); } @@ -312,11 +312,8 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor { * @param topicName topicName * @param message MqttPublishMessage */ - private void invokeListenerForPublish(String clientId, - MqttQoS mqttQoS, - String topicName, - MqttPublishMessage message, - boolean isRetain) { + private void invokeListenerForPublish(ChannelContext context, String clientId, MqttQoS mqttQoS, + String topicName, MqttPublishMessage message, boolean isRetain) { ByteBuffer payload = message.payload(); // 1. retain 消息逻辑 if (isRetain) { @@ -334,7 +331,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor { } } // 2. 消息发布 - messageListener.onMessage(clientId, topicName, mqttQoS, payload); + messageListener.onMessage(context, clientId, topicName, mqttQoS, payload); } } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/websocket/MqttWsMsgHandler.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/websocket/MqttWsMsgHandler.java index 271855e3436526608579ecb57f97063f1dd55607..c1fb39d63d0095f0071d03cf7d2d9487c0f877a8 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/websocket/MqttWsMsgHandler.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/websocket/MqttWsMsgHandler.java @@ -19,6 +19,7 @@ package net.dreamlu.iot.mqtt.core.server.websocket; import net.dreamlu.iot.mqtt.codec.ByteBufferUtil; import net.dreamlu.iot.mqtt.codec.MqttMessage; import net.dreamlu.iot.mqtt.codec.WriteBuffer; +import net.dreamlu.iot.mqtt.core.server.MqttServerCreator; import org.tio.core.ChannelContext; import org.tio.core.Tio; import org.tio.core.TioConfig; @@ -42,17 +43,24 @@ public class MqttWsMsgHandler implements IWsMsgHandler { * mqtt websocket message body key */ private static final String MQTT_WS_MSG_BODY_KEY = "MQTT_WS_MSG_BODY_KEY"; + /** + * MqttServerCreator + */ + private final MqttServerCreator serverCreator; /** * websocket 握手端点 */ private final String[] supportedSubProtocols; private final AioHandler mqttServerAioHandler; - public MqttWsMsgHandler(AioHandler aioHandler) { - this(new String[]{"mqtt", "mqttv3.1", "mqttv3.1.1"}, aioHandler); + public MqttWsMsgHandler(MqttServerCreator serverCreator, AioHandler aioHandler) { + this(serverCreator, new String[]{"mqtt", "mqttv3.1", "mqttv3.1.1"}, aioHandler); } - public MqttWsMsgHandler(String[] supportedSubProtocols, AioHandler aioHandler) { + public MqttWsMsgHandler(MqttServerCreator serverCreator, + String[] supportedSubProtocols, + AioHandler aioHandler) { + this.serverCreator = serverCreator; this.supportedSubProtocols = supportedSubProtocols; this.mqttServerAioHandler = aioHandler; } @@ -64,7 +72,10 @@ public class MqttWsMsgHandler implements IWsMsgHandler { @Override public HttpResponse handshake(HttpRequest request, HttpResponse httpResponse, ChannelContext channelContext) { - return httpResponse; + if (serverCreator.isWebsocketEnable()) { + return httpResponse; + } + return null; } /** diff --git a/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/biz/Server.java b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/biz/Server.java index 9cc1e7d0fd7474805696aa7c193f6ef82e5a9225..f6aec7deb88e68164bcf917b9484be0118f36a04 100644 --- a/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/biz/Server.java +++ b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/biz/Server.java @@ -40,7 +40,7 @@ public class Server { // 1. 消息转发处理器,可用来实现集群 IMqttMessageDispatcher messageDispatcher = new DefaultMqttMessageDispatcher(); // 2. 收到消息,将消息转发出去 - IMqttMessageListener messageListener = (clientId, topic, mqttQoS, payload) -> { + IMqttMessageListener messageListener = (context, clientId, topic, mqttQoS, payload) -> { Message message = new Message(); message.setTopic(topic); message.setQos(mqttQoS.value()); @@ -55,6 +55,7 @@ public class Server { .messageDispatcher(messageDispatcher) .messageListener(messageListener) .debug() + .build() .start(); } } diff --git a/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/http/MqttWebServerConfig.java b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/http/MqttWebServerConfig.java deleted file mode 100644 index 009e64d66b923669a07fa415fae42d8ceb57c0b9..0000000000000000000000000000000000000000 --- a/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/http/MqttWebServerConfig.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net). - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package net.dreamlu.iot.mqtt.http; - -/** - * mqtt http、websocket 启动配置 - * - * @author L.cm - */ -public class MqttWebServerConfig { - - /** - * http、websocket 端口,默认:8083 - */ - private int port = 8083; - /** - * 开启 websocket 服务,默认:true - */ - private boolean websocketEnable = true; - /** - * 开启 http 服务,默认:true - */ - private boolean httpEnable = true; - /** - * Basic 认证账号 - */ - private String username; - /** - * Basic 认证密码 - */ - private String password; - - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } - - public boolean isWebsocketEnable() { - return websocketEnable; - } - - public void setWebsocketEnable(boolean websocketEnable) { - this.websocketEnable = websocketEnable; - } - - public boolean isHttpEnable() { - return httpEnable; - } - - public void setHttpEnable(boolean httpEnable) { - this.httpEnable = httpEnable; - } - - public String getUsername() { - return username; - } - - public void setUsername(String username) { - this.username = username; - } - - public String getPassword() { - return password; - } - - public void setPassword(String password) { - this.password = password; - } -} diff --git a/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/http/MqttWebServerStater.java b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/http/MqttWebServerStater.java deleted file mode 100644 index 86b7b8f0b60caa0c9a27354e093af4be27a0209e..0000000000000000000000000000000000000000 --- a/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/http/MqttWebServerStater.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net). - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package net.dreamlu.iot.mqtt.http; - -/** - * mqtt http、websocket 启动器 - * - * @author L.cm - */ -public class MqttWebServerStater { - - public static void star() { - - } - -} diff --git a/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/http/MqttWebTest.java b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/http/MqttWebTest.java deleted file mode 100644 index 8e375175a1d31e4e7e83cf52499e4d879d2adfbb..0000000000000000000000000000000000000000 --- a/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/http/MqttWebTest.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net). - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package net.dreamlu.iot.mqtt.http; - -import net.dreamlu.iot.mqtt.core.server.MqttServer; -import net.dreamlu.iot.mqtt.core.server.MqttServerCreator; -import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher; -import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener; -import net.dreamlu.iot.mqtt.core.server.http.api.MqttHttpApi; -import net.dreamlu.iot.mqtt.core.server.http.api.auth.BasicAuthFilter; -import net.dreamlu.iot.mqtt.core.server.http.core.MqttWebServer; -import net.dreamlu.iot.mqtt.core.server.http.handler.MqttHttpRoutes; -import net.dreamlu.iot.mqtt.core.server.model.Message; -import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttMessageDispatcher; -import net.dreamlu.iot.mqtt.core.server.websocket.MqttWsMsgHandler; -import org.tio.core.intf.AioHandler; -import org.tio.server.ServerTioConfig; -import org.tio.websocket.server.handler.IWsMsgHandler; - -/** - * mqtt websocket 子协议测试 - */ -public class MqttWebTest { - - public static void main(String[] args) throws Exception { - // 1. 消息转发处理器,可用来实现集群 - IMqttMessageDispatcher messageDispatcher = new DefaultMqttMessageDispatcher(); - // 2. 收到消息,将消息转发出去 - IMqttMessageListener messageListener = (clientId, topic, mqttQoS, payload) -> { - Message message = new Message(); - message.setTopic(topic); - message.setQos(mqttQoS.value()); - message.setPayload(payload.array()); - messageDispatcher.send(message); - }; - - // 3. 启动服务 - MqttServerCreator serverCreator = MqttServer.create() - .ip("0.0.0.0") - .port(1883) - .readBufferSize(512) - .messageDispatcher(messageDispatcher) - .messageListener(messageListener) - .websocketEnable(false) - .debug(); - MqttServer mqttServer = serverCreator.start(); - ServerTioConfig serverConfig = mqttServer.getServerConfig(); - AioHandler aioHandler = serverConfig.getAioHandler(); - - MqttHttpApi httpApi = new MqttHttpApi(serverCreator.getMessageDispatcher(), serverCreator.getSessionManager()); - httpApi.register(); - MqttHttpRoutes.addFilter(new BasicAuthFilter("123", "123")); - - IWsMsgHandler mqttWsMsgHandler = new MqttWsMsgHandler(aioHandler); - MqttWebServer httpServerStarter = new MqttWebServer(serverCreator, mqttWsMsgHandler); - ServerTioConfig httpIioConfig = httpServerStarter.getServerTioConfig(); - httpIioConfig.share(serverConfig); - httpIioConfig.groupStat = serverConfig.groupStat; - // 启动http服务器 - httpServerStarter.start(); - } - -} diff --git a/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/server/MqttServerTest.java b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/server/MqttServerTest.java index a1113d37152c165357e4c43c63d2afa8d7f102d7..ce65acc6869f5e448042b26d7f1795f62272dcfd 100644 --- a/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/server/MqttServerTest.java +++ b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/server/MqttServerTest.java @@ -46,10 +46,11 @@ public class MqttServerTest { // .maxBytesInMessage(1024 * 100) // mqtt 3.1 协议会校验 clientId 长度。 // .maxClientIdLength(64) - .messageListener((clientId, topic, mqttQoS, payload) -> { + .messageListener((context, clientId, topic, mqttQoS, payload) -> { logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(payload)); }) .debug() // 开启 debug 信息日志 + .build() .start(); Timer timer = new Timer(); diff --git a/mica-mqtt-spring-boot-example/src/main/java/net/dreamlu/iot/mqtt/mica/MqttServerTest.java b/mica-mqtt-spring-boot-example/src/main/java/net/dreamlu/iot/mqtt/mica/MqttServerTest.java index ff9b75bcde30b8cb00d1e46752dd759e38403253..7fbb63b325d2cf6864bc6c8ecc5da623424722ae 100644 --- a/mica-mqtt-spring-boot-example/src/main/java/net/dreamlu/iot/mqtt/mica/MqttServerTest.java +++ b/mica-mqtt-spring-boot-example/src/main/java/net/dreamlu/iot/mqtt/mica/MqttServerTest.java @@ -45,10 +45,11 @@ public class MqttServerTest { .readBufferSize(512) // 关闭 websocket,避免和 spring boot 启动的冲突 .websocketEnable(false) - .messageListener((clientId, topic, mqttQoS, payload) -> { + .messageListener((context, clientId, topic, mqttQoS, payload) -> { logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(payload)); }) .debug() // 开启 debug 信息日志 + .build() .start(); Timer timer = new Timer(); diff --git a/mica-mqtt-spring-boot-example/src/main/java/net/dreamlu/iot/mqtt/mica/listener/MqttServerMessageListener.java b/mica-mqtt-spring-boot-example/src/main/java/net/dreamlu/iot/mqtt/mica/listener/MqttServerMessageListener.java index 77ba1a586c14fc0531c6c13a11a8e9bde87b7702..e7caf695ea2cef0c3ab2b15001946cf49bbffd0c 100644 --- a/mica-mqtt-spring-boot-example/src/main/java/net/dreamlu/iot/mqtt/mica/listener/MqttServerMessageListener.java +++ b/mica-mqtt-spring-boot-example/src/main/java/net/dreamlu/iot/mqtt/mica/listener/MqttServerMessageListener.java @@ -6,6 +6,7 @@ import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; +import org.tio.core.ChannelContext; import java.nio.ByteBuffer; @@ -17,7 +18,7 @@ public class MqttServerMessageListener implements IMqttMessageListener { private static final Logger logger = LoggerFactory.getLogger(MqttServerMessageListener.class); @Override - public void onMessage(String clientId, String topic, MqttQoS mqttQoS, ByteBuffer byteBuffer) { + public void onMessage(ChannelContext context, String clientId, String topic, MqttQoS mqttQoS, ByteBuffer byteBuffer) { logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(byteBuffer)); } } diff --git a/mica-mqtt-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/server/MqttServerConfiguration.java b/mica-mqtt-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/server/MqttServerConfiguration.java index e0069814ed1b08b81acf7fd1e1340c7562fe901e..d5aeb1112d659bde558acc534b2fa043ae6876aa 100644 --- a/mica-mqtt-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/server/MqttServerConfiguration.java +++ b/mica-mqtt-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/server/MqttServerConfiguration.java @@ -16,8 +16,9 @@ package net.dreamlu.iot.mqtt.spring.server; -import net.dreamlu.iot.mqtt.core.server.*; -import net.dreamlu.iot.mqtt.core.server.dispatcher.AbstractMqttMessageDispatcher; +import net.dreamlu.iot.mqtt.core.server.IMqttServerAuthHandler; +import net.dreamlu.iot.mqtt.core.server.MqttServer; +import net.dreamlu.iot.mqtt.core.server.MqttServerCreator; import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher; import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener; import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener; @@ -28,23 +29,15 @@ import net.dreamlu.iot.mqtt.core.server.store.InMemoryMqttMessageStore; import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttConnectStatusListener; import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttMessageDispatcher; import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttServerAuthHandler; -import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttServerProcessor; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.tio.core.ssl.SslConfig; import org.tio.core.stat.IpStatListener; -import org.tio.server.ServerTioConfig; -import org.tio.server.TioServer; -import org.tio.server.intf.ServerAioHandler; -import org.tio.server.intf.ServerAioListener; import org.tio.utils.hutool.StrUtil; -import org.tio.utils.thread.pool.DefaultThreadFactory; import java.util.Objects; -import java.util.concurrent.ScheduledThreadPoolExecutor; /** * mqtt server 配置 @@ -80,11 +73,16 @@ public class MqttServerConfiguration { .maxBytesInMessage(properties.getMaxBytesInMessage()) .bufferAllocator(properties.getBufferAllocator()) .maxClientIdLength(properties.getMaxClientIdLength()) + .webPort(properties.getWebPort()) .websocketEnable(properties.isWebsocketEnable()) - .websocketPort(properties.getWebsocketPort()); + .httpEnable(properties.isHttpEnable()); if (properties.isDebug()) { serverCreator.debug(); } + MqttServerProperties.HttpBasicAuth httpBasicAuth = properties.getHttpBasicAuth(); + if (serverCreator.isHttpEnable() && httpBasicAuth.isEnable()) { + serverCreator.httpBasicAuth(httpBasicAuth.getUsername(), httpBasicAuth.getPassword()); + } MqttServerProperties.Ssl ssl = properties.getSsl(); String keyStorePath = ssl.getKeyStorePath(); String trustStorePath = ssl.getTrustStorePath(); @@ -98,22 +96,22 @@ public class MqttServerConfiguration { // 消息监听器不能为 null Objects.requireNonNull(messageListener, "Mqtt server IMqttMessageListener Bean not found."); serverCreator.messageListener(messageListener); - + // 认证处理器 IMqttServerAuthHandler authHandler = authHandlerObjectProvider.getIfAvailable(DefaultMqttServerAuthHandler::new); serverCreator.authHandler(authHandler); - + // 消息转发 IMqttMessageDispatcher messageDispatcher = messageDispatcherObjectProvider.getIfAvailable(DefaultMqttMessageDispatcher::new); serverCreator.messageDispatcher(messageDispatcher); - + // 消息存储 IMqttMessageStore messageStore = messageStoreObjectProvider.getIfAvailable(InMemoryMqttMessageStore::new); serverCreator.messageStore(messageStore); - + // session 管理 IMqttSessionManager sessionManager = sessionManagerObjectProvider.getIfAvailable(InMemoryMqttSessionManager::new); serverCreator.sessionManager(sessionManager); - + // 状态监听 IMqttConnectStatusListener connectStatusListener = connectStatusListenerObjectProvider.getIfAvailable(DefaultMqttConnectStatusListener::new); serverCreator.connectStatusListener(connectStatusListener); - + // ip 状态监听 IpStatListener ipStatListener = ipStatListenerObjectProvider.getIfAvailable(); serverCreator.ipStatListener(ipStatListener); // 自定义处理 @@ -123,48 +121,12 @@ public class MqttServerConfiguration { @Bean public MqttServer mqttServer(MqttServerCreator mqttServerCreator) { - ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, DefaultThreadFactory.getInstance("MqttServer")); - DefaultMqttServerProcessor serverProcessor = new DefaultMqttServerProcessor(mqttServerCreator, executor); - // 1. 处理消息 - ServerAioHandler handler = new MqttServerAioHandler(mqttServerCreator, serverProcessor); - // 2. t-io 监听 - ServerAioListener listener = new MqttServerAioListener(mqttServerCreator); - // 2. t-io 配置 - ServerTioConfig tioConfig = new ServerTioConfig(mqttServerCreator.getName(), handler, listener); - // 4. 设置 t-io 心跳 timeout - Long heartbeatTimeout = mqttServerCreator.getHeartbeatTimeout(); - if (heartbeatTimeout != null && heartbeatTimeout > 0) { - tioConfig.setHeartbeatTimeout(heartbeatTimeout); - } - IpStatListener ipStatListener = mqttServerCreator.getIpStatListener(); - if (ipStatListener != null) { - tioConfig.setIpStatListener(ipStatListener); - } - SslConfig sslConfig = mqttServerCreator.getSslConfig(); - if (sslConfig != null) { - tioConfig.setSslConfig(sslConfig); - } - if (mqttServerCreator.isDebug()) { - tioConfig.debug = true; - } - // 5. mqtt 消息最大长度 - tioConfig.setReadBufferSize(mqttServerCreator.getReadBufferSize()); - TioServer tioServer = new TioServer(tioConfig); - // 6. 不校验版本号,社区版设置无效 - tioServer.setCheckLastVersion(false); - MqttServer mqttServer = new MqttServer(tioServer, mqttServerCreator, executor); - IMqttMessageDispatcher messageDispatcher = mqttServerCreator.getMessageDispatcher(); - // 7. 如果是默认的消息转发器,设置 mqttServer - if (messageDispatcher instanceof AbstractMqttMessageDispatcher) { - ((AbstractMqttMessageDispatcher) messageDispatcher).config(mqttServer); - } - return mqttServer; + return mqttServerCreator.build(); } @Bean - public MqttServerLauncher mqttServerLauncher(MqttServerCreator serverCreator, - MqttServer mqttServer) { - return new MqttServerLauncher(serverCreator, mqttServer); + public MqttServerLauncher mqttServerLauncher(MqttServer mqttServer) { + return new MqttServerLauncher(mqttServer); } @Bean diff --git a/mica-mqtt-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/server/MqttServerLauncher.java b/mica-mqtt-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/server/MqttServerLauncher.java index 4829317848be4bfc9dd93e31280b2880591e6a09..b26a17c4c8c5a76378dff2c9fd4bbab9f635fafa 100644 --- a/mica-mqtt-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/server/MqttServerLauncher.java +++ b/mica-mqtt-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/server/MqttServerLauncher.java @@ -19,19 +19,8 @@ package net.dreamlu.iot.mqtt.spring.server; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import net.dreamlu.iot.mqtt.core.server.MqttServer; -import net.dreamlu.iot.mqtt.core.server.MqttServerCreator; -import net.dreamlu.iot.mqtt.core.server.websocket.MqttWsMsgHandler; import org.springframework.context.SmartLifecycle; import org.springframework.core.Ordered; -import org.tio.server.ServerTioConfig; -import org.tio.server.TioServer; -import org.tio.websocket.common.WsTioUuid; -import org.tio.websocket.server.WsServerAioHandler; -import org.tio.websocket.server.WsServerAioListener; -import org.tio.websocket.server.WsServerConfig; -import org.tio.websocket.server.handler.IWsMsgHandler; - -import java.io.IOException; /** * MqttServer 启动器 @@ -41,52 +30,18 @@ import java.io.IOException; @Slf4j @RequiredArgsConstructor public class MqttServerLauncher implements SmartLifecycle, Ordered { - private final MqttServerCreator serverCreator; private final MqttServer mqttServer; - private boolean running = false; + private volatile boolean running = false; @Override public void start() { - // 1. 启动 mqtt tcp server - TioServer tioServer = mqttServer.getTioServer(); - try { - int port = serverCreator.getPort(); - tioServer.start(serverCreator.getIp(), serverCreator.getPort()); - log.info("Mica mqtt tcp start successful on {}:{}", tioServer.getServerNode().getIp(), port); - running = true; - } catch (IOException e) { - throw new IllegalStateException("Mica mqtt server start fail.", e); - } - // 2. 启动 mqtt websocket server - if (serverCreator.isWebsocketEnable()) { - int websocketPort = serverCreator.getWebsocketPort(); - ServerTioConfig tioConfig = tioServer.getServerTioConfig(); - WsServerConfig wsServerConfig = new WsServerConfig(websocketPort, false); - IWsMsgHandler mqttWsMsgHandler = new MqttWsMsgHandler(tioConfig.getServerAioHandler()); - WsServerAioHandler wsServerAioHandler = new WsServerAioHandler(wsServerConfig, mqttWsMsgHandler); - WsServerAioListener wsServerAioListener = new WsServerAioListener(); - ServerTioConfig wsTioConfig = new ServerTioConfig(tioConfig.getName() + "-Websocket", wsServerAioHandler, wsServerAioListener); - wsTioConfig.setHeartbeatTimeout(0); - wsTioConfig.setTioUuid(new WsTioUuid()); - wsTioConfig.setReadBufferSize(1024 * 30); - TioServer websocketServer = new TioServer(wsTioConfig); - mqttServer.setTioWsServer(websocketServer); - wsTioConfig.share(tioConfig); - wsTioConfig.groupStat = tioConfig.groupStat; - try { - websocketServer.start(tioServer.getServerNode().getIp(), wsServerConfig.getBindPort()); - log.info("Mica mqtt websocket start successful on {}:{}", tioServer.getServerNode().getIp(), websocketPort); - } catch (IOException e) { - throw new IllegalStateException("Mica mqtt websocket server start fail.", e); - } - } + mqttServer.start(); + running = true; } @Override public void stop() { - if (mqttServer != null) { - mqttServer.stop(); - } + mqttServer.stop(); } @Override diff --git a/mica-mqtt-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/server/MqttServerProperties.java b/mica-mqtt-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/server/MqttServerProperties.java index 2b7da568598d7974602d1efcc8fca261c925cdce..e64f639ff4b751f9cb682cd0e8f610540665bf49 100644 --- a/mica-mqtt-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/server/MqttServerProperties.java +++ b/mica-mqtt-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/server/MqttServerProperties.java @@ -80,14 +80,22 @@ public class MqttServerProperties { * mqtt 3.1 会校验此参数 */ private int maxClientIdLength = MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH; + /** + * http、websocket 端口,默认:8083 + */ + private int webPort = 8083; /** * 开启 websocket 服务,默认:true */ private boolean websocketEnable = true; /** - * websocket 端口,默认:8083 + * 开启 http 服务,默认:true + */ + private boolean httpEnable = false; + /** + * http basic auth */ - private int websocketPort = 8083; + private HttpBasicAuth httpBasicAuth = new HttpBasicAuth(); @Getter @Setter @@ -106,4 +114,21 @@ public class MqttServerProperties { private String password; } + @Getter + @Setter + public static class HttpBasicAuth { + /** + * 是否启用,默认:关闭 + */ + private boolean enable = false; + /** + * http Basic 认证账号 + */ + private String username; + /** + * http Basic 认证密码 + */ + private String password; + } + }