提交 be4ede43 编写于 作者: 丁劲犇's avatar 丁劲犇 😸

This week, we realized the cross-server client syn hask_maps.

When cross-svr network established, servers will exchange their clients uuid info.
wen client log in, local svr will broadcast its uuid to remote svrs, when clients log out,
its uuid will be broadcasted again.

we have only one step left.--the real data hop.
上级 08b7ccdc
......@@ -2,7 +2,9 @@
#include "st_clientnode_applayer.h"
#include <assert.h>
#include "st_cross_svr_node.h"
#include "st_cross_svr_msg.h"
#include <functional>
#include <QList>
namespace SmartLink{
using namespace std::placeholders;
st_client_table::st_client_table(
......@@ -95,6 +97,7 @@ namespace SmartLink{
m_hash_mutex.lock();
m_hash_uuid2node[c->uuid()] = c;
m_hash_mutex.unlock();
broadcast_client_uuid(c->uuid(),true);
return true;
}
......@@ -194,7 +197,10 @@ namespace SmartLink{
//This is important. some time m_hash_sock2node and m_hash_uuid2node, same uuid has different socket.
if (m_hash_uuid2node.contains(pClientNode->uuid()))
if (m_hash_uuid2node[pClientNode->uuid()]==pClientNode)
{
m_hash_uuid2node.remove(pClientNode->uuid());
broadcast_client_uuid(pClientNode->uuid(),false);
}
}
pClientNode->bTermSet = true;
......@@ -274,14 +280,50 @@ namespace SmartLink{
//this event indicates new svr successfully hand-shaked.
void st_client_table::on_evt_NewSvrConnected(const QString & svrHandle)
{
const char * pstr = "Hello World!";
m_pCluster->SendDataToRemoteServer(svrHandle,QByteArray(pstr));
emit evt_Message(this,"Send Svr Msg to "+svrHandle);
//Send All Client UUIDs to new Svr
m_hash_mutex.lock();
QList<quint32> uuids = m_hash_uuid2node.keys();
int nNodeSz = uuids.size();
if (nNodeSz>0)
{
int nMsgLen = sizeof(STCROSSSVR_MSG::tag_msgHearder) + nNodeSz * sizeof(quint32);
QByteArray array(nMsgLen,0);
STCROSSSVR_MSG * pMsg = (STCROSSSVR_MSG *) array.data();
pMsg->header.Mark = 0x4567;
pMsg->header.version = 1;
pMsg->header.messageLen = nNodeSz * sizeof(quint32);
pMsg->header.mesageType = 0x01;
int ct = -1;
foreach (quint32 uuid,uuids)
pMsg->payload.uuids[++ct] = uuid;
m_pCluster->SendDataToRemoteServer(svrHandle,array);
}
m_hash_mutex.unlock();
emit evt_Message(this,tr("Send Initial UUIDs to Remote Svr:") + svrHandle);
}
void st_client_table::broadcast_client_uuid(quint32 uuid, bool bActive)
{
QStringList svrs = m_pCluster->SvrNames();
if (svrs.empty()==false)
{
int nMsgLen = sizeof(STCROSSSVR_MSG::tag_msgHearder) + sizeof(quint32);
QByteArray array(nMsgLen,0);
STCROSSSVR_MSG * pMsg = (STCROSSSVR_MSG *) array.data();
pMsg->header.Mark = 0x4567;
pMsg->header.version = 1;
pMsg->header.messageLen = sizeof(quint32);
pMsg->header.mesageType = bActive==true?0x01:0x02;
pMsg->payload.uuids[0] = uuid;
foreach (QString svr,svrs)
m_pCluster->SendDataToRemoteServer(svr,array);
}
}
//this event indicates a client disconnected.
void st_client_table::on_evt_NewSvrDisconnected(const QString & svrHandle)
{
//remove all client-maps belongs to this server.
this->cross_svr_del_uuids(svrHandle,NULL,0);
emit evt_Message(this,"Svr DisConnected. " + svrHandle);
}
......@@ -305,5 +347,42 @@ namespace SmartLink{
pNode->setClientTable(this);
return pNode;
}
//reg new uuids in m_hash_remoteClient2SvrName
void st_client_table::cross_svr_add_uuids(const QString & svrname,quint32 * pUUIDs, int nUUIDs)
{
m_mutex_cross_svr_map.lock();
for (int i=0;i<nUUIDs;i++)
m_hash_remoteClient2SvrName[pUUIDs[i]] = svrname;
m_mutex_cross_svr_map.unlock();
emit evt_Message(this,tr("Recieved remote %1 client uuid(s) from svr ").arg(nUUIDs) + svrname);
}
//del uuids in m_hash_remoteClient2SvrName, pUUIDs =0 means del all uuids belong to svrname
void st_client_table::cross_svr_del_uuids(const QString & svrname,quint32 * pUUIDs , int nUUIDs)
{
m_mutex_cross_svr_map.lock();
if (pUUIDs==NULL)
{
QList<quint32> keys;
for(std::unordered_map<quint32,QString>::iterator p =
m_hash_remoteClient2SvrName.begin();
p!=m_hash_remoteClient2SvrName.end();p++)
{
if ((*p).second == svrname )
keys.push_back((*p).first);
}
foreach (quint32 key, keys)
{
m_hash_remoteClient2SvrName.erase(key);
}
}
else
{
for (int i=0;i<nUUIDs;i++)
m_hash_remoteClient2SvrName.erase(pUUIDs[i]);
}
m_mutex_cross_svr_map.unlock();
emit evt_Message(this,tr("Removed remote %1 client uuid(s) from svr ").arg(nUUIDs) + svrname);
}
}
......@@ -45,6 +45,14 @@ namespace SmartLink{
void setLargeFileFolder(const QString & s);
ZPDatabase::DatabaseResource * dbRes();
//reg new uuids in m_hash_remoteClient2SvrName
void cross_svr_add_uuids(const QString & svrname,quint32 * pUUIDs, int nUUIDs);
//del uuids in m_hash_remoteClient2SvrName, pUUIDs =0 means del all uuids belong to svrname
void cross_svr_del_uuids(const QString & svrname,quint32 * pUUIDs , int nUUIDs);
//Tell remote servers of uuid-change
void broadcast_client_uuid(quint32 uuid, bool bActive);
protected:
//This list hold dead nodes that still in task queue,avoiding crash
QList<st_clientNode_baseTrans *> m_nodeToBeDel;
......@@ -72,6 +80,7 @@ namespace SmartLink{
//cluster Nodes Map
std::unordered_map<quint32,QString> m_hash_remoteClient2SvrName;
QMutex m_mutex_cross_svr_map;
//Cluster Node Factory
ZP_Cluster::zp_ClusterNode * cross_svr_node_factory(
ZP_Cluster::zp_ClusterTerm * /*pTerm*/,
......
......@@ -6,11 +6,85 @@ namespace SmartLink{
st_cross_svr_node::st_cross_svr_node(ZP_Cluster::zp_ClusterTerm * pTerm, QObject * psock,QObject *parent)
:ZP_Cluster::zp_ClusterNode(pTerm,psock,parent)
{
m_currStMegSize = 0;
}
int st_cross_svr_node::st_bytesLeft()
{
return m_st_Header.messageLen + sizeof(STCROSSSVR_MSG::tag_msgHearder) - m_currStMegSize ;
}
bool st_cross_svr_node::deal_user_data(const QByteArray &array)
{
const char * pData = array.constData();
int nBlockSize = array.size();
int nOffset = 0;
while (nOffset < nBlockSize )
{
while (m_currStMegSize < sizeof(STCROSSSVR_MSG::tag_msgHearder) && nOffset< nBlockSize)
{
m_currStBlock.push_back(pData[nOffset++]);
++m_currStMegSize;
}
if (m_currStMegSize < sizeof(STCROSSSVR_MSG::tag_msgHearder))
return true;
if (m_currStMegSize == sizeof(STCROSSSVR_MSG::tag_msgHearder))
{
memcpy (&m_st_Header,m_currStBlock.constData(),sizeof(STCROSSSVR_MSG::tag_msgHearder));
if (m_st_Header.Mark != 0x4567)
{
m_currStMegSize = 0;
m_currStBlock.clear();
return true;
}
}
while (nOffset<nBlockSize && m_currStMegSize < m_st_Header.messageLen + sizeof(STCROSSSVR_MSG::tag_msgHearder) )
{
m_currStBlock.push_back(pData[nOffset++]);
++m_currStMegSize;
}
bool needDel = deal_msg();
if (st_bytesLeft()==0 || needDel==true)
{
m_currStMegSize = 0;
m_currStBlock.clear();
}
}
return ZP_Cluster::zp_ClusterNode::deal_user_data(array);
}
bool st_cross_svr_node::deal_msg()
{
bool delCurrBlock = false;
switch (m_st_Header.mesageType)
{
case 0x01: //client node log in
{
if (st_bytesLeft()>0)
return delCurrBlock;
STCROSSSVR_MSG * pMsg = (STCROSSSVR_MSG *) m_currStBlock.constData();
int nUUIDs = pMsg->header.messageLen / sizeof(quint32);
this->m_pClientTable->cross_svr_add_uuids(this->termName(),pMsg->payload.uuids,nUUIDs);
}
break;
case 0x02: //client node exit
{
if (st_bytesLeft()>0)
return delCurrBlock;
STCROSSSVR_MSG * pMsg = (STCROSSSVR_MSG *) m_currStBlock.constData();
int nUUIDs = pMsg->header.messageLen / sizeof(quint32);
this->m_pClientTable->cross_svr_del_uuids(this->termName(),pMsg->payload.uuids,nUUIDs);
}
break;
case 0x03: // data transfer
delCurrBlock = true;
break;
default:
break;
}
return delCurrBlock;
}
void st_cross_svr_node::setClientTable(st_client_table * table)
{
this->m_pClientTable = table;
......
#ifndef ST_CROSS_SVR_NODE_H
#define ST_CROSS_SVR_NODE_H
#include "../cluster/zp_clusternode.h"
#include "st_cross_svr_msg.h"
namespace SmartLink{
class st_client_table;
class st_cross_svr_node : public ZP_Cluster::zp_ClusterNode
......@@ -12,8 +13,18 @@ namespace SmartLink{
protected:
//!virtual functions, dealing with the user-defined operations.
virtual bool deal_user_data(const QByteArray &);
bool deal_msg();
int st_bytesLeft();
protected:
st_client_table * m_pClientTable;
//Current Message Offset, according to m_currentHeader
int m_currStMegSize;
//Current un-procssed message block.for large blocks,
//this array will be re-setted as soon as some part of data has been
//dealed, eg, send a 200MB block, the 200MB data will be splitted into pieces
QByteArray m_currStBlock;
//current Header
STCROSSSVR_MSG::tag_msgHearder m_st_Header;
};
}
#endif // ST_CROSS_SVR_NODE_H
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册