提交 e66954e3 编写于 作者: 丁劲犇's avatar 丁劲犇 😸

Add a global balance method. servers in clusters exchange their clients payload,

Each server instance knows exactly how many clients each server currently have.
上级 80af38bc
......@@ -14,6 +14,10 @@ namespace ZP_Cluster{
} hearder;
union uni_payload{
__UINT8_TYPE__ data[1];
struct tag_CSM_heartBeating{
__UINT32_TYPE__ nClients;
} heartBeating;
struct tag_CSM_BasicInfo{
__UINT8_TYPE__ name [64];
__UINT8_TYPE__ Address[64];
......@@ -40,6 +44,9 @@ namespace ZP_Cluster{
} hearder;
union uni_payload{
unsigned __int8 data[1];
struct tag_CSM_heartBeating{
unsigned __int32 nClients;
} heartBeating;
struct tag_CSM_BasicInfo{
unsigned __int8 name [64];
unsigned __int8 Address[64];
......
......@@ -11,6 +11,11 @@ namespace ZP_Cluster{
m_currentMessageSize = 0;
m_nPortPublish = 0;
m_last_Report = QDateTime::currentDateTime();
m_nRemoteClientNums = 0;
}
quint32 zp_ClusterNode::clientNums()
{
return m_nRemoteClientNums;
}
QDateTime zp_ClusterNode::lastActiveTime()
......@@ -199,6 +204,10 @@ namespace ZP_Cluster{
switch(m_currentHeader.messagetype)
{
case 0x00://Heart Beating
if (bytesLeft==0)
{
m_nRemoteClientNums = pMsg->payload.heartBeating.nClients;
}
break;
case 0x01://basicInfo, when connection established, this message should be used
if (m_currentBlock.length()>=64)
......
......@@ -42,9 +42,11 @@ namespace ZP_Cluster{
QHostAddress addrPublish();
int portPublish() ;
QObject * sock() ;
//!Messages
public:
void SendHelloPackage();
quint32 clientNums();
protected:
zp_ClusterTerm * m_pTerm;
//Client socket handle of this connection
......@@ -70,6 +72,8 @@ namespace ZP_Cluster{
CROSS_SVR_MSG::tag_header m_currentHeader;
QDateTime m_last_Report;
quint32 m_nRemoteClientNums;
signals:
void evt_SendDataToClient(QObject * objClient,const QByteArray & dtarray);
void evt_BroadcastData(QObject * objFromClient,const QByteArray & dtarray);
......
......@@ -6,6 +6,7 @@ namespace ZP_Cluster{
zp_ClusterTerm::zp_ClusterTerm(const QString & name,QObject *parent ) :
QObject(parent)
,m_strTermName(name)
,m_nClientNums(0)
{
m_pClusterEng = new ZPTaskEngine::zp_pipeline(this);
m_pClusterNet = new ZPNetwork::zp_net_Engine(8192,this);
......@@ -20,6 +21,15 @@ namespace ZP_Cluster{
m_nHeartBeatingTime = 20;
m_factory = std::bind(&zp_ClusterTerm::default_factory,this,_1,_2,_3);
}
void zp_ClusterTerm::setClientNums(quint32 nnum)
{
m_nClientNums = nnum;
}
quint32 zp_ClusterTerm::clientNums()
{
return m_nClientNums;
}
/**
* @brief The factory enables user-defined sub-classes inherits from zp_ClusterNode
......@@ -139,6 +149,15 @@ namespace ZP_Cluster{
m_hash_mutex.unlock();
return port;
}
quint32 zp_ClusterTerm::remoteClientNums(const QString & name)
{
quint32 res = 0;
m_hash_mutex.lock();
if (m_hash_Name2node.contains(name))
res = m_hash_Name2node[name]->clientNums();
m_hash_mutex.unlock();
return res;
}
bool zp_ClusterTerm::regisitNewServer(zp_ClusterNode * c)
{
......@@ -399,14 +418,14 @@ namespace ZP_Cluster{
void zp_ClusterTerm::SendHeartBeatings()
{
int nMsgLen = sizeof(CROSS_SVR_MSG::tag_header);
int nMsgLen = sizeof(CROSS_SVR_MSG::tag_header) + sizeof(CROSS_SVR_MSG::uni_payload::tag_CSM_heartBeating);
QByteArray array(nMsgLen,0);
CROSS_SVR_MSG * pMsg =(CROSS_SVR_MSG *) array.data();
pMsg->hearder.Mark = 0x1234;
pMsg->hearder.data_length = 0;
pMsg->hearder.data_length = sizeof(CROSS_SVR_MSG::uni_payload::tag_CSM_heartBeating);
pMsg->hearder.messagetype = 0x00;
pMsg->payload.heartBeating.nClients = this->m_nClientNums;
//m_pClusterNet->BroadcastData(0,array);
m_hash_mutex.lock();
QList<QString> keys = m_hash_Name2node.keys();
//Msgs
......
......@@ -61,6 +61,7 @@ namespace ZP_Cluster{
protected:
int m_nHeartBeatingTime;
quint32 m_nClientNums; //the clients this server now connected.
QString m_strTermName;//the Terminal's name
QHostAddress m_addrPublish; //The publish address for other terms to connect to
int m_nPortPublish;//The publish port for other terms to connect to
......@@ -79,6 +80,11 @@ namespace ZP_Cluster{
QStringList SvrNames();
QHostAddress SvrAddr(const QString & name);
int SvrPort(const QString & name);
quint32 remoteClientNums(const QString & name);
//Client Num set, for cross-svr balance
void setClientNums(quint32 nnum);
quint32 clientNums();
signals:
void evt_Message(QObject * ,const QString &);
......
......@@ -142,10 +142,11 @@ void ZPMainFrame::initUI()
ui->comboBox_db_type->setModel(pCombo);
m_pModelCluster= new QStandardItemModel(0,3,this);
m_pModelCluster= new QStandardItemModel(0,4,this);
m_pModelCluster->setHeaderData(0,Qt::Horizontal,tr("Name"));
m_pModelCluster->setHeaderData(1,Qt::Horizontal,tr("Address"));
m_pModelCluster->setHeaderData(2,Qt::Horizontal,tr("Port"));
m_pModelCluster->setHeaderData(3,Qt::Horizontal,tr("Clients"));
ui->tableView_activeTerms->setModel(m_pModelCluster);
}
......@@ -235,12 +236,18 @@ void ZPMainFrame::timerEvent(QTimerEvent * e)
int nClientThreads = m_netEngine->TransThreadNum();
str_msg += tr("Current Trans Threads: %1\n").arg(nClientThreads);
int nTotalCLientsNums = 0;
for (int i=0;i<nClientThreads;i++)
{
str_msg += tr("\t%1:%2").arg(i+1).arg(m_netEngine->totalClients(i));
if ((i+1)%5==0)
str_msg += "\n";
nTotalCLientsNums += m_netEngine->totalClients(i);
}
//Set This message to Cluster Info.
m_pClusterTerm->setClientNums(nTotalCLientsNums);
str_msg += "\n";
//recording task status
str_msg += tr("Current Task Threads: %1\n").arg(m_taskEngine->threadsCount());
......@@ -293,12 +300,19 @@ void ZPMainFrame::timerEvent(QTimerEvent * e)
if (m_pModelCluster->rowCount()>0)
m_pModelCluster->removeRows(0,m_pModelCluster->rowCount());
int nInserted = 0;
m_pModelCluster->insertRow(nInserted);
m_pModelCluster->setData(m_pModelCluster->index(nInserted,0),this->m_pClusterTerm->name());
m_pModelCluster->setData(m_pModelCluster->index(nInserted,1),m_pClusterTerm->publishAddr().toString());
m_pModelCluster->setData(m_pModelCluster->index(nInserted,2),m_pClusterTerm->publishPort());
m_pModelCluster->setData(m_pModelCluster->index(nInserted,3),m_pClusterTerm->clientNums());
++nInserted;
foreach (QString strNodeName,lstCluster)
{
m_pModelCluster->insertRow(nInserted);
m_pModelCluster->setData(m_pModelCluster->index(nInserted,0),strNodeName);
m_pModelCluster->setData(m_pModelCluster->index(nInserted,1),m_pClusterTerm->SvrAddr(strNodeName).toString());
m_pModelCluster->setData(m_pModelCluster->index(nInserted,2),m_pClusterTerm->SvrPort(strNodeName));
m_pModelCluster->setData(m_pModelCluster->index(nInserted,3),m_pClusterTerm->remoteClientNums(strNodeName));
++nInserted;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册