diff options
Diffstat (limited to 'boost/fiber/bounded_channel.hpp')
-rw-r--r-- | boost/fiber/bounded_channel.hpp | 431 |
1 files changed, 431 insertions, 0 deletions
diff --git a/boost/fiber/bounded_channel.hpp b/boost/fiber/bounded_channel.hpp new file mode 100644 index 0000000000..716080bd2d --- /dev/null +++ b/boost/fiber/bounded_channel.hpp @@ -0,0 +1,431 @@ + +// Copyright Oliver Kowalke 2013. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_FIBERS_BOUNDED_CHANNEL_H +#define BOOST_FIBERS_BOUNDED_CHANNEL_H + +#include <algorithm> +#include <atomic> +#include <chrono> +#include <cstddef> +#include <memory> +#include <mutex> +#include <system_error> +#include <utility> + +#include <boost/config.hpp> +#include <boost/intrusive_ptr.hpp> + +#include <boost/fiber/detail/config.hpp> +#include <boost/fiber/exceptions.hpp> +#include <boost/fiber/exceptions.hpp> +#include <boost/fiber/condition_variable.hpp> +#include <boost/fiber/mutex.hpp> +#include <boost/fiber/channel_op_status.hpp> + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { + +template< typename T, + typename Allocator = std::allocator< T > +> +class bounded_channel { +public: + typedef T value_type; + +private: + struct node { + typedef intrusive_ptr< node > ptr_t; + typedef typename std::allocator_traits< Allocator >::template rebind_alloc< + node + > allocator_t; + typedef std::allocator_traits< allocator_t > allocator_traits_t; + +#if ! defined(BOOST_FIBERS_NO_ATOMICS) + std::atomic< std::size_t > use_count{ 0 }; +#else + std::size_t use_count{ 0 }; +#endif + allocator_t alloc; + T va; + ptr_t nxt{}; + + node( T const& t, allocator_t const& alloc_) noexcept : + alloc{ alloc_ }, + va{ t } { + } + + node( T && t, allocator_t & alloc_) noexcept : + alloc{ alloc_ }, + va{ std::move( t) } { + } + + friend + void intrusive_ptr_add_ref( node * p) noexcept { + ++p->use_count; + } + + friend + void intrusive_ptr_release( node * p) noexcept { + if ( 0 == --p->use_count) { + allocator_t alloc( p->alloc); + allocator_traits_t::destroy( alloc, p); + allocator_traits_t::deallocate( alloc, p, 1); + } + } + }; + + using ptr_t = typename node::ptr_t; + using allocator_t = typename node::allocator_t; + using allocator_traits_t = typename node::allocator_traits_t; + + enum class queue_status { + open = 0, + closed + }; + + allocator_t alloc_; + queue_status state_{ queue_status::open }; + std::size_t count_{ 0 }; + ptr_t head_{}; + ptr_t * tail_; + mutable mutex mtx_{}; + condition_variable not_empty_cond_{}; + condition_variable not_full_cond_{}; + std::size_t hwm_; + std::size_t lwm_; + + bool is_closed_() const noexcept { + return queue_status::closed == state_; + } + + void close_( std::unique_lock< boost::fibers::mutex > & lk) noexcept { + state_ = queue_status::closed; + lk.unlock(); + not_empty_cond_.notify_all(); + not_full_cond_.notify_all(); + } + + std::size_t size_() const noexcept { + return count_; + } + + bool is_empty_() const noexcept { + return ! head_; + } + + bool is_full_() const noexcept { + return count_ >= hwm_; + } + + channel_op_status push_( ptr_t new_node, + std::unique_lock< boost::fibers::mutex > & lk) { + if ( is_closed_() ) { + return channel_op_status::closed; + } + not_full_cond_.wait( lk, + [this](){ + return ! is_full_(); + }); + return push_and_notify_( new_node, lk); + } + + channel_op_status try_push_( ptr_t new_node, + std::unique_lock< boost::fibers::mutex > & lk) noexcept { + if ( is_closed_() ) { + return channel_op_status::closed; + } + if ( is_full_() ) { + return channel_op_status::full; + } + return push_and_notify_( new_node, lk); + } + + template< typename Clock, typename Duration > + channel_op_status push_wait_until_( ptr_t new_node, + std::chrono::time_point< Clock, Duration > const& timeout_time, + std::unique_lock< boost::fibers::mutex > & lk) { + if ( is_closed_() ) { + return channel_op_status::closed; + } + if ( ! not_full_cond_.wait_until( lk, timeout_time, + [this](){ + return ! is_full_(); + })) { + return channel_op_status::timeout; + } + return push_and_notify_( new_node, lk); + } + + channel_op_status push_and_notify_( ptr_t new_node, + std::unique_lock< boost::fibers::mutex > & lk) noexcept { + push_tail_( new_node); + lk.unlock(); + not_empty_cond_.notify_one(); + return channel_op_status::success; + } + + void push_tail_( ptr_t new_node) noexcept { + * tail_ = new_node; + tail_ = & new_node->nxt; + ++count_; + } + + value_type value_pop_( std::unique_lock< boost::fibers::mutex > & lk) { + BOOST_ASSERT( ! is_empty_() ); + auto old_head = pop_head_(); + if ( size_() <= lwm_) { + if ( lwm_ == hwm_) { + lk.unlock(); + not_full_cond_.notify_one(); + } else { + lk.unlock(); + // more than one producer could be waiting + // to push a value + not_full_cond_.notify_all(); + } + } + return std::move( old_head->va); + } + + ptr_t pop_head_() noexcept { + auto old_head = head_; + head_ = old_head->nxt; + if ( ! head_) { + tail_ = & head_; + } + old_head->nxt.reset(); + --count_; + return old_head; + } + +public: + bounded_channel( std::size_t hwm, std::size_t lwm, + Allocator const& alloc = Allocator() ) : + alloc_{ alloc }, + tail_{ & head_ }, + hwm_{ hwm }, + lwm_{ lwm } { + if ( hwm_ <= lwm_) { + throw fiber_error( std::make_error_code( std::errc::invalid_argument), + "boost fiber: high-watermark is less than or equal to low-watermark for bounded_channel"); + } + if ( 0 == hwm) { + throw fiber_error( std::make_error_code( std::errc::invalid_argument), + "boost fiber: high-watermark is zero"); + } + } + + bounded_channel( std::size_t wm, + Allocator const& alloc = Allocator() ) : + alloc_{ alloc }, + tail_{ & head_ }, + hwm_{ wm }, + lwm_{ wm - 1 } { + if ( 0 == wm) { + throw fiber_error( std::make_error_code( std::errc::invalid_argument), + "boost fiber: watermark is zero"); + } + } + + bounded_channel( bounded_channel const&) = delete; + bounded_channel & operator=( bounded_channel const&) = delete; + + std::size_t upper_bound() const noexcept { + return hwm_; + } + + std::size_t lower_bound() const noexcept { + return lwm_; + } + + void close() noexcept { + std::unique_lock< mutex > lk( mtx_); + close_( lk); + } + + channel_op_status push( value_type const& va) { + typename allocator_traits_t::pointer ptr{ + allocator_traits_t::allocate( alloc_, 1) }; + try { + allocator_traits_t::construct( alloc_, ptr, va, alloc_); + } catch (...) { + allocator_traits_t::deallocate( alloc_, ptr, 1); + throw; + } + std::unique_lock< mutex > lk( mtx_); + return push_( { detail::convert( ptr) }, lk); + } + + channel_op_status push( value_type && va) { + typename allocator_traits_t::pointer ptr{ + allocator_traits_t::allocate( alloc_, 1) }; + try { + allocator_traits_t::construct( + alloc_, ptr, std::move( va), alloc_); + } catch (...) { + allocator_traits_t::deallocate( alloc_, ptr, 1); + throw; + } + std::unique_lock< mutex > lk( mtx_); + return push_( { detail::convert( ptr) }, lk); + } + + template< typename Rep, typename Period > + channel_op_status push_wait_for( value_type const& va, + std::chrono::duration< Rep, Period > const& timeout_duration) { + return push_wait_until( va, + std::chrono::steady_clock::now() + timeout_duration); + } + + template< typename Rep, typename Period > + channel_op_status push_wait_for( value_type && va, + std::chrono::duration< Rep, Period > const& timeout_duration) { + return push_wait_until( std::forward< value_type >( va), + std::chrono::steady_clock::now() + timeout_duration); + } + + template< typename Clock, typename Duration > + channel_op_status push_wait_until( value_type const& va, + std::chrono::time_point< Clock, Duration > const& timeout_time) { + typename allocator_traits_t::pointer ptr{ + allocator_traits_t::allocate( alloc_, 1) }; + try { + allocator_traits_t::construct( alloc_, ptr, va, alloc_); + } catch (...) { + allocator_traits_t::deallocate( alloc_, ptr, 1); + throw; + } + std::unique_lock< mutex > lk( mtx_); + return push_wait_until_( { detail::convert( ptr) }, timeout_time, lk); + } + + template< typename Clock, typename Duration > + channel_op_status push_wait_until( value_type && va, + std::chrono::time_point< Clock, Duration > const& timeout_time) { + typename allocator_traits_t::pointer ptr{ + allocator_traits_t::allocate( alloc_, 1) }; + try { + allocator_traits_t::construct( + alloc_, ptr, std::move( va), alloc_); + } catch (...) { + allocator_traits_t::deallocate( alloc_, ptr, 1); + throw; + } + std::unique_lock< mutex > lk( mtx_); + return push_wait_until_( { detail::convert( ptr) }, timeout_time, lk); + } + + channel_op_status try_push( value_type const& va) { + typename allocator_traits_t::pointer ptr{ + allocator_traits_t::allocate( alloc_, 1) }; + try { + allocator_traits_t::construct( alloc_, ptr, va, alloc_); + } catch (...) { + allocator_traits_t::deallocate( alloc_, ptr, 1); + throw; + } + std::unique_lock< mutex > lk( mtx_); + return try_push_( { detail::convert( ptr) }, lk); + } + + channel_op_status try_push( value_type && va) { + typename allocator_traits_t::pointer ptr{ + allocator_traits_t::allocate( alloc_, 1) }; + try { + allocator_traits_t::construct( + alloc_, ptr, std::move( va), alloc_); + } catch (...) { + allocator_traits_t::deallocate( alloc_, ptr, 1); + throw; + } + std::unique_lock< mutex > lk( mtx_); + return try_push_( { detail::convert( ptr) }, lk); + } + + channel_op_status pop( value_type & va) { + std::unique_lock< mutex > lk( mtx_); + not_empty_cond_.wait( lk, + [this](){ + return is_closed_() || ! is_empty_(); + }); + if ( is_closed_() && is_empty_() ) { + return channel_op_status::closed; + } + va = value_pop_( lk); + return channel_op_status::success; + } + + value_type value_pop() { + std::unique_lock< mutex > lk( mtx_); + not_empty_cond_.wait( lk, + [this](){ + return is_closed_() || ! is_empty_(); + }); + if ( is_closed_() && is_empty_() ) { + throw fiber_error( + std::make_error_code( std::errc::operation_not_permitted), + "boost fiber: queue is closed"); + } + return value_pop_( lk); + } + + channel_op_status try_pop( value_type & va) { + std::unique_lock< mutex > lk( mtx_); + if ( is_closed_() && is_empty_() ) { + // let other fibers run + lk.unlock(); + this_fiber::yield(); + return channel_op_status::closed; + } + if ( is_empty_() ) { + // let other fibers run + lk.unlock(); + this_fiber::yield(); + return channel_op_status::empty; + } + va = value_pop_( lk); + return channel_op_status::success; + } + + template< typename Rep, typename Period > + channel_op_status pop_wait_for( value_type & va, + std::chrono::duration< Rep, Period > const& timeout_duration) { + return pop_wait_until( va, + std::chrono::steady_clock::now() + timeout_duration); + } + + template< typename Clock, typename Duration > + channel_op_status pop_wait_until( value_type & va, + std::chrono::time_point< Clock, Duration > const& timeout_time) { + std::unique_lock< mutex > lk( mtx_); + if ( ! not_empty_cond_.wait_until( lk, + timeout_time, + [this](){ + return is_closed_() || ! is_empty_(); + })) { + return channel_op_status::timeout; + } + if ( is_closed_() && is_empty_() ) { + return channel_op_status::closed; + } + va = value_pop_( lk); + return channel_op_status::success; + } +}; + +}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBERS_BOUNDED_CHANNEL_H |