提交 cfec10b7 编写于 作者: 如梦技术's avatar 如梦技术 🐛

代码优化。

上级 124041b4
......@@ -17,10 +17,8 @@
package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.*;
import net.dreamlu.iot.mqtt.core.common.MqttMessageListener;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.common.MqttSubscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
......@@ -36,7 +34,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
*
* @author L.cm
*/
public class DefaultMqttClientProcessor implements MqttClientProcessor {
public class DefaultMqttClientProcessor implements IMqttClientProcessor {
private static final Logger logger = LoggerFactory.getLogger(DefaultMqttClientProcessor.class);
private final MqttClientStore clientStore;
private final CountDownLatch connLatch;
......@@ -199,10 +197,10 @@ public class DefaultMqttClientProcessor implements MqttClientProcessor {
* @param message MqttPublishMessage
*/
private void invokeListenerForPublish(String topicName, MqttPublishMessage message) {
List<MqttSubscription> subscriptionList = clientStore.getMatchedSubscription(topicName);
List<MqttClientSubscription> subscriptionList = clientStore.getMatchedSubscription(topicName);
final ByteBuffer payload = message.payload();
subscriptionList.forEach(subscription -> {
MqttMessageListener listener = subscription.getListener();
IMqttClientMessageListener listener = subscription.getListener();
payload.rewind();
listener.onMessage(topicName, payload);
});
......
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.common;
package net.dreamlu.iot.mqtt.core.client;
import java.nio.ByteBuffer;
......@@ -24,7 +24,7 @@ import java.nio.ByteBuffer;
* @author L.cm
*/
@FunctionalInterface
public interface MqttMessageListener {
public interface IMqttClientMessageListener {
/**
* 监听到消息
......
......@@ -24,7 +24,7 @@ import org.tio.core.ChannelContext;
*
* @author L.cm
*/
public interface MqttClientProcessor {
public interface IMqttClientProcessor {
/**
* 处理编解码失败
......
......@@ -17,7 +17,6 @@
package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.*;
import net.dreamlu.iot.mqtt.core.common.MqttMessageListener;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -64,7 +63,7 @@ public final class MqttClient {
* @param listener MqttMessageListener
* @return MqttClient
*/
public MqttClient subQos0(String topicFilter, MqttMessageListener listener) {
public MqttClient subQos0(String topicFilter, IMqttClientMessageListener listener) {
return subscribe(MqttQoS.AT_MOST_ONCE, topicFilter, listener);
}
......@@ -75,7 +74,7 @@ public final class MqttClient {
* @param listener MqttMessageListener
* @return MqttClient
*/
public MqttClient subQos1(String topicFilter, MqttMessageListener listener) {
public MqttClient subQos1(String topicFilter, IMqttClientMessageListener listener) {
return subscribe(MqttQoS.AT_LEAST_ONCE, topicFilter, listener);
}
......@@ -86,7 +85,7 @@ public final class MqttClient {
* @param listener MqttMessageListener
* @return MqttClient
*/
public MqttClient subQos2(String topicFilter, MqttMessageListener listener) {
public MqttClient subQos2(String topicFilter, IMqttClientMessageListener listener) {
return subscribe(MqttQoS.EXACTLY_ONCE, topicFilter, listener);
}
......@@ -98,7 +97,7 @@ public final class MqttClient {
* @param listener MqttMessageListener
* @return MqttClient
*/
public MqttClient subscribe(MqttQoS mqttQoS, String topicFilter, MqttMessageListener listener) {
public MqttClient subscribe(MqttQoS mqttQoS, String topicFilter, IMqttClientMessageListener listener) {
int messageId = MqttClientMessageId.getId();
MqttSubscribeMessage message = MqttMessageBuilders.subscribe()
.addSubscription(mqttQoS, topicFilter)
......
......@@ -34,9 +34,9 @@ public class MqttClientAioHandler implements ClientAioHandler {
private final MqttDecoder mqttDecoder;
private final MqttEncoder mqttEncoder;
private final ByteBufferAllocator allocator;
private final MqttClientProcessor processor;
private final IMqttClientProcessor processor;
public MqttClientAioHandler(ByteBufferAllocator bufferAllocator, MqttClientProcessor processor) {
public MqttClientAioHandler(ByteBufferAllocator bufferAllocator, IMqttClientProcessor processor) {
this.mqttDecoder = MqttDecoder.INSTANCE;
this.mqttEncoder = MqttEncoder.INSTANCE;
this.allocator = bufferAllocator;
......
......@@ -20,7 +20,6 @@ import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders;
import net.dreamlu.iot.mqtt.codec.MqttProperties;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.codec.MqttSubscribeMessage;
import net.dreamlu.iot.mqtt.core.common.MqttSubscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.client.DefaultClientAioListener;
......@@ -92,8 +91,8 @@ public class MqttClientAioListener extends DefaultClientAioListener {
}
private void reSendSubscription(ChannelContext context) {
List<MqttSubscription> subscriptionList = clientStore.getAndCleanSubscription();
for (MqttSubscription subscription : subscriptionList) {
List<MqttClientSubscription> subscriptionList = clientStore.getAndCleanSubscription();
for (MqttClientSubscription subscription : subscriptionList) {
int messageId = MqttClientMessageId.getId();
MqttQoS mqttQoS = subscription.getMqttQoS();
String topicFilter = subscription.getTopicFilter();
......
......@@ -272,7 +272,7 @@ public final class MqttClientCreator {
// 客户端处理器
CountDownLatch connLatch = new CountDownLatch(1);
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, DefaultThreadFactory.getInstance("MqttClient"));
MqttClientProcessor processor = new DefaultMqttClientProcessor(clientStore, connLatch, executor);
IMqttClientProcessor processor = new DefaultMqttClientProcessor(clientStore, connLatch, executor);
// 2. 初始化 mqtt 处理器
ClientAioHandler clientAioHandler = new MqttClientAioHandler(this.bufferAllocator, Objects.requireNonNull(processor));
ClientAioListener clientAioListener = new MqttClientAioListener(this, clientStore, executor);
......
......@@ -18,7 +18,6 @@ package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.common.MqttSubscription;
import net.dreamlu.iot.mqtt.core.util.MultiValueMap;
import java.util.*;
......@@ -32,7 +31,7 @@ final class MqttClientStore {
/**
* 订阅的数据承载
*/
private final MultiValueMap<String, MqttSubscription> subscriptions = new MultiValueMap<>();
private final MultiValueMap<String, MqttClientSubscription> subscriptions = new MultiValueMap<>();
private final Map<Integer, MqttPendingSubscription> pendingSubscriptions = new LinkedHashMap<>();
private final Map<Integer, MqttPendingUnSubscription> pendingUnSubscriptions = new LinkedHashMap<>();
private final Map<Integer, MqttPendingPublish> pendingPublishData = new LinkedHashMap<>();
......@@ -50,24 +49,24 @@ final class MqttClientStore {
return pendingSubscriptions.remove(messageId);
}
protected void addSubscription(MqttSubscription subscription) {
protected void addSubscription(MqttClientSubscription subscription) {
subscriptions.add(subscription.getTopicFilter(), subscription);
}
protected List<MqttSubscription> getAndCleanSubscription() {
List<MqttSubscription> subscriptionList = new ArrayList<>();
for (List<MqttSubscription> mqttSubscriptions : subscriptions.values()) {
protected List<MqttClientSubscription> getAndCleanSubscription() {
List<MqttClientSubscription> subscriptionList = new ArrayList<>();
for (List<MqttClientSubscription> mqttSubscriptions : subscriptions.values()) {
subscriptionList.addAll(mqttSubscriptions);
}
List<MqttSubscription> data = Collections.unmodifiableList(subscriptionList);
List<MqttClientSubscription> data = Collections.unmodifiableList(subscriptionList);
subscriptions.clear();
return data;
}
protected List<MqttSubscription> getMatchedSubscription(String topicName) {
List<MqttSubscription> subscriptionList = new ArrayList<>();
for (List<MqttSubscription> mqttSubscriptions : subscriptions.values()) {
for (MqttSubscription subscription : mqttSubscriptions) {
protected List<MqttClientSubscription> getMatchedSubscription(String topicName) {
List<MqttClientSubscription> subscriptionList = new ArrayList<>();
for (List<MqttClientSubscription> mqttSubscriptions : subscriptions.values()) {
for (MqttClientSubscription subscription : mqttSubscriptions) {
if (subscription.matches(topicName)) {
subscriptionList.add(subscription);
}
......
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.common;
package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.util.MqttTopicUtil;
......@@ -28,15 +28,15 @@ import java.util.regex.Pattern;
*
* @author L.cm
*/
public final class MqttSubscription implements Serializable {
final class MqttClientSubscription implements Serializable {
private final String topicFilter;
private final MqttQoS mqttQoS;
private final Pattern topicRegex;
private final MqttMessageListener listener;
private final IMqttClientMessageListener listener;
public MqttSubscription(MqttQoS mqttQoS,
String topicFilter,
MqttMessageListener listener) {
public MqttClientSubscription(MqttQoS mqttQoS,
String topicFilter,
IMqttClientMessageListener listener) {
this.mqttQoS = mqttQoS;
this.topicFilter = topicFilter;
this.topicRegex = MqttTopicUtil.getTopicPattern(topicFilter);
......@@ -51,7 +51,7 @@ public final class MqttSubscription implements Serializable {
return topicFilter;
}
public MqttMessageListener getListener() {
public IMqttClientMessageListener getListener() {
return listener;
}
......@@ -67,7 +67,7 @@ public final class MqttSubscription implements Serializable {
if (o == null || getClass() != o.getClass()) {
return false;
}
MqttSubscription that = (MqttSubscription) o;
MqttClientSubscription that = (MqttClientSubscription) o;
return Objects.equals(topicFilter, that.topicFilter) &&
mqttQoS == that.mqttQoS &&
Objects.equals(topicRegex, that.topicRegex) &&
......
......@@ -4,8 +4,6 @@ package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.MqttMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.codec.MqttSubscribeMessage;
import net.dreamlu.iot.mqtt.core.common.MqttMessageListener;
import net.dreamlu.iot.mqtt.core.common.MqttSubscription;
import net.dreamlu.iot.mqtt.core.common.RetryProcessor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
......@@ -17,12 +15,12 @@ import java.util.function.Consumer;
final class MqttPendingSubscription {
private final MqttQoS mqttQoS;
private final String topicFilter;
private final MqttMessageListener listener;
private final IMqttClientMessageListener listener;
private final RetryProcessor<MqttSubscribeMessage> retryProcessor = new RetryProcessor<>();
MqttPendingSubscription(MqttQoS mqttQoS,
String topicFilter,
MqttMessageListener listener,
IMqttClientMessageListener listener,
MqttSubscribeMessage message) {
this.mqttQoS = mqttQoS;
this.topicFilter = topicFilter;
......@@ -38,12 +36,12 @@ final class MqttPendingSubscription {
return topicFilter;
}
protected MqttMessageListener getListener() {
protected IMqttClientMessageListener getListener() {
return listener;
}
public MqttSubscription toSubscription() {
return new MqttSubscription(getMqttQoS(), getTopicFilter(), getListener());
public MqttClientSubscription toSubscription() {
return new MqttClientSubscription(getMqttQoS(), getTopicFilter(), getListener());
}
protected void startRetransmitTimer(ScheduledThreadPoolExecutor executor, Consumer<MqttMessage> sendPacket) {
......
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package net.dreamlu.iot.mqtt.core.server;
/**
* 服务端消息id
*
* @author L.cm
*/
public interface IMqttMessageIdGenerator {
/**
* 获取消息 id
*
* @return 消息id
*/
int getId();
}
......@@ -22,7 +22,7 @@ package net.dreamlu.iot.mqtt.core.server;
* @author L.cm
*/
@FunctionalInterface
public interface IMqttAuthHandler {
public interface IMqttServerAuthHandler {
/**
* 认证
......
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.net.dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.server;
import java.nio.ByteBuffer;
/**
* mqtt 消息处理
*
* @author L.cm
*/
@FunctionalInterface
public interface IMqttServerMessageListener {
/**
* 监听到消息
*
* @param clientId clientId
* @param topic topic
* @param payload payload
*/
void onMessage(String clientId, String topic, ByteBuffer payload);
}
......@@ -8,7 +8,7 @@ import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
*
* @author L.cm
*/
public interface IMqttPublishManager {
public interface IMqttServerPublishManager {
/**
* 添加发布过程存储
......@@ -16,7 +16,7 @@ public interface IMqttPublishManager {
* @param messageId messageId
* @param pendingPublish MqttPendingPublish
*/
void addPendingPublish(int messageId, MqttPendingPublish pendingPublish);
void addPendingPublish(String clientId, int messageId, MqttPendingPublish pendingPublish);
/**
* 获取发布过程存储
......@@ -24,14 +24,14 @@ public interface IMqttPublishManager {
* @param messageId messageId
* @return MqttPendingPublish
*/
MqttPendingPublish getPendingPublish(int messageId);
MqttPendingPublish getPendingPublish(String clientId, int messageId);
/**
* 删除发布过程中的存储
*
* @param messageId messageId
*/
void removePendingPublish(int messageId);
void removePendingPublish(String clientId, int messageId);
/**
* 添加发布过程存储
......@@ -39,18 +39,20 @@ public interface IMqttPublishManager {
* @param messageId messageId
* @param pendingQos2Publish MqttPendingQos2Publish
*/
void addPendingQos2Publish(int messageId, MqttPendingQos2Publish pendingQos2Publish);
void addPendingQos2Publish(String clientId, int messageId, MqttPendingQos2Publish pendingQos2Publish);
/**
* 获取发布过程存储
*
* @param messageId messageId
* @return MqttPendingQos2Publish
*/
MqttPendingQos2Publish getPendingQos2Publish(int messageId);
MqttPendingQos2Publish getPendingQos2Publish(String clientId, int messageId);
/**
* 删除发布过程中的存储
*
* @param messageId messageId
*/
void removePendingQos2Publish(int messageId);
void removePendingQos2Publish(String clientId, int messageId);
}
......@@ -17,7 +17,6 @@
package net.dreamlu.iot.mqtt.core.server;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.common.MqttSubscription;
import java.util.List;
......@@ -26,14 +25,14 @@ import java.util.List;
*
* @author L.cm
*/
public interface IMqttSubManager {
public interface IMqttServerSubscribeManager {
/**
* 订阅
*
* @param subscription 订阅信息
*/
void subscribe(MqttSubscription subscription);
void subscribe(MqttServerSubscription subscription);
/**
* 获取匹配的订阅
......@@ -42,7 +41,7 @@ public interface IMqttSubManager {
* @param mqttQoS MqttQoS
* @return 订阅信息
*/
List<MqttSubscription> getMatchedSubscription(String topicName, MqttQoS mqttQoS);
List<MqttServerSubscription> getMatchedSubscription(String topicName, MqttQoS mqttQoS);
/**
* 清理
......
......@@ -20,7 +20,7 @@ import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.server.store.IMqttSubscribeStore;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import net.dreamlu.iot.mqtt.core.server.store.SubscribeStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -44,20 +44,17 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
public final class MqttServer {
private static final Logger logger = LoggerFactory.getLogger(MqttServer.class);
private final TioServer tioServer;
private final IMqttMessageIdGenerator messageIdGenerator;
private final IMqttPublishManager publishManager;
private final IMqttSubscribeStore subscribeStore;
private final IMqttSessionManager sessionManager;
private final IMqttServerPublishManager publishManager;
private final ScheduledThreadPoolExecutor executor;
MqttServer(TioServer tioServer,
IMqttMessageIdGenerator messageIdGenerator,
IMqttPublishManager publishManager,
IMqttSubscribeStore subscribeStore,
IMqttSessionManager sessionManager,
IMqttServerPublishManager publishManager,
ScheduledThreadPoolExecutor executor) {
this.tioServer = tioServer;
this.messageIdGenerator = messageIdGenerator;
this.sessionManager = sessionManager;
this.publishManager = publishManager;
this.subscribeStore = subscribeStore;
this.executor = executor;
}
......@@ -128,7 +125,7 @@ public final class MqttServer {
logger.warn("Mqtt publish to clientId:{} ChannelContext is null May be disconnected.", clientId);
return false;
}
List<SubscribeStore> subscribeList = subscribeStore.search(clientId, topic);
List<SubscribeStore> subscribeList = sessionManager.searchSubscribe(clientId, topic);
if (subscribeList.isEmpty()) {
logger.warn("Mqtt publish but clientId:{} subscribeList is empty.", clientId);
return false;
......@@ -136,7 +133,7 @@ public final class MqttServer {
for (SubscribeStore subscribe : subscribeList) {
int subMqttQoS = subscribe.getMqttQoS();
MqttQoS mqttQoS = qos.value() > subMqttQoS ? MqttQoS.valueOf(subMqttQoS) : qos;
publish(context, topic, payload, mqttQoS, retain);
publish(context, clientId, topic, payload, mqttQoS, retain);
}
return true;
}
......@@ -151,9 +148,9 @@ public final class MqttServer {
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
private boolean publish(ChannelContext context, String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
private boolean publish(ChannelContext context, String clientId, String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
boolean isHighLevelQoS = MqttQoS.AT_LEAST_ONCE == qos || MqttQoS.EXACTLY_ONCE == qos;
int messageId = isHighLevelQoS ? messageIdGenerator.getId() : -1;
int messageId = isHighLevelQoS ? sessionManager.getMessageId(clientId) : -1;
payload.rewind();
MqttPublishMessage message = MqttMessageBuilders.publish()
.topicName(topic)
......@@ -166,7 +163,7 @@ public final class MqttServer {
logger.debug("MQTT publish topic:{} qos:{} retain:{} result:{}", topic, qos, retain, result);
if (isHighLevelQoS) {
MqttPendingPublish pendingPublish = new MqttPendingPublish(payload, message, qos);
publishManager.addPendingPublish(messageId, pendingPublish);
publishManager.addPendingPublish(clientId, messageId, pendingPublish);
pendingPublish.startPublishRetransmissionTimer(executor, msg -> Tio.send(context, msg));
}
return result;
......
......@@ -16,7 +16,7 @@
package net.dreamlu.iot.mqtt.core.server;
import net.dreamlu.iot.mqtt.core.server.store.IMqttSubscribeStore;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
......@@ -30,10 +30,10 @@ import org.tio.core.Tio;
*/
public class MqttServerAioListener extends DefaultAioListener {
private static final Logger logger = LoggerFactory.getLogger(MqttServerAioListener.class);
private final IMqttSubscribeStore subscribeStore;
private final IMqttSessionManager sessionManager;
public MqttServerAioListener(IMqttSubscribeStore subscribeStore) {
this.subscribeStore = subscribeStore;
public MqttServerAioListener(IMqttSessionManager sessionManager) {
this.sessionManager = sessionManager;
}
@Override
......@@ -51,7 +51,7 @@ public class MqttServerAioListener extends DefaultAioListener {
if (throwable != null) {
// TODO 遗嘱消息处理
}
subscribeStore.remove(clientId);
sessionManager.clean(clientId);
Tio.unbindBsId(context);
}
......
......@@ -17,7 +17,11 @@
package net.dreamlu.iot.mqtt.core.server;
import net.dreamlu.iot.mqtt.codec.ByteBufferAllocator;
import net.dreamlu.iot.mqtt.core.server.store.IMqttSubscribeStore;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import net.dreamlu.iot.mqtt.core.server.session.InMemoryMqttSessionManager;
import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttServerAuthHandler;
import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttServerPublishManager;
import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttServerProcessor;
import org.tio.core.ssl.SslConfig;
import org.tio.core.stat.IpStatListener;
import org.tio.server.ServerTioConfig;
......@@ -73,23 +77,15 @@ public class MqttServerCreator {
/**
* 认证处理器
*/
private IMqttAuthHandler authHandler;
private IMqttServerAuthHandler authHandler;
/**
* messageId 生成
* session 管理
*/
private IMqttMessageIdGenerator messageIdGenerator;
/**
* 发布管理
*/
private IMqttPublishManager publishManager;
private IMqttSessionManager sessionManager;
/**
* 订阅管路
*/
private IMqttSubManager subManager;
/**
* 订阅存储
*/
private IMqttSubscribeStore subscribeStore;
private IMqttServerSubscribeManager subManager;
/**
* debug
*/
......@@ -180,51 +176,33 @@ public class MqttServerCreator {
return this;
}
public IMqttAuthHandler getAuthHandler() {
public IMqttServerAuthHandler getAuthHandler() {
return authHandler;
}
public MqttServerCreator authHandler(IMqttAuthHandler authHandler) {
public MqttServerCreator authHandler(IMqttServerAuthHandler authHandler) {
this.authHandler = authHandler;
return this;
}
public IMqttMessageIdGenerator getMessageIdGenerator() {
return messageIdGenerator;
public IMqttSessionManager getSessionManager() {
return sessionManager;
}
public MqttServerCreator messageIdGenerator(IMqttMessageIdGenerator messageIdGenerator) {
this.messageIdGenerator = messageIdGenerator;
public MqttServerCreator sessionManager(IMqttSessionManager sessionManager) {
this.sessionManager = sessionManager;
return this;
}
public IMqttPublishManager getPublishManager() {
return publishManager;
}
public MqttServerCreator publishManager(IMqttPublishManager publishManager) {
this.publishManager = publishManager;
return this;
}
public IMqttSubManager getSubManager() {
public IMqttServerSubscribeManager getSubManager() {
return subManager;
}
public MqttServerCreator subManager(IMqttSubManager subManager) {
public MqttServerCreator subManager(IMqttServerSubscribeManager subManager) {
this.subManager = subManager;
return this;
}
public IMqttSubscribeStore getSubscribeStore() {
return subscribeStore;
}
public MqttServerCreator subscribeStore(IMqttSubscribeStore subscribeStore) {
this.subscribeStore = subscribeStore;
return this;
}
public boolean isDebug() {
return debug;
}
......@@ -235,19 +213,21 @@ public class MqttServerCreator {
}
public MqttServer start() throws IOException {
Objects.requireNonNull(this.messageIdGenerator, "Argument messageIdGenerator is null.");
Objects.requireNonNull(this.publishManager, "Argument publishManager is null.");
Objects.requireNonNull(this.subManager, "Argument subManager is null.");
Objects.requireNonNull(this.subscribeStore, "Argument subscribeStore is null.");
if (this.sessionManager == null) {
this.sessionManager = new InMemoryMqttSessionManager();
}
if (this.authHandler == null) {
this.authHandler = new DefaultMqttAuthHandler();
this.authHandler = new DefaultMqttServerAuthHandler();
}
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, DefaultThreadFactory.getInstance("MqttServer"));
DefaultMqttServerProcessor serverProcessor = new DefaultMqttServerProcessor(this.authHandler, this.subManager, this.publishManager, this.subscribeStore, executor);
// 过程消息存储
IMqttServerPublishManager publishManager = new DefaultMqttServerPublishManager();
DefaultMqttServerProcessor serverProcessor = new DefaultMqttServerProcessor(this.sessionManager, this.authHandler, this.subManager, publishManager, executor);
// 处理消息
ServerAioHandler handler = new MqttServerAioHandler(this.bufferAllocator, serverProcessor);
// 监听
ServerAioListener listener = new MqttServerAioListener(this.subscribeStore);
ServerAioListener listener = new MqttServerAioListener(this.sessionManager);
// 配置
ServerTioConfig config = new ServerTioConfig(this.name, handler, listener);
// 设置心跳 timeout
......@@ -271,7 +251,7 @@ public class MqttServerCreator {
tioServer.setCheckLastVersion(false);
// 启动
tioServer.start(this.ip, this.port);
return new MqttServer(tioServer, this.messageIdGenerator, this.publishManager, this.subscribeStore, executor);
return new MqttServer(tioServer, this.sessionManager, publishManager, executor);
}
}
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.net.dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.server;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.util.MqttTopicUtil;
import java.io.Serializable;
import java.util.Objects;
import java.util.regex.Pattern;
/**
* 发送订阅,未 ack 前的数据承载
*
* @author L.cm
*/
public final class MqttServerSubscription implements Serializable {
private final String topicFilter;
private final MqttQoS mqttQoS;
private final Pattern topicRegex;
private final IMqttServerMessageListener listener;
public MqttServerSubscription(MqttQoS mqttQoS,
String topicFilter,
IMqttServerMessageListener listener) {
this.mqttQoS = mqttQoS;
this.topicFilter = topicFilter;
this.topicRegex = MqttTopicUtil.getTopicPattern(topicFilter);
this.listener = listener;
}
public MqttQoS getMqttQoS() {
return mqttQoS;
}
public String getTopicFilter() {
return topicFilter;
}
public IMqttServerMessageListener getListener() {
return listener;
}
public boolean matches(String topic) {
return this.topicRegex.matcher(topic).matches();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MqttServerSubscription that = (MqttServerSubscription) o;
return Objects.equals(topicFilter, that.topicFilter) &&
mqttQoS == that.mqttQoS &&
Objects.equals(topicRegex, that.topicRegex) &&
Objects.equals(listener, that.listener);
}
@Override
public int hashCode() {
return Objects.hash(topicFilter, mqttQoS, topicRegex, listener);
}
@Override
public String toString() {
return "MqttSubscription{" +
"topicFilter='" + topicFilter + '\'' +
", mqttQoS=" + mqttQoS +
", topicRegex=" + topicRegex +
", listener=" + listener +
'}';
}
}
package net.dreamlu.iot.mqtt.core.server.session;
/**
* mqtt session
*
* @author L.cm
*/
public interface IMqttSession {
}
/*
* Copyright 2014 The Netty Project
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.net.dreamlu.net).
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.server.store;
package net.dreamlu.iot.mqtt.core.server.session;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.server.store.SubscribeStore;
import java.util.List;
/**
* 客户端订阅存储
* session 管理,不封装 MqttSession 实体,方便 redis 等集群处理
*
* @author L.cm
*/
public interface IMqttSubscribeStore {
public interface IMqttSessionManager {
/**
* 添加订阅存储
......@@ -34,7 +35,7 @@ public interface IMqttSubscribeStore {
* @param topicFilter topicFilter
* @param mqttQoS MqttQoS
*/
void add(String clientId, String topicFilter, MqttQoS mqttQoS);
void addSubscribe(String clientId, String topicFilter, MqttQoS mqttQoS);
/**
* 删除订阅
......@@ -42,14 +43,7 @@ public interface IMqttSubscribeStore {
* @param clientId 客户端 Id
* @param topicFilter topicFilter
*/
void remove(String clientId, String topicFilter);
/**
* 删除订阅
*
* @param clientId 客户端 Id
*/
void remove(String clientId);
void removeSubscribe(String clientId, String topicFilter);
/**
* 查找订阅信息
......@@ -58,6 +52,20 @@ public interface IMqttSubscribeStore {
* @param topicName topicName
* @return 订阅存储列表
*/
List<SubscribeStore> search(String clientId, String topicName);
List<SubscribeStore> searchSubscribe(String clientId, String topicName);
/**
* 生成消息 Id
*
* @return messageId
*/
int getMessageId(String clientId);
/**
* 清除 session
*
* @param clientId clientId
*/
void clean(String clientId);
}
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.net.dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.server.session;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.server.store.SubscribeStore;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 内存 session 管理
*
* @author L.cm
*/
public class InMemoryMqttSessionManager implements IMqttSessionManager {
/**
* clientId: messageId
*/
private final Map<String, AtomicInteger> messageIdStore = new ConcurrentHashMap<>();
/**
* clientId:{topicFilter: SubscribeStore}
*/
private final ConcurrentMap<String, ConcurrentMap<String, SubscribeStore>> subscribeStore = new ConcurrentHashMap<>();
@Override
public void addSubscribe(String clientId, String topicFilter, MqttQoS mqttQoS) {
Map<String, SubscribeStore> data = subscribeStore.computeIfAbsent(clientId, (key) -> new ConcurrentHashMap<>(16));
data.put(topicFilter, new SubscribeStore(topicFilter, mqttQoS.value()));
}
@Override
public void removeSubscribe(String clientId, String topicFilter) {
ConcurrentMap<String, SubscribeStore> map = subscribeStore.get(clientId);
if (map == null) {
return;
}
map.remove(topicFilter);
}
@Override
public List<SubscribeStore> searchSubscribe(String clientId, String topicName) {
List<SubscribeStore> list = new ArrayList<>();
ConcurrentMap<String, SubscribeStore> map = subscribeStore.get(clientId);
if (map == null) {
return Collections.emptyList();
}
Collection<SubscribeStore> values = map.values();
for (SubscribeStore value : values) {
if (value.getTopicRegex().matcher(topicName).matches()) {
list.add(value);
}
}
return list;
}
@Override
public int getMessageId(String clientId) {
AtomicInteger value = messageIdStore.computeIfAbsent(clientId, (key) -> new AtomicInteger(1));
value.compareAndSet(0xffff, 1);
return value.getAndIncrement();
}
@Override
public void clean(String clientId) {
subscribeStore.remove(clientId);
messageIdStore.remove(clientId);
}
}
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.net.dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.server.store;
import java.io.Serializable;
......
......@@ -14,14 +14,16 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.server;
package net.dreamlu.iot.mqtt.core.server.support;
import net.dreamlu.iot.mqtt.core.server.IMqttServerAuthHandler;
/**
* 默认的认证处理
*
* @author L.cm
*/
public class DefaultMqttAuthHandler implements IMqttAuthHandler {
public class DefaultMqttServerAuthHandler implements IMqttServerAuthHandler {
@Override
public boolean authenticate(String clientId, String userName, String password) {
......
......@@ -14,14 +14,13 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.server;
package net.dreamlu.iot.mqtt.core.server.support;
import net.dreamlu.iot.mqtt.codec.*;
import net.dreamlu.iot.mqtt.core.common.MqttMessageListener;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.common.MqttSubscription;
import net.dreamlu.iot.mqtt.core.server.store.IMqttSubscribeStore;
import net.dreamlu.iot.mqtt.core.server.*;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
......@@ -40,21 +39,21 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
*/
public class DefaultMqttServerProcessor implements MqttServerProcessor {
private static final Logger logger = LoggerFactory.getLogger(DefaultMqttServerProcessor.class);
private final IMqttAuthHandler authHandler;
private final IMqttPublishManager publishManager;
private final IMqttSubManager subManager;
private final IMqttSubscribeStore subscribeStore;
private final IMqttSessionManager sessionManager;
private final IMqttServerAuthHandler authHandler;
private final IMqttServerPublishManager publishManager;
private final IMqttServerSubscribeManager subManager;
private final ScheduledThreadPoolExecutor executor;
public DefaultMqttServerProcessor(IMqttAuthHandler authHandler,
IMqttSubManager subManager,
IMqttPublishManager publishManager,
IMqttSubscribeStore subscribeStore,
public DefaultMqttServerProcessor(IMqttSessionManager sessionManager,
IMqttServerAuthHandler authHandler,
IMqttServerSubscribeManager subManager,
IMqttServerPublishManager publishManager,
ScheduledThreadPoolExecutor executor) {
this.sessionManager = sessionManager;
this.authHandler = authHandler;
this.subManager = subManager;
this.publishManager = publishManager;
this.subscribeStore = subscribeStore;
this.executor = executor;
}
......@@ -74,9 +73,14 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
connAckByReturnCode(clientId, context, MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
return;
}
// TODO session 处理
MqttConnectVariableHeader variableHeader = mqttMessage.variableHeader();
boolean cleanSession = variableHeader.isCleanSession();
// 3. 绑定 clientId
Tio.bindBsId(context, clientId);
// 4. TODO 存储遗嘱消息
// variableHeader.isWillFlag()
// 5. 返回 ack
connAckByReturnCode(clientId, context, MqttConnectReturnCode.CONNECTION_ACCEPTED);
}
......@@ -101,10 +105,10 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
logger.debug("Publish - clientId:{} topicName:{} mqttQoS:{} packetId:{}", clientId, topicName, mqttQoS, packetId);
switch (mqttQoS) {
case AT_MOST_ONCE:
invokeListenerForPublish(mqttQoS, topicName, message);
invokeListenerForPublish(clientId, mqttQoS, topicName, message);
break;
case AT_LEAST_ONCE:
invokeListenerForPublish(mqttQoS, topicName, message);
invokeListenerForPublish(clientId, mqttQoS, topicName, message);
if (packetId != -1) {
MqttMessage messageAck = MqttMessageBuilders.pubAck()
.packetId(packetId)
......@@ -120,7 +124,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
MqttPendingQos2Publish pendingQos2Publish = new MqttPendingQos2Publish(message, pubRecMessage);
Boolean resultPubRec = Tio.send(context, pubRecMessage);
logger.debug("Publish - PubRec send clientId:{} topicName:{} mqttQoS:{} packetId:{} result:{}", clientId, topicName, mqttQoS, packetId, resultPubRec);
publishManager.addPendingQos2Publish(packetId, pendingQos2Publish);
publishManager.addPendingQos2Publish(clientId, packetId, pendingQos2Publish);
pendingQos2Publish.startPubRecRetransmitTimer(executor, msg -> Tio.send(context, msg));
}
break;
......@@ -135,12 +139,12 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
int messageId = variableHeader.messageId();
String clientId = context.getBsId();
logger.debug("PubAck - clientId:{}, messageId: {}", clientId, messageId);
MqttPendingPublish pendingPublish = publishManager.getPendingPublish(messageId);
MqttPendingPublish pendingPublish = publishManager.getPendingPublish(clientId, messageId);
if (pendingPublish == null) {
return;
}
pendingPublish.onPubAckReceived();
publishManager.removePendingPublish(messageId);
publishManager.removePendingPublish(clientId, messageId);
pendingPublish.getPayload().clear();
}
......@@ -149,7 +153,10 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
String clientId = context.getBsId();
int messageId = variableHeader.messageId();
logger.debug("PubRec - clientId:{}, messageId: {}", clientId, messageId);
MqttPendingPublish pendingPublish = publishManager.getPendingPublish(messageId);
MqttPendingPublish pendingPublish = publishManager.getPendingPublish(clientId, messageId);
if (pendingPublish == null) {
return;
}
pendingPublish.onPubAckReceived();
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
......@@ -165,14 +172,14 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
String clientId = context.getBsId();
int messageId = variableHeader.messageId();
logger.debug("PubRel - clientId:{}, messageId: {}", clientId, messageId);
MqttPendingQos2Publish pendingQos2Publish = publishManager.getPendingQos2Publish(messageId);
MqttPendingQos2Publish pendingQos2Publish = publishManager.getPendingQos2Publish(clientId, messageId);
if (pendingQos2Publish != null) {
MqttPublishMessage incomingPublish = pendingQos2Publish.getIncomingPublish();
String topicName = incomingPublish.variableHeader().topicName();
MqttQoS mqttQoS = incomingPublish.fixedHeader().qosLevel();
invokeListenerForPublish(mqttQoS, topicName, incomingPublish);
invokeListenerForPublish(clientId, mqttQoS, topicName, incomingPublish);
pendingQos2Publish.onPubRelReceived();
publishManager.removePendingQos2Publish(messageId);
publishManager.removePendingQos2Publish(clientId, messageId);
}
MqttMessage message = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0),
......@@ -185,10 +192,12 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
int messageId = variableHeader.messageId();
String clientId = context.getBsId();
logger.debug("PubComp - clientId:{}, messageId: {}", clientId, messageId);
MqttPendingPublish pendingPublish = publishManager.getPendingPublish(messageId);
pendingPublish.getPayload().clear();
pendingPublish.onPubCompReceived();
publishManager.removePendingPublish(messageId);
MqttPendingPublish pendingPublish = publishManager.getPendingPublish(clientId, messageId);
if (pendingPublish != null) {
pendingPublish.getPayload().clear();
pendingPublish.onPubCompReceived();
publishManager.removePendingPublish(clientId, messageId);
}
}
@Override
......@@ -203,7 +212,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
String topicName = subscription.topicName();
MqttQoS mqttQoS = subscription.qualityOfService();
mqttQosList.add(mqttQoS);
subscribeStore.add(clientId, topicName, mqttQoS);
sessionManager.addSubscribe(clientId, topicName, mqttQoS);
logger.debug("Subscribe - clientId:{} messageId:{} topicFilter:{} mqttQoS:{}", clientId, messageId, topicName, mqttQoS);
}
// 3. 返回 ack
......@@ -221,7 +230,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
int messageId = message.variableHeader().messageId();
List<String> topicFilterList = message.payload().topics();
for (String topicFilter : topicFilterList) {
subscribeStore.remove(clientId, topicFilter);
sessionManager.removeSubscribe(clientId, topicFilter);
logger.debug("UnSubscribe - clientId:{} messageId:{} topicFilter:{}", clientId, messageId, topicFilter);
}
MqttMessage unSubMessage = MqttMessageBuilders.unsubAck()
......@@ -247,16 +256,17 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
/**
* 处理订阅的消息
*
* @param clientId clientId
* @param topicName topicName
* @param message MqttPublishMessage
*/
private void invokeListenerForPublish(MqttQoS mqttQoS, String topicName, MqttPublishMessage message) {
List<MqttSubscription> subscriptionList = subManager.getMatchedSubscription(topicName, mqttQoS);
private void invokeListenerForPublish(String clientId, MqttQoS mqttQoS, String topicName, MqttPublishMessage message) {
List<MqttServerSubscription> subscriptionList = subManager.getMatchedSubscription(topicName, mqttQoS);
final ByteBuffer payload = message.payload();
subscriptionList.forEach(subscription -> {
MqttMessageListener listener = subscription.getListener();
IMqttServerMessageListener listener = subscription.getListener();
payload.rewind();
listener.onMessage(topicName, payload);
listener.onMessage(clientId, topicName, payload);
});
}
......
package net.dreamlu.iot.mqtt.core.server.support;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.server.IMqttServerPublishManager;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 发布过程中的数据存储
*
* @author L.cm
*/
public class DefaultMqttServerPublishManager implements IMqttServerPublishManager {
/**
* clientId: {msgId: Object}
*/
private final Map<String, Map<Integer, MqttPendingPublish>> pendingPublishData = new ConcurrentHashMap<>();
/**
* clientId: {msgId: Object}
*/
private final Map<String , Map<Integer, MqttPendingQos2Publish>> pendingQos2PublishData = new ConcurrentHashMap<>();
@Override
public void addPendingPublish(String clientId, int messageId, MqttPendingPublish pendingPublish) {
Map<Integer, MqttPendingPublish> data = pendingPublishData.computeIfAbsent(clientId, (key) -> new ConcurrentHashMap<>(16));
data.put(messageId, pendingPublish);
}
@Override
public MqttPendingPublish getPendingPublish(String clientId, int messageId) {
return pendingPublishData.get(clientId).get(messageId);
}
@Override
public void removePendingPublish(String clientId, int messageId) {
pendingPublishData.get(clientId).remove(messageId);
}
@Override
public void addPendingQos2Publish(String clientId, int messageId, MqttPendingQos2Publish pendingQos2Publish) {
Map<Integer, MqttPendingQos2Publish> data = pendingQos2PublishData.computeIfAbsent(clientId, (key) -> new ConcurrentHashMap<>());
data.put(messageId, pendingQos2Publish);
}
@Override
public MqttPendingQos2Publish getPendingQos2Publish(String clientId, int messageId) {
return pendingQos2PublishData.get(clientId).get(messageId);
}
@Override
public void removePendingQos2Publish(String clientId, int messageId) {
pendingQos2PublishData.get(clientId).remove(messageId);
}
}
package net.dreamlu.iot.mqtt.core.server;
package net.dreamlu.iot.mqtt.core.server.support;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.common.MqttSubscription;
import net.dreamlu.iot.mqtt.core.server.IMqttServerSubscribeManager;
import net.dreamlu.iot.mqtt.core.server.MqttServerSubscription;
import java.util.ArrayList;
import java.util.Collections;
......@@ -13,18 +14,18 @@ import java.util.List;
*
* @author L.cm
*/
public class DefaultMqttServerSubManager implements IMqttSubManager {
private final List<MqttSubscription> subscriptionList = new LinkedList<>();
public class DefaultMqttServerSubscribeManager implements IMqttServerSubscribeManager {
private final List<MqttServerSubscription> subscriptionList = new LinkedList<>();
@Override
public void subscribe(MqttSubscription subscription) {
public void subscribe(MqttServerSubscription subscription) {
subscriptionList.add(subscription);
}
@Override
public List<MqttSubscription> getMatchedSubscription(String topicName, MqttQoS mqttQoS) {
List<MqttSubscription> list = new ArrayList<>();
for (MqttSubscription subscription : subscriptionList) {
public List<MqttServerSubscription> getMatchedSubscription(String topicName, MqttQoS mqttQoS) {
List<MqttServerSubscription> list = new ArrayList<>();
for (MqttServerSubscription subscription : subscriptionList) {
MqttQoS qos = subscription.getMqttQoS();
if (subscription.matches(topicName) && (qos == null || qos == mqttQoS)) {
list.add(subscription);
......
......@@ -12,7 +12,7 @@
<properties>
<log4j2.version>2.14.1</log4j2.version>
<graalvm.version>21.1.0</graalvm.version>
<graalvm.version>21.2.0</graalvm.version>
<mainClass.server>net.dreamlu.iot.mqtt.server.MqttServerTest</mainClass.server>
<mainClass.client>net.dreamlu.iot.mqtt.client.MqttClientTest</mainClass.client>
</properties>
......
package net.dreamlu.iot.mqtt.server;
import net.dreamlu.iot.mqtt.core.server.IMqttMessageIdGenerator;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 仅仅作为演示,实际需要考虑集群
*/
public class MqttMessageIdGenerator implements IMqttMessageIdGenerator {
private final AtomicInteger value = new AtomicInteger(1);
@Override
public int getId() {
this.value.compareAndSet(0xffff, 1);
return this.value.getAndIncrement();
}
}
package net.dreamlu.iot.mqtt.server;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.server.IMqttPublishManager;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class MqttPublishManager implements IMqttPublishManager {
private final Map<Integer, MqttPendingPublish> pendingPublishData = new ConcurrentHashMap<>();
private final Map<Integer, MqttPendingQos2Publish> pendingQos2PublishData = new ConcurrentHashMap<>();
@Override
public void addPendingPublish(int messageId, MqttPendingPublish pendingPublish) {
pendingPublishData.put(messageId, pendingPublish);
}
@Override
public MqttPendingPublish getPendingPublish(int messageId) {
return pendingPublishData.get(messageId);
}
@Override
public void removePendingPublish(int messageId) {
pendingPublishData.remove(messageId);
}
@Override
public void addPendingQos2Publish(int messageId, MqttPendingQos2Publish pendingQos2Publish) {
pendingQos2PublishData.put(messageId, pendingQos2Publish);
}
@Override
public MqttPendingQos2Publish getPendingQos2Publish(int messageId) {
return pendingQos2PublishData.get(messageId);
}
@Override
public void removePendingQos2Publish(int messageId) {
pendingQos2PublishData.remove(messageId);
}
}
......@@ -18,10 +18,9 @@ package net.dreamlu.iot.mqtt.server;
import net.dreamlu.iot.mqtt.codec.ByteBufferUtil;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.common.MqttSubscription;
import net.dreamlu.iot.mqtt.core.server.DefaultMqttServerSubManager;
import net.dreamlu.iot.mqtt.core.server.IMqttMessageIdGenerator;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.MqttServerSubscription;
import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttServerSubscribeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -40,15 +39,12 @@ public class MqttServerTest {
public static void main(String[] args) throws IOException {
// 注意:为了能接受更多链接(降低内存),请添加 jvm 参数 -Xss129k
DefaultMqttServerSubManager subManager = new DefaultMqttServerSubManager();
DefaultMqttServerSubscribeManager subManager = new DefaultMqttServerSubscribeManager();
// 服务端注册订阅
subManager.subscribe(new MqttSubscription(MqttQoS.AT_MOST_ONCE, "/test/#", ((topic, payload) -> {
logger.info("subscribe:\t" + topic + '\t' + ByteBufferUtil.toString(payload));
subManager.subscribe(new MqttServerSubscription(MqttQoS.AT_MOST_ONCE, "/test/#", ((clientId, topic, payload) -> {
logger.info("clientId:{} subscribe:{} message:{}", clientId, topic, ByteBufferUtil.toString(payload));
})));
IMqttMessageIdGenerator messageIdGenerator = new MqttMessageIdGenerator();
MqttSubscribeStore subscribeStore = new MqttSubscribeStore();
MqttPublishManager publishManager = new MqttPublishManager();
MqttServer mqttServer = MqttServer.create()
// 默认:127.0.0.1
.ip("127.0.0.1")
......@@ -56,10 +52,7 @@ public class MqttServerTest {
.port(1883)
// 默认为: 20480,为了降低内存可以小此参数
.readBufferSize(512)
.messageIdGenerator(messageIdGenerator)
.publishManager(publishManager)
.subManager(subManager)
.subscribeStore(subscribeStore)
.debug() // 开启 debug 信息日志
.start();
......@@ -67,7 +60,7 @@ public class MqttServerTest {
timer.schedule(new TimerTask() {
@Override
public void run() {
mqttServer.publishAll("/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()));
mqttServer.publishAll("/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()), MqttQoS.EXACTLY_ONCE);
}
}, 1000, 2000);
}
......
package net.dreamlu.iot.mqtt.server;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.server.store.IMqttSubscribeStore;
import net.dreamlu.iot.mqtt.core.server.store.SubscribeStore;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class MqttSubscribeStore implements IMqttSubscribeStore {
private final ConcurrentMap<String, ConcurrentMap<String, SubscribeStore>> data = new ConcurrentHashMap<>();
@Override
public void add(String clientId, String topicFilter, MqttQoS mqttQoS) {
ConcurrentMap<String, SubscribeStore> map = data.get(clientId);
if (map == null) {
map = new ConcurrentHashMap<>();
}
map.put(topicFilter, new SubscribeStore(topicFilter, mqttQoS.value()));
data.put(clientId, map);
}
@Override
public void remove(String clientId, String topicFilter) {
ConcurrentMap<String, SubscribeStore> map = data.get(clientId);
if (map == null) {
return;
}
map.remove(topicFilter);
}
@Override
public void remove(String clientId) {
data.remove(clientId);
}
@Override
public List<SubscribeStore> search(String clientId, String topicName) {
List<SubscribeStore> list = new ArrayList<>();
ConcurrentMap<String, SubscribeStore> map = data.get(clientId);
if (map == null) {
return Collections.emptyList();
}
Collection<SubscribeStore> values = map.values();
for (SubscribeStore value : values) {
if (value.getTopicRegex().matcher(topicName).matches()) {
list.add(value);
}
}
return list;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册