未验证 提交 c31aef1b 编写于 作者: W Wesley Wang 提交者: GitHub

Merge pull request #42 from TyphoonTai/master

feat: enable weakly consistent read for users on obproxy V4
......@@ -768,6 +768,22 @@ int ObConfigProcessor::get_proxy_config_int_item(const ObVipAddr &addr, const Ob
return ret;
}
int ObConfigProcessor::get_proxy_config_strlist_item(const ObVipAddr &addr, const ObString &cluster_name,
const ObString &tenant_name, const ObString& name,
ObConfigStrListItem &ret_item)
{
int ret = OB_SUCCESS;
ObConfigItem item;
if (OB_FAIL(get_proxy_config(addr, cluster_name, tenant_name, name, item))) {
LOG_WARN("get proxy config failed", K(addr), K(cluster_name), K(tenant_name), K(name), K(ret));
} else {
ret_item = item.str();
LOG_DEBUG("get list item succ", K(ret_item));
}
return ret;
}
} // end of obutils
} // end of obproxy
} // end of oceanbase
......@@ -116,6 +116,9 @@ public:
int get_proxy_config_int_item(const ObVipAddr &addr, const common::ObString &cluster_name,
const common::ObString &tenant_name, const common::ObString& name,
common::ObConfigIntItem &ret_item);
int get_proxy_config_strlist_item(const ObVipAddr &addr, const common::ObString &cluster_name,
const common::ObString &tenant_name, const common::ObString& name,
common::ObConfigStrListItem &ret_item);
int get_proxy_config_with_level(const ObVipAddr &addr, const common::ObString &cluster_name,
const common::ObString &tenant_name, const common::ObString& name,
common::ObConfigItem &ret_item, const ObString level, bool &found);
......
......@@ -486,6 +486,8 @@ public:
DEF_BOOL(enable_read_write_split, "false", "if enabled, use read write split mode", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS);
DEF_BOOL(enable_transaction_split, "false", "if enabled, support transaction split", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS);
DEF_STR_LIST(weak_read_user_list, "", "weak read for list of users, format user1;user2", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER);
// binlog service
DEF_STR(binlog_service_ip, "", "binlog service ip, format ip1:sql_port1", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS);
DEF_BOOL(enable_binlog_service, "false", "if enabled, obproxy will send binlog request to OBLogProxy", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS);
......
......@@ -6131,26 +6131,57 @@ int ObMysqlTransact::ObTransState::get_config_item(const ObString& cluster_name,
}
if (OB_SUCC(ret)) {
ObConfigIntItem int_item;
if (OB_FAIL(get_global_config_processor().get_proxy_config_int_item(
addr, cluster_name, tenant_name, "obproxy_read_consistency", int_item))) {
LOG_WARN("get vip obproxy_read_consistency failed", K(addr), K(cluster_name), K(tenant_name), K(ret));
ObConfigStrListItem strlist_item;
if (OB_FAIL(get_global_config_processor().get_proxy_config_strlist_item(
addr, cluster_name, tenant_name, "weak_read_user_list", strlist_item))) {
LOG_WARN("get vip weak_read_user_list failed", K(addr), K(cluster_name), K(tenant_name), K(ret));
} else {
bool is_request_follower = (RequestFollower == int_item.get_value());
bool is_sys_var_update = (session_info.is_request_follower_user() != is_request_follower);
session_info.set_is_request_follower_user(is_request_follower);
if (is_sys_var_update) {
ObString ob_read_consistency("ob_read_consistency");
ObString weak;
if (session_info.is_request_follower_user()) {
weak = "2";
} else {
weak = "3";
bool is_weak_read_user = false;
int64_t total_size = strlist_item.size();
if (OB_UNLIKELY(total_size > 0)) {
ObMysqlAuthRequest &auth_req = session_info.get_login_req();
ObHSRResult &hsr = auth_req.get_hsr_result();
char user_buf[MAX_VALUE_LENGTH];
for (int64_t i = 0; OB_SUCC(ret) && i < total_size; ++i) {
user_buf[0] = '\0';
if (OB_FAIL(strlist_item.get(i, user_buf, static_cast<int64_t>(sizeof(user_buf))))) {
LOG_WARN("get weak read user list variables failed", K(ret));
} else{
if (hsr.response_.get_username().prefix_match(user_buf)){
is_weak_read_user = true;
break;
}
}
}
if (OB_FAIL(session_info.update_sys_variable(ob_read_consistency, weak))) {
LOG_WARN("replace user variables failed", K(weak), K(ret));
} else {
session_info.set_read_consistency_set_flag(true);
}
bool is_request_follower = false;
if (is_weak_read_user){
is_request_follower = true;
} else{
ObConfigIntItem int_item;
if (OB_FAIL(get_global_config_processor().get_proxy_config_int_item(
addr, cluster_name, tenant_name, "obproxy_read_consistency", int_item))) {
LOG_WARN("get vip obproxy_read_consistency failed", K(addr), K(cluster_name), K(tenant_name), K(ret));
} else{
is_request_follower = (RequestFollower == int_item.get_value());
}
}
if (OB_SUCC(ret)) {
bool is_sys_var_update = (session_info.is_request_follower_user() != is_request_follower);
session_info.set_is_request_follower_user(is_request_follower);
if (is_sys_var_update) {
ObString ob_read_consistency("ob_read_consistency");
ObString weak;
if (session_info.is_request_follower_user()) {
weak = "2";
} else {
weak = "3";
}
if (OB_FAIL(session_info.update_sys_variable(ob_read_consistency, weak))) {
LOG_WARN("replace user variables failed", K(weak), K(ret));
} else {
session_info.set_read_consistency_set_flag(true);
}
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册