未验证 提交 bc74d163 编写于 作者: S shengjun.li 提交者: GitHub

#2283 Suspend the building tasks when any query command arrives (#2285)

* add builder suspend
Signed-off-by: Nshengjun.li <shengjun.li@zilliz.com>

* IndexIVF builder check wait
Signed-off-by: Nsahuang <xiaohai.xu@zilliz.com>

* Build suspend for all IVF
Signed-off-by: Nsahuang <xiaohai.xu@zilliz.com>

* HNSW suspend
Signed-off-by: Nsahuang <xiaohai.xu@zilliz.com>

* HNSW suspend
Signed-off-by: Nsahuang <xiaohai.xu@zilliz.com>

* HNSW suspend
Signed-off-by: Nsahuang <xiaohai.xu@zilliz.com>

* HNSW suspend
Signed-off-by: Nsahuang <xiaohai.xu@zilliz.com>

* HNSW suspend
Signed-off-by: Nsahuang <xiaohai.xu@zilliz.com>

* cpubuilder
Signed-off-by: Nwxyu <xy.wang@zilliz.com>

* add suspend check during annoy build index
Signed-off-by: Ncmli <chengming.li@zilliz.com>

* fix hnsw
Signed-off-by: Nshengjun.li <shengjun.li@zilliz.com>

* fix changelog
Signed-off-by: Nshengjun.li <shengjun.li@zilliz.com>

* fix clang format
Signed-off-by: Nshengjun.li <shengjun.li@zilliz.com>

