提交 6733029e 编写于 作者: H Haojun Liao

[TD-2375]<enhance>: configure the number of CPU cores available for query processing.

上级 5721bd12
...@@ -29,8 +29,11 @@ ...@@ -29,8 +29,11 @@
# number of threads per CPU core # number of threads per CPU core
# numOfThreadsPerCore 1.0 # numOfThreadsPerCore 1.0
# the proportion of total threads responsible for query # the proportion of total CPU cores available for query processing
# ratioOfQueryThreads 0.5 # 1.0: all CPU cores are available for query processing
# 0.5: only half of the CPU cores are available for query
# 0.0: only one core available
# ratioOfQueryThreads 1.0
# number of management nodes in the system # number of management nodes in the system
# numOfMnodes 3 # numOfMnodes 3
...@@ -265,5 +268,5 @@ ...@@ -265,5 +268,5 @@
# enable/disable stream (continuous query) # enable/disable stream (continuous query)
# stream 1 # stream 1
# only 50% CPU resources will be used in query processing # in retrieve blocking model, only in 50% query threads will be used in query processing in dnode
# halfCoresForQuery 0 # retrieveBlockModel 0
...@@ -46,7 +46,7 @@ extern int32_t tsShellActivityTimer; ...@@ -46,7 +46,7 @@ extern int32_t tsShellActivityTimer;
extern uint32_t tsMaxTmrCtrl; extern uint32_t tsMaxTmrCtrl;
extern float tsNumOfThreadsPerCore; extern float tsNumOfThreadsPerCore;
extern int32_t tsNumOfCommitThreads; extern int32_t tsNumOfCommitThreads;
extern float tsRatioOfQueryThreads; // todo remove it extern float tsRatioOfQueryThreads;
extern int8_t tsDaylight; extern int8_t tsDaylight;
extern char tsTimezone[]; extern char tsTimezone[];
extern char tsLocale[]; extern char tsLocale[];
...@@ -57,7 +57,7 @@ extern char tsTempDir[]; ...@@ -57,7 +57,7 @@ extern char tsTempDir[];
//query buffer management //query buffer management
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer for each data node during query processing extern int32_t tsQueryBufferSize; // maximum allowed usage buffer for each data node during query processing
extern int32_t tsHalfCoresForQuery; // only 50% will be used in query processing extern int32_t tsRetrieveBlockModel; // only 50% will be used in query processing
// client // client
extern int32_t tsTableMetaKeepTimer; extern int32_t tsTableMetaKeepTimer;
......
...@@ -52,7 +52,7 @@ int32_t tsMaxConnections = 5000; ...@@ -52,7 +52,7 @@ int32_t tsMaxConnections = 5000;
int32_t tsShellActivityTimer = 3; // second int32_t tsShellActivityTimer = 3; // second
float tsNumOfThreadsPerCore = 1.0f; float tsNumOfThreadsPerCore = 1.0f;
int32_t tsNumOfCommitThreads = 1; int32_t tsNumOfCommitThreads = 1;
float tsRatioOfQueryThreads = 0.5f; float tsRatioOfQueryThreads = 1.0f;
int8_t tsDaylight = 0; int8_t tsDaylight = 0;
char tsTimezone[TSDB_TIMEZONE_LEN] = {0}; char tsTimezone[TSDB_TIMEZONE_LEN] = {0};
char tsLocale[TSDB_LOCALE_LEN] = {0}; char tsLocale[TSDB_LOCALE_LEN] = {0};
...@@ -107,8 +107,8 @@ int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance ...@@ -107,8 +107,8 @@ int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
// positive value (in MB) // positive value (in MB)
int32_t tsQueryBufferSize = -1; int32_t tsQueryBufferSize = -1;
// only 50% cpu will be used in query processing in dnode // in retrieve blocking model, only in 50% query threads will be used in query processing in dnode
int32_t tsHalfCoresForQuery = 0; int32_t tsRetrieveBlockModel = 0;
// db parameters // db parameters
int32_t tsCacheBlockSize = TSDB_DEFAULT_CACHE_BLOCK_SIZE; int32_t tsCacheBlockSize = TSDB_DEFAULT_CACHE_BLOCK_SIZE;
...@@ -887,8 +887,8 @@ static void doInitGlobalConfig(void) { ...@@ -887,8 +887,8 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_BYTE; cfg.unitType = TAOS_CFG_UTYPE_BYTE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "halfCoresForQuery"; cfg.option = "retrieveBlockModel";
cfg.ptr = &tsHalfCoresForQuery; cfg.ptr = &tsRetrieveBlockModel;
cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0; cfg.minValue = 0;
......
...@@ -70,8 +70,7 @@ int32_t dnodeInitShell() { ...@@ -70,8 +70,7 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep;
int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore; int32_t numOfThreads = (tsNumOfCores * tsNumOfThreadsPerCore) / 2.0;
numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0);
if (numOfThreads < 1) { if (numOfThreads < 1) {
numOfThreads = 1; numOfThreads = 1;
} }
......
...@@ -26,16 +26,17 @@ static SWorkerPool tsVQueryWP; ...@@ -26,16 +26,17 @@ static SWorkerPool tsVQueryWP;
static SWorkerPool tsVFetchWP; static SWorkerPool tsVFetchWP;
int32_t dnodeInitVRead() { int32_t dnodeInitVRead() {
const int32_t maxFetchThreads = 4;
tsVQueryWP.name = "vquery"; tsVQueryWP.name = "vquery";
tsVQueryWP.workerFp = dnodeProcessReadQueue; tsVQueryWP.workerFp = dnodeProcessReadQueue;
tsVQueryWP.min = tsNumOfCores; tsVQueryWP.min = tsNumOfCores * tsRatioOfQueryThreads;
tsVQueryWP.max = tsNumOfCores/* * tsNumOfThreadsPerCore*/; tsVQueryWP.max = tsVQueryWP.min;
// if (tsVQueryWP.max <= tsVQueryWP.min * 2) tsVQueryWP.max = 2 * tsVQueryWP.min;
if (tWorkerInit(&tsVQueryWP) != 0) return -1; if (tWorkerInit(&tsVQueryWP) != 0) return -1;
tsVFetchWP.name = "vfetch"; tsVFetchWP.name = "vfetch";
tsVFetchWP.workerFp = dnodeProcessReadQueue; tsVFetchWP.workerFp = dnodeProcessReadQueue;
tsVFetchWP.min = MIN(4, tsNumOfCores); tsVFetchWP.min = MIN(maxFetchThreads, tsNumOfCores);
tsVFetchWP.max = tsVFetchWP.min; tsVFetchWP.max = tsVFetchWP.min;
if (tWorkerInit(&tsVFetchWP) != 0) return -1; if (tWorkerInit(&tsVFetchWP) != 0) return -1;
......
...@@ -7635,7 +7635,7 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex ...@@ -7635,7 +7635,7 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (tsHalfCoresForQuery) { if (tsRetrieveBlockModel) {
pQInfo->rspContext = pRspContext; pQInfo->rspContext = pRspContext;
tsem_wait(&pQInfo->ready); tsem_wait(&pQInfo->ready);
*buildRes = true; *buildRes = true;
......
...@@ -281,7 +281,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { ...@@ -281,7 +281,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle); vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle);
// In the retrieve blocking model, only 50% CPU will be used in query processing // In the retrieve blocking model, only 50% CPU will be used in query processing
if (tsHalfCoresForQuery) { if (tsRetrieveBlockModel) {
qTableQuery(*qhandle); // do execute query qTableQuery(*qhandle); // do execute query
qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, false); qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, false);
} else { } else {
...@@ -380,7 +380,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { ...@@ -380,7 +380,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
freeHandle = true; freeHandle = true;
} else { // result is not ready, return immediately } else { // result is not ready, return immediately
// Only effects in the non-blocking model // Only effects in the non-blocking model
if (!tsHalfCoresForQuery) { if (!tsRetrieveBlockModel) {
if (!buildRes) { if (!buildRes) {
assert(pRead->rpcHandle != NULL); assert(pRead->rpcHandle != NULL);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册