MqttServer.java 9.1 KB
Newer Older
1
/*
2
 * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package net.dreamlu.iot.mqtt.core.server;

19 20 21
import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
如梦技术's avatar
如梦技术 已提交
22
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
23
import net.dreamlu.iot.mqtt.core.server.http.core.MqttWebServer;
如梦技术's avatar
如梦技术 已提交
24
import net.dreamlu.iot.mqtt.core.server.model.Subscribe;
25
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
如梦技术's avatar
如梦技术 已提交
26 27
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
28 29
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
如梦技术's avatar
如梦技术 已提交
30 31
import org.tio.server.ServerTioConfig;
import org.tio.server.TioServer;
32

33
import java.io.IOException;
34
import java.nio.ByteBuffer;
如梦技术's avatar
如梦技术 已提交
35
import java.util.List;
36
import java.util.concurrent.ScheduledThreadPoolExecutor;
如梦技术's avatar
如梦技术 已提交
37

38 39 40 41 42
/**
 * MQTT server.
 *
 * @author L.cm
 */
如梦技术's avatar
如梦技术 已提交
43 44 45
public final class MqttServer {
	private static final Logger logger = LoggerFactory.getLogger(MqttServer.class);
	private final TioServer tioServer;
46 47
	private final MqttWebServer webServer;
	private final MqttServerCreator serverCreator;
如梦技术's avatar
如梦技术 已提交
48
	private final IMqttSessionManager sessionManager;
49
	private final ScheduledThreadPoolExecutor executor;
如梦技术's avatar
如梦技术 已提交
50

51
	/**
	 * Wires the server together from already-built components.
	 *
	 * @param tioServer     TCP server started by {@link #start()} and stopped by {@link #stop()}
	 * @param webServer     http / websocket server; {@link #start()} and {@link #stop()} skip it when {@code null}
	 * @param serverCreator configuration holder; the session manager is taken from it
	 * @param executor      scheduler handed to the publish retransmission timer and shut down by {@link #stop()}
	 */
	MqttServer(TioServer tioServer, MqttWebServer webServer,
			   MqttServerCreator serverCreator, ScheduledThreadPoolExecutor executor) {
		// The session manager is not passed in directly; it comes from the creator.
		this.sessionManager = serverCreator.getSessionManager();
		this.serverCreator = serverCreator;
		this.executor = executor;
		this.webServer = webServer;
		this.tioServer = tioServer;
	}

	/**
	 * Creates a new {@link MqttServerCreator}.
	 *
	 * @return a fresh MqttServerCreator instance
	 */
	public static MqttServerCreator create() {
		final MqttServerCreator creator = new MqttServerCreator();
		return creator;
	}

浅梦2013's avatar
浅梦2013 已提交
66 67 68 69 70 71 72 73 74
	/**
	 * Returns the TioServer held by this instance.
	 *
	 * @return TioServer
	 */
	public TioServer getTioServer() {
		return tioServer;
	}

75 76 77 78 79 80 81 82 83
	/**
	 * Returns the http / websocket server.
	 *
	 * @return MqttWebServer; may be {@code null}, which {@link #start()} and {@link #stop()} both guard against
	 */
	public MqttWebServer getWebServer() {
		return this.webServer;
	}

如梦技术's avatar
如梦技术 已提交
84 85 86 87 88 89 90 91
	/**
	 * Returns the ServerTioConfig of the underlying TioServer.
	 *
	 * @return the serverTioConfig
	 */
	public ServerTioConfig getServerConfig() {
		final TioServer server = this.tioServer;
		return server.getServerTioConfig();
	}
92

93 94 95 96 97 98 99 100 101
	/**
	 * Returns the mqtt configuration this server was built from.
	 *
	 * @return MqttServerCreator
	 */
	public MqttServerCreator getServerCreator() {
		return this.serverCreator;
	}

102 103 104 105 106 107 108 109
	/**
	 * 发布消息
	 *
	 * @param clientId clientId
	 * @param topic    topic
	 * @param payload  消息体
	 * @return 是否发送成功
	 */
浅梦2013's avatar
浅梦2013 已提交
110
	public boolean publish(String clientId, String topic, ByteBuffer payload) {
111 112 113 114 115 116 117 118 119 120 121 122
		return publish(clientId, topic, payload, MqttQoS.AT_MOST_ONCE);
	}

	/**
	 * 发布消息
	 *
	 * @param clientId clientId
	 * @param topic    topic
	 * @param payload  消息体
	 * @param qos      MqttQoS
	 * @return 是否发送成功
	 */
浅梦2013's avatar
浅梦2013 已提交
123
	public boolean publish(String clientId, String topic, ByteBuffer payload, MqttQoS qos) {
124 125 126 127 128 129 130 131 132 133 134 135
		return publish(clientId, topic, payload, qos, false);
	}

