InMemoryMqttSessionManager.java 3.6 KB
Newer Older
如梦技术's avatar
如梦技术 已提交
1
/*
 * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package net.dreamlu.iot.mqtt.core.server.session;

如梦技术's avatar
如梦技术 已提交
19 20
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
如梦技术's avatar
如梦技术 已提交
21

22
import java.util.Map;
如梦技术's avatar
如梦技术 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 内存 session 管理
 *
 * @author L.cm
 */
public class InMemoryMqttSessionManager implements IMqttSessionManager {
	/**
	 * clientId: messageId
	 */
如梦技术's avatar
如梦技术 已提交
36
	private final ConcurrentMap<String, AtomicInteger> messageIdStore = new ConcurrentHashMap<>();
如梦技术's avatar
如梦技术 已提交
37 38 39
	/**
	 * clientId: {msgId: Object}
	 */
如梦技术's avatar
如梦技术 已提交
40
	private final ConcurrentMap<String, Map<Integer, MqttPendingPublish>> pendingPublishStore = new ConcurrentHashMap<>();
如梦技术's avatar
如梦技术 已提交
41 42 43
	/**
	 * clientId: {msgId: Object}
	 */
44
	private final ConcurrentMap<String, Map<Integer, MqttPendingQos2Publish>> pendingQos2PublishStore = new ConcurrentHashMap<>();
如梦技术's avatar
如梦技术 已提交
45

如梦技术's avatar
如梦技术 已提交
46 47 48 49 50 51 52 53
	@Override
	public void addPendingPublish(String clientId, int messageId, MqttPendingPublish pendingPublish) {
		Map<Integer, MqttPendingPublish> data = pendingPublishStore.computeIfAbsent(clientId, (key) -> new ConcurrentHashMap<>(16));
		data.put(messageId, pendingPublish);
	}

	@Override
	public MqttPendingPublish getPendingPublish(String clientId, int messageId) {
54 55 56 57 58
		Map<Integer, MqttPendingPublish> data = pendingPublishStore.get(clientId);
		if (data == null) {
			return null;
		}
		return data.get(messageId);
如梦技术's avatar
如梦技术 已提交
59 60 61 62
	}

	@Override
	public void removePendingPublish(String clientId, int messageId) {
63 64 65 66
		Map<Integer, MqttPendingPublish> data = pendingPublishStore.get(clientId);
		if (data != null) {
			data.remove(messageId);
		}
如梦技术's avatar
如梦技术 已提交
67 68 69 70 71 72 73 74 75 76
	}

	@Override
	public void addPendingQos2Publish(String clientId, int messageId, MqttPendingQos2Publish pendingQos2Publish) {
		Map<Integer, MqttPendingQos2Publish> data = pendingQos2PublishStore.computeIfAbsent(clientId, (key) -> new ConcurrentHashMap<>());
		data.put(messageId, pendingQos2Publish);
	}

	@Override
	public MqttPendingQos2Publish getPendingQos2Publish(String clientId, int messageId) {
77 78 79 80 81
		Map<Integer, MqttPendingQos2Publish> data = pendingQos2PublishStore.get(clientId);
		if (data == null) {
			return null;
		}
		return data.get(messageId);
如梦技术's avatar
如梦技术 已提交
82 83 84 85
	}

	@Override
	public void removePendingQos2Publish(String clientId, int messageId) {
86 87 88 89
		Map<Integer, MqttPendingQos2Publish> data = pendingQos2PublishStore.get(clientId);
		if (data != null) {
			data.remove(messageId);
		}
如梦技术's avatar
如梦技术 已提交
90 91
	}

如梦技术's avatar
如梦技术 已提交
92 93 94 95 96 97 98
	@Override
	public int getMessageId(String clientId) {
		AtomicInteger value = messageIdStore.computeIfAbsent(clientId, (key) -> new AtomicInteger(1));
		value.compareAndSet(0xffff, 1);
		return value.getAndIncrement();
	}

99 100 101 102 103
	@Override
	public boolean hasSession(String clientId) {
		return false;
	}

如梦技术's avatar
如梦技术 已提交
104
	@Override
如梦技术's avatar
如梦技术 已提交
105 106 107
	public void remove(String clientId) {
		pendingPublishStore.remove(clientId);
		pendingQos2PublishStore.remove(clientId);
如梦技术's avatar
如梦技术 已提交
108 109
		messageIdStore.remove(clientId);
	}
如梦技术's avatar
如梦技术 已提交
110

111 112 113 114 115 116 117
	@Override
	public void clean() {
		pendingPublishStore.clear();
		pendingQos2PublishStore.clear();
		messageIdStore.clear();
	}

如梦技术's avatar
如梦技术 已提交
118
}