From fe05941584cc6c5973a3cb0c55c5f85bd148802c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 28 Jul 2020 10:51:21 +0800 Subject: [PATCH] [td-255] --- src/query/src/qExecutor.c | 5 +--- src/util/src/tcache.c | 3 +- src/vnode/src/vnodeRead.c | 58 ++++++++++++++++++--------------------- 3 files changed, 30 insertions(+), 36 deletions(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index cc4fab343d..bca07e7150 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -6372,16 +6372,14 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex return pQInfo->code; } - *buildRes = false; int32_t code = TSDB_CODE_SUCCESS; - pthread_mutex_lock(&pQInfo->lock); if (pQInfo->dataReady == QUERY_RESULT_READY) { *buildRes = true; - qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%"PRId64", code:%d", pQInfo, pQuery->rowSize, pQuery->rec.rows, pQInfo->code); } else { + *buildRes = false; qDebug("QInfo:%p retrieve req set query return result after paused", pQInfo); pQInfo->rspContext = pRspContext; } @@ -6473,7 +6471,6 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co } else { // failed to dump result, free qhandle immediately *continueExec = false; qKillQuery(pQInfo); - qDestroyQueryInfo(pQInfo); } return code; diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 0d295a0cfa..a086a87b4b 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -419,7 +419,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { // note: extend lifespan before dec ref count bool inTrashCan = pNode->inTrashCan; - if (pCacheObj->extendLifespan && (!inTrashCan)) { + if (pCacheObj->extendLifespan && (!inTrashCan) && (!_remove)) { atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs()); uDebug("cache:%s data:%p extend life time to %"PRId64 " before release", pCacheObj->name, pNode->data, pNode->expireTime); } @@ -643,6 +643,7 @@ static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t __cache_wr_lock(pCacheObj); while (taosHashIterNext(pIter)) { SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter); + if (pNode->expireTime < time && T_REF_VAL_GET(pNode) <= 0) { taosCacheReleaseNode(pCacheObj, pNode); continue; diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 49c10dca3f..2a4ca0e663 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -66,12 +66,12 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); } -static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle, void* handle) { +static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle) { SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; pRead->pCont = qhandle; pRead->contLen = 0; - pRead->rpcMsg.handle = handle; + pRead->rpcMsg.handle = NULL; atomic_add_fetch_32(&pVnode->refCount, 1); taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead); @@ -99,6 +99,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle); } else { assert(*qhandle == (void*) killQueryMsg->qhandle); + qKillQuery(*qhandle); qReleaseQInfo(pVnode->qMgmt, (void**) &qhandle, true); } @@ -123,8 +124,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { // current connect is broken if (code == TSDB_CODE_SUCCESS) { -// handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo); - handle = &pQInfo; + handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo); if (handle == NULL) { // failed to register qhandle vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo, tstrerror(pRsp->code)); @@ -135,11 +135,10 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { pRsp->qhandle = htobe64((uint64_t) pQInfo); } -// pQInfo = NULL; if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle); pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; -// qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); + qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); return pRsp->code; } } else { @@ -149,15 +148,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (handle != NULL) { vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, register qhandle and return to app", vgId, *handle); - vnodePutItemIntoReadQueue(pVnode, *handle, pReadMsg->rpcMsg.handle); -// qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); + vnodePutItemIntoReadQueue(pVnode, *handle); + qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); } } else { assert(pCont != NULL); - void* p = (void*) pCont; - handle = &p; -// handle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) pCont); + + handle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) pCont); if (handle == NULL) { vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", (void*) pCont, pReadMsg->rpcMsg.handle); code = TSDB_CODE_QRY_INVALID_QHANDLE; @@ -166,23 +164,25 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { bool buildRes = qTableQuery(*handle); // do execute query if (buildRes) { // build result rsp + // update the connection info according to the retrieve connection + pReadMsg->rpcMsg.handle = qGetResultRetrieveMsg(*handle); + assert(pReadMsg->rpcMsg.handle != NULL); - void* retrieveHandle = qGetResultRetrieveMsg(*handle); - assert(retrieveHandle != NULL); - - vDebug("vgId:%d, QInfo:%p, start to build result rsp after query paused, %p", pVnode->vgId, *handle, retrieveHandle); - pReadMsg->rpcMsg.handle = retrieveHandle; // update the connection info according to the retrieve connection + vDebug("vgId:%d, QInfo:%p, start to build result rsp after query paused, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle); pRet = &pReadMsg->rspRet; - code = TSDB_CODE_QRY_HAS_RSP; +// code = TSDB_CODE_QRY_HAS_RSP; bool continueExec = false; if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) { if (continueExec) { - vnodePutItemIntoReadQueue(pVnode, *handle, pReadMsg->rpcMsg.handle); + vTrace("QInfo:%p add to queue for further exec", *handle); + vnodePutItemIntoReadQueue(pVnode, *handle); pRet->qhandle = *handle; - code = TSDB_CODE_SUCCESS; +// code = TSDB_CODE_SUCCESS; + } else { + vDebug("QInfo:%p query completed", *handle); } } else { // todo handle error } @@ -190,7 +190,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { code = TSDB_CODE_QRY_HAS_RSP; } } -// qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); + + qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); } return code; @@ -209,11 +210,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { memset(pRet, 0, sizeof(SRspRet)); int32_t code = TSDB_CODE_SUCCESS; - void** handle = NULL; - void* p1 = (void*) pRetrieve->qhandle; - handle = &p1; - -// void** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle); + void** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle); if (handle == NULL || (*handle) != (void*) pRetrieve->qhandle) { code = TSDB_CODE_QRY_INVALID_QHANDLE; vDebug("vgId:%d, invalid qhandle in fetch result, QInfo:%p", pVnode->vgId, (void*) pRetrieve->qhandle); @@ -233,7 +230,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (pRetrieve->free == 1) { vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle); qKillQuery(*handle); -// qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); + qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRet->len = sizeof(SRetrieveTableRsp); @@ -255,16 +252,16 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { //TODO handle malloc failure pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); - } else { - // result is not ready, return immediately + } else { // result is not ready, return immediately if (!buildRes) { + qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); return TSDB_CODE_QRY_NOT_READY; } bool continueExec = false; if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) { if (continueExec) { - vnodePutItemIntoReadQueue(pVnode, *handle, pReadMsg->rpcMsg.handle); + vnodePutItemIntoReadQueue(pVnode, *handle); pRet->qhandle = *handle; freeHandle = false; } @@ -274,8 +271,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { } } - UNUSED(freeHandle); -// qReleaseQInfo(pVnode->qMgmt, (void**) &handle, freeHandle); + qReleaseQInfo(pVnode->qMgmt, (void**) &handle, freeHandle); return code; } -- GitLab