SnapshotUtils.cpp 10.2 KB
Newer Older
G
groot 已提交
1 2 3 4 5 6 7 8 9 10 11 12
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

#include "db/SnapshotUtils.h"
G
groot 已提交
13 14
#include "db/SnapshotHandlers.h"
#include "db/SnapshotVisitor.h"
G
groot 已提交
15
#include "db/Utils.h"
G
groot 已提交
16 17 18 19 20
#include "db/snapshot/CompoundOperations.h"
#include "db/snapshot/Resources.h"
#include "db/snapshot/Snapshots.h"
#include "segment/Segment.h"

G
groot 已提交
21
#include <algorithm>
G
groot 已提交
22
#include <memory>
G
groot 已提交
23
#include <unordered_map>
G
groot 已提交
24 25 26 27 28 29
#include <utility>
#include <vector>

namespace milvus {
namespace engine {

G
groot 已提交
30 31 32 33 34
// Key names used when building JSON documents about a snapshot.
// Most of them are consumed by GetSnapshotInfo() in this file.
// NOTE(review): the pointer type must stay `const char*` (written here as `char const*`, the same type)
// so these definitions keep matching any `extern` declarations in db/SnapshotUtils.h.

// collection / partition / segment level keys
char const* JSON_ROW_COUNT = "row_count";
char const* JSON_ID = "id";
char const* JSON_PARTITIONS = "partitions";
char const* JSON_SEGMENTS = "segments";
char const* JSON_FIELD = "field";
char const* JSON_FIELD_ELEMENT = "field_element";
char const* JSON_PARTITION_TAG = "tag";

// per-file keys
char const* JSON_FILES = "files";
char const* JSON_NAME = "name";
char const* JSON_INDEX_TYPE = "index_type";
char const* JSON_DATA_SIZE = "data_size";
char const* JSON_PATH = "path";
G
groot 已提交
42

G
groot 已提交
43 44 45 46 47 48 49 50 51 52 53
/**
 * Register an index on one field of a collection by pushing an AddFieldElementOperation.
 *
 * An FET_INDEX field element (named index_info.index_name_, typed index_info.index_type_) is always added.
 * For vector fields, the metric type and extra params are stored as the element's params. If
 * utils::RequireCompressFile() says the index type needs it, an extra FET_COMPRESS element is added too.
 *
 * @param collection_name  collection that owns the field
 * @param field_name       field to index
 * @param index_info       index description (only read here)
 * @return DB_ERROR "Invalid field name" if the field does not exist, otherwise the result of the
 *         snapshot lookup / operation push
 */
Status
SetSnapshotIndex(const std::string& collection_name, const std::string& field_name,
                 engine::CollectionIndex& index_info) {
    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    auto field = ss->GetField(field_name);
    if (field == nullptr) {
        return Status(DB_ERROR, "Invalid field name");
    }

    const auto collection_id = ss->GetCollectionId();
    const auto field_id = field->GetID();

    snapshot::OperationContext ss_context;

    auto index_element =
        std::make_shared<snapshot::FieldElement>(collection_id, field_id, index_info.index_name_,
                                                 milvus::engine::FieldElementType::FET_INDEX, index_info.index_type_);
    ss_context.new_field_elements.push_back(index_element);

    if (IsVectorField(field)) {
        // vector indexes carry their metric type and build parameters on the index element itself
        milvus::json index_params;
        index_params[engine::PARAM_INDEX_METRIC_TYPE] = index_info.metric_name_;
        index_params[engine::PARAM_INDEX_EXTRA_PARAMS] = index_info.extra_params_;
        index_element->SetParams(index_params);

        if (utils::RequireCompressFile(index_info.index_type_)) {
            ss_context.new_field_elements.push_back(std::make_shared<snapshot::FieldElement>(
                collection_id, field_id, ELEMENT_INDEX_COMPRESS, milvus::engine::FieldElementType::FET_COMPRESS));
        }
    }

    auto op = std::make_shared<snapshot::AddFieldElementOperation>(ss_context, ss);
    STATUS_CHECK(op->Push());
    return Status::OK();
}

/**
 * Read the index description of one field from the latest snapshot of a collection.
 *
 * Scans the field's elements for FET_INDEX entries and copies their name and type into index_info.
 * - Vector fields: the first FET_INDEX element is used. Its params also supply metric_name_ and
 *   extra_params_, when those keys are present.
 * - Other fields: every FET_INDEX element overwrites name/type, so the last one wins.
 * index_info is left untouched when the field has no FET_INDEX element.
 *
 * @return DB_ERROR "Invalid field name" if the field does not exist, the snapshot lookup error if any,
 *         otherwise OK
 */
Status
GetSnapshotIndex(const std::string& collection_name, const std::string& field_name,
                 engine::CollectionIndex& index_info) {
    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    auto field = ss->GetField(field_name);
    if (field == nullptr) {
        return Status(DB_ERROR, "Invalid field name");
    }

    auto field_elements = ss->GetFieldElementsByField(field_name);
    const bool vector_field = IsVectorField(field);

    for (auto& element : field_elements) {
        if (element->GetFEtype() != engine::FieldElementType::FET_INDEX) {
            continue;
        }

        index_info.index_name_ = element->GetName();
        index_info.index_type_ = element->GetTypeName();

        if (!vector_field) {
            // params are only read for vector fields; keep scanning so the last index element wins
            continue;
        }

        auto params = element->GetParams();
        if (params.find(engine::PARAM_INDEX_METRIC_TYPE) != params.end()) {
            index_info.metric_name_ = params[engine::PARAM_INDEX_METRIC_TYPE];
        }
        if (params.find(engine::PARAM_INDEX_EXTRA_PARAMS) != params.end()) {
            index_info.extra_params_ = params[engine::PARAM_INDEX_EXTRA_PARAMS];
        }
        break;  // vector fields: first index element wins
    }

    return Status::OK();
}

/**
 * Drop index-related field elements (FET_INDEX and FET_COMPRESS) of a collection.
 *
 * @param collection_name  collection to modify
 * @param field_name       field whose index should be dropped; an empty name means every field
 *                         reported by the snapshot's GetFieldNames()
 * @return the first failing snapshot lookup / operation push, otherwise OK
 */
Status
DeleteSnapshotIndex(const std::string& collection_name, const std::string& field_name) {
    std::vector<std::string> target_fields;
    if (!field_name.empty()) {
        target_fields.push_back(field_name);
    } else {
        snapshot::ScopedSnapshotT latest_ss;
        STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name));
        target_fields = latest_ss->GetFieldNames();
    }

