提交 8137a287 编写于 作者: H Haojun Liao

[td-225] fix last query bug and add some function definitions.

上级 455fd0f7
......@@ -221,20 +221,18 @@ typedef struct STableDataBlocks {
SParamInfo *params;
} STableDataBlocks;
//typedef struct SDataBlockList { // todo remove
// uint32_t nSize;
// uint32_t nAlloc;
// STableDataBlocks **pData;
//} SDataBlockList;
typedef struct SQueryInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately.
uint32_t type; // query/insert type
// TODO refactor
char intervalTimeUnit;
char slidingTimeUnit;
uint32_t type; // query/insert type
STimeWindow window; // query time window
int64_t intervalTime; // aggregation time interval
int64_t intervalTime; // aggregation time window range
int64_t slidingTime; // sliding window in mseconds
int64_t intervalOffset;// start offset of each time window
int32_t tz; // query client timezone
SSqlGroupbyExpr groupbyExpr; // group by tags info
SArray * colList; // SArray<SColumn*>
SFieldInfo fieldsInfo;
......
......@@ -69,6 +69,15 @@ extern "C" {
#define TSDB_FUNC_AVG_IRATE 33
#define TSDB_FUNC_TID_TAG 34
#define TSDB_FUNC_HISTOGRAM 35
#define TSDB_FUNC_HLL 36
#define TSDB_FUNC_MODE 37
#define TSDB_FUNC_SAMPLE 38
#define TSDB_FUNC_CEIL 39
#define TSDB_FUNC_FLOOR 40
#define TSDB_FUNC_ROUND 41
#define TSDB_FUNC_MAVG 42
#define TSDB_FUNC_CSUM 43
#define TSDB_FUNCSTATE_SO 0x1u // single output
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
......
......@@ -35,8 +35,6 @@
* forced to load primary column explicitly.
*/
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0)
#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP)
#define IS_MASTER_SCAN(runtime) ((runtime)->scanFlag == MASTER_SCAN)
......@@ -1602,11 +1600,11 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
SColIndex* pIndex = &pSqlFuncMsg->colInfo;
if (TSDB_COL_REQ_NULL(pIndex->flag)) {
pCtx->requireNull = true;
pIndex->flag &= ~(TSDB_COL_NULL);
pCtx->requireNull = true;
pIndex->flag &= ~(TSDB_COL_NULL);
} else {
pCtx->requireNull = false;
}
pCtx->requireNull = false;
}
int32_t index = pSqlFuncMsg->colInfo.colIndex;
if (TSDB_COL_IS_TAG(pIndex->flag)) {
......@@ -1927,24 +1925,24 @@ static bool onlyFirstQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSD
static bool onlyLastQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSDB_FUNC_LAST, TSDB_FUNC_LAST_DST); }
// todo refactor, add iterator
static void doExchangeTimeWindow(SQInfo* pQInfo) {
size_t t = GET_NUM_OF_TABLEGROUP(pQInfo);
static void doExchangeTimeWindow(SQInfo* pQInfo, STimeWindow* win) {
size_t t = taosArrayGetSize(pQInfo->tableGroupInfo.pGroupList);
for(int32_t i = 0; i < t; ++i) {
SArray* p1 = GET_TABLEGROUP(pQInfo, i);
SArray* p1 = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, i);
SArray* tableKeyGroup = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, i);
size_t len = taosArrayGetSize(p1);
for(int32_t j = 0; j < len; ++j) {
STableQueryInfo* pTableQueryInfo = (STableQueryInfo*) taosArrayGetP(p1, j);
SWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey, TSKEY);
STableKeyInfo* pInfo = taosArrayGet(p1, j);
STableKeyInfo* pInfo = taosArrayGet(tableKeyGroup, j);
pInfo->lastKey = pTableQueryInfo->win.skey;
// update the new lastkey if it is equalled to the value of the old skey
if (pInfo->lastKey == win->ekey) {
pInfo->lastKey = win->skey;
}
}
}
}
static void changeExecuteScanOrder(SQInfo *pQInfo, bool stableQuery) {
static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bool stableQuery) {
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
// in case of point-interpolation query, use asc order scan
......@@ -1961,15 +1959,17 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, bool stableQuery) {
if (pQuery->window.skey > pQuery->window.ekey) {
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
}
return;
}
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) && pQuery->order.order == TSDB_ORDER_DESC) {
pQuery->order.order = TSDB_ORDER_ASC;
if (pQuery->window.skey > pQuery->window.ekey) {
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
}
doExchangeTimeWindow(pQInfo, &pQuery->window);
return;
}
......@@ -1991,7 +1991,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, bool stableQuery) {
pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
doExchangeTimeWindow(pQInfo);
doExchangeTimeWindow(pQInfo, &pQuery->window);
}
pQuery->order.order = TSDB_ORDER_ASC;
......@@ -2001,7 +2001,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, bool stableQuery) {
pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
doExchangeTimeWindow(pQInfo);
doExchangeTimeWindow(pQInfo, &pQuery->window);
}
pQuery->order.order = TSDB_ORDER_DESC;
......@@ -2015,6 +2015,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, bool stableQuery) {
pQuery->window.skey, pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
doExchangeTimeWindow(pQInfo, &pQuery->window);
}
pQuery->order.order = TSDB_ORDER_ASC;
......@@ -2024,6 +2025,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, bool stableQuery) {
pQuery->window.skey, pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
doExchangeTimeWindow(pQInfo, &pQuery->window);
}
pQuery->order.order = TSDB_ORDER_DESC;
......@@ -4449,10 +4451,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
setScanLimitationByResultBuffer(pQuery);
// NOTE: pTableCheckInfo need to update the query time range and the lastKey info
// TODO fixme
changeExecuteScanOrder(pQInfo, isSTableQuery);
code = setupQueryHandle(tsdb, pQInfo, isSTableQuery);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -6022,14 +6020,6 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) {
}
}
static int compareTableIdInfo(const void* a, const void* b) {
const STableIdInfo* x = (const STableIdInfo*)a;
const STableIdInfo* y = (const STableIdInfo*)b;
if (x->uid > y->uid) return 1;
if (x->uid < y->uid) return -1;
return 0;
}
static void freeQInfo(SQInfo *pQInfo);
static void calResultBufSize(SQuery* pQuery) {
......@@ -6051,8 +6041,8 @@ static void calResultBufSize(SQuery* pQuery) {
}
}
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols) {
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery) {
int16_t numOfCols = pQueryMsg->numOfCols;
int16_t numOfOutput = pQueryMsg->numOfOutput;
......@@ -6151,8 +6141,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
}
int tableIndex = 0;
STimeWindow window = pQueryMsg->window;
taosArraySort(pTableIdList, compareTableIdInfo);
pQInfo->runtimeEnv.interBufSize = getOutputInterResultBufSize(pQuery);
pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
......@@ -6161,12 +6149,20 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
}
// NOTE: pTableCheckInfo need to update the query time range and the lastKey info
// changeExecuteScanOrder(pQInfo, stableQuery);
pQInfo->arrTableIdInfo = taosArrayInit(tableIndex, sizeof(STableIdInfo));
pQInfo->dataReady = QUERY_RESULT_NOT_READY;
pthread_mutex_init(&pQInfo->lock, NULL);
pQuery->pos = -1;
pQuery->window = pQueryMsg->window;
changeExecuteScanOrder(pQInfo, pQueryMsg, stableQuery);
STimeWindow window = pQuery->window;
int32_t index = 0;
for(int32_t i = 0; i < numOfGroups; ++i) {
SArray* pa = taosArrayGetP(pTableGroupInfo->pGroupList, i);
SArray* pa = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, i);
size_t s = taosArrayGetSize(pa);
SArray* p1 = taosArrayInit(s, POINTER_BYTES);
......@@ -6179,12 +6175,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
for(int32_t j = 0; j < s; ++j) {
STableKeyInfo* info = taosArrayGet(pa, j);
STableId* id = TSDB_TABLEID(info->pTable);
STableIdInfo* pTableId = taosArraySearch(pTableIdList, id, compareTableIdInfo);
window.skey = (pTableId != NULL)? pTableId->key:pQueryMsg->window.skey;
void* buf = (char*)pQInfo->pBuf + index * sizeof(STableQueryInfo);
window.skey = info->lastKey;
STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, info->pTable, window, buf);
if (item == NULL) {
goto _cleanup;
......@@ -6192,17 +6185,13 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
item->groupIndex = i;
taosArrayPush(p1, &item);
STableId* id = TSDB_TABLEID(info->pTable);
taosHashPut(pQInfo->tableqinfoGroupInfo.map, &id->tid, sizeof(id->tid), &item, POINTER_BYTES);
index += 1;
}
}
pQInfo->arrTableIdInfo = taosArrayInit(tableIndex, sizeof(STableIdInfo));
pQInfo->dataReady = QUERY_RESULT_NOT_READY;
pthread_mutex_init(&pQInfo->lock, NULL);
pQuery->pos = -1;
pQuery->window = pQueryMsg->window;
colIdCheck(pQuery);
qDebug("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo);
......@@ -6558,7 +6547,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
assert(0);
}
(*pQInfo) = createQInfoImpl(pQueryMsg, pTableIdList, pGroupbyExpr, pExprs, &tableGroupInfo, pTagColumnInfo);
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, &tableGroupInfo, pTagColumnInfo, isSTableQuery);
pExprs = NULL;
pGroupbyExpr = NULL;
pTagColumnInfo = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册