提交 fcc835f2 编写于 作者: C Christian

Add new method for reading a single retained message.

上级 9cbd5dd7
......@@ -8,3 +8,4 @@
* [Server] Exposed server session items at server level and allow custom session items for injected application messages (#1638).
* [Server] Improved performance for retained message handling when subscribing using "DoNotSendOnSubscribe" or "SendAtSubscribeIfNewSubscriptionOnly" (#1661, thanks to @Int32Overflow).
* [Server] Added support for changing the used TLS certificate while the server is running (#1652, thanks to @YAJeff). The certificate provider will now be invoked for every new connection!
* [Server] Added a new API method which allows reading a single retained message without the need to processing the entire set of retained messages (#1659).
......@@ -22,18 +22,19 @@ namespace MQTTnet.Server
public MqttRetainedMessagesManager(MqttServerEventContainer eventContainer, IMqttNetLogger logger)
{
_eventContainer = eventContainer ?? throw new ArgumentNullException(nameof(eventContainer));
if (logger == null) throw new ArgumentNullException(nameof(logger));
if (logger == null)
throw new ArgumentNullException(nameof(logger));
_logger = logger.WithSource(nameof(MqttRetainedMessagesManager));
}
public async Task Start()
{
try
{
var eventArgs = new LoadingRetainedMessagesEventArgs();
await _eventContainer.LoadingRetainedMessagesEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
lock (_messages)
{
_messages.Clear();
......@@ -64,7 +65,7 @@ namespace MQTTnet.Server
{
List<MqttApplicationMessage> messagesForSave = null;
var saveIsRequired = false;
lock (_messages)
{
var hasPayload = applicationMessage.Payload != null && applicationMessage.Payload.Length > 0;
......@@ -83,7 +84,8 @@ namespace MQTTnet.Server
}
else
{
if (existingMessage.QualityOfServiceLevel != applicationMessage.QualityOfServiceLevel || !existingMessage.Payload.SequenceEqual(applicationMessage.Payload ?? EmptyBuffer.Array))
if (existingMessage.QualityOfServiceLevel != applicationMessage.QualityOfServiceLevel ||
!existingMessage.Payload.SequenceEqual(applicationMessage.Payload ?? EmptyBuffer.Array))
{
_messages[applicationMessage.Topic] = applicationMessage;
saveIsRequired = true;
......@@ -113,7 +115,7 @@ namespace MQTTnet.Server
_logger.Error(exception, "Unhandled exception while handling retained messages.");
}
}
public Task<IList<MqttApplicationMessage>> GetMessages()
{
lock (_messages)
......@@ -123,6 +125,19 @@ namespace MQTTnet.Server
}
}
public Task<MqttApplicationMessage> GetMessage(string topic)
{
lock (_messages)
{
if (_messages.TryGetValue(topic, out var message))
{
return Task.FromResult(message);
}
return null;
}
}
public async Task ClearMessages()
{
lock (_messages)
......@@ -136,4 +151,4 @@ namespace MQTTnet.Server
}
}
}
}
}
\ No newline at end of file
......@@ -204,6 +204,18 @@ namespace MQTTnet.Server
return _retainedMessagesManager.GetMessages();
}
public Task<MqttApplicationMessage> GetRetainedMessageAsync(string topic)
{
if (topic == null)
{
throw new ArgumentNullException(nameof(topic));
}
ThrowIfNotStarted();
return _retainedMessagesManager.GetMessage(topic);
}
public Task<IList<MqttSessionStatus>> GetSessionsAsync()
{
ThrowIfNotStarted();
......@@ -233,7 +245,7 @@ namespace MQTTnet.Server
}
var sessionItems = injectedApplicationMessage.CustomSessionItems ?? ServerSessionItems;
return _clientSessionsManager.DispatchApplicationMessage(
injectedApplicationMessage.SenderClientId,
sessionItems,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册