提交 50e2d6ce 编写于 作者: 丁劲犇's avatar 丁劲犇 😸

Connect cluster messages

上级 4137a4c1
......@@ -6,6 +6,8 @@ namespace ZP_Cluster{
{
m_pClusterEng = new ZPTaskEngine::zp_pipeline(this);
m_pClusterNet = new ZPNetwork::zp_net_ThreadPool(8192,this);
connect(m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::evt_Message, this,&zp_ClusterTerm::evt_Message);
connect(m_pClusterNet,&ZPNetwork::zp_net_ThreadPool::evt_SocketError, this,&zp_ClusterTerm::evt_SocketError);
m_nPortPublish = 0;
}
......
......@@ -34,7 +34,10 @@ namespace ZP_Cluster{
ZPNetwork::zp_net_ThreadPool * m_pClusterNet;
ZPTaskEngine::zp_pipeline * m_pClusterEng;
signals:
//These Message is nessery.-------------------------------------
void evt_Message(const QString &);
//The socket error message
void evt_SocketError(QObject * senderSock ,QAbstractSocket::SocketError socketError);
public slots:
//!Start listen, this term can be connected by newly joined terms in future.
void StartListen(const QHostAddress &addr, int nPort);
......
......@@ -176,6 +176,7 @@ namespace ZPNetwork{
connect (clientTH,&zp_netTransThread::evt_Data_recieved,this,&zp_net_ThreadPool::evt_Data_recieved,Qt::QueuedConnection);
connect (clientTH,&zp_netTransThread::evt_Data_transferred,this,&zp_net_ThreadPool::evt_Data_transferred,Qt::QueuedConnection);
connect (clientTH,&zp_netTransThread::evt_NewClientConnected,this,&zp_net_ThreadPool::evt_NewClientConnected,Qt::QueuedConnection);
connect (clientTH,&zp_netTransThread::evt_ClientEncrypted,this,&zp_net_ThreadPool::evt_ClientEncrypted,Qt::QueuedConnection);
connect (clientTH,&zp_netTransThread::evt_SocketError,this,&zp_net_ThreadPool::evt_SocketError,Qt::QueuedConnection);
connect (this,&zp_net_ThreadPool::evt_EstablishConnection,clientTH,&zp_netTransThread::incomingConnection,Qt::QueuedConnection);
connect (this,&zp_net_ThreadPool::evt_FireConnection,clientTH,&zp_netTransThread::startConnection,Qt::QueuedConnection);
......@@ -207,6 +208,7 @@ namespace ZPNetwork{
disconnect (clientTH,&zp_netTransThread::evt_Data_recieved,this,&zp_net_ThreadPool::evt_Data_recieved);
disconnect (clientTH,&zp_netTransThread::evt_Data_transferred,this,&zp_net_ThreadPool::evt_Data_transferred);
disconnect (clientTH,&zp_netTransThread::evt_NewClientConnected,this,&zp_net_ThreadPool::evt_NewClientConnected);
disconnect (clientTH,&zp_netTransThread::evt_ClientEncrypted,this,&zp_net_ThreadPool::evt_ClientEncrypted);
disconnect (clientTH,&zp_netTransThread::evt_SocketError,this,&zp_net_ThreadPool::evt_SocketError);
disconnect (this,&zp_net_ThreadPool::evt_EstablishConnection,clientTH,&zp_netTransThread::incomingConnection);
disconnect (this,&zp_net_ThreadPool::evt_FireConnection,clientTH,&zp_netTransThread::startConnection);
......
......@@ -71,6 +71,8 @@ namespace ZPNetwork{
void evt_SocketError(QObject * senderSock ,QAbstractSocket::SocketError socketError);
//this event indicates new client connected.
void evt_NewClientConnected(QObject * /*clientHandle*/);
//SSL Connections OK
void evt_ClientEncrypted(QObject * client);
//this event indicates a client disconnected.
void evt_ClientDisconnected(QObject * /*clientHandle*/);
//some data arrival
......
......@@ -77,8 +77,7 @@ namespace ZPNetwork{
connect(psslsock, &QSslSocket::encrypted,this, &zp_netTransThread::on_encrypted,Qt::QueuedConnection);
psslsock->startServerEncryption();
}
else
emit evt_NewClientConnected(sock_client);
emit evt_NewClientConnected(sock_client);
}
else
sock_client->deleteLater();
......@@ -109,6 +108,7 @@ namespace ZPNetwork{
connect(sock_client, &QTcpSocket::disconnected,this,&zp_netTransThread::client_closed,Qt::QueuedConnection);
connect(sock_client, SIGNAL(error(QAbstractSocket::SocketError)),this, SLOT(displayError(QAbstractSocket::SocketError)),Qt::QueuedConnection);
connect(sock_client, &QTcpSocket::bytesWritten, this,&zp_netTransThread::some_data_sended,Qt::QueuedConnection);
connect(sock_client, &QTcpSocket::connected,this, &zp_netTransThread::on_connected,Qt::QueuedConnection);
connect(psslsock, &QSslSocket::encrypted,this, &zp_netTransThread::on_encrypted,Qt::QueuedConnection);
m_mutex_protect.lock();
m_clientList[sock_client] = 0;
......@@ -122,7 +122,7 @@ namespace ZPNetwork{
connect(sock_client, &QTcpSocket::disconnected,this,&zp_netTransThread::client_closed,Qt::QueuedConnection);
connect(sock_client, SIGNAL(error(QAbstractSocket::SocketError)),this, SLOT(displayError(QAbstractSocket::SocketError)),Qt::QueuedConnection);
connect(sock_client, &QTcpSocket::bytesWritten, this,&zp_netTransThread::some_data_sended,Qt::QueuedConnection);
connect(sock_client, &QTcpSocket::connected,this, &zp_netTransThread::on_encrypted,Qt::QueuedConnection);
connect(sock_client, &QTcpSocket::connected,this, &zp_netTransThread::on_connected,Qt::QueuedConnection);
m_mutex_protect.lock();
m_clientList[sock_client] = 0;
m_mutex_protect.unlock();
......@@ -133,11 +133,16 @@ namespace ZPNetwork{
else
assert(false);
}
void zp_netTransThread::on_connected()
{
QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender());
emit evt_NewClientConnected(pSock);
}
void zp_netTransThread::on_encrypted()
{
QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender());
emit evt_NewClientConnected(pSock);
emit evt_ClientEncrypted(pSock);
}
void zp_netTransThread::client_closed()
......
......@@ -58,12 +58,15 @@ namespace ZPNetwork{
void new_data_recieved();
void some_data_sended(qint64);
void displayError(QAbstractSocket::SocketError socketError);
//Plain Connected
void on_connected();
//SSL Encrypted started
void on_encrypted();
signals:
void evt_SocketError(QObject * senderSock ,QAbstractSocket::SocketError socketError);
void evt_NewClientConnected(QObject * client);
void evt_ClientEncrypted(QObject * client);
void evt_ClientDisconnected(QObject * client);
void evt_Data_recieved(QObject * ,const QByteArray & );
void evt_Data_transferred(QObject * client,qint64);
......
......@@ -67,12 +67,58 @@ namespace SmartLink{
return NULL;
}
//this event indicates new client encrypted.
void st_client_table::on_evt_ClientEncrypted(QObject * clientHandle)
{
bool nHashContains = 0;
st_clientNode_baseTrans * pClientNode = 0;
m_hash_mutex.lock();
nHashContains = m_hash_sock2node.contains(clientHandle);
if (false==nHashContains)
{
st_clientNode_baseTrans * pnode = new st_clientNodeAppLayer(this,clientHandle,0);
//using queued connection of send and revieve;
connect (pnode,&st_clientNode_baseTrans::evt_SendDataToClient,m_pThreadPool,&ZPNetwork::zp_net_ThreadPool::SendDataToClient,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_BroadcastData,m_pThreadPool,&ZPNetwork::zp_net_ThreadPool::evt_BroadcastData,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_close_client,m_pThreadPool,&ZPNetwork::zp_net_ThreadPool::KickClients,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_Message,this,&st_client_table::evt_Message,Qt::QueuedConnection);
m_hash_sock2node[clientHandle] = pnode;
nHashContains = true;
pClientNode = pnode;
}
else
{
pClientNode = m_hash_sock2node[clientHandle];
}
m_hash_mutex.unlock();
assert(nHashContains!=0 && pClientNode !=0);
}
//this event indicates new client connected.
void st_client_table::on_evt_NewClientConnected(QObject * /*clientHandle*/)
void st_client_table::on_evt_NewClientConnected(QObject * clientHandle)
{
bool nHashContains = 0;
st_clientNode_baseTrans * pClientNode = 0;
m_hash_mutex.lock();
nHashContains = m_hash_sock2node.contains(clientHandle);
if (false==nHashContains)
{
st_clientNode_baseTrans * pnode = new st_clientNodeAppLayer(this,clientHandle,0);
//using queued connection of send and revieve;
connect (pnode,&st_clientNode_baseTrans::evt_SendDataToClient,m_pThreadPool,&ZPNetwork::zp_net_ThreadPool::SendDataToClient,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_BroadcastData,m_pThreadPool,&ZPNetwork::zp_net_ThreadPool::evt_BroadcastData,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_close_client,m_pThreadPool,&ZPNetwork::zp_net_ThreadPool::KickClients,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_Message,this,&st_client_table::evt_Message,Qt::QueuedConnection);
m_hash_sock2node[clientHandle] = pnode;
nHashContains = true;
pClientNode = pnode;
}
else
{
pClientNode = m_hash_sock2node[clientHandle];
}
m_hash_mutex.unlock();
assert(nHashContains!=0 && pClientNode !=0);
}
//this event indicates a client disconnected.
......
......@@ -66,6 +66,8 @@ namespace SmartLink{
public slots:
//this event indicates new client connected.
void on_evt_NewClientConnected(QObject * /*clientHandle*/);
//this event indicates new client encrypted.
void on_evt_ClientEncrypted(QObject * /*clientHandle*/);
//this event indicates a client disconnected.
void on_evt_ClientDisconnected(QObject * /*clientHandle*/);
//some data arrival
......
......@@ -27,6 +27,8 @@ ZPMainFrame::ZPMainFrame(QWidget *parent) :
//Cluster is not created
m_pClusterTerm = new ZP_Cluster::zp_ClusterTerm("Unknown",this);
connect (m_pClusterTerm,&ZP_Cluster::zp_ClusterTerm::evt_Message,this,&ZPMainFrame::on_evt_Message_Cluster);
connect (m_pClusterTerm,&ZP_Cluster::zp_ClusterTerm::evt_SocketError,this,&ZPMainFrame::on_evt_SocketError_Cluster);
//Create databases
m_pDatabases = new ZPDatabase::DatabaseResource(this);
......@@ -142,6 +144,29 @@ void ZPMainFrame::on_evt_SocketError(QObject * senderSock ,QAbstractSocket::Soc
m_pMsgModel->removeRow(m_pMsgModel->rowCount()-1);
}
//These Message is nessery.-------------------------------------
void ZPMainFrame::on_evt_Message_Cluster(const QString & strMsg)
{
QDateTime dtm = QDateTime::currentDateTime();
QString msg = dtm.toString("yyyy-MM-dd HH:mm:ss.zzz") + " (Cluster)" + strMsg;
int nrows = m_pMsgModel->rowCount();
m_pMsgModel->insertRow(0,new QStandardItem(msg));
while (nrows-- > 16384)
m_pMsgModel->removeRow(m_pMsgModel->rowCount()-1);
}
//The socket error message
void ZPMainFrame::on_evt_SocketError_Cluster(QObject * senderSock ,QAbstractSocket::SocketError socketError)
{
QDateTime dtm = QDateTime::currentDateTime();
QString msg = dtm.toString("yyyy-MM-dd HH:mm:ss.zzz") + " (Cluster)" + QString("SockError %1 with code %2")
.arg((quint64)senderSock).arg((quint32)socketError);
int nrows = m_pMsgModel->rowCount();
m_pMsgModel->insertRow(0,new QStandardItem(msg));
while (nrows-- > 16384)
m_pMsgModel->removeRow(m_pMsgModel->rowCount()-1);
}
void ZPMainFrame::timerEvent(QTimerEvent * e)
......@@ -237,11 +262,11 @@ void ZPMainFrame::on_action_Start_Stop_triggered(bool setordel)
this->m_pClusterTerm->netEng()->RemoveClientTransThreads(-1,false);
this->m_pClusterTerm->taskEng()->removeThreads(-1);
while (m_netEngine->CanExit()==false || m_taskEngine->canClose()==false || m_pClusterTerm->canExit()==false)
/*while (m_netEngine->CanExit()==false || m_taskEngine->canClose()==false || m_pClusterTerm->canExit()==false)
{
QCoreApplication::processEvents(QEventLoop::ExcludeUserInputEvents);
QThread::currentThread()->msleep(200);
}
}*/
}
......@@ -352,7 +377,7 @@ void ZPMainFrame::forkServer(const QString & config_file)
QString strClusterPubPort = settings.value("Cluster/strClusterPubPort","25600").toString();
int nClusterTransThreads = settings.value("Cluster/nClusterTransThreads","4").toInt();
int nClusterWorkingThreads = settings.value("Cluster/nClusterWorkingThreads","4").toInt();
this->m_pClusterTerm->netEng()->RemoveListeningAddress("clusterTerm");
this->m_pClusterTerm->netEng()->RemoveAllAddresses();
this->m_pClusterTerm->netEng()->RemoveClientTransThreads(-1,false);
this->m_pClusterTerm->netEng()->AddClientTransThreads(nClusterTransThreads,false);
this->m_pClusterTerm->taskEng()->removeThreads(-1);
......@@ -360,7 +385,7 @@ void ZPMainFrame::forkServer(const QString & config_file)
this->m_pClusterTerm->setName(strClusterPubName);
this->m_pClusterTerm->setPublishAddr(QHostAddress(strClusterPubAddr));
this->m_pClusterTerm->setPublishPort(strClusterPubPort.toInt());
this->m_pClusterTerm->netEng()->AddListeningAddress("clusterTerm",QHostAddress(strClusterTermAddr),strClusterTermPort.toInt());
this->m_pClusterTerm->StartListen(QHostAddress(strClusterTermAddr),strClusterTermPort.toInt());
}
......
......@@ -52,12 +52,16 @@ private:
void LoadSettings(const QString & config_file);
void SaveSettings(const QString & config_file);
void forkServer(const QString & config_file);
public slots:
protected slots:
//These Message is nessery.-------------------------------------
void on_evt_Message(const QString &);
//The socket error message
void on_evt_SocketError(QObject * senderSock ,QAbstractSocket::SocketError socketError);
//These Message is nessery.-------------------------------------
void on_evt_Message_Cluster(const QString &);
//The socket error message
void on_evt_SocketError_Cluster(QObject * senderSock ,QAbstractSocket::SocketError socketError);
public slots:
void on_action_Start_Stop_triggered(bool);
void on_action_About_triggered();
void on_actionReload_config_file_triggered();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册