	/**
	 * 发布消息
	 *
	 * @param clientId clientId
	 * @param topic    topic
	 * @param payload  消息体
	 * @param retain   是否在服务器上保留消息
	 * @return 是否发送成功
	 */
浅梦2013's avatar
浅梦2013 已提交
136
	public boolean publish(String clientId, String topic, ByteBuffer payload, boolean retain) {
137 138 139 140 141 142 143 144 145 146 147 148 149
		return publish(clientId, topic, payload, MqttQoS.AT_MOST_ONCE, retain);
	}

	/**
	 * 发布消息
	 *
	 * @param clientId clientId
	 * @param topic    topic
	 * @param payload  消息体
	 * @param qos      MqttQoS
	 * @param retain   是否在服务器上保留消息
	 * @return 是否发送成功
	 */
浅梦2013's avatar
浅梦2013 已提交
150
	public boolean publish(String clientId, String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
151
		ChannelContext context = Tio.getByBsId(getServerConfig(), clientId);
152
		if (context == null || context.isClosed) {
浅梦2013's avatar
浅梦2013 已提交
153
			logger.warn("Mqtt Topic:{} publish to clientId:{} ChannelContext is null may be disconnected.", topic, clientId);
154 155
			return false;
		}
156 157 158
		Integer subMqttQoS = sessionManager.searchSubscribe(topic, clientId);
		if (subMqttQoS == null) {
			logger.warn("Mqtt Topic:{} publish but clientId:{} not subscribed.", topic, clientId);
如梦技术's avatar
如梦技术 已提交
159 160
			return false;
		}
161 162
		MqttQoS mqttQoS = qos.value() > subMqttQoS ? MqttQoS.valueOf(subMqttQoS) : qos;
		publish(context, clientId, topic, payload, mqttQoS, retain);
如梦技术's avatar
如梦技术 已提交
163
		return true;
164 165 166 167 168 169 170 171 172 173 174 175
	}

	/**
	 * 发布消息
	 *
	 * @param context ChannelContext
	 * @param topic   topic
	 * @param payload 消息体
	 * @param qos     MqttQoS
	 * @param retain  是否在服务器上保留消息
	 * @return 是否发送成功
	 */
176
	public boolean publish(ChannelContext context, String clientId, String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
177
		boolean isHighLevelQoS = MqttQoS.AT_LEAST_ONCE == qos || MqttQoS.EXACTLY_ONCE == qos;
如梦技术's avatar
如梦技术 已提交
178
		int messageId = isHighLevelQoS ? sessionManager.getMessageId(clientId) : -1;
179
		payload.rewind();
180 181 182 183 184 185 186
		MqttPublishMessage message = MqttMessageBuilders.publish()
			.topicName(topic)
			.payload(payload)
			.qos(qos)
			.retained(retain)
			.messageId(messageId)
			.build();
浅梦2013's avatar
浅梦2013 已提交
187
		boolean result = Tio.send(context, message);
浅梦2013's avatar
浅梦2013 已提交
188
		logger.info("MQTT Topic:{} qos:{} retain:{} publish clientId:{} result:{}", topic, qos, retain, clientId, result);
189
		if (isHighLevelQoS) {
如梦技术's avatar
如梦技术 已提交
190
			MqttPendingPublish pendingPublish = new MqttPendingPublish(payload, message, qos);
如梦技术's avatar
如梦技术 已提交
191
			sessionManager.addPendingPublish(clientId, messageId, pendingPublish);
如梦技术's avatar
如梦技术 已提交
192
			pendingPublish.startPublishRetransmissionTimer(executor, msg -> Tio.send(context, msg));
193 194 195 196 197 198 199 200 201 202 203
		}
		return result;
	}

	/**
	 * 发布消息给所以的在线设备
	 *
	 * @param topic   topic
	 * @param payload 消息体
	 * @return 是否发送成功
	 */
浅梦2013's avatar
浅梦2013 已提交
204
	public boolean publishAll(String topic, ByteBuffer payload) {
205 206 207 208 209 210 211 212 213 214 215
		return publishAll(topic, payload, MqttQoS.AT_MOST_ONCE);
	}

	/**
	 * 发布消息
	 *
	 * @param topic   topic
	 * @param payload 消息体
	 * @param qos     MqttQoS
	 * @return 是否发送成功
	 */
浅梦2013's avatar
浅梦2013 已提交
216
	public boolean publishAll(String topic, ByteBuffer payload, MqttQoS qos) {
217 218 219 220 221 222 223 224 225 226 227
		return publishAll(topic, payload, qos, false);
	}

