提交 99caddb6 编写于 作者: 如梦技术's avatar 如梦技术 🐛

🐛 修复 mqtt-server 多个订阅同时匹配时消息重复的问题。

上级 2ad29ae3
......@@ -153,16 +153,13 @@ public final class MqttServer {
logger.warn("Mqtt Topic:{} publish to clientId:{} ChannelContext is null may be disconnected.", topic, clientId);
return false;
}
List<Subscribe> subscribeList = sessionManager.searchSubscribe(topic, clientId);
if (subscribeList.isEmpty()) {
logger.warn("Mqtt Topic:{} publish but clientId:{} subscribeList is empty.", topic, clientId);
Integer subMqttQoS = sessionManager.searchSubscribe(topic, clientId);
if (subMqttQoS == null) {
logger.warn("Mqtt Topic:{} publish but clientId:{} not subscribed.", topic, clientId);
return false;
}
for (Subscribe subscribe : subscribeList) {
int subMqttQoS = subscribe.getMqttQoS();
MqttQoS mqttQoS = qos.value() > subMqttQoS ? MqttQoS.valueOf(subMqttQoS) : qos;
publish(context, clientId, topic, payload, mqttQoS, retain);
}
MqttQoS mqttQoS = qos.value() > subMqttQoS ? MqttQoS.valueOf(subMqttQoS) : qos;
publish(context, clientId, topic, payload, mqttQoS, retain);
return true;
}
......
......@@ -32,13 +32,8 @@ public class Subscribe implements Serializable {
public Subscribe() {
}
public Subscribe(String topicFilter, String clientId) {
this.topicFilter = topicFilter;
public Subscribe(String clientId, int mqttQoS) {
this.clientId = clientId;
}
public Subscribe(String topicFilter, int mqttQoS) {
this.topicFilter = topicFilter;
this.mqttQoS = mqttQoS;
}
......
......@@ -48,13 +48,13 @@ public interface IMqttSessionManager {
void removeSubscribe(String topicFilter, String clientId);
/**
* 查找订阅信息
* 查找订阅 qos 信息
*
* @param topicName topicName
* @param clientId 客户端 Id
* @return 订阅存储列表
*/
List<Subscribe> searchSubscribe(String topicName, String clientId);
Integer searchSubscribe(String topicName, String clientId);
/**
* 查找订阅信息
......
......@@ -22,10 +22,7 @@ import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.server.model.Subscribe;
import net.dreamlu.iot.mqtt.core.util.MqttTopicUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
......@@ -77,8 +74,8 @@ public class InMemoryMqttSessionManager implements IMqttSessionManager {
}
@Override
public List<Subscribe> searchSubscribe(String topicName, String clientId) {
List<Subscribe> list = new ArrayList<>();
public Integer searchSubscribe(String topicName, String clientId) {
Integer qosValue = null;
Set<String> topicFilterSet = subscribeStore.keySet();
for (String topicFilter : topicFilterSet) {
if (MqttTopicUtil.getTopicPattern(topicFilter).matcher(topicName).matches()) {
......@@ -86,29 +83,39 @@ public class InMemoryMqttSessionManager implements IMqttSessionManager {
if (data != null && !data.isEmpty()) {
Integer mqttQoS = data.get(clientId);
if (mqttQoS != null) {
list.add(new Subscribe(topicFilter, mqttQoS));
if (qosValue == null) {
qosValue = mqttQoS;
} else {
qosValue = Math.min(qosValue, mqttQoS);
}
}
}
}
}
return list;
return qosValue;
}
@Override
public List<Subscribe> searchSubscribe(String topicName) {
List<Subscribe> list = new ArrayList<>();
// 排除重复订阅,例如: /test/# 和 /# 只发一份
Map<String, Integer> subscribeMap = new HashMap<>(32);
Set<String> topicFilterSet = subscribeStore.keySet();
for (String topicFilter : topicFilterSet) {
if (MqttTopicUtil.getTopicPattern(topicFilter).matcher(topicName).matches()) {
ConcurrentMap<String, Integer> data = subscribeStore.get(topicFilter);
if (data != null && !data.isEmpty()) {
data.forEach((clientId, qos) -> {
list.add(new Subscribe(topicFilter, clientId, qos));
subscribeMap.merge(clientId, qos, Math::min);
});
}
}
}
return list;
List<Subscribe> subscribeList = new ArrayList<>();
subscribeMap.forEach((clientId, qos) -> {
subscribeList.add(new Subscribe(clientId, qos));
});
subscribeMap.clear();
return subscribeList;
}
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册