/*
 * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package net.dreamlu.iot.mqtt.core.server.session;

19
import net.dreamlu.iot.mqtt.codec.MqttQoS;
如梦技术's avatar
如梦技术 已提交
20 21
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
22 23
import net.dreamlu.iot.mqtt.core.server.model.Subscribe;
import net.dreamlu.iot.mqtt.core.util.MqttTopicUtil;
如梦技术's avatar
如梦技术 已提交
24

25
import java.util.*;
如梦技术's avatar
如梦技术 已提交
26 27 28 29 30 31 32 33 34 35 36
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 内存 session 管理
 *
 * @author L.cm
 */
public class InMemoryMqttSessionManager implements IMqttSessionManager {
	/**
37
	 * messageId 存储 clientId: messageId
如梦技术's avatar
如梦技术 已提交
38
	 */
如梦技术's avatar
如梦技术 已提交
39
	private final ConcurrentMap<String, AtomicInteger> messageIdStore = new ConcurrentHashMap<>();
如梦技术's avatar
如梦技术 已提交
40
	/**
41 42 43 44 45
	 * 订阅存储 topicFilter: {clientId: SubscribeStore}
	 */
	private final ConcurrentMap<String, ConcurrentMap<String, Integer>> subscribeStore = new ConcurrentHashMap<>();
	/**
	 * qos1 消息过程存储 clientId: {msgId: Object}
如梦技术's avatar
如梦技术 已提交
46
	 */
如梦技术's avatar
如梦技术 已提交
47
	private final ConcurrentMap<String, Map<Integer, MqttPendingPublish>> pendingPublishStore = new ConcurrentHashMap<>();
如梦技术's avatar
如梦技术 已提交
48
	/**
49
	 * qos2 消息过程存储 clientId: {msgId: Object}
如梦技术's avatar
如梦技术 已提交
50
	 */
51
	private final ConcurrentMap<String, Map<Integer, MqttPendingQos2Publish>> pendingQos2PublishStore = new ConcurrentHashMap<>();
如梦技术's avatar
如梦技术 已提交
52

53
	@Override
54
	public void addSubscribe(String topicFilter, String clientId, int mqttQoS) {
55
		Map<String, Integer> data = subscribeStore.computeIfAbsent(topicFilter, (key) -> new ConcurrentHashMap<>(16));
如梦技术's avatar
如梦技术 已提交
56 57
		// 如果不存在或者老的订阅 qos 比较小也重新设置
		Integer existingQos = data.get(clientId);
58 59
		if (existingQos == null || existingQos < mqttQoS) {
			data.put(clientId, mqttQoS);
如梦技术's avatar
如梦技术 已提交
60
		}
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
	}

	@Override
	public void removeSubscribe(String topicFilter, String clientId) {
		ConcurrentMap<String, Integer> map = subscribeStore.get(topicFilter);
		if (map == null) {
			return;
		}
		map.remove(clientId);
	}

	public void removeSubscribe(String clientId) {
		subscribeStore.forEach((key, value) -> value.remove(clientId));
	}

	@Override
77 78
	public Integer searchSubscribe(String topicName, String clientId) {
		Integer qosValue = null;
79 80 81 82 83 84 85
		Set<String> topicFilterSet = subscribeStore.keySet();
		for (String topicFilter : topicFilterSet) {
			if (MqttTopicUtil.getTopicPattern(topicFilter).matcher(topicName).matches()) {
				ConcurrentMap<String, Integer> data = subscribeStore.get(topicFilter);
				if (data != null && !data.isEmpty()) {
					Integer mqttQoS = data.get(clientId);
					if (mqttQoS != null) {
86 87 88 89 90
						if (qosValue == null) {
							qosValue = mqttQoS;
						} else {
							qosValue = Math.min(qosValue, mqttQoS);
						}
91 92 93 94
					}
				}
			}
		}
95
		return qosValue;
96 97 98 99
	}

