提交 2c32f8c2 编写于 作者: 浅梦2013's avatar 浅梦2013

考虑 session 处理和 MqttProperties 添加注释。

上级 8074999f
......@@ -19,6 +19,7 @@
## 待办
- [ ] 添加 websocket 支持。
- [ ] 优化处理 mqtt session,以及支持 v5.0
## 依赖
### Spring boot 项目
......
......@@ -27,42 +27,122 @@ public final class MqttProperties {
public enum MqttPropertyType {
// single byte properties
/**
* 有效载荷标识(Payload Format Indicator),该属性只存在于 PUBLISH 报文和 CONNECT 报文的遗嘱属性中。
*/
PAYLOAD_FORMAT_INDICATOR(0x01),
/**
* 请求问题信息
*/
REQUEST_PROBLEM_INFORMATION(0x17),
/**
* 请求响应信息
*/
REQUEST_RESPONSE_INFORMATION(0x19),
/**
* 服务器支持得最高 qos 级别
*/
MAXIMUM_QOS(0x24),
/**
* 保留消息可用
*/
RETAIN_AVAILABLE(0x25),
/**
* 订阅通配符可用
*/
WILDCARD_SUBSCRIPTION_AVAILABLE(0x28),
/**
* 订阅标识符可用
*/
SUBSCRIPTION_IDENTIFIER_AVAILABLE(0x29),
/**
* $share 共享订阅可用
*/
SHARED_SUBSCRIPTION_AVAILABLE(0x2A),
// two bytes properties
/**
* 服务器 keep alive
*/
SERVER_KEEP_ALIVE(0x13),
/**
* 告知对方自己希望处理未决的最大的 Qos1 或者 Qos2 PUBLISH消息个数,如果不存在,则默认是65535。作用:流控。
*/
RECEIVE_MAXIMUM(0x21),
/**
* topic 别名最大值
*/
TOPIC_ALIAS_MAXIMUM(0x22),
/**
* topic 别名
*/
TOPIC_ALIAS(0x23),
// four bytes properties
PUBLICATION_EXPIRY_INTERVAL(0x02),
/**
* session 超时时间,连接时使用
*/
SESSION_EXPIRY_INTERVAL(0x11),
/**
* 遗嘱消息延迟时间
*/
WILL_DELAY_INTERVAL(0x18),
/**
* 最大包体大小
*/
MAXIMUM_PACKET_SIZE(0x27),
// Variable Byte Integer
/**
* 订阅标识符
*/
SUBSCRIPTION_IDENTIFIER(0x0B),
// UTF-8 Encoded String properties
/**
* 内容类型(Content Type),只存在于 PUBLISH 报文和 CONNECT 报文的遗嘱属性中。
* 例如:存放 MIME 类型,比如 text/plain 表示文本文件,audio/aac 表示音频文件。
*/
CONTENT_TYPE(0x03),
/**
* 响应的 topic
*/
RESPONSE_TOPIC(0x08),
/**
* 指定的客户标识符
*/
ASSIGNED_CLIENT_IDENTIFIER(0x12),
/**
* 身份验证方法
*/
AUTHENTICATION_METHOD(0x15),
/**
* 响应信息
*/
RESPONSE_INFORMATION(0x1A),
/**
* 服务器参考
*/
SERVER_REFERENCE(0x1C),
/**
* 所有的ACK以及DISCONNECT 都可以携带 Reason String属性告知对方一些特殊的信息,
* 一般来说是ACK失败的情况下会使用该属性告知对端为什么失败,可用来弥补Reason Code信息不够。
*/
REASON_STRING(0x1F),
/**
* 用户属性
*/
USER_PROPERTY(0x26),
// Binary Data
/**
* 相关数据
*/
CORRELATION_DATA(0x09),
/**
* 认证数据
*/
AUTHENTICATION_DATA(0x16);
private static final MqttPropertyType[] VALUES;
......@@ -371,7 +451,7 @@ public final class MqttProperties {
* @return a property if it is set, null otherwise
*/
public MqttProperty getProperty(int propertyId) {
if (propertyId == MqttPropertyType.USER_PROPERTY.value) {
if (MqttPropertyType.USER_PROPERTY.value == propertyId) {
//special handling to keep compatibility with earlier versions
List<UserProperty> userProperties = this.userProperties;
if (userProperties == null) {
......@@ -379,7 +459,7 @@ public final class MqttProperties {
}
return UserProperties.fromUserPropertyCollection(userProperties);
}
if (propertyId == MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value) {
if (MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value == propertyId) {
List<IntegerProperty> subscriptionIds = this.subscriptionIds;
if (subscriptionIds == null || subscriptionIds.isEmpty()) {
return null;
......@@ -390,6 +470,32 @@ public final class MqttProperties {
return props == null ? null : props.get(propertyId);
}
/**
* Get property by ID. If there are multiple properties of this type (can be with Subscription ID)
* then return the first one.
*
* @param mqttPropertyType Type of the property
* @return a property if it is set, null otherwise
*/
public MqttProperty getProperty(MqttPropertyType mqttPropertyType) {
return getProperty(mqttPropertyType.value);
}
/**
* Get property by ID. If there are multiple properties of this type (can be with Subscription ID)
* then return the first one.
*
* @param mqttPropertyType Type of the property
* @return a property if it is set, null otherwise
*/
public <T> T getPropertyValue(MqttPropertyType mqttPropertyType) {
MqttProperty property = getProperty(mqttPropertyType.value);
if (property == null) {
return null;
}
return (T) property.value();
}
/**
* Get properties by ID.
* Some properties (Subscription ID and User Properties) may occur multiple times,
......
......@@ -79,6 +79,14 @@ public interface IMqttSessionManager {
*/
int getMessageId(String clientId);
/**
* 判断是否存在 session
*
* @param clientId clientId
* @return 是否存在 session
*/
boolean hasSession(String clientId);
/**
* 清除 session
*
......
......@@ -96,6 +96,11 @@ public class InMemoryMqttSessionManager implements IMqttSessionManager {
return value.getAndIncrement();
}
@Override
public boolean hasSession(String clientId) {
return false;
}
@Override
public void remove(String clientId) {
pendingPublishStore.remove(clientId);
......
......@@ -89,9 +89,16 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
if (keepAliveSeconds > 0) {
context.setHeartbeatTimeout(TimeUnit.SECONDS.toMillis(keepAliveSeconds));
}
// TODO session 处理,先默认全部连接关闭时清除
// 5. session 处理,先默认全部连接关闭时清除
// boolean cleanSession = variableHeader.isCleanSession();
// 5. 存储遗嘱消息
// if (cleanSession) {
// // TODO L.cm 考虑 session 处理 可参数: https://www.emqx.com/zh/blog/mqtt-session
// // mqtt v5.0 会话超时时间
// MqttProperties properties = variableHeader.properties();
// Integer sessionExpiryInterval = properties.getPropertyValue(MqttProperties.MqttPropertyType.SESSION_EXPIRY_INTERVAL);
// System.out.println(sessionExpiryInterval);
// }
// 6. 存储遗嘱消息
boolean willFlag = variableHeader.isWillFlag();
if (willFlag) {
Message willMessage = new Message();
......@@ -101,9 +108,9 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
willMessage.setRetain(variableHeader.isWillRetain());
messageStore.addWillMessage(clientId, willMessage);
}
// 6. 返回 ack
// 7. 返回 ack
connAckByReturnCode(clientId, context, MqttConnectReturnCode.CONNECTION_ACCEPTED);
// 7. 在线状态
// 8. 在线状态
connectStatusListener.online(clientId);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册