提交 386302d7 编写于 作者: 如梦技术's avatar 如梦技术 🐛

代码微调,规范化。

上级 bb61ced7
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.broker.listener;
package net.dreamlu.iot.mqtt.broker.cluster;
import lombok.RequiredArgsConstructor;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
......@@ -26,7 +26,7 @@ import net.dreamlu.mica.redis.cache.MicaRedisCache;
* @author L.cm
*/
@RequiredArgsConstructor
public class MqttBrokerConnectListener implements IMqttConnectStatusListener {
public class RedisMqttConnectStatusListener implements IMqttConnectStatusListener {
private final MicaRedisCache redisCache;
private final String connectStatusKey;
......
......@@ -16,13 +16,13 @@
package net.dreamlu.iot.mqtt.broker.config;
import net.dreamlu.iot.mqtt.broker.cluster.RedisMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.broker.cluster.RedisMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.broker.cluster.RedisMqttMessageReceiver;
import net.dreamlu.iot.mqtt.broker.cluster.RedisMqttMessageStore;
import net.dreamlu.iot.mqtt.broker.enums.RedisKeys;
import net.dreamlu.iot.mqtt.broker.listener.MqttBrokerConnectListener;
import net.dreamlu.iot.mqtt.broker.listener.MqttBrokerMessageListener;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.broker.MqttBrokerMessageListener;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.core.server.store.IMqttMessageStore;
......@@ -40,7 +40,7 @@ public class MqttBrokerConfiguration {
@Bean
public IMqttConnectStatusListener mqttBrokerConnectListener(MicaRedisCache redisCache) {
return new MqttBrokerConnectListener(redisCache, RedisKeys.CONNECT_STATUS.getKey());
return new RedisMqttConnectStatusListener(redisCache, RedisKeys.CONNECT_STATUS.getKey());
}
@Bean
......
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.broker.listener;
package net.dreamlu.iot.mqtt.core.server.broker;
import net.dreamlu.iot.mqtt.codec.MqttMessageType;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
......@@ -26,7 +26,7 @@ import java.nio.ByteBuffer;
import java.util.Objects;
/**
* 集群消息监听器
* broker 消息监听转发
*
* @author L.cm
*/
......
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.biz;
package net.dreamlu.iot.mqtt.broker;
import net.dreamlu.iot.mqtt.client.MqttClientTest;
import net.dreamlu.iot.mqtt.codec.ByteBufferUtil;
......
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.biz;
package net.dreamlu.iot.mqtt.broker;
import net.dreamlu.iot.mqtt.client.MqttClientTest;
import net.dreamlu.iot.mqtt.codec.ByteBufferUtil;
......
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.biz;
package net.dreamlu.iot.mqtt.broker;
import net.dreamlu.iot.mqtt.core.client.MqttClient;
......
......@@ -14,12 +14,12 @@
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.biz;
package net.dreamlu.iot.mqtt.broker;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.broker.MqttBrokerMessageListener;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttMessageDispatcher;
/**
......@@ -39,14 +39,8 @@ public class Server {
public static void main(String[] args) {
// 1. 消息转发处理器,可用来实现集群
IMqttMessageDispatcher messageDispatcher = new DefaultMqttMessageDispatcher();
// 2. 收到消息,将消息转发出去
IMqttMessageListener messageListener = (clientId, topic, mqttQoS, payload) -> {
Message message = new Message();
message.setTopic(topic);
message.setQos(mqttQoS.value());
message.setPayload(payload.array());
messageDispatcher.send(message);
};
// 2. mqtt broker 消息转发处理
IMqttMessageListener messageListener = new MqttBrokerMessageListener(messageDispatcher);
// 3. 启动服务
MqttServer.create()
.ip("0.0.0.0")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册