提交 b07317dc 编写于 作者: 丁劲犇's avatar 丁劲犇 😸

Sloved a serious problem in heavily client login-logoff. add a rubbish can,...

Sloved a serious problem in heavily client login-logoff. add a rubbish can, hold sockets to be deleted later
上级 5c7c2437
......@@ -54,7 +54,7 @@ namespace ZP_Cluster{
{
if (bTermSet==true)
{
//qDebug()<<QString("%1(%2) Node Martked Deleted, return.\n").arg((unsigned int)this).arg(ref());
//qDebug()<<QString("%1(%2) Node Martked Deleted, return.").arg((unsigned int)this).arg(ref());
return 0;
}
int nCurrSz = -1;
......
......@@ -336,7 +336,7 @@ namespace ZP_Cluster{
m_nodeToBeDel.push_back(pClientNode);
if (nameCurr.length()>0)
emit evt_NewSvrDisconnected(nameCurr);
//qDebug()<<QString("%1(ref %2) Node Push in queue.\n").arg((unsigned int)pClientNode).arg(pClientNode->ref());
//qDebug()<<QString("%1(ref %2) Node Push in queue.").arg((unsigned int)pClientNode).arg(pClientNode->ref());
}
m_hash_mutex.unlock();
......@@ -348,13 +348,13 @@ namespace ZP_Cluster{
toBedel.push_back(pdelobj);
else
{
//qDebug()<<QString("%1(ref %2) Waiting in del queue.\n").arg((unsigned int)pdelobj).arg(pdelobj->ref());
//qDebug()<<QString("%1(ref %2) Waiting in del queue.").arg((unsigned int)pdelobj).arg(pdelobj->ref());
}
}
foreach(zp_ClusterNode * pdelobj,toBedel)
{
m_nodeToBeDel.removeAll(pdelobj);
//qDebug()<<QString("%1(ref %2) deleting.\n").arg((unsigned int)pdelobj).arg(pdelobj->ref());
//qDebug()<<QString("%1(ref %2) deleting.").arg((unsigned int)pdelobj).arg(pdelobj->ref());
pdelobj->deleteLater();
}
}
......
......@@ -3,6 +3,7 @@
#include <QMutexLocker>
#include <QSqlError>
#include <QSqlQuery>
#include <QDebug>
namespace ZPDatabase{
......@@ -29,7 +30,8 @@ namespace ZPDatabase{
db.close();
QSqlDatabase::removeDatabase(threadName);
QString msg = "Database:"+tr(" Connection removed ")+threadName+ tr(" .");
emit evt_Message(this,msg);
qDebug()<<msg;
//emit evt_Message(this,msg);
//Remove the key map
m_ThreadsDB[mainName].remove(threadName);
}
......@@ -52,6 +54,7 @@ namespace ZPDatabase{
if (false==QSqlDatabase::contains(strDBName))
{
QString msg = "Database:"+tr(" Connection name ")+strDBName+ tr(" does not exist.");
qWarning()<<msg;
emit evt_Message(this,msg);
return QSqlDatabase();
}
......@@ -70,14 +73,19 @@ namespace ZPDatabase{
QString msg = "Database:"+tr(" Connection name ")+threadName+
tr(" Can not be cloned from database %1.").arg(strDBName)+
tr(" Err String:") + db.lastError().text();
qCritical()<<msg;
emit evt_Message(this,msg);
return QSqlDatabase();
}
else
{
QString msg = "Database:"+tr(" Connection name ")+threadName+
tr(" has been cloned from database %1.").arg(strDBName);
qDebug()<<msg;
//emit evt_Message(this,msg);
}
m_ThreadsDB[strDBName].insert(threadName);
m_ThreadOwnedMainDBs[pThread].insert(strDBName);
QString msg ="Database:"+ tr(" Connection ")+threadName+ tr(" Established.");
emit evt_Message(this,msg);
}
//Confirm the thread-owned db is still open
QSqlDatabase db = QSqlDatabase::database(threadName);
......@@ -93,6 +101,7 @@ namespace ZPDatabase{
{
QString msg = "Database:"+tr(" Connection ")+threadName+ tr(" confirm failed. MSG=");
msg += query.lastError().text();
qWarning()<<msg;
emit evt_Message(this,msg);
bNeedReconnect = true;
}
......@@ -114,6 +123,7 @@ namespace ZPDatabase{
{
QString msg = "Database:"+tr(" Connection ")+threadName+ tr(" Re-Established.");
emit evt_Message(this,msg);
qDebug()<<msg;
m_ThreadsDB[strDBName].insert(threadName);
m_ThreadOwnedMainDBs[pThread].insert(strDBName);
}
......@@ -122,6 +132,7 @@ namespace ZPDatabase{
QString msg = "Database:"+tr(" Connection name ")+threadName+
tr(" Can not be cloned from database %1.").arg(strDBName)+
tr(" Err String:") + db.lastError().text();
qWarning()<<msg;
emit evt_Message(this,msg);
m_ThreadsDB[strDBName].remove(threadName);
m_ThreadOwnedMainDBs[pThread].remove(strDBName);
......@@ -155,13 +166,15 @@ namespace ZPDatabase{
db.close();
QSqlDatabase::removeDatabase(strDBName);
QString msg = "Database:"+tr(" Connection removed ")+strDBName+ tr(" .");
emit evt_Message(this,msg);
//emit evt_Message(this,msg);
qDebug()<<msg;
RemoveTreadsConnections(strDBName);
m_ThreadsDB[strDBName].clear();
}
else
{
QString msg = "Database:"+tr(" Connection name ")+strDBName+ tr(" does not exist.");
qWarning()<<msg;
emit evt_Message(this,msg);
}
m_dbNames.remove(strDBName) ;
......@@ -179,7 +192,8 @@ namespace ZPDatabase{
db.close();
QSqlDatabase::removeDatabase(str);
QString msg = "Database:"+tr(" Connection removed ")+str+ tr(" .");
emit evt_Message(this,msg);
qDebug()<<msg;
//emit evt_Message(this,msg);
}
//Remove thread map.
foreach (QThread * ptr, m_ThreadOwnedMainDBs.keys())
......@@ -238,7 +252,8 @@ namespace ZPDatabase{
db.close();
QSqlDatabase::removeDatabase(connName);
QString msg = "Database:"+tr(" Connection removed ")+connName+ tr(" .");
emit evt_Message(this,msg);
qDebug()<<msg;
//emit evt_Message(this,msg);
}
m_dbNames[connName] = para;
......@@ -252,10 +267,12 @@ namespace ZPDatabase{
if (db.open()==true)
{
QString msg ="Database:"+ tr(" Connection ")+connName+ tr(" Established.");
emit evt_Message(this,msg);
qDebug()<<msg;
//emit evt_Message(this,msg);
return true;
}
QString msg = "Database:"+tr(" Connection ")+connName+ tr(" Can't be opened. MSG=");
qCritical()<<msg;
msg += db.lastError().text();
emit evt_Message(this,msg);
QSqlDatabase::removeDatabase(connName);
......@@ -294,6 +311,7 @@ namespace ZPDatabase{
{
QString msg = "Database:"+tr(" Connection ")+connName+ tr(" confirm failed. MSG=");
msg += query.lastError().text();
qCritical()<<msg;
emit evt_Message(this,msg);
bNeedDisconnect = true;
}
......@@ -308,6 +326,7 @@ namespace ZPDatabase{
return true;
}
QString msg = "Database:"+tr(" Connection ")+connName+ tr(" has not been opened.");
qWarning()<<msg;
emit evt_Message(this,msg);
db = QSqlDatabase::addDatabase(para.type,para.connName);
db.setHostName(para.HostAddr);
......@@ -321,12 +340,14 @@ namespace ZPDatabase{
para.status = true;
para.lastError = "";
msg = "Database:"+tr(" Connection ")+connName+ tr(" Re-Established.");
qDebug()<<msg;
emit evt_Message(this,msg);
return true;
}
QSqlDatabase::removeDatabase(connName);
msg ="Database:"+ tr(" Connection ")+connName+ tr(" Can't be opened. MSG=");
msg += db.lastError().text();
qCritical()<<msg;
emit evt_Message(this,msg);
para.status = false;
para.lastError = db.lastError().text();
......@@ -346,10 +367,12 @@ namespace ZPDatabase{
para.lastError = "";
QString msg ="Database:"+ tr(" Connection ")+connName+ tr(" Re-Established.");
emit evt_Message(this,msg);
qDebug()<<msg;
return true;
}
QString msg ="Database:"+ tr(" Connection ")+connName+ tr(" Can't be opened. MSG=");
msg += db.lastError().text();
qCritical()<<msg;
emit evt_Message(this,msg);
QSqlDatabase::removeDatabase(connName);
para.status = false;
......
......@@ -59,21 +59,42 @@ namespace STMsgLogger{
}
void st_logger::MessageOutput(QtMsgType type, const QMessageLogContext &context, const QString &msg)
{
m_mutextLogger.lock();
switch (type) {
case QtDebugMsg:
if (m_nLogLevel < 3) return;
if (m_nLogLevel < 3)
{
m_mutextLogger.unlock();
return;
}
break;
case QtWarningMsg:
if (m_nLogLevel < 2) return;
if (m_nLogLevel < 2)
{
m_mutextLogger.unlock();
return;
}
break;
case QtCriticalMsg:
if (m_nLogLevel < 1) return;
if (m_nLogLevel < 1)
{
m_mutextLogger.unlock();
return;
}
break;
case QtFatalMsg:
if (m_nLogLevel < 0) return;
if (m_nLogLevel < 0)
{
m_mutextLogger.unlock();
return;
}
break;
default:
if (m_nLogLevel < 3) return;
if (m_nLogLevel < 3)
{
m_mutextLogger.unlock();
return;
}
break;
}
if (m_pLogFile==0)
......@@ -81,12 +102,15 @@ namespace STMsgLogger{
if (m_bUseLogFile==true)
m_bUseLogFile = CreateNewLogFile(QCoreApplication::instance());
if (m_pLogFile==0)
{
m_mutextLogger.unlock();
return;
}
}
QDateTime dtmCur = QDateTime::currentDateTime().toUTC();
QString strMsg = "\n";
QString strMsg ;
strMsg += dtmCur.toString("yyyy-MM-dd HH:mm:ss.zzz");
QString strMsgHeader = dtmCur.toString("\n ");
strMsg += "(UTC)>";
......@@ -117,6 +141,7 @@ namespace STMsgLogger{
stream.flush();
if (m_pLogFile->pos()>=m_nMaxFileSize)
m_bUseLogFile = CreateNewLogFile(QCoreApplication::instance());
m_mutextLogger.unlock();
}
}
......@@ -5,6 +5,7 @@
#include <QDateTime>
#include <QDir>
#include <QByteArray>
#include <QMutex>
#include <QCoreApplication>
namespace STMsgLogger{
class st_logger : public QObject
......@@ -22,6 +23,7 @@ namespace STMsgLogger{
QString m_currLogFileName;
int m_nLogLevel;
int m_nMaxFileSize;
QMutex m_mutextLogger;
signals:
public slots:
......
......@@ -30,6 +30,7 @@ int main(int argc, char *argv[])
appTranslator.load(strTransLocalFile );
app.installTranslator(&appTranslator);
ZPMainFrame w;
w.setLogger(&g_logger);
w.show();
......@@ -59,6 +60,8 @@ int main(int argc, char *argv[])
}
}
}
int pp = app.exec();
return pp;
}
......@@ -158,7 +158,8 @@ namespace ZPNetwork{
return;
}
emit evt_Message(this,"Info>" + QString(tr("Incomming client arriverd.")));
//emit evt_Message(this,"Info>" + QString(tr("Incomming client arriverd.")));
qDebug()<<tr("Incomming client arriverd.");
//m_mutex_trans.lock();
int nsz = m_vec_NetTransThreads.size();
int nMinPay = 0x7fffffff;
......@@ -189,6 +190,7 @@ namespace ZPNetwork{
else
{
emit evt_Message(this,"Warning>"+QString(tr("Need Trans Thread Object for clients.")));
qCritical()<<tr("Need Trans Thread Object for clients.");
}
//m_mutex_trans.unlock();
}
......@@ -300,6 +302,7 @@ namespace ZPNetwork{
m_vec_netInternalTransThreads[idx]->quit();
m_vec_netInternalTransThreads[idx]->wait();
m_vec_NetTransThreads[idx]->Empty_RabishCan();
m_vec_netInternalTransThreads[idx]->deleteLater();
m_vec_NetTransThreads[idx]->deleteLater();
m_vec_netInternalTransThreads.remove(idx);
......
......@@ -14,7 +14,9 @@ quint64 g_bytesSent = 0;
quint64 g_secRecieved = 0;
quint64 g_secSent = 0;
namespace ZPNetwork{
namespace ZPNetwork{
int zp_netTransThread::RUBBISH_CAN_SIZE = 256;
zp_netTransThread::zp_netTransThread(zp_net_Engine *pThreadPool,int nPayLoad,QObject *parent) :
QObject(parent)
,m_pThreadPool(pThreadPool)
......@@ -24,6 +26,14 @@ namespace ZPNetwork{
m_bSSLConnection = true;
assert(m_nPayLoad>=256 && m_nPayLoad<=16*1024*1024);
}
void zp_netTransThread::Empty_RabishCan()
{
m_mutex_rabish_can.lock();
foreach (QObject * pDel,m_rabish_can)
pDel->deleteLater();
m_rabish_can.clear();
m_mutex_rabish_can.unlock();
}
bool zp_netTransThread::isActive()
{
......@@ -94,6 +104,29 @@ namespace ZPNetwork{
m_nPayLoad = nPayload;
assert(m_nPayLoad>=256 && m_nPayLoad<=16*1024*1024);
}
/**
* @brief the multithread object life-cycle is very complex, we hold a rabish_can,
* to prevent the misuse of deleted object in different threads.
* @param deletedobj The object to be deleted later
*/
void zp_netTransThread::push_to_rabish_can(QObject * deletedobj)
{
m_mutex_rabish_can.lock();
m_rabish_can.push_back(deletedobj);
if (RUBBISH_CAN_SIZE<16)
RUBBISH_CAN_SIZE = 16;
if (RUBBISH_CAN_SIZE > 65536)
RUBBISH_CAN_SIZE = 65536;
if (m_rabish_can.size()>=RUBBISH_CAN_SIZE)
qDebug()<<"Delete old objects from rubbish can.";
while (m_rabish_can.size()>=RUBBISH_CAN_SIZE)
{
m_rabish_can.first()->deleteLater();
m_rabish_can.pop_front();
}
m_mutex_rabish_can.unlock();
}
/**
* @brief This slot dealing with multi-thread client socket accept.
......@@ -136,11 +169,13 @@ namespace ZPNetwork{
connect(psslsock, &QSslSocket::encrypted,this, &zp_netTransThread::on_encrypted,Qt::QueuedConnection);
psslsock->startServerEncryption();
}
qDebug()<<sock_client->peerAddress().toString()<<
sock_client->peerPort() <<tr("(%1)..Accepted.").arg((quint64)sock_client);
emit evt_NewClientConnected(sock_client);
emit evt_Message(sock_client,"Info>" + QString(tr("Client Accepted.")));
//emit evt_Message(sock_client,"Info>" + QString(tr("Client Accepted.")));
}
else
sock_client->deleteLater();
push_to_rabish_can(sock_client);
}
}
......@@ -204,14 +239,18 @@ namespace ZPNetwork{
{
QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender());
emit evt_NewClientConnected(pSock);
emit evt_Message(pSock,"Info>" + QString(tr("Client connected.")));
//emit evt_Message(pSock,"Info>" + QString(tr("Client connected.")));
qDebug()<<pSock->peerAddress().toString()<<
pSock->peerPort() <<tr("(%1)..connected.").arg((quint64)pSock);
}
void zp_netTransThread::on_encrypted()
{
QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender());
emit evt_ClientEncrypted(pSock);
emit evt_Message(pSock,"Info>" + QString(tr("Client Encrypted.")));
//emit evt_Message(pSock,"Info>" + QString(tr("Client Encrypted.")));
qDebug()<<pSock->peerAddress().toString()<<
pSock->peerPort() <<tr("(%1)..Encrypted.").arg((quint64)pSock);
}
void zp_netTransThread::client_closed()
......@@ -236,9 +275,11 @@ namespace ZPNetwork{
m_mutex_protect.lock();
m_clientList.remove(pSock);
m_mutex_protect.unlock();
pSock->deleteLater();
pSock->abort();
emit evt_ClientDisconnected(pSock);
emit evt_Message(pSock,"Info>" + QString(tr("Client Closed.")));
//emit evt_Message(pSock,"Info>" + QString(tr("Client Closed.")));
qDebug()<<tr("(%1)..Closed.").arg((quint64)pSock);
push_to_rabish_can(pSock);
}
}
void zp_netTransThread::new_data_recieved()
......@@ -297,8 +338,10 @@ namespace ZPNetwork{
QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender());
if (pSock)
{
qDebug()<<pSock->peerAddress().toString()<<
pSock->peerPort() <<tr("(%1)..Error :%2.").arg((quint64)pSock).arg(pSock->errorString());
emit evt_SocketError(pSock,socketError);
emit evt_Message(pSock,"Debug:" + pSock->errorString());
//emit evt_Message(pSock,"Debug:" + pSock->errorString());
if (m_bSSLConnection)
{
QSslSocket * psslsock = qobject_cast<QSslSocket *>(pSock);
......@@ -317,10 +360,9 @@ namespace ZPNetwork{
m_clientList.remove(pSock);
m_mutex_protect.unlock();
pSock->abort();
pSock->deleteLater();
emit evt_ClientDisconnected(pSock);
emit evt_Message(pSock,"Info>" + QString(tr("Client Closed.")));
push_to_rabish_can(pSock);
}
}
......@@ -367,12 +409,25 @@ namespace ZPNetwork{
QTcpSocket * pSock = qobject_cast<QTcpSocket*>(obj);
if (pSock)
{
QSslSocket * pSSl = qobject_cast<QSslSocket*>(pSock);
if (pSSl==NULL)
pSock->abort();
else
pSock->disconnectFromHost();
if (m_bSSLConnection)
{
QSslSocket * psslsock = qobject_cast<QSslSocket *>(pSock);
if (psslsock)
disconnect(psslsock, &QSslSocket::encrypted,this, &zp_netTransThread::on_encrypted);
}
disconnect(pSock, &QTcpSocket::readyRead,this, &zp_netTransThread::new_data_recieved);
disconnect(pSock, &QTcpSocket::disconnected,this,&zp_netTransThread::client_closed);
disconnect(pSock, SIGNAL(error(QAbstractSocket::SocketError)),this, SLOT(displayError(QAbstractSocket::SocketError)));
disconnect(pSock, &QTcpSocket::bytesWritten, this, &zp_netTransThread::some_data_sended);
disconnect(pSock, &QTcpSocket::connected,this, &zp_netTransThread::on_connected);
m_buffer_sending.erase(pSock);
m_buffer_sending_offset.erase(pSock);
m_clientList.remove(pSock);
pSock->abort();
emit evt_ClientDisconnected(pSock);
//emit evt_Message(pSock,"Info>" + QString(tr("Client Closed.")));
qDebug()<<tr("(%1)..Closed.").arg((quint64)pSock);
push_to_rabish_can(pSock);
}
}
......@@ -392,11 +447,27 @@ namespace ZPNetwork{
if (pSock)
{
QSslSocket * pSSl = qobject_cast<QSslSocket*>(pSock);
if (pSSl==NULL)
pSock->abort();
else
pSock->disconnectFromHost();
if (m_bSSLConnection)
{
QSslSocket * psslsock = qobject_cast<QSslSocket *>(pSock);
if (psslsock)
disconnect(psslsock, &QSslSocket::encrypted,this, &zp_netTransThread::on_encrypted);
}
disconnect(pSock, &QTcpSocket::readyRead,this, &zp_netTransThread::new_data_recieved);
disconnect(pSock, &QTcpSocket::disconnected,this,&zp_netTransThread::client_closed);
disconnect(pSock, SIGNAL(error(QAbstractSocket::SocketError)),this, SLOT(displayError(QAbstractSocket::SocketError)));
disconnect(pSock, &QTcpSocket::bytesWritten, this, &zp_netTransThread::some_data_sended);
disconnect(pSock, &QTcpSocket::connected,this, &zp_netTransThread::on_connected);
m_buffer_sending.erase(pSock);
m_buffer_sending_offset.erase(pSock);
m_mutex_protect.lock();
m_clientList.remove(pSock);
m_mutex_protect.unlock();
pSock->abort();
emit evt_ClientDisconnected(pSock);
//emit evt_Message(pSock,"Info>" + QString(tr("Client Closed.")));
qDebug()<<tr("(%1)..Closed.").arg((quint64)pSock);
push_to_rabish_can(pSock);
}
}
......
......@@ -10,6 +10,7 @@
#include <QMutex>
#include <unordered_map>
#include <QSet>
#include <QList>
namespace ZPNetwork{
class zp_net_Engine;
/**
......@@ -22,7 +23,6 @@ namespace ZPNetwork{
Q_OBJECT
public:
explicit zp_netTransThread(zp_net_Engine * pThreadPool,int nPayLoad = 4096,QObject *parent = 0);
QList <QObject *> clientsList();
int CurrentClients();
void SetPayload(int nPayload);
......@@ -32,6 +32,11 @@ namespace ZPNetwork{
bool SSLConnection();
void SetSSLConnection(bool bssl);
//RubbishCan Functions
void Empty_RabishCan();
//Size of the RubbishCan
static int RUBBISH_CAN_SIZE;
private:
bool m_bActivated;
bool m_bSSLConnection;
......@@ -44,6 +49,10 @@ namespace ZPNetwork{
int m_nPayLoad;
QMutex m_mutex_protect;
zp_net_Engine * m_pThreadPool;
//Rabish Can
QList<QObject *> m_rabish_can;
QMutex m_mutex_rabish_can;
void push_to_rabish_can(QObject * deletedobj);
public slots:
//This slot dealing with multi-thread client socket accept.
void incomingConnection(QObject * threadid,qintptr socketDescriptor);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册