summaryrefslogtreecommitdiff
path: root/boost/fiber/bounded_channel.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'boost/fiber/bounded_channel.hpp')
-rw-r--r--boost/fiber/bounded_channel.hpp433
1 files changed, 0 insertions, 433 deletions
diff --git a/boost/fiber/bounded_channel.hpp b/boost/fiber/bounded_channel.hpp
deleted file mode 100644
index ac257b4ff5..0000000000
--- a/boost/fiber/bounded_channel.hpp
+++ /dev/null
@@ -1,433 +0,0 @@
-
-// Copyright Oliver Kowalke 2013.
-// Distributed under the Boost Software License, Version 1.0.
-// (See accompanying file LICENSE_1_0.txt or copy at
-// http://www.boost.org/LICENSE_1_0.txt)
-//
-
-#ifndef BOOST_FIBERS_BOUNDED_CHANNEL_H
-#define BOOST_FIBERS_BOUNDED_CHANNEL_H
-
-#warn "template bounded_channel is deprecated"
-
-#include <algorithm>
-#include <atomic>
-#include <chrono>
-#include <cstddef>
-#include <memory>
-#include <mutex>
-#include <system_error>
-#include <utility>
-
-#include <boost/config.hpp>
-#include <boost/intrusive_ptr.hpp>
-
-#include <boost/fiber/detail/config.hpp>
-#include <boost/fiber/exceptions.hpp>
-#include <boost/fiber/exceptions.hpp>
-#include <boost/fiber/condition_variable.hpp>
-#include <boost/fiber/mutex.hpp>
-#include <boost/fiber/channel_op_status.hpp>
-
-#ifdef BOOST_HAS_ABI_HEADERS
-# include BOOST_ABI_PREFIX
-#endif
-
-namespace boost {
-namespace fibers {
-
-template< typename T,
- typename Allocator = std::allocator< T >
->
-class bounded_channel {
-public:
- typedef T value_type;
-
-private:
- struct node {
- typedef intrusive_ptr< node > ptr_t;
- typedef typename std::allocator_traits< Allocator >::template rebind_alloc<
- node
- > allocator_t;
- typedef std::allocator_traits< allocator_t > allocator_traits_t;
-
-#if ! defined(BOOST_FIBERS_NO_ATOMICS)
- std::atomic< std::size_t > use_count{ 0 };
-#else
- std::size_t use_count{ 0 };
-#endif
- allocator_t alloc;
- T va;
- ptr_t nxt{};
-
- node( T const& t, allocator_t const& alloc_) noexcept :
- alloc{ alloc_ },
- va{ t } {
- }
-
- node( T && t, allocator_t & alloc_) noexcept :
- alloc{ alloc_ },
- va{ std::move( t) } {
- }
-
- friend
- void intrusive_ptr_add_ref( node * p) noexcept {
- ++p->use_count;
- }
-
- friend
- void intrusive_ptr_release( node * p) noexcept {
- if ( 0 == --p->use_count) {
- allocator_t alloc( p->alloc);
- allocator_traits_t::destroy( alloc, p);
- allocator_traits_t::deallocate( alloc, p, 1);
- }
- }
- };
-
- using ptr_t = typename node::ptr_t;
- using allocator_t = typename node::allocator_t;
- using allocator_traits_t = typename node::allocator_traits_t;
-
- enum class queue_status {
- open = 0,
- closed
- };
-
- allocator_t alloc_;
- queue_status state_{ queue_status::open };
- std::size_t count_{ 0 };
- ptr_t head_{};
- ptr_t * tail_;
- mutable mutex mtx_{};
- condition_variable not_empty_cond_{};
- condition_variable not_full_cond_{};
- std::size_t hwm_;
- std::size_t lwm_;
-
- bool is_closed_() const noexcept {
- return queue_status::closed == state_;
- }
-
- void close_( std::unique_lock< boost::fibers::mutex > & lk) noexcept {
- state_ = queue_status::closed;
- lk.unlock();
- not_empty_cond_.notify_all();
- not_full_cond_.notify_all();
- }
-
- std::size_t size_() const noexcept {
- return count_;
- }
-
- bool is_empty_() const noexcept {
- return ! head_;
- }
-
- bool is_full_() const noexcept {
- return count_ >= hwm_;
- }
-
- channel_op_status push_( ptr_t new_node,
- std::unique_lock< boost::fibers::mutex > & lk) {
- if ( is_closed_() ) {
- return channel_op_status::closed;
- }
- not_full_cond_.wait( lk,
- [this](){
- return ! is_full_();
- });
- return push_and_notify_( new_node, lk);
- }
-
- channel_op_status try_push_( ptr_t new_node,
- std::unique_lock< boost::fibers::mutex > & lk) noexcept {
- if ( is_closed_() ) {
- return channel_op_status::closed;
- }
- if ( is_full_() ) {
- return channel_op_status::full;
- }
- return push_and_notify_( new_node, lk);
- }
-
- template< typename Clock, typename Duration >
- channel_op_status push_wait_until_( ptr_t new_node,
- std::chrono::time_point< Clock, Duration > const& timeout_time,
- std::unique_lock< boost::fibers::mutex > & lk) {
- if ( is_closed_() ) {
- return channel_op_status::closed;
- }
- if ( ! not_full_cond_.wait_until( lk, timeout_time,
- [this](){
- return ! is_full_();
- })) {
- return channel_op_status::timeout;
- }
- return push_and_notify_( new_node, lk);
- }
-
- channel_op_status push_and_notify_( ptr_t new_node,
- std::unique_lock< boost::fibers::mutex > & lk) noexcept {
- push_tail_( new_node);
- lk.unlock();
- not_empty_cond_.notify_one();
- return channel_op_status::success;
- }
-
- void push_tail_( ptr_t new_node) noexcept {
- * tail_ = new_node;
- tail_ = & new_node->nxt;
- ++count_;
- }
-
- value_type value_pop_( std::unique_lock< boost::fibers::mutex > & lk) {
- BOOST_ASSERT( ! is_empty_() );
- auto old_head = pop_head_();
- if ( size_() <= lwm_) {
- if ( lwm_ == hwm_) {
- lk.unlock();
- not_full_cond_.notify_one();
- } else {
- lk.unlock();
- // more than one producer could be waiting
- // to push a value
- not_full_cond_.notify_all();
- }
- }
- return std::move( old_head->va);
- }
-
- ptr_t pop_head_() noexcept {
- auto old_head = head_;
- head_ = old_head->nxt;
- if ( ! head_) {
- tail_ = & head_;
- }
- old_head->nxt.reset();
- --count_;
- return old_head;
- }
-
-public:
- bounded_channel( std::size_t hwm, std::size_t lwm,
- Allocator const& alloc = Allocator() ) :
- alloc_{ alloc },
- tail_{ & head_ },
- hwm_{ hwm },
- lwm_{ lwm } {
- if ( hwm_ <= lwm_) {
- throw fiber_error( std::make_error_code( std::errc::invalid_argument),
- "boost fiber: high-watermark is less than or equal to low-watermark for bounded_channel");
- }
- if ( 0 == hwm) {
- throw fiber_error( std::make_error_code( std::errc::invalid_argument),
- "boost fiber: high-watermark is zero");
- }
- }
-
- bounded_channel( std::size_t wm,
- Allocator const& alloc = Allocator() ) :
- alloc_{ alloc },
- tail_{ & head_ },
- hwm_{ wm },
- lwm_{ wm - 1 } {
- if ( 0 == wm) {
- throw fiber_error( std::make_error_code( std::errc::invalid_argument),
- "boost fiber: watermark is zero");
- }
- }
-
- bounded_channel( bounded_channel const&) = delete;
- bounded_channel & operator=( bounded_channel const&) = delete;
-
- std::size_t upper_bound() const noexcept {
- return hwm_;
- }
-
- std::size_t lower_bound() const noexcept {
- return lwm_;
- }
-
- void close() noexcept {
- std::unique_lock< mutex > lk( mtx_);
- close_( lk);
- }
-
- channel_op_status push( value_type const& va) {
- typename allocator_traits_t::pointer ptr{
- allocator_traits_t::allocate( alloc_, 1) };
- try {
- allocator_traits_t::construct( alloc_, ptr, va, alloc_);
- } catch (...) {
- allocator_traits_t::deallocate( alloc_, ptr, 1);
- throw;
- }
- std::unique_lock< mutex > lk( mtx_);
- return push_( { detail::convert( ptr) }, lk);
- }
-
- channel_op_status push( value_type && va) {
- typename allocator_traits_t::pointer ptr{
- allocator_traits_t::allocate( alloc_, 1) };
- try {
- allocator_traits_t::construct(
- alloc_, ptr, std::move( va), alloc_);
- } catch (...) {
- allocator_traits_t::deallocate( alloc_, ptr, 1);
- throw;
- }
- std::unique_lock< mutex > lk( mtx_);
- return push_( { detail::convert( ptr) }, lk);
- }
-
- template< typename Rep, typename Period >
- channel_op_status push_wait_for( value_type const& va,
- std::chrono::duration< Rep, Period > const& timeout_duration) {
- return push_wait_until( va,
- std::chrono::steady_clock::now() + timeout_duration);
- }
-
- template< typename Rep, typename Period >
- channel_op_status push_wait_for( value_type && va,
- std::chrono::duration< Rep, Period > const& timeout_duration) {
- return push_wait_until( std::forward< value_type >( va),
- std::chrono::steady_clock::now() + timeout_duration);
- }
-
- template< typename Clock, typename Duration >
- channel_op_status push_wait_until( value_type const& va,
- std::chrono::time_point< Clock, Duration > const& timeout_time) {
- typename allocator_traits_t::pointer ptr{
- allocator_traits_t::allocate( alloc_, 1) };
- try {
- allocator_traits_t::construct( alloc_, ptr, va, alloc_);
- } catch (...) {
- allocator_traits_t::deallocate( alloc_, ptr, 1);
- throw;
- }
- std::unique_lock< mutex > lk( mtx_);
- return push_wait_until_( { detail::convert( ptr) }, timeout_time, lk);
- }
-
- template< typename Clock, typename Duration >
- channel_op_status push_wait_until( value_type && va,
- std::chrono::time_point< Clock, Duration > const& timeout_time) {
- typename allocator_traits_t::pointer ptr{
- allocator_traits_t::allocate( alloc_, 1) };
- try {
- allocator_traits_t::construct(
- alloc_, ptr, std::move( va), alloc_);
- } catch (...) {
- allocator_traits_t::deallocate( alloc_, ptr, 1);
- throw;
- }
- std::unique_lock< mutex > lk( mtx_);
- return push_wait_until_( { detail::convert( ptr) }, timeout_time, lk);
- }
-
- channel_op_status try_push( value_type const& va) {
- typename allocator_traits_t::pointer ptr{
- allocator_traits_t::allocate( alloc_, 1) };
- try {
- allocator_traits_t::construct( alloc_, ptr, va, alloc_);
- } catch (...) {
- allocator_traits_t::deallocate( alloc_, ptr, 1);
- throw;
- }
- std::unique_lock< mutex > lk( mtx_);
- return try_push_( { detail::convert( ptr) }, lk);
- }
-
- channel_op_status try_push( value_type && va) {
- typename allocator_traits_t::pointer ptr{
- allocator_traits_t::allocate( alloc_, 1) };
- try {
- allocator_traits_t::construct(
- alloc_, ptr, std::move( va), alloc_);
- } catch (...) {
- allocator_traits_t::deallocate( alloc_, ptr, 1);
- throw;
- }
- std::unique_lock< mutex > lk( mtx_);
- return try_push_( { detail::convert( ptr) }, lk);
- }
-
- channel_op_status pop( value_type & va) {
- std::unique_lock< mutex > lk( mtx_);
- not_empty_cond_.wait( lk,
- [this](){
- return is_closed_() || ! is_empty_();
- });
- if ( is_closed_() && is_empty_() ) {
- return channel_op_status::closed;
- }
- va = value_pop_( lk);
- return channel_op_status::success;
- }
-
- value_type value_pop() {
- std::unique_lock< mutex > lk( mtx_);
- not_empty_cond_.wait( lk,
- [this](){
- return is_closed_() || ! is_empty_();
- });
- if ( is_closed_() && is_empty_() ) {
- throw fiber_error(
- std::make_error_code( std::errc::operation_not_permitted),
- "boost fiber: queue is closed");
- }
- return value_pop_( lk);
- }
-
- channel_op_status try_pop( value_type & va) {
- std::unique_lock< mutex > lk( mtx_);
- if ( is_closed_() && is_empty_() ) {
- // let other fibers run
- lk.unlock();
- this_fiber::yield();
- return channel_op_status::closed;
- }
- if ( is_empty_() ) {
- // let other fibers run
- lk.unlock();
- this_fiber::yield();
- return channel_op_status::empty;
- }
- va = value_pop_( lk);
- return channel_op_status::success;
- }
-
- template< typename Rep, typename Period >
- channel_op_status pop_wait_for( value_type & va,
- std::chrono::duration< Rep, Period > const& timeout_duration) {
- return pop_wait_until( va,
- std::chrono::steady_clock::now() + timeout_duration);
- }
-
- template< typename Clock, typename Duration >
- channel_op_status pop_wait_until( value_type & va,
- std::chrono::time_point< Clock, Duration > const& timeout_time) {
- std::unique_lock< mutex > lk( mtx_);
- if ( ! not_empty_cond_.wait_until( lk,
- timeout_time,
- [this](){
- return is_closed_() || ! is_empty_();
- })) {
- return channel_op_status::timeout;
- }
- if ( is_closed_() && is_empty_() ) {
- return channel_op_status::closed;
- }
- va = value_pop_( lk);
- return channel_op_status::success;
- }
-};
-
-}}
-
-#ifdef BOOST_HAS_ABI_HEADERS
-# include BOOST_ABI_SUFFIX
-#endif
-
-#endif // BOOST_FIBERS_BOUNDED_CHANNEL_H