From ac56bd77269d7634c9d28f141851c950e24ca2d5 Mon Sep 17 00:00:00 2001 From: MysticBoy Date: Thu, 6 Aug 2020 03:14:46 +0800 Subject: [PATCH] =?UTF-8?q?=E9=80=9A=E8=BF=87EFCore.Sharding=20=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=E4=BA=86=E5=88=86=E8=A1=A8=E5=88=86=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .editorconfig | 6 + IoTSharp.Test/IoTSharp.Test.csproj | 4 + IoTSharp.sln | 1 + IoTSharp/Controllers/DevicesController.cs | 13 +- IoTSharp/Extensions/DataExtension.cs | 14 +-- IoTSharp/Extensions/DeviceExtension.cs | 4 +- IoTSharp/IoTSharp.xml | 14 ++- IoTSharp/Jobs/PushData.cs | 9 +- IoTSharp/Startup.cs | 2 +- IoTSharp/Storage/EFStorage.cs | 28 ++++- IoTSharp/Storage/IStorage.cs | 3 +- IoTSharp/Storage/ShardingStorage.cs | 142 ++++++++++++++++++++++ IoTSharp/appsettings.Development.json | 2 +- 13 files changed, 211 insertions(+), 31 deletions(-) create mode 100644 .editorconfig create mode 100644 IoTSharp/Storage/ShardingStorage.cs diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 00000000..5a5ca272 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,6 @@ +[*.cs] + +# CS1584: XML 注释中有语法错误的 cref 特性 +dotnet_diagnostic.CS1584.severity = none +dotnet_diagnostic.CS1658.severity = none + diff --git a/IoTSharp.Test/IoTSharp.Test.csproj b/IoTSharp.Test/IoTSharp.Test.csproj index 9fa5ada8..0f5bd24f 100644 --- a/IoTSharp.Test/IoTSharp.Test.csproj +++ b/IoTSharp.Test/IoTSharp.Test.csproj @@ -6,6 +6,10 @@ false + + + + diff --git a/IoTSharp.sln b/IoTSharp.sln index 0b5aad6f..be2d4ca7 100644 --- a/IoTSharp.sln +++ b/IoTSharp.sln @@ -5,6 +5,7 @@ VisualStudioVersion = 16.0.28803.156 MinimumVisualStudioVersion = 10.0.40219.1 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{D5C97089-F896-436D-8E99-27B2E43BC65F}" ProjectSection(SolutionItems) = preProject + .editorconfig = .editorconfig appveyor.yml = appveyor.yml buildimage.cmd = buildimage.cmd ef_mg_add.cmd = ef_mg_add.cmd diff --git a/IoTSharp/Controllers/DevicesController.cs b/IoTSharp/Controllers/DevicesController.cs index 2caf14ea..0aaf83c2 100644 --- a/IoTSharp/Controllers/DevicesController.cs +++ b/IoTSharp/Controllers/DevicesController.cs @@ -121,14 +121,14 @@ namespace IoTSharp.Controllers [ProducesResponseType(StatusCodes.Status200OK)] [ProducesResponseType(typeof(ApiResult), StatusCodes.Status404NotFound)] [ProducesDefaultResponseType] - public async Task>> GetTelemetryLatest(Guid deviceId) + public async Task>> GetTelemetryLatest(Guid deviceId) { var devid = from dev in _context.TelemetryLatest where dev.DeviceId == deviceId select dev; if (!devid.Any()) { return NotFound(new ApiResult(ApiCode.NotFoundDeviceIdentity, $"Device's Identity not found ")); } - return await devid.ToListAsync(); + return await _storage.GetTelemetryLatest(deviceId); } /// /// Request telemetry values from the server @@ -141,7 +141,7 @@ namespace IoTSharp.Controllers [ProducesResponseType(StatusCodes.Status200OK)] [ProducesResponseType(typeof(ApiResult), StatusCodes.Status404NotFound)] [ProducesDefaultResponseType] - public async Task> GetTelemetryLatest(Guid deviceId, string keys) + public async Task>> GetTelemetryLatest(Guid deviceId, string keys) { var dev = _context.Device.Find(deviceId); if (dev == null) @@ -150,8 +150,7 @@ namespace IoTSharp.Controllers } else { - var kv = from t in _context.TelemetryLatest where t.DeviceId == dev.Id && keys.Split(',',' ',';').Contains(t.KeyName) select t; - return (await kv.FirstOrDefaultAsync())?.ToObject(); + return await _storage.GetTelemetryLatest(deviceId,keys); } } /// @@ -431,7 +430,7 @@ namespace IoTSharp.Controllers } else { - var result = await _context.SaveAsync(telemetrys, device, DataSide.ClientSide); + var result = await _context.SaveAsync(telemetrys, device.Id, DataSide.ClientSide); return Ok(new ApiResult(result.ret > 0 ? ApiCode.Success : ApiCode.NothingToDo, result.ret > 0 ? "OK" : "No Telemetry save", result.exceptions)); } } @@ -494,7 +493,7 @@ namespace IoTSharp.Controllers } else { - var result = await _context.SaveAsync(attributes, dev, DataSide.ClientSide); + var result = await _context.SaveAsync(attributes, dev.Id, DataSide.ClientSide); return Ok(new ApiResult(result.ret > 0 ? ApiCode.Success : ApiCode.NothingToDo, result.ret > 0 ? "OK" : "No Attribute save", result.exceptions)); } } diff --git a/IoTSharp/Extensions/DataExtension.cs b/IoTSharp/Extensions/DataExtension.cs index 84e52b8a..83026e83 100644 --- a/IoTSharp/Extensions/DataExtension.cs +++ b/IoTSharp/Extensions/DataExtension.cs @@ -28,13 +28,13 @@ namespace IoTSharp.Extensions /// /// Latest /// - /// /// + /// /// /// - internal static async Task<(int ret, Dic exceptions)> SaveAsync(this ApplicationDbContext _context, Dictionary data, Device device, DataSide dataSide) where L : DataStorage, new() + internal static async Task<(int ret, Dic exceptions)> SaveAsync(this ApplicationDbContext _context, Dictionary data, Guid deviceId, DataSide dataSide) where L : DataStorage, new() { - Dic exceptions = _context.PreparingData(data, device, dataSide); + Dic exceptions = _context.PreparingData(data, deviceId, dataSide); int ret = await _context.SaveChangesAsync(); return (ret, exceptions); } @@ -44,15 +44,15 @@ namespace IoTSharp.Extensions /// /// /// - /// + /// /// /// - internal static Dic PreparingData(this ApplicationDbContext _context, Dictionary data, Device device, DataSide dataSide) + internal static Dic PreparingData(this ApplicationDbContext _context, Dictionary data, Guid deviceId, DataSide dataSide) where L : DataStorage, new() { Dic exceptions = new Dic(); - var devdata = from tx in _context.Set() where tx.DeviceId == device.Id select tx; + var devdata = from tx in _context.Set() where tx.DeviceId == deviceId select tx; data.ToList().ForEach(kp => { try @@ -66,7 +66,7 @@ namespace IoTSharp.Extensions } else { - var t2 = new L() { DateTime = DateTime.Now, DeviceId = device.Id, KeyName = kp.Key, DataSide = dataSide }; + var t2 = new L() { DateTime = DateTime.Now, DeviceId = deviceId, KeyName = kp.Key, DataSide = dataSide }; t2.Catalog = (typeof(L) == typeof(AttributeLatest) ? DataCatalog.AttributeLatest : ((typeof(L) == typeof(TelemetryLatest) ? DataCatalog.TelemetryLatest : 0))); diff --git a/IoTSharp/Extensions/DeviceExtension.cs b/IoTSharp/Extensions/DeviceExtension.cs index ad41b41d..a7fbba79 100644 --- a/IoTSharp/Extensions/DeviceExtension.cs +++ b/IoTSharp/Extensions/DeviceExtension.cs @@ -29,7 +29,7 @@ namespace IoTSharp.Extensions }); Dictionary pairs = new Dictionary(); pairs.Add("CreateDateTime", DateTime.Now); - _context.PreparingData(pairs, device, DataSide.ServerSide); + _context.PreparingData(pairs, device.Id, DataSide.ServerSide); } } public static void AfterCreateDevice(this ApplicationDbContext _context, Device device,string username,string password) @@ -49,7 +49,7 @@ namespace IoTSharp.Extensions }) ; Dictionary pairs = new Dictionary(); pairs.Add("CreateDateTime", DateTime.Now); - _context.PreparingData(pairs, device, DataSide.ServerSide); + _context.PreparingData(pairs, device.Id, DataSide.ServerSide); } } } diff --git a/IoTSharp/IoTSharp.xml b/IoTSharp/IoTSharp.xml index 69caf3ef..b4702b89 100644 --- a/IoTSharp/IoTSharp.xml +++ b/IoTSharp/IoTSharp.xml @@ -24,25 +24,25 @@ https://github.com/chkr1011/MQTTnet/blob/master/Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs - + Save Data to Device's and Latest - + - + Preparing Data to Device's - + @@ -233,6 +233,12 @@ 查找类似 The certificate is installed 的本地化字符串。 + + + 由于此模式目前无法通过EFCore.Sharding 进行Group By 获取最新遥测数据, 和Select 新对象, 所以, 最新遥测依然从DataStorage表里获取,历史从分表里获取 + 更多内容可以参考 + + Uses LiteDB to provide a persisted, thread safe, (optionally) transactional, FIFO queue. diff --git a/IoTSharp/Jobs/PushData.cs b/IoTSharp/Jobs/PushData.cs index 931684f0..bbc1f4f3 100644 --- a/IoTSharp/Jobs/PushData.cs +++ b/IoTSharp/Jobs/PushData.cs @@ -58,7 +58,7 @@ namespace IoTSharp.Jobs { case DataCatalog.AttributeData: - var result2 = await _dbContext.SaveAsync(msg.MsgBody, device, msg.DataSide); + var result2 = await _dbContext.SaveAsync(msg.MsgBody, device.Id, msg.DataSide); result2.exceptions?.ToList().ForEach(ex => { _logger.LogError($"{ex.Key} {ex.Value} {Newtonsoft.Json.JsonConvert.SerializeObject( msg.MsgBody[ex.Key])}"); @@ -68,12 +68,7 @@ namespace IoTSharp.Jobs case DataCatalog.TelemetryData: bool sta= await _storage.StoreTelemetryAsync(msg); - var result1 = await _dbContext.SaveAsync(msg.MsgBody, device, msg.DataSide); - result1.exceptions?.ToList().ForEach(ex => - { - _logger.LogError($"{ex.Key} {ex.Value} {Newtonsoft.Json.JsonConvert.SerializeObject(msg.MsgBody[ex.Key])}"); - }); - _logger.LogInformation($"新增{device.Name}({device.Id})遥测数据{sta},更新最新信息{result1.ret}"); + _logger.LogInformation($"新增{device.Name}({device.Id})遥测数据{sta}"); break; default: break; diff --git a/IoTSharp/Startup.cs b/IoTSharp/Startup.cs index 47f46f65..73ed26fb 100644 --- a/IoTSharp/Startup.cs +++ b/IoTSharp/Startup.cs @@ -161,7 +161,7 @@ namespace IoTSharp config.AddDataSource(Configuration.GetConnectionString("TelemetryStorage"), ReadWriteType.Read | ReadWriteType.Write, settings.Sharding.DatabaseType) .SetDateSharding(nameof(TelemetryData.DateTime), settings.Sharding.ExpandByDateMode, DateTime.MinValue); }); - + services.AddSingleton(); break; case TelemetryStorage.SingleTable: services.AddSingleton(); diff --git a/IoTSharp/Storage/EFStorage.cs b/IoTSharp/Storage/EFStorage.cs index 643af80c..197be07b 100644 --- a/IoTSharp/Storage/EFStorage.cs +++ b/IoTSharp/Storage/EFStorage.cs @@ -29,7 +29,33 @@ namespace IoTSharp.Storage scope = scopeFactor.CreateScope(); } - + public Task> GetTelemetryLatest(Guid deviceId) + { + using (var _context = scope.ServiceProvider.GetRequiredService()) + { + var devid = from fx in ( from t in _context.TelemetryData where t.DeviceId == deviceId orderby t.DateTime group t by t.KeyName into g + select new { KeyName = g.Key, td =g.OrderByDescending(x=>x.DateTime).Take(1).First()}) + select new TelemetryDataDto() { DateTime = fx.td.DateTime, KeyName = fx.KeyName, Value = fx.td.ToObject() }; + + return devid.ToListAsync(); + } + } + + public Task> GetTelemetryLatest(Guid deviceId, string keys) + { + using (var _context = scope.ServiceProvider.GetRequiredService()) + { + var devid = from fx in (from t in _context.TelemetryData + where t.DeviceId == deviceId && + keys.Split(',', ' ', ';').Contains(t.KeyName) + orderby t.DateTime + group t by t.KeyName into g + select new { KeyName = g.Key, td = g.OrderByDescending(x => x.DateTime).Take(1).First() }) + select new TelemetryDataDto() { DateTime = fx.td.DateTime, KeyName = fx.KeyName, Value = fx.td.ToObject() }; + + return devid.ToListAsync(); + } + } public Task> LoadTelemetryAsync(Guid deviceId, string keys, DateTime begin) { diff --git a/IoTSharp/Storage/IStorage.cs b/IoTSharp/Storage/IStorage.cs index ec476b34..4324a3d3 100644 --- a/IoTSharp/Storage/IStorage.cs +++ b/IoTSharp/Storage/IStorage.cs @@ -12,7 +12,8 @@ namespace IoTSharp.Storage public interface IStorage { Task StoreTelemetryAsync(RawMsg msg); - + Task> GetTelemetryLatest(Guid deviceId); + Task> GetTelemetryLatest(Guid deviceId, string keys); Task> LoadTelemetryAsync(Guid deviceId, string keyName, DateTime begin); Task> LoadTelemetryAsync(Guid deviceId, string keyName, DateTime begin, DateTime end); Task> LoadTelemetryAsync(Guid deviceId, DateTime begin); diff --git a/IoTSharp/Storage/ShardingStorage.cs b/IoTSharp/Storage/ShardingStorage.cs new file mode 100644 index 00000000..93a2250e --- /dev/null +++ b/IoTSharp/Storage/ShardingStorage.cs @@ -0,0 +1,142 @@ +using EFCore.Sharding; +using IoTSharp.Data; +using IoTSharp.Dtos; +using IoTSharp.Extensions; +using IoTSharp.Queue; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace IoTSharp.Storage +{ + /// + /// 由于此模式目前无法通过EFCore.Sharding 进行Group By 获取最新遥测数据, 和Select 新对象, 所以, 最新遥测依然从DataStorage表里获取,历史从分表里获取 + /// 更多内容可以参考 + /// + public class ShardingStorage : IStorage + { + private readonly AppSettings _appSettings; + private readonly ILogger _logger; + private readonly IServiceScope scope; + + public ShardingStorage(ILogger logger, IServiceScopeFactory scopeFactor + , IOptions options + ) + { + _appSettings = options.Value; + _logger = logger; + scope = scopeFactor.CreateScope(); + } + + public Task> GetTelemetryLatest(Guid deviceId) + { + using (var _context = scope.ServiceProvider.GetRequiredService()) + { + var devid = from fx in (from t in _context.TelemetryData + where t.DeviceId == deviceId + orderby t.DateTime + group t by t.KeyName into g + select new { KeyName = g.Key, td = g.OrderByDescending(x => x.DateTime).Take(1).First() }) + select new TelemetryDataDto() { DateTime = fx.td.DateTime, KeyName = fx.KeyName, Value = fx.td.ToObject() }; + + return devid.ToListAsync(); + } + } + + public Task> GetTelemetryLatest(Guid deviceId, string keys) + { + using (var _context = scope.ServiceProvider.GetRequiredService()) + { + var devid = from fx in (from t in _context.TelemetryData + where t.DeviceId == deviceId && + keys.Split(',', ' ', ';').Contains(t.KeyName) + orderby t.DateTime + group t by t.KeyName into g + select new { KeyName = g.Key, td = g.OrderByDescending(x => x.DateTime).Take(1).First() }) + select new TelemetryDataDto() { DateTime = fx.td.DateTime, KeyName = fx.KeyName, Value = fx.td.ToObject() }; + + return devid.ToListAsync(); + } + } + + public Task> LoadTelemetryAsync(Guid deviceId, string keys, DateTime begin) + { + return LoadTelemetryAsync(deviceId, keys, begin, DateTime.Now); + } + + public Task> LoadTelemetryAsync(Guid deviceId, string keys, DateTime begin, DateTime end) + { + return Task.Run(() => + { + using (var _context = scope.ServiceProvider.GetRequiredService()) + { + var lst = new List(); + var kv = _context.GetIShardingQueryable() + .Where(t => t.DeviceId == deviceId && keys.Split(',', ' ', ';').Contains(t.KeyName) && t.DateTime >= begin && t.DateTime < end) + .ToList().Select(t => new TelemetryDataDto() { DateTime = t.DateTime, KeyName = t.KeyName, Value = t.ToObject() }); + return kv.ToList(); + } + }); + } + + public Task> LoadTelemetryAsync(Guid deviceId, DateTime begin) + { + return LoadTelemetryAsync(deviceId, begin, DateTime.Now); + } + + public Task> LoadTelemetryAsync(Guid deviceId, DateTime begin, DateTime end) + { + return Task.Run(() => + { + using (var _context = scope.ServiceProvider.GetRequiredService()) + { + var lst = new List(); + var kv = _context.GetIShardingQueryable() + .Where(t => t.DeviceId == deviceId && t.DateTime >= begin && t.DateTime < end) + .ToList().Select(t => new TelemetryDataDto() { DateTime = t.DateTime, KeyName = t.KeyName, Value = t.ToObject() }); + return kv.ToList(); + } + }); + } + + public async Task StoreTelemetryAsync(RawMsg msg) + { + bool result = false; + try + { + using (var db = scope.ServiceProvider.GetService()) + { + var lst = new List(); + msg.MsgBody.ToList().ForEach(kp => + { + var tdata = new TelemetryData() { DateTime = DateTime.Now, DeviceId = msg.DeviceId, KeyName = kp.Key, Value_DateTime = new DateTime(1970, 1, 1) }; + tdata.FillKVToMe(kp); + lst.Add(tdata); + }); + int ret = db.Insert(lst); + _logger.LogInformation($"新增({msg.DeviceId})遥测数据{ret}"); + } + using (var _dbContext = scope.ServiceProvider.GetRequiredService()) + { + var result1 = await _dbContext.SaveAsync(msg.MsgBody, msg.DeviceId, msg.DataSide); + result1.exceptions?.ToList().ForEach(ex => + { + _logger.LogError($"{ex.Key} {ex.Value} {Newtonsoft.Json.JsonConvert.SerializeObject(msg.MsgBody[ex.Key])}"); + }); + _logger.LogInformation($"新增({msg.DeviceId})遥测数据更新最新信息{result1.ret}"); + result = true; + } + } + catch (Exception ex) + { + _logger.LogError(ex, $"{msg.DeviceId}数据处理失败{ex.Message} {ex.InnerException?.Message} "); + } + return result; + } + } +} \ No newline at end of file diff --git a/IoTSharp/appsettings.Development.json b/IoTSharp/appsettings.Development.json index cdef4c9d..b4e1a521 100644 --- a/IoTSharp/appsettings.Development.json +++ b/IoTSharp/appsettings.Development.json @@ -7,7 +7,7 @@ } }, "ConnectionStrings": { - "IoTSharp": "Server=localhost;Database=IoTSharp;Username=postgres;Password=future;" + "IoTSharp": "Server=10.165.83.194;Database=IoTSharp;Username=postgres;Password=future;" }, "JwtKey": "kissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissme", "JwtExpireHours": 24, -- GitLab