提交 5bb106b9 编写于 作者: 如梦技术's avatar 如梦技术 🐛

完善客户端。

上级 0cd6112a
......@@ -22,7 +22,7 @@ import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
......@@ -32,12 +32,12 @@ import java.util.concurrent.CountDownLatch;
*/
public class DefaultMqttClientProcessor implements MqttClientProcessor {
private static final Logger logger = LoggerFactory.getLogger(DefaultMqttClientProcessor.class);
private final MqttClientMessageHandler messageHandler;
private final MqttClientSubscriptionManager subscriptionManager;
private final CountDownLatch connLatch;
public DefaultMqttClientProcessor(MqttClientMessageHandler messageHandler,
public DefaultMqttClientProcessor(MqttClientSubscriptionManager subscriptionManager,
CountDownLatch connLatch) {
this.messageHandler = messageHandler;
this.subscriptionManager = subscriptionManager;
this.connLatch = connLatch;
}
......@@ -61,31 +61,55 @@ public class DefaultMqttClientProcessor implements MqttClientProcessor {
case CONNECTION_REFUSED_SERVER_UNAVAILABLE:
case CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION:
default:
Tio.close(context, "MqttClient connect error error ReturnCode:" + returnCode);
throw new IllegalStateException("MqttClient connect error ReturnCode:" + returnCode);
String remark = "MqttClient connect error error ReturnCode:" + returnCode;
Tio.close(context, remark);
throw new IllegalStateException(remark);
}
}
@Override
public void processSubAck(MqttSubAckMessage message) {
System.out.println(message);
int messageId = message.variableHeader().messageId();
logger.debug("MQTT SubAck messageId:{}", messageId);
MqttSubscription paddingSubscribe = subscriptionManager.getPaddingSubscribe(messageId);
if (paddingSubscribe == null) {
return;
}
subscriptionManager.addSubscription(paddingSubscribe);
}
@Override
public void processPublish(ChannelContext context, MqttPublishMessage message) {
ByteBuffer byteBuffer = message.payload();
if (byteBuffer != null) {
System.out.println(ByteBufferUtil.toString(byteBuffer));
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
String topicName = message.variableHeader().topicName();
switch (mqttFixedHeader.qosLevel()) {
case AT_MOST_ONCE:
List<MqttSubscription> subscriptionList = subscriptionManager.getMatchedSubscription(topicName);
subscriptionList.forEach(subscription -> subscription.getListener().onMessage(topicName, message.payload()));
break;
case AT_LEAST_ONCE:
break;
case EXACTLY_ONCE:
break;
case FAILURE:
default:
}
}
@Override
public void processUnSubAck(MqttUnsubAckMessage message) {
System.out.println(message);
int messageId = message.variableHeader().messageId();
String topicFilter = subscriptionManager.getPaddingUnSubscribe(messageId);
logger.debug("MQTT UnSubAck messageId:{} topicFilter:{}", messageId, topicFilter);
if (topicFilter == null) {
return;
}
subscriptionManager.removeSubscriptions(topicFilter);
}
@Override
public void processPubAck(MqttPubAckMessage message) {
int messageId = message.variableHeader().messageId();
System.out.println(message);
}
......
......@@ -18,6 +18,8 @@ package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.*;
import net.dreamlu.iot.mqtt.core.common.MqttMessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.client.ClientChannelContext;
import org.tio.client.TioClient;
import org.tio.core.Tio;
......@@ -30,10 +32,11 @@ import java.nio.ByteBuffer;
* @author L.cm
*/
public final class MqttClient {
private static final Logger logger = LoggerFactory.getLogger(MqttClient.class);
private final TioClient tioClient;
private final MqttClientCreator config;
private final ClientChannelContext context;
private final MqttClientMessageHandler messageHandler;
private final MqttClientSubscriptionManager subscriptionManager;
public static MqttClientCreator create() {
return new MqttClientCreator();
......@@ -42,11 +45,11 @@ public final class MqttClient {
MqttClient(TioClient tioClient,
MqttClientCreator config,
ClientChannelContext context,
MqttClientMessageHandler messageHandler) {
MqttClientSubscriptionManager subscriptionManager) {
this.tioClient = tioClient;
this.config = config;
this.context = context;
this.messageHandler = messageHandler;
this.subscriptionManager = subscriptionManager;
}
/**
......@@ -57,12 +60,7 @@ public final class MqttClient {
* @return MqttClient
*/
public MqttClient subQos0(String topicFilter, MqttMessageListener listener) {
// TODO L.cm 对 topicFilter 校验
MqttSubscribeMessage message = MqttMessageBuilders.subscribe()
.addSubscription(MqttQoS.AT_MOST_ONCE, topicFilter)
.messageId(MqttClientMessageId.getId())
.build();
return subscribe(message, listener);
return subscribe(MqttQoS.AT_MOST_ONCE, topicFilter, listener);
}
/**
......@@ -73,12 +71,7 @@ public final class MqttClient {
* @return MqttClient
*/
public MqttClient subQos1(String topicFilter, MqttMessageListener listener) {
// TODO L.cm 对 topicFilter 校验
MqttSubscribeMessage message = MqttMessageBuilders.subscribe()
.addSubscription(MqttQoS.AT_LEAST_ONCE, topicFilter)
.messageId(MqttClientMessageId.getId())
.build();
return subscribe(message, listener);
return subscribe(MqttQoS.AT_LEAST_ONCE, topicFilter, listener);
}
/**
......@@ -89,25 +82,29 @@ public final class MqttClient {
* @return MqttClient
*/
public MqttClient subQos2(String topicFilter, MqttMessageListener listener) {
// TODO L.cm 对 topicFilter 校验
MqttSubscribeMessage message = MqttMessageBuilders.subscribe()
.addSubscription(MqttQoS.EXACTLY_ONCE, topicFilter)
.messageId(MqttClientMessageId.getId())
.build();
Tio.send(context, message);
return subscribe(message, listener);
return subscribe(MqttQoS.EXACTLY_ONCE, topicFilter, listener);
}
/**
* 订阅
*
* @param message MqttSubscribeMessage
* @param listener MqttMessageListener
* @param mqttQoS MqttQoS
* @param topicFilter topicFilter
* @param listener MqttMessageListener
* @return MqttClient
*/
public MqttClient subscribe(MqttSubscribeMessage message, MqttMessageListener listener) {
Tio.send(context, message);
// 绑定 subManage listener
public MqttClient subscribe(MqttQoS mqttQoS, String topicFilter, MqttMessageListener listener) {
int messageId = MqttClientMessageId.getId();
MqttSubscribeMessage message = MqttMessageBuilders.subscribe()
.addSubscription(mqttQoS, topicFilter)
.messageId(messageId)
.build();
Boolean result = Tio.send(context, message);
logger.debug("MQTT subscribe topicFilter:{} mqttQoS:{} messageId:{} result:{}", topicFilter, mqttQoS, messageId, result);
// 先 tio 发送数据才添加,考虑会不会 ack 早回???
if (Boolean.TRUE.equals(result)) {
subscriptionManager.addPaddingSubscribe(new MqttSubscription(messageId, mqttQoS, topicFilter, listener));
}
return this;
}
......@@ -118,13 +115,17 @@ public final class MqttClient {
* @return MqttClient
*/
public MqttClient unSubscribe(String topicFilter) {
// TODO L.cm 对 topicFilter 校验
int messageId = MqttClientMessageId.getId();
MqttUnsubscribeMessage message = MqttMessageBuilders.unsubscribe()
.addTopicFilter(topicFilter)
.messageId(MqttClientMessageId.getId())
.build();
Tio.send(context, message);
Boolean result = Tio.send(context, message);
logger.debug("MQTT unSubscribe topicFilter:{} messageId:{} result:{}", topicFilter, messageId, result);
// 解绑 subManage listener
if (Boolean.TRUE.equals(result)) {
subscriptionManager.addPaddingUnSubscribe(messageId, topicFilter);
}
return this;
}
......@@ -180,6 +181,7 @@ public final class MqttClient {
.retained(retain)
.messageId(MqttClientMessageId.getId())
.build();
logger.debug("MQTT publish topic:{} qos:{} retain:{}", topic, qos, retain);
return Tio.send(context, message);
}
......
......@@ -253,10 +253,10 @@ public final class MqttClientCreator {
// 默认为:MICA-MQTT- 前缀和 36进制的毫秒数
this.clientId("MICA-MQTT-" + Long.toString(System.currentTimeMillis(), 36));
}
MqttClientMessageHandler messageHandler = new MqttClientMessageHandler();
MqttClientSubscriptionManager subscriptionManager = new MqttClientSubscriptionManager();
// 客户端处理器
CountDownLatch connLatch = new CountDownLatch(1);
MqttClientProcessor processor = new DefaultMqttClientProcessor(messageHandler, connLatch);
MqttClientProcessor processor = new DefaultMqttClientProcessor(subscriptionManager, connLatch);
// 2. 初始化 mqtt 处理器
ClientAioHandler clientAioHandler = new MqttClientAioHandler(this.bufferAllocator, Objects.requireNonNull(processor));
ClientAioListener clientAioListener = new MqttClientAioListener(this);
......@@ -274,7 +274,7 @@ public final class MqttClientCreator {
ClientChannelContext context = tioClient.connect(new Node(this.ip, this.port), this.timeout);
// 5. 等待连接成功之后继续
connLatch.await();
return new MqttClient(tioClient, this, context, messageHandler);
return new MqttClient(tioClient, this, context, subscriptionManager);
}
}
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.net.dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.core.util.MultiValueMap;
import java.util.*;
/**
* 客户端管理处理,包括 sub 和 pub
*
* @author L.cm
*/
final class MqttClientSubscriptionManager {
/**
* 订阅的数据承载
*/
private final MultiValueMap<String, MqttSubscription> subscriptions = new MultiValueMap<>();
private final Map<Integer, MqttSubscription> pendingSubscriptions = new LinkedHashMap<>();
private final Map<Integer, String> pendingUnSubscriptions = new LinkedHashMap<>();
public void addPaddingSubscribe(MqttSubscription pendingSubscription) {
pendingSubscriptions.put(pendingSubscription.getMessageId(), pendingSubscription);
}
public MqttSubscription getPaddingSubscribe(int messageId) {
return pendingSubscriptions.remove(messageId);
}
public void addSubscription(MqttSubscription subscription) {
subscriptions.add(subscription.getTopicFilter(), subscription);
}
public List<MqttSubscription> getMatchedSubscription(String topicName) {
List<MqttSubscription> subscriptionList = new ArrayList<>();
for (List<MqttSubscription> mqttSubscriptions : subscriptions.values()) {
for (MqttSubscription subscription : mqttSubscriptions) {
if (subscription.matches(topicName)) {
subscriptionList.add(subscription);
}
}
}
return Collections.unmodifiableList(subscriptionList);
}
public void removeSubscriptions(String topicFilter) {
subscriptions.remove(topicFilter);
}
public void addPaddingUnSubscribe(int messageId, String topicFilter) {
pendingUnSubscriptions.put(messageId, topicFilter);
}
public String getPaddingUnSubscribe(int messageId) {
return pendingUnSubscriptions.remove(messageId);
}
}
......@@ -16,23 +16,62 @@
package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.common.MqttMessageListener;
import net.dreamlu.iot.mqtt.core.util.MultiValueMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;
/**
* 客户端管理处理,包括 sub 和 pub
* 发送订阅,未 ack 前的数据承载
*
* @author L.cm
*/
public class MqttClientMessageHandler {
/**
* 订阅的数据承载
*/
private final MultiValueMap<String, MqttMessageListener> subscribing = new MultiValueMap<>();
private final Map<Integer, String> messageIdTopics = new LinkedHashMap<>();
private final Map<String, String> subscribed = new LinkedHashMap<>();
final class MqttSubscription {
private final int messageId;
private final MqttQoS mqttQoS;
private final String topicFilter;
private final Pattern topicRegex;
private final MqttMessageListener listener;
public MqttSubscription(int messageId,
MqttQoS mqttQoS,
String topicFilter,
MqttMessageListener listener) {
this.messageId = messageId;
this.mqttQoS = mqttQoS;
this.topicFilter = topicFilter;
this.topicRegex = Pattern.compile(topicFilter.replace("+", "[^/]+").replace("#", ".+").concat("$"));
this.listener = listener;
}
public int getMessageId() {
return messageId;
}
public MqttQoS getMqttQoS() {
return mqttQoS;
}
public String getTopicFilter() {
return topicFilter;
}
public MqttMessageListener getListener() {
return listener;
}
boolean matches(String topic){
return this.topicRegex.matcher(topic).matches();
}
@Override
public String toString() {
return "MqttPendingSubscription{" +
"messageId=" + messageId +
", mqttQoS=" + mqttQoS +
", topicFilter='" + topicFilter + '\'' +
", listener=" + listener +
'}';
}
}
package net.dreamlu.iot.mqtt.client;
import net.dreamlu.iot.mqtt.codec.ByteBufferUtil;
import net.dreamlu.iot.mqtt.codec.MqttVersion;
import net.dreamlu.iot.mqtt.core.client.MqttClient;
......@@ -23,8 +24,8 @@ public class MqttClientTest {
.protocolVersion(MqttVersion.MQTT_5)
.connect();
client.subQos0("test", (topic, payload) -> {
System.out.println(payload);
client.subQos0("/test/#", (topic, payload) -> {
System.out.println(ByteBufferUtil.toString(payload));
});
Timer timer = new Timer();
......
......@@ -50,7 +50,7 @@ public class MqttServerTest {
channelContexts.forEach(context -> {
MqttPublishMessage message = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttPublishVariableHeader("testtopic", 0), ByteBuffer.wrap("mica最牛皮".getBytes()));
new MqttPublishVariableHeader("/test/123", 0), ByteBuffer.wrap("mica最牛皮".getBytes()));
Tio.send(context, message);
});
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册