summaryrefslogtreecommitdiff
path: root/doc/html/boost_asio/example/porthopper/server.cpp
blob: 0703bddff97764166cdfe0c4f73467c0c71caeb1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
//
// server.cpp
// ~~~~~~~~~~
//
// Copyright (c) 2003-2012 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <cmath>
#include <cstdlib>
#include <exception>
#include <iostream>
#include <set>
#include "protocol.hpp"

using boost::asio::ip::tcp;
using boost::asio::ip::udp;

typedef boost::shared_ptr<tcp::socket> tcp_socket_ptr;
typedef boost::shared_ptr<boost::asio::deadline_timer> timer_ptr;
typedef boost::shared_ptr<control_request> control_request_ptr;

class server
{
public:
  // Construct the server to wait for incoming control connections.
  server(boost::asio::io_service& io_service, unsigned short port)
    : acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
      timer_(io_service),
      udp_socket_(io_service, udp::endpoint(udp::v4(), 0)),
      next_frame_number_(1)
  {
    // Start waiting for a new control connection.
    tcp_socket_ptr new_socket(new tcp::socket(acceptor_.get_io_service()));
    acceptor_.async_accept(*new_socket,
        boost::bind(&server::handle_accept, this,
          boost::asio::placeholders::error, new_socket));

    // Start the timer used to generate outgoing frames.
    timer_.expires_from_now(boost::posix_time::milliseconds(100));
    timer_.async_wait(boost::bind(&server::handle_timer, this));
  }

  // Handle a new control connection.
  void handle_accept(const boost::system::error_code& ec, tcp_socket_ptr socket)
  {
    if (!ec)
    {
      // Start receiving control requests on the connection.
      control_request_ptr request(new control_request);
      boost::asio::async_read(*socket, request->to_buffers(),
          boost::bind(&server::handle_control_request, this,
            boost::asio::placeholders::error, socket, request));
    }

    // Start waiting for a new control connection.
    tcp_socket_ptr new_socket(new tcp::socket(acceptor_.get_io_service()));
    acceptor_.async_accept(*new_socket,
        boost::bind(&server::handle_accept, this,
          boost::asio::placeholders::error, new_socket));
  }

  // Handle a new control request.
  void handle_control_request(const boost::system::error_code& ec,
      tcp_socket_ptr socket, control_request_ptr request)
  {
    if (!ec)
    {
      // Delay handling of the control request to simulate network latency.
      timer_ptr delay_timer(
          new boost::asio::deadline_timer(acceptor_.get_io_service()));
      delay_timer->expires_from_now(boost::posix_time::seconds(2));
      delay_timer->async_wait(
          boost::bind(&server::handle_control_request_timer, this,
            socket, request, delay_timer));
    }
  }

  void handle_control_request_timer(tcp_socket_ptr socket,
      control_request_ptr request, timer_ptr /*delay_timer*/)
  {
    // Determine what address this client is connected from, since
    // subscriptions must be stored on the server as a complete endpoint, not
    // just a port. We use the non-throwing overload of remote_endpoint() since
    // it may fail if the socket is no longer connected.
    boost::system::error_code ec;
    tcp::endpoint remote_endpoint = socket->remote_endpoint(ec);
    if (!ec)
    {
      // Remove old port subscription, if any.
      if (unsigned short old_port = request->old_port())
      {
        udp::endpoint old_endpoint(remote_endpoint.address(), old_port);
        subscribers_.erase(old_endpoint);
        std::cout << "Removing subscription " << old_endpoint << std::endl;
      }

      // Add new port subscription, if any.
      if (unsigned short new_port = request->new_port())
      {
        udp::endpoint new_endpoint(remote_endpoint.address(), new_port);
        subscribers_.insert(new_endpoint);
        std::cout << "Adding subscription " << new_endpoint << std::endl;
      }
    }

    // Wait for next control request on this connection.
    boost::asio::async_read(*socket, request->to_buffers(),
        boost::bind(&server::handle_control_request, this,
          boost::asio::placeholders::error, socket, request));
  }

  // Every time the timer fires we will generate a new frame and send it to all
  // subscribers.
  void handle_timer()
  {
    // Generate payload.
    double x = next_frame_number_ * 0.2;
    double y = std::sin(x);
    int char_index = static_cast<int>((y + 1.0) * (frame::payload_size / 2));
    std::string payload;
    for (int i = 0; i < frame::payload_size; ++i)
      payload += (i == char_index ? '*' : '.');

    // Create the frame to be sent to all subscribers.
    frame f(next_frame_number_++, payload);

    // Send frame to all subscribers. We can use synchronous calls here since
    // UDP send operations typically do not block.
    std::set<udp::endpoint>::iterator j;
    for (j = subscribers_.begin(); j != subscribers_.end(); ++j)
    {
      boost::system::error_code ec;
      udp_socket_.send_to(f.to_buffers(), *j, 0, ec);
    }

    // Wait for next timeout.
    timer_.expires_from_now(boost::posix_time::milliseconds(100));
    timer_.async_wait(boost::bind(&server::handle_timer, this));
  }

private:
  // The acceptor used to accept incoming control connections.
  tcp::acceptor acceptor_;

  // The timer used for generating data.
  boost::asio::deadline_timer timer_;

  // The socket used to send data to subscribers.
  udp::socket udp_socket_;

  // The next frame number.
  unsigned long next_frame_number_;

  // The set of endpoints that are subscribed.
  std::set<udp::endpoint> subscribers_;
};

int main(int argc, char* argv[])
{
  try
  {
    if (argc != 2)
    {
      std::cerr << "Usage: server <port>\n";
      return 1;
    }

    boost::asio::io_service io_service;

    using namespace std; // For atoi.
    server s(io_service, atoi(argv[1]));

    io_service.run();
  }
  catch (std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << std::endl;
  }

  return 0;
}