diff options
Diffstat (limited to 'runtimes/neurun/core/src/exec/ParallelExecutor.cc')
-rw-r--r-- | runtimes/neurun/core/src/exec/ParallelExecutor.cc | 140 |
1 files changed, 140 insertions, 0 deletions
diff --git a/runtimes/neurun/core/src/exec/ParallelExecutor.cc b/runtimes/neurun/core/src/exec/ParallelExecutor.cc new file mode 100644 index 000000000..81d4ac03f --- /dev/null +++ b/runtimes/neurun/core/src/exec/ParallelExecutor.cc @@ -0,0 +1,140 @@ +/* + * Copyright (c) 2019 Samsung Electronics Co., Ltd. All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ParallelExecutor.h" + +#include <cassert> + +#include "util/logging.h" +#include "exec/IFunction.h" + +namespace neurun +{ +namespace exec +{ + +class HookFunction : public IFunction +{ +public: + HookFunction(IFunction *fn, std::function<void()> teardown) : _fn{fn}, _teardown{teardown} {} + +public: + void run() override + { + // TODO Introduce and call setup() function here + _fn->run(); + _teardown(); + } + void runSync() override { throw("runSync is needed just for profiling in Dataflow executor"); } + +private: + IFunction *_fn; + std::function<void()> _teardown; +}; + +void ParallelExecutor::notify(uint32_t finished_job_id) +{ + std::unique_lock<std::mutex> lock{_mu_jobs}; + + DataflowExecutor::notify(finished_job_id); + + lock.unlock(); + _cv_jobs.notify_all(); +} + +ParallelExecutor::ParallelExecutor(const std::shared_ptr<const model::Model> &model, + std::unique_ptr<model::Subgraphs> subgraphs, + const std::shared_ptr<compiler::OperandContext> &operand_context, + std::unique_ptr<graph::LowerInfoMap> lower_info, + std::unique_ptr<backend::TensorManagerSet> tensor_mgrs, + CodeMap &&code_map) + : DataflowExecutor{model, + std::move(subgraphs), + operand_context, + std::move(lower_info), + std::move(tensor_mgrs), + std::move(code_map)} +{ + VERBOSE(ParallelExecutor) << "Constructing Parallel Executor" << std::endl; +} + +void ParallelExecutor::executeImpl() +{ + // Init scheduler + // TODO Consider to have distinct backend set in LowerInfoMap + graph::BackendSet backends; + for (auto &itr : _lower_info->operation) + { + backends.add(itr.second->backend()); + } + _scheduler = nnfw::cpp14::make_unique<ParallelScheduler>(backends); + + assert(noWaitingJobs()); + + // Execution setup + _waiting_jobs.swap(_finished_jobs); // Move finished jobs to waiting jobs + + for (uint32_t i = 0; i < _waiting_jobs.size(); ++i) + { + VERBOSE(ParallelExecutor) << i << ": " << _input_info[i] << std::endl; + if (_input_info[i] == 0) + { + emplaceToReadyJobs(i); + } + } + assert(!_ready_jobs.empty()); // Cannot begin if there is no initial jobs + + VERBOSE(ParallelExecutor) << "INITIAL JOBS : " << _ready_jobs.size() << std::endl; + + while (true) + { + std::unique_lock<std::mutex> lock{_mu_jobs}; + + if (_ready_jobs.empty()) + { + _cv_jobs.wait(lock, [this] { return !_ready_jobs.empty() || noWaitingJobs(); }); + // Check finish condition + if (_ready_jobs.empty() && noWaitingJobs()) + { + break; + } + } + + auto job = std::move(_ready_jobs.begin()->second); + _ready_jobs.erase(_ready_jobs.begin()); + + lock.unlock(); + + VERBOSE(ParallelExecutor) << "Assigning fn #" << job->index() << std::endl; + + auto job_index = job->index(); + auto teardown = [&, job_index]() { notify(job_index); }; + + _scheduler->assign(nnfw::cpp14::make_unique<HookFunction>(job->fn(), teardown), job->backend()); + _finished_jobs[job_index] = std::move(job); + } + + assert(noWaitingJobs()); + + // Wait for all the jobs done + _scheduler->finish(); + + // Reset input info for the next execution + _input_info = _initial_input_info; +} + +} // namespace exec +} // namespace neurun |