提交 ae72a4dc 编写于 作者: 浅梦2013's avatar 浅梦2013

遗嘱、保留消息内部消息转发抽象。

上级 60ca6128
......@@ -18,6 +18,7 @@ package net.dreamlu.iot.mqtt.core.server;
import net.dreamlu.iot.mqtt.codec.ByteBufferAllocator;
import net.dreamlu.iot.mqtt.codec.MqttConstant;
import net.dreamlu.iot.mqtt.core.server.dispatcher.AbstractMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
......@@ -317,14 +318,17 @@ public class MqttServerCreator {
TioServer tioServer = new TioServer(tioConfig);
// 6. 不校验版本号,社区版设置无效
tioServer.setCheckLastVersion(false);
// 7. 启动
MqttServer mqttServer = new MqttServer(tioServer, this, executor);
// 7. 如果是默认的消息转发器,设置 mqttServer
if (this.messageDispatcher instanceof AbstractMqttMessageDispatcher) {
((AbstractMqttMessageDispatcher) this.messageDispatcher).config(mqttServer);
}
// 8. 启动
try {
tioServer.start(this.ip, this.port);
} catch (IOException e) {
throw new IllegalStateException("Mica mqtt server start fail.", e);
}
MqttServer mqttServer = new MqttServer(tioServer, this, executor);
messageDispatcher.config(mqttServer);
return mqttServer;
}
......
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.server.dispatcher;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.server.ServerTioConfig;
import java.nio.ByteBuffer;
import java.util.Objects;
/**
* 内部消息转发抽象
*
* @author L.cm
*/
public abstract class AbstractMqttMessageDispatcher implements IMqttMessageDispatcher {
protected MqttServer mqttServer;
public void config(MqttServer mqttServer) {
this.mqttServer = mqttServer;
}
/**
* 转发到所有订阅了该 topic 的设备
*
* @param message Message
* @return 是否成功
*/
abstract public boolean sendAll(Message message);
/**
* 转发消息到设备,如果 clientId 就在本服务,会自行消化
*
* @param message Message
* @return 是否成功
*/
abstract public boolean sendTo(String clientId, Message message);
@Override
public boolean send(Message message) {
Objects.requireNonNull(mqttServer, "MqttServer require not Null.");
return sendAll(message);
}
@Override
public boolean send(String clientId, Message message) {
Objects.requireNonNull(mqttServer, "MqttServer require not Null.");
// 判断如果 clientId 就在本服务
ServerTioConfig config = this.mqttServer.getServerConfig();
ChannelContext context = Tio.getByBsId(config, clientId);
if (context != null) {
ByteBuffer payload = ByteBuffer.wrap(message.getPayload());
MqttQoS qoS = MqttQoS.valueOf(message.getQos());
return this.mqttServer.publish(context, clientId, message.getTopic(), payload, qoS, false);
}
return this.sendTo(clientId, message);
}
}
......@@ -16,7 +16,6 @@
package net.dreamlu.iot.mqtt.core.server.dispatcher;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.model.Message;
/**
......@@ -26,13 +25,6 @@ import net.dreamlu.iot.mqtt.core.server.model.Message;
*/
public interface IMqttMessageDispatcher {
/**
* 配置
*
* @param mqttServer MqttServer
*/
void config(MqttServer mqttServer);
/**
* 发送消息
*
......
......@@ -17,8 +17,7 @@
package net.dreamlu.iot.mqtt.core.server.support;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.dispatcher.AbstractMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import java.nio.ByteBuffer;
......@@ -28,31 +27,20 @@ import java.nio.ByteBuffer;
*
* @author L.cm
*/
public class DefaultMqttMessageDispatcher implements IMqttMessageDispatcher {
private MqttServer mqttServer;
public class DefaultMqttMessageDispatcher extends AbstractMqttMessageDispatcher {
@Override
public void config(MqttServer mqttServer) {
this.mqttServer = mqttServer;
}
@Override
public boolean send(Message message) {
if (mqttServer == null) {
return false;
}
public boolean sendAll(Message message) {
ByteBuffer payload = ByteBuffer.wrap(message.getPayload());
MqttQoS qoS = MqttQoS.valueOf(message.getQos());
return mqttServer.publishAll(message.getTopic(), payload, qoS);
}
@Override
public boolean send(String clientId, Message message) {
if (mqttServer == null) {
return false;
}
public boolean sendTo(String clientId, Message message) {
ByteBuffer payload = ByteBuffer.wrap(message.getPayload());
MqttQoS qoS = MqttQoS.valueOf(message.getQos());
return mqttServer.publish(clientId, message.getTopic(), payload, qoS);
}
}
......@@ -43,15 +43,15 @@ mqtt:
### 2.3 可实现接口(注册成 Spring Bean 即可)
| 接口 | 是否必须 | 说明 |
| --------------------------- | -------------- | ------------------ |
| IMqttServerAuthHandler | 是 | 用于客户端认证 |
| IMqttMessageListener | 是 | 消息监听 |
| IMqttConnectStatusListener | 是 | 连接状态监听 |
| IMqttSessionManager | 否 | session 管理 |
| IMqttMessageStore | 集群是,单机否 | 遗嘱和保留消息存储 |
| IMqttMessageDispatcher | 集群是,单机否 | 消息转发 |
| IpStatListener | 否 | t-io ip 状态监听 |
| 接口 | 是否必须 | 说明 |
| --------------------------- | -------------- | ------------------------- |
| IMqttServerAuthHandler | 是 | 用于客户端认证 |
| IMqttMessageListener | 是 | 消息监听 |
| IMqttConnectStatusListener | 是 | 连接状态监听 |
| IMqttSessionManager | 否 | session 管理 |
| IMqttMessageStore | 集群是,单机否 | 遗嘱和保留消息存储 |
| AbstractMqttMessageDispatcher | 集群是,单机否 | 消息转发,(遗嘱、保留消息转发) |
| IpStatListener | 否 | t-io ip 状态监听 |
### 2.4 IMqttMessageListener (用于监听客户端上传的消息) 使用示例
......@@ -92,8 +92,6 @@ public class MqttServerCustomizerConfiguration {
```java
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
......@@ -104,7 +102,6 @@ import java.nio.ByteBuffer;
*/
@Service
public class ServerService {
private static final Logger logger = LoggerFactory.getLogger(ServerService.class);
@Autowired
private MqttServerTemplate server;
......
......@@ -17,6 +17,7 @@
package net.dreamlu.iot.mqtt.spring.server;
import net.dreamlu.iot.mqtt.core.server.*;
import net.dreamlu.iot.mqtt.core.server.dispatcher.AbstractMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
......@@ -149,7 +150,11 @@ public class MqttServerConfiguration {
// 6. 不校验版本号,社区版设置无效
tioServer.setCheckLastVersion(false);
MqttServer mqttServer = new MqttServer(tioServer, mqttServerCreator, executor);
mqttServerCreator.getMessageDispatcher().config(mqttServer);
IMqttMessageDispatcher messageDispatcher = mqttServerCreator.getMessageDispatcher();
// 7. 如果是默认的消息转发器,设置 mqttServer
if (messageDispatcher instanceof AbstractMqttMessageDispatcher) {
((AbstractMqttMessageDispatcher) messageDispatcher).config(mqttServer);
}
return mqttServer;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册