提交 3fa2c1e6 编写于 作者: W wxyu

Set task state MOVED after resource copy it completed


Former-commit-id: a97e306b62f6a0a7a06c881e93a973ad75b8ac9d
上级 de2bb68d
......@@ -15,6 +15,7 @@ Please mark all change in change log and use the ticket from JIRA.
- \#96 - Remove .a file in milvus/lib for docker-version
- \#118 - Using shared_ptr instead of weak_ptr to avoid performance loss
- \#122 - Add unique id for Job
- \#130 - Set task state MOVED after resource copy it completed
## Feature
- \#115 - Using new structure for tasktable
......
......@@ -91,7 +91,7 @@ JobMgr::worker_function() {
// disk resources NEVER be empty.
if (auto disk = res_mgr_->GetDiskResources()[0].lock()) {
for (auto& task : tasks) {
disk->task_table().Put(task);
disk->task_table().Put(task, nullptr);
}
}
}
......
......@@ -120,7 +120,7 @@ Scheduler::OnLoadCompleted(const EventPtr& event) {
if (resource->HasExecutor() == false) {
load_completed_event->task_table_item_->Move();
}
Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource);
Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_, resource);
break;
}
default: { break; }
......
......@@ -264,8 +264,8 @@ TaskTable::PickToExecute(uint64_t limit) {
}
void
TaskTable::Put(TaskPtr task) {
auto item = std::make_shared<TaskTableItem>();
TaskTable::Put(TaskPtr task, TaskTableItemPtr from) {
auto item = std::make_shared<TaskTableItem>(std::move(from));
item->id = id_++;
item->task = std::move(task);
item->state = TaskTableItemState::START;
......@@ -276,21 +276,6 @@ TaskTable::Put(TaskPtr task) {
}
}
void
TaskTable::Put(std::vector<TaskPtr>& tasks) {
for (auto& task : tasks) {
auto item = std::make_shared<TaskTableItem>();
item->id = id_++;
item->task = std::move(task);
item->state = TaskTableItemState::START;
item->timestamp.start = get_current_timestamp();
table_.put(std::move(item));
}
if (subscriber_) {
subscriber_();
}
}
size_t
TaskTable::TaskToExecute() {
size_t count = 0;
......
......@@ -58,8 +58,12 @@ struct TaskTimestamp : public interface::dumpable {
Dump() const override;
};
struct TaskTableItem;
using TaskTableItemPtr = std::shared_ptr<TaskTableItem>;
struct TaskTableItem : public interface::dumpable {
TaskTableItem() : id(0), task(nullptr), state(TaskTableItemState::INVALID), mutex() {
explicit TaskTableItem(TaskTableItemPtr f = nullptr)
: id(0), task(nullptr), state(TaskTableItemState::INVALID), mutex(), from(std::move(f)) {
}
TaskTableItem(const TaskTableItem& src) = delete;
......@@ -70,6 +74,7 @@ struct TaskTableItem : public interface::dumpable {
TaskTableItemState state; // the state;
std::mutex mutex;
TaskTimestamp timestamp;
TaskTableItemPtr from;
bool
IsFinish();
......@@ -96,8 +101,6 @@ struct TaskTableItem : public interface::dumpable {
Dump() const override;
};
using TaskTableItemPtr = std::shared_ptr<TaskTableItem>;
class TaskTable : public interface::dumpable {
public:
TaskTable() : table_(1ULL << 16ULL) {
......@@ -120,14 +123,7 @@ class TaskTable : public interface::dumpable {
* Put one task;
*/
void
Put(TaskPtr task);
/*
* Put tasks back of task table;
* Called by DBImpl;
*/
void
Put(std::vector<TaskPtr>& tasks);
Put(TaskPtr task, TaskTableItemPtr from = nullptr);
size_t
TaskToExecute();
......
......@@ -28,13 +28,13 @@ namespace scheduler {
class Action {
public:
static void
PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self);
PushTaskToNeighbourRandomly(TaskTableItemPtr task_item, const ResourcePtr& self);
static void
PushTaskToAllNeighbour(const TaskPtr& task, const ResourcePtr& self);
PushTaskToAllNeighbour(TaskTableItemPtr task_item, const ResourcePtr& self);
static void
PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest);
PushTaskToResource(TaskTableItemPtr task_item, const ResourcePtr& dest);
static void
DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
......
......@@ -59,7 +59,7 @@ get_neighbours_with_connetion(const ResourcePtr& self) {
}
void
Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self) {
Action::PushTaskToNeighbourRandomly(TaskTableItemPtr task_item, const ResourcePtr& self) {
auto neighbours = get_neighbours_with_connetion(self);
if (not neighbours.empty()) {
std::vector<uint64_t> speeds;
......@@ -78,7 +78,7 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self
for (uint64_t i = 0; i < speeds.size(); ++i) {
rd_speed -= speeds[i];
if (rd_speed <= 0) {
neighbours[i].first->task_table().Put(task);
neighbours[i].first->task_table().Put(task_item->task, task_item);
return;
}
}
......@@ -89,22 +89,23 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self
}
void
Action::PushTaskToAllNeighbour(const TaskPtr& task, const ResourcePtr& self) {
Action::PushTaskToAllNeighbour(TaskTableItemPtr task_item, const ResourcePtr& self) {
auto neighbours = get_neighbours(self);
for (auto& neighbour : neighbours) {
neighbour->task_table().Put(task);
neighbour->task_table().Put(task_item->task, task_item);
}
}
void
Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) {
dest->task_table().Put(task);
Action::PushTaskToResource(TaskTableItemPtr task_item, const ResourcePtr& dest) {
dest->task_table().Put(task_item->task, task_item);
}
void
Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event) {
if (not resource->HasExecutor() && event->task_table_item_->Move()) {
auto task_item = event->task_table_item_;
auto task = event->task_table_item_->task;
auto search_task = std::static_pointer_cast<XSearchTask>(task);
bool moved = false;
......@@ -119,7 +120,7 @@ Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr res
if (index != nullptr) {
moved = true;
auto dest_resource = res_mgr->GetResource(ResourceType::GPU, i);
PushTaskToResource(event->task_table_item_->task, dest_resource);
PushTaskToResource(event->task_table_item_, dest_resource);
break;
}
}
......@@ -127,7 +128,7 @@ Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr res
}
if (not moved) {
PushTaskToNeighbourRandomly(task, resource);
PushTaskToNeighbourRandomly(task_item, resource);
}
}
}
......@@ -135,6 +136,7 @@ Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr res
void
Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event) {
auto task_item = event->task_table_item_;
auto task = event->task_table_item_->task;
if (resource->type() == ResourceType::DISK) {
// step 1: calculate shortest path per resource, from disk to compute resource
......@@ -213,7 +215,7 @@ Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, Resou
// next_res->task_table().Put(task);
// }
event->task_table_item_->Move();
next_res->task_table().Put(task);
next_res->task_table().Put(task, task_item);
}
}
......
......@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
#include "Job.h"
#include "scheduler/job/Job.h"
namespace milvus {
namespace scheduler {
......
......@@ -180,6 +180,10 @@ Resource::loader_function() {
}
LoadFile(task_item->task);
task_item->Loaded();
if (task_item->from) {
task_item->from->Moved();
task_item->from = nullptr;
}
if (subscriber_) {
auto event = std::make_shared<LoadCompletedEvent>(shared_from_this(), task_item);
subscriber_(std::static_pointer_cast<Event>(event));
......
......@@ -193,16 +193,13 @@ TEST_F(TaskTableBaseTest, PUT_INVALID_TEST) {
TEST_F(TaskTableBaseTest, PUT_BATCH) {
std::vector<milvus::scheduler::TaskPtr> tasks{task1_, task2_};
empty_table_.Put(tasks);
for (auto& task : tasks) {
empty_table_.Put(task);
}
ASSERT_EQ(empty_table_.at(0)->task, task1_);
ASSERT_EQ(empty_table_.at(1)->task, task2_);
}
TEST_F(TaskTableBaseTest, PUT_EMPTY_BATCH) {
std::vector<milvus::scheduler::TaskPtr> tasks{};
empty_table_.Put(tasks);
}
TEST_F(TaskTableBaseTest, SIZE) {
ASSERT_EQ(empty_table_.size(), 0);
empty_table_.Put(task1_);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册