提交 a6251443 编写于 作者: 麦壳饼's avatar 麦壳饼

优化数据入库

上级 b8ff0a96
......@@ -61,6 +61,7 @@ namespace IoTSharp
public ShardingSetting Sharding { get; set; } = new ShardingSetting();
public EventBusStore EventBusStore { get; set; } = EventBusStore.InMemory;
public EventBusMQ EventBusMQ { get; set; } = EventBusMQ.InMemory;
public int ConsumerThreadCount { get; set; }
}
public class ShardingSetting
......
......@@ -395,10 +395,8 @@ namespace IoTSharp.Controllers
{
return NotFound(new ApiResult<Guid>(ApiCode.NotFoundDevice, $"Device {id} not found ", id));
}
_context.Device.Remove(device);
await _context.SaveChangesAsync();
return device;
}
......@@ -475,7 +473,7 @@ namespace IoTSharp.Controllers
else
{
var result = await _context.SaveAsync<TelemetryLatest>(telemetrys, device.Id, DataSide.ClientSide);
return Ok(new ApiResult<Dic>(result.ret > 0 ? ApiCode.Success : ApiCode.NothingToDo, result.ret > 0 ? "OK" : "No Telemetry save", result.exceptions));
return Ok(new ApiResult<Dic>(result.ret > 0 ? ApiCode.Success : ApiCode.NothingToDo, result.ret > 0 ? "OK" : "No Telemetry save", new Dic( result.exceptions?.Select(f=> new DicKV(f.Key,f.Value.Message)))));
}
}
......@@ -538,7 +536,7 @@ namespace IoTSharp.Controllers
else
{
var result = await _context.SaveAsync<AttributeLatest>(attributes, dev.Id, DataSide.ClientSide);
return Ok(new ApiResult<Dic>(result.ret > 0 ? ApiCode.Success : ApiCode.NothingToDo, result.ret > 0 ? "OK" : "No Attribute save", result.exceptions));
return Ok(new ApiResult<Dic>(result.ret > 0 ? ApiCode.Success : ApiCode.NothingToDo, result.ret > 0 ? "OK" : "No Attribute save", new Dic(result.exceptions?.Select(f => new DicKV(f.Key, f.Value.Message)))));
}
}
}
......
......@@ -10,7 +10,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Dic = System.Collections.Generic.Dictionary<string, string>;
using Dic = System.Collections.Generic.Dictionary<string, System.Exception>;
namespace IoTSharp.Extensions
{
......@@ -52,22 +52,23 @@ namespace IoTSharp.Extensions
{
Dic exceptions = new Dic();
var devdata = from tx in _context.Set<L>() where tx.DeviceId == deviceId select tx;
data.ToList().ForEach(kp =>
{
try
{
if (kp.Key != null && kp.Value !=null)
{
var devdata = from tx in _context.Set<L>() where tx.DeviceId == deviceId select tx;
var tl = from tx in devdata where tx.KeyName == kp.Key && tx.DataSide == dataSide select tx;
if (tl.Any())
{
var tx = tl.First();
tx.FillKVToMe(kp);
tx.DateTime = DateTime.Now;
_context.Set<L>().Update(tx).State= EntityState.Modified;
_context.Set<L>().Update(tx).State = EntityState.Modified;
}
else
else
{
var t2 = new L() { DateTime = DateTime.Now, DeviceId = deviceId, KeyName = kp.Key, DataSide = dataSide };
t2.Catalog = (typeof(L) == typeof(AttributeLatest) ? DataCatalog.AttributeLatest
......@@ -79,12 +80,12 @@ namespace IoTSharp.Extensions
}
else
{
exceptions.Add($"Key:{ kp.Key}","Key is null or value is null");
exceptions.Add($"Key:{ kp.Key}",new Exception( "Key is null or value is null"));
}
}
catch (Exception ex)
{
exceptions.Add(kp.Key, ex.Message);
exceptions.Add(kp.Key, ex);
}
});
return exceptions;
......
......@@ -3,6 +3,7 @@ using IoTSharp.Data;
using IoTSharp.Extensions;
using IoTSharp.Queue;
using IoTSharp.Storage;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
......@@ -65,6 +66,6 @@ namespace IoTSharp.Handlers
});
}
[CapSubscribe("iotsharp.services.datastream.telemetrydata")]
public void StoreTelemetryData(RawMsg msg) => Task.Run(async () => await _storage.StoreTelemetryAsync(msg));
public void StoreTelemetryData(RawMsg msg) => Task.Run( () => _storage.StoreTelemetryAsync(msg));
}
}
......@@ -317,72 +317,75 @@ namespace IoTSharp.Handlers
public static string MD5Sum(string text) => BitConverter.ToString(MD5.Create().ComputeHash(Encoding.UTF8.GetBytes(text))).Replace("-", "");
internal void Server_ClientConnectionValidator(object sender, MqttServerClientConnectionValidatorEventArgs e)
{
using (var _dbContextcv = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
Task.Run(async () =>
{
MqttConnectionValidatorContext obj = e.Context;
Uri uri = new Uri("mqtt://" + obj.Endpoint);
if (uri.IsLoopback && !string.IsNullOrEmpty(e.Context.ClientId) && e.Context.ClientId == _mcsetting.MqttBroker && !string.IsNullOrEmpty(e.Context.Username) && e.Context.Username == _mcsetting.UserName && e.Context.Password == _mcsetting.Password)
using (var _dbContextcv = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
obj.ReasonCode = MQTTnet.Protocol.MqttConnectReasonCode.Success;
}
else
{
_logger.LogInformation($"ClientId={obj.ClientId},Endpoint={obj.Endpoint},Username={obj.Username},Password={obj.Password},WillMessage={obj.WillMessage?.ConvertPayloadToString()}");
var mcr = _dbContextcv.DeviceIdentities.Include(d => d.Device).FirstOrDefault(mc =>
(mc.IdentityType == IdentityType.AccessToken && mc.IdentityId == obj.Username) ||
(mc.IdentityType == IdentityType.DevicePassword && mc.IdentityId == obj.Username && mc.IdentityValue == obj.Password));
if (mcr != null)
MqttConnectionValidatorContext obj = e.Context;
Uri uri = new Uri("mqtt://" + obj.Endpoint);
if (uri.IsLoopback && !string.IsNullOrEmpty(e.Context.ClientId) && e.Context.ClientId == _mcsetting.MqttBroker && !string.IsNullOrEmpty(e.Context.Username) && e.Context.Username == _mcsetting.UserName && e.Context.Password == _mcsetting.Password)
{
obj.ReasonCode = MQTTnet.Protocol.MqttConnectReasonCode.Success;
}
else
{
try
_logger.LogInformation($"ClientId={obj.ClientId},Endpoint={obj.Endpoint},Username={obj.Username},Password={obj.Password},WillMessage={obj.WillMessage?.ConvertPayloadToString()}");
var mcr = await _dbContextcv.DeviceIdentities.Include(d => d.Device).FirstOrDefaultAsync(mc =>
(mc.IdentityType == IdentityType.AccessToken && mc.IdentityId == obj.Username) ||
(mc.IdentityType == IdentityType.DevicePassword && mc.IdentityId == obj.Username && mc.IdentityValue == obj.Password));
if (mcr != null)
{
var device = mcr.Device;
if (!Devices.ContainsKey(e.Context.ClientId))
try
{
Devices.Add(e.Context.ClientId, device);
var device = mcr.Device;
if (!Devices.ContainsKey(e.Context.ClientId))
{
Devices.Add(e.Context.ClientId, device);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "ConnectionRefusedServerUnavailable {0}", ex.Message);
obj.ReasonCode = MQTTnet.Protocol.MqttConnectReasonCode.ServerUnavailable;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "ConnectionRefusedServerUnavailable {0}", ex.Message);
obj.ReasonCode = MQTTnet.Protocol.MqttConnectReasonCode.ServerUnavailable;
}
}
else if (_dbContextcv.AuthorizedKeys.Any(ak=>ak.AuthToken==obj.Password))
{
var ak = _dbContextcv.AuthorizedKeys.Include(ak => ak.Customer).Include(ak => ak.Tenant).Include(ak => ak.Devices).First(ak => ak.AuthToken == obj.Password);
if (!ak.Devices.Any(dev => dev.Name == obj.Username))
else if (_dbContextcv.AuthorizedKeys.Any(ak => ak.AuthToken == obj.Password))
{
var ak = await _dbContextcv.AuthorizedKeys.Include(ak => ak.Customer).Include(ak => ak.Tenant).Include(ak => ak.Devices).FirstAsync(ak => ak.AuthToken == obj.Password);
if (!ak.Devices.Any(dev => dev.Name == obj.Username))
{
var devvalue = new Device() { Name = obj.Username, DeviceType = DeviceType.Device };
devvalue.Tenant = ak.Tenant;
devvalue.Customer = ak.Customer;
_dbContextcv.Device.Add(devvalue);
ak.Devices.Add(devvalue);
_dbContextcv.AfterCreateDevice(devvalue,obj.Username,obj.Password);
_dbContextcv.SaveChanges();
}
var mcp = _dbContextcv.DeviceIdentities.Include(d => d.Device).FirstOrDefault(mc => mc.IdentityType == IdentityType.DevicePassword && mc.IdentityId == obj.Username && mc.IdentityValue == obj.Password);
if (mcp != null)
{
if (!Devices.ContainsKey(e.Context.ClientId))
var devvalue = new Device() { Name = obj.Username, DeviceType = DeviceType.Device };
devvalue.Tenant = ak.Tenant;
devvalue.Customer = ak.Customer;
await _dbContextcv.Device.AddAsync(devvalue);
ak.Devices.Add(devvalue);
_dbContextcv.AfterCreateDevice(devvalue, obj.Username, obj.Password);
await _dbContextcv.SaveChangesAsync();
}
var mcp = await _dbContextcv.DeviceIdentities.Include(d => d.Device).FirstOrDefaultAsync(mc => mc.IdentityType == IdentityType.DevicePassword && mc.IdentityId == obj.Username && mc.IdentityValue == obj.Password);
if (mcp != null)
{
if (!Devices.ContainsKey(e.Context.ClientId))
{
Devices.Add(e.Context.ClientId, mcp.Device);
}
obj.ReasonCode = MQTTnet.Protocol.MqttConnectReasonCode.Success;
}
else
{
Devices.Add(e.Context.ClientId, mcp.Device);
obj.ReasonCode = MQTTnet.Protocol.MqttConnectReasonCode.BadUserNameOrPassword;
}
obj.ReasonCode = MQTTnet.Protocol.MqttConnectReasonCode.Success;
}
else
{
obj.ReasonCode = MQTTnet.Protocol.MqttConnectReasonCode.BadUserNameOrPassword;
_logger.LogInformation($"Bad username or password {obj.Username},connection {obj.Endpoint} refused");
}
}
else
{
obj.ReasonCode = MQTTnet.Protocol.MqttConnectReasonCode.BadUserNameOrPassword;
_logger.LogInformation($"Bad username or password {obj.Username},connection {obj.Endpoint} refused");
}
}
}
});
}
......
......@@ -54,8 +54,8 @@
<PackageReference Include="DotNetCore.CAP.MongoDB" Version="3.1.1" />
<PackageReference Include="DotNetCore.CAP.PostgreSql" Version="3.1.1" />
<PackageReference Include="DotNetCore.CAP.RabbitMQ" Version="3.1.1" />
<PackageReference Include="EFCore.Sharding" Version="3.1.8.1" />
<PackageReference Include="EFCore.Sharding.PostgreSql" Version="3.1.8.1" />
<PackageReference Include="EFCore.Sharding" Version="3.1.8.2" />
<PackageReference Include="EFCore.Sharding.PostgreSql" Version="3.1.8.2" />
<PackageReference Include="IoTSharp.CoAP.NET" Version="2.0.8" />
<PackageReference Include="IoTSharp.X509Extensions" Version="1.4.19" />
<PackageReference Include="kimbus" Version="2.0.1" />
......
......@@ -44,6 +44,7 @@ using EFCore.Sharding;
using IoTSharp.Storage;
using DotNetCore.CAP.Dashboard.NodeDiscovery;
using Savorboard.CAP.InMemoryMessageQueue;
using System.Diagnostics;
namespace IoTSharp
{
......@@ -72,8 +73,7 @@ namespace IoTSharp
setting.MqttBroker = settings.MqttBroker;
setting.MqttClient = settings.MqttClient;
}));
services.AddDbContext<ApplicationDbContext>(options => options.UseNpgsql(Configuration.GetConnectionString("IoTSharp"))
, ServiceLifetime.Transient);
services.AddDbContextPool<ApplicationDbContext>(options => options.UseNpgsql(Configuration.GetConnectionString("IoTSharp")) , poolSize:2048 );
services.AddIdentity<IdentityUser, IdentityRole>()
.AddRoles<IdentityRole>()
.AddRoleManager<RoleManager<IdentityRole>>()
......@@ -188,7 +188,7 @@ namespace IoTSharp
services.AddCap(x =>
{
x.ConsumerThreadCount = settings.ConsumerThreadCount <=0? Environment.ProcessorCount: settings.ConsumerThreadCount;
switch (settings.EventBusStore)
{
case EventBusStore.PostgreSql:
......@@ -281,7 +281,7 @@ namespace IoTSharp
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
endpoints.MapMqtt("/mqtt");
endpoints.MapMqtt("/mqtt");
});
app.UseSwaggerUi3();
app.UseOpenApi();
......
......@@ -23,6 +23,7 @@ namespace IoTSharp.Storage
private readonly AppSettings _appSettings;
private readonly ILogger _logger;
private readonly IServiceScope scope;
private readonly IServiceScopeFactory _scopeFactor;
private readonly ApplicationDbContext _context;
public ShardingStorage(ILogger<ShardingStorage> logger, IServiceScopeFactory scopeFactor
......@@ -32,6 +33,7 @@ namespace IoTSharp.Storage
_appSettings = options.Value;
_logger = logger;
scope = scopeFactor.CreateScope();
_scopeFactor = scopeFactor;
_context = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>();
}
......@@ -63,13 +65,16 @@ namespace IoTSharp.Storage
{
return Task.Run(() =>
{
using (var _context = scope.ServiceProvider.GetService<IShardingDbAccessor>())
using (var _scope = _scopeFactor.CreateScope())
{
var lst = new List<TelemetryDataDto>();
var kv = _context.GetIShardingQueryable<TelemetryData>()
.Where(t => t.DeviceId == deviceId && keys.Split(',', ' ', ';').Contains(t.KeyName) && t.DateTime >= begin && t.DateTime < end)
.ToList().Select(t => new TelemetryDataDto() { DateTime = t.DateTime, KeyName = t.KeyName, Value = t.ToObject() });
return kv.ToList();
using (var _context = _scope.ServiceProvider.GetService<IShardingDbAccessor>())
{
var lst = new List<TelemetryDataDto>();
var kv = _context.GetIShardingQueryable<TelemetryData>()
.Where(t => t.DeviceId == deviceId && keys.Split(',', ' ', ';').Contains(t.KeyName) && t.DateTime >= begin && t.DateTime < end)
.ToList().Select(t => new TelemetryDataDto() { DateTime = t.DateTime, KeyName = t.KeyName, Value = t.ToObject() });
return kv.ToList();
}
}
});
}
......@@ -99,30 +104,33 @@ namespace IoTSharp.Storage
bool result = false;
try
{
using (var db = scope.ServiceProvider.GetService<IShardingDbAccessor>())
using (var _scope = _scopeFactor.CreateScope())
{
var lst = new List<TelemetryData>();
msg.MsgBody.ToList().ForEach(kp =>
{
if (kp.Value != null)
using (var db = _scope.ServiceProvider.GetService<IShardingDbAccessor>())
{
var lst = new List<TelemetryData>();
msg.MsgBody.ToList().ForEach(kp =>
{
var tdata = new TelemetryData() { DateTime = DateTime.Now, DeviceId = msg.DeviceId, KeyName = kp.Key, Value_DateTime = new DateTime(1970, 1, 1) };
tdata.FillKVToMe(kp);
lst.Add(tdata);
}
});
int ret = db.Insert(lst);
_logger.LogInformation($"新增({msg.DeviceId})遥测数据{ret}");
}
using (var _dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
var result1 = await _dbContext.SaveAsync<TelemetryLatest>(msg.MsgBody, msg.DeviceId, msg.DataSide);
result1.exceptions?.ToList().ForEach(ex =>
if (kp.Value != null)
{
var tdata = new TelemetryData() { DateTime = DateTime.Now, DeviceId = msg.DeviceId, KeyName = kp.Key, Value_DateTime = new DateTime(1970, 1, 1) };
tdata.FillKVToMe(kp);
lst.Add(tdata);
}
});
int ret = await db.InsertAsync(lst);
_logger.LogInformation($"新增({msg.DeviceId})遥测数据{ret}");
}
using (var _dbContext = _scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
_logger.LogError($"{ex.Key} {ex.Value} {Newtonsoft.Json.JsonConvert.SerializeObject(msg.MsgBody[ex.Key])}");
});
_logger.LogInformation($"新增({msg.DeviceId})遥测数据更新最新信息{result1.ret}");
result = true;
var result1 = await _dbContext.SaveAsync<TelemetryLatest>(msg.MsgBody, msg.DeviceId, msg.DataSide);
result1.exceptions?.ToList().ForEach(ex =>
{
_logger.LogError(ex.Value, $"{ex.Key} {ex.Value.Message} {ex.Value.InnerException?.Message}");
});
_logger.LogInformation($"新增({msg.DeviceId})遥测数据更新最新信息{result1.ret}");
result = true;
}
}
}
catch (Exception ex)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册