提交 5295955b 编写于 作者: H hjxilinx

add the union support in sql parser: refactor some codes. #1032. [TBASE-1140]

上级 9382e4d8
......@@ -97,10 +97,10 @@ SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
*/
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
bool tscProjectionQueryOnSTable(SSqlCmd* pCmd, int32_t subClauseIndex);
bool tscProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex);
bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo);
bool tscIsTwoStageMergeMetricQuery(SSqlCmd* pCmd);
bool tscIsTwoStageMergeMetricQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryOnMetric(SSqlCmd* pCmd);
bool tscQueryMetricTags(SQueryInfo* pQueryInfo);
bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd);
......@@ -126,7 +126,7 @@ void tscFieldInfoSetValue(SFieldInfo* pFieldInfo, int32_t index, int8_t type, co
void tscFieldInfoUpdateVisible(SFieldInfo* pFieldInfo, int32_t index, bool visible);
void tscFieldInfoCalOffset(SQueryInfo* pQueryInfo);
void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo);
void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo);
void tscFieldInfoCopy(SFieldInfo* src, SFieldInfo* dst, const int32_t* indexList, int32_t size);
void tscFieldInfoCopyAll(SFieldInfo* dst, SFieldInfo* src);
......@@ -135,6 +135,7 @@ int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index);
int32_t tscGetResRowLength(SQueryInfo* pQueryInfo);
void tscClearFieldInfo(SFieldInfo* pFieldInfo);
int32_t tscNumOfFields(SQueryInfo* pQueryInfo);
int32_t tscFieldInfoCompare(SFieldInfo* pFieldInfo1, SFieldInfo* pFieldInfo2);
void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, int16_t tableIndex);
......@@ -163,7 +164,7 @@ int32_t tscValidateName(SSQLToken* pToken);
void tscIncStreamExecutionCount(void* pStream);
bool tscValidateColumnId(SSqlCmd* pCmd, int32_t colId);
bool tscValidateColumnId(SMeterMetaInfo* pMeterMetaInfo, int32_t colId);
// get starter position of metric query condition (query on tags) in SSqlCmd.payload
SCond* tsGetMetricQueryCondPos(STagCond* pCond, uint64_t tableIndex);
......
......@@ -253,9 +253,9 @@ typedef struct {
union {
int32_t count;
int32_t numOfTablesInSubmit;
int32_t clauseIndex; // index of multiple subclause query
};
int32_t clauseIndex; // index of multiple subclause query
short numOfCols;
uint32_t allocSize;
char * payload;
......
......@@ -121,7 +121,7 @@ static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOf
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
// sequentially retrieve data from remain vnodes first, query vnode specified by vnodeIdx
if (numOfRows == 0 && tscProjectionQueryOnSTable(pCmd, 0)) {
if (numOfRows == 0 && tscProjectionQueryOnSTable(pQueryInfo, 0)) {
// vnode is denoted by vnodeIdx, continue to query vnode specified by vnodeIdx
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
assert(pMeterMetaInfo->vnodeIndex >= 0);
......@@ -133,7 +133,6 @@ static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOf
}
/* update the limit value according to current retrieval results */
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
pQueryInfo->limit.limit = pCmd->globalLimit - pRes->numOfTotal;
pQueryInfo->limit.offset = pRes->offset;
......@@ -269,14 +268,14 @@ void tscProcessAsyncRetrieve(void *param, TAOS_RES *tres, int numOfRows) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (numOfRows == 0) {
// sequentially retrieve data from remain vnodes.
if (tscProjectionQueryOnSTable(pCmd, 0)) {
if (tscProjectionQueryOnSTable(pQueryInfo, 0)) {
/*
* vnode is denoted by vnodeIdx, continue to query vnode specified by vnodeIdx till all vnode have been retrieved
*/
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
assert(pMeterMetaInfo->vnodeIndex >= 0);
/* reach the maximum number of output rows, abort */
......@@ -527,7 +526,7 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
}
} else { // stream computing
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
code = tscGetMeterMeta(pSql, pMeterMetaInfo);
pRes->code = code;
......
......@@ -438,7 +438,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
taos_fetch_rows_a(tres, joinRetrieveCallback, param);
} else if (numOfRows == 0) { // no data from this vnode anymore
if (tscProjectionQueryOnSTable(&pParentSql->cmd, 0)) {
if (tscProjectionQueryOnSTable(pQueryInfo, 0)) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1);
......@@ -494,9 +494,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
pSql->res.numOfTotal += pSql->res.numOfRows;
}
SSqlCmd* pCmd = &pSql->cmd;
if (tscProjectionQueryOnSTable(pCmd, 0) && numOfRows == 0) {
if (tscProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1);
......@@ -541,7 +539,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
if (tscProjectionQueryOnSTable(&pSql->cmd, 0)) {
if (tscProjectionQueryOnSTable(pQueryInfo, 0)) {
if (pRes->row >= pRes->numOfRows && pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes &&
(!tscHasReachLimitation(pSql->pSubs[i]))) {
numOfFetch++;
......@@ -709,7 +707,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
* if the query is a continue query (vnodeIndex > 0 for projection query) for next vnode, do the retrieval of
* data instead of returning to its invoker
*/
if (pMeterMetaInfo->vnodeIndex > 0 && tscProjectionQueryOnSTable(&pSql->cmd, 0)) {
if (pMeterMetaInfo->vnodeIndex > 0 && tscProjectionQueryOnSTable(pQueryInfo, 0)) {
assert(pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes);
pSupporter->pState->numOfCompleted = 0; // reset the record value
......
......@@ -655,7 +655,7 @@ void sortRemoveDuplicates(STableDataBlocks *dataBuf) {
static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char **str, SParsedDataColInfo *spd,
int32_t *totalNum) {
SSqlCmd * pCmd = &pSql->cmd;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
SMeterMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta;
STableDataBlocks *dataBuf = NULL;
......@@ -1143,7 +1143,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
strcpy(pDataBlock->filename, fname);
} else if (sToken.type == TK_LP) {
/* insert into tablename(col1, col2,..., coln) values(v1, v2,... vn); */
SMeterMeta *pMeterMeta = tscGetMeterMetaInfo(pCmd, 0, 0)->pMeterMeta;
SMeterMeta *pMeterMeta = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0)->pMeterMeta;
SSchema * pSchema = tsGetSchema(pMeterMeta);
if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
......@@ -1275,7 +1275,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
pCmd->command = TSDB_SQL_INSERT;
SQueryInfo *pQueryInfo = NULL;
tscGetQueryInfoDetailSafely(pCmd, 0, &pQueryInfo);
tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo);
if (sToken.type == TK_INSERT) {
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT);
......@@ -1343,7 +1343,8 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
int32_t code = TSDB_CODE_SUCCESS;
SSqlCmd *pCmd = &pSql->cmd;
SMeterMeta *pMeterMeta = tscGetMeterMetaInfo(pCmd, 0, 0)->pMeterMeta;
assert(pCmd->numOfClause == 1);
SMeterMeta *pMeterMeta = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0)->pMeterMeta;
SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pTableDataBlocks->pData);
tsSetBlockInfo(pBlocks, pMeterMeta, numOfRows);
......@@ -1375,8 +1376,11 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
int numOfRows = 0;
int32_t code = 0;
int nrows = 0;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
SMeterMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta;
assert(pCmd->numOfClause == 1);
int32_t rowSize = pMeterMeta->rowSize;
pCmd->pDataBlocks = tscCreateBlockArrayList();
......@@ -1465,7 +1469,9 @@ void tscProcessMultiVnodesInsert(SSqlObj *pSql) {
}
STableDataBlocks *pDataBlock = NULL;
SMeterMetaInfo * pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
SMeterMetaInfo * pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
assert(pCmd->numOfClause == 1);
int32_t code = TSDB_CODE_SUCCESS;
/* the first block has been sent to server in processSQL function */
......
......@@ -408,7 +408,7 @@ static int insertStmtReset(STscStmt* pStmt) {
}
pCmd->batchSize = 0;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
pMeterMetaInfo->vnodeIndex = 0;
return TSDB_CODE_SUCCESS;
}
......@@ -422,7 +422,8 @@ static int insertStmtExecute(STscStmt* stmt) {
++pCmd->batchSize;
}
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
assert(pCmd->numOfClause == 1);
if (pCmd->pDataBlocks->nSize > 0) {
// merge according to vgid
......
......@@ -101,7 +101,7 @@ static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField);
static bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo);
static void updateTagColumnIndex(SQueryInfo* pQueryInfo, int32_t tableIndex);
static int32_t parseLimitClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSqlObj* pSql);
static int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t index, SQuerySQL* pQuerySql, SSqlObj* pSql);
static int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDBInfo* pCreateDbSql);
static int32_t getColumnIndexByNameEx(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex);
static int32_t getTableIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex);
......@@ -205,7 +205,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), pInfo->pzErrMsg);
}
int32_t code = tscGetQueryInfoDetailSafely(pCmd, 0, &pQueryInfo);
int32_t code = tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo);
assert(pQueryInfo->numOfTables == 0);
SMeterMetaInfo* pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pQueryInfo);
......@@ -502,7 +502,8 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
case TSDB_SQL_SELECT: {
assert(pCmd->numOfClause == 1);
const char* msg1 = "columns in select caluse not identical";
for (int32_t i = pCmd->numOfClause; i < pInfo->subclauseInfo.numOfClause; ++i) {
SQueryInfo* pqi = NULL;
if ((code = tscGetQueryInfoDetailSafely(pCmd, i, &pqi)) != TSDB_CODE_SUCCESS) {
......@@ -526,6 +527,17 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
// if there is only one element, the limit of clause is the limit of global result.
if (pCmd->numOfClause == 1) {
pCmd->globalLimit = pQueryInfo1->clauseLimit;
} else { // check the output fields information, column name and column type
pCmd->globalLimit = -1;
for(int32_t i = 1; i < pCmd->numOfClause; ++i) {
SQueryInfo* pQueryInfo2 = tscGetQueryInfoDetail(pCmd, i);
int32_t ret = tscFieldInfoCompare(&pQueryInfo1->fieldsInfo, &pQueryInfo2->fieldsInfo);
if (ret != 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
}
}
return TSDB_CODE_SUCCESS; // do not build query message here
......@@ -852,7 +864,9 @@ bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField) {
const char* msg5 = "invalid binary/nchar tag length";
const char* msg6 = "invalid data type in tags";
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
assert(pCmd->numOfClause == 1);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
SMeterMeta* pMeterMeta = pMeterMetaInfo->pMeterMeta;
// no more than 6 tags
......@@ -921,7 +935,8 @@ bool validateOneColumn(SSqlCmd* pCmd, TAOS_FIELD* pColField) {
const char* msg5 = "invalid column name";
const char* msg6 = "invalid column length";
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
assert(pCmd->numOfClause == 1);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
SMeterMeta* pMeterMeta = pMeterMetaInfo->pMeterMeta;
// no more max columns
......@@ -1975,8 +1990,9 @@ int32_t changeFunctionID(int32_t optr, int16_t* functionId) {
int32_t setShowInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
SSqlCmd* pCmd = &pSql->cmd;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
assert(pCmd->numOfClause == 1);
pCmd->command = TSDB_SQL_SHOW;
const char* msg1 = "invalid name";
......@@ -2125,7 +2141,7 @@ int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo* pQueryInfo) {
}
}
tscFieldInfoUpdateOffset(pQueryInfo);
tscFieldInfoUpdateOffsetForInterResult(pQueryInfo);
return TSDB_CODE_SUCCESS;
}
......@@ -2370,7 +2386,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd*
void setColumnOffsetValueInResultset(SQueryInfo* pQueryInfo) {
if (QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
tscFieldInfoUpdateOffset(pQueryInfo);
tscFieldInfoUpdateOffsetForInterResult(pQueryInfo);
} else {
tscFieldInfoCalOffset(pQueryInfo);
}
......@@ -4410,7 +4426,7 @@ bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo) {
return (pQueryInfo->stime == pQueryInfo->etime) && (pQueryInfo->stime != 0);
}
int32_t parseLimitClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSqlObj* pSql) {
int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL* pQuerySql, SSqlObj* pSql) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
const char* msg0 = "soffset/offset can not be less than 0";
......@@ -4443,7 +4459,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSqlObj*
if (queryOnTags == true) { // local handle the metric tag query
pQueryInfo->command = TSDB_SQL_RETRIEVE_TAGS;
} else {
if (tscProjectionQueryOnSTable(&pSql->cmd, 0) &&
if (tscProjectionQueryOnSTable(pQueryInfo, 0) &&
(pQueryInfo->slimit.limit > 0 || pQueryInfo->slimit.offset > 0)) {
return invalidSqlErrMsg(pQueryInfo->msg, msg3);
}
......@@ -4461,7 +4477,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSqlObj*
* And then launching multiple async-queries against all qualified virtual nodes, during the first-stage
* query operation.
*/
int32_t code = tscGetMetricMeta(pSql, 0);
int32_t code = tscGetMetricMeta(pSql, clauseIndex);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -4983,7 +4999,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
}
// projection query on metric does not compatible with "group by" syntax
if (tscProjectionQueryOnSTable(pCmd, 0)) {
if (tscProjectionQueryOnSTable(pQueryInfo, 0)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
......@@ -5564,7 +5580,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
pQueryInfo->limit = pQuerySql->limit;
// temporarily save the original limitation value
if ((code = parseLimitClause(pQueryInfo, pQuerySql, pSql)) != TSDB_CODE_SUCCESS) {
if ((code = parseLimitClause(pQueryInfo, index, pQuerySql, pSql)) != TSDB_CODE_SUCCESS) {
return code;
}
......
......@@ -311,7 +311,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pRes->pLocalReducer = pReducer;
pRes->numOfGroups = 0;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
int16_t prec = pMeterMetaInfo->pMeterMeta->precision;
int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime;
......@@ -582,7 +582,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
tColModel *pModel = NULL;
*pFinalModel = NULL;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
(*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pMeterMetaInfo->pMetricMeta->numOfVnodes);
......@@ -871,7 +871,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
functions[i] = tscSqlExprGet(pQueryInfo, i)->functionId;
}
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
int8_t precision = pMeterMetaInfo->pMeterMeta->precision;
while (1) {
......@@ -1212,7 +1212,7 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer
pQueryInfo->limit.offset = pLocalReducer->offset;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
int16_t precision = pMeterMetaInfo->pMeterMeta->precision;
// for group result interpolation, do not return if not data is generated
......
......@@ -129,6 +129,11 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
pSql->fp = tscProcessHeartBeatRsp;
pSql->cmd.command = TSDB_SQL_HB;
SQueryInfo *pQueryInfo = NULL;
tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
pQueryInfo->command = TSDB_SQL_HB;
if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
tfree(pSql);
return;
......@@ -223,7 +228,7 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) {
pSql->thandle = NULL;
SSqlCmd * pCmd = &pSql->cmd;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
if (UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) { // multiple vnode query
SVnodeSidList *vnodeList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, pMeterMetaInfo->vnodeIndex);
......@@ -423,7 +428,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
return ahandle;
}
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
if (msg == NULL) {
tscTrace("%p no response from ip:0x%x", pSql, pSql->ip);
......@@ -783,6 +788,9 @@ int tscProcessSql(SSqlObj *pSql) {
}
type = pQueryInfo->type;
// for hearbeat, numOfTables == 0;
assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
}
tscTrace("%p SQL cmd:%d will be processed, name:%s, type:%d", pSql, pCmd->command, name, type);
......@@ -796,6 +804,7 @@ int tscProcessSql(SSqlObj *pSql) {
if (pMeterMetaInfo == NULL) { // the pMeterMetaInfo cannot be NULL
pSql->res.code = TSDB_CODE_OTHERS;
assert(0);
return pSql->res.code;
}
......@@ -862,7 +871,7 @@ int tscProcessSql(SSqlObj *pSql) {
}
}
if (tscIsTwoStageMergeMetricQuery(pCmd)) {
if (tscIsTwoStageMergeMetricQuery(pQueryInfo, 0)) {
/*
* (ref. line: 964)
* Before this function returns from tscLaunchMetricSubQueries and continues, pSql may have been released at user
......@@ -932,7 +941,7 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) {
const uint32_t nBufferSize = (1 << 16); // 64KB
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
int32_t numOfSubQueries = pMeterMetaInfo->pMetricMeta->numOfVnodes;
assert(numOfSubQueries > 0);
......@@ -1308,7 +1317,10 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
}
void tscKillMetricQuery(SSqlObj *pSql) {
if (!tscIsTwoStageMergeMetricQuery(&pSql->cmd)) {
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (!tscIsTwoStageMergeMetricQuery(pQueryInfo, 0)) {
return;
}
......@@ -1351,14 +1363,17 @@ void tscKillMetricQuery(SSqlObj *pSql) {
static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int retCode);
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
SSqlObj *pNew = createSubqueryObj(pSql, 0, tscRetrieveDataRes, trsupport, prevSqlObj);
const int32_t table_index = 0;
SSqlObj *pNew = createSubqueryObj(pSql, table_index, tscRetrieveDataRes, trsupport, prevSqlObj);
if (pNew != NULL) { // the sub query of two-stage super table query
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
assert(pQueryInfo->numOfTables == 1);
assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1);
// launch subquery for each vnode, so the subquery index equals to the vnodeIndex.
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, table_index);
pMeterMetaInfo->vnodeIndex = trsupport->subqueryIndex;
pSql->pSubs[trsupport->subqueryIndex] = pNew;
......@@ -1471,7 +1486,7 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) {
SShellSubmitMsg *pShellMsg;
char * pMsg;
SMeterMetaInfo * pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
SMeterMetaInfo * pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, pSql->cmd.clauseIndex, 0);
SMeterMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta;
......@@ -1506,13 +1521,13 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pMeterMeta->index].ip),
htons(pShellMsg->vnode));
return msgLen;
return TSDB_CODE_SUCCESS;
}
void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char *buf) {
SSqlCmd * pCmd = &pSql->cmd;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
char * pStart = buf + tsRpcHeadSize;
SQueryMeterMsg *pQueryMsg = (SQueryMeterMsg *)pStart;
......@@ -1561,7 +1576,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
}
static char *doSerializeTableInfo(SSqlObj *pSql, int32_t numOfMeters, int32_t vnodeId, char *pMsg) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, pSql->cmd.clauseIndex, 0);
SMeterMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta;
SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta;
......@@ -1611,8 +1626,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
char * pStart = pCmd->payload + tsRpcHeadSize;
SMeterMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta;
......@@ -1772,7 +1787,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
hasArithmeticFunction = true;
}
if (!tscValidateColumnId(pCmd, pExpr->colInfo.colId)) {
if (!tscValidateColumnId(pMeterMetaInfo, pExpr->colInfo.colId)) {
/* column id is not valid according to the cached metermeta, the meter meta is expired */
tscError("%p table schema is not matched with parsed sql", pSql);
return -1;
......@@ -1913,7 +1928,9 @@ int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg = doBuildMsgHeader(pSql, &pStart);
pCreateDbMsg = (SCreateDbMsg *)pMsg;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
assert(pCmd->numOfClause == 1);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
strncpy(pCreateDbMsg->db, pMeterMetaInfo->name, tListLen(pCreateDbMsg->db));
pMsg += sizeof(SCreateDbMsg);
......@@ -2063,7 +2080,7 @@ int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg = doBuildMsgHeader(pSql, &pStart);
pDropDbMsg = (SDropDbMsg *)pMsg;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
strncpy(pDropDbMsg->db, pMeterMetaInfo->name, tListLen(pDropDbMsg->db));
pDropDbMsg->ignoreNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
......@@ -2085,7 +2102,7 @@ int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg = doBuildMsgHeader(pSql, &pStart);
pDropTableMsg = (SDropTableMsg *)pMsg;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
strcpy(pDropTableMsg->meterId, pMeterMetaInfo->name);
pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
......@@ -2103,7 +2120,7 @@ int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
char * pMsg, *pStart;
SSqlCmd * pCmd = &pSql->cmd;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
pMsg = doBuildMsgHeader(pSql, &pStart);
pDrop = (SDropDnodeMsg *)pMsg;
......@@ -2128,7 +2145,7 @@ int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg = doBuildMsgHeader(pSql, &pStart);
pDropMsg = (SDropUserMsg *)pMsg;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
strcpy(pDropMsg->user, pMeterMetaInfo->name);
pMsg += sizeof(SDropUserMsg);
......@@ -2148,7 +2165,7 @@ int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg = doBuildMsgHeader(pSql, &pStart);
pUseDbMsg = (SUseDbMsg *)pMsg;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
strcpy(pUseDbMsg->db, pMeterMetaInfo->name);
pMsg += sizeof(SUseDbMsg);
......@@ -2178,7 +2195,7 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
size_t nameLen = strlen(pMeterMetaInfo->name);
if (nameLen > 0) {
......@@ -2414,7 +2431,7 @@ int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd * pCmd = &pSql->cmd;
STscObj * pObj = pSql->pTscObj;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
pStart = pCmd->payload + tsRpcHeadSize;
pMsg = pStart;
......@@ -2532,7 +2549,7 @@ static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
int tscProcessDescribeTableRsp(SSqlObj *pSql) {
SSqlCmd * pCmd = &pSql->cmd;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
int32_t numOfRes = pMeterMetaInfo->pMeterMeta->numOfColumns + pMeterMetaInfo->pMeterMeta->numOfTags;
......@@ -2772,7 +2789,7 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += sizeof(int16_t);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, i);
pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, i);
uint64_t uid = pMeterMetaInfo->pMeterMeta->uid;
offset = pMsg - (char *)pMetaMsg;
......@@ -3431,7 +3448,7 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
* If the query result is exhausted, or current query is to free resource at server side,
* the connection will be recycled.
*/
if ((pRes->numOfRows == 0 && !(tscProjectionQueryOnSTable(pCmd, 0) && pRes->offset > 0)) ||
if ((pRes->numOfRows == 0 && !(tscProjectionQueryOnSTable(pQueryInfo, 0) && pRes->offset > 0)) ||
((pQueryInfo->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE)) {
tscTrace("%p no result or free resource, recycle connection", pSql);
taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pObj->user);
......
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "hash.h"
#include "os.h"
#include "tcache.h"
#include "tlog.h"
......@@ -28,7 +29,6 @@
#include "tsocket.h"
#include "ttimer.h"
#include "tutil.h"
#include "hash.h"
TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const char *db, uint16_t port,
void (*fp)(void *, TAOS_RES *, int), void *param, void **taos) {
......@@ -205,7 +205,7 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
taosCleanUpHashTable(pSql->pTableHashList);
pSql->pTableHashList = NULL;
}
tscTrace("%p SQL: %s pObj:%p", pSql, pSql->sqlstr, pObj);
pRes->code = (uint8_t)tsParseSql(pSql, false);
......@@ -293,11 +293,11 @@ int taos_num_fields(TAOS_RES *res) {
SSqlObj *pSql = (SSqlObj *)res;
if (pSql == NULL || pSql->signature != pSql) return 0;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (pQueryInfo == NULL) {
return 0;
}
SFieldInfo *pFieldsInfo = &pQueryInfo->fieldsInfo;
return (pFieldsInfo->numOfOutputCols - pFieldsInfo->numOfHiddenCols);
}
......@@ -319,8 +319,8 @@ int taos_affected_rows(TAOS *taos) {
TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
SSqlObj *pSql = (SSqlObj *)res;
if (pSql == NULL || pSql->signature != pSql) return 0;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
return pQueryInfo->fieldsInfo.pFields;
}
......@@ -370,7 +370,7 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
pRes->numOfTotal += pRes->numOfRows;
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) +
pRes->bytes[i] * (1 - pQueryInfo->order.order) * (pRes->numOfRows - 1);
......@@ -384,9 +384,9 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
static void **doSetResultRowData(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
int32_t num = 0;
for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + pRes->bytes[i] * pRes->row;
......@@ -439,16 +439,17 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
bool hasData = true;
SSqlCmd *pCmd = &pSql->cmd;
if (tscProjectionQueryOnSTable(pCmd, 0)) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (tscProjectionQueryOnSTable(pQueryInfo, 0)) {
bool allSubqueryExhausted = true;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlRes *pRes1 = &pSql->pSubs[i]->res;
SSqlCmd *pCmd1 = &pSql->pSubs[i]->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd1, 0);
SMeterMetaInfo *pMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
SQueryInfo * pQueryInfo1 = tscGetQueryInfoDetail(pCmd1, pCmd1->clauseIndex);
SMeterMetaInfo *pMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo1, 0);
assert(pQueryInfo->numOfTables == 1);
/*
......@@ -465,12 +466,12 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
hasData = !allSubqueryExhausted;
} else { // otherwise, in case inner join, if any subquery exhausted, query completed.
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlRes *pRes1 = &pSql->pSubs[i]->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0);
SSqlRes * pRes1 = &pSql->pSubs[i]->res;
SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0);
if ((pRes1->row >= pRes1->numOfRows && tscHasReachLimitation(pSql->pSubs[i]) &&
tscProjectionQueryOnTable(pQueryInfo)) || (pRes1->numOfRows == 0)) {
tscProjectionQueryOnTable(pQueryInfo1)) ||
(pRes1->numOfRows == 0)) {
hasData = false;
break;
}
......@@ -501,9 +502,9 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) {
free(pState);
return NULL;
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (pRes->tsrow == NULL) {
pRes->tsrow = malloc(POINTER_BYTES * pQueryInfo->exprsInfo.numOfExprs);
}
......@@ -578,9 +579,82 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
tscProcessSql(pSql);
if (pRes->numOfRows == 0) {
return NULL;
tscProcessSql(pSql); // retrieve data from virtual node
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
/*
* no result returned from the current virtual node anymore, try the next vnode if exists
* if case of: multi-vnode super table projection query
*/
if (pRes->numOfRows == 0 && tscProjectionQueryOnSTable(pQueryInfo, 0)) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
int32_t totalVnode = pMeterMetaInfo->pMetricMeta->numOfVnodes;
while (++pMeterMetaInfo->vnodeIndex < totalVnode) {
tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql,
pMeterMetaInfo->vnodeIndex - 1, pMeterMetaInfo->vnodeIndex, totalVnode, pRes->numOfTotal);
// reach the maximum number of output rows, abort
if (tscHasReachLimitation(pSql)) {
return NULL;
}
/*
* update the limit and offset value for the query on the next vnode,
* according to current retrieval results
*
* NOTE:
* if the pRes->offset is larger than 0, the start returned position has not reached yet.
* Therefore, the pRes->numOfRows, as well as pRes->numOfTotal, must be 0.
* The pRes->offset value will be updated by virtual node, during query execution.
*/
if (pQueryInfo->clauseLimit >= 0) {
pQueryInfo->limit.limit = pQueryInfo->clauseLimit - pRes->numOfTotal;
}
pQueryInfo->limit.offset = pRes->offset;
assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0));
tscTrace("%p new query to next vnode, vnode index:%d, limit:%" PRId64 ", offset:%" PRId64 ", glimit:%" PRId64,
pSql, pMeterMetaInfo->vnodeIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset,
pQueryInfo->clauseLimit);
/*
* For project query with super table join, the numOfSub is equalled to the number of all subqueries.
* Therefore, we need to reset the value of numOfSubs to be 0.
*
* For super table join with projection query, if anyone of the subquery is exhausted, the query completed.
*/
pSql->numOfSubs = 0;
pCmd->command = TSDB_SQL_SELECT;
assert(pSql->fp == NULL);
int32_t ret = tscProcessSql(pSql); // todo check for failure
if (ret != TSDB_CODE_SUCCESS) {
pSql->res.code = ret;
return NULL;
}
// retrieve data
assert(pCmd->command == TSDB_SQL_SELECT);
pCmd->command = TSDB_SQL_FETCH;
if ((ret = tscProcessSql(pSql)) != TSDB_CODE_SUCCESS) {
pSql->res.code = ret;
return NULL;
}
// if the result from current virtual node are empty, try next if exists. otherwise, return the results.
if (pRes->numOfRows > 0) {
break;
}
}
if (pRes->numOfRows == 0) {
tscTrace("%p all vnodes exhausted, prj query completed. total res:%d", pSql, totalVnode, pRes->numOfTotal);
}
}
/*
......@@ -590,6 +664,10 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) {
if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC) {
pRes->numOfTotal += pRes->numOfRows;
}
if (pRes->numOfRows == 0) {
return NULL;
}
}
return getOneRowFromBuf(pSql);
......@@ -597,8 +675,8 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) {
TAOS_ROW taos_fetch_row(TAOS_RES *res) {
SSqlObj *pSql = (SSqlObj *)res;
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
// SSqlCmd *pCmd = &pSql->cmd;
// SSqlRes *pRes = &pSql->res;
if (pSql == NULL || pSql->signature != pSql) {
globalCode = TSDB_CODE_DISCONNECTED;
......@@ -607,63 +685,64 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
// projection query on metric, pipeline retrieve data from vnode list, instead of two-stage merge
TAOS_ROW rows = taos_fetch_row_impl(res);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
while (rows == NULL && tscProjectionQueryOnSTable(pCmd, 0)) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
// reach the maximum number of output rows, abort
if (tscHasReachLimitation(pSql)) {
return NULL;
}
/*
* update the limit and offset value according to current retrieval results
* Note: if pRes->offset > 0, pRes->numOfRows = 0, pRes->numOfTotal = 0;
*/
pQueryInfo->limit.limit = pCmd->globalLimit - pRes->numOfTotal;
pQueryInfo->limit.offset = pRes->offset;
assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0));
/*
* For project query with super table join, the numOfSub is equalled to the number of all subqueries, so
* we need to reset the value of numOfSubs to be 0.
*
* For super table join with projection query, if anyone of the subquery is exhausted, the query completed.
*/
pSql->numOfSubs = 0;
if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) {
pCmd->command = TSDB_SQL_SELECT;
assert(pSql->fp == NULL);
tscProcessSql(pSql);
rows = taos_fetch_row_impl(res);
}
// check!!!
if (rows != NULL || pMeterMetaInfo->vnodeIndex >= pMeterMetaInfo->pMetricMeta->numOfVnodes) {
break;
}
}
// current subclause is completed, try the next subclause
if (rows == NULL && pCmd->clauseIndex < pCmd->numOfClause - 1) {
pSql->cmd.command = TSDB_SQL_SELECT;
pCmd->clauseIndex++;
assert(pSql->fp == NULL);
tscProcessSql(pSql);
rows = taos_fetch_row_impl(res);
}
// SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
// while (rows == NULL && tscProjectionQueryOnSTable(pQueryInfo, 0)) {
// SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
//
// // reach the maximum number of output rows, abort
// if (tscHasReachLimitation(pSql)) {
// return NULL;
// }
//
// /*
// * update the limit and offset value according to current retrieval results
// * Note: if pRes->offset > 0, pRes->numOfRows = 0, pRes->numOfTotal = 0;
// */
// pQueryInfo->limit.limit = pCmd->globalLimit - pRes->numOfTotal;
// pQueryInfo->limit.offset = pRes->offset;
//
// assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0));
//
// /*
// * For project query with super table join, the numOfSub is equalled to the number of all subqueries, so
// * we need to reset the value of numOfSubs to be 0.
// *
// * For super table join with projection query, if anyone of the subquery is exhausted, the query completed.
// */
// pSql->numOfSubs = 0;
//
// if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) {
// pCmd->command = TSDB_SQL_SELECT;
// assert(pSql->fp == NULL);
// tscProcessSql(pSql);
// rows = taos_fetch_row_impl(res);
// }
//
// // check!!!
// if (rows != NULL || pMeterMetaInfo->vnodeIndex >= pMeterMetaInfo->pMetricMeta->numOfVnodes) {
// break;
// }
// }
//
// // current subclause is completed, try the next subclause
// if (rows == NULL && pCmd->clauseIndex < pCmd->numOfClause - 1) {
// pSql->cmd.command = TSDB_SQL_SELECT;
// pCmd->clauseIndex++;
//
// assert(pSql->fp == NULL);
//
// tscTrace("%p start next subclause:%d, total subclause:%d", pSql, pCmd->clauseIndex, pCmd->numOfClause);
// tscProcessSql(pSql);
//
// rows = taos_fetch_row_impl(res);
// }
return rows;
}
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
SSqlObj *pSql = (SSqlObj *)res;
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
int nRows = 0;
......@@ -677,9 +756,9 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
// projection query on metric, pipeline retrieve data from vnode list,
// instead of two-stage mergevnodeProcessMsgFromShell free qhandle
nRows = taos_fetch_block_impl(res, rows);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
while (*rows == NULL && tscProjectionQueryOnSTable(pCmd, 0)) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
while (*rows == NULL && tscProjectionQueryOnSTable(pQueryInfo, 0)) {
/* reach the maximum number of output rows, abort */
if (tscHasReachLimitation(pSql)) {
return 0;
......@@ -746,7 +825,7 @@ void taos_free_result(TAOS_RES *res) {
}
// set freeFlag to 1 in retrieve message if there are un-retrieved results
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
......@@ -863,12 +942,15 @@ void taos_stop_query(TAOS_RES *res) {
if (res == NULL) return;
SSqlObj *pSql = (SSqlObj *)res;
SSqlCmd *pCmd = &pSql->cmd;
if (pSql->signature != pSql) return;
tscTrace("%p start to cancel query", res);
pSql->res.code = TSDB_CODE_QUERY_CANCELLED;
if (tscIsTwoStageMergeMetricQuery(&pSql->cmd)) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (tscIsTwoStageMergeMetricQuery(pQueryInfo, 0)) {
tscKillMetricQuery(pSql);
return;
}
......@@ -915,15 +997,13 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
float fv = 0;
fv = GET_FLOAT_VAL(row[i]);
len += sprintf(str + len, "%f ", fv);
}
break;
} break;
case TSDB_DATA_TYPE_DOUBLE:{
case TSDB_DATA_TYPE_DOUBLE: {
double dv = 0;
dv = GET_DOUBLE_VAL(row[i]);
len += sprintf(str + len, "%lf ", dv);
}
break;
} break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: {
......@@ -1011,9 +1091,9 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
int code = TSDB_CODE_INVALID_METER_ID;
char *str = (char *)tblNameList;
SQueryInfo* pQueryInfo = NULL;
tscGetQueryInfoDetailSafely(pCmd, 0, &pQueryInfo);
SQueryInfo *pQueryInfo = NULL;
tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo);
SMeterMetaInfo *pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pQueryInfo);
if ((code = tscAllocPayload(pCmd, tblListLen + 16)) != TSDB_CODE_SUCCESS) {
......
......@@ -208,47 +208,38 @@ SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx) {
return (SMeterSidExtInfo*)(pSidList->pSidExtInfoList[idx] + (char*)pSidList);
}
bool tscIsTwoStageMergeMetricQuery(SSqlCmd* pCmd) {
assert(pCmd != NULL);
int32_t subClauseIndex = 0;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex);
bool tscIsTwoStageMergeMetricQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
if (pQueryInfo == NULL) {
return false;
}
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, tableIndex);
if (pMeterMetaInfo == NULL || pMeterMetaInfo->pMetricMeta == NULL) {
return false;
}
// for projection query, iterate all qualified vnodes sequentially
if (tscProjectionQueryOnSTable(pCmd, subClauseIndex)) {
if (tscProjectionQueryOnSTable(pQueryInfo, tableIndex)) {
return false;
}
if (((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) != TSDB_QUERY_TYPE_STABLE_SUBQUERY) &&
pCmd->command == TSDB_SQL_SELECT) {
pQueryInfo->command == TSDB_SQL_SELECT) {
return UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo);
}
return false;
}
bool tscProjectionQueryOnSTable(SSqlCmd* pCmd, int32_t subClauseIndex) {
assert(pCmd != NULL);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex);
assert(pQueryInfo->numOfTables > 0);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
bool tscProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, tableIndex);
/*
* In following cases, return false for project query on metric
* 1. failed to get metermeta from server; 2. not a metric; 3. limit 0; 4. show query, instead of a select query
*/
if (pMeterMetaInfo == NULL || !UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo) ||
pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pQueryInfo->exprsInfo.numOfExprs == 0) {
pQueryInfo->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pQueryInfo->exprsInfo.numOfExprs == 0) {
return false;
}
......@@ -538,14 +529,16 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
assert(pDataBlock->pMeterMeta != NULL);
pCmd->numOfTablesInSubmit = pDataBlock->numOfMeters;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
assert(pCmd->numOfClause == 1);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
// set the correct metermeta object, the metermeta has been locked in pDataBlocks, so it must be in the cache
if (pMeterMetaInfo->pMeterMeta != pDataBlock->pMeterMeta) {
strcpy(pMeterMetaInfo->name, pDataBlock->meterId);
taosRemoveDataFromCache(tscCacheHandle, (void**)&(pMeterMetaInfo->pMeterMeta), false);
pMeterMetaInfo->pMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**) &pDataBlock->pMeterMeta);
pMeterMetaInfo->pMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**)&pDataBlock->pMeterMeta);
} else {
assert(strncmp(pMeterMetaInfo->name, pDataBlock->meterId, tListLen(pDataBlock->meterId)) == 0);
}
......@@ -654,9 +647,9 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
STableDataBlocks* pOneTableBlock = pTableDataBlockList->pData[i];
STableDataBlocks* dataBuf = NULL;
int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgid,
TSDB_PAYLOAD_SIZE, tsInsertHeadSize, 0, pOneTableBlock->meterId,
pOneTableBlock->pMeterMeta, &dataBuf);
int32_t ret =
tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgid, TSDB_PAYLOAD_SIZE,
tsInsertHeadSize, 0, pOneTableBlock->meterId, pOneTableBlock->pMeterMeta, &dataBuf);
if (ret != TSDB_CODE_SUCCESS) {
tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret);
taosCleanUpHashTable(pVnodeDataBlockHashList);
......@@ -859,7 +852,7 @@ void tscFieldInfoCalOffset(SQueryInfo* pQueryInfo) {
}
}
void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo) {
void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo) {
SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;
if (pFieldInfo->numOfOutputCols == 0) {
return;
......@@ -922,6 +915,26 @@ int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index) {
return pQueryInfo->fieldsInfo.pOffset[index];
}
int32_t tscFieldInfoCompare(SFieldInfo* pFieldInfo1, SFieldInfo* pFieldInfo2) {
assert(pFieldInfo1 != NULL && pFieldInfo2 != NULL);
if (pFieldInfo1->numOfOutputCols != pFieldInfo2->numOfOutputCols) {
return pFieldInfo1->numOfOutputCols - pFieldInfo2->numOfOutputCols;
}
for (int32_t i = 0; i < pFieldInfo1->numOfOutputCols; ++i) {
TAOS_FIELD* pField1 = &pFieldInfo1->pFields[i];
TAOS_FIELD* pField2 = &pFieldInfo2->pFields[i];
if (pField1->type != pField2->type || pField1->bytes != pField2->bytes ||
strcasecmp(pField1->name, pField2->name) != 0) {
return 1;
}
}
return 0;
}
int32_t tscGetResRowLength(SQueryInfo* pQueryInfo) {
SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;
if (pFieldInfo->numOfOutputCols <= 0) {
......@@ -1394,8 +1407,7 @@ void tscIncStreamExecutionCount(void* pStream) {
ps->num += 1;
}
bool tscValidateColumnId(SSqlCmd* pCmd, int32_t colId) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
bool tscValidateColumnId(SMeterMetaInfo* pMeterMetaInfo, int32_t colId) {
if (pMeterMetaInfo->pMeterMeta == NULL) {
return false;
}
......@@ -1592,7 +1604,7 @@ SMeterMetaInfo* tscGetMeterMetaInfoFromQueryInfo(SQueryInfo* pQueryInfo, int32_t
SQueryInfo* tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex) {
assert(pCmd != NULL && subClauseIndex >= 0 && subClauseIndex < TSDB_MAX_UNION_CLAUSE);
if (pCmd->pQueryInfo == NULL || subClauseIndex >= pCmd->numOfClause) {
return NULL;
}
......@@ -1600,19 +1612,19 @@ SQueryInfo* tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex) {
return pCmd->pQueryInfo[subClauseIndex];
}
int32_t tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex, SQueryInfo** pQueryInfo) {
int32_t tscGetQueryInfoDetailSafely(SSqlCmd* pCmd, int32_t subClauseIndex, SQueryInfo** pQueryInfo) {
int32_t ret = TSDB_CODE_SUCCESS;
*pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex);
while ((*pQueryInfo) == NULL) {
if ((ret = tscAddSubqueryInfo(pCmd)) != TSDB_CODE_SUCCESS) {
return ret;
}
(*pQueryInfo) = tscGetQueryInfoDetail(pCmd, subClauseIndex);
}
return TSDB_CODE_SUCCESS;
}
......@@ -1654,20 +1666,20 @@ int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) {
static void doClearSubqueryInfo(SQueryInfo* pQueryInfo) {
tscTagCondRelease(&pQueryInfo->tagCond);
tscClearFieldInfo(&pQueryInfo->fieldsInfo);
tfree(pQueryInfo->exprsInfo.pExprs);
memset(&pQueryInfo->exprsInfo, 0, sizeof(pQueryInfo->exprsInfo));
tscColumnBaseInfoDestroy(&pQueryInfo->colList);
memset(&pQueryInfo->colList, 0, sizeof(pQueryInfo->colList));
pQueryInfo->tsBuf = tsBufDestory(pQueryInfo->tsBuf);
tfree(pQueryInfo->defaultVal);
}
void tscClearSubqueryInfo(SSqlCmd* pCmd) {
for(int32_t i = 0; i < pCmd->numOfClause; ++i) {
for (int32_t i = 0; i < pCmd->numOfClause; ++i) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i);
doClearSubqueryInfo(pQueryInfo);
}
......@@ -1679,27 +1691,21 @@ void tscFreeSubqueryInfo(SSqlCmd* pCmd) {
}
for (int32_t i = 0; i < pCmd->numOfClause; ++i) {
char *addr = (char *) pCmd - offsetof(SSqlObj, cmd);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, i);
char* addr = (char*)pCmd - offsetof(SSqlObj, cmd);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i);
doClearSubqueryInfo(pQueryInfo);
tscRemoveAllMeterMetaInfo(pQueryInfo, (const char *) addr, false);
tscRemoveAllMeterMetaInfo(pQueryInfo, (const char*)addr, false);
tfree(pQueryInfo);
}
pCmd->numOfClause = 0;
tfree(pCmd->pQueryInfo);
}
SMeterMetaInfo* tscAddMeterMetaInfo(SQueryInfo* pQueryInfo, const char* name, SMeterMeta* pMeterMeta,
SMetricMeta* pMetricMeta, int16_t numOfTags, int16_t* tags) {
// while (pCmd->numOfClause <= subClauseIndex) {
// tscAddSubqueryInfo(pCmd);
// }
// SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
void* pAlloc = realloc(pQueryInfo->pMeterInfo, (pQueryInfo->numOfTables + 1) * POINTER_BYTES);
if (pAlloc == NULL) {
return NULL;
......@@ -1777,7 +1783,7 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, SSqlObj* pPrevSql) {
SSqlCmd* pCmd = &pSql->cmd;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, tableIndex);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, tableIndex);
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
if (pNew == NULL) {
......@@ -1804,6 +1810,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
pNew->cmd.pQueryInfo = NULL;
pNew->cmd.numOfClause = 0;
pNew->cmd.clauseIndex = 0;
if (tscAddSubqueryInfo(&pNew->cmd) != TSDB_CODE_SUCCESS) {
tscFreeSqlObj(pNew);
......@@ -1811,25 +1818,25 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
}
SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
memcpy(pNewQueryInfo, pQueryInfo, sizeof(SQueryInfo));
memset(&pNewQueryInfo->colList, 0, sizeof(pNewQueryInfo->colList));
memset(&pNewQueryInfo->fieldsInfo, 0, sizeof(SFieldInfo));
pNewQueryInfo->pMeterInfo = NULL;
pNewQueryInfo->defaultVal = NULL;
pNewQueryInfo->numOfTables = 0;
pNewQueryInfo->tsBuf = NULL;
tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond);
if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
pNewQueryInfo->defaultVal = malloc(pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(int64_t));
memcpy(pNewQueryInfo->defaultVal, pQueryInfo->defaultVal, pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(int64_t));
}
if (tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) {
tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pMeterMetaInfo->vnodeIndex);
tscFreeSqlObj(pNew);
......@@ -1840,7 +1847,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
// set the correct query type
if (pPrevSql != NULL) {
SQueryInfo* pPrevQueryInfo = tscGetQueryInfoDetail(&pPrevSql->cmd, 0);
SQueryInfo* pPrevQueryInfo = tscGetQueryInfoDetail(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex);
pNewQueryInfo->type = pPrevQueryInfo->type;
} else {
pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY; // it must be the subquery
......@@ -1863,14 +1870,14 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
tscFieldInfoCopy(&pQueryInfo->fieldsInfo, &pNewQueryInfo->fieldsInfo, indexList, numOfOutputCols);
free(indexList);
tscFieldInfoUpdateOffset(pNewQueryInfo);
tscFieldInfoUpdateOffsetForInterResult(pNewQueryInfo);
}
pNew->fp = fp;
pNew->param = param;
char key[TSDB_MAX_TAGS_LEN + 1] = {0};
tscGetMetricMetaCacheKey(pCmd, 0, key, uid);
tscGetMetricMetaCacheKey(pCmd, pCmd->clauseIndex, key, uid);
#ifdef _DEBUG_VIEW
printf("the metricmeta key is:%s\n", key);
......@@ -1886,12 +1893,12 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pMeterMeta, pMetricMeta, pMeterMetaInfo->numOfTags,
pMeterMetaInfo->tagColumnIndex);
} else { // transfer the ownership of pMeterMeta/pMetricMeta to the newly create sql object.
SMeterMetaInfo* pPrevInfo = tscGetMeterMetaInfo(&pPrevSql->cmd, 0, 0);
SMeterMeta* pPrevMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**) &pPrevInfo->pMeterMeta);
SMetricMeta* pPrevMetricMeta = taosTransferDataInCache(tscCacheHandle, (void**) &pPrevInfo->pMetricMeta);
} else { // transfer the ownership of pMeterMeta/pMetricMeta to the newly create sql object.
SMeterMetaInfo* pPrevInfo = tscGetMeterMetaInfo(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
SMeterMeta* pPrevMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**)&pPrevInfo->pMeterMeta);
SMetricMeta* pPrevMetricMeta = taosTransferDataInCache(tscCacheHandle, (void**)&pPrevInfo->pMetricMeta);
pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pPrevMeterMeta, pPrevMetricMeta, pMeterMetaInfo->numOfTags,
pMeterMetaInfo->tagColumnIndex);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册