提交 9556e282 编写于 作者: 浅梦2013's avatar 浅梦2013

订阅管理集成到 session 管理中

上级 2c32f8c2
......@@ -28,7 +28,6 @@
| IMqttServerAuthHandler | 是 | 用于客户端认证 |
| IMqttMessageListener | 是 | 消息监听 |
| IMqttConnectStatusListener | 是 | 连接状态监听 |
| IMqttServerSubscribeManager | 否 | 订阅管理 |
| IMqttSessionManager | 否 | session 管理 |
| IMqttMessageStore | 集群是,单机否 | 遗嘱和保留消息存储 |
| IMqttMessageDispatcher | 集群是,单机否 | 消息转发 |
......
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.server;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.server.model.Subscribe;
import java.util.List;
/**
* mqtt 服务端 订阅管理
*
* @author L.cm
*/
public interface IMqttServerSubscribeManager {
/**
* 添加订阅存储
*
* @param topicFilter topicFilter
* @param clientId 客户端 Id
* @param mqttQoS MqttQoS
*/
void add(String topicFilter, String clientId, MqttQoS mqttQoS);
/**
* 删除订阅
*
* @param topicFilter topicFilter
* @param clientId 客户端 Id
*/
void remove(String topicFilter, String clientId);
/**
* 删除订阅
*
* @param clientId 客户端 Id
*/
void remove(String clientId);
/**
* 查找订阅信息
*
* @param topicName topicName
* @param clientId 客户端 Id
* @return 订阅存储列表
*/
List<Subscribe> search(String topicName, String clientId);
/**
* 查找订阅信息
*
* @param topicName topicName
* @return 订阅存储列表
*/
List<Subscribe> search(String topicName);
/**
* 清理
*/
void clean();
}
......@@ -43,7 +43,6 @@ public final class MqttServer {
private static final Logger logger = LoggerFactory.getLogger(MqttServer.class);
private final TioServer tioServer;
private final IMqttSessionManager sessionManager;
private final IMqttServerSubscribeManager subscribeManager;
private final ScheduledThreadPoolExecutor executor;
MqttServer(TioServer tioServer,
......@@ -51,7 +50,6 @@ public final class MqttServer {
ScheduledThreadPoolExecutor executor) {
this.tioServer = tioServer;
this.sessionManager = serverCreator.getSessionManager();
this.subscribeManager = serverCreator.getSubscribeManager();
this.executor = executor;
}
......@@ -122,7 +120,7 @@ public final class MqttServer {
logger.warn("Mqtt publish to clientId:{} ChannelContext is null may be disconnected.", clientId);
return false;
}
List<Subscribe> subscribeList = subscribeManager.search(topic, clientId);
List<Subscribe> subscribeList = sessionManager.searchSubscribe(topic, clientId);
if (subscribeList.isEmpty()) {
logger.warn("Mqtt publish but clientId:{} subscribeList is empty.", clientId);
return false;
......@@ -212,7 +210,7 @@ public final class MqttServer {
*/
public Boolean publishAll(String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
// 查找订阅该 topic 的客户端
List<Subscribe> subscribeList = subscribeManager.search(topic);
List<Subscribe> subscribeList = sessionManager.searchSubscribe(topic);
if (subscribeList.isEmpty()) {
logger.warn("Mqtt publish but topic:{} subscribe client list is empty.", topic);
return false;
......@@ -239,11 +237,6 @@ public final class MqttServer {
} catch (Throwable e) {
logger.error("Mqtt server stop session clean error.", e);
}
try {
subscribeManager.clean();
} catch (Throwable e) {
logger.error("Mqtt server stop subscribe clean error.", e);
}
this.executor.shutdown();
return result;
}
......
......@@ -37,14 +37,12 @@ public class MqttServerAioListener extends DefaultAioListener {
private static final Logger logger = LoggerFactory.getLogger(MqttServerAioListener.class);
private final IMqttMessageStore messageStore;
private final IMqttSessionManager sessionManager;
private final IMqttServerSubscribeManager subscribeManager;
private final IMqttMessageDispatcher messageDispatcher;
private final IMqttConnectStatusListener connectStatusListener;
public MqttServerAioListener(MqttServerCreator serverCreator) {
this.messageStore = serverCreator.getMessageStore();
this.sessionManager = serverCreator.getSessionManager();
this.subscribeManager = serverCreator.getSubscribeManager();
this.messageDispatcher = serverCreator.getMessageDispatcher();
this.connectStatusListener = serverCreator.getConnectStatusListener();
}
......@@ -66,9 +64,11 @@ public class MqttServerAioListener extends DefaultAioListener {
logger.info("Mqtt server close clientId:{} remark:{} isRemove:{}", clientId, remark, isRemove);
// 1. 对于异常断开连接,处理遗嘱消息
sendWillMessage(context, clientId);
// 2. 释放资源
cleanUp(context, clientId);
// 3. 下线事件
// 2. 会话清理
cleanSession(clientId);
// 3. 解绑 clientId
Tio.unbindBsId(context);
// 4. 下线事件
notify(clientId);
}
......@@ -93,18 +93,12 @@ public class MqttServerAioListener extends DefaultAioListener {
}
}
private void cleanUp(ChannelContext context, String clientId) {
private void cleanSession(String clientId) {
try {
sessionManager.remove(clientId);
} catch (Throwable throwable) {
logger.error("Mqtt server clientId:{} session clean error.", clientId, throwable);
}
try {
subscribeManager.remove(clientId);
} catch (Throwable throwable) {
logger.error("Mqtt server clientId:{} subscribe clean error.", clientId, throwable);
}
Tio.unbindBsId(context);
}
private void notify(String clientId) {
......
......@@ -25,7 +25,10 @@ import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import net.dreamlu.iot.mqtt.core.server.session.InMemoryMqttSessionManager;
import net.dreamlu.iot.mqtt.core.server.store.IMqttMessageStore;
import net.dreamlu.iot.mqtt.core.server.store.InMemoryMqttMessageStore;
import net.dreamlu.iot.mqtt.core.server.support.*;
import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttServerAuthHandler;
import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttServerProcessor;
import org.tio.core.ssl.SslConfig;
import org.tio.core.stat.IpStatListener;
import org.tio.server.ServerTioConfig;
......@@ -98,10 +101,6 @@ public class MqttServerCreator {
* session 管理
*/
private IMqttSessionManager sessionManager;
/**
* 订阅管理
*/
private IMqttServerSubscribeManager subscribeManager;
/**
* 消息监听
*/
......@@ -248,15 +247,6 @@ public class MqttServerCreator {
return this;
}
public IMqttServerSubscribeManager getSubscribeManager() {
return subscribeManager;
}
public MqttServerCreator subscribeManager(IMqttServerSubscribeManager subscribeManager) {
this.subscribeManager = subscribeManager;
return this;
}
public IMqttMessageListener getMessageListener() {
return messageListener;
}
......@@ -295,9 +285,6 @@ public class MqttServerCreator {
if (this.sessionManager == null) {
this.sessionManager = new InMemoryMqttSessionManager();
}
if (this.subscribeManager == null) {
this.subscribeManager = new DefaultMqttServerSubscribeManager();
}
if (this.messageStore == null) {
this.messageStore = new InMemoryMqttMessageStore();
}
......
......@@ -16,8 +16,12 @@
package net.dreamlu.iot.mqtt.core.server.session;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.server.model.Subscribe;
import java.util.List;
/**
* session 管理,不封装 MqttSession 实体,方便 redis 等集群处理
......@@ -26,6 +30,40 @@ import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
*/
public interface IMqttSessionManager {
/**
* 添加订阅存储
*
* @param topicFilter topicFilter
* @param clientId 客户端 Id
* @param mqttQoS MqttQoS
*/
void addSubscribe(String topicFilter, String clientId, MqttQoS mqttQoS);
/**
* 删除订阅
*
* @param topicFilter topicFilter
* @param clientId 客户端 Id
*/
void removeSubscribe(String topicFilter, String clientId);
/**
* 查找订阅信息
*
* @param topicName topicName
* @param clientId 客户端 Id
* @return 订阅存储列表
*/
List<Subscribe> searchSubscribe(String topicName, String clientId);
/**
* 查找订阅信息
*
* @param topicName topicName
* @return 订阅存储列表
*/
List<Subscribe> searchSubscribe(String topicName);
/**
* 添加发布过程存储
*
......
......@@ -16,10 +16,16 @@
package net.dreamlu.iot.mqtt.core.server.session;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.server.model.Subscribe;
import net.dreamlu.iot.mqtt.core.util.MqttTopicUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
......@@ -31,18 +37,76 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class InMemoryMqttSessionManager implements IMqttSessionManager {
/**
* clientId: messageId
* messageId 存储 clientId: messageId
*/
private final ConcurrentMap<String, AtomicInteger> messageIdStore = new ConcurrentHashMap<>();
/**
* clientId: {msgId: Object}
* 订阅存储 topicFilter: {clientId: SubscribeStore}
*/
private final ConcurrentMap<String, ConcurrentMap<String, Integer>> subscribeStore = new ConcurrentHashMap<>();
/**
* qos1 消息过程存储 clientId: {msgId: Object}
*/
private final ConcurrentMap<String, Map<Integer, MqttPendingPublish>> pendingPublishStore = new ConcurrentHashMap<>();
/**
* clientId: {msgId: Object}
* qos2 消息过程存储 clientId: {msgId: Object}
*/
private final ConcurrentMap<String, Map<Integer, MqttPendingQos2Publish>> pendingQos2PublishStore = new ConcurrentHashMap<>();
@Override
public void addSubscribe(String topicFilter, String clientId, MqttQoS mqttQoS) {
Map<String, Integer> data = subscribeStore.computeIfAbsent(topicFilter, (key) -> new ConcurrentHashMap<>(16));
data.put(clientId, mqttQoS.value());
}
@Override
public void removeSubscribe(String topicFilter, String clientId) {
ConcurrentMap<String, Integer> map = subscribeStore.get(topicFilter);
if (map == null) {
return;
}
map.remove(clientId);
}
public void removeSubscribe(String clientId) {
subscribeStore.forEach((key, value) -> value.remove(clientId));
}
@Override
public List<Subscribe> searchSubscribe(String topicName, String clientId) {
List<Subscribe> list = new ArrayList<>();
Set<String> topicFilterSet = subscribeStore.keySet();
for (String topicFilter : topicFilterSet) {
if (MqttTopicUtil.getTopicPattern(topicFilter).matcher(topicName).matches()) {
ConcurrentMap<String, Integer> data = subscribeStore.get(topicFilter);
if (data != null && !data.isEmpty()) {
Integer mqttQoS = data.get(clientId);
if (mqttQoS != null) {
list.add(new Subscribe(topicFilter, mqttQoS));
}
}
}
}
return list;
}
@Override
public List<Subscribe> searchSubscribe(String topicName) {
List<Subscribe> list = new ArrayList<>();
Set<String> topicFilterSet = subscribeStore.keySet();
for (String topicFilter : topicFilterSet) {
if (MqttTopicUtil.getTopicPattern(topicFilter).matcher(topicName).matches()) {
ConcurrentMap<String, Integer> data = subscribeStore.get(topicFilter);
if (data != null && !data.isEmpty()) {
data.forEach((clientId, qos) -> {
list.add(new Subscribe(topicFilter, clientId, qos));
});
}
}
}
return list;
}
@Override
public void addPendingPublish(String clientId, int messageId, MqttPendingPublish pendingPublish) {
Map<Integer, MqttPendingPublish> data = pendingPublishStore.computeIfAbsent(clientId, (key) -> new ConcurrentHashMap<>(16));
......@@ -103,6 +167,7 @@ public class InMemoryMqttSessionManager implements IMqttSessionManager {
@Override
public void remove(String clientId) {
removeSubscribe(clientId);
pendingPublishStore.remove(clientId);
pendingQos2PublishStore.remove(clientId);
messageIdStore.remove(clientId);
......@@ -110,6 +175,7 @@ public class InMemoryMqttSessionManager implements IMqttSessionManager {
@Override
public void clean() {
subscribeStore.clear();
pendingPublishStore.clear();
pendingQos2PublishStore.clear();
messageIdStore.clear();
......
......@@ -19,7 +19,10 @@ package net.dreamlu.iot.mqtt.core.server.support;
import net.dreamlu.iot.mqtt.codec.*;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.server.*;
import net.dreamlu.iot.mqtt.core.server.IMqttServerAuthHandler;
import net.dreamlu.iot.mqtt.core.server.MqttConst;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import net.dreamlu.iot.mqtt.core.server.MqttServerProcessor;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
......@@ -48,7 +51,6 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
private final IMqttMessageStore messageStore;
private final IMqttSessionManager sessionManager;
private final IMqttServerAuthHandler authHandler;
private final IMqttServerSubscribeManager subscribeManager;
private final IMqttMessageDispatcher messageDispatcher;
private final IMqttConnectStatusListener connectStatusListener;
private final IMqttMessageListener messageListener;
......@@ -58,7 +60,6 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
this.messageStore = serverCreator.getMessageStore();
this.sessionManager = serverCreator.getSessionManager();
this.authHandler = serverCreator.getAuthHandler();
this.subscribeManager = serverCreator.getSubscribeManager();
this.messageDispatcher = serverCreator.getMessageDispatcher();
this.connectStatusListener = serverCreator.getConnectStatusListener();
this.messageListener = serverCreator.getMessageListener();
......@@ -245,7 +246,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
MqttQoS mqttQoS = subscription.qualityOfService();
mqttQosList.add(mqttQoS);
topicList.add(topicName);
subscribeManager.add(topicName, clientId, mqttQoS);
sessionManager.addSubscribe(topicName, clientId, mqttQoS);
logger.debug("Subscribe - clientId:{} messageId:{} topicFilter:{} mqttQoS:{}", clientId, messageId, topicName, mqttQoS);
}
// 3. 返回 ack
......@@ -269,7 +270,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
int messageId = message.variableHeader().messageId();
List<String> topicFilterList = message.payload().topics();
for (String topicFilter : topicFilterList) {
subscribeManager.remove(topicFilter, clientId);
sessionManager.removeSubscribe(topicFilter, clientId);
logger.debug("UnSubscribe - clientId:{} messageId:{} topicFilter:{}", clientId, messageId, topicFilter);
}
MqttMessage unSubMessage = MqttMessageBuilders.unsubAck()
......
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.server.support;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.server.IMqttServerSubscribeManager;
import net.dreamlu.iot.mqtt.core.server.model.Subscribe;
import net.dreamlu.iot.mqtt.core.util.MqttTopicUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* 默认的 mqtt 订阅管理
*
* @author L.cm
*/
public class DefaultMqttServerSubscribeManager implements IMqttServerSubscribeManager {
/**
* topicFilter: {clientId: SubscribeStore}
*/
private final ConcurrentMap<String, ConcurrentMap<String, Integer>> subscribeStore = new ConcurrentHashMap<>();
@Override
public void add(String topicFilter, String clientId, MqttQoS mqttQoS) {
Map<String, Integer> data = subscribeStore.computeIfAbsent(topicFilter, (key) -> new ConcurrentHashMap<>(16));
data.put(clientId, mqttQoS.value());
}
@Override
public void remove(String topicFilter, String clientId) {
ConcurrentMap<String, Integer> map = subscribeStore.get(topicFilter);
if (map == null) {
return;
}
map.remove(clientId);
}
@Override
public void remove(String clientId) {
subscribeStore.forEach((key, value) -> value.remove(clientId));
}
@Override
public List<Subscribe> search(String topicName, String clientId) {
List<Subscribe> list = new ArrayList<>();
Set<String> topicFilterSet = subscribeStore.keySet();
for (String topicFilter : topicFilterSet) {
if (MqttTopicUtil.getTopicPattern(topicFilter).matcher(topicName).matches()) {
ConcurrentMap<String, Integer> data = subscribeStore.get(topicFilter);
if (data != null && !data.isEmpty()) {
Integer mqttQoS = data.get(clientId);
if (mqttQoS != null) {
list.add(new Subscribe(topicFilter, mqttQoS));
}
}
}
}
return list;
}
@Override
public List<Subscribe> search(String topicName) {
List<Subscribe> list = new ArrayList<>();
Set<String> topicFilterSet = subscribeStore.keySet();
for (String topicFilter : topicFilterSet) {
if (MqttTopicUtil.getTopicPattern(topicFilter).matcher(topicName).matches()) {
ConcurrentMap<String, Integer> data = subscribeStore.get(topicFilter);
if (data != null && !data.isEmpty()) {
data.forEach((clientId, qos) -> {
list.add(new Subscribe(topicFilter, clientId, qos));
});
}
}
}
return list;
}
@Override
public void clean() {
subscribeStore.clear();
}
}
......@@ -17,7 +17,6 @@
package net.dreamlu.iot.mqtt.spring.server;
import net.dreamlu.iot.mqtt.core.server.IMqttServerAuthHandler;
import net.dreamlu.iot.mqtt.core.server.IMqttServerSubscribeManager;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
......@@ -49,7 +48,6 @@ public class MqttServerConfiguration {
ObjectProvider<IMqttMessageDispatcher> messageDispatcherObjectProvider,
ObjectProvider<IMqttMessageStore> messageStoreObjectProvider,
ObjectProvider<IMqttSessionManager> sessionManagerObjectProvider,
ObjectProvider<IMqttServerSubscribeManager> subscribeManagerObjectProvider,
ObjectProvider<IMqttMessageListener> messageListenerObjectProvider,
ObjectProvider<IMqttConnectStatusListener> connectStatusListenerObjectProvider,
ObjectProvider<IpStatListener> ipStatListenerObjectProvider,
......@@ -78,7 +76,6 @@ public class MqttServerConfiguration {
messageDispatcherObjectProvider.ifAvailable(serverCreator::messageDispatcher);
messageStoreObjectProvider.ifAvailable(serverCreator::messageStore);
sessionManagerObjectProvider.ifAvailable(serverCreator::sessionManager);
subscribeManagerObjectProvider.ifAvailable(serverCreator::subscribeManager);
messageListenerObjectProvider.ifAvailable(serverCreator::messageListener);
connectStatusListenerObjectProvider.ifAvailable(serverCreator::connectStatusListener);
ipStatListenerObjectProvider.ifAvailable(serverCreator::ipStatListener);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册