MqttServer.java 7.9 KB
Newer Older
1
/*
2
 * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package net.dreamlu.iot.mqtt.core.server;

19 20 21
import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
如梦技术's avatar
如梦技术 已提交
22
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
如梦技术's avatar
如梦技术 已提交
23
import net.dreamlu.iot.mqtt.core.server.model.Subscribe;
24
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
如梦技术's avatar
如梦技术 已提交
25 26
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
27 28
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
如梦技术's avatar
如梦技术 已提交
29 30
import org.tio.server.ServerTioConfig;
import org.tio.server.TioServer;
31 32

import java.nio.ByteBuffer;
如梦技术's avatar
如梦技术 已提交
33
import java.util.List;
34
import java.util.concurrent.ScheduledThreadPoolExecutor;
如梦技术's avatar
如梦技术 已提交
35

36 37 38 39 40
/**
 * mqtt 服务端
 *
 * @author L.cm
 */
如梦技术's avatar
如梦技术 已提交
41 42 43
public final class MqttServer {
	private static final Logger logger = LoggerFactory.getLogger(MqttServer.class);
	private final TioServer tioServer;
如梦技术's avatar
如梦技术 已提交
44
	private final IMqttSessionManager sessionManager;
45
	private final ScheduledThreadPoolExecutor executor;
浅梦2013's avatar
浅梦2013 已提交
46
	private TioServer tioWsServer;
如梦技术's avatar
如梦技术 已提交
47

浅梦2013's avatar
浅梦2013 已提交
48 49 50
	public MqttServer(TioServer tioServer,
					  MqttServerCreator serverCreator,
					  ScheduledThreadPoolExecutor executor) {
如梦技术's avatar
如梦技术 已提交
51
		this.tioServer = tioServer;
浅梦2013's avatar
浅梦2013 已提交
52
		this.sessionManager = serverCreator.getSessionManager();
53
		this.executor = executor;
如梦技术's avatar
如梦技术 已提交
54 55 56 57 58 59
	}

	public static MqttServerCreator create() {
		return new MqttServerCreator();
	}

浅梦2013's avatar
浅梦2013 已提交
60 61 62 63 64 65 66 67 68
	/**
	 * 获取 TioServer
	 *
	 * @return TioServer
	 */
	public TioServer getTioServer() {
		return this.tioServer;
	}

如梦技术's avatar
如梦技术 已提交
69 70 71 72 73 74 75 76
	/**
	 * 获取 ServerTioConfig
	 *
	 * @return the serverTioConfig
	 */
	public ServerTioConfig getServerConfig() {
		return this.tioServer.getServerTioConfig();
	}
77

78 79 80 81 82 83 84 85
	/**
	 * 发布消息
	 *
	 * @param clientId clientId
	 * @param topic    topic
	 * @param payload  消息体
	 * @return 是否发送成功
	 */
浅梦2013's avatar
浅梦2013 已提交
86
	public boolean publish(String clientId, String topic, ByteBuffer payload) {
87 88 89 90 91 92 93 94 95 96 97 98
		return publish(clientId, topic, payload, MqttQoS.AT_MOST_ONCE);
	}

	/**
	 * 发布消息
	 *
	 * @param clientId clientId
	 * @param topic    topic
	 * @param payload  消息体
	 * @param qos      MqttQoS
	 * @return 是否发送成功
	 */
浅梦2013's avatar
浅梦2013 已提交
99
	public boolean publish(String clientId, String topic, ByteBuffer payload, MqttQoS qos) {
100 101 102 103 104 105 106 107 108 109 110 111
		return publish(clientId, topic, payload, qos, false);
	}

	/**
	 * 发布消息
	 *
	 * @param clientId clientId
	 * @param topic    topic
	 * @param payload  消息体
	 * @param retain   是否在服务器上保留消息
	 * @return 是否发送成功
	 */
浅梦2013's avatar
浅梦2013 已提交
112
	public boolean publish(String clientId, String topic, ByteBuffer payload, boolean retain) {
113 114 115 116 117 118 119 120 121 122 123 124 125
		return publish(clientId, topic, payload, MqttQoS.AT_MOST_ONCE, retain);
	}

