summaryrefslogtreecommitdiff
path: root/boost/beast/websocket/impl/read.ipp
diff options
context:
space:
mode:
Diffstat (limited to 'boost/beast/websocket/impl/read.ipp')
-rw-r--r--  boost/beast/websocket/impl/read.ipp  377
1 files changed, 215 insertions, 162 deletions
diff --git a/boost/beast/websocket/impl/read.ipp b/boost/beast/websocket/impl/read.ipp
index 422cc3766b..1dfbd01e72 100644
--- a/boost/beast/websocket/impl/read.ipp
+++ b/boost/beast/websocket/impl/read.ipp
@@ -22,6 +22,7 @@
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
+#include <boost/asio/handler_invoke_hook.hpp>
#include <boost/asio/post.hpp>
#include <boost/assert.hpp>
#include <boost/config.hpp>
@@ -35,42 +36,75 @@ namespace boost {
namespace beast {
namespace websocket {
+namespace detail {
+
+template<>
+inline
+void
+stream_base<true>::
+inflate(
+ zlib::z_params& zs,
+ zlib::Flush flush,
+ error_code& ec)
+{
+ this->pmd_->zi.write(zs, flush, ec);
+}
+
+template<>
+inline
+void
+stream_base<true>::
+do_context_takeover_read(role_type role)
+{
+ if((role == role_type::client &&
+ pmd_config_.server_no_context_takeover) ||
+ (role == role_type::server &&
+ pmd_config_.client_no_context_takeover))
+ {
+ pmd_->zi.reset();
+ }
+}
+
+} // detail
+
+//------------------------------------------------------------------------------
+
/* Read some message frame data.
Also reads and handles control frames.
*/
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
template<
class MutableBufferSequence,
class Handler>
-class stream<NextLayer>::read_some_op
+class stream<NextLayer, deflateSupported>::read_some_op
: public boost::asio::coroutine
{
Handler h_;
- stream<NextLayer>& ws_;
+ stream<NextLayer, deflateSupported>& ws_;
MutableBufferSequence bs_;
buffers_suffix<MutableBufferSequence> cb_;
std::size_t bytes_written_ = 0;
- error_code ev_;
- token tok_;
+ error_code result_;
close_code code_;
bool did_read_ = false;
bool cont_ = false;
public:
+ static constexpr int id = 1; // for soft_mutex
+
read_some_op(read_some_op&&) = default;
- read_some_op(read_some_op const&) = default;
+ read_some_op(read_some_op const&) = delete;
template<class DeducedHandler>
read_some_op(
DeducedHandler&& h,
- stream<NextLayer>& ws,
+ stream<NextLayer, deflateSupported>& ws,
MutableBufferSequence const& bs)
: h_(std::forward<DeducedHandler>(h))
, ws_(ws)
, bs_(bs)
, cb_(bs)
- , tok_(ws_.tok_.unique())
, code_(close_code::none)
{
}
@@ -81,16 +115,16 @@ public:
allocator_type
get_allocator() const noexcept
{
- return boost::asio::get_associated_allocator(h_);
+ return (boost::asio::get_associated_allocator)(h_);
}
using executor_type = boost::asio::associated_executor_t<
- Handler, decltype(std::declval<stream<NextLayer>&>().get_executor())>;
+ Handler, decltype(std::declval<stream<NextLayer, deflateSupported>&>().get_executor())>;
executor_type
get_executor() const noexcept
{
- return boost::asio::get_associated_executor(
+ return (boost::asio::get_associated_executor)(
h_, ws_.get_executor());
}
@@ -112,12 +146,20 @@ public:
return op->cont_ || asio_handler_is_continuation(
std::addressof(op->h_));
}
+
+ template<class Function>
+ friend
+ void asio_handler_invoke(Function&& f, read_some_op* op)
+ {
+ using boost::asio::asio_handler_invoke;
+ asio_handler_invoke(f, std::addressof(op->h_));
+ }
};
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
template<class MutableBufferSequence, class Handler>
void
-stream<NextLayer>::
+stream<NextLayer, deflateSupported>::
read_some_op<MutableBufferSequence, Handler>::
operator()(
error_code ec,
@@ -127,17 +169,13 @@ operator()(
using beast::detail::clamp;
using boost::asio::buffer;
using boost::asio::buffer_size;
- close_code code{};
cont_ = cont;
BOOST_ASIO_CORO_REENTER(*this)
{
// Maybe suspend
do_maybe_suspend:
- if(! ws_.rd_block_)
+ if(ws_.rd_block_.try_lock(this))
{
- // Acquire the read block
- ws_.rd_block_ = tok_;
-
// Make sure the stream is not closed
if( ws_.status_ == status::closed ||
ws_.status_ == status::failed)
@@ -150,19 +188,17 @@ operator()(
{
do_suspend:
// Suspend
- BOOST_ASSERT(ws_.rd_block_ != tok_);
BOOST_ASIO_CORO_YIELD
- ws_.paused_r_rd_.save(std::move(*this));
+ ws_.paused_r_rd_.emplace(std::move(*this));
// Acquire the read block
- BOOST_ASSERT(! ws_.rd_block_);
- ws_.rd_block_ = tok_;
+ ws_.rd_block_.lock(this);
// Resume
BOOST_ASIO_CORO_YIELD
boost::asio::post(
ws_.get_executor(), std::move(*this));
- BOOST_ASSERT(ws_.rd_block_ == tok_);
+ BOOST_ASSERT(ws_.rd_block_.is_locked(this));
// The only way to get read blocked is if
// a `close_op` wrote a close frame
@@ -177,7 +213,7 @@ operator()(
// then finish the read with operation_aborted.
loop:
- BOOST_ASSERT(ws_.rd_block_ == tok_);
+ BOOST_ASSERT(ws_.rd_block_.is_locked(this));
// See if we need to read a frame header. This
// condition is structured to give the decompressor
// a chance to emit the final empty deflate block
@@ -187,45 +223,46 @@ operator()(
{
// Read frame header
while(! ws_.parse_fh(
- ws_.rd_fh_, ws_.rd_buf_, code))
+ ws_.rd_fh_, ws_.rd_buf_, result_))
{
- if(code != close_code::none)
+ if(result_)
{
// _Fail the WebSocket Connection_
- code_ = code;
- ev_ = error::failed;
+ if(result_ == error::message_too_big)
+ code_ = close_code::too_big;
+ else
+ code_ = close_code::protocol_error;
goto close;
}
- BOOST_ASSERT(ws_.rd_block_ == tok_);
+ BOOST_ASSERT(ws_.rd_block_.is_locked(this));
BOOST_ASIO_CORO_YIELD
ws_.stream_.async_read_some(
ws_.rd_buf_.prepare(read_size(
ws_.rd_buf_, ws_.rd_buf_.max_size())),
std::move(*this));
- BOOST_ASSERT(ws_.rd_block_ == tok_);
+ BOOST_ASSERT(ws_.rd_block_.is_locked(this));
if(! ws_.check_ok(ec))
goto upcall;
ws_.rd_buf_.commit(bytes_transferred);
// Allow a close operation
// to acquire the read block
- BOOST_ASSERT(ws_.rd_block_ == tok_);
- ws_.rd_block_.reset();
+ ws_.rd_block_.unlock(this);
if( ws_.paused_r_close_.maybe_invoke())
{
// Suspend
- BOOST_ASSERT(ws_.rd_block_);
+ BOOST_ASSERT(ws_.rd_block_.is_locked());
goto do_suspend;
}
// Acquire read block
- ws_.rd_block_ = tok_;
+ ws_.rd_block_.lock(this);
}
// Immediately apply the mask to the portion
// of the buffer holding payload data.
if(ws_.rd_fh_.len > 0 && ws_.rd_fh_.mask)
detail::mask_inplace(buffers_prefix(
clamp(ws_.rd_fh_.len),
- ws_.rd_buf_.data()),
+ ws_.rd_buf_.mutable_data()),
ws_.rd_key_);
if(detail::is_control(ws_.rd_fh_.op))
{
@@ -236,6 +273,17 @@ operator()(
// Handle ping frame
if(ws_.rd_fh_.op == detail::opcode::ping)
{
+ if(ws_.ctrl_cb_)
+ {
+ if(! cont_)
+ {
+ BOOST_ASIO_CORO_YIELD
+ boost::asio::post(
+ ws_.get_executor(),
+ std::move(*this));
+ BOOST_ASSERT(cont_);
+ }
+ }
{
auto const b = buffers_prefix(
clamp(ws_.rd_fh_.len),
@@ -249,43 +297,34 @@ operator()(
if(ws_.status_ == status::closing)
goto loop;
if(ws_.ctrl_cb_)
- ws_.ctrl_cb_(frame_type::ping, payload);
+ ws_.ctrl_cb_(
+ frame_type::ping, payload);
ws_.rd_fb_.reset();
ws_.template write_ping<
flat_static_buffer_base>(ws_.rd_fb_,
detail::opcode::pong, payload);
}
- //BOOST_ASSERT(! ws_.paused_r_close_);
-
// Allow a close operation
// to acquire the read block
- BOOST_ASSERT(ws_.rd_block_ == tok_);
- ws_.rd_block_.reset();
+ ws_.rd_block_.unlock(this);
ws_.paused_r_close_.maybe_invoke();
// Maybe suspend
- if(! ws_.wr_block_)
- {
- // Acquire the write block
- ws_.wr_block_ = tok_;
- }
- else
+ if(! ws_.wr_block_.try_lock(this))
{
// Suspend
- BOOST_ASSERT(ws_.wr_block_ != tok_);
BOOST_ASIO_CORO_YIELD
- ws_.paused_rd_.save(std::move(*this));
+ ws_.paused_rd_.emplace(std::move(*this));
// Acquire the write block
- BOOST_ASSERT(! ws_.wr_block_);
- ws_.wr_block_ = tok_;
+ ws_.wr_block_.lock(this);
// Resume
BOOST_ASIO_CORO_YIELD
boost::asio::post(
ws_.get_executor(), std::move(*this));
- BOOST_ASSERT(ws_.wr_block_ == tok_);
+ BOOST_ASSERT(ws_.wr_block_.is_locked(this));
// Make sure the stream is open
if(! ws_.check_open(ec))
@@ -293,14 +332,14 @@ operator()(
}
// Send pong
- BOOST_ASSERT(ws_.wr_block_ == tok_);
+ BOOST_ASSERT(ws_.wr_block_.is_locked(this));
BOOST_ASIO_CORO_YIELD
boost::asio::async_write(ws_.stream_,
ws_.rd_fb_.data(), std::move(*this));
- BOOST_ASSERT(ws_.wr_block_ == tok_);
+ BOOST_ASSERT(ws_.wr_block_.is_locked(this));
if(! ws_.check_ok(ec))
goto upcall;
- ws_.wr_block_.reset();
+ ws_.wr_block_.unlock(this);
ws_.paused_close_.maybe_invoke() ||
ws_.paused_ping_.maybe_invoke() ||
ws_.paused_wr_.maybe_invoke();
@@ -309,11 +348,22 @@ operator()(
// Handle pong frame
if(ws_.rd_fh_.op == detail::opcode::pong)
{
+ // Ignore pong when closing
+ if(! ws_.wr_close_ && ws_.ctrl_cb_)
+ {
+ if(! cont_)
+ {
+ BOOST_ASIO_CORO_YIELD
+ boost::asio::post(
+ ws_.get_executor(),
+ std::move(*this));
+ BOOST_ASSERT(cont_);
+ }
+ }
auto const cb = buffers_prefix(clamp(
ws_.rd_fh_.len), ws_.rd_buf_.data());
auto const len = buffer_size(cb);
BOOST_ASSERT(len == ws_.rd_fh_.len);
- code = close_code::none;
ping_data payload;
detail::read_ping(payload, cb);
ws_.rd_buf_.consume(len);
@@ -325,6 +375,17 @@ operator()(
// Handle close frame
BOOST_ASSERT(ws_.rd_fh_.op == detail::opcode::close);
{
+ if(ws_.ctrl_cb_)
+ {
+ if(! cont_)
+ {
+ BOOST_ASIO_CORO_YIELD
+ boost::asio::post(
+ ws_.get_executor(),
+ std::move(*this));
+ BOOST_ASSERT(cont_);
+ }
+ }
auto const cb = buffers_prefix(clamp(
ws_.rd_fh_.len), ws_.rd_buf_.data());
auto const len = buffer_size(cb);
@@ -332,12 +393,11 @@ operator()(
BOOST_ASSERT(! ws_.rd_close_);
ws_.rd_close_ = true;
close_reason cr;
- detail::read_close(cr, cb, code);
- if(code != close_code::none)
+ detail::read_close(cr, cb, result_);
+ if(result_)
{
// _Fail the WebSocket Connection_
- code_ = code;
- ev_ = error::failed;
+ code_ = close_code::protocol_error;
goto close;
}
ws_.cr_ = cr;
@@ -351,14 +411,14 @@ operator()(
// _Close the WebSocket Connection_
BOOST_ASSERT(ws_.wr_close_);
code_ = close_code::none;
- ev_ = error::closed;
+ result_ = error::closed;
goto close;
}
// _Start the WebSocket Closing Handshake_
code_ = cr.code == close_code::none ?
close_code::normal :
static_cast<close_code>(cr.code);
- ev_ = error::closed;
+ result_ = error::closed;
goto close;
}
}
@@ -369,7 +429,7 @@ operator()(
}
ws_.rd_done_ = false;
}
- if(! ws_.pmd_ || ! ws_.pmd_->rd_set)
+ if(! ws_.rd_deflated())
{
if(ws_.rd_remain_ > 0)
{
@@ -389,7 +449,7 @@ operator()(
ws_.rd_buf_.commit(bytes_transferred);
if(ws_.rd_fh_.mask)
detail::mask_inplace(buffers_prefix(clamp(
- ws_.rd_remain_), ws_.rd_buf_.data()),
+ ws_.rd_remain_), ws_.rd_buf_.mutable_data()),
ws_.rd_key_);
}
if(ws_.rd_buf_.size() > 0)
@@ -409,7 +469,7 @@ operator()(
{
// _Fail the WebSocket Connection_
code_ = close_code::bad_payload;
- ev_ = error::failed;
+ result_ = error::bad_frame_payload;
goto close;
}
}
@@ -443,7 +503,7 @@ operator()(
{
// _Fail the WebSocket Connection_
code_ = close_code::bad_payload;
- ev_ = error::failed;
+ result_ = error::bad_frame_payload;
goto close;
}
}
@@ -477,7 +537,7 @@ operator()(
if(ws_.rd_fh_.mask)
detail::mask_inplace(
buffers_prefix(clamp(ws_.rd_remain_),
- ws_.rd_buf_.data()), ws_.rd_key_);
+ ws_.rd_buf_.mutable_data()), ws_.rd_key_);
did_read_ = true;
}
zlib::z_params zs;
@@ -511,7 +571,7 @@ operator()(
0x00, 0x00, 0xff, 0xff };
zs.next_in = empty_block;
zs.avail_in = sizeof(empty_block);
- ws_.pmd_->zi.write(zs, zlib::Flush::sync, ec);
+ ws_.inflate(zs, zlib::Flush::sync, ec);
if(! ec)
{
// https://github.com/madler/zlib/issues/280
@@ -520,12 +580,7 @@ operator()(
}
if(! ws_.check_ok(ec))
goto upcall;
- if(
- (ws_.role_ == role_type::client &&
- ws_.pmd_config_.server_no_context_takeover) ||
- (ws_.role_ == role_type::server &&
- ws_.pmd_config_.client_no_context_takeover))
- ws_.pmd_->zi.reset();
+ ws_.do_context_takeover_read(ws_.role_);
ws_.rd_done_ = true;
break;
}
@@ -533,7 +588,7 @@ operator()(
{
break;
}
- ws_.pmd_->zi.write(zs, zlib::Flush::sync, ec);
+ ws_.inflate(zs, zlib::Flush::sync, ec);
if(! ws_.check_ok(ec))
goto upcall;
if(ws_.rd_msg_max_ && beast::detail::sum_exceeds(
@@ -541,7 +596,7 @@ operator()(
{
// _Fail the WebSocket Connection_
code_ = close_code::too_big;
- ev_ = error::failed;
+ result_ = error::message_too_big;
goto close;
}
cb_.consume(zs.total_out);
@@ -559,7 +614,7 @@ operator()(
{
// _Fail the WebSocket Connection_
code_ = close_code::bad_payload;
- ev_ = error::failed;
+ result_ = error::bad_frame_payload;
goto close;
}
}
@@ -567,30 +622,25 @@ operator()(
goto upcall;
close:
- if(! ws_.wr_block_)
+ if(ws_.wr_block_.try_lock(this))
{
- // Acquire the write block
- ws_.wr_block_ = tok_;
-
// Make sure the stream is open
BOOST_ASSERT(ws_.status_ == status::open);
}
else
{
// Suspend
- BOOST_ASSERT(ws_.wr_block_ != tok_);
BOOST_ASIO_CORO_YIELD
- ws_.paused_rd_.save(std::move(*this));
+ ws_.paused_rd_.emplace(std::move(*this));
// Acquire the write block
- BOOST_ASSERT(! ws_.wr_block_);
- ws_.wr_block_ = tok_;
+ ws_.wr_block_.lock(this);
// Resume
BOOST_ASIO_CORO_YIELD
boost::asio::post(
ws_.get_executor(), std::move(*this));
- BOOST_ASSERT(ws_.wr_block_ == tok_);
+ BOOST_ASSERT(ws_.wr_block_.is_locked(this));
// Make sure the stream is open
if(! ws_.check_open(ec))
@@ -611,23 +661,23 @@ operator()(
ws_.rd_fb_, code_);
// Send close frame
- BOOST_ASSERT(ws_.wr_block_ == tok_);
+ BOOST_ASSERT(ws_.wr_block_.is_locked(this));
BOOST_ASIO_CORO_YIELD
boost::asio::async_write(
ws_.stream_, ws_.rd_fb_.data(),
std::move(*this));
- BOOST_ASSERT(ws_.wr_block_ == tok_);
+ BOOST_ASSERT(ws_.wr_block_.is_locked(this));
if(! ws_.check_ok(ec))
goto upcall;
}
// Teardown
using beast::websocket::async_teardown;
- BOOST_ASSERT(ws_.wr_block_ == tok_);
+ BOOST_ASSERT(ws_.wr_block_.is_locked(this));
BOOST_ASIO_CORO_YIELD
async_teardown(ws_.role_,
ws_.stream_, std::move(*this));
- BOOST_ASSERT(ws_.wr_block_ == tok_);
+ BOOST_ASSERT(ws_.wr_block_.is_locked(this));
if(ec == boost::asio::error::eof)
{
// Rationale:
@@ -635,7 +685,7 @@ operator()(
ec.assign(0, ec.category());
}
if(! ec)
- ec = ev_;
+ ec = result_;
if(ec && ec != error::closed)
ws_.status_ = status::failed;
else
@@ -643,16 +693,12 @@ operator()(
ws_.close();
upcall:
- if(ws_.rd_block_ == tok_)
- ws_.rd_block_.reset();
+ ws_.rd_block_.try_unlock(this);
ws_.paused_r_close_.maybe_invoke();
- if(ws_.wr_block_ == tok_)
- {
- ws_.wr_block_.reset();
+ if(ws_.wr_block_.try_unlock(this))
ws_.paused_close_.maybe_invoke() ||
ws_.paused_ping_.maybe_invoke() ||
ws_.paused_wr_.maybe_invoke();
- }
if(! cont_)
return boost::asio::post(
ws_.stream_.get_executor(),
@@ -664,15 +710,15 @@ operator()(
//------------------------------------------------------------------------------
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
template<
class DynamicBuffer,
class Handler>
-class stream<NextLayer>::read_op
+class stream<NextLayer, deflateSupported>::read_op
: public boost::asio::coroutine
{
Handler h_;
- stream<NextLayer>& ws_;
+ stream<NextLayer, deflateSupported>& ws_;
DynamicBuffer& b_;
std::size_t limit_;
std::size_t bytes_written_ = 0;
@@ -683,12 +729,12 @@ public:
boost::asio::associated_allocator_t<Handler>;
read_op(read_op&&) = default;
- read_op(read_op const&) = default;
+ read_op(read_op const&) = delete;
template<class DeducedHandler>
read_op(
DeducedHandler&& h,
- stream<NextLayer>& ws,
+ stream<NextLayer, deflateSupported>& ws,
DynamicBuffer& b,
std::size_t limit,
bool some)
@@ -704,16 +750,16 @@ public:
allocator_type
get_allocator() const noexcept
{
- return boost::asio::get_associated_allocator(h_);
+ return (boost::asio::get_associated_allocator)(h_);
}
using executor_type = boost::asio::associated_executor_t<
- Handler, decltype(std::declval<stream<NextLayer>&>().get_executor())>;
+ Handler, decltype(std::declval<stream<NextLayer, deflateSupported>&>().get_executor())>;
executor_type
get_executor() const noexcept
{
- return boost::asio::get_associated_executor(
+ return (boost::asio::get_associated_executor)(
h_, ws_.get_executor());
}
@@ -728,12 +774,20 @@ public:
return asio_handler_is_continuation(
std::addressof(op->h_));
}
+
+ template<class Function>
+ friend
+ void asio_handler_invoke(Function&& f, read_op* op)
+ {
+ using boost::asio::asio_handler_invoke;
+ asio_handler_invoke(f, std::addressof(op->h_));
+ }
};
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
template<class DynamicBuffer, class Handler>
void
-stream<NextLayer>::
+stream<NextLayer, deflateSupported>::
read_op<DynamicBuffer, Handler>::
operator()(
error_code ec,
@@ -781,10 +835,10 @@ operator()(
//------------------------------------------------------------------------------
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
template<class DynamicBuffer>
std::size_t
-stream<NextLayer>::
+stream<NextLayer, deflateSupported>::
read(DynamicBuffer& buffer)
{
static_assert(is_sync_stream<next_layer_type>::value,
@@ -799,10 +853,10 @@ read(DynamicBuffer& buffer)
return bytes_written;
}
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
template<class DynamicBuffer>
std::size_t
-stream<NextLayer>::
+stream<NextLayer, deflateSupported>::
read(DynamicBuffer& buffer, error_code& ec)
{
static_assert(is_sync_stream<next_layer_type>::value,
@@ -821,25 +875,25 @@ read(DynamicBuffer& buffer, error_code& ec)
return bytes_written;
}
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
template<class DynamicBuffer, class ReadHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(
ReadHandler, void(error_code, std::size_t))
-stream<NextLayer>::
+stream<NextLayer, deflateSupported>::
async_read(DynamicBuffer& buffer, ReadHandler&& handler)
{
static_assert(is_async_stream<next_layer_type>::value,
- "AsyncStream requirements requirements not met");
+ "AsyncStream requirements not met");
static_assert(
boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
"DynamicBuffer requirements not met");
- boost::asio::async_completion<
- ReadHandler, void(error_code, std::size_t)> init{handler};
+ BOOST_BEAST_HANDLER_INIT(
+ ReadHandler, void(error_code, std::size_t));
read_op<
DynamicBuffer,
BOOST_ASIO_HANDLER_TYPE(
ReadHandler, void(error_code, std::size_t))>{
- init.completion_handler,
+ std::move(init.completion_handler),
*this,
buffer,
0,
@@ -849,10 +903,10 @@ async_read(DynamicBuffer& buffer, ReadHandler&& handler)
//------------------------------------------------------------------------------
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
template<class DynamicBuffer>
std::size_t
-stream<NextLayer>::
+stream<NextLayer, deflateSupported>::
read_some(
DynamicBuffer& buffer,
std::size_t limit)
@@ -870,10 +924,10 @@ read_some(
return bytes_written;
}
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
template<class DynamicBuffer>
std::size_t
-stream<NextLayer>::
+stream<NextLayer, deflateSupported>::
read_some(
DynamicBuffer& buffer,
std::size_t limit,
@@ -906,28 +960,28 @@ read_some(
return bytes_written;
}
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
template<class DynamicBuffer, class ReadHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(
ReadHandler, void(error_code, std::size_t))
-stream<NextLayer>::
+stream<NextLayer, deflateSupported>::
async_read_some(
DynamicBuffer& buffer,
std::size_t limit,
ReadHandler&& handler)
{
static_assert(is_async_stream<next_layer_type>::value,
- "AsyncStream requirements requirements not met");
+ "AsyncStream requirements not met");
static_assert(
boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
"DynamicBuffer requirements not met");
- boost::asio::async_completion<ReadHandler,
- void(error_code, std::size_t)> init{handler};
+ BOOST_BEAST_HANDLER_INIT(
+ ReadHandler, void(error_code, std::size_t));
read_op<
DynamicBuffer,
BOOST_ASIO_HANDLER_TYPE(
ReadHandler, void(error_code, std::size_t))>{
- init.completion_handler,
+ std::move(init.completion_handler),
*this,
buffer,
limit,
@@ -937,10 +991,10 @@ async_read_some(
//------------------------------------------------------------------------------
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
template<class MutableBufferSequence>
std::size_t
-stream<NextLayer>::
+stream<NextLayer, deflateSupported>::
read_some(
MutableBufferSequence const& buffers)
{
@@ -956,10 +1010,10 @@ read_some(
return bytes_written;
}
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
template<class MutableBufferSequence>
std::size_t
-stream<NextLayer>::
+stream<NextLayer, deflateSupported>::
read_some(
MutableBufferSequence const& buffers,
error_code& ec)
@@ -986,12 +1040,17 @@ loop:
if(rd_remain_ == 0 && (! rd_fh_.fin || rd_done_))
{
// Read frame header
- while(! parse_fh(rd_fh_, rd_buf_, code))
+ error_code result;
+ while(! parse_fh(rd_fh_, rd_buf_, result))
{
- if(code != close_code::none)
+ if(result)
{
// _Fail the WebSocket Connection_
- do_fail(code, error::failed, ec);
+ if(result == error::message_too_big)
+ code = close_code::too_big;
+ else
+ code = close_code::protocol_error;
+ do_fail(code, result, ec);
return bytes_written;
}
auto const bytes_transferred =
@@ -1007,7 +1066,7 @@ loop:
// of the buffer holding payload data.
if(rd_fh_.len > 0 && rd_fh_.mask)
detail::mask_inplace(buffers_prefix(
- clamp(rd_fh_.len), rd_buf_.data()),
+ clamp(rd_fh_.len), rd_buf_.mutable_data()),
rd_key_);
if(detail::is_control(rd_fh_.op))
{
@@ -1058,11 +1117,12 @@ loop:
BOOST_ASSERT(! rd_close_);
rd_close_ = true;
close_reason cr;
- detail::read_close(cr, b, code);
- if(code != close_code::none)
+ detail::read_close(cr, b, result);
+ if(result)
{
// _Fail the WebSocket Connection_
- do_fail(code, error::failed, ec);
+ do_fail(close_code::protocol_error,
+ result, ec);
return bytes_written;
}
cr_ = cr;
@@ -1090,7 +1150,7 @@ loop:
{
ec.assign(0, ec.category());
}
- if(! pmd_ || ! pmd_->rd_set)
+ if(! this->rd_deflated())
{
if(rd_remain_ > 0)
{
@@ -1108,7 +1168,7 @@ loop:
if(rd_fh_.mask)
detail::mask_inplace(
buffers_prefix(clamp(rd_remain_),
- rd_buf_.data()), rd_key_);
+ rd_buf_.mutable_data()), rd_key_);
}
if(rd_buf_.size() > 0)
{
@@ -1127,10 +1187,8 @@ loop:
! rd_utf8_.finish()))
{
// _Fail the WebSocket Connection_
- do_fail(
- close_code::bad_payload,
- error::failed,
- ec);
+ do_fail(close_code::bad_payload,
+ error::bad_frame_payload, ec);
return bytes_written;
}
}
@@ -1164,7 +1222,7 @@ loop:
{
// _Fail the WebSocket Connection_
do_fail(close_code::bad_payload,
- error::failed, ec);
+ error::bad_frame_payload, ec);
return bytes_written;
}
}
@@ -1217,7 +1275,7 @@ loop:
if(rd_fh_.mask)
detail::mask_inplace(
buffers_prefix(clamp(rd_remain_),
- rd_buf_.data()), rd_key_);
+ rd_buf_.mutable_data()), rd_key_);
auto const in = buffers_prefix(
clamp(rd_remain_), buffers_front(
rd_buf_.data()));
@@ -1238,7 +1296,7 @@ loop:
0x00, 0x00, 0xff, 0xff };
zs.next_in = empty_block;
zs.avail_in = sizeof(empty_block);
- pmd_->zi.write(zs, zlib::Flush::sync, ec);
+ this->inflate(zs, zlib::Flush::sync, ec);
if(! ec)
{
// https://github.com/madler/zlib/issues/280
@@ -1247,12 +1305,7 @@ loop:
}
if(! check_ok(ec))
return bytes_written;
- if(
- (role_ == role_type::client &&
- pmd_config_.server_no_context_takeover) ||
- (role_ == role_type::server &&
- pmd_config_.client_no_context_takeover))
- pmd_->zi.reset();
+ this->do_context_takeover_read(role_);
rd_done_ = true;
break;
}
@@ -1260,14 +1313,14 @@ loop:
{
break;
}
- pmd_->zi.write(zs, zlib::Flush::sync, ec);
+ this->inflate(zs, zlib::Flush::sync, ec);
if(! check_ok(ec))
return bytes_written;
if(rd_msg_max_ && beast::detail::sum_exceeds(
rd_size_, zs.total_out, rd_msg_max_))
{
do_fail(close_code::too_big,
- error::failed, ec);
+ error::message_too_big, ec);
return bytes_written;
}
cb.consume(zs.total_out);
@@ -1285,7 +1338,7 @@ loop:
{
// _Fail the WebSocket Connection_
do_fail(close_code::bad_payload,
- error::failed, ec);
+ error::bad_frame_payload, ec);
return bytes_written;
}
}
@@ -1293,25 +1346,25 @@ loop:
return bytes_written;
}
-template<class NextLayer>
+template<class NextLayer, bool deflateSupported>
template<class MutableBufferSequence, class ReadHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(
ReadHandler, void(error_code, std::size_t))
-stream<NextLayer>::
+stream<NextLayer, deflateSupported>::
async_read_some(
MutableBufferSequence const& buffers,
ReadHandler&& handler)
{
static_assert(is_async_stream<next_layer_type>::value,
- "AsyncStream requirements requirements not met");
+ "AsyncStream requirements not met");
static_assert(boost::asio::is_mutable_buffer_sequence<
MutableBufferSequence>::value,
"MutableBufferSequence requirements not met");
- boost::asio::async_completion<ReadHandler,
- void(error_code, std::size_t)> init{handler};
+ BOOST_BEAST_HANDLER_INIT(
+ ReadHandler, void(error_code, std::size_t));
read_some_op<MutableBufferSequence, BOOST_ASIO_HANDLER_TYPE(
ReadHandler, void(error_code, std::size_t))>{
- init.completion_handler,*this, buffers}(
+ std::move(init.completion_handler), *this, buffers}(
{}, 0, false);
return init.result.get();
}
@@ -1320,4 +1373,4 @@ async_read_some(
} // beast
} // boost
-#endif \ No newline at end of file
+#endif