提交 8024d8ff 编写于 作者: F fengdeyiji 提交者: ob-robot

[MDS] fix mds_table flush won't do bug

上级 d137c146
......@@ -177,6 +177,7 @@ int ObLSTabletService::offline()
if (OB_FAIL(tablet_id_set_.foreach(clean_mem_op))) {
LOG_WARN("fail to clean memtables", K(ret), K(clean_mem_op.cur_tablet_id_));
}
mds_table_mgr_.offline();
}
return ret;
}
......
......@@ -64,6 +64,11 @@ int ObMdsTableMgr::reset()
return ret;
}
void ObMdsTableMgr::offline()
{
freezing_scn_.atomic_store(share::SCN::min_scn());
}
void ObMdsTableMgr::destroy() { reset(); }
int ObMdsTableMgr::register_to_mds_table_mgr(MdsTableBase *p_mds_table)
......@@ -156,14 +161,13 @@ int ObMdsTableMgr::unregister_from_mds_table_mgr(MdsTableBase *p_mds_table)
return ret;
}
int ObMdsTableMgr::first_scan_to_get_max_min_rec_scn_(share::SCN &max_rec_scn, share::SCN &min_rec_scn)
int ObMdsTableMgr::first_scan_to_get_min_rec_scn_(share::SCN &min_rec_scn)
{
MDS_TG(10_s);
int ret = OB_SUCCESS;
int64_t scan_cnt = 0;
auto get_max_min_rec_scn_op =
[&max_rec_scn, &min_rec_scn, &scan_cnt](const common::ObTabletID &tablet_id,
List<MdsTableBase> &mds_table_list) {
[&min_rec_scn, &scan_cnt](const common::ObTabletID &tablet_id, List<MdsTableBase> &mds_table_list) {
int tmp_ret = OB_SUCCESS;
if (mds_table_list.empty()) {
MDS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "meet empty mds table list", K(tablet_id));
......@@ -173,8 +177,7 @@ int ObMdsTableMgr::first_scan_to_get_max_min_rec_scn_(share::SCN &max_rec_scn, s
// jsut skip it
} else {
share::SCN rec_scn = mds_table->get_rec_scn();
max_rec_scn = std::max(rec_scn, max_rec_scn);
min_rec_scn = std::min(rec_scn, max_rec_scn);
min_rec_scn = std::min(rec_scn, min_rec_scn);
++scan_cnt;
}
}
......@@ -182,9 +185,7 @@ int ObMdsTableMgr::first_scan_to_get_max_min_rec_scn_(share::SCN &max_rec_scn, s
return true;
};
if (OB_FAIL(mds_table_map_.for_each(get_max_min_rec_scn_op))) {
MDS_LOG(WARN, "fail to do map for_each", KR(ret), K(*this), K(scan_cnt), K(max_rec_scn), K(min_rec_scn));
} else {
MDS_LOG(INFO, "success to do first scan to get max and min rec scn", KR(ret), K(*this), K(scan_cnt), K(max_rec_scn), K(min_rec_scn));
MDS_LOG(WARN, "fail to do map for_each", KR(ret), K(*this), K(scan_cnt), K(min_rec_scn));
}
return ret;
}
......@@ -215,37 +216,42 @@ int ObMdsTableMgr::second_scan_to_do_flush_(share::SCN do_flush_limit_scn)
if (OB_FAIL(mds_table_map_.for_each(flush_op))) {
MDS_LOG(WARN, "fail to do map for_each", KR(ret), K(*this), K(scan_mds_table_cnt), K(do_flush_limit_scn));
} else {
MDS_LOG(INFO, "success to do first scan to get max and min rec scn", KR(ret), K(*this), K(scan_mds_table_cnt), K(do_flush_limit_scn));
MDS_LOG(INFO, "success to do second scan to do flush", KR(ret), K(*this), K(scan_mds_table_cnt), K(do_flush_limit_scn));
}
return ret;
}
int ObMdsTableMgr::flush(SCN recycle_scn, bool need_freeze)
{
#define PRINT_WRAPPER KR(ret), K(ls_->get_ls_id()), K(recycle_scn), K(need_freeze), K(max_rec_scn), K(min_rec_scn)
#define PRINT_WRAPPER KR(ret), K(ls_->get_ls_id()), K(recycle_scn), K(need_freeze), K(min_rec_scn),\
K(max_consequent_callbacked_scn), K(*this)
MDS_TG(10_s);
int ret = OB_SUCCESS;
MdsTableFreezeGuard freeze_guard;
freeze_guard.init(this);
int64_t flushed_table_cnt = 0;
MdsTableHandle *flushing_mds_table = nullptr;
share::SCN max_rec_scn = share::SCN::min_scn();
share::SCN min_rec_scn = share::SCN::max_scn();
share::SCN max_consequent_callbacked_scn;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
MDS_LOG_FREEZE(ERROR, "mds table mgr not inited");
} else if (!freeze_guard.can_freeze()) {
MDS_LOG_FREEZE(INFO, "mds table mgr is doing flush, skip flush once");
} else if (MDS_FAIL(first_scan_to_get_max_min_rec_scn_(max_rec_scn, min_rec_scn))) {
MDS_LOG_FREEZE(WARN, "do first_scan_to_get_max_min_rec_scn_ failed");
} else if (max_rec_scn == share::SCN::min_scn() && min_rec_scn == share::SCN::max_scn()) {// no mds table
} else if (MDS_FAIL(first_scan_to_get_min_rec_scn_(min_rec_scn))) {
MDS_LOG_FREEZE(WARN, "do first_scan_to_get_min_rec_scn_ failed");
} else if (min_rec_scn == share::SCN::max_scn()) {// no mds table
MDS_LOG_FREEZE(INFO, "no valid mds table there, no need do flush");
} else if (MDS_FAIL(ls_->get_freezer()->get_max_consequent_callbacked_scn(max_consequent_callbacked_scn))) {
MDS_LOG_FREEZE(WARN, "fail to get max_consequent_callbacked_scn", KR(ret), K(*this));
} else if (!max_consequent_callbacked_scn.is_valid() || max_consequent_callbacked_scn.is_max() || max_consequent_callbacked_scn.is_min()) {
ret = OB_ERR_UNEXPECTED;
MDS_LOG_FREEZE(ERROR, "invalid max_consequent_callbacked_scn", KR(ret), K(*this));
} else {
if (need_freeze) {
if (recycle_scn.is_max() || !recycle_scn.is_valid()) {
MDS_LOG_FREEZE(INFO, "invalid recycle_scn redirected to max_rec_scn");
recycle_scn = max_rec_scn;
recycle_scn = max_consequent_callbacked_scn;
}
if (recycle_scn > freezing_scn_) {
MDS_LOG_FREEZE(INFO, "generate new freezing scn");
......@@ -258,6 +264,8 @@ int ObMdsTableMgr::flush(SCN recycle_scn, bool need_freeze)
} else {
MDS_LOG_FREEZE(INFO, "success to do flush");
}
} else {
MDS_LOG_FREEZE(INFO, "no need do flush cause min_rec_scn is larger than freezing scn");
}
}
return ret;
......@@ -268,43 +276,18 @@ SCN ObMdsTableMgr::get_freezing_scn() const { return freezing_scn_.atomic_get();
SCN ObMdsTableMgr::get_rec_scn()
{
MDS_TG(1_s);
int ret = OB_SUCCESS;
// TODO : @gengli FIX ME
SCN min_rec_scn = share::SCN::max_scn();
common::ObTimeGuard tg("get mds table rec log scn", 100 * 1000);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
MDS_LOG(ERROR, "regsiter mds table failed", KR(ret));
} else if (MDS_FAIL(first_scan_to_get_min_rec_scn_(min_rec_scn))) {
min_rec_scn = SCN::min_scn();
MDS_LOG(WARN, "fail to scan get min_rec_scn", KR(ret), K(min_rec_scn), K(*this));
} else {
auto get_scn_op = [&min_rec_scn, &ret](const common::ObTabletID &tablet_id,
List<MdsTableBase> &mds_table_list) {
SCN rec_scn;
if (mds_table_list.empty()) {
MDS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "meet empty mds table list", K(tablet_id));
} else {
MdsTableBase *p_mds_table = static_cast<MdsTableBase *>(mds_table_list.list_head_);
if (p_mds_table->is_removed_from_t3m()) {
// jsut skip it
} else if (FALSE_IT(rec_scn = p_mds_table->get_rec_scn())) {
} else if (!rec_scn.is_valid()) {
ret = OB_ERR_UNEXPECTED;
MDS_LOG(ERROR, "get invalid rec scn", KR(ret));
} else {
min_rec_scn = std::min(min_rec_scn, rec_scn);
}
}
// true means iterating the next mds table
return OB_SUCCESS == ret;
};
if (OB_FAIL(mds_table_map_.for_each(get_scn_op))) {
MDS_LOG(WARN, "fail to do for_each in map", KR(ret));
}
// todo zk: retry
if (OB_FAIL(ret)) {
min_rec_scn = SCN::min_scn();
}
MDS_LOG(INFO, "get rec_scn from MdsTableMgr", KR(ret), K(min_rec_scn));
MDS_LOG(INFO, "get rec_scn from MdsTableMgr", KR(ret), K(min_rec_scn), K(*this));
}
return min_rec_scn;
}
......
......@@ -42,6 +42,7 @@ public:
int init(ObLS *ls);
int reset();
void offline();
void destroy();
int register_to_mds_table_mgr(MdsTableBase *p_mds_table);
int unregister_from_mds_table_mgr(MdsTableBase *p_mds_table);
......@@ -66,7 +67,7 @@ public: // getter and setter
int64_t get_ref() { return ATOMIC_LOAD(&ref_cnt_); }
private:
int first_scan_to_get_max_min_rec_scn_(share::SCN &max_rec_scn, share::SCN &min_rec_scn);
int first_scan_to_get_min_rec_scn_(share::SCN &min_rec_scn);
int second_scan_to_do_flush_(share::SCN min_rec_scn);
private:
......
......@@ -5872,14 +5872,13 @@ int ObPartTransCtx::get_tx_ctx_table_info_(ObTxCtxTableInfo &info)
info.tx_id_ = trans_id_;
info.ls_id_ = ls_id_;
info.cluster_id_ = cluster_id_;
exec_info_.mds_buffer_ctx_array_.reset();
if (OB_FAIL(mt_ctx_.get_table_lock_store_info(info.table_lock_info_))) {
TRANS_LOG(WARN, "get_table_lock_store_info failed", K(ret), K(info));
} else {
TRANS_LOG(INFO, "store ctx_info: ", K(ret), K(info), KPC(this));
}
}
exec_info_.mds_buffer_ctx_array_.reset();
return ret;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册