提交 f7d2e7d9 编写于 作者: H Hongze Cheng

fix TD-1362

上级 1dad7945
...@@ -26,11 +26,13 @@ ...@@ -26,11 +26,13 @@
const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"}; const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"};
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type); static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type);
static void tsdbDestroyFile(SFile *pFile); static void tsdbDestroyFile(SFile *pFile);
static int compFGroup(const void *arg1, const void *arg2); static int compFGroup(const void *arg1, const void *arg2);
static int keyFGroupCompFunc(const void *key, const void *fgroup); static int keyFGroupCompFunc(const void *key, const void *fgroup);
static void tsdbInitFileGroup(SFileGroup *pFGroup, STsdbRepo *pRepo); static void tsdbInitFileGroup(SFileGroup *pFGroup, STsdbRepo *pRepo);
static TSKEY tsdbGetCurrMinKey(int8_t precision, int32_t keep);
static int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days);
// ---------------- INTERNAL FUNCTIONS ---------------- // ---------------- INTERNAL FUNCTIONS ----------------
STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) { STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) {
...@@ -79,9 +81,11 @@ int tsdbOpenFileH(STsdbRepo *pRepo) { ...@@ -79,9 +81,11 @@ int tsdbOpenFileH(STsdbRepo *pRepo) {
int vid = 0; int vid = 0;
regex_t regex1, regex2; regex_t regex1, regex2;
int code = 0; int code = 0;
char fname[TSDB_FILENAME_LEN] = "\0";
SFileGroup fileGroup = {0}; SFileGroup fileGroup = {0};
STsdbFileH *pFileH = pRepo->tsdbFileH; STsdbFileH *pFileH = pRepo->tsdbFileH;
STsdbCfg * pCfg = &(pRepo->config);
tDataDir = tsdbGetDataDirName(pRepo->rootDir); tDataDir = tsdbGetDataDirName(pRepo->rootDir);
if (tDataDir == NULL) { if (tDataDir == NULL) {
...@@ -108,6 +112,8 @@ int tsdbOpenFileH(STsdbRepo *pRepo) { ...@@ -108,6 +112,8 @@ int tsdbOpenFileH(STsdbRepo *pRepo) {
goto _err; goto _err;
} }
int mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile);
struct dirent *dp = NULL; struct dirent *dp = NULL;
while ((dp = readdir(dir)) != NULL) { while ((dp = readdir(dir)) != NULL) {
if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0) continue; if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0) continue;
...@@ -120,6 +126,14 @@ int tsdbOpenFileH(STsdbRepo *pRepo) { ...@@ -120,6 +126,14 @@ int tsdbOpenFileH(STsdbRepo *pRepo) {
continue; continue;
} }
if (fid < mfid) {
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
tsdbGetDataFileName(pRepo, fid, type, fname);
(void)remove(fname);
}
continue;
}
if (tsdbSearchFGroup(pFileH, fid, TD_EQ) != NULL) continue; if (tsdbSearchFGroup(pFileH, fid, TD_EQ) != NULL) continue;
memset((void *)(&fileGroup), 0, sizeof(SFileGroup)); memset((void *)(&fileGroup), 0, sizeof(SFileGroup));
fileGroup.fileId = fid; fileGroup.fileId = fid;
...@@ -179,8 +193,18 @@ void tsdbCloseFileH(STsdbRepo *pRepo) { ...@@ -179,8 +193,18 @@ void tsdbCloseFileH(STsdbRepo *pRepo) {
SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid) { SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid) {
STsdbFileH *pFileH = pRepo->tsdbFileH; STsdbFileH *pFileH = pRepo->tsdbFileH;
STsdbCfg * pCfg = &(pRepo->config);
if (pFileH->nFGroups >= pFileH->maxFGroups) {
int mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile);
if (pFileH->pFGroup[0].fileId < mfid) {
pthread_rwlock_wrlock(&pFileH->fhlock);
tsdbRemoveFileGroup(pRepo, &(pFileH->pFGroup[0]));
pthread_rwlock_unlock(&pFileH->fhlock);
}
}
if (pFileH->nFGroups >= pFileH->maxFGroups) return NULL; ASSERT(pFileH->nFGroups < pFileH->maxFGroups);
SFileGroup fGroup; SFileGroup fGroup;
SFileGroup *pFGroup = &fGroup; SFileGroup *pFGroup = &fGroup;
...@@ -342,8 +366,7 @@ void tsdbFitRetention(STsdbRepo *pRepo) { ...@@ -342,8 +366,7 @@ void tsdbFitRetention(STsdbRepo *pRepo) {
STsdbFileH *pFileH = pRepo->tsdbFileH; STsdbFileH *pFileH = pRepo->tsdbFileH;
SFileGroup *pGroup = pFileH->pFGroup; SFileGroup *pGroup = pFileH->pFGroup;
int mfid = (int)(TSDB_KEY_FILEID(taosGetTimestamp(pCfg->precision), pCfg->daysPerFile, pCfg->precision) - int mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile);
TSDB_MAX_FILE(pCfg->keep, pCfg->daysPerFile));
pthread_rwlock_wrlock(&(pFileH->fhlock)); pthread_rwlock_wrlock(&(pFileH->fhlock));
...@@ -533,3 +556,11 @@ static void tsdbInitFileGroup(SFileGroup *pFGroup, STsdbRepo *pRepo) { ...@@ -533,3 +556,11 @@ static void tsdbInitFileGroup(SFileGroup *pFGroup, STsdbRepo *pRepo) {
} }
} }
} }
static TSKEY tsdbGetCurrMinKey(int8_t precision, int32_t keep) {
return (TSKEY)(taosGetTimestamp(precision) - keep * tsMsPerDay[precision]);
}
static int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days) {
return TSDB_KEY_FILEID(tsdbGetCurrMinKey(precision, keep), days, precision);
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册