/*
 * Copyright (c) 2019 Samsung Electronics Co., Ltd. All Rights Reserved
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "DataflowExecutor.h"

#include <algorithm>
#include <cassert>
#include <limits>

#include "util/logging.h"

namespace neurun
{
namespace exec
{

int64_t DataflowExecutor::calculateRank(const std::vector<model::Element> &operations)
{
  int64_t rank = 0;
  if (!_indexed_ranks)
  {
    return rank;
  }
  for (const auto &element : operations)
  {
    auto it = _indexed_ranks->find(element.index);
    if (it == _indexed_ranks->end())
    {
      // Only Permute nodes may be missing from the indexed ranks; add
      // int32_t::max (not int64_t::max) so the int64_t rank sum cannot overflow
      assert(element.node->getName() == "Permute");
      rank += std::numeric_limits<int32_t>::max();
    }
    else
    {
      rank += it->second;
    }
  }
  return rank;
}

void DataflowExecutor::emplaceToReadyJobs(const uint32_t &id)
{
  auto &job = _waiting_jobs[id];
  assert(job != nullptr);
  auto &subg = _subgraphs->at(_job_to_subgraph[job->index()]);
  auto rank = calculateRank(subg.operations());
  _ready_jobs.emplace(rank, std::move(job));
}

void DataflowExecutor::notify(uint32_t finished_job_id)
{
  for (auto id : _output_info[finished_job_id])
  {
    assert(_input_info[id] > 0);
    auto count = --_input_info[id];
    if (count == 0) // No pending inputs left, ready for execution
    {
      emplaceToReadyJobs(id);
    }
  }
}

bool DataflowExecutor::noWaitingJobs()
{
  return std::all_of(_waiting_jobs.begin(), _waiting_jobs.end(),
                     [](const std::unique_ptr<Job> &job) { return job == nullptr; });
}

DataflowExecutor::DataflowExecutor(const std::shared_ptr<const model::Model> &model,
                                   std::unique_ptr<model::Subgraphs> subgraphs,
                                   const std::shared_ptr<compiler::OperandContext> &operand_context,
                                   std::unique_ptr<graph::LowerInfoMap> lower_info,
                                   std::unique_ptr<backend::TensorManagerSet> tensor_mgrs,
                                   CodeMap &&code_map)
    : ExecutorBase{model, std::move(subgraphs), operand_context, std::move(lower_info),
                   std::move(tensor_mgrs)},
      _code_map{std::move(code_map)}
{
  VERBOSE(DataflowExecutor) << "Constructing Dataflow Executor" << std::endl;

  assert(_subgraphs);
  // Assign jobs: convert each SubgraphIndex to a job index (uint32_t)
  uint32_t next_job_index = 0;
  std::unordered_map<model::SubgraphIndex, uint32_t> subgraph_to_job;
  _subgraphs->iterate([&](const model::SubgraphIndex &subg_index, const model::Subgraph &) {
    VERBOSE(DataflowExecutor) << "Create a job #" << next_job_index << " with SubgraphIndex "
                              << subg_index.value() << std::endl;
    _finished_jobs.emplace_back(
        nnfw::cpp14::make_unique<Job>(next_job_index, _code_map.at(subg_index).get(),
                                      _lower_info->operation.at(subg_index)->backend()));
    subgraph_to_job[subg_index] = next_job_index++;
  });

  _waiting_jobs.resize(next_job_index);
  _output_info.resize(next_job_index);
  _initial_input_info.resize(next_job_index, 0);

  _subgraphs->iterate([&](const model::SubgraphIndex &subg_index, const model::Subgraph &subg) {
    auto job_index = subgraph_to_job[subg_index];
    for (auto output : subg.getOutputs())
    {
      // Update output and input info: every subgraph that consumes this output
      // becomes a dependent of the current job
      _subgraphs->iterate(
          [&](const model::SubgraphIndex &subg_cur_index, const model::Subgraph &subg_cur) {
            if (subg_cur.getInputs().contains(output))
            {
              auto dep_index = subgraph_to_job[subg_cur_index];
              ++_initial_input_info[dep_index];
              _output_info[job_index].push_back(dep_index);
            }
          });
    }
  });
  for (const auto &s : subgraph_to_job)
    _job_to_subgraph.emplace(s.second, s.first);

  _input_info = _initial_input_info;
}
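
//
// executeImpl() runs one dataflow-ordered execution of the model:
//  1. The previous run's finished jobs are swapped back into _waiting_jobs.
//  2. Every job whose dependency count (_input_info) is zero is moved into
//     _ready_jobs, keyed by the rank computed in calculateRank().
//  3. Jobs are popped from _ready_jobs and run one at a time; after each job,
//     notify() decrements the dependency counts of its dependents and promotes
//     any job that becomes ready.
//  4. Completed jobs are collected in _finished_jobs and _input_info is reset,
//     so the executor can be reused for the next execution.
//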
void DataflowExecutor::executeImpl()
{
  assert(noWaitingJobs());

  // Execution setup
  _waiting_jobs.swap(_finished_jobs); // Move finished jobs to waiting jobs

  for (uint32_t i = 0; i < _waiting_jobs.size(); ++i)
  {
    if (_input_info[i] == 0)
    {
      emplaceToReadyJobs(i);
    }
  }
  assert(!_ready_jobs.empty()); // Cannot begin if there are no initial jobs

  bool is_profiling = util::getConfigBool(util::config::PROFILING_MODE);

  // Notify observers that execution begins
  for (auto &o : _observers)
  {
    o->handleBegin(this);
  }

  while (!_ready_jobs.empty())
  {
    auto job = std::move((_ready_jobs.begin())->second);
    _ready_jobs.erase(_ready_jobs.begin());
    auto job_index = job->index();
    VERBOSE(DataflowExecutor) << "Run job #" << job_index << std::endl;

    notifyJobBegin(job_index);
    if (is_profiling)
      job->fn()->runSync();
    else
      job->run();
    notifyJobEnd(job_index);

    notify(job_index);
    _finished_jobs[job_index] = std::move(job);
  }
  assert(noWaitingJobs());

  for (auto &o : _observers)
  {
    o->handleEnd(this);
  }

  // Reset input info for the next execution
  _input_info = _initial_input_info;
}

void DataflowExecutor::notifyJobBegin(uint32_t job_index)
{
  auto subgraph_index = _job_to_subgraph[job_index];
  // Workaround - assumes each subgraph contains only one operation
  auto node = _subgraphs->at(subgraph_index).operations().at(0).node;
  const backend::Backend *backend = _lower_info->operation.at(subgraph_index)->backend();
  for (auto &o : _observers)
  {
    o->handleBegin(this, node, backend);
  }
}

void DataflowExecutor::notifyJobEnd(uint32_t job_index)
{
  auto subgraph_index = _job_to_subgraph[job_index];
  // Workaround - assumes each subgraph contains only one operation
  auto node = _subgraphs->at(subgraph_index).operations().at(0).node;
  const backend::Backend *backend = _lower_info->operation.at(subgraph_index)->backend();
  for (auto &o : _observers)
  {
    o->handleEnd(this, node, backend);
  }
}

} // namespace exec
} // namespace neurun