提交 f66f203c 编写于 作者: L liuyao

reload semi session state

上级 ccb675fe
......@@ -1874,6 +1874,31 @@ static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo*
return winNum;
}
static void compactSessionSemiWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin) {
SExprSupp* pSup = &pOperator->exprSupp;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SResultRow* pCurResult = NULL;
int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
// Just look for the window behind StartIndex
while (1) {
SResultWindowInfo winInfo = {0};
SStreamStateCur* pCur = getNextSessionWinInfo(pAggSup, NULL, pCurWin, &winInfo);
if (!IS_VALID_SESSION_WIN(winInfo) || !isInWindow(pCurWin, winInfo.sessionWin.win.skey, pAggSup->gap) ||
!inWinRange(&pAggSup->winRange, &winInfo.sessionWin.win)) {
taosMemoryFree(winInfo.pOutputBuf);
pAPI->stateStore.streamStateFreeCur(pCur);
break;
}
pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, winInfo.sessionWin.win.ekey);
doDeleteSessionWindow(pAggSup, &winInfo.sessionWin);
pAPI->stateStore.streamStateFreeCur(pCur);
taosMemoryFree(winInfo.pOutputBuf);
}
}
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) {
saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize,
&pAggSup->stateStore);
......@@ -2081,6 +2106,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
int32_t code = getSessionWinBuf(pChAggSup, pCur, &childWin);
if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &childWin.sessionWin.win)) {
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore);
continue;
}
......@@ -2089,6 +2115,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
setSessionOutputBuf(pAggSup, pWinKey->win.skey, pWinKey->win.ekey, pWinKey->groupId, &parentWin);
code = initSessionOutputBuf(&parentWin, &pResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore);
break;
}
}
......@@ -2099,7 +2126,9 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
compactSessionWindow(pOperator, &parentWin, pStUpdated, NULL, true);
saveResult(parentWin, pStUpdated);
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore);
} else {
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore);
break;
}
}
......@@ -2474,13 +2503,11 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
}
void streamSessionReleaseState(SOperatorInfo* pOperator) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME,
strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData,
resSize);
}
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME,
strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData,
resSize);
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.releaseStreamStateFn) {
downstream->fpSet.releaseStreamStateFn(downstream);
......@@ -2492,6 +2519,33 @@ void resetWinRange(STimeWindow* winRange) {
winRange->ekey = INT64_MAX;
}
void streamSessionSemiReloadState(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
resetWinRange(&pAggSup->winRange);
SResultWindowInfo winInfo = {0};
int32_t size = 0;
void* pBuf = NULL;
int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME,
strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size);
int32_t num = size / sizeof(SSessionKey);
SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf;
ASSERT(size == num * sizeof(SSessionKey));
for (int32_t i = 0; i < num; i++) {
SResultWindowInfo winInfo = {0};
setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo);
compactSessionSemiWindow(pOperator, &winInfo);
saveSessionOutputBuf(pAggSup, &winInfo);
}
taosMemoryFree(pBuf);
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.reloadStreamStateFn) {
downstream->fpSet.reloadStreamStateFn(downstream);
}
}
void streamSessionReloadState(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
......@@ -2731,6 +2785,11 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated);
tSimpleHashCleanup(pInfo->pStUpdated);
pInfo->pStUpdated = NULL;
if(pInfo->isHistoryOp) {
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
}
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
pInfo->pUpdated = NULL;
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
......@@ -2763,6 +2822,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL,
destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionSemiReloadState);
}
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), pPhyNode->type, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册