提交 674332a5 编写于 作者: B Bomin Zhang

TD-337, #1827: thread exit gracefully

上级 345b1c3a
......@@ -17,6 +17,7 @@
#include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tutil.h"
#include "tqueue.h"
#include "trpc.h"
#include "twal.h"
......@@ -71,11 +72,16 @@ int32_t dnodeInitRead() {
}
void dnodeCleanupRead() {
for (int i=0; i < readPool.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i;
if (pWorker->thread) {
taosQsetThreadResume(readQset);
}
}
for (int i=0; i < readPool.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i;
if (pWorker->thread) {
pthread_cancel(pWorker->thread);
pthread_join(pWorker->thread, NULL);
}
}
......@@ -201,15 +207,14 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
}
static void *dnodeProcessReadQueue(void *param) {
SReadWorker *pWorker = param;
SReadMsg *pReadMsg;
int type;
void *pVnode;
while (1) {
if (taosReadQitemFromQset(readQset, &type, (void **)&pReadMsg, &pVnode) == 0) {
dnodeHandleIdleReadWorker(pWorker);
continue;
dTrace("dnodeProcessReadQueee: got no message from qset, exiting...");
break;
}
dTrace("%p, msg:%s will be processed", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]);
......@@ -221,6 +226,8 @@ static void *dnodeProcessReadQueue(void *param) {
return NULL;
}
UNUSED_FUNC
static void dnodeHandleIdleReadWorker(SReadWorker *pWorker) {
int32_t num = taosGetQueueNumber(readQset);
......
......@@ -17,6 +17,7 @@
#include "os.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "tutil.h"
#include "tqueue.h"
#include "trpc.h"
#include "tsdb.h"
......@@ -67,11 +68,16 @@ int32_t dnodeInitWrite() {
}
void dnodeCleanupWrite() {
for (int32_t i = 0; i < wWorkerPool.max; ++i) {
SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
if (pWorker->thread) {
taosQsetThreadResume(pWorker->qset);
}
}
for (int32_t i = 0; i < wWorkerPool.max; ++i) {
SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
if (pWorker->thread) {
pthread_cancel(pWorker->thread);
pthread_join(pWorker->thread, NULL);
taosFreeQall(pWorker->qall);
taosCloseQset(pWorker->qset);
......@@ -186,9 +192,9 @@ static void *dnodeProcessWriteQueue(void *param) {
while (1) {
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
if (numOfMsgs <=0) {
dnodeHandleIdleWorker(pWorker); // thread exit if no queues anymore
continue;
if (numOfMsgs ==0) {
dTrace("dnodeProcessWriteQueee: got no message from qset, exiting...");
break;
}
for (int32_t i = 0; i < numOfMsgs; ++i) {
......@@ -228,6 +234,7 @@ static void *dnodeProcessWriteQueue(void *param) {
return NULL;
}
UNUSED_FUNC
static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
int32_t num = taosGetQueueNumber(pWorker->qset);
......
......@@ -53,6 +53,7 @@ extern "C" {
#include <string.h>
#include <strings.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/file.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
......
......@@ -199,7 +199,7 @@ typedef struct HttpThread {
pthread_t thread;
HttpContext * pHead;
pthread_mutex_t threadMutex;
pthread_cond_t fdReady;
bool stop;
int pollFd;
int numOfFds;
int threadId;
......@@ -212,6 +212,8 @@ typedef struct HttpServer {
char label[HTTP_LABEL_SIZE];
uint32_t serverIp;
uint16_t serverPort;
bool online;
int fd;
int cacheContext;
int sessionExpire;
int numOfThreads;
......@@ -226,7 +228,6 @@ typedef struct HttpServer {
bool (*processData)(HttpContext *pContext);
int requestNum;
void *timerHandle;
bool online;
} HttpServer;
// http util method
......
......@@ -258,28 +258,45 @@ void httpCloseContextByServerForExpired(void *param, void *tmrId) {
httpCloseContextByServer(pContext->pThread, pContext);
}
void httpCleanUpConnect(HttpServer *pServer) {
int i;
HttpThread *pThread;
if (pServer == NULL) return;
static void httpStopThread(HttpThread* pThread) {
pThread->stop = true;
pthread_cancel(pServer->thread);
pthread_join(pServer->thread, NULL);
// signal the thread to stop, try graceful method first,
// and use pthread_cancel when failed
struct epoll_event event = { .events = EPOLLIN };
eventfd_t fd = eventfd(1, 0);
if (fd == -1) {
pthread_cancel(pThread->thread);
} else if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
pthread_cancel(pThread->thread);
}
for (i = 0; i < pServer->numOfThreads; ++i) {
pThread = pServer->pThreads + i;
if (pThread == NULL) continue;
//taosCloseSocket(pThread->pollFd);
pthread_join(pThread->thread, NULL);
if (fd != -1) {
close(fd);
}
//while (pThread->pHead) {
// httpCleanUpContext(pThread->pHead, 0);
//}
close(pThread->pollFd);
pthread_mutex_destroy(&(pThread->threadMutex));
pthread_cancel(pThread->thread);
pthread_join(pThread->thread, NULL);
pthread_cond_destroy(&(pThread->fdReady));
pthread_mutex_destroy(&(pThread->threadMutex));
//while (pThread->pHead) {
// httpCleanUpContext(pThread->pHead, 0);
//}
}
void httpCleanUpConnect(HttpServer *pServer) {
if (pServer == NULL) return;
shutdown(pServer->fd, SHUT_RD);
pthread_join(pServer->thread, NULL);
for (int i = 0; i < pServer->numOfThreads; ++i) {
HttpThread* pThread = pServer->pThreads + i;
if (pThread != NULL) {
httpStopThread(pThread);
}
}
tfree(pServer->pThreads);
......@@ -412,15 +429,13 @@ void httpProcessHttpData(void *param) {
pthread_sigmask(SIG_SETMASK, &set, NULL);
while (1) {
pthread_mutex_lock(&pThread->threadMutex);
if (pThread->numOfFds < 1) {
pthread_cond_wait(&pThread->fdReady, &pThread->threadMutex);
}
pthread_mutex_unlock(&pThread->threadMutex);
struct epoll_event events[HTTP_MAX_EVENTS];
//-1 means uncertainty, 0-nowait, 1-wait 1 ms, set it from -1 to 1
fdNum = epoll_wait(pThread->pollFd, events, HTTP_MAX_EVENTS, 1);
if (pThread->stop) {
httpTrace("%p, http thread get stop event, exiting...", pThread);
break;
}
if (fdNum <= 0) continue;
for (int i = 0; i < fdNum; ++i) {
......@@ -485,10 +500,9 @@ void httpProcessHttpData(void *param) {
}
}
void httpAcceptHttpConnection(void *arg) {
void* httpAcceptHttpConnection(void *arg) {
int connFd = -1;
struct sockaddr_in clientAddr;
int sockFd;
int threadId = 0;
HttpThread * pThread;
HttpServer * pServer;
......@@ -502,12 +516,12 @@ void httpAcceptHttpConnection(void *arg) {
sigaddset(&set, SIGPIPE);
pthread_sigmask(SIG_SETMASK, &set, NULL);
sockFd = taosOpenTcpServerSocket(pServer->serverIp, pServer->serverPort);
pServer->fd = taosOpenTcpServerSocket(pServer->serverIp, pServer->serverPort);
if (sockFd < 0) {
if (pServer->fd < 0) {
httpError("http server:%s, failed to open http socket, ip:%s:%u error:%s", pServer->label, taosIpStr(pServer->serverIp),
pServer->serverPort, strerror(errno));
return;
return NULL;
} else {
httpPrint("http service init success at %u", pServer->serverPort);
pServer->online = true;
......@@ -515,9 +529,12 @@ void httpAcceptHttpConnection(void *arg) {
while (1) {
socklen_t addrlen = sizeof(clientAddr);
connFd = (int)accept(sockFd, (struct sockaddr *)&clientAddr, &addrlen);
if (connFd < 3) {
connFd = (int)accept(pServer->fd, (struct sockaddr *)&clientAddr, &addrlen);
if (connFd == -1) {
if (errno == EINVAL) {
httpTrace("%s HTTP server socket was shutdown, exiting...", pServer->label);
break;
}
httpError("http server:%s, accept connect failure, errno:%d, reason:%s", pServer->label, errno, strerror(errno));
continue;
}
......@@ -579,7 +596,6 @@ void httpAcceptHttpConnection(void *arg) {
pThread->pHead = pContext;
pThread->numOfFds++;
pthread_cond_signal(&pThread->fdReady);
pthread_mutex_unlock(&(pThread->threadMutex));
......@@ -587,6 +603,9 @@ void httpAcceptHttpConnection(void *arg) {
threadId++;
threadId = threadId % pServer->numOfThreads;
}
close(pServer->fd);
return NULL;
}
bool httpInitConnect(HttpServer *pServer) {
......@@ -612,11 +631,6 @@ bool httpInitConnect(HttpServer *pServer) {
return false;
}
if (pthread_cond_init(&(pThread->fdReady), NULL) != 0) {
httpError("http thread:%s, init HTTP condition variable failed, reason:%s", pThread->label, strerror(errno));
return false;
}
pThread->pollFd = epoll_create(HTTP_MAX_EVENTS); // size does not matter
if (pThread->pollFd < 0) {
httpError("http thread:%s, failed to create HTTP epoll", pThread->label);
......
......@@ -39,8 +39,8 @@ typedef struct SThreadObj {
pthread_t thread;
SFdObj * pHead;
pthread_mutex_t mutex;
pthread_cond_t fdReady;
uint32_t ip;
bool stop;
int pollFd;
int numOfFds;
int threadId;
......@@ -50,6 +50,7 @@ typedef struct SThreadObj {
} SThreadObj;
typedef struct {
int fd;
uint32_t ip;
uint16_t port;
char label[12];
......@@ -63,7 +64,7 @@ static void *taosProcessTcpData(void *param);
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd);
static void taosFreeFdObj(SFdObj *pFdObj);
static void taosReportBrokenLink(SFdObj *pFdObj);
static void taosAcceptTcpConnection(void *arg);
static void* taosAcceptTcpConnection(void *arg);
void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
SServerObj *pServerObj;
......@@ -95,12 +96,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
break;;
}
code = pthread_cond_init(&(pThreadObj->fdReady), NULL);
if (code != 0) {
tError("%s init TCP condition variable failed(%s)", label, strerror(errno));
break;
}
pThreadObj->pollFd = epoll_create(10); // size does not matter
if (pThreadObj->pollFd < 0) {
tError("%s failed to create TCP epoll", label);
......@@ -144,28 +139,45 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
return (void *)pServerObj;
}
static void taosStopTcpThread(SThreadObj* pThreadObj) {
pThreadObj->stop = true;
// signal the thread to stop, try graceful method first,
// and use pthread_cancel when failed
struct epoll_event event = { .events = EPOLLIN };
eventfd_t fd = eventfd(1, 0);
if (fd == -1) {
pthread_cancel(pThreadObj->thread);
} else if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
pthread_cancel(pThreadObj->thread);
}
pthread_join(pThreadObj->thread, NULL);
close(pThreadObj->pollFd);
if (fd != -1) {
close(fd);
}
while (pThreadObj->pHead) {
SFdObj *pFdObj = pThreadObj->pHead;
pThreadObj->pHead = pFdObj->next;
taosFreeFdObj(pFdObj);
}
}
void taosCleanUpTcpServer(void *handle) {
SServerObj *pServerObj = handle;
SThreadObj *pThreadObj;
if (pServerObj == NULL) return;
pthread_cancel(pServerObj->thread);
shutdown(pServerObj->fd, SHUT_RD);
pthread_join(pServerObj->thread, NULL);
for (int i = 0; i < pServerObj->numOfThreads; ++i) {
pThreadObj = pServerObj->pThreadObj + i;
while (pThreadObj->pHead) {
SFdObj *pFdObj = pThreadObj->pHead;
pThreadObj->pHead = pFdObj->next;
taosFreeFdObj(pFdObj);
}
close(pThreadObj->pollFd);
pthread_cancel(pThreadObj->thread);
pthread_join(pThreadObj->thread, NULL);
pthread_cond_destroy(&(pThreadObj->fdReady));
taosStopTcpThread(pThreadObj);
pthread_mutex_destroy(&(pThreadObj->mutex));
}
......@@ -175,26 +187,28 @@ void taosCleanUpTcpServer(void *handle) {
tfree(pServerObj);
}
static void taosAcceptTcpConnection(void *arg) {
static void* taosAcceptTcpConnection(void *arg) {
int connFd = -1;
struct sockaddr_in caddr;
int sockFd;
int threadId = 0;
SThreadObj *pThreadObj;
SServerObj *pServerObj;
pServerObj = (SServerObj *)arg;
sockFd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
if (sockFd < 0) return;
pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
if (pServerObj->fd < 0) return NULL;
tTrace("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
while (1) {
socklen_t addrlen = sizeof(caddr);
connFd = accept(sockFd, (struct sockaddr *)&caddr, &addrlen);
if (connFd < 0) {
connFd = accept(pServerObj->fd, (struct sockaddr *)&caddr, &addrlen);
if (connFd == -1) {
if (errno == EINVAL) {
tTrace("%s TCP server socket was shutdown, exiting...", pServerObj->label);
break;
}
tError("%s TCP accept failure(%s)", pServerObj->label, errno, strerror(errno));
continue;
}
......@@ -220,6 +234,9 @@ static void taosAcceptTcpConnection(void *arg) {
threadId++;
threadId = threadId % pServerObj->numOfThreads;
}
close(pServerObj->fd);
return NULL;
}
void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle) {
......@@ -237,11 +254,6 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
return NULL;
}
if (pthread_cond_init(&(pThreadObj->fdReady), NULL) != 0) {
tError("%s init TCP condition variable failed(%s)", label, strerror(errno));
return NULL;
}
pThreadObj->pollFd = epoll_create(10); // size does not matter
if (pThreadObj->pollFd < 0) {
tError("%s failed to create TCP client epoll", label);
......@@ -268,17 +280,7 @@ void taosCleanUpTcpClient(void *chandle) {
SThreadObj *pThreadObj = chandle;
if (pThreadObj == NULL) return;
while (pThreadObj->pHead) {
SFdObj *pFdObj = pThreadObj->pHead;
pThreadObj->pHead = pFdObj->next;
taosFreeFdObj(pFdObj);
}
close(pThreadObj->pollFd);
pthread_cancel(pThreadObj->thread);
pthread_join(pThreadObj->thread, NULL);
taosStopTcpThread(pThreadObj);
tTrace (":%s, all connections are cleaned up", pThreadObj->label);
tfree(pThreadObj);
......@@ -350,13 +352,11 @@ static void *taosProcessTcpData(void *param) {
SRpcHead rpcHead;
while (1) {
pthread_mutex_lock(&pThreadObj->mutex);
if (pThreadObj->numOfFds < 1) {
pthread_cond_wait(&pThreadObj->fdReady, &pThreadObj->mutex);
}
pthread_mutex_unlock(&pThreadObj->mutex);
int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1);
if (pThreadObj->stop) {
tTrace("%s, tcp thread get stop event, exiting...", pThreadObj->label);
break;
}
if (fdNum < 0) continue;
for (int i = 0; i < fdNum; ++i) {
......@@ -444,7 +444,6 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd) {
if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj;
pThreadObj->pHead = pFdObj;
pThreadObj->numOfFds++;
pthread_cond_signal(&pThreadObj->fdReady);
pthread_mutex_unlock(&(pThreadObj->mutex));
return pFdObj;
......@@ -492,5 +491,3 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
tfree(pFdObj);
}
......@@ -135,14 +135,15 @@ void taosCleanUpUdpConnection(void *handle) {
for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i;
pConn->signature = NULL;
free(pConn->buffer);
pthread_cancel(pConn->thread);
taosCloseSocket(pConn->fd);
// shutdown to signal the thread to exit
shutdown(pConn->fd, SHUT_RD);
}
for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i;
pthread_join(pConn->thread, NULL);
free(pConn->buffer);
taosCloseSocket(pConn->fd);
tTrace("chandle:%p is closed", pConn);
}
......@@ -177,6 +178,11 @@ static void *taosRecvUdpData(void *param) {
while (1) {
dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen);
if(dataLen == 0) {
tTrace("data length is 0, socket was closed, exiting");
break;
}
port = ntohs(sourceAdd.sin_port);
if (dataLen < sizeof(SRpcHead)) {
......
......@@ -39,6 +39,7 @@ void taosResetQitems(taos_qall);
taos_qset taosOpenQset();
void taosCloseQset();
void taosQsetThreadResume(taos_qset param);
int taosAddIntoQset(taos_qset, taos_queue, void *ahandle);
void taosRemoveFromQset(taos_qset, taos_queue);
int taosGetQueueNumber(taos_qset);
......
......@@ -230,6 +230,14 @@ void taosCloseQset(taos_qset param) {
free(qset);
}
// tsem_post 'qset->sem', so that reader threads waiting for it
// resumes execution and return, should only be used to signal the
// thread to exit.
void taosQsetThreadResume(taos_qset param) {
STaosQset *qset = (STaosQset *)param;
tsem_post(&qset->sem);
}
int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) {
STaosQueue *queue = (STaosQueue *)p2;
STaosQset *qset = (STaosQset *)p1;
......
......@@ -31,7 +31,7 @@ typedef struct {
int numOfThreads;
pthread_t * qthread;
SSchedMsg * queue;
bool stop;
void* pTmrCtrl;
void* pTimer;
} SSchedQueue;
......@@ -85,6 +85,7 @@ void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) {
return NULL;
}
pSched->stop = false;
for (int i = 0; i < numOfThreads; ++i) {
pthread_attr_t attr;
pthread_attr_init(&attr);
......@@ -128,6 +129,9 @@ void *taosProcessSchedQueue(void *param) {
}
uError("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
}
if (pSched->stop) {
break;
}
if (pthread_mutex_lock(&pSched->queueMutex) != 0)
uError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
......@@ -185,13 +189,16 @@ void taosCleanUpScheduler(void *param) {
SSchedQueue *pSched = (SSchedQueue *)param;
if (pSched == NULL) return;
pSched->stop = true;
for (int i = 0; i < pSched->numOfThreads; ++i) {
if (pSched->qthread[i])
pthread_cancel(pSched->qthread[i]);
if (pSched->qthread[i]) {
tsem_post(&pSched->fullSem);
}
}
for (int i = 0; i < pSched->numOfThreads; ++i) {
if (pSched->qthread[i])
if (pSched->qthread[i]) {
pthread_join(pSched->qthread[i], NULL);
}
}
tsem_destroy(&pSched->emptySem);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册