提交 704e1724 编写于 作者: 麦壳饼's avatar 麦壳饼

add mqtt events handlers

上级 eea9b36d
......@@ -8,6 +8,8 @@ using Microsoft.Extensions.DependencyInjection;
using Microsoft.AspNetCore.Builder;
using MQTTnet.AspNetCore;
using MQTTnet.Diagnostics;
using MQTTnet.AspNetCoreEx;
using IoTSharp.Handlers;
namespace IoTSharp
{
......@@ -32,6 +34,7 @@ namespace IoTSharp
var clientOptions = app.ApplicationServices.GetService<IMqttClientOptions>();
mqtt.ApplicationMessageReceived += (sender, e) =>
{
_logger.LogInformation($"Received : {e.ApplicationMessage.Topic}");
};
mqtt.Connected += (sender, e) =>
......@@ -70,7 +73,7 @@ namespace IoTSharp
{
services.AddMqttTcpServerAdapter();
services.AddHostedMqttServer(options =>
services.AddHostedMqttServerEx(options =>
{
var broker = setting;
if (broker == null) broker = new MqttBrokerSetting();
......@@ -88,26 +91,25 @@ namespace IoTSharp
{
options.WithoutEncryptedEndpoint();
}
options.WithConnectionValidator(action =>
{
action.ReturnCode = MQTTnet.Protocol.MqttConnectReturnCode.ConnectionAccepted;
});
options.Build();
options.Build();
});
services.AddMqttConnectionHandler();
services.AddMqttWebSocketServerAdapter();
services.AddSingleton<MqttEventsHandler>();
}
public static void UseIotSharpMqttServer(this IApplicationBuilder app)
{
app.UseMqttEndpoint();
app.UseMqttServer(server =>
var mqttEvents = app.ApplicationServices.GetService<MqttEventsHandler>();
app.UseMqttServerEx(server =>
{
server.ClientConnected += (sender, e) => { };
server.Started += (sender, e) => { };
server.Stopped += (sender, e) => { };
server.ApplicationMessageReceived += (sender, e) => { };
server.ClientSubscribedTopic += (sender, e) => { };
server.ClientUnsubscribedTopic += (sender, e) => { };
server.ClientConnected += mqttEvents.Server_ClientConnected;
server.Started += mqttEvents.Server_Started;
server.Stopped += mqttEvents.Server_Stopped;
server.ApplicationMessageReceived += mqttEvents.Server_ApplicationMessageReceived;
server.ClientSubscribedTopic += mqttEvents.Server_ClientSubscribedTopic;
server.ClientUnsubscribedTopic += mqttEvents.Server_ClientUnsubscribedTopic;
server.ClientConnectionValidator += mqttEvents.Server_ClientConnectionValidator;
});
var mqttNetLogger = app.ApplicationServices.GetService<IMqttNetLogger>();
......
using IoTSharp.Data;
using IoTSharp.X509Extensions;
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.AspNetCoreEx;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading.Tasks;
namespace IoTSharp.Handlers
{
public class MqttEventsHandler
{
ILogger<MqttEventsHandler> Logger { get; set; }
ApplicationDbContext _dbContext;
public MqttEventsHandler(ILogger<MqttEventsHandler> _logger, ApplicationDbContext dbContext)
{
Logger = _logger;
_dbContext = dbContext;
}
static long clients = 0;
internal void Server_ClientConnected(object sender, MqttClientConnectedEventArgs e)
{
Logger.LogInformation($"Client [{e.ClientId}] connected");
clients++;
Task.Run(() => ((IMqttServer)sender).PublishAsync("$SYS/broker/clients/total", clients.ToString()));
}
static DateTime uptime = DateTime.MinValue;
internal void Server_Started(object sender, EventArgs e)
{
Logger.LogInformation($"MqttServer is started");
uptime = DateTime.Now;
}
internal void Server_Stopped(object sender, EventArgs e)
{
Logger.LogInformation($"Server is stopped");
}
Dictionary<string, int> lstTopics = new Dictionary<string, int>();
long received = 0;
internal void Server_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
{
Logger.LogInformation($"Server received {e.ClientId}'s message: Topic=[{e.ApplicationMessage.Topic }],Retain=[{e.ApplicationMessage.Retain}],QualityOfServiceLevel=[{e.ApplicationMessage.QualityOfServiceLevel}]");
if (!lstTopics.ContainsKey(e.ApplicationMessage.Topic))
{
lstTopics.Add(e.ApplicationMessage.Topic, 1);
Task.Run(() => ((IMqttServer)sender).PublishAsync("$SYS/broker/subscriptions/count", lstTopics.Count.ToString()));
}
else
{
lstTopics[e.ApplicationMessage.Topic]++;
}
received += e.ApplicationMessage.Payload.Length;
}
long Subscribed;
internal void Server_ClientSubscribedTopic(object sender, MqttClientSubscribedTopicEventArgs e)
{
Logger.LogInformation($"Client [{e.ClientId}] subscribed [{e.TopicFilter}]");
if (e.TopicFilter.Topic.StartsWith("$SYS/"))
{
if (e.TopicFilter.Topic.StartsWith("$SYS/broker/version"))
{
var mename = typeof(MqttEventsHandler).Assembly.GetName();
var mqttnet = typeof(MqttClientSubscribedTopicEventArgs).Assembly.GetName();
Task.Run(() => ((IMqttServer)sender).PublishAsync("$SYS/broker/version", $"{mename.Name}V{mename.Version.ToString()},{mqttnet.Name}.{mqttnet.Version.ToString()}"));
}
else if (e.TopicFilter.Topic.StartsWith("$SYS/broker/uptime"))
{
Task.Run(() => ((IMqttServer)sender).PublishAsync("$SYS/broker/uptime", uptime.ToString()));
}
}
else
{
Subscribed++;
Task.Run(() => ((IMqttServer)sender).PublishAsync("$SYS/broker/subscriptions/count", Subscribed.ToString()));
}
}
internal void Server_ClientUnsubscribedTopic(object sender, MqttClientUnsubscribedTopicEventArgs e)
{
Logger.LogInformation($"Client [{e.ClientId}] unsubscribed[{e.TopicFilter}]");
if (!e.TopicFilter.StartsWith("$SYS/"))
{
Subscribed--;
Task.Run(() => ((IMqttServer)sender).PublishAsync("$SYS/broker/subscriptions/count", Subscribed.ToString()));
}
}
public static string MD5Sum(string text) => BitConverter.ToString(MD5.Create().ComputeHash(Encoding.UTF8.GetBytes(text))).Replace("-", "");
internal void Server_ClientConnectionValidator(object sender, MqttClientConnectionValidatorEventArgs e)
{
MqttConnectionValidatorContext obj = e.Context;
Uri uri = new Uri("mqtt://" + obj.Endpoint);
if (string.IsNullOrEmpty(obj.Username) && uri.IsLoopback)
{
obj.ReturnCode = MQTTnet.Protocol.MqttConnectReturnCode.ConnectionAccepted;
Logger.LogInformation($"Loopback {obj.Endpoint}, ConnectionAccepted");
}
else
{
Logger.LogInformation($"ClientId={obj.ClientId},Endpoint={obj.Endpoint},Username={obj.Username},Password={obj.Password},WillMessage={obj.WillMessage?.ConvertPayloadToString()}");
var mcr = _dbContext.DeviceIdentities.FirstOrDefault(mc => (mc.IdentityType == IdentityType.AccessToken && mc.IdentityId == obj.Username)
|| (mc.IdentityType== IdentityType.DevicePassword && mc.IdentityId==obj.Username && mc.IdentityValue==obj.Password));
if (mcr != null)
{
try
{
var device = mcr.Device;
}
catch (Exception ex)
{
Logger.LogError(ex, "ConnectionRefusedServerUnavailable {0}", ex.Message);
obj.ReturnCode = MQTTnet.Protocol.MqttConnectReturnCode.ConnectionRefusedServerUnavailable;
}
}
else
{
obj.ReturnCode = MQTTnet.Protocol.MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
Logger.LogInformation($"Bad username or password {obj.Username},connection {obj.Endpoint} refused");
}
}
}
}
}
......@@ -39,17 +39,18 @@
<PackageReference Include="Microsoft.AspNetCore.Identity" Version="2.2.0" />
<PackageReference Include="Microsoft.AspNetCore.Identity.EntityFrameworkCore" Version="2.2.0" />
<PackageReference Include="Microsoft.AspNetCore.Razor.Design" Version="2.2.0" PrivateAssets="All" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.2.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore.InMemory" Version="2.2.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.2.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Sqlite" Version="2.2.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.2.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.InMemory" Version="2.2.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.2.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Sqlite" Version="2.2.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Sqlite.Design" Version="1.1.6" />
<PackageReference Include="Microsoft.VisualStudio.Web.CodeGeneration.Design" Version="2.2.1" />
<PackageReference Include="Microsoft.VisualStudio.Web.CodeGeneration.Design" Version="2.2.2" />
<PackageReference Include="MQTTnet" Version="2.8.5" />
<PackageReference Include="MQTTnet.AspNetCore" Version="2.8.5" />
<PackageReference Include="MQTTnet.AspNetCoreEx" Version="1.0.0" />
<PackageReference Include="MQTTnet.Extensions.Rpc" Version="2.8.5" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="2.2.0" />
<PackageReference Include="NSwag.AspNetCore" Version="12.0.13" />
<PackageReference Include="NSwag.AspNetCore" Version="12.0.15" />
<PackageReference Include="QuartzHostedService" Version="0.0.4" />
<PackageReference Include="System.Text.Encoding.CodePages" Version="4.5.1" />
</ItemGroup>
......
......@@ -12,6 +12,7 @@ using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using MQTTnet.AspNetCore;
using MQTTnet.AspNetCoreEx;
using MQTTnet.Client;
using NSwag.AspNetCore;
using System;
......@@ -79,8 +80,8 @@ namespace IoTSharp
services.AddIoTSharpMqttServer(AppSettings.MqttBroker);
services.AddMqttClient(AppSettings.MqttClient);
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
......@@ -125,7 +126,7 @@ namespace IoTSharp
// see https://go.microsoft.com/fwlink/?linkid=864501
spa.Options.SourcePath = "ClientApp";
spa.Options.StartupTimeout = new TimeSpan(0, 0, 120);
if (env.IsDevelopment())
{
spa.UseAngularCliServer(npmScript: "start");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册