未验证 提交 771585ba 编写于 作者: C Cai Yudong 提交者: GitHub

use BlockingQueue in JobMgr (#2885)

* using BlockingQueue in JobMgr
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* fix unittest
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>
上级 52ab18fe
......@@ -34,7 +34,7 @@ Please mark all changes in change log and use the issue from GitHub
- \#2689 Construct Knowhere Index Without Data
- \#2798 hnsw-sq8 support
- \#2802 Add new index: IVFSQ8NR
- \#2834 add C++ sdk support 4 hnsw_sq8nr
- \#2834 Add C++ sdk support 4 hnsw_sq8nr
## Improvement
- \#2543 Remove secondary_path related code
......@@ -45,6 +45,7 @@ Please mark all changes in change log and use the issue from GitHub
- \#2828 Let Faiss not to compile half float by default
- \#2841 Replace IndexType/EngineType/MetricType
- \#2858 Unify index name in db
- \#2884 Using BlockingQueue in JobMgr
## Task
......
......@@ -9,21 +9,19 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "scheduler/JobMgr.h"
#include "src/db/Utils.h"
#include "src/segment/SegmentReader.h"
#include <limits>
#include <string>
#include <utility>
#include "SchedInst.h"
#include "TaskCreator.h"
#include "db/Utils.h"
#include "scheduler/Algorithm.h"
#include "scheduler/CPUBuilder.h"
#include "scheduler/JobMgr.h"
#include "scheduler/SchedInst.h"
#include "scheduler/TaskCreator.h"
#include "scheduler/selector/Optimizer.h"
#include "scheduler/task/Task.h"
#include "scheduler/tasklabel/SpecResLabel.h"
#include "selector/Optimizer.h"
#include "task/Task.h"
namespace milvus {
namespace scheduler {
......@@ -33,99 +31,50 @@ JobMgr::JobMgr(ResourceMgrPtr res_mgr) : res_mgr_(std::move(res_mgr)) {
void
JobMgr::Start() {
if (not running_) {
running_ = true;
worker_thread_ = std::thread(&JobMgr::worker_function, this);
if (worker_thread_ == nullptr) {
worker_thread_ = std::make_shared<std::thread>(&JobMgr::worker_function, this);
}
}
void
JobMgr::Stop() {
if (running_) {
if (worker_thread_ != nullptr) {
this->Put(nullptr);
worker_thread_.join();
running_ = false;
worker_thread_->join();
worker_thread_ = nullptr;
}
}
json
JobMgr::Dump() const {
json ret{
{"running", running_},
{"event_queue_length", queue_.size()},
{"running", (worker_thread_ != nullptr ? true : false)},
{"event_queue_length", queue_.Size()},
};
return ret;
}
void
JobMgr::Put(const JobPtr& job) {
{
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(job);
}
cv_.notify_one();
queue_.Put(job);
}
void
JobMgr::worker_function() {
SetThreadName("jobmgr_thread");
while (running_) {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this] { return !queue_.empty(); });
auto job = queue_.front();
queue_.pop();
lock.unlock();
while (true) {
auto job = queue_.Take();
if (job == nullptr) {
break;
}
auto tasks = build_task(job);
// TODO(zhiru): if the job is search by ids, pass any task where the ids don't exist
auto search_job = std::dynamic_pointer_cast<SearchJob>(job);
if (search_job != nullptr) {
search_job->GetResultIds().resize(search_job->nq(), -1);
search_job->GetResultDistances().resize(search_job->nq(), std::numeric_limits<float>::max());
if (search_job->vectors().float_data_.empty() && search_job->vectors().binary_data_.empty() &&
!search_job->vectors().id_array_.empty()) {
for (auto task = tasks.begin(); task != tasks.end();) {
auto search_task = std::static_pointer_cast<XSearchTask>(*task);
auto location = search_task->GetLocation();
// Load bloom filter
std::string segment_dir;
engine::utils::GetParentPath(location, segment_dir);
segment::SegmentReader segment_reader(segment_dir);
segment::IdBloomFilterPtr id_bloom_filter_ptr;
segment_reader.LoadBloomFilter(id_bloom_filter_ptr);
// Check if the id is present.
bool pass = true;
for (auto& id : search_job->vectors().id_array_) {
if (id_bloom_filter_ptr->Check(id)) {
pass = false;
break;
}
}
if (pass) {
// std::cout << search_task->GetIndexId() << std::endl;
search_job->SearchDone(search_task->GetIndexId());
task = tasks.erase(task);
} else {
task++;
}
}
}
}
// for (auto &task : tasks) {
// if ...
// search_job->SearchDone(task->id);
// tasks.erase(task);
// }
auto tasks = build_task(job);
for (auto& task : tasks) {
OptimizerInst::GetInstance()->Run(task);
}
......
......@@ -10,21 +10,15 @@
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <condition_variable>
#include <deque>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
#include "ResourceMgr.h"
#include "interface/interfaces.h"
#include "job/Job.h"
#include "task/Task.h"
#include "scheduler/ResourceMgr.h"
#include "scheduler/interface/interfaces.h"
#include "scheduler/job/Job.h"
#include "scheduler/task/Task.h"
#include "utils/BlockingQueue.h"
namespace milvus {
namespace scheduler {
......@@ -58,14 +52,8 @@ class JobMgr : public interface::dumpable {
calculate_path(const ResourceMgrPtr& res_mgr, const TaskPtr& task);
private:
bool running_ = false;
std::queue<JobPtr> queue_;
std::thread worker_thread_;
std::mutex mutex_;
std::condition_variable cv_;
BlockingQueue<JobPtr> queue_;
std::shared_ptr<std::thread> worker_thread_ = nullptr;
ResourceMgrPtr res_mgr_ = nullptr;
};
......
......@@ -68,13 +68,13 @@ class BlockingQueue {
}
size_t
Size() {
Size() const {
std::lock_guard<std::mutex> lock(mtx);
return queue_.size();
}
bool
Empty() {
Empty() const {
std::unique_lock<std::mutex> lock(mtx);
return queue_.empty();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册