提交 63cf995d 编写于 作者: 麦壳饼's avatar 麦壳饼

修正合并中遇到的问题

# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "CodeQL"
on:
push:
branches: [ master ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ master ]
schedule:
- cron: '16 17 * * 5'
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
language: [ 'csharp', 'javascript' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ]
# Learn more:
# https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed
steps:
- name: Checkout repository
uses: actions/checkout@v2
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v1
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v1
# ℹ️ Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
# and modify them (or add more) to build your code if your project
# uses a compiled language
#- run: |
# make bootstrap
# make release
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v1
......@@ -14,9 +14,9 @@
</PackageReference>
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="6.0.1" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="6.0.0" />
<PackageReference Include="EFCore.Sharding.MySql" Version="6.0.0" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql.NetTopologySuite" Version="6.0.0" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="6.0.1" />
<PackageReference Include="EFCore.Sharding.MySql" Version="6.0.2" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql.NetTopologySuite" Version="6.0.1" />
</ItemGroup>
<ItemGroup>
......
......@@ -8,7 +8,7 @@
<PackageReference Include="AspNetCore.HealthChecks.Oracle" Version="6.0.1" />
<PackageReference Include="AspNetCore.HealthChecks.UI.Core" Version="6.0.2" />
<PackageReference Include="AspNetCore.HealthChecks.UI.InMemory.Storage" Version="6.0.2" />
<PackageReference Include="EFCore.Sharding.Oracle" Version="6.0.0" />
<PackageReference Include="EFCore.Sharding.Oracle" Version="6.0.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="6.0.1" />
<PackageReference Include="Oracle.EntityFrameworkCore" Version="6.21.5" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="6.0.1">
......
......@@ -14,12 +14,12 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="6.0.1" />
<PackageReference Include="Npgsql" Version="6.0.2" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="6.0.2" />
<PackageReference Include="EFCore.Sharding.PostgreSql" Version="6.0.0" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL.NetTopologySuite" Version="6.0.2" />
<PackageReference Include="Npgsql.Json.NET" Version="6.0.2" />
<PackageReference Include="Npgsql.NodaTime" Version="6.0.2" />
<PackageReference Include="Npgsql" Version="6.0.3" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="6.0.3" />
<PackageReference Include="EFCore.Sharding.PostgreSql" Version="6.0.2" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL.NetTopologySuite" Version="6.0.3" />
<PackageReference Include="Npgsql.Json.NET" Version="6.0.3" />
<PackageReference Include="Npgsql.NodaTime" Version="6.0.3" />
</ItemGroup>
<ItemGroup>
......
......@@ -8,7 +8,7 @@
<PackageReference Include="AspNetCore.HealthChecks.SqlServer" Version="6.0.1" />
<PackageReference Include="AspNetCore.HealthChecks.UI.Core" Version="6.0.2" />
<PackageReference Include="AspNetCore.HealthChecks.UI.SqlServer.Storage" Version="6.0.2" />
<PackageReference Include="EFCore.Sharding.SqlServer" Version="6.0.0" />
<PackageReference Include="EFCore.Sharding.SqlServer" Version="6.0.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="6.0.1" />
</ItemGroup>
......
......@@ -14,7 +14,7 @@
</PackageReference>
<PackageReference Include="Microsoft.EntityFrameworkCore.Sqlite" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="6.0.1" />
<PackageReference Include="EFCore.Sharding.SQLite" Version="6.0.0" />
<PackageReference Include="EFCore.Sharding.SQLite" Version="6.0.2" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\IoTSharp.Data\IoTSharp.Data.csproj" />
......
......@@ -9,7 +9,7 @@
<PackageReference Include="Microsoft.AspNetCore.Identity.EntityFrameworkCore" Version="6.0.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="6.0.1" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
<PackageReference Include="EFCore.Sharding" Version="6.0.0" />
<PackageReference Include="EFCore.Sharding" Version="6.0.2" />
</ItemGroup>
......
......@@ -15,7 +15,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="MQTTnet" Version="3.1.1" />
<PackageReference Include="MQTTnet" Version="4.0.0-preview3" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
</ItemGroup>
......
......@@ -26,7 +26,7 @@ namespace IoTSharp.TaskAction
string contentType = "application/json";
var restclient = new RestClient(config.BaseUrl);
var request = new RestRequest(config.Url + (input.DeviceId == Guid.Empty ? "" : "/" + input.DeviceId), Method.Post);
var request = new RestRequest(config.Url + (input.DeviceId == Guid.Empty ? "" : "/" + input.DeviceId), Method.POST);
request.AddHeader("X-Access-Token",
config.Token);
request.RequestFormat = DataFormat.Json;
......
......@@ -5,7 +5,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="RestSharp" Version="107.1.1" />
<PackageReference Include="RestSharp" Version="106.15.0" />
</ItemGroup>
<ItemGroup>
......
......@@ -64,7 +64,7 @@ namespace IoTSharp.TaskAction
var dd = o.Properties().Select(c => new ParamObject { keyName = c.Name, value = JPropertyToObject(c.Value.First as JProperty) }).ToList();
string contentType = "application/json";
var restclient = new RestClient(config.BaseUrl);
var request = new RestRequest(config.Url + (input.DeviceId == Guid.Empty ? "" : "/" + input.DeviceId), Method.Post);
var request = new RestRequest(config.Url + (input.DeviceId == Guid.Empty ? "" : "/" + input.DeviceId), Method.POST);
request.AddHeader("X-Access-Token",
config.Token);
request.RequestFormat = DataFormat.Json;
......@@ -98,7 +98,7 @@ namespace IoTSharp.TaskAction
var dd = o.Properties().Select(c => new ParamObject { keyName = c.Name, value = JPropertyToObject(c) }).ToList();
string contentType = "application/json";
var restclient = new RestClient(config.BaseUrl);
var request = new RestRequest(config.Url + (input.DeviceId == Guid.Empty ? "" : "/" + input.DeviceId), Method.Post);
var request = new RestRequest(config.Url + (input.DeviceId == Guid.Empty ? "" : "/" + input.DeviceId), Method.POST);
request.AddHeader("X-Access-Token",
config.Token);
request.RequestFormat = DataFormat.Json;
......
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Exceptions;
using MQTTnet.Protocol;
using System;
......@@ -30,9 +28,11 @@ namespace IoTSharp.Extensions
{
_mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));
_logger = logger;
_mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(args => OnApplicationMessageReceived(mqttClient, args) );
_mqttClient.ApplicationMessageReceivedAsync += OnApplicationMessageReceived;
}
public RpcClient(IMqttClientOptions mqtt, Microsoft.Extensions.Logging.ILogger _logger) :this (new MQTTnet.MqttFactory().CreateMqttClient(), _logger)
{
_mqtt = mqtt;
......@@ -121,19 +121,16 @@ namespace IoTSharp.Extensions
}
}
private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs)
private Task OnApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs eventArgs)
{
if (!_waitingCalls.TryRemove(eventArgs.ApplicationMessage.Topic, out var tcs))
{
return;
}
if (tcs.Task.IsCompleted || tcs.Task.IsCanceled)
if (_waitingCalls.TryRemove(eventArgs.ApplicationMessage.Topic, out var tcs))
{
return;
if (!tcs.Task.IsCompleted && !tcs.Task.IsCanceled)
{
tcs.TrySetResult(eventArgs.ApplicationMessage.Payload);
}
}
tcs.TrySetResult(eventArgs.ApplicationMessage.Payload);
return Task.CompletedTask;
}
......
......@@ -19,14 +19,9 @@ using MQTTnet.Protocol;
using IoTSharp.Extensions;
using IoTSharp.Models;
using MQTTnet.Exceptions;
using MQTTnet.Client.Options;
using Microsoft.AspNetCore.Identity;
using Microsoft.Extensions.Logging;
using IoTSharp.Storage;
using k8s.Models;
using Newtonsoft.Json.Linq;
using MQTTnet.AspNetCoreEx;
using MQTTnet.Server.Status;
using System.Security.Cryptography.X509Certificates;
using Microsoft.Extensions.Options;
using IoTSharp.X509Extensions;
......@@ -34,6 +29,7 @@ using System.IO;
using System.IO.Compression;
using DotNetCore.CAP;
using LinqKit;
using MQTTnet.Server;
namespace IoTSharp.Controllers
{
......@@ -51,12 +47,12 @@ namespace IoTSharp.Controllers
private readonly SignInManager<IdentityUser> _signInManager;
private readonly ILogger _logger;
private readonly IStorage _storage;
private readonly IMqttServerEx _serverEx;
private readonly MqttServer _serverEx;
private readonly AppSettings _setting;
private readonly ICapPublisher _queue;
public DevicesController(UserManager<IdentityUser> userManager,
SignInManager<IdentityUser> signInManager, ILogger<DevicesController> logger, IMqttServerEx serverEx, ApplicationDbContext context, IMqttClientOptions mqtt, IStorage storage, IOptions<AppSettings> options, ICapPublisher queue)
SignInManager<IdentityUser> signInManager, ILogger<DevicesController> logger, MqttServer serverEx, ApplicationDbContext context, IMqttClientOptions mqtt, IStorage storage, IOptions<AppSettings> options, ICapPublisher queue)
{
_context = context;
_mqtt = mqtt;
......@@ -944,9 +940,9 @@ namespace IoTSharp.Controllers
[HttpGet("SessionStatus")]
[ProducesResponseType(StatusCodes.Status200OK)]
[ProducesDefaultResponseType]
public async Task<ApiResult<IList<IMqttSessionStatus>>> GetSessionStatus()
public async Task<ApiResult<IList<MqttSessionStatus>>> GetSessionStatus()
{
return new ApiResult<IList<IMqttSessionStatus>>(ApiCode.Success, "OK", await _serverEx.GetSessionStatusAsync());
return new ApiResult<IList<MqttSessionStatus>>(ApiCode.Success, "OK", await _serverEx.GetSessionsAsync());
}
/// <summary>
/// SessionStatus
......@@ -956,9 +952,9 @@ namespace IoTSharp.Controllers
[HttpGet("ClientStatus")]
[ProducesResponseType(StatusCodes.Status200OK)]
[ProducesDefaultResponseType]
public async Task<ApiResult<IList<IMqttClientStatus>>> GetClientStatus()
public async Task<ApiResult<IList<MqttClientStatus>>> GetClientStatus()
{
return new ApiResult<IList<IMqttClientStatus>>(ApiCode.Success, "OK", await _serverEx.GetClientStatusAsync());
return new ApiResult<IList<MqttClientStatus>>(ApiCode.Success, "OK", await _serverEx.GetClientsAsync());
}
[Authorize(Roles = nameof(UserRole.NormalUser))]
......@@ -967,7 +963,7 @@ namespace IoTSharp.Controllers
[ProducesDefaultResponseType]
public async Task<ApiResult<int>> GetSessionsCount()
{
return new ApiResult<int>(ApiCode.Success, "OK", (await _serverEx.GetClientStatusAsync()).Count);
return new ApiResult<int>(ApiCode.Success, "OK", (await _serverEx.GetClientsAsync()).Count);
}
}
}
\ No newline at end of file
......@@ -23,8 +23,6 @@ namespace IoTSharp.Controllers
public class SubscriptionEventController : Controller
{
private ApplicationDbContext _context;
private readonly FlowRuleProcessor _flowRuleProcessor;
private readonly TaskExecutorHelper _helper;
private UserManager<IdentityUser> _userManager;
// GET: SubscriptionEventController
......
......@@ -48,6 +48,8 @@ RUN KEYRING=/usr/share/keyrings/nodesource.gpg && curl -fsSL https://deb.nodesou
WORKDIR /src
COPY ["IoTSharp/ClientApp/package.json", "IoTSharp/ClientApp/package.json"]
RUN npm install --prefix ./IoTSharp/ClientApp/
COPY ["IoTSharp/IoTSharp.csproj", "IoTSharp/"]
COPY ["IoTSharp.Data/IoTSharp.Data.csproj", "IoTSharp.Data/"]
COPY ["IoTSharp.Interpreter/IoTSharp.Interpreter.csproj", "IoTSharp.Interpreter/"]
......@@ -59,15 +61,11 @@ COPY ["IoTSharp.Data.Oracle/IoTSharp.Data.Oracle.csproj", "IoTSharp.Data.Oracle/
COPY ["IoTSharp.Data.PostgreSQL/IoTSharp.Data.PostgreSQL.csproj", "IoTSharp.Data.PostgreSQL/"]
COPY ["IoTSharp.Data.MySQL/IoTSharp.Data.MySQL.csproj", "IoTSharp.Data.MySQL/"]
RUN dotnet restore "IoTSharp/IoTSharp.csproj"
COPY ["IoTSharp/ClientApp/package.json", "IoTSharp/ClientApp/package.json"]
RUN npm install --prefix ./IoTSharp/ClientApp/
COPY . .
WORKDIR "/src/IoTSharp"
RUN dotnet build "IoTSharp.csproj" -c Release -o /app/build
FROM build AS publish
RUN dotnet publish "IoTSharp.csproj" -c Release -o /app/publish
......
......@@ -48,9 +48,9 @@ namespace IoTSharp.Dtos
public string IdentityValue { get; set; }
public string IdentityId { get; set; }
#nullable enable
public DeviceModel? Model { get; set; }
#nullable disable
}
}
......@@ -8,17 +8,14 @@ using Microsoft.Extensions.DependencyInjection;
using Microsoft.AspNetCore.Builder;
using MQTTnet.AspNetCore;
using MQTTnet.Diagnostics;
using MQTTnet.AspNetCoreEx;
using IoTSharp.Handlers;
using IoTSharp.Services;
using MQTTnet.Server;
using MQTTnet.Client.Receiving;
using MQTTnet.Client.Options;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using MQTTnet.Diagnostics.Logger;
using System.Security.Cryptography.X509Certificates;
using MQTTnet;
namespace IoTSharp
{
......@@ -28,7 +25,7 @@ namespace IoTSharp
public static void AddIoTSharpMqttServer(this IServiceCollection services, MqttBrokerSetting broker)
{
services.AddMqttTcpServerAdapter();
services.AddHostedMqttServerEx(options =>
services.AddHostedMqttServer(options =>
{
options.WithDefaultEndpointPort(broker.Port).WithDefaultEndpoint();
if (broker.EnableTls)
......@@ -48,6 +45,7 @@ namespace IoTSharp
{
options.WithoutEncryptedEndpoint();
}
options.WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(5));
options.WithPersistentSessions();
options.Build();
}).AddMqttConnectionHandler()
......@@ -58,19 +56,37 @@ namespace IoTSharp
public static void UseIotSharpMqttServer(this IApplicationBuilder app)
{
var mqttEvents = app.ApplicationServices.CreateScope().ServiceProvider.GetService<MQTTServerHandler>();
IMqttServerStorage storage = app.ApplicationServices.CreateScope().ServiceProvider.GetService<IMqttServerStorage>();
app.UseMqttServerEx(server =>
app.UseMqttServer(server =>
{
server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(args => mqttEvents.Server_ClientConnected(server, args));
server.StartedHandler = new MqttServerStartedHandlerDelegate(args => mqttEvents.Server_Started(server, args));
server.StoppedHandler = new MqttServerStoppedHandlerDelegate(args => mqttEvents.Server_Stopped(server, args));
server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(args => mqttEvents.Server_ApplicationMessageReceived(server, args));
server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedTopicHandlerDelegate( args => mqttEvents.Server_ClientSubscribedTopic(server, args));
server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(args => mqttEvents.Server_ClientUnsubscribedTopic(server, args));
server.ClientConnectionValidatorHandler = new MqttServerClientConnectionValidatorHandlerDelegate(args => mqttEvents.Server_ClientConnectionValidator(server, args));
server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(args => mqttEvents.Server_ClientDisconnected(server, args));
server.ClientConnectedAsync += mqttEvents.Server_ClientConnectedAsync;
server.StartedAsync += mqttEvents.Server_Started ;
server.StoppedAsync += mqttEvents.Server_Stopped ;
server.ApplicationMessageNotConsumedAsync += mqttEvents.Server_ApplicationMessageReceived ;
server .ClientSubscribedTopicAsync += mqttEvents.Server_ClientSubscribedTopic;
server.ClientUnsubscribedTopicAsync += mqttEvents.Server_ClientUnsubscribedTopic;
server.ValidatingConnectionAsync += mqttEvents.Server_ClientConnectionValidator;
server.ClientDisconnectedAsync +=mqttEvents.Server_ClientDisconnected;
});
}
public static async Task PublishAsync<T>(this MqttServer mqtt, string SenderClientId, string topic, T _payload) where T : class
{
await mqtt.PublishAsync(SenderClientId, new MqttApplicationMessage() { Topic = topic, Payload = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(_payload) });
}
public static async Task PublishAsync(this MqttServer mqtt, string SenderClientId, string topic, string _payload)
{
await mqtt.PublishAsync(SenderClientId, new MqttApplicationMessage() { Topic = topic, Payload = System.Text.Encoding.Default.GetBytes(_payload) });
}
public static async Task PublishAsync(this MqttServer mqtt, string SenderClientId, string topic, byte[] _payload)
{
await mqtt.PublishAsync(SenderClientId, new MqttApplicationMessage() { Topic = topic, Payload = _payload });
}
public static async Task PublishAsync ( this MqttServer mqtt, string SenderClientId ,MqttApplicationMessage message)
{
var clients = await mqtt.GetClientsAsync();
var client= clients.FirstOrDefault(c => c.Id == SenderClientId);
await client.Session.EnqueueApplicationMessageAsync(message);
}
public static void AddMqttClient(this IServiceCollection services, MqttClientSetting setting)
......
......@@ -46,69 +46,77 @@ namespace IoTSharp.Handlers
[CapSubscribe("iotsharp.services.datastream.attributedata")]
public async void StoreAttributeData(RawMsg msg)
{
using (var _scope = _scopeFactor.CreateScope())
try
{
using (var _dbContext = _scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
using (var _scope = _scopeFactor.CreateScope())
{
var device = _dbContext.Device.FirstOrDefault(d => d.Id == msg.DeviceId);
if (device != null)
using (var _dbContext = _scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
var mb = msg.MsgBody;
Dictionary<string, object> dc = new Dictionary<string, object>();
mb.ToList().ForEach(kp =>
var device = _dbContext.Device.FirstOrDefault(d => d.Id == msg.DeviceId);
if (device != null)
{
if (kp.Value.GetType() == typeof(System.Text.Json.JsonElement))
var mb = msg.MsgBody;
Dictionary<string, object> dc = new Dictionary<string, object>();
mb.ToList().ForEach(kp =>
{
var je = (System.Text.Json.JsonElement)kp.Value;
switch (je.ValueKind)
if (kp.Value.GetType() == typeof(System.Text.Json.JsonElement))
{
case System.Text.Json.JsonValueKind.Undefined:
case System.Text.Json.JsonValueKind.Object:
case System.Text.Json.JsonValueKind.Array:
dc.Add(kp.Key, je.GetRawText());
break;
case System.Text.Json.JsonValueKind.String:
dc.Add(kp.Key, je.GetString());
break;
case System.Text.Json.JsonValueKind.Number:
dc.Add(kp.Key, je.GetDouble());
break;
case System.Text.Json.JsonValueKind.True:
case System.Text.Json.JsonValueKind.False:
dc.Add(kp.Key, je.GetBoolean());
break;
case System.Text.Json.JsonValueKind.Null:
break;
default:
break;
var je = (System.Text.Json.JsonElement)kp.Value;
switch (je.ValueKind)
{
case System.Text.Json.JsonValueKind.Undefined:
case System.Text.Json.JsonValueKind.Object:
case System.Text.Json.JsonValueKind.Array:
dc.Add(kp.Key, je.GetRawText());
break;
case System.Text.Json.JsonValueKind.String:
dc.Add(kp.Key, je.GetString());
break;
case System.Text.Json.JsonValueKind.Number:
dc.Add(kp.Key, je.GetDouble());
break;
case System.Text.Json.JsonValueKind.True:
case System.Text.Json.JsonValueKind.False:
dc.Add(kp.Key, je.GetBoolean());
break;
case System.Text.Json.JsonValueKind.Null:
break;
default:
break;
}
}
else
{
dc.Add(kp.Key, kp.Value);
}
}
else
{
dc.Add(kp.Key, kp.Value);
}
});
var result2 = await _dbContext.SaveAsync<AttributeLatest>(dc, device.Id, msg.DataSide);
result2.exceptions?.ToList().ForEach(ex =>
{
_logger.LogError($"{ex.Key} {ex.Value} {Newtonsoft.Json.JsonConvert.SerializeObject(msg.MsgBody[ex.Key])}");
});
_logger.LogInformation($"更新{device.Name}({device.Id})属性数据结果{result2.ret}");
ExpandoObject obj = new ExpandoObject();
dc.ToList().ForEach(kv =>
{
obj.TryAdd(kv.Key, kv.Value);
});
await RunRules(msg.DeviceId, obj, MountType.Telemetry);
});
var result2 = await _dbContext.SaveAsync<AttributeLatest>(dc, device.Id, msg.DataSide);
result2.exceptions?.ToList().ForEach(ex =>
{
_logger.LogError($"{ex.Key} {ex.Value} {Newtonsoft.Json.JsonConvert.SerializeObject(msg.MsgBody[ex.Key])}");
});
_logger.LogInformation($"更新{device.Name}({device.Id})属性数据结果{result2.ret}");
ExpandoObject obj = new ExpandoObject();
dc.ToList().ForEach(kv =>
{
obj.TryAdd(kv.Key, kv.Value);
});
await RunRules(msg.DeviceId, obj, MountType.Telemetry);
}
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "StoreAttributeData:"+ex.Message);
}
}
......@@ -184,25 +192,33 @@ namespace IoTSharp.Handlers
private async Task RunRules(Guid devid, object obj, MountType mountType)
{
var rules = await _caching.GetAsync($"ruleid_{devid}_{Enum.GetName(mountType)}", async () =>
try
{
using (var scope = _scopeFactor.CreateScope())
using (var _dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
var rules = await _caching.GetAsync($"ruleid_{devid}_{Enum.GetName(mountType)}", async () =>
{
using (var scope = _scopeFactor.CreateScope())
using (var _dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
var guids = await _dbContext.GerDeviceRulesIdList(devid, mountType);
return guids;
}
}, TimeSpan.FromSeconds(_appSettings.RuleCachingExpiration));
if (rules.HasValue)
{
var guids = await _dbContext.GerDeviceRulesIdList(devid, mountType);
return guids;
rules.Value.ToList().ForEach(async g =>
{
await _flowRuleProcessor.RunFlowRules(g, obj, devid, EventType.Normal, null);
});
}
}, TimeSpan.FromSeconds(_appSettings.RuleCachingExpiration));
if (rules.HasValue)
{
rules.Value.ToList().ForEach(async g =>
else
{
await _flowRuleProcessor.RunFlowRules(g, obj, devid, EventType.Normal, null);
});
_logger.LogInformation($"{devid}的数据无相关规则链处理。");
}
}
else
catch (Exception ex)
{
_logger.LogInformation($"{devid}的数据无相关规则链处理。");
_logger.LogError ( ex,$"{devid}的数据无相关规则链处理。");
}
}
}
......
using IoTSharp.Data;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using IoTSharp.Extensions;
using Silkier.AspNetCore;
namespace IoTSharp.Handlers
{
public class RetainedMessageHandler : IMqttServerStorage
{
private ApplicationDbContext _context;
private ILogger _logger;
public RetainedMessageHandler(ILogger<RetainedMessageHandler> logger, IServiceScopeFactory scopeFactor)
{
_context = scopeFactor.GetRequiredService<ApplicationDbContext>();
_logger = logger;
}
public async Task<IList<MqttApplicationMessage>> LoadRetainedMessagesAsync()
{
//try
//{
// var lst = from m in _context.RetainedMessage select (MqttApplicationMessage)m;
// return await lst.ToListAsync();
//}
//catch (Exception ex)
//{
// _logger.LogError(ex, $"load RetainedMessage error {ex.Message} ");
// return new List<MqttApplicationMessage>();
//}
return await Task.FromResult( new List<MqttApplicationMessage>());
}
public Task SaveRetainedMessagesAsync(IList<MqttApplicationMessage> messages)
{
// BaseTask.Factory.StartNew(() =>
//{
// _context.Database.BeginTransaction();
// try
// {
// DateTime dateTime = DateTime.Now;
// var needsave = from mam in messages select new RetainedMessage(mam);
// var ids = needsave.Select(x => x.Id).ToList();
// var dbids = _context.RetainedMessage.Select(x => x.Id).ToArray();
// var needdelete = dbids.Except(ids);//.Except(dbids);
// var del = from f in _context.RetainedMessage where needdelete.Contains(f.Id) select f;
// var needadd = ids.Except(dbids);
// var add = from f in needsave where needadd.Contains(f.Id) select f;
// if (del.Any()) _context.RetainedMessage.RemoveRange(del);
// if (add.Any()) _context.RetainedMessage.AddRange(add);
// int ret = _context.SaveChanges();
// _context.Database.CommitTransaction();
// _logger.LogInformation($"{ret} pieces of data were saved and took {DateTime.Now.Subtract(dateTime).TotalSeconds} seconds.");
// }
// catch (Exception ex)
// {
// _context.Database.RollbackTransaction();
// _logger.LogError(ex, $" An exception was encountered,{ex.Message}");
// }
//}, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
return Task.CompletedTask;
}
}
}
\ No newline at end of file
......@@ -63,12 +63,12 @@
<PackageReference Include="DotNetCore.CAP.MongoDB" Version="6.0.0" />
<PackageReference Include="DotNetCore.CAP.RabbitMQ" Version="6.0.0" />
<PackageReference Include="DotNetCore.CAP.PostgreSql" Version="6.0.0" />
<PackageReference Include="EasyCaching.Core" Version="1.4.1" />
<PackageReference Include="EasyCaching.InMemory" Version="1.4.1" />
<PackageReference Include="EasyCaching.LiteDB" Version="1.4.1" />
<PackageReference Include="EasyCaching.Redis" Version="1.4.1" />
<PackageReference Include="EasyCaching.Core" Version="1.5.0" />
<PackageReference Include="EasyCaching.InMemory" Version="1.5.0" />
<PackageReference Include="EasyCaching.LiteDB" Version="1.5.0" />
<PackageReference Include="EasyCaching.Redis" Version="1.5.0" />
<PackageReference Include="hyjiacan.pinyin4net" Version="4.1.0" />
<PackageReference Include="InfluxDB.Client" Version="3.3.0-dev.4823" />
<PackageReference Include="InfluxDB.Client" Version="3.3.0" />
<PackageReference Include="IoTSharp.CoAP.NET" Version="2.0.8" />
<PackageReference Include="IoTSharp.X509Extensions" Version="1.4.27" />
<PackageReference Include="Jdenticon-net" Version="3.1.2" />
......@@ -89,20 +89,19 @@
<PackageReference Include="Microsoft.Extensions.Options" Version="6.0.0" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.14.0" />
<PackageReference Include="Microsoft.VisualStudio.Web.CodeGeneration.Design" Version="6.0.1" />
<PackageReference Include="MQTTnet" Version="3.1.1" />
<PackageReference Include="MQTTnet.AspNetCore" Version="3.1.1" />
<PackageReference Include="MQTTnet.AspNetCoreEx" Version="3.1.2" />
<PackageReference Include="MQTTnet.Extensions.Rpc" Version="3.1.1" />
<PackageReference Include="NetMQ" Version="4.0.1.6" />
<PackageReference Include="Npgsql" Version="6.0.2" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="6.0.2" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL.NetTopologySuite" Version="6.0.2" />
<PackageReference Include="NSwag.AspNetCore" Version="13.15.5" />
<PackageReference Include="MQTTnet" Version="4.0.0-preview3" />
<PackageReference Include="MQTTnet.AspNetCore" Version="4.0.0-preview3" />
<PackageReference Include="MQTTnet.Extensions.Rpc" Version="4.0.0-preview3" />
<PackageReference Include="NetMQ" Version="4.0.1.8" />
<PackageReference Include="Npgsql" Version="6.0.3" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="6.0.3" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL.NetTopologySuite" Version="6.0.3" />
<PackageReference Include="NSwag.AspNetCore" Version="13.15.7" />
<PackageReference Include="PinusDB" Version="1.0.10" />
<PackageReference Include="PinusDB.HealthChecks" Version="1.0.10" />
<PackageReference Include="ProxyKit" Version="2.3.4" />
<PackageReference Include="Quartz.Serialization.Json" Version="3.3.3" />
<PackageReference Include="RestSharp" Version="107.1.1" />
<PackageReference Include="RestSharp" Version="106.15.0" />
<PackageReference Include="Rin" Version="2.6.0" />
<PackageReference Include="Rin.Mvc" Version="2.6.0" />
<PackageReference Include="RulesEngine" Version="3.5.0" />
......@@ -124,7 +123,7 @@
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.WindowsServices" Version="6.0.0" />
<PackageReference Include="MQTTnet.Extensions.ManagedClient" Version="3.1.1" />
<PackageReference Include="MQTTnet.Extensions.ManagedClient" Version="4.0.0-preview3" />
<PackageReference Include="Microsoft.AspNetCore.SpaServices.Extensions" Version="6.0.1" />
<PackageReference Include="Microsoft.AspNetCore.ApiAuthorization.IdentityServer" Version="6.0.1" />
<PackageReference Include="Microsoft.AspNetCore.Diagnostics.EntityFrameworkCore" Version="6.0.1" />
......
......@@ -3,7 +3,7 @@ using IoTSharp.Data;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MQTTnet.AspNetCoreEx;
using MQTTnet.Server;
using Quartz;
using SilkierQuartz;
using System;
......@@ -20,9 +20,9 @@ namespace IoTSharp.Jobs
private readonly MqttClientSetting _mcsetting;
private readonly ILogger<CheckDevices> _logger;
private readonly IServiceScopeFactory _scopeFactor;
private readonly IMqttServerEx _serverEx;
private readonly MqttServer _serverEx;
public CheckDevices(ILogger<CheckDevices> logger, IServiceScopeFactory scopeFactor, IMqttServerEx serverEx
public CheckDevices(ILogger<CheckDevices> logger, IServiceScopeFactory scopeFactor, MqttServer serverEx
, IOptions<AppSettings> options)
{
_mcsetting = options.Value.MqttClient;
......@@ -38,12 +38,12 @@ namespace IoTSharp.Jobs
using (var scope = _scopeFactor.CreateScope())
using (var _dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
var clientstatus = await _serverEx.GetClientStatusAsync();
var clientstatus = await _serverEx.GetClientsAsync();
clientstatus.ToList().ForEach(cs =>
{
try
{
var _device = cs.Session.Items?.FirstOrDefault(k => (string)k.Key == nameof(Device)).Value as Device;
var _device = cs.Session.Items[ nameof(Device)] as Device;
if (_device != null)
{
var d = _dbContext.Device.FirstOrDefault(d => d.Id == _device.Id);
......@@ -58,7 +58,7 @@ namespace IoTSharp.Jobs
}
catch (Exception ex)
{
_logger.LogInformation($"检查设备{cs.ClientId}-{cs.Endpoint}) 时遇到异常{ex.Message}{ex.InnerException?.Message} 发送消息:{cs.SentApplicationMessagesCount}({cs.BytesSent}kb) 收到{cs.ReceivedApplicationMessagesCount}({cs.BytesReceived / 1024}KB ) ");
_logger.LogInformation($"检查设备{cs.Id}-{cs.Endpoint}) 时遇到异常{ex.Message}{ex.InnerException?.Message} 发送消息:{cs.SentApplicationMessagesCount}({cs.BytesSent}kb) 收到{cs.ReceivedApplicationMessagesCount}({cs.BytesReceived / 1024}KB ) ");
}
});
......
......@@ -2,10 +2,11 @@
{
public class DeviceProp
{
#nullable enable
public Body? body { get; set; }
public Position? position { get; set; }
public Size? size { get; set; }
public Text? text { get; set; }
#nullable disable
}
}
\ No newline at end of file
......@@ -8,10 +8,10 @@
public long[] incomes { get; set; }
public long[] outgoings { get; set; }
#nullable enable
public DeviceProp? prop { get; set; }
#nullable disable
}
}
\ No newline at end of file
......@@ -7,10 +7,6 @@ using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using System;
using System.Collections.Generic;
using System.Linq;
......
......@@ -158,7 +158,6 @@ namespace IoTSharp
services.AddTransient<ApplicationDBInitializer>();
services.AddIoTSharpMqttServer(settings.MqttBroker);
services.AddMqttClient(settings.MqttClient);
services.AddSingleton<RetainedMessageHandler>();
services.AddSilkierQuartz(options =>
{
options.VirtualPathRoot = "/quartz";
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册