提交 ebc9fd81 编写于 作者: 浅梦2013's avatar 浅梦2013

重构,内置 http,http 和 websocket 公用端口。

上级 b2e24428
......@@ -20,6 +20,7 @@ import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.server.http.core.MqttWebServer;
import net.dreamlu.iot.mqtt.core.server.model.Subscribe;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import org.slf4j.Logger;
......@@ -29,6 +30,7 @@ import org.tio.core.Tio;
import org.tio.server.ServerTioConfig;
import org.tio.server.TioServer;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
......@@ -41,14 +43,18 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
public final class MqttServer {
private static final Logger logger = LoggerFactory.getLogger(MqttServer.class);
private final TioServer tioServer;
private final MqttWebServer webServer;
private final MqttServerCreator serverCreator;
private final IMqttSessionManager sessionManager;
private final ScheduledThreadPoolExecutor executor;
private TioServer tioWsServer;
public MqttServer(TioServer tioServer,
MqttServer(TioServer tioServer,
MqttWebServer webServer,
MqttServerCreator serverCreator,
ScheduledThreadPoolExecutor executor) {
this.tioServer = tioServer;
this.webServer = webServer;
this.serverCreator = serverCreator;
this.sessionManager = serverCreator.getSessionManager();
this.executor = executor;
}
......@@ -238,20 +244,29 @@ public final class MqttServer {
return true;
}
/**
* 绑定 websocket 服务
*
* @param tioWsServer TioServer
*/
public void setTioWsServer(TioServer tioWsServer) {
this.tioWsServer = tioWsServer;
public MqttServer start() {
// 1. 启动 mqtt tcp
try {
tioServer.start(this.serverCreator.getIp(), this.serverCreator.getPort());
} catch (IOException e) {
throw new IllegalStateException("Mica mqtt tcp server start fail.", e);
}
// 2. 启动 mqtt web
if (webServer != null) {
try {
webServer.start();
} catch (IOException e) {
throw new IllegalStateException("Mica mqtt http/websocket server start fail.", e);
}
}
return this;
}
public boolean stop() {
boolean result = this.tioServer.stop();
logger.info("Mqtt tcp server stop result:{}", result);
if (tioWsServer != null) {
result &= tioWsServer.stop();
if (webServer != null) {
result &= webServer.stop();
logger.info("Mqtt websocket server stop result:{}", result);
}
try {
......
......@@ -76,7 +76,7 @@ public class MqttServerAioListener extends DefaultAioListener {
// 6. 解绑 clientId
Tio.unbindBsId(context);
// 7. 下线事件
notify(clientId);
notify(context, clientId);
}
private void sendWillMessage(ChannelContext context, String clientId) {
......@@ -108,9 +108,9 @@ public class MqttServerAioListener extends DefaultAioListener {
}
}
private void notify(String clientId) {
private void notify(ChannelContext context, String clientId) {
try {
connectStatusListener.offline(clientId);
connectStatusListener.offline(context, clientId);
} catch (Throwable throwable) {
logger.error("Mqtt server clientId:{} offline notify error.", clientId, throwable);
}
......
......@@ -22,6 +22,7 @@ import net.dreamlu.iot.mqtt.core.server.dispatcher.AbstractMqttMessageDispatcher
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
import net.dreamlu.iot.mqtt.core.server.http.core.MqttWebServer;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import net.dreamlu.iot.mqtt.core.server.session.InMemoryMqttSessionManager;
import net.dreamlu.iot.mqtt.core.server.store.IMqttMessageStore;
......@@ -37,6 +38,7 @@ import org.tio.server.ServerTioConfig;
import org.tio.server.TioServer;
import org.tio.server.intf.ServerAioHandler;
import org.tio.server.intf.ServerAioListener;
import org.tio.utils.hutool.StrUtil;
import org.tio.utils.thread.pool.DefaultThreadFactory;
import org.tio.websocket.common.WsTioUuid;
import org.tio.websocket.server.WsServerAioHandler;
......@@ -124,14 +126,26 @@ public class MqttServerCreator {
* mqtt 3.1 会校验此参数
*/
private int maxClientIdLength = MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
/**
* http、websocket 端口,默认:8083
*/
private int webPort = 8083;
/**
* 开启 websocket 服务,默认:true
*/
private boolean websocketEnable = true;
/**
* websocket 端口,默认:8083
* 开启 http 服务,默认:true
*/
private boolean httpEnable = false;
/**
* http Basic 认证账号
*/
private String httpBasicUsername;
/**
* http Basic 认证密码
*/
private int websocketPort = 8083;
private String httpBasicPassword;
public String getName() {
return name;
......@@ -302,6 +316,15 @@ public class MqttServerCreator {
return this;
}
public int getWebPort() {
return webPort;
}
public MqttServerCreator webPort(int webPort) {
this.webPort = webPort;
return this;
}
public boolean isWebsocketEnable() {
return websocketEnable;
}
......@@ -311,16 +334,33 @@ public class MqttServerCreator {
return this;
}
public int getWebsocketPort() {
return websocketPort;
public boolean isHttpEnable() {
return httpEnable;
}
public MqttServerCreator websocketPort(int websocketPort) {
this.websocketPort = websocketPort;
public MqttServerCreator httpEnable(boolean httpEnable) {
this.httpEnable = httpEnable;
return this;
}
public MqttServer start() {
public String getHttpBasicUsername() {
return httpBasicUsername;
}
public MqttServerCreator httpBasicAuth(String username, String password) {
if (StrUtil.isBlank(username) || StrUtil.isBlank(password)) {
throw new IllegalArgumentException("Mqtt http basic auth username or password is blank.");
}
this.httpBasicUsername = username;
this.httpBasicPassword = password;
return this;
}
public String getHttpBasicPassword() {
return httpBasicPassword;
}
public MqttServer build() {
Objects.requireNonNull(this.messageListener, "Mqtt Server message listener cannot be null.");
if (this.authHandler == null) {
this.authHandler = new DefaultMqttServerAuthHandler();
......@@ -343,7 +383,7 @@ public class MqttServerCreator {
ServerAioHandler handler = new MqttServerAioHandler(this, serverProcessor);
// 2. t-io 监听
ServerAioListener listener = new MqttServerAioListener(this);
// 2. t-io 配置
// 3. t-io 配置
ServerTioConfig tioConfig = new ServerTioConfig(this.name, handler, listener);
// 4. 设置 t-io 心跳 timeout
if (this.heartbeatTimeout != null) {
......@@ -363,37 +403,13 @@ public class MqttServerCreator {
TioServer tioServer = new TioServer(tioConfig);
// 6. 不校验版本号,社区版设置无效
tioServer.setCheckLastVersion(false);
MqttServer mqttServer = new MqttServer(tioServer, this, executor);
// 7. 如果是默认的消息转发器,设置 mqttServer
// 7. 配置 mqtt http/websocket server
MqttWebServer webServer = MqttWebServer.config(this, tioConfig);
MqttServer mqttServer = new MqttServer(tioServer, webServer, this, executor);
// 8. 如果是默认的消息转发器,设置 mqttServer
if (this.messageDispatcher instanceof AbstractMqttMessageDispatcher) {
((AbstractMqttMessageDispatcher) this.messageDispatcher).config(mqttServer);
}
// 8. 启动 mqtt tcp
try {
tioServer.start(this.ip, this.port);
} catch (IOException e) {
throw new IllegalStateException("Mica mqtt tcp server start fail.", e);
}
// 9. 启动 mqtt websocket server
if (this.websocketEnable) {
WsServerConfig wsServerConfig = new WsServerConfig(this.websocketPort, false);
IWsMsgHandler mqttWsMsgHandler = new MqttWsMsgHandler(handler);
WsServerAioHandler wsServerAioHandler = new WsServerAioHandler(wsServerConfig, mqttWsMsgHandler);
WsServerAioListener wsServerAioListener = new WsServerAioListener();
ServerTioConfig wsTioConfig = new ServerTioConfig(this.name + "-Websocket", wsServerAioHandler, wsServerAioListener);
wsTioConfig.setHeartbeatTimeout(0);
wsTioConfig.setTioUuid(new WsTioUuid());
wsTioConfig.setReadBufferSize(1024 * 30);
TioServer tioWsServer = new TioServer(wsTioConfig);
mqttServer.setTioWsServer(tioWsServer);
wsTioConfig.share(tioConfig);
wsTioConfig.groupStat = tioConfig.groupStat;
try {
tioWsServer.start(this.ip, wsServerConfig.getBindPort());
} catch (IOException e) {
throw new IllegalStateException("Mica mqtt websocket server start fail.", e);
}
}
return mqttServer;
}
......
......@@ -16,6 +16,8 @@
package net.dreamlu.iot.mqtt.core.server.event;
import org.tio.core.ChannelContext;
/**
* mqtt 链接状态事件
*
......@@ -26,15 +28,17 @@ public interface IMqttConnectStatusListener {
/**
* 设备上线(连接成功)
*
* @param context ChannelContext
* @param clientId clientId
*/
void online(String clientId);
void online(ChannelContext context, String clientId);
/**
* 设备离线
*
* @param context ChannelContext
* @param clientId clientId
*/
void offline(String clientId);
void offline(ChannelContext context, String clientId);
}
......@@ -17,6 +17,7 @@
package net.dreamlu.iot.mqtt.core.server.event;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import org.tio.core.ChannelContext;
import java.nio.ByteBuffer;
......@@ -31,11 +32,12 @@ public interface IMqttMessageListener {
/**
* 监听到消息
*
* @param context ChannelContext
* @param clientId clientId
* @param topic topic
* @param mqttQoS MqttQoS
* @param payload payload
*/
void onMessage(String clientId, String topic, MqttQoS mqttQoS, ByteBuffer payload);
void onMessage(ChannelContext context, String clientId, String topic, MqttQoS mqttQoS, ByteBuffer payload);
}
......@@ -195,7 +195,12 @@
package net.dreamlu.iot.mqtt.core.server.http.core;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import net.dreamlu.iot.mqtt.core.server.http.api.MqttHttpApi;
import net.dreamlu.iot.mqtt.core.server.http.api.auth.BasicAuthFilter;
import net.dreamlu.iot.mqtt.core.server.http.handler.MqttHttpRequestHandler;
import net.dreamlu.iot.mqtt.core.server.http.handler.MqttHttpRoutes;
import net.dreamlu.iot.mqtt.core.server.websocket.MqttWsMsgHandler;
import org.tio.core.intf.AioHandler;
import org.tio.http.common.HttpConfig;
import org.tio.http.common.HttpUuid;
import org.tio.http.common.handler.HttpRequestHandler;
......@@ -207,6 +212,7 @@ import org.tio.utils.thread.pool.SynThreadPoolExecutor;
import org.tio.websocket.server.handler.IWsMsgHandler;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.ThreadPoolExecutor;
/**
......@@ -265,13 +271,13 @@ public class MqttWebServer {
return serverTioConfig;
}
public TioServer getTioServer() {
return tioServer;
}
private void init(MqttServerCreator serverCreator, IWsMsgHandler wsMsgHandler,
SynThreadPoolExecutor tioExecutor, ThreadPoolExecutor groupExecutor) {
String systemTimerPeriod = System.getProperty(TIO_SYSTEM_TIMER_PERIOD);
if (StrUtil.isBlank(systemTimerPeriod)) {
System.setProperty(TIO_SYSTEM_TIMER_PERIOD, "50");
}
HttpConfig httpConfig = new HttpConfig(serverCreator.getWebsocketPort(), false);
HttpConfig httpConfig = new HttpConfig(serverCreator.getWebPort(), false);
httpConfig.setBindIp(serverCreator.getIp());
httpConfig.setName(serverCreator.getName() + "-HTTP/Websocket");
httpConfig.setCheckHost(false);
......@@ -289,12 +295,50 @@ public class MqttWebServer {
tioServer.start(this.httpConfig.getBindIp(), this.httpConfig.getBindPort());
}
public void stop() {
tioServer.stop();
public boolean stop() {
return tioServer.stop();
}
public TioServer getTioServer() {
return tioServer;
/**
* 配置 web 服务
*
* @param serverCreator MqttServerCreator
* @param mqttServerConfig ServerTioConfig
* @return MqttWebServer
*/
public static MqttWebServer config(MqttServerCreator serverCreator, ServerTioConfig mqttServerConfig) {
// 1. 判断是否开启
boolean httpEnable = serverCreator.isHttpEnable();
boolean websocketEnable = serverCreator.isWebsocketEnable();
if (!httpEnable && !websocketEnable) {
return null;
}
// 2. 如果开启 mqtt http api
if (httpEnable) {
// 2.1 http 特有的配置
String systemTimerPeriod = System.getProperty(TIO_SYSTEM_TIMER_PERIOD);
if (StrUtil.isBlank(systemTimerPeriod)) {
System.setProperty(TIO_SYSTEM_TIMER_PERIOD, "50");
}
// 2.2 http 路由配置
MqttHttpApi httpApi = new MqttHttpApi(serverCreator.getMessageDispatcher(), serverCreator.getSessionManager());
httpApi.register();
// 2.3 认证配置
String username = serverCreator.getHttpBasicUsername();
String password = serverCreator.getHttpBasicPassword();
if (Objects.nonNull(username) && Objects.nonNull(password)) {
MqttHttpRoutes.addFilter(new BasicAuthFilter(username, password));
}
}
// 3. 初始化处理器
AioHandler mqttAioHandler = mqttServerConfig.getAioHandler();
IWsMsgHandler mqttWsMsgHandler = new MqttWsMsgHandler(serverCreator, mqttAioHandler);
MqttWebServer httpServerStarter = new MqttWebServer(serverCreator, mqttWsMsgHandler);
ServerTioConfig httpIioConfig = httpServerStarter.getServerTioConfig();
// 4. tcp + websocket mqtt 共享公共配置
httpIioConfig.share(mqttServerConfig);
httpIioConfig.groupStat = mqttServerConfig.groupStat;
return httpServerStarter;
}
}
......@@ -19,6 +19,7 @@ package net.dreamlu.iot.mqtt.core.server.support;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
/**
* 默认的链接状态监听
......@@ -29,12 +30,12 @@ public class DefaultMqttConnectStatusListener implements IMqttConnectStatusListe
private static final Logger logger = LoggerFactory.getLogger(DefaultMqttConnectStatusListener.class);
@Override
public void online(String clientId) {
public void online(ChannelContext context, String clientId) {
logger.info("Mqtt clientId:{} online.", clientId);
}
@Override
public void offline(String clientId) {
public void offline(ChannelContext context, String clientId) {
logger.info("Mqtt clientId:{} offline.", clientId);
}
}
......@@ -118,7 +118,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
// 8. 返回 ack
connAckByReturnCode(clientId, context, MqttConnectReturnCode.CONNECTION_ACCEPTED);
// 9. 在线状态
connectStatusListener.online(clientId);
connectStatusListener.online(context, clientId);
}
private void connAckByReturnCode(String clientId, ChannelContext context, MqttConnectReturnCode returnCode) {
......@@ -141,10 +141,10 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
logger.debug("Publish - clientId:{} topicName:{} mqttQoS:{} packetId:{}", clientId, topicName, mqttQoS, packetId);
switch (mqttQoS) {
case AT_MOST_ONCE:
invokeListenerForPublish(clientId, mqttQoS, topicName, message, fixedHeader.isRetain());
invokeListenerForPublish(context, clientId, mqttQoS, topicName, message, fixedHeader.isRetain());
break;
case AT_LEAST_ONCE:
invokeListenerForPublish(clientId, mqttQoS, topicName, message, fixedHeader.isRetain());
invokeListenerForPublish(context, clientId, mqttQoS, topicName, message, fixedHeader.isRetain());
if (packetId != -1) {
MqttMessage messageAck = MqttMessageBuilders.pubAck()
.packetId(packetId)
......@@ -215,7 +215,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
MqttFixedHeader incomingFixedHeader = incomingPublish.fixedHeader();
MqttQoS mqttQoS = incomingFixedHeader.qosLevel();
boolean retain = incomingFixedHeader.isRetain();
invokeListenerForPublish(clientId, mqttQoS, topicName, incomingPublish, retain);
invokeListenerForPublish(context, clientId, mqttQoS, topicName, incomingPublish, retain);
pendingQos2Publish.onPubRelReceived();
sessionManager.removePendingQos2Publish(clientId, messageId);
}
......@@ -312,11 +312,8 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
* @param topicName topicName
* @param message MqttPublishMessage
*/
private void invokeListenerForPublish(String clientId,
MqttQoS mqttQoS,
String topicName,
MqttPublishMessage message,
boolean isRetain) {
private void invokeListenerForPublish(ChannelContext context, String clientId, MqttQoS mqttQoS,
String topicName, MqttPublishMessage message, boolean isRetain) {
ByteBuffer payload = message.payload();
// 1. retain 消息逻辑
if (isRetain) {
......@@ -334,7 +331,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
}
}
// 2. 消息发布
messageListener.onMessage(clientId, topicName, mqttQoS, payload);
messageListener.onMessage(context, clientId, topicName, mqttQoS, payload);
}
}
......@@ -19,6 +19,7 @@ package net.dreamlu.iot.mqtt.core.server.websocket;
import net.dreamlu.iot.mqtt.codec.ByteBufferUtil;
import net.dreamlu.iot.mqtt.codec.MqttMessage;
import net.dreamlu.iot.mqtt.codec.WriteBuffer;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.TioConfig;
......@@ -42,17 +43,24 @@ public class MqttWsMsgHandler implements IWsMsgHandler {
* mqtt websocket message body key
*/
private static final String MQTT_WS_MSG_BODY_KEY = "MQTT_WS_MSG_BODY_KEY";
/**
* MqttServerCreator
*/
private final MqttServerCreator serverCreator;
/**
* websocket 握手端点
*/
private final String[] supportedSubProtocols;
private final AioHandler mqttServerAioHandler;
public MqttWsMsgHandler(AioHandler aioHandler) {
this(new String[]{"mqtt", "mqttv3.1", "mqttv3.1.1"}, aioHandler);
public MqttWsMsgHandler(MqttServerCreator serverCreator, AioHandler aioHandler) {
this(serverCreator, new String[]{"mqtt", "mqttv3.1", "mqttv3.1.1"}, aioHandler);
}
public MqttWsMsgHandler(String[] supportedSubProtocols, AioHandler aioHandler) {
public MqttWsMsgHandler(MqttServerCreator serverCreator,
String[] supportedSubProtocols,
AioHandler aioHandler) {
this.serverCreator = serverCreator;
this.supportedSubProtocols = supportedSubProtocols;
this.mqttServerAioHandler = aioHandler;
}
......@@ -64,7 +72,10 @@ public class MqttWsMsgHandler implements IWsMsgHandler {
@Override
public HttpResponse handshake(HttpRequest request, HttpResponse httpResponse, ChannelContext channelContext) {
return httpResponse;
if (serverCreator.isWebsocketEnable()) {
return httpResponse;
}
return null;
}
/**
......
......@@ -40,7 +40,7 @@ public class Server {
// 1. 消息转发处理器,可用来实现集群
IMqttMessageDispatcher messageDispatcher = new DefaultMqttMessageDispatcher();
// 2. 收到消息,将消息转发出去
IMqttMessageListener messageListener = (clientId, topic, mqttQoS, payload) -> {
IMqttMessageListener messageListener = (context, clientId, topic, mqttQoS, payload) -> {
Message message = new Message();
message.setTopic(topic);
message.setQos(mqttQoS.value());
......@@ -55,6 +55,7 @@ public class Server {
.messageDispatcher(messageDispatcher)
.messageListener(messageListener)
.debug()
.build()
.start();
}
}
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.http;
/**
* mqtt http、websocket 启动配置
*
* @author L.cm
*/
public class MqttWebServerConfig {
/**
* http、websocket 端口,默认:8083
*/
private int port = 8083;
/**
* 开启 websocket 服务,默认:true
*/
private boolean websocketEnable = true;
/**
* 开启 http 服务,默认:true
*/
private boolean httpEnable = true;
/**
* Basic 认证账号
*/
private String username;
/**
* Basic 认证密码
*/
private String password;
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public boolean isWebsocketEnable() {
return websocketEnable;
}
public void setWebsocketEnable(boolean websocketEnable) {
this.websocketEnable = websocketEnable;
}
public boolean isHttpEnable() {
return httpEnable;
}
public void setHttpEnable(boolean httpEnable) {
this.httpEnable = httpEnable;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.http;
/**
* mqtt http、websocket 启动器
*
* @author L.cm
*/
public class MqttWebServerStater {
public static void star() {
}
}
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.http;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
import net.dreamlu.iot.mqtt.core.server.http.api.MqttHttpApi;
import net.dreamlu.iot.mqtt.core.server.http.api.auth.BasicAuthFilter;
import net.dreamlu.iot.mqtt.core.server.http.core.MqttWebServer;
import net.dreamlu.iot.mqtt.core.server.http.handler.MqttHttpRoutes;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.websocket.MqttWsMsgHandler;
import org.tio.core.intf.AioHandler;
import org.tio.server.ServerTioConfig;
import org.tio.websocket.server.handler.IWsMsgHandler;
/**
* mqtt websocket 子协议测试
*/
public class MqttWebTest {
public static void main(String[] args) throws Exception {
// 1. 消息转发处理器,可用来实现集群
IMqttMessageDispatcher messageDispatcher = new DefaultMqttMessageDispatcher();
// 2. 收到消息,将消息转发出去
IMqttMessageListener messageListener = (clientId, topic, mqttQoS, payload) -> {
Message message = new Message();
message.setTopic(topic);
message.setQos(mqttQoS.value());
message.setPayload(payload.array());
messageDispatcher.send(message);
};
// 3. 启动服务
MqttServerCreator serverCreator = MqttServer.create()
.ip("0.0.0.0")
.port(1883)
.readBufferSize(512)
.messageDispatcher(messageDispatcher)
.messageListener(messageListener)
.websocketEnable(false)
.debug();
MqttServer mqttServer = serverCreator.start();
ServerTioConfig serverConfig = mqttServer.getServerConfig();
AioHandler aioHandler = serverConfig.getAioHandler();
MqttHttpApi httpApi = new MqttHttpApi(serverCreator.getMessageDispatcher(), serverCreator.getSessionManager());
httpApi.register();
MqttHttpRoutes.addFilter(new BasicAuthFilter("123", "123"));
IWsMsgHandler mqttWsMsgHandler = new MqttWsMsgHandler(aioHandler);
MqttWebServer httpServerStarter = new MqttWebServer(serverCreator, mqttWsMsgHandler);
ServerTioConfig httpIioConfig = httpServerStarter.getServerTioConfig();
httpIioConfig.share(serverConfig);
httpIioConfig.groupStat = serverConfig.groupStat;
// 启动http服务器
httpServerStarter.start();
}
}
......@@ -46,10 +46,11 @@ public class MqttServerTest {
// .maxBytesInMessage(1024 * 100)
// mqtt 3.1 协议会校验 clientId 长度。
// .maxClientIdLength(64)
.messageListener((clientId, topic, mqttQoS, payload) -> {
.messageListener((context, clientId, topic, mqttQoS, payload) -> {
logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(payload));
})
.debug() // 开启 debug 信息日志
.build()
.start();
Timer timer = new Timer();
......
......@@ -45,10 +45,11 @@ public class MqttServerTest {
.readBufferSize(512)
// 关闭 websocket,避免和 spring boot 启动的冲突
.websocketEnable(false)
.messageListener((clientId, topic, mqttQoS, payload) -> {
.messageListener((context, clientId, topic, mqttQoS, payload) -> {
logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(payload));
})
.debug() // 开启 debug 信息日志
.build()
.start();
Timer timer = new Timer();
......
......@@ -6,6 +6,7 @@ import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.tio.core.ChannelContext;
import java.nio.ByteBuffer;
......@@ -17,7 +18,7 @@ public class MqttServerMessageListener implements IMqttMessageListener {
private static final Logger logger = LoggerFactory.getLogger(MqttServerMessageListener.class);
@Override
public void onMessage(String clientId, String topic, MqttQoS mqttQoS, ByteBuffer byteBuffer) {
public void onMessage(ChannelContext context, String clientId, String topic, MqttQoS mqttQoS, ByteBuffer byteBuffer) {
logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(byteBuffer));
}
}
......@@ -16,8 +16,9 @@
package net.dreamlu.iot.mqtt.spring.server;
import net.dreamlu.iot.mqtt.core.server.*;
import net.dreamlu.iot.mqtt.core.server.dispatcher.AbstractMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.IMqttServerAuthHandler;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
......@@ -28,23 +29,15 @@ import net.dreamlu.iot.mqtt.core.server.store.InMemoryMqttMessageStore;
import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttServerAuthHandler;
import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttServerProcessor;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.tio.core.ssl.SslConfig;
import org.tio.core.stat.IpStatListener;
import org.tio.server.ServerTioConfig;
import org.tio.server.TioServer;
import org.tio.server.intf.ServerAioHandler;
import org.tio.server.intf.ServerAioListener;
import org.tio.utils.hutool.StrUtil;
import org.tio.utils.thread.pool.DefaultThreadFactory;
import java.util.Objects;
import java.util.concurrent.ScheduledThreadPoolExecutor;
/**
* mqtt server 配置
......@@ -80,11 +73,16 @@ public class MqttServerConfiguration {
.maxBytesInMessage(properties.getMaxBytesInMessage())
.bufferAllocator(properties.getBufferAllocator())
.maxClientIdLength(properties.getMaxClientIdLength())
.webPort(properties.getWebPort())
.websocketEnable(properties.isWebsocketEnable())
.websocketPort(properties.getWebsocketPort());
.httpEnable(properties.isHttpEnable());
if (properties.isDebug()) {
serverCreator.debug();
}
MqttServerProperties.HttpBasicAuth httpBasicAuth = properties.getHttpBasicAuth();
if (serverCreator.isHttpEnable() && httpBasicAuth.isEnable()) {
serverCreator.httpBasicAuth(httpBasicAuth.getUsername(), httpBasicAuth.getPassword());
}
MqttServerProperties.Ssl ssl = properties.getSsl();
String keyStorePath = ssl.getKeyStorePath();
String trustStorePath = ssl.getTrustStorePath();
......@@ -98,22 +96,22 @@ public class MqttServerConfiguration {
// 消息监听器不能为 null
Objects.requireNonNull(messageListener, "Mqtt server IMqttMessageListener Bean not found.");
serverCreator.messageListener(messageListener);
// 认证处理器
IMqttServerAuthHandler authHandler = authHandlerObjectProvider.getIfAvailable(DefaultMqttServerAuthHandler::new);
serverCreator.authHandler(authHandler);
// 消息转发
IMqttMessageDispatcher messageDispatcher = messageDispatcherObjectProvider.getIfAvailable(DefaultMqttMessageDispatcher::new);
serverCreator.messageDispatcher(messageDispatcher);
// 消息存储
IMqttMessageStore messageStore = messageStoreObjectProvider.getIfAvailable(InMemoryMqttMessageStore::new);
serverCreator.messageStore(messageStore);
// session 管理
IMqttSessionManager sessionManager = sessionManagerObjectProvider.getIfAvailable(InMemoryMqttSessionManager::new);
serverCreator.sessionManager(sessionManager);
// 状态监听
IMqttConnectStatusListener connectStatusListener = connectStatusListenerObjectProvider.getIfAvailable(DefaultMqttConnectStatusListener::new);
serverCreator.connectStatusListener(connectStatusListener);
// ip 状态监听
IpStatListener ipStatListener = ipStatListenerObjectProvider.getIfAvailable();
serverCreator.ipStatListener(ipStatListener);
// 自定义处理
......@@ -123,48 +121,12 @@ public class MqttServerConfiguration {
@Bean
public MqttServer mqttServer(MqttServerCreator mqttServerCreator) {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, DefaultThreadFactory.getInstance("MqttServer"));
DefaultMqttServerProcessor serverProcessor = new DefaultMqttServerProcessor(mqttServerCreator, executor);
// 1. 处理消息
ServerAioHandler handler = new MqttServerAioHandler(mqttServerCreator, serverProcessor);
// 2. t-io 监听
ServerAioListener listener = new MqttServerAioListener(mqttServerCreator);
// 2. t-io 配置
ServerTioConfig tioConfig = new ServerTioConfig(mqttServerCreator.getName(), handler, listener);
// 4. 设置 t-io 心跳 timeout
Long heartbeatTimeout = mqttServerCreator.getHeartbeatTimeout();
if (heartbeatTimeout != null && heartbeatTimeout > 0) {
tioConfig.setHeartbeatTimeout(heartbeatTimeout);
}
IpStatListener ipStatListener = mqttServerCreator.getIpStatListener();
if (ipStatListener != null) {
tioConfig.setIpStatListener(ipStatListener);
}
SslConfig sslConfig = mqttServerCreator.getSslConfig();
if (sslConfig != null) {
tioConfig.setSslConfig(sslConfig);
}
if (mqttServerCreator.isDebug()) {
tioConfig.debug = true;
}
// 5. mqtt 消息最大长度
tioConfig.setReadBufferSize(mqttServerCreator.getReadBufferSize());
TioServer tioServer = new TioServer(tioConfig);
// 6. 不校验版本号,社区版设置无效
tioServer.setCheckLastVersion(false);
MqttServer mqttServer = new MqttServer(tioServer, mqttServerCreator, executor);
IMqttMessageDispatcher messageDispatcher = mqttServerCreator.getMessageDispatcher();
// 7. 如果是默认的消息转发器,设置 mqttServer
if (messageDispatcher instanceof AbstractMqttMessageDispatcher) {
((AbstractMqttMessageDispatcher) messageDispatcher).config(mqttServer);
}
return mqttServer;
return mqttServerCreator.build();
}
@Bean
public MqttServerLauncher mqttServerLauncher(MqttServerCreator serverCreator,
MqttServer mqttServer) {
return new MqttServerLauncher(serverCreator, mqttServer);
public MqttServerLauncher mqttServerLauncher(MqttServer mqttServer) {
return new MqttServerLauncher(mqttServer);
}
@Bean
......
......@@ -19,19 +19,8 @@ package net.dreamlu.iot.mqtt.spring.server;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import net.dreamlu.iot.mqtt.core.server.websocket.MqttWsMsgHandler;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.Ordered;
import org.tio.server.ServerTioConfig;
import org.tio.server.TioServer;
import org.tio.websocket.common.WsTioUuid;
import org.tio.websocket.server.WsServerAioHandler;
import org.tio.websocket.server.WsServerAioListener;
import org.tio.websocket.server.WsServerConfig;
import org.tio.websocket.server.handler.IWsMsgHandler;
import java.io.IOException;
/**
* MqttServer 启动器
......@@ -41,52 +30,18 @@ import java.io.IOException;
@Slf4j
@RequiredArgsConstructor
public class MqttServerLauncher implements SmartLifecycle, Ordered {
private final MqttServerCreator serverCreator;
private final MqttServer mqttServer;
private boolean running = false;
private volatile boolean running = false;
@Override
public void start() {
// 1. 启动 mqtt tcp server
TioServer tioServer = mqttServer.getTioServer();
try {
int port = serverCreator.getPort();
tioServer.start(serverCreator.getIp(), serverCreator.getPort());
log.info("Mica mqtt tcp start successful on {}:{}", tioServer.getServerNode().getIp(), port);
running = true;
} catch (IOException e) {
throw new IllegalStateException("Mica mqtt server start fail.", e);
}
// 2. 启动 mqtt websocket server
if (serverCreator.isWebsocketEnable()) {
int websocketPort = serverCreator.getWebsocketPort();
ServerTioConfig tioConfig = tioServer.getServerTioConfig();
WsServerConfig wsServerConfig = new WsServerConfig(websocketPort, false);
IWsMsgHandler mqttWsMsgHandler = new MqttWsMsgHandler(tioConfig.getServerAioHandler());
WsServerAioHandler wsServerAioHandler = new WsServerAioHandler(wsServerConfig, mqttWsMsgHandler);
WsServerAioListener wsServerAioListener = new WsServerAioListener();
ServerTioConfig wsTioConfig = new ServerTioConfig(tioConfig.getName() + "-Websocket", wsServerAioHandler, wsServerAioListener);
wsTioConfig.setHeartbeatTimeout(0);
wsTioConfig.setTioUuid(new WsTioUuid());
wsTioConfig.setReadBufferSize(1024 * 30);
TioServer websocketServer = new TioServer(wsTioConfig);
mqttServer.setTioWsServer(websocketServer);
wsTioConfig.share(tioConfig);
wsTioConfig.groupStat = tioConfig.groupStat;
try {
websocketServer.start(tioServer.getServerNode().getIp(), wsServerConfig.getBindPort());
log.info("Mica mqtt websocket start successful on {}:{}", tioServer.getServerNode().getIp(), websocketPort);
} catch (IOException e) {
throw new IllegalStateException("Mica mqtt websocket server start fail.", e);
}
}
mqttServer.start();
running = true;
}
@Override
public void stop() {
if (mqttServer != null) {
mqttServer.stop();
}
mqttServer.stop();
}
@Override
......
......@@ -80,14 +80,22 @@ public class MqttServerProperties {
* mqtt 3.1 会校验此参数
*/
private int maxClientIdLength = MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
/**
* http、websocket 端口,默认:8083
*/
private int webPort = 8083;
/**
* 开启 websocket 服务,默认:true
*/
private boolean websocketEnable = true;
/**
* websocket 端口,默认:8083
* 开启 http 服务,默认:true
*/
private boolean httpEnable = false;
/**
* http basic auth
*/
private int websocketPort = 8083;
private HttpBasicAuth httpBasicAuth = new HttpBasicAuth();
@Getter
@Setter
......@@ -106,4 +114,21 @@ public class MqttServerProperties {
private String password;
}
@Getter
@Setter
public static class HttpBasicAuth {
/**
* 是否启用,默认:关闭
*/
private boolean enable = false;
/**
* http Basic 认证账号
*/
private String username;
/**
* http Basic 认证密码
*/
private String password;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册