diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index c99cf87b210b9089f32e6e3a71418f8631ec8041..d66ebf9772bcf702490b5c780de3223cd1b6a833 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -211,8 +211,8 @@ static void *dnodeProcessReadQueue(void *param) { dnodeSendRpcReadRsp(pVnode, pReadMsg, code); } else { if (code == TSDB_CODE_QRY_HAS_RSP) { - dnodeSendRpcReadRsp(pVnode, pReadMsg, TSDB_CODE_SUCCESS); - } else { + dnodeSendRpcReadRsp(pVnode, pReadMsg, pReadMsg->rpcMsg.code); + } else { // code == TSDB_CODE_NOT_READY, do not return msg to client dnodeDispatchNonRspMsg(pVnode, pReadMsg, code); } } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 34700a33f3e7e393dbec5e09713b1879c3c304ac..20e43866d7d33778b5676fcccaafc5438dd3100d 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2338,6 +2338,11 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { continue; } + if (terrno != TSDB_CODE_SUCCESS) { // load data block failed, abort query + longjmp(pRuntimeEnv->env, terrno); + break; + } + // query start position can not move into tableApplyFunctionsOnBlock due to limit/offset condition pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : blockInfo.rows - 1; int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock); diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 98153efe3e2e9ddb5df8f108ac9f7dbbc3be4c5f..0a72cfa7b9ab8a79d7599b3afd89eafc5bd90afa 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -679,7 +679,13 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo if (pCheckInfo->pDataCols == NULL) { STsdbMeta* pMeta = tsdbGetMeta(pRepo); + pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock); + if (pCheckInfo->pDataCols == NULL) { + tsdbError("%p failed to malloc buf, %p", pQueryHandle, pQueryHandle->qinfo); + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return blockLoaded; + } } STSchema* pSchema = tsdbGetTableSchema(pCheckInfo->pTableObj); @@ -745,7 +751,11 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* return; } - doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot); + // return error, add test cases + if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) { + + } + doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock); } else { /* @@ -1714,9 +1724,9 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, j); STableCheckInfo info = { .lastKey = pSecQueryHandle->window.skey, - //.tableId = pCheckInfo->tableId, .pTableObj = pCheckInfo->pTableObj, }; + info.tableId = pCheckInfo->tableId; taosArrayPush(pSecQueryHandle->pTableCheckInfo, &info); @@ -1726,8 +1736,9 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { tsdbInitCompBlockLoadInfo(&pSecQueryHandle->compBlockLoadInfo); pSecQueryHandle->defaultLoadColumn = taosArrayClone(pQueryHandle->defaultLoadColumn); - bool ret = tsdbNextDataBlock((void*) pSecQueryHandle); - assert(ret); + if (!tsdbNextDataBlock((void*) pSecQueryHandle)) { + return false; + } tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle, &blockInfo); tsdbRetrieveDataBlock((void*) pSecQueryHandle, pSecQueryHandle->defaultLoadColumn); @@ -1770,7 +1781,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { bool exists = true; int32_t code = getDataBlocksInFiles(pQueryHandle, &exists); if (code != TSDB_CODE_SUCCESS) { - return false; + return code; } if (exists) { @@ -2048,8 +2059,10 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) { return pHandle->pColumns; } else { // only load the file block SCompBlock* pBlock = pBlockInfo->compBlock; - doLoadFileDataBlock(pHandle, pBlock, pCheckInfo, pHandle->cur.slot); + if (!doLoadFileDataBlock(pHandle, pBlock, pCheckInfo, pHandle->cur.slot)) { + return NULL; + } // todo refactor int32_t numOfRows = copyDataFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1); diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 8ca76ef22de81e4e98fde59da1f889b8e306935c..c41b24579469d93f6864727a919327f782f87dbb 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -93,8 +93,11 @@ static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void** handle, vDebug("QInfo:%p exec completed, free handle:%d", *handle, *freeHandle); } } else { - pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); - memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); + SRetrieveTableRsp* pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); + memset(pRsp, 0, sizeof(SRetrieveTableRsp)); + pRsp->completed = true; + + pRet->rsp = pRsp; *freeHandle = true; } @@ -200,18 +203,18 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *qhandle, pReadMsg->rpcMsg.handle); - code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, qhandle, &freehandle); - // todo test the error code case - if (code == TSDB_CODE_SUCCESS) { - code = TSDB_CODE_QRY_HAS_RSP; - } + // set the real rsp error code + pReadMsg->rpcMsg.code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, qhandle, &freehandle); + + // NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client + code = TSDB_CODE_QRY_HAS_RSP; } else { freehandle = qQueryCompleted(*qhandle); } // NOTE: if the qhandle is not put into vread queue or query is completed, free the qhandle. - // if not build result, free it not by forced. + // If the building of result is not required, simply free it. Otherwise, mandatorily free the qhandle if (freehandle || (!buildRes)) { qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle); }