提交 b97fb6a6 编写于 作者: 麦壳饼's avatar 麦壳饼

add retained message table

上级 5d1c52da
......@@ -40,5 +40,6 @@ namespace IoTSharp
public bool EnableTls { get; set; } = false;
public string Certificate { get; set; }
public SslProtocols SslProtocol { get; set; } = SslProtocols.None;
public bool PersistRetainedMessages { get; internal set; }
}
}
......@@ -140,5 +140,7 @@ namespace IoTSharp.Data
public DbSet<TelemetryLatest> TelemetryLatest { get; set; }
public DbSet<DeviceIdentity> DeviceIdentities { get; set; }
public DbSet<AuditLog> AuditLog { get; set; }
public DbSet<RetainedMessage> RetainedMessage { get; set; }
}
}
\ No newline at end of file
using MQTTnet;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.Linq;
using System.Security.Cryptography;
using System.Threading.Tasks;
namespace IoTSharp.Data
{
[Serializable]
public class RetainedMessage
{
// user-defined conversion from Fraction to double
public static implicit operator MqttApplicationMessage(RetainedMessage f)
{
return new MqttApplicationMessage() { Payload = f.Payload, Retain = true, Topic = f.Topic, QualityOfServiceLevel = f.QualityOfServiceLevel };
}
public static implicit operator RetainedMessage(MqttApplicationMessage f)
{
return new RetainedMessage(f);
}
public byte[] Payload { get; set; }
public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; }
public string Topic { get; set; }
public bool Retain { get; set; }
MD5 MD5 = MD5.Create();
public RetainedMessage()
{
}
public RetainedMessage(MqttApplicationMessage retained)
{
Topic = retained.Topic;
QualityOfServiceLevel = retained.QualityOfServiceLevel;
Payload = retained.Payload;
Retain = retained.Retain;
List<byte> lst = new List<byte>(Payload);
lst.AddRange(System.Text.Encoding.UTF8.GetBytes(Topic));
Id = BitConverter.ToString(MD5.ComputeHash(lst.ToArray())).Replace("-", "");
}
[Key]
public string Id { get; set; }
}
}
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace IoTSharp.Data
{
public class RetainedMessageHandler : IMqttServerStorage
{
private ApplicationDbContext _context;
private ILogger _logger;
public RetainedMessageHandler(ILogger<RetainedMessageHandler> logger, ApplicationDbContext context)
{
_context = context;
_logger = logger;
}
public async Task<IList<MqttApplicationMessage>> LoadRetainedMessagesAsync()
{
await Task.CompletedTask;
try
{
var lst = from m in _context.RetainedMessage select (MqttApplicationMessage)m;
return await lst.ToListAsync();
}
catch (Exception ex)
{
_logger.LogError(ex, $"load RetainedMessage error {ex.Message} ");
return new List<MqttApplicationMessage>();
}
}
public Task SaveRetainedMessagesAsync(IList<MqttApplicationMessage> messages)
{
Task.Factory.StartNew(() =>
{
_context.Database.BeginTransaction();
try
{
DateTime dateTime = DateTime.Now;
var needsave = from mam in messages select new RetainedMessage(mam);
var ids = needsave.Select(x => x.Id).ToList();
var dbids = _context.RetainedMessage.Select(x => x.Id).ToArray();
var needdelete = dbids.Except(ids);//.Except(dbids);
var del = from f in _context.RetainedMessage where needdelete.Contains(f.Id) select f;
var needadd = ids.Except(dbids);
var add = from f in needsave where needadd.Contains(f.Id) select f;
if (del.Any()) _context.RetainedMessage.RemoveRange(del);
if (add.Any()) _context.RetainedMessage.AddRange(add);
int ret = _context.SaveChanges();
_context.Database.CommitTransaction();
_logger.LogInformation($"{ret} pieces of data were saved and took {DateTime.Now.Subtract(dateTime).TotalSeconds} seconds.");
}
catch (Exception ex)
{
_context.Database.RollbackTransaction();
_logger.LogError(ex, $" An exception was encountered,{ex.Message}");
}
}, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
return Task.CompletedTask;
}
}
}
\ No newline at end of file
......@@ -14,6 +14,7 @@ using IoTSharp.Services;
using MQTTnet.Server;
using MQTTnet.Client.Receiving;
using MQTTnet.Client.Options;
using IoTSharp.MQTT;
namespace IoTSharp
{
......@@ -22,7 +23,6 @@ namespace IoTSharp
//static private IMqttServer _mqttServer;
public static void AddIoTSharpMqttServer(this IServiceCollection services, MqttBrokerSetting setting)
{
services.AddMqttTcpServerAdapter();
services.AddHostedMqttServerEx(options =>
{
......@@ -42,6 +42,7 @@ namespace IoTSharp
{
options.WithoutEncryptedEndpoint();
}
options.Build();
});
services.AddMqttConnectionHandler();
......@@ -52,6 +53,7 @@ namespace IoTSharp
{
app.UseMqttEndpoint();
var mqttEvents = app.ApplicationServices.CreateScope().ServiceProvider.GetService<MqttEventsHandler>();
IMqttServerStorage storage = app.ApplicationServices.CreateScope().ServiceProvider.GetService<IMqttServerStorage>();
app.UseMqttServerEx(server =>
{
server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(args => mqttEvents.Server_ClientConnected(server, args));
......@@ -62,7 +64,6 @@ namespace IoTSharp
server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(args => mqttEvents.Server_ClientUnsubscribedTopic(server, args));
server.ClientConnectionValidatorHandler = new MqttServerClientConnectionValidatorHandlerDelegate(args => mqttEvents.Server_ClientConnectionValidator(server, args));
server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(args => mqttEvents.Server_ClientDisconnected(server, args));
});
var mqttNetLogger = app.ApplicationServices.GetService<IMqttNetLogger>();
......
using IoTSharp.Data;
using IoTSharp.Diagnostics;
using IoTSharp.Extensions;
using IoTSharp.MQTT;
using IoTSharp.Storage;
using IoTSharp.Sys;
using IoTSharp.X509Extensions;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
......@@ -7,6 +11,7 @@ using MQTTnet;
using MQTTnet.AspNetCoreEx;
using MQTTnet.Server;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
......@@ -21,11 +26,35 @@ namespace IoTSharp.Handlers
readonly ILogger<MqttEventsHandler> _logger;
readonly ApplicationDbContext _dbContext;
readonly IMqttServerEx _serverEx;
public MqttEventsHandler(ILogger<MqttEventsHandler> logger, ApplicationDbContext dbContext, IMqttServerEx serverEx)
private readonly BlockingCollection<MqttApplicationMessageReceivedEventArgs> _incomingMessages = new BlockingCollection<MqttApplicationMessageReceivedEventArgs>();
private readonly Dictionary<string, MqttTopicImporter> _importers = new Dictionary<string, MqttTopicImporter>();
private readonly Dictionary<string, MqttSubscriber> _subscribers = new Dictionary<string, MqttSubscriber>();
private readonly OperationsPerSecondCounter _inboundCounter;
private readonly OperationsPerSecondCounter _outboundCounter;
private bool _enableMqttLogging;
private readonly StorageService _storageService;
private readonly SystemCancellationToken _systemCancellationToken;
public MqttEventsHandler(ILogger<MqttEventsHandler> logger, ApplicationDbContext dbContext, IMqttServerEx serverEx, DiagnosticsService diagnosticsService,
StorageService storageService,
SystemStatusService systemStatusService
)
{
_logger = logger;
_dbContext = dbContext;
_serverEx = serverEx;
_inboundCounter = diagnosticsService.CreateOperationsPerSecondCounter("mqtt.inbound_rate");
_outboundCounter = diagnosticsService.CreateOperationsPerSecondCounter("mqtt.outbound_rate");
systemStatusService.Set("mqtt.subscribers_count", () => _subscribers.Count);
systemStatusService.Set("mqtt.incoming_messages_count", () => _incomingMessages.Count);
systemStatusService.Set("mqtt.inbound_rate", () => _inboundCounter.Count);
systemStatusService.Set("mqtt.outbound_rate", () => _outboundCounter.Count);
systemStatusService.Set("mqtt.connected_clients_count", () => serverEx.GetClientStatusAsync().GetAwaiter().GetResult().Count);
}
static long clients = 0;
......
......@@ -56,6 +56,7 @@
<ItemGroup>
<PackageReference Include="CoAP.NET.Core" Version="1.1.0" />
<PackageReference Include="Microsoft.AspNetCore.Hosting.WindowsServices" Version="2.2.0" />
<PackageReference Include="MQTTnet.AspNetCoreEx" Version="3.0.2.1" />
<PackageReference Include="MQTTnet.Extensions.ManagedClient" Version="3.0.2" />
<PackageReference Include="Packaging.Targets" Version="0.1.78" />
......@@ -89,7 +90,6 @@
<PackageReference Include="Microsoft.VisualStudio.Web.CodeGeneration.Design" Version="2.2.3" />
<PackageReference Include="MQTTnet" Version="3.0.2" />
<PackageReference Include="MQTTnet.AspNetCore" Version="3.0.2" />
<PackageReference Include="MQTTnet.AspNetCoreEx" Version="3.0.2" />
<PackageReference Include="MQTTnet.Extensions.Rpc" Version="3.0.2" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="2.2.4" />
<PackageReference Include="NSwag.AspNetCore" Version="12.3.1" />
......
using IoTSharp.Data;
using IoTSharp.Diagnostics;
using IoTSharp.Extensions;
using IoTSharp.MQTT;
using IoTSharp.Services;
using IoTSharp.Storage;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Builder;
......@@ -62,7 +64,7 @@ namespace IoTSharp
services.AddLogging(loggingBuilder => loggingBuilder.AddConsole());
//services.AddIoTSharpHub(Configuration);
services.AddIoTSharpHub(Configuration);
// Enable the Gzip compression especially for Kestrel
services.Configure<GzipCompressionProviderOptions>(options => options.Level = System.IO.Compression.CompressionLevel.Optimal);
services.AddResponseCompression(options =>
......@@ -90,15 +92,12 @@ namespace IoTSharp
configure.Description = description?.Description;
});
services.AddTransient<ApplicationDBInitializer>();
services.AddIoTSharpMqttServer(AppSettings.MqttBroker);
services.AddMqttClient(AppSettings.MqttClient);
services.AddHostedService<CoAPService>();
//services.AddTransient<IoTSharp.Sys.SystemCancellationToken>();
//foreach (var singletonService in Reflection.GetClassesImplementingInterface<IService>())
//{
// services.AddSingleton(singletonService);
//}
services.AddSingleton<DiagnosticsService>();
services.AddSingleton<RetainedMessageHandler>();
services.AddSingleton<SystemStatusService>();
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
......@@ -119,7 +118,7 @@ namespace IoTSharp
app.UseSwagger();
app.UseHttpsRedirection();
//app.UseIotSharpMqttServer();
app.UseIotSharpMqttServer();
// serviceProvider.GetRequiredService<MqttService>().Start();
app.UseForwardedHeaders(new ForwardedHeadersOptions
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册