summaryrefslogtreecommitdiff
path: root/boost/thread/executors
diff options
context:
space:
mode:
authorDongHun Kwak <dh0128.kwak@samsung.com>2016-03-21 15:45:20 +0900
committerDongHun Kwak <dh0128.kwak@samsung.com>2016-03-21 15:46:37 +0900
commit733b5d5ae2c5d625211e2985ac25728ac3f54883 (patch)
treea5b214744b256f07e1dc2bd7273035a7808c659f /boost/thread/executors
parent08c1e93fa36a49f49325a07fe91ff92c964c2b6c (diff)
downloadboost-733b5d5ae2c5d625211e2985ac25728ac3f54883.tar.gz
boost-733b5d5ae2c5d625211e2985ac25728ac3f54883.tar.bz2
boost-733b5d5ae2c5d625211e2985ac25728ac3f54883.zip
Imported Upstream version 1.58.0upstream/1.58.0
Change-Id: If0072143aa26874812e0db6872e1efb10a3e5e94 Signed-off-by: DongHun Kwak <dh0128.kwak@samsung.com>
Diffstat (limited to 'boost/thread/executors')
-rw-r--r--boost/thread/executors/basic_thread_pool.hpp28
-rw-r--r--boost/thread/executors/detail/priority_executor_base.hpp77
-rw-r--r--boost/thread/executors/detail/scheduled_executor_base.hpp66
-rw-r--r--boost/thread/executors/inline_executor.hpp54
-rw-r--r--boost/thread/executors/loop_executor.hpp45
-rw-r--r--boost/thread/executors/scheduled_thread_pool.hpp48
-rw-r--r--boost/thread/executors/scheduler.hpp271
-rw-r--r--boost/thread/executors/scheduling_adaptor.hpp51
-rw-r--r--boost/thread/executors/serial_executor.hpp35
-rw-r--r--boost/thread/executors/serial_executor_cont.hpp170
-rw-r--r--boost/thread/executors/thread_executor.hpp36
-rw-r--r--boost/thread/executors/work.hpp25
12 files changed, 811 insertions, 95 deletions
diff --git a/boost/thread/executors/basic_thread_pool.hpp b/boost/thread/executors/basic_thread_pool.hpp
index 11283ddf42..64ba1e90e0 100644
--- a/boost/thread/executors/basic_thread_pool.hpp
+++ b/boost/thread/executors/basic_thread_pool.hpp
@@ -14,7 +14,7 @@
#include <boost/thread/detail/delete.hpp>
#include <boost/thread/detail/move.hpp>
#include <boost/thread/scoped_thread.hpp>
-#include <boost/thread/sync_queue.hpp>
+#include <boost/thread/concurrent_queues/sync_queue.hpp>
#include <boost/thread/executors/work.hpp>
#include <boost/thread/csbl/vector.hpp>
@@ -36,7 +36,7 @@ namespace executors
typedef csbl::vector<thread_t> thread_vector;
/// the thread safe work queue
- sync_queue<work > work_queue;
+ concurrent::sync_queue<work > work_queue;
/// A move aware vector
thread_vector threads;
@@ -48,22 +48,19 @@ namespace executors
*/
bool try_executing_one()
{
- work task;
try
{
- if (work_queue.try_pull_front(task) == queue_op_status::success)
+ work task;
+ if (work_queue.try_pull(task) == queue_op_status::success)
{
task();
return true;
}
return false;
}
- catch (std::exception& )
- {
- return false;
- }
catch (...)
{
+ std::terminate();
return false;
}
}
@@ -90,17 +87,14 @@ namespace executors
for(;;)
{
work task;
- queue_op_status st = work_queue.wait_pull_front(task);
+ queue_op_status st = work_queue.wait_pull(task);
if (st == queue_op_status::closed) return;
task();
}
}
- catch (std::exception& )
- {
- return;
- }
catch (...)
{
+ std::terminate();
return;
}
}
@@ -134,7 +128,7 @@ namespace executors
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*/
- basic_thread_pool(unsigned const thread_count = thread::hardware_concurrency())
+ basic_thread_pool(unsigned const thread_count = thread::hardware_concurrency()+1)
{
try
{
@@ -275,18 +269,18 @@ namespace executors
template <typename Closure>
void submit(Closure & closure)
{
- work_queue.push_back(work(closure));
+ work_queue.push(work(closure));
}
#endif
void submit(void (*closure)())
{
- work_queue.push_back(work(closure));
+ work_queue.push(work(closure));
}
template <typename Closure>
void submit(BOOST_THREAD_RV_REF(Closure) closure)
{
- work_queue.push_back(work(boost::forward<Closure>(closure)));
+ work_queue.push(work(boost::forward<Closure>(closure)));
}
/**
diff --git a/boost/thread/executors/detail/priority_executor_base.hpp b/boost/thread/executors/detail/priority_executor_base.hpp
new file mode 100644
index 0000000000..2191c0b37a
--- /dev/null
+++ b/boost/thread/executors/detail/priority_executor_base.hpp
@@ -0,0 +1,77 @@
+// Copyright (C) 2014 Ian Forbed
+// Copyright (C) 2014 Vicente J. Botet Escriba
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_THREAD_EXECUTORS_DETAIL_PRIORITY_EXECUTOR_BASE_HPP
+#define BOOST_THREAD_EXECUTORS_DETAIL_PRIORITY_EXECUTOR_BASE_HPP
+
+#include <boost/atomic.hpp>
+#include <boost/function.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/thread/concurrent_queues/sync_timed_queue.hpp>
+#include <boost/thread/executors/work.hpp>
+
+namespace boost
+{
+namespace executors
+{
+namespace detail
+{
+ template <class Queue>
+ class priority_executor_base
+ {
+ public:
+ //typedef boost::function<void()> work;
+ typedef executors::work_pq work;
+ protected:
+ typedef Queue queue_type;
+ queue_type _workq;
+
+ priority_executor_base() {}
+ public:
+
+ ~priority_executor_base()
+ {
+ if(!closed())
+ {
+ this->close();
+ }
+ }
+
+ void close()
+ {
+ _workq.close();
+ }
+
+ bool closed()
+ {
+ return _workq.closed();
+ }
+
+ void loop()
+ {
+ try
+ {
+ for(;;)
+ {
+ work task;
+ queue_op_status st = _workq.wait_pull(task);
+ if (st == queue_op_status::closed) return;
+ task();
+ }
+ }
+ catch (...)
+ {
+ std::terminate();
+ return;
+ }
+ }
+ }; //end class
+
+} //end detail namespace
+} //end executors namespace
+} //end boost namespace
+#endif
diff --git a/boost/thread/executors/detail/scheduled_executor_base.hpp b/boost/thread/executors/detail/scheduled_executor_base.hpp
new file mode 100644
index 0000000000..ec0038f7e3
--- /dev/null
+++ b/boost/thread/executors/detail/scheduled_executor_base.hpp
@@ -0,0 +1,66 @@
+// Copyright (C) 2014 Ian Forbed
+// Copyright (C) 2014-2015 Vicente J. Botet Escriba
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_THREAD_EXECUTORS_DETAIL_SCHEDULED_EXECUTOR_BASE_HPP
+#define BOOST_THREAD_EXECUTORS_DETAIL_SCHEDULED_EXECUTOR_BASE_HPP
+
+#include <boost/thread/concurrent_queues/sync_timed_queue.hpp>
+#include <boost/thread/executors/detail/priority_executor_base.hpp>
+#include <boost/thread/executors/work.hpp>
+#include <boost/thread/thread.hpp>
+
+#include <boost/atomic.hpp>
+#include <boost/function.hpp>
+
+#include <boost/config/abi_prefix.hpp>
+
+namespace boost
+{
+namespace executors
+{
+namespace detail
+{
+ template <class Clock=chrono::steady_clock>
+ class scheduled_executor_base : public priority_executor_base<concurrent::sync_timed_queue<executors::work_pq, Clock > >
+ {
+ public:
+ typedef executors::work_pq work;
+ typedef Clock clock;
+ typedef typename clock::duration duration;
+ typedef typename clock::time_point time_point;
+ protected:
+
+ scheduled_executor_base() {}
+ public:
+
+ ~scheduled_executor_base()
+ {
+ if(! this->closed())
+ {
+ this->close();
+ }
+ }
+
+ void submit_at(work w, const time_point& tp)
+ {
+ this->_workq.push(boost::move(w), tp);
+ }
+
+ void submit_after(work w, const duration& dura)
+ {
+ this->_workq.push(boost::move(w), dura+clock::now());
+ }
+
+ }; //end class
+
+} //end detail namespace
+} //end executors namespace
+} //end boost namespace
+
+#include <boost/config/abi_suffix.hpp>
+
+#endif
diff --git a/boost/thread/executors/inline_executor.hpp b/boost/thread/executors/inline_executor.hpp
index bc6bd9fe7e..5dd523184e 100644
--- a/boost/thread/executors/inline_executor.hpp
+++ b/boost/thread/executors/inline_executor.hpp
@@ -26,6 +26,7 @@ namespace executors
/// type-erasure to store the works to do
typedef executors::work work;
bool closed_;
+ mutable mutex mtx_;
/**
* Effects: try to execute one task.
* Returns: whether a task has been executed.
@@ -66,16 +67,22 @@ namespace executors
*/
void close()
{
+ lock_guard<mutex> lk(mtx_);
closed_ = true;
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
- bool closed()
+ bool closed(lock_guard<mutex>& )
{
return closed_;
}
+ bool closed()
+ {
+ lock_guard<mutex> lk(mtx_);
+ return closed(lk);
+ }
/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
@@ -93,21 +100,54 @@ namespace executors
template <typename Closure>
void submit(Closure & closure)
{
- if (closed()) return;
- closure();
+ {
+ lock_guard<mutex> lk(mtx_);
+ if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
+ }
+ try
+ {
+ closure();
+ }
+ catch (...)
+ {
+ std::terminate();
+ return;
+ }
}
#endif
void submit(void (*closure)())
{
- if (closed()) return;
- closure();
+ {
+ lock_guard<mutex> lk(mtx_);
+ if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
+ }
+ try
+ {
+ closure();
+ }
+ catch (...)
+ {
+ std::terminate();
+ return;
+ }
}
template <typename Closure>
void submit(BOOST_THREAD_FWD_REF(Closure) closure)
{
- if (closed()) return;
- closure();
+ {
+ lock_guard<mutex> lk(mtx_);
+ if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
+ }
+ try
+ {
+ closure();
+ }
+ catch (...)
+ {
+ std::terminate();
+ return;
+ }
}
/**
diff --git a/boost/thread/executors/loop_executor.hpp b/boost/thread/executors/loop_executor.hpp
index c2798b4461..e9eadadf9e 100644
--- a/boost/thread/executors/loop_executor.hpp
+++ b/boost/thread/executors/loop_executor.hpp
@@ -14,7 +14,7 @@
#include <boost/thread/detail/config.hpp>
#include <boost/thread/detail/delete.hpp>
#include <boost/thread/detail/move.hpp>
-#include <boost/thread/sync_queue.hpp>
+#include <boost/thread/concurrent_queues/sync_queue.hpp>
#include <boost/thread/executors/work.hpp>
#include <boost/config/abi_prefix.hpp>
@@ -31,7 +31,7 @@ namespace executors
typedef executors::work work;
private:
/// the thread safe work queue
- sync_queue<work > work_queue;
+ concurrent::sync_queue<work > work_queue;
public:
/**
@@ -44,19 +44,16 @@ namespace executors
work task;
try
{
- if (work_queue.try_pull_front(task) == queue_op_status::success)
+ if (work_queue.try_pull(task) == queue_op_status::success)
{
task();
return true;
}
return false;
}
- catch (std::exception& )
- {
- return false;
- }
catch (...)
{
+ std::terminate();
return false;
}
}
@@ -74,19 +71,7 @@ namespace executors
}
- /**
- * The main loop of the worker thread
- */
- void worker_thread()
- {
- while (!closed())
- {
- schedule_one_or_yield();
- }
- while (try_executing_one())
- {
- }
- }
+
public:
/// loop_executor is not copyable.
@@ -112,9 +97,19 @@ namespace executors
}
/**
- * loop
+ * The main loop of the worker thread
*/
- void loop() { worker_thread(); }
+ void loop()
+ {
+ while (!closed())
+ {
+ schedule_one_or_yield();
+ }
+ while (try_executing_one())
+ {
+ }
+ }
+
/**
* \b Effects: close the \c loop_executor for submissions.
* The loop will work until there is no more closures to run.
@@ -148,18 +143,18 @@ namespace executors
template <typename Closure>
void submit(Closure & closure)
{
- work_queue.push_back(work(closure));
+ work_queue.push(work(closure));
}
#endif
void submit(void (*closure)())
{
- work_queue.push_back(work(closure));
+ work_queue.push(work(closure));
}
template <typename Closure>
void submit(BOOST_THREAD_RV_REF(Closure) closure)
{
- work_queue.push_back(work(boost::forward<Closure>(closure)));
+ work_queue.push(work(boost::forward<Closure>(closure)));
}
/**
diff --git a/boost/thread/executors/scheduled_thread_pool.hpp b/boost/thread/executors/scheduled_thread_pool.hpp
new file mode 100644
index 0000000000..408013b283
--- /dev/null
+++ b/boost/thread/executors/scheduled_thread_pool.hpp
@@ -0,0 +1,48 @@
+// Copyright (C) 2014 Ian Forbed
+// Copyright (C) 2014 Vicente J. Botet Escriba
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_THREAD_EXECUTORS_SCHEDULED_THREAD_POOL_HPP
+#define BOOST_THREAD_EXECUTORS_SCHEDULED_THREAD_POOL_HPP
+
+#include <boost/thread/executors/detail/scheduled_executor_base.hpp>
+
+namespace boost
+{
+namespace executors
+{
+
+ class scheduled_thread_pool : public detail::scheduled_executor_base<>
+ {
+ private:
+ thread_group _workers;
+ public:
+
+ scheduled_thread_pool(size_t num_threads) : super()
+ {
+ for(size_t i = 0; i < num_threads; i++)
+ {
+ _workers.create_thread(bind(&super::loop, this));
+ }
+ }
+
+ ~scheduled_thread_pool()
+ {
+ this->close();
+ _workers.join_all();
+ }
+
+ private:
+ typedef detail::scheduled_executor_base<> super;
+ }; //end class
+
+} //end executors namespace
+
+using executors::scheduled_thread_pool;
+
+} //end boost
+#endif
+
diff --git a/boost/thread/executors/scheduler.hpp b/boost/thread/executors/scheduler.hpp
new file mode 100644
index 0000000000..5796a7d394
--- /dev/null
+++ b/boost/thread/executors/scheduler.hpp
@@ -0,0 +1,271 @@
+// Copyright (C) 2014 Vicente J. Botet Escriba
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_THREAD_EXECUTORS_SCHEDULER_HPP
+#define BOOST_THREAD_EXECUTORS_SCHEDULER_HPP
+
+#include <boost/thread/detail/config.hpp>
+#include <boost/thread/executors/detail/scheduled_executor_base.hpp>
+
+#include <boost/chrono/time_point.hpp>
+#include <boost/chrono/duration.hpp>
+#include <boost/chrono/system_clocks.hpp>
+
+#include <boost/config/abi_prefix.hpp>
+
+namespace boost
+{
+ namespace executors
+ {
+ /// Wraps the reference to an executor and a function to make a work that submit the function using the executor.
+ template <class Executor, class Function>
+ class resubmitter
+ {
+ public:
+ resubmitter(Executor& ex, Function funct) :
+ ex(ex),
+ funct(boost::move(funct))
+ {}
+
+ void operator()()
+ {
+ ex.submit(funct);
+ }
+
+ private:
+ Executor& ex;
+ Function funct;
+ };
+
+ /// resubmitter factory
+ template <class Executor, class Function>
+ resubmitter<Executor, typename decay<Function>::type>
+ resubmit(Executor& ex, BOOST_THREAD_FWD_REF(Function) funct) {
+ return resubmitter<Executor, typename decay<Function>::type >(ex, boost::move(funct));
+ }
+
+ /// Wraps references to a @c Scheduler and an @c Executor providing an @c Executor that
+ /// resubmit the function using the referenced Executor at a given @c time_point known at construction.
+ template <class Scheduler, class Executor>
+ class resubmit_at_executor
+ {
+ public:
+ typedef typename Scheduler::clock clock;
+ typedef typename Scheduler::work work;
+
+ template <class Duration>
+ resubmit_at_executor(Scheduler& sch, Executor& ex, chrono::time_point<clock, Duration> const& tp) :
+ sch(sch),
+ ex(ex),
+ tp(tp),
+ is_closed(false)
+ {
+ }
+
+ ~resubmit_at_executor()
+ {
+ close();
+ }
+
+ template <class Work>
+ void submit(BOOST_THREAD_FWD_REF(Work) w)
+ {
+ if (closed())
+ {
+ BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
+ }
+ sch.submit_at(resubmit(ex,boost::forward<Work>(w)), tp);
+ }
+
+ Executor& underlying_executor()
+ {
+ return ex;
+ }
+ Scheduler& underlying_scheduler()
+ {
+ return sch;
+ }
+
+ void close()
+ {
+ is_closed = true;
+ }
+
+ bool closed()
+ {
+ return is_closed || sch.closed() || ex.closed();
+ }
+
+ private:
+ Scheduler& sch;
+ Executor& ex;
+ typename clock::time_point tp;
+ bool is_closed;
+ };
+
+
+ /// Expression template helper storing a pair of references to an @c Scheduler and an @c Executor
+ /// It provides factory helper functions such as at/after that convert these a pair of @c Scheduler @c Executor
+ /// into an new @c Executor that submit the work using the referenced @c Executor at/after a specific time/duration
+ /// respectively, using the referenced @Scheduler.
+ template <class Scheduler, class Executor>
+ class scheduler_executor_wrapper
+ {
+ public:
+ typedef typename Scheduler::clock clock;
+ typedef typename Scheduler::work work;
+ typedef resubmit_at_executor<Scheduler, Executor> the_executor;
+
+ scheduler_executor_wrapper(Scheduler& sch, Executor& ex) :
+ sch(sch),
+ ex(ex)
+ {}
+
+ ~scheduler_executor_wrapper()
+ {
+ }
+
+ Executor& underlying_executor()
+ {
+ return ex;
+ }
+ Scheduler& underlying_scheduler()
+ {
+ return sch;
+ }
+
+ template <class Rep, class Period>
+ the_executor after(chrono::duration<Rep,Period> const& rel_time)
+ {
+ return at(clock::now() + rel_time );
+ }
+
+ template <class Duration>
+ the_executor at(chrono::time_point<clock,Duration> const& abs_time)
+ {
+ return the_executor(sch, ex, abs_time);
+ }
+
+ private:
+ Scheduler& sch;
+ Executor& ex;
+ }; //end class
+
+ /// Wraps a reference to a @c Scheduler providing an @c Executor that
+ /// run the function at a given @c time_point known at construction.
+ template <class Scheduler>
+ class at_executor
+ {
+ public:
+ typedef typename Scheduler::clock clock;
+ typedef typename Scheduler::work work;
+ typedef typename clock::time_point time_point;
+
+ template <class Duration>
+ at_executor(Scheduler& sch, chrono::time_point<clock,Duration> const& tp) :
+ sch(sch),
+ tp(tp),
+ is_closed(false)
+ {}
+
+ ~at_executor()
+ {
+ close();
+ }
+
+ Scheduler& underlying_scheduler()
+ {
+ return sch;
+ }
+
+ void close()
+ {
+ is_closed = true;
+ }
+
+ bool closed()
+ {
+ return is_closed || sch.closed();
+ }
+
+ template <class Work>
+ void submit(BOOST_THREAD_FWD_REF(Work) w)
+ {
+ if (closed())
+ {
+ BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
+ }
+ sch.submit_at(boost::forward<Work>(w), tp);
+ }
+
+ template <class Executor>
+ resubmit_at_executor<Scheduler, Executor> on(Executor& ex)
+ {
+ return resubmit_at_executor<Scheduler, Executor>(sch, ex, tp);
+ }
+
+ private:
+ Scheduler& sch;
+ time_point tp;
+ bool is_closed;
+ }; //end class
+
+ /// A @c Scheduler using a specific thread. Note that a Scheduler is not an Executor.
+ /// It provides factory helper functions such as at/after that convert a @c Scheduler into an @c Executor
+ /// that submit the work at/after a specific time/duration respectively.
+ template <class Clock = chrono::steady_clock>
+ class scheduler : public detail::scheduled_executor_base<Clock>
+ {
+ public:
+ typedef typename detail::scheduled_executor_base<Clock>::work work;
+
+ typedef Clock clock;
+
+ scheduler()
+ : super(),
+ thr(&super::loop, this) {}
+
+ ~scheduler()
+ {
+ this->close();
+ thr.join();
+ }
+ template <class Ex>
+ scheduler_executor_wrapper<scheduler, Ex> on(Ex& ex)
+ {
+ return scheduler_executor_wrapper<scheduler, Ex>(*this, ex);
+ }
+
+ template <class Rep, class Period>
+ at_executor<scheduler> after(chrono::duration<Rep,Period> const& rel_time)
+ {
+ return at(rel_time + clock::now());
+ }
+
+ template <class Duration>
+ at_executor<scheduler> at(chrono::time_point<clock,Duration> const& tp)
+ {
+ return at_executor<scheduler>(*this, tp);
+ }
+
+ private:
+ typedef detail::scheduled_executor_base<Clock> super;
+ thread thr;
+ };
+
+
+ }
+ using executors::resubmitter;
+ using executors::resubmit;
+ using executors::resubmit_at_executor;
+ using executors::scheduler_executor_wrapper;
+ using executors::at_executor;
+ using executors::scheduler;
+}
+
+#include <boost/config/abi_suffix.hpp>
+
+#endif
diff --git a/boost/thread/executors/scheduling_adaptor.hpp b/boost/thread/executors/scheduling_adaptor.hpp
new file mode 100644
index 0000000000..ac0a0acbce
--- /dev/null
+++ b/boost/thread/executors/scheduling_adaptor.hpp
@@ -0,0 +1,51 @@
+// Copyright (C) 2014 Ian Forbed
+// Copyright (C) 2014 Vicente J. Botet Escriba
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_THREAD_EXECUTORS_SCHEDULING_ADAPTOR_HPP
+#define BOOST_THREAD_EXECUTORS_SCHEDULING_ADAPTOR_HPP
+
+#include <boost/thread/executors/detail/scheduled_executor_base.hpp>
+
+namespace boost
+{
+namespace executors
+{
+
+ template <typename Executor>
+ class scheduling_adpator : public detail::scheduled_executor_base<>
+ {
+ private:
+ Executor& _exec;
+ thread _scheduler;
+ public:
+
+ scheduling_adpator(Executor& ex)
+ : super(),
+ _exec(ex),
+ _scheduler(&super::loop, this) {}
+
+ ~scheduling_adpator()
+ {
+ this->close();
+ _scheduler.join();
+ }
+
+ Executor& underlying_executor()
+ {
+ return _exec;
+ }
+
+ private:
+ typedef detail::scheduled_executor_base<> super;
+ }; //end class
+
+} //end executors
+
+ using executors::scheduling_adpator;
+
+} //end boost
+#endif
diff --git a/boost/thread/executors/serial_executor.hpp b/boost/thread/executors/serial_executor.hpp
index dae1014b47..6f4266668f 100644
--- a/boost/thread/executors/serial_executor.hpp
+++ b/boost/thread/executors/serial_executor.hpp
@@ -12,7 +12,7 @@
#include <boost/thread/detail/config.hpp>
#include <boost/thread/detail/delete.hpp>
#include <boost/thread/detail/move.hpp>
-#include <boost/thread/sync_queue.hpp>
+#include <boost/thread/concurrent_queues/sync_queue.hpp>
#include <boost/thread/executors/work.hpp>
#include <boost/thread/executors/generic_executor_ref.hpp>
#include <boost/thread/future.hpp>
@@ -33,7 +33,7 @@ namespace executors
typedef scoped_thread<> thread_t;
/// the thread safe work queue
- sync_queue<work > work_queue;
+ concurrent::sync_queue<work > work_queue;
generic_executor_ref ex;
thread_t thr;
@@ -43,8 +43,13 @@ namespace executors
try_executing_one_task(work& task, boost::promise<void> &p)
: task(task), p(p) {}
void operator()() {
- task(); // if task() throws promise is not set but as the the program terminates and should terminate there is no need to use try-catch here.
- p.set_value();
+ try {
+ task();
+ p.set_value();
+ } catch (...)
+ {
+ p.set_exception(current_exception());
+ }
}
};
public:
@@ -52,7 +57,7 @@ namespace executors
* \par Returns
* The underlying executor wrapped on a generic executor reference.
*/
- generic_executor_ref underlying_executor() BOOST_NOEXCEPT { return ex; }
+ generic_executor_ref& underlying_executor() BOOST_NOEXCEPT { return ex; }
/**
* Effects: try to execute one task.
@@ -64,27 +69,19 @@ namespace executors
work task;
try
{
- if (work_queue.try_pull_front(task) == queue_op_status::success)
+ if (work_queue.try_pull(task) == queue_op_status::success)
{
boost::promise<void> p;
try_executing_one_task tmp(task,p);
ex.submit(tmp);
-// ex.submit([&task, &p]()
-// {
-// task(); // if task() throws promise is not set but as the the program terminates and should terminate there is no need to use try-catch here.
-// p.set_value();
-// });
p.get_future().wait();
return true;
}
return false;
}
- catch (std::exception& )
- {
- return false;
- }
catch (...)
{
+ std::terminate();
return false;
}
}
@@ -136,7 +133,7 @@ namespace executors
*/
~serial_executor()
{
- // signal to all the worker thread that there will be no more submissions.
+ // signal to the worker thread that there will be no more submissions.
close();
}
@@ -173,18 +170,18 @@ namespace executors
template <typename Closure>
void submit(Closure & closure)
{
- work_queue.push_back(work(closure));
+ work_queue.push(work(closure));
}
#endif
void submit(void (*closure)())
{
- work_queue.push_back(work(closure));
+ work_queue.push(work(closure));
}
template <typename Closure>
void submit(BOOST_THREAD_RV_REF(Closure) closure)
{
- work_queue.push_back(work(boost::forward<Closure>(closure)));
+ work_queue.push(work(boost::forward<Closure>(closure)));
}
/**
diff --git a/boost/thread/executors/serial_executor_cont.hpp b/boost/thread/executors/serial_executor_cont.hpp
new file mode 100644
index 0000000000..1c4cc14aad
--- /dev/null
+++ b/boost/thread/executors/serial_executor_cont.hpp
@@ -0,0 +1,170 @@
+// Copyright (C) 2015 Vicente J. Botet Escriba
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+// 2013/11 Vicente J. Botet Escriba
+// first implementation of a simple serial scheduler.
+
+#ifndef BOOST_THREAD_SERIAL_EXECUTOR_CONT_HPP
+#define BOOST_THREAD_SERIAL_EXECUTOR_CONT_HPP
+
+#include <boost/thread/detail/config.hpp>
+#include <boost/thread/detail/delete.hpp>
+#include <boost/thread/detail/move.hpp>
+#include <boost/thread/concurrent_queues/sync_queue.hpp>
+#include <boost/thread/executors/work.hpp>
+#include <boost/thread/executors/generic_executor_ref.hpp>
+#include <boost/thread/future.hpp>
+#include <boost/thread/scoped_thread.hpp>
+
+#include <boost/config/abi_prefix.hpp>
+
+namespace boost
+{
+namespace executors
+{
+ class serial_executor_cont
+ {
+ public:
+ /// type-erasure to store the works to do
+ typedef executors::work work;
+ private:
+
+ generic_executor_ref ex_;
+ future<void> fut_; // protected by mtx_
+ bool closed_; // protected by mtx_
+ mutex mtx_;
+
+ struct continuation {
+ work task;
+ template <class X>
+ struct result {
+ typedef void type;
+ };
+ continuation(BOOST_THREAD_RV_REF(work) tsk)
+ : task(boost::move(tsk)) {}
+ void operator()(future<void> f)
+ {
+ try {
+ task();
+ } catch (...) {
+ std::terminate();
+ }
+ }
+ };
+
+ bool closed(lock_guard<mutex>&) const
+ {
+ return closed_;
+ }
+ public:
+ /**
+ * \par Returns
+ * The underlying executor wrapped on a generic executor reference.
+ */
+ generic_executor_ref& underlying_executor() BOOST_NOEXCEPT { return ex_; }
+
+ /// serial_executor_cont is not copyable.
+ BOOST_THREAD_NO_COPYABLE(serial_executor_cont)
+
+ /**
+ * \b Effects: creates a serial executor that runs closures in fifo order using one the associated executor.
+ *
+ * \b Throws: Whatever exception is thrown while initializing the needed resources.
+ *
+ * \b Notes:
+ * * The lifetime of the associated executor must outlive the serial executor.
+ * * The current implementation doesn't support submission from synchronous continuation, that is,
+ * - the executor must execute the continuation asynchronously or
+ * - the continuation can not submit to this serial executor.
+ */
+ template <class Executor>
+ serial_executor_cont(Executor& ex)
+ : ex_(ex), fut_(make_ready_future()), closed_(false)
+ {
+ }
+ /**
+ * \b Effects: Destroys the thread pool.
+ *
+ * \b Synchronization: The completion of all the closures happen before the completion of the \c serial_executor_cont destructor.
+ */
+ ~serial_executor_cont()
+ {
+ // signal to the worker thread that there will be no more submissions.
+ close();
+ }
+
+ /**
+ * \b Effects: close the \c serial_executor_cont for submissions.
+ * The loop will work until there is no more closures to run.
+ */
+ void close()
+ {
+ lock_guard<mutex> lk(mtx_);
+ closed_ = true;;
+ }
+
+ /**
+ * \b Returns: whether the pool is closed for submissions.
+ */
+ bool closed()
+ {
+ lock_guard<mutex> lk(mtx_);
+ return closed(lk);
+ }
+
+ /**
+ * Effects: none.
+ * Returns: always false.
+ * Throws: No.
+ * Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks.
+ */
+ bool try_executing_one()
+ {
+ return false;
+ }
+
+ /**
+ * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
+ *
+ * \b Effects: The specified \c closure will be scheduled for execution after the last submitted closure finish.
+ * If the invoked closure throws an exception the \c serial_executor_cont will call \c std::terminate, as is the case with threads.
+ *
+ * \b Throws: \c sync_queue_is_closed if the executor is closed.
+ * Whatever exception that can be throw while storing the closure.
+ *
+ */
+
+#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
+ template <typename Closure>
+ void submit(Closure & closure)
+ {
+ lock_guard<mutex> lk(mtx_);
+ if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
+ fut_ = fut_.then(ex_, continuation(work(closure)));
+ }
+#endif
+ void submit(void (*closure)())
+ {
+ lock_guard<mutex> lk(mtx_);
+ if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
+ fut_ = fut_.then(ex_, continuation(work(closure)));
+ }
+
+ template <typename Closure>
+ void submit(BOOST_THREAD_RV_REF(Closure) closure)
+ {
+ lock_guard<mutex> lk(mtx_);
+ if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
+ fut_ = fut_.then(ex_, continuation(work(boost::forward<Closure>(closure))));
+ }
+
+ };
+}
+using executors::serial_executor_cont;
+}
+
+#include <boost/config/abi_suffix.hpp>
+
+#endif
diff --git a/boost/thread/executors/thread_executor.hpp b/boost/thread/executors/thread_executor.hpp
index 9fc3362fde..a8cd5c212f 100644
--- a/boost/thread/executors/thread_executor.hpp
+++ b/boost/thread/executors/thread_executor.hpp
@@ -15,6 +15,8 @@
#include <boost/thread/executors/work.hpp>
#include <boost/thread/executors/executor.hpp>
#include <boost/thread/thread_only.hpp>
+#include <boost/thread/scoped_thread.hpp>
+#include <boost/thread/csbl/vector.hpp>
#include <boost/config/abi_prefix.hpp>
@@ -28,6 +30,11 @@ namespace executors
/// type-erasure to store the works to do
typedef executors::work work;
bool closed_;
+ typedef scoped_thread<> thread_t;
+ typedef csbl::vector<thread_t> threads_type;
+ threads_type threads_;
+ mutable mutex mtx_;
+
/**
* Effects: try to execute one task.
* Returns: whether a task has been executed.
@@ -52,7 +59,7 @@ namespace executors
{
}
/**
- * \b Effects: Destroys the inline executor.
+ * \b Effects: Waits for closures (if any) to complete, then joins and destroys the threads.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c thread_executor destructor.
*/
@@ -60,6 +67,7 @@ namespace executors
{
// signal to all the worker thread that there will be no more submissions.
close();
+ // all the scoped threads will join before destroying
}
/**
@@ -68,16 +76,22 @@ namespace executors
*/
void close()
{
+ lock_guard<mutex> lk(mtx_);
closed_ = true;
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
- bool closed()
+ bool closed(lock_guard<mutex>& )
{
return closed_;
}
+ bool closed()
+ {
+ lock_guard<mutex> lk(mtx_);
+ return closed(lk);
+ }
/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
@@ -95,24 +109,30 @@ namespace executors
template <typename Closure>
void submit(Closure & closure)
{
- if (closed()) return;
+ lock_guard<mutex> lk(mtx_);
+ if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
+ threads_.reserve(threads_.size() + 1);
thread th(closure);
- th.detach();
+ threads_.push_back(thread_t(boost::move(th)));
}
#endif
void submit(void (*closure)())
{
- if (closed()) return;
+ lock_guard<mutex> lk(mtx_);
+ if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
+ threads_.reserve(threads_.size() + 1);
thread th(closure);
- th.detach();
+ threads_.push_back(thread_t(boost::move(th)));
}
template <typename Closure>
void submit(BOOST_THREAD_FWD_REF(Closure) closure)
{
- if (closed()) return;
+ lock_guard<mutex> lk(mtx_);
+ if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
+ threads_.reserve(threads_.size() + 1);
thread th(boost::forward<Closure>(closure));
- th.detach();
+ threads_.push_back(thread_t(boost::move(th)));
}
/**
diff --git a/boost/thread/executors/work.hpp b/boost/thread/executors/work.hpp
index df1512cd95..bdaf7651b9 100644
--- a/boost/thread/executors/work.hpp
+++ b/boost/thread/executors/work.hpp
@@ -8,36 +8,23 @@
#define BOOST_THREAD_EXECUTORS_WORK_HPP
#include <boost/thread/detail/config.hpp>
-
-#if ! defined BOOST_THREAD_EXECUTORS_WORK_ACCEPTS_MOVABLE \
- && ! defined BOOST_THREAD_EXECUTORS_WORK_DONT_ACCEPT_MOVABLE
-#define BOOST_THREAD_EXECUTORS_WORK_ACCEPTS_MOVABLE
-//#define BOOST_THREAD_EXECUTORS_WORK_DONT_ACCEPT_MOVABLE
-#endif
-
-#if defined BOOST_THREAD_EXECUTORS_WORK_ACCEPTS_MOVABLE
-
#include <boost/thread/detail/nullary_function.hpp>
+#include <boost/thread/csbl/functional.hpp>
namespace boost
{
namespace executors
{
typedef detail::nullary_function<void()> work;
- }
-} // namespace boost
+#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES
+ typedef detail::nullary_function<void()> work_pq;
+ //typedef csbl::function<void()> work_pq;
#else
-#include <boost/thread/csbl/functional.hpp>
-
-namespace boost
-{
- namespace executors
- {
- typedef csbl::function<void()> work;
+ typedef csbl::function<void()> work_pq;
+#endif
}
} // namespace boost
-#endif
#endif // BOOST_THREAD_EXECUTORS_WORK_HPP