提交 cc60ed91 编写于 作者: 麦壳饼's avatar 麦壳饼

新版本加入taos 的时序数据库支持

上级 d538dc47
......@@ -15,7 +15,8 @@ namespace IoTSharp
public enum TelemetryStorage
{
SingleTable,
Sharding
Sharding,
Taos
}
public class AppSettings
{
......@@ -31,7 +32,7 @@ namespace IoTSharp
/// mqtt client settings
/// </summary>
public MqttClientSetting MqttClient { get; set; }
public Dictionary<string, string> ConnectionStrings { get; set; }
public CoapConfig CoapServer { get; set; } = new CoapConfig();
public ModBusServerSetting ModBusServer { get; set; } = new ModBusServerSetting();
......
......@@ -41,12 +41,13 @@
<PackageReference Include="AspNetCore.HealthChecks.Uris" Version="3.1.2" />
<PackageReference Include="AspNetCore.HealthChecks.Network" Version="3.1.1" />
<PackageReference Include="AspNetCore.HealthChecks.NpgSql" Version="3.1.1" />
<PackageReference Include="EFCore.Sharding" Version="3.1.6.9" />
<PackageReference Include="EFCore.Sharding.PostgreSql" Version="3.1.6.9" />
<PackageReference Include="EFCore.Sharding" Version="3.1.6.12" />
<PackageReference Include="EFCore.Sharding.PostgreSql" Version="3.1.6.12" />
<PackageReference Include="IoTSharp.CoAP.NET" Version="2.0.8" />
<PackageReference Include="IoTSharp.X509Extensions" Version="1.4.9" />
<PackageReference Include="kimbus" Version="2.0.1" />
<PackageReference Include="LiteDB" Version="5.0.9" />
<PackageReference Include="Maikebing.Data.Taos" Version="2.0.128" />
<PackageReference Include="Microsoft.AspNetCore.Http.Features" Version="3.1.7" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="3.1.7">
<PrivateAssets>all</PrivateAssets>
......
......@@ -21,7 +21,6 @@ using System.Threading.Tasks;
namespace IoTSharp.Jobs
{
[DisallowConcurrentExecution]
[SilkierQuartz(0, "PushData", "Push Iot Message Data to DataBase ", TriggerGroup = "Data")]
public class PushData : IJob
{
......
......@@ -166,6 +166,9 @@ namespace IoTSharp
case TelemetryStorage.SingleTable:
services.AddSingleton<IStorage, EFStorage>();
break;
case TelemetryStorage.Taos:
services.AddSingleton<IStorage, TaosStorage>();
break;
default:
break;
}
......
using EFCore.Sharding;
using IoTSharp.Data;
using IoTSharp.Dtos;
using IoTSharp.Extensions;
using IoTSharp.Queue;
using Maikebing.Data.Taos;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Org.BouncyCastle.Utilities.Encoders;
using Silkier;
using Silkier.EFCore;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace IoTSharp.Storage
{
public class TaosStorage : IStorage
{
private readonly AppSettings _appSettings;
private readonly ILogger _logger;
private readonly IServiceScope scope;
private readonly TaosConnectionStringBuilder _taosBuilder;
public TaosStorage(ILogger<TaosStorage> logger, IServiceScopeFactory scopeFactor
, IOptions<AppSettings> options
)
{
_appSettings = options.Value;
_logger = logger;
scope = scopeFactor.CreateScope();
_taosBuilder = new TaosConnectionStringBuilder(_appSettings.ConnectionStrings["Taos"]);
}
private bool dbisok = false;
private bool CheckDataBase()
{
if (!dbisok)
{
dbisok = Retry.RetryOnAny(10, f =>
{
using (TaosConnection _taos = new TaosConnection(_taosBuilder.ConnectionString))
{
_taos.Open();
_taos.CreateCommand($"CREATE DATABASE IF NOT EXISTS {_taosBuilder.DataBase} KEEP 365 DAYS 10 BLOCKS 4;").ExecuteNonQuery();
_taos.ChangeDatabase(_taosBuilder.DataBase);
_taos.CreateCommand("CREATE TABLE IF NOT EXISTS telemetrydata (ts timestamp,value_type tinyint, value_boolean bool, value_string binary(10240), value_long bigint,value_datetime timestamp,value_double double) TAGS (deviceid binary(32),keyname binary(64));")
.ExecuteNonQuery();
dbisok = true;
_taos.Close();
}
return true;
}, ef =>
{
_logger.LogError(ef.ex, $"CheckDataBase第{ef.current}次失败{ef.ex.Message} {ef.ex.InnerException?.Message} ");
});
}
return dbisok;
}
public Task<List<TelemetryDataDto>> GetTelemetryLatest(Guid deviceId)
{
using (TaosConnection db = new TaosConnection(_taosBuilder.ConnectionString))
{
string sql = $"select last_row(*) from telemetrydata where deviceid='{deviceId:N}' group by deviceid,keyname";
List<TelemetryDataDto> dt = SqlToTDD(db, sql, "last_row(", ")", string.Empty);
return Task.FromResult(dt);
}
}
private List<TelemetryDataDto> SqlToTDD(TaosConnection db, string sql, string prefix, string suffix, string keyname)
{
List<TelemetryDataDto> dt = new List<TelemetryDataDto>();
TaosDataReader dataReader = db.CreateCommand(sql).ExecuteReader();
while (dataReader.Read())
{
TelemetryDataDto telemetry = new TelemetryDataDto();
byte datatype = (byte)dataReader[dataReader.GetOrdinal($"{prefix}value_type{suffix}")];
if (string.IsNullOrEmpty(keyname))
{
telemetry.KeyName = dataReader.GetString(dataReader.GetOrdinal("keyname"));
}
else
{
telemetry.KeyName = keyname;
}
telemetry.DateTime = dataReader.GetDateTime(dataReader.GetOrdinal($"{prefix}ts{suffix}"));
switch ((DataType)datatype)
{
case DataType.Boolean:
telemetry.Value = dataReader.GetBoolean(dataReader.GetOrdinal($"{prefix}value_boolean{suffix}"));
break;
case DataType.String:
telemetry.Value = dataReader.GetString(dataReader.GetOrdinal($"{prefix}value_string{suffix}"));
break;
case DataType.Long:
telemetry.Value = dataReader.GetInt64(dataReader.GetOrdinal($"{prefix}value_long{suffix}"));
break;
case DataType.Double:
telemetry.Value = dataReader.GetDouble(dataReader.GetOrdinal($"{prefix}value_double{suffix}"));
break;
case DataType.Json:
case DataType.XML:
case DataType.Binary:
telemetry.Value = dataReader.GetString(dataReader.GetOrdinal($"{prefix}value_string{suffix}"));
break;
case DataType.DateTime:
telemetry.Value = dataReader.GetDateTime(dataReader.GetOrdinal($"{prefix}value_datetime{suffix}"));
break;
default:
break;
}
dt.Add(telemetry);
}
return dt;
}
public Task<List<TelemetryDataDto>> GetTelemetryLatest(Guid deviceId, string keys)
{
using (TaosConnection db = new TaosConnection(_taosBuilder.ConnectionString))
{
IEnumerable<string> kvs = from k in keys
select $" keyname = '{k}' ";
string sql = $"select last_row(*) from telemetrydata where deviceid='{deviceId:N}' and ({string.Join("or", kvs) }) group by deviceid,keyname";
List<TelemetryDataDto> dt = SqlToTDD(db, sql, "last_row(", ")", string.Empty);
return Task.FromResult(dt);
}
}
private List<TelemetryDataDto> SQLToDTByDate(DateTime begin, DateTime end, TaosConnection db, string sql)
{
List<TelemetryDataDto> dt = new List<TelemetryDataDto>();
List<(string tbname, string keyname)> list = db.CreateCommand(sql).ExecuteReader().ToList<(string tbname, string keyname)>();
foreach ((string tbname, string keyname) item in list)
{
string susql = $" select * from {item.tbname} where ts >={begin:yyyy-MM-dd HH:mm:ss.fff} and ts <={end:yyyy-MM-dd HH:mm:ss.fff}";
List<TelemetryDataDto> dtx = SqlToTDD(db, susql, "", "", item.keyname);
dt.AddRange(dtx);
}
return dt;
}
public Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, string keys, DateTime begin)
{
return LoadTelemetryAsync(deviceId, keys, begin, DateTime.Now);
}
public Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, string keys, DateTime begin, DateTime end)
{
using (TaosConnection db = new TaosConnection(_taosBuilder.ConnectionString))
{
IEnumerable<string> kvs = from k in keys
select $" keyname = '{k}' ";
string sql = $"select tbname,keyname from telemetrydata where deviceid='{deviceId:N}' and ({string.Join("or", kvs) }) ";
List<TelemetryDataDto> dt = SQLToDTByDate(begin, end, db, sql);
return Task.FromResult(dt);
}
}
public Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, DateTime begin)
{
return LoadTelemetryAsync(deviceId, begin, DateTime.Now);
}
public Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, DateTime begin, DateTime end)
{
using (TaosConnection db = new TaosConnection(_taosBuilder.ConnectionString))
{
string sql = $"select tbname,keyname from telemetrydata where deviceid='{deviceId:N}'";
List<TelemetryDataDto> dt = SQLToDTByDate(begin, end, db, sql);
return Task.FromResult(dt);
}
}
public async Task<bool> StoreTelemetryAsync(RawMsg msg)
{
bool result = false;
try
{
CheckDataBase();
using (TaosConnection db = new TaosConnection(_taosBuilder.ConnectionString))
{
List<string> lst = new List<string>();
msg.MsgBody.ToList().ForEach(kp =>
{
if (kp.Value != null)
{
TelemetryData tdata = new TelemetryData() { DateTime = DateTime.Now, DeviceId = msg.DeviceId, KeyName = kp.Key, Value_DateTime = new DateTime(1970, 1, 1) };
tdata.FillKVToMe(kp);
string _type = "";
string _value = "";
// value_boolean bool, value_string binary(4096), value_long bigint,value_datetime timestamp,value_double double,value_json binary(4096) ,value_xml binary
switch (tdata.Type)
{
case DataType.Boolean:
_type = "value_boolean";
_value = tdata.Value_Boolean.ToString().ToLower();
break;
case DataType.String:
_type = "value_string";
_value = $"'{tdata.Value_String?.Replace("'", "\\'")}'";
break;
case DataType.Long:
_type = "value_long";
_value = $"{tdata.Value_Long}";
break;
case DataType.Double:
_type = "value_double";
_value = $"{tdata.Value_Double}";
break;
case DataType.Json://td 一条记录16kb , 因此为了写更多数据, 我们json xml binary 全部使用 string
_type = "value_string";
_value = $"'{tdata.Value_Json?.Replace("'", "\\'")}'";
break;
case DataType.XML:
_type = "value_string";
_value = $"'{tdata.Value_XML?.Replace("'", "\\'")}'";
break;
case DataType.Binary:
_type = "value_string";
_value = $"\"{Hex.ToHexString(tdata.Value_Binary)}\"";
break;
case DataType.DateTime:
_type = "value_datetime";
_value = $"{tdata.Value_DateTime.Subtract(new DateTime(1970, 1, 1, 0, 0, 0, 0)).TotalMilliseconds}";
break;
default:
break;
}
string vals = $"device_{tdata.DeviceId:N}_{tdata.KeyName} USING telemetrydata TAGS('{tdata.DeviceId:N}','{tdata.KeyName}') (ts,value_type,{_type}) values (now,{(int)tdata.Type},{_value})";
lst.Add(vals);
}
});
await Retry.RetryOnAny(10, async f =>
{
db.Open();
int dt = await db.CreateCommand($"INSERT INTO {string.Join("\r\n", lst)}").ExecuteNonQueryAsync();
db.Close();
_logger.LogInformation($"数据入库完成,共数据{lst.Count}条,写入{dt}条");
}, ef =>
{
_logger.LogError(ef.ex, $"{msg.DeviceId}数据处理第{ef.current}次失败{ef.ex.Message} {ef.ex.InnerException?.Message} ");
});
}
}
catch (TaosException ex)
{
_logger.LogError(ex, $"{msg.DeviceId}数据处理失败{ex.ErrorCode}-{ex.Message} {ex.InnerException?.Message}");
}
catch (Exception ex)
{
_logger.LogError(ex, $"{msg.DeviceId}数据处理失败{ex.Message} {ex.InnerException?.Message} ");
}
return result;
}
}
}
\ No newline at end of file
......@@ -7,8 +7,10 @@
}
},
"ConnectionStrings": {
"IoTSharp": "Server=10.165.83.194;Database=IoTSharp;Username=postgres;Password=future;"
"IoTSharp": "Server=localhost;Database=IoTSharp;Username=postgres;Password=future;",
"Taos": "Data Source=taos;DataBase=IoTSharp;Username=root;Password=taosdata;Port=6030"
},
"TelemetryStorage": "Taos",
"JwtKey": "kissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissme",
"JwtExpireHours": 24,
"JwtIssuer": "IoTSharp.Net",
......
......@@ -7,7 +7,8 @@
}
},
"ConnectionStrings": {
"IoTSharp": "Server=pgsql;Database=IoTSharp;Username=postgres;Password=future;"
"IoTSharp": "Server=pgsql;Database=IoTSharp;Username=postgres;Password=future;",
"Taos": "Data Source=taos;DataBase=IoTSharp;Username=root;Password=taosdata;Port=6030"
},
"JwtKey": "iotsharpiotsharpiotsharpiotsharpiotsharp",
"JwtExpireHours": 24,
......
......@@ -56,7 +56,17 @@ services:
- 502:502
networks:
- iotsharp-network
taos:
image: tdengine/tdengine:2.0.1.1
restart: always
container_name: taos
ports:
- 6030:6030
- 6035:6035
- 6041:6041
- 6030-6040:6030-6040/udp
volumes:
- /etc/taos:/etc/taos
networks:
iotsharp-network:
driver: bridge
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册