From cc60ed9138d446f946e7ab70b85da00eac1d2868 Mon Sep 17 00:00:00 2001 From: MysticBoy Date: Fri, 28 Aug 2020 19:24:27 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E7=89=88=E6=9C=AC=E5=8A=A0=E5=85=A5ta?= =?UTF-8?q?os=20=20=E7=9A=84=E6=97=B6=E5=BA=8F=E6=95=B0=E6=8D=AE=E5=BA=93?= =?UTF-8?q?=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- IoTSharp/AppSettings.cs | 5 +- IoTSharp/IoTSharp.csproj | 5 +- IoTSharp/Jobs/PushData.cs | 1 - IoTSharp/Startup.cs | 3 + IoTSharp/Storage/TaosStorage.cs | 256 ++++++++++++++++++++++++++ IoTSharp/appsettings.Development.json | 4 +- IoTSharp/appsettings.Production.json | 3 +- docker-compose.yml | 12 +- 8 files changed, 281 insertions(+), 8 deletions(-) create mode 100644 IoTSharp/Storage/TaosStorage.cs diff --git a/IoTSharp/AppSettings.cs b/IoTSharp/AppSettings.cs index 2ad42ab0..ef17eaab 100644 --- a/IoTSharp/AppSettings.cs +++ b/IoTSharp/AppSettings.cs @@ -15,7 +15,8 @@ namespace IoTSharp public enum TelemetryStorage { SingleTable, - Sharding + Sharding, + Taos } public class AppSettings { @@ -31,7 +32,7 @@ namespace IoTSharp /// mqtt client settings /// public MqttClientSetting MqttClient { get; set; } - + public Dictionary ConnectionStrings { get; set; } public CoapConfig CoapServer { get; set; } = new CoapConfig(); public ModBusServerSetting ModBusServer { get; set; } = new ModBusServerSetting(); diff --git a/IoTSharp/IoTSharp.csproj b/IoTSharp/IoTSharp.csproj index c8178502..9ca0c4f7 100644 --- a/IoTSharp/IoTSharp.csproj +++ b/IoTSharp/IoTSharp.csproj @@ -41,12 +41,13 @@ - - + + + all diff --git a/IoTSharp/Jobs/PushData.cs b/IoTSharp/Jobs/PushData.cs index bbdd46e5..8855dc43 100644 --- a/IoTSharp/Jobs/PushData.cs +++ b/IoTSharp/Jobs/PushData.cs @@ -21,7 +21,6 @@ using System.Threading.Tasks; namespace IoTSharp.Jobs { - [DisallowConcurrentExecution] [SilkierQuartz(0, "PushData", "Push Iot Message Data to DataBase ", TriggerGroup = "Data")] public class PushData : IJob { diff --git a/IoTSharp/Startup.cs b/IoTSharp/Startup.cs index 73ed26fb..dadb03df 100644 --- a/IoTSharp/Startup.cs +++ b/IoTSharp/Startup.cs @@ -166,6 +166,9 @@ namespace IoTSharp case TelemetryStorage.SingleTable: services.AddSingleton(); break; + case TelemetryStorage.Taos: + services.AddSingleton(); + break; default: break; } diff --git a/IoTSharp/Storage/TaosStorage.cs b/IoTSharp/Storage/TaosStorage.cs new file mode 100644 index 00000000..9f94b7b2 --- /dev/null +++ b/IoTSharp/Storage/TaosStorage.cs @@ -0,0 +1,256 @@ +using EFCore.Sharding; +using IoTSharp.Data; +using IoTSharp.Dtos; +using IoTSharp.Extensions; +using IoTSharp.Queue; +using Maikebing.Data.Taos; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Org.BouncyCastle.Utilities.Encoders; +using Silkier; +using Silkier.EFCore; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +namespace IoTSharp.Storage +{ + + public class TaosStorage : IStorage + { + private readonly AppSettings _appSettings; + private readonly ILogger _logger; + private readonly IServiceScope scope; + private readonly TaosConnectionStringBuilder _taosBuilder; + public TaosStorage(ILogger logger, IServiceScopeFactory scopeFactor + , IOptions options + ) + { + _appSettings = options.Value; + _logger = logger; + scope = scopeFactor.CreateScope(); + _taosBuilder = new TaosConnectionStringBuilder(_appSettings.ConnectionStrings["Taos"]); + } + private bool dbisok = false; + private bool CheckDataBase() + { + if (!dbisok) + { + dbisok = Retry.RetryOnAny(10, f => + { + using (TaosConnection _taos = new TaosConnection(_taosBuilder.ConnectionString)) + { + _taos.Open(); + _taos.CreateCommand($"CREATE DATABASE IF NOT EXISTS {_taosBuilder.DataBase} KEEP 365 DAYS 10 BLOCKS 4;").ExecuteNonQuery(); + _taos.ChangeDatabase(_taosBuilder.DataBase); + _taos.CreateCommand("CREATE TABLE IF NOT EXISTS telemetrydata (ts timestamp,value_type tinyint, value_boolean bool, value_string binary(10240), value_long bigint,value_datetime timestamp,value_double double) TAGS (deviceid binary(32),keyname binary(64));") + .ExecuteNonQuery(); + dbisok = true; + _taos.Close(); + } + return true; + }, ef => + { + _logger.LogError(ef.ex, $"CheckDataBase第{ef.current}次失败{ef.ex.Message} {ef.ex.InnerException?.Message} "); + }); + } + return dbisok; + } + + public Task> GetTelemetryLatest(Guid deviceId) + { + using (TaosConnection db = new TaosConnection(_taosBuilder.ConnectionString)) + { + string sql = $"select last_row(*) from telemetrydata where deviceid='{deviceId:N}' group by deviceid,keyname"; + List dt = SqlToTDD(db, sql, "last_row(", ")", string.Empty); + return Task.FromResult(dt); + } + } + + private List SqlToTDD(TaosConnection db, string sql, string prefix, string suffix, string keyname) + { + List dt = new List(); + TaosDataReader dataReader = db.CreateCommand(sql).ExecuteReader(); + while (dataReader.Read()) + { + TelemetryDataDto telemetry = new TelemetryDataDto(); + byte datatype = (byte)dataReader[dataReader.GetOrdinal($"{prefix}value_type{suffix}")]; + if (string.IsNullOrEmpty(keyname)) + { + telemetry.KeyName = dataReader.GetString(dataReader.GetOrdinal("keyname")); + } + else + { + telemetry.KeyName = keyname; + } + telemetry.DateTime = dataReader.GetDateTime(dataReader.GetOrdinal($"{prefix}ts{suffix}")); + switch ((DataType)datatype) + { + case DataType.Boolean: + telemetry.Value = dataReader.GetBoolean(dataReader.GetOrdinal($"{prefix}value_boolean{suffix}")); + break; + case DataType.String: + telemetry.Value = dataReader.GetString(dataReader.GetOrdinal($"{prefix}value_string{suffix}")); + break; + case DataType.Long: + telemetry.Value = dataReader.GetInt64(dataReader.GetOrdinal($"{prefix}value_long{suffix}")); + break; + case DataType.Double: + telemetry.Value = dataReader.GetDouble(dataReader.GetOrdinal($"{prefix}value_double{suffix}")); + break; + case DataType.Json: + case DataType.XML: + case DataType.Binary: + telemetry.Value = dataReader.GetString(dataReader.GetOrdinal($"{prefix}value_string{suffix}")); + break; + case DataType.DateTime: + telemetry.Value = dataReader.GetDateTime(dataReader.GetOrdinal($"{prefix}value_datetime{suffix}")); + break; + default: + break; + } + dt.Add(telemetry); + } + return dt; + } + + public Task> GetTelemetryLatest(Guid deviceId, string keys) + { + using (TaosConnection db = new TaosConnection(_taosBuilder.ConnectionString)) + { + IEnumerable kvs = from k in keys + select $" keyname = '{k}' "; + string sql = $"select last_row(*) from telemetrydata where deviceid='{deviceId:N}' and ({string.Join("or", kvs) }) group by deviceid,keyname"; + List dt = SqlToTDD(db, sql, "last_row(", ")", string.Empty); + return Task.FromResult(dt); + } + } + private List SQLToDTByDate(DateTime begin, DateTime end, TaosConnection db, string sql) + { + List dt = new List(); + List<(string tbname, string keyname)> list = db.CreateCommand(sql).ExecuteReader().ToList<(string tbname, string keyname)>(); + foreach ((string tbname, string keyname) item in list) + { + string susql = $" select * from {item.tbname} where ts >={begin:yyyy-MM-dd HH:mm:ss.fff} and ts <={end:yyyy-MM-dd HH:mm:ss.fff}"; + List dtx = SqlToTDD(db, susql, "", "", item.keyname); + dt.AddRange(dtx); + } + return dt; + } + public Task> LoadTelemetryAsync(Guid deviceId, string keys, DateTime begin) + { + return LoadTelemetryAsync(deviceId, keys, begin, DateTime.Now); + } + + + public Task> LoadTelemetryAsync(Guid deviceId, string keys, DateTime begin, DateTime end) + { + using (TaosConnection db = new TaosConnection(_taosBuilder.ConnectionString)) + { + IEnumerable kvs = from k in keys + select $" keyname = '{k}' "; + string sql = $"select tbname,keyname from telemetrydata where deviceid='{deviceId:N}' and ({string.Join("or", kvs) }) "; + List dt = SQLToDTByDate(begin, end, db, sql); + return Task.FromResult(dt); + } + } + + public Task> LoadTelemetryAsync(Guid deviceId, DateTime begin) + { + return LoadTelemetryAsync(deviceId, begin, DateTime.Now); + } + + public Task> LoadTelemetryAsync(Guid deviceId, DateTime begin, DateTime end) + { + using (TaosConnection db = new TaosConnection(_taosBuilder.ConnectionString)) + { + string sql = $"select tbname,keyname from telemetrydata where deviceid='{deviceId:N}'"; + List dt = SQLToDTByDate(begin, end, db, sql); + return Task.FromResult(dt); + } + } + + public async Task StoreTelemetryAsync(RawMsg msg) + { + bool result = false; + try + { + CheckDataBase(); + using (TaosConnection db = new TaosConnection(_taosBuilder.ConnectionString)) + { + List lst = new List(); + msg.MsgBody.ToList().ForEach(kp => + { + if (kp.Value != null) + { + TelemetryData tdata = new TelemetryData() { DateTime = DateTime.Now, DeviceId = msg.DeviceId, KeyName = kp.Key, Value_DateTime = new DateTime(1970, 1, 1) }; + tdata.FillKVToMe(kp); + string _type = ""; + string _value = ""; + // value_boolean bool, value_string binary(4096), value_long bigint,value_datetime timestamp,value_double double,value_json binary(4096) ,value_xml binary + switch (tdata.Type) + { + case DataType.Boolean: + _type = "value_boolean"; + _value = tdata.Value_Boolean.ToString().ToLower(); + break; + case DataType.String: + _type = "value_string"; + _value = $"'{tdata.Value_String?.Replace("'", "\\'")}'"; + break; + case DataType.Long: + _type = "value_long"; + _value = $"{tdata.Value_Long}"; + break; + case DataType.Double: + _type = "value_double"; + _value = $"{tdata.Value_Double}"; + break; + case DataType.Json://td 一条记录16kb , 因此为了写更多数据, 我们json xml binary 全部使用 string + _type = "value_string"; + _value = $"'{tdata.Value_Json?.Replace("'", "\\'")}'"; + break; + case DataType.XML: + _type = "value_string"; + _value = $"'{tdata.Value_XML?.Replace("'", "\\'")}'"; + break; + case DataType.Binary: + _type = "value_string"; + _value = $"\"{Hex.ToHexString(tdata.Value_Binary)}\""; + break; + case DataType.DateTime: + _type = "value_datetime"; + _value = $"{tdata.Value_DateTime.Subtract(new DateTime(1970, 1, 1, 0, 0, 0, 0)).TotalMilliseconds}"; + break; + default: + break; + } + string vals = $"device_{tdata.DeviceId:N}_{tdata.KeyName} USING telemetrydata TAGS('{tdata.DeviceId:N}','{tdata.KeyName}') (ts,value_type,{_type}) values (now,{(int)tdata.Type},{_value})"; + lst.Add(vals); + } + }); + await Retry.RetryOnAny(10, async f => + { + db.Open(); + int dt = await db.CreateCommand($"INSERT INTO {string.Join("\r\n", lst)}").ExecuteNonQueryAsync(); + db.Close(); + _logger.LogInformation($"数据入库完成,共数据{lst.Count}条,写入{dt}条"); + }, ef => + { + _logger.LogError(ef.ex, $"{msg.DeviceId}数据处理第{ef.current}次失败{ef.ex.Message} {ef.ex.InnerException?.Message} "); + }); + } + } + catch (TaosException ex) + { + _logger.LogError(ex, $"{msg.DeviceId}数据处理失败{ex.ErrorCode}-{ex.Message} {ex.InnerException?.Message}"); + } + catch (Exception ex) + { + _logger.LogError(ex, $"{msg.DeviceId}数据处理失败{ex.Message} {ex.InnerException?.Message} "); + } + return result; + } + } +} \ No newline at end of file diff --git a/IoTSharp/appsettings.Development.json b/IoTSharp/appsettings.Development.json index b4e1a521..21edde65 100644 --- a/IoTSharp/appsettings.Development.json +++ b/IoTSharp/appsettings.Development.json @@ -7,8 +7,10 @@ } }, "ConnectionStrings": { - "IoTSharp": "Server=10.165.83.194;Database=IoTSharp;Username=postgres;Password=future;" + "IoTSharp": "Server=localhost;Database=IoTSharp;Username=postgres;Password=future;", + "Taos": "Data Source=taos;DataBase=IoTSharp;Username=root;Password=taosdata;Port=6030" }, + "TelemetryStorage": "Taos", "JwtKey": "kissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissmekissme", "JwtExpireHours": 24, "JwtIssuer": "IoTSharp.Net", diff --git a/IoTSharp/appsettings.Production.json b/IoTSharp/appsettings.Production.json index 093dc511..5d3e5a7a 100644 --- a/IoTSharp/appsettings.Production.json +++ b/IoTSharp/appsettings.Production.json @@ -7,7 +7,8 @@ } }, "ConnectionStrings": { - "IoTSharp": "Server=pgsql;Database=IoTSharp;Username=postgres;Password=future;" + "IoTSharp": "Server=pgsql;Database=IoTSharp;Username=postgres;Password=future;", + "Taos": "Data Source=taos;DataBase=IoTSharp;Username=root;Password=taosdata;Port=6030" }, "JwtKey": "iotsharpiotsharpiotsharpiotsharpiotsharp", "JwtExpireHours": 24, diff --git a/docker-compose.yml b/docker-compose.yml index 1db5fc78..2eba9c9c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -56,7 +56,17 @@ services: - 502:502 networks: - iotsharp-network - + taos: + image: tdengine/tdengine:2.0.1.1 + restart: always + container_name: taos + ports: + - 6030:6030 + - 6035:6035 + - 6041:6041 + - 6030-6040:6030-6040/udp + volumes: + - /etc/taos:/etc/taos networks: iotsharp-network: driver: bridge \ No newline at end of file -- GitLab