diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 29e46a88afa5e74f85e0075ceeff3d7071d67a6b..75b742e81a9cd9c619543310bda981d807625263 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -26,11 +26,13 @@ const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"}; -static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type); -static void tsdbDestroyFile(SFile *pFile); -static int compFGroup(const void *arg1, const void *arg2); -static int keyFGroupCompFunc(const void *key, const void *fgroup); -static void tsdbInitFileGroup(SFileGroup *pFGroup, STsdbRepo *pRepo); +static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type); +static void tsdbDestroyFile(SFile *pFile); +static int compFGroup(const void *arg1, const void *arg2); +static int keyFGroupCompFunc(const void *key, const void *fgroup); +static void tsdbInitFileGroup(SFileGroup *pFGroup, STsdbRepo *pRepo); +static TSKEY tsdbGetCurrMinKey(int8_t precision, int32_t keep); +static int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days); // ---------------- INTERNAL FUNCTIONS ---------------- STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) { @@ -79,9 +81,11 @@ int tsdbOpenFileH(STsdbRepo *pRepo) { int vid = 0; regex_t regex1, regex2; int code = 0; + char fname[TSDB_FILENAME_LEN] = "\0"; SFileGroup fileGroup = {0}; STsdbFileH *pFileH = pRepo->tsdbFileH; + STsdbCfg * pCfg = &(pRepo->config); tDataDir = tsdbGetDataDirName(pRepo->rootDir); if (tDataDir == NULL) { @@ -108,6 +112,8 @@ int tsdbOpenFileH(STsdbRepo *pRepo) { goto _err; } + int mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile); + struct dirent *dp = NULL; while ((dp = readdir(dir)) != NULL) { if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0) continue; @@ -120,6 +126,14 @@ int tsdbOpenFileH(STsdbRepo *pRepo) { continue; } + if (fid < mfid) { + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { + tsdbGetDataFileName(pRepo, fid, type, fname); + (void)remove(fname); + } + continue; + } + if (tsdbSearchFGroup(pFileH, fid, TD_EQ) != NULL) continue; memset((void *)(&fileGroup), 0, sizeof(SFileGroup)); fileGroup.fileId = fid; @@ -179,8 +193,18 @@ void tsdbCloseFileH(STsdbRepo *pRepo) { SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid) { STsdbFileH *pFileH = pRepo->tsdbFileH; + STsdbCfg * pCfg = &(pRepo->config); + + if (pFileH->nFGroups >= pFileH->maxFGroups) { + int mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile); + if (pFileH->pFGroup[0].fileId < mfid) { + pthread_rwlock_wrlock(&pFileH->fhlock); + tsdbRemoveFileGroup(pRepo, &(pFileH->pFGroup[0])); + pthread_rwlock_unlock(&pFileH->fhlock); + } + } - if (pFileH->nFGroups >= pFileH->maxFGroups) return NULL; + ASSERT(pFileH->nFGroups < pFileH->maxFGroups); SFileGroup fGroup; SFileGroup *pFGroup = &fGroup; @@ -342,8 +366,7 @@ void tsdbFitRetention(STsdbRepo *pRepo) { STsdbFileH *pFileH = pRepo->tsdbFileH; SFileGroup *pGroup = pFileH->pFGroup; - int mfid = (int)(TSDB_KEY_FILEID(taosGetTimestamp(pCfg->precision), pCfg->daysPerFile, pCfg->precision) - - TSDB_MAX_FILE(pCfg->keep, pCfg->daysPerFile)); + int mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile); pthread_rwlock_wrlock(&(pFileH->fhlock)); @@ -533,3 +556,11 @@ static void tsdbInitFileGroup(SFileGroup *pFGroup, STsdbRepo *pRepo) { } } } + +static TSKEY tsdbGetCurrMinKey(int8_t precision, int32_t keep) { + return (TSKEY)(taosGetTimestamp(precision) - keep * tsMsPerDay[precision]); +} + +static int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days) { + return TSDB_KEY_FILEID(tsdbGetCurrMinKey(precision, keep), days, precision); +} \ No newline at end of file