//============================================================== // Copyright (C) 2019 Inc. All rights reserved. // //============================================================== // Create by 种道洋 at 2019/12/27 18:45:02. // Version 1.0 // 种道洋 //============================================================== using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Linq; using System.Threading.Tasks; using System.Diagnostics; using System.Buffers; using DotNetty.Common; using DBRuntime.His; using System.Collections; using System.Drawing; /* * ****文件结构**** * 一个文件头 + 多个数据区组成 , 一个数据区:数据区头+数据块指针区+数据块区 * [] 表示重复的一个或多个内容 * HisData File Structor FileHead(84) + [HisDataRegion] FileHead: dataTime(8)(FileTime)+dateTime(8)(LastUpdateTime)+DataRegionCount(4)+DatabaseName(64) HisDataRegion Structor: RegionHead + DataBlockPoint Area + DataBlock Area RegionHead: PreDataRegionPoint(8) + NextDataRegionPoint(8) + Datatime(8)+ tagcount(4)+ tagid sum(8)+file duration(4)+block duration(4)+Time tick duration(4) DataBlockPoint Area: [ID]+[block Point] [block point]: [[tag1 point,tag2 point,....][tag1 point,tag2 point,...].....] 以时间单位对变量的数去区指针进行组织 DataBlock Area: [block size + data block] */ namespace Cdy.Tag { /// /// 序列话引擎 /// public class SeriseEnginer2 : IDataSerialize2, IDisposable { #region ... Variables ... /// /// /// private ManualResetEvent resetEvent; private ManualResetEvent closedEvent; private Thread mCompressThread; private bool mIsClosed = false; //private DateTime mCurrentTime; private Dictionary mWaitForProcessMemory = new Dictionary(); /// /// /// private Dictionary mSeriserFiles = new Dictionary(); //private int mManualRequestSaveCount = 0; #endregion ...Variables... #region ... Events ... #endregion ...Events... #region ... Constructor... /// /// /// public SeriseEnginer2() { } #endregion ...Constructor... #region ... Properties ... /// /// 单个文件存储数据的时间长度 /// 单位小时 /// 最大24小时 /// public int FileDuration { get; set; } /// /// 单个块持续时间 /// 单位分钟 /// public int BlockDuration { get; set; } /// /// 单个文件内变量的个数 /// public int TagCountOneFile { get; set; } = 100000; /// /// 数据库名称 /// public string DatabaseName { get; set; } /// /// /// public string DataSeriser { get; set; } /// /// 主历史记录路径 /// public string HisDataPathPrimary { get; set; } /// /// 备份历史记录路径 /// public string HisDataPathBack { get; set; } /// /// 当前工作的历史记录路径 /// public static string HisDataPath { get; set; } #endregion ...Properties... #region ... Methods ... /// /// 选择历史记录路径 /// /// private string SelectHisDataPath() { if (string.IsNullOrEmpty(HisDataPathPrimary) && string.IsNullOrEmpty(HisDataPathBack)) { return PathHelper.helper.GetDataPath(this.DatabaseName, "HisData"); } else { string sp = string.IsNullOrEmpty(HisDataPathPrimary) ? PathHelper.helper.GetDataPath(this.DatabaseName, "HisData") : System.IO.Path.IsPathRooted(HisDataPathPrimary) ? HisDataPathPrimary : PathHelper.helper.GetDataPath(this.DatabaseName, HisDataPathPrimary); string spb = string.IsNullOrEmpty(HisDataPathBack) ? PathHelper.helper.GetDataPath(this.DatabaseName, "HisData") : System.IO.Path.IsPathRooted(HisDataPathBack) ? HisDataPathBack : PathHelper.helper.GetDataPath(this.DatabaseName, HisDataPathBack); //to do select avaiable data path return sp; } //return string.Empty; } /// /// /// public void Init() { var his = ServiceLocator.Locator.Resolve(); var histag = his.ListAllTags().OrderBy(e => e.Id); //计算数据区域个数 var mLastDataRegionId = -1; foreach (var vv in histag) { var id = vv.Id; var did = id / TagCountOneFile; if (mLastDataRegionId != did) { mSeriserFiles.Add(did, new SeriseFileItem2() { FileDuration = FileDuration, BlockDuration = BlockDuration, TagCountOneFile = TagCountOneFile, DatabaseName = DatabaseName, Id = did }); mLastDataRegionId = did; } } foreach (var vv in mSeriserFiles) { vv.Value.FileWriter = DataFileSeriserManager.manager.GetSeriser(DataSeriser).New(); vv.Value.FileWriter2 = DataFileSeriserManager.manager.GetSeriser(DataSeriser).New(); vv.Value.Init(); } HisDataPath = SelectHisDataPath(); } /// /// /// public void Start() { LoggerService.Service.Info("SeriseEnginer", "开始启动"); mIsClosed = false; resetEvent = new ManualResetEvent(false); closedEvent = new ManualResetEvent(false); mCompressThread = new Thread(ThreadPro); mCompressThread.IsBackground = true; mCompressThread.Start(); } /// /// /// public void Stop() { LoggerService.Service.Info("SeriseEnginer", "开始停止"); mIsClosed = true; resetEvent.Set(); closedEvent.WaitOne(); resetEvent.Dispose(); closedEvent.Dispose(); foreach(var vv in mSeriserFiles) { vv.Value.Reset(); } } /// /// /// /// /// public void RequestToSeriseFile(CompressMemory2 dataMemory) { lock (mWaitForProcessMemory) { if (mWaitForProcessMemory.ContainsKey(dataMemory.Id)) { mWaitForProcessMemory[dataMemory.Id] = dataMemory; } else { mWaitForProcessMemory.Add(dataMemory.Id, dataMemory); } } //mCurrentTime = dataMemory.CurrentTime; } /// /// /// /// /// /// /// public void ManualRequestToSeriseFile(IMemoryBlock data) { HisDataPath = SelectHisDataPath(); int id = data.ReadInt(0); foreach (var vv in mSeriserFiles) { if (id >= vv.Value.IdStart && id < vv.Value.IdEnd) { vv.Value.AppendManualSeriseFile(id, data); //mManualRequestSaveCount++; break; } } //if (mManualRequestSaveCount > 10) RequestToSave(); } /// /// /// public void RequestToSave() { lock (resetEvent) resetEvent.Set(); } /// /// /// private void ThreadPro() { ThreadHelper.AssignToCPU(CPUAssignHelper.Helper.CPUArray2); while (!mIsClosed) { resetEvent.WaitOne(); lock (resetEvent) resetEvent.Reset(); //#if DEBUG Stopwatch sw = new Stopwatch(); sw.Start(); LoggerService.Service.Info("SeriseEnginer", "********开始执行存储********", ConsoleColor.Cyan); //#endif //mManualRequestSaveCount = 0; if (mWaitForProcessMemory.Count > 0) { SaveToFile(); } foreach (var vv in mSeriserFiles) { if (vv.Value.HasManualRecordData) vv.Value.FreshManualDataToDisk(); } //#if DEBUG sw.Stop(); LoggerService.Service.Info("SeriseEnginer", ">>>>>>>>>完成执行存储>>>>>>> ElapsedMilliseconds:" + sw.ElapsedMilliseconds, ConsoleColor.Cyan); //#endif if (mIsClosed) break; } closedEvent.Set(); } /// /// 执行存储到磁盘 /// private void SaveToFile() { /* 1. 检查变量ID是否变动,如果变动则重新记录变量的ID列表 2. 拷贝数据块 3. 更新数据块指针 */ HisDataPath = SelectHisDataPath(); List mtmp; lock (mWaitForProcessMemory) { mtmp = mWaitForProcessMemory.Values.ToList(); mWaitForProcessMemory.Clear(); } foreach (var vv in mtmp) { mSeriserFiles[vv.Id].SaveToFile(vv, vv.CurrentTime,vv.EndTime); vv.Clear(); vv.MakeMemoryNoBusy(); } } /// /// /// public void Dispose() { if (mSeriserFiles != null) { foreach (var vv in mSeriserFiles) { vv.Value.Dispose(); } mSeriserFiles.Clear(); } } #endregion ...Methods... #region ... Interfaces ... #endregion ...Interfaces... } /// /// /// public class SeriseFileItem2 : IDisposable { #region ... Variables ... /// /// 变量的数据指针的相对起始地址 /// private Dictionary mIdAddrs = new Dictionary(); /// /// /// private bool mNeedRecordDataHeader = true; /// /// 当前数据区首地址 /// private long mCurrentDataRegion = 0; private string mCurrentFileName; private DataFileSeriserbase mFileWriter; private DataFileSeriserbase mFileWriter2; /// /// 数据文件扩展名 /// public const string DataFileExtends = ".dbd"; /// /// 文件头大小 /// public const int FileHeadSize = 84; private MemoryBlock mBlockPointMemory; //指针区域的起始地址 private long mBlockPointOffset = 0; private long mDataRegionHeadSize = 0; private VarintCodeMemory mTagIdMemoryCach; //变量ID校验和 private long mTagIdSum; private DateTime mCurrentTime; private int mTagCount = 0; /// /// 上一个数据区域首地址 /// private long mPreDataRegion = 0; static object mFileLocker = new object(); private List mTagIdsCach; private Dictionary> mPointerCach = new Dictionary>(); private Dictionary> mManualHisDataCach = new Dictionary>(); private int mId = 0; #endregion ...Variables... #region ... Events ... #endregion ...Events... #region ... Constructor... #endregion ...Constructor... #region ... Properties ... /// /// /// public int IdStart { get; set; } /// /// /// public int IdEnd { get; set; } /// /// /// public int Id { get { return mId; } set { mId = value; IdStart = value * TagCountOneFile;IdEnd = (value + 1) * TagCountOneFile; } } /// /// /// public DataFileSeriserbase FileWriter { get { return mFileWriter; } set { mFileWriter = value; } } /// /// /// public DataFileSeriserbase FileWriter2 { get { return mFileWriter2; } set { mFileWriter2 = value; } } /// /// /// public long CurrentDataRegion { get { return mCurrentDataRegion; } set { mCurrentDataRegion = value; } } /// /// /// public int FileStartHour { get; set; } /// /// 单个文件内变量的个数 /// public int TagCountOneFile { get; set; } /// /// 单个块持续时间 /// 单位分钟 /// public int BlockDuration { get; set; } /// /// /// public int FileDuration { get; set; } /// /// /// public string DatabaseName { get; set; } public bool IsNeedInit { get; set; } /// /// /// public bool HasManualRecordData { get { return mManualHisDataCach.Count > 0; } } #endregion ...Properties... #region ... Methods ... /// /// /// public void Reset() { mNeedRecordDataHeader = true; } /// /// /// /// private int GetDataRegionHeaderLength() { //头部结构:Pre DataRegion(8) + Next DataRegion(8) + Datatime(8)+ tagcount(4)+ tagid sum(8)+file duration(4)+block duration(4)+Time tick duration(4) return 8 + 8 + 8 + 4 + 8 + 4 + 4 + 4; } /// /// 添加文件头部 /// private void AppendFileHeader(DateTime time, string databaseName,DataFileSeriserbase mFileWriter) { DateTime date = new DateTime(time.Year, time.Month, time.Day, ((time.Hour / FileDuration) * FileDuration), 0, 0); mFileWriter.Write(date, 0); byte[] nameBytes = ArrayPool.Shared.Rent(64); var ntmp = Encoding.UTF8.GetBytes(databaseName); Buffer.BlockCopy(ntmp, 0, nameBytes, 0, Math.Min(64, ntmp.Length)); mFileWriter.Write(nameBytes, 20); ArrayPool.Shared.Return(nameBytes); } /// /// 添加区域头部 /// private void AppendDataRegionHeader(DateTime mCurrentTime, DataFileSeriserbase mFileWriter,long mPreDataRegion) { byte[] bval; int totalLen; int datalen; //更新上个DataRegion 的Next DataRegion Pointer 指针 if (mPreDataRegion >= 0) { mFileWriter.Write(mCurrentDataRegion, mPreDataRegion + 8); } bval = GeneratorDataRegionHeader(mCurrentTime,out totalLen, out datalen); mFileWriter.Append(bval, 0, datalen); mFileWriter.AppendZore(totalLen - datalen); mFileWriter.Write(mFileWriter.ReadInt(16) + 1, 16); //LoggerService.Service.Debug("SeriseEnginer2", "AppendDataRegionHeader_数据区数量更新:" + mFileWriter.ReadInt(16)); mPreDataRegion = mCurrentDataRegion; mBlockPointOffset = mCurrentDataRegion + mBlockPointOffset; ArrayPool.Shared.Return(bval); } /// /// /// /// private void NewDataRegionHeader(DateTime mCurrentTime, DataFileSeriserbase mFileWriter) { byte[] bval; int totalLen; int datalen; bval = GeneratorDataRegionHeader(mCurrentTime,out totalLen, out datalen); mFileWriter.Append(bval, 0, datalen); mFileWriter.AppendZore(totalLen - datalen); mFileWriter.Write(mFileWriter.ReadInt(16) + 1, 16); //LoggerService.Service.Debug("SeriseEnginer2", "NewDataRegionHeader_数据区数量更新:" + mFileWriter.ReadInt(16)); ArrayPool.Shared.Return(bval); } /// /// 生成区域头部 /// 偏移位置 /// private byte[] GeneratorDataRegionHeader(DateTime mCurrentTime, out int totallenght, out int datalen) { Stopwatch sw = new Stopwatch(); sw.Start(); //文件头部结构:Pre DataRegion(8) + Next DataRegion(8) + Datatime(8)+tagcount(4)+ tagid sum(8) +file duration(4)+ block duration(4)+Time tick duration(4)+ { len + [tag id]}+ [data blockpoint(8)] int blockcount = FileDuration * 60 / BlockDuration; int len = GetDataRegionHeaderLength() + 4 + mTagIdMemoryCach.Position + mTagCount * (blockcount * 8); totallenght = len; byte[] bvals = ArrayPool.Shared.Rent(52 + mTagIdMemoryCach.Position); using (System.IO.MemoryStream mHeadMemory = new System.IO.MemoryStream(bvals)) { mHeadMemory.Position = 0; mHeadMemory.Write(BitConverter.GetBytes((long)mPreDataRegion));//更新Pre DataRegion 指针 mHeadMemory.Write(BitConverter.GetBytes((long)0)); //更新Next DataRegion 指针 mHeadMemory.Write(MemoryHelper.GetBytes(mCurrentTime)); //写入时间 mHeadMemory.Write(BitConverter.GetBytes(mTagCount)); //写入变量个数 mHeadMemory.Write(BitConverter.GetBytes(mTagIdSum)); //写入Id 校验和 mHeadMemory.Write(BitConverter.GetBytes(FileDuration)); //写入文件持续时间 mHeadMemory.Write(BitConverter.GetBytes(BlockDuration)); //写入数据块持续时间 mHeadMemory.Write(BitConverter.GetBytes(HisEnginer.MemoryTimeTick)); //写入时间间隔 //写入变量编号列表 mHeadMemory.Write(BitConverter.GetBytes(mTagIdMemoryCach.Position));// mHeadMemory.Write(mTagIdMemoryCach.Position);//写入压缩后的数组的长度 mHeadMemory.Write(mTagIdMemoryCach.Buffer, 0, mTagIdMemoryCach.Position);//写入压缩数据 mDataRegionHeadSize = mBlockPointOffset = mHeadMemory.Position; datalen = (int)mHeadMemory.Position; sw.Stop(); LoggerService.Service.Info("SeriseFileItem" + Id, "GeneratorDataRegionHeader " + sw.ElapsedMilliseconds); return bvals; //return mHeadMemory.ToArray(); } } /// /// /// /// /// private string GetDataPath(DateTime time) { return System.IO.Path.Combine(SeriseEnginer2.HisDataPath, GetFileName(time)); } /// /// /// /// /// private string GetFileName(DateTime time) { return DatabaseName + Id.ToString("D3") + time.ToString("yyyyMMdd") + FileDuration.ToString("D2") + (time.Hour / FileDuration).ToString("D2") + DataFileExtends; } /// /// 搜索最后一个数据区域 /// /// private long SearchLastDataRegion() { long offset = FileHeadSize; while (true) { var nextaddr = mFileWriter.ReadLong(offset + 8); if (nextaddr <= 0) { break; } else { offset = nextaddr; } } mFileWriter.GoToEnd(); mCurrentDataRegion = mFileWriter.CurrentPostion; return offset; } /// /// /// /// /// private Dictionary GetDataRegions(DataFileSeriserbase writer) { Dictionary dd = new Dictionary(); long offset = FileHeadSize; while (true) { var nextaddr = writer.ReadLong(offset + 8); DateTime dt = DateTime.FromBinary(writer.ReadLong(offset + 16)); dd.Add(dt, offset); if (nextaddr <= 0) { break; } else { offset = nextaddr; } } return dd; } /// /// /// /// /// private long GetDataRegionHeadPoint(int id, DateTime time, out DataFileSeriserbase mFileReader) { string sfile = GetFileName(time); // DataFileSeriserbase reader = mFileWriter2; DataFileSeriserbase dfs; if (time > mCurrentTime) { //如果需要新建的文件,影响到自动记录存储要用到的文件, //则转到自动记录存储逻辑进行处理 CheckFile(time); dfs = this.mFileWriter; //mCurrentTime = time; } else { if (sfile == mCurrentFileName) { dfs = this.mFileWriter; } else { dfs = mFileWriter2; if (mFileWriter2.CreatOrOpenFile(sfile)) { var date = new DateTime(time.Year, time.Month, time.Day, ((time.Hour / FileDuration) * FileDuration), 0, 0); //新建文件 AppendFileHeader(time, this.DatabaseName, mFileWriter2); NewDataRegionHeader(date, mFileWriter2); } else { //如果文件格式不正确,则新建 if (mFileWriter2.Length < FileHeadSize) { var date = new DateTime(time.Year, time.Month, time.Day, ((time.Hour / FileDuration) * FileDuration), 0, 0); //新建文件 AppendFileHeader(time, this.DatabaseName, mFileWriter2); NewDataRegionHeader(date, mFileWriter2); } } } } mFileReader = dfs; long ltmp = 0; //计算本次更新对应的指针区域的起始地址 var fsh = (time.Hour / FileDuration) * FileDuration; int bid = ((time.Hour - fsh) * 60 + time.Minute) / BlockDuration; var icount = mTagIdsCach.IndexOf(id); ltmp = SearchDataRegionToDatetime(dfs, time); if (ltmp < 0) { LoggerService.Service.Warn("SeriseEnginer2", "不允许修改本次运行之前时间的历史记录!"); return -1; } ltmp += mDataRegionHeadSize + mTagCount * 8 * bid + icount * 8; //LoggerService.Service.Debug("SeriseEnginer", "DataRegion Pointer:"+ ltmp + ",mDataRegionHeadSize:" + mDataRegionHeadSize + ",BlockIndex:" + bid + " tag index:" + icount); return ltmp; } private Dictionary GetDataRegionHeadPoint(string sfile, SortedDictionary ids, DateTime time, out DataFileSeriserbase mFileReader) { Dictionary re = new Dictionary(); DataFileSeriserbase dfs; if (time > mCurrentTime) { //如果需要新建的文件,影响到自动记录存储要用到的文件, //则转到自动记录存储逻辑进行处理 CheckFile(time); dfs = this.mFileWriter; //mCurrentTime = time; } else { if (sfile == mCurrentFileName) { dfs = this.mFileWriter; } else { dfs = mFileWriter2; if (mFileWriter2.CreatOrOpenFile(sfile)) { var date = new DateTime(time.Year, time.Month, time.Day, ((time.Hour / FileDuration) * FileDuration), 0, 0); //新建文件 AppendFileHeader(time, this.DatabaseName, mFileWriter2); NewDataRegionHeader(date, mFileWriter2); } else { //如果文件格式不正确,则新建 if (mFileWriter2.Length < FileHeadSize) { var date = new DateTime(time.Year, time.Month, time.Day, ((time.Hour / FileDuration) * FileDuration), 0, 0); //新建文件 AppendFileHeader(time, this.DatabaseName, mFileWriter2); NewDataRegionHeader(date, mFileWriter2); } } } } mFileReader = dfs; DateTime mLastRegionStartTime = DateTime.MaxValue; DateTime mLastRegionEndTime = DateTime.MaxValue; long regionOffset = 0; foreach (var vv in ids) { long ltmp = 0; //计算本次更新对应的指针区域的起始地址 var fsh = (vv.Value.Hour / FileDuration) * FileDuration; int bid = ((vv.Value.Hour - fsh) * 60 + time.Minute) / BlockDuration; if (mLastRegionStartTime == DateTime.MaxValue || vv.Value < mLastRegionStartTime || vv.Value > mLastRegionEndTime) { regionOffset = SearchDataRegionToDatetime(dfs, vv.Value,out mLastRegionStartTime,out mLastRegionEndTime); ltmp = regionOffset; } else { ltmp = regionOffset; } if (ltmp < 0) { LoggerService.Service.Warn("SeriseEnginer2", "不允许修改本次运行之前时间的历史记录!"); return re; } var icount = mTagIdsCach.IndexOf(vv.Key); ltmp += mDataRegionHeadSize + mTagCount * 8 * bid + icount * 8; re.Add(vv.Key,ltmp); } //LoggerService.Service.Debug("SeriseEnginer", "DataRegion Pointer:" + ltmp + ",mDataRegionHeadSize:" + mDataRegionHeadSize + ",BlockIndex:" + bid + " tag index:" + icount); return re; } /// /// /// /// /// private long SearchDataRegionToDatetime(DataFileSeriserbase mFileWriter, DateTime time,out DateTime startTime,out DateTime endTime) { long preoffset = -1, offset = FileHeadSize; DateTime tm; DateTime mStartTime=time.Date, mEndTime; while (true) { tm = mFileWriter.ReadDateTime(offset + 16); if (tm > time) { mEndTime = tm; break; } var nextaddr = mFileWriter.ReadLong(offset + 8); if (nextaddr <= 0) { mStartTime = tm; mEndTime = mStartTime.AddDays(1); preoffset = offset; break; } else { mStartTime = tm; preoffset = offset; offset = nextaddr; } } startTime = mStartTime; endTime = mEndTime; return preoffset; } /// /// /// /// /// /// private long SearchDataRegionToDatetime(DataFileSeriserbase mFileWriter, DateTime time) { long preoffset = -1, offset = FileHeadSize; DateTime tm; while (true) { tm = mFileWriter.ReadDateTime(offset + 16); if (tm > time) { break; } var nextaddr = mFileWriter.ReadLong(offset + 8); if (nextaddr <= 0) { preoffset = offset; break; } else { preoffset = offset; offset = nextaddr; } } return preoffset; } /// /// /// /// /// public void AppendManualSeriseFile(int id, IMemoryBlock datablock) { DateTime time = datablock.ReadDateTime(4); string sfile = GetFileName(time); lock (mManualHisDataCach) { if (mManualHisDataCach.ContainsKey(sfile)) { if (!mManualHisDataCach[sfile].ContainsKey(id)) mManualHisDataCach[sfile].Add(id, datablock); else { LoggerService.Service.Warn("SeriseEnginer", "数据存储出现阻塞:"+id); } } else { Dictionary blocks = new Dictionary(); blocks.Add(id, datablock); mManualHisDataCach.Add(sfile, blocks); } } } /// /// /// public void FreshManualDataToDisk() { string oldFile = string.Empty; DataFileSeriserbase mwriter; Dictionary mHeadAddress = new Dictionary(); Dictionary mHeadValue = new Dictionary(); while(mManualHisDataCach.Count>0) { var vv = mManualHisDataCach.First(); lock(mManualHisDataCach) { mManualHisDataCach.Remove(vv.Key); } Stopwatch sw = new Stopwatch(); sw.Start(); LoggerService.Service.Info("SeriseEnginer", "SeriseFileItem" + this.Id + " 开始执行存储,数据块:" + vv.Value.Count, ConsoleColor.Cyan); SortedDictionary times = new SortedDictionary(); DateTime maxTime = DateTime.MinValue; DateTime mLastModifyTime = DateTime.MinValue; foreach(var vvv in vv.Value) { DateTime time = vvv.Value.ReadDateTime(4); DateTime endTime = vvv.Value.ReadDateTime(12); mTagCount = vvv.Value.ReadInt(24);//变量个数 times.Add(vvv.Key, time); maxTime = time > maxTime ? time : maxTime; mLastModifyTime = endTime > mLastModifyTime ? endTime : mLastModifyTime; } mHeadAddress = GetDataRegionHeadPoint(vv.Key, times, maxTime, out mwriter); long ltmp = sw.ElapsedMilliseconds; mHeadValue.Clear(); mwriter.GoToEnd(); var vpointer = mwriter.CurrentPostion; //写入数据,同时获取数据块地址 foreach (var vvv in vv.Value) { int size = vvv.Value.ReadInt(20); mHeadValue.Add(vvv.Key, vpointer); vvv.Value.WriteToStream(mwriter.GetStream(), 28, size - 28);//直接拷贝数据块 vpointer += (size - 28); } long ltmp2 = sw.ElapsedMilliseconds; //更新数据块指针 foreach (var hd in mHeadAddress) { mwriter.Write(mHeadValue[hd.Key], hd.Value); } long ltmp3 = sw.ElapsedMilliseconds; //更新文件的最后修改时间 var vtmp = mwriter.ReadDateTime(8); if (mLastModifyTime > vtmp) { mwriter.Write(mLastModifyTime, 8); } mwriter.Flush(); if (mwriter != mFileWriter) mwriter.Close(); foreach (var vvv in vv.Value) { (vvv.Value as MarshalMemoryBlock).MakeMemoryNoBusy(); MarshalMemoryBlockPool.Pool.Release(vvv.Value as MarshalMemoryBlock); } foreach (var tm in times) { var vtime = FormateTime(tm.Value); //如果时间大于上次自动存储的时间,则需要将地址指针记录下来,等到下次自动存储内容更新时,将当前更新的数据的指针区同步过去 //仿制被覆盖过去 lock (mPointerCach) { if (tm.Value > mCurrentTime) { if (mPointerCach.ContainsKey(vtime)) { var dd = mPointerCach[vtime]; if (dd.ContainsKey(tm.Key)) { dd[tm.Key] = mHeadValue[tm.Key]; } else { dd.Add(tm.Key, mHeadValue[tm.Key]); } } else { Dictionary dtmp = new Dictionary(); dtmp.Add(tm.Key, mHeadValue[tm.Key]); mPointerCach.Add(vtime, dtmp); } } } } //LoggerService.Service.Info("SeriseEnginer", "SeriseFileItem" + this.Id + " 完成存储,数据块:" + vv.Value.Count + " ReadHeadPoint:" + ltmp + " WriteData:" + (ltmp2 - ltmp) + " UpdateHead:" + (ltmp3 - ltmp2), ConsoleColor.Cyan); } } /// /// 通过手动更新的方式,提交历史记录 /// /// /// /// /// /// public void ManualRequestToSeriseFile(int id,MarshalMemoryBlock datablock ) { lock (mFileLocker) { DataFileSeriserbase mwriter; DateTime time = datablock.ReadDateTime(4); DateTime endTime = datablock.ReadDateTime(12); int size = datablock.ReadInt(20); mTagCount = datablock.ReadInt(24);//变量个数 var heads = GetDataRegionHeadPoint(id, time, out mwriter); if (heads < 0) return; //如果更新时间,大于最后更新时间,则更新最后更新时间 var vtmp = mwriter.ReadDateTime(8); if (endTime > vtmp) { mwriter.Write(endTime, 8); } mwriter.GoToEnd(); var vpointer = mwriter.CurrentPostion; datablock.WriteToStream(mFileWriter.GetStream(), 28, size-28);//直接拷贝数据块 mFileWriter.Write(vpointer, heads); LoggerService.Service.Debug("SeriseEnginer", "单数据块更新 变量:"+ id +" 头指针:"+heads+" 数据区指针:"+vpointer, ConsoleColor.Cyan); mwriter.Flush(); if (mwriter != mFileWriter) mwriter.Close(); datablock.MakeMemoryNoBusy(); MarshalMemoryBlockPool.Pool.Release(datablock); var vtime = FormateTime(time); //如果时间大于上次自动存储的时间,则需要将地址指针记录下来,等到下次自动存储内容更新时,将当前更新的数据的指针区同步过去 //仿制被覆盖过去 lock (mPointerCach) { if (time > mCurrentTime) { if (mPointerCach.ContainsKey(vtime)) { var dd = mPointerCach[vtime]; if (dd.ContainsKey(id)) { dd[id] = vpointer; } else { dd.Add(id, vpointer); } } else { Dictionary dtmp = new Dictionary(); dtmp.Add(id, vpointer); mPointerCach.Add(vtime, dtmp); } } } } } /// /// /// /// /// /// public bool CheckInSameFile(DateTime time1) { return GetFileName(time1) == mCurrentFileName; } /// /// 计算变量Id集合所占的大小 /// /// private int CalTagIdsSize() { if (mTagIdMemoryCach != null) mTagIdMemoryCach.Dispose(); mTagIdsCach = ServiceLocator.Locator.Resolve().ListAllTags().Where(e => e.Id >= Id * TagCountOneFile && e.Id < (Id + 1) * TagCountOneFile).OrderBy(e => e.Id).Select(e => e.Id).ToList(); mTagIdSum = 0; mTagIdMemoryCach = new VarintCodeMemory((int)(mTagIdsCach.Count * 4 * 1.2)); if (mTagIdsCach.Count > 0) { int preids = mTagIdsCach[0]; mTagIdSum += preids; mTagIdMemoryCach.WriteInt32(preids); for (int i = 1; i < mTagIdsCach.Count; i++) { var id = mTagIdsCach[i]; mTagIdMemoryCach.WriteInt32(id - preids); mTagIdSum += id; preids = id; } } return mTagIdMemoryCach.Position + 4; } /// /// /// public void Init() { var vv = ServiceLocator.Locator.Resolve(); var tags = vv.ListAllTags().Where(e => e.Id >= Id * TagCountOneFile && e.Id < (Id + 1) * TagCountOneFile).OrderBy(e => e.Id); if (mBlockPointMemory != null) mBlockPointMemory.Dispose(); mBlockPointMemory = new MemoryBlock(tags.Count() * 8, 4 * 1024 * 1024); mBlockPointMemory.Clear(); LoggerService.Service.Info("SeriseEnginer", "Cal BlockPointMemory memory size:" + (mBlockPointMemory.AllocSize) / 1024.0 / 1024 + "M", ConsoleColor.Cyan); CalTagIdsSize(); } /// /// 检查文件是否存在 /// /// private bool CheckFile(DateTime time) { if (!CheckInSameFile(time)) { if (mFileWriter != null) { mFileWriter.Flush(); mFileWriter.Close(); } string sfile = GetDataPath(time); if (mFileWriter.CreatOrOpenFile(sfile)) { var date = new DateTime(time.Year, time.Month, time.Day, ((time.Hour / FileDuration) * FileDuration), 0, 0); AppendFileHeader(time, this.DatabaseName, mFileWriter); //新建文件 mCurrentDataRegion = FileHeadSize; mPreDataRegion = -1; AppendDataRegionHeader(date,mFileWriter, -1); } else { if (mFileWriter.Length < FileHeadSize) { var date = new DateTime(time.Year, time.Month, time.Day, ((time.Hour / FileDuration) * FileDuration), 0, 0); AppendFileHeader(time, this.DatabaseName, mFileWriter); //新建文件 mCurrentDataRegion = FileHeadSize; mPreDataRegion = -1; AppendDataRegionHeader(date, mFileWriter, -1); } else { //打开已有文件 mPreDataRegion = SearchLastDataRegion(); AppendDataRegionHeader(mCurrentTime,mFileWriter, mPreDataRegion); } } if(mNeedRecordDataHeader) mNeedRecordDataHeader = false; mCurrentFileName = GetFileName(time); } else { if (mNeedRecordDataHeader) { mPreDataRegion = SearchLastDataRegion(); AppendDataRegionHeader(mCurrentTime,mFileWriter, mPreDataRegion); mNeedRecordDataHeader = false; } } return true; } /// /// /// /// /// public void SaveToFile(MarshalMemoryBlock mProcessMemory, DateTime time,DateTime endTime) { SaveToFile(mProcessMemory, 0, time,endTime); } /// /// /// /// /// private DateTime FormateTime(DateTime time) { return new DateTime(time.Year, time.Month, time.Day, time.Hour, time.Minute, 0); } /// /// 执行存储到磁盘 /// public void SaveToFile(MarshalMemoryBlock mProcessMemory, long dataOffset, DateTime time,DateTime endTime) { /* 1. 检查变量ID是否变动,如果变动则重新记录变量的ID列表 2. 拷贝数据块 3. 更新数据块指针 */ //LoggerService.Service.Info("SeriseFileItem" + Id, "*********开始执行存储**********"); try { lock (mFileLocker) { Stopwatch sw = new Stopwatch(); sw.Start(); //数据大小 var datasize = mProcessMemory.ReadInt(dataOffset); var count = mProcessMemory.ReadInt(dataOffset + 4);//变量个数 mTagCount = count; mCurrentTime = time; var ltmp = sw.ElapsedMilliseconds; //打开文件 if (!CheckFile(time)) return; //更新最后写入时间 var vtmp = mFileWriter.ReadDateTime(8); if (endTime > vtmp) { mFileWriter.Write(endTime, 8); } if (datasize == 0) { Flush(); sw.Stop(); return; } var ltmp2 = sw.ElapsedMilliseconds; long offset = 8 + dataOffset; long start = count * 8 + offset;//计算出数据起始地址 var dataAddr = this.mFileWriter.GoToEnd().CurrentPostion; mBlockPointMemory.CheckAndResize(mTagCount * 8); mBlockPointMemory.Clear(); var vtime = FormateTime(time); Dictionary timecach; lock (mPointerCach) { if (mPointerCach.ContainsKey(time)) { timecach = mPointerCach[time]; } else { timecach = new Dictionary(); } } //更新数据块指针 for (int i = 0; i < count; i++) { var id = mProcessMemory.ReadInt(offset); //计算新的偏移量 var addr = mProcessMemory.ReadInt(offset + 4) - start + dataAddr; offset += 8; if (id > -1) { //如果之前通过,手动记录已经更新了,则需要同步指针 if (timecach.ContainsKey(id)) { mBlockPointMemory.WriteLong(i * 8, timecach[id]); } else { mBlockPointMemory.WriteLong(i * 8, addr); } } } lock (mPointerCach) { foreach (var vv in mPointerCach.Keys.ToArray()) { if (vv <= mCurrentTime) { mPointerCach.Remove(vv); } } } //计算本次更新对应的指针区域的起始地址 FileStartHour = (time.Hour / FileDuration) * FileDuration; int bid = ((time.Hour - FileStartHour) * 60 + time.Minute) / BlockDuration; //计算出本次更新的头地址地址 var pointAddr = mBlockPointOffset + count * 8 * bid; var ltmp3 = sw.ElapsedMilliseconds; mFileWriter.GoToEnd(); //long lpp = mFileWriter.CurrentPostion; mProcessMemory.WriteToStream(mFileWriter.GetStream(), start, datasize);//直接拷贝数据块 //写入指针头部区域 mFileWriter.Write(mBlockPointMemory.Buffers, pointAddr, 0, (int)mBlockPointMemory.AllocSize); Flush(); sw.Stop(); LoggerService.Service.Info("SeriseFileItem" + Id, "写入数据 " + mCurrentFileName + " 数据大小:" + ((datasize) + mBlockPointMemory.AllocSize) / 1024.0 / 1024 + " m" + "其他脚本耗时:" + ltmp + "," + (ltmp2 - ltmp) + "," + (ltmp3 - ltmp2) + "存储耗时:" + (sw.ElapsedMilliseconds - ltmp3)); } } catch (System.IO.IOException ex) { LoggerService.Service.Erro("SeriseEnginer" + Id, ex.Message); } } /// /// /// public void Flush() { mFileWriter.Flush(); mFileWriter.CloseAndReOpen(); } /// /// /// public void Dispose() { mIdAddrs.Clear(); mFileWriter.Dispose(); mFileWriter = null; } #endregion ...Methods... #region ... Interfaces ... #endregion ...Interfaces... } }