	@Override
	public List<Subscribe> searchSubscribe(String topicName) {
100 101
		// 排除重复订阅,例如: /test/# 和 /# 只发一份
		Map<String, Integer> subscribeMap = new HashMap<>(32);
102 103 104 105 106 107
		Set<String> topicFilterSet = subscribeStore.keySet();
		for (String topicFilter : topicFilterSet) {
			if (MqttTopicUtil.getTopicPattern(topicFilter).matcher(topicName).matches()) {
				ConcurrentMap<String, Integer> data = subscribeStore.get(topicFilter);
				if (data != null && !data.isEmpty()) {
					data.forEach((clientId, qos) -> {
108
						subscribeMap.merge(clientId, qos, Math::min);
109 110 111 112
					});
				}
			}
		}
113 114 115 116 117 118
		List<Subscribe> subscribeList = new ArrayList<>();
		subscribeMap.forEach((clientId, qos) -> {
			subscribeList.add(new Subscribe(clientId, qos));
		});
		subscribeMap.clear();
		return subscribeList;
119 120
	}

如梦技术's avatar
如梦技术 已提交
121 122 123 124 125 126 127 128
	@Override
	public void addPendingPublish(String clientId, int messageId, MqttPendingPublish pendingPublish) {
		Map<Integer, MqttPendingPublish> data = pendingPublishStore.computeIfAbsent(clientId, (key) -> new ConcurrentHashMap<>(16));
		data.put(messageId, pendingPublish);
	}

	@Override
	public MqttPendingPublish getPendingPublish(String clientId, int messageId) {
129 130 131 132 133
		Map<Integer, MqttPendingPublish> data = pendingPublishStore.get(clientId);
		if (data == null) {
			return null;
		}
		return data.get(messageId);
如梦技术's avatar
如梦技术 已提交
134 135 136 137
	}

	@Override
	public void removePendingPublish(String clientId, int messageId) {
138 139 140 141
		Map<Integer, MqttPendingPublish> data = pendingPublishStore.get(clientId);
		if (data != null) {
			data.remove(messageId);
		}
如梦技术's avatar
如梦技术 已提交
142 143 144 145 146 147 148 149 150 151
	}

	@Override
	public void addPendingQos2Publish(String clientId, int messageId, MqttPendingQos2Publish pendingQos2Publish) {
		Map<Integer, MqttPendingQos2Publish> data = pendingQos2PublishStore.computeIfAbsent(clientId, (key) -> new ConcurrentHashMap<>());
		data.put(messageId, pendingQos2Publish);
	}

	@Override
	public MqttPendingQos2Publish getPendingQos2Publish(String clientId, int messageId) {
152 153 154 155 156
		Map<Integer, MqttPendingQos2Publish> data = pendingQos2PublishStore.get(clientId);
		if (data == null) {
			return null;
		}
		return data.get(messageId);
如梦技术's avatar
如梦技术 已提交
157 158 159 160
	}

	@Override
	public void removePendingQos2Publish(String clientId, int messageId) {
161 162 163 164
		Map<Integer, MqttPendingQos2Publish> data = pendingQos2PublishStore.get(clientId);
		if (data != null) {
			data.remove(messageId);
		}
如梦技术's avatar
如梦技术 已提交
165 166
	}

如梦技术's avatar
如梦技术 已提交
167 168 169 170 171 172 173
	@Override
	public int getMessageId(String clientId) {
		AtomicInteger value = messageIdStore.computeIfAbsent(clientId, (key) -> new AtomicInteger(1));
		value.compareAndSet(0xffff, 1);
		return value.getAndIncrement();
	}

174 175
	@Override
	public boolean hasSession(String clientId) {
176 177 178 179
		return pendingQos2PublishStore.containsKey(clientId)
			|| pendingPublishStore.containsKey(clientId)
			|| messageIdStore.containsKey(clientId)
			|| subscribeStore.values().stream().anyMatch(data -> data.containsKey(clientId));
180 181
	}

如梦技术's avatar
如梦技术 已提交
182
	@Override
如梦技术's avatar
如梦技术 已提交
183
	public void remove(String clientId) {
184
		removeSubscribe(clientId);
如梦技术's avatar
如梦技术 已提交
185 186
		pendingPublishStore.remove(clientId);
		pendingQos2PublishStore.remove(clientId);
如梦技术's avatar
如梦技术 已提交
187 188
		messageIdStore.remove(clientId);
	}
如梦技术's avatar
如梦技术 已提交
189

190 191
	@Override
	public void clean() {
192
		subscribeStore.clear();
193 194 195 196 197
		pendingPublishStore.clear();
		pendingQos2PublishStore.clear();
		messageIdStore.clear();
	}

如梦技术's avatar
如梦技术 已提交
198
}