提交 74dbeca8 编写于 作者: cdy816's avatar cdy816

结构调整

上级 6b9b87c7
......@@ -52,6 +52,13 @@ namespace Cdy.Tag
Console.WriteLine(string.Format(debugFormate, DateTime.Now.ToString(), name, msg));
}
public void Debug(string name, string msg, object parameter)
{
Console.ForegroundColor = (ConsoleColor)(parameter);
Console.WriteLine(string.Format(debugFormate, DateTime.Now.ToString(), name, msg));
Console.ResetColor();
}
/// <summary>
///
/// </summary>
......
......@@ -29,9 +29,22 @@ namespace Cdy.Tag
/// </summary>
/// <param name="name"></param>
/// <param name="msg"></param>
void Info(string name, string msg);
/// <param name="parameter"></param>
void Debug(string name, string msg, object parameter);
/// <summary>
///
/// </summary>
/// <param name="name"></param>
/// <param name="msg"></param>
void Info(string name, string msg);
/// <summary>
///
/// </summary>
/// <param name="name"></param>
/// <param name="msg"></param>
/// <param name="parameter"></param>
void Info(string name, string msg, object parameter);
......
......@@ -79,6 +79,16 @@ namespace Cdy.Tag
}
}
public void Debug(string name, string msg, object parameter)
{
lock (mLockObj)
{
if (EnableLogger)
mLogger?.Debug(name, msg,parameter);
}
}
/// <summary>
///
/// </summary>
......
......@@ -10,7 +10,9 @@ using DBRuntime.His;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO.MemoryMappedFiles;
using System.Linq;
using System.Net.Http.Headers;
using System.Text;
using System.Threading;
......@@ -28,17 +30,16 @@ namespace Cdy.Tag
/// </summary>
private ManualResetEvent resetEvent;
private ManualResetEvent mManualEvent;
private ManualResetEvent closedEvent;
private Thread mCompressThread;
private Thread mManualCompressThread;
private bool mIsClosed = false;
private HisDataMemoryBlockCollection mSourceMemory;
/// <summary>
///
/// </summary>
private Queue<HisDataMemoryBlockCollection> mSourceMemorys = new Queue<HisDataMemoryBlockCollection>();
/// <summary>
......@@ -155,19 +156,11 @@ namespace Cdy.Tag
{
LoggerService.Service.Info("CompressEnginer", "开始启动");
mIsClosed = false;
//Init();
resetEvent = new ManualResetEvent(false);
closedEvent = new ManualResetEvent(false);
mManualEvent = new ManualResetEvent(false);
mCompressThread = new Thread(ThreadPro);
mCompressThread.IsBackground = true;
mCompressThread.Start();
mManualCompressThread = new Thread(ManualThreadPro);
mManualCompressThread.IsBackground = true;
mManualCompressThread.Start();
}
/// <summary>
......@@ -179,7 +172,6 @@ namespace Cdy.Tag
mIsClosed = true;
resetEvent.Set();
mManualEvent.Set();
closedEvent.WaitOne();
resetEvent.Dispose();
......@@ -191,8 +183,6 @@ namespace Cdy.Tag
vv.Value.DecRef();
}
while (mCompressThread.IsAlive) Thread.Sleep(1);
while (mManualCompressThread.IsAlive) Thread.Sleep(1);
}
/// <summary>
......@@ -201,17 +191,42 @@ namespace Cdy.Tag
/// <param name="dataMemory"></param>
public void RequestToCompress(HisDataMemoryBlockCollection dataMemory)
{
mSourceMemory = dataMemory;
mCurrentTime = mSourceMemory.CurrentDatetime;
lock(mSourceMemorys)
mSourceMemorys.Enqueue(dataMemory);
mCurrentTime = dataMemory.CurrentDatetime;
foreach(var vv in mTargetMemorys)
{
vv.Value.CurrentTime = mCurrentTime;
vv.Value.EndTime = dataMemory.EndDateTime;
}
resetEvent.Set();
lock (resetEvent)
resetEvent.Set();
}
/// <summary>
///
/// </summary>
/// <param name="data"></param>
public void RequestManualToCompress(ManualHisDataMemoryBlock data)
{
foreach (var vv in mTargetMemorys)
{
if (data.Id >= vv.Value.Id * TagCountOneFile && data.Id < (vv.Value.Id + 1) * TagCountOneFile)
{
vv.Value.AddManualToCompress(data);
}
}
}
/// <summary>
///
/// </summary>
public void SubmitManualToCompress()
{
lock (resetEvent)
resetEvent.Set();
}
/// <summary>
///
......@@ -254,32 +269,55 @@ namespace Cdy.Tag
try
{
resetEvent.WaitOne();
resetEvent.Reset();
if (mIsClosed)
break;
lock (resetEvent)
{
resetEvent.Reset();
}
//#if DEBUG
Stopwatch sw = new Stopwatch();
sw.Start();
LoggerService.Service.Info("Compress", "********开始执行压缩********", ConsoleColor.Blue);
//#endif
var sm = mSourceMemory;
while (CheckIsBusy())
if (mSourceMemorys.Count > 0)
{
LoggerService.Service.Warn("Compress", "压缩出现阻塞");
Thread.Sleep(500);
while (mSourceMemorys.Count > 0)
{
HisDataMemoryBlockCollection sm;
lock (mSourceMemorys)
sm = mSourceMemorys.Dequeue();
while (CheckIsBusy())
{
LoggerService.Service.Warn("Compress", "压缩出现阻塞");
Thread.Sleep(500);
}
System.Threading.Tasks.Parallel.ForEach(mTargetMemorys, (mm) =>
{
ThreadHelper.AssignToCPU(CPUAssignHelper.Helper.CPUArray2);
mm.Value.Compress(sm);
});
sm.Clear();
sm.MakeMemoryNoBusy();
System.Threading.Tasks.Parallel.ForEach(mTargetMemorys.Where(e => e.Value.HasManualCompressItems), (mm) =>
{
ThreadHelper.AssignToCPU(CPUAssignHelper.Helper.CPUArray2);
mm.Value.ManualCompress();
});
}
}
System.Threading.Tasks.Parallel.ForEach(mTargetMemorys, (mm) =>
else
{
ThreadHelper.AssignToCPU(CPUAssignHelper.Helper.CPUArray2);
mm.Value.Compress(sm);
});
sm.Clear();
// ServiceLocator.Locator.Resolve<IHisEngine2>().ClearMemoryHisData(sm);
sm.MakeMemoryNoBusy();
System.Threading.Tasks.Parallel.ForEach(mTargetMemorys.Where(e => e.Value.HasManualCompressItems), (mm) =>
{
ThreadHelper.AssignToCPU(CPUAssignHelper.Helper.CPUArray2);
mm.Value.ManualCompress();
});
}
ServiceLocator.Locator.Resolve<IDataSerialize2>().RequestToSave();
......@@ -287,12 +325,15 @@ namespace Cdy.Tag
sw.Stop();
LoggerService.Service.Info("Compress", ">>>>>>>>>压缩完成>>>>>>>>>" + " ElapsedMilliseconds:" + sw.ElapsedMilliseconds, ConsoleColor.Blue);
//#endif
}
catch(Exception ex)
{
LoggerService.Service.Erro("Compress", ex.Message+ex.StackTrace);
}
if (mIsClosed)
break;
}
closedEvent.Set();
......@@ -312,64 +353,43 @@ namespace Cdy.Tag
vv.Value.Dispose();
}
mTargetMemorys.Clear();
mSourceMemory = null;
mHisTagService = null;
}
/// <summary>
///
/// </summary>
/// <param name="data"></param>
public void RequestManualToCompress(ManualHisDataMemoryBlock data)
{
foreach (var vv in mTargetMemorys)
{
if (data.Id >= vv.Value.Id * TagCountOneFile && data.Id < (vv.Value.Id + 1) * TagCountOneFile)
{
vv.Value.AddRequestManualToCompress(data);
}
}
}
/// <summary>
///
/// </summary>
private void ManualThreadPro()
{
ThreadHelper.AssignToCPU(CPUAssignHelper.Helper.CPUArray2);
while (!mIsClosed)
{
mManualEvent.WaitOne();
mManualEvent.Reset();
if (mIsClosed)
break;
System.Threading.Tasks.Parallel.ForEach(mTargetMemorys.Values, (vv) =>
{
ThreadHelper.AssignToCPU(CPUAssignHelper.Helper.CPUArray2);
vv.MakeMemoryBusy();
vv.RequestManualToCompress();
vv.MakeMemoryNoBusy();
});
//foreach (var vv in mTargetMemorys.Values)
//{
// vv.MakeMemoryBusy();
// vv.RequestManualToCompress();
// vv.MakeMemoryNoBusy();
//}
///// <summary>
/////
///// </summary>
//private void ManualThreadPro()
//{
// ThreadHelper.AssignToCPU(CPUAssignHelper.Helper.CPUArray2);
// while (!mIsClosed)
// {
// mManualEvent.WaitOne();
// mManualEvent.Reset();
// if (mIsClosed)
// break;
// System.Threading.Tasks.Parallel.ForEach(mTargetMemorys.Values, (vv) =>
// {
// ThreadHelper.AssignToCPU(CPUAssignHelper.Helper.CPUArray2);
// vv.MakeMemoryBusy();
// vv.ManualCompress();
// vv.MakeMemoryNoBusy();
// });
// //foreach (var vv in mTargetMemorys.Values)
// //{
// // vv.MakeMemoryBusy();
// // vv.RequestManualToCompress();
// // vv.MakeMemoryNoBusy();
// //}
// }
//}
}
}
/// <summary>
///
/// </summary>
public void SubmitManualToCompress()
{
mManualEvent.Set();
}
#endregion ...Methods...
......
......@@ -36,7 +36,12 @@ namespace Cdy.Tag
private bool mIsRunning=false;
private Queue<ManualHisDataMemoryBlock> mMemoryCach = new Queue<ManualHisDataMemoryBlock>();
/// <summary>
///
/// </summary>
private object mLockObj = new object();
#endregion ...Variables...
#region ... Events ...
......@@ -131,6 +136,17 @@ namespace Cdy.Tag
/// </summary>
public DateTime EndTime { get; set; }
/// <summary>
///
/// </summary>
public bool HasManualCompressItems
{
get
{
return mMemoryCach.Count>0;
}
}
#endregion ...Properties...
......@@ -139,27 +155,30 @@ namespace Cdy.Tag
/// <summary>
///
/// </summary>
public void RequestManualToCompress()
public void ManualCompress()
{
mIsRunning = true;
while (mMemoryCach.Count > 0)
lock (mLockObj)
{
ManualHisDataMemoryBlock vpp;
lock (mMemoryCach)
mIsRunning = true;
while (mMemoryCach.Count > 0)
{
vpp = mMemoryCach.Dequeue();
ManualHisDataMemoryBlock vpp;
lock (mMemoryCach)
{
vpp = mMemoryCach.Dequeue();
}
Compress(vpp);
Thread.Sleep(1);
}
RequestManualToCompress(vpp);
mIsRunning = false;
}
mIsRunning = false;
}
/// <summary>
///
/// </summary>
/// <param name="data"></param>
public void AddRequestManualToCompress(ManualHisDataMemoryBlock data)
public void AddManualToCompress(ManualHisDataMemoryBlock data)
{
lock (mMemoryCach)
mMemoryCach.Enqueue(data);
......@@ -169,12 +188,11 @@ namespace Cdy.Tag
///
/// </summary>
/// <param name="data"></param>
private void RequestManualToCompress(ManualHisDataMemoryBlock data)
private void Compress(ManualHisDataMemoryBlock data)
{
int datasize = 0;
var cdata = CompressMemory(data, out datasize);
var cdata = CompressBlockMemory(data);
cdata.MakeMemoryBusy();
ServiceLocator.Locator.Resolve<IDataSerialize2>().ManualRequestToSeriseFile(data.Id, data.Time,data.EndTime, cdata, datasize);
ServiceLocator.Locator.Resolve<IDataSerialize2>().ManualRequestToSeriseFile(cdata);
data.MakeMemoryNoBusy();
ManualHisDataMemoryBlockPool.Pool.Release(data);
}
......@@ -235,7 +253,7 @@ namespace Cdy.Tag
/// </summary>
public void Compress(HisDataMemoryBlockCollection source)
{
lock (mMemoryCach)
lock (mLockObj)
{
/*
内存结构:Head+数据指针区域+数据区
......@@ -358,20 +376,22 @@ namespace Cdy.Tag
/// </summary>
/// <param name="data"></param>
/// <returns></returns>
private MarshalMemoryBlock CompressMemory(ManualHisDataMemoryBlock data, out int datasize)
private MarshalMemoryBlock CompressBlockMemory(ManualHisDataMemoryBlock data)
{
MarshalMemoryBlock block = MarshalMemoryBlockPool.Pool.Get(data.Length * 2 + 4);
MarshalMemoryBlock block = MarshalMemoryBlockPool.Pool.Get(data.Length * 2 + 28 + 5);
var histag = mHisTagService.GetHisTag(data.Id);
if (histag == null)
{
datasize = 0;
return null;
}
int datasize = 0;
var targetPosition = 4;
var targetPosition = 28;
//写入变量的个数
block.WriteInt(0, mTagIds.Count);
block.WriteDatetime(4, data.Time); //时间
block.WriteDatetime(12, data.EndTime); //结束时间
//block.WriteInt(20, 0); //写入数据大小
block.WriteInt(24, mTagIds.Count);//写入变量的个数
var qulityoffset = data.QualityAddress;
var comtype = histag.CompressType;//压缩类型
......@@ -390,10 +410,7 @@ namespace Cdy.Tag
var size = tp.Compress(data, 0, block, targetPosition + 5, data.Length) + 1;
block.WriteInt(targetPosition, (int)size);
datasize = (int)(targetPosition + size + 5);
}
else
{
datasize = 0;
block.WriteInt(20, datasize); //写入数据大小
}
return block;
}
......
......@@ -1225,6 +1225,7 @@ namespace Cdy.Tag
{
ServiceLocator.Locator.Resolve<IDataCompress2>().RequestManualToCompress(vvv.Value);
}
ServiceLocator.Locator.Resolve<IDataCompress2>().SubmitManualToCompress();
}
}
......@@ -1347,6 +1348,8 @@ namespace Cdy.Tag
/// <returns></returns>
private bool ManualRecordHisValues(long id, IEnumerable<Cdy.Tag.TagValue> values, int timeUnit = 100)
{
if (mIsClosed) return false;
int valueOffset, qulityOffset = 0;
DateTime mLastTime = DateTime.MinValue;
......@@ -1524,6 +1527,8 @@ namespace Cdy.Tag
/// <returns></returns>
private bool ManualRecordHisValues(long id, Cdy.Tag.TagValue value, int timeUnit = 100)
{
if (mIsClosed) return false;
int valueOffset, qulityOffset = 0;
DateTime mLastTime = DateTime.MinValue;
......
......@@ -16,6 +16,7 @@ using System.Diagnostics;
using System.Buffers;
using DotNetty.Common;
using DBRuntime.His;
using System.Collections;
/*
* ****文件结构****
......@@ -59,6 +60,11 @@ namespace Cdy.Tag
private Dictionary<int, CompressMemory2> mWaitForProcessMemory = new Dictionary<int, CompressMemory2>();
/// <summary>
///
/// </summary>
private Queue<MarshalMemoryBlock> mCachSeriseMemoryBlock = new Queue<MarshalMemoryBlock>();
/// <summary>
///
/// </summary>
......@@ -217,7 +223,6 @@ namespace Cdy.Tag
{
vv.Value.Reset();
}
}
/// <summary>
......@@ -246,7 +251,8 @@ namespace Cdy.Tag
/// </summary>
public void RequestToSave()
{
resetEvent.Set();
lock (resetEvent)
resetEvent.Set();
}
/// <summary>
......@@ -259,10 +265,27 @@ namespace Cdy.Tag
while (!mIsClosed)
{
resetEvent.WaitOne();
resetEvent.Reset();
lock (resetEvent)
resetEvent.Reset();
if (mWaitForProcessMemory.Count > 0)
{
SaveToFile();
}
if (mCachSeriseMemoryBlock.Count > 0)
{
while (mCachSeriseMemoryBlock.Count > 0)
{
MarshalMemoryBlock vb;
lock (mCachSeriseMemoryBlock)
vb = mCachSeriseMemoryBlock.Dequeue();
ProcessManualSeriseToFile(vb);
}
}
if (mIsClosed)
break;
SaveToFile();
}
closedEvent.Set();
}
......@@ -321,23 +344,35 @@ namespace Cdy.Tag
/// <summary>
///
/// </summary>
/// <param name="id"></param>
/// <param name="time"></param>
/// <param name="data"></param>
/// <param name="size"></param>
public void ManualRequestToSeriseFile(int id, DateTime time,DateTime endTime, MarshalMemoryBlock data, int size)
private void ProcessManualSeriseToFile(MarshalMemoryBlock data)
{
HisDataPath = SelectHisDataPath();
int id = data.ReadInt(0);
foreach (var vv in mSeriserFiles)
{
if( id >= vv.Value.Id* TagCountOneFile && id<(vv.Value.Id+1)* TagCountOneFile)
if (id >= vv.Value.Id * TagCountOneFile && id < (vv.Value.Id + 1) * TagCountOneFile)
{
vv.Value.ManualRequestToSeriseFile(id, data, size, time,endTime);
vv.Value.ManualRequestToSeriseFile(id, data);
}
}
}
/// <summary>
///
/// </summary>
/// <param name="id"></param>
/// <param name="time"></param>
/// <param name="data"></param>
/// <param name="size"></param>
public void ManualRequestToSeriseFile(MarshalMemoryBlock data)
{
HisDataPath = SelectHisDataPath();
lock (mCachSeriseMemoryBlock)
mCachSeriseMemoryBlock.Enqueue(data);
}
#endregion ...Methods...
#region ... Interfaces ...
......@@ -742,7 +777,7 @@ namespace Cdy.Tag
ltmp += mDataRegionHeadSize + mTagCount * 8 * bid + icount * 8;
LoggerService.Service.Debug("SeriseEnginer2", "DataRegion Pointer:"+ ltmp + ",mDataRegionHeadSize:" + mDataRegionHeadSize + ",BlockIndex:" + bid + " tag index:" + icount);
//LoggerService.Service.Debug("SeriseEnginer", "DataRegion Pointer:"+ ltmp + ",mDataRegionHeadSize:" + mDataRegionHeadSize + ",BlockIndex:" + bid + " tag index:" + icount);
return ltmp;
}
......@@ -788,14 +823,17 @@ namespace Cdy.Tag
/// <param name="size"></param>
/// <param name="time"></param>
/// <param name="endTime"></param>
public void ManualRequestToSeriseFile(int id, MarshalMemoryBlock datablock, int size, DateTime time,DateTime endTime)
public void ManualRequestToSeriseFile(int id,MarshalMemoryBlock datablock )
{
lock (mFileLocker)
{
DataFileSeriserbase mwriter;
mTagCount = datablock.ReadInt(0);//变量个数
DateTime time = datablock.ReadDateTime(4);
DateTime endTime = datablock.ReadDateTime(12);
int size = datablock.ReadInt(20);
mTagCount = datablock.ReadInt(24);//变量个数
var heads = GetDataRegionHeadPoint(id, time, out mwriter);
if (heads < 0) return;
......@@ -810,11 +848,10 @@ namespace Cdy.Tag
mwriter.GoToEnd();
var vpointer = mwriter.CurrentPostion;
datablock.WriteToStream(mFileWriter.GetStream(), 4, size);//直接拷贝数据块
datablock.WriteToStream(mFileWriter.GetStream(), 28, size-28);//直接拷贝数据块
mFileWriter.Write(vpointer, heads);
// datablock.WriteLong(heads, vpointer);
LoggerService.Service.Debug("SeriseEnginer2", "手动记录历史数据 变量:"+ id +" 头指针:"+heads+" 数据区指针:"+vpointer);
LoggerService.Service.Debug("SeriseEnginer", "单数据块更新 变量:"+ id +" 头指针:"+heads+" 数据区指针:"+vpointer, ConsoleColor.Cyan);
mwriter.Flush();
......@@ -1154,9 +1191,5 @@ namespace Cdy.Tag
#region ... Interfaces ...
#endregion ...Interfaces...
}
}
......@@ -49,7 +49,7 @@ namespace Cdy.Tag
/// <param name="time"></param>
/// <param name="data"></param>
/// <param name="size"></param>
void ManualRequestToSeriseFile(int id, DateTime time,DateTime enddate, MarshalMemoryBlock data, int size);
void ManualRequestToSeriseFile(MarshalMemoryBlock data);
/// <summary>
///
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册