diff --git a/ZoomPipeline_FuncSvr/cluster/cross_svr_messages.h b/ZoomPipeline_FuncSvr/cluster/cross_svr_messages.h index c08d4d9807d63f75fd22eeff5d342bf3540a389b..2e6814250e8587a3e624ea73c1ff15429ddf0c85 100644 --- a/ZoomPipeline_FuncSvr/cluster/cross_svr_messages.h +++ b/ZoomPipeline_FuncSvr/cluster/cross_svr_messages.h @@ -10,12 +10,10 @@ namespace ZP_Cluster{ struct tag_header{ __UINT16_TYPE__ Mark; //Always be "0x1234" __UINT8_TYPE__ messagetype; + __INT32_TYPE__ data_length; } hearder; union uni_payload{ - struct tag_plainData{ - __UINT16_TYPE__ data_length; - __UINT8_TYPE__ data[1]; - } plainData; + __UINT8_TYPE__ data[1]; } payload; } CROSS_SVR_MSG; @@ -28,14 +26,10 @@ namespace ZP_Cluster{ struct tag_header{ unsigned __int16 Mark; //Always be 0x1234 unsigned __int8 messagetype; - + __int32 data_length; } hearder; union uni_payload{ - struct tag_plainData{ - unsigned __int16 data_length; - unsigned __int8 data[1]; - } plainData; - + unsigned __int8 data[1]; } payload; } CROSS_SVR_MSG; diff --git a/ZoomPipeline_FuncSvr/cluster/zp_clusternode.cpp b/ZoomPipeline_FuncSvr/cluster/zp_clusternode.cpp index 6b14723b2eaf5c62a4db61ac708368050e90dc31..10faa08a2c8e28075327d0cc9945fe0db85c4490 100644 --- a/ZoomPipeline_FuncSvr/cluster/zp_clusternode.cpp +++ b/ZoomPipeline_FuncSvr/cluster/zp_clusternode.cpp @@ -1,11 +1,186 @@ #include "zp_clusternode.h" +#include "zp_clusterterm.h" namespace ZP_Cluster{ - zp_ClusterNode::zp_ClusterNode(QObject *parent) : + zp_ClusterNode::zp_ClusterNode(zp_ClusterTerm * pTerm, QObject * psock,QObject *parent) : ZPTaskEngine::zp_plTaskBase(parent) + ,m_pTerm(pTerm) + ,m_pSock(psock) + ,bTermSet(false) { + m_currentReadOffset = 0; + m_currentMessageSize = 0; + m_last_Report = QDateTime::currentDateTime(); } int zp_ClusterNode::run() + { + if (bTermSet==true) + { + //qDebug()<=0 && nCurrSz!=0 ) + { + QByteArray block; + m_mutex_rawData.lock(); + if (m_list_RawData.size()) + block = *m_list_RawData.begin(); + m_mutex_rawData.unlock(); + if (block.isEmpty()==false && block.isNull()==false) + { + m_currentReadOffset = filter_message(block,m_currentReadOffset); + if (m_currentReadOffset >= block.size()) + { + m_mutex_rawData.lock(); + m_list_RawData.pop_front(); + m_currentReadOffset = 0; + m_mutex_rawData.unlock(); + } + } + else + { + m_mutex_rawData.lock(); + //pop empty cabs + if (m_list_RawData.empty()==false) + m_list_RawData.pop_front(); + m_mutex_rawData.unlock(); + } + m_mutex_rawData.lock(); + nCurrSz = m_list_RawData.size(); + m_mutex_rawData.unlock(); + } + m_mutex_rawData.lock(); + nCurrSz = m_list_RawData.size(); + m_mutex_rawData.unlock(); + if (nCurrSz==0) + return 0; + return -1; + } + //push new binary data into queue + int zp_ClusterNode::push_new_data(const QByteArray & dtarray) + { + int res = 0; + m_mutex_rawData.lock(); + + m_list_RawData.push_back(dtarray); + res = m_list_RawData.size(); + m_mutex_rawData.unlock(); + m_last_Report = QDateTime::currentDateTime(); + return res; + } + + //!deal one message, affect m_currentRedOffset,m_currentMessageSize,m_currentHeader + //!return bytes Used. + int zp_ClusterNode::filter_message(const QByteArray & block, int offset) + { + const int blocklen = block.length(); + while (blocklen>offset) + { + const char * dataptr = block.constData(); + + //Recieve First 2 byte + while (m_currentMessageSize<2 && blocklen>offset ) + { + m_currentBlock.push_back(dataptr[offset++]); + m_currentMessageSize++; + } + if (m_currentMessageSize < 2) //First 2 byte not complete + continue; + + if (m_currentMessageSize==2) + { + const char * headerptr = m_currentBlock.constData(); + memcpy((void *)&m_currentHeader,headerptr,2); + } + + const char * ptrCurrData = m_currentBlock.constData(); + if (m_currentHeader.Mark == 0x1234) + //Valid Message + { + while (m_currentMessageSize< sizeof(CROSS_SVR_MSG::tag_header) && blocklen>offset) + { + m_currentBlock.push_back(dataptr[offset++]); + m_currentMessageSize++; + } + if (m_currentMessageSize < sizeof(CROSS_SVR_MSG::tag_header)) //Header not completed. + continue; + else if (m_currentMessageSize == sizeof(CROSS_SVR_MSG::tag_header))//Header just completed. + { + const char * headerptr = m_currentBlock.constData(); + memcpy((void *)&m_currentHeader,headerptr,sizeof(CROSS_SVR_MSG::tag_header)); + + //continue reading if there is data left behind + if (block.length()>offset) + { + qint32 bitLeft = m_currentHeader.data_length + sizeof(CROSS_SVR_MSG::tag_header) + -m_currentMessageSize ; + while (bitLeft>0 && blocklen>offset) + { + m_currentBlock.push_back(dataptr[offset++]); + m_currentMessageSize++; + bitLeft--; + } + //deal block, may be send data as soon as possible; + deal_current_message_block(); + if (bitLeft>0) + continue; + //This Message is Over. Start a new one. + m_currentMessageSize = 0; + m_currentBlock = QByteArray(); + continue; + } + } + else + { + if (block.length()>offset) + { + qint32 bitLeft = m_currentHeader.data_length + sizeof(CROSS_SVR_MSG::tag_header) + -m_currentMessageSize ; + while (bitLeft>0 && blocklen>offset) + { + m_currentBlock.push_back(dataptr[offset++]); + m_currentMessageSize++; + bitLeft--; + } + //deal block, may be processed as soon as possible; + deal_current_message_block(); + if (bitLeft>0) + continue; + //This Message is Over. Start a new one. + m_currentMessageSize = 0; + m_currentBlock = QByteArray(); + continue; + } + } // end if there is more bytes to append + } //end deal trans message + else + { + emit evt_Message(this,tr("Client Send a unknown start Header %1 %2. Close client immediately.") + .arg((int)(ptrCurrData[0])).arg((int)(ptrCurrData[1]))); + m_currentMessageSize = 0; + m_currentBlock = QByteArray(); + offset = blocklen; + emit evt_close_client(this->sock()); + } + } // end while block len > offset + + return offset; + } + //in Trans-Level, do nothing. + int zp_ClusterNode::deal_current_message_block() { return 0; } + void zp_ClusterNode::CheckHeartBeating() + { + QDateTime dtm = QDateTime::currentDateTime(); + qint64 usc = this->m_last_Report.secsTo(dtm); + int nThredHold = m_pTerm->heartBeatingThrd(); + if (usc >= nThredHold) + { + emit evt_Message(this,tr("Client ") + QString("%1").arg((unsigned int)((quint64)this)) + tr(" is dead, kick out.")); + emit evt_close_client(this->sock()); + } + } } diff --git a/ZoomPipeline_FuncSvr/cluster/zp_clusternode.h b/ZoomPipeline_FuncSvr/cluster/zp_clusternode.h index 7a18236a6ea30f70ec040b9256569d05d3d6f8bd..2d61905245e38e50d47d6cce389fade94905b46d 100644 --- a/ZoomPipeline_FuncSvr/cluster/zp_clusternode.h +++ b/ZoomPipeline_FuncSvr/cluster/zp_clusternode.h @@ -2,8 +2,12 @@ #define ZP_CLUSTERNODE_H #include +#include +#include #include "../pipeline/zp_pltaskbase.h" +#include "cross_svr_messages.h" namespace ZP_Cluster{ + class zp_ClusterTerm; /** * @brief This class stand for a remote server. * when local server establish a connection between itself and remote svr, @@ -13,12 +17,58 @@ namespace ZP_Cluster{ { Q_OBJECT public: - explicit zp_ClusterNode(QObject *parent = 0); + explicit zp_ClusterNode(zp_ClusterTerm * pTerm, QObject * psock,QObject *parent = 0); int run(); - signals: + bool bTermSet; + //!deal at most m_nMessageBlockSize messages per deal_message(); + static const int m_nMessageBlockSize = 8; + //push new binary data into queue + int push_new_data(const QByteArray & dtarray); + //!deal one message, affect m_currentRedOffset,m_currentMessageSize,m_currentHeader + //!return bytes Used. + int filter_message(const QByteArray &, int offset); + //!in Trans-Layer, it does nothing. + int deal_current_message_block(); + + QDateTime lastActiveTime(){ return m_last_Report;} + void CheckHeartBeating(); + + public: + QString termName(){return m_strTermName;} + QHostAddress addrPublish(){return m_addrPublish;} + int portPublish() {return m_nPortPublish;} + QObject * sock() { return m_pSock;} + protected: + zp_ClusterTerm * m_pTerm; + //Client socket handle of this connection + QObject * m_pSock; + //the data members. + QString m_strTermName; //the Terminal's name + QHostAddress m_addrPublish; //The publish address for other terms to connect to + int m_nPortPublish; //The publish port for other terms to connect to - public slots: + //Data Process + //The raw data queue and its mutex + QList m_list_RawData; + QMutex m_mutex_rawData; + //The current Read Offset, from m_list_RawData's beginning + int m_currentReadOffset; + //Current Message Offset, according to m_currentHeader + int m_currentMessageSize; + //Current un-procssed message block.for large blocks, + //this array will be re-setted as soon as some part of data has been + //dealed, eg, send a 200MB block, the 200MB data will be splitted into pieces + QByteArray m_currentBlock; + CROSS_SVR_MSG::tag_header m_currentHeader; + + QDateTime m_last_Report; + + signals: + void evt_SendDataToClient(QObject * objClient,const QByteArray & dtarray); + void evt_BroadcastData(QObject * objFromClient,const QByteArray & dtarray); + void evt_close_client(QObject * objClient); + void evt_Message (QObject * psource,const QString &); }; } #endif // ZP_CLUSTERNODE_H diff --git a/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.cpp b/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.cpp index 9c55fff199e8f8c8b6bc8588aebf8fe755f6f2a0..4c63df68362a20576bc84ca5cd011ca6a692a47c 100644 --- a/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.cpp +++ b/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.cpp @@ -1,4 +1,6 @@ #include "zp_clusterterm.h" +#include "zp_clusternode.h" +#include namespace ZP_Cluster{ zp_ClusterTerm::zp_ClusterTerm(const QString & name,QObject *parent ) : QObject(parent) @@ -14,6 +16,7 @@ namespace ZP_Cluster{ connect(m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::evt_NewClientConnected, this,&zp_ClusterTerm::on_evt_NewClientConnected); //connect(m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::evt_ClientEncrypted, this,&zp_ClusterTerm::on_evt_ClientEncrypted); m_nPortPublish = 0; + m_nHeartBeatingDeadThrd = 20; } void zp_ClusterTerm::StartListen(const QHostAddress &addr, int nPort) @@ -29,30 +32,182 @@ namespace ZP_Cluster{ { return m_pClusterEng->canClose() && m_pClusterNet->CanExit(); } - //this event indicates new client connected. - void zp_ClusterTerm::on_evt_NewClientConnected(QObject * /*clientHandle*/) + + bool zp_ClusterTerm::regisitNewServer(zp_ClusterNode * c) + { + //Before reg, termname must be recieved. + if (c->termName().length()<1) + return false; + m_hash_mutex.lock(); + m_hash_Name2node[c->termName()] = c; + m_hash_mutex.unlock(); + return true; + } + + zp_ClusterNode * zp_ClusterTerm::SvrNodeFromName(const QString & uuid) { + m_hash_mutex.lock(); + if (m_hash_Name2node.contains(uuid)) + { + m_hash_mutex.unlock(); + return m_hash_Name2node[uuid]; + } + m_hash_mutex.unlock(); + return NULL; } - //this event indicates new client encrypted. - void zp_ClusterTerm::on_evt_ClientEncrypted(QObject * /*clientHandle*/) + zp_ClusterNode * zp_ClusterTerm::SvrNodeFromSocket(QObject * sock) { + m_hash_mutex.lock(); + if (m_hash_sock2node.contains(sock)) + { + m_hash_mutex.unlock(); + return m_hash_sock2node[sock]; + } + m_hash_mutex.unlock(); + return NULL; + } + //this event indicates new client connected. + void zp_ClusterTerm::on_evt_NewClientConnected(QObject * clientHandle) + { + bool nHashContains = false; + zp_ClusterNode * pClientNode = 0; + m_hash_mutex.lock(); + nHashContains = m_hash_sock2node.contains(clientHandle); + if (false==nHashContains) + { + zp_ClusterNode * pnode = new zp_ClusterNode(this,clientHandle,0); + //using queued connection of send and revieve; + connect (pnode,&zp_ClusterNode::evt_SendDataToClient,m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::SendDataToClient,Qt::QueuedConnection); + connect (pnode,&zp_ClusterNode::evt_BroadcastData,m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::evt_BroadcastData,Qt::QueuedConnection); + connect (pnode,&zp_ClusterNode::evt_close_client,m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::KickClients,Qt::QueuedConnection); + connect (pnode,&zp_ClusterNode::evt_Message,this,&zp_ClusterTerm::evt_Message,Qt::QueuedConnection); + m_hash_sock2node[clientHandle] = pnode; + nHashContains = true; + pClientNode = pnode; + } + else + { + pClientNode = m_hash_sock2node[clientHandle]; + } + m_hash_mutex.unlock(); + assert(nHashContains!=0 && pClientNode !=0); + } + + //this event indicates new client encrypted. + void zp_ClusterTerm::on_evt_ClientEncrypted(QObject * clientHandle) + { + bool nHashContains = false; + zp_ClusterNode * pClientNode = 0; + m_hash_mutex.lock(); + nHashContains = m_hash_sock2node.contains(clientHandle); + if (false==nHashContains) + { + zp_ClusterNode * pnode = new zp_ClusterNode(this,clientHandle,0); + //using queued connection of send and revieve; + connect (pnode,&zp_ClusterNode::evt_SendDataToClient,m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::SendDataToClient,Qt::QueuedConnection); + connect (pnode,&zp_ClusterNode::evt_BroadcastData,m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::evt_BroadcastData,Qt::QueuedConnection); + connect (pnode,&zp_ClusterNode::evt_close_client,m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::KickClients,Qt::QueuedConnection); + connect (pnode,&zp_ClusterNode::evt_Message,this,&zp_ClusterTerm::evt_Message,Qt::QueuedConnection); + m_hash_sock2node[clientHandle] = pnode; + nHashContains = true; + pClientNode = pnode; + } + else + { + pClientNode = m_hash_sock2node[clientHandle]; + } + m_hash_mutex.unlock(); + assert(nHashContains!=0 && pClientNode !=0); } //this event indicates a client disconnected. - void zp_ClusterTerm::on_evt_ClientDisconnected(QObject * /*clientHandle*/) + void zp_ClusterTerm::on_evt_ClientDisconnected(QObject * clientHandle) { + bool nHashContains = false; + zp_ClusterNode * pClientNode = 0; + m_hash_mutex.lock(); + nHashContains = m_hash_sock2node.contains(clientHandle); + if (nHashContains) + pClientNode = m_hash_sock2node[clientHandle]; + if (pClientNode) + { + m_hash_sock2node.remove(clientHandle); + if (pClientNode->termName().length()>0) + m_hash_Name2node.remove(pClientNode->termName()); + + pClientNode->bTermSet = true; + disconnect (pClientNode,&zp_ClusterNode::evt_SendDataToClient,m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::SendDataToClient); + disconnect (pClientNode,&zp_ClusterNode::evt_BroadcastData,m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::evt_BroadcastData); + disconnect (pClientNode,&zp_ClusterNode::evt_close_client,m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::KickClients); + disconnect (pClientNode,&zp_ClusterNode::evt_Message,this,&zp_ClusterTerm::evt_Message); + + m_nodeToBeDel.push_back(pClientNode); + //qDebug()<ref()); + } + m_hash_mutex.unlock(); + //Try to delete objects + QList toBedel; + foreach(zp_ClusterNode * pdelobj,m_nodeToBeDel) + { + if (pdelobj->ref() ==0) + toBedel.push_back(pdelobj); + else + { + //qDebug()<ref()); + } + } + foreach(zp_ClusterNode * pdelobj,toBedel) + { + m_nodeToBeDel.removeAll(pdelobj); + //qDebug()<ref()); + pdelobj->deleteLater(); + } } //some data arrival - void zp_ClusterTerm::on_evt_Data_recieved(QObject * /*clientHandle*/,const QByteArray & /*datablock*/ ) + void zp_ClusterTerm::on_evt_Data_recieved(QObject * clientHandle,const QByteArray & datablock ) { - + //Push Clients to nodes if it is not exist + bool nHashContains = false; + zp_ClusterNode * pClientNode = 0; + m_hash_mutex.lock(); + nHashContains = m_hash_sock2node.contains(clientHandle); + if (false==nHashContains) + { + zp_ClusterNode * pnode = new zp_ClusterNode(this,clientHandle,0); + //using queued connection of send and revieve; + connect (pnode,&zp_ClusterNode::evt_SendDataToClient,m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::SendDataToClient,Qt::QueuedConnection); + connect (pnode,&zp_ClusterNode::evt_BroadcastData,m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::evt_BroadcastData,Qt::QueuedConnection); + connect (pnode,&zp_ClusterNode::evt_close_client,m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::KickClients,Qt::QueuedConnection); + connect (pnode,&zp_ClusterNode::evt_Message,this,&zp_ClusterTerm::evt_Message,Qt::QueuedConnection); + m_hash_sock2node[clientHandle] = pnode; + nHashContains = true; + pClientNode = pnode; + } + else + { + pClientNode = m_hash_sock2node[clientHandle]; + } + assert(nHashContains!=0 && pClientNode !=0); + int nblocks = pClientNode->push_new_data(datablock); + if (nblocks<=1) + m_pClusterEng->pushTask(pClientNode); + m_hash_mutex.unlock(); + } + void zp_ClusterTerm::KickDeadClients() + { + m_hash_mutex.lock(); + for (QMap::iterator p =m_hash_sock2node.begin(); + p!=m_hash_sock2node.end();p++) + { + p.value()->CheckHeartBeating(); + } + m_hash_mutex.unlock(); } - //a block of data has been successfuly sent void zp_ClusterTerm::on_evt_Data_transferred(QObject * /*clientHandle*/,qint64 /*bytes sent*/) { diff --git a/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.h b/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.h index cffbf8c061d5209644a08631bcb04c8f4887075c..6baf19348c7a8983706bf085f0cbd8a2095cfc30 100644 --- a/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.h +++ b/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.h @@ -3,11 +3,15 @@ #include #include +#include +#include +#include #include "../network/zp_net_threadpool.h" #include "../pipeline/zp_pipeline.h" #include "../pipeline/zp_pltaskbase.h" namespace ZP_Cluster{ + class zp_ClusterNode; //!this class enable server processes can //! communicate with each other. class zp_ClusterTerm : public QObject @@ -27,12 +31,26 @@ namespace ZP_Cluster{ int publishPort(){return m_nPortPublish;} QHostAddress setPublishAddr(QHostAddress addr){return m_addrPublish = addr;} int setPublishPort(int port){return m_nPortPublish = port;} + int heartBeatingThrd() {return m_nHeartBeatingDeadThrd;} + void setHeartBeatingThrd(const int n){m_nHeartBeatingDeadThrd = n;} protected: QString m_strTermName;//the Terminal's name QHostAddress m_addrPublish; //The publish address for other terms to connect to int m_nPortPublish;//The publish port for other terms to connect to ZPNetwork::zp_net_ThreadPool * m_pClusterNet; ZPTaskEngine::zp_pipeline * m_pClusterEng; + int m_nHeartBeatingDeadThrd; + //Server Group Mapping + protected: + //This list hold dead nodes that still in task queue,avoiding crash + QList m_nodeToBeDel; + //important hashes. server name to socket, socket to server name + QMutex m_hash_mutex; + QMap m_hash_Name2node; + QMap m_hash_sock2node; + bool regisitNewServer(zp_ClusterNode *); + zp_ClusterNode * SvrNodeFromName(const QString &); + zp_ClusterNode * SvrNodeFromSocket(QObject *); signals: void evt_Message(QObject * ,const QString &); @@ -57,6 +75,7 @@ namespace ZP_Cluster{ //!as soon as connection established, more existing terms will be sent to this term, //!an p2p connection will start bool JoinCluster(const QHostAddress &addr, int nPort,bool bSSL=false); + void KickDeadClients(); }; } diff --git a/ZoomPipeline_FuncSvr/smartlink/st_client_table.cpp b/ZoomPipeline_FuncSvr/smartlink/st_client_table.cpp index ee35e8f613666f3ab48480cfff3e78659bf17366..20a6ff297365c7338559485439cfc70730941a01 100644 --- a/ZoomPipeline_FuncSvr/smartlink/st_client_table.cpp +++ b/ZoomPipeline_FuncSvr/smartlink/st_client_table.cpp @@ -23,7 +23,7 @@ namespace SmartLink{ st_client_table::~st_client_table() { } - void st_client_table::KickDealClients() + void st_client_table::KickDeadClients() { m_hash_mutex.lock(); for (QMap::iterator p =m_hash_sock2node.begin(); @@ -71,7 +71,7 @@ namespace SmartLink{ //this event indicates new client encrypted. void st_client_table::on_evt_ClientEncrypted(QObject * clientHandle) { - bool nHashContains = 0; + bool nHashContains = false; st_clientNode_baseTrans * pClientNode = 0; m_hash_mutex.lock(); nHashContains = m_hash_sock2node.contains(clientHandle); @@ -98,7 +98,7 @@ namespace SmartLink{ //this event indicates new client connected. void st_client_table::on_evt_NewClientConnected(QObject * clientHandle) { - bool nHashContains = 0; + bool nHashContains = false; st_clientNode_baseTrans * pClientNode = 0; m_hash_mutex.lock(); nHashContains = m_hash_sock2node.contains(clientHandle); @@ -174,7 +174,7 @@ namespace SmartLink{ void st_client_table::on_evt_Data_recieved(QObject * clientHandle,const QByteArray & datablock ) { //Push Clients to nodes if it is not exist - bool nHashContains = 0; + bool nHashContains = false; st_clientNode_baseTrans * pClientNode = 0; m_hash_mutex.lock(); nHashContains = m_hash_sock2node.contains(clientHandle); diff --git a/ZoomPipeline_FuncSvr/smartlink/st_client_table.h b/ZoomPipeline_FuncSvr/smartlink/st_client_table.h index d549eaf7f7a0292ac0512538c324a7b2201ce649..f94ab8bc5c1db4c50fac794cde250b3ebea9aab0 100644 --- a/ZoomPipeline_FuncSvr/smartlink/st_client_table.h +++ b/ZoomPipeline_FuncSvr/smartlink/st_client_table.h @@ -23,7 +23,7 @@ namespace SmartLink{ st_clientNode_baseTrans * clientNodeFromSocket(QObject *); //Heart beating and healthy - void KickDealClients(); + void KickDeadClients(); int heartBeatingThrd(){return m_nHeartBeatingDeadThrd;} void setHeartBeatingThrd(int h) {m_nHeartBeatingDeadThrd = h;} diff --git a/ZoomPipeline_FuncSvr/smartlink/st_clientnode_basetrans.h b/ZoomPipeline_FuncSvr/smartlink/st_clientnode_basetrans.h index 70914ad0df80f6b3cefda59c7ece96029afd4a8f..6b63135f874e48a49e9d7bea25abcb7be5bf7161 100644 --- a/ZoomPipeline_FuncSvr/smartlink/st_clientnode_basetrans.h +++ b/ZoomPipeline_FuncSvr/smartlink/st_clientnode_basetrans.h @@ -29,10 +29,7 @@ namespace SmartLink{ bool uuidValid(){return m_bUUIDRecieved;} bool bTermSet; - QDateTime lastActiveTime() - { - return m_last_Report; - } + QDateTime lastActiveTime(){ return m_last_Report;} qint32 bytesLeft() { return m_currentHeader.data_length + sizeof(SMARTLINK_MSG) - 1 diff --git a/ZoomPipeline_FuncSvr/zpmainframe.cpp b/ZoomPipeline_FuncSvr/zpmainframe.cpp index 1998339c141e2c2376bcdf58042b92f16e2d29dc..85319b2163b133b3243c7658236851dbe19a6d10 100644 --- a/ZoomPipeline_FuncSvr/zpmainframe.cpp +++ b/ZoomPipeline_FuncSvr/zpmainframe.cpp @@ -84,7 +84,10 @@ ZPMainFrame::~ZPMainFrame() break; } } - + m_netEngine->deleteLater(); + m_pDatabases->deleteLater(); + m_taskEngine->deleteLater(); + m_pClusterTerm->deleteLater(); delete ui; } @@ -282,8 +285,9 @@ void ZPMainFrame::timerEvent(QTimerEvent * e) { killTimer(m_nTimerCheck); m_nTimerCheck = -1; - m_clientTable->KickDealClients(); - m_nTimerCheck = startTimer(10000); + m_clientTable->KickDeadClients(); + m_pClusterTerm->KickDeadClients(); + m_nTimerCheck = startTimer(5000); } }