提交 d4d9911a 编写于 作者: O obdev 提交者: wangzelin.wzl

Fix ddl sync auto increment value error

上级 84689879
...@@ -1738,7 +1738,6 @@ int ObServer::init_autoincrement_service() ...@@ -1738,7 +1738,6 @@ int ObServer::init_autoincrement_service()
&sql_proxy_, &sql_proxy_,
&srv_rpc_proxy_, &srv_rpc_proxy_,
&schema_service_, &schema_service_,
location_service_,
net_frame_.get_req_transport()))) { net_frame_.get_req_transport()))) {
LOG_ERROR("init autoincrement_service_ fail", KR(ret)); LOG_ERROR("init autoincrement_service_ fail", KR(ret));
} }
......
...@@ -249,7 +249,6 @@ int ObAutoincrementService::init(ObAddr &addr, ...@@ -249,7 +249,6 @@ int ObAutoincrementService::init(ObAddr &addr,
ObMySQLProxy *mysql_proxy, ObMySQLProxy *mysql_proxy,
ObSrvRpcProxy *srv_proxy, ObSrvRpcProxy *srv_proxy,
ObMultiVersionSchemaService *schema_service, ObMultiVersionSchemaService *schema_service,
ObLocationService &location_service,
rpc::frame::ObReqTransport *req_transport) rpc::frame::ObReqTransport *req_transport)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
...@@ -260,7 +259,7 @@ int ObAutoincrementService::init(ObAddr &addr, ...@@ -260,7 +259,7 @@ int ObAutoincrementService::init(ObAddr &addr,
if (OB_FAIL(distributed_autoinc_service_.init(mysql_proxy))) { if (OB_FAIL(distributed_autoinc_service_.init(mysql_proxy))) {
LOG_WARN("fail init distributed_autoinc_service_ service", K(ret)); LOG_WARN("fail init distributed_autoinc_service_ service", K(ret));
} else if (OB_FAIL(global_autoinc_service_.init(my_addr_, *schema_service, location_service, req_transport))) { } else if (OB_FAIL(global_autoinc_service_.init(my_addr_, req_transport))) {
LOG_WARN("fail init auto inc global service", K(ret)); LOG_WARN("fail init auto inc global service", K(ret));
} else if (OB_FAIL(node_allocator_.init(sizeof(TableNode), ObModIds::OB_AUTOINCREMENT))) { } else if (OB_FAIL(node_allocator_.init(sizeof(TableNode), ObModIds::OB_AUTOINCREMENT))) {
LOG_WARN("failed to init table node allocator", K(ret)); LOG_WARN("failed to init table node allocator", K(ret));
...@@ -277,7 +276,6 @@ int ObAutoincrementService::init_for_backup(ObAddr &addr, ...@@ -277,7 +276,6 @@ int ObAutoincrementService::init_for_backup(ObAddr &addr,
ObMySQLProxy *mysql_proxy, ObMySQLProxy *mysql_proxy,
ObSrvRpcProxy *srv_proxy, ObSrvRpcProxy *srv_proxy,
ObMultiVersionSchemaService *schema_service, ObMultiVersionSchemaService *schema_service,
share::ObLocationService &location_service,
rpc::frame::ObReqTransport *req_transport) rpc::frame::ObReqTransport *req_transport)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
...@@ -286,7 +284,7 @@ int ObAutoincrementService::init_for_backup(ObAddr &addr, ...@@ -286,7 +284,7 @@ int ObAutoincrementService::init_for_backup(ObAddr &addr,
srv_proxy_ = srv_proxy; srv_proxy_ = srv_proxy;
schema_service_ = schema_service; schema_service_ = schema_service;
OZ(distributed_autoinc_service_.init(mysql_proxy)); OZ(distributed_autoinc_service_.init(mysql_proxy));
OZ(global_autoinc_service_.init(my_addr_, *schema_service, location_service, req_transport)); OZ(global_autoinc_service_.init(my_addr_, req_transport));
return ret; return ret;
} }
...@@ -1441,8 +1439,6 @@ int ObInnerTableGlobalAutoIncrementService::local_sync_with_global_value( ...@@ -1441,8 +1439,6 @@ int ObInnerTableGlobalAutoIncrementService::local_sync_with_global_value(
int ObRpcGlobalAutoIncrementService::init( int ObRpcGlobalAutoIncrementService::init(
const common::ObAddr &addr, const common::ObAddr &addr,
share::schema::ObMultiVersionSchemaService &schema_service,
share::ObLocationService &location_service,
rpc::frame::ObReqTransport *req_transport) rpc::frame::ObReqTransport *req_transport)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
...@@ -1452,13 +1448,11 @@ int ObRpcGlobalAutoIncrementService::init( ...@@ -1452,13 +1448,11 @@ int ObRpcGlobalAutoIncrementService::init(
} else if (!addr.is_valid()) { } else if (!addr.is_valid()) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(addr)); LOG_WARN("invalid argument", KR(ret), K(addr));
} else if (OB_FAIL(location_adapter_def_.init(&schema_service, &location_service))) {
LOG_WARN("localtion adapter init error", K(ret));
} else if (OB_FAIL(gais_request_rpc_proxy_.init(req_transport, addr))) { } else if (OB_FAIL(gais_request_rpc_proxy_.init(req_transport, addr))) {
LOG_WARN("rpc proxy init failed", K(ret), K(req_transport), K(addr)); LOG_WARN("rpc proxy init failed", K(ret), K(req_transport), K(addr));
} else if (OB_FAIL(gais_request_rpc_.init(&gais_request_rpc_proxy_, addr))) { } else if (OB_FAIL(gais_request_rpc_.init(&gais_request_rpc_proxy_, addr))) {
LOG_WARN("response rpc init failed", K(ret), K(addr)); LOG_WARN("response rpc init failed", K(ret), K(addr));
} else if (OB_FAIL(gais_client_.init(addr, &gais_request_rpc_, &location_adapter_def_))) { } else if (OB_FAIL(gais_client_.init(addr, &gais_request_rpc_))) {
LOG_WARN("init client failed", K(ret)); LOG_WARN("init client failed", K(ret));
} }
return ret; return ret;
......
...@@ -277,8 +277,6 @@ public: ...@@ -277,8 +277,6 @@ public:
virtual ~ObRpcGlobalAutoIncrementService() = default; virtual ~ObRpcGlobalAutoIncrementService() = default;
int init(const common::ObAddr &addr, int init(const common::ObAddr &addr,
share::schema::ObMultiVersionSchemaService &schema_service,
share::ObLocationService &location_service,
rpc::frame::ObReqTransport *req_transport); rpc::frame::ObReqTransport *req_transport);
virtual int get_value( virtual int get_value(
...@@ -316,7 +314,6 @@ public: ...@@ -316,7 +314,6 @@ public:
private: private:
bool is_inited_; bool is_inited_;
ObGAISClient gais_client_; ObGAISClient gais_client_;
transaction::ObLocationAdapter location_adapter_def_;
obrpc::ObGAISRpcProxy gais_request_rpc_proxy_; obrpc::ObGAISRpcProxy gais_request_rpc_proxy_;
ObGAISRequestRpc gais_request_rpc_; ObGAISRequestRpc gais_request_rpc_;
}; };
...@@ -335,13 +332,11 @@ public: ...@@ -335,13 +332,11 @@ public:
common::ObMySQLProxy *mysql_proxy, common::ObMySQLProxy *mysql_proxy,
obrpc::ObSrvRpcProxy *srv_proxy, obrpc::ObSrvRpcProxy *srv_proxy,
share::schema::ObMultiVersionSchemaService *schema_service, share::schema::ObMultiVersionSchemaService *schema_service,
share::ObLocationService &location_service,
rpc::frame::ObReqTransport *req_transport); rpc::frame::ObReqTransport *req_transport);
int init_for_backup(common::ObAddr &addr, int init_for_backup(common::ObAddr &addr,
common::ObMySQLProxy *mysql_proxy, common::ObMySQLProxy *mysql_proxy,
obrpc::ObSrvRpcProxy *srv_proxy, obrpc::ObSrvRpcProxy *srv_proxy,
share::schema::ObMultiVersionSchemaService *schema_service, share::schema::ObMultiVersionSchemaService *schema_service,
share::ObLocationService &location_service,
rpc::frame::ObReqTransport *req_transport); rpc::frame::ObReqTransport *req_transport);
int get_handle(AutoincParam &param, CacheHandle *&handle); int get_handle(AutoincParam &param, CacheHandle *&handle);
void release_handle(CacheHandle *&handle); void release_handle(CacheHandle *&handle);
......
...@@ -28,21 +28,18 @@ using namespace oceanbase::transaction; ...@@ -28,21 +28,18 @@ using namespace oceanbase::transaction;
namespace share namespace share
{ {
int ObGAISClient::init(const ObAddr &self, ObGAISRequestRpc *gais_request_rpc, int ObGAISClient::init(const ObAddr &self, ObGAISRequestRpc *gais_request_rpc)
ObILocationAdapter *location_adapter)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) { if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE; ret = OB_INIT_TWICE;
LOG_WARN("init twice", KR(ret)); LOG_WARN("init twice", KR(ret));
} else if (OB_UNLIKELY(!self.is_valid()) || OB_ISNULL(gais_request_rpc) || } else if (OB_UNLIKELY(!self.is_valid()) || OB_ISNULL(gais_request_rpc)) {
OB_ISNULL(location_adapter)) {
LOG_WARN("invalid argument", KR(ret), K(self), KP(gais_request_rpc), KP(location_adapter));
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(self), KP(gais_request_rpc));
} else { } else {
self_ = self; self_ = self;
gais_request_rpc_ = gais_request_rpc; gais_request_rpc_ = gais_request_rpc;
location_adapter_ = location_adapter;
is_inited_ = true; is_inited_ = true;
LOG_INFO("gais client init success", K(self), KP(this)); LOG_INFO("gais client init success", K(self), KP(this));
} }
...@@ -54,7 +51,6 @@ void ObGAISClient::reset() ...@@ -54,7 +51,6 @@ void ObGAISClient::reset()
is_inited_ = false; is_inited_ = false;
self_.reset(); self_.reset();
gais_request_rpc_ = NULL; gais_request_rpc_ = NULL;
location_adapter_ = NULL;
reset_cache_leader_(); reset_cache_leader_();
} }
...@@ -75,31 +71,29 @@ int ObGAISClient::get_value(const AutoincKey &key, ...@@ -75,31 +71,29 @@ int ObGAISClient::get_value(const AutoincKey &key,
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret)); LOG_WARN("not inited", K(ret));
} else { } else {
MTL_SWITCH(tenant_id) { ObGAISNextAutoIncValReq msg;
ObGAISNextAutoIncValReq msg; ObGAISNextValRpcResult rpc_result;
ObGAISNextValRpcResult rpc_result; ObAddr leader;
ObAddr leader; if (OB_FAIL(get_leader_(tenant_id, leader))) {
if (OB_FAIL(get_leader_(tenant_id, leader))) { LOG_WARN("get leader fail", K(ret));
LOG_WARN("get leader fail", K(ret)); (void)refresh_location_(tenant_id);
(void)refresh_location_(tenant_id); } else if (OB_FAIL(msg.init(key, offset, increment, table_auto_increment, max_value,
} else if (OB_FAIL(msg.init(key, offset, increment, table_auto_increment, max_value, desired_count, cache_size, self_))) {
desired_count, cache_size, self_))) { LOG_WARN("fail to init request msg", K(ret));
LOG_WARN("fail to init request msg", K(ret)); } else if (OB_UNLIKELY(!msg.is_valid())) {
} else if (OB_UNLIKELY(!msg.is_valid())) { ret = OB_INVALID_ARGUMENT;
ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(msg));
LOG_WARN("invalid argument", K(ret), K(msg)); } else if (OB_FAIL(gais_request_rpc_->next_autoinc_val(leader, msg, rpc_result))) {
} else if (OB_FAIL(gais_request_rpc_->next_autoinc_val(leader, msg, rpc_result))) { LOG_WARN("handle gais request failed", K(ret), K(msg), K(rpc_result));
LOG_WARN("handle gais request failed", K(ret), K(msg), K(rpc_result)); (void)refresh_location_(tenant_id);
(void)refresh_location_(tenant_id); } else if (!rpc_result.is_valid()) {
} else if (!rpc_result.is_valid()) { ret = OB_ERR_UNEXPECTED;
ret = OB_ERR_UNEXPECTED; LOG_WARN("rpc result is unexpected", K(ret), K(rpc_result));
LOG_WARN("rpc result is unexpected", K(ret), K(rpc_result)); } else {
} else { start_inclusive = rpc_result.start_inclusive_;
start_inclusive = rpc_result.start_inclusive_; end_inclusive = rpc_result.end_inclusive_;
end_inclusive = rpc_result.end_inclusive_; sync_value = rpc_result.sync_value_;
sync_value = rpc_result.sync_value_; LOG_DEBUG("handle gais success", K(rpc_result));
LOG_DEBUG("handle gais success", K(rpc_result));
}
} }
} }
return ret; return ret;
...@@ -113,25 +107,23 @@ int ObGAISClient::get_sequence_value(const AutoincKey &key, uint64_t &sequence_v ...@@ -113,25 +107,23 @@ int ObGAISClient::get_sequence_value(const AutoincKey &key, uint64_t &sequence_v
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret)); LOG_WARN("not inited", K(ret));
} else { } else {
MTL_SWITCH(tenant_id) { ObGAISAutoIncKeyArg msg;
ObGAISAutoIncKeyArg msg; ObGAISCurrValRpcResult rpc_result;
ObGAISCurrValRpcResult rpc_result; ObAddr leader;
ObAddr leader; if (OB_FAIL(get_leader_(tenant_id, leader))) {
if (OB_FAIL(get_leader_(tenant_id, leader))) { LOG_WARN("get leader fail", K(ret));
LOG_WARN("get leader fail", K(ret)); (void)refresh_location_(tenant_id);
(void)refresh_location_(tenant_id); } else if (OB_FAIL(msg.init(key, self_))) {
} else if (OB_FAIL(msg.init(key, self_))) { LOG_WARN("fail to init request msg", K(ret));
LOG_WARN("fail to init request msg", K(ret)); } else if (OB_UNLIKELY(!msg.is_valid())) {
} else if (OB_UNLIKELY(!msg.is_valid())) { ret = OB_INVALID_ARGUMENT;
ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(msg));
LOG_WARN("invalid argument", K(ret), K(msg)); } else if (OB_FAIL(gais_request_rpc_->curr_autoinc_val(leader, msg, rpc_result))) {
} else if (OB_FAIL(gais_request_rpc_->curr_autoinc_val(leader, msg, rpc_result))) { LOG_WARN("handle gais request failed", K(ret), K(msg), K(rpc_result));
LOG_WARN("handle gais request failed", K(ret), K(msg), K(rpc_result)); (void)refresh_location_(tenant_id);
(void)refresh_location_(tenant_id); } else {
} else { sequence_value = rpc_result.sequence_value_;
sequence_value = rpc_result.sequence_value_; LOG_DEBUG("handle gais success", K(rpc_result));
LOG_DEBUG("handle gais success", K(rpc_result));
}
} }
} }
...@@ -152,30 +144,28 @@ int ObGAISClient::get_auto_increment_values( ...@@ -152,30 +144,28 @@ int ObGAISClient::get_auto_increment_values(
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret)); LOG_WARN("not inited", K(ret));
} else { } else {
MTL_SWITCH(tenant_id) { ObGAISAutoIncKeyArg msg;
ObGAISAutoIncKeyArg msg; ObGAISCurrValRpcResult rpc_result;
ObGAISCurrValRpcResult rpc_result; ObAddr leader;
ObAddr leader; if (OB_FAIL(get_leader_(tenant_id, leader))) {
if (OB_FAIL(get_leader_(tenant_id, leader))) { LOG_WARN("get leader fail", K(ret));
LOG_WARN("get leader fail", K(ret)); (void)refresh_location_(tenant_id);
(void)refresh_location_(tenant_id); } else {
} else { for (int64_t i = 0; OB_SUCC(ret) && i < autoinc_keys.count(); ++i) {
for (int64_t i = 0; OB_SUCC(ret) && i < autoinc_keys.count(); ++i) { rpc_result.reset();
rpc_result.reset(); AutoincKey key = autoinc_keys.at(i);
AutoincKey key = autoinc_keys.at(i); if (OB_FAIL(msg.init(key, self_))) {
if (OB_FAIL(msg.init(key, self_))) { LOG_WARN("fail to init request msg", K(ret));
LOG_WARN("fail to init request msg", K(ret)); } else if (OB_UNLIKELY(!msg.is_valid())) {
} else if (OB_UNLIKELY(!msg.is_valid())) { ret = OB_INVALID_ARGUMENT;
ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(msg));
LOG_WARN("invalid argument", K(ret), K(msg)); } else if (OB_FAIL(gais_request_rpc_->curr_autoinc_val(leader, msg, rpc_result))) {
} else if (OB_FAIL(gais_request_rpc_->curr_autoinc_val(leader, msg, rpc_result))) { LOG_WARN("handle gais request failed", K(ret), K(msg), K(rpc_result));
LOG_WARN("handle gais request failed", K(ret), K(msg), K(rpc_result)); (void)refresh_location_(tenant_id);
(void)refresh_location_(tenant_id); } else if (OB_FAIL(seq_values.set_refactored(key, rpc_result.sequence_value_))) {
} else if (OB_FAIL(seq_values.set_refactored(key, rpc_result.sequence_value_))) { LOG_WARN("fail to get int_value.", K(ret));
LOG_WARN("fail to get int_value.", K(ret)); } else {
} else { LOG_DEBUG("handle gais success", K(rpc_result));
LOG_DEBUG("handle gais success", K(rpc_result));
}
} }
} }
} }
...@@ -196,25 +186,23 @@ int ObGAISClient::local_push_to_global_value(const AutoincKey &key, ...@@ -196,25 +186,23 @@ int ObGAISClient::local_push_to_global_value(const AutoincKey &key,
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret)); LOG_WARN("not inited", K(ret));
} else { } else {
MTL_SWITCH(tenant_id) { ObGAISPushAutoIncValReq msg;
ObGAISPushAutoIncValReq msg; uint64_t new_sync_value = 0;
uint64_t new_sync_value = 0; ObAddr leader;
ObAddr leader; if (OB_FAIL(get_leader_(tenant_id, leader))) {
if (OB_FAIL(get_leader_(tenant_id, leader))) { LOG_WARN("get leader fail", K(ret));
LOG_WARN("get leader fail", K(ret)); (void)refresh_location_(tenant_id);
(void)refresh_location_(tenant_id); } else if (OB_FAIL(msg.init(key, local_sync_value, max_value, self_))) {
} else if (OB_FAIL(msg.init(key, local_sync_value, max_value, self_))) { LOG_WARN("fail to init request msg", K(ret));
LOG_WARN("fail to init request msg", K(ret)); } else if (OB_UNLIKELY(!msg.is_valid())) {
} else if (OB_UNLIKELY(!msg.is_valid())) { ret = OB_INVALID_ARGUMENT;
ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(msg));
LOG_WARN("invalid argument", K(ret), K(msg)); } else if (OB_FAIL(gais_request_rpc_->push_autoinc_val(leader, msg, new_sync_value))) {
} else if (OB_FAIL(gais_request_rpc_->push_autoinc_val(leader, msg, new_sync_value))) { LOG_WARN("handle gais request failed", K(ret), K(msg));
LOG_WARN("handle gais request failed", K(ret), K(msg)); (void)refresh_location_(tenant_id);
(void)refresh_location_(tenant_id); } else {
} else { global_sync_value = new_sync_value;
global_sync_value = new_sync_value; LOG_DEBUG("handle gais success", K(global_sync_value));
LOG_DEBUG("handle gais success", K(global_sync_value));
}
} }
} }
return ret; return ret;
...@@ -228,28 +216,26 @@ int ObGAISClient::local_sync_with_global_value(const AutoincKey &key, uint64_t & ...@@ -228,28 +216,26 @@ int ObGAISClient::local_sync_with_global_value(const AutoincKey &key, uint64_t &
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret)); LOG_WARN("not inited", K(ret));
} else { } else {
MTL_SWITCH(tenant_id) { ObGAISAutoIncKeyArg msg;
ObGAISAutoIncKeyArg msg; ObGAISCurrValRpcResult rpc_result;
ObGAISCurrValRpcResult rpc_result; ObAddr leader;
ObAddr leader; if (OB_FAIL(get_leader_(tenant_id, leader))) {
if (OB_FAIL(get_leader_(tenant_id, leader))) { LOG_WARN("get leader fail", K(ret));
LOG_WARN("get leader fail", K(ret)); (void)refresh_location_(key.tenant_id_);
(void)refresh_location_(key.tenant_id_); } else if (OB_FAIL(msg.init(key, self_))) {
} else if (OB_FAIL(msg.init(key, self_))) { LOG_WARN("fail to init request msg", K(ret));
LOG_WARN("fail to init request msg", K(ret)); } else if (OB_UNLIKELY(!msg.is_valid())) {
} else if (OB_UNLIKELY(!msg.is_valid())) { ret = OB_INVALID_ARGUMENT;
ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(msg));
LOG_WARN("invalid argument", K(ret), K(msg)); } else if (OB_FAIL(gais_request_rpc_->curr_autoinc_val(leader, msg, rpc_result))) {
} else if (OB_FAIL(gais_request_rpc_->curr_autoinc_val(leader, msg, rpc_result))) { LOG_WARN("handle gais request failed", K(ret), K(msg));
LOG_WARN("handle gais request failed", K(ret), K(msg)); (void)refresh_location_(key.tenant_id_);
(void)refresh_location_(key.tenant_id_); } else if (!rpc_result.is_valid()) {
} else if (!rpc_result.is_valid()) { ret = OB_ERR_UNEXPECTED;
ret = OB_ERR_UNEXPECTED; LOG_WARN("rpc result is unexpected", K(ret), K(rpc_result));
LOG_WARN("rpc result is unexpected", K(ret), K(rpc_result)); } else {
} else { global_sync_value = rpc_result.sync_value_;
global_sync_value = rpc_result.sync_value_; LOG_DEBUG("handle gais success", K(global_sync_value));
LOG_DEBUG("handle gais success", K(global_sync_value));
}
} }
} }
return ret; return ret;
...@@ -263,23 +249,21 @@ int ObGAISClient::clear_global_autoinc_cache(const AutoincKey &key) ...@@ -263,23 +249,21 @@ int ObGAISClient::clear_global_autoinc_cache(const AutoincKey &key)
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret)); LOG_WARN("not inited", K(ret));
} else { } else {
MTL_SWITCH(tenant_id) { ObGAISAutoIncKeyArg msg;
ObGAISAutoIncKeyArg msg; ObAddr leader;
ObAddr leader; if (OB_FAIL(get_leader_(tenant_id, leader))) {
if (OB_FAIL(get_leader_(tenant_id, leader))) { LOG_WARN("get leader fail", K(ret));
LOG_WARN("get leader fail", K(ret)); (void)refresh_location_(key.tenant_id_);
(void)refresh_location_(key.tenant_id_); } else if (OB_FAIL(msg.init(key, self_))) {
} else if (OB_FAIL(msg.init(key, self_))) { LOG_WARN("fail to init request msg", K(ret));
LOG_WARN("fail to init request msg", K(ret)); } else if (OB_UNLIKELY(!msg.is_valid())) {
} else if (OB_UNLIKELY(!msg.is_valid())) { ret = OB_INVALID_ARGUMENT;
ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(msg));
LOG_WARN("invalid argument", K(ret), K(msg)); } else if (OB_FAIL(gais_request_rpc_->clear_autoinc_cache(leader, msg))) {
} else if (OB_FAIL(gais_request_rpc_->clear_autoinc_cache(leader, msg))) { LOG_WARN("handle gais request failed", K(ret), K(msg));
LOG_WARN("handle gais request failed", K(ret), K(msg)); (void)refresh_location_(key.tenant_id_);
(void)refresh_location_(key.tenant_id_); } else {
} else { LOG_DEBUG("clear global autoinc cache success", K(msg));
LOG_DEBUG("clear global autoinc cache success", K(msg));
}
} }
} }
return ret; return ret;
...@@ -292,8 +276,11 @@ int ObGAISClient::get_leader_(const uint64_t tenant_id, ObAddr &leader) ...@@ -292,8 +276,11 @@ int ObGAISClient::get_leader_(const uint64_t tenant_id, ObAddr &leader)
lib::ObMutexGuard guard(cache_leader_mutex_); lib::ObMutexGuard guard(cache_leader_mutex_);
if (OB_LIKELY(gais_cache_leader_.is_valid())) { if (OB_LIKELY(gais_cache_leader_.is_valid())) {
leader = gais_cache_leader_; leader = gais_cache_leader_;
} else if (OB_FAIL(location_adapter_->nonblock_get_leader( } else if (OB_ISNULL(GCTX.location_service_)) {
cluster_id, tenant_id, GAIS_LS, leader))) { ret = OB_NOT_INIT;
LOG_WARN("location cache is NULL", K(ret));
} else if (OB_FAIL(GCTX.location_service_->nonblock_get_leader(
cluster_id, tenant_id, GAIS_LS, leader))) {
LOG_WARN("gais nonblock get leader failed", K(ret), K(tenant_id), K(GAIS_LS)); LOG_WARN("gais nonblock get leader failed", K(ret), K(tenant_id), K(GAIS_LS));
} else if (OB_UNLIKELY(!leader.is_valid())) { } else if (OB_UNLIKELY(!leader.is_valid())) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
...@@ -309,7 +296,10 @@ int ObGAISClient::refresh_location_(const uint64_t tenant_id) ...@@ -309,7 +296,10 @@ int ObGAISClient::refresh_location_(const uint64_t tenant_id)
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
const int64_t cluster_id = GCONF.cluster_id; const int64_t cluster_id = GCONF.cluster_id;
reset_cache_leader_(); reset_cache_leader_();
if (OB_FAIL(location_adapter_->nonblock_renew(cluster_id, tenant_id, GAIS_LS))) { if (OB_ISNULL(GCTX.location_service_)) {
ret = OB_NOT_INIT;
LOG_WARN("location cache is NULL", K(ret));
} else if (OB_FAIL(GCTX.location_service_->nonblock_renew(cluster_id, tenant_id, GAIS_LS))) {
LOG_WARN("gais nonblock renew error", KR(ret), K(tenant_id), K(GAIS_LS)); LOG_WARN("gais nonblock renew error", KR(ret), K(tenant_id), K(GAIS_LS));
} }
return ret; return ret;
......
...@@ -25,13 +25,10 @@ namespace share ...@@ -25,13 +25,10 @@ namespace share
class ObGAISClient class ObGAISClient
{ {
public: public:
ObGAISClient() : is_inited_(false), self_(), gais_request_rpc_(nullptr), ObGAISClient() : is_inited_(false), self_(), gais_request_rpc_(nullptr), gais_cache_leader_() { }
location_adapter_(nullptr), gais_cache_leader_() { }
~ObGAISClient() { } ~ObGAISClient() { }
int init(const common::ObAddr &self, share::ObGAISRequestRpc *gais_request_rpc, int init(const common::ObAddr &self, share::ObGAISRequestRpc *gais_request_rpc);
transaction::ObILocationAdapter *location_adapter);
void reset(); void reset();
int refresh_location(const uint64_t tenant_id) { return refresh_location_(tenant_id); }
TO_STRING_KV(K_(self), K_(gais_cache_leader)); TO_STRING_KV(K_(self), K_(gais_cache_leader));
public: public:
...@@ -68,7 +65,6 @@ private: ...@@ -68,7 +65,6 @@ private:
bool is_inited_; bool is_inited_;
common::ObAddr self_; common::ObAddr self_;
share::ObGAISRequestRpc *gais_request_rpc_; share::ObGAISRequestRpc *gais_request_rpc_;
transaction::ObILocationAdapter *location_adapter_;
common::ObAddr gais_cache_leader_; common::ObAddr gais_cache_leader_;
lib::ObMutex cache_leader_mutex_; lib::ObMutex cache_leader_mutex_;
}; };
......
...@@ -162,17 +162,20 @@ int ObGAISRequestRpc::next_autoinc_val(const ObAddr &server, ...@@ -162,17 +162,20 @@ int ObGAISRequestRpc::next_autoinc_val(const ObAddr &server,
} else if (server == self_) { } else if (server == self_) {
// Use local calls instead of rpc // Use local calls instead of rpc
ObGlobalAutoIncService *gais = nullptr; ObGlobalAutoIncService *gais = nullptr;
if (OB_ISNULL(gais = MTL_WITH_CHECK_TENANT(ObGlobalAutoIncService *, msg.autoinc_key_.tenant_id_))) { const uint64_t tenant_id = msg.autoinc_key_.tenant_id_;
ret = OB_ERR_UNEXPECTED; MTL_SWITCH(tenant_id) {
LOG_WARN("global autoinc service is null", K(ret)); if (OB_ISNULL(gais = MTL(ObGlobalAutoIncService *))) {
} else if (OB_FAIL(gais->handle_next_autoinc_request(msg, rpc_result))) { ret = OB_ERR_UNEXPECTED;
LOG_WARN("post local gais require autoinc request failed", KR(ret), K(server), K(msg)); LOG_WARN("global autoinc service is null", K(ret));
} else if (!rpc_result.is_valid()) { } else if (OB_FAIL(gais->handle_next_autoinc_request(msg, rpc_result))) {
ret = OB_ERR_UNEXPECTED; LOG_WARN("post local gais require autoinc request failed", KR(ret), K(server), K(msg));
LOG_ERROR("post local gais require autoinc and gais_rpc_result is invalid", KR(ret), K(server), } else if (!rpc_result.is_valid()) {
K(msg), K(rpc_result)); ret = OB_ERR_UNEXPECTED;
} else { LOG_ERROR("post local gais require autoinc and gais_rpc_result is invalid", KR(ret), K(server),
LOG_TRACE("post local require autoinc request success", K(msg), K(rpc_result)); K(msg), K(rpc_result));
} else {
LOG_TRACE("post local require autoinc request success", K(msg), K(rpc_result));
}
} }
} else if (OB_FAIL(rpc_proxy_->to(server).by(msg.autoinc_key_.tenant_id_).timeout(timeout).next_autoinc_val(msg, rpc_result))) { } else if (OB_FAIL(rpc_proxy_->to(server).by(msg.autoinc_key_.tenant_id_).timeout(timeout).next_autoinc_val(msg, rpc_result))) {
LOG_WARN("post require autoinc request failed", KR(ret), K(server), K(msg)); LOG_WARN("post require autoinc request failed", KR(ret), K(server), K(msg));
...@@ -201,13 +204,16 @@ int ObGAISRequestRpc::curr_autoinc_val(const ObAddr &server, ...@@ -201,13 +204,16 @@ int ObGAISRequestRpc::curr_autoinc_val(const ObAddr &server,
} else if (server == self_) { } else if (server == self_) {
// Use local calls instead of rpc // Use local calls instead of rpc
ObGlobalAutoIncService *gais = nullptr; ObGlobalAutoIncService *gais = nullptr;
if (OB_ISNULL(gais = MTL_WITH_CHECK_TENANT(ObGlobalAutoIncService *, msg.autoinc_key_.tenant_id_))) { const uint64_t tenant_id = msg.autoinc_key_.tenant_id_;
ret = OB_ERR_UNEXPECTED; MTL_SWITCH(tenant_id) {
LOG_WARN("global autoinc service is null", K(ret)); if (OB_ISNULL(gais = MTL(ObGlobalAutoIncService *))) {
} else if (OB_FAIL(gais->handle_curr_autoinc_request(msg, rpc_result))) { ret = OB_ERR_UNEXPECTED;
LOG_WARN("post local gais get autoinc request failed", KR(ret), K(server), K(msg)); LOG_WARN("global autoinc service is null", K(ret));
} else { } else if (OB_FAIL(gais->handle_curr_autoinc_request(msg, rpc_result))) {
LOG_TRACE("post local get autoinc request success", K(msg), K(rpc_result)); LOG_WARN("post local gais get autoinc request failed", KR(ret), K(server), K(msg));
} else {
LOG_TRACE("post local get autoinc request success", K(msg), K(rpc_result));
}
} }
} else if (OB_FAIL(rpc_proxy_->to(server).by(msg.autoinc_key_.tenant_id_).timeout(timeout).curr_autoinc_val(msg, rpc_result))) { } else if (OB_FAIL(rpc_proxy_->to(server).by(msg.autoinc_key_.tenant_id_).timeout(timeout).curr_autoinc_val(msg, rpc_result))) {
LOG_WARN("post gais request failed", KR(ret), K(server), K(msg)); LOG_WARN("post gais request failed", KR(ret), K(server), K(msg));
...@@ -232,13 +238,16 @@ int ObGAISRequestRpc::push_autoinc_val(const ObAddr &server, ...@@ -232,13 +238,16 @@ int ObGAISRequestRpc::push_autoinc_val(const ObAddr &server,
} else if (server == self_) { } else if (server == self_) {
// Use local calls instead of rpc // Use local calls instead of rpc
ObGlobalAutoIncService *gais = nullptr; ObGlobalAutoIncService *gais = nullptr;
if (OB_ISNULL(gais = MTL_WITH_CHECK_TENANT(ObGlobalAutoIncService *, msg.autoinc_key_.tenant_id_))) { const uint64_t tenant_id = msg.autoinc_key_.tenant_id_;
ret = OB_ERR_UNEXPECTED; MTL_SWITCH(tenant_id) {
LOG_WARN("global autoinc service is null", K(ret)); if (OB_ISNULL(gais = MTL(ObGlobalAutoIncService *))) {
} else if (OB_FAIL(gais->handle_push_autoinc_request(msg, sync_value))) { ret = OB_ERR_UNEXPECTED;
LOG_WARN("post local gais push global request failed", KR(ret), K(server), K(msg)); LOG_WARN("global autoinc service is null", K(ret));
} else { } else if (OB_FAIL(gais->handle_push_autoinc_request(msg, sync_value))) {
LOG_TRACE("post local gais push global request request success", K(msg), K(sync_value)); LOG_WARN("post local gais push global request failed", KR(ret), K(server), K(msg));
} else {
LOG_TRACE("post local gais push global request request success", K(msg), K(sync_value));
}
} }
} else if (OB_FAIL(rpc_proxy_->to(server).by(msg.autoinc_key_.tenant_id_).timeout(timeout).push_autoinc_val(msg, sync_value))) { } else if (OB_FAIL(rpc_proxy_->to(server).by(msg.autoinc_key_.tenant_id_).timeout(timeout).push_autoinc_val(msg, sync_value))) {
LOG_WARN("post remote push global request failed", KR(ret), K(server), K(msg)); LOG_WARN("post remote push global request failed", KR(ret), K(server), K(msg));
...@@ -261,13 +270,16 @@ int ObGAISRequestRpc::clear_autoinc_cache(const ObAddr &server, const ObGAISAuto ...@@ -261,13 +270,16 @@ int ObGAISRequestRpc::clear_autoinc_cache(const ObAddr &server, const ObGAISAuto
} else if (server == self_) { } else if (server == self_) {
// Use local calls instead of rpc // Use local calls instead of rpc
ObGlobalAutoIncService *gais = nullptr; ObGlobalAutoIncService *gais = nullptr;
if (OB_ISNULL(gais = MTL_WITH_CHECK_TENANT(ObGlobalAutoIncService *, msg.autoinc_key_.tenant_id_))) { const uint64_t tenant_id = msg.autoinc_key_.tenant_id_;
ret = OB_ERR_UNEXPECTED; MTL_SWITCH(tenant_id) {
LOG_WARN("global autoinc service is null", K(ret)); if (OB_ISNULL(gais = MTL(ObGlobalAutoIncService *))) {
} else if (OB_FAIL(gais->handle_clear_autoinc_cache_request(msg))) { ret = OB_ERR_UNEXPECTED;
LOG_WARN("post local gais clear autoinc cache failed", KR(ret), K(server), K(msg)); LOG_WARN("global autoinc service is null", K(ret));
} else { } else if (OB_FAIL(gais->handle_clear_autoinc_cache_request(msg))) {
LOG_TRACE("clear autoinc cache success", K(server), K(msg)); LOG_WARN("post local gais clear autoinc cache failed", KR(ret), K(server), K(msg));
} else {
LOG_TRACE("clear autoinc cache success", K(server), K(msg));
}
} }
} else if (OB_FAIL(rpc_proxy_->to(server).by(msg.autoinc_key_.tenant_id_).timeout(timeout).clear_autoinc_cache(msg))) { } else if (OB_FAIL(rpc_proxy_->to(server).by(msg.autoinc_key_.tenant_id_).timeout(timeout).clear_autoinc_cache(msg))) {
LOG_WARN("post gais request failed", KR(ret), K(server), K(msg)); LOG_WARN("post gais request failed", KR(ret), K(server), K(msg));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册