diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 44d5d26f7132efa78a78acb2a8aad7cde73149b3..c7026b45c6260ec2e8b68a5bc2226fc08617932f 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -52,10 +52,10 @@ typedef struct SWindowStatus { typedef struct SWindowResult { uint16_t numOfRows; // number of rows of current time window + SWindowStatus status; // this result status: closed or opened SPosInfo pos; // Position of current result in disk-based output buffer SResultInfo* resultInfo; // For each result column, there is a resultInfo STimeWindow window; // The time window that current result covers. - SWindowStatus status; // this result status: closed or opened } SWindowResult; /** @@ -122,6 +122,7 @@ typedef struct SQueryCostInfo { uint32_t discardBlocks; uint64_t elapsedTime; uint64_t computTime; + uint64_t internalSupSize; } SQueryCostInfo; typedef struct SQuery { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index e9d1ffa639997e4b63b3a7f51e4e6fd4c220f99b..6f4afcd6bd44ec8854c14a0b36c5963b5e634627 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -398,8 +398,18 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin // more than the capacity, reallocate the resources if (pWindowResInfo->size >= pWindowResInfo->capacity) { - int64_t newCap = pWindowResInfo->capacity * 1.5; + int64_t newCap = 0; + if (pWindowResInfo->capacity > 10000) { + newCap = pWindowResInfo->capacity * 1.25; + } else { + newCap = pWindowResInfo->capacity * 1.5; + } + + printf("%ld\n", newCap); + char *t = realloc(pWindowResInfo->pResult, newCap * sizeof(SWindowResult)); + pRuntimeEnv->summary.internalSupSize += (newCap - pWindowResInfo->capacity) * sizeof(SWindowResult); + if (t == NULL) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -2659,7 +2669,7 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) { qDebug("QInfo:%p no result in group %d, continue", pQInfo, pQInfo->groupIndex - 1); } - if (pQInfo->groupIndex == numOfGroups) { + if (pQInfo->groupIndex == numOfGroups && pQInfo->offset == pQInfo->numOfGroupResultPages) { SET_STABLE_QUERY_OVER(pQInfo); } @@ -2705,7 +2715,6 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { memcpy(pDest + offset * bytes, pData->data + pRuntimeEnv->offset[i] * pData->num, bytes * pData->num); } -// rows += pData->num; offset += pData->num; } @@ -2796,6 +2805,11 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { int64_t startt = taosGetTimestampMs(); while (1) { + if (IS_QUERY_KILLED(pQInfo)) { + qDebug("QInfo:%p it is already killed, abort", pQInfo); + longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); + } + int32_t pos = pTree->pNode[0].index; SWindowResInfo *pWindowResInfo = &pTableList[pos]->windowResInfo; @@ -3958,6 +3972,8 @@ static void queryCostStatis(SQInfo *pQInfo) { " load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64, pQInfo, pSummary->elapsedTime, pSummary->totalBlocks, pSummary->loadBlockStatis, pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows); + + qDebug("QInfo:%p :cost summary: internal size:%"PRId64, pQInfo, pSummary->internalSupSize); } static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) { @@ -6039,8 +6055,6 @@ static void freeQInfo(SQInfo *pQInfo) { } SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - setQueryKilled(pQInfo); - qDebug("QInfo:%p start to free QInfo", pQInfo); for (int32_t col = 0; col < pQuery->numOfOutput; ++col) { taosTFree(pQuery->sdata[col]); diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 26c106247993e540a0f7a1c74a90c33cee40a0e6..a02413822e8555ff4e55d54374dedff7b7efb6e4 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -251,6 +251,10 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { return code; } + // todo add more error check here + // register the qhandle to connect to quit query immediate if connection is broken + vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId); + bool freeHandle = true; bool buildRes = false;