    auto is_index_related = [](const snapshot::FieldElementPtr& element) {
        const auto fe_type = element->GetFEtype();
        return fe_type == engine::FieldElementType::FET_INDEX || fe_type == engine::FieldElementType::FET_COMPRESS;
    };

    for (const auto& name : target_fields) {
        // take a fresh snapshot per field so earlier drops are visible
        snapshot::ScopedSnapshotT ss;
        STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

        std::vector<snapshot::FieldElementPtr> elements = ss->GetFieldElementsByField(name);
        for (auto& element : elements) {
            if (!is_index_related(element)) {
                continue;
            }

            // NOTE(review): `ss` is reused for every drop pushed for this field, so a second element
            // (e.g. index + compress) is dropped against the snapshot taken before the first push.
            // Confirm DropAllIndexOperation tolerates that.
            snapshot::OperationContext context;
            context.stale_field_elements.push_back(element);
            auto op = std::make_shared<snapshot::DropAllIndexOperation>(context, ss);
            STATUS_CHECK(op->Push());
        }
    }

    return Status::OK();
}

G
groot 已提交
150 151 152 153 154 155
/**
 * Tell whether a snapshot field holds vector data.
 *
 * @param field  field resource; may be null
 * @return false for a null field, otherwise IsVectorField() applied to the field's data type
 */
bool
IsVectorField(const engine::snapshot::FieldPtr& field) {
    return field != nullptr && IsVectorField(static_cast<engine::DataType>(field->GetFtype()));
}

/**
 * Tell whether a data type is one of the vector types.
 *
 * @return true for VECTOR_FLOAT and VECTOR_BINARY, false for every other type
 */
bool
IsVectorField(engine::DataType type) {
    switch (type) {
        case engine::DataType::VECTOR_FLOAT:
        case engine::DataType::VECTOR_BINARY:
            return true;
        default:
            return false;
    }
}

G
groot 已提交
165
/**
 * Build a JSON description of the latest snapshot of a collection.
 *
 * Layout (keys are the JSON_* constants of this file):
 *   { row_count, data_size,
 *     partitions: [ { tag, id, row_count, data_size,
 *                     segments: [ { id, row_count, data_size,
 *                                   files: [ { data_size, path, field, name[, index_type] } ] } ] } ] }
 *
 * Partitions are listed in ascending partition-id order. Segments within a partition are listed in
 * ascending segment-id order. A list that never receives an entry (e.g. a partition without segments)
 * is left as a default-constructed milvus::json.
 * Partitions whose resource or commit cannot be found, and elements without a file, are skipped.
 *
 * @param collection_name  collection to describe
 * @param json_info        receives row_count, data_size and partitions
 * @return the snapshot lookup error if the snapshot cannot be obtained, otherwise OK
 */
