summaryrefslogtreecommitdiff
path: root/runtime/neurun/core/src/exec/DataflowExecutor.cc
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/neurun/core/src/exec/DataflowExecutor.cc')
-rw-r--r--runtime/neurun/core/src/exec/DataflowExecutor.cc176
1 files changed, 176 insertions, 0 deletions
diff --git a/runtime/neurun/core/src/exec/DataflowExecutor.cc b/runtime/neurun/core/src/exec/DataflowExecutor.cc
new file mode 100644
index 000000000..e22d41031
--- /dev/null
+++ b/runtime/neurun/core/src/exec/DataflowExecutor.cc
@@ -0,0 +1,176 @@
+/*
+ * Copyright (c) 2019 Samsung Electronics Co., Ltd. All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DataflowExecutor.h"
+
+#include <cassert>
+
+#include "util/logging.h"
+
+namespace neurun
+{
+namespace exec
+{
+
+int64_t DataflowExecutor::calculateRank(const std::vector<ir::Element> &operations)
+{
+ int64_t rank = 0;
+ if (!_indexed_ranks)
+ {
+ return rank;
+ }
+ for (const auto &element : operations)
+ {
+ auto it = _indexed_ranks->find(element.index);
+ if (it == _indexed_ranks->end())
+ {
+ assert(element.node->opcode() == ir::OpCode::Permute);
+ // assign int32_t::max to prevent integer overflow
+ rank += std::numeric_limits<int32_t>::max();
+ }
+ else
+ {
+ rank += it->second;
+ }
+ }
+ return rank;
+}
+
+void DataflowExecutor::emplaceToReadyJobs(const uint32_t &id)
+{
+ auto &job = _waiting_jobs[id];
+ assert(job != nullptr);
+ auto &subg = _graph.subgraphs()->at(_job_to_op_seq[job->index()]);
+ auto rank = calculateRank(subg.operations());
+ _ready_jobs.emplace(rank, std::move(job));
+}
+
+void DataflowExecutor::notify(uint32_t finished_job_id)
+{
+ for (auto id : _output_info[finished_job_id])
+ {
+ assert(_input_info[id] > 0);
+ auto count = --_input_info[id];
+ if (count == 0) // No dependent jobs left, ready for execution
+ {
+ emplaceToReadyJobs(id);
+ }
+ }
+}
+bool DataflowExecutor::noWaitingJobs()
+{
+ return std::all_of(_waiting_jobs.begin(), _waiting_jobs.end(),
+ [](const std::unique_ptr<Job> &job) { return job == nullptr; });
+}
+
+DataflowExecutor::DataflowExecutor(const ir::Graph &graph,
+ const std::shared_ptr<compiler::OperandContext> &operand_context,
+ std::unique_ptr<backend::TensorManagerSet> tensor_mgrs,
+ CodeMap &&code_map)
+ : ExecutorBase{graph, operand_context, std::move(tensor_mgrs)}, _code_map{std::move(code_map)}
+{
+ VERBOSE(DataflowExecutor) << "Constructing Dataflow Executor" << std::endl;
+
+ const ir::Subgraphs *subgraphs = _graph.subgraphs();
+ // Assign jobs convert SubgraphIndex to job index(uint32_t)
+ uint32_t next_job_index = 0;
+ std::unordered_map<ir::SubgraphIndex, uint32_t> subgraph_to_job;
+ subgraphs->iterate([&](const ir::SubgraphIndex &subg_index, const ir::OpSequence &) {
+ VERBOSE(DataflowExecutor) << "Create a job #" << next_job_index << " with SubgraphIndex "
+ << subg_index.value() << std::endl;
+ _finished_jobs.emplace_back(
+ nnfw::cpp14::make_unique<Job>(next_job_index, _code_map.at(subg_index).get()));
+ subgraph_to_job[subg_index] = next_job_index++;
+ });
+
+ _waiting_jobs.resize(next_job_index);
+ _output_info.resize(next_job_index);
+ _initial_input_info.resize(next_job_index, 0);
+
+ subgraphs->iterate([&](const ir::SubgraphIndex &subg_index, const ir::OpSequence &subg) {
+ auto job_index = subgraph_to_job[subg_index];
+ for (auto output : subg.getOutputs())
+ {
+ // Update output and input info
+ subgraphs->iterate(
+ [&](const ir::SubgraphIndex &subg_cur_index, const ir::OpSequence &subg_cur) {
+ if (subg_cur.getInputs().contains(output))
+ {
+ auto dep_index = subgraph_to_job[subg_cur_index];
+ ++_initial_input_info[dep_index];
+ _output_info[job_index].push_back(dep_index);
+ }
+ });
+ }
+ });
+ for (const auto &s : subgraph_to_job)
+ _job_to_op_seq.emplace(s.second, s.first);
+
+ _input_info = _initial_input_info;
+}
+
+void DataflowExecutor::executeImpl()
+{
+ assert(noWaitingJobs());
+
+ // Execution setup
+ _waiting_jobs.swap(_finished_jobs); // Move finished jobs to waiting jobs
+
+ for (uint32_t i = 0; i < _waiting_jobs.size(); ++i)
+ {
+ if (_input_info[i] == 0)
+ {
+ emplaceToReadyJobs(i);
+ }
+ }
+ assert(!_ready_jobs.empty()); // Cannot begin if there is no initial jobs
+ bool is_profiling = util::getConfigBool(util::config::PROFILING_MODE);
+
+ _subject.notifyModelBegin(this);
+
+ while (!_ready_jobs.empty())
+ {
+ auto job = std::move((_ready_jobs.begin())->second);
+ _ready_jobs.erase(_ready_jobs.begin());
+ auto job_index = job->index();
+ VERBOSE(DataflowExecutor) << "Run job #" << job_index << std::endl;
+
+ auto subgraph_index = _job_to_op_seq[job_index];
+ auto op_seq = &_graph.subgraphs()->at(subgraph_index);
+ const backend::Backend *backend =
+ _graph.getLowerInfo()->operation.at(subgraph_index)->backend();
+
+ _subject.notifyJobBegin(this, op_seq, backend);
+
+ if (is_profiling)
+ job->fn()->runSync();
+ else
+ job->run();
+
+ _subject.notifyJobEnd(this, op_seq, backend);
+ notify(job_index);
+ _finished_jobs[job_index] = std::move(job);
+ }
+ assert(noWaitingJobs());
+
+ _subject.notifyModelEnd(this);
+
+ // Reset input info for the next execution
+ _input_info = _initial_input_info;
+}
+
+} // namespace exec
+} // namespace neurun