diff --git a/IoTSharp/Extensions/MqttExtension.cs b/IoTSharp/Extensions/MqttExtension.cs index ebec2c35f815a00273dc987c197a2a0c1063d68b..648bcce265ecda72b89d06c72ff6de29ff7ad649 100644 --- a/IoTSharp/Extensions/MqttExtension.cs +++ b/IoTSharp/Extensions/MqttExtension.cs @@ -8,6 +8,8 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.AspNetCore.Builder; using MQTTnet.AspNetCore; using MQTTnet.Diagnostics; +using MQTTnet.AspNetCoreEx; +using IoTSharp.Handlers; namespace IoTSharp { @@ -32,6 +34,7 @@ namespace IoTSharp var clientOptions = app.ApplicationServices.GetService(); mqtt.ApplicationMessageReceived += (sender, e) => { + _logger.LogInformation($"Received : {e.ApplicationMessage.Topic}"); }; mqtt.Connected += (sender, e) => @@ -70,7 +73,7 @@ namespace IoTSharp { services.AddMqttTcpServerAdapter(); - services.AddHostedMqttServer(options => + services.AddHostedMqttServerEx(options => { var broker = setting; if (broker == null) broker = new MqttBrokerSetting(); @@ -88,26 +91,25 @@ namespace IoTSharp { options.WithoutEncryptedEndpoint(); } - options.WithConnectionValidator(action => - { - action.ReturnCode = MQTTnet.Protocol.MqttConnectReturnCode.ConnectionAccepted; - }); - options.Build(); + options.Build(); }); services.AddMqttConnectionHandler(); services.AddMqttWebSocketServerAdapter(); + services.AddSingleton(); } public static void UseIotSharpMqttServer(this IApplicationBuilder app) { app.UseMqttEndpoint(); - app.UseMqttServer(server => + var mqttEvents = app.ApplicationServices.GetService(); + app.UseMqttServerEx(server => { - server.ClientConnected += (sender, e) => { }; - server.Started += (sender, e) => { }; - server.Stopped += (sender, e) => { }; - server.ApplicationMessageReceived += (sender, e) => { }; - server.ClientSubscribedTopic += (sender, e) => { }; - server.ClientUnsubscribedTopic += (sender, e) => { }; + server.ClientConnected += mqttEvents.Server_ClientConnected; + server.Started += mqttEvents.Server_Started; + server.Stopped += mqttEvents.Server_Stopped; + server.ApplicationMessageReceived += mqttEvents.Server_ApplicationMessageReceived; + server.ClientSubscribedTopic += mqttEvents.Server_ClientSubscribedTopic; + server.ClientUnsubscribedTopic += mqttEvents.Server_ClientUnsubscribedTopic; + server.ClientConnectionValidator += mqttEvents.Server_ClientConnectionValidator; }); var mqttNetLogger = app.ApplicationServices.GetService(); diff --git a/IoTSharp/Handlers/MqttEventsHandler.cs b/IoTSharp/Handlers/MqttEventsHandler.cs new file mode 100644 index 0000000000000000000000000000000000000000..6fecfe2ae5cd1df38f67b3ee63b835cc21368ff0 --- /dev/null +++ b/IoTSharp/Handlers/MqttEventsHandler.cs @@ -0,0 +1,134 @@ +using IoTSharp.Data; +using IoTSharp.X509Extensions; +using Microsoft.Extensions.Logging; +using MQTTnet; +using MQTTnet.AspNetCoreEx; +using MQTTnet.Server; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Security.Cryptography; +using System.Security.Cryptography.X509Certificates; +using System.Text; +using System.Threading.Tasks; + +namespace IoTSharp.Handlers +{ + public class MqttEventsHandler + { + ILogger Logger { get; set; } + ApplicationDbContext _dbContext; + public MqttEventsHandler(ILogger _logger, ApplicationDbContext dbContext) + { + Logger = _logger; + _dbContext = dbContext; + } + + static long clients = 0; + internal void Server_ClientConnected(object sender, MqttClientConnectedEventArgs e) + { + Logger.LogInformation($"Client [{e.ClientId}] connected"); + clients++; + Task.Run(() => ((IMqttServer)sender).PublishAsync("$SYS/broker/clients/total", clients.ToString())); + } + static DateTime uptime = DateTime.MinValue; + internal void Server_Started(object sender, EventArgs e) + { + Logger.LogInformation($"MqttServer is started"); + uptime = DateTime.Now; + } + + internal void Server_Stopped(object sender, EventArgs e) + { + Logger.LogInformation($"Server is stopped"); + } + Dictionary lstTopics = new Dictionary(); + long received = 0; + internal void Server_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) + { + Logger.LogInformation($"Server received {e.ClientId}'s message: Topic=[{e.ApplicationMessage.Topic }],Retain=[{e.ApplicationMessage.Retain}],QualityOfServiceLevel=[{e.ApplicationMessage.QualityOfServiceLevel}]"); + if (!lstTopics.ContainsKey(e.ApplicationMessage.Topic)) + { + lstTopics.Add(e.ApplicationMessage.Topic, 1); + Task.Run(() => ((IMqttServer)sender).PublishAsync("$SYS/broker/subscriptions/count", lstTopics.Count.ToString())); + } + else + { + lstTopics[e.ApplicationMessage.Topic]++; + } + received += e.ApplicationMessage.Payload.Length; + } + long Subscribed; + internal void Server_ClientSubscribedTopic(object sender, MqttClientSubscribedTopicEventArgs e) + { + Logger.LogInformation($"Client [{e.ClientId}] subscribed [{e.TopicFilter}]"); + if (e.TopicFilter.Topic.StartsWith("$SYS/")) + { + if (e.TopicFilter.Topic.StartsWith("$SYS/broker/version")) + { + var mename = typeof(MqttEventsHandler).Assembly.GetName(); + var mqttnet = typeof(MqttClientSubscribedTopicEventArgs).Assembly.GetName(); + Task.Run(() => ((IMqttServer)sender).PublishAsync("$SYS/broker/version", $"{mename.Name}V{mename.Version.ToString()},{mqttnet.Name}.{mqttnet.Version.ToString()}")); + } + else if (e.TopicFilter.Topic.StartsWith("$SYS/broker/uptime")) + { + Task.Run(() => ((IMqttServer)sender).PublishAsync("$SYS/broker/uptime", uptime.ToString())); + } + } + else + { + Subscribed++; + Task.Run(() => ((IMqttServer)sender).PublishAsync("$SYS/broker/subscriptions/count", Subscribed.ToString())); + } + + + } + + internal void Server_ClientUnsubscribedTopic(object sender, MqttClientUnsubscribedTopicEventArgs e) + { + Logger.LogInformation($"Client [{e.ClientId}] unsubscribed[{e.TopicFilter}]"); + if (!e.TopicFilter.StartsWith("$SYS/")) + { + Subscribed--; + Task.Run(() => ((IMqttServer)sender).PublishAsync("$SYS/broker/subscriptions/count", Subscribed.ToString())); + } + } + public static string MD5Sum(string text) => BitConverter.ToString(MD5.Create().ComputeHash(Encoding.UTF8.GetBytes(text))).Replace("-", ""); + internal void Server_ClientConnectionValidator(object sender, MqttClientConnectionValidatorEventArgs e) + { + MqttConnectionValidatorContext obj = e.Context; + Uri uri = new Uri("mqtt://" + obj.Endpoint); + if (string.IsNullOrEmpty(obj.Username) && uri.IsLoopback) + { + obj.ReturnCode = MQTTnet.Protocol.MqttConnectReturnCode.ConnectionAccepted; + Logger.LogInformation($"Loopback {obj.Endpoint}, ConnectionAccepted"); + } + else + { + Logger.LogInformation($"ClientId={obj.ClientId},Endpoint={obj.Endpoint},Username={obj.Username},Password={obj.Password},WillMessage={obj.WillMessage?.ConvertPayloadToString()}"); + var mcr = _dbContext.DeviceIdentities.FirstOrDefault(mc => (mc.IdentityType == IdentityType.AccessToken && mc.IdentityId == obj.Username) + || (mc.IdentityType== IdentityType.DevicePassword && mc.IdentityId==obj.Username && mc.IdentityValue==obj.Password)); + if (mcr != null) + { + try + { + var device = mcr.Device; + + } + catch (Exception ex) + { + Logger.LogError(ex, "ConnectionRefusedServerUnavailable {0}", ex.Message); + obj.ReturnCode = MQTTnet.Protocol.MqttConnectReturnCode.ConnectionRefusedServerUnavailable; + } + + } + else + { + obj.ReturnCode = MQTTnet.Protocol.MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword; + Logger.LogInformation($"Bad username or password {obj.Username},connection {obj.Endpoint} refused"); + } + } + + } + } +} diff --git a/IoTSharp/IoTSharp.csproj b/IoTSharp/IoTSharp.csproj index 2d0b376feb49a13e21c7cb2fc7cb05d66dfc610e..e21951b1644d617b3afcd6e7c29895bf440f595f 100644 --- a/IoTSharp/IoTSharp.csproj +++ b/IoTSharp/IoTSharp.csproj @@ -39,17 +39,18 @@ - - - - + + + + - + + - + diff --git a/IoTSharp/Startup.cs b/IoTSharp/Startup.cs index 7371498bf5de46717ab26cedfd0d7723344f5504..9fb698e29c15d471abef1b13b9516a5af52661ab 100644 --- a/IoTSharp/Startup.cs +++ b/IoTSharp/Startup.cs @@ -12,6 +12,7 @@ using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using MQTTnet.AspNetCore; +using MQTTnet.AspNetCoreEx; using MQTTnet.Client; using NSwag.AspNetCore; using System; @@ -79,8 +80,8 @@ namespace IoTSharp services.AddIoTSharpMqttServer(AppSettings.MqttBroker); services.AddMqttClient(AppSettings.MqttClient); + - } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. @@ -125,7 +126,7 @@ namespace IoTSharp // see https://go.microsoft.com/fwlink/?linkid=864501 spa.Options.SourcePath = "ClientApp"; - + spa.Options.StartupTimeout = new TimeSpan(0, 0, 120); if (env.IsDevelopment()) { spa.UseAngularCliServer(npmScript: "start");