From 806e8b65e07401083ebe25422aee868a664f6984 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=AF=BB=E6=AC=A2?= <1101766085@qq.com> Date: Thu, 26 Aug 2021 14:45:57 +0800 Subject: [PATCH] =?UTF-8?q?:sparkles:=20=E8=80=83=E8=99=91=E5=86=85?= =?UTF-8?q?=E7=BD=AE=20http=20api=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/mqtt/codec/ByteBufferUtil.java | 2 +- .../MqttIdentifierRejectedException.java | 5 +- .../iot/mqtt/codec/MqttProperties.java | 2 +- .../core/server/http}/api/MqttHttpApi.java | 16 +-- .../http}/api/auth/BasicAuthFilter.java | 29 +++-- .../server/http}/api/code/ResultCode.java | 6 +- .../core/server/http}/api/form/BaseForm.java | 2 +- .../server/http}/api/form/PayloadEncode.java | 2 +- .../server/http}/api/form/PublishForm.java | 2 +- .../server/http}/api/form/SubscribeForm.java | 2 +- .../core/server/http}/api/result/Result.java | 4 +- .../core/server/http}/core/MqttWebServer.java | 94 +++++++--------- .../http}/core/MqttWebServerAioHandler.java | 32 +++--- .../http/core/MqttWebServerAioListener.java | 100 ++++++++++++++++++ .../server/http/handler/HandlerInfo.java} | 25 +++-- .../core/server/http/handler}/HttpFilter.java | 2 +- .../server/http/handler}/HttpHandler.java | 2 +- .../http/handler}/MqttHttpRequestHandler.java | 19 ++-- .../server/http/handler}/MqttHttpRoutes.java | 19 ++-- .../mqtt/core/server/http/package-info.java | 1 - .../server/websocket/MqttWsMsgHandler.java | 18 ++-- .../mqtt/core/MqttWebServerAioListener.java | 75 ------------- .../dreamlu/iot/mqtt/core/MqttWebTest.java | 40 ------- .../core/{ => http}/MqttWebsocketTest.java | 2 +- .../iot/mqtt/core/udp/UdpClusterTest1.java | 2 +- .../iot/mqtt/aliyun/MqttClientTest.java | 2 +- .../dreamlu/iot/mqtt/http/MqttWebTest.java | 76 +++++++++++++ 27 files changed, 317 insertions(+), 264 deletions(-) rename mica-mqtt-core/src/{test/java/net/dreamlu/iot/mqtt/core => main/java/net/dreamlu/iot/mqtt/core/server/http}/api/MqttHttpApi.java (94%) rename mica-mqtt-core/src/{test/java/net/dreamlu/iot/mqtt/core => main/java/net/dreamlu/iot/mqtt/core/server/http}/api/auth/BasicAuthFilter.java (56%) rename mica-mqtt-core/src/{test/java/net/dreamlu/iot/mqtt/core => main/java/net/dreamlu/iot/mqtt/core/server/http}/api/code/ResultCode.java (91%) rename mica-mqtt-core/src/{test/java/net/dreamlu/iot/mqtt/core => main/java/net/dreamlu/iot/mqtt/core/server/http}/api/form/BaseForm.java (95%) rename mica-mqtt-core/src/{test/java/net/dreamlu/iot/mqtt/core => main/java/net/dreamlu/iot/mqtt/core/server/http}/api/form/PayloadEncode.java (97%) rename mica-mqtt-core/src/{test/java/net/dreamlu/iot/mqtt/core => main/java/net/dreamlu/iot/mqtt/core/server/http}/api/form/PublishForm.java (96%) rename mica-mqtt-core/src/{test/java/net/dreamlu/iot/mqtt/core => main/java/net/dreamlu/iot/mqtt/core/server/http}/api/form/SubscribeForm.java (94%) rename mica-mqtt-core/src/{test/java/net/dreamlu/iot/mqtt/core => main/java/net/dreamlu/iot/mqtt/core/server/http}/api/result/Result.java (95%) rename mica-mqtt-core/src/{test/java/net/dreamlu/iot/mqtt => main/java/net/dreamlu/iot/mqtt/core/server/http}/core/MqttWebServer.java (84%) rename mica-mqtt-core/src/{test/java/net/dreamlu/iot/mqtt => main/java/net/dreamlu/iot/mqtt/core/server/http}/core/MqttWebServerAioHandler.java (97%) create mode 100644 mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServerAioListener.java rename mica-mqtt-core/src/{test/java/net/dreamlu/iot/mqtt/core/core/MappingInfo.java => main/java/net/dreamlu/iot/mqtt/core/server/http/handler/HandlerInfo.java} (70%) rename mica-mqtt-core/src/{test/java/net/dreamlu/iot/mqtt/core/core => main/java/net/dreamlu/iot/mqtt/core/server/http/handler}/HttpFilter.java (95%) rename mica-mqtt-core/src/{test/java/net/dreamlu/iot/mqtt/core/core => main/java/net/dreamlu/iot/mqtt/core/server/http/handler}/HttpHandler.java (94%) rename mica-mqtt-core/src/{test/java/net/dreamlu/iot/mqtt/core => main/java/net/dreamlu/iot/mqtt/core/server/http/handler}/MqttHttpRequestHandler.java (82%) rename mica-mqtt-core/src/{test/java/net/dreamlu/iot/mqtt/core/core => main/java/net/dreamlu/iot/mqtt/core/server/http/handler}/MqttHttpRoutes.java (77%) delete mode 100644 mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/package-info.java delete mode 100644 mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebServerAioListener.java delete mode 100644 mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebTest.java rename mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/{ => http}/MqttWebsocketTest.java (96%) create mode 100644 mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/http/MqttWebTest.java diff --git a/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/ByteBufferUtil.java b/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/ByteBufferUtil.java index 1ab180b..c31b67a 100644 --- a/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/ByteBufferUtil.java +++ b/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/ByteBufferUtil.java @@ -76,7 +76,7 @@ public class ByteBufferUtil { /** * 转成 string * - * @param buffer ByteBuffer + * @param buffer ByteBuffer * @param charset Charset * @return 字符串 */ diff --git a/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttIdentifierRejectedException.java b/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttIdentifierRejectedException.java index ebc7276..e7d8bd8 100644 --- a/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttIdentifierRejectedException.java +++ b/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttIdentifierRejectedException.java @@ -26,13 +26,14 @@ public final class MqttIdentifierRejectedException extends DecoderException { /** * Creates a new instance */ - public MqttIdentifierRejectedException() {} + public MqttIdentifierRejectedException() { + } /** * Creates a new instance * * @param message message - * @param cause Throwable + * @param cause Throwable */ public MqttIdentifierRejectedException(String message, Throwable cause) { super(message, cause); diff --git a/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttProperties.java b/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttProperties.java index 21f4fac..83019cb 100644 --- a/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttProperties.java +++ b/mica-mqtt-codec/src/main/java/net/dreamlu/iot/mqtt/codec/MqttProperties.java @@ -486,7 +486,7 @@ public final class MqttProperties { * then return the first one. * * @param mqttPropertyType Type of the property - * @param 泛型标记 + * @param 泛型标记 * @return a property value if it is set, null otherwise */ public T getPropertyValue(MqttPropertyType mqttPropertyType) { diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/MqttHttpApi.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/MqttHttpApi.java similarity index 94% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/MqttHttpApi.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/MqttHttpApi.java index 206274d..bf890aa 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/MqttHttpApi.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/MqttHttpApi.java @@ -14,19 +14,19 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core.api; +package net.dreamlu.iot.mqtt.core.server.http.api; import com.alibaba.fastjson.JSON; import net.dreamlu.iot.mqtt.codec.MqttMessageType; import net.dreamlu.iot.mqtt.codec.MqttQoS; -import net.dreamlu.iot.mqtt.core.api.code.ResultCode; -import net.dreamlu.iot.mqtt.core.api.form.BaseForm; -import net.dreamlu.iot.mqtt.core.api.form.PayloadEncode; -import net.dreamlu.iot.mqtt.core.api.form.PublishForm; -import net.dreamlu.iot.mqtt.core.api.form.SubscribeForm; -import net.dreamlu.iot.mqtt.core.api.result.Result; -import net.dreamlu.iot.mqtt.core.core.MqttHttpRoutes; import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher; +import net.dreamlu.iot.mqtt.core.server.http.api.code.ResultCode; +import net.dreamlu.iot.mqtt.core.server.http.api.form.BaseForm; +import net.dreamlu.iot.mqtt.core.server.http.api.form.PayloadEncode; +import net.dreamlu.iot.mqtt.core.server.http.api.form.PublishForm; +import net.dreamlu.iot.mqtt.core.server.http.api.form.SubscribeForm; +import net.dreamlu.iot.mqtt.core.server.http.api.result.Result; +import net.dreamlu.iot.mqtt.core.server.http.handler.MqttHttpRoutes; import net.dreamlu.iot.mqtt.core.server.model.Message; import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager; import org.tio.http.common.HttpRequest; diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/auth/BasicAuthFilter.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/auth/BasicAuthFilter.java similarity index 56% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/auth/BasicAuthFilter.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/auth/BasicAuthFilter.java index 2c9eab3..353bd27 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/auth/BasicAuthFilter.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/auth/BasicAuthFilter.java @@ -14,15 +14,14 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core.api.auth; +package net.dreamlu.iot.mqtt.core.server.http.api.auth; -import net.dreamlu.iot.mqtt.core.api.code.ResultCode; -import net.dreamlu.iot.mqtt.core.api.result.Result; -import net.dreamlu.iot.mqtt.core.core.HttpFilter; -import org.tio.http.common.HttpRequest; -import org.tio.http.common.HttpResponse; +import net.dreamlu.iot.mqtt.core.server.http.handler.HttpFilter; +import org.tio.http.common.*; import org.tio.utils.hutool.StrUtil; +import java.nio.charset.StandardCharsets; +import java.util.Base64; import java.util.Objects; /** @@ -31,12 +30,14 @@ import java.util.Objects; * @author L.cm */ public class BasicAuthFilter implements HttpFilter { - public static final String BASIC_AUTH_HEADER_NAME = "Authorization"; + public static final HeaderName WWW_AUTHENTICATE = HeaderName.from("WWW-Authenticate"); + public static final HeaderValue BASIC_REALM = HeaderValue.from("Basic realm=\"Mica mqtt realm\""); + public static final String BASIC_AUTH_HEADER_NAME = "authorization"; public static final String AUTHORIZATION_PREFIX = "Basic "; private final String token; - public BasicAuthFilter(String token) { - this.token = Objects.requireNonNull(token, "Basic auth token is null"); + public BasicAuthFilter(String username, String password) { + this.token = getBasicToken(username, password); } @Override @@ -54,7 +55,15 @@ public class BasicAuthFilter implements HttpFilter { @Override public HttpResponse response(HttpRequest request, HttpResponse response) { - return Result.fail(response, ResultCode.E103); + response.addHeader(WWW_AUTHENTICATE, BASIC_REALM); + response.setStatus(HttpResponseStatus.C401); + return response; } + private static String getBasicToken(String username, String password) { + Objects.requireNonNull(username, "Basic auth username is null"); + Objects.requireNonNull(password, "Basic auth password is null"); + byte[] tokenBytes = (username + ':' + password).getBytes(StandardCharsets.UTF_8); + return Base64.getEncoder().encodeToString(tokenBytes); + } } diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/code/ResultCode.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/code/ResultCode.java similarity index 91% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/code/ResultCode.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/code/ResultCode.java index c41942c..3267e5f 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/code/ResultCode.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/code/ResultCode.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core.api.code; +package net.dreamlu.iot.mqtt.core.server.http.api.code; import org.tio.http.common.HttpResponseStatus; @@ -40,6 +40,10 @@ public enum ResultCode { * 用户名或密码错误 */ E103(HttpResponseStatus.C401, 103), + /** + * 请求方法错误 + */ + E104(HttpResponseStatus.C405, 104), /** * 未知错误 */ diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/BaseForm.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/form/BaseForm.java similarity index 95% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/BaseForm.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/form/BaseForm.java index b90e93a..dd2a5bc 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/BaseForm.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/form/BaseForm.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core.api.form; +package net.dreamlu.iot.mqtt.core.server.http.api.form; import java.io.Serializable; diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/PayloadEncode.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/form/PayloadEncode.java similarity index 97% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/PayloadEncode.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/form/PayloadEncode.java index 3ab15ac..96fbef8 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/PayloadEncode.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/form/PayloadEncode.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core.api.form; +package net.dreamlu.iot.mqtt.core.server.http.api.form; import net.dreamlu.iot.mqtt.core.util.HexUtil; import org.tio.utils.hutool.StrUtil; diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/PublishForm.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/form/PublishForm.java similarity index 96% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/PublishForm.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/form/PublishForm.java index f2bfe5e..967c273 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/PublishForm.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/form/PublishForm.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core.api.form; +package net.dreamlu.iot.mqtt.core.server.http.api.form; /** * 发布的模型 diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/SubscribeForm.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/form/SubscribeForm.java similarity index 94% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/SubscribeForm.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/form/SubscribeForm.java index 5378391..d1bf9c2 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/SubscribeForm.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/form/SubscribeForm.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core.api.form; +package net.dreamlu.iot.mqtt.core.server.http.api.form; /** * 订阅表单 diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/result/Result.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/result/Result.java similarity index 95% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/result/Result.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/result/Result.java index 8c4c27e..2438f6b 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/result/Result.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/result/Result.java @@ -14,10 +14,10 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core.api.result; +package net.dreamlu.iot.mqtt.core.server.http.api.result; import com.alibaba.fastjson.JSONObject; -import net.dreamlu.iot.mqtt.core.api.code.ResultCode; +import net.dreamlu.iot.mqtt.core.server.http.api.code.ResultCode; import org.tio.http.common.HeaderName; import org.tio.http.common.HeaderValue; import org.tio.http.common.HttpResponse; diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebServer.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServer.java similarity index 84% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebServer.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServer.java index ea1604d..7b03113 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebServer.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServer.java @@ -191,14 +191,14 @@ See the License for the specific language governing permissions and limitations under the License. */ -package net.dreamlu.iot.mqtt.core; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +package net.dreamlu.iot.mqtt.core.server.http.core; + +import net.dreamlu.iot.mqtt.core.server.MqttServerCreator; +import net.dreamlu.iot.mqtt.core.server.http.handler.MqttHttpRequestHandler; import org.tio.core.TcpConst; import org.tio.http.common.HttpConfig; import org.tio.http.common.HttpUuid; -import org.tio.http.common.TioConfigKey; import org.tio.http.common.handler.HttpRequestHandler; import org.tio.server.ServerTioConfig; import org.tio.server.TioServer; @@ -211,46 +211,41 @@ import java.io.IOException; import java.util.concurrent.ThreadPoolExecutor; /** + * mqtt web Server,集成 http 和 websocket + * * @author tanyaowu + * @author L.cm */ public class MqttWebServer { - private static Logger log = LoggerFactory.getLogger(org.tio.http.server.HttpServerStarter.class); + private static final String TIO_SYSTEM_TIMER_PERIOD = "tio.system.timer.period"; + private final MqttWebServerAioListener mqttWebServerAioListener; + private final HttpRequestHandler httpRequestHandler; private HttpConfig httpConfig = null; - private MqttWebServerAioHandler mqttWebServerAioHandler = null; - private MqttWebServerAioListener mqttWebServerAioListener = null; - private HttpRequestHandler httpRequestHandler = null; private ServerTioConfig serverTioConfig = null; + private MqttWebServerAioHandler mqttWebServerAioHandler = null; private TioServer tioServer = null; - /** - * @param httpConfig - * @param requestHandler - * @author tanyaowu - */ - public MqttWebServer(HttpConfig httpConfig, HttpRequestHandler requestHandler, IWsMsgHandler wsMsgHandler) { - this(httpConfig, requestHandler, wsMsgHandler, null, null); + public MqttWebServer(MqttServerCreator serverCreator, IWsMsgHandler wsMsgHandler) { + this(serverCreator, new MqttHttpRequestHandler(), wsMsgHandler); + } + + public MqttWebServer(MqttServerCreator serverCreator, HttpRequestHandler requestHandler, IWsMsgHandler wsMsgHandler) { + this(serverCreator, requestHandler, wsMsgHandler, null, null); } - /** - * @param httpConfig - * @param requestHandler - * @param tioExecutor - * @param groupExecutor - * @author tanyaowu - */ - public MqttWebServer(HttpConfig httpConfig, HttpRequestHandler requestHandler, IWsMsgHandler wsMsgHandler, SynThreadPoolExecutor tioExecutor, ThreadPoolExecutor groupExecutor) { + public MqttWebServer(MqttServerCreator serverCreator, HttpRequestHandler requestHandler, IWsMsgHandler wsMsgHandler, SynThreadPoolExecutor tioExecutor, ThreadPoolExecutor groupExecutor) { if (tioExecutor == null) { tioExecutor = Threads.getTioExecutor(); } if (groupExecutor == null) { groupExecutor = Threads.getGroupExecutor(); } - init(httpConfig, requestHandler, wsMsgHandler, tioExecutor, groupExecutor); + + this.httpRequestHandler = requestHandler; + this.mqttWebServerAioListener = new MqttWebServerAioListener(); + init(serverCreator, wsMsgHandler, tioExecutor, groupExecutor); } - /** - * @return the httpConfig - */ public HttpConfig getHttpConfig() { return httpConfig; } @@ -267,49 +262,34 @@ public class MqttWebServer { return mqttWebServerAioListener; } - /** - * @return the serverTioConfig - */ public ServerTioConfig getServerTioConfig() { return serverTioConfig; } - private void init(HttpConfig httpConfig, HttpRequestHandler requestHandler, IWsMsgHandler wsMsgHandler, + private void init(MqttServerCreator serverCreator, IWsMsgHandler wsMsgHandler, SynThreadPoolExecutor tioExecutor, ThreadPoolExecutor groupExecutor) { - String system_timer_period = System.getProperty("tio.system.timer.period"); - if (StrUtil.isBlank(system_timer_period)) { - System.setProperty("tio.system.timer.period", "50"); + String systemTimerPeriod = System.getProperty(TIO_SYSTEM_TIMER_PERIOD); + if (StrUtil.isBlank(systemTimerPeriod)) { + System.setProperty(TIO_SYSTEM_TIMER_PERIOD, "50"); } - + HttpConfig httpConfig = new HttpConfig(serverCreator.getWebsocketPort(), false); + httpConfig.setName(serverCreator.getName() + "-HTTP/Websocket"); + httpConfig.setCheckHost(false); this.httpConfig = httpConfig; - this.httpRequestHandler = requestHandler; - httpConfig.setHttpRequestHandler(this.httpRequestHandler); - this.mqttWebServerAioHandler = new MqttWebServerAioHandler(httpConfig, requestHandler, wsMsgHandler); - this.mqttWebServerAioListener = new MqttWebServerAioListener(); - String name = httpConfig.getName(); - if (StrUtil.isBlank(name)) { - name = "Tio Http Server"; - } - serverTioConfig = new ServerTioConfig(name, mqttWebServerAioHandler, mqttWebServerAioListener, tioExecutor, groupExecutor); - serverTioConfig.setHeartbeatTimeout(1000 * 20); - serverTioConfig.setShortConnection(true); - serverTioConfig.setReadBufferSize(TcpConst.MAX_DATA_LENGTH); - serverTioConfig.setAttribute(TioConfigKey.HTTP_REQ_HANDLER, this.httpRequestHandler); - - tioServer = new TioServer(serverTioConfig); - HttpUuid imTioUuid = new HttpUuid(); - serverTioConfig.setTioUuid(imTioUuid); - } - - public void setHttpRequestHandler(HttpRequestHandler requestHandler) { - this.httpRequestHandler = requestHandler; + this.httpConfig.setHttpRequestHandler(this.httpRequestHandler); + this.mqttWebServerAioHandler = new MqttWebServerAioHandler(httpConfig, this.httpRequestHandler, wsMsgHandler); + this.serverTioConfig = new ServerTioConfig(this.httpConfig.getName(), mqttWebServerAioHandler, mqttWebServerAioListener, tioExecutor, groupExecutor); + this.serverTioConfig.setHeartbeatTimeout(1000 * 20); + this.serverTioConfig.setReadBufferSize(TcpConst.MAX_DATA_LENGTH); + this.tioServer = new TioServer(serverTioConfig); + this.serverTioConfig.setTioUuid(new HttpUuid()); } public void start() throws IOException { tioServer.start(this.httpConfig.getBindIp(), this.httpConfig.getBindPort()); } - public void stop() throws IOException { + public void stop() { tioServer.stop(); } diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebServerAioHandler.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServerAioHandler.java similarity index 97% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebServerAioHandler.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServerAioHandler.java index d03d4d2..5770480 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebServerAioHandler.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServerAioHandler.java @@ -191,7 +191,8 @@ See the License for the specific language governing permissions and limitations under the License. */ -package net.dreamlu.iot.mqtt.core; + +package net.dreamlu.iot.mqtt.core.server.http.core; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -214,7 +215,10 @@ import java.nio.ByteBuffer; import java.util.*; /** + * websocket 和 http 共存 + * * @author tanyaowu + * @author L.cm */ public class MqttWebServerAioHandler implements ServerAioHandler { private static Logger log = LoggerFactory.getLogger(org.tio.websocket.server.WsServerAioHandler.class); @@ -230,9 +234,8 @@ public class MqttWebServerAioHandler implements ServerAioHandler { /** * websocket子协议 */ - private static final String Sec_Websocket_Protocol = "sec-websocket-protocol"; - private static final HeaderName Header_Name_Sec_Websocket_Protocol = HeaderName.from(Sec_Websocket_Protocol); - + private static final String SEC_WEBSOCKET_PROTOCOL = "sec-websocket-protocol"; + private static final HeaderName HEADER_NAME_SEC_WEBSOCKET_PROTOCOL = HeaderName.from(SEC_WEBSOCKET_PROTOCOL); private final HttpConfig httpConfig; private final HttpRequestHandler requestHandler; private final IWsMsgHandler wsMsgHandler; @@ -370,7 +373,7 @@ public class MqttWebServerAioHandler implements ServerAioHandler { String methodName = "onBytes"; return processRetObj(retObj, methodName, channelContext); } else if (opcode == Opcode.PING || opcode == Opcode.PONG) { - log.debug("收到" + opcode); + log.debug("收到{}", opcode); return null; } else if (opcode == Opcode.CLOSE) { Object retObj = wsMsgHandler.onClose(websocketPacket, bytes, channelContext); @@ -391,11 +394,10 @@ public class MqttWebServerAioHandler implements ServerAioHandler { HttpResponse httpResponse = request.httpConfig.getRespForBlackIp(); if (httpResponse != null) { Tio.send(channelContext, httpResponse); - return; } else { Tio.remove(channelContext, ip + "在黑名单中"); - return; } + return; } HttpResponse httpResponse = requestHandler.handler(request); if (httpResponse != null) { @@ -409,7 +411,8 @@ public class MqttWebServerAioHandler implements ServerAioHandler { return; } WsRequest wsRequest = (WsRequest) packet; - if (wsRequest.isHandShake()) {//是握手包 + // 判断握手包 + if (wsRequest.isHandShake()) { WsSessionContext wsSessionContext = (WsSessionContext) channelContext.get(); HttpRequest request = wsSessionContext.getHandshakeRequest(); HttpResponse httpResponse = wsSessionContext.getHandshakeResponse(); @@ -419,7 +422,6 @@ public class MqttWebServerAioHandler implements ServerAioHandler { return; } wsSessionContext.setHandshakeResponse(response); - WsResponse wsResponse = new WsResponse(); wsResponse.setHandShake(true); Tio.send(channelContext, wsResponse); @@ -436,7 +438,7 @@ public class MqttWebServerAioHandler implements ServerAioHandler { } } - private WsResponse processRetObj(Object obj, String methodName, ChannelContext channelContext) throws Exception { + private WsResponse processRetObj(Object obj, String methodName, ChannelContext channelContext) { if (obj == null) { return null; } else { @@ -466,7 +468,6 @@ public class MqttWebServerAioHandler implements ServerAioHandler { public HttpResponse updateWebSocketProtocol(HttpRequest request) { Map headers = request.getHeaders(); String secWebSocketKey = headers.get(HttpConst.RequestHeaderKey.Sec_WebSocket_Key); - if (StrUtil.isNotBlank(secWebSocketKey)) { byte[] secWebSocketKeyBytes; try { @@ -477,25 +478,22 @@ public class MqttWebServerAioHandler implements ServerAioHandler { byte[] allBs = new byte[secWebSocketKeyBytes.length + SEC_WEBSOCKET_KEY_SUFFIX_BYTES.length]; System.arraycopy(secWebSocketKeyBytes, 0, allBs, 0, secWebSocketKeyBytes.length); System.arraycopy(SEC_WEBSOCKET_KEY_SUFFIX_BYTES, 0, allBs, secWebSocketKeyBytes.length, SEC_WEBSOCKET_KEY_SUFFIX_BYTES.length); - byte[] keyArray = SHA1Util.SHA1(allBs); String acceptKey = BASE64Util.byteArrayToBase64(keyArray); HttpResponse httpResponse = new HttpResponse(request); - + // 101 协议转换 httpResponse.setStatus(HttpResponseStatus.C101); - Map respHeaders = new HashMap<>(); respHeaders.put(HeaderName.Connection, HeaderValue.Connection.Upgrade); respHeaders.put(HeaderName.Upgrade, HeaderValue.Upgrade.WebSocket); respHeaders.put(HeaderName.Sec_WebSocket_Accept, HeaderValue.from(acceptKey)); - // websocket 子协议协商 String[] supportedSubProtocols = wsMsgHandler.getSupportedSubProtocols(); if (supportedSubProtocols != null && supportedSubProtocols.length > 0) { - String requestedSubProtocols = headers.get(Sec_Websocket_Protocol); + String requestedSubProtocols = headers.get(SEC_WEBSOCKET_PROTOCOL); String selectSubProtocol = selectSubProtocol(requestedSubProtocols, supportedSubProtocols); if (selectSubProtocol != null) { - respHeaders.put(Header_Name_Sec_Websocket_Protocol, HeaderValue.from(selectSubProtocol)); + respHeaders.put(HEADER_NAME_SEC_WEBSOCKET_PROTOCOL, HeaderValue.from(selectSubProtocol)); } } httpResponse.addHeaders(respHeaders); diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServerAioListener.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServerAioListener.java new file mode 100644 index 0000000..3409147 --- /dev/null +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServerAioListener.java @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.dreamlu.iot.mqtt.core.server.http.core; + +import org.tio.core.ChannelContext; +import org.tio.core.Tio; +import org.tio.core.TioConfig; +import org.tio.core.intf.Packet; +import org.tio.http.common.HttpConst; +import org.tio.http.common.HttpRequest; +import org.tio.http.common.HttpResponse; +import org.tio.server.intf.ServerAioListener; + +/** + * 兼容 websocket,参考 HTTPServerAioListener + * + * @author tanyaowu + * @author L.cm + */ +public class MqttWebServerAioListener implements ServerAioListener { + + @Override + public void onAfterConnected(ChannelContext context, boolean isConnected, boolean isReconnect) { + + } + + @Override + public void onAfterDecoded(ChannelContext context, Packet packet, int packetSize) { + + } + + @Override + public void onAfterSent(ChannelContext context, Packet packet, boolean isSentSuccess) { + if (!(packet instanceof HttpResponse)) { + return; + } + // 1. 短链接数据解绑 + TioConfig tioConfig = context.getTioConfig(); + tioConfig.groups.unbind(context); + tioConfig.bsIds.unbind(context); + tioConfig.ids.unbind(context); + tioConfig.clientNodes.remove(context); + tioConfig.tokens.unbind(context); + // 2. 关闭 + HttpResponse httpResponse = (HttpResponse) packet; + HttpRequest request = httpResponse.getHttpRequest(); + if (request != null) { + if (request.httpConfig.compatible1_0) { + if (HttpConst.HttpVersion.V1_0.equals(request.requestLine.version)) { + if (!HttpConst.RequestHeaderValue.Connection.keep_alive.equals(request.getConnection())) { + Tio.remove(context, "http 请求头Connection!=keep-alive:" + request.getRequestLine()); + } + } else { + if (HttpConst.RequestHeaderValue.Connection.close.equals(request.getConnection())) { + Tio.remove(context, "http 请求头Connection=close:" + request.getRequestLine()); + } + } + } else { + if (HttpConst.RequestHeaderValue.Connection.close.equals(request.getConnection())) { + Tio.remove(context, "http 请求头Connection=close:" + request.getRequestLine()); + } + } + } + } + + @Override + public void onBeforeClose(ChannelContext context, Throwable throwable, String remark, boolean isRemove) { + + } + + @Override + public void onAfterHandled(ChannelContext context, Packet packet, long cost) throws Exception { + + } + + @Override + public void onAfterReceivedBytes(ChannelContext context, int receivedBytes) throws Exception { + + } + + @Override + public boolean onHeartbeatTimeout(ChannelContext context, Long interval, int heartbeatTimeoutCount) { + return false; + } + +} diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/MappingInfo.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/HandlerInfo.java similarity index 70% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/MappingInfo.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/HandlerInfo.java index 07c13ff..b514922 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/MappingInfo.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/HandlerInfo.java @@ -14,32 +14,32 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core.core; +package net.dreamlu.iot.mqtt.core.server.http.handler; import org.tio.http.common.Method; import java.util.Objects; /** - * http Mapping info + * Handler info * * @author L.cm */ -public class MappingInfo { +public class HandlerInfo { private final Method method; - private final String path; + private final HttpHandler handler; - public MappingInfo(Method method, String path) { + public HandlerInfo(Method method, HttpHandler handler) { this.method = method; - this.path = path; + this.handler = handler; } public Method getMethod() { return method; } - public String getPath() { - return path; + public HttpHandler getHandler() { + return handler; } @Override @@ -47,17 +47,16 @@ public class MappingInfo { if (this == o) { return true; } - if (o == null || getClass() != o.getClass()) { + if (!(o instanceof HandlerInfo)) { return false; } - MappingInfo that = (MappingInfo) o; + HandlerInfo that = (HandlerInfo) o; return method == that.method && - Objects.equals(path, that.path); + Objects.equals(handler, that.handler); } @Override public int hashCode() { - return Objects.hash(method, path); + return Objects.hash(method, handler); } - } diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/HttpFilter.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/HttpFilter.java similarity index 95% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/HttpFilter.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/HttpFilter.java index fee2ada..e32ee89 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/HttpFilter.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/HttpFilter.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core.core; +package net.dreamlu.iot.mqtt.core.server.http.handler; import org.tio.http.common.HttpRequest; import org.tio.http.common.HttpResponse; diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/HttpHandler.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/HttpHandler.java similarity index 94% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/HttpHandler.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/HttpHandler.java index af6e7a3..57eb199 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/HttpHandler.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/HttpHandler.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core.core; +package net.dreamlu.iot.mqtt.core.server.http.handler; import org.tio.http.common.HttpRequest; import org.tio.http.common.HttpResponse; diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttHttpRequestHandler.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/MqttHttpRequestHandler.java similarity index 82% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttHttpRequestHandler.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/MqttHttpRequestHandler.java index 246091c..c79fde8 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttHttpRequestHandler.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/MqttHttpRequestHandler.java @@ -14,13 +14,10 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core; +package net.dreamlu.iot.mqtt.core.server.http.handler; -import net.dreamlu.iot.mqtt.core.api.code.ResultCode; -import net.dreamlu.iot.mqtt.core.api.result.Result; -import net.dreamlu.iot.mqtt.core.core.HttpFilter; -import net.dreamlu.iot.mqtt.core.core.HttpHandler; -import net.dreamlu.iot.mqtt.core.core.MqttHttpRoutes; +import net.dreamlu.iot.mqtt.core.server.http.api.code.ResultCode; +import net.dreamlu.iot.mqtt.core.server.http.api.result.Result; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tio.http.common.*; @@ -52,13 +49,17 @@ public class MqttHttpRequestHandler implements HttpRequestHandler { return resp500(request, requestLine, e); } // 2. 路由处理 - HttpHandler handler = MqttHttpRoutes.getHandler(requestLine); + HandlerInfo handler = MqttHttpRoutes.getHandler(requestLine); if (handler == null) { return resp404(request, requestLine); } - logger.info("mqtt http api {} path:{}", requestLine.getMethod().name(), requestLine.getPathAndQuery()); + Method method = requestLine.getMethod(); + if (handler.getMethod() != method) { + return Result.fail(new HttpResponse(request), ResultCode.E104); + } + logger.info("mqtt http api {} path:{}", method.name(), requestLine.getPathAndQuery()); try { - return handler.apply(request); + return handler.getHandler().apply(request); } catch (Exception e) { return resp500(request, requestLine, e); } diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/MqttHttpRoutes.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/MqttHttpRoutes.java similarity index 77% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/MqttHttpRoutes.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/MqttHttpRoutes.java index f132685..1317ce2 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/MqttHttpRoutes.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/handler/MqttHttpRoutes.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core.core; +package net.dreamlu.iot.mqtt.core.server.http.handler; import org.tio.http.common.Method; import org.tio.http.common.RequestLine; @@ -28,7 +28,7 @@ import java.util.*; */ public final class MqttHttpRoutes { private static final LinkedList FILTERS = new LinkedList<>(); - private static final Map ROUTS = new HashMap<>(); + private static final Map ROUTS = new HashMap<>(); /** * 注册路由 @@ -66,18 +66,17 @@ public final class MqttHttpRoutes { * @param handler HttpHandler */ public static void register(Method method, String path, HttpHandler handler) { - ROUTS.put(new MappingInfo(method, path), handler); + ROUTS.put(path, new HandlerInfo(method, handler)); } /** * 读取路由 * - * @param method 请求方法 - * @param path 路径 - * @return HttpHandler + * @param path 路径 + * @return HandlerInfo */ - public static HttpHandler getHandler(Method method, String path) { - return ROUTS.get(new MappingInfo(method, path)); + public static HandlerInfo getHandler(String path) { + return ROUTS.get(path); } /** @@ -86,8 +85,8 @@ public final class MqttHttpRoutes { * @param requestLine RequestLine * @return HttpHandler */ - public static HttpHandler getHandler(RequestLine requestLine) { - return getHandler(requestLine.getMethod(), requestLine.getPath()); + public static HandlerInfo getHandler(RequestLine requestLine) { + return getHandler(requestLine.getPath()); } } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/package-info.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/package-info.java deleted file mode 100644 index 06f9436..0000000 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/package-info.java +++ /dev/null @@ -1 +0,0 @@ -package net.dreamlu.iot.mqtt.core.server.http; diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/websocket/MqttWsMsgHandler.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/websocket/MqttWsMsgHandler.java index a9436c0..271855e 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/websocket/MqttWsMsgHandler.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/websocket/MqttWsMsgHandler.java @@ -22,10 +22,10 @@ import net.dreamlu.iot.mqtt.codec.WriteBuffer; import org.tio.core.ChannelContext; import org.tio.core.Tio; import org.tio.core.TioConfig; +import org.tio.core.intf.AioHandler; import org.tio.core.intf.Packet; import org.tio.http.common.HttpRequest; import org.tio.http.common.HttpResponse; -import org.tio.server.intf.ServerAioHandler; import org.tio.websocket.common.WsRequest; import org.tio.websocket.common.WsResponse; import org.tio.websocket.server.handler.IWsMsgHandler; @@ -42,17 +42,19 @@ public class MqttWsMsgHandler implements IWsMsgHandler { * mqtt websocket message body key */ private static final String MQTT_WS_MSG_BODY_KEY = "MQTT_WS_MSG_BODY_KEY"; - + /** + * websocket 握手端点 + */ private final String[] supportedSubProtocols; - private final ServerAioHandler mqttServerAioHandler; + private final AioHandler mqttServerAioHandler; - public MqttWsMsgHandler(ServerAioHandler mqttServerAioHandler) { - this(new String[]{"mqtt", "mqttv3.1", "mqttv3.1.1"}, mqttServerAioHandler); + public MqttWsMsgHandler(AioHandler aioHandler) { + this(new String[]{"mqtt", "mqttv3.1", "mqttv3.1.1"}, aioHandler); } - public MqttWsMsgHandler(String[] supportedSubProtocols, ServerAioHandler mqttServerAioHandler) { + public MqttWsMsgHandler(String[] supportedSubProtocols, AioHandler aioHandler) { this.supportedSubProtocols = supportedSubProtocols; - this.mqttServerAioHandler = mqttServerAioHandler; + this.mqttServerAioHandler = aioHandler; } @Override @@ -132,7 +134,7 @@ public class MqttWsMsgHandler implements IWsMsgHandler { /** * 读取 mqtt 消息体处理半包的情况 * - * @param bytes 消息类容 + * @param bytes 消息类容 * @return ByteBuffer */ private static synchronized ByteBuffer getMqttBody(WriteBuffer wsBody, byte[] bytes) { diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebServerAioListener.java b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebServerAioListener.java deleted file mode 100644 index bebd804..0000000 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebServerAioListener.java +++ /dev/null @@ -1,75 +0,0 @@ -package net.dreamlu.iot.mqtt.core; - -import org.tio.core.ChannelContext; -import org.tio.core.Tio; -import org.tio.core.intf.Packet; -import org.tio.http.common.HttpConst; -import org.tio.http.common.HttpRequest; -import org.tio.http.common.HttpResponse; -import org.tio.server.intf.ServerAioListener; - -/** - * 兼容 websocket,参考 HTTPServerAioListener - * - * @author tanyaowu - * @author L.cm - */ -public class MqttWebServerAioListener implements ServerAioListener { - - @Override - public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) { - - } - - @Override - public void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize) { - - } - - @Override - public void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess) { - if (!(packet instanceof HttpResponse)) { - return; - } - HttpResponse httpResponse = (HttpResponse) packet; - HttpRequest request = httpResponse.getHttpRequest(); - if (request != null) { - if (request.httpConfig.compatible1_0) { - if (HttpConst.HttpVersion.V1_0.equals(request.requestLine.version)) { - if (!HttpConst.RequestHeaderValue.Connection.keep_alive.equals(request.getConnection())) { - Tio.remove(channelContext, "http 请求头Connection!=keep-alive:" + request.getRequestLine()); - } - } else { - if (HttpConst.RequestHeaderValue.Connection.close.equals(request.getConnection())) { - Tio.remove(channelContext, "http 请求头Connection=close:" + request.getRequestLine()); - } - } - } else { - if (HttpConst.RequestHeaderValue.Connection.close.equals(request.getConnection())) { - Tio.remove(channelContext, "http 请求头Connection=close:" + request.getRequestLine()); - } - } - } - } - - @Override - public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) { - - } - - @Override - public void onAfterHandled(ChannelContext channelContext, Packet packet, long cost) throws Exception { - - } - - @Override - public void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes) throws Exception { - - } - - @Override - public boolean onHeartbeatTimeout(ChannelContext channelContext, Long interval, int heartbeatTimeoutCount) { - return false; - } - -} diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebTest.java b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebTest.java deleted file mode 100644 index beb6a61..0000000 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebTest.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net). - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package net.dreamlu.iot.mqtt.core; - -/** - * mqtt websocket 子协议测试 - */ -public class MqttWebTest { - - public static void main(String[] args) throws Exception { -// final int port = 8083; -// HttpConfig httpConfig = new HttpConfig(port, null, null, null); -// httpConfig.setUseSession(false); -// httpConfig.setCheckHost(false); -// httpConfig.setMonitorFileChange(false); -// -// HttpRequestHandler requestHandler = new MqttHttpRequestHandler(); -// -// MqttHttpApi httpApi = new MqttHttpApi(null); -// httpApi.register(); -// IWsMsgHandler mqttWsMsgHandler = new MqttWsMsgHandler(); -// MqttWebServer httpServerStarter = new MqttWebServer(httpConfig, requestHandler, mqttWsMsgHandler); -// httpServerStarter.start(); //启动http服务器 - } - -} diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebsocketTest.java b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/http/MqttWebsocketTest.java similarity index 96% rename from mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebsocketTest.java rename to mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/http/MqttWebsocketTest.java index a20b57d..6bcaf2e 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttWebsocketTest.java +++ b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/http/MqttWebsocketTest.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package net.dreamlu.iot.mqtt.core; +package net.dreamlu.iot.mqtt.core.http; import java.io.IOException; diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/udp/UdpClusterTest1.java b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/udp/UdpClusterTest1.java index 0f24af0..33f8c2d 100644 --- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/udp/UdpClusterTest1.java +++ b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/udp/UdpClusterTest1.java @@ -11,7 +11,7 @@ public class UdpClusterTest1 { public static void main(String[] args) throws IOException { UdpTestHandler udpTestHandler = new UdpTestHandler(); - UdpClusterConfig udpServerConf = new UdpClusterConfig("224.0.0.1",12345, udpTestHandler, 5000); + UdpClusterConfig udpServerConf = new UdpClusterConfig("224.0.0.1", 12345, udpTestHandler, 5000); UdpCluster udpCluster = new UdpCluster(udpServerConf); udpCluster.start(); diff --git a/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/aliyun/MqttClientTest.java b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/aliyun/MqttClientTest.java index 6986010..0fc5c18 100644 --- a/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/aliyun/MqttClientTest.java +++ b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/aliyun/MqttClientTest.java @@ -52,7 +52,7 @@ public class MqttClientTest { .connect(); client.subQos0("/sys/" + productKey + '/' + deviceName + "/thing/event/property/post_reply", (topic, payload) -> { - System.out.println(topic + '\t' +ByteBufferUtil.toString(payload)); + System.out.println(topic + '\t' + ByteBufferUtil.toString(payload)); }); String content = "{\"id\":\"1\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":1}}"; diff --git a/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/http/MqttWebTest.java b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/http/MqttWebTest.java new file mode 100644 index 0000000..e08afe3 --- /dev/null +++ b/mica-mqtt-example/src/main/java/net/dreamlu/iot/mqtt/http/MqttWebTest.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.dreamlu.iot.mqtt.http; + +import net.dreamlu.iot.mqtt.core.server.MqttServer; +import net.dreamlu.iot.mqtt.core.server.MqttServerCreator; +import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher; +import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener; +import net.dreamlu.iot.mqtt.core.server.http.api.MqttHttpApi; +import net.dreamlu.iot.mqtt.core.server.http.api.auth.BasicAuthFilter; +import net.dreamlu.iot.mqtt.core.server.http.core.MqttWebServer; +import net.dreamlu.iot.mqtt.core.server.http.handler.MqttHttpRoutes; +import net.dreamlu.iot.mqtt.core.server.model.Message; +import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttMessageDispatcher; +import net.dreamlu.iot.mqtt.core.server.websocket.MqttWsMsgHandler; +import org.tio.core.intf.AioHandler; +import org.tio.server.ServerTioConfig; +import org.tio.websocket.server.handler.IWsMsgHandler; + +/** + * mqtt websocket 子协议测试 + */ +public class MqttWebTest { + + public static void main(String[] args) throws Exception { + // 1. 消息转发处理器,可用来实现集群 + IMqttMessageDispatcher messageDispatcher = new DefaultMqttMessageDispatcher(); + // 2. 收到消息,将消息转发出去 + IMqttMessageListener messageListener = (clientId, topic, mqttQoS, payload) -> { + Message message = new Message(); + message.setTopic(topic); + message.setQos(mqttQoS.value()); + message.setPayload(payload.array()); + messageDispatcher.send(message); + }; + + // 3. 启动服务 + MqttServerCreator serverCreator = MqttServer.create() + .ip("0.0.0.0") + .port(1883) + .readBufferSize(512) + .messageDispatcher(messageDispatcher) + .messageListener(messageListener) + .websocketEnable(false) + .debug(); + MqttServer mqttServer = serverCreator.start(); + ServerTioConfig serverConfig = mqttServer.getServerConfig(); + AioHandler aioHandler = serverConfig.getAioHandler(); + + MqttHttpApi httpApi = new MqttHttpApi(serverCreator.getMessageDispatcher(), serverCreator.getSessionManager()); + httpApi.register(); + MqttHttpRoutes.addFilter(new BasicAuthFilter("123", "123")); + + IWsMsgHandler mqttWsMsgHandler = new MqttWsMsgHandler(aioHandler); + MqttWebServer httpServerStarter = new MqttWebServer(serverCreator, mqttWsMsgHandler); + ServerTioConfig httpIioConfig = httpServerStarter.getServerTioConfig(); + httpIioConfig.share(serverConfig); + // 启动http服务器 + httpServerStarter.start(); + } + +} -- GitLab