提交 ac56bd77 编写于 作者: 麦壳饼's avatar 麦壳饼

通过EFCore.Sharding 实现了分表分库

上级 07215c13
[*.cs]
# CS1584: XML 注释中有语法错误的 cref 特性
dotnet_diagnostic.CS1584.severity = none
dotnet_diagnostic.CS1658.severity = none
......@@ -6,6 +6,10 @@
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>
<None Include="..\.editorconfig" Link=".editorconfig" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.6" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.6.1" />
......
......@@ -5,6 +5,7 @@ VisualStudioVersion = 16.0.28803.156
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{D5C97089-F896-436D-8E99-27B2E43BC65F}"
ProjectSection(SolutionItems) = preProject
.editorconfig = .editorconfig
appveyor.yml = appveyor.yml
buildimage.cmd = buildimage.cmd
ef_mg_add.cmd = ef_mg_add.cmd
......
......@@ -121,14 +121,14 @@ namespace IoTSharp.Controllers
[ProducesResponseType(StatusCodes.Status200OK)]
[ProducesResponseType(typeof(ApiResult), StatusCodes.Status404NotFound)]
[ProducesDefaultResponseType]
public async Task<ActionResult<List<TelemetryLatest>>> GetTelemetryLatest(Guid deviceId)
public async Task<ActionResult<List<TelemetryDataDto>>> GetTelemetryLatest(Guid deviceId)
{
var devid = from dev in _context.TelemetryLatest where dev.DeviceId == deviceId select dev;
if (!devid.Any())
{
return NotFound(new ApiResult(ApiCode.NotFoundDeviceIdentity, $"Device's Identity not found "));
}
return await devid.ToListAsync();
return await _storage.GetTelemetryLatest(deviceId);
}
/// <summary>
/// Request telemetry values from the server
......@@ -141,7 +141,7 @@ namespace IoTSharp.Controllers
[ProducesResponseType(StatusCodes.Status200OK)]
[ProducesResponseType(typeof(ApiResult), StatusCodes.Status404NotFound)]
[ProducesDefaultResponseType]
public async Task<ActionResult<object>> GetTelemetryLatest(Guid deviceId, string keys)
public async Task<ActionResult<List<TelemetryDataDto>>> GetTelemetryLatest(Guid deviceId, string keys)
{
var dev = _context.Device.Find(deviceId);
if (dev == null)
......@@ -150,8 +150,7 @@ namespace IoTSharp.Controllers
}
else
{
var kv = from t in _context.TelemetryLatest where t.DeviceId == dev.Id && keys.Split(',',' ',';').Contains(t.KeyName) select t;
return (await kv.FirstOrDefaultAsync())?.ToObject();
return await _storage.GetTelemetryLatest(deviceId,keys);
}
}
/// <summary>
......@@ -431,7 +430,7 @@ namespace IoTSharp.Controllers
}
else
{
var result = await _context.SaveAsync<TelemetryLatest>(telemetrys, device, DataSide.ClientSide);
var result = await _context.SaveAsync<TelemetryLatest>(telemetrys, device.Id, DataSide.ClientSide);
return Ok(new ApiResult<Dic>(result.ret > 0 ? ApiCode.Success : ApiCode.NothingToDo, result.ret > 0 ? "OK" : "No Telemetry save", result.exceptions));
}
}
......@@ -494,7 +493,7 @@ namespace IoTSharp.Controllers
}
else
{
var result = await _context.SaveAsync<AttributeLatest>(attributes, dev, DataSide.ClientSide);
var result = await _context.SaveAsync<AttributeLatest>(attributes, dev.Id, DataSide.ClientSide);
return Ok(new ApiResult<Dic>(result.ret > 0 ? ApiCode.Success : ApiCode.NothingToDo, result.ret > 0 ? "OK" : "No Attribute save", result.exceptions));
}
}
......
......@@ -28,13 +28,13 @@ namespace IoTSharp.Extensions
/// </summary>
/// <typeparam name="L">Latest</typeparam>
/// <param name="data"></param>
/// <param name="device"></param>
/// <param name="dataSide"></param>
/// <param name="deviceId"></param>
/// <param name="_context"></param>
/// <returns></returns>
internal static async Task<(int ret, Dic exceptions)> SaveAsync<L>(this ApplicationDbContext _context, Dictionary<string, object> data, Device device, DataSide dataSide) where L : DataStorage, new()
internal static async Task<(int ret, Dic exceptions)> SaveAsync<L>(this ApplicationDbContext _context, Dictionary<string, object> data, Guid deviceId, DataSide dataSide) where L : DataStorage, new()
{
Dic exceptions = _context.PreparingData<L>(data, device, dataSide);
Dic exceptions = _context.PreparingData<L>(data, deviceId, dataSide);
int ret = await _context.SaveChangesAsync();
return (ret, exceptions);
}
......@@ -44,15 +44,15 @@ namespace IoTSharp.Extensions
/// <typeparam name="L"></typeparam>
/// <param name="_context"></param>
/// <param name="data"></param>
/// <param name="device"></param>
/// <param name="deviceId"></param>
/// <param name="dataSide"></param>
/// <returns></returns>
internal static Dic PreparingData<L>(this ApplicationDbContext _context, Dictionary<string, object> data, Device device, DataSide dataSide)
internal static Dic PreparingData<L>(this ApplicationDbContext _context, Dictionary<string, object> data, Guid deviceId, DataSide dataSide)
where L : DataStorage, new()
{
Dic exceptions = new Dic();
var devdata = from tx in _context.Set<L>() where tx.DeviceId == device.Id select tx;
var devdata = from tx in _context.Set<L>() where tx.DeviceId == deviceId select tx;
data.ToList().ForEach(kp =>
{
try
......@@ -66,7 +66,7 @@ namespace IoTSharp.Extensions
}
else
{
var t2 = new L() { DateTime = DateTime.Now, DeviceId = device.Id, KeyName = kp.Key, DataSide = dataSide };
var t2 = new L() { DateTime = DateTime.Now, DeviceId = deviceId, KeyName = kp.Key, DataSide = dataSide };
t2.Catalog = (typeof(L) == typeof(AttributeLatest) ? DataCatalog.AttributeLatest
: ((typeof(L) == typeof(TelemetryLatest) ? DataCatalog.TelemetryLatest
: 0)));
......
......@@ -29,7 +29,7 @@ namespace IoTSharp.Extensions
});
Dictionary<string, object> pairs = new Dictionary<string, object>();
pairs.Add("CreateDateTime", DateTime.Now);
_context.PreparingData<AttributeLatest>(pairs, device, DataSide.ServerSide);
_context.PreparingData<AttributeLatest>(pairs, device.Id, DataSide.ServerSide);
}
}
public static void AfterCreateDevice(this ApplicationDbContext _context, Device device,string username,string password)
......@@ -49,7 +49,7 @@ namespace IoTSharp.Extensions
}) ;
Dictionary<string, object> pairs = new Dictionary<string, object>();
pairs.Add("CreateDateTime", DateTime.Now);
_context.PreparingData<AttributeLatest>(pairs, device, DataSide.ServerSide);
_context.PreparingData<AttributeLatest>(pairs, device.Id, DataSide.ServerSide);
}
}
}
......
......@@ -24,25 +24,25 @@
https://github.com/chkr1011/MQTTnet/blob/master/Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs
</summary>
</member>
<member name="M:IoTSharp.Extensions.DataExtension.SaveAsync``1(IoTSharp.Data.ApplicationDbContext,System.Collections.Generic.Dictionary{System.String,System.Object},IoTSharp.Data.Device,IoTSharp.Data.DataSide)">
<member name="M:IoTSharp.Extensions.DataExtension.SaveAsync``1(IoTSharp.Data.ApplicationDbContext,System.Collections.Generic.Dictionary{System.String,System.Object},System.Guid,IoTSharp.Data.DataSide)">
<summary>
Save Data to Device's and <typeparamref name="L"/>
</summary>
<typeparam name="L">Latest</typeparam>
<param name="data"></param>
<param name="device"></param>
<param name="dataSide"></param>
<param name="deviceId"></param>
<param name="_context"></param>
<returns></returns>
</member>
<member name="M:IoTSharp.Extensions.DataExtension.PreparingData``1(IoTSharp.Data.ApplicationDbContext,System.Collections.Generic.Dictionary{System.String,System.Object},IoTSharp.Data.Device,IoTSharp.Data.DataSide)">
<member name="M:IoTSharp.Extensions.DataExtension.PreparingData``1(IoTSharp.Data.ApplicationDbContext,System.Collections.Generic.Dictionary{System.String,System.Object},System.Guid,IoTSharp.Data.DataSide)">
<summary>
Preparing Data to Device's <typeparamref name="L"/>
</summary>
<typeparam name="L"></typeparam>
<param name="_context"></param>
<param name="data"></param>
<param name="device"></param>
<param name="deviceId"></param>
<param name="dataSide"></param>
<returns></returns>
</member>
......@@ -233,6 +233,12 @@
查找类似 The certificate is installed 的本地化字符串。
</summary>
</member>
<member name="T:IoTSharp.Storage.ShardingStorage">
<summary>
由于此模式目前无法通过EFCore.Sharding 进行Group By 获取最新遥测数据, 和Select 新对象, 所以, 最新遥测依然从DataStorage表里获取,历史从分表里获取
更多内容可以参考<seealso cref="!:https://github.com/Coldairarrow/EFCore.Sharding/issues/52"/>
</summary>
</member>
<member name="T:LiteQueue.LiteQueue`1">
<summary>
Uses LiteDB to provide a persisted, thread safe, (optionally) transactional, FIFO queue.
......
......@@ -58,7 +58,7 @@ namespace IoTSharp.Jobs
{
case DataCatalog.AttributeData:
var result2 = await _dbContext.SaveAsync<AttributeLatest>(msg.MsgBody, device, msg.DataSide);
var result2 = await _dbContext.SaveAsync<AttributeLatest>(msg.MsgBody, device.Id, msg.DataSide);
result2.exceptions?.ToList().ForEach(ex =>
{
_logger.LogError($"{ex.Key} {ex.Value} {Newtonsoft.Json.JsonConvert.SerializeObject( msg.MsgBody[ex.Key])}");
......@@ -68,12 +68,7 @@ namespace IoTSharp.Jobs
case DataCatalog.TelemetryData:
bool sta= await _storage.StoreTelemetryAsync(msg);
var result1 = await _dbContext.SaveAsync<TelemetryLatest>(msg.MsgBody, device, msg.DataSide);
result1.exceptions?.ToList().ForEach(ex =>
{
_logger.LogError($"{ex.Key} {ex.Value} {Newtonsoft.Json.JsonConvert.SerializeObject(msg.MsgBody[ex.Key])}");
});
_logger.LogInformation($"新增{device.Name}({device.Id})遥测数据{sta},更新最新信息{result1.ret}");
_logger.LogInformation($"新增{device.Name}({device.Id})遥测数据{sta}");
break;
default:
break;
......
......@@ -161,7 +161,7 @@ namespace IoTSharp
config.AddDataSource(Configuration.GetConnectionString("TelemetryStorage"), ReadWriteType.Read | ReadWriteType.Write, settings.Sharding.DatabaseType)
.SetDateSharding<TelemetryData>(nameof(TelemetryData.DateTime), settings.Sharding.ExpandByDateMode, DateTime.MinValue);
});
services.AddSingleton<IStorage, ShardingStorage>();
break;
case TelemetryStorage.SingleTable:
services.AddSingleton<IStorage, EFStorage>();
......
......@@ -29,7 +29,33 @@ namespace IoTSharp.Storage
scope = scopeFactor.CreateScope();
}
public Task<List<TelemetryDataDto>> GetTelemetryLatest(Guid deviceId)
{
using (var _context = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
var devid = from fx in ( from t in _context.TelemetryData where t.DeviceId == deviceId orderby t.DateTime group t by t.KeyName into g
select new { KeyName = g.Key, td =g.OrderByDescending(x=>x.DateTime).Take(1).First()})
select new TelemetryDataDto() { DateTime = fx.td.DateTime, KeyName = fx.KeyName, Value = fx.td.ToObject() };
return devid.ToListAsync();
}
}
public Task<List<TelemetryDataDto>> GetTelemetryLatest(Guid deviceId, string keys)
{
using (var _context = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
var devid = from fx in (from t in _context.TelemetryData
where t.DeviceId == deviceId &&
keys.Split(',', ' ', ';').Contains(t.KeyName)
orderby t.DateTime
group t by t.KeyName into g
select new { KeyName = g.Key, td = g.OrderByDescending(x => x.DateTime).Take(1).First() })
select new TelemetryDataDto() { DateTime = fx.td.DateTime, KeyName = fx.KeyName, Value = fx.td.ToObject() };
return devid.ToListAsync();
}
}
public Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, string keys, DateTime begin)
{
......
......@@ -12,7 +12,8 @@ namespace IoTSharp.Storage
public interface IStorage
{
Task<bool> StoreTelemetryAsync(RawMsg msg);
Task<List<TelemetryDataDto>> GetTelemetryLatest(Guid deviceId);
Task<List<TelemetryDataDto>> GetTelemetryLatest(Guid deviceId, string keys);
Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, string keyName, DateTime begin);
Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, string keyName, DateTime begin, DateTime end);
Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, DateTime begin);
......
using EFCore.Sharding;
using IoTSharp.Data;
using IoTSharp.Dtos;
using IoTSharp.Extensions;
using IoTSharp.Queue;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace IoTSharp.Storage
{
/// <summary>
/// 由于此模式目前无法通过EFCore.Sharding 进行Group By 获取最新遥测数据, 和Select 新对象, 所以, 最新遥测依然从DataStorage表里获取,历史从分表里获取
/// 更多内容可以参考<seealso cref="https://github.com/Coldairarrow/EFCore.Sharding/issues/52"/>
/// </summary>
public class ShardingStorage : IStorage
{
private readonly AppSettings _appSettings;
private readonly ILogger _logger;
private readonly IServiceScope scope;
public ShardingStorage(ILogger<ShardingStorage> logger, IServiceScopeFactory scopeFactor
, IOptions<AppSettings> options
)
{
_appSettings = options.Value;
_logger = logger;
scope = scopeFactor.CreateScope();
}
public Task<List<TelemetryDataDto>> GetTelemetryLatest(Guid deviceId)
{
using (var _context = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
var devid = from fx in (from t in _context.TelemetryData
where t.DeviceId == deviceId
orderby t.DateTime
group t by t.KeyName into g
select new { KeyName = g.Key, td = g.OrderByDescending(x => x.DateTime).Take(1).First() })
select new TelemetryDataDto() { DateTime = fx.td.DateTime, KeyName = fx.KeyName, Value = fx.td.ToObject() };
return devid.ToListAsync();
}
}
public Task<List<TelemetryDataDto>> GetTelemetryLatest(Guid deviceId, string keys)
{
using (var _context = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
var devid = from fx in (from t in _context.TelemetryData
where t.DeviceId == deviceId &&
keys.Split(',', ' ', ';').Contains(t.KeyName)
orderby t.DateTime
group t by t.KeyName into g
select new { KeyName = g.Key, td = g.OrderByDescending(x => x.DateTime).Take(1).First() })
select new TelemetryDataDto() { DateTime = fx.td.DateTime, KeyName = fx.KeyName, Value = fx.td.ToObject() };
return devid.ToListAsync();
}
}
public Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, string keys, DateTime begin)
{
return LoadTelemetryAsync(deviceId, keys, begin, DateTime.Now);
}
public Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, string keys, DateTime begin, DateTime end)
{
return Task.Run(() =>
{
using (var _context = scope.ServiceProvider.GetRequiredService<IShardingDbAccessor>())
{
var lst = new List<TelemetryDataDto>();
var kv = _context.GetIShardingQueryable<TelemetryData>()
.Where(t => t.DeviceId == deviceId && keys.Split(',', ' ', ';').Contains(t.KeyName) && t.DateTime >= begin && t.DateTime < end)
.ToList().Select(t => new TelemetryDataDto() { DateTime = t.DateTime, KeyName = t.KeyName, Value = t.ToObject() });
return kv.ToList();
}
});
}
public Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, DateTime begin)
{
return LoadTelemetryAsync(deviceId, begin, DateTime.Now);
}
public Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, DateTime begin, DateTime end)
{
return Task.Run(() =>
{
using (var _context = scope.ServiceProvider.GetRequiredService<IShardingDbAccessor>())
{
var lst = new List<TelemetryDataDto>();
var kv = _context.GetIShardingQueryable<TelemetryData>()
.Where(t => t.DeviceId == deviceId && t.DateTime >= begin && t.DateTime < end)
.ToList().Select(t => new TelemetryDataDto() { DateTime = t.DateTime, KeyName = t.KeyName, Value = t.ToObject() });
return kv.ToList();
}
});
}
public async Task<bool> StoreTelemetryAsync(RawMsg msg)
{
bool result = false;
try
{
using (var db = scope.ServiceProvider.GetService<IShardingDbAccessor>())
{
var lst = new List<TelemetryData>();
msg.MsgBody.ToList().ForEach(kp =>
{
var tdata = new TelemetryData() { DateTime = DateTime.Now, DeviceId = msg.DeviceId, KeyName = kp.Key, Value_DateTime = new DateTime(1970, 1, 1) };
tdata.FillKVToMe(kp);
lst.Add(tdata);
});
int ret = db.Insert(lst);
_logger.LogInformation($"新增({msg.DeviceId})遥测数据{ret}");
}
using (var _dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
var result1 = await _dbContext.SaveAsync<TelemetryLatest>(msg.MsgBody, msg.DeviceId, msg.DataSide);
result1.exceptions?.ToList().ForEach(ex =>
{
_logger.LogError($"{ex.Key} {ex.Value} {Newtonsoft.Json.JsonConvert.SerializeObject(msg.MsgBody[ex.Key])}");
});
_logger.LogInformation($"新增({msg.DeviceId})遥测数据更新最新信息{result1.ret}");
result = true;
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"{msg.DeviceId}数据处理失败{ex.Message} {ex.InnerException?.Message} ");
}
return result;
}
}
}
\ No newline at end of file
......@@ -7,7 +7,7 @@
}
},
"ConnectionStrings": {
"IoTSharp": "Server=localhost;Database=IoTSharp;Username=postgres;Password=future;"
"IoTSharp": "Server=10.165.83.194;Database=IoTSharp;Username=postgres;Password=future;"
},
"JwtKey": "kissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissme",
"JwtExpireHours": 24,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册