提交 40859f62 编写于 作者: 如梦技术's avatar 如梦技术 🐛

完善 mica-mqtt

上级 3d48e3e0
......@@ -17,6 +17,7 @@
package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.*;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.common.MqttSubscription;
import org.slf4j.Logger;
......
......@@ -18,6 +18,7 @@ package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.*;
import net.dreamlu.iot.mqtt.core.common.MqttMessageListener;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.client.ClientChannelContext;
......@@ -186,10 +187,10 @@ public final class MqttClient {
.retained(retain)
.messageId(messageId)
.build();
MqttPendingPublish pendingPublish = new MqttPendingPublish(payload, message, qos);
Boolean result = Tio.send(context, message);
logger.debug("MQTT publish topic:{} qos:{} retain:{} result:{}", topic, qos, retain, result);
if (isHighLevelQoS) {
MqttPendingPublish pendingPublish = new MqttPendingPublish(payload, message, qos);
subscriptionManager.addPendingPublish(messageId, pendingPublish);
pendingPublish.startPublishRetransmissionTimer(executor, msg -> Tio.send(context, msg));
}
......
......@@ -16,6 +16,7 @@
package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.common.MqttSubscription;
import net.dreamlu.iot.mqtt.core.util.MultiValueMap;
......
package net.dreamlu.iot.mqtt.core.client;
package net.dreamlu.iot.mqtt.core.common;
import net.dreamlu.iot.mqtt.codec.MqttMessage;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.common.RetryProcessor;
import java.nio.ByteBuffer;
import java.util.concurrent.ScheduledThreadPoolExecutor;
......@@ -12,33 +11,33 @@ import java.util.function.Consumer;
/**
* MqttPendingPublish,参考于 netty-mqtt-client
*/
final class MqttPendingPublish {
public final class MqttPendingPublish {
private final ByteBuffer payload;
private final MqttPublishMessage message;
private final MqttQoS qos;
private final RetryProcessor<MqttPublishMessage> pubRetryProcessor = new RetryProcessor<>();
private final RetryProcessor<MqttMessage> pubRelRetryProcessor = new RetryProcessor<>();
MqttPendingPublish(ByteBuffer payload, MqttPublishMessage message, MqttQoS qos) {
public MqttPendingPublish(ByteBuffer payload, MqttPublishMessage message, MqttQoS qos) {
this.payload = payload;
this.message = message;
this.qos = qos;
this.pubRetryProcessor.setOriginalMessage(message);
}
ByteBuffer getPayload() {
public ByteBuffer getPayload() {
return payload;
}
MqttPublishMessage getMessage() {
public MqttPublishMessage getMessage() {
return message;
}
MqttQoS getQos() {
public MqttQoS getQos() {
return qos;
}
void startPublishRetransmissionTimer(ScheduledThreadPoolExecutor executor, Consumer<MqttMessage> sendPacket) {
public void startPublishRetransmissionTimer(ScheduledThreadPoolExecutor executor, Consumer<MqttMessage> sendPacket) {
this.pubRetryProcessor.setHandle(((fixedHeader, originalMessage) -> {
this.payload.rewind();
sendPacket.accept(new MqttPublishMessage(fixedHeader, originalMessage.variableHeader(), this.payload));
......@@ -46,21 +45,21 @@ final class MqttPendingPublish {
this.pubRetryProcessor.start(executor);
}
void onPubAckReceived() {
public void onPubAckReceived() {
this.pubRetryProcessor.stop();
}
void setPubRelMessage(MqttMessage pubRelMessage) {
public void setPubRelMessage(MqttMessage pubRelMessage) {
this.pubRelRetryProcessor.setOriginalMessage(pubRelMessage);
}
void startPubRelRetransmissionTimer(ScheduledThreadPoolExecutor executor, Consumer<MqttMessage> sendPacket) {
public void startPubRelRetransmissionTimer(ScheduledThreadPoolExecutor executor, Consumer<MqttMessage> sendPacket) {
this.pubRelRetryProcessor.setHandle((fixedHeader, originalMessage) ->
sendPacket.accept(new MqttMessage(fixedHeader, originalMessage.variableHeader())));
this.pubRelRetryProcessor.start(executor);
}
void onPubCompReceived() {
public void onPubCompReceived() {
this.pubRelRetryProcessor.stop();
}
......
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.net.dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.server;
/**
* 默认的认证处理
*
* @author L.cm
*/
public class DefaultMqttAuthHandler implements IMqttAuthHandler {
@Override
public boolean authenticate(String clientId, String userName, String password) {
return true;
}
}
......@@ -14,45 +14,49 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.server;
package net.dreamlu.iot.mqtt.core.server;
import net.dreamlu.iot.mqtt.codec.*;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.common.MqttMessageListener;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.common.MqttSubscription;
import net.dreamlu.iot.mqtt.core.server.IMqttAuthHandler;
import net.dreamlu.iot.mqtt.core.server.IMqttMessageIdGenerator;
import net.dreamlu.iot.mqtt.core.server.MqttServerProcessor;
import net.dreamlu.iot.mqtt.core.server.IMqttSubManager;
import net.dreamlu.iot.mqtt.core.server.store.IMqttSubscribeStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.utils.hutool.StrUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.stream.Collectors;
/**
* mqtt broker 处理器
*
* @author L.cm
*/
public class MqttServerProcessorImpl implements MqttServerProcessor {
private static final Logger logger = LoggerFactory.getLogger(MqttServerProcessorImpl.class);
public class DefaultMqttServerProcessor implements MqttServerProcessor {
private static final Logger logger = LoggerFactory.getLogger(DefaultMqttServerProcessor.class);
private final IMqttAuthHandler authHandler;
private final IMqttMessageIdGenerator messageIdGenerator;
private final IMqttPublishManager publishManager;
private final IMqttSubManager subManager;
private final IMqttSubscribeStore subscribeStore;
private final ScheduledThreadPoolExecutor executor;
public MqttServerProcessorImpl(IMqttAuthHandler authHandler,
public DefaultMqttServerProcessor(IMqttAuthHandler authHandler,
IMqttSubManager subManager,
IMqttPublishManager publishManager,
IMqttMessageIdGenerator messageIdGenerator,
IMqttSubscribeStore subscribeStore,
ScheduledThreadPoolExecutor executor) {
this.authHandler = authHandler;
this.subManager = subManager;
this.messageIdGenerator = messageIdGenerator;
this.publishManager = publishManager;
this.subscribeStore = subscribeStore;
this.executor = executor;
}
......@@ -115,7 +119,7 @@ public class MqttServerProcessorImpl implements MqttServerProcessor {
MqttFixedHeader pubRecFixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessage pubRecMessage = new MqttMessage(pubRecFixedHeader, MqttMessageIdVariableHeader.from(packetId));
MqttPendingQos2Publish pendingQos2Publish = new MqttPendingQos2Publish(message, pubRecMessage);
// subscriptionManager.addPendingQos2Publish(packetId, pendingQos2Publish);
publishManager.addPendingQos2Publish(packetId, pendingQos2Publish);
pendingQos2Publish.startPubRecRetransmitTimer(executor, msg -> Tio.send(context, msg));
}
break;
......@@ -129,6 +133,13 @@ public class MqttServerProcessorImpl implements MqttServerProcessor {
int messageId = variableHeader.messageId();
String clientId = context.getBsId();
logger.debug("PubAck - clientId: {}, messageId: {}", clientId, messageId);
MqttPendingPublish pendingPublish = publishManager.getPendingPublish(messageId);
if (pendingPublish == null) {
return;
}
pendingPublish.onPubAckReceived();
publishManager.removePendingPublish(messageId);
pendingPublish.getPayload().clear();
}
@Override
......@@ -136,20 +147,34 @@ public class MqttServerProcessorImpl implements MqttServerProcessor {
String clientId = context.getBsId();
int messageId = variableHeader.messageId();
logger.debug("PubRec - clientId: {}, messageId: {}", clientId, messageId);
MqttMessage message = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(messageId), null);
Tio.send(context, message);
MqttPendingPublish pendingPublish = publishManager.getPendingPublish(messageId);
pendingPublish.onPubAckReceived();
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
MqttMessage pubRelMessage = new MqttMessage(fixedHeader, variableHeader);
Tio.send(context, pubRelMessage);
pendingPublish.setPubRelMessage(pubRelMessage);
pendingPublish.startPubRelRetransmissionTimer(executor, msg -> Tio.send(context, msg));
}
@Override
public void processPubRel(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
String clientId = context.getBsId();
logger.debug("PubRel - clientId: {}, messageId: {}", clientId, variableHeader.messageId());
// TODO L.cm invokeListenerForPublish
int messageId = variableHeader.messageId();
logger.debug("PubRel - clientId: {}, messageId: {}", clientId, messageId);
MqttPendingQos2Publish pendingQos2Publish = publishManager.getPendingQos2Publish(messageId);
if (pendingQos2Publish != null) {
MqttPublishMessage incomingPublish = pendingQos2Publish.getIncomingPublish();
String topicName = incomingPublish.variableHeader().topicName();
MqttQoS mqttQoS = incomingPublish.fixedHeader().qosLevel();
invokeListenerForPublish(mqttQoS, topicName, incomingPublish);
pendingQos2Publish.onPubRelReceived();
publishManager.removePendingQos2Publish(messageId);
}
MqttMessage message = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(variableHeader.messageId()), null);
MqttMessageIdVariableHeader.from(messageId), null);
Tio.send(context, message);
}
......@@ -158,32 +183,45 @@ public class MqttServerProcessorImpl implements MqttServerProcessor {
int messageId = variableHeader.messageId();
String clientId = context.getBsId();
logger.debug("PubComp - clientId: {}, messageId: {}", clientId, messageId);
MqttPendingPublish pendingPublish = publishManager.getPendingPublish(messageId);
pendingPublish.getPayload().clear();
pendingPublish.onPubCompReceived();
publishManager.removePendingPublish(messageId);
}
@Override
public void processSubscribe(ChannelContext context, MqttSubscribeMessage message) {
String clientId = context.getBsId();
int messageId = message.variableHeader().messageId();
logger.debug("Subscribe - clientId: {} messageId:{}", clientId, messageId);
List<MqttTopicSubscription> topicSubscriptions = message.payload().topicSubscriptions();
// 1. 校验 topicFilter
// 2. 存储 clientId 订阅的 topic
List<MqttQoS> mqttQosList = new ArrayList<>();
for (MqttTopicSubscription subscription : topicSubscriptions) {
String topicName = subscription.topicName();
MqttQoS mqttQoS = subscription.qualityOfService();
mqttQosList.add(mqttQoS);
subscribeStore.add(clientId, topicName, mqttQoS);
logger.debug("Subscribe - clientId: {} messageId:{} topicFilter:{} mqttQoS:{}", clientId, messageId, topicName, mqttQoS);
}
// 3. 返回 ack
List<MqttQoS> mqttQoSList = topicSubscriptions.stream()
.map(MqttTopicSubscription::qualityOfService)
.collect(Collectors.toList());
MqttMessage subAckMessage = MqttMessageBuilders.subAck()
.addGrantedQosList(mqttQoSList)
.addGrantedQosList(mqttQosList)
.packetId(messageId)
.build();
Tio.send(context, subAckMessage);
// 4. 发送保留消息
}
@Override
public void processUnSubscribe(ChannelContext context, MqttUnsubscribeMessage message) {
String clientId = context.getBsId();
int messageId = message.variableHeader().messageId();
logger.debug("UnSubscribe - clientId: {} messageId:{}", clientId, messageId);
List<String> topicFilterList = message.payload().topics();
for (String topicFilter : topicFilterList) {
subscribeStore.remove(clientId, topicFilter);
logger.debug("UnSubscribe - clientId: {} messageId:{} topicFilter:{}", clientId, messageId, topicFilter);
}
MqttMessage unSubMessage = MqttMessageBuilders.unsubAck()
.packetId(messageId)
.build();
......
......@@ -13,7 +13,7 @@ import java.util.List;
*
* @author L.cm
*/
public class MqttServerDefaultSubManager implements IMqttSubManager {
public class DefaultMqttServerSubManager implements IMqttSubManager {
private final List<MqttSubscription> subscriptionList = new LinkedList<>();
@Override
......
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package net.dreamlu.iot.mqtt.core.server;
/**
......@@ -5,6 +21,7 @@ package net.dreamlu.iot.mqtt.core.server;
*
* @author L.cm
*/
@FunctionalInterface
public interface IMqttAuthHandler {
/**
......
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package net.dreamlu.iot.mqtt.core.server;
/**
......
package net.dreamlu.iot.mqtt.core.server;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
/**
* 服务端 pub 管理
*
* @author L.cm
*/
public interface IMqttPublishManager {
/**
* 添加发布过程存储
*
* @param messageId messageId
* @param pendingPublish MqttPendingPublish
*/
void addPendingPublish(int messageId, MqttPendingPublish pendingPublish);
/**
* 获取发布过程存储
*
* @param messageId messageId
* @return MqttPendingPublish
*/
MqttPendingPublish getPendingPublish(int messageId);
/**
* 删除发布过程中的存储
*
* @param messageId messageId
*/
void removePendingPublish(int messageId);
/**
* 添加发布过程存储
*
* @param messageId messageId
* @param pendingQos2Publish MqttPendingQos2Publish
*/
void addPendingQos2Publish(int messageId, MqttPendingQos2Publish pendingQos2Publish);
/**
* 获取发布过程存储
* @param messageId messageId
* @return MqttPendingQos2Publish
*/
MqttPendingQos2Publish getPendingQos2Publish(int messageId);
/**
* 删除发布过程中的存储
* @param messageId messageId
*/
void removePendingQos2Publish(int messageId);
}
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package net.dreamlu.iot.mqtt.core.server;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
......
......@@ -19,6 +19,9 @@ package net.dreamlu.iot.mqtt.core.server;
import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.server.store.IMqttSubscribeStore;
import net.dreamlu.iot.mqtt.core.server.store.SubscribeStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
......@@ -28,6 +31,7 @@ import org.tio.server.TioServer;
import org.tio.utils.lock.SetWithLock;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
......@@ -40,13 +44,19 @@ public final class MqttServer {
private static final Logger logger = LoggerFactory.getLogger(MqttServer.class);
private final TioServer tioServer;
private final IMqttMessageIdGenerator messageIdGenerator;
private final IMqttPublishManager publishManager;
private final IMqttSubscribeStore subscribeStore;
private final ScheduledThreadPoolExecutor executor;
MqttServer(TioServer tioServer,
IMqttMessageIdGenerator messageIdGenerator,
IMqttPublishManager publishManager,
IMqttSubscribeStore subscribeStore,
ScheduledThreadPoolExecutor executor) {
this.tioServer = tioServer;
this.messageIdGenerator = messageIdGenerator;
this.publishManager = publishManager;
this.subscribeStore = subscribeStore;
this.executor = executor;
}
......@@ -117,7 +127,17 @@ public final class MqttServer {
logger.warn("Mqtt publish to clientId:{} ChannelContext is null May be disconnected.", clientId);
return false;
}
return publish(context, topic, payload, qos, retain);
List<SubscribeStore> subscribeList = subscribeStore.search(clientId, topic);
if (subscribeList.isEmpty()) {
logger.warn("Mqtt publish but clientId:{} subscribeList is empty.", clientId);
return false;
}
for (SubscribeStore subscribe : subscribeList) {
int subMqttQoS = subscribe.getMqttQoS();
MqttQoS mqttQoS = qos.value() > subMqttQoS ? MqttQoS.valueOf(subMqttQoS) : qos;
publish(context, topic, payload, mqttQoS, retain);
}
return true;
}
/**
......@@ -130,9 +150,10 @@ public final class MqttServer {
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
private Boolean publish(ChannelContext context, String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
private boolean publish(ChannelContext context, String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
boolean isHighLevelQoS = MqttQoS.AT_LEAST_ONCE == qos || MqttQoS.EXACTLY_ONCE == qos;
int messageId = isHighLevelQoS ? messageIdGenerator.getId() : -1;
payload.rewind();
MqttPublishMessage message = MqttMessageBuilders.publish()
.topicName(topic)
.payload(payload)
......@@ -143,7 +164,9 @@ public final class MqttServer {
Boolean result = Tio.send(context, message);
logger.debug("MQTT publish topic:{} qos:{} retain:{} result:{}", topic, qos, retain, result);
if (isHighLevelQoS) {
MqttPendingPublish pendingPublish = new MqttPendingPublish(payload, message, qos);
publishManager.addPendingPublish(messageId, pendingPublish);
pendingPublish.startPublishRetransmissionTimer(executor, msg -> Tio.send(context, msg));
}
return result;
}
......@@ -201,7 +224,6 @@ public final class MqttServer {
}
for (ChannelContext context : channelContexts) {
String clientId = context.getBsId();
payload.rewind();
publish(clientId, topic, payload, qos, retain);
}
return true;
......
......@@ -32,7 +32,6 @@ public class MqttServerAioListener extends DefaultAioListener {
@Override
public boolean onHeartbeatTimeout(ChannelContext context, Long interval, int heartbeatTimeoutCount) {
// TODO L.cm 微调此处,三次超时时断开,避免长时间占用服务器连接
String clientId = context.getBsId();
logger.info("Mqtt HeartbeatTimeout clientId:{} interval:{} count:{}", clientId, interval, heartbeatTimeoutCount);
return true;
......
......@@ -17,6 +17,7 @@
package net.dreamlu.iot.mqtt.core.server;
import net.dreamlu.iot.mqtt.codec.ByteBufferAllocator;
import net.dreamlu.iot.mqtt.core.server.store.IMqttSubscribeStore;
import org.tio.core.ssl.SslConfig;
import org.tio.core.stat.IpStatListener;
import org.tio.server.ServerTioConfig;
......@@ -69,14 +70,26 @@ public class MqttServerCreator {
* tio 的 IpStatListener
*/
private IpStatListener ipStatListener;
/**
* 认证处理器
*/
private IMqttAuthHandler authHandler;
/**
* messageId 生成
*/
private IMqttMessageIdGenerator messageIdGenerator;
/**
* mqtt 服务端处理逻辑
* 发布管理
*/
private IMqttPublishManager publishManager;
/**
* 订阅管路
*/
private IMqttSubManager subManager;
/**
* 订阅存储
*/
private MqttServerProcessor mqttServerProcessor;
private IMqttSubscribeStore subscribeStore;
public String getName() {
return name;
......@@ -163,6 +176,15 @@ public class MqttServerCreator {
return this;
}
public IMqttAuthHandler getAuthHandler() {
return authHandler;
}
public MqttServerCreator authHandler(IMqttAuthHandler authHandler) {
this.authHandler = authHandler;
return this;
}
public IMqttMessageIdGenerator getMessageIdGenerator() {
return messageIdGenerator;
}
......@@ -172,20 +194,45 @@ public class MqttServerCreator {
return this;
}
public MqttServerProcessor getMqttServerCreatorProcessor() {
return mqttServerProcessor;
public IMqttPublishManager getPublishManager() {
return publishManager;
}
public MqttServerCreator processor(MqttServerProcessor mqttServerProcessor) {
this.mqttServerProcessor = mqttServerProcessor;
public MqttServerCreator publishManager(IMqttPublishManager publishManager) {
this.publishManager = publishManager;
return this;
}
public IMqttSubManager getSubManager() {
return subManager;
}
public MqttServerCreator subManager(IMqttSubManager subManager) {
this.subManager = subManager;
return this;
}
public IMqttSubscribeStore getSubscribeStore() {
return subscribeStore;
}
public MqttServerCreator subscribeStore(IMqttSubscribeStore subscribeStore) {
this.subscribeStore = subscribeStore;
return this;
}
public MqttServer start() throws IOException {
Objects.requireNonNull(this.mqttServerProcessor, "Argument mqttServerProcessor is null.");
Objects.requireNonNull(this.messageIdGenerator, "Argument messageIdGenerator is null.");
Objects.requireNonNull(this.publishManager, "Argument publishManager is null.");
Objects.requireNonNull(this.subManager, "Argument subManager is null.");
Objects.requireNonNull(this.subscribeStore, "Argument subscribeStore is null.");
if (this.authHandler == null) {
this.authHandler = new DefaultMqttAuthHandler();
}
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, DefaultThreadFactory.getInstance("MqttServer"));
DefaultMqttServerProcessor serverProcessor = new DefaultMqttServerProcessor(authHandler, subManager, publishManager, messageIdGenerator, subscribeStore, executor);
// 处理消息
ServerAioHandler handler = new MqttServerAioHandler(this.bufferAllocator, this.mqttServerProcessor);
ServerAioHandler handler = new MqttServerAioHandler(this.bufferAllocator, serverProcessor);
// 监听
ServerAioListener listener = new MqttServerAioListener();
// 配置
......@@ -208,7 +255,7 @@ public class MqttServerCreator {
tioServer.setCheckLastVersion(false);
// 启动
tioServer.start(this.ip, this.port);
return new MqttServer(tioServer, this.messageIdGenerator, executor);
return new MqttServer(tioServer, this.messageIdGenerator, this.publishManager, this.subscribeStore, executor);
}
}
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package net.dreamlu.iot.mqtt.core.server.store;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import java.util.List;
/**
* 客户端订阅存储
*
* @author L.cm
*/
public interface IMqttSubscribeStore {
/**
* 添加订阅存储
*
* @param clientId 客户端 Id
* @param topicFilter topicFilter
* @param mqttQoS MqttQoS
*/
void add(String clientId, String topicFilter, MqttQoS mqttQoS);
/**
* 删除订阅
*
* @param clientId 客户端 Id
* @param topicFilter topicFilter
*/
void remove(String clientId, String topicFilter);
/**
* 查找订阅信息
*
* @param clientId 客户端 Id
* @param topicName topicName
* @return 订阅存储列表
*/
List<SubscribeStore> search(String clientId, String topicName);
}
package net.dreamlu.iot.mqtt.core.server.store;
import java.io.Serializable;
import java.util.Objects;
import java.util.regex.Pattern;
/**
* 订阅存储
*
* @author L.cm
*/
public class SubscribeStore implements Serializable {
private Pattern topicRegex;
private int mqttQoS;
public SubscribeStore() {
}
public SubscribeStore(String topicFilter, int mqttQoS) {
this.topicRegex = Pattern.compile(topicFilter.replace("+", "[^/]+").replace("#", ".+").concat("$"));
this.mqttQoS = mqttQoS;
}
public Pattern getTopicRegex() {
return topicRegex;
}
public SubscribeStore setTopicRegex(Pattern topicRegex) {
this.topicRegex = topicRegex;
return this;
}
public int getMqttQoS() {
return mqttQoS;
}
public SubscribeStore setMqttQoS(int mqttQoS) {
this.mqttQoS = mqttQoS;
return this;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SubscribeStore that = (SubscribeStore) o;
return mqttQoS == that.mqttQoS &&
Objects.equals(topicRegex, that.topicRegex);
}
@Override
public int hashCode() {
return Objects.hash(topicRegex, mqttQoS);
}
@Override
public String toString() {
return "SubscribeStore{" +
"topicRegex=" + topicRegex +
", mqttQoS=" + mqttQoS +
'}';
}
}
package net.dreamlu.iot.mqtt.server;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.server.IMqttPublishManager;
import java.util.LinkedHashMap;
import java.util.Map;
public class MqttPublishManager implements IMqttPublishManager {
private final Map<Integer, MqttPendingPublish> pendingPublishData = new LinkedHashMap<>();
private final Map<Integer, MqttPendingQos2Publish> pendingQos2PublishData = new LinkedHashMap<>();
@Override
public void addPendingPublish(int messageId, MqttPendingPublish pendingPublish) {
pendingPublishData.put(messageId, pendingPublish);
}
@Override
public MqttPendingPublish getPendingPublish(int messageId) {
return pendingPublishData.get(messageId);
}
@Override
public void removePendingPublish(int messageId) {
pendingPublishData.remove(messageId);
}
@Override
public void addPendingQos2Publish(int messageId, MqttPendingQos2Publish pendingQos2Publish) {
pendingQos2PublishData.put(messageId, pendingQos2Publish);
}
@Override
public MqttPendingQos2Publish getPendingQos2Publish(int messageId) {
return pendingQos2PublishData.get(messageId);
}
@Override
public void removePendingQos2Publish(int messageId) {
pendingQos2PublishData.remove(messageId);
}
}
......@@ -19,16 +19,14 @@ package net.dreamlu.iot.mqtt.server;
import net.dreamlu.iot.mqtt.codec.ByteBufferUtil;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.common.MqttSubscription;
import net.dreamlu.iot.mqtt.core.server.IMqttAuthHandler;
import net.dreamlu.iot.mqtt.core.server.DefaultMqttServerSubManager;
import net.dreamlu.iot.mqtt.core.server.IMqttMessageIdGenerator;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.MqttServerDefaultSubManager;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ScheduledThreadPoolExecutor;
/**
* mqtt 服务端测试
......@@ -38,26 +36,23 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
public class MqttServerTest {
public static void main(String[] args) throws IOException {
IMqttAuthHandler authHandler = (clientId, userName, password) -> true;
MqttServerDefaultSubManager subManager = new MqttServerDefaultSubManager();
DefaultMqttServerSubManager subManager = new DefaultMqttServerSubManager();
subManager.register(new MqttSubscription(MqttQoS.AT_MOST_ONCE, "/test/#", ((topic, payload) -> {
System.out.println(topic + '\t' + ByteBufferUtil.toString(payload));
})));
IMqttMessageIdGenerator messageIdGenerator = new MqttMessageIdGenerator();
ScheduledThreadPoolExecutor executor = null;
MqttServerProcessorImpl processor = new MqttServerProcessorImpl(authHandler, subManager, messageIdGenerator, executor);
MqttSubscribeStore subscribeStore = new MqttSubscribeStore();
MqttPublishManager publishManager = new MqttPublishManager();
MqttServer mqttServer = MqttServer.create()
// 默认 MICA-MQTT-SERVER
.name("mqtt-server")
// 默认:127.0.0.1
.ip("127.0.0.1")
// 默认:1883
.port(1883)
.messageIdGenerator(messageIdGenerator)
.processor(processor)
.publishManager(publishManager)
.subManager(subManager)
.subscribeStore(subscribeStore)
.start();
Timer timer = new Timer();
......
package net.dreamlu.iot.mqtt.server;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.server.store.IMqttSubscribeStore;
import net.dreamlu.iot.mqtt.core.server.store.SubscribeStore;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class MqttSubscribeStore implements IMqttSubscribeStore {
private final ConcurrentMap<String, ConcurrentMap<String, SubscribeStore>> data = new ConcurrentHashMap<>();
@Override
public void add(String clientId, String topicFilter, MqttQoS mqttQoS) {
ConcurrentMap<String, SubscribeStore> map = data.get(clientId);
if (map == null) {
map = new ConcurrentHashMap<>();
}
map.put(topicFilter, new SubscribeStore(topicFilter, mqttQoS.value()));
data.put(clientId, map);
}
@Override
public void remove(String clientId, String topicFilter) {
ConcurrentMap<String, SubscribeStore> map = data.get(clientId);
if (map == null) {
return;
}
map.remove(topicFilter);
}
@Override
public List<SubscribeStore> search(String clientId, String topicName) {
List<SubscribeStore> list = new ArrayList<>();
ConcurrentMap<String, SubscribeStore> map = data.get(clientId);
if (map == null) {
return Collections.emptyList();
}
Collection<SubscribeStore> values = map.values();
for (SubscribeStore value : values) {
if (value.getTopicRegex().matcher(topicName).matches()) {
list.add(value);
}
}
return list;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册