Status
GetSnapshotInfo(const std::string& collection_name, milvus::json& json_info) {
    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    size_t total_row_count = 0;
    size_t total_data_size = 0;

    // get partition information; kept in a vector (sorted below) so partitions are emitted in id order,
    // just like segments
    using PartitionEntry = std::pair<snapshot::ID_TYPE, milvus::json>;
    std::vector<PartitionEntry> partitions;
    auto partition_names = ss->GetPartitionNames();
    for (auto& name : partition_names) {
        auto partition = ss->GetPartition(name);
        if (partition == nullptr) {
            continue;  // never dereference a missing partition resource
        }
        auto partition_commit = ss->GetPartitionCommitByPartitionId(partition->GetID());
        if (partition_commit == nullptr) {
            continue;  // same guard as the segment commit lookup below
        }

        milvus::json json_partition;
        json_partition[JSON_PARTITION_TAG] = name;
        json_partition[JSON_ID] = partition->GetID();
        json_partition[JSON_ROW_COUNT] = partition_commit->GetRowCount();
        json_partition[JSON_DATA_SIZE] = partition_commit->GetSize();
        total_row_count += partition_commit->GetRowCount();
        total_data_size += partition_commit->GetSize();

        partitions.emplace_back(partition->GetID(), std::move(json_partition));
    }
    std::sort(partitions.begin(), partitions.end(),
              [](const PartitionEntry& lhs, const PartitionEntry& rhs) { return lhs.first < rhs.first; });

    // just ensure segments listed in id order
    snapshot::IDS_TYPE segment_ids;
    auto handler = std::make_shared<SegmentsToSearchCollector>(ss, segment_ids);
    handler->Iterate();
    std::sort(segment_ids.begin(), segment_ids.end());

    // get segment information and construct segment json nodes, grouped by owning partition id
    std::unordered_map<snapshot::ID_TYPE, std::vector<milvus::json>> json_partition_segments;
    for (auto id : segment_ids) {
        auto segment_commit = ss->GetSegmentCommitBySegmentId(id);
        if (segment_commit == nullptr) {
            continue;
        }

        milvus::json json_files;
        auto seg_visitor = engine::SegmentVisitor::Build(ss, id);
        if (seg_visitor != nullptr) {
            auto& field_visitors = seg_visitor->GetFieldVisitors();
            for (auto& iter : field_visitors) {
                const engine::snapshot::FieldPtr& field = iter.second->GetField();

                auto& elements = iter.second->GetElementVistors();
                for (const auto& pair : elements) {
                    if (pair.second == nullptr || pair.second->GetElement() == nullptr) {
                        continue;
                    }

                    auto file = pair.second->GetFile();
                    if (!file) {
                        continue;  // nothing to report; avoids pushing an empty entry into "files"
                    }

                    auto element = pair.second->GetElement();
                    milvus::json json_file;
                    json_file[JSON_DATA_SIZE] = file->GetSize();
                    json_file[JSON_PATH] = engine::snapshot::GetResPath<engine::snapshot::SegmentFile>("", file);
                    json_file[JSON_FIELD] = field->GetName();
                    json_file[JSON_NAME] = element->GetName();
                    // index elements additionally report their index type
                    if (element->GetFEtype() == engine::FieldElementType::FET_INDEX) {
                        json_file[JSON_INDEX_TYPE] = element->GetTypeName();
                    }
                    json_files.push_back(std::move(json_file));
                }
            }
        }

        milvus::json json_segment;
        json_segment[JSON_ID] = id;
        json_segment[JSON_ROW_COUNT] = segment_commit->GetRowCount();
        json_segment[JSON_DATA_SIZE] = segment_commit->GetSize();
        json_segment[JSON_FILES] = std::move(json_files);
        json_partition_segments[segment_commit->GetPartitionId()].push_back(std::move(json_segment));
    }

    // construct partition json nodes; use find() so no copy / default insertion happens per partition
    milvus::json json_partitions;
    for (auto& entry : partitions) {
        milvus::json json_segments;
        auto found = json_partition_segments.find(entry.first);
        if (found != json_partition_segments.end()) {
            for (auto& json_segment : found->second) {
                json_segments.push_back(std::move(json_segment));
            }
        }
        entry.second[JSON_SEGMENTS] = std::move(json_segments);
        json_partitions.push_back(std::move(entry.second));
    }

    json_info[JSON_ROW_COUNT] = total_row_count;
    json_info[JSON_DATA_SIZE] = total_data_size;
    json_info[JSON_PARTITIONS] = std::move(json_partitions);

    return Status::OK();
}

G
groot 已提交
266 267
}  // namespace engine
}  // namespace milvus