From 577962ed83e4249faefedafafda5e24ec9299ffb Mon Sep 17 00:00:00 2001 From: yifan hao Date: Mon, 4 May 2020 23:25:08 -0600 Subject: [PATCH] [Dnode / Vnode] More error handling. This patch adds more error handling code: 1. In dnodeAllocateWqueue(), if the allocation of pWorker fails, any allocated memory should be unwound and returns a NULL queue. This is a better function contract where either all allocation succeeds or none succeeds. 2. In vnodeOpen(), handle allocation failure for pVnode. --- src/dnode/src/dnodeModule.c | 2 +- src/dnode/src/dnodeWrite.c | 25 ++++++++++++++++++------- src/vnode/src/vnodeMain.c | 27 +++++++++++++++------------ 3 files changed, 34 insertions(+), 20 deletions(-) diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c index e1aa48d477..f3f5314146 100644 --- a/src/dnode/src/dnodeModule.c +++ b/src/dnode/src/dnodeModule.c @@ -73,7 +73,7 @@ static void dnodeAllocModules() { } void dnodeCleanUpModules() { - for (int32_t module = 1; module < TSDB_MOD_MAX; ++module) { + for (EModuleType module = 1; module < TSDB_MOD_MAX; ++module) { if (tsModule[module].enable && tsModule[module].stopFp) { (*tsModule[module].stopFp)(); } diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 39757c690f..1e877108ca 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -31,9 +31,9 @@ typedef struct { taos_qall qall; taos_qset qset; // queue set - pthread_t thread; // thread + pthread_t thread; // thread int32_t workerId; // worker ID -} SWriteWorker; +} SWriteWorker; typedef struct { SRspRet rspRet; @@ -67,7 +67,7 @@ int32_t dnodeInitWrite() { } void dnodeCleanupWrite() { - + for (int32_t i = 0; i < wWorkerPool.max; ++i) { SWriteWorker *pWorker = wWorkerPool.writeWorker + i; if (pWorker->thread) { @@ -120,10 +120,18 @@ void *dnodeAllocateWqueue(void *pVnode) { if (pWorker->qset == NULL) { pWorker->qset = taosOpenQset(); - if (pWorker->qset == NULL) return NULL; + if (pWorker->qset == NULL) { + taosCloseQueue(queue); + return NULL; + } taosAddIntoQset(pWorker->qset, queue, pVnode); pWorker->qall = taosAllocateQall(); + if (pWorker->qall == NULL) { + taosCloseQset(pWorker->qset); + taosCloseQueue(queue); + return NULL; + } wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; pthread_attr_t thAttr; @@ -132,7 +140,10 @@ void *dnodeAllocateWqueue(void *pVnode) { if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) { dError("failed to create thread to process read queue, reason:%s", strerror(errno)); + taosFreeQall(pWorker->qall); taosCloseQset(pWorker->qset); + taosCloseQueue(queue); + queue = NULL; } else { dTrace("write worker:%d is launched", pWorker->workerId); } @@ -183,7 +194,7 @@ static void *dnodeProcessWriteQueue(void *param) { while (1) { numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode); - if (numOfMsgs <=0) { + if (numOfMsgs <=0) { dnodeHandleIdleWorker(pWorker); // thread exit if no queues anymore continue; } @@ -213,7 +224,7 @@ static void *dnodeProcessWriteQueue(void *param) { taosGetQitem(pWorker->qall, &type, &item); if (type == TAOS_QTYPE_RPC) { pWrite = (SWriteMsg *)item; - dnodeSendRpcWriteRsp(pVnode, item, pWrite->rpcMsg.code); + dnodeSendRpcWriteRsp(pVnode, item, pWrite->rpcMsg.code); } else { taosFreeQitem(item); vnodeRelease(pVnode); @@ -229,7 +240,7 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) { if (num > 0) { usleep(30000); - sched_yield(); + sched_yield(); } else { taosFreeQall(pWorker->qall); taosCloseQset(pWorker->qset); diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 2a7d133039..e8676fc8ba 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -108,7 +108,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock; tsdbCfg.precision = pVnodeCfg->cfg.precision; tsdbCfg.compression = pVnodeCfg->cfg.compression;; - + char tsdbDir[TSDB_FILENAME_LEN] = {0}; sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId); code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL); @@ -134,7 +134,7 @@ int32_t vnodeDrop(int32_t vgId) { dTrace("pVnode:%p vgId:%d, vnode will be dropped", pVnode, pVnode->vgId); pVnode->status = TAOS_VN_STATUS_DELETING; vnodeCleanUp(pVnode); - + return TSDB_CODE_SUCCESS; } @@ -176,10 +176,13 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pthread_once(&vnodeModuleInit, vnodeInit); SVnodeObj *pVnode = calloc(sizeof(SVnodeObj), 1); + if (pVnode == NULL) { + return TSDB_CODE_NO_RESOURCE; + } pVnode->vgId = vnode; pVnode->status = TAOS_VN_STATUS_INIT; pVnode->refCount = 1; - pVnode->version = 0; + pVnode->version = 0; taosAddIntHash(tsDnodeVnodesHash, pVnode->vgId, (char *)(&pVnode)); int32_t code = vnodeReadCfg(pVnode); @@ -190,7 +193,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { } vnodeReadVersion(pVnode); - + pVnode->wqueue = dnodeAllocateWqueue(pVnode); pVnode->rqueue = dnodeAllocateRqueue(pVnode); @@ -227,12 +230,12 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { syncInfo.getWalInfo = vnodeGetWalInfo; syncInfo.getFileInfo = vnodeGetFileInfo; syncInfo.writeToCache = vnodeWriteToQueue; - syncInfo.confirmForward = dnodeSendRpcWriteRsp; + syncInfo.confirmForward = dnodeSendRpcWriteRsp; syncInfo.notifyRole = vnodeNotifyRole; pVnode->sync = syncStart(&syncInfo); // start continuous query - if (pVnode->role == TAOS_SYNC_ROLE_MASTER) + if (pVnode->role == TAOS_SYNC_ROLE_MASTER) cqStart(pVnode->cq); pVnode->events = NULL; @@ -316,7 +319,7 @@ void *vnodeAccquireVnode(int32_t vgId) { } void *vnodeGetRqueue(void *pVnode) { - return ((SVnodeObj *)pVnode)->rqueue; + return ((SVnodeObj *)pVnode)->rqueue; } void *vnodeGetWqueue(int32_t vgId) { @@ -326,7 +329,7 @@ void *vnodeGetWqueue(int32_t vgId) { } void *vnodeGetWal(void *pVnode) { - return ((SVnodeObj *)pVnode)->wal; + return ((SVnodeObj *)pVnode)->wal; } void vnodeBuildStatusMsg(void *param) { @@ -394,9 +397,9 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { SVnodeObj *pVnode = ahandle; pVnode->role = role; - if (pVnode->role == TAOS_SYNC_ROLE_MASTER) + if (pVnode->role == TAOS_SYNC_ROLE_MASTER) cqStart(pVnode->cq); - else + else cqStop(pVnode->cq); } @@ -426,14 +429,14 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { len += snprintf(content + len, maxLen - len, " \"daysToKeep2\": %d,\n", pVnodeCfg->cfg.daysToKeep2); len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.minRowsPerFileBlock); len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.maxRowsPerFileBlock); - len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime); + len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime); len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnodeCfg->cfg.precision); len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnodeCfg->cfg.compression); len += snprintf(content + len, maxLen - len, " \"commitLog\": %d,\n", pVnodeCfg->cfg.commitLog); len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnodeCfg->cfg.replications); len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnodeCfg->cfg.wals); len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnodeCfg->cfg.quorum); - + len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n"); for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) { len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", pVnodeCfg->nodes[i].nodeId); -- GitLab