diff options
author | DongHun Kwak <dh0128.kwak@samsung.com> | 2016-03-21 15:45:20 +0900 |
---|---|---|
committer | DongHun Kwak <dh0128.kwak@samsung.com> | 2016-03-21 15:46:37 +0900 |
commit | 733b5d5ae2c5d625211e2985ac25728ac3f54883 (patch) | |
tree | a5b214744b256f07e1dc2bd7273035a7808c659f /boost/thread/executors | |
parent | 08c1e93fa36a49f49325a07fe91ff92c964c2b6c (diff) | |
download | boost-733b5d5ae2c5d625211e2985ac25728ac3f54883.tar.gz boost-733b5d5ae2c5d625211e2985ac25728ac3f54883.tar.bz2 boost-733b5d5ae2c5d625211e2985ac25728ac3f54883.zip |
Imported Upstream version 1.58.0upstream/1.58.0
Change-Id: If0072143aa26874812e0db6872e1efb10a3e5e94
Signed-off-by: DongHun Kwak <dh0128.kwak@samsung.com>
Diffstat (limited to 'boost/thread/executors')
-rw-r--r-- | boost/thread/executors/basic_thread_pool.hpp | 28 | ||||
-rw-r--r-- | boost/thread/executors/detail/priority_executor_base.hpp | 77 | ||||
-rw-r--r-- | boost/thread/executors/detail/scheduled_executor_base.hpp | 66 | ||||
-rw-r--r-- | boost/thread/executors/inline_executor.hpp | 54 | ||||
-rw-r--r-- | boost/thread/executors/loop_executor.hpp | 45 | ||||
-rw-r--r-- | boost/thread/executors/scheduled_thread_pool.hpp | 48 | ||||
-rw-r--r-- | boost/thread/executors/scheduler.hpp | 271 | ||||
-rw-r--r-- | boost/thread/executors/scheduling_adaptor.hpp | 51 | ||||
-rw-r--r-- | boost/thread/executors/serial_executor.hpp | 35 | ||||
-rw-r--r-- | boost/thread/executors/serial_executor_cont.hpp | 170 | ||||
-rw-r--r-- | boost/thread/executors/thread_executor.hpp | 36 | ||||
-rw-r--r-- | boost/thread/executors/work.hpp | 25 |
12 files changed, 811 insertions, 95 deletions
diff --git a/boost/thread/executors/basic_thread_pool.hpp b/boost/thread/executors/basic_thread_pool.hpp index 11283ddf42..64ba1e90e0 100644 --- a/boost/thread/executors/basic_thread_pool.hpp +++ b/boost/thread/executors/basic_thread_pool.hpp @@ -14,7 +14,7 @@ #include <boost/thread/detail/delete.hpp> #include <boost/thread/detail/move.hpp> #include <boost/thread/scoped_thread.hpp> -#include <boost/thread/sync_queue.hpp> +#include <boost/thread/concurrent_queues/sync_queue.hpp> #include <boost/thread/executors/work.hpp> #include <boost/thread/csbl/vector.hpp> @@ -36,7 +36,7 @@ namespace executors typedef csbl::vector<thread_t> thread_vector; /// the thread safe work queue - sync_queue<work > work_queue; + concurrent::sync_queue<work > work_queue; /// A move aware vector thread_vector threads; @@ -48,22 +48,19 @@ namespace executors */ bool try_executing_one() { - work task; try { - if (work_queue.try_pull_front(task) == queue_op_status::success) + work task; + if (work_queue.try_pull(task) == queue_op_status::success) { task(); return true; } return false; } - catch (std::exception& ) - { - return false; - } catch (...) { + std::terminate(); return false; } } @@ -90,17 +87,14 @@ namespace executors for(;;) { work task; - queue_op_status st = work_queue.wait_pull_front(task); + queue_op_status st = work_queue.wait_pull(task); if (st == queue_op_status::closed) return; task(); } } - catch (std::exception& ) - { - return; - } catch (...) { + std::terminate(); return; } } @@ -134,7 +128,7 @@ namespace executors * * \b Throws: Whatever exception is thrown while initializing the needed resources. */ - basic_thread_pool(unsigned const thread_count = thread::hardware_concurrency()) + basic_thread_pool(unsigned const thread_count = thread::hardware_concurrency()+1) { try { @@ -275,18 +269,18 @@ namespace executors template <typename Closure> void submit(Closure & closure) { - work_queue.push_back(work(closure)); + work_queue.push(work(closure)); } #endif void submit(void (*closure)()) { - work_queue.push_back(work(closure)); + work_queue.push(work(closure)); } template <typename Closure> void submit(BOOST_THREAD_RV_REF(Closure) closure) { - work_queue.push_back(work(boost::forward<Closure>(closure))); + work_queue.push(work(boost::forward<Closure>(closure))); } /** diff --git a/boost/thread/executors/detail/priority_executor_base.hpp b/boost/thread/executors/detail/priority_executor_base.hpp new file mode 100644 index 0000000000..2191c0b37a --- /dev/null +++ b/boost/thread/executors/detail/priority_executor_base.hpp @@ -0,0 +1,77 @@ +// Copyright (C) 2014 Ian Forbed +// Copyright (C) 2014 Vicente J. Botet Escriba +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_THREAD_EXECUTORS_DETAIL_PRIORITY_EXECUTOR_BASE_HPP +#define BOOST_THREAD_EXECUTORS_DETAIL_PRIORITY_EXECUTOR_BASE_HPP + +#include <boost/atomic.hpp> +#include <boost/function.hpp> +#include <boost/thread/thread.hpp> +#include <boost/thread/concurrent_queues/sync_timed_queue.hpp> +#include <boost/thread/executors/work.hpp> + +namespace boost +{ +namespace executors +{ +namespace detail +{ + template <class Queue> + class priority_executor_base + { + public: + //typedef boost::function<void()> work; + typedef executors::work_pq work; + protected: + typedef Queue queue_type; + queue_type _workq; + + priority_executor_base() {} + public: + + ~priority_executor_base() + { + if(!closed()) + { + this->close(); + } + } + + void close() + { + _workq.close(); + } + + bool closed() + { + return _workq.closed(); + } + + void loop() + { + try + { + for(;;) + { + work task; + queue_op_status st = _workq.wait_pull(task); + if (st == queue_op_status::closed) return; + task(); + } + } + catch (...) + { + std::terminate(); + return; + } + } + }; //end class + +} //end detail namespace +} //end executors namespace +} //end boost namespace +#endif diff --git a/boost/thread/executors/detail/scheduled_executor_base.hpp b/boost/thread/executors/detail/scheduled_executor_base.hpp new file mode 100644 index 0000000000..ec0038f7e3 --- /dev/null +++ b/boost/thread/executors/detail/scheduled_executor_base.hpp @@ -0,0 +1,66 @@ +// Copyright (C) 2014 Ian Forbed +// Copyright (C) 2014-2015 Vicente J. Botet Escriba +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_THREAD_EXECUTORS_DETAIL_SCHEDULED_EXECUTOR_BASE_HPP +#define BOOST_THREAD_EXECUTORS_DETAIL_SCHEDULED_EXECUTOR_BASE_HPP + +#include <boost/thread/concurrent_queues/sync_timed_queue.hpp> +#include <boost/thread/executors/detail/priority_executor_base.hpp> +#include <boost/thread/executors/work.hpp> +#include <boost/thread/thread.hpp> + +#include <boost/atomic.hpp> +#include <boost/function.hpp> + +#include <boost/config/abi_prefix.hpp> + +namespace boost +{ +namespace executors +{ +namespace detail +{ + template <class Clock=chrono::steady_clock> + class scheduled_executor_base : public priority_executor_base<concurrent::sync_timed_queue<executors::work_pq, Clock > > + { + public: + typedef executors::work_pq work; + typedef Clock clock; + typedef typename clock::duration duration; + typedef typename clock::time_point time_point; + protected: + + scheduled_executor_base() {} + public: + + ~scheduled_executor_base() + { + if(! this->closed()) + { + this->close(); + } + } + + void submit_at(work w, const time_point& tp) + { + this->_workq.push(boost::move(w), tp); + } + + void submit_after(work w, const duration& dura) + { + this->_workq.push(boost::move(w), dura+clock::now()); + } + + }; //end class + +} //end detail namespace +} //end executors namespace +} //end boost namespace + +#include <boost/config/abi_suffix.hpp> + +#endif diff --git a/boost/thread/executors/inline_executor.hpp b/boost/thread/executors/inline_executor.hpp index bc6bd9fe7e..5dd523184e 100644 --- a/boost/thread/executors/inline_executor.hpp +++ b/boost/thread/executors/inline_executor.hpp @@ -26,6 +26,7 @@ namespace executors /// type-erasure to store the works to do typedef executors::work work; bool closed_; + mutable mutex mtx_; /** * Effects: try to execute one task. * Returns: whether a task has been executed. @@ -66,16 +67,22 @@ namespace executors */ void close() { + lock_guard<mutex> lk(mtx_); closed_ = true; } /** * \b Returns: whether the pool is closed for submissions. */ - bool closed() + bool closed(lock_guard<mutex>& ) { return closed_; } + bool closed() + { + lock_guard<mutex> lk(mtx_); + return closed(lk); + } /** * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. @@ -93,21 +100,54 @@ namespace executors template <typename Closure> void submit(Closure & closure) { - if (closed()) return; - closure(); + { + lock_guard<mutex> lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + } + try + { + closure(); + } + catch (...) + { + std::terminate(); + return; + } } #endif void submit(void (*closure)()) { - if (closed()) return; - closure(); + { + lock_guard<mutex> lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + } + try + { + closure(); + } + catch (...) + { + std::terminate(); + return; + } } template <typename Closure> void submit(BOOST_THREAD_FWD_REF(Closure) closure) { - if (closed()) return; - closure(); + { + lock_guard<mutex> lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + } + try + { + closure(); + } + catch (...) + { + std::terminate(); + return; + } } /** diff --git a/boost/thread/executors/loop_executor.hpp b/boost/thread/executors/loop_executor.hpp index c2798b4461..e9eadadf9e 100644 --- a/boost/thread/executors/loop_executor.hpp +++ b/boost/thread/executors/loop_executor.hpp @@ -14,7 +14,7 @@ #include <boost/thread/detail/config.hpp> #include <boost/thread/detail/delete.hpp> #include <boost/thread/detail/move.hpp> -#include <boost/thread/sync_queue.hpp> +#include <boost/thread/concurrent_queues/sync_queue.hpp> #include <boost/thread/executors/work.hpp> #include <boost/config/abi_prefix.hpp> @@ -31,7 +31,7 @@ namespace executors typedef executors::work work; private: /// the thread safe work queue - sync_queue<work > work_queue; + concurrent::sync_queue<work > work_queue; public: /** @@ -44,19 +44,16 @@ namespace executors work task; try { - if (work_queue.try_pull_front(task) == queue_op_status::success) + if (work_queue.try_pull(task) == queue_op_status::success) { task(); return true; } return false; } - catch (std::exception& ) - { - return false; - } catch (...) { + std::terminate(); return false; } } @@ -74,19 +71,7 @@ namespace executors } - /** - * The main loop of the worker thread - */ - void worker_thread() - { - while (!closed()) - { - schedule_one_or_yield(); - } - while (try_executing_one()) - { - } - } + public: /// loop_executor is not copyable. @@ -112,9 +97,19 @@ namespace executors } /** - * loop + * The main loop of the worker thread */ - void loop() { worker_thread(); } + void loop() + { + while (!closed()) + { + schedule_one_or_yield(); + } + while (try_executing_one()) + { + } + } + /** * \b Effects: close the \c loop_executor for submissions. * The loop will work until there is no more closures to run. @@ -148,18 +143,18 @@ namespace executors template <typename Closure> void submit(Closure & closure) { - work_queue.push_back(work(closure)); + work_queue.push(work(closure)); } #endif void submit(void (*closure)()) { - work_queue.push_back(work(closure)); + work_queue.push(work(closure)); } template <typename Closure> void submit(BOOST_THREAD_RV_REF(Closure) closure) { - work_queue.push_back(work(boost::forward<Closure>(closure))); + work_queue.push(work(boost::forward<Closure>(closure))); } /** diff --git a/boost/thread/executors/scheduled_thread_pool.hpp b/boost/thread/executors/scheduled_thread_pool.hpp new file mode 100644 index 0000000000..408013b283 --- /dev/null +++ b/boost/thread/executors/scheduled_thread_pool.hpp @@ -0,0 +1,48 @@ +// Copyright (C) 2014 Ian Forbed +// Copyright (C) 2014 Vicente J. Botet Escriba +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_THREAD_EXECUTORS_SCHEDULED_THREAD_POOL_HPP +#define BOOST_THREAD_EXECUTORS_SCHEDULED_THREAD_POOL_HPP + +#include <boost/thread/executors/detail/scheduled_executor_base.hpp> + +namespace boost +{ +namespace executors +{ + + class scheduled_thread_pool : public detail::scheduled_executor_base<> + { + private: + thread_group _workers; + public: + + scheduled_thread_pool(size_t num_threads) : super() + { + for(size_t i = 0; i < num_threads; i++) + { + _workers.create_thread(bind(&super::loop, this)); + } + } + + ~scheduled_thread_pool() + { + this->close(); + _workers.join_all(); + } + + private: + typedef detail::scheduled_executor_base<> super; + }; //end class + +} //end executors namespace + +using executors::scheduled_thread_pool; + +} //end boost +#endif + diff --git a/boost/thread/executors/scheduler.hpp b/boost/thread/executors/scheduler.hpp new file mode 100644 index 0000000000..5796a7d394 --- /dev/null +++ b/boost/thread/executors/scheduler.hpp @@ -0,0 +1,271 @@ +// Copyright (C) 2014 Vicente J. Botet Escriba +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_THREAD_EXECUTORS_SCHEDULER_HPP +#define BOOST_THREAD_EXECUTORS_SCHEDULER_HPP + +#include <boost/thread/detail/config.hpp> +#include <boost/thread/executors/detail/scheduled_executor_base.hpp> + +#include <boost/chrono/time_point.hpp> +#include <boost/chrono/duration.hpp> +#include <boost/chrono/system_clocks.hpp> + +#include <boost/config/abi_prefix.hpp> + +namespace boost +{ + namespace executors + { + /// Wraps the reference to an executor and a function to make a work that submit the function using the executor. + template <class Executor, class Function> + class resubmitter + { + public: + resubmitter(Executor& ex, Function funct) : + ex(ex), + funct(boost::move(funct)) + {} + + void operator()() + { + ex.submit(funct); + } + + private: + Executor& ex; + Function funct; + }; + + /// resubmitter factory + template <class Executor, class Function> + resubmitter<Executor, typename decay<Function>::type> + resubmit(Executor& ex, BOOST_THREAD_FWD_REF(Function) funct) { + return resubmitter<Executor, typename decay<Function>::type >(ex, boost::move(funct)); + } + + /// Wraps references to a @c Scheduler and an @c Executor providing an @c Executor that + /// resubmit the function using the referenced Executor at a given @c time_point known at construction. + template <class Scheduler, class Executor> + class resubmit_at_executor + { + public: + typedef typename Scheduler::clock clock; + typedef typename Scheduler::work work; + + template <class Duration> + resubmit_at_executor(Scheduler& sch, Executor& ex, chrono::time_point<clock, Duration> const& tp) : + sch(sch), + ex(ex), + tp(tp), + is_closed(false) + { + } + + ~resubmit_at_executor() + { + close(); + } + + template <class Work> + void submit(BOOST_THREAD_FWD_REF(Work) w) + { + if (closed()) + { + BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + } + sch.submit_at(resubmit(ex,boost::forward<Work>(w)), tp); + } + + Executor& underlying_executor() + { + return ex; + } + Scheduler& underlying_scheduler() + { + return sch; + } + + void close() + { + is_closed = true; + } + + bool closed() + { + return is_closed || sch.closed() || ex.closed(); + } + + private: + Scheduler& sch; + Executor& ex; + typename clock::time_point tp; + bool is_closed; + }; + + + /// Expression template helper storing a pair of references to an @c Scheduler and an @c Executor + /// It provides factory helper functions such as at/after that convert these a pair of @c Scheduler @c Executor + /// into an new @c Executor that submit the work using the referenced @c Executor at/after a specific time/duration + /// respectively, using the referenced @Scheduler. + template <class Scheduler, class Executor> + class scheduler_executor_wrapper + { + public: + typedef typename Scheduler::clock clock; + typedef typename Scheduler::work work; + typedef resubmit_at_executor<Scheduler, Executor> the_executor; + + scheduler_executor_wrapper(Scheduler& sch, Executor& ex) : + sch(sch), + ex(ex) + {} + + ~scheduler_executor_wrapper() + { + } + + Executor& underlying_executor() + { + return ex; + } + Scheduler& underlying_scheduler() + { + return sch; + } + + template <class Rep, class Period> + the_executor after(chrono::duration<Rep,Period> const& rel_time) + { + return at(clock::now() + rel_time ); + } + + template <class Duration> + the_executor at(chrono::time_point<clock,Duration> const& abs_time) + { + return the_executor(sch, ex, abs_time); + } + + private: + Scheduler& sch; + Executor& ex; + }; //end class + + /// Wraps a reference to a @c Scheduler providing an @c Executor that + /// run the function at a given @c time_point known at construction. + template <class Scheduler> + class at_executor + { + public: + typedef typename Scheduler::clock clock; + typedef typename Scheduler::work work; + typedef typename clock::time_point time_point; + + template <class Duration> + at_executor(Scheduler& sch, chrono::time_point<clock,Duration> const& tp) : + sch(sch), + tp(tp), + is_closed(false) + {} + + ~at_executor() + { + close(); + } + + Scheduler& underlying_scheduler() + { + return sch; + } + + void close() + { + is_closed = true; + } + + bool closed() + { + return is_closed || sch.closed(); + } + + template <class Work> + void submit(BOOST_THREAD_FWD_REF(Work) w) + { + if (closed()) + { + BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + } + sch.submit_at(boost::forward<Work>(w), tp); + } + + template <class Executor> + resubmit_at_executor<Scheduler, Executor> on(Executor& ex) + { + return resubmit_at_executor<Scheduler, Executor>(sch, ex, tp); + } + + private: + Scheduler& sch; + time_point tp; + bool is_closed; + }; //end class + + /// A @c Scheduler using a specific thread. Note that a Scheduler is not an Executor. + /// It provides factory helper functions such as at/after that convert a @c Scheduler into an @c Executor + /// that submit the work at/after a specific time/duration respectively. + template <class Clock = chrono::steady_clock> + class scheduler : public detail::scheduled_executor_base<Clock> + { + public: + typedef typename detail::scheduled_executor_base<Clock>::work work; + + typedef Clock clock; + + scheduler() + : super(), + thr(&super::loop, this) {} + + ~scheduler() + { + this->close(); + thr.join(); + } + template <class Ex> + scheduler_executor_wrapper<scheduler, Ex> on(Ex& ex) + { + return scheduler_executor_wrapper<scheduler, Ex>(*this, ex); + } + + template <class Rep, class Period> + at_executor<scheduler> after(chrono::duration<Rep,Period> const& rel_time) + { + return at(rel_time + clock::now()); + } + + template <class Duration> + at_executor<scheduler> at(chrono::time_point<clock,Duration> const& tp) + { + return at_executor<scheduler>(*this, tp); + } + + private: + typedef detail::scheduled_executor_base<Clock> super; + thread thr; + }; + + + } + using executors::resubmitter; + using executors::resubmit; + using executors::resubmit_at_executor; + using executors::scheduler_executor_wrapper; + using executors::at_executor; + using executors::scheduler; +} + +#include <boost/config/abi_suffix.hpp> + +#endif diff --git a/boost/thread/executors/scheduling_adaptor.hpp b/boost/thread/executors/scheduling_adaptor.hpp new file mode 100644 index 0000000000..ac0a0acbce --- /dev/null +++ b/boost/thread/executors/scheduling_adaptor.hpp @@ -0,0 +1,51 @@ +// Copyright (C) 2014 Ian Forbed +// Copyright (C) 2014 Vicente J. Botet Escriba +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_THREAD_EXECUTORS_SCHEDULING_ADAPTOR_HPP +#define BOOST_THREAD_EXECUTORS_SCHEDULING_ADAPTOR_HPP + +#include <boost/thread/executors/detail/scheduled_executor_base.hpp> + +namespace boost +{ +namespace executors +{ + + template <typename Executor> + class scheduling_adpator : public detail::scheduled_executor_base<> + { + private: + Executor& _exec; + thread _scheduler; + public: + + scheduling_adpator(Executor& ex) + : super(), + _exec(ex), + _scheduler(&super::loop, this) {} + + ~scheduling_adpator() + { + this->close(); + _scheduler.join(); + } + + Executor& underlying_executor() + { + return _exec; + } + + private: + typedef detail::scheduled_executor_base<> super; + }; //end class + +} //end executors + + using executors::scheduling_adpator; + +} //end boost +#endif diff --git a/boost/thread/executors/serial_executor.hpp b/boost/thread/executors/serial_executor.hpp index dae1014b47..6f4266668f 100644 --- a/boost/thread/executors/serial_executor.hpp +++ b/boost/thread/executors/serial_executor.hpp @@ -12,7 +12,7 @@ #include <boost/thread/detail/config.hpp> #include <boost/thread/detail/delete.hpp> #include <boost/thread/detail/move.hpp> -#include <boost/thread/sync_queue.hpp> +#include <boost/thread/concurrent_queues/sync_queue.hpp> #include <boost/thread/executors/work.hpp> #include <boost/thread/executors/generic_executor_ref.hpp> #include <boost/thread/future.hpp> @@ -33,7 +33,7 @@ namespace executors typedef scoped_thread<> thread_t; /// the thread safe work queue - sync_queue<work > work_queue; + concurrent::sync_queue<work > work_queue; generic_executor_ref ex; thread_t thr; @@ -43,8 +43,13 @@ namespace executors try_executing_one_task(work& task, boost::promise<void> &p) : task(task), p(p) {} void operator()() { - task(); // if task() throws promise is not set but as the the program terminates and should terminate there is no need to use try-catch here. - p.set_value(); + try { + task(); + p.set_value(); + } catch (...) + { + p.set_exception(current_exception()); + } } }; public: @@ -52,7 +57,7 @@ namespace executors * \par Returns * The underlying executor wrapped on a generic executor reference. */ - generic_executor_ref underlying_executor() BOOST_NOEXCEPT { return ex; } + generic_executor_ref& underlying_executor() BOOST_NOEXCEPT { return ex; } /** * Effects: try to execute one task. @@ -64,27 +69,19 @@ namespace executors work task; try { - if (work_queue.try_pull_front(task) == queue_op_status::success) + if (work_queue.try_pull(task) == queue_op_status::success) { boost::promise<void> p; try_executing_one_task tmp(task,p); ex.submit(tmp); -// ex.submit([&task, &p]() -// { -// task(); // if task() throws promise is not set but as the the program terminates and should terminate there is no need to use try-catch here. -// p.set_value(); -// }); p.get_future().wait(); return true; } return false; } - catch (std::exception& ) - { - return false; - } catch (...) { + std::terminate(); return false; } } @@ -136,7 +133,7 @@ namespace executors */ ~serial_executor() { - // signal to all the worker thread that there will be no more submissions. + // signal to the worker thread that there will be no more submissions. close(); } @@ -173,18 +170,18 @@ namespace executors template <typename Closure> void submit(Closure & closure) { - work_queue.push_back(work(closure)); + work_queue.push(work(closure)); } #endif void submit(void (*closure)()) { - work_queue.push_back(work(closure)); + work_queue.push(work(closure)); } template <typename Closure> void submit(BOOST_THREAD_RV_REF(Closure) closure) { - work_queue.push_back(work(boost::forward<Closure>(closure))); + work_queue.push(work(boost::forward<Closure>(closure))); } /** diff --git a/boost/thread/executors/serial_executor_cont.hpp b/boost/thread/executors/serial_executor_cont.hpp new file mode 100644 index 0000000000..1c4cc14aad --- /dev/null +++ b/boost/thread/executors/serial_executor_cont.hpp @@ -0,0 +1,170 @@ +// Copyright (C) 2015 Vicente J. Botet Escriba +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// 2013/11 Vicente J. Botet Escriba +// first implementation of a simple serial scheduler. + +#ifndef BOOST_THREAD_SERIAL_EXECUTOR_CONT_HPP +#define BOOST_THREAD_SERIAL_EXECUTOR_CONT_HPP + +#include <boost/thread/detail/config.hpp> +#include <boost/thread/detail/delete.hpp> +#include <boost/thread/detail/move.hpp> +#include <boost/thread/concurrent_queues/sync_queue.hpp> +#include <boost/thread/executors/work.hpp> +#include <boost/thread/executors/generic_executor_ref.hpp> +#include <boost/thread/future.hpp> +#include <boost/thread/scoped_thread.hpp> + +#include <boost/config/abi_prefix.hpp> + +namespace boost +{ +namespace executors +{ + class serial_executor_cont + { + public: + /// type-erasure to store the works to do + typedef executors::work work; + private: + + generic_executor_ref ex_; + future<void> fut_; // protected by mtx_ + bool closed_; // protected by mtx_ + mutex mtx_; + + struct continuation { + work task; + template <class X> + struct result { + typedef void type; + }; + continuation(BOOST_THREAD_RV_REF(work) tsk) + : task(boost::move(tsk)) {} + void operator()(future<void> f) + { + try { + task(); + } catch (...) { + std::terminate(); + } + } + }; + + bool closed(lock_guard<mutex>&) const + { + return closed_; + } + public: + /** + * \par Returns + * The underlying executor wrapped on a generic executor reference. + */ + generic_executor_ref& underlying_executor() BOOST_NOEXCEPT { return ex_; } + + /// serial_executor_cont is not copyable. + BOOST_THREAD_NO_COPYABLE(serial_executor_cont) + + /** + * \b Effects: creates a serial executor that runs closures in fifo order using one the associated executor. + * + * \b Throws: Whatever exception is thrown while initializing the needed resources. + * + * \b Notes: + * * The lifetime of the associated executor must outlive the serial executor. + * * The current implementation doesn't support submission from synchronous continuation, that is, + * - the executor must execute the continuation asynchronously or + * - the continuation can not submit to this serial executor. + */ + template <class Executor> + serial_executor_cont(Executor& ex) + : ex_(ex), fut_(make_ready_future()), closed_(false) + { + } + /** + * \b Effects: Destroys the thread pool. + * + * \b Synchronization: The completion of all the closures happen before the completion of the \c serial_executor_cont destructor. + */ + ~serial_executor_cont() + { + // signal to the worker thread that there will be no more submissions. + close(); + } + + /** + * \b Effects: close the \c serial_executor_cont for submissions. + * The loop will work until there is no more closures to run. + */ + void close() + { + lock_guard<mutex> lk(mtx_); + closed_ = true;; + } + + /** + * \b Returns: whether the pool is closed for submissions. + */ + bool closed() + { + lock_guard<mutex> lk(mtx_); + return closed(lk); + } + + /** + * Effects: none. + * Returns: always false. + * Throws: No. + * Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks. + */ + bool try_executing_one() + { + return false; + } + + /** + * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. + * + * \b Effects: The specified \c closure will be scheduled for execution after the last submitted closure finish. + * If the invoked closure throws an exception the \c serial_executor_cont will call \c std::terminate, as is the case with threads. + * + * \b Throws: \c sync_queue_is_closed if the executor is closed. + * Whatever exception that can be throw while storing the closure. + * + */ + +#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) + template <typename Closure> + void submit(Closure & closure) + { + lock_guard<mutex> lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + fut_ = fut_.then(ex_, continuation(work(closure))); + } +#endif + void submit(void (*closure)()) + { + lock_guard<mutex> lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + fut_ = fut_.then(ex_, continuation(work(closure))); + } + + template <typename Closure> + void submit(BOOST_THREAD_RV_REF(Closure) closure) + { + lock_guard<mutex> lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + fut_ = fut_.then(ex_, continuation(work(boost::forward<Closure>(closure)))); + } + + }; +} +using executors::serial_executor_cont; +} + +#include <boost/config/abi_suffix.hpp> + +#endif diff --git a/boost/thread/executors/thread_executor.hpp b/boost/thread/executors/thread_executor.hpp index 9fc3362fde..a8cd5c212f 100644 --- a/boost/thread/executors/thread_executor.hpp +++ b/boost/thread/executors/thread_executor.hpp @@ -15,6 +15,8 @@ #include <boost/thread/executors/work.hpp> #include <boost/thread/executors/executor.hpp> #include <boost/thread/thread_only.hpp> +#include <boost/thread/scoped_thread.hpp> +#include <boost/thread/csbl/vector.hpp> #include <boost/config/abi_prefix.hpp> @@ -28,6 +30,11 @@ namespace executors /// type-erasure to store the works to do typedef executors::work work; bool closed_; + typedef scoped_thread<> thread_t; + typedef csbl::vector<thread_t> threads_type; + threads_type threads_; + mutable mutex mtx_; + /** * Effects: try to execute one task. * Returns: whether a task has been executed. @@ -52,7 +59,7 @@ namespace executors { } /** - * \b Effects: Destroys the inline executor. + * \b Effects: Waits for closures (if any) to complete, then joins and destroys the threads. * * \b Synchronization: The completion of all the closures happen before the completion of the \c thread_executor destructor. */ @@ -60,6 +67,7 @@ namespace executors { // signal to all the worker thread that there will be no more submissions. close(); + // all the scoped threads will join before destroying } /** @@ -68,16 +76,22 @@ namespace executors */ void close() { + lock_guard<mutex> lk(mtx_); closed_ = true; } /** * \b Returns: whether the pool is closed for submissions. */ - bool closed() + bool closed(lock_guard<mutex>& ) { return closed_; } + bool closed() + { + lock_guard<mutex> lk(mtx_); + return closed(lk); + } /** * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. @@ -95,24 +109,30 @@ namespace executors template <typename Closure> void submit(Closure & closure) { - if (closed()) return; + lock_guard<mutex> lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + threads_.reserve(threads_.size() + 1); thread th(closure); - th.detach(); + threads_.push_back(thread_t(boost::move(th))); } #endif void submit(void (*closure)()) { - if (closed()) return; + lock_guard<mutex> lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + threads_.reserve(threads_.size() + 1); thread th(closure); - th.detach(); + threads_.push_back(thread_t(boost::move(th))); } template <typename Closure> void submit(BOOST_THREAD_FWD_REF(Closure) closure) { - if (closed()) return; + lock_guard<mutex> lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + threads_.reserve(threads_.size() + 1); thread th(boost::forward<Closure>(closure)); - th.detach(); + threads_.push_back(thread_t(boost::move(th))); } /** diff --git a/boost/thread/executors/work.hpp b/boost/thread/executors/work.hpp index df1512cd95..bdaf7651b9 100644 --- a/boost/thread/executors/work.hpp +++ b/boost/thread/executors/work.hpp @@ -8,36 +8,23 @@ #define BOOST_THREAD_EXECUTORS_WORK_HPP #include <boost/thread/detail/config.hpp> - -#if ! defined BOOST_THREAD_EXECUTORS_WORK_ACCEPTS_MOVABLE \ - && ! defined BOOST_THREAD_EXECUTORS_WORK_DONT_ACCEPT_MOVABLE -#define BOOST_THREAD_EXECUTORS_WORK_ACCEPTS_MOVABLE -//#define BOOST_THREAD_EXECUTORS_WORK_DONT_ACCEPT_MOVABLE -#endif - -#if defined BOOST_THREAD_EXECUTORS_WORK_ACCEPTS_MOVABLE - #include <boost/thread/detail/nullary_function.hpp> +#include <boost/thread/csbl/functional.hpp> namespace boost { namespace executors { typedef detail::nullary_function<void()> work; - } -} // namespace boost +#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES + typedef detail::nullary_function<void()> work_pq; + //typedef csbl::function<void()> work_pq; #else -#include <boost/thread/csbl/functional.hpp> - -namespace boost -{ - namespace executors - { - typedef csbl::function<void()> work; + typedef csbl::function<void()> work_pq; +#endif } } // namespace boost -#endif #endif // BOOST_THREAD_EXECUTORS_WORK_HPP |