diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 68239c5c63248e199e6b339767acdc5d6e905038..c7e0a53502af5ec16100b5972e7807f6da7174f7 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -6361,8 +6361,8 @@ static bool doBuildResCheck(SQInfo* pQInfo) { pthread_mutex_unlock(&pQInfo->lock); // clear qhandle owner -// assert(pQInfo->owner == pthread_self()); -// pQInfo->owner = 0; + assert(pQInfo->owner == pthread_self()); + pQInfo->owner = 0; return buildRes; } @@ -6370,14 +6370,14 @@ static bool doBuildResCheck(SQInfo* pQInfo) { bool qTableQuery(qinfo_t qinfo) { SQInfo *pQInfo = (SQInfo *)qinfo; assert(pQInfo && pQInfo->signature == pQInfo); -// int64_t threadId = pthread_self(); + int64_t threadId = pthread_self(); -// int64_t curOwner = 0; -// if ((curOwner = atomic_val_compare_exchange_64(&pQInfo->owner, 0, threadId)) != 0) { -// qError("QInfo:%p qhandle is now executed by thread:%p", pQInfo, (void*) curOwner); -// pQInfo->code = TSDB_CODE_QRY_IN_EXEC; -// return false; -// } + int64_t curOwner = 0; + if ((curOwner = atomic_val_compare_exchange_64(&pQInfo->owner, 0, threadId)) != 0) { + qError("QInfo:%p qhandle is now executed by thread:%p", pQInfo, (void*) curOwner); + pQInfo->code = TSDB_CODE_QRY_IN_EXEC; + return false; + } if (IS_QUERY_KILLED(pQInfo)) { qDebug("QInfo:%p it is already killed, abort", pQInfo); @@ -6529,6 +6529,13 @@ int32_t qKillQuery(qinfo_t qinfo) { } setQueryKilled(pQInfo); + + // Wait for the query executing thread being stopped/ + // Once the query is stopped, the owner of qHandle will be cleared immediately. + while(pQInfo->owner != 0) { + taosMsleep(100); + } + return TSDB_CODE_SUCCESS; } diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index a02413822e8555ff4e55d54374dedff7b7efb6e4..8ca76ef22de81e4e98fde59da1f889b8e306935c 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -225,8 +225,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { SRspRet *pRet = &pReadMsg->rspRet; SRetrieveTableMsg *pRetrieve = pCont; - pRetrieve->qhandle = htobe64(pRetrieve->qhandle); pRetrieve->free = htons(pRetrieve->free); + pRetrieve->qhandle = htobe64(pRetrieve->qhandle); vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void*) pRetrieve->qhandle, pRetrieve->free, pReadMsg->rpcMsg.handle); @@ -236,24 +236,29 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { void** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle); if (handle == NULL || (*handle) != (void*) pRetrieve->qhandle) { code = TSDB_CODE_QRY_INVALID_QHANDLE; - vDebug("vgId:%d, invalid qhandle in fetch result, QInfo:%p", pVnode->vgId, (void*) pRetrieve->qhandle); + vDebug("vgId:%d, invalid qhandle in retrieving result, QInfo:%p", pVnode->vgId, (void*) pRetrieve->qhandle); vnodeBuildNoResultQueryRsp(pRet); return code; } if (pRetrieve->free == 1) { - vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle); + vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle); qKillQuery(*handle); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); vnodeBuildNoResultQueryRsp(pRet); + code = TSDB_CODE_TSC_QUERY_CANCELLED; return code; } - // todo add more error check here // register the qhandle to connect to quit query immediate if connection is broken - vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId); + if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { + vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle); + code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); + return code; + } bool freeHandle = true; bool buildRes = false; @@ -273,8 +278,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle); } - // if qhandle is not added into task queue, the query must be completed already or paused with error , - // free qhandle immediately + // If qhandle is not added into vread queue, the query should be completed already or paused with error. + // Here free qhandle immediately if (freeHandle) { qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); }