diff options
Diffstat (limited to 'runtime/neurun/core/src/exec/DataflowExecutor.cc')
-rw-r--r-- | runtime/neurun/core/src/exec/DataflowExecutor.cc | 176 |
1 files changed, 176 insertions, 0 deletions
diff --git a/runtime/neurun/core/src/exec/DataflowExecutor.cc b/runtime/neurun/core/src/exec/DataflowExecutor.cc new file mode 100644 index 000000000..e22d41031 --- /dev/null +++ b/runtime/neurun/core/src/exec/DataflowExecutor.cc @@ -0,0 +1,176 @@ +/* + * Copyright (c) 2019 Samsung Electronics Co., Ltd. All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "DataflowExecutor.h" + +#include <cassert> + +#include "util/logging.h" + +namespace neurun +{ +namespace exec +{ + +int64_t DataflowExecutor::calculateRank(const std::vector<ir::Element> &operations) +{ + int64_t rank = 0; + if (!_indexed_ranks) + { + return rank; + } + for (const auto &element : operations) + { + auto it = _indexed_ranks->find(element.index); + if (it == _indexed_ranks->end()) + { + assert(element.node->opcode() == ir::OpCode::Permute); + // assign int32_t::max to prevent integer overflow + rank += std::numeric_limits<int32_t>::max(); + } + else + { + rank += it->second; + } + } + return rank; +} + +void DataflowExecutor::emplaceToReadyJobs(const uint32_t &id) +{ + auto &job = _waiting_jobs[id]; + assert(job != nullptr); + auto &subg = _graph.subgraphs()->at(_job_to_op_seq[job->index()]); + auto rank = calculateRank(subg.operations()); + _ready_jobs.emplace(rank, std::move(job)); +} + +void DataflowExecutor::notify(uint32_t finished_job_id) +{ + for (auto id : _output_info[finished_job_id]) + { + assert(_input_info[id] > 0); + auto count = --_input_info[id]; + if (count == 0) // No dependent jobs left, ready for execution + { + emplaceToReadyJobs(id); + } + } +} +bool DataflowExecutor::noWaitingJobs() +{ + return std::all_of(_waiting_jobs.begin(), _waiting_jobs.end(), + [](const std::unique_ptr<Job> &job) { return job == nullptr; }); +} + +DataflowExecutor::DataflowExecutor(const ir::Graph &graph, + const std::shared_ptr<compiler::OperandContext> &operand_context, + std::unique_ptr<backend::TensorManagerSet> tensor_mgrs, + CodeMap &&code_map) + : ExecutorBase{graph, operand_context, std::move(tensor_mgrs)}, _code_map{std::move(code_map)} +{ + VERBOSE(DataflowExecutor) << "Constructing Dataflow Executor" << std::endl; + + const ir::Subgraphs *subgraphs = _graph.subgraphs(); + // Assign jobs convert SubgraphIndex to job index(uint32_t) + uint32_t next_job_index = 0; + std::unordered_map<ir::SubgraphIndex, uint32_t> subgraph_to_job; + subgraphs->iterate([&](const ir::SubgraphIndex &subg_index, const ir::OpSequence &) { + VERBOSE(DataflowExecutor) << "Create a job #" << next_job_index << " with SubgraphIndex " + << subg_index.value() << std::endl; + _finished_jobs.emplace_back( + nnfw::cpp14::make_unique<Job>(next_job_index, _code_map.at(subg_index).get())); + subgraph_to_job[subg_index] = next_job_index++; + }); + + _waiting_jobs.resize(next_job_index); + _output_info.resize(next_job_index); + _initial_input_info.resize(next_job_index, 0); + + subgraphs->iterate([&](const ir::SubgraphIndex &subg_index, const ir::OpSequence &subg) { + auto job_index = subgraph_to_job[subg_index]; + for (auto output : subg.getOutputs()) + { + // Update output and input info + subgraphs->iterate( + [&](const ir::SubgraphIndex &subg_cur_index, const ir::OpSequence &subg_cur) { + if (subg_cur.getInputs().contains(output)) + { + auto dep_index = subgraph_to_job[subg_cur_index]; + ++_initial_input_info[dep_index]; + _output_info[job_index].push_back(dep_index); + } + }); + } + }); + for (const auto &s : subgraph_to_job) + _job_to_op_seq.emplace(s.second, s.first); + + _input_info = _initial_input_info; +} + +void DataflowExecutor::executeImpl() +{ + assert(noWaitingJobs()); + + // Execution setup + _waiting_jobs.swap(_finished_jobs); // Move finished jobs to waiting jobs + + for (uint32_t i = 0; i < _waiting_jobs.size(); ++i) + { + if (_input_info[i] == 0) + { + emplaceToReadyJobs(i); + } + } + assert(!_ready_jobs.empty()); // Cannot begin if there is no initial jobs + bool is_profiling = util::getConfigBool(util::config::PROFILING_MODE); + + _subject.notifyModelBegin(this); + + while (!_ready_jobs.empty()) + { + auto job = std::move((_ready_jobs.begin())->second); + _ready_jobs.erase(_ready_jobs.begin()); + auto job_index = job->index(); + VERBOSE(DataflowExecutor) << "Run job #" << job_index << std::endl; + + auto subgraph_index = _job_to_op_seq[job_index]; + auto op_seq = &_graph.subgraphs()->at(subgraph_index); + const backend::Backend *backend = + _graph.getLowerInfo()->operation.at(subgraph_index)->backend(); + + _subject.notifyJobBegin(this, op_seq, backend); + + if (is_profiling) + job->fn()->runSync(); + else + job->run(); + + _subject.notifyJobEnd(this, op_seq, backend); + notify(job_index); + _finished_jobs[job_index] = std::move(job); + } + assert(noWaitingJobs()); + + _subject.notifyModelEnd(this); + + // Reset input info for the next execution + _input_info = _initial_input_info; +} + +} // namespace exec +} // namespace neurun |