未验证 提交 2a9fa6c7 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #20313 from taosdata/fix/TS-2788.a

fix: join query limit doesn't work issue
...@@ -1631,6 +1631,8 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR ...@@ -1631,6 +1631,8 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
SJoinSupporter* pSupporter = (SJoinSupporter*)param; SJoinSupporter* pSupporter = (SJoinSupporter*)param;
int64_t handle = pSupporter->pObj; int64_t handle = pSupporter->pObj;
tscDebug("***enter joinRetrieveFinalResCallback");
SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle); SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
if (pParentSql == NULL) return; if (pParentSql == NULL) return;
...@@ -1639,7 +1641,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR ...@@ -1639,7 +1641,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (pParentSql->res.code != TSDB_CODE_SUCCESS) { if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, numOfRows, pParentSql->res.code); tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, numOfRows, pParentSql->res.code);
if (quitAllSubquery(pSql, pParentSql, pSupporter)) { if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
...@@ -1682,10 +1684,16 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR ...@@ -1682,10 +1684,16 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
numOfVgroups = pTableMetaInfo->vgroupList->numOfVgroups; numOfVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
} }
if ((++pTableMetaInfo->vgroupIndex) < numOfVgroups) { tscDebug("**clauseLimit:%" PRId64 " numOfClauseTotal:%" PRId64 " vgIdx:%d numOfVgroups:%d",
pParentSql->cmd.active->clauseLimit, pParentSql->res.numOfClauseTotal, pTableMetaInfo->vgroupIndex, numOfVgroups);
if ((++pTableMetaInfo->vgroupIndex) < numOfVgroups && (pParentSql->cmd.active->clauseLimit < 0 || pParentSql->cmd.active->clauseLimit > pParentSql->res.numOfClauseTotal)) {
tscDebug("0x%"PRIx64" no result in current vnode anymore, try next vnode, vgIndex:%d", pSql->self, pTableMetaInfo->vgroupIndex); tscDebug("0x%"PRIx64" no result in current vnode anymore, try next vnode, vgIndex:%d", pSql->self, pTableMetaInfo->vgroupIndex);
pSql->cmd.command = TSDB_SQL_SELECT; pSql->cmd.command = TSDB_SQL_SELECT;
pSql->fp = tscJoinQueryCallback; pSql->fp = tscJoinQueryCallback;
pSql->cmd.active->clauseLimit = pParentSql->cmd.active->clauseLimit;
pSql->cmd.active->limit.limit = pParentSql->cmd.active->clauseLimit - pParentSql->res.numOfClauseTotal;
pSql->cmd.active->limit.offset = pSql->res.offset;
tscBuildAndSendRequest(pSql, NULL); tscBuildAndSendRequest(pSql, NULL);
goto _return; goto _return;
...@@ -1744,6 +1752,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { ...@@ -1744,6 +1752,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
bool hasData = true; bool hasData = true;
bool reachLimit = false; bool reachLimit = false;
tscDebug("***enter tscFetchDatablockForSubquery");
{ pthread_mutex_lock(&pSql->subState.mutex); { pthread_mutex_lock(&pSql->subState.mutex);
assert(pSql->subState.numOfSub >= 1); assert(pSql->subState.numOfSub >= 1);
...@@ -1782,6 +1791,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { ...@@ -1782,6 +1791,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
// has data remains in client side, and continue to return data to app // has data remains in client side, and continue to return data to app
if (hasData) { if (hasData) {
tscDebug("*hasData");
tscBuildResFromSubqueries(pSql); tscBuildResFromSubqueries(pSql);
return; return;
} }
...@@ -1789,6 +1799,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { ...@@ -1789,6 +1799,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
// If at least one subquery is completed in current vnode, try the next vnode in case of multi-vnode // If at least one subquery is completed in current vnode, try the next vnode in case of multi-vnode
// super table projection query. // super table projection query.
if (reachLimit) { if (reachLimit) {
tscDebug("*reachLimit");
pSql->res.completed = true; pSql->res.completed = true;
freeJoinSubqueryObj(pSql); freeJoinSubqueryObj(pSql);
...@@ -1847,8 +1858,11 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { ...@@ -1847,8 +1858,11 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd); SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd);
tscDebug("**nonorderedPrj:%d resRow:%d numOfRows:%d com:%d numOfClauseTotal:%"PRId64 " clauseLimit:%" PRId64,
tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0), pSub->res.row, pSub->res.numOfRows,
pSub->res.completed, pSql->res.numOfClauseTotal, pSql->cmd.active->clauseLimit);
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && pSub->res.row >= pSub->res.numOfRows && if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && pSub->res.row >= pSub->res.numOfRows &&
pSub->res.completed) { pSub->res.completed && (pSql->cmd.active->clauseLimit < 0 || pSql->res.numOfClauseTotal < pSql->cmd.active->clauseLimit)) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1); assert(pQueryInfo->numOfTables == 1);
...@@ -1865,6 +1879,9 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { ...@@ -1865,6 +1879,9 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
pTableMetaInfo->vgroupIndex); pTableMetaInfo->vgroupIndex);
pSub->cmd.command = TSDB_SQL_SELECT; pSub->cmd.command = TSDB_SQL_SELECT;
pSub->fp = tscJoinQueryCallback; pSub->fp = tscJoinQueryCallback;
pSub->cmd.active->clauseLimit = pSql->cmd.active->clauseLimit;
pSub->cmd.active->limit.limit = pSql->cmd.active->clauseLimit - pSql->res.numOfClauseTotal;
pSub->cmd.active->limit.offset = pSub->res.offset;
tscBuildAndSendRequest(pSub, NULL); tscBuildAndSendRequest(pSub, NULL);
tryNextVnode = true; tryNextVnode = true;
...@@ -2132,6 +2149,8 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter ...@@ -2132,6 +2149,8 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
SQueryInfo *pNewQueryInfo = tscGetQueryInfo(&pNew->cmd); SQueryInfo *pNewQueryInfo = tscGetQueryInfo(&pNew->cmd);
assert(pNewQueryInfo != NULL); assert(pNewQueryInfo != NULL);
pNewQueryInfo->clauseLimit = -1;
pSupporter->colList = pNewQueryInfo->colList; pSupporter->colList = pNewQueryInfo->colList;
pNewQueryInfo->colList = NULL; pNewQueryInfo->colList = NULL;
...@@ -3794,6 +3813,7 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { ...@@ -3794,6 +3813,7 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
pthread_mutex_unlock(&pSql->subState.mutex); } pthread_mutex_unlock(&pSql->subState.mutex); }
if (numOfRes == 0) { // no result any more, free all subquery objects if (numOfRes == 0) { // no result any more, free all subquery objects
tscDebug("*******query complete");
pSql->res.completed = true; pSql->res.completed = true;
freeJoinSubqueryObj(pSql); freeJoinSubqueryObj(pSql);
return; return;
...@@ -3861,7 +3881,7 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { ...@@ -3861,7 +3881,7 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
pthread_mutex_unlock(&pSql->subState.mutex); } pthread_mutex_unlock(&pSql->subState.mutex); }
pRes->numOfRows = numOfRes; pRes->numOfRows = numOfRes;
pRes->numOfClauseTotal += numOfRes; //pRes->numOfClauseTotal += numOfRes;
int32_t finalRowSize = 0; int32_t finalRowSize = 0;
for(int32_t i = 0; i < tscNumOfFields(pQueryInfo); ++i) { for(int32_t i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
...@@ -3878,6 +3898,7 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { ...@@ -3878,6 +3898,7 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
void tscBuildResFromSubqueries(SSqlObj *pSql) { void tscBuildResFromSubqueries(SSqlObj *pSql) {
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
tscDebug("*enter tscBuildResFromSubqueries");
if (pRes->code != TSDB_CODE_SUCCESS) { if (pRes->code != TSDB_CODE_SUCCESS) {
tscAsyncResultOnError(pSql); tscAsyncResultOnError(pSql);
return; return;
......
...@@ -4561,7 +4561,9 @@ int32_t tscErrorMsgWithCode(int32_t code, char* dstBuffer, const char* errMsg, c ...@@ -4561,7 +4561,9 @@ int32_t tscErrorMsgWithCode(int32_t code, char* dstBuffer, const char* errMsg, c
bool tscHasReachLimitation(SQueryInfo* pQueryInfo, SSqlRes* pRes) { bool tscHasReachLimitation(SQueryInfo* pQueryInfo, SSqlRes* pRes) {
assert(pQueryInfo != NULL && pQueryInfo->clauseLimit != 0); assert(pQueryInfo != NULL && pQueryInfo->clauseLimit != 0);
return (pQueryInfo->clauseLimit > 0 && pRes->numOfClauseTotal >= pQueryInfo->clauseLimit); bool reachLimit = (pQueryInfo->clauseLimit > 0 && pRes->numOfClauseTotal >= pQueryInfo->clauseLimit);
tscDebug("reachLimit:%d, limit:%" PRId64 " total:%" PRId64, reachLimit, pQueryInfo->clauseLimit, pRes->numOfClauseTotal);
return reachLimit;
} }
char* tscGetErrorMsgPayload(SSqlCmd* pCmd) { return pCmd->payload; } char* tscGetErrorMsgPayload(SSqlCmd* pCmd) { return pCmd->payload; }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册