diff --git a/ZoomPipeline_FuncSvr/cluster/cross_svr_messages.h b/ZoomPipeline_FuncSvr/cluster/cross_svr_messages.h index 5a62a21fef280b963a205daf3c09bf6432e45ddf..9e4d4c2c23669fa75f323ba0b3f338f4e2787be0 100644 --- a/ZoomPipeline_FuncSvr/cluster/cross_svr_messages.h +++ b/ZoomPipeline_FuncSvr/cluster/cross_svr_messages.h @@ -14,6 +14,10 @@ namespace ZP_Cluster{ } hearder; union uni_payload{ __UINT8_TYPE__ data[1]; + struct tag_CSM_heartBeating{ + __UINT32_TYPE__ nClients; + } heartBeating; + struct tag_CSM_BasicInfo{ __UINT8_TYPE__ name [64]; __UINT8_TYPE__ Address[64]; @@ -40,6 +44,9 @@ namespace ZP_Cluster{ } hearder; union uni_payload{ unsigned __int8 data[1]; + struct tag_CSM_heartBeating{ + unsigned __int32 nClients; + } heartBeating; struct tag_CSM_BasicInfo{ unsigned __int8 name [64]; unsigned __int8 Address[64]; diff --git a/ZoomPipeline_FuncSvr/cluster/zp_clusternode.cpp b/ZoomPipeline_FuncSvr/cluster/zp_clusternode.cpp index 19235bded5d1b92e59946c35b5c2f068346b1452..941d30714b92f40a4a47fef883f20757806827d6 100644 --- a/ZoomPipeline_FuncSvr/cluster/zp_clusternode.cpp +++ b/ZoomPipeline_FuncSvr/cluster/zp_clusternode.cpp @@ -11,6 +11,11 @@ namespace ZP_Cluster{ m_currentMessageSize = 0; m_nPortPublish = 0; m_last_Report = QDateTime::currentDateTime(); + m_nRemoteClientNums = 0; + } + quint32 zp_ClusterNode::clientNums() + { + return m_nRemoteClientNums; } QDateTime zp_ClusterNode::lastActiveTime() @@ -199,6 +204,10 @@ namespace ZP_Cluster{ switch(m_currentHeader.messagetype) { case 0x00://Heart Beating + if (bytesLeft==0) + { + m_nRemoteClientNums = pMsg->payload.heartBeating.nClients; + } break; case 0x01://basicInfo, when connection established, this message should be used if (m_currentBlock.length()>=64) diff --git a/ZoomPipeline_FuncSvr/cluster/zp_clusternode.h b/ZoomPipeline_FuncSvr/cluster/zp_clusternode.h index aa8a3b83a0a0d5d29ceb615ff0e7e1d456ade845..755d322797d70596ee1ebe8cee1a5f910aeb0b83 100644 --- a/ZoomPipeline_FuncSvr/cluster/zp_clusternode.h +++ b/ZoomPipeline_FuncSvr/cluster/zp_clusternode.h @@ -42,9 +42,11 @@ namespace ZP_Cluster{ QHostAddress addrPublish(); int portPublish() ; QObject * sock() ; + //!Messages public: void SendHelloPackage(); + quint32 clientNums(); protected: zp_ClusterTerm * m_pTerm; //Client socket handle of this connection @@ -70,6 +72,8 @@ namespace ZP_Cluster{ CROSS_SVR_MSG::tag_header m_currentHeader; QDateTime m_last_Report; + + quint32 m_nRemoteClientNums; signals: void evt_SendDataToClient(QObject * objClient,const QByteArray & dtarray); void evt_BroadcastData(QObject * objFromClient,const QByteArray & dtarray); diff --git a/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.cpp b/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.cpp index 065d5875e26fc72b5c04c3539e10db87ccdcdd64..a049c5fcd4ed815d30e2c252d7cf8c291bbb0e80 100644 --- a/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.cpp +++ b/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.cpp @@ -6,6 +6,7 @@ namespace ZP_Cluster{ zp_ClusterTerm::zp_ClusterTerm(const QString & name,QObject *parent ) : QObject(parent) ,m_strTermName(name) + ,m_nClientNums(0) { m_pClusterEng = new ZPTaskEngine::zp_pipeline(this); m_pClusterNet = new ZPNetwork::zp_net_Engine(8192,this); @@ -20,6 +21,15 @@ namespace ZP_Cluster{ m_nHeartBeatingTime = 20; m_factory = std::bind(&zp_ClusterTerm::default_factory,this,_1,_2,_3); } + void zp_ClusterTerm::setClientNums(quint32 nnum) + { + m_nClientNums = nnum; + } + + quint32 zp_ClusterTerm::clientNums() + { + return m_nClientNums; + } /** * @brief The factory enables user-defined sub-classes inherits from zp_ClusterNode @@ -139,6 +149,15 @@ namespace ZP_Cluster{ m_hash_mutex.unlock(); return port; } + quint32 zp_ClusterTerm::remoteClientNums(const QString & name) + { + quint32 res = 0; + m_hash_mutex.lock(); + if (m_hash_Name2node.contains(name)) + res = m_hash_Name2node[name]->clientNums(); + m_hash_mutex.unlock(); + return res; + } bool zp_ClusterTerm::regisitNewServer(zp_ClusterNode * c) { @@ -399,14 +418,14 @@ namespace ZP_Cluster{ void zp_ClusterTerm::SendHeartBeatings() { - int nMsgLen = sizeof(CROSS_SVR_MSG::tag_header); + int nMsgLen = sizeof(CROSS_SVR_MSG::tag_header) + sizeof(CROSS_SVR_MSG::uni_payload::tag_CSM_heartBeating); QByteArray array(nMsgLen,0); CROSS_SVR_MSG * pMsg =(CROSS_SVR_MSG *) array.data(); pMsg->hearder.Mark = 0x1234; - pMsg->hearder.data_length = 0; + pMsg->hearder.data_length = sizeof(CROSS_SVR_MSG::uni_payload::tag_CSM_heartBeating); pMsg->hearder.messagetype = 0x00; + pMsg->payload.heartBeating.nClients = this->m_nClientNums; //m_pClusterNet->BroadcastData(0,array); - m_hash_mutex.lock(); QList keys = m_hash_Name2node.keys(); //Msgs diff --git a/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.h b/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.h index 8141a122c626b8e4e6c3584a08822613b74598a3..db3387afafa3046449f113647de7dfbf0da35baf 100644 --- a/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.h +++ b/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.h @@ -61,6 +61,7 @@ namespace ZP_Cluster{ protected: int m_nHeartBeatingTime; + quint32 m_nClientNums; //the clients this server now connected. QString m_strTermName;//the Terminal's name QHostAddress m_addrPublish; //The publish address for other terms to connect to int m_nPortPublish;//The publish port for other terms to connect to @@ -79,6 +80,11 @@ namespace ZP_Cluster{ QStringList SvrNames(); QHostAddress SvrAddr(const QString & name); int SvrPort(const QString & name); + quint32 remoteClientNums(const QString & name); + //Client Num set, for cross-svr balance + void setClientNums(quint32 nnum); + quint32 clientNums(); + signals: void evt_Message(QObject * ,const QString &); diff --git a/ZoomPipeline_FuncSvr/zpmainframe.cpp b/ZoomPipeline_FuncSvr/zpmainframe.cpp index ffec84511cc23d629ebe8fd791098e154567557f..abcc5908f060ce7cfe2ba7dfe77b190dca414689 100644 --- a/ZoomPipeline_FuncSvr/zpmainframe.cpp +++ b/ZoomPipeline_FuncSvr/zpmainframe.cpp @@ -142,10 +142,11 @@ void ZPMainFrame::initUI() ui->comboBox_db_type->setModel(pCombo); - m_pModelCluster= new QStandardItemModel(0,3,this); + m_pModelCluster= new QStandardItemModel(0,4,this); m_pModelCluster->setHeaderData(0,Qt::Horizontal,tr("Name")); m_pModelCluster->setHeaderData(1,Qt::Horizontal,tr("Address")); m_pModelCluster->setHeaderData(2,Qt::Horizontal,tr("Port")); + m_pModelCluster->setHeaderData(3,Qt::Horizontal,tr("Clients")); ui->tableView_activeTerms->setModel(m_pModelCluster); } @@ -235,12 +236,18 @@ void ZPMainFrame::timerEvent(QTimerEvent * e) int nClientThreads = m_netEngine->TransThreadNum(); str_msg += tr("Current Trans Threads: %1\n").arg(nClientThreads); + int nTotalCLientsNums = 0; for (int i=0;itotalClients(i)); if ((i+1)%5==0) str_msg += "\n"; + nTotalCLientsNums += m_netEngine->totalClients(i); } + + //Set This message to Cluster Info. + m_pClusterTerm->setClientNums(nTotalCLientsNums); + str_msg += "\n"; //recording task status str_msg += tr("Current Task Threads: %1\n").arg(m_taskEngine->threadsCount()); @@ -293,12 +300,19 @@ void ZPMainFrame::timerEvent(QTimerEvent * e) if (m_pModelCluster->rowCount()>0) m_pModelCluster->removeRows(0,m_pModelCluster->rowCount()); int nInserted = 0; + m_pModelCluster->insertRow(nInserted); + m_pModelCluster->setData(m_pModelCluster->index(nInserted,0),this->m_pClusterTerm->name()); + m_pModelCluster->setData(m_pModelCluster->index(nInserted,1),m_pClusterTerm->publishAddr().toString()); + m_pModelCluster->setData(m_pModelCluster->index(nInserted,2),m_pClusterTerm->publishPort()); + m_pModelCluster->setData(m_pModelCluster->index(nInserted,3),m_pClusterTerm->clientNums()); + ++nInserted; foreach (QString strNodeName,lstCluster) { m_pModelCluster->insertRow(nInserted); m_pModelCluster->setData(m_pModelCluster->index(nInserted,0),strNodeName); m_pModelCluster->setData(m_pModelCluster->index(nInserted,1),m_pClusterTerm->SvrAddr(strNodeName).toString()); m_pModelCluster->setData(m_pModelCluster->index(nInserted,2),m_pClusterTerm->SvrPort(strNodeName)); + m_pModelCluster->setData(m_pModelCluster->index(nInserted,3),m_pClusterTerm->remoteClientNums(strNodeName)); ++nInserted; } }