提交 9d205ba1 编写于 作者: 如梦技术's avatar 如梦技术 🐛

完善 mica-mqtt

上级 205d5465
......@@ -47,9 +47,9 @@ public final class MqttClientCreator {
*/
private String name = "Mica-Mqtt-Client";
/**
* ip,可为空,为空 t-io 默认为 127.0.0.1
* ip,可为空,默认为 127.0.0.1
*/
private String ip;
private String ip = "127.0.0.1";
/**
* 端口,默认:1883
*/
......
......@@ -28,6 +28,7 @@ import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.server.ServerTioConfig;
import org.tio.server.TioServer;
import org.tio.utils.lock.ReadLockHandler;
import org.tio.utils.lock.SetWithLock;
import java.nio.ByteBuffer;
......@@ -217,15 +218,16 @@ public final class MqttServer {
*/
public Boolean publishAll(String topic, ByteBuffer payload, MqttQoS qos, boolean retain) {
SetWithLock<ChannelContext> contextSet = Tio.getAll(getServerConfig());
Set<ChannelContext> channelContexts = contextSet.getObj();
if (channelContexts.isEmpty()) {
logger.warn("Mqtt publish to all ChannelContext is empty.");
return false;
}
for (ChannelContext context : channelContexts) {
String clientId = context.getBsId();
publish(clientId, topic, payload, qos, retain);
}
contextSet.handle((ReadLockHandler<Set<ChannelContext>>) channelContexts -> {
if (channelContexts.isEmpty()) {
logger.warn("Mqtt publish to all ChannelContext is empty.");
return;
}
for (ChannelContext context : channelContexts) {
String clientId = context.getBsId();
publish(clientId, topic, payload, qos, retain);
}
});
return true;
}
......
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.net.dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.benchmark;
import net.dreamlu.iot.mqtt.codec.ByteBufferUtil;
import net.dreamlu.iot.mqtt.core.client.MqttClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* mqtt 压力测试
*
* @author L.cm
*/
public class MqttBenchmark {
private static final Logger logger = LoggerFactory.getLogger(MqttBenchmark.class);
public static void main(String[] args) throws Exception {
// 1. 模拟 1w 连接,在开发机(i5-7500 4核4线程 win10 MqttServer 6G)1万连连接很轻松。
int clientCount = 1_0000;
for (int i = 0; i < clientCount; i++) {
// 2. 初始化 mqtt 客户端
MqttClient client = MqttClient.create()
.username("admin")
.password("123456")
.connect();
// 3. 订阅服务端消息
client.subQos0("/#", (topic, payload) -> {
logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
});
}
}
}
......@@ -19,6 +19,8 @@ package net.dreamlu.iot.mqtt.client;
import net.dreamlu.iot.mqtt.codec.ByteBufferUtil;
import net.dreamlu.iot.mqtt.codec.MqttVersion;
import net.dreamlu.iot.mqtt.core.client.MqttClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
......@@ -31,6 +33,7 @@ import java.util.TimerTask;
* @author L.cm
*/
public class MqttClientTest {
private static final Logger logger = LoggerFactory.getLogger(MqttClientTest.class);
public static void main(String[] args) throws Exception {
// 初始化 mqtt 客户端
......@@ -42,11 +45,11 @@ public class MqttClientTest {
.connect();
client.subQos0("/test/#", (topic, payload) -> {
System.out.println(topic + '\t' + ByteBufferUtil.toString(payload));
logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
});
client.subQos0("/#", (topic, payload) -> {
System.out.println(topic + '\t' + ByteBufferUtil.toString(payload));
logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
});
Timer timer = new Timer();
......
......@@ -4,12 +4,12 @@ import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.server.IMqttPublishManager;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class MqttPublishManager implements IMqttPublishManager {
private final Map<Integer, MqttPendingPublish> pendingPublishData = new LinkedHashMap<>();
private final Map<Integer, MqttPendingQos2Publish> pendingQos2PublishData = new LinkedHashMap<>();
private final Map<Integer, MqttPendingPublish> pendingPublishData = new ConcurrentHashMap<>();
private final Map<Integer, MqttPendingQos2Publish> pendingQos2PublishData = new ConcurrentHashMap<>();
@Override
public void addPendingPublish(int messageId, MqttPendingPublish pendingPublish) {
......
......@@ -22,6 +22,8 @@ import net.dreamlu.iot.mqtt.core.common.MqttSubscription;
import net.dreamlu.iot.mqtt.core.server.DefaultMqttServerSubManager;
import net.dreamlu.iot.mqtt.core.server.IMqttMessageIdGenerator;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
......@@ -34,12 +36,13 @@ import java.util.TimerTask;
* @author L.cm
*/
public class MqttServerTest {
private static final Logger logger = LoggerFactory.getLogger(MqttServerTest.class);
public static void main(String[] args) throws IOException {
DefaultMqttServerSubManager subManager = new DefaultMqttServerSubManager();
// 服务端注册订阅
subManager.subscribe(new MqttSubscription(MqttQoS.AT_MOST_ONCE, "/test/#", ((topic, payload) -> {
System.out.println(topic + '\t' + ByteBufferUtil.toString(payload));
logger.info("subscribe:\t" + topic + '\t' + ByteBufferUtil.toString(payload));
})));
IMqttMessageIdGenerator messageIdGenerator = new MqttMessageIdGenerator();
......
log4j.rootLogger = DEBUG
log4j.rootLogger = DEBUG
appender.console.type = Console
appender.console.name = STDOUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d %-5p %c{2} - %m%n
# debug
rootLogger.level = debug
rootLogger.level = INFO
rootLogger.appenderRef.stdout.ref = STDOUT
......@@ -19,7 +19,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven-flatten.version>1.2.2</maven-flatten.version>
<!-- tio version -->
<tio.version>3.7.2.v20210316-RELEASE</tio.version>
<tio.version>3.7.3.v20210706-RELEASE</tio.version>
</properties>
<modules>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册