提交 46f6abea 编写于 作者: M mindspore-ci-bot 提交者: Gitee

!5810 add execution tree state to fix minddata profiling.

Merge pull request !5810 from anzhengqi/add-execution-tree-state
......@@ -104,6 +104,8 @@ Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) {
// Common code init and error checking in the base class.
RETURN_IF_NOT_OK(IteratorBase::FetchNextTensorRow(out_row));
bool isProfilingEnable = root_->Tree()->GetProfilingManager()->IsProfilingEnable();
// Once eof is handled, always return empty row. Class must be destroyed and recreated if you
// want to iterate again.
if (eof_handled_) {
......@@ -127,6 +129,9 @@ Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) {
if (curr_buffer_->eoe()) {
MS_LOG(INFO) << "End of data iteration.";
curr_buffer_.reset(); // explicitly free the eoe buffer
if (isProfilingEnable) {
root_->Tree()->SetEpochEnd();
}
return Status::OK();
}
......@@ -136,6 +141,7 @@ Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) {
if (curr_buffer_->eof()) {
eof_handled_ = true;
curr_buffer_.reset(); // explicitly free the eof buffer
root_->Tree()->SetFinished();
std::string err = "EOF buffer encountered. Users try to fetch data beyond the specified number of epochs.";
RETURN_STATUS_UNEXPECTED(err);
}
......
......@@ -170,6 +170,7 @@ Status DeviceQueueOp::SendDataToAscend() {
if (isProfilingEnable) {
connector_size = ChildOpConnectorSize();
connector_capacity = ChildOpConnectorCapacity();
tree_->SetEpochEnd();
}
RETURN_IF_NOT_OK(GetNextInput(&current_buffer));
}
......
......@@ -48,6 +48,7 @@ class ExecutionTree {
kDeTStatePrepare, // The tree has been assigned a root node and is pending prepare
kDeTStateReady, // The tree has been prepared and is ready to be launched
kDeTStateExecuting, // The tree has been launched and is executing
kDeTStateEpochEnd, // The tree has been received end of epoch signal, just for profiling
kDeTStateFinished // The tree has been drained, dataset iterator received EOF
};
......@@ -207,6 +208,16 @@ class ExecutionTree {
// @return raw pointer to the TaskGroup
TaskGroup *AllTasks() const { return tg_.get(); }
// Return if the ExecutionTree is at end of epoch status
// @return bool - true is ExecutionTree is end of epoch status
bool IsEpochEnd() const { return tree_state_ == TreeState::kDeTStateEpochEnd; }
// Set the ExecutionTree to EOE state
void SetEpochEnd() { tree_state_ = TreeState::kDeTStateEpochEnd; }
// Set the ExecutionTree to executing state
void SetExecuting() { tree_state_ = TreeState::kDeTStateExecuting; }
// Return if the ExecutionTree is finished (iterator receives EOF).
// @return Bool - true is ExecutionTree is finished
bool isFinished() const { return tree_state_ == TreeState::kDeTStateFinished; }
......
......@@ -36,6 +36,10 @@ Status Monitor::operator()() {
// 1) Monitor Task is not interrupted by TaskManager AND
// 2) Iterator has not received EOF
while (!this_thread::is_interrupted() && !(tree_->isFinished())) {
if (tree_->IsEpochEnd()) {
tree_->GetProfilingManager()->SaveProfilingData();
tree_->SetExecuting();
}
for (auto &node : tree_->GetProfilingManager()->GetSamplingNodes()) {
RETURN_IF_NOT_OK(node.second->Sample());
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册