diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index baaeae2a81d28331454eb414106eabe1ef64d939..f6e773c96b24950dd363e58c7008970c024f7c05 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -67,12 +67,12 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); } -static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) { +static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) { SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; pRead->pCont = qhandle; pRead->contLen = 0; - pRead->rpcMsg.handle = NULL; + pRead->rpcMsg.ahandle = ahandle; atomic_add_fetch_32(&pVnode->refCount, 1); @@ -80,14 +80,23 @@ static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) { taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead); } -static int32_t vnodeDumpQueryResult(SRspRet *pRet, void *pVnode, void **handle, bool *freeHandle) { +/** + * + * @param pRet response message object + * @param pVnode the vnode object + * @param handle qhandle for executing query + * @param freeHandle free qhandle or not + * @param ahandle sqlObj address at client side + * @return + */ +static int32_t vnodeDumpQueryResult(SRspRet *pRet, void *pVnode, void **handle, bool *freeHandle, void *ahandle) { bool continueExec = false; int32_t code = TSDB_CODE_SUCCESS; if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) { if (continueExec) { *freeHandle = false; - vnodePutItemIntoReadQueue(pVnode, handle); + vnodePutItemIntoReadQueue(pVnode, handle, ahandle); pRet->qhandle = *handle; } else { *freeHandle = true; @@ -189,7 +198,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (handle != NULL) { vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle); - vnodePutItemIntoReadQueue(pVnode, handle); + vnodePutItemIntoReadQueue(pVnode, handle, pReadMsg->rpcMsg.ahandle); } } else { assert(pCont != NULL); @@ -210,7 +219,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { pReadMsg->rpcMsg.handle); // set the real rsp error code - pReadMsg->rpcMsg.code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, qhandle, &freehandle); + pReadMsg->rpcMsg.code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, qhandle, &freehandle, pReadMsg->rpcMsg.ahandle); // NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client code = TSDB_CODE_QRY_HAS_RSP; @@ -287,7 +296,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { return TSDB_CODE_QRY_NOT_READY; } - code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle); + // ahandle is the sqlObj pointer + code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle, pReadMsg->rpcMsg.ahandle); } // If qhandle is not added into vread queue, the query should be completed already or paused with error.