未验证 提交 9185c3a6 编写于 作者: Y Yingchun Lai 提交者: GitHub

feat(rocksdb): Support to open rocksdb with or without meta CF (#549)

上级 7ab5fb84
Subproject commit c9d5c2239ad94af981e8b3f81f41c39f2083a07e
Subproject commit 33318f3ec2a12324c17d3ff418985f25cc73c943
Subproject commit f0e22c376e8de3ca576f4a37c857172d54cc4fa7
Subproject commit 52492c31313921d0476751fffc77b84ead156363
......@@ -48,6 +48,8 @@ std::shared_ptr<rocksdb::Cache> pegasus_server_impl::_s_block_cache;
::dsn::task_ptr pegasus_server_impl::_update_server_rdb_stat;
::dsn::perf_counter_wrapper pegasus_server_impl::_pfc_rdb_block_cache_mem_usage;
const std::string pegasus_server_impl::COMPRESSION_HEADER = "per_level:";
const std::string pegasus_server_impl::DATA_COLUMN_FAMILY_NAME = "default";
const std::string pegasus_server_impl::META_COLUMN_FAMILY_NAME = "pegasus_meta_cf";
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
: dsn::apps::rrdb_service(r),
......@@ -452,8 +454,7 @@ pegasus_server_impl::~pegasus_server_impl()
{
if (_is_open) {
dassert(_db != nullptr, "");
delete _db;
_db = nullptr;
release_db();
}
}
......@@ -1647,6 +1648,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
// 2, we can parse restore info from app env, which is stored in argv
// 3, restore_dir is exist
//
bool db_exist = true;
auto path = ::dsn::utils::filesystem::path_combine(data_dir(), "rdb");
if (::dsn::utils::filesystem::path_exists(path)) {
// only case 1
......@@ -1662,6 +1664,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
replica_name());
return ::dsn::ERR_FILE_OPERATION_FAILED;
} else {
db_exist = false;
dinfo("%s: open a new db, path = %s", replica_name(), path.c_str());
}
} else {
......@@ -1688,6 +1691,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
restore_dir.c_str());
return ::dsn::ERR_FILE_OPERATION_FAILED;
} else {
db_exist = false;
dwarn(
"%s: try to restore and restore_dir(%s) isn't exist, but we don't force "
"it, the role of this replica must not primary, so we open a new db on the "
......@@ -1702,16 +1706,36 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
ddebug("%s: start to open rocksDB's rdb(%s)", replica_name(), path.c_str());
auto status = rocksdb::DB::Open(rocksdb::Options(_db_opts, _data_cf_opts), path, &_db);
bool need_open_with_meta_cf = false;
// Check meta CF only when db exist.
if (db_exist && check_meta_cf(path, &need_open_with_meta_cf) != ::dsn::ERR_OK) {
derror_replica("check meta column family failed");
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
{{DATA_COLUMN_FAMILY_NAME, _data_cf_opts}});
if (need_open_with_meta_cf) {
column_families.emplace_back(rocksdb::ColumnFamilyDescriptor(
META_COLUMN_FAMILY_NAME, rocksdb::ColumnFamilyOptions()));
}
std::vector<rocksdb::ColumnFamilyHandle *> handles_opened;
auto status = rocksdb::DB::Open(_db_opts, path, column_families, &handles_opened, &_db);
if (status.ok()) {
dcheck_eq_replica(column_families.size(), handles_opened.size());
dcheck_eq_replica(handles_opened[0]->GetName(), DATA_COLUMN_FAMILY_NAME);
_data_cf = handles_opened[0];
if (handles_opened.size() == 2) {
dcheck_eq_replica(handles_opened[1]->GetName(), META_COLUMN_FAMILY_NAME);
_meta_cf = handles_opened[1];
}
_last_committed_decree = _db->GetLastFlushedDecree();
_pegasus_data_version = _db->GetPegasusDataVersion();
if (_pegasus_data_version > PEGASUS_DATA_VERSION_MAX) {
derror("%s: open app failed, unsupported data version %" PRIu32,
replica_name(),
_pegasus_data_version);
delete _db;
_db = nullptr;
release_db();
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
......@@ -1736,8 +1760,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
auto err = async_checkpoint(false);
if (err != ::dsn::ERR_OK) {
derror("%s: create checkpoint failed, error = %s", replica_name(), err.to_string());
delete _db;
_db = nullptr;
release_db();
return err;
}
dassert(last_flushed == last_durable_decree(),
......@@ -1822,8 +1845,7 @@ void pegasus_server_impl::cancel_background_work(bool wait)
_context_cache.clear();
_is_open = false;
delete _db;
_db = nullptr;
release_db();
std::deque<int64_t> reserved_checkpoints;
{
......@@ -2894,5 +2916,41 @@ void pegasus_server_impl::set_partition_version(int32_t partition_version)
// TODO(heyuchen): set filter _partition_version in further pr
}
::dsn::error_code pegasus_server_impl::check_meta_cf(const std::string &path,
bool *need_open_with_meta_cf)
{
*need_open_with_meta_cf = false;
std::vector<std::string> column_families;
auto s = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(), path, &column_families);
if (!s.ok()) {
derror_replica("rocksdb::DB::ListColumnFamilies failed, error = {}", s.ToString());
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
for (const auto &column_family : column_families) {
if (column_family == DATA_COLUMN_FAMILY_NAME) {
continue;
}
if (column_family == META_COLUMN_FAMILY_NAME) {
*need_open_with_meta_cf = true;
continue;
}
dassert_replica(false, "Column family '{}' should not present");
}
return ::dsn::ERR_OK;
}
void pegasus_server_impl::release_db()
{
_db->DestroyColumnFamilyHandle(_data_cf);
_data_cf = nullptr;
if (_meta_cf != nullptr) {
_db->DestroyColumnFamilyHandle(_meta_cf);
}
_meta_cf = nullptr;
delete _db;
_db = nullptr;
}
} // namespace server
} // namespace pegasus
......@@ -300,8 +300,14 @@ private:
return false;
}
::dsn::error_code check_meta_cf(const std::string &path, bool *need_open_with_meta_cf);
void release_db();
private:
static const std::string COMPRESSION_HEADER;
// Column family names.
static const std::string DATA_COLUMN_FAMILY_NAME;
static const std::string META_COLUMN_FAMILY_NAME;
dsn::gpid _gpid;
std::string _primary_address;
......@@ -322,7 +328,9 @@ private:
rocksdb::ReadOptions _data_cf_rd_opts;
std::string _usage_scenario;
rocksdb::DB *_db;
rocksdb::DB *_db = nullptr;
rocksdb::ColumnFamilyHandle *_data_cf = nullptr;
rocksdb::ColumnFamilyHandle *_meta_cf = nullptr;
static std::shared_ptr<rocksdb::Cache> _s_block_cache;
volatile bool _is_open;
uint32_t _pegasus_data_version;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册