DefaultMqttServerProcessor.java 15.0 KB
Newer Older
1
/*
2
 * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
3 4 5 6 7 8 9 10 11 12 13 14 15 16
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

如梦技术's avatar
如梦技术 已提交
17
package net.dreamlu.iot.mqtt.core.server.support;
18 19

import net.dreamlu.iot.mqtt.codec.*;
如梦技术's avatar
如梦技术 已提交
20 21
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
22 23 24 25
import net.dreamlu.iot.mqtt.core.server.IMqttServerAuthHandler;
import net.dreamlu.iot.mqtt.core.server.MqttConst;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import net.dreamlu.iot.mqtt.core.server.MqttServerProcessor;
26 27 28 29
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
import net.dreamlu.iot.mqtt.core.server.model.Message;
如梦技术's avatar
如梦技术 已提交
30
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
31
import net.dreamlu.iot.mqtt.core.server.store.IMqttMessageStore;
32 33 34 35 36 37
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.utils.hutool.StrUtil;

如梦技术's avatar
如梦技术 已提交
38
import java.nio.ByteBuffer;
如梦技术's avatar
如梦技术 已提交
39
import java.util.ArrayList;
40 41 42 43 44 45 46 47
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;

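/**
 * MQTT broker processor: the default server-side handling of MQTT control packets
 * (connect, publish and its acknowledgements, subscribe, unsubscribe, ping, disconnect).
 *
 * @author L.cm
 */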
如梦技术's avatar
如梦技术 已提交
48 49
public class DefaultMqttServerProcessor implements MqttServerProcessor {
	private static final Logger logger = LoggerFactory.getLogger(DefaultMqttServerProcessor.class);
50 51 52 53 54
	/**
	 * 2 倍客户端 keepAlive 时间
	 */
	private static final long KEEP_ALIVE_UNIT = 2000L;
	private final long heartbeatTimeout;
55
	private final IMqttMessageStore messageStore;
如梦技术's avatar
如梦技术 已提交
56 57
	private final IMqttSessionManager sessionManager;
	private final IMqttServerAuthHandler authHandler;
58
	private final IMqttMessageDispatcher messageDispatcher;
浅梦2013's avatar
浅梦2013 已提交
59
	private final IMqttConnectStatusListener connectStatusListener;
60
	private final IMqttMessageListener messageListener;
61 62
	private final ScheduledThreadPoolExecutor executor;

浅梦2013's avatar
浅梦2013 已提交
63
	/**
	 * Creates the processor from the server configuration.
	 *
	 * @param serverCreator server configuration supplying the stores, handlers and listeners
	 * @param executor      scheduler used for the QoS 2 retransmission timers
	 */
	public DefaultMqttServerProcessor(MqttServerCreator serverCreator, ScheduledThreadPoolExecutor executor) {
		// Fall back to 120 s (in milliseconds) when no global heartbeat timeout is configured.
		if (serverCreator.getHeartbeatTimeout() == null) {
			this.heartbeatTimeout = 120_000L;
		} else {
			this.heartbeatTimeout = serverCreator.getHeartbeatTimeout();
		}
		this.messageStore = serverCreator.getMessageStore();
		this.sessionManager = serverCreator.getSessionManager();
		this.authHandler = serverCreator.getAuthHandler();
		this.messageDispatcher = serverCreator.getMessageDispatcher();
		this.connectStatusListener = serverCreator.getConnectStatusListener();
		this.messageListener = serverCreator.getMessageListener();
		this.executor = executor;
	}

	@Override
	public void processConnect(ChannelContext context, MqttConnectMessage mqttMessage) {
		MqttConnectPayload payload = mqttMessage.payload();
		String clientId = payload.clientIdentifier();
		// 1. 客户端必须提供 clientId, 不管 cleanSession 是否为1, 此处没有参考标准协议实现
		if (StrUtil.isBlank(clientId)) {
			connAckByReturnCode(clientId, context, MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
			return;
		}
		// 2. 认证
		String userName = payload.userName();
		String password = payload.password();
		if (!authHandler.authenticate(clientId, userName, password)) {
			connAckByReturnCode(clientId, context, MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
			return;
		}
浅梦2013's avatar
浅梦2013 已提交
90 91 92 93
		// 3. 判断 clientId 是否在多个地方使用,如果在其他地方有使用,先解绑
		ChannelContext otherContext = Tio.getByBsId(context.getTioConfig(), clientId);
		if (otherContext != null) {
			Tio.unbindBsId(otherContext);
94
			Tio.remove(otherContext, "clientId:" + clientId + " now bind on new context id:" + context.getId());
浅梦2013's avatar
浅梦2013 已提交
95 96
		}
		// 4. 绑定 clientId
97
		Tio.bindBsId(context, clientId);
98
		MqttConnectVariableHeader variableHeader = mqttMessage.variableHeader();
浅梦2013's avatar
浅梦2013 已提交
99
		// 5. 心跳超时时间,当然这个值如果小于全局配置(默认:120s),定时检查的时间间隔还是以全局为准,只是在判断时用此值
100
		int keepAliveSeconds = variableHeader.keepAliveTimeSeconds();
101 102 103
		// 2倍客户端 keepAlive 时间作为服务端心跳超时时间,如果配置同全局默认不设置,节约内存
		if (keepAliveSeconds > 0 && heartbeatTimeout != keepAliveSeconds * KEEP_ALIVE_UNIT) {
			context.setHeartbeatTimeout(keepAliveSeconds * KEEP_ALIVE_UNIT);
104
		}
浅梦2013's avatar
浅梦2013 已提交
105
		// 6. session 处理,先默认全部连接关闭时清除
106
//		boolean cleanSession = variableHeader.isCleanSession();
107 108 109 110 111 112 113
//		if (cleanSession) {
//			// TODO L.cm 考虑 session 处理 可参数: https://www.emqx.com/zh/blog/mqtt-session
//			// mqtt v5.0 会话超时时间
//			MqttProperties properties = variableHeader.properties();
//			Integer sessionExpiryInterval = properties.getPropertyValue(MqttProperties.MqttPropertyType.SESSION_EXPIRY_INTERVAL);
//			System.out.println(sessionExpiryInterval);
//		}
浅梦2013's avatar
浅梦2013 已提交
114
		// 7. 存储遗嘱消息
115 116 117 118 119 120 121 122 123
		boolean willFlag = variableHeader.isWillFlag();
		if (willFlag) {
			Message willMessage = new Message();
			willMessage.setTopic(payload.willTopic());
			willMessage.setPayload(payload.willMessageInBytes());
			willMessage.setQos(variableHeader.willQos());
			willMessage.setRetain(variableHeader.isWillRetain());
			messageStore.addWillMessage(clientId, willMessage);
		}
浅梦2013's avatar
浅梦2013 已提交
124
		// 8. 返回 ack
125
		connAckByReturnCode(clientId, context, MqttConnectReturnCode.CONNECTION_ACCEPTED);
浅梦2013's avatar
浅梦2013 已提交
126
		// 9. 在线状态
127
		connectStatusListener.online(clientId);
128 129 130 131 132 133 134 135
	}

	private void connAckByReturnCode(String clientId, ChannelContext context, MqttConnectReturnCode returnCode) {
		MqttConnAckMessage message = MqttMessageBuilders.connAck()
			.returnCode(returnCode)
			.sessionPresent(false)
			.build();
		Tio.send(context, message);
浅梦2013's avatar
浅梦2013 已提交
136
		logger.info("Connect ack send - clientId: {} returnCode:{}", clientId, returnCode);
137 138 139 140 141 142 143 144 145 146
	}

	@Override
	public void processPublish(ChannelContext context, MqttPublishMessage message) {
		String clientId = context.getBsId();
		MqttFixedHeader fixedHeader = message.fixedHeader();
		MqttQoS mqttQoS = fixedHeader.qosLevel();
		MqttPublishVariableHeader variableHeader = message.variableHeader();
		String topicName = variableHeader.topicName();
		int packetId = variableHeader.packetId();
如梦技术's avatar
如梦技术 已提交
147
		logger.debug("Publish - clientId:{} topicName:{} mqttQoS:{} packetId:{}", clientId, topicName, mqttQoS, packetId);
148 149
		switch (mqttQoS) {
			case AT_MOST_ONCE:
150
				invokeListenerForPublish(clientId, mqttQoS, topicName, message, fixedHeader.isRetain());
151 152
				break;
			case AT_LEAST_ONCE:
153
				invokeListenerForPublish(clientId, mqttQoS, topicName, message, fixedHeader.isRetain());
154 155 156 157
				if (packetId != -1) {
					MqttMessage messageAck = MqttMessageBuilders.pubAck()
						.packetId(packetId)
						.build();
如梦技术's avatar
如梦技术 已提交
158 159
					Boolean resultPubAck = Tio.send(context, messageAck);
					logger.debug("Publish - PubAck send clientId:{} topicName:{} mqttQoS:{} packetId:{} result:{}", clientId, topicName, mqttQoS, packetId, resultPubAck);
160 161 162 163 164 165 166
				}
				break;
			case EXACTLY_ONCE:
				if (packetId != -1) {
					MqttFixedHeader pubRecFixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
					MqttMessage pubRecMessage = new MqttMessage(pubRecFixedHeader, MqttMessageIdVariableHeader.from(packetId));
					MqttPendingQos2Publish pendingQos2Publish = new MqttPendingQos2Publish(message, pubRecMessage);
如梦技术's avatar
如梦技术 已提交
167 168
					Boolean resultPubRec = Tio.send(context, pubRecMessage);
					logger.debug("Publish - PubRec send clientId:{} topicName:{} mqttQoS:{} packetId:{} result:{}", clientId, topicName, mqttQoS, packetId, resultPubRec);
如梦技术's avatar
如梦技术 已提交
169
					sessionManager.addPendingQos2Publish(clientId, packetId, pendingQos2Publish);
170 171 172 173 174
					pendingQos2Publish.startPubRecRetransmitTimer(executor, msg -> Tio.send(context, msg));
				}
				break;
			case FAILURE:
			default:
如梦技术's avatar
如梦技术 已提交
175
				break;
176 177 178 179 180 181 182
		}
	}

	@Override
	public void processPubAck(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
		int messageId = variableHeader.messageId();
		String clientId = context.getBsId();
浅梦2013's avatar
浅梦2013 已提交
183
		logger.debug("PubAck - clientId:{}, messageId:{}", clientId, messageId);
如梦技术's avatar
如梦技术 已提交
184
		MqttPendingPublish pendingPublish = sessionManager.getPendingPublish(clientId, messageId);
如梦技术's avatar
如梦技术 已提交
185 186 187 188
		if (pendingPublish == null) {
			return;
		}
		pendingPublish.onPubAckReceived();
如梦技术's avatar
如梦技术 已提交
189
		sessionManager.removePendingPublish(clientId, messageId);
190
		pendingPublish.getPayload().clear();
191 192 193 194 195 196
	}

	@Override
	public void processPubRec(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
		String clientId = context.getBsId();
		int messageId = variableHeader.messageId();
浅梦2013's avatar
浅梦2013 已提交
197
		logger.debug("PubRec - clientId:{}, messageId:{}", clientId, messageId);
如梦技术's avatar
如梦技术 已提交
198
		MqttPendingPublish pendingPublish = sessionManager.getPendingPublish(clientId, messageId);
如梦技术's avatar
如梦技术 已提交
199 200 201
		if (pendingPublish == null) {
			return;
		}
如梦技术's avatar
如梦技术 已提交
202 203 204 205 206 207 208 209
		pendingPublish.onPubAckReceived();

		MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
		MqttMessage pubRelMessage = new MqttMessage(fixedHeader, variableHeader);
		Tio.send(context, pubRelMessage);

		pendingPublish.setPubRelMessage(pubRelMessage);
		pendingPublish.startPubRelRetransmissionTimer(executor, msg -> Tio.send(context, msg));
210 211 212 213 214
	}

	@Override
	public void processPubRel(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
		String clientId = context.getBsId();
如梦技术's avatar
如梦技术 已提交
215
		int messageId = variableHeader.messageId();
浅梦2013's avatar
浅梦2013 已提交
216
		logger.debug("PubRel - clientId:{}, messageId:{}", clientId, messageId);
如梦技术's avatar
如梦技术 已提交
217
		MqttPendingQos2Publish pendingQos2Publish = sessionManager.getPendingQos2Publish(clientId, messageId);
如梦技术's avatar
如梦技术 已提交
218 219 220
		if (pendingQos2Publish != null) {
			MqttPublishMessage incomingPublish = pendingQos2Publish.getIncomingPublish();
			String topicName = incomingPublish.variableHeader().topicName();
221 222 223
			MqttFixedHeader incomingFixedHeader = incomingPublish.fixedHeader();
			MqttQoS mqttQoS = incomingFixedHeader.qosLevel();
			boolean retain = incomingFixedHeader.isRetain();
224
			invokeListenerForPublish(clientId, mqttQoS, topicName, incomingPublish, retain);
如梦技术's avatar
如梦技术 已提交
225
			pendingQos2Publish.onPubRelReceived();
如梦技术's avatar
如梦技术 已提交
226
			sessionManager.removePendingQos2Publish(clientId, messageId);
如梦技术's avatar
如梦技术 已提交
227
		}
228 229
		MqttMessage message = MqttMessageFactory.newMessage(
			new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0),
如梦技术's avatar
如梦技术 已提交
230
			MqttMessageIdVariableHeader.from(messageId), null);
231 232 233 234 235 236 237
		Tio.send(context, message);
	}

	@Override
	public void processPubComp(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
		int messageId = variableHeader.messageId();
		String clientId = context.getBsId();
浅梦2013's avatar
浅梦2013 已提交
238
		logger.debug("PubComp - clientId:{}, messageId:{}", clientId, messageId);
如梦技术's avatar
如梦技术 已提交
239
		MqttPendingPublish pendingPublish = sessionManager.getPendingPublish(clientId, messageId);
如梦技术's avatar
如梦技术 已提交
240
		if (pendingPublish != null) {
241
			pendingPublish.getPayload().clear();
如梦技术's avatar
如梦技术 已提交
242
			pendingPublish.onPubCompReceived();
如梦技术's avatar
如梦技术 已提交
243
			sessionManager.removePendingPublish(clientId, messageId);
如梦技术's avatar
如梦技术 已提交
244
		}
245 246 247 248 249 250
	}

	@Override
	public void processSubscribe(ChannelContext context, MqttSubscribeMessage message) {
		String clientId = context.getBsId();
		int messageId = message.variableHeader().messageId();
251
		// 1. 校验订阅的 topicFilter
252
		List<MqttTopicSubscription> topicSubscriptions = message.payload().topicSubscriptions();
253
		if (!authHandler.isValidSubscribe(topicSubscriptions)) {
254
			logger.error("Subscribe - clientId:{} topicFilters:{} verification failed messageId:{}", clientId, topicSubscriptions, messageId);
255 256 257 258 259 260
			// 3. 返回 ack
			MqttMessage subAckMessage = MqttMessageBuilders.subAck()
				.addGrantedQos(MqttQoS.FAILURE)
				.packetId(messageId)
				.build();
			Tio.send(context, subAckMessage);
261 262
			return;
		}
263
		// 2. 存储 clientId 订阅的 topic
如梦技术's avatar
如梦技术 已提交
264
		List<MqttQoS> mqttQosList = new ArrayList<>();
265
		List<String> topicList = new ArrayList<>();
如梦技术's avatar
如梦技术 已提交
266 267 268 269
		for (MqttTopicSubscription subscription : topicSubscriptions) {
			String topicName = subscription.topicName();
			MqttQoS mqttQoS = subscription.qualityOfService();
			mqttQosList.add(mqttQoS);
270
			topicList.add(topicName);
271
			sessionManager.addSubscribe(topicName, clientId, mqttQoS.value());
如梦技术's avatar
如梦技术 已提交
272
		}
273
		logger.info("Subscribe - clientId:{} TopicFilters:{} mqttQoS:{} messageId:{}", clientId, topicList, mqttQosList, messageId);
274 275
		// 3. 返回 ack
		MqttMessage subAckMessage = MqttMessageBuilders.subAck()
如梦技术's avatar
如梦技术 已提交
276
			.addGrantedQosList(mqttQosList)
277 278 279
			.packetId(messageId)
			.build();
		Tio.send(context, subAckMessage);
280 281 282 283 284 285 286
		// 4. 发送保留消息
		for (String topic : topicList) {
			Message retainMessage = messageStore.getRetainMessage(topic);
			if (retainMessage != null) {
				messageDispatcher.send(clientId, retainMessage);
			}
		}
287 288 289 290 291 292
	}

	@Override
	public void processUnSubscribe(ChannelContext context, MqttUnsubscribeMessage message) {
		String clientId = context.getBsId();
		int messageId = message.variableHeader().messageId();
如梦技术's avatar
如梦技术 已提交
293 294
		List<String> topicFilterList = message.payload().topics();
		for (String topicFilter : topicFilterList) {
295
			sessionManager.removeSubscribe(topicFilter, clientId);
如梦技术's avatar
如梦技术 已提交
296
		}
浅梦2013's avatar
浅梦2013 已提交
297
		logger.info("UnSubscribe - clientId:{} Topic:{} messageId:{}", clientId, topicFilterList, messageId);
298 299 300 301 302 303 304 305 306
		MqttMessage unSubMessage = MqttMessageBuilders.unsubAck()
			.packetId(messageId)
			.build();
		Tio.send(context, unSubMessage);
	}

	@Override
	public void processPingReq(ChannelContext context) {
		String clientId = context.getBsId();
如梦技术's avatar
如梦技术 已提交
307
		logger.debug("PingReq - clientId:{}", clientId);
308 309 310 311 312 313
		Tio.send(context, MqttMessage.PINGRESP);
	}

	@Override
	public void processDisConnect(ChannelContext context) {
		String clientId = context.getBsId();
浅梦2013's avatar
浅梦2013 已提交
314
		logger.info("DisConnect - clientId:{} contextId:{}", clientId, context.getId());
315 316
		// 设置正常断开的标识
		context.set(MqttConst.DIS_CONNECTED, (byte) 1);
317
		Tio.remove(context, "Mqtt DisConnect");
318 319 320 321 322
	}

	/**
	 * 处理订阅的消息
	 *
如梦技术's avatar
如梦技术 已提交
323
	 * @param clientId  clientId
324 325 326
	 * @param topicName topicName
	 * @param message   MqttPublishMessage
	 */
327 328
	private void invokeListenerForPublish(String clientId, MqttQoS mqttQoS, String topicName,
										  MqttPublishMessage message, boolean isRetain) {
329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345
		ByteBuffer payload = message.payload();
		// 1. retain 消息逻辑
		if (isRetain) {
			// qos == 0 or payload is none,then clear previous retain message
			if (MqttQoS.AT_MOST_ONCE == mqttQoS || payload == null || payload.array().length == 0) {
				this.messageStore.clearRetainMessage(topicName);
			} else {
				Message retainMessage = new Message();
				retainMessage.setTopic(topicName);
				retainMessage.setQos(mqttQoS.value());
				retainMessage.setPayload(payload.array());
				retainMessage.setClientId(clientId);
				retainMessage.setMessageType(MqttMessageType.PUBLISH.value());
				this.messageStore.addRetainMessage(topicName, retainMessage);
			}
		}
		// 2. 消息发布
346 347 348 349 350
		try {
			messageListener.onMessage(clientId, topicName, mqttQoS, payload);
		} catch (Throwable e) {
			logger.error(e.getMessage(), e);
		}
351 352 353
	}

}