diff options
Diffstat (limited to 'boost/beast/websocket/impl/close.ipp')
-rw-r--r-- | boost/beast/websocket/impl/close.ipp | 133 |
1 files changed, 60 insertions, 73 deletions
diff --git a/boost/beast/websocket/impl/close.ipp b/boost/beast/websocket/impl/close.ipp index 0349dd1409..7b0e1ff648 100644 --- a/boost/beast/websocket/impl/close.ipp +++ b/boost/beast/websocket/impl/close.ipp @@ -19,6 +19,7 @@ #include <boost/asio/associated_executor.hpp> #include <boost/asio/coroutine.hpp> #include <boost/asio/handler_continuation_hook.hpp> +#include <boost/asio/handler_invoke_hook.hpp> #include <boost/asio/post.hpp> #include <boost/throw_exception.hpp> #include <memory> @@ -34,25 +35,23 @@ namespace websocket { frame. Finally it invokes the teardown operation to shut down the underlying connection. */ -template<class NextLayer> +template<class NextLayer, bool deflateSupported> template<class Handler> -class stream<NextLayer>::close_op +class stream<NextLayer, deflateSupported>::close_op : public boost::asio::coroutine { struct state { - stream<NextLayer>& ws; + stream<NextLayer, deflateSupported>& ws; detail::frame_buffer fb; error_code ev; - token tok; bool cont = false; state( - Handler&, - stream<NextLayer>& ws_, + Handler const&, + stream<NextLayer, deflateSupported>& ws_, close_reason const& cr) : ws(ws_) - , tok(ws.tok_.unique()) { // Serialize the close frame ws.template write_close< @@ -63,13 +62,15 @@ class stream<NextLayer>::close_op handler_ptr<state, Handler> d_; public: + static constexpr int id = 4; // for soft_mutex + close_op(close_op&&) = default; - close_op(close_op const&) = default; + close_op(close_op const&) = delete; template<class DeducedHandler> close_op( DeducedHandler&& h, - stream<NextLayer>& ws, + stream<NextLayer, deflateSupported>& ws, close_reason const& cr) : d_(std::forward<DeducedHandler>(h), ws, cr) { @@ -81,16 +82,16 @@ public: allocator_type get_allocator() const noexcept { - return boost::asio::get_associated_allocator(d_.handler()); + return (boost::asio::get_associated_allocator)(d_.handler()); } using executor_type = boost::asio::associated_executor_t< - Handler, decltype(std::declval<stream<NextLayer>&>().get_executor())>; + Handler, decltype(std::declval<stream<NextLayer, deflateSupported>&>().get_executor())>; executor_type get_executor() const noexcept { - return boost::asio::get_associated_executor( + return (boost::asio::get_associated_executor)( d_.handler(), d_->ws.get_executor()); } @@ -107,12 +108,21 @@ public: return op->d_->cont || asio_handler_is_continuation( std::addressof(op->d_.handler())); } + + template<class Function> + friend + void asio_handler_invoke(Function&& f, close_op* op) + { + using boost::asio::asio_handler_invoke; + asio_handler_invoke(f, + std::addressof(op->d_.handler())); + } }; -template<class NextLayer> +template<class NextLayer, bool deflateSupported> template<class Handler> void -stream<NextLayer>:: +stream<NextLayer, deflateSupported>:: close_op<Handler>:: operator()( error_code ec, @@ -121,16 +131,12 @@ operator()( { using beast::detail::clamp; auto& d = *d_; - close_code code{}; d.cont = cont; BOOST_ASIO_CORO_REENTER(*this) { // Maybe suspend - if(! d.ws.wr_block_) + if(d.ws.wr_block_.try_lock(this)) { - // Acquire the write block - d.ws.wr_block_ = d.tok; - // Make sure the stream is open if(! d.ws.check_open(ec)) goto upcall; @@ -138,19 +144,17 @@ operator()( else { // Suspend - BOOST_ASSERT(d.ws.wr_block_ != d.tok); BOOST_ASIO_CORO_YIELD d.ws.paused_close_.emplace(std::move(*this)); // Acquire the write block - BOOST_ASSERT(! d.ws.wr_block_); - d.ws.wr_block_ = d.tok; + d.ws.wr_block_.lock(this); // Resume BOOST_ASIO_CORO_YIELD boost::asio::post( d.ws.get_executor(), std::move(*this)); - BOOST_ASSERT(d.ws.wr_block_ == d.tok); + BOOST_ASSERT(d.ws.wr_block_.is_locked(this)); // Make sure the stream is open if(! d.ws.check_open(ec)) @@ -181,27 +185,20 @@ operator()( } // Maybe suspend - if(! d.ws.rd_block_) - { - // Acquire the read block - d.ws.rd_block_ = d.tok; - } - else + if(! d.ws.rd_block_.try_lock(this)) { // Suspend - BOOST_ASSERT(d.ws.rd_block_ != d.tok); BOOST_ASIO_CORO_YIELD d.ws.paused_r_close_.emplace(std::move(*this)); // Acquire the read block - BOOST_ASSERT(! d.ws.rd_block_); - d.ws.rd_block_ = d.tok; + d.ws.rd_block_.lock(this); // Resume BOOST_ASIO_CORO_YIELD boost::asio::post( d.ws.get_executor(), std::move(*this)); - BOOST_ASSERT(d.ws.rd_block_ == d.tok); + BOOST_ASSERT(d.ws.rd_block_.is_locked(this)); // Make sure the stream is open BOOST_ASSERT(d.ws.status_ != status::open); @@ -219,13 +216,10 @@ operator()( { // Read frame header while(! d.ws.parse_fh( - d.ws.rd_fh_, d.ws.rd_buf_, code)) + d.ws.rd_fh_, d.ws.rd_buf_, d.ev)) { - if(code != close_code::none) - { - d.ev = error::failed; + if(d.ev) goto teardown; - } BOOST_ASIO_CORO_YIELD d.ws.stream_.async_read_some( d.ws.rd_buf_.prepare(read_size(d.ws.rd_buf_, @@ -244,16 +238,12 @@ operator()( d.ws.rd_close_ = true; auto const mb = buffers_prefix( clamp(d.ws.rd_fh_.len), - d.ws.rd_buf_.data()); + d.ws.rd_buf_.mutable_data()); if(d.ws.rd_fh_.len > 0 && d.ws.rd_fh_.mask) detail::mask_inplace(mb, d.ws.rd_key_); - detail::read_close(d.ws.cr_, mb, code); - if(code != close_code::none) - { - // Protocol error - d.ev = error::failed; + detail::read_close(d.ws.cr_, mb, d.ev); + if(d.ev) goto teardown; - } d.ws.rd_buf_.consume(clamp(d.ws.rd_fh_.len)); goto teardown; } @@ -283,12 +273,12 @@ operator()( teardown: // Teardown - BOOST_ASSERT(d.ws.wr_block_ == d.tok); + BOOST_ASSERT(d.ws.wr_block_.is_locked(this)); using beast::websocket::async_teardown; BOOST_ASIO_CORO_YIELD async_teardown(d.ws.role_, d.ws.stream_, std::move(*this)); - BOOST_ASSERT(d.ws.wr_block_ == d.tok); + BOOST_ASSERT(d.ws.wr_block_.is_locked(this)); if(ec == boost::asio::error::eof) { // Rationale: @@ -304,13 +294,10 @@ operator()( d.ws.close(); upcall: - BOOST_ASSERT(d.ws.wr_block_ == d.tok); - d.ws.wr_block_.reset(); - if(d.ws.rd_block_ == d.tok) - { - d.ws.rd_block_.reset(); + BOOST_ASSERT(d.ws.wr_block_.is_locked(this)); + d.ws.wr_block_.unlock(this); + if(d.ws.rd_block_.try_unlock(this)) d.ws.paused_r_rd_.maybe_invoke(); - } d.ws.paused_rd_.maybe_invoke() || d.ws.paused_ping_.maybe_invoke() || d.ws.paused_wr_.maybe_invoke(); @@ -327,9 +314,9 @@ operator()( //------------------------------------------------------------------------------ -template<class NextLayer> +template<class NextLayer, bool deflateSupported> void -stream<NextLayer>:: +stream<NextLayer, deflateSupported>:: close(close_reason const& cr) { static_assert(is_sync_stream<next_layer_type>::value, @@ -340,9 +327,9 @@ close(close_reason const& cr) BOOST_THROW_EXCEPTION(system_error{ec}); } -template<class NextLayer> +template<class NextLayer, bool deflateSupported> void -stream<NextLayer>:: +stream<NextLayer, deflateSupported>:: close(close_reason const& cr, error_code& ec) { static_assert(is_sync_stream<next_layer_type>::value, @@ -364,18 +351,18 @@ close(close_reason const& cr, error_code& ec) if(! check_ok(ec)) return; status_ = status::closing; + error_code result; // Drain the connection - close_code code{}; if(rd_remain_ > 0) goto read_payload; for(;;) { // Read frame header - while(! parse_fh(rd_fh_, rd_buf_, code)) + while(! parse_fh(rd_fh_, rd_buf_, result)) { - if(code != close_code::none) - return do_fail(close_code::none, - error::failed, ec); + if(result) + return do_fail( + close_code::none, result, ec); auto const bytes_transferred = stream_.read_some( rd_buf_.prepare(read_size(rd_buf_, @@ -393,15 +380,15 @@ close(close_reason const& cr, error_code& ec) rd_close_ = true; auto const mb = buffers_prefix( clamp(rd_fh_.len), - rd_buf_.data()); + rd_buf_.mutable_data()); if(rd_fh_.len > 0 && rd_fh_.mask) detail::mask_inplace(mb, rd_key_); - detail::read_close(cr_, mb, code); - if(code != close_code::none) + detail::read_close(cr_, mb, result); + if(result) { - // Protocol error - return do_fail(close_code::none, - error::failed, ec); + // Protocol violation + return do_fail( + close_code::none, result, ec); } rd_buf_.consume(clamp(rd_fh_.len)); break; @@ -434,20 +421,20 @@ close(close_reason const& cr, error_code& ec) ec.assign(0, ec.category()); } -template<class NextLayer> +template<class NextLayer, bool deflateSupported> template<class CloseHandler> BOOST_ASIO_INITFN_RESULT_TYPE( CloseHandler, void(error_code)) -stream<NextLayer>:: +stream<NextLayer, deflateSupported>:: async_close(close_reason const& cr, CloseHandler&& handler) { static_assert(is_async_stream<next_layer_type>::value, "AsyncStream requirements not met"); - boost::asio::async_completion<CloseHandler, - void(error_code)> init{handler}; + BOOST_BEAST_HANDLER_INIT( + CloseHandler, void(error_code)); close_op<BOOST_ASIO_HANDLER_TYPE( CloseHandler, void(error_code))>{ - init.completion_handler, *this, cr}( + std::move(init.completion_handler), *this, cr}( {}, 0, false); return init.result.get(); } |