提交 6b4c368d 编写于 作者: 丁劲犇's avatar 丁劲犇 😸

Thread without event loop (Worker thread) can not meet the demond of signals and slots

operations in task line. adding event-loop based task thread pool.
上级 1018b064
#include "zp_net_threadpool.h"
#include <QCoreApplication>
namespace ZPNetwork{
zp_net_ThreadPool::zp_net_ThreadPool(int nPayLoad,QObject *parent) :
QObject(parent)
......@@ -9,6 +11,7 @@ zp_net_ThreadPool::zp_net_ThreadPool(int nPayLoad,QObject *parent) :
qRegisterMetaType<qintptr>("qintptr");
if (false==QMetaType::isRegistered(QMetaType::type("QAbstractSocket::SocketError")))
qRegisterMetaType<QAbstractSocket::SocketError>("QAbstractSocket::SocketError");
}
QStringList zp_net_ThreadPool::ListenerNames()
{
......@@ -127,7 +130,7 @@ void zp_net_ThreadPool::AddClientTransThreads(int nThreads)
{
for (int i=0;i<nThreads;i++)
{
zp_netTransThread * clientTH = new zp_netTransThread(m_nPayLoad);
zp_netTransThread * clientTH = new zp_netTransThread(this,m_nPayLoad);
QThread * pThread = new QThread(this);
//m_mutex_trans.lock();
m_vec_netInternalTransThreads.push_back(pThread);
......
......@@ -7,6 +7,9 @@
#include <QVector>
#include <QMutex>
#include <QThread>
#include <QSslCertificate>
#include <QSslKey>
#include <QFile>
#include "zp_netlistenthread.h"
#include "zp_nettransthread.h"
......
......@@ -5,8 +5,9 @@
#include <QDebug>
#include <QCoreApplication>
namespace ZPNetwork{
zp_netTransThread::zp_netTransThread(int nPayLoad,QObject *parent) :
zp_netTransThread::zp_netTransThread(zp_net_ThreadPool *pThreadPool,int nPayLoad,QObject *parent) :
QObject(parent)
,m_pThreadPool(pThreadPool)
{
m_nPayLoad = nPayLoad;
m_bActivated = true;
......
......@@ -7,11 +7,12 @@
#include <QAbstractSocket>
#include <QMutex>
namespace ZPNetwork{
class zp_net_ThreadPool;
class zp_netTransThread : public QObject
{
Q_OBJECT
public:
explicit zp_netTransThread(int nPayLoad = 4096,QObject *parent = 0);
explicit zp_netTransThread(zp_net_ThreadPool * pThreadPool,int nPayLoad = 4096,QObject *parent = 0);
QList <QObject *> clientsList();
int CurrentClients();
......@@ -30,6 +31,7 @@ private:
QMap<QObject*,int> m_clientList;
int m_nPayLoad;
QMutex m_mutex_protect;
zp_net_ThreadPool * m_pThreadPool;
public slots:
//新的客户连接到来
void incomingConnection(QObject * threadid,qintptr socketDescriptor);
......
......@@ -14,11 +14,18 @@ int zp_pipeline::addThreads(int nThreads)
{
zp_plWorkingThread * thread = new zp_plWorkingThread(this);
m_vec_workingThreads.push_back(thread);
thread->start();
QThread * pTh = new QThread(this);
m_vec_InternalworkingThreads.push_back(pTh);
thread->moveToThread(pTh);
connect (this,&zp_pipeline::evt_start_work,thread,&zp_plWorkingThread::FetchNewTask);
connect (thread,&zp_plWorkingThread::taskFinished,this,&zp_pipeline::on_finished_task);
pTh->start();
m_mutex_protect.lock();
m_nExistingThreads++;
m_mutex_protect.unlock();
emit evt_start_work(thread);
}
}
return m_vec_workingThreads.size();
}
......@@ -34,6 +41,7 @@ int zp_pipeline::removeThreads(int nThreads)
{
m_vec_workingThreads.last()->setStopMark();
m_vec_workingThreads.pop_back();
m_vec_InternalworkingThreads.pop_back();
}
return m_vec_workingThreads.size();
}
......@@ -77,5 +85,8 @@ int zp_pipeline::payload()
return res;
}
void zp_pipeline::on_finished_task (zp_plWorkingThread * task)
{
emit evt_start_work(task);
}
}
......@@ -38,6 +38,7 @@ protected:
QMutex m_mutex_protect;
//working threads
QVector<zp_plWorkingThread *> m_vec_workingThreads;
QVector<QThread *> m_vec_InternalworkingThreads;
//This is a C++11 function pool.
//return -1,the function will be kept in list, return 0 , will be removed.
std::list< zptaskfunc > m_list_tasks;
......@@ -47,10 +48,11 @@ protected:
zptaskfunc popTask( bool * bValid);
signals:
void evt_start_work(zp_plWorkingThread * task);
public slots:
void on_finished_task (zp_plWorkingThread * task);
};
}
#endif // ZP_PIPELINE_H
......@@ -2,11 +2,11 @@
#include <assert.h>
#include "zp_pipeline.h"
namespace ZPTaskEngine{
zp_plWorkingThread::zp_plWorkingThread(QObject *parent) :
QThread(parent)
zp_plWorkingThread::zp_plWorkingThread(zp_pipeline * pipl,QObject *parent) :
QObject(parent)
{
m_bRuning = true;
m_pipeline = qobject_cast<zp_pipeline *>(parent);
m_pipeline = pipl;
assert(m_pipeline != nullptr);
}
......@@ -15,9 +15,11 @@ void zp_plWorkingThread::setStopMark()
m_bRuning = false;
}
void zp_plWorkingThread::run()
void zp_plWorkingThread::FetchNewTask(zp_plWorkingThread * obj)
{
while (m_bRuning)
if (obj != this)
return;
if (m_bRuning)
{
bool bValid = false;
zptaskfunc funcobj = m_pipeline->popTask(&bValid);
......@@ -26,14 +28,23 @@ void zp_plWorkingThread::run()
int res = funcobj();
if (res!=0)
m_pipeline->pushTask(funcobj);
emit taskFinished(this);
}
else
this->msleep(500);
{
QThread::currentThread()->msleep(500);
emit taskFinished(this);
}
}
m_pipeline->m_mutex_protect.lock();
m_pipeline->m_nExistingThreads--;
m_pipeline->m_mutex_protect.unlock();
this->deleteLater();
else
{
m_pipeline->m_mutex_protect.lock();
m_pipeline->m_nExistingThreads--;
m_pipeline->m_mutex_protect.unlock();
this->deleteLater();
QThread::currentThread()->quit();
}
}
......
#ifndef ZP_PLWORKINGTHREAD_H
#define ZP_PLWORKINGTHREAD_H
#include <QThread>
#include <QObject>
namespace ZPTaskEngine{
class zp_pipeline;
//Working thread, reading functions from queue,
//running tasks
class zp_plWorkingThread : public QThread
class zp_plWorkingThread : public QObject
{
Q_OBJECT
public:
explicit zp_plWorkingThread(QObject *parent = 0);
explicit zp_plWorkingThread(zp_pipeline * pipl,QObject *parent = 0);
protected:
zp_pipeline * m_pipeline;
bool m_bRuning;
virtual void run();
public:
public slots:
void setStopMark();
void FetchNewTask(zp_plWorkingThread *);
signals:
void taskFinished(zp_plWorkingThread *);
};
}
......
......@@ -44,7 +44,7 @@ ZPMainFrame::~ZPMainFrame()
while (m_netEngine->CanExit()==false || m_taskEngine->canClose()==false)
{
QCoreApplication::processEvents();
thread()->msleep(200);
QThread::currentThread()->msleep(200);
//_sleep(100);
}
......@@ -92,7 +92,7 @@ void ZPMainFrame::on_evt_Data_recieved(QObject * clientHandle,const QByteArray
this->m_netEngine->SendDataToClient(clientHandle,datablock);
//push some tasks
m_taskEngine->pushTask([](void)->int {
QThread::currentThread()->msleep(200);
QThread::currentThread()->msleep(50);
return 0;
});
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册