From be4ede43b3c976a452b4bc9050f25e0b3cf761e1 Mon Sep 17 00:00:00 2001 From: goldenhawking Date: Fri, 2 May 2014 13:53:47 +0800 Subject: [PATCH] This week, we realized the cross-server client syn hask_maps. When cross-svr network established, servers will exchange their clients uuid info. wen client log in, local svr will broadcast its uuid to remote svrs, when clients log out, its uuid will be broadcasted again. we have only one step left.--the real data hop. --- .../smartlink/st_client_table.cpp | 85 ++++++++++++++++++- .../smartlink/st_client_table.h | 9 ++ .../smartlink/st_cross_svr_node.cpp | 74 ++++++++++++++++ .../smartlink/st_cross_svr_node.h | 11 +++ 4 files changed, 176 insertions(+), 3 deletions(-) diff --git a/ZoomPipeline_FuncSvr/smartlink/st_client_table.cpp b/ZoomPipeline_FuncSvr/smartlink/st_client_table.cpp index 2c11f2d..fb42a66 100644 --- a/ZoomPipeline_FuncSvr/smartlink/st_client_table.cpp +++ b/ZoomPipeline_FuncSvr/smartlink/st_client_table.cpp @@ -2,7 +2,9 @@ #include "st_clientnode_applayer.h" #include #include "st_cross_svr_node.h" +#include "st_cross_svr_msg.h" #include +#include namespace SmartLink{ using namespace std::placeholders; st_client_table::st_client_table( @@ -95,6 +97,7 @@ namespace SmartLink{ m_hash_mutex.lock(); m_hash_uuid2node[c->uuid()] = c; m_hash_mutex.unlock(); + broadcast_client_uuid(c->uuid(),true); return true; } @@ -194,7 +197,10 @@ namespace SmartLink{ //This is important. some time m_hash_sock2node and m_hash_uuid2node, same uuid has different socket. if (m_hash_uuid2node.contains(pClientNode->uuid())) if (m_hash_uuid2node[pClientNode->uuid()]==pClientNode) + { m_hash_uuid2node.remove(pClientNode->uuid()); + broadcast_client_uuid(pClientNode->uuid(),false); + } } pClientNode->bTermSet = true; @@ -274,14 +280,50 @@ namespace SmartLink{ //this event indicates new svr successfully hand-shaked. void st_client_table::on_evt_NewSvrConnected(const QString & svrHandle) { - const char * pstr = "Hello World!"; - m_pCluster->SendDataToRemoteServer(svrHandle,QByteArray(pstr)); - emit evt_Message(this,"Send Svr Msg to "+svrHandle); + //Send All Client UUIDs to new Svr + m_hash_mutex.lock(); + QList uuids = m_hash_uuid2node.keys(); + int nNodeSz = uuids.size(); + if (nNodeSz>0) + { + int nMsgLen = sizeof(STCROSSSVR_MSG::tag_msgHearder) + nNodeSz * sizeof(quint32); + QByteArray array(nMsgLen,0); + STCROSSSVR_MSG * pMsg = (STCROSSSVR_MSG *) array.data(); + pMsg->header.Mark = 0x4567; + pMsg->header.version = 1; + pMsg->header.messageLen = nNodeSz * sizeof(quint32); + pMsg->header.mesageType = 0x01; + int ct = -1; + foreach (quint32 uuid,uuids) + pMsg->payload.uuids[++ct] = uuid; + m_pCluster->SendDataToRemoteServer(svrHandle,array); + } + m_hash_mutex.unlock(); + emit evt_Message(this,tr("Send Initial UUIDs to Remote Svr:") + svrHandle); + } + void st_client_table::broadcast_client_uuid(quint32 uuid, bool bActive) + { + QStringList svrs = m_pCluster->SvrNames(); + if (svrs.empty()==false) + { + int nMsgLen = sizeof(STCROSSSVR_MSG::tag_msgHearder) + sizeof(quint32); + QByteArray array(nMsgLen,0); + STCROSSSVR_MSG * pMsg = (STCROSSSVR_MSG *) array.data(); + pMsg->header.Mark = 0x4567; + pMsg->header.version = 1; + pMsg->header.messageLen = sizeof(quint32); + pMsg->header.mesageType = bActive==true?0x01:0x02; + pMsg->payload.uuids[0] = uuid; + foreach (QString svr,svrs) + m_pCluster->SendDataToRemoteServer(svr,array); + } } //this event indicates a client disconnected. void st_client_table::on_evt_NewSvrDisconnected(const QString & svrHandle) { + //remove all client-maps belongs to this server. + this->cross_svr_del_uuids(svrHandle,NULL,0); emit evt_Message(this,"Svr DisConnected. " + svrHandle); } @@ -305,5 +347,42 @@ namespace SmartLink{ pNode->setClientTable(this); return pNode; } + //reg new uuids in m_hash_remoteClient2SvrName + void st_client_table::cross_svr_add_uuids(const QString & svrname,quint32 * pUUIDs, int nUUIDs) + { + m_mutex_cross_svr_map.lock(); + for (int i=0;i keys; + for(std::unordered_map::iterator p = + m_hash_remoteClient2SvrName.begin(); + p!=m_hash_remoteClient2SvrName.end();p++) + { + if ((*p).second == svrname ) + keys.push_back((*p).first); + } + foreach (quint32 key, keys) + { + m_hash_remoteClient2SvrName.erase(key); + } + } + else + { + for (int i=0;i m_nodeToBeDel; @@ -72,6 +80,7 @@ namespace SmartLink{ //cluster Nodes Map std::unordered_map m_hash_remoteClient2SvrName; + QMutex m_mutex_cross_svr_map; //Cluster Node Factory ZP_Cluster::zp_ClusterNode * cross_svr_node_factory( ZP_Cluster::zp_ClusterTerm * /*pTerm*/, diff --git a/ZoomPipeline_FuncSvr/smartlink/st_cross_svr_node.cpp b/ZoomPipeline_FuncSvr/smartlink/st_cross_svr_node.cpp index d2414f2..12b3053 100644 --- a/ZoomPipeline_FuncSvr/smartlink/st_cross_svr_node.cpp +++ b/ZoomPipeline_FuncSvr/smartlink/st_cross_svr_node.cpp @@ -6,11 +6,85 @@ namespace SmartLink{ st_cross_svr_node::st_cross_svr_node(ZP_Cluster::zp_ClusterTerm * pTerm, QObject * psock,QObject *parent) :ZP_Cluster::zp_ClusterNode(pTerm,psock,parent) { + m_currStMegSize = 0; } + int st_cross_svr_node::st_bytesLeft() + { + return m_st_Header.messageLen + sizeof(STCROSSSVR_MSG::tag_msgHearder) - m_currStMegSize ; + } + bool st_cross_svr_node::deal_user_data(const QByteArray &array) { + const char * pData = array.constData(); + int nBlockSize = array.size(); + int nOffset = 0; + while (nOffset < nBlockSize ) + { + while (m_currStMegSize < sizeof(STCROSSSVR_MSG::tag_msgHearder) && nOffset< nBlockSize) + { + m_currStBlock.push_back(pData[nOffset++]); + ++m_currStMegSize; + } + if (m_currStMegSize < sizeof(STCROSSSVR_MSG::tag_msgHearder)) + return true; + if (m_currStMegSize == sizeof(STCROSSSVR_MSG::tag_msgHearder)) + { + memcpy (&m_st_Header,m_currStBlock.constData(),sizeof(STCROSSSVR_MSG::tag_msgHearder)); + if (m_st_Header.Mark != 0x4567) + { + m_currStMegSize = 0; + m_currStBlock.clear(); + return true; + } + } + while (nOffset0) + return delCurrBlock; + STCROSSSVR_MSG * pMsg = (STCROSSSVR_MSG *) m_currStBlock.constData(); + int nUUIDs = pMsg->header.messageLen / sizeof(quint32); + this->m_pClientTable->cross_svr_add_uuids(this->termName(),pMsg->payload.uuids,nUUIDs); + } + break; + case 0x02: //client node exit + { + if (st_bytesLeft()>0) + return delCurrBlock; + STCROSSSVR_MSG * pMsg = (STCROSSSVR_MSG *) m_currStBlock.constData(); + int nUUIDs = pMsg->header.messageLen / sizeof(quint32); + this->m_pClientTable->cross_svr_del_uuids(this->termName(),pMsg->payload.uuids,nUUIDs); + } + break; + case 0x03: // data transfer + delCurrBlock = true; + break; + default: + break; + } + + return delCurrBlock; + + } + void st_cross_svr_node::setClientTable(st_client_table * table) { this->m_pClientTable = table; diff --git a/ZoomPipeline_FuncSvr/smartlink/st_cross_svr_node.h b/ZoomPipeline_FuncSvr/smartlink/st_cross_svr_node.h index 7df136e..e6cd3a6 100644 --- a/ZoomPipeline_FuncSvr/smartlink/st_cross_svr_node.h +++ b/ZoomPipeline_FuncSvr/smartlink/st_cross_svr_node.h @@ -1,6 +1,7 @@ #ifndef ST_CROSS_SVR_NODE_H #define ST_CROSS_SVR_NODE_H #include "../cluster/zp_clusternode.h" +#include "st_cross_svr_msg.h" namespace SmartLink{ class st_client_table; class st_cross_svr_node : public ZP_Cluster::zp_ClusterNode @@ -12,8 +13,18 @@ namespace SmartLink{ protected: //!virtual functions, dealing with the user-defined operations. virtual bool deal_user_data(const QByteArray &); + bool deal_msg(); + int st_bytesLeft(); protected: st_client_table * m_pClientTable; + //Current Message Offset, according to m_currentHeader + int m_currStMegSize; + //Current un-procssed message block.for large blocks, + //this array will be re-setted as soon as some part of data has been + //dealed, eg, send a 200MB block, the 200MB data will be splitted into pieces + QByteArray m_currStBlock; + //current Header + STCROSSSVR_MSG::tag_msgHearder m_st_Header; }; } #endif // ST_CROSS_SVR_NODE_H -- GitLab