提交 19cced6f 编写于 作者: S Shengliang Guan

TD-1768

上级 196b03f1
...@@ -56,6 +56,7 @@ void vnodeRelease(void *pVnode); // dec refCount ...@@ -56,6 +56,7 @@ void vnodeRelease(void *pVnode); // dec refCount
void* vnodeGetWal(void *pVnode); void* vnodeGetWal(void *pVnode);
int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item); int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item);
int32_t vnodeCheckWrite(void *pVnode);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes); int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *param); void vnodeBuildStatusMsg(void *param);
void vnodeConfirmForward(void *param, uint64_t version, int32_t code); void vnodeConfirmForward(void *param, uint64_t version, int32_t code);
...@@ -65,6 +66,7 @@ int32_t vnodeInitResources(); ...@@ -65,6 +66,7 @@ int32_t vnodeInitResources();
void vnodeCleanupResources(); void vnodeCleanupResources();
int32_t vnodeProcessRead(void *pVnode, SReadMsg *pReadMsg); int32_t vnodeProcessRead(void *pVnode, SReadMsg *pReadMsg);
int32_t vnodeCheckRead(void *pVnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -465,9 +465,10 @@ void *vnodeAcquireRqueue(int32_t vgId) { ...@@ -465,9 +465,10 @@ void *vnodeAcquireRqueue(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquire(vgId); SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) return NULL; if (pVnode == NULL) return NULL;
if (pVnode->status == TAOS_VN_STATUS_RESET) { int32_t code = vnodeCheckRead(pVnode);
terrno = TSDB_CODE_APP_NOT_READY; if (code != TSDB_CODE_SUCCESS) {
vInfo("vgId:%d, status is in reset", vgId); terrno = code;
vInfo("vgId:%d, can not provide read service, status is %s", vgId, vnodeStatus[pVnode->status]);
vnodeRelease(pVnode); vnodeRelease(pVnode);
return NULL; return NULL;
} }
...@@ -479,13 +480,14 @@ void *vnodeAcquireWqueue(int32_t vgId) { ...@@ -479,13 +480,14 @@ void *vnodeAcquireWqueue(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquire(vgId); SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) return NULL; if (pVnode == NULL) return NULL;
if (pVnode->status == TAOS_VN_STATUS_RESET) { int32_t code = vnodeCheckWrite(pVnode);
terrno = TSDB_CODE_APP_NOT_READY; if (code != TSDB_CODE_SUCCESS) {
vInfo("vgId:%d, status is in reset", vgId); terrno = code;
vInfo("vgId:%d, can not provide write service, status is %s", vgId, vnodeStatus[pVnode->status]);
vnodeRelease(pVnode); vnodeRelease(pVnode);
return NULL; return NULL;
} }
return pVnode->wqueue; return pVnode->wqueue;
} }
......
...@@ -38,7 +38,13 @@ void vnodeInitReadFp(void) { ...@@ -38,7 +38,13 @@ void vnodeInitReadFp(void) {
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg; vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
} }
static int32_t vnodeProcessReadImp(SVnodeObj *pVnode, SReadMsg *pReadMsg) { //
// After the fetch request enters the vnode queue, if the vnode cannot provide services, the process function are
// still required, or there will be a deadlock, so we don’t do any check here, but put the check codes before the
// request enters the queue
//
int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
SVnodeObj *pVnode = (SVnodeObj *)param;
int msgType = pReadMsg->rpcMsg.msgType; int msgType = pReadMsg->rpcMsg.msgType;
if (vnodeProcessReadMsgFp[msgType] == NULL) { if (vnodeProcessReadMsgFp[msgType] == NULL) {
...@@ -46,53 +52,36 @@ static int32_t vnodeProcessReadImp(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -46,53 +52,36 @@ static int32_t vnodeProcessReadImp(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
return TSDB_CODE_VND_MSG_NOT_PROCESSED; return TSDB_CODE_VND_MSG_NOT_PROCESSED;
} }
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
}
int32_t vnodeCheckRead(void *param) {
SVnodeObj *pVnode = param;
if (pVnode->status != TAOS_VN_STATUS_READY) { if (pVnode->status != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, msgType:%s not processed, vnode status is %s", pVnode->vgId, taosMsg[msgType], vDebug("vgId:%d, vnode status is %s, recCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
vnodeStatus[pVnode->status]); pVnode->refCount, pVnode);
return TSDB_CODE_APP_NOT_READY; return TSDB_CODE_APP_NOT_READY;
} }
// tsdb may be in reset state // tsdb may be in reset state
if (pVnode->tsdb == NULL) { if (pVnode->tsdb == NULL) {
vDebug("vgId:%d, msgType:%s not processed, tsdb is null", pVnode->vgId, taosMsg[msgType]); vDebug("vgId:%d, tsdb is null, recCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
return TSDB_CODE_APP_NOT_READY;
}
if (pVnode->status == TAOS_VN_STATUS_CLOSING) {
vDebug("vgId:%d, msgType:%s not processed, vstatus is %s", pVnode->vgId, taosMsg[msgType],
vnodeStatus[pVnode->status]);
return TSDB_CODE_APP_NOT_READY; return TSDB_CODE_APP_NOT_READY;
} }
if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) { if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%s", pVnode->vgId, taosMsg[msgType], vDebug("vgId:%d, replica:%d role:%s, recCount:%d pVnode:%p", pVnode->vgId, pVnode->syncCfg.replica,
pVnode->syncCfg.replica, syncRole[pVnode->role]); syncRole[pVnode->role], pVnode->refCount, pVnode);
return TSDB_CODE_APP_NOT_READY; return TSDB_CODE_APP_NOT_READY;
} }
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); return TSDB_CODE_SUCCESS;
} }
int32_t vnodeProcessRead(void *param, SReadMsg *pRead) { static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) {
SVnodeObj *pVnode = (SVnodeObj *)param; int32_t code = vnodeCheckRead(pVnode);
int32_t code = vnodeProcessReadImp(pVnode, pRead); if (code != TSDB_CODE_SUCCESS) return code;
if (code == TSDB_CODE_APP_NOT_READY && pRead->rpcMsg.msgType == TSDB_MSG_TYPE_QUERY) {
// After the fetch request enters the vnode queue
// If the vnode cannot provide services, the following operations are still required
// Or, there will be a deadlock
void **qhandle = (void **)pRead->pCont;
vError("QInfo:%p msg:%p will be killed for vstatus is %s", *qhandle, pRead, vnodeStatus[pVnode->status]);
// qKillQuery(*qhandle);
// qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true);
return TSDB_CODE_APP_NOT_READY;
} else {
return code;
}
}
static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
pRead->pCont = qhandle; pRead->pCont = qhandle;
...@@ -103,6 +92,8 @@ static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) { ...@@ -103,6 +92,8 @@ static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) {
vDebug("QInfo:%p add to vread queue for exec query, msg:%p", *qhandle, pRead); vDebug("QInfo:%p add to vread queue for exec query, msg:%p", *qhandle, pRead);
taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead); taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
return TSDB_CODE_SUCCESS;
} }
static int32_t vnodeDumpQueryResult(SRspRet *pRet, void *pVnode, void **handle, bool *freeHandle) { static int32_t vnodeDumpQueryResult(SRspRet *pRet, void *pVnode, void **handle, bool *freeHandle) {
...@@ -112,8 +103,13 @@ static int32_t vnodeDumpQueryResult(SRspRet *pRet, void *pVnode, void **handle, ...@@ -112,8 +103,13 @@ static int32_t vnodeDumpQueryResult(SRspRet *pRet, void *pVnode, void **handle,
if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) { if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) {
if (continueExec) { if (continueExec) {
*freeHandle = false; *freeHandle = false;
vnodePutItemIntoReadQueue(pVnode, handle); code = vnodePutItemIntoReadQueue(pVnode, handle);
pRet->qhandle = *handle; if (code != TSDB_CODE_SUCCESS) {
*freeHandle = true;
return code;
} else {
pRet->qhandle = *handle;
}
} else { } else {
*freeHandle = true; *freeHandle = true;
vDebug("QInfo:%p exec completed, free handle:%d", *handle, *freeHandle); vDebug("QInfo:%p exec completed, free handle:%d", *handle, *freeHandle);
...@@ -214,7 +210,12 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -214,7 +210,12 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (handle != NULL) { if (handle != NULL) {
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle); vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle);
vnodePutItemIntoReadQueue(pVnode, handle); code = vnodePutItemIntoReadQueue(pVnode, handle);
if (code != TSDB_CODE_SUCCESS) {
pRsp->code = code;
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
return pRsp->code;
}
} }
} else { } else {
assert(pCont != NULL); assert(pCont != NULL);
......
...@@ -56,15 +56,6 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { ...@@ -56,15 +56,6 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
return TSDB_CODE_VND_MSG_NOT_PROCESSED; return TSDB_CODE_VND_MSG_NOT_PROCESSED;
} }
if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) {
vDebug("vgId:%d, msgType:%s not processed, no write auth", pVnode->vgId, taosMsg[pHead->msgType]);
return TSDB_CODE_VND_NO_WRITE_AUTH;
}
// tsdb may be in reset state
if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY;
if (pVnode->status == TAOS_VN_STATUS_CLOSING) return TSDB_CODE_APP_NOT_READY;
if (pHead->version == 0) { // from client or CQ if (pHead->version == 0) { // from client or CQ
if (pVnode->status != TAOS_VN_STATUS_READY) { if (pVnode->status != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType], vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType],
...@@ -105,6 +96,28 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { ...@@ -105,6 +96,28 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
return syncCode; return syncCode;
} }
int32_t vnodeCheckWrite(void *param) {
SVnodeObj *pVnode = param;
if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) {
vDebug("vgId:%d, no write auth, recCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
return TSDB_CODE_VND_NO_WRITE_AUTH;
}
// tsdb may be in reset state
if (pVnode->tsdb == NULL) {
vDebug("vgId:%d, tsdb is null, recCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
return TSDB_CODE_APP_NOT_READY;
}
if (pVnode->status == TAOS_VN_STATUS_CLOSING) {
vDebug("vgId:%d, vnode status is %s, recCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
pVnode->refCount, pVnode);
return TSDB_CODE_APP_NOT_READY;
}
return TSDB_CODE_SUCCESS;
}
void vnodeConfirmForward(void *param, uint64_t version, int32_t code) { void vnodeConfirmForward(void *param, uint64_t version, int32_t code) {
SVnodeObj *pVnode = (SVnodeObj *)param; SVnodeObj *pVnode = (SVnodeObj *)param;
syncConfirmForward(pVnode->sync, version, code); syncConfirmForward(pVnode->sync, version, code);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册