diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index 75ffe57e258ec5d4d333753188d0d7baf8076181..4d61ad31c8bfb35fb9ec8531285a1338b274ae26 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -1081,7 +1081,8 @@ DBImpl::BackgroundMerge(std::set collection_ids, bool force_merge_all) for (auto& collection_id : collection_ids) { const std::lock_guard lock(flush_merge_compact_mutex_); - auto status = merge_mgr_ptr_->MergeFiles(collection_id); + MergeStrategyType type = force_merge_all ? MergeStrategyType::SIMPLE : MergeStrategyType::LAYERED; + auto status = merge_mgr_ptr_->MergeSegments(collection_id, type); if (!status.ok()) { LOG_ENGINE_ERROR_ << "Failed to get merge files for collection id: " << collection_id << " reason:" << status.message(); diff --git a/core/src/db/merge/MergeLayerStrategy.cpp b/core/src/db/merge/MergeLayerStrategy.cpp new file mode 100644 index 0000000000000000000000000000000000000000..237841a27d34271c4d02c83e98dce9121d6e668b --- /dev/null +++ b/core/src/db/merge/MergeLayerStrategy.cpp @@ -0,0 +1,117 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include "db/merge/MergeLayerStrategy.h" +#include "db/Utils.h" +#include "utils/Log.h" + +#include +#include + +namespace milvus { +namespace engine { + +namespace { +const int64_t FORCE_MERGE_THREASHOLD = 30; // force merge files older this time(in second) + +using LayerGroups = std::map; + +void +ConstructLayers(LayerGroups& groups, int64_t row_count_per_segment) { + groups.clear(); + int64_t power = 12; + while (true) { + int64_t key = 1UL << power; + power += 2; + groups.insert(std::pair(key, SegmentInfoList())); + if (key >= row_count_per_segment || key >= MAX_SEGMENT_ROW_COUNT) { + break; + } + } +} +} // namespace + +Status +MergeLayerStrategy::RegroupSegments(const Partition2SegmentsMap& part2segment, int64_t row_per_segment, + SegmentGroups& groups) { + auto now = utils::GetMicroSecTimeStamp(); + for (auto& kv : part2segment) { + if (kv.second.size() <= 1) { + continue; // no segment or only one segment, no need to merge + } + + LayerGroups layers; + ConstructLayers(layers, row_per_segment); + + // distribute segments to layers according to segment row count + SegmentInfoList temp_list = kv.second; + for (SegmentInfoList::iterator iter = temp_list.begin(); iter != temp_list.end();) { + SegmentInfo& segment_info = *iter; + if (segment_info.row_count_ <= 0 || segment_info.row_count_ >= row_per_segment) { + iter = temp_list.erase(iter); + continue; // empty segment or full segment + } + + for (auto layer_iter = layers.begin(); layer_iter != layers.end(); ++layer_iter) { + if (segment_info.row_count_ < layer_iter->first) { + layer_iter->second.push_back(segment_info); + break; + } + } + + iter = temp_list.erase(iter); + } + + // if some segment's create time is 30 seconds ago, and it still un-merged, force merge with upper layer + SegmentInfoList force_list; + for (auto& pair : layers) { + SegmentInfoList& segments = pair.second; + if (!force_list.empty()) { + segments.insert(segments.begin(), force_list.begin(), force_list.end()); + force_list.clear(); + } + + if (segments.size() == 1) { + if (now - segments[0].create_on_ > (int64_t)(FORCE_MERGE_THREASHOLD * 1000)) { + force_list.swap(segments); + } + } + } + + // merge for each layer + for (auto& pair : layers) { + snapshot::IDS_TYPE ids; + int64_t row_count_sum = 0; + SegmentInfoList& segments = pair.second; + for (auto& segment : segments) { + ids.push_back(segment.id_); + row_count_sum += segment.row_count_; + if (row_count_sum >= row_per_segment) { + if (ids.size() >= 2) { + groups.push_back(ids); + } + ids.clear(); + row_count_sum = 0; + continue; + } + } + + if (ids.size() >= 2) { + groups.push_back(ids); + } + } + } + + return Status::OK(); +} + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/merge/MergeLayerStrategy.h b/core/src/db/merge/MergeLayerStrategy.h new file mode 100644 index 0000000000000000000000000000000000000000..1634060db6160b0df97ba1914b53c3d134318f1d --- /dev/null +++ b/core/src/db/merge/MergeLayerStrategy.h @@ -0,0 +1,30 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include +#include + +#include "db/merge/MergeStrategy.h" +#include "utils/Status.h" + +namespace milvus { +namespace engine { + +class MergeLayerStrategy : public MergeStrategy { + public: + Status + RegroupSegments(const Partition2SegmentsMap& part2segment, int64_t row_per_segment, SegmentGroups& groups) override; +}; // MergeSimpleStrategy + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/merge/MergeManager.h b/core/src/db/merge/MergeManager.h index 095e41be951e365c24244cdd099e59613f5e00ba..df44883bd57c9950351fd00d0e455a462effcca5 100644 --- a/core/src/db/merge/MergeManager.h +++ b/core/src/db/merge/MergeManager.h @@ -24,29 +24,27 @@ namespace milvus { namespace engine { // 1. SIMPLE -// merge in old way, merge files one by one, stop merge until file size exceed index_file_size +// merge in old way, merge segment one by one, stop merge until segment row count exceed segment_row_count // 2. LAYERED -// distribute files to several groups according to file size -// firstly, define layers by file size: 4MB, 16MB, 64MB, 256MB, 1024MB -// if file size between 0MB~4MB, put it into layer "4" -// if file size between 4MB~16MB, put it into layer "16" -// if file size between 16MB~64MB, put it into layer "64" -// if file size between 64MB~256MB, put it into layer "256" -// if file size between 256MB~1024MB, put it into layer "1024" -// secondly, merge files for each group -// third, if some file's create time is 30 seconds ago, and it still un-merged, force merge with upper layer files -// 3. ADAPTIVE -// Pick files that sum of size is close to index_file_size, merge them +// distribute segments to several groups according to segment row count +// firstly, define layers by row count: 4KB, 16KB, 64KB, 256KB, 1024KB +// if segment row count between 0KB~4KB, put it into layer "4096" +// if segment row count between 4KB~16KB, put it into layer "16384" +// if segment row count between 16KB~64KB, put it into layer "65536" +// if segment row count between 64KB~256KB, put it into layer "262144" +// if segment row count between 256KB~1024KB, put it into layer "1048576" +// file row count greater than 1024KB, put into layer MAX_SEGMENT_ROW_COUNT +// secondly, merge segments for each group +// third, if some segment's create time is 30 seconds ago, and it still un-merged, force merge with upper layer enum class MergeStrategyType { SIMPLE = 1, LAYERED = 2, - ADAPTIVE = 3, }; class MergeManager { public: virtual Status - MergeFiles(int64_t collection_id, MergeStrategyType type = MergeStrategyType::SIMPLE) = 0; + MergeSegments(int64_t collection_id, MergeStrategyType type = MergeStrategyType::LAYERED) = 0; }; // MergeManager using MergeManagerPtr = std::shared_ptr; diff --git a/core/src/db/merge/MergeManagerImpl.cpp b/core/src/db/merge/MergeManagerImpl.cpp index 1fb5dcd0865d69689565e7a02269f3076430d6ad..e2eedbcb79a99cb201d9252e1c71b9d05918214b 100644 --- a/core/src/db/merge/MergeManagerImpl.cpp +++ b/core/src/db/merge/MergeManagerImpl.cpp @@ -10,6 +10,7 @@ // or implied. See the License for the specific language governing permissions and limitations under the License. #include "db/merge/MergeManagerImpl.h" +#include "db/merge/MergeLayerStrategy.h" #include "db/merge/MergeSimpleStrategy.h" #include "db/merge/MergeTask.h" #include "db/snapshot/Snapshots.h" @@ -17,6 +18,7 @@ #include "utils/Log.h" #include +#include namespace milvus { namespace engine { @@ -31,8 +33,10 @@ MergeManagerImpl::CreateStrategy(MergeStrategyType type, MergeStrategyPtr& strat strategy = std::make_shared(); break; } - case MergeStrategyType::LAYERED: - case MergeStrategyType::ADAPTIVE: + case MergeStrategyType::LAYERED: { + strategy = std::make_shared(); + break; + } default: { std::string msg = "Unsupported merge strategy type: " + std::to_string(static_cast(type)); LOG_ENGINE_ERROR_ << msg; @@ -44,7 +48,7 @@ MergeManagerImpl::CreateStrategy(MergeStrategyType type, MergeStrategyPtr& strat } Status -MergeManagerImpl::MergeFiles(int64_t collection_id, MergeStrategyType type) { +MergeManagerImpl::MergeSegments(int64_t collection_id, MergeStrategyType type) { MergeStrategyPtr strategy; auto status = CreateStrategy(type, strategy); if (!status.ok()) { @@ -59,12 +63,31 @@ MergeManagerImpl::MergeFiles(int64_t collection_id, MergeStrategyType type) { Partition2SegmentsMap part2seg; auto& segments = latest_ss->GetResources(); for (auto& kv : segments) { - part2seg[kv.second->GetPartitionId()].push_back(kv.second->GetID()); + snapshot::ID_TYPE segment_id = kv.second->GetID(); + auto segment_commit = latest_ss->GetSegmentCommitBySegmentId(segment_id); + if (segment_commit == nullptr) { + continue; // maybe stale + } + + SegmentInfo info(segment_id, segment_commit->GetRowCount(), segment_commit->GetCreatedTime()); + part2seg[kv.second->GetPartitionId()].emplace_back(info); + } + + if (part2seg.empty()) { + break; // nothing to merge + } + + // get row count per segment + auto collection = latest_ss->GetCollection(); + int64_t row_count_per_segment = DEFAULT_SEGMENT_ROW_COUNT; + const json params = collection->GetParams(); + if (params.find(PARAM_SEGMENT_ROW_COUNT) != params.end()) { + row_count_per_segment = params[PARAM_SEGMENT_ROW_COUNT]; } // distribute segments to groups by some strategy SegmentGroups segment_groups; - auto status = strategy->RegroupSegments(latest_ss, part2seg, segment_groups); + auto status = strategy->RegroupSegments(part2seg, row_count_per_segment, segment_groups); if (!status.ok()) { LOG_ENGINE_ERROR_ << "Failed to regroup segments for collection: " << latest_ss->GetName() << ", continue to merge all files into one"; diff --git a/core/src/db/merge/MergeManagerImpl.h b/core/src/db/merge/MergeManagerImpl.h index a1bd7576f23627db30cb2d47340cb2a513f8355e..f64a3810ee7a6aac49d08b74e127f3b26e262541 100644 --- a/core/src/db/merge/MergeManagerImpl.h +++ b/core/src/db/merge/MergeManagerImpl.h @@ -32,7 +32,7 @@ class MergeManagerImpl : public MergeManager { explicit MergeManagerImpl(const DBOptions& options); Status - MergeFiles(int64_t collection_id, MergeStrategyType type) override; + MergeSegments(int64_t collection_id, MergeStrategyType type) override; private: Status diff --git a/core/src/db/merge/MergeSimpleStrategy.cpp b/core/src/db/merge/MergeSimpleStrategy.cpp index ac1f717af017bba439db33b3896ce0d513d86cad..b39f42e76cfda62f529bedea7030e0ddc275ca32 100644 --- a/core/src/db/merge/MergeSimpleStrategy.cpp +++ b/core/src/db/merge/MergeSimpleStrategy.cpp @@ -10,23 +10,14 @@ // or implied. See the License for the specific language governing permissions and limitations under the License. #include "db/merge/MergeSimpleStrategy.h" -#include "db/snapshot/Snapshots.h" #include "utils/Log.h" namespace milvus { namespace engine { Status -MergeSimpleStrategy::RegroupSegments(const snapshot::ScopedSnapshotT& ss, const Partition2SegmentsMap& part2segment, +MergeSimpleStrategy::RegroupSegments(const Partition2SegmentsMap& part2segment, int64_t row_per_segment, SegmentGroups& groups) { - auto collection = ss->GetCollection(); - - int64_t row_count_per_segment = DEFAULT_SEGMENT_ROW_COUNT; - const json params = collection->GetParams(); - if (params.find(PARAM_SEGMENT_ROW_COUNT) != params.end()) { - row_count_per_segment = params[PARAM_SEGMENT_ROW_COUNT]; - } - for (auto& kv : part2segment) { if (kv.second.size() <= 1) { continue; // no segment or only one segment, no need to merge @@ -34,20 +25,14 @@ MergeSimpleStrategy::RegroupSegments(const snapshot::ScopedSnapshotT& ss, const snapshot::IDS_TYPE ids; int64_t row_count_sum = 0; - for (auto& id : kv.second) { - auto segment_commit = ss->GetSegmentCommitBySegmentId(id); - if (segment_commit == nullptr) { - continue; // maybe stale - } - - auto segment_row = segment_commit->GetRowCount(); - if (segment_row <= 0) { - continue; // empty segment? + for (const SegmentInfo& segment_info : kv.second) { + if (segment_info.row_count_ <= 0 || segment_info.row_count_ >= row_per_segment) { + continue; // empty segment or full segment } - ids.push_back(id); - row_count_sum += segment_row; - if (row_count_sum >= row_count_per_segment) { + ids.push_back(segment_info.id_); + row_count_sum += segment_info.row_count_; + if (row_count_sum >= row_per_segment) { if (ids.size() >= 2) { groups.push_back(ids); } diff --git a/core/src/db/merge/MergeSimpleStrategy.h b/core/src/db/merge/MergeSimpleStrategy.h index d5a757cee7177b9b7177c36747f2227639f455a8..b2d1ca5b08145d8dacd89ee563c8fb2363e2f1ab 100644 --- a/core/src/db/merge/MergeSimpleStrategy.h +++ b/core/src/db/merge/MergeSimpleStrategy.h @@ -23,8 +23,7 @@ namespace engine { class MergeSimpleStrategy : public MergeStrategy { public: Status - RegroupSegments(const snapshot::ScopedSnapshotT& ss, const Partition2SegmentsMap& part2segment, - SegmentGroups& groups) override; + RegroupSegments(const Partition2SegmentsMap& part2segment, int64_t row_per_segment, SegmentGroups& groups) override; }; // MergeSimpleStrategy } // namespace engine diff --git a/core/src/db/merge/MergeStrategy.h b/core/src/db/merge/MergeStrategy.h index 03ed48bcad3d2734a583da3da30e74cb95b4d8f0..5749b2501bf458a6cd6e9ac5b11a7c9b3a276115 100644 --- a/core/src/db/merge/MergeStrategy.h +++ b/core/src/db/merge/MergeStrategy.h @@ -11,28 +11,38 @@ #pragma once -#include #include #include #include +#include +#include #include #include "db/Types.h" #include "db/snapshot/ResourceTypes.h" -#include "db/snapshot/Snapshot.h" #include "utils/Status.h" namespace milvus { namespace engine { -using Partition2SegmentsMap = std::map; +struct SegmentInfo { + SegmentInfo(snapshot::ID_TYPE id, int64_t row_count, snapshot::TS_TYPE create_on) + : id_(id), row_count_(row_count), create_on_(create_on) { + } + + snapshot::ID_TYPE id_ = 0; + int64_t row_count_ = 0; + snapshot::TS_TYPE create_on_ = 0; +}; + +using SegmentInfoList = std::vector; +using Partition2SegmentsMap = std::unordered_map; using SegmentGroups = std::vector; class MergeStrategy { public: virtual Status - RegroupSegments(const snapshot::ScopedSnapshotT& ss, const Partition2SegmentsMap& part2segment, - SegmentGroups& groups) = 0; + RegroupSegments(const Partition2SegmentsMap& part2segment, int64_t row_per_segment, SegmentGroups& groups) = 0; }; // MergeStrategy using MergeStrategyPtr = std::shared_ptr; diff --git a/core/unittest/db/test_db.cpp b/core/unittest/db/test_db.cpp index ab18be3c3c2f8dc46e5c13c9b40be66d616d0f1c..4bc6106a6506c3fe53bc2f90cce78a96969fa479 100644 --- a/core/unittest/db/test_db.cpp +++ b/core/unittest/db/test_db.cpp @@ -19,6 +19,8 @@ #include #include +#include "db/merge/MergeLayerStrategy.h" +#include "db/merge/MergeSimpleStrategy.h" #include "db/SnapshotUtils.h" #include "db/SnapshotVisitor.h" #include "db/snapshot/IterateHandler.h" @@ -576,6 +578,68 @@ TEST_F(DBTest, InsertTest) { do_insert(false, false); } +TEST(MergeTest, MergeStrategyTest) { + milvus::engine::Partition2SegmentsMap part2segments; + milvus::engine::SegmentInfoList segmet_list_1 = { + milvus::engine::SegmentInfo(1, 100, milvus::engine::utils::GetMicroSecTimeStamp()), + milvus::engine::SegmentInfo(2, 2500, milvus::engine::utils::GetMicroSecTimeStamp()), + milvus::engine::SegmentInfo(3, 300, milvus::engine::utils::GetMicroSecTimeStamp()), + milvus::engine::SegmentInfo(4, 5, milvus::engine::utils::GetMicroSecTimeStamp()), + milvus::engine::SegmentInfo(5, 60000, milvus::engine::utils::GetMicroSecTimeStamp()), + milvus::engine::SegmentInfo(6, 99999, milvus::engine::utils::GetMicroSecTimeStamp()), + milvus::engine::SegmentInfo(7, 60, milvus::engine::utils::GetMicroSecTimeStamp()), + milvus::engine::SegmentInfo(8, 800, milvus::engine::utils::GetMicroSecTimeStamp()), + milvus::engine::SegmentInfo(9, 6600, milvus::engine::utils::GetMicroSecTimeStamp()), + milvus::engine::SegmentInfo(10, 110000, milvus::engine::utils::GetMicroSecTimeStamp()), + }; + milvus::engine::SegmentInfoList segmet_list_2 = { + milvus::engine::SegmentInfo(11, 9000, milvus::engine::utils::GetMicroSecTimeStamp()), + milvus::engine::SegmentInfo(12, 1500, milvus::engine::utils::GetMicroSecTimeStamp()), + milvus::engine::SegmentInfo(13, 20, milvus::engine::utils::GetMicroSecTimeStamp()), + milvus::engine::SegmentInfo(14, 1, milvus::engine::utils::GetMicroSecTimeStamp()), + milvus::engine::SegmentInfo(15, 100001, milvus::engine::utils::GetMicroSecTimeStamp()), + milvus::engine::SegmentInfo(16, 15000, milvus::engine::utils::GetMicroSecTimeStamp()), + milvus::engine::SegmentInfo(17, 0, milvus::engine::utils::GetMicroSecTimeStamp()), + }; + milvus::engine::SegmentInfoList segmet_list_3 = { + milvus::engine::SegmentInfo(21, 19000, milvus::engine::utils::GetMicroSecTimeStamp()), + milvus::engine::SegmentInfo(22, 40500, milvus::engine::utils::GetMicroSecTimeStamp() - 2000000), + milvus::engine::SegmentInfo(23, 1000, milvus::engine::utils::GetMicroSecTimeStamp() - 1000000), + }; + part2segments.insert(std::make_pair(1, segmet_list_1)); + part2segments.insert(std::make_pair(2, segmet_list_2)); + part2segments.insert(std::make_pair(3, segmet_list_3)); + + int64_t row_per_segment = 100000; + { + milvus::engine::SegmentGroups groups; + milvus::engine::MergeSimpleStrategy strategy; + auto status = strategy.RegroupSegments(part2segments, row_per_segment, groups); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(groups.size(), 4); + std::set compare = {3, 3, 5, 6}; + std::set result; + for (auto& group : groups) { + result.insert(group.size()); + } + ASSERT_EQ(compare, result); + } + + { + milvus::engine::SegmentGroups groups; + milvus::engine::MergeLayerStrategy strategy; + auto status = strategy.RegroupSegments(part2segments, row_per_segment, groups); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(groups.size(), 4); + std::set compare = {2, 3, 3, 6}; + std::set result; + for (auto& group : groups) { + result.insert(group.size()); + } + ASSERT_EQ(compare, result); + } +} + TEST_F(DBTest, MergeTest) { std::string collection_name = "MERGE_TEST"; auto status = CreateCollection2(db_, collection_name, 0);