提交 20ceb4d8 编写于 作者: H hjxilinx

[td-186] fix bug in super table join

上级 b6bae78b
......@@ -45,9 +45,9 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
SSqlRes *pRes = &pSql->res;
pSql->signature = pSql;
pSql->param = param;
pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_MAX_REPLICA_NUM;
pSql->param = param;
pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_MAX_REPLICA_NUM;
pSql->fp = fp;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
......@@ -146,7 +146,7 @@ static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) {
}
// local reducer has handle this situation during super table non-projection query.
if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC) {
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE) {
pRes->numOfTotalInCurrentClause += pRes->numOfRows;
}
......@@ -176,7 +176,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
}
pSql->fp = fp;
if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC && pCmd->command < TSDB_SQL_LOCAL) {
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
tscProcessSql(pSql);
......@@ -225,7 +225,7 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
if (pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) {
tscFetchDatablockFromSubquery(pSql);
} else {
if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC && pCmd->command < TSDB_SQL_LOCAL) {
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
......@@ -257,7 +257,7 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
tscResetForNextRetrieve(pRes);
pSql->fp = tscAsyncFetchSingleRowProxy;
if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC && pCmd->command < TSDB_SQL_LOCAL) {
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
......
......@@ -403,7 +403,6 @@ int doProcessSql(SSqlObj *pSql) {
int tscProcessSql(SSqlObj *pSql) {
char * name = NULL;
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
......@@ -423,36 +422,35 @@ int tscProcessSql(SSqlObj *pSql) {
}
tscTrace("%p SQL cmd:%d will be processed, name:%s, type:%d", pSql, pCmd->command, name, type);
if (pSql->cmd.command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL
if (pCmd->command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL
if (pTableMetaInfo == NULL) {
pSql->res.code = TSDB_CODE_OTHERS;
return pSql->res.code;
}
} else if (pSql->cmd.command < TSDB_SQL_LOCAL) {
pSql->ipList = tscMgmtIpSet;
} else if (pCmd->command < TSDB_SQL_LOCAL) {
pSql->ipList = tscMgmtIpSet; //?
} else { // local handler
return (*tscProcessMsgRsp[pCmd->command])(pSql);
}
// todo handle async situation
if (QUERY_IS_JOIN_QUERY(type)) {
if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) {
return tscHandleMasterJoinQuery(pSql);
} else {
// for first stage sub query, iterate all vnodes to get all timestamp
if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
return doProcessSql(pSql);
}
}
}
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query
tscHandleMasterSTableQuery(pSql);
return pRes->code;
} else if (pSql->fp == (void(*)())tscHandleMultivnodeInsert) { // multi-vnodes insertion
tscHandleMultivnodeInsert(pSql);
return pSql->res.code;
}
// if (QUERY_IS_JOIN_QUERY(type)) {
// if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) {
// return tscHandleMasterJoinQuery(pSql);
// } else {
// // for first stage sub query, iterate all vnodes to get all timestamp
// if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
// return doProcessSql(pSql);
// }
// }
// }
//
// if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query
// tscHandleMasterSTableQuery(pSql);
// return pRes->code;
// } else if (pSql->fp == (void(*)())tscHandleMultivnodeInsert) { // multi-vnodes insertion
// tscHandleMultivnodeInsert(pSql);
// return pRes->code;
// }
return doProcessSql(pSql);
}
......@@ -489,7 +487,7 @@ void tscKillSTableQuery(SSqlObj *pSql) {
const int64_t MAX_WAITING_TIME = 10000; // 10 Sec.
int64_t stime = taosGetTimestampMs();
while (pSql->cmd.command != TSDB_SQL_RETRIEVE_METRIC && pSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
while (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
taosMsleep(100);
if (taosGetTimestampMs() - stime > MAX_WAITING_TIME) {
break;
......@@ -1461,7 +1459,7 @@ int tscProcessTagRetrieveRsp(SSqlObj *pSql) {
return tscLocalResultCommonBuilder(pSql, numOfRes);
}
int tscProcessRetrieveMetricRsp(SSqlObj *pSql) {
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
......@@ -2257,7 +2255,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
strcpy(pObj->sversion, pConnect->serverVersion);
pObj->writeAuth = pConnect->writeAuth;
pObj->superAuth = pConnect->superAuth;
taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
// taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
return 0;
}
......@@ -2637,7 +2635,7 @@ void tscInitMsgsFp() {
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_METRIC] = tscProcessRetrieveMetricRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp;
tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;
......
......@@ -414,7 +414,7 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
}
// secondary merge has handle this situation
if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC) {
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE) {
pRes->numOfTotalInCurrentClause += pRes->numOfRows;
}
......@@ -476,7 +476,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
// current data are exhausted, fetch more data
if (pRes->row >= pRes->numOfRows && pRes->completed != true &&
(pCmd->command == TSDB_SQL_RETRIEVE ||
pCmd->command == TSDB_SQL_RETRIEVE_METRIC ||
pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE ||
pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE ||
pCmd->command == TSDB_SQL_FETCH ||
pCmd->command == TSDB_SQL_SHOW ||
......
......@@ -1254,7 +1254,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
// pRes->code check only serves in launching metric sub-queries
if (pRes->code == TSDB_CODE_QUERY_CANCELLED) {
pCmd->command = TSDB_SQL_RETRIEVE_METRIC; // enable the abort of kill super table function.
pCmd->command = TSDB_SQL_RETRIEVE_LOCALMERGE; // enable the abort of kill super table function.
return pRes->code;
}
......@@ -1564,7 +1564,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
tscFreeSubSqlObj(trsupport, pSql);
// set the command flag must be after the semaphore been correctly set.
pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
pPObj->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE;
if (pPObj->res.code == TSDB_CODE_SUCCESS) {
(*pPObj->fp)(pPObj->param, pPObj, 0);
} else {
......
......@@ -352,7 +352,7 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) {
STscObj* pObj = pSql->pTscObj;
int32_t cmd = pCmd->command;
if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_METRIC || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_LOCALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
cmd == TSDB_SQL_METRIC_JOIN_RETRIEVE) {
tscRemoveFromSqlList(pSql);
}
......@@ -1819,6 +1819,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
// todo handle the agg arithmetic expression
for(int32_t f = 0; f < pNewQueryInfo->fieldsInfo.numOfOutput; ++f) {
TAOS_FIELD* field = tscFieldInfoGetField(&pNewQueryInfo->fieldsInfo, f);
numOfExprs = tscSqlExprNumOfExprs(pNewQueryInfo);
for(int32_t k1 = 0; k1 < numOfExprs; ++k1) {
SSqlExpr* pExpr1 = tscSqlExprGet(pNewQueryInfo, k1);
......@@ -1875,24 +1877,54 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
return pNew;
}
/**
* To decide if current is a two-stage super table query, join query, or insert. And invoke different
* procedure accordingly
* @param pSql
*/
void tscDoQuery(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
pSql->res.code = TSDB_CODE_SUCCESS;
pRes->code = TSDB_CODE_SUCCESS;
if (pCmd->command > TSDB_SQL_LOCAL) {
tscProcessLocalCmd(pSql);
return;
}
if (pCmd->command == TSDB_SQL_SELECT) {
tscAddIntoSqlList(pSql);
}
if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
tscProcessMultiVnodesInsertFromFile(pSql);
} else {
if (pCmd->command == TSDB_SQL_SELECT) {
tscAddIntoSqlList(pSql);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
uint16_t type = pQueryInfo->type;
if (pSql->fp == (void(*)())tscHandleMultivnodeInsert) { // multi-vnodes insertion
tscHandleMultivnodeInsert(pSql);
return;
}
if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
tscProcessMultiVnodesInsertFromFile(pSql);
} else {
// pSql may be released in this function if it is a async insertion.
tscProcessSql(pSql);
if (QUERY_IS_JOIN_QUERY(type)) {
if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) {
tscHandleMasterJoinQuery(pSql);
return;
} else {
// for first stage sub query, iterate all vnodes to get all timestamp
if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
// doProcessSql(pSql);
assert(0);
}
}
} else if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query
tscHandleMasterSTableQuery(pSql);
return;
}
tscProcessSql(pSql);
}
}
......
......@@ -60,16 +60,16 @@ enum _sql_type {
TSDB_SQL_LOCAL, // SQL below for client local
TSDB_SQL_DESCRIBE_TABLE,
TSDB_SQL_RETRIEVE_METRIC,
TSDB_SQL_RETRIEVE_LOCALMERGE,
TSDB_SQL_METRIC_JOIN_RETRIEVE,
/*
* build empty result instead of accessing dnode to fetch result
* reset the client cache
*/
TSDB_SQL_RETRIEVE_EMPTY_RESULT, // 40
TSDB_SQL_RETRIEVE_EMPTY_RESULT,
TSDB_SQL_RESET_CACHE,
TSDB_SQL_RESET_CACHE, // 40
TSDB_SQL_SERV_STATUS,
TSDB_SQL_CURRENT_DB,
TSDB_SQL_SERV_VERSION,
......@@ -77,7 +77,7 @@ enum _sql_type {
TSDB_SQL_CURRENT_USER,
TSDB_SQL_CFG_LOCAL,
TSDB_SQL_MAX // 48
TSDB_SQL_MAX // 47
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册