summary refs log tree commit diff
path: root/doc/html/boost_asio/example/cpp03/porthopper/client.cpp
blob: 074b8808be517a6f18b66b4ea44c66899baa7c7a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
//
// client.cpp
// ~~~~~~~~~~
//
// Copyright (c) 2003-2019 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#include <boost/asio.hpp>
#include <boost/lambda/lambda.hpp>
#include <boost/lambda/bind.hpp>
#include <boost/lambda/if.hpp>
#include <boost/shared_ptr.hpp>
#include <algorithm>
#include <cstdlib>
#include <exception>
#include <iostream>
#include <string>
#include "protocol.hpp"

using namespace boost;
using boost::asio::ip::tcp;
using boost::asio::ip::udp;

// Port-hopping client.
//
// Usage: client <host> <port>
//
// Connects a TCP control socket to the server at <host>:<port>, binds a UDP
// socket to an OS-chosen IPv4 port and asks the server (over the control
// connection) to start sending frames to that port. After every 50 received
// frames the client opens a fresh UDP socket, asks the server to switch to it,
// and keeps draining the old socket until the first frame arrives on the new
// one.
//
// Returns 1 if the command line is malformed or if any operation throws (the
// exception text is written to std::cerr). The receive loop has no normal
// exit, so the process otherwise runs until it is terminated.
int main(int argc, char* argv[])
{
  try
  {
    if (argc != 3)
    {
      std::cerr << "Usage: client <host> <port>\n";
      return 1;
    }
    std::string host_name = argv[1];
    std::string port = argv[2];

    boost::asio::io_context io_context;

    // Determine the location of the server. Only the first resolved endpoint
    // is used.
    tcp::resolver resolver(io_context);
    tcp::endpoint remote_endpoint = *resolver.resolve(host_name, port).begin();

    // Establish the control connection to the server.
    tcp::socket control_socket(io_context);
    control_socket.connect(remote_endpoint);

    // Create a datagram socket to receive data from the server. Port 0 lets
    // the operating system pick the local port.
    boost::shared_ptr<udp::socket> data_socket(
        new udp::socket(io_context, udp::endpoint(udp::v4(), 0)));

    // Determine what port we will receive data on.
    udp::endpoint data_endpoint = data_socket->local_endpoint();

    // Ask the server to start sending us data.
    control_request start = control_request::start(data_endpoint.port());
    boost::asio::write(control_socket, start.to_buffers());

    // Highest frame number printed so far; frames whose number is not greater
    // than this are silently dropped.
    unsigned long last_frame_number = 0;
    for (;;)
    {
      // Receive 50 messages on the current data socket.
      for (int i = 0; i < 50; ++i)
      {
        // Receive a frame from the server.
        frame f;
        data_socket->receive(f.to_buffers(), 0);
        if (f.number() > last_frame_number)
        {
          last_frame_number = f.number();
          std::cout << "\n" << f.payload();
        }
      }

      // Time to switch to a new socket. To ensure seamless handover we will
      // continue to receive packets using the old socket until data arrives on
      // the new one.
      std::cout << " Starting renegotiation";

      // Create the new data socket.
      boost::shared_ptr<udp::socket> new_data_socket(
          new udp::socket(io_context, udp::endpoint(udp::v4(), 0)));

      // Determine the new port we will use to receive data.
      udp::endpoint new_data_endpoint = new_data_socket->local_endpoint();

      // Ask the server to switch over to the new port. The write is started
      // asynchronously; its completion handler just records the error code in
      // control_result, which is checked once the renegotiation loop ends.
      control_request change = control_request::change(
          data_endpoint.port(), new_data_endpoint.port());
      boost::system::error_code control_result;
      boost::asio::async_write(control_socket, change.to_buffers(),
          (
            lambda::var(control_result) = lambda::_1
          ));

      // Try to receive a frame from the server on the new data socket. If we
      // successfully receive a frame on this new data socket we can consider
      // the renegotiation complete. In that case we will close the old data
      // socket, which will cause any outstanding receive operation on it to be
      // cancelled.
      frame f1;
      boost::system::error_code new_data_socket_result;
      new_data_socket->async_receive(f1.to_buffers(),
          (
            // Note: lambda::_1 is the first argument to the callback handler,
            // which in this case is the error code for the operation.
            lambda::var(new_data_socket_result) = lambda::_1,
            lambda::if_(!lambda::_1)
            [
              // We have successfully received a frame on the new data socket,
              // so we can close the old data socket. This will cancel any
              // outstanding receive operation on the old data socket.
              lambda::var(data_socket) = boost::shared_ptr<udp::socket>()
            ]
          ));

      // This loop will continue until we have successfully completed the
      // renegotiation (i.e. received a frame on the new data socket), or some
      // unrecoverable error occurs.
      bool done = false;
      while (!done)
      {
        // Even though we're performing a renegotiation, we want to continue
        // receiving data as smoothly as possible. Therefore we will continue to
        // try to receive a frame from the server on the old data socket. If we
        // receive a frame on this socket we will interrupt the io_context,
        // print the frame, and resume waiting for the other operations to
        // complete.
        frame f2;
        done = true; // Let's be optimistic.
        if (data_socket) // Might have been closed by new_data_socket's handler.
        {
          data_socket->async_receive(f2.to_buffers(), 0,
              (
                lambda::if_(!lambda::_1)
                [
                  // We have successfully received a frame on the old data
                  // socket. Stop the io_context so that we can print it.
                  lambda::bind(&boost::asio::io_context::stop, &io_context),
                  lambda::var(done) = false
                ]
              ));
        }

        // Run the operations in parallel. This will block until all operations
        // have finished, or until the io_context is interrupted. (No threads!)
        // restart() is required because a previous stop() or a run() that ran
        // out of work leaves the io_context in the stopped state.
        io_context.restart();
        io_context.run();

        // If the io_context.run() was interrupted then we have received a frame
        // on the old data socket. We need to keep waiting for the renegotiation
        // operations to complete.
        if (!done)
        {
          if (f2.number() > last_frame_number)
          {
            last_frame_number = f2.number();
            std::cout << "\n" << f2.payload();
          }
        }
      }

      // Since the loop has finished, we have either successfully completed
      // the renegotiation, or an error has occurred. First we'll check for
      // errors.
      if (control_result)
        throw boost::system::system_error(control_result);
      if (new_data_socket_result)
        throw boost::system::system_error(new_data_socket_result);

      // If we get here it means we have successfully started receiving data on
      // the new data socket. This new data socket will be used from now on
      // (until the next time we renegotiate).
      std::cout << " Renegotiation complete";
      data_socket = new_data_socket;
      data_endpoint = new_data_endpoint;
      if (f1.number() > last_frame_number)
      {
        last_frame_number = f1.number();
        std::cout << "\n" << f1.payload();
      }
    }
  }
  catch (const std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << std::endl;

    // Report the failure to the caller, consistent with the usage-error path
    // above, instead of falling through to a success status.
    return 1;
  }

  // The receive loop above has no break, so control leaves the try block only
  // through one of the failure returns; this is kept for completeness.
  return 0;
}