diff --git a/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/ByteBufferUtil.java b/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/ByteBufferUtil.java index 1ab180b794b76ac02247535e6049d77cfef123f7..c31b67a772cc1dbb8ed26ac01db1acdbc12b52a7 100644 --- a/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/ByteBufferUtil.java +++ b/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/ByteBufferUtil.java @@ -76,7 +76,7 @@ public class ByteBufferUtil { /** * 转成 string * - * @param buffer ByteBuffer + * @param buffer ByteBuffer * @param charset Charset * @return 字符串 */ diff --git a/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttIdentifierRejectedException.java b/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttIdentifierRejectedException.java index ebc727686ef1de7f4bc1d90fc1a2c2a4d24d680c..e7d8bd8b08f8ecb007199229a4a60658af44f472 100644 --- a/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttIdentifierRejectedException.java +++ b/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttIdentifierRejectedException.java @@ -26,13 +26,14 @@ public final class MqttIdentifierRejectedException extends DecoderException { /** * Creates a new instance */ - public MqttIdentifierRejectedException() {} + public MqttIdentifierRejectedException() { + } /** * Creates a new instance * * @param message message - * @param cause Throwable + * @param cause Throwable */ public MqttIdentifierRejectedException(String message, Throwable cause) { super(message, cause); diff --git a/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttProperties.java b/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttProperties.java index 21f4face87a8cb1684bcdd68c04d50016dee2caa..83019cb25d47f7feed710432b34a0a169dc98eed 100644 --- a/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttProperties.java +++ b/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttProperties.java @@ -486,7 +486,7 @@ public final class MqttProperties { * then return the first one. * * @param mqttPropertyType Type of the property - * @param 泛型标记 + * @param 泛型标记 * @return a property value if it is set, null otherwise */ public T getPropertyValue(MqttPropertyType mqttPropertyType) { diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/MqttHttpApi.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/MqttHttpApi.java similarity index 94% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/MqttHttpApi.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/MqttHttpApi.java index 206274d40ec7e14533b2b7f04e6e32068d36dc7d..bf890aaf20252ca34335f378757398de6629f26b 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/MqttHttpApi.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/MqttHttpApi.java @@ -14,19 +14,19 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core.api; +package net.dreamlu.iot.mqtt.core.server.http.api; import com.alibaba.fastjson.JSON; import net.dreamlu.iot.mqtt.codec.MqttMessageType; import net.dreamlu.iot.mqtt.codec.MqttQoS; -import net.dreamlu.iot.mqtt.core.api.code.ResultCode; -import net.dreamlu.iot.mqtt.core.api.form.BaseForm; -import net.dreamlu.iot.mqtt.core.api.form.PayloadEncode; -import net.dreamlu.iot.mqtt.core.api.form.PublishForm; -import net.dreamlu.iot.mqtt.core.api.form.SubscribeForm; -import net.dreamlu.iot.mqtt.core.api.result.Result; -import net.dreamlu.iot.mqtt.core.core.MqttHttpRoutes; import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher; +import net.dreamlu.iot.mqtt.core.server.http.api.code.ResultCode; +import net.dreamlu.iot.mqtt.core.server.http.api.form.BaseForm; +import net.dreamlu.iot.mqtt.core.server.http.api.form.PayloadEncode; +import net.dreamlu.iot.mqtt.core.server.http.api.form.PublishForm; +import net.dreamlu.iot.mqtt.core.server.http.api.form.SubscribeForm; +import net.dreamlu.iot.mqtt.core.server.http.api.result.Result; +import net.dreamlu.iot.mqtt.core.server.http.handler.MqttHttpRoutes; import net.dreamlu.iot.mqtt.core.server.model.Message; import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager; import org.tio.http.common.HttpRequest; diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/auth/BasicAuthFilter.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/auth/BasicAuthFilter.java similarity index 56% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/auth/BasicAuthFilter.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/auth/BasicAuthFilter.java index 2c9eab388f2f325a069e48833a54180dfc2058cd..353bd2783d950e6cbc2eefd4a52a2de436c33dfc 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/auth/BasicAuthFilter.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/auth/BasicAuthFilter.java @@ -14,15 +14,14 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core.api.auth; +package net.dreamlu.iot.mqtt.core.server.http.api.auth; -import net.dreamlu.iot.mqtt.core.api.code.ResultCode; -import net.dreamlu.iot.mqtt.core.api.result.Result; -import net.dreamlu.iot.mqtt.core.core.HttpFilter; -import org.tio.http.common.HttpRequest; -import org.tio.http.common.HttpResponse; +import net.dreamlu.iot.mqtt.core.server.http.handler.HttpFilter; +import org.tio.http.common.*; import org.tio.utils.hutool.StrUtil; +import java.nio.charset.StandardCharsets; +import java.util.Base64; import java.util.Objects; /** @@ -31,12 +30,14 @@ import java.util.Objects; * @author L.cm */ public class BasicAuthFilter implements HttpFilter { - public static final String BASIC_AUTH_HEADER_NAME = "Authorization"; + public static final HeaderName WWW_AUTHENTICATE = HeaderName.from("WWW-Authenticate"); + public static final HeaderValue BASIC_REALM = HeaderValue.from("Basic realm=\"Mica mqtt realm\""); + public static final String BASIC_AUTH_HEADER_NAME = "authorization"; public static final String AUTHORIZATION_PREFIX = "Basic "; private final String token; - public BasicAuthFilter(String token) { - this.token = Objects.requireNonNull(token, "Basic auth token is null"); + public BasicAuthFilter(String username, String password) { + this.token = getBasicToken(username, password); } @Override @@ -54,7 +55,15 @@ public class BasicAuthFilter implements HttpFilter { @Override public HttpResponse response(HttpRequest request, HttpResponse response) { - return Result.fail(response, ResultCode.E103); + response.addHeader(WWW_AUTHENTICATE, BASIC_REALM); + response.setStatus(HttpResponseStatus.C401); + return response; } + private static String getBasicToken(String username, String password) { + Objects.requireNonNull(username, "Basic auth username is null"); + Objects.requireNonNull(password, "Basic auth password is null"); + byte[] tokenBytes = (username + ':' + password).getBytes(StandardCharsets.UTF_8); + return Base64.getEncoder().encodeToString(tokenBytes); + } } diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/code/ResultCode.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/code/ResultCode.java similarity index 91% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/code/ResultCode.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/code/ResultCode.java index c41942c494957c27075735d30b4028c17ad89d71..3267e5fc2ff60fe66bd8da8c4e301478b3428300 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/code/ResultCode.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/code/ResultCode.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core.api.code; +package net.dreamlu.iot.mqtt.core.server.http.api.code; import org.tio.http.common.HttpResponseStatus; @@ -40,6 +40,10 @@ public enum ResultCode { * 用户名或密码错误 */ E103(HttpResponseStatus.C401, 103), + /** + * 请求方法错误 + */ + E104(HttpResponseStatus.C405, 104), /** * 未知错误 */ diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/BaseForm.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/form/BaseForm.java similarity index 95% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/BaseForm.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/form/BaseForm.java index b90e93a5bb7b238b13fdb5603dfa187209e9c6f8..dd2a5bca8c6f3817a8575c7ecedc1ce58655640f 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/BaseForm.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/form/BaseForm.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core.api.form; +package net.dreamlu.iot.mqtt.core.server.http.api.form; import java.io.Serializable; diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/PayloadEncode.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/form/PayloadEncode.java similarity index 97% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/PayloadEncode.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/form/PayloadEncode.java index 3ab15acfbd610b0968f4e6807bcd832dc930822c..96fbef815c104fb4b8f2e54e4ed54924eb748633 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/PayloadEncode.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/form/PayloadEncode.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core.api.form; +package net.dreamlu.iot.mqtt.core.server.http.api.form; import net.dreamlu.iot.mqtt.core.util.HexUtil; import org.tio.utils.hutool.StrUtil; diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/PublishForm.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/form/PublishForm.java similarity index 96% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/PublishForm.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/form/PublishForm.java index f2bfe5eefd77c289a0e09f4ac700059d9a5d40b9..967c273c9ef82ce7603c2e2eddf905b3df48ade7 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/PublishForm.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/form/PublishForm.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core.api.form; +package net.dreamlu.iot.mqtt.core.server.http.api.form; /** * 发布的模型 diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/SubscribeForm.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/form/SubscribeForm.java similarity index 94% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/SubscribeForm.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/form/SubscribeForm.java index 53783912238a15c8eef767dce8168384da55fb12..d1bf9c2d969d3fcfb68d8efd2f0c06cd757bbea4 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/SubscribeForm.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/form/SubscribeForm.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core.api.form; +package net.dreamlu.iot.mqtt.core.server.http.api.form; /** * 订阅表单 diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/result/Result.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/result/Result.java similarity index 95% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/result/Result.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/result/Result.java index 8c4c27eeaa51cac644cc277c688732efa0e83a60..2438f6b6b1f1b20fc23ba92a1a9d7e8f1b03ac33 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/result/Result.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/result/Result.java @@ -14,10 +14,10 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core.api.result; +package net.dreamlu.iot.mqtt.core.server.http.api.result; import com.alibaba.fastjson.JSONObject; -import net.dreamlu.iot.mqtt.core.api.code.ResultCode; +import net.dreamlu.iot.mqtt.core.server.http.api.code.ResultCode; import org.tio.http.common.HeaderName; import org.tio.http.common.HeaderValue; import org.tio.http.common.HttpResponse; diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebServer.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServer.java similarity index 84% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebServer.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServer.java index ea1604d36b70acfe9f1dbfb36755bd802f7b4075..7b031137cd6916e0e0ea17d026b6612db36a19d6 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebServer.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServer.java @@ -191,14 +191,14 @@ See the License for the specific language governing permissions and limitations under the License. */ -package net.dreamlu.iot.mqtt.core; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +package net.dreamlu.iot.mqtt.core.server.http.core; + +import net.dreamlu.iot.mqtt.core.server.MqttServerCreator; +import net.dreamlu.iot.mqtt.core.server.http.handler.MqttHttpRequestHandler; import org.tio.core.TcpConst; import org.tio.http.common.HttpConfig; import org.tio.http.common.HttpUuid; -import org.tio.http.common.TioConfigKey; import org.tio.http.common.handler.HttpRequestHandler; import org.tio.server.ServerTioConfig; import org.tio.server.TioServer; @@ -211,46 +211,41 @@ import java.io.IOException; import java.util.concurrent.ThreadPoolExecutor; /** + * mqtt web Server,集成 http 和 websocket + * * @author tanyaowu + * @author L.cm */ public class MqttWebServer { - private static Logger log = LoggerFactory.getLogger(org.tio.http.server.HttpServerStarter.class); + private static final String TIO_SYSTEM_TIMER_PERIOD = "tio.system.timer.period"; + private final MqttWebServerAioListener mqttWebServerAioListener; + private final HttpRequestHandler httpRequestHandler; private HttpConfig httpConfig = null; - private MqttWebServerAioHandler mqttWebServerAioHandler = null; - private MqttWebServerAioListener mqttWebServerAioListener = null; - private HttpRequestHandler httpRequestHandler = null; private ServerTioConfig serverTioConfig = null; + private MqttWebServerAioHandler mqttWebServerAioHandler = null; private TioServer tioServer = null; - /** - * @param httpConfig - * @param requestHandler - * @author tanyaowu - */ - public MqttWebServer(HttpConfig httpConfig, HttpRequestHandler requestHandler, IWsMsgHandler wsMsgHandler) { - this(httpConfig, requestHandler, wsMsgHandler, null, null); + public MqttWebServer(MqttServerCreator serverCreator, IWsMsgHandler wsMsgHandler) { + this(serverCreator, new MqttHttpRequestHandler(), wsMsgHandler); + } + + public MqttWebServer(MqttServerCreator serverCreator, HttpRequestHandler requestHandler, IWsMsgHandler wsMsgHandler) { + this(serverCreator, requestHandler, wsMsgHandler, null, null); } - /** - * @param httpConfig - * @param requestHandler - * @param tioExecutor - * @param groupExecutor - * @author tanyaowu - */ - public MqttWebServer(HttpConfig httpConfig, HttpRequestHandler requestHandler, IWsMsgHandler wsMsgHandler, SynThreadPoolExecutor tioExecutor, ThreadPoolExecutor groupExecutor) { + public MqttWebServer(MqttServerCreator serverCreator, HttpRequestHandler requestHandler, IWsMsgHandler wsMsgHandler, SynThreadPoolExecutor tioExecutor, ThreadPoolExecutor groupExecutor) { if (tioExecutor == null) { tioExecutor = Threads.getTioExecutor(); } if (groupExecutor == null) { groupExecutor = Threads.getGroupExecutor(); } - init(httpConfig, requestHandler, wsMsgHandler, tioExecutor, groupExecutor); + + this.httpRequestHandler = requestHandler; + this.mqttWebServerAioListener = new MqttWebServerAioListener(); + init(serverCreator, wsMsgHandler, tioExecutor, groupExecutor); } - /** - * @return the httpConfig - */ public HttpConfig getHttpConfig() { return httpConfig; } @@ -267,49 +262,34 @@ public class MqttWebServer { return mqttWebServerAioListener; } - /** - * @return the serverTioConfig - */ public ServerTioConfig getServerTioConfig() { return serverTioConfig; } - private void init(HttpConfig httpConfig, HttpRequestHandler requestHandler, IWsMsgHandler wsMsgHandler, + private void init(MqttServerCreator serverCreator, IWsMsgHandler wsMsgHandler, SynThreadPoolExecutor tioExecutor, ThreadPoolExecutor groupExecutor) { - String system_timer_period = System.getProperty("tio.system.timer.period"); - if (StrUtil.isBlank(system_timer_period)) { - System.setProperty("tio.system.timer.period", "50"); + String systemTimerPeriod = System.getProperty(TIO_SYSTEM_TIMER_PERIOD); + if (StrUtil.isBlank(systemTimerPeriod)) { + System.setProperty(TIO_SYSTEM_TIMER_PERIOD, "50"); } - + HttpConfig httpConfig = new HttpConfig(serverCreator.getWebsocketPort(), false); + httpConfig.setName(serverCreator.getName() + "-HTTP/Websocket"); + httpConfig.setCheckHost(false); this.httpConfig = httpConfig; - this.httpRequestHandler = requestHandler; - httpConfig.setHttpRequestHandler(this.httpRequestHandler); - this.mqttWebServerAioHandler = new MqttWebServerAioHandler(httpConfig, requestHandler, wsMsgHandler); - this.mqttWebServerAioListener = new MqttWebServerAioListener(); - String name = httpConfig.getName(); - if (StrUtil.isBlank(name)) { - name = "Tio Http Server"; - } - serverTioConfig = new ServerTioConfig(name, mqttWebServerAioHandler, mqttWebServerAioListener, tioExecutor, groupExecutor); - serverTioConfig.setHeartbeatTimeout(1000 * 20); - serverTioConfig.setShortConnection(true); - serverTioConfig.setReadBufferSize(TcpConst.MAX_DATA_LENGTH); - serverTioConfig.setAttribute(TioConfigKey.HTTP_REQ_HANDLER, this.httpRequestHandler); - - tioServer = new TioServer(serverTioConfig); - HttpUuid imTioUuid = new HttpUuid(); - serverTioConfig.setTioUuid(imTioUuid); - } - - public void setHttpRequestHandler(HttpRequestHandler requestHandler) { - this.httpRequestHandler = requestHandler; + this.httpConfig.setHttpRequestHandler(this.httpRequestHandler); + this.mqttWebServerAioHandler = new MqttWebServerAioHandler(httpConfig, this.httpRequestHandler, wsMsgHandler); + this.serverTioConfig = new ServerTioConfig(this.httpConfig.getName(), mqttWebServerAioHandler, mqttWebServerAioListener, tioExecutor, groupExecutor); + this.serverTioConfig.setHeartbeatTimeout(1000 * 20); + this.serverTioConfig.setReadBufferSize(TcpConst.MAX_DATA_LENGTH); + this.tioServer = new TioServer(serverTioConfig); + this.serverTioConfig.setTioUuid(new HttpUuid()); } public void start() throws IOException { tioServer.start(this.httpConfig.getBindIp(), this.httpConfig.getBindPort()); } - public void stop() throws IOException { + public void stop() { tioServer.stop(); } diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebServerAioHandler.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServerAioHandler.java similarity index 97% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebServerAioHandler.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServerAioHandler.java index d03d4d26891b5c3de547fc481f9de3caa7dd767f..577048080bc94b37f5b6b22e90216ceb96c84a4d 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebServerAioHandler.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServerAioHandler.java @@ -191,7 +191,8 @@ See the License for the specific language governing permissions and limitations under the License. */ -package net.dreamlu.iot.mqtt.core; + +package net.dreamlu.iot.mqtt.core.server.http.core; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -214,7 +215,10 @@ import java.nio.ByteBuffer; import java.util.*; /** + * websocket 和 http 共存 + * * @author tanyaowu + * @author L.cm */ public class MqttWebServerAioHandler implements ServerAioHandler { private static Logger log = LoggerFactory.getLogger(org.tio.websocket.server.WsServerAioHandler.class); @@ -230,9 +234,8 @@ public class MqttWebServerAioHandler implements ServerAioHandler { /** * websocket子协议 */ - private static final String Sec_Websocket_Protocol = "sec-websocket-protocol"; - private static final HeaderName Header_Name_Sec_Websocket_Protocol = HeaderName.from(Sec_Websocket_Protocol); - + private static final String SEC_WEBSOCKET_PROTOCOL = "sec-websocket-protocol"; + private static final HeaderName HEADER_NAME_SEC_WEBSOCKET_PROTOCOL = HeaderName.from(SEC_WEBSOCKET_PROTOCOL); private final HttpConfig httpConfig; private final HttpRequestHandler requestHandler; private final IWsMsgHandler wsMsgHandler; @@ -370,7 +373,7 @@ public class MqttWebServerAioHandler implements ServerAioHandler { String methodName = "onBytes"; return processRetObj(retObj, methodName, channelContext); } else if (opcode == Opcode.PING || opcode == Opcode.PONG) { - log.debug("收到" + opcode); + log.debug("收到{}", opcode); return null; } else if (opcode == Opcode.CLOSE) { Object retObj = wsMsgHandler.onClose(websocketPacket, bytes, channelContext); @@ -391,11 +394,10 @@ public class MqttWebServerAioHandler implements ServerAioHandler { HttpResponse httpResponse = request.httpConfig.getRespForBlackIp(); if (httpResponse != null) { Tio.send(channelContext, httpResponse); - return; } else { Tio.remove(channelContext, ip + "在黑名单中"); - return; } + return; } HttpResponse httpResponse = requestHandler.handler(request); if (httpResponse != null) { @@ -409,7 +411,8 @@ public class MqttWebServerAioHandler implements ServerAioHandler { return; } WsRequest wsRequest = (WsRequest) packet; - if (wsRequest.isHandShake()) {//是握手包 + // 判断握手包 + if (wsRequest.isHandShake()) { WsSessionContext wsSessionContext = (WsSessionContext) channelContext.get(); HttpRequest request = wsSessionContext.getHandshakeRequest(); HttpResponse httpResponse = wsSessionContext.getHandshakeResponse(); @@ -419,7 +422,6 @@ public class MqttWebServerAioHandler implements ServerAioHandler { return; } wsSessionContext.setHandshakeResponse(response); - WsResponse wsResponse = new WsResponse(); wsResponse.setHandShake(true); Tio.send(channelContext, wsResponse); @@ -436,7 +438,7 @@ public class MqttWebServerAioHandler implements ServerAioHandler { } } - private WsResponse processRetObj(Object obj, String methodName, ChannelContext channelContext) throws Exception { + private WsResponse processRetObj(Object obj, String methodName, ChannelContext channelContext) { if (obj == null) { return null; } else { @@ -466,7 +468,6 @@ public class MqttWebServerAioHandler implements ServerAioHandler { public HttpResponse updateWebSocketProtocol(HttpRequest request) { Map headers = request.getHeaders(); String secWebSocketKey = headers.get(HttpConst.RequestHeaderKey.Sec_WebSocket_Key); - if (StrUtil.isNotBlank(secWebSocketKey)) { byte[] secWebSocketKeyBytes; try { @@ -477,25 +478,22 @@ public class MqttWebServerAioHandler implements ServerAioHandler { byte[] allBs = new byte[secWebSocketKeyBytes.length + SEC_WEBSOCKET_KEY_SUFFIX_BYTES.length]; System.arraycopy(secWebSocketKeyBytes, 0, allBs, 0, secWebSocketKeyBytes.length); System.arraycopy(SEC_WEBSOCKET_KEY_SUFFIX_BYTES, 0, allBs, secWebSocketKeyBytes.length, SEC_WEBSOCKET_KEY_SUFFIX_BYTES.length); - byte[] keyArray = SHA1Util.SHA1(allBs); String acceptKey = BASE64Util.byteArrayToBase64(keyArray); HttpResponse httpResponse = new HttpResponse(request); - + // 101 协议转换 httpResponse.setStatus(HttpResponseStatus.C101); - Map respHeaders = new HashMap<>(); respHeaders.put(HeaderName.Connection, HeaderValue.Connection.Upgrade); respHeaders.put(HeaderName.Upgrade, HeaderValue.Upgrade.WebSocket); respHeaders.put(HeaderName.Sec_WebSocket_Accept, HeaderValue.from(acceptKey)); - // websocket 子协议协商 String[] supportedSubProtocols = wsMsgHandler.getSupportedSubProtocols(); if (supportedSubProtocols != null && supportedSubProtocols.length > 0) { - String requestedSubProtocols = headers.get(Sec_Websocket_Protocol); + String requestedSubProtocols = headers.get(SEC_WEBSOCKET_PROTOCOL); String selectSubProtocol = selectSubProtocol(requestedSubProtocols, supportedSubProtocols); if (selectSubProtocol != null) { - respHeaders.put(Header_Name_Sec_Websocket_Protocol, HeaderValue.from(selectSubProtocol)); + respHeaders.put(HEADER_NAME_SEC_WEBSOCKET_PROTOCOL, HeaderValue.from(selectSubProtocol)); } } httpResponse.addHeaders(respHeaders); diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServerAioListener.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServerAioListener.java new file mode 100644 index 0000000000000000000000000000000000000000..340914738ea6cedf2383be22449efb137ede00e3 --- /dev/null +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServerAioListener.java @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.dreamlu.iot.mqtt.core.server.http.core; + +import org.tio.core.ChannelContext; +import org.tio.core.Tio; +import org.tio.core.TioConfig; +import org.tio.core.intf.Packet; +import org.tio.http.common.HttpConst; +import org.tio.http.common.HttpRequest; +import org.tio.http.common.HttpResponse; +import org.tio.server.intf.ServerAioListener; + +/** + * 兼容 websocket,参考 HTTPServerAioListener + * + * @author tanyaowu + * @author L.cm + */ +public class MqttWebServerAioListener implements ServerAioListener { + + @Override + public void onAfterConnected(ChannelContext context, boolean isConnected, boolean isReconnect) { + + } + + @Override + public void onAfterDecoded(ChannelContext context, Packet packet, int packetSize) { + + } + + @Override + public void onAfterSent(ChannelContext context, Packet packet, boolean isSentSuccess) { + if (!(packet instanceof HttpResponse)) { + return; + } + // 1. 短链接数据解绑 + TioConfig tioConfig = context.getTioConfig(); + tioConfig.groups.unbind(context); + tioConfig.bsIds.unbind(context); + tioConfig.ids.unbind(context); + tioConfig.clientNodes.remove(context); + tioConfig.tokens.unbind(context); + // 2. 关闭 + HttpResponse httpResponse = (HttpResponse) packet; + HttpRequest request = httpResponse.getHttpRequest(); + if (request != null) { + if (request.httpConfig.compatible1_0) { + if (HttpConst.HttpVersion.V1_0.equals(request.requestLine.version)) { + if (!HttpConst.RequestHeaderValue.Connection.keep_alive.equals(request.getConnection())) { + Tio.remove(context, "http 请求头Connection!=keep-alive:" + request.getRequestLine()); + } + } else { + if (HttpConst.RequestHeaderValue.Connection.close.equals(request.getConnection())) { + Tio.remove(context, "http 请求头Connection=close:" + request.getRequestLine()); + } + } + } else { + if (HttpConst.RequestHeaderValue.Connection.close.equals(request.getConnection())) { + Tio.remove(context, "http 请求头Connection=close:" + request.getRequestLine()); + } + } + } + } + + @Override + public void onBeforeClose(ChannelContext context, Throwable throwable, String remark, boolean isRemove) { + + } + + @Override + public void onAfterHandled(ChannelContext context, Packet packet, long cost) throws Exception { + + } + + @Override + public void onAfterReceivedBytes(ChannelContext context, int receivedBytes) throws Exception { + + } + + @Override + public boolean onHeartbeatTimeout(ChannelContext context, Long interval, int heartbeatTimeoutCount) { + return false; + } + +} diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/MappingInfo.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/HandlerInfo.java similarity index 70% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/MappingInfo.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/HandlerInfo.java index 07c13ff249f6ebbdacbf6d5cf893ebe79e4bc58a..b514922b24e1d9252e5c68123d3ab364f263b98f 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/MappingInfo.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/HandlerInfo.java @@ -14,32 +14,32 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core.core; +package net.dreamlu.iot.mqtt.core.server.http.handler; import org.tio.http.common.Method; import java.util.Objects; /** - * http Mapping info + * Handler info * * @author L.cm */ -public class MappingInfo { +public class HandlerInfo { private final Method method; - private final String path; + private final HttpHandler handler; - public MappingInfo(Method method, String path) { + public HandlerInfo(Method method, HttpHandler handler) { this.method = method; - this.path = path; + this.handler = handler; } public Method getMethod() { return method; } - public String getPath() { - return path; + public HttpHandler getHandler() { + return handler; } @Override @@ -47,17 +47,16 @@ public class MappingInfo { if (this == o) { return true; } - if (o == null || getClass() != o.getClass()) { + if (!(o instanceof HandlerInfo)) { return false; } - MappingInfo that = (MappingInfo) o; + HandlerInfo that = (HandlerInfo) o; return method == that.method && - Objects.equals(path, that.path); + Objects.equals(handler, that.handler); } @Override public int hashCode() { - return Objects.hash(method, path); + return Objects.hash(method, handler); } - } diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/HttpFilter.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/HttpFilter.java similarity index 95% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/HttpFilter.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/HttpFilter.java index fee2ada760e50adac39f667d0919d64dbab1addb..e32ee8921bdcc28b5725e20862d8671ec0e0b7e7 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/HttpFilter.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/HttpFilter.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core.core; +package net.dreamlu.iot.mqtt.core.server.http.handler; import org.tio.http.common.HttpRequest; import org.tio.http.common.HttpResponse; diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/HttpHandler.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/HttpHandler.java similarity index 94% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/HttpHandler.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/HttpHandler.java index af6e7a327a9851811a462190458be4f24d541e68..57eb199fd6d9d8a3b21470b74f8b947a694c8a5e 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/HttpHandler.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/HttpHandler.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core.core; +package net.dreamlu.iot.mqtt.core.server.http.handler; import org.tio.http.common.HttpRequest; import org.tio.http.common.HttpResponse; diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttHttpRequestHandler.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/MqttHttpRequestHandler.java similarity index 82% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttHttpRequestHandler.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/MqttHttpRequestHandler.java index 246091c63708f62158bcf7817e60ba4834d1a052..c79fde87ad573ab809c49a9fa84f669ee7153ffe 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttHttpRequestHandler.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/MqttHttpRequestHandler.java @@ -14,13 +14,10 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core; +package net.dreamlu.iot.mqtt.core.server.http.handler; -import net.dreamlu.iot.mqtt.core.api.code.ResultCode; -import net.dreamlu.iot.mqtt.core.api.result.Result; -import net.dreamlu.iot.mqtt.core.core.HttpFilter; -import net.dreamlu.iot.mqtt.core.core.HttpHandler; -import net.dreamlu.iot.mqtt.core.core.MqttHttpRoutes; +import net.dreamlu.iot.mqtt.core.server.http.api.code.ResultCode; +import net.dreamlu.iot.mqtt.core.server.http.api.result.Result; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tio.http.common.*; @@ -52,13 +49,17 @@ public class MqttHttpRequestHandler implements HttpRequestHandler { return resp500(request, requestLine, e); } // 2. 路由处理 - HttpHandler handler = MqttHttpRoutes.getHandler(requestLine); + HandlerInfo handler = MqttHttpRoutes.getHandler(requestLine); if (handler == null) { return resp404(request, requestLine); } - logger.info("mqtt http api {} path:{}", requestLine.getMethod().name(), requestLine.getPathAndQuery()); + Method method = requestLine.getMethod(); + if (handler.getMethod() != method) { + return Result.fail(new HttpResponse(request), ResultCode.E104); + } + logger.info("mqtt http api {} path:{}", method.name(), requestLine.getPathAndQuery()); try { - return handler.apply(request); + return handler.getHandler().apply(request); } catch (Exception e) { return resp500(request, requestLine, e); } diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/MqttHttpRoutes.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/MqttHttpRoutes.java similarity index 77% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/MqttHttpRoutes.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/MqttHttpRoutes.java index f132685b65d5235ef89831575d7fb3ad32d425e4..1317ce2d987c14eb2809a3ec2b410f2c523c88d8 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/MqttHttpRoutes.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/MqttHttpRoutes.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core.core; +package net.dreamlu.iot.mqtt.core.server.http.handler; import org.tio.http.common.Method; import org.tio.http.common.RequestLine; @@ -28,7 +28,7 @@ import java.util.*; */ public final class MqttHttpRoutes { private static final LinkedList FILTERS = new LinkedList<>(); - private static final Map ROUTS = new HashMap<>(); + private static final Map ROUTS = new HashMap<>(); /** * 注册路由 @@ -66,18 +66,17 @@ public final class MqttHttpRoutes { * @param handler HttpHandler */ public static void register(Method method, String path, HttpHandler handler) { - ROUTS.put(new MappingInfo(method, path), handler); + ROUTS.put(path, new HandlerInfo(method, handler)); } /** * 读取路由 * - * @param method 请求方法 - * @param path 路径 - * @return HttpHandler + * @param path 路径 + * @return HandlerInfo */ - public static HttpHandler getHandler(Method method, String path) { - return ROUTS.get(new MappingInfo(method, path)); + public static HandlerInfo getHandler(String path) { + return ROUTS.get(path); } /** @@ -86,8 +85,8 @@ public final class MqttHttpRoutes { * @param requestLine RequestLine * @return HttpHandler */ - public static HttpHandler getHandler(RequestLine requestLine) { - return getHandler(requestLine.getMethod(), requestLine.getPath()); + public static HandlerInfo getHandler(RequestLine requestLine) { + return getHandler(requestLine.getPath()); } } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/package-info.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/package-info.java deleted file mode 100644 index 06f9436b73a1da53fa89561c07b94cf85b9917b8..0000000000000000000000000000000000000000 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/package-info.java +++ /dev/null @@ -1 +0,0 @@ -package net.dreamlu.iot.mqtt.core.server.http; diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/websocket/MqttWsMsgHandler.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/websocket/MqttWsMsgHandler.java index a9436c0298714573bffc2e83396426a60e4fe98e..271855e3436526608579ecb57f97063f1dd55607 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/websocket/MqttWsMsgHandler.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/websocket/MqttWsMsgHandler.java @@ -22,10 +22,10 @@ import net.dreamlu.iot.mqtt.codec.WriteBuffer; import org.tio.core.ChannelContext; import org.tio.core.Tio; import org.tio.core.TioConfig; +import org.tio.core.intf.AioHandler; import org.tio.core.intf.Packet; import org.tio.http.common.HttpRequest; import org.tio.http.common.HttpResponse; -import org.tio.server.intf.ServerAioHandler; import org.tio.websocket.common.WsRequest; import org.tio.websocket.common.WsResponse; import org.tio.websocket.server.handler.IWsMsgHandler; @@ -42,17 +42,19 @@ public class MqttWsMsgHandler implements IWsMsgHandler { * mqtt websocket message body key */ private static final String MQTT_WS_MSG_BODY_KEY = "MQTT_WS_MSG_BODY_KEY"; - + /** + * websocket 握手端点 + */ private final String[] supportedSubProtocols; - private final ServerAioHandler mqttServerAioHandler; + private final AioHandler mqttServerAioHandler; - public MqttWsMsgHandler(ServerAioHandler mqttServerAioHandler) { - this(new String[]{"mqtt", "mqttv3.1", "mqttv3.1.1"}, mqttServerAioHandler); + public MqttWsMsgHandler(AioHandler aioHandler) { + this(new String[]{"mqtt", "mqttv3.1", "mqttv3.1.1"}, aioHandler); } - public MqttWsMsgHandler(String[] supportedSubProtocols, ServerAioHandler mqttServerAioHandler) { + public MqttWsMsgHandler(String[] supportedSubProtocols, AioHandler aioHandler) { this.supportedSubProtocols = supportedSubProtocols; - this.mqttServerAioHandler = mqttServerAioHandler; + this.mqttServerAioHandler = aioHandler; } @Override @@ -132,7 +134,7 @@ public class MqttWsMsgHandler implements IWsMsgHandler { /** * 读取 mqtt 消息体处理半包的情况 * - * @param bytes 消息类容 + * @param bytes 消息类容 * @return ByteBuffer */ private static synchronized ByteBuffer getMqttBody(WriteBuffer wsBody, byte[] bytes) { diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebServerAioListener.java b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebServerAioListener.java deleted file mode 100644 index bebd8045cb44968e5c8344044b032551d9de62c2..0000000000000000000000000000000000000000 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebServerAioListener.java +++ /dev/null @@ -1,75 +0,0 @@ -package net.dreamlu.iot.mqtt.core; - -import org.tio.core.ChannelContext; -import org.tio.core.Tio; -import org.tio.core.intf.Packet; -import org.tio.http.common.HttpConst; -import org.tio.http.common.HttpRequest; -import org.tio.http.common.HttpResponse; -import org.tio.server.intf.ServerAioListener; - -/** - * 兼容 websocket,参考 HTTPServerAioListener - * - * @author tanyaowu - * @author L.cm - */ -public class MqttWebServerAioListener implements ServerAioListener { - - @Override - public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) { - - } - - @Override - public void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize) { - - } - - @Override - public void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess) { - if (!(packet instanceof HttpResponse)) { - return; - } - HttpResponse httpResponse = (HttpResponse) packet; - HttpRequest request = httpResponse.getHttpRequest(); - if (request != null) { - if (request.httpConfig.compatible1_0) { - if (HttpConst.HttpVersion.V1_0.equals(request.requestLine.version)) { - if (!HttpConst.RequestHeaderValue.Connection.keep_alive.equals(request.getConnection())) { - Tio.remove(channelContext, "http 请求头Connection!=keep-alive:" + request.getRequestLine()); - } - } else { - if (HttpConst.RequestHeaderValue.Connection.close.equals(request.getConnection())) { - Tio.remove(channelContext, "http 请求头Connection=close:" + request.getRequestLine()); - } - } - } else { - if (HttpConst.RequestHeaderValue.Connection.close.equals(request.getConnection())) { - Tio.remove(channelContext, "http 请求头Connection=close:" + request.getRequestLine()); - } - } - } - } - - @Override - public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) { - - } - - @Override - public void onAfterHandled(ChannelContext channelContext, Packet packet, long cost) throws Exception { - - } - - @Override - public void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes) throws Exception { - - } - - @Override - public boolean onHeartbeatTimeout(ChannelContext channelContext, Long interval, int heartbeatTimeoutCount) { - return false; - } - -} diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebTest.java b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebTest.java deleted file mode 100644 index beb6a6174599b0472679455a4b03f35d0cc009aa..0000000000000000000000000000000000000000 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebTest.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net). - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package net.dreamlu.iot.mqtt.core; - -/** - * mqtt websocket 子协议测试 - */ -public class MqttWebTest { - - public static void main(String[] args) throws Exception { -// final int port = 8083; -// HttpConfig httpConfig = new HttpConfig(port, null, null, null); -// httpConfig.setUseSession(false); -// httpConfig.setCheckHost(false); -// httpConfig.setMonitorFileChange(false); -// -// HttpRequestHandler requestHandler = new MqttHttpRequestHandler(); -// -// MqttHttpApi httpApi = new MqttHttpApi(null); -// httpApi.register(); -// IWsMsgHandler mqttWsMsgHandler = new MqttWsMsgHandler(); -// MqttWebServer httpServerStarter = new MqttWebServer(httpConfig, requestHandler, mqttWsMsgHandler); -// httpServerStarter.start(); //启动http服务器 - } - -} diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebsocketTest.java b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/http/MqttWebsocketTest.java similarity index 96% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebsocketTest.java rename to mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/http/MqttWebsocketTest.java index a20b57da16e23c139bca72d135b6c79f822ce1f7..6bcaf2efff51b06a9629da362b722ed868853ba3 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebsocketTest.java +++ b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/http/MqttWebsocketTest.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core; +package net.dreamlu.iot.mqtt.core.http; import java.io.IOException; diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/udp/UdpClusterTest1.java b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/udp/UdpClusterTest1.java index 0f24af095c620ea057dacc479f0f603e21b3bb08..33f8c2d565a7f16ed88f41324a059b6ed49b3ae8 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/udp/UdpClusterTest1.java +++ b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/udp/UdpClusterTest1.java @@ -11,7 +11,7 @@ public class UdpClusterTest1 { public static void main(String[] args) throws IOException { UdpTestHandler udpTestHandler = new UdpTestHandler(); - UdpClusterConfig udpServerConf = new UdpClusterConfig("224.0.0.1",12345, udpTestHandler, 5000); + UdpClusterConfig udpServerConf = new UdpClusterConfig("224.0.0.1", 12345, udpTestHandler, 5000); UdpCluster udpCluster = new UdpCluster(udpServerConf); udpCluster.start(); diff --git a/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/aliyun/MqttClientTest.java b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/aliyun/MqttClientTest.java index 6986010030c812831e1f7175d9cac4c1b9cbe7f6..0fc5c1801e6e8547c46c6a5950ad4d825363a490 100644 --- a/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/aliyun/MqttClientTest.java +++ b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/aliyun/MqttClientTest.java @@ -52,7 +52,7 @@ public class MqttClientTest { .connect(); client.subQos0("/sys/" + productKey + '/' + deviceName + "/thing/event/property/post_reply", (topic, payload) -> { - System.out.println(topic + '\t' +ByteBufferUtil.toString(payload)); + System.out.println(topic + '\t' + ByteBufferUtil.toString(payload)); }); String content = "{\"id\":\"1\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":1}}"; diff --git a/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/http/MqttWebTest.java b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/http/MqttWebTest.java new file mode 100644 index 0000000000000000000000000000000000000000..e08afe31b5813ccd4af54561a50dfa7c0fe96e71 --- /dev/null +++ b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/http/MqttWebTest.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.dreamlu.iot.mqtt.http; + +import net.dreamlu.iot.mqtt.core.server.MqttServer; +import net.dreamlu.iot.mqtt.core.server.MqttServerCreator; +import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher; +import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener; +import net.dreamlu.iot.mqtt.core.server.http.api.MqttHttpApi; +import net.dreamlu.iot.mqtt.core.server.http.api.auth.BasicAuthFilter; +import net.dreamlu.iot.mqtt.core.server.http.core.MqttWebServer; +import net.dreamlu.iot.mqtt.core.server.http.handler.MqttHttpRoutes; +import net.dreamlu.iot.mqtt.core.server.model.Message; +import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttMessageDispatcher; +import net.dreamlu.iot.mqtt.core.server.websocket.MqttWsMsgHandler; +import org.tio.core.intf.AioHandler; +import org.tio.server.ServerTioConfig; +import org.tio.websocket.server.handler.IWsMsgHandler; + +/** + * mqtt websocket 子协议测试 + */ +public class MqttWebTest { + + public static void main(String[] args) throws Exception { + // 1. 消息转发处理器,可用来实现集群 + IMqttMessageDispatcher messageDispatcher = new DefaultMqttMessageDispatcher(); + // 2. 收到消息,将消息转发出去 + IMqttMessageListener messageListener = (clientId, topic, mqttQoS, payload) -> { + Message message = new Message(); + message.setTopic(topic); + message.setQos(mqttQoS.value()); + message.setPayload(payload.array()); + messageDispatcher.send(message); + }; + + // 3. 启动服务 + MqttServerCreator serverCreator = MqttServer.create() + .ip("0.0.0.0") + .port(1883) + .readBufferSize(512) + .messageDispatcher(messageDispatcher) + .messageListener(messageListener) + .websocketEnable(false) + .debug(); + MqttServer mqttServer = serverCreator.start(); + ServerTioConfig serverConfig = mqttServer.getServerConfig(); + AioHandler aioHandler = serverConfig.getAioHandler(); + + MqttHttpApi httpApi = new MqttHttpApi(serverCreator.getMessageDispatcher(), serverCreator.getSessionManager()); + httpApi.register(); + MqttHttpRoutes.addFilter(new BasicAuthFilter("123", "123")); + + IWsMsgHandler mqttWsMsgHandler = new MqttWsMsgHandler(aioHandler); + MqttWebServer httpServerStarter = new MqttWebServer(serverCreator, mqttWsMsgHandler); + ServerTioConfig httpIioConfig = httpServerStarter.getServerTioConfig(); + httpIioConfig.share(serverConfig); + // 启动http服务器 + httpServerStarter.start(); + } + +}