diff --git a/IoTSharp/AppSettings.cs b/IoTSharp/AppSettings.cs index daa8990e035422e8ac10be64ea109a12337db9e5..e117c6364e9e56952a7222625aa6434f74e66531 100644 --- a/IoTSharp/AppSettings.cs +++ b/IoTSharp/AppSettings.cs @@ -40,5 +40,6 @@ namespace IoTSharp public bool EnableTls { get; set; } = false; public string Certificate { get; set; } public SslProtocols SslProtocol { get; set; } = SslProtocols.None; + public bool PersistRetainedMessages { get; internal set; } } } diff --git a/IoTSharp/Data/ApplicationDbContext.cs b/IoTSharp/Data/ApplicationDbContext.cs index bbd73dd86135839ea6381e693885fb5ca2af3f96..6d06f72146fc60af7d924cf042d08a7d59de151d 100644 --- a/IoTSharp/Data/ApplicationDbContext.cs +++ b/IoTSharp/Data/ApplicationDbContext.cs @@ -140,5 +140,7 @@ namespace IoTSharp.Data public DbSet TelemetryLatest { get; set; } public DbSet DeviceIdentities { get; set; } public DbSet AuditLog { get; set; } + + public DbSet RetainedMessage { get; set; } } } \ No newline at end of file diff --git a/IoTSharp/Data/RetainedMessage.cs b/IoTSharp/Data/RetainedMessage.cs new file mode 100644 index 0000000000000000000000000000000000000000..a924b6287e8b13c2695bca789bd44149233b9d7d --- /dev/null +++ b/IoTSharp/Data/RetainedMessage.cs @@ -0,0 +1,48 @@ +using MQTTnet; +using MQTTnet.Protocol; +using System; +using System.Collections.Generic; +using System.ComponentModel.DataAnnotations; +using System.Linq; +using System.Security.Cryptography; +using System.Threading.Tasks; + +namespace IoTSharp.Data +{ + [Serializable] + public class RetainedMessage + { + + + // user-defined conversion from Fraction to double + public static implicit operator MqttApplicationMessage(RetainedMessage f) + { + return new MqttApplicationMessage() { Payload = f.Payload, Retain = true, Topic = f.Topic, QualityOfServiceLevel = f.QualityOfServiceLevel }; + } + public static implicit operator RetainedMessage(MqttApplicationMessage f) + { + return new RetainedMessage(f); + } + public byte[] Payload { get; set; } + public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; } + public string Topic { get; set; } + public bool Retain { get; set; } + MD5 MD5 = MD5.Create(); + public RetainedMessage() + { + + } + public RetainedMessage(MqttApplicationMessage retained) + { + Topic = retained.Topic; + QualityOfServiceLevel = retained.QualityOfServiceLevel; + Payload = retained.Payload; + Retain = retained.Retain; + List lst = new List(Payload); + lst.AddRange(System.Text.Encoding.UTF8.GetBytes(Topic)); + Id = BitConverter.ToString(MD5.ComputeHash(lst.ToArray())).Replace("-", ""); + } + [Key] + public string Id { get; set; } + } +} diff --git a/IoTSharp/Data/RetainedMessageHandler.cs b/IoTSharp/Data/RetainedMessageHandler.cs new file mode 100644 index 0000000000000000000000000000000000000000..3ab1fa0ef98dab818efeaaea49028d5fad5c9fe2 --- /dev/null +++ b/IoTSharp/Data/RetainedMessageHandler.cs @@ -0,0 +1,70 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; +using MQTTnet; +using MQTTnet.Server; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace IoTSharp.Data +{ + public class RetainedMessageHandler : IMqttServerStorage + { + private ApplicationDbContext _context; + private ILogger _logger; + + public RetainedMessageHandler(ILogger logger, ApplicationDbContext context) + { + _context = context; + _logger = logger; + } + + public async Task> LoadRetainedMessagesAsync() + { + await Task.CompletedTask; + try + { + var lst = from m in _context.RetainedMessage select (MqttApplicationMessage)m; + + return await lst.ToListAsync(); + } + catch (Exception ex) + { + _logger.LogError(ex, $"load RetainedMessage error {ex.Message} "); + return new List(); + } + } + + public Task SaveRetainedMessagesAsync(IList messages) + { + Task.Factory.StartNew(() => + { + _context.Database.BeginTransaction(); + try + { + DateTime dateTime = DateTime.Now; + var needsave = from mam in messages select new RetainedMessage(mam); + var ids = needsave.Select(x => x.Id).ToList(); + var dbids = _context.RetainedMessage.Select(x => x.Id).ToArray(); + var needdelete = dbids.Except(ids);//.Except(dbids); + var del = from f in _context.RetainedMessage where needdelete.Contains(f.Id) select f; + var needadd = ids.Except(dbids); + var add = from f in needsave where needadd.Contains(f.Id) select f; + if (del.Any()) _context.RetainedMessage.RemoveRange(del); + if (add.Any()) _context.RetainedMessage.AddRange(add); + int ret = _context.SaveChanges(); + _context.Database.CommitTransaction(); + _logger.LogInformation($"{ret} pieces of data were saved and took {DateTime.Now.Subtract(dateTime).TotalSeconds} seconds."); + } + catch (Exception ex) + { + _context.Database.RollbackTransaction(); + _logger.LogError(ex, $" An exception was encountered,{ex.Message}"); + } + }, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default); + return Task.CompletedTask; + } + } +} \ No newline at end of file diff --git a/IoTSharp/Extensions/MqttExtension.cs b/IoTSharp/Extensions/MqttExtension.cs index 561d03219aad935ac5ab3abd57aaa84051de6465..4775d3013349b890d945eeb45110cc4ebaf636a5 100644 --- a/IoTSharp/Extensions/MqttExtension.cs +++ b/IoTSharp/Extensions/MqttExtension.cs @@ -14,6 +14,7 @@ using IoTSharp.Services; using MQTTnet.Server; using MQTTnet.Client.Receiving; using MQTTnet.Client.Options; +using IoTSharp.MQTT; namespace IoTSharp { @@ -22,7 +23,6 @@ namespace IoTSharp //static private IMqttServer _mqttServer; public static void AddIoTSharpMqttServer(this IServiceCollection services, MqttBrokerSetting setting) { - services.AddMqttTcpServerAdapter(); services.AddHostedMqttServerEx(options => { @@ -42,6 +42,7 @@ namespace IoTSharp { options.WithoutEncryptedEndpoint(); } + options.Build(); }); services.AddMqttConnectionHandler(); @@ -52,6 +53,7 @@ namespace IoTSharp { app.UseMqttEndpoint(); var mqttEvents = app.ApplicationServices.CreateScope().ServiceProvider.GetService(); + IMqttServerStorage storage = app.ApplicationServices.CreateScope().ServiceProvider.GetService(); app.UseMqttServerEx(server => { server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(args => mqttEvents.Server_ClientConnected(server, args)); @@ -62,7 +64,6 @@ namespace IoTSharp server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(args => mqttEvents.Server_ClientUnsubscribedTopic(server, args)); server.ClientConnectionValidatorHandler = new MqttServerClientConnectionValidatorHandlerDelegate(args => mqttEvents.Server_ClientConnectionValidator(server, args)); server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(args => mqttEvents.Server_ClientDisconnected(server, args)); - }); var mqttNetLogger = app.ApplicationServices.GetService(); diff --git a/IoTSharp/Handlers/MQTTService.cs b/IoTSharp/Handlers/MQTTService.cs index 1ffa6436ca4b87d54a87eeeecabb979cc2d2de9b..2ee48cdb05abe0cafe8e0e406dda584154888ad7 100644 --- a/IoTSharp/Handlers/MQTTService.cs +++ b/IoTSharp/Handlers/MQTTService.cs @@ -1,5 +1,9 @@ using IoTSharp.Data; +using IoTSharp.Diagnostics; using IoTSharp.Extensions; +using IoTSharp.MQTT; +using IoTSharp.Storage; +using IoTSharp.Sys; using IoTSharp.X509Extensions; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Logging; @@ -7,6 +11,7 @@ using MQTTnet; using MQTTnet.AspNetCoreEx; using MQTTnet.Server; using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Security.Cryptography; @@ -21,11 +26,35 @@ namespace IoTSharp.Handlers readonly ILogger _logger; readonly ApplicationDbContext _dbContext; readonly IMqttServerEx _serverEx; - public MqttEventsHandler(ILogger logger, ApplicationDbContext dbContext, IMqttServerEx serverEx) + private readonly BlockingCollection _incomingMessages = new BlockingCollection(); + private readonly Dictionary _importers = new Dictionary(); + private readonly Dictionary _subscribers = new Dictionary(); + private readonly OperationsPerSecondCounter _inboundCounter; + private readonly OperationsPerSecondCounter _outboundCounter; + + private bool _enableMqttLogging; + private readonly StorageService _storageService; + private readonly SystemCancellationToken _systemCancellationToken; + + public MqttEventsHandler(ILogger logger, ApplicationDbContext dbContext, IMqttServerEx serverEx, DiagnosticsService diagnosticsService, + StorageService storageService, + SystemStatusService systemStatusService + ) { _logger = logger; _dbContext = dbContext; _serverEx = serverEx; + + _inboundCounter = diagnosticsService.CreateOperationsPerSecondCounter("mqtt.inbound_rate"); + _outboundCounter = diagnosticsService.CreateOperationsPerSecondCounter("mqtt.outbound_rate"); + + systemStatusService.Set("mqtt.subscribers_count", () => _subscribers.Count); + systemStatusService.Set("mqtt.incoming_messages_count", () => _incomingMessages.Count); + systemStatusService.Set("mqtt.inbound_rate", () => _inboundCounter.Count); + systemStatusService.Set("mqtt.outbound_rate", () => _outboundCounter.Count); + systemStatusService.Set("mqtt.connected_clients_count", () => serverEx.GetClientStatusAsync().GetAwaiter().GetResult().Count); + + } static long clients = 0; diff --git a/IoTSharp/IoTSharp.csproj b/IoTSharp/IoTSharp.csproj index 8b1590ff31140602d2065b32586082ff5119b290..9e4d42e96ebf9d095ae7916c405b6ea30ae74216 100644 --- a/IoTSharp/IoTSharp.csproj +++ b/IoTSharp/IoTSharp.csproj @@ -56,6 +56,7 @@ + @@ -89,7 +90,6 @@ - diff --git a/IoTSharp/Startup.cs b/IoTSharp/Startup.cs index d305de0a75a5c39e04b8f7d6b2af3dffdc692b21..56dfcc0f815a236d2cb1cf58824d751a2429cafb 100644 --- a/IoTSharp/Startup.cs +++ b/IoTSharp/Startup.cs @@ -1,7 +1,9 @@ using IoTSharp.Data; +using IoTSharp.Diagnostics; using IoTSharp.Extensions; using IoTSharp.MQTT; using IoTSharp.Services; +using IoTSharp.Storage; using Microsoft.AspNetCore.Authentication.JwtBearer; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Builder; @@ -62,7 +64,7 @@ namespace IoTSharp services.AddLogging(loggingBuilder => loggingBuilder.AddConsole()); - //services.AddIoTSharpHub(Configuration); + services.AddIoTSharpHub(Configuration); // Enable the Gzip compression especially for Kestrel services.Configure(options => options.Level = System.IO.Compression.CompressionLevel.Optimal); services.AddResponseCompression(options => @@ -90,15 +92,12 @@ namespace IoTSharp configure.Description = description?.Description; }); services.AddTransient(); - services.AddIoTSharpMqttServer(AppSettings.MqttBroker); services.AddMqttClient(AppSettings.MqttClient); services.AddHostedService(); - //services.AddTransient(); - //foreach (var singletonService in Reflection.GetClassesImplementingInterface()) - //{ - // services.AddSingleton(singletonService); - //} + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. @@ -119,7 +118,7 @@ namespace IoTSharp app.UseSwagger(); app.UseHttpsRedirection(); - //app.UseIotSharpMqttServer(); + app.UseIotSharpMqttServer(); // serviceProvider.GetRequiredService().Start(); app.UseForwardedHeaders(new ForwardedHeadersOptions