未验证 提交 dd4f909f 编写于 作者: S slguan 提交者: GitHub

Merge pull request #797 from taosdata/feature/liaohj

[jira none]only open and memory map one group of files
......@@ -111,7 +111,7 @@ typedef enum {
#define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN)
typedef int (*__block_search_fn_t)(char* data, int num, int64_t key, int order);
typedef int32_t (*__read_data_fn_t)(int fd, SQInfo* pQInfo, SQueryFileInfo* pQueryFile, char* buf, uint64_t offset,
typedef int32_t (*__read_data_fn_t)(int fd, SQInfo* pQInfo, SQueryFilesInfo* pQueryFile, char* buf, uint64_t offset,
int32_t size);
static FORCE_INLINE SMeterObj* getMeterObj(void* hashHandle, int32_t sid) {
......@@ -191,7 +191,7 @@ int64_t getQueryStartPositionInCache(SQueryRuntimeEnv* pRuntimeEnv, int32_t* slo
int64_t getNextAccessedKeyInData(SQuery* pQuery, int64_t* pPrimaryCol, SBlockInfo* pBlockInfo, int32_t blockStatus);
uint32_t getDataBlocksForMeters(SMeterQuerySupportObj* pSupporter, SQuery* pQuery, char* pHeaderData,
int32_t numOfMeters, SQueryFileInfo* pQueryFileInfo, SMeterDataInfo** pMeterDataInfo);
int32_t numOfMeters, const char* filePath, SMeterDataInfo** pMeterDataInfo);
int32_t LoadDatablockOnDemand(SCompBlock* pBlock, SField** pFields, uint8_t* blkStatus, SQueryRuntimeEnv* pRuntimeEnv,
int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand);
char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t vnodeId, int32_t fileIndex);
......
......@@ -47,29 +47,14 @@ typedef struct SQueryLoadCompBlockInfo {
int32_t fileId;
int32_t fileListIndex;
} SQueryLoadCompBlockInfo;
/*
* the header file info for one vnode
*/
typedef struct SQueryFileInfo {
int32_t fileID; /* file id */
char headerFilePath[PATH_MAX]; /* full file name */
char dataFilePath[PATH_MAX];
char lastFilePath[PATH_MAX];
int32_t defaultMappingSize; /* default mapping size */
int32_t headerFd; /* file handler */
char* pHeaderFileData; /* mmap header files */
size_t headFileSize;
int32_t dataFd;
char* pDataFileData;
size_t dataFileSize;
uint64_t dtFileMappingOffset;
int32_t lastFd;
size_t lastFileSize;
uint64_t lastFileMappingOffset;
} SQueryFileInfo;
typedef struct SHeaderFileInfo {
int32_t fileID; // file id
size_t headFileSize; // header file size
} SHeaderFileInfo;
typedef struct SQueryCostSummary {
double cacheTimeUs;
......@@ -106,6 +91,27 @@ typedef struct SOutputRes {
SResultInfo* resultInfo;
} SOutputRes;
/*
* header files info, avoid to iterate the directory, the data is acquired
* during in query preparation function
*/
typedef struct SQueryFilesInfo {
SHeaderFileInfo* pFileInfo;
uint32_t numOfFiles; // the total available number of files for this virtual node during query execution
int32_t current; // the memory mapped header file, NOTE: only one header file can be mmap.
int32_t vnodeId;
int32_t headerFd; // header file fd
char* pHeaderFileData; // mmap header files
int32_t dataFd;
int32_t lastFd;
char headerFilePath[PATH_MAX]; // current opened header file name
char dataFilePath[PATH_MAX]; // current opened data file name
char lastFilePath[PATH_MAX]; // current opened last file path
char dbFilePathPrefix[PATH_MAX];
} SQueryFilesInfo;
typedef struct RuntimeEnvironment {
SPositionInfo startPos; /* the start position, used for secondary/third iteration */
SPositionInfo endPos; /* the last access position in query, served as the start pos of reversed order query */
......@@ -122,17 +128,10 @@ typedef struct RuntimeEnvironment {
SQLFunctionCtx* pCtx;
SQueryLoadBlockInfo loadBlockInfo; /* record current block load information */
SQueryLoadCompBlockInfo loadCompBlockInfo; /* record current compblock information in SQuery */
/*
* header files info, avoid to iterate the directory, the data is acquired
* during in query preparation function
*/
SQueryFileInfo* pVnodeFiles;
uint32_t numOfFiles; // the total available number of files for this virtual node during query execution
int32_t mmapedHFileIndex; // the mmaped header file, NOTE: only one header file can be mmap.
SQueryFilesInfo vnodeFileInfo;
int16_t numOfRowsPerPage;
int16_t offset[TSDB_MAX_COLUMNS];
int16_t scanFlag; // denotes reversed scan of data or not
int16_t scanFlag; // denotes reversed scan of data or not
SInterpolationInfo interpoInfo;
SData** pInterpoBuf;
SOutputRes* pResult; // reference to SQuerySupporter->pResult
......
......@@ -266,7 +266,9 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
__block_search_fn_t searchFn = vnodeSearchKeyFunc[pTempMeter->searchAlgorithm];
int32_t vnodeId = pTempMeter->vnode;
dTrace("QInfo:%p start to check data blocks in %d files", pQInfo, pRuntimeEnv->numOfFiles);
SQueryFilesInfo* pVnodeFileInfo = &pRuntimeEnv->vnodeFileInfo;
dTrace("QInfo:%p start to check data blocks in %d files", pQInfo, pVnodeFileInfo->numOfFiles);
int32_t fid = QUERY_IS_ASC_QUERY(pQuery) ? -1 : INT32_MAX;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
......@@ -289,9 +291,8 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
pQuery->fileId = fid;
pSummary->numOfFiles++;
SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pVnodeFiles[fileIdx];
char *pHeaderData = vnodeGetHeaderFileData(pRuntimeEnv, vnodeId, fileIdx);
if (pHeaderData == NULL) { // failed to mmap header file into buffer, ignore current file, try next
char *pHeaderFileData = vnodeGetHeaderFileData(pRuntimeEnv, vnodeId, fileIdx);
if (pHeaderFileData == NULL) { // failed to mmap header file into buffer, ignore current file, try next
fid += step;
continue;
}
......@@ -308,20 +309,21 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
return NULL;
}
dTrace("QInfo:%p file:%s, %d meters qualified", pQInfo, pQueryFileInfo->dataFilePath, numOfQualifiedMeters);
dTrace("QInfo:%p file:%s, %d meters qualified", pQInfo, pVnodeFileInfo->dataFilePath, numOfQualifiedMeters);
// none of meters in query set have pHeaderData in this file, try next file
// none of meters in query set have pHeaderFileData in this file, try next file
if (numOfQualifiedMeters == 0) {
fid += step;
tfree(pReqMeterDataInfo);
continue;
}
uint32_t numOfBlocks = getDataBlocksForMeters(pSupporter, pQuery, pHeaderData, numOfQualifiedMeters, pQueryFileInfo,
pReqMeterDataInfo);
uint32_t numOfBlocks = getDataBlocksForMeters(pSupporter, pQuery, pHeaderFileData, numOfQualifiedMeters,
pVnodeFileInfo->headerFilePath, pReqMeterDataInfo);
dTrace("QInfo:%p file:%s, %d meters contains %d blocks to be checked", pQInfo, pQueryFileInfo->dataFilePath,
dTrace("QInfo:%p file:%s, %d meters contains %d blocks to be checked", pQInfo, pVnodeFileInfo->dataFilePath,
numOfQualifiedMeters, numOfBlocks);
if (numOfBlocks == 0) {
fid += step;
tfree(pReqMeterDataInfo);
......@@ -346,7 +348,7 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
totalBlocks += numOfBlocks;
// sequentially scan the pHeaderData file
// sequentially scan the pHeaderFileData file
int32_t j = QUERY_IS_ASC_QUERY(pQuery) ? 0 : numOfBlocks - 1;
for (; j < numOfBlocks && j >= 0; j += step) {
......@@ -428,7 +430,7 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
}
int64_t time = taosGetTimestampUs() - st;
dTrace("QInfo:%p complete check %d files, %d blocks, elapsed time:%.3fms", pQInfo, pRuntimeEnv->numOfFiles,
dTrace("QInfo:%p complete check %d files, %d blocks, elapsed time:%.3fms", pQInfo, pVnodeFileInfo->numOfFiles,
totalBlocks, time / 1000.0);
pSummary->fileTimeUs += time;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册