summaryrefslogtreecommitdiff
path: root/boost/asio/experimental/impl/co_spawn.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'boost/asio/experimental/impl/co_spawn.hpp')
-rw-r--r--boost/asio/experimental/impl/co_spawn.hpp878
1 files changed, 878 insertions, 0 deletions
diff --git a/boost/asio/experimental/impl/co_spawn.hpp b/boost/asio/experimental/impl/co_spawn.hpp
new file mode 100644
index 0000000000..51ffb4eeb1
--- /dev/null
+++ b/boost/asio/experimental/impl/co_spawn.hpp
@@ -0,0 +1,878 @@
+//
+// experimental/impl/co_spawn.hpp
+// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+//
+// Copyright (c) 2003-2018 Christopher M. Kohlhoff (chris at kohlhoff dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_ASIO_EXPERIMENTAL_IMPL_CO_SPAWN_HPP
+#define BOOST_ASIO_EXPERIMENTAL_IMPL_CO_SPAWN_HPP
+
+#if defined(_MSC_VER) && (_MSC_VER >= 1200)
+# pragma once
+#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
+
+#include <boost/asio/detail/config.hpp>
+#include <exception>
+#include <functional>
+#include <memory>
+#include <new>
+#include <tuple>
+#include <utility>
+#include <boost/asio/async_result.hpp>
+#include <boost/asio/detail/thread_context.hpp>
+#include <boost/asio/detail/thread_info_base.hpp>
+#include <boost/asio/detail/type_traits.hpp>
+#include <boost/asio/dispatch.hpp>
+#include <boost/asio/post.hpp>
+
+#include <boost/asio/detail/push_options.hpp>
+
+namespace boost {
+namespace asio {
+namespace experimental {
+namespace detail {
+
+// Promise object for coroutine at top of thread-of-execution "stack".
+template <typename Executor>
+class awaiter
+{
+public:
+ struct deleter
+ {
+ void operator()(awaiter* a)
+ {
+ if (a)
+ a->release();
+ }
+ };
+
+ typedef std::unique_ptr<awaiter, deleter> ptr;
+
+ typedef Executor executor_type;
+
+ ~awaiter()
+ {
+ if (has_executor_)
+ static_cast<Executor*>(static_cast<void*>(executor_))->~Executor();
+ }
+
+ void set_executor(const Executor& ex)
+ {
+ new (&executor_) Executor(ex);
+ has_executor_ = true;
+ }
+
+ executor_type get_executor() const noexcept
+ {
+ return *static_cast<const Executor*>(static_cast<const void*>(executor_));
+ }
+
+ awaiter* get_return_object()
+ {
+ return this;
+ }
+
+ auto initial_suspend()
+ {
+ return std::experimental::suspend_always();
+ }
+
+ auto final_suspend()
+ {
+ return std::experimental::suspend_always();
+ }
+
+ void return_void()
+ {
+ }
+
+ awaiter* add_ref()
+ {
+ ++ref_count_;
+ return this;
+ }
+
+ void release()
+ {
+ if (--ref_count_ == 0)
+ coroutine_handle<awaiter>::from_promise(*this).destroy();
+ }
+
+ void unhandled_exception()
+ {
+ pending_exception_ = std::current_exception();
+ }
+
+ void rethrow_unhandled_exception()
+ {
+ if (pending_exception_)
+ {
+ std::exception_ptr ex = std::exchange(pending_exception_, nullptr);
+ std::rethrow_exception(ex);
+ }
+ }
+
+private:
+ std::size_t ref_count_ = 0;
+ std::exception_ptr pending_exception_ = nullptr;
+ alignas(Executor) unsigned char executor_[sizeof(Executor)];
+ bool has_executor_ = false;
+};
+
+// Base promise for coroutines further down the thread-of-execution "stack".
+template <typename Executor>
+class awaitee_base
+{
+public:
+#if !defined(BOOST_ASIO_DISABLE_AWAITEE_RECYCLING)
+ void* operator new(std::size_t size)
+ {
+ return boost::asio::detail::thread_info_base::allocate(
+ boost::asio::detail::thread_info_base::awaitee_tag(),
+ boost::asio::detail::thread_context::thread_call_stack::top(),
+ size);
+ }
+
+ void operator delete(void* pointer, std::size_t size)
+ {
+ boost::asio::detail::thread_info_base::deallocate(
+ boost::asio::detail::thread_info_base::awaitee_tag(),
+ boost::asio::detail::thread_context::thread_call_stack::top(),
+ pointer, size);
+ }
+#endif // !defined(BOOST_ASIO_DISABLE_AWAITEE_RECYCLING)
+
+ auto initial_suspend()
+ {
+ return std::experimental::suspend_never();
+ }
+
+ struct final_suspender
+ {
+ awaitee_base* this_;
+
+ bool await_ready() const noexcept
+ {
+ return false;
+ }
+
+ void await_suspend(coroutine_handle<void>)
+ {
+ this_->wake_caller();
+ }
+
+ void await_resume() const noexcept
+ {
+ }
+ };
+
+ auto final_suspend()
+ {
+ return final_suspender{this};
+ }
+
+ void set_except(std::exception_ptr e)
+ {
+ pending_exception_ = e;
+ }
+
+ void unhandled_exception()
+ {
+ set_except(std::current_exception());
+ }
+
+ void rethrow_exception()
+ {
+ if (pending_exception_)
+ {
+ std::exception_ptr ex = std::exchange(pending_exception_, nullptr);
+ std::rethrow_exception(ex);
+ }
+ }
+
+ awaiter<Executor>* top()
+ {
+ return awaiter_;
+ }
+
+ coroutine_handle<void> caller()
+ {
+ return caller_;
+ }
+
+ bool ready() const
+ {
+ return ready_;
+ }
+
+ void wake_caller()
+ {
+ if (caller_)
+ caller_.resume();
+ else
+ ready_ = true;
+ }
+
+ class awaitable_executor
+ {
+ public:
+ explicit awaitable_executor(awaitee_base* a)
+ : this_(a)
+ {
+ }
+
+ bool await_ready() const noexcept
+ {
+ return this_->awaiter_ != nullptr;
+ }
+
+ template <typename U, typename Ex>
+ void await_suspend(coroutine_handle<detail::awaitee<U, Ex>> h) noexcept
+ {
+ this_->resume_on_attach_ = h;
+ }
+
+ Executor await_resume()
+ {
+ return this_->awaiter_->get_executor();
+ }
+
+ private:
+ awaitee_base* this_;
+ };
+
+ awaitable_executor await_transform(this_coro::executor_t) noexcept
+ {
+ return awaitable_executor(this);
+ }
+
+ class awaitable_token
+ {
+ public:
+ explicit awaitable_token(awaitee_base* a)
+ : this_(a)
+ {
+ }
+
+ bool await_ready() const noexcept
+ {
+ return this_->awaiter_ != nullptr;
+ }
+
+ template <typename U, typename Ex>
+ void await_suspend(coroutine_handle<detail::awaitee<U, Ex>> h) noexcept
+ {
+ this_->resume_on_attach_ = h;
+ }
+
+ await_token<Executor> await_resume()
+ {
+ return await_token<Executor>(this_->awaiter_);
+ }
+
+ private:
+ awaitee_base* this_;
+ };
+
+ awaitable_token await_transform(this_coro::token_t) noexcept
+ {
+ return awaitable_token(this);
+ }
+
+ template <typename T>
+ awaitable<T, Executor> await_transform(awaitable<T, Executor>& t) const
+ {
+ return std::move(t);
+ }
+
+ template <typename T>
+ awaitable<T, Executor> await_transform(awaitable<T, Executor>&& t) const
+ {
+ return std::move(t);
+ }
+
+ std::experimental::suspend_always await_transform(
+ std::experimental::suspend_always) const
+ {
+ return std::experimental::suspend_always();
+ }
+
+ void attach_caller(coroutine_handle<awaiter<Executor>> h)
+ {
+ this->caller_ = h;
+ this->attach_callees(&h.promise());
+ }
+
+ template <typename U>
+ void attach_caller(coroutine_handle<awaitee<U, Executor>> h)
+ {
+ this->caller_ = h;
+ if (h.promise().awaiter_)
+ this->attach_callees(h.promise().awaiter_);
+ else
+ h.promise().unattached_callee_ = this;
+ }
+
+ void attach_callees(awaiter<Executor>* a)
+ {
+ for (awaitee_base* curr = this; curr != nullptr;
+ curr = std::exchange(curr->unattached_callee_, nullptr))
+ {
+ curr->awaiter_ = a;
+ if (curr->resume_on_attach_)
+ return std::exchange(curr->resume_on_attach_, nullptr).resume();
+ }
+ }
+
+protected:
+ awaiter<Executor>* awaiter_ = nullptr;
+ coroutine_handle<void> caller_ = nullptr;
+ awaitee_base<Executor>* unattached_callee_ = nullptr;
+ std::exception_ptr pending_exception_ = nullptr;
+ coroutine_handle<void> resume_on_attach_ = nullptr;
+ bool ready_ = false;
+};
+
+// Promise object for coroutines further down the thread-of-execution "stack".
+template <typename T, typename Executor>
+class awaitee
+ : public awaitee_base<Executor>
+{
+public:
+ awaitee()
+ {
+ }
+
+ awaitee(awaitee&& other) noexcept
+ : awaitee_base<Executor>(std::move(other))
+ {
+ }
+
+ ~awaitee()
+ {
+ if (has_result_)
+ static_cast<T*>(static_cast<void*>(result_))->~T();
+ }
+
+ awaitable<T, Executor> get_return_object()
+ {
+ return awaitable<T, Executor>(this);
+ };
+
+ template <typename U>
+ void return_value(U&& u)
+ {
+ new (&result_) T(std::forward<U>(u));
+ has_result_ = true;
+ }
+
+ T get()
+ {
+ this->caller_ = nullptr;
+ this->rethrow_exception();
+ return std::move(*static_cast<T*>(static_cast<void*>(result_)));
+ }
+
+private:
+ alignas(T) unsigned char result_[sizeof(T)];
+ bool has_result_ = false;
+};
+
+// Promise object for coroutines further down the thread-of-execution "stack".
+template <typename Executor>
+class awaitee<void, Executor>
+ : public awaitee_base<Executor>
+{
+public:
+ awaitable<void, Executor> get_return_object()
+ {
+ return awaitable<void, Executor>(this);
+ };
+
+ void return_void()
+ {
+ }
+
+ void get()
+ {
+ this->caller_ = nullptr;
+ this->rethrow_exception();
+ }
+};
+
+template <typename Executor>
+class awaiter_task
+{
+public:
+ typedef Executor executor_type;
+
+ awaiter_task(awaiter<Executor>* a)
+ : awaiter_(a->add_ref())
+ {
+ }
+
+ awaiter_task(awaiter_task&& other) noexcept
+ : awaiter_(std::exchange(other.awaiter_, nullptr))
+ {
+ }
+
+ ~awaiter_task()
+ {
+ if (awaiter_)
+ {
+ // Coroutine "stack unwinding" must be performed through the executor.
+ executor_type ex(awaiter_->get_executor());
+ (post)(ex,
+ [a = std::move(awaiter_)]() mutable
+ {
+ typename awaiter<Executor>::ptr(std::move(a));
+ });
+ }
+ }
+
+ executor_type get_executor() const noexcept
+ {
+ return awaiter_->get_executor();
+ }
+
+protected:
+ typename awaiter<Executor>::ptr awaiter_;
+};
+
+template <typename Executor>
+class co_spawn_handler : public awaiter_task<Executor>
+{
+public:
+ using awaiter_task<Executor>::awaiter_task;
+
+ void operator()()
+ {
+ typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_));
+ coroutine_handle<awaiter<Executor>>::from_promise(*ptr.get()).resume();
+ }
+};
+
+template <typename Executor, typename T>
+class await_handler_base : public awaiter_task<Executor>
+{
+public:
+ typedef awaitable<T, Executor> awaitable_type;
+
+ await_handler_base(await_token<Executor> token)
+ : awaiter_task<Executor>(token.awaiter_),
+ awaitee_(nullptr)
+ {
+ }
+
+ await_handler_base(await_handler_base&& other) noexcept
+ : awaiter_task<Executor>(std::move(other)),
+ awaitee_(std::exchange(other.awaitee_, nullptr))
+ {
+ }
+
+ void attach_awaitee(const awaitable<T, Executor>& a)
+ {
+ awaitee_ = a.awaitee_;
+ }
+
+protected:
+ awaitee<T, Executor>* awaitee_;
+};
+
+template <typename, typename...> class await_handler;
+
+template <typename Executor>
+class await_handler<Executor, void>
+ : public await_handler_base<Executor, void>
+{
+public:
+ using await_handler_base<Executor, void>::await_handler_base;
+
+ void operator()()
+ {
+ typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_));
+ this->awaitee_->return_void();
+ this->awaitee_->wake_caller();
+ ptr->rethrow_unhandled_exception();
+ }
+};
+
+template <typename Executor>
+class await_handler<Executor, boost::system::error_code>
+ : public await_handler_base<Executor, void>
+{
+public:
+ typedef void return_type;
+
+ using await_handler_base<Executor, void>::await_handler_base;
+
+ void operator()(const boost::system::error_code& ec)
+ {
+ typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_));
+ if (ec)
+ {
+ this->awaitee_->set_except(
+ std::make_exception_ptr(boost::system::system_error(ec)));
+ }
+ else
+ this->awaitee_->return_void();
+ this->awaitee_->wake_caller();
+ ptr->rethrow_unhandled_exception();
+ }
+};
+
+template <typename Executor>
+class await_handler<Executor, std::exception_ptr>
+ : public await_handler_base<Executor, void>
+{
+public:
+ using await_handler_base<Executor, void>::await_handler_base;
+
+ void operator()(std::exception_ptr ex)
+ {
+ typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_));
+ if (ex)
+ this->awaitee_->set_except(ex);
+ else
+ this->awaitee_->return_void();
+ this->awaitee_->wake_caller();
+ ptr->rethrow_unhandled_exception();
+ }
+};
+
+template <typename Executor, typename T>
+class await_handler<Executor, T>
+ : public await_handler_base<Executor, T>
+{
+public:
+ using await_handler_base<Executor, T>::await_handler_base;
+
+ template <typename Arg>
+ void operator()(Arg&& arg)
+ {
+ typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_));
+ this->awaitee_->return_value(std::forward<Arg>(arg));
+ this->awaitee_->wake_caller();
+ ptr->rethrow_unhandled_exception();
+ }
+};
+
+template <typename Executor, typename T>
+class await_handler<Executor, boost::system::error_code, T>
+ : public await_handler_base<Executor, T>
+{
+public:
+ using await_handler_base<Executor, T>::await_handler_base;
+
+ template <typename Arg>
+ void operator()(const boost::system::error_code& ec, Arg&& arg)
+ {
+ typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_));
+ if (ec)
+ {
+ this->awaitee_->set_except(
+ std::make_exception_ptr(boost::system::system_error(ec)));
+ }
+ else
+ this->awaitee_->return_value(std::forward<Arg>(arg));
+ this->awaitee_->wake_caller();
+ ptr->rethrow_unhandled_exception();
+ }
+};
+
+template <typename Executor, typename T>
+class await_handler<Executor, std::exception_ptr, T>
+ : public await_handler_base<Executor, T>
+{
+public:
+ using await_handler_base<Executor, T>::await_handler_base;
+
+ template <typename Arg>
+ void operator()(std::exception_ptr ex, Arg&& arg)
+ {
+ typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_));
+ if (ex)
+ this->awaitee_->set_except(ex);
+ else
+ this->awaitee_->return_value(std::forward<Arg>(arg));
+ this->awaitee_->wake_caller();
+ ptr->rethrow_unhandled_exception();
+ }
+};
+
+template <typename Executor, typename... Ts>
+class await_handler
+ : public await_handler_base<Executor, std::tuple<Ts...>>
+{
+public:
+ using await_handler_base<Executor, std::tuple<Ts...>>::await_handler_base;
+
+ template <typename... Args>
+ void operator()(Args&&... args)
+ {
+ typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_));
+ this->awaitee_->return_value(
+ std::forward_as_tuple(std::forward<Args>(args)...));
+ this->awaitee_->wake_caller();
+ ptr->rethrow_unhandled_exception();
+ }
+};
+
+template <typename Executor, typename... Ts>
+class await_handler<Executor, boost::system::error_code, Ts...>
+ : public await_handler_base<Executor, std::tuple<Ts...>>
+{
+public:
+ using await_handler_base<Executor, std::tuple<Ts...>>::await_handler_base;
+
+ template <typename... Args>
+ void operator()(const boost::system::error_code& ec, Args&&... args)
+ {
+ typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_));
+ if (ec)
+ {
+ this->awaitee_->set_except(
+ std::make_exception_ptr(boost::system::system_error(ec)));
+ }
+ else
+ {
+ this->awaitee_->return_value(
+ std::forward_as_tuple(std::forward<Args>(args)...));
+ }
+ this->awaitee_->wake_caller();
+ ptr->rethrow_unhandled_exception();
+ }
+};
+
+template <typename Executor, typename... Ts>
+class await_handler<Executor, std::exception_ptr, Ts...>
+ : public await_handler_base<Executor, std::tuple<Ts...>>
+{
+public:
+ using await_handler_base<Executor, std::tuple<Ts...>>::await_handler_base;
+
+ template <typename... Args>
+ void operator()(std::exception_ptr ex, Args&&... args)
+ {
+ typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_));
+ if (ex)
+ this->awaitee_->set_except(ex);
+ else
+ {
+ this->awaitee_->return_value(
+ std::forward_as_tuple(std::forward<Args>(args)...));
+ }
+ this->awaitee_->wake_caller();
+ ptr->rethrow_unhandled_exception();
+ }
+};
+
+template <typename T>
+struct awaitable_signature;
+
+template <typename T, typename Executor>
+struct awaitable_signature<awaitable<T, Executor>>
+{
+ typedef void type(std::exception_ptr, T);
+};
+
+template <typename Executor>
+struct awaitable_signature<awaitable<void, Executor>>
+{
+ typedef void type(std::exception_ptr);
+};
+
+template <typename T, typename Executor, typename F, typename Handler>
+awaiter<Executor>* co_spawn_entry_point(awaitable<T, Executor>*,
+ executor_work_guard<Executor> work_guard, F f, Handler handler)
+{
+ bool done = false;
+
+ try
+ {
+ T t = co_await f();
+
+ done = true;
+
+ (dispatch)(work_guard.get_executor(),
+ [handler = std::move(handler), t = std::move(t)]() mutable
+ {
+ handler(std::exception_ptr(), std::move(t));
+ });
+ }
+ catch (...)
+ {
+ if (done)
+ throw;
+
+ (dispatch)(work_guard.get_executor(),
+ [handler = std::move(handler), e = std::current_exception()]() mutable
+ {
+ handler(e, T());
+ });
+ }
+}
+
+template <typename Executor, typename F, typename Handler>
+awaiter<Executor>* co_spawn_entry_point(awaitable<void, Executor>*,
+ executor_work_guard<Executor> work_guard, F f, Handler handler)
+{
+ std::exception_ptr e = nullptr;
+
+ try
+ {
+ co_await f();
+ }
+ catch (...)
+ {
+ e = std::current_exception();
+ }
+
+ (dispatch)(work_guard.get_executor(),
+ [handler = std::move(handler), e]() mutable
+ {
+ handler(e);
+ });
+}
+
+template <typename Executor, typename F, typename CompletionToken>
+auto co_spawn(const Executor& ex, F&& f, CompletionToken&& token)
+{
+ typedef typename result_of<F()>::type awaitable_type;
+ typedef typename awaitable_type::executor_type executor_type;
+ typedef typename awaitable_signature<awaitable_type>::type signature_type;
+
+ async_completion<CompletionToken, signature_type> completion(token);
+
+ executor_type ex2(ex);
+ auto work_guard = make_work_guard(completion.completion_handler, ex2);
+
+ auto* a = (co_spawn_entry_point)(
+ static_cast<awaitable_type*>(nullptr), std::move(work_guard),
+ std::forward<F>(f), std::move(completion.completion_handler));
+
+ a->set_executor(ex2);
+ (post)(co_spawn_handler<executor_type>(a));
+
+ return completion.result.get();
+}
+
+#if defined(_MSC_VER)
+# pragma warning(push)
+# pragma warning(disable:4033)
+#endif // defined(_MSC_VER)
+
+#if defined(_MSC_VER)
+template <typename T> T dummy_return()
+{
+ return std::move(*static_cast<T*>(nullptr));
+}
+
+template <>
+inline void dummy_return()
+{
+}
+#endif // defined(_MSC_VER)
+
+template <typename Awaitable>
+inline Awaitable make_dummy_awaitable()
+{
+ for (;;) co_await std::experimental::suspend_always();
+#if defined(_MSC_VER)
+ co_return dummy_return<typename Awaitable::value_type>();
+#endif // defined(_MSC_VER)
+}
+
+#if defined(_MSC_VER)
+# pragma warning(pop)
+#endif // defined(_MSC_VER)
+
+} // namespace detail
+} // namespace experimental
+
+template <typename Executor, typename R, typename... Args>
+class async_result<experimental::await_token<Executor>, R(Args...)>
+{
+public:
+ typedef experimental::detail::await_handler<
+ Executor, typename decay<Args>::type...> completion_handler_type;
+
+ typedef typename experimental::detail::await_handler<
+ Executor, Args...>::awaitable_type return_type;
+
+ async_result(completion_handler_type& h)
+ : awaitable_(experimental::detail::make_dummy_awaitable<return_type>())
+ {
+ h.attach_awaitee(awaitable_);
+ }
+
+ return_type get()
+ {
+ return std::move(awaitable_);
+ }
+
+private:
+ return_type awaitable_;
+};
+
+#if !defined(BOOST_ASIO_NO_DEPRECATED)
+
+template <typename Executor, typename R, typename... Args>
+struct handler_type<experimental::await_token<Executor>, R(Args...)>
+{
+ typedef experimental::detail::await_handler<
+ Executor, typename decay<Args>::type...> type;
+};
+
+template <typename Executor, typename... Args>
+class async_result<experimental::detail::await_handler<Executor, Args...>>
+{
+public:
+ typedef typename experimental::detail::await_handler<
+ Executor, Args...>::awaitable_type type;
+
+ async_result(experimental::detail::await_handler<Executor, Args...>& h)
+ : awaitable_(experimental::detail::make_dummy_awaitable<type>())
+ {
+ h.attach_awaitee(awaitable_);
+ }
+
+ type get()
+ {
+ return std::move(awaitable_);
+ }
+
+private:
+ type awaitable_;
+};
+
+#endif // !defined(BOOST_ASIO_NO_DEPRECATED)
+
+} // namespace asio
+} // namespace boost
+
+namespace std { namespace experimental {
+
+template <typename Executor, typename... Args>
+struct coroutine_traits<
+ boost::asio::experimental::detail::awaiter<Executor>*, Args...>
+{
+ typedef boost::asio::experimental::detail::awaiter<Executor> promise_type;
+};
+
+template <typename T, typename Executor, typename... Args>
+struct coroutine_traits<
+ boost::asio::experimental::awaitable<T, Executor>, Args...>
+{
+ typedef boost::asio::experimental::detail::awaitee<T, Executor> promise_type;
+};
+
+}} // namespace std::experimental
+
+#include <boost/asio/detail/pop_options.hpp>
+
+#endif // BOOST_ASIO_EXPERIMENTAL_IMPL_CO_SPAWN_HPP