提交 74ad2074 编写于 作者: G godyangfight 提交者: wangzelin.wzl

Fix migration check need copy minor sstable bug

上级 53b1261f
......@@ -854,8 +854,8 @@ int ObStartCompleteMigrationTask::process()
LOG_WARN("failed to check all tablet ready", K(ret), KPC(ctx_));
} else if (OB_FAIL(wait_trans_tablet_explain_data_())) {
LOG_WARN("failed to wait log replay sync", K(ret), KPC(ctx_));
} else if (OB_FAIL(wait_ls_checkpoint_ts_push_())) {
LOG_WARN("failed to wait ls checkpoint ts push", K(ret), KPC(ctx_));
} else if (OB_FAIL(wait_log_replay_to_max_minor_end_scn_())) {
LOG_WARN("failed to wait log replay to max minor end scn", K(ret), KPC(ctx_));
} else if (OB_FAIL(update_ls_migration_status_hold_())) {
LOG_WARN("failed to update ls migration status hold", K(ret), KPC(ctx_));
} else if (OB_FAIL(change_member_list_())) {
......@@ -893,7 +893,7 @@ int ObStartCompleteMigrationTask::wait_log_sync_()
} else if (OB_ISNULL(ls = ls_handle_.get_ls())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls should not be NULL", K(ret), KP(ls), KPC(ctx_));
} else if (OB_FAIL(check_need_wait_log_sync_(ls, need_wait))) {
} else if (OB_FAIL(check_need_wait_(ls, need_wait))) {
LOG_WARN("failed to check need wait log sync", K(ret), KPC(ctx_));
} else if (!need_wait) {
FLOG_INFO("no need wait log sync", KPC(ctx_));
......@@ -991,8 +991,8 @@ int ObStartCompleteMigrationTask::wait_log_replay_sync_()
} else if (OB_ISNULL(ls = ls_handle_.get_ls())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls should not be NULL", K(ret), KP(ls), KPC(ctx_));
} else if (OB_FAIL(check_need_wait_log_sync_(ls, need_wait))) {
LOG_WARN("failed to check need wait log sync", K(ret), KPC(ctx_));
} else if (OB_FAIL(check_need_wait_(ls, need_wait))) {
LOG_WARN("failed to check need wait log replay", K(ret), KPC(ctx_));
} else if (!need_wait) {
FLOG_INFO("no need wait replay log sync", KPC(ctx_));
} else {
......@@ -1118,12 +1118,13 @@ int ObStartCompleteMigrationTask::change_member_list_()
return ret;
}
int ObStartCompleteMigrationTask::check_need_wait_log_sync_(
int ObStartCompleteMigrationTask::check_need_wait_(
ObLS *ls,
bool &need_wait)
{
int ret = OB_SUCCESS;
need_wait = true;
ObLSRestoreStatus ls_restore_status;
if (!is_inited_) {
ret = OB_NOT_INIT;
......@@ -1131,19 +1132,15 @@ int ObStartCompleteMigrationTask::check_need_wait_log_sync_(
} else if (OB_ISNULL(ls)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("check need wait log sync get invalid argument", K(ret), KP(ls));
} else if (OB_FAIL(ls->get_restore_status(ls_restore_status))) {
LOG_WARN("failed to get restore status", K(ret), KPC(ctx_));
} else if (ls_restore_status.is_in_restore()) {
need_wait = false;
} else if (ObMigrationOpType::REBUILD_LS_OP == ctx_->arg_.type_) {
need_wait = false;
} else if (ObMigrationOpType::ADD_LS_OP == ctx_->arg_.type_
|| ObMigrationOpType::MIGRATE_LS_OP == ctx_->arg_.type_) {
need_wait = true;
ObLSRestoreStatus ls_restore_status;
if (OB_FAIL(ls->get_restore_status(ls_restore_status))) {
LOG_WARN("failed to get restore status", K(ret), KPC(ctx_));
} else if (!ls_restore_status.is_in_restore()) {
need_wait = true;
} else if (!ls_restore_status.can_restore_log()) {
need_wait = false;
}
} else if (ObMigrationOpType::CHANGE_LS_OP == ctx_->arg_.type_) {
if (!ObReplicaTypeCheck::is_replica_with_ssstore(ls->get_replica_type())
&& ObReplicaTypeCheck::is_full_replica(ctx_->arg_.dst_.get_replica_type())) {
......@@ -1295,46 +1292,16 @@ int ObStartCompleteMigrationTask::check_tablet_ready_(
return ret;
}
int ObStartCompleteMigrationTask::check_need_wait_checkpoint_ts_push_(
ObLS *ls,
bool &need_wait)
{
int ret = OB_SUCCESS;
need_wait = true;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("start complete migration task do not init", K(ret));
} else if (OB_ISNULL(ls)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("check need wait log sync get invalid argument", K(ret), KP(ls));
} else if (ObMigrationOpType::ADD_LS_OP == ctx_->arg_.type_
|| ObMigrationOpType::MIGRATE_LS_OP == ctx_->arg_.type_) {
need_wait = true;
ObLSRestoreStatus ls_restore_status;
if (OB_FAIL(ls->get_restore_status(ls_restore_status))) {
LOG_WARN("failed to get restore status", K(ret), KPC(ctx_));
} else if (!ls_restore_status.is_in_restore()) {
need_wait = true;
} else if (!ls_restore_status.can_restore_log()) {
need_wait = false;
}
}
return ret;
}
//TODO(muwei.ym) remove it later
int ObStartCompleteMigrationTask::wait_ls_checkpoint_ts_push_()
int ObStartCompleteMigrationTask::wait_log_replay_to_max_minor_end_scn_()
{
int ret = OB_SUCCESS;
ObLSHandle ls_handle;
ObLS *ls = nullptr;
checkpoint::ObCheckpointExecutor *checkpoint_executor = NULL;
int64_t checkpoint_ts = 0;
const int64_t MAX_WAIT_INTERVAL_BY_CHECKPOINT_BY_FLUSH = GCONF._advance_checkpoint_timeout;
const int64_t MAX_SLEEP_INTERVAL_MS = 1 * 1000 * 1000; //1s
bool is_cancel = false;
bool need_wait = true;
int64_t current_replay_log_ts_ns = 0;
const int64_t OB_WAIT_LOG_REPLAY_INTERVAL = 200 * 1000; // 200ms
const int64_t OB_WAIT_LOG_REPLAY_TIMEOUT = 30 * 60 * 1000 * 1000L; // 30 min
if (!is_inited_) {
ret = OB_NOT_INIT;
......@@ -1344,15 +1311,12 @@ int ObStartCompleteMigrationTask::wait_ls_checkpoint_ts_push_()
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls should not be NULL", K(ret), KP(ls), KPC(ctx_));
} else if (OB_FAIL(check_need_wait_checkpoint_ts_push_(ls, need_wait))) {
LOG_WARN("failed to check need wait log sync", K(ret), KPC(ls), KPC(ctx_));
} else if (OB_FAIL(check_need_wait_(ls, need_wait))) {
LOG_WARN("failed to check need replay to max minor end scn", K(ret), KPC(ls), KPC(ctx_));
} else if (!need_wait) {
LOG_INFO("no need to wait ls checkpoint ts push", K(ret), KPC(ctx_));
} else if (OB_ISNULL(checkpoint_executor = ls->get_checkpoint_executor())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("checkpoint executor should not be NULL", K(ret), KPC(ctx_), KP(checkpoint_executor));
} else {
const int64_t wait_checkpoint_push_start_ts = ObTimeUtility::current_time();
const int64_t wait_replay_start_ts = ObTimeUtility::current_time();
while (OB_SUCC(ret)) {
if (ctx_->is_failed()) {
ret = OB_CANCELED;
......@@ -1362,29 +1326,35 @@ int ObStartCompleteMigrationTask::wait_ls_checkpoint_ts_push_()
} else if (is_cancel) {
ret = OB_CANCELED;
STORAGE_LOG(WARN, "task is cancelled", K(ret), K(*this));
} else if (FALSE_IT(checkpoint_ts = ls->get_clog_checkpoint_ts())) {
} else if (checkpoint_ts >= max_minor_end_scn_) {
const int64_t cost_ts = ObTimeUtility::current_time() - wait_checkpoint_push_start_ts;
LOG_INFO("succeed wait clog checkpoint ts push", "cost", cost_ts, "ls_id", ctx_->arg_.ls_id_);
} else if (OB_FAIL(ls->get_max_decided_log_ts_ns(current_replay_log_ts_ns))) {
LOG_WARN("failed to get current replay log ts", K(ret), KPC(ctx_));
} else if (current_replay_log_ts_ns >= max_minor_end_scn_) {
const int64_t cost_ts = ObTimeUtility::current_time() - wait_replay_start_ts;
LOG_INFO("wait replay log ts push to max minor end scn success, stop wait", "arg", ctx_->arg_,
K(cost_ts), K(max_minor_end_scn_), K(current_replay_log_ts_ns));
break;
} else if (OB_FAIL(checkpoint_executor->advance_checkpoint_by_flush(max_minor_end_scn_))) {
if (OB_NO_NEED_UPDATE == ret) {
ret = OB_SUCCESS;
} else {
LOG_WARN("failed to advance checkpoint by flush", K(ret), KPC(ctx_));
} else {
const int64_t current_ts = ObTimeUtility::current_time();
if (REACH_TENANT_TIME_INTERVAL(60 * 1000 * 1000)) {
LOG_INFO("ls wait replay to max minor sstable end log ts, retry next loop", "arg", ctx_->arg_,
"wait_replay_start_ts", wait_replay_start_ts,
"current_ts", current_ts);
}
}
if (OB_SUCC(ret)) {
const int64_t current_ts = ObTimeUtility::current_time();
if (current_ts - wait_checkpoint_push_start_ts >= MAX_WAIT_INTERVAL_BY_CHECKPOINT_BY_FLUSH) {
ret = OB_TIMEOUT;
LOG_WARN("wait ls checkpoint ts push time out",
"ls_checkpoint_ts", checkpoint_ts, "need_checkpoint_ts", max_minor_end_scn_, "ls_id", ctx_->arg_.ls_id_);
if (current_ts - wait_replay_start_ts < OB_WAIT_LOG_REPLAY_TIMEOUT) {
} else {
LOG_INFO("wait ls checkpoint ts push", "ls_checkpoint_ts", checkpoint_ts,
"need_checkpoint_ts", max_minor_end_scn_, "ls_id", ctx_->arg_.ls_id_);
ob_usleep(MAX_SLEEP_INTERVAL_MS);
if (OB_FAIL(ctx_->set_result(OB_WAIT_REPLAY_TIMEOUT, true /*allow_retry*/))) {
LOG_WARN("failed to set result", K(ret), KPC(ctx_));
} else {
ret = OB_WAIT_REPLAY_TIMEOUT;
STORAGE_LOG(WARN, "failed to wait replay to max minor end scn, timeout, stop migration task",
K(ret), K(*ctx_), K(current_ts),
K(wait_replay_start_ts));
}
}
if (OB_SUCC(ret)) {
ob_usleep(OB_WAIT_LOG_REPLAY_INTERVAL);
}
}
}
......
......@@ -173,7 +173,7 @@ private:
int wait_log_replay_sync_();
int wait_trans_tablet_explain_data_();
int change_member_list_();
int check_need_wait_log_sync_(
int check_need_wait_(
ObLS *ls,
bool &need_wait);
int update_ls_migration_status_hold_();
......@@ -181,10 +181,7 @@ private:
int check_tablet_ready_(
const common::ObTabletID &tablet_id,
ObLS *ls);
int check_need_wait_checkpoint_ts_push_(
ObLS *ls,
bool &need_wait);
int wait_ls_checkpoint_ts_push_();
int wait_log_replay_to_max_minor_end_scn_();
int record_server_event_();
private:
......
......@@ -1191,6 +1191,7 @@ int ObTabletCopyFinishTask::create_new_table_store_()
ObTabletHandle tablet_handle;
ObTablet *tablet = nullptr;
const bool is_rollback = false;
bool need_merge = false;
if (!is_inited_) {
ret = OB_NOT_INIT;
......@@ -1200,10 +1201,12 @@ int ObTabletCopyFinishTask::create_new_table_store_()
} else if (OB_ISNULL(tablet = tablet_handle.get_obj())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tablet should not be NULL", K(ret), K(tablet_id_));
} else if (OB_FAIL(check_need_merge_tablet_meta_(tablet, need_merge))) {
LOG_WARN("failedto check remote logical sstable exist", K(ret), KPC(tablet));
} else {
update_table_store_param.multi_version_start_ = 0;
update_table_store_param.need_report_ = true;
update_table_store_param.tablet_meta_ = src_tablet_meta_;
update_table_store_param.tablet_meta_ = need_merge ? src_tablet_meta_ : nullptr;
update_table_store_param.rebuild_seq_ = ls_->get_rebuild_seq();
if (OB_FAIL(update_table_store_param.tables_handle_.assign(tables_handle_))) {
......@@ -1229,6 +1232,7 @@ int ObTabletCopyFinishTask::update_tablet_data_status_()
ObTabletHandle tablet_handle;
ObTablet *tablet = nullptr;
const ObTabletDataStatus::STATUS data_status = ObTabletDataStatus::COMPLETE;
bool is_logical_sstable_exist = false;
if (!is_inited_) {
ret = OB_NOT_INIT;
......@@ -1243,24 +1247,19 @@ int ObTabletCopyFinishTask::update_tablet_data_status_()
LOG_WARN("tablet here should only has one", K(ret), KPC(tablet));
} else if (tablet->get_tablet_meta().ha_status_.is_data_status_complete()) {
//do nothing
} else if (OB_FAIL(check_remote_logical_sstable_exist_(tablet, is_logical_sstable_exist))) {
LOG_WARN("failedto check remote logical sstable exist", K(ret), KPC(tablet));
} else if (is_logical_sstable_exist && tablet->get_tablet_meta().ha_status_.is_restore_status_full()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tablet still has remote logical sstable, unexpected !!!", K(ret), KPC(tablet));
} else {
const ObSSTableArray &minor_sstables = tablet->get_table_store().get_minor_sstables();
const ObSSTableArray &major_sstables = tablet->get_table_store().get_major_sstables();
for (int64_t i = 0; OB_SUCC(ret) && i < minor_sstables.count(); ++i) {
const ObITable *table = minor_sstables[i];
if (table->is_remote_logical_minor_sstable()
&& tablet->get_tablet_meta().ha_status_.is_restore_status_full()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tablet still has remote logical sstable, unexpected !!!", K(ret), KPC(table), KPC(tablet));
}
}
if (OB_SUCC(ret)
&& tablet->get_tablet_meta().table_store_flag_.with_major_sstable()
&& tablet->get_tablet_meta().ha_status_.is_restore_status_full()
&& major_sstables.empty()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tablet should has major sstable, unexpected", K(ret), K(tablet), K(major_sstables));
LOG_WARN("tablet should has major sstable, unexpected", K(ret), KPC(tablet), K(major_sstables));
}
#ifdef ERRSIM
......@@ -1287,11 +1286,64 @@ int ObTabletCopyFinishTask::update_tablet_data_status_()
LOG_WARN("failed to submit tablet update task", K(tmp_ret), KPC(ls_), K(tablet_id_));
}
}
}
return ret;
}
int ObTabletCopyFinishTask::check_need_merge_tablet_meta_(
ObTablet *tablet,
bool &need_merge)
{
int ret = OB_SUCCESS;
need_merge = false;
bool is_exist = false;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("tablet copy finish task do not init", K(ret));
} else if (OB_ISNULL(tablet)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("check need merge tablet meta get invalid argument", K(ret), KP(tablet));
} else if (tablet->get_tablet_meta().clog_checkpoint_ts_ >= src_tablet_meta_->clog_checkpoint_ts_) {
need_merge = false;
} else if (OB_FAIL(check_remote_logical_sstable_exist_(tablet, is_exist))) {
LOG_WARN("failed to check remote logical sstable exist", K(ret), KPC(tablet));
} else if (!is_exist) {
need_merge = false;
} else {
need_merge = true;
}
return ret;
}
int ObTabletCopyFinishTask::check_remote_logical_sstable_exist_(
ObTablet *tablet,
bool &is_exist)
{
int ret = OB_SUCCESS;
is_exist = false;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("tablet copy finish task do not init", K(ret));
} else if (OB_ISNULL(tablet)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("check remote logical sstable exist get invalid argument", K(ret), KP(tablet));
} else {
const ObSSTableArray &minor_sstables = tablet->get_table_store().get_minor_sstables();
for (int64_t i = 0; OB_SUCC(ret) && i < minor_sstables.count(); ++i) {
const ObITable *table = minor_sstables.array_[i];
if (OB_ISNULL(table)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("minor sstable should not be NULL", K(ret), KP(table));
} else if (table->is_remote_logical_minor_sstable()) {
is_exist = true;
break;
}
}
}
return ret;
}
}
}
......@@ -216,6 +216,12 @@ public:
private:
int create_new_table_store_();
int update_tablet_data_status_();
int check_need_merge_tablet_meta_(
ObTablet *tablet,
bool &need_merge);
int check_remote_logical_sstable_exist_(
ObTablet *tablet,
bool &is_exist);
private:
bool is_inited_;
......
......@@ -321,8 +321,8 @@ bool ObBatchUpdateTableStoreParam::is_valid() const
return snapshot_version_ >= 0
&& multi_version_start_ >= 0
&& rebuild_seq_ > OB_INVALID_VERSION
&& ((!update_logical_minor_sstable_ && OB_NOT_NULL(tablet_meta_))
|| (update_logical_minor_sstable_ && start_scn_ > 0));
&& (!update_logical_minor_sstable_
|| (update_logical_minor_sstable_ && start_scn_ > 0 && OB_ISNULL(tablet_meta_)));
}
int ObBatchUpdateTableStoreParam::assign(
......
......@@ -284,7 +284,6 @@ int ObTablet::init(
LOG_WARN("tablet pointer handle is invalid", K(ret), K_(pointer_hdl), K_(memtable_mgr), K_(log_handler));
} else if (is_update
&& !tablet_id.is_ls_inner_tablet()
&& param.ha_status_.is_restore_status_full() // is_update && is_migrate: init memtable_mgr. restore reuse memtable_mgr
&& OB_FAIL(init_storage_related_member(ls_id, tablet_id, param.max_sync_storage_schema_version_))) {
LOG_WARN("failed to init storage related member", K(ret), K(ls_id), K(tablet_id));
} else if (!is_update && OB_FAIL(init_shared_params(ls_id, tablet_id, param.max_sync_storage_schema_version_, freezer))) {
......
......@@ -1036,9 +1036,6 @@ int ObTabletTableStore::need_remove_old_table(
LOG_WARN("get invalid arguments", K(ret), K(multi_version_start));
} else if (minor_tables_.empty() || INT64_MAX == minor_tables_[0]->get_upper_trans_version()) {
// do nothing
} else if (minor_tables_[0]->get_end_log_ts() > tablet_ptr_->get_tablet_meta().clog_checkpoint_ts_) {
need_remove = false;
//TODO(muwei.ym) remove it later
} else if (minor_tables_[0]->get_upper_trans_version() <= major_tables_[0]->get_snapshot_version()) {
// at least one minor sstable is coverd by major sstable
// don't need to care about kept_multi_version_start here
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册