未验证 提交 d654d81b 编写于 作者: G groot 提交者: GitHub

layer merge strategy (#3486)

* layer merge strategy
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* merge strategy unittest
Signed-off-by: Ngroot <yihua.mo@zilliz.com>
上级 736bb61e
......@@ -1081,7 +1081,8 @@ DBImpl::BackgroundMerge(std::set<int64_t> collection_ids, bool force_merge_all)
for (auto& collection_id : collection_ids) {
const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
auto status = merge_mgr_ptr_->MergeFiles(collection_id);
MergeStrategyType type = force_merge_all ? MergeStrategyType::SIMPLE : MergeStrategyType::LAYERED;
auto status = merge_mgr_ptr_->MergeSegments(collection_id, type);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Failed to get merge files for collection id: " << collection_id
<< " reason:" << status.message();
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/merge/MergeLayerStrategy.h"
#include "db/Utils.h"
#include "utils/Log.h"
#include <map>
#include <utility>
namespace milvus {
namespace engine {
namespace {
// Age after which a segment that sits alone in its layer is handed over to the next upper layer.
// The intended unit is seconds; code using it must scale it to the unit of the timestamps it compares against.
const int64_t FORCE_MERGE_THREASHOLD = 30; // force merge segments older than this time (in seconds)
// Layers of one partition: key is the row-count upper bound of the layer, value is the segments placed in it.
using LayerGroups = std::map<int64_t, SegmentInfoList>;
// Reset `groups` to a fresh, empty set of layers.
// Layer keys are row-count upper bounds: the first is 2^12 (4096) and every following key is four times
// the previous one. The last key created is the first one that reaches either row_count_per_segment
// or MAX_SEGMENT_ROW_COUNT, so at least one layer always exists.
void
ConstructLayers(LayerGroups& groups, int64_t row_count_per_segment) {
    groups.clear();

    for (int64_t exponent = 12;; exponent += 2) {
        const int64_t upper_bound = 1UL << exponent;
        groups.emplace(upper_bound, SegmentInfoList());

        const bool covers_segment_size = upper_bound >= row_count_per_segment;
        const bool reached_hard_limit = upper_bound >= MAX_SEGMENT_ROW_COUNT;
        if (covers_segment_size || reached_hard_limit) {
            return;
        }
    }
}
} // namespace
// Regroup the segments of every partition with the layered strategy.
//
// For each partition that holds more than one segment:
//   1. empty segments and segments that already hold row_per_segment rows are ignored;
//   2. every remaining segment goes into the first layer whose row-count bound is greater than its row count
//      (or into the top layer when it exceeds every bound);
//   3. a layer that holds exactly one segment older than FORCE_MERGE_THREASHOLD seconds hands that segment
//      over to the next upper layer;
//   4. inside each layer, segment ids are accumulated until their rows reach row_per_segment; every
//      accumulated run of at least two ids becomes one entry of `groups`.
//
// part2segment    segments of each partition, keyed by partition id
// row_per_segment row count at which a segment is considered full
// groups          output; one id list per merge to perform is appended, existing entries are kept
// Returns Status::OK() in all cases.
Status
MergeLayerStrategy::RegroupSegments(const Partition2SegmentsMap& part2segment, int64_t row_per_segment,
                                    SegmentGroups& groups) {
    // Ages are differences of GetMicroSecTimeStamp() values, so the threshold (declared in seconds)
    // must be converted to microseconds before the comparison.
    const int64_t force_merge_age = FORCE_MERGE_THREASHOLD * 1000 * 1000;
    const auto now = utils::GetMicroSecTimeStamp();

    for (const auto& kv : part2segment) {
        if (kv.second.size() <= 1) {
            continue;  // no segment or only one segment, no need to merge
        }

        LayerGroups layers;
        ConstructLayers(layers, row_per_segment);

        // distribute segments to layers according to segment row count
        for (const SegmentInfo& segment_info : kv.second) {
            if (segment_info.row_count_ <= 0 || segment_info.row_count_ >= row_per_segment) {
                continue;  // empty segment or full segment
            }

            // upper_bound() returns the first layer whose key is strictly greater than the row count
            auto layer_iter = layers.upper_bound(segment_info.row_count_);
            if (layer_iter != layers.end()) {
                layer_iter->second.push_back(segment_info);
            } else if (!layers.empty()) {
                // row count exceeds every layer bound: keep the segment mergeable by using the top layer
                layers.rbegin()->second.push_back(segment_info);
            }
        }

        // if a segment was created more than FORCE_MERGE_THREASHOLD seconds ago and is still alone in its
        // layer, force merge it with the upper layer
        SegmentInfoList force_list;
        for (auto& pair : layers) {
            SegmentInfoList& segments = pair.second;
            if (!force_list.empty()) {
                // segments pushed up from the lower layer are merged first
                segments.insert(segments.begin(), force_list.begin(), force_list.end());
                force_list.clear();
            }

            if (segments.size() == 1 && now - segments[0].create_on_ > force_merge_age) {
                force_list.swap(segments);
            }
        }

        // merge for each layer
        for (auto& pair : layers) {
            snapshot::IDS_TYPE ids;
            int64_t row_count_sum = 0;
            for (const auto& segment : pair.second) {
                ids.push_back(segment.id_);
                row_count_sum += segment.row_count_;
                if (row_count_sum >= row_per_segment) {
                    // the accumulated rows fill a segment: close this group and start a new one
                    if (ids.size() >= 2) {
                        groups.push_back(ids);
                    }
                    ids.clear();
                    row_count_sum = 0;
                }
            }

            // left-over segments of the layer still form a group when there are at least two of them
            if (ids.size() >= 2) {
                groups.push_back(ids);
            }
        }
    }

    return Status::OK();
}
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <string>
#include <vector>
#include "db/merge/MergeStrategy.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
// Merge strategy that distributes the segments of a partition into layers by row count
// and builds merge groups layer by layer.
class MergeLayerStrategy : public MergeStrategy {
public:
// Appends to `groups` the segment id lists that should be merged together.
// `part2segment` maps each partition to its segments; `row_per_segment` is the row count of a full segment.
Status
RegroupSegments(const Partition2SegmentsMap& part2segment, int64_t row_per_segment, SegmentGroups& groups) override;
}; // MergeLayerStrategy
} // namespace engine
} // namespace milvus
......@@ -24,29 +24,27 @@ namespace milvus {
namespace engine {
// 1. SIMPLE
// merge in old way, merge files one by one, stop merge until file size exceed index_file_size
// merge in old way, merge segment one by one, stop merge until segment row count exceed segment_row_count
// 2. LAYERED
// distribute files to several groups according to file size
// firstly, define layers by file size: 4MB, 16MB, 64MB, 256MB, 1024MB
// if file size between 0MB~4MB, put it into layer "4"
// if file size between 4MB~16MB, put it into layer "16"
// if file size between 16MB~64MB, put it into layer "64"
// if file size between 64MB~256MB, put it into layer "256"
// if file size between 256MB~1024MB, put it into layer "1024"
// secondly, merge files for each group
// third, if some file's create time is 30 seconds ago, and it still un-merged, force merge with upper layer files
// 3. ADAPTIVE
// Pick files that sum of size is close to index_file_size, merge them
// distribute segments to several groups according to segment row count
// firstly, define layers by row count: 4KB, 16KB, 64KB, 256KB, 1024KB
// if segment row count between 0KB~4KB, put it into layer "4096"
// if segment row count between 4KB~16KB, put it into layer "16384"
// if segment row count between 16KB~64KB, put it into layer "65536"
// if segment row count between 64KB~256KB, put it into layer "262144"
// if segment row count between 256KB~1024KB, put it into layer "1048576"
// file row count greater than 1024KB, put into layer MAX_SEGMENT_ROW_COUNT
// secondly, merge segments for each group
// third, if some segment's create time is 30 seconds ago, and it still un-merged, force merge with upper layer
enum class MergeStrategyType {
SIMPLE = 1,
LAYERED = 2,
ADAPTIVE = 3,
};
class MergeManager {
public:
virtual Status
MergeFiles(int64_t collection_id, MergeStrategyType type = MergeStrategyType::SIMPLE) = 0;
MergeSegments(int64_t collection_id, MergeStrategyType type = MergeStrategyType::LAYERED) = 0;
}; // MergeManager
using MergeManagerPtr = std::shared_ptr<MergeManager>;
......
......@@ -10,6 +10,7 @@
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/merge/MergeManagerImpl.h"
#include "db/merge/MergeLayerStrategy.h"
#include "db/merge/MergeSimpleStrategy.h"
#include "db/merge/MergeTask.h"
#include "db/snapshot/Snapshots.h"
......@@ -17,6 +18,7 @@
#include "utils/Log.h"
#include <map>
#include <utility>
namespace milvus {
namespace engine {
......@@ -31,8 +33,10 @@ MergeManagerImpl::CreateStrategy(MergeStrategyType type, MergeStrategyPtr& strat
strategy = std::make_shared<MergeSimpleStrategy>();
break;
}
case MergeStrategyType::LAYERED:
case MergeStrategyType::ADAPTIVE:
case MergeStrategyType::LAYERED: {
strategy = std::make_shared<MergeLayerStrategy>();
break;
}
default: {
std::string msg = "Unsupported merge strategy type: " + std::to_string(static_cast<int32_t>(type));
LOG_ENGINE_ERROR_ << msg;
......@@ -44,7 +48,7 @@ MergeManagerImpl::CreateStrategy(MergeStrategyType type, MergeStrategyPtr& strat
}
Status
MergeManagerImpl::MergeFiles(int64_t collection_id, MergeStrategyType type) {
MergeManagerImpl::MergeSegments(int64_t collection_id, MergeStrategyType type) {
MergeStrategyPtr strategy;
auto status = CreateStrategy(type, strategy);
if (!status.ok()) {
......@@ -59,12 +63,31 @@ MergeManagerImpl::MergeFiles(int64_t collection_id, MergeStrategyType type) {
Partition2SegmentsMap part2seg;
auto& segments = latest_ss->GetResources<snapshot::Segment>();
for (auto& kv : segments) {
part2seg[kv.second->GetPartitionId()].push_back(kv.second->GetID());
snapshot::ID_TYPE segment_id = kv.second->GetID();
auto segment_commit = latest_ss->GetSegmentCommitBySegmentId(segment_id);
if (segment_commit == nullptr) {
continue; // maybe stale
}
SegmentInfo info(segment_id, segment_commit->GetRowCount(), segment_commit->GetCreatedTime());
part2seg[kv.second->GetPartitionId()].emplace_back(info);
}
if (part2seg.empty()) {
break; // nothing to merge
}
// get row count per segment
auto collection = latest_ss->GetCollection();
int64_t row_count_per_segment = DEFAULT_SEGMENT_ROW_COUNT;
const json params = collection->GetParams();
if (params.find(PARAM_SEGMENT_ROW_COUNT) != params.end()) {
row_count_per_segment = params[PARAM_SEGMENT_ROW_COUNT];
}
// distribute segments to groups by some strategy
SegmentGroups segment_groups;
auto status = strategy->RegroupSegments(latest_ss, part2seg, segment_groups);
auto status = strategy->RegroupSegments(part2seg, row_count_per_segment, segment_groups);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Failed to regroup segments for collection: " << latest_ss->GetName()
<< ", continue to merge all files into one";
......
......@@ -32,7 +32,7 @@ class MergeManagerImpl : public MergeManager {
explicit MergeManagerImpl(const DBOptions& options);
Status
MergeFiles(int64_t collection_id, MergeStrategyType type) override;
MergeSegments(int64_t collection_id, MergeStrategyType type) override;
private:
Status
......
......@@ -10,23 +10,14 @@
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/merge/MergeSimpleStrategy.h"
#include "db/snapshot/Snapshots.h"
#include "utils/Log.h"
namespace milvus {
namespace engine {
Status
MergeSimpleStrategy::RegroupSegments(const snapshot::ScopedSnapshotT& ss, const Partition2SegmentsMap& part2segment,
MergeSimpleStrategy::RegroupSegments(const Partition2SegmentsMap& part2segment, int64_t row_per_segment,
SegmentGroups& groups) {
auto collection = ss->GetCollection();
int64_t row_count_per_segment = DEFAULT_SEGMENT_ROW_COUNT;
const json params = collection->GetParams();
if (params.find(PARAM_SEGMENT_ROW_COUNT) != params.end()) {
row_count_per_segment = params[PARAM_SEGMENT_ROW_COUNT];
}
for (auto& kv : part2segment) {
if (kv.second.size() <= 1) {
continue; // no segment or only one segment, no need to merge
......@@ -34,20 +25,14 @@ MergeSimpleStrategy::RegroupSegments(const snapshot::ScopedSnapshotT& ss, const
snapshot::IDS_TYPE ids;
int64_t row_count_sum = 0;
for (auto& id : kv.second) {
auto segment_commit = ss->GetSegmentCommitBySegmentId(id);
if (segment_commit == nullptr) {
continue; // maybe stale
}
auto segment_row = segment_commit->GetRowCount();
if (segment_row <= 0) {
continue; // empty segment?
for (const SegmentInfo& segment_info : kv.second) {
if (segment_info.row_count_ <= 0 || segment_info.row_count_ >= row_per_segment) {
continue; // empty segment or full segment
}
ids.push_back(id);
row_count_sum += segment_row;
if (row_count_sum >= row_count_per_segment) {
ids.push_back(segment_info.id_);
row_count_sum += segment_info.row_count_;
if (row_count_sum >= row_per_segment) {
if (ids.size() >= 2) {
groups.push_back(ids);
}
......
......@@ -23,8 +23,7 @@ namespace engine {
class MergeSimpleStrategy : public MergeStrategy {
public:
Status
RegroupSegments(const snapshot::ScopedSnapshotT& ss, const Partition2SegmentsMap& part2segment,
SegmentGroups& groups) override;
RegroupSegments(const Partition2SegmentsMap& part2segment, int64_t row_per_segment, SegmentGroups& groups) override;
}; // MergeSimpleStrategy
} // namespace engine
......
......@@ -11,28 +11,38 @@
#pragma once
#include <map>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "db/Types.h"
#include "db/snapshot/ResourceTypes.h"
#include "db/snapshot/Snapshot.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
using Partition2SegmentsMap = std::map<snapshot::ID_TYPE, snapshot::IDS_TYPE>;
// Per-segment data handed to a merge strategy: segment id, row count and creation timestamp.
struct SegmentInfo {
// Build an info record from the segment id, its row count and its creation timestamp.
SegmentInfo(snapshot::ID_TYPE id, int64_t row_count, snapshot::TS_TYPE create_on)
: id_(id), row_count_(row_count), create_on_(create_on) {
}
// id of the segment
snapshot::ID_TYPE id_ = 0;
// number of rows stored in the segment
int64_t row_count_ = 0;
// creation timestamp of the segment
// NOTE(review): presumably in the unit returned by utils::GetMicroSecTimeStamp() - confirm against the producer
snapshot::TS_TYPE create_on_ = 0;
};
using SegmentInfoList = std::vector<SegmentInfo>;
using Partition2SegmentsMap = std::unordered_map<snapshot::ID_TYPE, SegmentInfoList>;
using SegmentGroups = std::vector<snapshot::IDS_TYPE>;
class MergeStrategy {
public:
virtual Status
RegroupSegments(const snapshot::ScopedSnapshotT& ss, const Partition2SegmentsMap& part2segment,
SegmentGroups& groups) = 0;
RegroupSegments(const Partition2SegmentsMap& part2segment, int64_t row_per_segment, SegmentGroups& groups) = 0;
}; // MergeStrategy
using MergeStrategyPtr = std::shared_ptr<MergeStrategy>;
......
......@@ -19,6 +19,8 @@
#include <experimental/filesystem>
#include <src/cache/CpuCacheMgr.h>
#include "db/merge/MergeLayerStrategy.h"
#include "db/merge/MergeSimpleStrategy.h"
#include "db/SnapshotUtils.h"
#include "db/SnapshotVisitor.h"
#include "db/snapshot/IterateHandler.h"
......@@ -576,6 +578,68 @@ TEST_F(DBTest, InsertTest) {
do_insert(false, false);
}
// Feeds three partitions of segments to the simple and the layered merge strategy and checks how many
// merge groups each one produces and how many segments every group contains.
TEST(MergeTest, MergeStrategyTest) {
    namespace engine = milvus::engine;

    const auto now = engine::utils::GetMicroSecTimeStamp();
    const int64_t micros_per_second = 1000 * 1000;

    // partition 1: segment 10 is already full and must be ignored
    engine::SegmentInfoList segment_list_1 = {
        engine::SegmentInfo(1, 100, now),   engine::SegmentInfo(2, 2500, now),  engine::SegmentInfo(3, 300, now),
        engine::SegmentInfo(4, 5, now),     engine::SegmentInfo(5, 60000, now), engine::SegmentInfo(6, 99999, now),
        engine::SegmentInfo(7, 60, now),    engine::SegmentInfo(8, 800, now),   engine::SegmentInfo(9, 6600, now),
        engine::SegmentInfo(10, 110000, now),
    };

    // partition 2: segment 15 is full and segment 17 is empty, both must be ignored
    engine::SegmentInfoList segment_list_2 = {
        engine::SegmentInfo(11, 9000, now),   engine::SegmentInfo(12, 1500, now),  engine::SegmentInfo(13, 20, now),
        engine::SegmentInfo(14, 1, now),      engine::SegmentInfo(15, 100001, now), engine::SegmentInfo(16, 15000, now),
        engine::SegmentInfo(17, 0, now),
    };

    // partition 3: segment 23 is alone in the lowest layer and created well before the 30-second
    // force-merge threshold, so the layered strategy has to push it up to the layer of 21 and 22
    engine::SegmentInfoList segment_list_3 = {
        engine::SegmentInfo(21, 19000, now),
        engine::SegmentInfo(22, 40500, now - 40 * micros_per_second),
        engine::SegmentInfo(23, 1000, now - 60 * micros_per_second),
    };

    engine::Partition2SegmentsMap part2segments;
    part2segments.insert(std::make_pair(1, segment_list_1));
    part2segments.insert(std::make_pair(2, segment_list_2));
    part2segments.insert(std::make_pair(3, segment_list_3));

    const int64_t row_per_segment = 100000;

    // A multiset keeps duplicated group sizes, so both the sizes and how often they occur are verified.
    auto group_sizes = [](const engine::SegmentGroups& groups) {
        std::multiset<size_t> sizes;
        for (const auto& group : groups) {
            sizes.insert(group.size());
        }
        return sizes;
    };

    {
        engine::SegmentGroups groups;
        engine::MergeSimpleStrategy strategy;
        auto status = strategy.RegroupSegments(part2segments, row_per_segment, groups);
        ASSERT_TRUE(status.ok());
        ASSERT_EQ(groups.size(), 4);

        const std::multiset<size_t> expected = {3, 3, 5, 6};
        ASSERT_EQ(expected, group_sizes(groups));
    }

    {
        engine::SegmentGroups groups;
        engine::MergeLayerStrategy strategy;
        auto status = strategy.RegroupSegments(part2segments, row_per_segment, groups);
        ASSERT_TRUE(status.ok());
        ASSERT_EQ(groups.size(), 4);

        const std::multiset<size_t> expected = {2, 3, 3, 6};
        ASSERT_EQ(expected, group_sizes(groups));
    }
}
TEST_F(DBTest, MergeTest) {
std::string collection_name = "MERGE_TEST";
auto status = CreateCollection2(db_, collection_name, 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册