summary refs log tree commit diff
path: root/boost/statechart/fifo_worker.hpp
blob: 75d3ad6ce1c26e268cabe57c221e437804c80cd8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
#ifndef BOOST_STATECHART_FIFO_WORKER_HPP_INCLUDED
#define BOOST_STATECHART_FIFO_WORKER_HPP_INCLUDED
//////////////////////////////////////////////////////////////////////////////
// Copyright 2002-2008 Andreas Huber Doenni
// Distributed under the Boost Software License, Version 1.0. (See accompany-
// ing file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//////////////////////////////////////////////////////////////////////////////



#include <boost/assert.hpp>
#include <boost/noncopyable.hpp>
#include <boost/function/function0.hpp>
#include <boost/bind.hpp>
// BOOST_HAS_THREADS, BOOST_MSVC
#include <boost/config.hpp>

#include <boost/detail/allocator_utilities.hpp>

#ifdef BOOST_HAS_THREADS
#  ifdef BOOST_MSVC
#    pragma warning( push )
     // "conditional expression is constant" in basic_timed_mutex.hpp
#    pragma warning( disable: 4127 )
     // "conversion from 'int' to 'unsigned short'" in microsec_time_clock.hpp
#    pragma warning( disable: 4244 )
     // "... needs to have dll-interface to be used by clients of class ..."
#    pragma warning( disable: 4251 )
     // "... assignment operator could not be generated"
#    pragma warning( disable: 4512 )
     // "Function call with parameters that may be unsafe" in
     // condition_variable.hpp
#    pragma warning( disable: 4996 )
#  endif

#  include <boost/thread/mutex.hpp>
#  include <boost/thread/condition.hpp>

#  ifdef BOOST_MSVC
#    pragma warning( pop )
#  endif
#endif

#include <list>
#include <memory>   // std::allocator


namespace boost
{
namespace statechart
{



template< class Allocator = std::allocator< none > >
class fifo_worker : noncopyable
{
  public:
    //////////////////////////////////////////////////////////////////////////
    #ifdef BOOST_HAS_THREADS
    fifo_worker( bool waitOnEmptyQueue = false ) :
      waitOnEmptyQueue_( waitOnEmptyQueue ),
    #else
    fifo_worker() :
    #endif
      terminated_( false )
    {
    }

    typedef function0< void > work_item;

    // We take a non-const reference so that we can move (i.e. swap) the item
    // into the queue, what avoids copying the (possibly heap-allocated)
    // implementation object inside work_item.
    void queue_work_item( work_item & item )
    {
      if ( item.empty() )
      {
        return;
      }

      #ifdef BOOST_HAS_THREADS
      mutex::scoped_lock lock( mutex_ );
      #endif

      workQueue_.push_back( work_item() );
      workQueue_.back().swap( item );

      #ifdef BOOST_HAS_THREADS
      queueNotEmpty_.notify_one();
      #endif
    }

    // Convenience overload so that temporary objects can be passed directly
    // instead of having to create a work_item object first. Under most
    // circumstances, this will lead to one unnecessary copy of the
    // function implementation object.
    void queue_work_item( const work_item & item )
    {
      work_item copy = item;
      queue_work_item( copy );
    }

    void terminate()
    {
      work_item item = boost::bind( &fifo_worker::terminate_impl, this );
      queue_work_item( item );
    }

    // Is not mutex-protected! Must only be called from the thread that also
    // calls operator().
    bool terminated() const
    {
      return terminated_;
    }

    unsigned long operator()( unsigned long maxItemCount = 0 )
    {
      unsigned long itemCount = 0;

      while ( !terminated() &&
        ( ( maxItemCount == 0 ) || ( itemCount < maxItemCount ) ) )
      {
        work_item item = dequeue_item();

        if ( item.empty() )
        {
          // item can only be empty when the queue is empty, which only
          // happens in ST builds or when users pass false to the fifo_worker
          // constructor
          return itemCount;
        }

        item();
        ++itemCount;
      }

      return itemCount;
    }

  private:
    //////////////////////////////////////////////////////////////////////////
    work_item dequeue_item()
    {
      #ifdef BOOST_HAS_THREADS
      mutex::scoped_lock lock( mutex_ );

      if ( !waitOnEmptyQueue_ && workQueue_.empty() )
      {
        return work_item();
      }

      while ( workQueue_.empty() )
      {
        queueNotEmpty_.wait( lock );
      }
      #else
      // If the queue happens to run empty in a single-threaded system,
      // waiting for new work items (which means to loop indefinitely!) is
      // pointless as there is no way that new work items could find their way
      // into the queue. The only sensible thing is to exit the loop and
      // return to the caller in this case.
      // Users can then queue new work items before calling operator() again.
      if ( workQueue_.empty() )
      {
        return work_item();
      }
      #endif

      // Optimization: Swap rather than assign to avoid the copy of the
      // implementation object inside function
      work_item result;
      result.swap( workQueue_.front() );
      workQueue_.pop_front();
      return result;
    }

    void terminate_impl()
    {
      terminated_ = true;
    }


    typedef std::list<
      work_item,
      typename boost::detail::allocator::rebind_to<
        Allocator, work_item >::type
    > work_queue_type;

    work_queue_type workQueue_;

    #ifdef BOOST_HAS_THREADS
    mutex mutex_;
    condition queueNotEmpty_;
    const bool waitOnEmptyQueue_;
    #endif

    bool terminated_;
};



} // namespace statechart
} // namespace boost



#endif