提交 806e8b65 编写于 作者: 浅梦2013's avatar 浅梦2013

考虑内置 http api。

上级 b9f43e11
......@@ -76,7 +76,7 @@ public class ByteBufferUtil {
/**
* 转成 string
*
* @param buffer ByteBuffer
* @param buffer ByteBuffer
* @param charset Charset
* @return 字符串
*/
......
......@@ -26,13 +26,14 @@ public final class MqttIdentifierRejectedException extends DecoderException {
/**
* Creates a new instance
*/
public MqttIdentifierRejectedException() {}
public MqttIdentifierRejectedException() {
}
/**
* Creates a new instance
*
* @param message message
* @param cause Throwable
* @param cause Throwable
*/
public MqttIdentifierRejectedException(String message, Throwable cause) {
super(message, cause);
......
......@@ -486,7 +486,7 @@ public final class MqttProperties {
* then return the first one.
*
* @param mqttPropertyType Type of the property
* @param <T> 泛型标记
* @param <T> 泛型标记
* @return a property value if it is set, null otherwise
*/
public <T> T getPropertyValue(MqttPropertyType mqttPropertyType) {
......
......@@ -14,19 +14,19 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.api;
package net.dreamlu.iot.mqtt.core.server.http.api;
import com.alibaba.fastjson.JSON;
import net.dreamlu.iot.mqtt.codec.MqttMessageType;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.api.code.ResultCode;
import net.dreamlu.iot.mqtt.core.api.form.BaseForm;
import net.dreamlu.iot.mqtt.core.api.form.PayloadEncode;
import net.dreamlu.iot.mqtt.core.api.form.PublishForm;
import net.dreamlu.iot.mqtt.core.api.form.SubscribeForm;
import net.dreamlu.iot.mqtt.core.api.result.Result;
import net.dreamlu.iot.mqtt.core.core.MqttHttpRoutes;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.http.api.code.ResultCode;
import net.dreamlu.iot.mqtt.core.server.http.api.form.BaseForm;
import net.dreamlu.iot.mqtt.core.server.http.api.form.PayloadEncode;
import net.dreamlu.iot.mqtt.core.server.http.api.form.PublishForm;
import net.dreamlu.iot.mqtt.core.server.http.api.form.SubscribeForm;
import net.dreamlu.iot.mqtt.core.server.http.api.result.Result;
import net.dreamlu.iot.mqtt.core.server.http.handler.MqttHttpRoutes;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import org.tio.http.common.HttpRequest;
......
......@@ -14,15 +14,14 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.api.auth;
package net.dreamlu.iot.mqtt.core.server.http.api.auth;
import net.dreamlu.iot.mqtt.core.api.code.ResultCode;
import net.dreamlu.iot.mqtt.core.api.result.Result;
import net.dreamlu.iot.mqtt.core.core.HttpFilter;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import net.dreamlu.iot.mqtt.core.server.http.handler.HttpFilter;
import org.tio.http.common.*;
import org.tio.utils.hutool.StrUtil;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Objects;
/**
......@@ -31,12 +30,14 @@ import java.util.Objects;
* @author L.cm
*/
public class BasicAuthFilter implements HttpFilter {
public static final String BASIC_AUTH_HEADER_NAME = "Authorization";
public static final HeaderName WWW_AUTHENTICATE = HeaderName.from("WWW-Authenticate");
public static final HeaderValue BASIC_REALM = HeaderValue.from("Basic realm=\"Mica mqtt realm\"");
public static final String BASIC_AUTH_HEADER_NAME = "authorization";
public static final String AUTHORIZATION_PREFIX = "Basic ";
private final String token;
public BasicAuthFilter(String token) {
this.token = Objects.requireNonNull(token, "Basic auth token is null");
public BasicAuthFilter(String username, String password) {
this.token = getBasicToken(username, password);
}
@Override
......@@ -54,7 +55,15 @@ public class BasicAuthFilter implements HttpFilter {
@Override
public HttpResponse response(HttpRequest request, HttpResponse response) {
return Result.fail(response, ResultCode.E103);
response.addHeader(WWW_AUTHENTICATE, BASIC_REALM);
response.setStatus(HttpResponseStatus.C401);
return response;
}
private static String getBasicToken(String username, String password) {
Objects.requireNonNull(username, "Basic auth username is null");
Objects.requireNonNull(password, "Basic auth password is null");
byte[] tokenBytes = (username + ':' + password).getBytes(StandardCharsets.UTF_8);
return Base64.getEncoder().encodeToString(tokenBytes);
}
}
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.api.code;
package net.dreamlu.iot.mqtt.core.server.http.api.code;
import org.tio.http.common.HttpResponseStatus;
......@@ -40,6 +40,10 @@ public enum ResultCode {
* 用户名或密码错误
*/
E103(HttpResponseStatus.C401, 103),
/**
* 请求方法错误
*/
E104(HttpResponseStatus.C405, 104),
/**
* 未知错误
*/
......
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.api.form;
package net.dreamlu.iot.mqtt.core.server.http.api.form;
import java.io.Serializable;
......
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.api.form;
package net.dreamlu.iot.mqtt.core.server.http.api.form;
import net.dreamlu.iot.mqtt.core.util.HexUtil;
import org.tio.utils.hutool.StrUtil;
......
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.api.form;
package net.dreamlu.iot.mqtt.core.server.http.api.form;
/**
* 发布的模型
......
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.api.form;
package net.dreamlu.iot.mqtt.core.server.http.api.form;
/**
* 订阅表单
......
......@@ -14,10 +14,10 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.api.result;
package net.dreamlu.iot.mqtt.core.server.http.api.result;
import com.alibaba.fastjson.JSONObject;
import net.dreamlu.iot.mqtt.core.api.code.ResultCode;
import net.dreamlu.iot.mqtt.core.server.http.api.code.ResultCode;
import org.tio.http.common.HeaderName;
import org.tio.http.common.HeaderValue;
import org.tio.http.common.HttpResponse;
......
......@@ -191,14 +191,14 @@
See the License for the specific language governing permissions and
limitations under the License.
*/
package net.dreamlu.iot.mqtt.core;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
package net.dreamlu.iot.mqtt.core.server.http.core;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import net.dreamlu.iot.mqtt.core.server.http.handler.MqttHttpRequestHandler;
import org.tio.core.TcpConst;
import org.tio.http.common.HttpConfig;
import org.tio.http.common.HttpUuid;
import org.tio.http.common.TioConfigKey;
import org.tio.http.common.handler.HttpRequestHandler;
import org.tio.server.ServerTioConfig;
import org.tio.server.TioServer;
......@@ -211,46 +211,41 @@ import java.io.IOException;
import java.util.concurrent.ThreadPoolExecutor;
/**
* mqtt web Server,集成 http 和 websocket
*
* @author tanyaowu
* @author L.cm
*/
public class MqttWebServer {
private static Logger log = LoggerFactory.getLogger(org.tio.http.server.HttpServerStarter.class);
private static final String TIO_SYSTEM_TIMER_PERIOD = "tio.system.timer.period";
private final MqttWebServerAioListener mqttWebServerAioListener;
private final HttpRequestHandler httpRequestHandler;
private HttpConfig httpConfig = null;
private MqttWebServerAioHandler mqttWebServerAioHandler = null;
private MqttWebServerAioListener mqttWebServerAioListener = null;
private HttpRequestHandler httpRequestHandler = null;
private ServerTioConfig serverTioConfig = null;
private MqttWebServerAioHandler mqttWebServerAioHandler = null;
private TioServer tioServer = null;
/**
* @param httpConfig
* @param requestHandler
* @author tanyaowu
*/
public MqttWebServer(HttpConfig httpConfig, HttpRequestHandler requestHandler, IWsMsgHandler wsMsgHandler) {
this(httpConfig, requestHandler, wsMsgHandler, null, null);
public MqttWebServer(MqttServerCreator serverCreator, IWsMsgHandler wsMsgHandler) {
this(serverCreator, new MqttHttpRequestHandler(), wsMsgHandler);
}
public MqttWebServer(MqttServerCreator serverCreator, HttpRequestHandler requestHandler, IWsMsgHandler wsMsgHandler) {
this(serverCreator, requestHandler, wsMsgHandler, null, null);
}
/**
* @param httpConfig
* @param requestHandler
* @param tioExecutor
* @param groupExecutor
* @author tanyaowu
*/
public MqttWebServer(HttpConfig httpConfig, HttpRequestHandler requestHandler, IWsMsgHandler wsMsgHandler, SynThreadPoolExecutor tioExecutor, ThreadPoolExecutor groupExecutor) {
public MqttWebServer(MqttServerCreator serverCreator, HttpRequestHandler requestHandler, IWsMsgHandler wsMsgHandler, SynThreadPoolExecutor tioExecutor, ThreadPoolExecutor groupExecutor) {
if (tioExecutor == null) {
tioExecutor = Threads.getTioExecutor();
}
if (groupExecutor == null) {
groupExecutor = Threads.getGroupExecutor();
}
init(httpConfig, requestHandler, wsMsgHandler, tioExecutor, groupExecutor);
this.httpRequestHandler = requestHandler;
this.mqttWebServerAioListener = new MqttWebServerAioListener();
init(serverCreator, wsMsgHandler, tioExecutor, groupExecutor);
}
/**
* @return the httpConfig
*/
public HttpConfig getHttpConfig() {
return httpConfig;
}
......@@ -267,49 +262,34 @@ public class MqttWebServer {
return mqttWebServerAioListener;
}
/**
* @return the serverTioConfig
*/
public ServerTioConfig getServerTioConfig() {
return serverTioConfig;
}
private void init(HttpConfig httpConfig, HttpRequestHandler requestHandler, IWsMsgHandler wsMsgHandler,
private void init(MqttServerCreator serverCreator, IWsMsgHandler wsMsgHandler,
SynThreadPoolExecutor tioExecutor, ThreadPoolExecutor groupExecutor) {
String system_timer_period = System.getProperty("tio.system.timer.period");
if (StrUtil.isBlank(system_timer_period)) {
System.setProperty("tio.system.timer.period", "50");
String systemTimerPeriod = System.getProperty(TIO_SYSTEM_TIMER_PERIOD);
if (StrUtil.isBlank(systemTimerPeriod)) {
System.setProperty(TIO_SYSTEM_TIMER_PERIOD, "50");
}
HttpConfig httpConfig = new HttpConfig(serverCreator.getWebsocketPort(), false);
httpConfig.setName(serverCreator.getName() + "-HTTP/Websocket");
httpConfig.setCheckHost(false);
this.httpConfig = httpConfig;
this.httpRequestHandler = requestHandler;
httpConfig.setHttpRequestHandler(this.httpRequestHandler);
this.mqttWebServerAioHandler = new MqttWebServerAioHandler(httpConfig, requestHandler, wsMsgHandler);
this.mqttWebServerAioListener = new MqttWebServerAioListener();
String name = httpConfig.getName();
if (StrUtil.isBlank(name)) {
name = "Tio Http Server";
}
serverTioConfig = new ServerTioConfig(name, mqttWebServerAioHandler, mqttWebServerAioListener, tioExecutor, groupExecutor);
serverTioConfig.setHeartbeatTimeout(1000 * 20);
serverTioConfig.setShortConnection(true);
serverTioConfig.setReadBufferSize(TcpConst.MAX_DATA_LENGTH);
serverTioConfig.setAttribute(TioConfigKey.HTTP_REQ_HANDLER, this.httpRequestHandler);
tioServer = new TioServer(serverTioConfig);
HttpUuid imTioUuid = new HttpUuid();
serverTioConfig.setTioUuid(imTioUuid);
}
public void setHttpRequestHandler(HttpRequestHandler requestHandler) {
this.httpRequestHandler = requestHandler;
this.httpConfig.setHttpRequestHandler(this.httpRequestHandler);
this.mqttWebServerAioHandler = new MqttWebServerAioHandler(httpConfig, this.httpRequestHandler, wsMsgHandler);
this.serverTioConfig = new ServerTioConfig(this.httpConfig.getName(), mqttWebServerAioHandler, mqttWebServerAioListener, tioExecutor, groupExecutor);
this.serverTioConfig.setHeartbeatTimeout(1000 * 20);
this.serverTioConfig.setReadBufferSize(TcpConst.MAX_DATA_LENGTH);
this.tioServer = new TioServer(serverTioConfig);
this.serverTioConfig.setTioUuid(new HttpUuid());
}
public void start() throws IOException {
tioServer.start(this.httpConfig.getBindIp(), this.httpConfig.getBindPort());
}
public void stop() throws IOException {
public void stop() {
tioServer.stop();
}
......
......@@ -191,7 +191,8 @@
See the License for the specific language governing permissions and
limitations under the License.
*/
package net.dreamlu.iot.mqtt.core;
package net.dreamlu.iot.mqtt.core.server.http.core;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -214,7 +215,10 @@ import java.nio.ByteBuffer;
import java.util.*;
/**
* websocket 和 http 共存
*
* @author tanyaowu
* @author L.cm
*/
public class MqttWebServerAioHandler implements ServerAioHandler {
private static Logger log = LoggerFactory.getLogger(org.tio.websocket.server.WsServerAioHandler.class);
......@@ -230,9 +234,8 @@ public class MqttWebServerAioHandler implements ServerAioHandler {
/**
* websocket子协议
*/
private static final String Sec_Websocket_Protocol = "sec-websocket-protocol";
private static final HeaderName Header_Name_Sec_Websocket_Protocol = HeaderName.from(Sec_Websocket_Protocol);
private static final String SEC_WEBSOCKET_PROTOCOL = "sec-websocket-protocol";
private static final HeaderName HEADER_NAME_SEC_WEBSOCKET_PROTOCOL = HeaderName.from(SEC_WEBSOCKET_PROTOCOL);
private final HttpConfig httpConfig;
private final HttpRequestHandler requestHandler;
private final IWsMsgHandler wsMsgHandler;
......@@ -370,7 +373,7 @@ public class MqttWebServerAioHandler implements ServerAioHandler {
String methodName = "onBytes";
return processRetObj(retObj, methodName, channelContext);
} else if (opcode == Opcode.PING || opcode == Opcode.PONG) {
log.debug("收到" + opcode);
log.debug("收到{}", opcode);
return null;
} else if (opcode == Opcode.CLOSE) {
Object retObj = wsMsgHandler.onClose(websocketPacket, bytes, channelContext);
......@@ -391,11 +394,10 @@ public class MqttWebServerAioHandler implements ServerAioHandler {
HttpResponse httpResponse = request.httpConfig.getRespForBlackIp();
if (httpResponse != null) {
Tio.send(channelContext, httpResponse);
return;
} else {
Tio.remove(channelContext, ip + "在黑名单中");
return;
}
return;
}
HttpResponse httpResponse = requestHandler.handler(request);
if (httpResponse != null) {
......@@ -409,7 +411,8 @@ public class MqttWebServerAioHandler implements ServerAioHandler {
return;
}
WsRequest wsRequest = (WsRequest) packet;
if (wsRequest.isHandShake()) {//是握手包
// 判断握手包
if (wsRequest.isHandShake()) {
WsSessionContext wsSessionContext = (WsSessionContext) channelContext.get();
HttpRequest request = wsSessionContext.getHandshakeRequest();
HttpResponse httpResponse = wsSessionContext.getHandshakeResponse();
......@@ -419,7 +422,6 @@ public class MqttWebServerAioHandler implements ServerAioHandler {
return;
}
wsSessionContext.setHandshakeResponse(response);
WsResponse wsResponse = new WsResponse();
wsResponse.setHandShake(true);
Tio.send(channelContext, wsResponse);
......@@ -436,7 +438,7 @@ public class MqttWebServerAioHandler implements ServerAioHandler {
}
}
private WsResponse processRetObj(Object obj, String methodName, ChannelContext channelContext) throws Exception {
private WsResponse processRetObj(Object obj, String methodName, ChannelContext channelContext) {
if (obj == null) {
return null;
} else {
......@@ -466,7 +468,6 @@ public class MqttWebServerAioHandler implements ServerAioHandler {
public HttpResponse updateWebSocketProtocol(HttpRequest request) {
Map<String, String> headers = request.getHeaders();
String secWebSocketKey = headers.get(HttpConst.RequestHeaderKey.Sec_WebSocket_Key);
if (StrUtil.isNotBlank(secWebSocketKey)) {
byte[] secWebSocketKeyBytes;
try {
......@@ -477,25 +478,22 @@ public class MqttWebServerAioHandler implements ServerAioHandler {
byte[] allBs = new byte[secWebSocketKeyBytes.length + SEC_WEBSOCKET_KEY_SUFFIX_BYTES.length];
System.arraycopy(secWebSocketKeyBytes, 0, allBs, 0, secWebSocketKeyBytes.length);
System.arraycopy(SEC_WEBSOCKET_KEY_SUFFIX_BYTES, 0, allBs, secWebSocketKeyBytes.length, SEC_WEBSOCKET_KEY_SUFFIX_BYTES.length);
byte[] keyArray = SHA1Util.SHA1(allBs);
String acceptKey = BASE64Util.byteArrayToBase64(keyArray);
HttpResponse httpResponse = new HttpResponse(request);
// 101 协议转换
httpResponse.setStatus(HttpResponseStatus.C101);
Map<HeaderName, HeaderValue> respHeaders = new HashMap<>();
respHeaders.put(HeaderName.Connection, HeaderValue.Connection.Upgrade);
respHeaders.put(HeaderName.Upgrade, HeaderValue.Upgrade.WebSocket);
respHeaders.put(HeaderName.Sec_WebSocket_Accept, HeaderValue.from(acceptKey));
// websocket 子协议协商
String[] supportedSubProtocols = wsMsgHandler.getSupportedSubProtocols();
if (supportedSubProtocols != null && supportedSubProtocols.length > 0) {
String requestedSubProtocols = headers.get(Sec_Websocket_Protocol);
String requestedSubProtocols = headers.get(SEC_WEBSOCKET_PROTOCOL);
String selectSubProtocol = selectSubProtocol(requestedSubProtocols, supportedSubProtocols);
if (selectSubProtocol != null) {
respHeaders.put(Header_Name_Sec_Websocket_Protocol, HeaderValue.from(selectSubProtocol));
respHeaders.put(HEADER_NAME_SEC_WEBSOCKET_PROTOCOL, HeaderValue.from(selectSubProtocol));
}
}
httpResponse.addHeaders(respHeaders);
......
package net.dreamlu.iot.mqtt.core;
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.server.http.core;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.TioConfig;
import org.tio.core.intf.Packet;
import org.tio.http.common.HttpConst;
import org.tio.http.common.HttpRequest;
......@@ -17,58 +34,66 @@ import org.tio.server.intf.ServerAioListener;
public class MqttWebServerAioListener implements ServerAioListener {
@Override
public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) {
public void onAfterConnected(ChannelContext context, boolean isConnected, boolean isReconnect) {
}
@Override
public void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize) {
public void onAfterDecoded(ChannelContext context, Packet packet, int packetSize) {
}
@Override
public void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess) {
public void onAfterSent(ChannelContext context, Packet packet, boolean isSentSuccess) {
if (!(packet instanceof HttpResponse)) {
return;
}
// 1. 短链接数据解绑
TioConfig tioConfig = context.getTioConfig();
tioConfig.groups.unbind(context);
tioConfig.bsIds.unbind(context);
tioConfig.ids.unbind(context);
tioConfig.clientNodes.remove(context);
tioConfig.tokens.unbind(context);
// 2. 关闭
HttpResponse httpResponse = (HttpResponse) packet;
HttpRequest request = httpResponse.getHttpRequest();
if (request != null) {
if (request.httpConfig.compatible1_0) {
if (HttpConst.HttpVersion.V1_0.equals(request.requestLine.version)) {
if (!HttpConst.RequestHeaderValue.Connection.keep_alive.equals(request.getConnection())) {
Tio.remove(channelContext, "http 请求头Connection!=keep-alive:" + request.getRequestLine());
Tio.remove(context, "http 请求头Connection!=keep-alive:" + request.getRequestLine());
}
} else {
if (HttpConst.RequestHeaderValue.Connection.close.equals(request.getConnection())) {
Tio.remove(channelContext, "http 请求头Connection=close:" + request.getRequestLine());
Tio.remove(context, "http 请求头Connection=close:" + request.getRequestLine());
}
}
} else {
if (HttpConst.RequestHeaderValue.Connection.close.equals(request.getConnection())) {
Tio.remove(channelContext, "http 请求头Connection=close:" + request.getRequestLine());
Tio.remove(context, "http 请求头Connection=close:" + request.getRequestLine());
}
}
}
}
@Override
public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) {
public void onBeforeClose(ChannelContext context, Throwable throwable, String remark, boolean isRemove) {
}
@Override
public void onAfterHandled(ChannelContext channelContext, Packet packet, long cost) throws Exception {
public void onAfterHandled(ChannelContext context, Packet packet, long cost) throws Exception {
}
@Override
public void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes) throws Exception {
public void onAfterReceivedBytes(ChannelContext context, int receivedBytes) throws Exception {
}
@Override
public boolean onHeartbeatTimeout(ChannelContext channelContext, Long interval, int heartbeatTimeoutCount) {
public boolean onHeartbeatTimeout(ChannelContext context, Long interval, int heartbeatTimeoutCount) {
return false;
}
......
......@@ -14,32 +14,32 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.core;
package net.dreamlu.iot.mqtt.core.server.http.handler;
import org.tio.http.common.Method;
import java.util.Objects;
/**
* http Mapping info
* Handler info
*
* @author L.cm
*/
public class MappingInfo {
public class HandlerInfo {
private final Method method;
private final String path;
private final HttpHandler handler;
public MappingInfo(Method method, String path) {
public HandlerInfo(Method method, HttpHandler handler) {
this.method = method;
this.path = path;
this.handler = handler;
}
public Method getMethod() {
return method;
}
public String getPath() {
return path;
public HttpHandler getHandler() {
return handler;
}
@Override
......@@ -47,17 +47,16 @@ public class MappingInfo {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
if (!(o instanceof HandlerInfo)) {
return false;
}
MappingInfo that = (MappingInfo) o;
HandlerInfo that = (HandlerInfo) o;
return method == that.method &&
Objects.equals(path, that.path);
Objects.equals(handler, that.handler);
}
@Override
public int hashCode() {
return Objects.hash(method, path);
return Objects.hash(method, handler);
}
}
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.core;
package net.dreamlu.iot.mqtt.core.server.http.handler;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
......
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.core;
package net.dreamlu.iot.mqtt.core.server.http.handler;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
......
......@@ -14,13 +14,10 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core;
package net.dreamlu.iot.mqtt.core.server.http.handler;
import net.dreamlu.iot.mqtt.core.api.code.ResultCode;
import net.dreamlu.iot.mqtt.core.api.result.Result;
import net.dreamlu.iot.mqtt.core.core.HttpFilter;
import net.dreamlu.iot.mqtt.core.core.HttpHandler;
import net.dreamlu.iot.mqtt.core.core.MqttHttpRoutes;
import net.dreamlu.iot.mqtt.core.server.http.api.code.ResultCode;
import net.dreamlu.iot.mqtt.core.server.http.api.result.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.http.common.*;
......@@ -52,13 +49,17 @@ public class MqttHttpRequestHandler implements HttpRequestHandler {
return resp500(request, requestLine, e);
}
// 2. 路由处理
HttpHandler handler = MqttHttpRoutes.getHandler(requestLine);
HandlerInfo handler = MqttHttpRoutes.getHandler(requestLine);
if (handler == null) {
return resp404(request, requestLine);
}
logger.info("mqtt http api {} path:{}", requestLine.getMethod().name(), requestLine.getPathAndQuery());
Method method = requestLine.getMethod();
if (handler.getMethod() != method) {
return Result.fail(new HttpResponse(request), ResultCode.E104);
}
logger.info("mqtt http api {} path:{}", method.name(), requestLine.getPathAndQuery());
try {
return handler.apply(request);
return handler.getHandler().apply(request);
} catch (Exception e) {
return resp500(request, requestLine, e);
}
......
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.core;
package net.dreamlu.iot.mqtt.core.server.http.handler;
import org.tio.http.common.Method;
import org.tio.http.common.RequestLine;
......@@ -28,7 +28,7 @@ import java.util.*;
*/
public final class MqttHttpRoutes {
private static final LinkedList<HttpFilter> FILTERS = new LinkedList<>();
private static final Map<MappingInfo, HttpHandler> ROUTS = new HashMap<>();
private static final Map<String, HandlerInfo> ROUTS = new HashMap<>();
/**
* 注册路由
......@@ -66,18 +66,17 @@ public final class MqttHttpRoutes {
* @param handler HttpHandler
*/
public static void register(Method method, String path, HttpHandler handler) {
ROUTS.put(new MappingInfo(method, path), handler);
ROUTS.put(path, new HandlerInfo(method, handler));
}
/**
* 读取路由
*
* @param method 请求方法
* @param path 路径
* @return HttpHandler
* @param path 路径
* @return HandlerInfo
*/
public static HttpHandler getHandler(Method method, String path) {
return ROUTS.get(new MappingInfo(method, path));
public static HandlerInfo getHandler(String path) {
return ROUTS.get(path);
}
/**
......@@ -86,8 +85,8 @@ public final class MqttHttpRoutes {
* @param requestLine RequestLine
* @return HttpHandler
*/
public static HttpHandler getHandler(RequestLine requestLine) {
return getHandler(requestLine.getMethod(), requestLine.getPath());
public static HandlerInfo getHandler(RequestLine requestLine) {
return getHandler(requestLine.getPath());
}
}
......@@ -22,10 +22,10 @@ import net.dreamlu.iot.mqtt.codec.WriteBuffer;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.TioConfig;
import org.tio.core.intf.AioHandler;
import org.tio.core.intf.Packet;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.server.intf.ServerAioHandler;
import org.tio.websocket.common.WsRequest;
import org.tio.websocket.common.WsResponse;
import org.tio.websocket.server.handler.IWsMsgHandler;
......@@ -42,17 +42,19 @@ public class MqttWsMsgHandler implements IWsMsgHandler {
* mqtt websocket message body key
*/
private static final String MQTT_WS_MSG_BODY_KEY = "MQTT_WS_MSG_BODY_KEY";
/**
* websocket 握手端点
*/
private final String[] supportedSubProtocols;
private final ServerAioHandler mqttServerAioHandler;
private final AioHandler mqttServerAioHandler;
public MqttWsMsgHandler(ServerAioHandler mqttServerAioHandler) {
this(new String[]{"mqtt", "mqttv3.1", "mqttv3.1.1"}, mqttServerAioHandler);
public MqttWsMsgHandler(AioHandler aioHandler) {
this(new String[]{"mqtt", "mqttv3.1", "mqttv3.1.1"}, aioHandler);
}
public MqttWsMsgHandler(String[] supportedSubProtocols, ServerAioHandler mqttServerAioHandler) {
public MqttWsMsgHandler(String[] supportedSubProtocols, AioHandler aioHandler) {
this.supportedSubProtocols = supportedSubProtocols;
this.mqttServerAioHandler = mqttServerAioHandler;
this.mqttServerAioHandler = aioHandler;
}
@Override
......@@ -132,7 +134,7 @@ public class MqttWsMsgHandler implements IWsMsgHandler {
/**
* 读取 mqtt 消息体处理半包的情况
*
* @param bytes 消息类容
* @param bytes 消息类容
* @return ByteBuffer
*/
private static synchronized ByteBuffer getMqttBody(WriteBuffer wsBody, byte[] bytes) {
......
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core;
/**
* mqtt websocket 子协议测试
*/
public class MqttWebTest {
public static void main(String[] args) throws Exception {
// final int port = 8083;
// HttpConfig httpConfig = new HttpConfig(port, null, null, null);
// httpConfig.setUseSession(false);
// httpConfig.setCheckHost(false);
// httpConfig.setMonitorFileChange(false);
//
// HttpRequestHandler requestHandler = new MqttHttpRequestHandler();
//
// MqttHttpApi httpApi = new MqttHttpApi(null);
// httpApi.register();
// IWsMsgHandler mqttWsMsgHandler = new MqttWsMsgHandler();
// MqttWebServer httpServerStarter = new MqttWebServer(httpConfig, requestHandler, mqttWsMsgHandler);
// httpServerStarter.start(); //启动http服务器
}
}
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core;
package net.dreamlu.iot.mqtt.core.http;
import java.io.IOException;
......
......@@ -11,7 +11,7 @@ public class UdpClusterTest1 {
public static void main(String[] args) throws IOException {
UdpTestHandler udpTestHandler = new UdpTestHandler();
UdpClusterConfig udpServerConf = new UdpClusterConfig("224.0.0.1",12345, udpTestHandler, 5000);
UdpClusterConfig udpServerConf = new UdpClusterConfig("224.0.0.1", 12345, udpTestHandler, 5000);
UdpCluster udpCluster = new UdpCluster(udpServerConf);
udpCluster.start();
......
......@@ -52,7 +52,7 @@ public class MqttClientTest {
.connect();
client.subQos0("/sys/" + productKey + '/' + deviceName + "/thing/event/property/post_reply", (topic, payload) -> {
System.out.println(topic + '\t' +ByteBufferUtil.toString(payload));
System.out.println(topic + '\t' + ByteBufferUtil.toString(payload));
});
String content = "{\"id\":\"1\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":1}}";
......
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.http;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
import net.dreamlu.iot.mqtt.core.server.http.api.MqttHttpApi;
import net.dreamlu.iot.mqtt.core.server.http.api.auth.BasicAuthFilter;
import net.dreamlu.iot.mqtt.core.server.http.core.MqttWebServer;
import net.dreamlu.iot.mqtt.core.server.http.handler.MqttHttpRoutes;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.websocket.MqttWsMsgHandler;
import org.tio.core.intf.AioHandler;
import org.tio.server.ServerTioConfig;
import org.tio.websocket.server.handler.IWsMsgHandler;
/**
* mqtt websocket 子协议测试
*/
public class MqttWebTest {
public static void main(String[] args) throws Exception {
// 1. 消息转发处理器,可用来实现集群
IMqttMessageDispatcher messageDispatcher = new DefaultMqttMessageDispatcher();
// 2. 收到消息,将消息转发出去
IMqttMessageListener messageListener = (clientId, topic, mqttQoS, payload) -> {
Message message = new Message();
message.setTopic(topic);
message.setQos(mqttQoS.value());
message.setPayload(payload.array());
messageDispatcher.send(message);
};
// 3. 启动服务
MqttServerCreator serverCreator = MqttServer.create()
.ip("0.0.0.0")
.port(1883)
.readBufferSize(512)
.messageDispatcher(messageDispatcher)
.messageListener(messageListener)
.websocketEnable(false)
.debug();
MqttServer mqttServer = serverCreator.start();
ServerTioConfig serverConfig = mqttServer.getServerConfig();
AioHandler aioHandler = serverConfig.getAioHandler();
MqttHttpApi httpApi = new MqttHttpApi(serverCreator.getMessageDispatcher(), serverCreator.getSessionManager());
httpApi.register();
MqttHttpRoutes.addFilter(new BasicAuthFilter("123", "123"));
IWsMsgHandler mqttWsMsgHandler = new MqttWsMsgHandler(aioHandler);
MqttWebServer httpServerStarter = new MqttWebServer(serverCreator, mqttWsMsgHandler);
ServerTioConfig httpIioConfig = httpServerStarter.getServerTioConfig();
httpIioConfig.share(serverConfig);
// 启动http服务器
httpServerStarter.start();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册