提交 52d765e9 编写于 作者: W wangzelin.wzl

Adjust location cache's thread cnt dynamically

上级 5003ce27
......@@ -97,6 +97,26 @@ void ObPartitionLocationUpdater::destroy()
wait();
}
int ObPartitionLocationUpdater::set_thread_cnt(const int64_t thread_cnt)
{
int ret = OB_SUCCESS;
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("updater is not inited yet", KR(ret), K(thread_cnt));
} else if (!lib::is_mini_mode() && thread_cnt > 0 && thread_cnt != thread_cnt_) {
if (OB_FAIL(sender_.set_thread_count(thread_cnt))) {
LOG_ERROR("fail to set sender's thread cnt", KR(ret), K(thread_cnt), K(thread_cnt_));
} else if (OB_FAIL(receiver_.set_thread_count(thread_cnt))) {
LOG_ERROR("fail to set receiver's thread cnt", KR(ret), K(thread_cnt), K(thread_cnt_));
} else {
LOG_INFO(
"[LOCATION] location updater change thread cnt", "pre_thread_cnt", thread_cnt_, "cur_thread_cnt", thread_cnt);
thread_cnt_ = thread_cnt;
}
}
return ret;
}
int ObPartitionLocationUpdater::submit_broadcast_task(const ObPartitionBroadcastTask& task)
{
int ret = OB_SUCCESS;
......
......@@ -66,6 +66,8 @@ public:
obrpc::ObSrvRpcProxy*& srv_rpc_proxy, share::ObPartitionLocationCache*& location_cache,
share::ObIAliveServerTracer& server_tracer);
int set_thread_cnt(const int64_t thread_cnt);
void stop();
void wait();
void destroy();
......
......@@ -29,6 +29,7 @@
#include "observer/ob_server.h"
#include "storage/ob_partition_scheduler.h"
#include "storage/ob_partition_migrator.h"
#include "observer/ob_service.h"
using namespace oceanbase::lib;
using namespace oceanbase::common;
......@@ -338,5 +339,13 @@ int ObServerReloadConfig::operator()()
{
OB_STORE_FILE.resize_file(GCONF.datafile_size, GCONF.datafile_disk_percentage);
}
{
const int64_t location_thread_cnt = GCONF.location_refresh_thread_count;
if (OB_NOT_NULL(GCTX.ob_service_) &&
OB_FAIL(GCTX.ob_service_->get_partition_location_updater().set_thread_cnt(location_thread_cnt))) {
real_ret = ret;
LOG_WARN("fail to set location updater's thread cnt", KR(ret), K(location_thread_cnt));
}
}
return real_ret;
}
......@@ -286,6 +286,10 @@ public:
int64_t get_partition_table_updater_user_queue_size() const;
int64_t get_partition_table_updater_sys_queue_size() const;
int64_t get_partition_table_updater_core_queue_size() const;
ObPartitionLocationUpdater &get_partition_location_updater()
{
return partition_location_updater_;
}
int get_all_partition_status(int64_t& inactive_num, int64_t& total_num) const;
int get_root_server_status(obrpc::ObGetRootserverRoleResult& get_role_result);
......
......@@ -305,7 +305,7 @@ void ObUniqTaskQueue<Task, Process>::run1()
ret = common::OB_NOT_INIT;
SERVER_LOG(WARN, "not init", K(ret));
} else {
while (!has_set_stop()) {
while (!lib::this_thread::has_set_stop()) {
Task* t = NULL;
tasks.reuse();
if (OB_SUCC(tasks.reserve(batch_exec_cnt))) {
......@@ -431,10 +431,11 @@ template <typename Task, typename Process>
int ObUniqTaskQueue<Task, Process>::process_barrier(Task& task)
{
int ret = common::OB_SUCCESS;
bool stopped = lib::this_thread::has_set_stop();
if (OB_ISNULL(updater_)) {
ret = common::OB_ERR_UNEXPECTED;
SERVER_LOG(WARN, "invalid updater", K(ret), K(updater_));
} else if (OB_FAIL(updater_->process_barrier(task, has_set_stop()))) {
} else if (OB_FAIL(updater_->process_barrier(task, stopped))) {
SERVER_LOG(WARN, "fail to batch process task", K(ret));
}
return ret;
......@@ -444,12 +445,13 @@ template <typename Task, typename Process>
int ObUniqTaskQueue<Task, Process>::batch_process_tasks(common::ObIArray<Task>& tasks)
{
int ret = common::OB_SUCCESS;
bool stopped = lib::this_thread::has_set_stop();
if (0 == tasks.count()) {
// nothing todo
} else if (OB_ISNULL(updater_)) {
ret = common::OB_ERR_UNEXPECTED;
SERVER_LOG(WARN, "invalid updater", K(ret), K(updater_));
} else if (OB_FAIL(updater_->batch_process_tasks(tasks, has_set_stop()))) {
} else if (OB_FAIL(updater_->batch_process_tasks(tasks, stopped))) {
SERVER_LOG(WARN, "fail to batch process task", K(ret));
}
return ret;
......
......@@ -2002,6 +2002,24 @@ int ObLocationAsyncUpdateQueueSet::init(ObServerConfig& config)
return ret;
}
int ObLocationAsyncUpdateQueueSet::set_thread_count(const int64_t thread_cnt)
{
int ret = OB_SUCCESS;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
} else if (!lib::is_mini_mode() && thread_cnt > 0) {
if (OB_FAIL(tenant_space_update_queue_.set_thread_count(thread_cnt))) {
LOG_ERROR("fail to set thread count", KR(ret), K(thread_cnt));
} else if (OB_FAIL(user_update_queue_.set_thread_count(thread_cnt))) {
LOG_ERROR("fail to set thread count", KR(ret), K(thread_cnt));
} else {
LOG_INFO("[LOCATION] location queue may change thread cnt", K(thread_cnt));
}
}
return ret;
}
int ObLocationAsyncUpdateQueueSet::add_task(const ObLocationAsyncUpdateTask& task)
{
int ret = OB_SUCCESS;
......@@ -2181,6 +2199,13 @@ int ObPartitionLocationCache::reload_config()
LOG_WARN("not init", K(ret));
} else {
sem_.set_max_count(config_->location_fetch_concurrency);
int64_t thread_cnt = config_->location_refresh_thread_count;
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = local_async_queue_set_.set_thread_count(thread_cnt))) {
LOG_WARN("fail to set thread count", KR(tmp_ret), K(thread_cnt));
} else if (OB_SUCCESS != (tmp_ret = remote_async_queue_set_.set_thread_count(thread_cnt))) {
LOG_WARN("fail to set thread count", KR(tmp_ret), K(thread_cnt));
}
}
return ret;
}
......
......@@ -629,6 +629,7 @@ public:
ObLocationAsyncUpdateQueueSet(ObIPartitionLocationCache* loc_cache_);
virtual ~ObLocationAsyncUpdateQueueSet();
int init(common::ObServerConfig& config);
int set_thread_count(const int64_t thread_cnt);
int add_task(const ObLocationAsyncUpdateTask& task);
void stop();
void wait();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册