提交 1164ee6b 编写于 作者: G groot 提交者: JinHai-CN

#2378 (#2388)

* return partition lsn
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>

* fix wal lsn
Signed-off-by: Nshengjun.li <shengjun.li@zilliz.com>

* fix wal issue
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>

* changelog
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>

* typo
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>

* all collection include partition
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>

* fix build error
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>

* fix flush
Signed-off-by: Nshengjun.li <shengjun.li@zilliz.com>
Co-authored-by: Nshengjun.li <shengjun.li@zilliz.com>
上级 2b26468c
......@@ -4,6 +4,7 @@ Please mark all change in change log and use the issue from GitHub
# Milvus 0.9.1 (TBD)
## Bug
- \#2378 Duplicate data after server restart
## Feature
- \#2363 Update branch version to 0.9.1
......
......@@ -487,7 +487,11 @@ DBImpl::CreatePartition(const std::string& collection_id, const std::string& par
}
uint64_t lsn = 0;
meta_ptr_->GetCollectionFlushLSN(collection_id, lsn);
if (options_.wal_enable_) {
lsn = wal_mgr_->CreatePartition(collection_id, partition_tag);
} else {
meta_ptr_->GetCollectionFlushLSN(collection_id, lsn);
}
return meta_ptr_->CreatePartition(collection_id, partition_name, partition_tag, lsn);
}
......@@ -545,6 +549,10 @@ DBImpl::DropPartitionByTag(const std::string& collection_id, const std::string&
return status;
}
if (options_.wal_enable_) {
wal_mgr_->DropPartition(collection_id, partition_tag);
}
return DropPartition(partition_name);
}
......@@ -891,7 +899,7 @@ DBImpl::Flush(const std::string& collection_id) {
swn_wal_.Notify();
flush_req_swn_.Wait();
}
StartMergeTask();
} else {
LOG_ENGINE_DEBUG_ << "MemTable flush";
InternalFlush(collection_id);
......@@ -918,6 +926,7 @@ DBImpl::Flush() {
swn_wal_.Notify();
flush_req_swn_.Wait();
}
StartMergeTask();
} else {
LOG_ENGINE_DEBUG_ << "MemTable flush";
InternalFlush();
......@@ -1421,7 +1430,9 @@ DBImpl::DropIndex(const std::string& collection_id) {
}
LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_id;
return DropCollectionIndexRecursively(collection_id);
auto status = DropCollectionIndexRecursively(collection_id);
StartMergeTask(); // merge small files after drop index
return status;
}
Status
......@@ -2407,30 +2418,39 @@ Status
DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
fiu_return_on("DBImpl.ExexWalRecord.return", Status(););
auto collections_flushed = [&](const std::set<std::string>& collection_ids) -> uint64_t {
if (collection_ids.empty()) {
return 0;
}
auto collections_flushed = [&](const std::string collection_id,
const std::set<std::string>& target_collection_names) -> uint64_t {
uint64_t max_lsn = 0;
if (options_.wal_enable_) {
for (auto& collection : collection_ids) {
uint64_t lsn = 0;
uint64_t lsn = 0;
for (auto& collection : target_collection_names) {
meta_ptr_->GetCollectionFlushLSN(collection, lsn);
wal_mgr_->CollectionFlushed(collection, lsn);
if (lsn > max_lsn) {
max_lsn = lsn;
}
}
wal_mgr_->CollectionFlushed(collection_id, lsn);
}
std::lock_guard<std::mutex> lck(merge_result_mutex_);
for (auto& collection : collection_ids) {
for (auto& collection : target_collection_names) {
merge_collection_ids_.insert(collection);
}
return max_lsn;
};
auto partition_flushed = [&](const std::string& collection_id, const std::string& partition,
const std::string& target_collection_name) {
if (options_.wal_enable_) {
uint64_t lsn = 0;
meta_ptr_->GetCollectionFlushLSN(target_collection_name, lsn);
wal_mgr_->PartitionFlushed(collection_id, partition, lsn);
}
std::lock_guard<std::mutex> lck(merge_result_mutex_);
merge_collection_ids_.insert(target_collection_name);
};
Status status;
switch (record.type) {
......@@ -2447,7 +2467,9 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
(record.data_size / record.length / sizeof(float)),
(const float*)record.data, record.attr_nbytes, record.attr_data_size,
record.attr_data, record.lsn, flushed_collections);
collections_flushed(flushed_collections);
if (!flushed_collections.empty()) {
partition_flushed(record.collection_id, record.partition_tag, target_collection_name);
}
milvus::server::CollectInsertMetrics metrics(record.length, status);
break;
......@@ -2465,7 +2487,9 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
(record.data_size / record.length / sizeof(uint8_t)),
(const u_int8_t*)record.data, record.lsn, flushed_collections);
// even though !status.ok, run
collections_flushed(flushed_collections);
if (!flushed_collections.empty()) {
partition_flushed(record.collection_id, record.partition_tag, target_collection_name);
}
// metrics
milvus::server::CollectInsertMetrics metrics(record.length, status);
......@@ -2485,7 +2509,9 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
(record.data_size / record.length / sizeof(float)),
(const float*)record.data, record.lsn, flushed_collections);
// even though !status.ok, run
collections_flushed(flushed_collections);
if (!flushed_collections.empty()) {
partition_flushed(record.collection_id, record.partition_tag, target_collection_name);
}
// metrics
milvus::server::CollectInsertMetrics metrics(record.length, status);
......@@ -2548,7 +2574,7 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
flushed_collections.insert(collection_id);
}
collections_flushed(flushed_collections);
collections_flushed(record.collection_id, flushed_collections);
} else {
// flush all collections
......@@ -2558,7 +2584,7 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
status = mem_mgr_->Flush(collection_ids);
}
uint64_t lsn = collections_flushed(collection_ids);
uint64_t lsn = collections_flushed("", collection_ids);
if (options_.wal_enable_) {
wal_mgr_->RemoveOldFiles(lsn);
}
......
......@@ -59,7 +59,7 @@ class Meta {
HasCollection(const std::string& collection_id, bool& has_or_not, bool is_root = false) = 0;
virtual Status
AllCollections(std::vector<CollectionSchema>& table_schema_array) = 0;
AllCollections(std::vector<CollectionSchema>& table_schema_array, bool is_root = false) = 0;
virtual Status
UpdateCollectionFlag(const std::string& collection_id, int64_t flag) = 0;
......
......@@ -582,7 +582,7 @@ MySQLMetaImpl::HasCollection(const std::string& collection_id, bool& has_or_not,
}
Status
MySQLMetaImpl::AllCollections(std::vector<CollectionSchema>& collection_schema_array) {
MySQLMetaImpl::AllCollections(std::vector<CollectionSchema>& collection_schema_array, bool is_root) {
try {
server::MetricCollector metric;
mysqlpp::StoreQueryResult res;
......@@ -599,8 +599,12 @@ MySQLMetaImpl::AllCollections(std::vector<CollectionSchema>& collection_schema_a
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT id, table_id, dimension, engine_type, index_params, index_file_size, metric_type"
<< " ,owner_table, partition_tag, version, flush_lsn"
<< " FROM " << META_TABLES << " WHERE state <> " << std::to_string(CollectionSchema::TO_DELETE)
<< " AND owner_table = \"\";";
<< " FROM " << META_TABLES << " WHERE state <> " << std::to_string(CollectionSchema::TO_DELETE);
if (is_root) {
statement << " AND owner_table = \"\";";
} else {
statement << ";";
}
LOG_ENGINE_DEBUG_ << "AllCollections: " << statement.str();
......@@ -1535,8 +1539,8 @@ MySQLMetaImpl::ShowPartitions(const std::string& collection_id,
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT table_id, id, state, dimension, created_on, flag, index_file_size,"
<< " engine_type, index_params, metric_type, partition_tag, version FROM " << META_TABLES
<< " WHERE owner_table = " << mysqlpp::quote << collection_id << " AND state <> "
<< " engine_type, index_params, metric_type, partition_tag, version, flush_lsn FROM "
<< META_TABLES << " WHERE owner_table = " << mysqlpp::quote << collection_id << " AND state <> "
<< std::to_string(CollectionSchema::TO_DELETE) << ";";
LOG_ENGINE_DEBUG_ << "ShowPartitions: " << statement.str();
......@@ -1559,6 +1563,7 @@ MySQLMetaImpl::ShowPartitions(const std::string& collection_id,
partition_schema.owner_collection_ = collection_id;
resRow["partition_tag"].to_string(partition_schema.partition_tag_);
resRow["version"].to_string(partition_schema.version_);
partition_schema.flush_lsn_ = resRow["flush_lsn"];
partition_schema_array.emplace_back(partition_schema);
}
......@@ -2755,6 +2760,7 @@ MySQLMetaImpl::SetGlobalLastLSN(uint64_t lsn) {
}
bool first_create = false;
uint64_t last_lsn = 0;
{
mysqlpp::StoreQueryResult res;
mysqlpp::Query statement = connectionPtr->query();
......@@ -2762,6 +2768,8 @@ MySQLMetaImpl::SetGlobalLastLSN(uint64_t lsn) {
res = statement.store();
if (res.num_rows() == 0) {
first_create = true;
} else {
last_lsn = res[0]["global_lsn"];
}
}
......@@ -2773,7 +2781,7 @@ MySQLMetaImpl::SetGlobalLastLSN(uint64_t lsn) {
if (!statement.exec()) {
return HandleException("QUERY ERROR WHEN SET GLOBAL LSN", statement.error());
}
} else {
} else if (lsn > last_lsn) {
mysqlpp::Query statement = connectionPtr->query();
statement << "UPDATE " << META_ENVIRONMENT << " SET global_lsn = " << lsn << ";";
LOG_ENGINE_DEBUG_ << "SetGlobalLastLSN: " << statement.str();
......@@ -2783,8 +2791,6 @@ MySQLMetaImpl::SetGlobalLastLSN(uint64_t lsn) {
}
}
} // Scoped Connection
LOG_ENGINE_DEBUG_ << "Successfully update global_lsn: " << lsn;
} catch (std::exception& e) {
return HandleException("Failed to set global lsn", e.what());
}
......
......@@ -42,7 +42,7 @@ class MySQLMetaImpl : public Meta {
HasCollection(const std::string& collection_id, bool& has_or_not, bool is_root = false) override;
Status
AllCollections(std::vector<CollectionSchema>& collection_schema_array) override;
AllCollections(std::vector<CollectionSchema>& collection_schema_array, bool is_root = false) override;
Status
DropCollection(const std::string& collection_id) override;
......
......@@ -309,29 +309,37 @@ SqliteMetaImpl::HasCollection(const std::string& collection_id, bool& has_or_not
}
Status
SqliteMetaImpl::AllCollections(std::vector<CollectionSchema>& collection_schema_array) {
SqliteMetaImpl::AllCollections(std::vector<CollectionSchema>& collection_schema_array, bool is_root) {
try {
fiu_do_on("SqliteMetaImpl.AllCollections.throw_exception", throw std::exception());
server::MetricCollector metric;
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto selected = ConnectorPtr->select(
columns(&CollectionSchema::id_,
&CollectionSchema::collection_id_,
&CollectionSchema::dimension_,
&CollectionSchema::created_on_,
&CollectionSchema::flag_,
&CollectionSchema::index_file_size_,
&CollectionSchema::engine_type_,
&CollectionSchema::index_params_,
&CollectionSchema::metric_type_,
&CollectionSchema::owner_collection_,
&CollectionSchema::partition_tag_,
&CollectionSchema::version_,
&CollectionSchema::flush_lsn_),
where(c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE
and c(&CollectionSchema::owner_collection_) == ""));
auto select_columns = columns(&CollectionSchema::id_,
&CollectionSchema::collection_id_,
&CollectionSchema::dimension_,
&CollectionSchema::created_on_,
&CollectionSchema::flag_,
&CollectionSchema::index_file_size_,
&CollectionSchema::engine_type_,
&CollectionSchema::index_params_,
&CollectionSchema::metric_type_,
&CollectionSchema::owner_collection_,
&CollectionSchema::partition_tag_,
&CollectionSchema::version_,
&CollectionSchema::flush_lsn_);
decltype(ConnectorPtr->select(select_columns)) selected;
if (is_root) {
selected = ConnectorPtr->select(select_columns,
where(c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE
and c(&CollectionSchema::owner_collection_) == ""));
} else {
selected = ConnectorPtr->select(select_columns,
where(c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
}
for (auto& collection : selected) {
CollectionSchema schema;
schema.id_ = std::get<0>(collection);
......@@ -992,7 +1000,8 @@ SqliteMetaImpl::ShowPartitions(const std::string& collection_id,
&CollectionSchema::metric_type_,
&CollectionSchema::partition_tag_,
&CollectionSchema::version_,
&CollectionSchema::collection_id_),
&CollectionSchema::collection_id_,
&CollectionSchema::flush_lsn_),
where(c(&CollectionSchema::owner_collection_) == collection_id and
c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
......@@ -1011,6 +1020,7 @@ SqliteMetaImpl::ShowPartitions(const std::string& collection_id,
partition_schema.partition_tag_ = std::get<9>(partitions[i]);
partition_schema.version_ = std::get<10>(partitions[i]);
partition_schema.collection_id_ = std::get<11>(partitions[i]);
partition_schema.flush_lsn_ = std::get<12>(partitions[i]);
partition_schema_array.emplace_back(partition_schema);
}
} catch (std::exception& e) {
......@@ -1939,7 +1949,7 @@ SqliteMetaImpl::SetGlobalLastLSN(uint64_t lsn) {
ConnectorPtr->insert(env);
} else {
uint64_t last_lsn = std::get<0>(selected[0]);
if (lsn == last_lsn) {
if (lsn <= last_lsn) {
return Status::OK();
}
......
......@@ -44,7 +44,7 @@ class SqliteMetaImpl : public Meta {
HasCollection(const std::string& collection_id, bool& has_or_not, bool is_root = false) override;
Status
AllCollections(std::vector<CollectionSchema>& collection_schema_array) override;
AllCollections(std::vector<CollectionSchema>& collection_schema_array, bool is_root = false) override;
Status
DropCollection(const std::string& collection_id) override;
......
......@@ -62,35 +62,55 @@ WalManager::Init(const meta::MetaPtr& meta) {
if (meta != nullptr) {
meta->GetGlobalLastLSN(recovery_start);
std::vector<meta::CollectionSchema> table_schema_array;
auto status = meta->AllCollections(table_schema_array);
std::vector<meta::CollectionSchema> collention_schema_array;
auto status = meta->AllCollections(collention_schema_array);
if (!status.ok()) {
return WAL_META_ERROR;
}
if (!table_schema_array.empty()) {
// get min and max flushed lsn
uint64_t min_flused_lsn = table_schema_array[0].flush_lsn_;
uint64_t max_flused_lsn = table_schema_array[0].flush_lsn_;
for (size_t i = 1; i < table_schema_array.size(); i++) {
if (min_flused_lsn > table_schema_array[i].flush_lsn_) {
min_flused_lsn = table_schema_array[i].flush_lsn_;
} else if (max_flused_lsn < table_schema_array[i].flush_lsn_) {
max_flused_lsn = table_schema_array[i].flush_lsn_;
if (!collention_schema_array.empty()) {
u_int64_t min_flushed_lsn = ~(u_int64_t)0;
u_int64_t max_flushed_lsn = 0;
auto update_limit_lsn = [&](u_int64_t lsn) {
if (min_flushed_lsn > lsn) {
min_flushed_lsn = lsn;
}
if (max_flushed_lsn < lsn) {
max_flushed_lsn = lsn;
}
};
for (auto& col_schema : collention_schema_array) {
auto& collection = collections_[col_schema.collection_id_];
auto& default_part = collection[""];
default_part.flush_lsn = col_schema.flush_lsn_;
update_limit_lsn(default_part.flush_lsn);
std::vector<meta::CollectionSchema> partition_schema_array;
status = meta->ShowPartitions(col_schema.collection_id_, partition_schema_array);
if (!status.ok()) {
return WAL_META_ERROR;
}
for (auto& par_schema : partition_schema_array) {
auto& partition = collection[par_schema.partition_tag_];
partition.flush_lsn = par_schema.flush_lsn_;
update_limit_lsn(partition.flush_lsn);
}
}
if (applied_lsn < max_flused_lsn) {
if (applied_lsn < max_flushed_lsn) {
// a new WAL folder?
applied_lsn = max_flused_lsn;
applied_lsn = max_flushed_lsn;
}
if (recovery_start < min_flused_lsn) {
if (recovery_start < min_flushed_lsn) {
// not flush all yet
recovery_start = min_flused_lsn;
recovery_start = min_flushed_lsn;
}
for (auto& schema : table_schema_array) {
TableLsn tb_lsn = {schema.flush_lsn_, applied_lsn};
tables_[schema.collection_id_] = tb_lsn;
for (auto& col : collections_) {
for (auto& part : col.second) {
part.second.wal_lsn = applied_lsn;
}
}
}
}
......@@ -141,9 +161,10 @@ WalManager::GetNextRecovery(MXLogRecord& record) {
// background thread has not started.
// so, needn't lock here.
auto it = tables_.find(record.collection_id);
if (it != tables_.end()) {
if (it->second.flush_lsn < record.lsn) {
auto it_col = collections_.find(record.collection_id);
if (it_col != collections_.end()) {
auto it_part = it_col->second.find(record.partition_tag);
if (it_part->second.flush_lsn < record.lsn) {
break;
}
}
......@@ -179,9 +200,10 @@ WalManager::GetNextEntityRecovery(milvus::engine::wal::MXLogRecord& record) {
// background thread has not started.
// so, needn't lock here.
auto it = tables_.find(record.collection_id);
if (it != tables_.end()) {
if (it->second.flush_lsn < record.lsn) {
auto it_col = collections_.find(record.collection_id);
if (it_col != collections_.end()) {
auto it_part = it_col->second.find(record.partition_tag);
if (it_part->second.flush_lsn < record.lsn) {
break;
}
}
......@@ -229,9 +251,10 @@ WalManager::GetNextRecord(MXLogRecord& record) {
}
std::lock_guard<std::mutex> lck(mutex_);
auto it = tables_.find(record.collection_id);
if (it != tables_.end()) {
if (it->second.flush_lsn < record.lsn) {
auto it_col = collections_.find(record.collection_id);
if (it_col != collections_.end()) {
auto it_part = it_col->second.find(record.partition_tag);
if (it_part->second.flush_lsn < record.lsn) {
break;
}
}
......@@ -275,9 +298,10 @@ WalManager::GetNextEntityRecord(milvus::engine::wal::MXLogRecord& record) {
}
std::lock_guard<std::mutex> lck(mutex_);
auto it = tables_.find(record.collection_id);
if (it != tables_.end()) {
if (it->second.flush_lsn < record.lsn) {
auto it_col = collections_.find(record.collection_id);
if (it_col != collections_.end()) {
auto it_part = it_col->second.find(record.partition_tag);
if (it_part->second.flush_lsn < record.lsn) {
break;
}
}
......@@ -293,7 +317,16 @@ WalManager::CreateCollection(const std::string& collection_id) {
LOG_WAL_INFO_ << "create collection " << collection_id << " " << last_applied_lsn_;
std::lock_guard<std::mutex> lck(mutex_);
uint64_t applied_lsn = last_applied_lsn_;
tables_[collection_id] = {applied_lsn, applied_lsn};
collections_[collection_id][""] = {applied_lsn, applied_lsn};
return applied_lsn;
}
uint64_t
WalManager::CreatePartition(const std::string& collection_id, const std::string& partition_tag) {
LOG_WAL_INFO_ << "create collection " << collection_id << " " << partition_tag << " " << last_applied_lsn_;
std::lock_guard<std::mutex> lck(mutex_);
uint64_t applied_lsn = last_applied_lsn_;
collections_[collection_id][partition_tag] = {applied_lsn, applied_lsn};
return applied_lsn;
}
......@@ -302,7 +335,7 @@ WalManager::CreateHybridCollection(const std::string& collection_id) {
LOG_WAL_INFO_ << "create hybrid collection " << collection_id << " " << last_applied_lsn_;
std::lock_guard<std::mutex> lck(mutex_);
uint64_t applied_lsn = last_applied_lsn_;
tables_[collection_id] = {applied_lsn, applied_lsn};
collections_[collection_id][""] = {applied_lsn, applied_lsn};
return applied_lsn;
}
......@@ -310,21 +343,84 @@ void
WalManager::DropCollection(const std::string& collection_id) {
LOG_WAL_INFO_ << "drop collection " << collection_id;
std::lock_guard<std::mutex> lck(mutex_);
tables_.erase(collection_id);
collections_.erase(collection_id);
}
void
WalManager::DropPartition(const std::string& collection_id, const std::string& partition_tag) {
LOG_WAL_INFO_ << collection_id << " drop partition " << partition_tag;
std::lock_guard<std::mutex> lck(mutex_);
auto it = collections_.find(collection_id);
if (it != collections_.end()) {
it->second.erase(partition_tag);
}
}
void
WalManager::CollectionFlushed(const std::string& collection_id, uint64_t lsn) {
std::unique_lock<std::mutex> lck(mutex_);
auto it = tables_.find(collection_id);
if (it != tables_.end()) {
it->second.flush_lsn = lsn;
if (collection_id.empty()) {
// all collections
for (auto& col : collections_) {
for (auto& part : col.second) {
part.second.flush_lsn = lsn;
}
}
} else {
// one collection
auto it_col = collections_.find(collection_id);
if (it_col != collections_.end()) {
for (auto& part : it_col->second) {
part.second.flush_lsn = lsn;
}
}
}
lck.unlock();
LOG_WAL_INFO_ << collection_id << " is flushed by lsn " << lsn;
}
void
WalManager::PartitionFlushed(const std::string& collection_id, const std::string& partition_tag, uint64_t lsn) {
std::unique_lock<std::mutex> lck(mutex_);
auto it_col = collections_.find(collection_id);
if (it_col != collections_.end()) {
auto it_part = it_col->second.find(partition_tag);
if (it_part != it_col->second.end()) {
it_part->second.flush_lsn = lsn;
}
}
lck.unlock();
LOG_WAL_INFO_ << collection_id << " " << partition_tag << " is flushed by lsn " << lsn;
}
void
WalManager::CollectionUpdated(const std::string& collection_id, uint64_t lsn) {
std::unique_lock<std::mutex> lck(mutex_);
auto it_col = collections_.find(collection_id);
if (it_col != collections_.end()) {
for (auto& part : it_col->second) {
part.second.wal_lsn = lsn;
}
}
lck.unlock();
}
void
WalManager::PartitionUpdated(const std::string& collection_id, const std::string& partition_tag, uint64_t lsn) {
std::unique_lock<std::mutex> lck(mutex_);
auto it_col = collections_.find(collection_id);
if (it_col != collections_.end()) {
auto it_part = it_col->second.find(partition_tag);
if (it_part != it_col->second.end()) {
it_part->second.wal_lsn = lsn;
}
}
lck.unlock();
}
template <typename T>
bool
WalManager::Insert(const std::string& collection_id, const std::string& partition_tag, const IDNumbers& vector_ids,
......@@ -380,13 +476,8 @@ WalManager::Insert(const std::string& collection_id, const std::string& partitio
new_lsn = record.lsn;
}
std::unique_lock<std::mutex> lck(mutex_);
last_applied_lsn_ = new_lsn;
auto it = tables_.find(collection_id);
if (it != tables_.end()) {
it->second.wal_lsn = new_lsn;
}
lck.unlock();
PartitionUpdated(collection_id, partition_tag, new_lsn);
LOG_WAL_INFO_ << LogOut("[%s][%ld]", "insert", 0) << collection_id << " insert in part " << partition_tag
<< " with lsn " << new_lsn;
......@@ -472,13 +563,8 @@ WalManager::InsertEntities(const std::string& collection_id, const std::string&
new_lsn = record.lsn;
}
std::unique_lock<std::mutex> lck(mutex_);
last_applied_lsn_ = new_lsn;
auto it = tables_.find(collection_id);
if (it != tables_.end()) {
it->second.wal_lsn = new_lsn;
}
lck.unlock();
PartitionUpdated(collection_id, partition_tag, new_lsn);
LOG_WAL_INFO_ << LogOut("[%s][%ld]", "insert", 0) << collection_id << " insert in part " << partition_tag
<< " with lsn " << new_lsn;
......@@ -525,13 +611,8 @@ WalManager::DeleteById(const std::string& collection_id, const IDNumbers& vector
new_lsn = record.lsn;
}
std::unique_lock<std::mutex> lck(mutex_);
last_applied_lsn_ = new_lsn;
auto it = tables_.find(collection_id);
if (it != tables_.end()) {
it->second.wal_lsn = new_lsn;
}
lck.unlock();
CollectionUpdated(collection_id, new_lsn);
LOG_WAL_INFO_ << collection_id << " delete rows by id, lsn " << new_lsn;
......@@ -548,19 +629,25 @@ WalManager::Flush(const std::string& collection_id) {
uint64_t lsn = 0;
if (collection_id.empty()) {
// flush all tables
for (auto& it : tables_) {
if (it.second.wal_lsn > it.second.flush_lsn) {
lsn = last_applied_lsn_;
break;
for (auto& col : collections_) {
for (auto& part : col.second) {
if (part.second.wal_lsn > part.second.flush_lsn) {
lsn = last_applied_lsn_;
break;
}
}
}
} else {
// flush one collection
auto it = tables_.find(collection_id);
if (it != tables_.end()) {
if (it->second.wal_lsn > it->second.flush_lsn) {
lsn = it->second.wal_lsn;
auto it_col = collections_.find(collection_id);
if (it_col != collections_.end()) {
for (auto& part : it_col->second) {
auto wal_lsn = part.second.wal_lsn;
auto flush_lsn = part.second.flush_lsn;
if (wal_lsn > flush_lsn && wal_lsn > lsn) {
lsn = wal_lsn;
}
}
}
}
......
......@@ -62,6 +62,7 @@ class WalManager {
ErrorCode
GetNextEntityRecord(MXLogRecord& record);
/*
* Create collection
* @param collection_id: collection id
......@@ -70,6 +71,15 @@ class WalManager {
uint64_t
CreateCollection(const std::string& collection_id);
/*
* Create partition
* @param collection_id: collection id
* @param partition_tag: partition tag
* @retval lsn
*/
uint64_t
CreatePartition(const std::string& collection_id, const std::string& partition_tag);
/*
* Create hybrid collection
* @param collection_id: collection id
......@@ -87,13 +97,49 @@ class WalManager {
DropCollection(const std::string& collection_id);
/*
* Collection is flushed
* Drop partition
* @param collection_id: collection id
* @param partition_tag: partition tag
* @retval none
*/
void
DropPartition(const std::string& collection_id, const std::string& partition_tag);
/*
* Collection is flushed (update flushed_lsn)
* @param collection_id: collection id
* @param lsn: flushed lsn
*/
void
CollectionFlushed(const std::string& collection_id, uint64_t lsn);
/*
* Partition is flushed (update flushed_lsn)
* @param collection_id: collection id
* @param partition_tag: partition_tag
* @param lsn: flushed lsn
*/
void
PartitionFlushed(const std::string& collection_id, const std::string& partition_tag, uint64_t lsn);
/*
* Collection is updated (update wal_lsn)
* @param collection_id: collection id
* @param partition_tag: partition_tag
* @param lsn: flushed lsn
*/
void
CollectionUpdated(const std::string& collection_id, uint64_t lsn);
/*
* Partition is updated (update wal_lsn)
* @param collection_id: collection id
* @param partition_tag: partition_tag
* @param lsn: flushed lsn
*/
void
PartitionUpdated(const std::string& collection_id, const std::string& partition_tag, uint64_t lsn);
/*
* Insert
* @param collection_id: collection id
......@@ -155,7 +201,7 @@ class WalManager {
uint64_t wal_lsn;
};
std::mutex mutex_;
std::map<std::string, TableLsn> tables_;
std::map<std::string, std::map<std::string, TableLsn>> collections_;
std::atomic<uint64_t> last_applied_lsn_;
// if multi-thread call Flush(), use list
......
......@@ -60,7 +60,7 @@ class TestWalMeta : public SqliteMetaImpl {
}
Status
AllCollections(std::vector<CollectionSchema>& table_schema_array) override {
AllCollections(std::vector<CollectionSchema>& table_schema_array, bool is_root) override {
table_schema_array = tables_;
return Status::OK();
}
......@@ -88,7 +88,7 @@ class TestWalMetaError : public SqliteMetaImpl {
}
Status
AllCollections(std::vector<CollectionSchema>& table_schema_array) override {
AllCollections(std::vector<CollectionSchema>& table_schema_array, bool is_root) override {
return Status(DB_ERROR, "error");
}
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册