提交 577962ed 编写于 作者: Y yifan hao

[Dnode / Vnode] More error handling.

This patch adds more error handling code:
1. In dnodeAllocateWqueue(), if the allocation of pWorker fails, any
allocated memory should be unwound and returns a NULL queue. This is
a better function contract where either all allocation succeeds or none
succeeds.
2. In vnodeOpen(), handle allocation failure for pVnode.
上级 0d09f6c0
...@@ -73,7 +73,7 @@ static void dnodeAllocModules() { ...@@ -73,7 +73,7 @@ static void dnodeAllocModules() {
} }
void dnodeCleanUpModules() { void dnodeCleanUpModules() {
for (int32_t module = 1; module < TSDB_MOD_MAX; ++module) { for (EModuleType module = 1; module < TSDB_MOD_MAX; ++module) {
if (tsModule[module].enable && tsModule[module].stopFp) { if (tsModule[module].enable && tsModule[module].stopFp) {
(*tsModule[module].stopFp)(); (*tsModule[module].stopFp)();
} }
......
...@@ -31,9 +31,9 @@ ...@@ -31,9 +31,9 @@
typedef struct { typedef struct {
taos_qall qall; taos_qall qall;
taos_qset qset; // queue set taos_qset qset; // queue set
pthread_t thread; // thread pthread_t thread; // thread
int32_t workerId; // worker ID int32_t workerId; // worker ID
} SWriteWorker; } SWriteWorker;
typedef struct { typedef struct {
SRspRet rspRet; SRspRet rspRet;
...@@ -67,7 +67,7 @@ int32_t dnodeInitWrite() { ...@@ -67,7 +67,7 @@ int32_t dnodeInitWrite() {
} }
void dnodeCleanupWrite() { void dnodeCleanupWrite() {
for (int32_t i = 0; i < wWorkerPool.max; ++i) { for (int32_t i = 0; i < wWorkerPool.max; ++i) {
SWriteWorker *pWorker = wWorkerPool.writeWorker + i; SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
if (pWorker->thread) { if (pWorker->thread) {
...@@ -120,10 +120,18 @@ void *dnodeAllocateWqueue(void *pVnode) { ...@@ -120,10 +120,18 @@ void *dnodeAllocateWqueue(void *pVnode) {
if (pWorker->qset == NULL) { if (pWorker->qset == NULL) {
pWorker->qset = taosOpenQset(); pWorker->qset = taosOpenQset();
if (pWorker->qset == NULL) return NULL; if (pWorker->qset == NULL) {
taosCloseQueue(queue);
return NULL;
}
taosAddIntoQset(pWorker->qset, queue, pVnode); taosAddIntoQset(pWorker->qset, queue, pVnode);
pWorker->qall = taosAllocateQall(); pWorker->qall = taosAllocateQall();
if (pWorker->qall == NULL) {
taosCloseQset(pWorker->qset);
taosCloseQueue(queue);
return NULL;
}
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
pthread_attr_t thAttr; pthread_attr_t thAttr;
...@@ -132,7 +140,10 @@ void *dnodeAllocateWqueue(void *pVnode) { ...@@ -132,7 +140,10 @@ void *dnodeAllocateWqueue(void *pVnode) {
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) { if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) {
dError("failed to create thread to process read queue, reason:%s", strerror(errno)); dError("failed to create thread to process read queue, reason:%s", strerror(errno));
taosFreeQall(pWorker->qall);
taosCloseQset(pWorker->qset); taosCloseQset(pWorker->qset);
taosCloseQueue(queue);
queue = NULL;
} else { } else {
dTrace("write worker:%d is launched", pWorker->workerId); dTrace("write worker:%d is launched", pWorker->workerId);
} }
...@@ -183,7 +194,7 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -183,7 +194,7 @@ static void *dnodeProcessWriteQueue(void *param) {
while (1) { while (1) {
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode); numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
if (numOfMsgs <=0) { if (numOfMsgs <=0) {
dnodeHandleIdleWorker(pWorker); // thread exit if no queues anymore dnodeHandleIdleWorker(pWorker); // thread exit if no queues anymore
continue; continue;
} }
...@@ -213,7 +224,7 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -213,7 +224,7 @@ static void *dnodeProcessWriteQueue(void *param) {
taosGetQitem(pWorker->qall, &type, &item); taosGetQitem(pWorker->qall, &type, &item);
if (type == TAOS_QTYPE_RPC) { if (type == TAOS_QTYPE_RPC) {
pWrite = (SWriteMsg *)item; pWrite = (SWriteMsg *)item;
dnodeSendRpcWriteRsp(pVnode, item, pWrite->rpcMsg.code); dnodeSendRpcWriteRsp(pVnode, item, pWrite->rpcMsg.code);
} else { } else {
taosFreeQitem(item); taosFreeQitem(item);
vnodeRelease(pVnode); vnodeRelease(pVnode);
...@@ -229,7 +240,7 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) { ...@@ -229,7 +240,7 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
if (num > 0) { if (num > 0) {
usleep(30000); usleep(30000);
sched_yield(); sched_yield();
} else { } else {
taosFreeQall(pWorker->qall); taosFreeQall(pWorker->qall);
taosCloseQset(pWorker->qset); taosCloseQset(pWorker->qset);
......
...@@ -108,7 +108,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -108,7 +108,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock; tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock;
tsdbCfg.precision = pVnodeCfg->cfg.precision; tsdbCfg.precision = pVnodeCfg->cfg.precision;
tsdbCfg.compression = pVnodeCfg->cfg.compression;; tsdbCfg.compression = pVnodeCfg->cfg.compression;;
char tsdbDir[TSDB_FILENAME_LEN] = {0}; char tsdbDir[TSDB_FILENAME_LEN] = {0};
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId); sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL); code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL);
...@@ -134,7 +134,7 @@ int32_t vnodeDrop(int32_t vgId) { ...@@ -134,7 +134,7 @@ int32_t vnodeDrop(int32_t vgId) {
dTrace("pVnode:%p vgId:%d, vnode will be dropped", pVnode, pVnode->vgId); dTrace("pVnode:%p vgId:%d, vnode will be dropped", pVnode, pVnode->vgId);
pVnode->status = TAOS_VN_STATUS_DELETING; pVnode->status = TAOS_VN_STATUS_DELETING;
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -176,10 +176,13 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -176,10 +176,13 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pthread_once(&vnodeModuleInit, vnodeInit); pthread_once(&vnodeModuleInit, vnodeInit);
SVnodeObj *pVnode = calloc(sizeof(SVnodeObj), 1); SVnodeObj *pVnode = calloc(sizeof(SVnodeObj), 1);
if (pVnode == NULL) {
return TSDB_CODE_NO_RESOURCE;
}
pVnode->vgId = vnode; pVnode->vgId = vnode;
pVnode->status = TAOS_VN_STATUS_INIT; pVnode->status = TAOS_VN_STATUS_INIT;
pVnode->refCount = 1; pVnode->refCount = 1;
pVnode->version = 0; pVnode->version = 0;
taosAddIntHash(tsDnodeVnodesHash, pVnode->vgId, (char *)(&pVnode)); taosAddIntHash(tsDnodeVnodesHash, pVnode->vgId, (char *)(&pVnode));
int32_t code = vnodeReadCfg(pVnode); int32_t code = vnodeReadCfg(pVnode);
...@@ -190,7 +193,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -190,7 +193,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
} }
vnodeReadVersion(pVnode); vnodeReadVersion(pVnode);
pVnode->wqueue = dnodeAllocateWqueue(pVnode); pVnode->wqueue = dnodeAllocateWqueue(pVnode);
pVnode->rqueue = dnodeAllocateRqueue(pVnode); pVnode->rqueue = dnodeAllocateRqueue(pVnode);
...@@ -227,12 +230,12 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -227,12 +230,12 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
syncInfo.getWalInfo = vnodeGetWalInfo; syncInfo.getWalInfo = vnodeGetWalInfo;
syncInfo.getFileInfo = vnodeGetFileInfo; syncInfo.getFileInfo = vnodeGetFileInfo;
syncInfo.writeToCache = vnodeWriteToQueue; syncInfo.writeToCache = vnodeWriteToQueue;
syncInfo.confirmForward = dnodeSendRpcWriteRsp; syncInfo.confirmForward = dnodeSendRpcWriteRsp;
syncInfo.notifyRole = vnodeNotifyRole; syncInfo.notifyRole = vnodeNotifyRole;
pVnode->sync = syncStart(&syncInfo); pVnode->sync = syncStart(&syncInfo);
// start continuous query // start continuous query
if (pVnode->role == TAOS_SYNC_ROLE_MASTER) if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
cqStart(pVnode->cq); cqStart(pVnode->cq);
pVnode->events = NULL; pVnode->events = NULL;
...@@ -316,7 +319,7 @@ void *vnodeAccquireVnode(int32_t vgId) { ...@@ -316,7 +319,7 @@ void *vnodeAccquireVnode(int32_t vgId) {
} }
void *vnodeGetRqueue(void *pVnode) { void *vnodeGetRqueue(void *pVnode) {
return ((SVnodeObj *)pVnode)->rqueue; return ((SVnodeObj *)pVnode)->rqueue;
} }
void *vnodeGetWqueue(int32_t vgId) { void *vnodeGetWqueue(int32_t vgId) {
...@@ -326,7 +329,7 @@ void *vnodeGetWqueue(int32_t vgId) { ...@@ -326,7 +329,7 @@ void *vnodeGetWqueue(int32_t vgId) {
} }
void *vnodeGetWal(void *pVnode) { void *vnodeGetWal(void *pVnode) {
return ((SVnodeObj *)pVnode)->wal; return ((SVnodeObj *)pVnode)->wal;
} }
void vnodeBuildStatusMsg(void *param) { void vnodeBuildStatusMsg(void *param) {
...@@ -394,9 +397,9 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { ...@@ -394,9 +397,9 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
SVnodeObj *pVnode = ahandle; SVnodeObj *pVnode = ahandle;
pVnode->role = role; pVnode->role = role;
if (pVnode->role == TAOS_SYNC_ROLE_MASTER) if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
cqStart(pVnode->cq); cqStart(pVnode->cq);
else else
cqStop(pVnode->cq); cqStop(pVnode->cq);
} }
...@@ -426,14 +429,14 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -426,14 +429,14 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
len += snprintf(content + len, maxLen - len, " \"daysToKeep2\": %d,\n", pVnodeCfg->cfg.daysToKeep2); len += snprintf(content + len, maxLen - len, " \"daysToKeep2\": %d,\n", pVnodeCfg->cfg.daysToKeep2);
len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.minRowsPerFileBlock); len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.minRowsPerFileBlock);
len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.maxRowsPerFileBlock); len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.maxRowsPerFileBlock);
len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime); len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime);
len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnodeCfg->cfg.precision); len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnodeCfg->cfg.precision);
len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnodeCfg->cfg.compression); len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnodeCfg->cfg.compression);
len += snprintf(content + len, maxLen - len, " \"commitLog\": %d,\n", pVnodeCfg->cfg.commitLog); len += snprintf(content + len, maxLen - len, " \"commitLog\": %d,\n", pVnodeCfg->cfg.commitLog);
len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnodeCfg->cfg.replications); len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnodeCfg->cfg.replications);
len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnodeCfg->cfg.wals); len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnodeCfg->cfg.wals);
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnodeCfg->cfg.quorum); len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnodeCfg->cfg.quorum);
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n"); len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) { for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) {
len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", pVnodeCfg->nodes[i].nodeId); len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", pVnodeCfg->nodes[i].nodeId);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册