	/**
	 * 发布消息给所以的在线设备
	 *
	 * @param topic   topic
	 * @param payload 消息体
	 * @param retain  是否在服务器上保留消息
	 * @return 是否发送成功
	 */
浅梦2013's avatar
浅梦2013 已提交
228
	public boolean publishAll(String topic, ByteBuffer payload, boolean retain) {
229 230 231 232 233 234 235 236 237 238 239 240
		return publishAll(topic, payload, MqttQoS.AT_MOST_ONCE, retain);
	}

	/**
	 * 发布消息给所以的在线设备
	 *
	 * @param topic   topic
	 * @param payload 消息体
	 * @param qos     MqttQoS
	 * @param retain  是否在服务器上保留消息
	 * @return 是否发送成功
	 */
浅梦2013's avatar
浅梦2013 已提交
241
	public boolean publishAll(String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
242
		// 查找订阅该 topic 的客户端
243
		List<Subscribe> subscribeList = sessionManager.searchSubscribe(topic);
244
		if (subscribeList.isEmpty()) {
浅梦2013's avatar
浅梦2013 已提交
245
			logger.warn("Mqtt Topic:{} publishAll but subscribe client list is empty.", topic);
246 247 248 249 250 251
			return false;
		}
		for (Subscribe subscribe : subscribeList) {
			String clientId = subscribe.getClientId();
			ChannelContext context = Tio.getByBsId(getServerConfig(), clientId);
			if (context == null || context.isClosed) {
浅梦2013's avatar
浅梦2013 已提交
252
				logger.warn("Mqtt Topic:{} publish to clientId:{} channel is null may be disconnected.", topic, clientId);
253
				continue;
如梦技术's avatar
如梦技术 已提交
254
			}
255 256 257 258
			int subMqttQoS = subscribe.getMqttQoS();
			MqttQoS mqttQoS = qos.value() > subMqttQoS ? MqttQoS.valueOf(subMqttQoS) : qos;
			publish(context, clientId, topic, payload, mqttQoS, retain);
		}
259 260 261
		return true;
	}

262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285
	/**
	 * Looks up the ChannelContext bound to a client id.
	 *
	 * @param clientId clientId
	 * @return ChannelContext; the publish methods of this class treat a {@code null} result
	 * of the same lookup as "client not connected"
	 */
	public ChannelContext getChannelContext(String clientId) {
		final ServerTioConfig config = getServerConfig();
		return Tio.getByBsId(config, clientId);
	}

	/**
	 * Closes a client's connection from the server side.
	 *
	 * @param clientId clientId
	 */
	public void close(String clientId) {
		final ChannelContext context = getChannelContext(clientId);
		Tio.remove(context, "Mqtt server close this connects.");
	}

	/**
	 * 启动服务
	 *
	 * @return 是否启动
	 */
浅梦2013's avatar
浅梦2013 已提交
286
	public boolean start() {
287 288 289 290 291 292 293 294 295 296 297 298 299 300
		// 1. 启动 mqtt tcp
		try {
			tioServer.start(this.serverCreator.getIp(), this.serverCreator.getPort());
		} catch (IOException e) {
			throw new IllegalStateException("Mica mqtt tcp server start fail.", e);
		}
		// 2. 启动 mqtt web
		if (webServer != null) {
			try {
				webServer.start();
			} catch (IOException e) {
				throw new IllegalStateException("Mica mqtt http/websocket server start fail.", e);
			}
		}
浅梦2013's avatar
浅梦2013 已提交
301
		return true;
浅梦2013's avatar
浅梦2013 已提交
302 303
	}

304 305 306 307 308
	/**
	 * 停止服务
	 *
	 * @return 是否停止
	 */
如梦技术's avatar
如梦技术 已提交
309 310
	public boolean stop() {
		boolean result = this.tioServer.stop();
浅梦2013's avatar
浅梦2013 已提交
311
		logger.info("Mqtt tcp server stop result:{}", result);
312 313
		if (webServer != null) {
			result &= webServer.stop();
浅梦2013's avatar
浅梦2013 已提交
314 315
			logger.info("Mqtt websocket server stop result:{}", result);
		}
316 317 318
		try {
			sessionManager.clean();
		} catch (Throwable e) {
浅梦2013's avatar
浅梦2013 已提交
319
			logger.error("MqttServer stop session clean error.", e);
320
		}
321
		this.executor.shutdown();
如梦技术's avatar
如梦技术 已提交
322 323
		return result;
	}
324 325

}