未验证 提交 0a1afbdc 编写于 作者: X Xiao Ping 提交者: GitHub

Merge pull request #6335 from taosdata/feature/TD-3963

[TD-3963]async commit a msg to save config
......@@ -244,6 +244,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TDB_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0613) //"No available disk")
#define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0614) //"TSDB messed message")
#define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615) //"TSDB invalid tag value")
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616) //"TSDB no cache last row data")
// query
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700) //"Invalid handle")
......
......@@ -16,7 +16,7 @@
#ifndef _TD_TSDB_COMMIT_QUEUE_H_
#define _TD_TSDB_COMMIT_QUEUE_H_
typedef enum { COMMIT_REQ, COMPACT_REQ } TSDB_REQ_T;
typedef enum { COMMIT_REQ, COMPACT_REQ,COMMIT_CONFIG_REQ } TSDB_REQ_T;
int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req);
......
......@@ -66,6 +66,7 @@ int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot, SArray* pAT
void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot);
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
int tsdbAsyncCommit(STsdbRepo* pRepo);
int tsdbSyncCommitConfig(STsdbRepo* pRepo);
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo);
void* tsdbCommitData(STsdbRepo* pRepo);
......
......@@ -78,7 +78,6 @@ struct STsdbRepo {
bool config_changed; // config changed flag
pthread_mutex_t save_mutex; // protect save config
uint8_t hasCachedLastRow;
uint8_t hasCachedLastColumn;
STsdbAppH appH;
......
......@@ -180,15 +180,14 @@ static void *tsdbLoopCommit(void *arg) {
req = ((SReq *)pNode->data)->req;
pRepo = ((SReq *)pNode->data)->pRepo;
// check if need to apply new config
if (pRepo->config_changed) {
tsdbApplyRepoConfig(pRepo);
}
if (req == COMMIT_REQ) {
tsdbCommitData(pRepo);
} else if (req == COMPACT_REQ) {
tsdbCompactImpl(pRepo);
} else if (req == COMMIT_CONFIG_REQ) {
ASSERT(pRepo->config_changed);
tsdbApplyRepoConfig(pRepo);
tsem_post(&(pRepo->readyToCommit));
} else {
ASSERT(0);
}
......
......@@ -270,8 +270,8 @@ int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) {
pthread_mutex_unlock(&repo->save_mutex);
// schedule a commit msg then the new config will be applied immediatly
tsdbAsyncCommit(repo);
// schedule a commit msg and wait for the new config applied
tsdbSyncCommitConfig(repo);
return 0;
#if 0
......@@ -553,7 +553,6 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
return NULL;
}
pRepo->config_changed = false;
atomic_store_8(&pRepo->hasCachedLastRow, 0);
atomic_store_8(&pRepo->hasCachedLastColumn, 0);
code = tsem_init(&(pRepo->readyToCommit), 0, 1);
......@@ -857,9 +856,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
}
tsdbDestroyReadH(&readh);
if (CACHE_LAST_ROW(pCfg)) {
atomic_store_8(&pRepo->hasCachedLastRow, 1);
}
if (CACHE_LAST_NULL_COLUMN(pCfg)) {
atomic_store_8(&pRepo->hasCachedLastColumn, 1);
}
......@@ -900,20 +897,16 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
// if close last option,need to free data
if (need_free_last_row || need_free_last_col) {
if (need_free_last_row) {
atomic_store_8(&pRepo->hasCachedLastRow, 0);
}
if (need_free_last_col) {
atomic_store_8(&pRepo->hasCachedLastColumn, 0);
}
tsdbInfo("free cache last data since cacheLast option changed");
for (int i = 1; i < maxTableIdx; i++) {
for (int i = 1; i <= maxTableIdx; i++) {
STable *pTable = pMeta->tables[i];
if (pTable == NULL) continue;
if (need_free_last_row) {
taosTZfree(pTable->lastRow);
pTable->lastRow = NULL;
pTable->lastKey = TSKEY_INITIAL_VAL;
}
if (need_free_last_col) {
tsdbFreeLastColumns(pTable);
......@@ -983,9 +976,6 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
tsdbDestroyReadH(&readh);
if (cacheLastRow) {
atomic_store_8(&pRepo->hasCachedLastRow, 1);
}
if (cacheLastCol) {
atomic_store_8(&pRepo->hasCachedLastColumn, 1);
}
......
......@@ -271,10 +271,34 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
return ptr;
}
int tsdbSyncCommitConfig(STsdbRepo* pRepo) {
ASSERT(pRepo->config_changed == true);
tsem_wait(&(pRepo->readyToCommit));
if (pRepo->code != TSDB_CODE_SUCCESS) {
tsdbWarn("vgId:%d try to commit config when TSDB not in good state: %s", REPO_ID(pRepo), tstrerror(terrno));
}
if (tsdbLockRepo(pRepo) < 0) return -1;
tsdbScheduleCommit(pRepo, COMMIT_CONFIG_REQ);
if (tsdbUnlockRepo(pRepo) < 0) return -1;
tsem_wait(&(pRepo->readyToCommit));
tsem_post(&(pRepo->readyToCommit));
if (pRepo->code != TSDB_CODE_SUCCESS) {
terrno = pRepo->code;
return -1;
}
terrno = TSDB_CODE_SUCCESS;
return 0;
}
int tsdbAsyncCommit(STsdbRepo *pRepo) {
tsem_wait(&(pRepo->readyToCommit));
//ASSERT(pRepo->imem == NULL);
ASSERT(pRepo->imem == NULL);
if (pRepo->mem == NULL) {
tsem_post(&(pRepo->readyToCommit));
return 0;
......@@ -1015,7 +1039,6 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow
taosTZfree(pTable->lastRow);
TSDB_WLOCK_TABLE(pTable);
pTable->lastRow = NULL;
pTable->lastKey = TSKEY_INITIAL_VAL;
TSDB_WUNLOCK_TABLE(pTable);
}
......
......@@ -2469,7 +2469,6 @@ static bool loadCachedLastRow(STsdbQueryHandle* pQueryHandle) {
if (ret != TSDB_CODE_SUCCESS) {
return false;
}
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj, NULL);
tfree(pRow);
......@@ -2860,24 +2859,29 @@ bool tsdbGetExternalRow(TsdbQueryHandleT pHandle) {
}
/*
* 1. no data at all (pTable->lastKey = TSKEY_INITIAL_VAL), just return TSKEY_INITIAL_VAL
* 2. has data but not loaded, just return lastKey but not set pRes
* 3. has data and loaded, return lastKey and set pRes
* if lastRow == NULL, return TSDB_CODE_TDB_NO_CACHE_LAST_ROW
* else set pRes and return TSDB_CODE_SUCCESS and save lastKey
*/
int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey) {
int32_t code = TSDB_CODE_SUCCESS;
TSDB_RLOCK_TABLE(pTable);
*lastKey = pTable->lastKey;
if ((*lastKey) != TSKEY_INITIAL_VAL && pTable->lastRow) {
if (!pTable->lastRow) {
code = TSDB_CODE_TDB_NO_CACHE_LAST_ROW;
goto out;
}
if (pRes) {
*pRes = tdDataRowDup(pTable->lastRow);
if (*pRes == NULL) {
TSDB_RUNLOCK_TABLE(pTable);
return TSDB_CODE_TDB_OUT_OF_MEMORY;
code = TSDB_CODE_TDB_OUT_OF_MEMORY;
}
}
out:
TSDB_RUNLOCK_TABLE(pTable);
return TSDB_CODE_SUCCESS;
return code;
}
bool isTsdbCacheLastRow(TsdbQueryHandleT* pQueryHandle) {
......@@ -2887,7 +2891,6 @@ bool isTsdbCacheLastRow(TsdbQueryHandleT* pQueryHandle) {
int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList) {
assert(pQueryHandle != NULL && groupList != NULL);
SDataRow pRow = NULL;
TSKEY key = TSKEY_INITIAL_VAL;
SArray* group = taosArrayGetP(groupList->pGroupList, 0);
......@@ -2898,7 +2901,7 @@ int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *g
int32_t code = 0;
if (((STable*)pInfo->pTable)->lastRow) {
code = tsdbGetCachedLastRow(pInfo->pTable, &pRow, &key);
code = tsdbGetCachedLastRow(pInfo->pTable, NULL, &key);
if (code != TSDB_CODE_SUCCESS) {
pQueryHandle->cachelastrow = 0;
} else {
......@@ -2913,7 +2916,6 @@ int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *g
pQueryHandle->activeIndex = -1; // start from -1
}
tfree(pRow);
return code;
}
......
......@@ -25,7 +25,7 @@ class TDTestCase:
self.tables = 10
self.rows = 20
self.columns = 50
self.columns = 5
self.perfix = 't'
self.ts = 1601481600000
......@@ -34,7 +34,7 @@ class TDTestCase:
sql = "create table st(ts timestamp, "
for i in range(self.columns - 1):
sql += "c%d int, " % (i + 1)
sql += "c50 int) tags(t1 int)"
sql += "c5 int) tags(t1 int)"
tdSql.execute(sql)
for i in range(self.tables):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册