* suspend nsg
Signed-off-by: Nshengjun.li <shengjun.li@zilliz.com>
Co-authored-by: Nsahuang <xiaohai.xu@zilliz.com>
Co-authored-by: Nwxyu <xy.wang@zilliz.com>
Co-authored-by: Ncmli <chengming.li@zilliz.com>
Co-authored-by: NJinHai-CN <hai.jin@zilliz.com>
上级 253cda3a
......@@ -41,6 +41,7 @@ Please mark all change in change log and use the issue from GitHub
- \#2206 Log file rotating
- \#2240 Obtain running rpc requests information
- \#2268 Intelligently detect openblas library in system to avoid installing from source code every time
- \#2283 Suspend the building tasks when any query comand arrives.
## Improvement
- \#221 Refactor LOG macro
......
......@@ -21,6 +21,7 @@
#include <functional>
#include <iostream>
#include <limits>
#include <mutex>
#include <queue>
#include <set>
#include <thread>
......@@ -33,6 +34,7 @@
#include "db/IDGenerator.h"
#include "db/merge/MergeManagerFactory.h"
#include "engine/EngineFactory.h"
#include "index/knowhere/knowhere/index/vector_index/helpers/BuilderSuspend.h"
#include "index/thirdparty/faiss/utils/distances.h"
#include "insert/MemManagerFactory.h"
#include "meta/MetaConsts.h"
......@@ -1721,10 +1723,16 @@ DBImpl::QueryAsync(const std::shared_ptr<server::Context>& context, meta::FilesH
job->AddIndexFile(file_ptr);
}
// Suspend builder
SuspendIfFirst();
// step 2: put search job to scheduler and wait result
scheduler::JobMgrInst::GetInstance()->Put(job);
job->WaitResult();
// Resume builder
ResumeIfLast();
files_holder.ReleaseFiles();
if (!job->GetStatus().ok()) {
return job->GetStatus();
......@@ -2649,5 +2657,23 @@ DBImpl::OnUseBlasThresholdChanged(int64_t threshold) {
faiss::distance_compute_blas_threshold = threshold;
}
void
DBImpl::SuspendIfFirst() {
std::lock_guard<std::mutex> lock(suspend_build_mutex_);
if (++live_search_num_ == 1) {
LOG_ENGINE_TRACE_ << "live_search_num_: " << live_search_num_;
knowhere::BuilderSuspend();
}
}
void
DBImpl::ResumeIfLast() {
std::lock_guard<std::mutex> lock(suspend_build_mutex_);
if (--live_search_num_ == 0) {
LOG_ENGINE_TRACE_ << "live_search_num_: " << live_search_num_;
knowhere::BuildResume();
}
}
} // namespace engine
} // namespace milvus
......@@ -278,6 +278,12 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
Status
ExecWalRecord(const wal::MXLogRecord& record);
void
SuspendIfFirst();
void
ResumeIfLast();
private:
DBOptions options_;
......@@ -357,6 +363,9 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
IndexFailedChecker index_failed_checker_;
std::mutex flush_merge_compact_mutex_;
int64_t live_search_num_ = 0;
std::mutex suspend_build_mutex_;
}; // DBImpl
} // namespace engine
......
......@@ -17,6 +17,7 @@
#include <utility>
#include <vector>
#include "faiss/BuilderSuspend.h"
#include "hnswlib/hnswalg.h"
#include "hnswlib/space_ip.h"
#include "hnswlib/space_l2.h"
......@@ -124,6 +125,7 @@ IndexHNSW::Add(const DatasetPtr& dataset_ptr, const Config& config) {
index_->addPoint(p_data, p_ids[0]);
#pragma omp parallel for
for (int i = 1; i < rows; ++i) {
faiss::BuilderSuspend::check_wait();
index_->addPoint(((float*)p_data + Dim() * i), p_ids[i]);
}
}
......
......@@ -30,6 +30,7 @@
#include <utility>
#include <vector>
#include "faiss/BuilderSuspend.h"
#include "knowhere/common/Exception.h"
#include "knowhere/common/Log.h"
#include "knowhere/index/vector_index/IndexIVF.h"
......@@ -256,6 +257,9 @@ IVF::GenGraph(const float* data, const int64_t k, GraphType& graph, const Config
graph.resize(ntotal);
GraphType res_vec(total_search_count);
for (int i = 0; i < total_search_count; ++i) {
// it is usually used in NSG::train, to check BuilderSuspend
faiss::BuilderSuspend::check_wait();
auto b_size = (i == (total_search_count - 1)) && tail_batch_size != 0 ? tail_batch_size : batch_size;
auto& res = res_vec[i];
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include "faiss/BuilderSuspend.h"
namespace milvus {
namespace knowhere {
inline void
BuilderSuspend() {
faiss::BuilderSuspend::suspend();
}
inline void
BuildResume() {
faiss::BuilderSuspend::resume();
}
} // namespace knowhere
} // namespace milvus
......@@ -19,6 +19,7 @@
#include <string>
#include <utility>
#include "faiss/BuilderSuspend.h"
#include "knowhere/common/Exception.h"
#include "knowhere/common/Log.h"
#include "knowhere/common/Timer.h"
......@@ -432,6 +433,7 @@ NsgIndex::Link() {
boost::dynamic_bitset<> flags{ntotal, 0};
#pragma omp for schedule(dynamic, 100)
for (size_t n = 0; n < ntotal; ++n) {
faiss::BuilderSuspend::check_wait();
fullset.clear();
temp.clear();
flags.reset();
......@@ -461,6 +463,7 @@ NsgIndex::Link() {
std::vector<std::mutex> mutex_vec(ntotal);
#pragma omp for schedule(dynamic, 100)
for (unsigned n = 0; n < ntotal; ++n) {
faiss::BuilderSuspend::check_wait();
InterInsert(n, mutex_vec, cut_graph_dist);
}
delete[] cut_graph_dist;
......@@ -611,6 +614,7 @@ NsgIndex::CheckConnectivity() {
int64_t linked_count = 0;
while (linked_count < static_cast<int64_t>(ntotal)) {
faiss::BuilderSuspend::check_wait();
DFS(root, has_linked, linked_count);
if (linked_count >= static_cast<int64_t>(ntotal)) {
break;
......
......@@ -126,6 +126,7 @@ inline void set_error_from_string(char **error, const char* msg) {
#endif
#include <faiss/FaissHook.h>
#include <faiss/BuilderSuspend.h>
using std::vector;
using std::pair;
......@@ -1280,6 +1281,7 @@ protected:
vector<S> children_indices[2];
Node* m = (Node*)alloca(_s);
D::create_split(children, _f, _s, _random, m);
faiss::BuilderSuspend::check_wait();
for (size_t i = 0; i < indices.size(); i++) {
S j = indices[i];
......@@ -1319,6 +1321,7 @@ protected:
m->n_descendants = is_root ? _n_items : (S)indices.size();
for (int side = 0; side < 2; side++) {
// run _make_tree for the smallest child first (for cache locality)
faiss::BuilderSuspend::check_wait();
m->children[side^flip] = _make_tree(children_indices[side^flip], false);
}
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "BuilderSuspend.h"
namespace faiss {
std::atomic<bool> BuilderSuspend::suspend_flag_(false);
std::mutex BuilderSuspend::mutex_;
std::condition_variable BuilderSuspend::cv_;
void BuilderSuspend::suspend() {
suspend_flag_ = true;
}
void BuilderSuspend::resume() {
suspend_flag_ = false;
}
void BuilderSuspend::check_wait() {
while (suspend_flag_) {
std::unique_lock<std::mutex> lck(mutex_);
cv_.wait_for(lck, std::chrono::seconds(5));
}
}
} // namespace faiss
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <atomic>
#include <condition_variable>
#include <mutex>
namespace faiss {
class BuilderSuspend {
public:
static void suspend();
static void resume();
static void check_wait();
private:
static std::atomic<bool> suspend_flag_;
static std::mutex mutex_;
static std::condition_variable cv_;
};
} // namespace faiss
......@@ -15,7 +15,7 @@
#include <cmath>
#include <omp.h>
#include <faiss/BuilderSuspend.h>
#include <faiss/FaissHook.h>
#include <faiss/impl/AuxIndexStructures.h>
#include <faiss/impl/FaissAssert.h>
......@@ -1015,6 +1015,8 @@ void elkan_L2_sse (
float *data = (float *) malloc((bs_y * (bs_y - 1) / 2) * sizeof (float));
for (size_t j0 = 0; j0 < ny; j0 += bs_y) {
BuilderSuspend::check_wait();
size_t j1 = j0 + bs_y;
if (j1 > ny) j1 = ny;
......
......@@ -186,6 +186,7 @@ void knn_jaccard (
size_t d, size_t nx, size_t ny,
float_maxheap_array_t * res,
ConcurrentBitsetPtr bitset = nullptr);
/** same as knn_L2sqr, but base_shift[bno] is subtracted to all
* computed distances.
*
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "scheduler/CPUBuilder.h"
#include "utils/Log.h"
namespace milvus {
namespace scheduler {
void
CPUBuilder::Start() {
std::lock_guard<std::mutex> lock(mutex_);
if (not running_) {
running_ = true;
thread_ = std::thread(&CPUBuilder::worker_function, this);
}
}
void
CPUBuilder::Stop() {
std::lock_guard<std::mutex> lock(mutex_);
if (running_) {
this->Put(nullptr);
thread_.join();
running_ = false;
}
}
void
CPUBuilder::Put(const TaskPtr& task) {
{
std::lock_guard<std::mutex> lock(queue_mutex_);
queue_.push(task);
}
queue_cv_.notify_one();
}
void
CPUBuilder::worker_function() {
SetThreadName("cpubuilder_thread");
while (running_) {
std::unique_lock<std::mutex> lock(queue_mutex_);
queue_cv_.wait(lock, [&] { return not queue_.empty(); });
auto task = queue_.front();
queue_.pop();
lock.unlock();
if (task == nullptr) {
// thread exit
break;
}
task->Load(LoadType::DISK2CPU, 0);
task->Execute();
}
}
} // namespace scheduler
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include "task/Task.h"
namespace milvus {
namespace scheduler {
class CPUBuilder {
public:
CPUBuilder() = default;
void
Start();
void
Stop();
void
Put(const TaskPtr& task);
private:
void
worker_function();
private:
bool running_ = false;
std::mutex mutex_;
std::thread thread_;
std::queue<TaskPtr> queue_;
std::condition_variable queue_cv_;
std::mutex queue_mutex_;
};
using CPUBuilderPtr = std::shared_ptr<CPUBuilder>;
} // namespace scheduler
} // namespace milvus
......@@ -20,6 +20,7 @@
#include "SchedInst.h"
#include "TaskCreator.h"
#include "scheduler/Algorithm.h"
#include "scheduler/CPUBuilder.h"
#include "scheduler/tasklabel/SpecResLabel.h"
#include "selector/Optimizer.h"
#include "task/Task.h"
......@@ -140,7 +141,11 @@ JobMgr::worker_function() {
if (auto disk = res_mgr_->GetDiskResources()[0].lock()) {
// if (auto disk = res_mgr_->GetCpuResources()[0].lock()) {
for (auto& task : tasks) {
disk->task_table().Put(task, nullptr);
if (task->Type() == TaskType::BuildIndexTask && task->path().Last() == "cpu") {
CPUBuilderInst::GetInstance()->Put(task);
} else {
disk->task_table().Put(task, nullptr);
}
}
}
}
......
......@@ -38,6 +38,9 @@ std::mutex OptimizerInst::mutex_;
BuildMgrPtr BuildMgrInst::instance = nullptr;
std::mutex BuildMgrInst::mutex_;
CPUBuilderPtr CPUBuilderInst::instance = nullptr;
std::mutex CPUBuilderInst::mutex_;
void
load_simple_config() {
// create and connect
......@@ -94,10 +97,12 @@ StartSchedulerService() {
ResMgrInst::GetInstance()->Start();
SchedInst::GetInstance()->Start();
JobMgrInst::GetInstance()->Start();
CPUBuilderInst::GetInstance()->Start();
}
void
StopSchedulerService() {
CPUBuilderInst::GetInstance()->Stop();
JobMgrInst::GetInstance()->Stop();
SchedInst::GetInstance()->Stop();
ResMgrInst::GetInstance()->Stop();
......
......@@ -12,6 +12,7 @@
#pragma once
#include "BuildMgr.h"
#include "CPUBuilder.h"
#include "JobMgr.h"
#include "ResourceMgr.h"
#include "Scheduler.h"
......@@ -157,6 +158,24 @@ class BuildMgrInst {
static std::mutex mutex_;
};
class CPUBuilderInst {
public:
static CPUBuilderPtr
GetInstance() {
if (instance == nullptr) {
std::lock_guard<std::mutex> lock(mutex_);
if (instance == nullptr) {
instance = std::make_shared<CPUBuilder>();
}
}
return instance;
}
private:
static CPUBuilderPtr instance;
static std::mutex mutex_;
};
void
StartSchedulerService();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册