//============================================================== // Copyright (C) 2019 Inc. All rights reserved. // //============================================================== // Create by 种道洋 at 2019/12/27 18:45:02. // Version 1.0 // 种道洋 //============================================================== using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Linq; using System.Threading.Tasks; using System.Diagnostics; using System.Buffers; /* * ****文件结构**** * 一个文件头 + 多个数据区组成 , 一个数据区:数据区头+数据块指针区+数据块区 * [] 表示重复的一个或多个内容 * HisData File Structor FileHead(72) + [HisDataRegion] FileHead: dataTime(8)+DatabaseName(64) HisDataRegion Structor: RegionHead + DataBlockPoint Area + DataBlock Area RegionHead: PreDataRegionPoint(8) + NextDataRegionPoint(8) + Datatime(8)+ tagcount(4)+ tagid sum(8)+file duration(4)+block duration(4)+Time tick duration(4) DataBlockPoint Area: [ID]+[block Point] [block point]: [[tag1 point,tag2 point,....][tag1 point,tag2 point,...].....] 以时间单位对变量的数去区指针进行组织 DataBlock Area: [block size + data block] */ namespace Cdy.Tag { /// /// 序列话引擎 /// public class SeriseEnginer2 : IDataSerialize2, IDisposable { #region ... Variables ... /// /// /// private ManualResetEvent resetEvent; private ManualResetEvent closedEvent; private Thread mCompressThread; private bool mIsClosed = false; //private MemoryBlock mProcessMemory; private DateTime mCurrentTime; //private SeriseFileItem[] mSeriseFile; private Dictionary mWaitForProcessMemory = new Dictionary(); /// /// /// private Dictionary mSeriserFiles = new Dictionary(); #endregion ...Variables... #region ... Events ... #endregion ...Events... #region ... Constructor... /// /// /// public SeriseEnginer2() { } #endregion ...Constructor... #region ... Properties ... /// /// 单个文件存储数据的时间长度 /// 单位小时 /// 最大24小时 /// public int FileDuration { get; set; } /// /// 单个块持续时间 /// 单位分钟 /// public int BlockDuration { get; set; } /// /// 单个文件内变量的个数 /// public int TagCountOneFile { get; set; } = 100000; /// /// 数据库名称 /// public string DatabaseName { get; set; } /// /// /// public string DataSeriser { get; set; } /// /// 主历史记录路径 /// public string HisDataPathPrimary { get; set; } /// /// 备份历史记录路径 /// public string HisDataPathBack { get; set; } /// /// 当前工作的历史记录路径 /// public static string HisDataPath { get; set; } #endregion ...Properties... #region ... Methods ... /// /// 选择历史记录路径 /// /// private string SelectHisDataPath() { if (string.IsNullOrEmpty(HisDataPathPrimary) && string.IsNullOrEmpty(HisDataPathBack)) { return PathHelper.helper.GetDataPath(this.DatabaseName, "HisData"); } else { string sp = string.IsNullOrEmpty(HisDataPathPrimary) ? PathHelper.helper.GetDataPath(this.DatabaseName, "HisData") : System.IO.Path.IsPathRooted(HisDataPathPrimary) ? HisDataPathPrimary : PathHelper.helper.GetDataPath(this.DatabaseName, HisDataPathPrimary); string spb = string.IsNullOrEmpty(HisDataPathBack) ? PathHelper.helper.GetDataPath(this.DatabaseName, "HisData") : System.IO.Path.IsPathRooted(HisDataPathBack) ? HisDataPathBack : PathHelper.helper.GetDataPath(this.DatabaseName, HisDataPathBack); //to do select avaiable data path return sp; } //return string.Empty; } /// /// /// public void Init() { var his = ServiceLocator.Locator.Resolve(); var histag = his.ListAllTags().OrderBy(e => e.Id); //计算数据区域个数 var mLastDataRegionId = -1; foreach (var vv in histag) { var id = vv.Id; var did = id / TagCountOneFile; if (mLastDataRegionId != did) { mSeriserFiles.Add(did, new SeriseFileItem2() { FileDuration = FileDuration, BlockDuration = BlockDuration, TagCountOneFile = TagCountOneFile, DatabaseName = DatabaseName, Id = did }); mLastDataRegionId = did; } } foreach (var vv in mSeriserFiles) { vv.Value.FileWriter = DataFileSeriserManager.manager.GetSeriser(DataSeriser).New(); vv.Value.Init(); } } /// /// /// public void Start() { LoggerService.Service.Info("SeriseEnginer", "start to Start"); mIsClosed = false; //Init(); resetEvent = new ManualResetEvent(false); closedEvent = new ManualResetEvent(false); mCompressThread = new Thread(ThreadPro); mCompressThread.IsBackground = true; mCompressThread.Start(); } /// /// /// public void Stop() { LoggerService.Service.Info("SeriseEnginer", "start to stop"); mIsClosed = true; resetEvent.Set(); closedEvent.WaitOne(); resetEvent.Dispose(); closedEvent.Dispose(); } /// /// /// /// /// public void RequestToSeriseFile(CompressMemory2 dataMemory, DateTime date) { lock (mWaitForProcessMemory) { if (mWaitForProcessMemory.ContainsKey(dataMemory.Id)) { mWaitForProcessMemory[dataMemory.Id] = dataMemory; } else { mWaitForProcessMemory.Add(dataMemory.Id, dataMemory); } } mCurrentTime = date; } /// /// /// public void RequestToSave() { resetEvent.Set(); } /// /// /// private void ThreadPro() { ThreadHelper.AssignToCPU(CPUAssignHelper.Helper.CPUArray2); while (!mIsClosed) { resetEvent.WaitOne(); resetEvent.Reset(); if (mIsClosed) break; SaveToFile(); } closedEvent.Set(); } /// /// 执行存储到磁盘 /// private void SaveToFile() { /* 1. 检查变量ID是否变动,如果变动则重新记录变量的ID列表 2. 拷贝数据块 3. 更新数据块指针 */ //#if DEBUG Stopwatch sw = new Stopwatch(); sw.Start(); LoggerService.Service.Info("SeriseEnginer", "********开始执行存储********", ConsoleColor.Cyan); //#endif HisDataPath = SelectHisDataPath(); List mtmp; lock (mWaitForProcessMemory) { mtmp = mWaitForProcessMemory.Values.ToList(); mWaitForProcessMemory.Clear(); } foreach (var vv in mtmp) { mSeriserFiles[vv.Id].SaveToFile(vv, vv.CurrentTime); vv.Clear(); vv.MakeMemoryNoBusy(); } //#if DEBUG sw.Stop(); LoggerService.Service.Info("SeriseEnginer", ">>>>>>>>>完成执行存储>>>>>>> Count:" + mtmp.Count + " ElapsedMilliseconds:" + sw.ElapsedMilliseconds, ConsoleColor.Cyan); //#endif } /// /// /// public void Dispose() { if (mSeriserFiles != null) { foreach (var vv in mSeriserFiles) { vv.Value.Dispose(); } mSeriserFiles.Clear(); } } #endregion ...Methods... #region ... Interfaces ... #endregion ...Interfaces... } /// /// /// public class SeriseFileItem2:IDisposable { #region ... Variables ... /// /// 变量的数据指针的相对起始地址 /// private Dictionary mIdAddrs = new Dictionary(); /// /// /// private bool mNeedUpdateTagHeads = false; /// /// 当前数据区首地址 /// private long mCurrentDataRegion = 0; private string mCurrentFileName; private DataFileSeriserbase mFileWriter; /// /// 数据文件扩展名 /// public const string DataFileExtends = ".dbd"; /// /// 文件头大小 /// public const int FileHeadSize = 72; //private MemoryBlock mHeadMemory; private MemoryBlock mBlockPointMemory; //指针区域的起始地址 private long mBlockPointOffset = 0; private VarintCodeMemory mTagIdMemoryCach; //变量ID校验和 private long mTagIdSum; private DateTime mCurrentTime; private int mTagCount = 0; /// /// 上一个数据区域首地址 /// private long mPreDataRegion = 0; static object mFileLocker = new object(); #endregion ...Variables... #region ... Events ... #endregion ...Events... #region ... Constructor... #endregion ...Constructor... #region ... Properties ... /// /// /// public int Id { get; set; } /// /// /// public DataFileSeriserbase FileWriter { get { return mFileWriter; } set { mFileWriter=value; } } /// /// /// public long CurrentDataRegion { get { return mCurrentDataRegion; } set { mCurrentDataRegion = value; } } /// /// /// public int FileStartHour { get; set; } /// /// 单个文件内变量的个数 /// public int TagCountOneFile { get; set; } /// /// 单个块持续时间 /// 单位分钟 /// public int BlockDuration { get; set; } /// /// /// public int FileDuration { get; set; } /// /// /// public string DatabaseName { get; set; } #endregion ...Properties... #region ... Methods ... /// /// /// /// private int GetDataRegionHeaderLength() { //头部结构:Pre DataRegion(8) + Next DataRegion(8) + Datatime(8)+ tagcount(4)+ tagid sum(8)+file duration(4)+block duration(4)+Time tick duration(4) return 8 + 8 + 8 + 4 + 8 + 4 + 4 + 4; } /// /// 添加文件头部 /// private void AppendFileHeader(DateTime time, string databaseName) { DateTime date = new DateTime(time.Year, time.Month, time.Day, ((time.Hour / FileDuration) * FileDuration), 0, 0); mFileWriter.Write(date, 0); //byte[] nameBytes = new byte[64]; byte[] nameBytes = ArrayPool.Shared.Rent(64); var ntmp = Encoding.UTF8.GetBytes(databaseName); Buffer.BlockCopy(ntmp, 0, nameBytes, 0, Math.Min(64, ntmp.Length)); mFileWriter.Write(nameBytes, 8); ArrayPool.Shared.Return(nameBytes); } /// /// 添加区域头部 /// private void AppendDataRegionHeader() { byte[] bval; int totalLen; int datalen; //更新上个DataRegion 的Next DataRegion Pointer 指针 if (mPreDataRegion >= 0) { mFileWriter.Write(mCurrentDataRegion, mPreDataRegion + 8); } bval = GeneratorDataRegionHeader(out totalLen,out datalen); mFileWriter.Append(bval, 0, datalen); mFileWriter.AppendZore(totalLen - datalen); mPreDataRegion = mCurrentDataRegion; mBlockPointOffset = mCurrentDataRegion + mBlockPointOffset; ArrayPool.Shared.Return(bval); } /// /// 生成区域头部 /// 偏移位置 /// private byte[] GeneratorDataRegionHeader(out int totallenght,out int datalen) { Stopwatch sw = new Stopwatch(); sw.Start(); //文件头部结构:Pre DataRegion(8) + Next DataRegion(8) + Datatime(8)+tagcount(4)+ tagid sum(8) +file duration(4)+ block duration(4)+Time tick duration(4)+ { len + [tag id]}+ [data blockpoint(8)] int blockcount = FileDuration * 60 / BlockDuration; int len = GetDataRegionHeaderLength() + 4+ mTagIdMemoryCach.Position + mTagCount * (blockcount * 8); totallenght = len; byte[] bvals = ArrayPool.Shared.Rent(52 + mTagIdMemoryCach.Position); using (System.IO.MemoryStream mHeadMemory = new System.IO.MemoryStream(bvals)) { mHeadMemory.Position = 0; mHeadMemory.Write(BitConverter.GetBytes((long)mPreDataRegion));//更新Pre DataRegion 指针 mHeadMemory.Write(BitConverter.GetBytes((long)0)); //更新Next DataRegion 指针 mHeadMemory.Write(MemoryHelper.GetBytes(mCurrentTime)); //写入时间 mHeadMemory.Write(BitConverter.GetBytes(mTagCount)); //写入变量个数 mHeadMemory.Write(BitConverter.GetBytes(mTagIdSum)); //写入Id 校验和 mHeadMemory.Write(BitConverter.GetBytes(FileDuration)); //写入文件持续时间 mHeadMemory.Write(BitConverter.GetBytes(BlockDuration)); //写入数据块持续时间 mHeadMemory.Write(BitConverter.GetBytes(HisEnginer.MemoryTimeTick)); //写入时间间隔 //写入变量编号列表 mHeadMemory.Write(BitConverter.GetBytes(mTagIdMemoryCach.Position));// mHeadMemory.Write(mTagIdMemoryCach.Position);//写入压缩后的数组的长度 mHeadMemory.Write(mTagIdMemoryCach.Buffer, 0, mTagIdMemoryCach.Position);//写入压缩数据 mBlockPointOffset = mHeadMemory.Position; datalen = (int)mHeadMemory.Position; sw.Stop(); LoggerService.Service.Info("SeriseFileItem" + Id, "GeneratorDataRegionHeader " + sw.ElapsedMilliseconds); return bvals; //return mHeadMemory.ToArray(); } } #endregion ...Methods... #region ... Interfaces ... #endregion ...Interfaces... /// /// /// /// /// private string GetDataPath(DateTime time) { return System.IO.Path.Combine(SeriseEnginer2.HisDataPath, GetFileName(time)); } /// /// /// /// /// private string GetFileName(DateTime time) { return DatabaseName + Id.ToString("D3") + time.ToString("yyyyMMdd") + FileDuration.ToString("D2") + (time.Hour / FileDuration).ToString("D2") + DataFileExtends; } /// /// 搜索最后一个数据区域 /// /// private long SearchLastDataRegion() { long offset = FileHeadSize; while (true) { var nextaddr = mFileWriter.ReadLong(offset + 8); if (nextaddr <= 0) { break; } else { offset = nextaddr; } } mPreDataRegion = offset; mFileWriter.GoToEnd(); mCurrentDataRegion = mFileWriter.CurrentPostion; return 0; } /// /// /// /// /// /// public bool CheckInSameFile(DateTime time1) { return GetFileName(time1) == mCurrentFileName; } /// /// 计算变量Id集合所占的大小 /// /// private int CalTagIdsSize() { if (mTagIdMemoryCach != null) mTagIdMemoryCach.Dispose(); var aids = ServiceLocator.Locator.Resolve().ListAllTags().Where(e => e.Id >= Id * TagCountOneFile && e.Id < (Id + 1) * TagCountOneFile).OrderBy(e => e.Id).ToArray(); mTagIdSum = 0; mTagIdMemoryCach = new VarintCodeMemory((int)(aids.Count() * 4 * 1.2)); if (aids.Length > 0) { int preids = aids[0].Id; mTagIdSum += preids; mTagIdMemoryCach.WriteInt32(preids); for (int i = 1; i < aids.Length; i++) { var id = aids[i].Id; mTagIdMemoryCach.WriteInt32(id - preids); mTagIdSum += id; preids = id; } } return mTagIdMemoryCach.Position + 4; } /// /// /// public void Init() { //LoggerService.Service.Info("SeriseFileItem" + Id, "------Init------"); //mIdAddrs.Clear(); //long offset = GetDataRegionHeaderLength() + CalTagIdsSize(); //long offset = 0; //int blockcount = FileDuration * 60 / BlockDuration; var vv = ServiceLocator.Locator.Resolve(); var tags = vv.ListAllTags().Where(e => e.Id >= Id * TagCountOneFile && e.Id < (Id + 1) * TagCountOneFile).OrderBy(e => e.Id); if (mBlockPointMemory != null) mBlockPointMemory.Dispose(); mBlockPointMemory = new MemoryBlock(tags.Count() * 8, 4 * 1024 * 1024); mBlockPointMemory.Clear(); LoggerService.Service.Info("SeriseEnginer", "Cal BlockPointMemory memory size:" + (mBlockPointMemory.AllocSize) / 1024.0 / 1024 + "M", ConsoleColor.Cyan); //foreach (var vtag in tags) //{ // mIdAddrs.Add(vtag.Id, offset); // //offset += (blockcount * 8); // offset += 8; //} CalTagIdsSize(); } /// /// 检查文件是否存在 /// /// private bool CheckFile(DateTime time) { //LoggerService.Service.Info("SeriseFileItem" + Id, "------CheckFile------"); if (!CheckInSameFile(time)) { if (mNeedUpdateTagHeads) { Init(); mNeedUpdateTagHeads = false; } mFileWriter.Flush(); mFileWriter.Close(); string sfile = GetDataPath(time); if (mFileWriter.CreatOrOpenFile(sfile)) { AppendFileHeader(time, this.DatabaseName); //新建文件 mCurrentDataRegion = FileHeadSize; mPreDataRegion = -1; AppendDataRegionHeader(); } else { if (mFileWriter.Length < 72) { AppendFileHeader(time, this.DatabaseName); //新建文件 mCurrentDataRegion = FileHeadSize; mPreDataRegion = -1; AppendDataRegionHeader(); } else { //打开已有文件 SearchLastDataRegion(); AppendDataRegionHeader(); } } mCurrentFileName = GetFileName(time); } else { if (mNeedUpdateTagHeads) { Init(); SearchLastDataRegion(); AppendDataRegionHeader(); mNeedUpdateTagHeads = false; } } //LoggerService.Service.Info("SeriseFileItem" + Id, "*********CheckFile end**********"); return true; } /// /// /// /// /// public void SaveToFile(MarshalMemoryBlock mProcessMemory, DateTime time) { SaveToFile(mProcessMemory, 0, time); } /// /// 执行存储到磁盘 /// public void SaveToFile(MarshalMemoryBlock mProcessMemory,long dataOffset,DateTime time) { /* 1. 检查变量ID是否变动,如果变动则重新记录变量的ID列表 2. 拷贝数据块 3. 更新数据块指针 */ //LoggerService.Service.Info("SeriseFileItem" + Id, "*********开始执行存储**********"); try { Stopwatch sw = new Stopwatch(); sw.Start(); var datasize = mProcessMemory.ReadInt(dataOffset); var count = mProcessMemory.ReadInt(dataOffset + 4); mTagCount = count; mCurrentTime = time; //to do 计算变量信息是否改变 ////判断变量的ID列表是否被修改了 //for (int i = 0; i < count; i++) //{ // var id = mProcessMemory.ReadInt(offset); // if (!mIdAddrs.ContainsKey(id)) // { // mNeedUpdateTagHeads = true; // break; // } // offset += 8; //} var ltmp = sw.ElapsedMilliseconds; if (!CheckFile(time)) return; var ltmp2 = sw.ElapsedMilliseconds; long offset = 8 + dataOffset; long start = count * 8 + offset;//计算出数据起始地址 //LoggerService.Service.Info("SeriseFileItem" + Id, "开始更新指针区域"); var dataAddr = this.mFileWriter.GoToEnd().CurrentPostion; mBlockPointMemory.CheckAndResize(mTagCount * 8); mBlockPointMemory.Clear(); //更新BlockPoint for (int i = 0; i < count; i++) { var id = mProcessMemory.ReadInt(offset); var addr = mProcessMemory.ReadInt(offset + 4) - start + dataAddr; offset += 8; if (id > -1) { mBlockPointMemory.WriteLong(i * 8, addr); } } //StringBuilder sb = new StringBuilder(); //foreach (var vv in mBlockPointMemory.ToLongList()) //{ // sb.Append(vv + ","); //} //计算本次更新对应的指针区域的起始地址 FileStartHour = (time.Hour / FileDuration) * FileDuration; int bid = ((time.Hour - FileStartHour) * 60 + time.Minute) / BlockDuration; var pointAddr = mBlockPointOffset + count * 8 * bid; var ltmp3 = sw.ElapsedMilliseconds; //lock (mFileLocker) { mFileWriter.GoToEnd(); long lpp = mFileWriter.CurrentPostion; mProcessMemory.WriteToStream(mFileWriter.GetStream(), start, datasize);//直接拷贝数据块 // LoggerService.Service.Info("SeriseFileItem", "数据写入地址:" + lpp + " 数据大小: "+(datasize) +" 最后地址: "+mFileWriter.CurrentPostion+ ",更新指针地址:" + pointAddr + " block index:" + bid + " tagcount:" + count + " block point Start Addr:" + mBlockPointOffset, ConsoleColor.Red); //this.mFileWriter.Append(mProcessMemory.Buffers, (int)start, (int)(totalsize - start)); mFileWriter.Write(mBlockPointMemory.Buffers, pointAddr, 0, (int)mBlockPointMemory.AllocSize); Flush(); } sw.Stop(); LoggerService.Service.Info("SeriseFileItem" + Id, "写入数据 " + mCurrentFileName + " 数据大小:" + ((datasize) + mBlockPointMemory.AllocSize) / 1024.0 / 1024 + " m" +"其他脚本耗时:"+ltmp+","+(ltmp2-ltmp)+","+(ltmp3-ltmp2)+ "存储耗时:" + (sw.ElapsedMilliseconds-ltmp3)); } catch(System.IO.IOException ex) { LoggerService.Service.Erro("SeriseEnginer" + Id,ex.Message); } //LoggerService.Service.Info("SeriseEnginer" + Id, ">>>>>>>>>完成执行存储>>>>>>>>>"); } /// /// /// public void Flush() { mFileWriter.Flush(); mFileWriter.CloseAndReOpen(); } /// /// /// public void Dispose() { mIdAddrs.Clear(); mFileWriter.Dispose(); mFileWriter = null; } } }