	/**
	 * 发布消息
	 *
	 * @param clientId clientId
	 * @param topic    topic
	 * @param payload  消息体
	 * @param qos      MqttQoS
	 * @param retain   是否在服务器上保留消息
	 * @return 是否发送成功
	 */
浅梦2013's avatar
浅梦2013 已提交
126
	public boolean publish(String clientId, String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
127
		ChannelContext context = Tio.getByBsId(getServerConfig(), clientId);
128
		if (context == null || context.isClosed) {
浅梦2013's avatar
浅梦2013 已提交
129
			logger.warn("Mqtt Topic:{} publish to clientId:{} ChannelContext is null may be disconnected.", topic, clientId);
130 131
			return false;
		}
132
		List<Subscribe> subscribeList = sessionManager.searchSubscribe(topic, clientId);
如梦技术's avatar
如梦技术 已提交
133
		if (subscribeList.isEmpty()) {
浅梦2013's avatar
浅梦2013 已提交
134
			logger.warn("Mqtt Topic:{} publish but clientId:{} subscribeList is empty.", topic, clientId);
如梦技术's avatar
如梦技术 已提交
135 136
			return false;
		}
如梦技术's avatar
如梦技术 已提交
137
		for (Subscribe subscribe : subscribeList) {
如梦技术's avatar
如梦技术 已提交
138 139
			int subMqttQoS = subscribe.getMqttQoS();
			MqttQoS mqttQoS = qos.value() > subMqttQoS ? MqttQoS.valueOf(subMqttQoS) : qos;
如梦技术's avatar
如梦技术 已提交
140
			publish(context, clientId, topic, payload, mqttQoS, retain);
如梦技术's avatar
如梦技术 已提交
141 142
		}
		return true;
143 144 145 146 147 148 149 150 151 152 153 154
	}

	/**
	 * 发布消息
	 *
	 * @param context ChannelContext
	 * @param topic   topic
	 * @param payload 消息体
	 * @param qos     MqttQoS
	 * @param retain  是否在服务器上保留消息
	 * @return 是否发送成功
	 */
155
	public boolean publish(ChannelContext context, String clientId, String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
156
		boolean isHighLevelQoS = MqttQoS.AT_LEAST_ONCE == qos || MqttQoS.EXACTLY_ONCE == qos;
如梦技术's avatar
如梦技术 已提交
157
		int messageId = isHighLevelQoS ? sessionManager.getMessageId(clientId) : -1;
158
		payload.rewind();
159 160 161 162 163 164 165
		MqttPublishMessage message = MqttMessageBuilders.publish()
			.topicName(topic)
			.payload(payload)
			.qos(qos)
			.retained(retain)
			.messageId(messageId)
			.build();
浅梦2013's avatar
浅梦2013 已提交
166
		boolean result = Tio.send(context, message);
浅梦2013's avatar
浅梦2013 已提交
167
		logger.info("MQTT Topic:{} qos:{} retain:{} publish clientId:{} result:{}", topic, qos, retain, clientId, result);
168
		if (isHighLevelQoS) {
如梦技术's avatar
如梦技术 已提交
169
			MqttPendingPublish pendingPublish = new MqttPendingPublish(payload, message, qos);
如梦技术's avatar
如梦技术 已提交
170
			sessionManager.addPendingPublish(clientId, messageId, pendingPublish);
如梦技术's avatar
如梦技术 已提交
171
			pendingPublish.startPublishRetransmissionTimer(executor, msg -> Tio.send(context, msg));
172 173 174 175 176 177 178 179 180 181 182
		}
		return result;
	}

