MqttServer.java 8.5 KB
Newer Older
1
/*
2
 * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package net.dreamlu.iot.mqtt.core.server;

19 20 21
import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
如梦技术's avatar
如梦技术 已提交
22
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
23
import net.dreamlu.iot.mqtt.core.server.http.core.MqttWebServer;
如梦技术's avatar
如梦技术 已提交
24
import net.dreamlu.iot.mqtt.core.server.model.Subscribe;
25
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
如梦技术's avatar
如梦技术 已提交
26 27
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
28 29
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
如梦技术's avatar
如梦技术 已提交
30 31
import org.tio.server.ServerTioConfig;
import org.tio.server.TioServer;
32

33
import java.io.IOException;
34
import java.nio.ByteBuffer;
如梦技术's avatar
如梦技术 已提交
35
import java.util.List;
36
import java.util.concurrent.ScheduledThreadPoolExecutor;
如梦技术's avatar
如梦技术 已提交
37

38 39 40 41 42
/**
 * mqtt 服务端
 *
 * @author L.cm
 */
如梦技术's avatar
如梦技术 已提交
43 44 45
public final class MqttServer {
	private static final Logger logger = LoggerFactory.getLogger(MqttServer.class);
	private final TioServer tioServer;
46 47
	private final MqttWebServer webServer;
	private final MqttServerCreator serverCreator;
如梦技术's avatar
如梦技术 已提交
48
	private final IMqttSessionManager sessionManager;
49
	private final ScheduledThreadPoolExecutor executor;
如梦技术's avatar
如梦技术 已提交
50

51
	MqttServer(TioServer tioServer,
浅梦2013's avatar
浅梦2013 已提交
52 53 54
			   MqttWebServer webServer,
			   MqttServerCreator serverCreator,
			   ScheduledThreadPoolExecutor executor) {
如梦技术's avatar
如梦技术 已提交
55
		this.tioServer = tioServer;
56 57
		this.webServer = webServer;
		this.serverCreator = serverCreator;
浅梦2013's avatar
浅梦2013 已提交
58
		this.sessionManager = serverCreator.getSessionManager();
59
		this.executor = executor;
如梦技术's avatar
如梦技术 已提交
60 61 62 63 64 65
	}

	public static MqttServerCreator create() {
		return new MqttServerCreator();
	}

浅梦2013's avatar
浅梦2013 已提交
66 67 68 69 70 71 72 73 74
	/**
	 * 获取 TioServer
	 *
	 * @return TioServer
	 */
	public TioServer getTioServer() {
		return this.tioServer;
	}

如梦技术's avatar
如梦技术 已提交
75 76 77 78 79 80 81 82
	/**
	 * 获取 ServerTioConfig
	 *
	 * @return the serverTioConfig
	 */
	public ServerTioConfig getServerConfig() {
		return this.tioServer.getServerTioConfig();
	}
83

84 85 86 87 88 89 90 91
	/**
	 * 发布消息
	 *
	 * @param clientId clientId
	 * @param topic    topic
	 * @param payload  消息体
	 * @return 是否发送成功
	 */
浅梦2013's avatar
浅梦2013 已提交
92
	public boolean publish(String clientId, String topic, ByteBuffer payload) {
93 94 95 96 97 98 99 100 101 102 103 104
		return publish(clientId, topic, payload, MqttQoS.AT_MOST_ONCE);
	}

	/**
	 * 发布消息
	 *
	 * @param clientId clientId
	 * @param topic    topic
	 * @param payload  消息体
	 * @param qos      MqttQoS
	 * @return 是否发送成功
	 */
浅梦2013's avatar
浅梦2013 已提交
105
	public boolean publish(String clientId, String topic, ByteBuffer payload, MqttQoS qos) {
106 107 108 109 110 111 112 113 114 115 116 117
		return publish(clientId, topic, payload, qos, false);
	}

	/**
	 * 发布消息
	 *
	 * @param clientId clientId
	 * @param topic    topic
	 * @param payload  消息体
	 * @param retain   是否在服务器上保留消息
	 * @return 是否发送成功
	 */
浅梦2013's avatar
浅梦2013 已提交
118
	public boolean publish(String clientId, String topic, ByteBuffer payload, boolean retain) {
119 120 121 122 123 124 125 126 127 128 129 130 131
		return publish(clientId, topic, payload, MqttQoS.AT_MOST_ONCE, retain);
	}

