提交 b9f43e11 编写于 作者: 浅梦2013's avatar 浅梦2013

考虑内置 http api。

上级 624a57e9
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.dreamlu.net).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.util;
import org.tio.utils.hutool.StrUtil;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
/**
* hex 工具,编解码全用 byte
*
* @author L.cm
*/
public final class HexUtil {
public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
private static final byte[] DIGITS_LOWER = new byte[]{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
private static final byte[] DIGITS_UPPER = new byte[]{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
/**
* encode Hex
*
* @param data data to hex
* @return hex bytes
*/
public static byte[] encode(byte[] data) {
return encode(data, true);
}
/**
* encode Hex
*
* @param data data to hex
* @param toLowerCase 是否小写
* @return hex bytes
*/
public static byte[] encode(byte[] data, boolean toLowerCase) {
return encode(data, toLowerCase ? DIGITS_LOWER : DIGITS_UPPER);
}
/**
* encode Hex
*
* @param data Data to Hex
* @return bytes as a hex string
*/
private static byte[] encode(byte[] data, byte[] digits) {
int len = data.length;
byte[] out = new byte[len << 1];
for (int i = 0, j = 0; i < len; i++) {
out[j++] = digits[(0xF0 & data[i]) >>> 4];
out[j++] = digits[0xF & data[i]];
}
return out;
}
/**
* encode Hex
*
* @param data Data to Hex
* @param toLowerCase 是否小写
* @return bytes as a hex string
*/
public static String encodeToString(byte[] data, boolean toLowerCase) {
return new String(encode(data, toLowerCase), DEFAULT_CHARSET);
}
/**
* encode Hex
*
* @param data Data to Hex
* @return bytes as a hex string
*/
public static String encodeToString(byte[] data) {
return new String(encode(data), DEFAULT_CHARSET);
}
/**
* encode Hex
*
* @param data Data to Hex
* @return bytes as a hex string
*/
public static String encodeToString(String data) {
if (StrUtil.isBlank(data)) {
return null;
}
return encodeToString(data.getBytes(DEFAULT_CHARSET));
}
/**
* decode Hex
*
* @param data Hex data
* @return decode hex to bytes
*/
public static byte[] decode(String data) {
if (StrUtil.isBlank(data)) {
return null;
}
return decode(data.getBytes(DEFAULT_CHARSET));
}
/**
* decodeToString Hex
*
* @param data Data to Hex
* @return bytes as a hex string
*/
public static String decodeToString(byte[] data) {
byte[] decodeBytes = decode(data);
return new String(decodeBytes, DEFAULT_CHARSET);
}
/**
* decodeToString Hex
*
* @param data Data to Hex
* @return bytes as a hex string
*/
public static String decodeToString(String data) {
if (StrUtil.isBlank(data)) {
return null;
}
return decodeToString(data.getBytes(DEFAULT_CHARSET));
}
/**
* decode Hex
*
* @param data Hex data
* @return decode hex to bytes
*/
public static byte[] decode(byte[] data) {
int len = data.length;
if ((len & 0x01) != 0) {
throw new IllegalArgumentException("hexBinary needs to be even-length: " + len);
}
byte[] out = new byte[len >> 1];
for (int i = 0, j = 0; j < len; i++) {
int f = toDigit(data[j], j) << 4;
j++;
f |= toDigit(data[j], j);
j++;
out[i] = (byte) (f & 0xFF);
}
return out;
}
private static int toDigit(byte b, int index) {
int digit = Character.digit(b, 16);
if (digit == -1) {
throw new IllegalArgumentException("Illegal hexadecimal byte " + b + " at index " + index);
}
return digit;
}
}
......@@ -18,6 +18,7 @@ package net.dreamlu.iot.mqtt.core;
import net.dreamlu.iot.mqtt.core.api.code.ResultCode;
import net.dreamlu.iot.mqtt.core.api.result.Result;
import net.dreamlu.iot.mqtt.core.core.HttpFilter;
import net.dreamlu.iot.mqtt.core.core.HttpHandler;
import net.dreamlu.iot.mqtt.core.core.MqttHttpRoutes;
import org.slf4j.Logger;
......@@ -25,6 +26,8 @@ import org.slf4j.LoggerFactory;
import org.tio.http.common.*;
import org.tio.http.common.handler.HttpRequestHandler;
import java.util.List;
/**
* mqtt http 消息处理
*
......@@ -36,10 +39,24 @@ public class MqttHttpRequestHandler implements HttpRequestHandler {
@Override
public HttpResponse handler(HttpRequest request) {
RequestLine requestLine = request.getRequestLine();
// 1. 处理过滤器
List<HttpFilter> httpFilters = MqttHttpRoutes.getFilters();
try {
for (HttpFilter filter : httpFilters) {
if (!filter.filter(request)) {
HttpResponse response = new HttpResponse(request);
return filter.response(request, response);
}
}
} catch (Exception e) {
return resp500(request, requestLine, e);
}
// 2. 路由处理
HttpHandler handler = MqttHttpRoutes.getHandler(requestLine);
if (handler == null) {
return resp404(request, requestLine);
}
logger.info("mqtt http api {} path:{}", requestLine.getMethod().name(), requestLine.getPathAndQuery());
try {
return handler.apply(request);
} catch (Exception e) {
......
......@@ -17,14 +17,26 @@
package net.dreamlu.iot.mqtt.core.api;
import com.alibaba.fastjson.JSON;
import net.dreamlu.iot.mqtt.codec.MqttMessageType;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.api.code.ResultCode;
import net.dreamlu.iot.mqtt.core.api.form.BaseForm;
import net.dreamlu.iot.mqtt.core.api.form.PayloadEncode;
import net.dreamlu.iot.mqtt.core.api.form.PublishForm;
import net.dreamlu.iot.mqtt.core.api.form.SubscribeForm;
import net.dreamlu.iot.mqtt.core.api.result.Result;
import net.dreamlu.iot.mqtt.core.core.MqttHttpRoutes;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.http.common.HttpResponseStatus;
import org.tio.http.common.Method;
import org.tio.utils.hutool.StrUtil;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Function;
/**
* mqtt http api
......@@ -32,10 +44,13 @@ import org.tio.http.common.Method;
* @author L.cm
*/
public class MqttHttpApi {
private final MqttServer mqttServer;
private final IMqttMessageDispatcher messageDispatcher;
private final IMqttSessionManager sessionManager;
public MqttHttpApi(MqttServer mqttServer) {
this.mqttServer = mqttServer;
public MqttHttpApi(IMqttMessageDispatcher messageDispatcher,
IMqttSessionManager sessionManager) {
this.messageDispatcher = messageDispatcher;
this.sessionManager = sessionManager;
}
/**
......@@ -47,17 +62,19 @@ public class MqttHttpApi {
* @return HttpResponse
*/
public HttpResponse publish(HttpRequest request) throws Exception {
byte[] requestBody = request.getBody();
PublishForm form = readForm(request, (requestBody) ->
JSON.parseObject(requestBody, PublishForm.class)
);
HttpResponse response = new HttpResponse();
if (requestBody == null) {
response.setStatus(HttpResponseStatus.C400);
return response;
if (form == null) {
return Result.fail(response, ResultCode.E101);
}
PublishForm form = JSON.parseObject(requestBody, PublishForm.class);
String clientId = form.getClientId();
String topic = form.getTopic();
String payload = form.getPayload();
// mqttServer.publish()
// 表单校验
HttpResponse validResponse = validForm(form, response);
if (validResponse != null) {
return validResponse;
}
send(form);
return Result.ok(response);
}
......@@ -70,7 +87,42 @@ public class MqttHttpApi {
* @return HttpResponse
*/
public HttpResponse publishBatch(HttpRequest request) throws Exception {
return null;
List<PublishForm> formList = readForm(request, (requestBody) -> {
String jsonBody = new String(requestBody, StandardCharsets.UTF_8);
return JSON.parseArray(jsonBody, PublishForm.class);
});
HttpResponse response = new HttpResponse();
if (formList == null || formList.isEmpty()) {
return Result.fail(response, ResultCode.E101);
}
// 参数校验,保证一个批次同时不成功,所以先校验
for (PublishForm form : formList) {
// 表单校验
HttpResponse validResponse = validForm(form, response);
if (validResponse != null) {
return validResponse;
}
}
// 批量发送
for (PublishForm form : formList) {
send(form);
}
return Result.ok(response);
}
private void send(PublishForm form) {
String payload = form.getPayload();
Message message = new Message();
message.setMessageType(MqttMessageType.PUBLISH.value());
message.setClientId(form.getClientId());
message.setTopic(form.getTopic());
message.setQos(form.getQos());
message.setRetain(form.isRetain());
// payload 解码
if (StrUtil.isNotBlank(payload)) {
message.setPayload(PayloadEncode.decode(payload, form.getEncoding()));
}
messageDispatcher.send(message);
}
/**
......@@ -82,7 +134,25 @@ public class MqttHttpApi {
* @return HttpResponse
*/
public HttpResponse subscribe(HttpRequest request) throws Exception {
return null;
SubscribeForm form = readForm(request, (requestBody) ->
JSON.parseObject(requestBody, SubscribeForm.class)
);
HttpResponse response = new HttpResponse();
if (form == null) {
return Result.fail(response, ResultCode.E101);
}
// 表单校验
HttpResponse validResponse = validForm(form, response);
if (validResponse != null) {
return validResponse;
}
int qos = form.getQos();
if (qos < 0 || qos > 2) {
return Result.fail(response, ResultCode.E101);
}
// 接口手动添加的订阅关系,可用来调试,不建议其他场景使用
sessionManager.addSubscribe(form.getTopic(), form.getClientId(), MqttQoS.valueOf(qos));
return Result.ok(response);
}
/**
......@@ -94,7 +164,32 @@ public class MqttHttpApi {
* @return HttpResponse
*/
public HttpResponse subscribeBatch(HttpRequest request) throws Exception {
return null;
List<SubscribeForm> formList = readForm(request, (requestBody) -> {
String jsonBody = new String(requestBody, StandardCharsets.UTF_8);
return JSON.parseArray(jsonBody, SubscribeForm.class);
});
HttpResponse response = new HttpResponse();
if (formList == null || formList.isEmpty()) {
return Result.fail(response, ResultCode.E101);
}
// 参数校验,保证一个批次同时不成功,所以先校验
for (SubscribeForm form : formList) {
// 表单校验
HttpResponse validResponse = validForm(form, response);
if (validResponse != null) {
return validResponse;
}
int qos = form.getQos();
if (qos < 0 || qos > 2) {
return Result.fail(response, ResultCode.E101);
}
}
// 批量处理
for (SubscribeForm form : formList) {
// 接口手动添加的订阅关系,可用来调试,不建议其他场景使用
sessionManager.addSubscribe(form.getTopic(), form.getClientId(), MqttQoS.valueOf(form.getQos()));
}
return Result.ok(response);
}
/**
......@@ -106,7 +201,21 @@ public class MqttHttpApi {
* @return HttpResponse
*/
public HttpResponse unsubscribe(HttpRequest request) throws Exception {
return null;
BaseForm form = readForm(request, (requestBody) ->
JSON.parseObject(requestBody, BaseForm.class)
);
HttpResponse response = new HttpResponse();
if (form == null) {
return Result.fail(response, ResultCode.E101);
}
// 表单校验
HttpResponse validResponse = validForm(form, response);
if (validResponse != null) {
return validResponse;
}
// 接口手动取消的订阅关系,可用来调试,不建议其他场景使用
sessionManager.removeSubscribe(form.getTopic(), form.getClientId());
return Result.ok(response);
}
/**
......@@ -118,6 +227,63 @@ public class MqttHttpApi {
* @return HttpResponse
*/
public HttpResponse unsubscribeBatch(HttpRequest request) throws Exception {
List<BaseForm> formList = readForm(request, (requestBody) -> {
String jsonBody = new String(requestBody, StandardCharsets.UTF_8);
return JSON.parseArray(jsonBody, BaseForm.class);
});
HttpResponse response = new HttpResponse();
if (formList == null || formList.isEmpty()) {
return Result.fail(response, ResultCode.E101);
}
// 参数校验,保证一个批次同时不成功,所以先校验
for (BaseForm form : formList) {
// 表单校验
HttpResponse validResponse = validForm(form, response);
if (validResponse != null) {
return validResponse;
}
}
// 批量处理
for (BaseForm form : formList) {
// 接口手动添加的订阅关系,可用来调试,不建议其他场景使用
sessionManager.removeSubscribe(form.getTopic(), form.getClientId());
}
return Result.ok(response);
}
/**
* 读取表单
*
* @param request HttpRequest
* @param function Function
* @param <T> 泛型
* @return 表单
*/
private static <T> T readForm(HttpRequest request, Function<byte[], T> function) {
byte[] requestBody = request.getBody();
if (requestBody == null) {
return null;
}
return function.apply(requestBody);
}
/**
* 校验表单
*
* @param form BaseForm
* @param response HttpResponse
* @return 表单
*/
private static HttpResponse validForm(BaseForm form, HttpResponse response) {
// 必须的参数
String clientId = form.getClientId();
if (StrUtil.isBlank(clientId)) {
return Result.fail(response, ResultCode.E101);
}
String topic = form.getTopic();
if (StrUtil.isBlank(topic)) {
return Result.fail(response, ResultCode.E101);
}
return null;
}
......@@ -126,12 +292,12 @@ public class MqttHttpApi {
*/
public void register() {
// @formatter:off
MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/publish", this::publish);
MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/publish/batch", this::publishBatch);
MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/subscribe", this::subscribe);
MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/subscribe/batch", this::subscribeBatch);
MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/unsubscribe", this::unsubscribe);
MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/unsubscribe/batch", this::unsubscribeBatch);
MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/publish", this::publish);
MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/publish/batch", this::publishBatch);
MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/subscribe", this::subscribe);
MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/subscribe/batch", this::subscribeBatch);
MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/unsubscribe", this::unsubscribe);
MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/unsubscribe/batch", this::unsubscribeBatch);
// @formatter:on
}
......
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.api.auth;
import net.dreamlu.iot.mqtt.core.api.code.ResultCode;
import net.dreamlu.iot.mqtt.core.api.result.Result;
import net.dreamlu.iot.mqtt.core.core.HttpFilter;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.utils.hutool.StrUtil;
import java.util.Objects;
/**
* Basic 认证
*
* @author L.cm
*/
public class BasicAuthFilter implements HttpFilter {
public static final String BASIC_AUTH_HEADER_NAME = "Authorization";
public static final String AUTHORIZATION_PREFIX = "Basic ";
private final String token;
public BasicAuthFilter(String token) {
this.token = Objects.requireNonNull(token, "Basic auth token is null");
}
@Override
public boolean filter(HttpRequest request) throws Exception {
String authorization = request.getHeader(BASIC_AUTH_HEADER_NAME);
if (StrUtil.isBlank(authorization)) {
return false;
}
int length = AUTHORIZATION_PREFIX.length();
if (length >= authorization.length()) {
return false;
}
return token.equals(authorization.substring(length));
}
@Override
public HttpResponse response(HttpRequest request, HttpResponse response) {
return Result.fail(response, ResultCode.E103);
}
}
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.api.form;
import net.dreamlu.iot.mqtt.core.util.HexUtil;
import org.tio.utils.hutool.StrUtil;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
/**
* 消息正文编码
*
* @author L.cm
*/
public enum PayloadEncode {
/**
* 纯文本、hex、base64
*/
plain {
@Override
public byte[] decode(String data) {
return data.getBytes(StandardCharsets.UTF_8);
}
},
hex {
@Override
public byte[] decode(String data) {
return HexUtil.decode(data);
}
},
base64 {
@Override
public byte[] decode(String data) {
return Base64.getDecoder().decode(data);
}
};
/**
* 解码
*
* @return byte array
*/
public abstract byte[] decode(String data);
/**
* 解码
*
* @param data data
* @param encoding encoding
* @return byte array
*/
public static byte[] decode(String data, String encoding) {
return PayloadEncode.getEncode(encoding).decode(data);
}
/**
* 获取解码器
*
* @param encoding encoding
* @return PayloadEncode
*/
public static PayloadEncode getEncode(String encoding) {
if (StrUtil.isBlank(encoding)) {
return PayloadEncode.plain;
}
PayloadEncode[] values = PayloadEncode.values();
for (PayloadEncode encode : values) {
if (encode.name().equalsIgnoreCase(encoding)) {
return encode;
}
}
return PayloadEncode.plain;
}
}
......@@ -23,10 +23,6 @@ package net.dreamlu.iot.mqtt.core.api.form;
*/
public class PublishForm extends BaseForm {
/**
* 以 , 分割的多个主题,使用此字段能够同时发布消息到多个主题
*/
private String topics;
/**
* 消息正文
*/
......@@ -44,14 +40,6 @@ public class PublishForm extends BaseForm {
*/
private boolean retain = false;
public String getTopics() {
return topics;
}
public void setTopics(String topics) {
this.topics = topics;
}
public String getPayload() {
return payload;
}
......
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.api.form;
/**
* 订阅表单
*
* @author L.cm
*/
public class SubscribeForm extends BaseForm {
/**
* QoS 等级 0
*/
private int qos = 0;
public int getQos() {
return qos;
}
public void setQos(int qos) {
this.qos = qos;
}
}
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.core;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
/**
* http 过滤器
*
* @author L.cm
*/
public interface HttpFilter {
/**
* 处理请求
*
* @param request HttpRequest
* @return 可以为null
* @throws Exception Exception
*/
boolean filter(HttpRequest request) throws Exception;
/**
* 响应
*
* @param request HttpRequest
* @param response HttpResponse
* @return HttpResponse
*/
HttpResponse response(HttpRequest request, HttpResponse response);
}
......@@ -30,10 +30,10 @@ public interface HttpHandler {
/**
* 处理请求
*
* @param packet HttpRequest
* @param request HttpRequest
* @return 可以为null
* @throws Exception Exception
*/
HttpResponse apply(HttpRequest packet) throws Exception;
HttpResponse apply(HttpRequest request) throws Exception;
}
......@@ -19,8 +19,7 @@ package net.dreamlu.iot.mqtt.core.core;
import org.tio.http.common.Method;
import org.tio.http.common.RequestLine;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
/**
* mqtt http api 路由
......@@ -28,8 +27,37 @@ import java.util.Map;
* @author L.cm
*/
public final class MqttHttpRoutes {
private static final LinkedList<HttpFilter> FILTERS = new LinkedList<>();
private static final Map<MappingInfo, HttpHandler> ROUTS = new HashMap<>();
/**
* 注册路由
*
* @param filter HttpFilter
*/
public static void addFilter(HttpFilter filter) {
FILTERS.add(filter);
}
/**
* 注册路由
*
* @param index index
* @param filter HttpFilter
*/
public static void addFilter(int index, HttpFilter filter) {
FILTERS.add(index, filter);
}
/**
* 读取所以的过滤器
*
* @return 过滤器集合
*/
public static List<HttpFilter> getFilters() {
return Collections.unmodifiableList(FILTERS);
}
/**
* 注册路由
*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册