提交 dcf17ab0 编写于 作者: H Haojun Liao

[td-225] check errors from tsdb

上级 24055909
......@@ -158,6 +158,7 @@ void taos_init() { pthread_once(&tscinit, taos_init_imp); }
void taos_cleanup() {
if (tscCacheHandle != NULL) {
taosCacheCleanup(tscCacheHandle);
tscCacheHandle = NULL;
}
if (tscQhandle != NULL) {
......
......@@ -16,7 +16,6 @@
#include "os.h"
#include "tulog.h"
#include "talgo.h"
#include "tutil.h"
#include "tcompare.h"
#include "exception.h"
......@@ -599,6 +598,8 @@ static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSK
static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks) {
// load all the comp offset value for all tables in this file
int32_t code = TSDB_CODE_SUCCESS;
*numOfBlocks = 0;
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
......@@ -606,7 +607,10 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
pCheckInfo->numOfBlocks = 0;
tsdbSetHelperTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj, pQueryHandle->pTsdb);
if (tsdbSetHelperTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj, pQueryHandle->pTsdb) != TSDB_CODE_SUCCESS) {
code = terrno;
break;
}
SCompIdx* compIndex = &pQueryHandle->rhelper.curCompIdx;
......@@ -619,7 +623,11 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
assert(compIndex->len > 0);
char* t = realloc(pCheckInfo->pCompInfo, compIndex->len);
assert(t != NULL);
if (t == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
code = TSDB_CODE_TDB_OUT_OF_MEMORY;
break;
}
pCheckInfo->pCompInfo = (SCompInfo*) t;
pCheckInfo->compSize = compIndex->len;
......@@ -661,7 +669,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
(*numOfBlocks) += pCheckInfo->numOfBlocks;
}
return TSDB_CODE_SUCCESS;
return code;
}
#define GET_FILE_DATA_BLOCK_INFO(_checkInfo, _block) \
......@@ -672,9 +680,8 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
.uid = (_checkInfo)->tableId.uid})
static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) {
static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) {
STsdbRepo *pRepo = pQueryHandle->pTsdb;
bool blockLoaded = false;
int64_t st = taosGetTimestampUs();
if (pCheckInfo->pDataCols == NULL) {
......@@ -684,7 +691,7 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
if (pCheckInfo->pDataCols == NULL) {
tsdbError("%p failed to malloc buf, %p", pQueryHandle, pQueryHandle->qinfo);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return blockLoaded;
return terrno;
}
}
......@@ -694,16 +701,17 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
tdInitDataCols(pQueryHandle->rhelper.pDataCols[1], pSchema);
int16_t* colIds = pQueryHandle->defaultLoadColumn->pData;
int32_t ret = tsdbLoadBlockDataCols(&(pQueryHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, (int)(QH_GET_NUM_OF_COLS(pQueryHandle)));
if (ret == TSDB_CODE_SUCCESS) {
SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;
if (ret != TSDB_CODE_SUCCESS) {
return terrno;
}
pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup;
pBlockLoadInfo->slot = pQueryHandle->cur.slot;
pBlockLoadInfo->tid = pCheckInfo->pTableObj->tableId.tid;
SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;
blockLoaded = true;
}
pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup;
pBlockLoadInfo->slot = pQueryHandle->cur.slot;
pBlockLoadInfo->tid = pCheckInfo->pTableObj->tableId.tid;
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows);
......@@ -715,12 +723,14 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
tsdbDebug("%p load file block into buffer, index:%d, brange:%"PRId64"-%"PRId64" , rows:%d, elapsed time:%"PRId64 " us, %p",
pQueryHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime, pQueryHandle->qinfo);
return blockLoaded;
return TSDB_CODE_SUCCESS;
}
static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){
static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){
SQueryFilePos* cur = &pQueryHandle->cur;
SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
int32_t code = TSDB_CODE_SUCCESS;
/*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order);
......@@ -748,12 +758,12 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
cur->mixBlock = true;
cur->blockCompleted = false;
return;
return code;
}
// return error, add test cases
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) {
if ((code = doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
return code;
}
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
......@@ -774,16 +784,20 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
cur->lastKey = binfo.window.ekey + (ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1);
pCheckInfo->lastKey = cur->lastKey;
}
return code;
}
static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) {
static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo, bool* exists) {
SQueryFilePos* cur = &pQueryHandle->cur;
int32_t code = TSDB_CODE_SUCCESS;
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
// query ended in/started from current block
if (pQueryHandle->window.ekey < pBlock->keyLast || pCheckInfo->lastKey > pBlock->keyFirst) {
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) {
return false;
if ((code = doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
*exists = false;
return code;
}
SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
......@@ -799,12 +813,13 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
assert(pCheckInfo->lastKey <= pBlock->keyLast);
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else { // the whole block is loaded in to buffer
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
code = handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
}
} else { //desc order, query ended in current block
if (pQueryHandle->window.ekey > pBlock->keyFirst || pCheckInfo->lastKey < pBlock->keyLast) {
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) {
return false;
if ((code = doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
*exists = false;
return code;
}
SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
......@@ -817,11 +832,12 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
assert(pCheckInfo->lastKey >= pBlock->keyFirst);
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else {
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
code = handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
}
}
return pQueryHandle->realNumOfRows > 0;
*exists = pQueryHandle->realNumOfRows > 0;
return code;
}
static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
......@@ -1577,9 +1593,7 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
cur->fid = pQueryHandle->pFileGroup->fileId;
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
*exists = loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo);
return TSDB_CODE_SUCCESS;
return loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo, exists);
}
static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists) {
......@@ -1618,16 +1632,14 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
cur->blockCompleted = false;
STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
*exists = loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo);
return TSDB_CODE_SUCCESS;
return loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
}
} else {
tsdbDebug("%p continue in current data block, index:%d, %p", pQueryHandle, cur->slot, pQueryHandle->qinfo);
handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo);
int32_t code = handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo);
*exists = pQueryHandle->realNumOfRows > 0;
return TSDB_CODE_SUCCESS;
return code;
}
}
}
......@@ -1665,8 +1677,11 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
return false;
}
/*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo(pHandle, &blockInfo);
tsdbRetrieveDataBlockInfo(pHandle, &blockInfo);
/*SArray *pDataBlock = */tsdbRetrieveDataBlock(pHandle, pQueryHandle->defaultLoadColumn);
if (terrno != TSDB_CODE_SUCCESS) {
return false;
}
if (pQueryHandle->cur.win.ekey == pQueryHandle->window.skey) {
// data already retrieve, discard other data rows and return
......@@ -1737,6 +1752,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
pSecQueryHandle->defaultLoadColumn = taosArrayClone(pQueryHandle->defaultLoadColumn);
if (!tsdbNextDataBlock((void*) pSecQueryHandle)) {
tsdbCleanupQueryHandle(pSecQueryHandle);
return false;
}
......@@ -2059,10 +2075,10 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
return pHandle->pColumns;
} else { // only load the file block
SCompBlock* pBlock = pBlockInfo->compBlock;
if (!doLoadFileDataBlock(pHandle, pBlock, pCheckInfo, pHandle->cur.slot)) {
if (doLoadFileDataBlock(pHandle, pBlock, pCheckInfo, pHandle->cur.slot) != TSDB_CODE_SUCCESS) {
return NULL;
}
// todo refactor
int32_t numOfRows = copyDataFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);
......
#sleep 2000
#run general/parser/alter.sim
#sleep 2000
#run general/parser/alter1.sim
#sleep 2000
#run general/parser/alter_stable.sim
#sleep 2000
#run general/parser/auto_create_tb.sim
#sleep 2000
#run general/parser/auto_create_tb_drop_tb.sim
#sleep 2000
#run general/parser/col_arithmetic_operation.sim
#sleep 2000
#run general/parser/columnValue.sim
#sleep 2000
#run general/parser/commit.sim
#sleep 2000
#run general/parser/create_db.sim
#sleep 2000
#run general/parser/create_mt.sim
#sleep 2000
#run general/parser/create_tb.sim
#sleep 2000
#run general/parser/dbtbnameValidate.sim
sleep 2000
run general/parser/alter.sim
sleep 2000
run general/parser/alter1.sim
sleep 2000
run general/parser/alter_stable.sim
sleep 2000
run general/parser/auto_create_tb.sim
sleep 2000
run general/parser/auto_create_tb_drop_tb.sim
sleep 2000
run general/parser/col_arithmetic_operation.sim
sleep 2000
run general/parser/columnValue.sim
sleep 2000
run general/parser/commit.sim
sleep 2000
run general/parser/create_db.sim
sleep 2000
run general/parser/create_mt.sim
sleep 2000
run general/parser/create_tb.sim
sleep 2000
run general/parser/dbtbnameValidate.sim
sleep 2000
run general/parser/fill.sim
sleep 2000
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册