	/**
	 * 发布消息给所以的在线设备
	 *
	 * @param topic   topic
	 * @param payload 消息体
	 * @return 是否发送成功
	 */
浅梦2013's avatar
浅梦2013 已提交
183
	public boolean publishAll(String topic, ByteBuffer payload) {
184 185 186 187 188 189 190 191 192 193 194
		return publishAll(topic, payload, MqttQoS.AT_MOST_ONCE);
	}

	/**
	 * 发布消息
	 *
	 * @param topic   topic
	 * @param payload 消息体
	 * @param qos     MqttQoS
	 * @return 是否发送成功
	 */
浅梦2013's avatar
浅梦2013 已提交
195
	public boolean publishAll(String topic, ByteBuffer payload, MqttQoS qos) {
196 197 198 199 200 201 202 203 204 205 206
		return publishAll(topic, payload, qos, false);
	}

	/**
	 * 发布消息给所以的在线设备
	 *
	 * @param topic   topic
	 * @param payload 消息体
	 * @param retain  是否在服务器上保留消息
	 * @return 是否发送成功
	 */
浅梦2013's avatar
浅梦2013 已提交
207
	public boolean publishAll(String topic, ByteBuffer payload, boolean retain) {
208 209 210 211 212 213 214 215 216 217 218 219
		return publishAll(topic, payload, MqttQoS.AT_MOST_ONCE, retain);
	}

	/**
	 * 发布消息给所以的在线设备
	 *
	 * @param topic   topic
	 * @param payload 消息体
	 * @param qos     MqttQoS
	 * @param retain  是否在服务器上保留消息
	 * @return 是否发送成功
	 */
浅梦2013's avatar
浅梦2013 已提交
220
	public boolean publishAll(String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
221
		// 查找订阅该 topic 的客户端
222
		List<Subscribe> subscribeList = sessionManager.searchSubscribe(topic);
223
		if (subscribeList.isEmpty()) {
浅梦2013's avatar
浅梦2013 已提交
224
			logger.warn("Mqtt Topic:{} publishAll but subscribe client list is empty.", topic);
225 226 227 228 229 230
			return false;
		}
		for (Subscribe subscribe : subscribeList) {
			String clientId = subscribe.getClientId();
			ChannelContext context = Tio.getByBsId(getServerConfig(), clientId);
			if (context == null || context.isClosed) {
浅梦2013's avatar
浅梦2013 已提交
231
				logger.warn("Mqtt Topic:{} publish to clientId:{} channel is null may be disconnected.", topic, clientId);
232
				continue;
如梦技术's avatar
如梦技术 已提交
233
			}
234 235 236 237
			int subMqttQoS = subscribe.getMqttQoS();
			MqttQoS mqttQoS = qos.value() > subMqttQoS ? MqttQoS.valueOf(subMqttQoS) : qos;
			publish(context, clientId, topic, payload, mqttQoS, retain);
		}
238 239 240
		return true;
	}

浅梦2013's avatar
浅梦2013 已提交
241 242 243 244 245 246 247 248 249
	/**
	 * 绑定 websocket 服务
	 *
	 * @param tioWsServer TioServer
	 */
	public void setTioWsServer(TioServer tioWsServer) {
		this.tioWsServer = tioWsServer;
	}

如梦技术's avatar
如梦技术 已提交
250 251
	public boolean stop() {
		boolean result = this.tioServer.stop();
浅梦2013's avatar
浅梦2013 已提交
252 253 254 255 256
		logger.info("Mqtt tcp server stop result:{}", result);
		if (tioWsServer != null) {
			result &= tioWsServer.stop();
			logger.info("Mqtt websocket server stop result:{}", result);
		}
257 258 259
		try {
			sessionManager.clean();
		} catch (Throwable e) {
浅梦2013's avatar
浅梦2013 已提交
260
			logger.error("MqttServer stop session clean error.", e);
261
		}
262
		this.executor.shutdown();
如梦技术's avatar
如梦技术 已提交
263 264
		return result;
	}
265 266

}