diff options
Diffstat (limited to 'boost/beast/websocket/impl/read.ipp')
-rw-r--r-- | boost/beast/websocket/impl/read.ipp | 377 |
1 files changed, 215 insertions, 162 deletions
diff --git a/boost/beast/websocket/impl/read.ipp b/boost/beast/websocket/impl/read.ipp index 422cc3766b..1dfbd01e72 100644 --- a/boost/beast/websocket/impl/read.ipp +++ b/boost/beast/websocket/impl/read.ipp @@ -22,6 +22,7 @@ #include <boost/asio/associated_executor.hpp> #include <boost/asio/coroutine.hpp> #include <boost/asio/handler_continuation_hook.hpp> +#include <boost/asio/handler_invoke_hook.hpp> #include <boost/asio/post.hpp> #include <boost/assert.hpp> #include <boost/config.hpp> @@ -35,42 +36,75 @@ namespace boost { namespace beast { namespace websocket { +namespace detail { + +template<> +inline +void +stream_base<true>:: +inflate( + zlib::z_params& zs, + zlib::Flush flush, + error_code& ec) +{ + this->pmd_->zi.write(zs, flush, ec); +} + +template<> +inline +void +stream_base<true>:: +do_context_takeover_read(role_type role) +{ + if((role == role_type::client && + pmd_config_.server_no_context_takeover) || + (role == role_type::server && + pmd_config_.client_no_context_takeover)) + { + pmd_->zi.reset(); + } +} + +} // detail + +//------------------------------------------------------------------------------ + /* Read some message frame data. Also reads and handles control frames. */ -template<class NextLayer> +template<class NextLayer, bool deflateSupported> template< class MutableBufferSequence, class Handler> -class stream<NextLayer>::read_some_op +class stream<NextLayer, deflateSupported>::read_some_op : public boost::asio::coroutine { Handler h_; - stream<NextLayer>& ws_; + stream<NextLayer, deflateSupported>& ws_; MutableBufferSequence bs_; buffers_suffix<MutableBufferSequence> cb_; std::size_t bytes_written_ = 0; - error_code ev_; - token tok_; + error_code result_; close_code code_; bool did_read_ = false; bool cont_ = false; public: + static constexpr int id = 1; // for soft_mutex + read_some_op(read_some_op&&) = default; - read_some_op(read_some_op const&) = default; + read_some_op(read_some_op const&) = delete; template<class DeducedHandler> read_some_op( DeducedHandler&& h, - stream<NextLayer>& ws, + stream<NextLayer, deflateSupported>& ws, MutableBufferSequence const& bs) : h_(std::forward<DeducedHandler>(h)) , ws_(ws) , bs_(bs) , cb_(bs) - , tok_(ws_.tok_.unique()) , code_(close_code::none) { } @@ -81,16 +115,16 @@ public: allocator_type get_allocator() const noexcept { - return boost::asio::get_associated_allocator(h_); + return (boost::asio::get_associated_allocator)(h_); } using executor_type = boost::asio::associated_executor_t< - Handler, decltype(std::declval<stream<NextLayer>&>().get_executor())>; + Handler, decltype(std::declval<stream<NextLayer, deflateSupported>&>().get_executor())>; executor_type get_executor() const noexcept { - return boost::asio::get_associated_executor( + return (boost::asio::get_associated_executor)( h_, ws_.get_executor()); } @@ -112,12 +146,20 @@ public: return op->cont_ || asio_handler_is_continuation( std::addressof(op->h_)); } + + template<class Function> + friend + void asio_handler_invoke(Function&& f, read_some_op* op) + { + using boost::asio::asio_handler_invoke; + asio_handler_invoke(f, std::addressof(op->h_)); + } }; -template<class NextLayer> +template<class NextLayer, bool deflateSupported> template<class MutableBufferSequence, class Handler> void -stream<NextLayer>:: +stream<NextLayer, deflateSupported>:: read_some_op<MutableBufferSequence, Handler>:: operator()( error_code ec, @@ -127,17 +169,13 @@ operator()( using beast::detail::clamp; using boost::asio::buffer; using boost::asio::buffer_size; - close_code code{}; cont_ = cont; BOOST_ASIO_CORO_REENTER(*this) { // Maybe suspend do_maybe_suspend: - if(! ws_.rd_block_) + if(ws_.rd_block_.try_lock(this)) { - // Acquire the read block - ws_.rd_block_ = tok_; - // Make sure the stream is not closed if( ws_.status_ == status::closed || ws_.status_ == status::failed) @@ -150,19 +188,17 @@ operator()( { do_suspend: // Suspend - BOOST_ASSERT(ws_.rd_block_ != tok_); BOOST_ASIO_CORO_YIELD - ws_.paused_r_rd_.save(std::move(*this)); + ws_.paused_r_rd_.emplace(std::move(*this)); // Acquire the read block - BOOST_ASSERT(! ws_.rd_block_); - ws_.rd_block_ = tok_; + ws_.rd_block_.lock(this); // Resume BOOST_ASIO_CORO_YIELD boost::asio::post( ws_.get_executor(), std::move(*this)); - BOOST_ASSERT(ws_.rd_block_ == tok_); + BOOST_ASSERT(ws_.rd_block_.is_locked(this)); // The only way to get read blocked is if // a `close_op` wrote a close frame @@ -177,7 +213,7 @@ operator()( // then finish the read with operation_aborted. loop: - BOOST_ASSERT(ws_.rd_block_ == tok_); + BOOST_ASSERT(ws_.rd_block_.is_locked(this)); // See if we need to read a frame header. This // condition is structured to give the decompressor // a chance to emit the final empty deflate block @@ -187,45 +223,46 @@ operator()( { // Read frame header while(! ws_.parse_fh( - ws_.rd_fh_, ws_.rd_buf_, code)) + ws_.rd_fh_, ws_.rd_buf_, result_)) { - if(code != close_code::none) + if(result_) { // _Fail the WebSocket Connection_ - code_ = code; - ev_ = error::failed; + if(result_ == error::message_too_big) + code_ = close_code::too_big; + else + code_ = close_code::protocol_error; goto close; } - BOOST_ASSERT(ws_.rd_block_ == tok_); + BOOST_ASSERT(ws_.rd_block_.is_locked(this)); BOOST_ASIO_CORO_YIELD ws_.stream_.async_read_some( ws_.rd_buf_.prepare(read_size( ws_.rd_buf_, ws_.rd_buf_.max_size())), std::move(*this)); - BOOST_ASSERT(ws_.rd_block_ == tok_); + BOOST_ASSERT(ws_.rd_block_.is_locked(this)); if(! ws_.check_ok(ec)) goto upcall; ws_.rd_buf_.commit(bytes_transferred); // Allow a close operation // to acquire the read block - BOOST_ASSERT(ws_.rd_block_ == tok_); - ws_.rd_block_.reset(); + ws_.rd_block_.unlock(this); if( ws_.paused_r_close_.maybe_invoke()) { // Suspend - BOOST_ASSERT(ws_.rd_block_); + BOOST_ASSERT(ws_.rd_block_.is_locked()); goto do_suspend; } // Acquire read block - ws_.rd_block_ = tok_; + ws_.rd_block_.lock(this); } // Immediately apply the mask to the portion // of the buffer holding payload data. if(ws_.rd_fh_.len > 0 && ws_.rd_fh_.mask) detail::mask_inplace(buffers_prefix( clamp(ws_.rd_fh_.len), - ws_.rd_buf_.data()), + ws_.rd_buf_.mutable_data()), ws_.rd_key_); if(detail::is_control(ws_.rd_fh_.op)) { @@ -236,6 +273,17 @@ operator()( // Handle ping frame if(ws_.rd_fh_.op == detail::opcode::ping) { + if(ws_.ctrl_cb_) + { + if(! cont_) + { + BOOST_ASIO_CORO_YIELD + boost::asio::post( + ws_.get_executor(), + std::move(*this)); + BOOST_ASSERT(cont_); + } + } { auto const b = buffers_prefix( clamp(ws_.rd_fh_.len), @@ -249,43 +297,34 @@ operator()( if(ws_.status_ == status::closing) goto loop; if(ws_.ctrl_cb_) - ws_.ctrl_cb_(frame_type::ping, payload); + ws_.ctrl_cb_( + frame_type::ping, payload); ws_.rd_fb_.reset(); ws_.template write_ping< flat_static_buffer_base>(ws_.rd_fb_, detail::opcode::pong, payload); } - //BOOST_ASSERT(! ws_.paused_r_close_); - // Allow a close operation // to acquire the read block - BOOST_ASSERT(ws_.rd_block_ == tok_); - ws_.rd_block_.reset(); + ws_.rd_block_.unlock(this); ws_.paused_r_close_.maybe_invoke(); // Maybe suspend - if(! ws_.wr_block_) - { - // Acquire the write block - ws_.wr_block_ = tok_; - } - else + if(! ws_.wr_block_.try_lock(this)) { // Suspend - BOOST_ASSERT(ws_.wr_block_ != tok_); BOOST_ASIO_CORO_YIELD - ws_.paused_rd_.save(std::move(*this)); + ws_.paused_rd_.emplace(std::move(*this)); // Acquire the write block - BOOST_ASSERT(! ws_.wr_block_); - ws_.wr_block_ = tok_; + ws_.wr_block_.lock(this); // Resume BOOST_ASIO_CORO_YIELD boost::asio::post( ws_.get_executor(), std::move(*this)); - BOOST_ASSERT(ws_.wr_block_ == tok_); + BOOST_ASSERT(ws_.wr_block_.is_locked(this)); // Make sure the stream is open if(! ws_.check_open(ec)) @@ -293,14 +332,14 @@ operator()( } // Send pong - BOOST_ASSERT(ws_.wr_block_ == tok_); + BOOST_ASSERT(ws_.wr_block_.is_locked(this)); BOOST_ASIO_CORO_YIELD boost::asio::async_write(ws_.stream_, ws_.rd_fb_.data(), std::move(*this)); - BOOST_ASSERT(ws_.wr_block_ == tok_); + BOOST_ASSERT(ws_.wr_block_.is_locked(this)); if(! ws_.check_ok(ec)) goto upcall; - ws_.wr_block_.reset(); + ws_.wr_block_.unlock(this); ws_.paused_close_.maybe_invoke() || ws_.paused_ping_.maybe_invoke() || ws_.paused_wr_.maybe_invoke(); @@ -309,11 +348,22 @@ operator()( // Handle pong frame if(ws_.rd_fh_.op == detail::opcode::pong) { + // Ignore pong when closing + if(! ws_.wr_close_ && ws_.ctrl_cb_) + { + if(! cont_) + { + BOOST_ASIO_CORO_YIELD + boost::asio::post( + ws_.get_executor(), + std::move(*this)); + BOOST_ASSERT(cont_); + } + } auto const cb = buffers_prefix(clamp( ws_.rd_fh_.len), ws_.rd_buf_.data()); auto const len = buffer_size(cb); BOOST_ASSERT(len == ws_.rd_fh_.len); - code = close_code::none; ping_data payload; detail::read_ping(payload, cb); ws_.rd_buf_.consume(len); @@ -325,6 +375,17 @@ operator()( // Handle close frame BOOST_ASSERT(ws_.rd_fh_.op == detail::opcode::close); { + if(ws_.ctrl_cb_) + { + if(! cont_) + { + BOOST_ASIO_CORO_YIELD + boost::asio::post( + ws_.get_executor(), + std::move(*this)); + BOOST_ASSERT(cont_); + } + } auto const cb = buffers_prefix(clamp( ws_.rd_fh_.len), ws_.rd_buf_.data()); auto const len = buffer_size(cb); @@ -332,12 +393,11 @@ operator()( BOOST_ASSERT(! ws_.rd_close_); ws_.rd_close_ = true; close_reason cr; - detail::read_close(cr, cb, code); - if(code != close_code::none) + detail::read_close(cr, cb, result_); + if(result_) { // _Fail the WebSocket Connection_ - code_ = code; - ev_ = error::failed; + code_ = close_code::protocol_error; goto close; } ws_.cr_ = cr; @@ -351,14 +411,14 @@ operator()( // _Close the WebSocket Connection_ BOOST_ASSERT(ws_.wr_close_); code_ = close_code::none; - ev_ = error::closed; + result_ = error::closed; goto close; } // _Start the WebSocket Closing Handshake_ code_ = cr.code == close_code::none ? close_code::normal : static_cast<close_code>(cr.code); - ev_ = error::closed; + result_ = error::closed; goto close; } } @@ -369,7 +429,7 @@ operator()( } ws_.rd_done_ = false; } - if(! ws_.pmd_ || ! ws_.pmd_->rd_set) + if(! ws_.rd_deflated()) { if(ws_.rd_remain_ > 0) { @@ -389,7 +449,7 @@ operator()( ws_.rd_buf_.commit(bytes_transferred); if(ws_.rd_fh_.mask) detail::mask_inplace(buffers_prefix(clamp( - ws_.rd_remain_), ws_.rd_buf_.data()), + ws_.rd_remain_), ws_.rd_buf_.mutable_data()), ws_.rd_key_); } if(ws_.rd_buf_.size() > 0) @@ -409,7 +469,7 @@ operator()( { // _Fail the WebSocket Connection_ code_ = close_code::bad_payload; - ev_ = error::failed; + result_ = error::bad_frame_payload; goto close; } } @@ -443,7 +503,7 @@ operator()( { // _Fail the WebSocket Connection_ code_ = close_code::bad_payload; - ev_ = error::failed; + result_ = error::bad_frame_payload; goto close; } } @@ -477,7 +537,7 @@ operator()( if(ws_.rd_fh_.mask) detail::mask_inplace( buffers_prefix(clamp(ws_.rd_remain_), - ws_.rd_buf_.data()), ws_.rd_key_); + ws_.rd_buf_.mutable_data()), ws_.rd_key_); did_read_ = true; } zlib::z_params zs; @@ -511,7 +571,7 @@ operator()( 0x00, 0x00, 0xff, 0xff }; zs.next_in = empty_block; zs.avail_in = sizeof(empty_block); - ws_.pmd_->zi.write(zs, zlib::Flush::sync, ec); + ws_.inflate(zs, zlib::Flush::sync, ec); if(! ec) { // https://github.com/madler/zlib/issues/280 @@ -520,12 +580,7 @@ operator()( } if(! ws_.check_ok(ec)) goto upcall; - if( - (ws_.role_ == role_type::client && - ws_.pmd_config_.server_no_context_takeover) || - (ws_.role_ == role_type::server && - ws_.pmd_config_.client_no_context_takeover)) - ws_.pmd_->zi.reset(); + ws_.do_context_takeover_read(ws_.role_); ws_.rd_done_ = true; break; } @@ -533,7 +588,7 @@ operator()( { break; } - ws_.pmd_->zi.write(zs, zlib::Flush::sync, ec); + ws_.inflate(zs, zlib::Flush::sync, ec); if(! ws_.check_ok(ec)) goto upcall; if(ws_.rd_msg_max_ && beast::detail::sum_exceeds( @@ -541,7 +596,7 @@ operator()( { // _Fail the WebSocket Connection_ code_ = close_code::too_big; - ev_ = error::failed; + result_ = error::message_too_big; goto close; } cb_.consume(zs.total_out); @@ -559,7 +614,7 @@ operator()( { // _Fail the WebSocket Connection_ code_ = close_code::bad_payload; - ev_ = error::failed; + result_ = error::bad_frame_payload; goto close; } } @@ -567,30 +622,25 @@ operator()( goto upcall; close: - if(! ws_.wr_block_) + if(ws_.wr_block_.try_lock(this)) { - // Acquire the write block - ws_.wr_block_ = tok_; - // Make sure the stream is open BOOST_ASSERT(ws_.status_ == status::open); } else { // Suspend - BOOST_ASSERT(ws_.wr_block_ != tok_); BOOST_ASIO_CORO_YIELD - ws_.paused_rd_.save(std::move(*this)); + ws_.paused_rd_.emplace(std::move(*this)); // Acquire the write block - BOOST_ASSERT(! ws_.wr_block_); - ws_.wr_block_ = tok_; + ws_.wr_block_.lock(this); // Resume BOOST_ASIO_CORO_YIELD boost::asio::post( ws_.get_executor(), std::move(*this)); - BOOST_ASSERT(ws_.wr_block_ == tok_); + BOOST_ASSERT(ws_.wr_block_.is_locked(this)); // Make sure the stream is open if(! ws_.check_open(ec)) @@ -611,23 +661,23 @@ operator()( ws_.rd_fb_, code_); // Send close frame - BOOST_ASSERT(ws_.wr_block_ == tok_); + BOOST_ASSERT(ws_.wr_block_.is_locked(this)); BOOST_ASIO_CORO_YIELD boost::asio::async_write( ws_.stream_, ws_.rd_fb_.data(), std::move(*this)); - BOOST_ASSERT(ws_.wr_block_ == tok_); + BOOST_ASSERT(ws_.wr_block_.is_locked(this)); if(! ws_.check_ok(ec)) goto upcall; } // Teardown using beast::websocket::async_teardown; - BOOST_ASSERT(ws_.wr_block_ == tok_); + BOOST_ASSERT(ws_.wr_block_.is_locked(this)); BOOST_ASIO_CORO_YIELD async_teardown(ws_.role_, ws_.stream_, std::move(*this)); - BOOST_ASSERT(ws_.wr_block_ == tok_); + BOOST_ASSERT(ws_.wr_block_.is_locked(this)); if(ec == boost::asio::error::eof) { // Rationale: @@ -635,7 +685,7 @@ operator()( ec.assign(0, ec.category()); } if(! ec) - ec = ev_; + ec = result_; if(ec && ec != error::closed) ws_.status_ = status::failed; else @@ -643,16 +693,12 @@ operator()( ws_.close(); upcall: - if(ws_.rd_block_ == tok_) - ws_.rd_block_.reset(); + ws_.rd_block_.try_unlock(this); ws_.paused_r_close_.maybe_invoke(); - if(ws_.wr_block_ == tok_) - { - ws_.wr_block_.reset(); + if(ws_.wr_block_.try_unlock(this)) ws_.paused_close_.maybe_invoke() || ws_.paused_ping_.maybe_invoke() || ws_.paused_wr_.maybe_invoke(); - } if(! cont_) return boost::asio::post( ws_.stream_.get_executor(), @@ -664,15 +710,15 @@ operator()( //------------------------------------------------------------------------------ -template<class NextLayer> +template<class NextLayer, bool deflateSupported> template< class DynamicBuffer, class Handler> -class stream<NextLayer>::read_op +class stream<NextLayer, deflateSupported>::read_op : public boost::asio::coroutine { Handler h_; - stream<NextLayer>& ws_; + stream<NextLayer, deflateSupported>& ws_; DynamicBuffer& b_; std::size_t limit_; std::size_t bytes_written_ = 0; @@ -683,12 +729,12 @@ public: boost::asio::associated_allocator_t<Handler>; read_op(read_op&&) = default; - read_op(read_op const&) = default; + read_op(read_op const&) = delete; template<class DeducedHandler> read_op( DeducedHandler&& h, - stream<NextLayer>& ws, + stream<NextLayer, deflateSupported>& ws, DynamicBuffer& b, std::size_t limit, bool some) @@ -704,16 +750,16 @@ public: allocator_type get_allocator() const noexcept { - return boost::asio::get_associated_allocator(h_); + return (boost::asio::get_associated_allocator)(h_); } using executor_type = boost::asio::associated_executor_t< - Handler, decltype(std::declval<stream<NextLayer>&>().get_executor())>; + Handler, decltype(std::declval<stream<NextLayer, deflateSupported>&>().get_executor())>; executor_type get_executor() const noexcept { - return boost::asio::get_associated_executor( + return (boost::asio::get_associated_executor)( h_, ws_.get_executor()); } @@ -728,12 +774,20 @@ public: return asio_handler_is_continuation( std::addressof(op->h_)); } + + template<class Function> + friend + void asio_handler_invoke(Function&& f, read_op* op) + { + using boost::asio::asio_handler_invoke; + asio_handler_invoke(f, std::addressof(op->h_)); + } }; -template<class NextLayer> +template<class NextLayer, bool deflateSupported> template<class DynamicBuffer, class Handler> void -stream<NextLayer>:: +stream<NextLayer, deflateSupported>:: read_op<DynamicBuffer, Handler>:: operator()( error_code ec, @@ -781,10 +835,10 @@ operator()( //------------------------------------------------------------------------------ -template<class NextLayer> +template<class NextLayer, bool deflateSupported> template<class DynamicBuffer> std::size_t -stream<NextLayer>:: +stream<NextLayer, deflateSupported>:: read(DynamicBuffer& buffer) { static_assert(is_sync_stream<next_layer_type>::value, @@ -799,10 +853,10 @@ read(DynamicBuffer& buffer) return bytes_written; } -template<class NextLayer> +template<class NextLayer, bool deflateSupported> template<class DynamicBuffer> std::size_t -stream<NextLayer>:: +stream<NextLayer, deflateSupported>:: read(DynamicBuffer& buffer, error_code& ec) { static_assert(is_sync_stream<next_layer_type>::value, @@ -821,25 +875,25 @@ read(DynamicBuffer& buffer, error_code& ec) return bytes_written; } -template<class NextLayer> +template<class NextLayer, bool deflateSupported> template<class DynamicBuffer, class ReadHandler> BOOST_ASIO_INITFN_RESULT_TYPE( ReadHandler, void(error_code, std::size_t)) -stream<NextLayer>:: +stream<NextLayer, deflateSupported>:: async_read(DynamicBuffer& buffer, ReadHandler&& handler) { static_assert(is_async_stream<next_layer_type>::value, - "AsyncStream requirements requirements not met"); + "AsyncStream requirements not met"); static_assert( boost::asio::is_dynamic_buffer<DynamicBuffer>::value, "DynamicBuffer requirements not met"); - boost::asio::async_completion< - ReadHandler, void(error_code, std::size_t)> init{handler}; + BOOST_BEAST_HANDLER_INIT( + ReadHandler, void(error_code, std::size_t)); read_op< DynamicBuffer, BOOST_ASIO_HANDLER_TYPE( ReadHandler, void(error_code, std::size_t))>{ - init.completion_handler, + std::move(init.completion_handler), *this, buffer, 0, @@ -849,10 +903,10 @@ async_read(DynamicBuffer& buffer, ReadHandler&& handler) //------------------------------------------------------------------------------ -template<class NextLayer> +template<class NextLayer, bool deflateSupported> template<class DynamicBuffer> std::size_t -stream<NextLayer>:: +stream<NextLayer, deflateSupported>:: read_some( DynamicBuffer& buffer, std::size_t limit) @@ -870,10 +924,10 @@ read_some( return bytes_written; } -template<class NextLayer> +template<class NextLayer, bool deflateSupported> template<class DynamicBuffer> std::size_t -stream<NextLayer>:: +stream<NextLayer, deflateSupported>:: read_some( DynamicBuffer& buffer, std::size_t limit, @@ -906,28 +960,28 @@ read_some( return bytes_written; } -template<class NextLayer> +template<class NextLayer, bool deflateSupported> template<class DynamicBuffer, class ReadHandler> BOOST_ASIO_INITFN_RESULT_TYPE( ReadHandler, void(error_code, std::size_t)) -stream<NextLayer>:: +stream<NextLayer, deflateSupported>:: async_read_some( DynamicBuffer& buffer, std::size_t limit, ReadHandler&& handler) { static_assert(is_async_stream<next_layer_type>::value, - "AsyncStream requirements requirements not met"); + "AsyncStream requirements not met"); static_assert( boost::asio::is_dynamic_buffer<DynamicBuffer>::value, "DynamicBuffer requirements not met"); - boost::asio::async_completion<ReadHandler, - void(error_code, std::size_t)> init{handler}; + BOOST_BEAST_HANDLER_INIT( + ReadHandler, void(error_code, std::size_t)); read_op< DynamicBuffer, BOOST_ASIO_HANDLER_TYPE( ReadHandler, void(error_code, std::size_t))>{ - init.completion_handler, + std::move(init.completion_handler), *this, buffer, limit, @@ -937,10 +991,10 @@ async_read_some( //------------------------------------------------------------------------------ -template<class NextLayer> +template<class NextLayer, bool deflateSupported> template<class MutableBufferSequence> std::size_t -stream<NextLayer>:: +stream<NextLayer, deflateSupported>:: read_some( MutableBufferSequence const& buffers) { @@ -956,10 +1010,10 @@ read_some( return bytes_written; } -template<class NextLayer> +template<class NextLayer, bool deflateSupported> template<class MutableBufferSequence> std::size_t -stream<NextLayer>:: +stream<NextLayer, deflateSupported>:: read_some( MutableBufferSequence const& buffers, error_code& ec) @@ -986,12 +1040,17 @@ loop: if(rd_remain_ == 0 && (! rd_fh_.fin || rd_done_)) { // Read frame header - while(! parse_fh(rd_fh_, rd_buf_, code)) + error_code result; + while(! parse_fh(rd_fh_, rd_buf_, result)) { - if(code != close_code::none) + if(result) { // _Fail the WebSocket Connection_ - do_fail(code, error::failed, ec); + if(result == error::message_too_big) + code = close_code::too_big; + else + code = close_code::protocol_error; + do_fail(code, result, ec); return bytes_written; } auto const bytes_transferred = @@ -1007,7 +1066,7 @@ loop: // of the buffer holding payload data. if(rd_fh_.len > 0 && rd_fh_.mask) detail::mask_inplace(buffers_prefix( - clamp(rd_fh_.len), rd_buf_.data()), + clamp(rd_fh_.len), rd_buf_.mutable_data()), rd_key_); if(detail::is_control(rd_fh_.op)) { @@ -1058,11 +1117,12 @@ loop: BOOST_ASSERT(! rd_close_); rd_close_ = true; close_reason cr; - detail::read_close(cr, b, code); - if(code != close_code::none) + detail::read_close(cr, b, result); + if(result) { // _Fail the WebSocket Connection_ - do_fail(code, error::failed, ec); + do_fail(close_code::protocol_error, + result, ec); return bytes_written; } cr_ = cr; @@ -1090,7 +1150,7 @@ loop: { ec.assign(0, ec.category()); } - if(! pmd_ || ! pmd_->rd_set) + if(! this->rd_deflated()) { if(rd_remain_ > 0) { @@ -1108,7 +1168,7 @@ loop: if(rd_fh_.mask) detail::mask_inplace( buffers_prefix(clamp(rd_remain_), - rd_buf_.data()), rd_key_); + rd_buf_.mutable_data()), rd_key_); } if(rd_buf_.size() > 0) { @@ -1127,10 +1187,8 @@ loop: ! rd_utf8_.finish())) { // _Fail the WebSocket Connection_ - do_fail( - close_code::bad_payload, - error::failed, - ec); + do_fail(close_code::bad_payload, + error::bad_frame_payload, ec); return bytes_written; } } @@ -1164,7 +1222,7 @@ loop: { // _Fail the WebSocket Connection_ do_fail(close_code::bad_payload, - error::failed, ec); + error::bad_frame_payload, ec); return bytes_written; } } @@ -1217,7 +1275,7 @@ loop: if(rd_fh_.mask) detail::mask_inplace( buffers_prefix(clamp(rd_remain_), - rd_buf_.data()), rd_key_); + rd_buf_.mutable_data()), rd_key_); auto const in = buffers_prefix( clamp(rd_remain_), buffers_front( rd_buf_.data())); @@ -1238,7 +1296,7 @@ loop: 0x00, 0x00, 0xff, 0xff }; zs.next_in = empty_block; zs.avail_in = sizeof(empty_block); - pmd_->zi.write(zs, zlib::Flush::sync, ec); + this->inflate(zs, zlib::Flush::sync, ec); if(! ec) { // https://github.com/madler/zlib/issues/280 @@ -1247,12 +1305,7 @@ loop: } if(! check_ok(ec)) return bytes_written; - if( - (role_ == role_type::client && - pmd_config_.server_no_context_takeover) || - (role_ == role_type::server && - pmd_config_.client_no_context_takeover)) - pmd_->zi.reset(); + this->do_context_takeover_read(role_); rd_done_ = true; break; } @@ -1260,14 +1313,14 @@ loop: { break; } - pmd_->zi.write(zs, zlib::Flush::sync, ec); + this->inflate(zs, zlib::Flush::sync, ec); if(! check_ok(ec)) return bytes_written; if(rd_msg_max_ && beast::detail::sum_exceeds( rd_size_, zs.total_out, rd_msg_max_)) { do_fail(close_code::too_big, - error::failed, ec); + error::message_too_big, ec); return bytes_written; } cb.consume(zs.total_out); @@ -1285,7 +1338,7 @@ loop: { // _Fail the WebSocket Connection_ do_fail(close_code::bad_payload, - error::failed, ec); + error::bad_frame_payload, ec); return bytes_written; } } @@ -1293,25 +1346,25 @@ loop: return bytes_written; } -template<class NextLayer> +template<class NextLayer, bool deflateSupported> template<class MutableBufferSequence, class ReadHandler> BOOST_ASIO_INITFN_RESULT_TYPE( ReadHandler, void(error_code, std::size_t)) -stream<NextLayer>:: +stream<NextLayer, deflateSupported>:: async_read_some( MutableBufferSequence const& buffers, ReadHandler&& handler) { static_assert(is_async_stream<next_layer_type>::value, - "AsyncStream requirements requirements not met"); + "AsyncStream requirements not met"); static_assert(boost::asio::is_mutable_buffer_sequence< MutableBufferSequence>::value, "MutableBufferSequence requirements not met"); - boost::asio::async_completion<ReadHandler, - void(error_code, std::size_t)> init{handler}; + BOOST_BEAST_HANDLER_INIT( + ReadHandler, void(error_code, std::size_t)); read_some_op<MutableBufferSequence, BOOST_ASIO_HANDLER_TYPE( ReadHandler, void(error_code, std::size_t))>{ - init.completion_handler,*this, buffers}( + std::move(init.completion_handler), *this, buffers}( {}, 0, false); return init.result.get(); } @@ -1320,4 +1373,4 @@ async_read_some( } // beast } // boost -#endif
\ No newline at end of file +#endif |