提交 7c5f9b5c 编写于 作者: 如梦技术's avatar 如梦技术 🐛

使用 Tio.close 进行关闭。

上级 3f2bd444
package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.MqttVersion;
/**
* MqttClient 配置
*
* @author L.cm
*/
public class MqttClientConfig {
/**
* ip,可为空,为空 t-io 默认为 127.0.0.1
*/
private String ip;
/**
* 端口
*/
private int port;
/**
* 超时时间,t-io 配置,可为 null
*/
private Integer timeout;
/**
* 自动重连
*/
private boolean reconnect = true;
/**
* 重连重试时间
*/
private Long reInterval;
/**
* 客户端 id,默认:随机生成
*/
private String clientId;
/**
* mqtt 协议,默认:3_1_1
*/
private MqttVersion protocolVersion = MqttVersion.MQTT_3_1_1;
/**
* 用户名
*/
private String username = null;
/**
* 密码
*/
private String password = null;
/**
* 清除会话
*/
private boolean cleanSession = true;
/**
* 遗嘱消息
*/
private MqttWillMessage willMessage;
}
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.net.dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import java.util.Objects;
/**
* 遗嘱消息
*
* @author L.cm
*/
public final class MqttWillMessage {
private final String topic;
private final String message;
private final boolean retain;
private final MqttQoS qos;
private MqttWillMessage(String topic, String message, boolean retain, MqttQoS qos) {
this.topic = topic;
this.message = message;
this.retain = retain;
this.qos = qos;
}
public String getTopic() {
return topic;
}
public String getMessage() {
return message;
}
public boolean isRetain() {
return retain;
}
public MqttQoS getQos() {
return qos;
}
public static MqttWillMessage.Builder builder() {
return new MqttWillMessage.Builder();
}
public static final class Builder {
private String topic;
private String message;
private boolean retain;
private MqttQoS qos;
public Builder topic(String topic) {
this.topic = Objects.requireNonNull(topic);
return this;
}
public Builder message(String message) {
this.message = Objects.requireNonNull(message);
return this;
}
public Builder retain(boolean retain) {
this.retain = retain;
return this;
}
public Builder qos(MqttQoS qos) {
this.qos = Objects.requireNonNull(qos);
return this;
}
public MqttWillMessage build() {
return new MqttWillMessage(topic, message, retain, qos);
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MqttWillMessage that = (MqttWillMessage) o;
return retain == that.retain &&
Objects.equals(topic, that.topic) &&
Objects.equals(message, that.message) &&
qos == that.qos;
}
@Override
public int hashCode() {
return Objects.hash(topic, message, retain, qos);
}
@Override
public String toString() {
return "MqttWillMessage{" +
"topic='" + topic + '\'' +
", message='" + message + '\'' +
", retain=" + retain +
", qos=" + qos +
'}';
}
}
......@@ -148,23 +148,21 @@ public class MqttServerAioHandler implements ServerAioHandler {
private void processFailure(ChannelContext context, MqttMessage mqttMessage) {
Throwable cause = mqttMessage.decoderResult().getCause();
if (cause instanceof MqttUnacceptableProtocolVersionException) {
log.error(cause.getMessage());
// 不支持的协议版本
MqttConnAckMessage message = MqttMessageBuilders.connAck()
.returnCode(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION)
.sessionPresent(false)
.build();
Tio.send(context, message);
context.setClosed(true);
Tio.close(context, cause, "MqttUnacceptableProtocolVersion");
} else if (cause instanceof MqttIdentifierRejectedException) {
log.error(cause.getMessage());
// 不合格的 clientId
MqttConnAckMessage message = MqttMessageBuilders.connAck()
.returnCode(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED)
.sessionPresent(false)
.build();
Tio.send(context, message);
context.setClosed(true);
Tio.close(context, cause, "MqttIdentifierRejected");
} else if (cause instanceof DecoderException) {
log.error(cause.getMessage(), cause);
// 消息解码异常,怎么处理?只打印异常?
......
......@@ -140,7 +140,7 @@ public class MqttBrokerProcessorImpl implements MqttServerProcessor {
public void processDisConnect(ChannelContext context) {
String clientId = context.getBsId();
log.debug("DISCONNECT - clientId: {}", clientId);
context.setClosed(true);
Tio.close(context, "MqttIdentifierRejected");
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册