提交 b2c16621 编写于 作者: H Haojun Liao

[td-225] add check after killing query.

上级 0ee12789
...@@ -6361,8 +6361,8 @@ static bool doBuildResCheck(SQInfo* pQInfo) { ...@@ -6361,8 +6361,8 @@ static bool doBuildResCheck(SQInfo* pQInfo) {
pthread_mutex_unlock(&pQInfo->lock); pthread_mutex_unlock(&pQInfo->lock);
// clear qhandle owner // clear qhandle owner
// assert(pQInfo->owner == pthread_self()); assert(pQInfo->owner == pthread_self());
// pQInfo->owner = 0; pQInfo->owner = 0;
return buildRes; return buildRes;
} }
...@@ -6370,14 +6370,14 @@ static bool doBuildResCheck(SQInfo* pQInfo) { ...@@ -6370,14 +6370,14 @@ static bool doBuildResCheck(SQInfo* pQInfo) {
bool qTableQuery(qinfo_t qinfo) { bool qTableQuery(qinfo_t qinfo) {
SQInfo *pQInfo = (SQInfo *)qinfo; SQInfo *pQInfo = (SQInfo *)qinfo;
assert(pQInfo && pQInfo->signature == pQInfo); assert(pQInfo && pQInfo->signature == pQInfo);
// int64_t threadId = pthread_self(); int64_t threadId = pthread_self();
// int64_t curOwner = 0; int64_t curOwner = 0;
// if ((curOwner = atomic_val_compare_exchange_64(&pQInfo->owner, 0, threadId)) != 0) { if ((curOwner = atomic_val_compare_exchange_64(&pQInfo->owner, 0, threadId)) != 0) {
// qError("QInfo:%p qhandle is now executed by thread:%p", pQInfo, (void*) curOwner); qError("QInfo:%p qhandle is now executed by thread:%p", pQInfo, (void*) curOwner);
// pQInfo->code = TSDB_CODE_QRY_IN_EXEC; pQInfo->code = TSDB_CODE_QRY_IN_EXEC;
// return false; return false;
// } }
if (IS_QUERY_KILLED(pQInfo)) { if (IS_QUERY_KILLED(pQInfo)) {
qDebug("QInfo:%p it is already killed, abort", pQInfo); qDebug("QInfo:%p it is already killed, abort", pQInfo);
...@@ -6529,6 +6529,13 @@ int32_t qKillQuery(qinfo_t qinfo) { ...@@ -6529,6 +6529,13 @@ int32_t qKillQuery(qinfo_t qinfo) {
} }
setQueryKilled(pQInfo); setQueryKilled(pQInfo);
// Wait for the query executing thread being stopped/
// Once the query is stopped, the owner of qHandle will be cleared immediately.
while(pQInfo->owner != 0) {
taosMsleep(100);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -225,8 +225,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -225,8 +225,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
SRspRet *pRet = &pReadMsg->rspRet; SRspRet *pRet = &pReadMsg->rspRet;
SRetrieveTableMsg *pRetrieve = pCont; SRetrieveTableMsg *pRetrieve = pCont;
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
pRetrieve->free = htons(pRetrieve->free); pRetrieve->free = htons(pRetrieve->free);
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void*) pRetrieve->qhandle, pRetrieve->free, pReadMsg->rpcMsg.handle); vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void*) pRetrieve->qhandle, pRetrieve->free, pReadMsg->rpcMsg.handle);
...@@ -236,24 +236,29 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -236,24 +236,29 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
void** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle); void** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle);
if (handle == NULL || (*handle) != (void*) pRetrieve->qhandle) { if (handle == NULL || (*handle) != (void*) pRetrieve->qhandle) {
code = TSDB_CODE_QRY_INVALID_QHANDLE; code = TSDB_CODE_QRY_INVALID_QHANDLE;
vDebug("vgId:%d, invalid qhandle in fetch result, QInfo:%p", pVnode->vgId, (void*) pRetrieve->qhandle); vDebug("vgId:%d, invalid qhandle in retrieving result, QInfo:%p", pVnode->vgId, (void*) pRetrieve->qhandle);
vnodeBuildNoResultQueryRsp(pRet); vnodeBuildNoResultQueryRsp(pRet);
return code; return code;
} }
if (pRetrieve->free == 1) { if (pRetrieve->free == 1) {
vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle); vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
qKillQuery(*handle); qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
vnodeBuildNoResultQueryRsp(pRet); vnodeBuildNoResultQueryRsp(pRet);
code = TSDB_CODE_TSC_QUERY_CANCELLED;
return code; return code;
} }
// todo add more error check here
// register the qhandle to connect to quit query immediate if connection is broken // register the qhandle to connect to quit query immediate if connection is broken
vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId); if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
return code;
}
bool freeHandle = true; bool freeHandle = true;
bool buildRes = false; bool buildRes = false;
...@@ -273,8 +278,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -273,8 +278,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle); code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle);
} }
// if qhandle is not added into task queue, the query must be completed already or paused with error , // If qhandle is not added into vread queue, the query should be completed already or paused with error.
// free qhandle immediately // Here free qhandle immediately
if (freeHandle) { if (freeHandle) {
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册