	/**
	 * 发布消息
	 *
	 * @param clientId clientId
	 * @param topic    topic
	 * @param payload  消息体
	 * @param qos      MqttQoS
	 * @param retain   是否在服务器上保留消息
	 * @return 是否发送成功
	 */
浅梦2013's avatar
浅梦2013 已提交
132
	public boolean publish(String clientId, String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
133
		ChannelContext context = Tio.getByBsId(getServerConfig(), clientId);
134
		if (context == null || context.isClosed) {
浅梦2013's avatar
浅梦2013 已提交
135
			logger.warn("Mqtt Topic:{} publish to clientId:{} ChannelContext is null may be disconnected.", topic, clientId);
136 137
			return false;
		}
138
		List<Subscribe> subscribeList = sessionManager.searchSubscribe(topic, clientId);
如梦技术's avatar
如梦技术 已提交
139
		if (subscribeList.isEmpty()) {
浅梦2013's avatar
浅梦2013 已提交
140
			logger.warn("Mqtt Topic:{} publish but clientId:{} subscribeList is empty.", topic, clientId);
如梦技术's avatar
如梦技术 已提交
141 142
			return false;
		}
如梦技术's avatar
如梦技术 已提交
143
		for (Subscribe subscribe : subscribeList) {
如梦技术's avatar
如梦技术 已提交
144 145
			int subMqttQoS = subscribe.getMqttQoS();
			MqttQoS mqttQoS = qos.value() > subMqttQoS ? MqttQoS.valueOf(subMqttQoS) : qos;
如梦技术's avatar
如梦技术 已提交
146
			publish(context, clientId, topic, payload, mqttQoS, retain);
如梦技术's avatar
如梦技术 已提交
147 148
		}
		return true;
149 150 151 152 153 154 155 156 157 158 159 160
	}

	/**
	 * 发布消息
	 *
	 * @param context ChannelContext
	 * @param topic   topic
	 * @param payload 消息体
	 * @param qos     MqttQoS
	 * @param retain  是否在服务器上保留消息
	 * @return 是否发送成功
	 */
161
	public boolean publish(ChannelContext context, String clientId, String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
162
		boolean isHighLevelQoS = MqttQoS.AT_LEAST_ONCE == qos || MqttQoS.EXACTLY_ONCE == qos;
如梦技术's avatar
如梦技术 已提交
163
		int messageId = isHighLevelQoS ? sessionManager.getMessageId(clientId) : -1;
164
		payload.rewind();
165 166 167 168 169 170 171
		MqttPublishMessage message = MqttMessageBuilders.publish()
			.topicName(topic)
			.payload(payload)
			.qos(qos)
			.retained(retain)
			.messageId(messageId)
			.build();
浅梦2013's avatar
浅梦2013 已提交
172
		boolean result = Tio.send(context, message);
浅梦2013's avatar
浅梦2013 已提交
173
		logger.info("MQTT Topic:{} qos:{} retain:{} publish clientId:{} result:{}", topic, qos, retain, clientId, result);
174
		if (isHighLevelQoS) {
如梦技术's avatar
如梦技术 已提交
175
			MqttPendingPublish pendingPublish = new MqttPendingPublish(payload, message, qos);
如梦技术's avatar
如梦技术 已提交
176
			sessionManager.addPendingPublish(clientId, messageId, pendingPublish);
如梦技术's avatar
如梦技术 已提交
177
			pendingPublish.startPublishRetransmissionTimer(executor, msg -> Tio.send(context, msg));
178 179 180 181 182 183 184 185 186 187 188
		}
		return result;
	}

	/**
	 * 发布消息给所以的在线设备
	 *
	 * @param topic   topic
	 * @param payload 消息体
	 * @return 是否发送成功
	 */
浅梦2013's avatar
浅梦2013 已提交
189
	public boolean publishAll(String topic, ByteBuffer payload) {
190 191 192 193 194 195 196 197 198 199 200
		return publishAll(topic, payload, MqttQoS.AT_MOST_ONCE);
	}

	/**
	 * 发布消息
	 *
	 * @param topic   topic
	 * @param payload 消息体
	 * @param qos     MqttQoS
	 * @return 是否发送成功
	 */
浅梦2013's avatar
浅梦2013 已提交
201
	public boolean publishAll(String topic, ByteBuffer payload, MqttQoS qos) {
202 203 204 205 206 207 208 209 210 211 212
		return publishAll(topic, payload, qos, false);
	}

	/**
	 * 发布消息给所以的在线设备
	 *
	 * @param topic   topic
	 * @param payload 消息体
	 * @param retain  是否在服务器上保留消息
	 * @return 是否发送成功
	 */
浅梦2013's avatar
浅梦2013 已提交
213
	public boolean publishAll(String topic, ByteBuffer payload, boolean retain) {
214 215 216 217 218 219 220 221 222 223 224 225
		return publishAll(topic, payload, MqttQoS.AT_MOST_ONCE, retain);
	}

	/**
	 * 发布消息给所以的在线设备
	 *
	 * @param topic   topic
	 * @param payload 消息体
	 * @param qos     MqttQoS
	 * @param retain  是否在服务器上保留消息
	 * @return 是否发送成功
	 */
浅梦2013's avatar
浅梦2013 已提交
226
	public boolean publishAll(String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
227
		// 查找订阅该 topic 的客户端
228
		List<Subscribe> subscribeList = sessionManager.searchSubscribe(topic);
229
		if (subscribeList.isEmpty()) {
浅梦2013's avatar
浅梦2013 已提交
230
			logger.warn("Mqtt Topic:{} publishAll but subscribe client list is empty.", topic);
231 232 233 234 235 236
			return false;
		}
		for (Subscribe subscribe : subscribeList) {
			String clientId = subscribe.getClientId();
			ChannelContext context = Tio.getByBsId(getServerConfig(), clientId);
			if (context == null || context.isClosed) {
浅梦2013's avatar
浅梦2013 已提交
237
				logger.warn("Mqtt Topic:{} publish to clientId:{} channel is null may be disconnected.", topic, clientId);
238
				continue;
如梦技术's avatar
如梦技术 已提交
239
			}
240 241 242 243
			int subMqttQoS = subscribe.getMqttQoS();
			MqttQoS mqttQoS = qos.value() > subMqttQoS ? MqttQoS.valueOf(subMqttQoS) : qos;
			publish(context, clientId, topic, payload, mqttQoS, retain);
		}
244 245 246
		return true;
	}

浅梦2013's avatar
浅梦2013 已提交
247
	public boolean start() {
248 249 250 251 252 253 254 255 256 257 258 259 260 261
		// 1. 启动 mqtt tcp
		try {
			tioServer.start(this.serverCreator.getIp(), this.serverCreator.getPort());
		} catch (IOException e) {
			throw new IllegalStateException("Mica mqtt tcp server start fail.", e);
		}
		// 2. 启动 mqtt web
		if (webServer != null) {
			try {
				webServer.start();
			} catch (IOException e) {
				throw new IllegalStateException("Mica mqtt http/websocket server start fail.", e);
			}
		}
浅梦2013's avatar
浅梦2013 已提交
262
		return true;
浅梦2013's avatar
浅梦2013 已提交
263 264
	}

如梦技术's avatar
如梦技术 已提交
265 266
	public boolean stop() {
		boolean result = this.tioServer.stop();
浅梦2013's avatar
浅梦2013 已提交
267
		logger.info("Mqtt tcp server stop result:{}", result);
268 269
		if (webServer != null) {
			result &= webServer.stop();
浅梦2013's avatar
浅梦2013 已提交
270 271
			logger.info("Mqtt websocket server stop result:{}", result);
		}
272 273 274
		try {
			sessionManager.clean();
		} catch (Throwable e) {
浅梦2013's avatar
浅梦2013 已提交
275
			logger.error("MqttServer stop session clean error.", e);
276
		}
277
		this.executor.shutdown();
如梦技术's avatar
如梦技术 已提交
278 279
		return result;
	}
280 281

}