diff options
Diffstat (limited to 'boost/log/sinks/async_frontend.hpp')
-rw-r--r-- | boost/log/sinks/async_frontend.hpp | 462 |
1 files changed, 462 insertions, 0 deletions
diff --git a/boost/log/sinks/async_frontend.hpp b/boost/log/sinks/async_frontend.hpp new file mode 100644 index 0000000000..d85421e8a1 --- /dev/null +++ b/boost/log/sinks/async_frontend.hpp @@ -0,0 +1,462 @@ +/* + * Copyright Andrey Semashev 2007 - 2014. + * Distributed under the Boost Software License, Version 1.0. + * (See accompanying file LICENSE_1_0.txt or copy at + * http://www.boost.org/LICENSE_1_0.txt) + */ +/*! + * \file async_frontend.hpp + * \author Andrey Semashev + * \date 14.07.2009 + * + * The header contains implementation of asynchronous sink frontend. + */ + +#ifndef BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_ +#define BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_ + +#include <boost/log/detail/config.hpp> + +#ifdef BOOST_HAS_PRAGMA_ONCE +#pragma once +#endif + +#if defined(BOOST_LOG_NO_THREADS) +#error Boost.Log: Asynchronous sink frontend is only supported in multithreaded environment +#endif + +#include <boost/bind.hpp> +#include <boost/static_assert.hpp> +#include <boost/smart_ptr/shared_ptr.hpp> +#include <boost/smart_ptr/make_shared_object.hpp> +#include <boost/thread/locks.hpp> +#include <boost/thread/recursive_mutex.hpp> +#include <boost/thread/thread.hpp> +#include <boost/thread/condition_variable.hpp> +#include <boost/log/exceptions.hpp> +#include <boost/log/detail/locking_ptr.hpp> +#include <boost/log/detail/parameter_tools.hpp> +#include <boost/log/core/record_view.hpp> +#include <boost/log/sinks/basic_sink_frontend.hpp> +#include <boost/log/sinks/frontend_requirements.hpp> +#include <boost/log/sinks/unbounded_fifo_queue.hpp> +#include <boost/log/keywords/start_thread.hpp> +#include <boost/log/detail/header.hpp> + +namespace boost { + +BOOST_LOG_OPEN_NAMESPACE + +namespace sinks { + +#ifndef BOOST_LOG_DOXYGEN_PASS + +#define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL(z, n, types)\ + template< BOOST_PP_ENUM_PARAMS(n, typename T) >\ + explicit asynchronous_sink(BOOST_PP_ENUM_BINARY_PARAMS(n, T, const& arg)) :\ + base_type(true),\ + queue_base_type((BOOST_PP_ENUM_PARAMS(n, arg))),\ + m_pBackend(boost::make_shared< sink_backend_type >(BOOST_PP_ENUM_PARAMS(n, arg))),\ + m_StopRequested(false),\ + m_FlushRequested(false)\ + {\ + if ((BOOST_PP_ENUM_PARAMS(n, arg))[keywords::start_thread | true])\ + start_feeding_thread();\ + }\ + template< BOOST_PP_ENUM_PARAMS(n, typename T) >\ + explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, BOOST_PP_ENUM_BINARY_PARAMS(n, T, const& arg)) :\ + base_type(true),\ + queue_base_type((BOOST_PP_ENUM_PARAMS(n, arg))),\ + m_pBackend(backend),\ + m_StopRequested(false),\ + m_FlushRequested(false)\ + {\ + if ((BOOST_PP_ENUM_PARAMS(n, arg))[keywords::start_thread | true])\ + start_feeding_thread();\ + } + +#endif // BOOST_LOG_DOXYGEN_PASS + +/*! + * \brief Asynchronous logging sink frontend + * + * The frontend starts a separate thread on construction. All logging records are passed + * to the backend in this dedicated thread only. + */ +template< typename SinkBackendT, typename QueueingStrategyT = unbounded_fifo_queue > +class asynchronous_sink : + public aux::make_sink_frontend_base< SinkBackendT >::type, + public QueueingStrategyT +{ + typedef typename aux::make_sink_frontend_base< SinkBackendT >::type base_type; + typedef QueueingStrategyT queue_base_type; + +private: + //! Backend synchronization mutex type + typedef boost::recursive_mutex backend_mutex_type; + //! Frontend synchronization mutex type + typedef typename base_type::mutex_type frontend_mutex_type; + + //! A scope guard that implements thread ID management + class scoped_thread_id + { + private: + frontend_mutex_type& m_Mutex; + condition_variable_any& m_Cond; + thread::id& m_ThreadID; + bool volatile& m_StopRequested; + + public: + //! Initializing constructor + scoped_thread_id(frontend_mutex_type& mut, condition_variable_any& cond, thread::id& tid, bool volatile& sr) + : m_Mutex(mut), m_Cond(cond), m_ThreadID(tid), m_StopRequested(sr) + { + lock_guard< frontend_mutex_type > lock(m_Mutex); + if (m_ThreadID != thread::id()) + BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread"); + m_ThreadID = this_thread::get_id(); + } + //! Initializing constructor + scoped_thread_id(unique_lock< frontend_mutex_type >& l, condition_variable_any& cond, thread::id& tid, bool volatile& sr) + : m_Mutex(*l.mutex()), m_Cond(cond), m_ThreadID(tid), m_StopRequested(sr) + { + unique_lock< frontend_mutex_type > lock(move(l)); + if (m_ThreadID != thread::id()) + BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread"); + m_ThreadID = this_thread::get_id(); + } + //! Destructor + ~scoped_thread_id() + { + try + { + lock_guard< frontend_mutex_type > lock(m_Mutex); + m_StopRequested = false; + m_ThreadID = thread::id(); + m_Cond.notify_all(); + } + catch (...) + { + } + } + + private: + scoped_thread_id(scoped_thread_id const&); + scoped_thread_id& operator= (scoped_thread_id const&); + }; + + //! A scope guard that resets a flag on destructor + class scoped_flag + { + private: + frontend_mutex_type& m_Mutex; + condition_variable_any& m_Cond; + volatile bool& m_Flag; + + public: + explicit scoped_flag(frontend_mutex_type& mut, condition_variable_any& cond, volatile bool& f) : + m_Mutex(mut), m_Cond(cond), m_Flag(f) + { + } + ~scoped_flag() + { + try + { + lock_guard< frontend_mutex_type > lock(m_Mutex); + m_Flag = false; + m_Cond.notify_all(); + } + catch (...) + { + } + } + + private: + scoped_flag(scoped_flag const&); + scoped_flag& operator= (scoped_flag const&); + }; + +public: + //! Sink implementation type + typedef SinkBackendT sink_backend_type; + //! \cond + BOOST_STATIC_ASSERT_MSG((has_requirement< typename sink_backend_type::frontend_requirements, synchronized_feeding >::value), "Asynchronous sink frontend is incompatible with the specified backend: thread synchronization requirements are not met"); + //! \endcond + +#ifndef BOOST_LOG_DOXYGEN_PASS + + //! A pointer type that locks the backend until it's destroyed + typedef boost::log::aux::locking_ptr< sink_backend_type, backend_mutex_type > locked_backend_ptr; + +#else // BOOST_LOG_DOXYGEN_PASS + + //! A pointer type that locks the backend until it's destroyed + typedef implementation_defined locked_backend_ptr; + +#endif // BOOST_LOG_DOXYGEN_PASS + +private: + //! Synchronization mutex + backend_mutex_type m_BackendMutex; + //! Pointer to the backend + const shared_ptr< sink_backend_type > m_pBackend; + + //! Dedicated record feeding thread + thread m_DedicatedFeedingThread; + //! Feeding thread ID + thread::id m_FeedingThreadID; + //! Condition variable to implement blocking operations + condition_variable_any m_BlockCond; + + //! The flag indicates that the feeding loop has to be stopped + volatile bool m_StopRequested; // TODO: make it a real atomic + //! The flag indicates that queue flush has been requested + volatile bool m_FlushRequested; // TODO: make it a real atomic + +public: + /*! + * Default constructor. Constructs the sink backend instance. + * Requires the backend to be default-constructible. + * + * \param start_thread If \c true, the frontend creates a thread to feed + * log records to the backend. Otherwise no thread is + * started and it is assumed that the user will call + * either \c run or \c feed_records himself. + */ + asynchronous_sink(bool start_thread = true) : + base_type(true), + m_pBackend(boost::make_shared< sink_backend_type >()), + m_StopRequested(false), + m_FlushRequested(false) + { + if (start_thread) + start_feeding_thread(); + } + /*! + * Constructor attaches user-constructed backend instance + * + * \param backend Pointer to the backend instance. + * \param start_thread If \c true, the frontend creates a thread to feed + * log records to the backend. Otherwise no thread is + * started and it is assumed that the user will call + * either \c run or \c feed_records himself. + * + * \pre \a backend is not \c NULL. + */ + explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, bool start_thread = true) : + base_type(true), + m_pBackend(backend), + m_StopRequested(false), + m_FlushRequested(false) + { + if (start_thread) + start_feeding_thread(); + } + + // Constructors that pass arbitrary parameters to the backend constructor + BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_GEN(BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL, ~) + + /*! + * Destructor. Implicitly stops the dedicated feeding thread, if one is running. + */ + ~asynchronous_sink() + { + boost::this_thread::disable_interruption no_interrupts; + stop(); + } + + /*! + * Locking accessor to the attached backend + */ + locked_backend_ptr locked_backend() + { + return locked_backend_ptr(m_pBackend, m_BackendMutex); + } + + /*! + * Enqueues the log record to the backend + */ + void consume(record_view const& rec) + { + if (m_FlushRequested) + { + unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex()); + // Wait until flush is done + while (m_FlushRequested) + m_BlockCond.wait(lock); + } + queue_base_type::enqueue(rec); + } + + /*! + * The method attempts to pass logging record to the backend + */ + bool try_consume(record_view const& rec) + { + if (!m_FlushRequested) + { + return queue_base_type::try_enqueue(rec); + } + else + return false; + } + + /*! + * The method starts record feeding loop and effectively blocks until either of this happens: + * + * \li the thread is interrupted due to either standard thread interruption or a call to \c stop + * \li an exception is thrown while processing a log record in the backend, and the exception is + * not terminated by the exception handler, if one is installed + * + * \pre The sink frontend must be constructed without spawning a dedicated thread + */ + void run() + { + // First check that no other thread is running + scoped_thread_id guard(base_type::frontend_mutex(), m_BlockCond, m_FeedingThreadID, m_StopRequested); + + // Now start the feeding loop + while (true) + { + do_feed_records(); + if (!m_StopRequested) + { + // Block until new record is available + record_view rec; + if (queue_base_type::dequeue_ready(rec)) + base_type::feed_record(rec, m_BackendMutex, *m_pBackend); + } + else + break; + } + } + + /*! + * The method softly interrupts record feeding loop. This method must be called when the \c run + * method execution has to be interrupted. Unlike regular thread interruption, calling + * \c stop will not interrupt the record processing in the middle. Instead, the sink frontend + * will attempt to finish its business with the record in progress and return afterwards. + * This method can be called either if the sink was created with a dedicated thread, + * or if the feeding loop was initiated by user. + * + * \note Returning from this method does not guarantee that there are no records left buffered + * in the sink frontend. It is possible that log records keep coming during and after this + * method is called. At some point of execution of this method log records stop being processed, + * and all records that come after this point are put into the queue. These records will be + * processed upon further calls to \c run or \c feed_records. + */ + void stop() + { + unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex()); + if (m_FeedingThreadID != thread::id() || m_DedicatedFeedingThread.joinable()) + { + try + { + m_StopRequested = true; + queue_base_type::interrupt_dequeue(); + while (m_StopRequested) + m_BlockCond.wait(lock); + } + catch (...) + { + m_StopRequested = false; + throw; + } + + lock.unlock(); + m_DedicatedFeedingThread.join(); + } + } + + /*! + * The method feeds log records that may have been buffered to the backend and returns + * + * \pre The sink frontend must be constructed without spawning a dedicated thread + */ + void feed_records() + { + // First check that no other thread is running + scoped_thread_id guard(base_type::frontend_mutex(), m_BlockCond, m_FeedingThreadID, m_StopRequested); + + // Now start the feeding loop + do_feed_records(); + } + + /*! + * The method feeds all log records that may have been buffered to the backend and returns. + * Unlike \c feed_records, in case of ordering queueing the method also feeds records + * that were enqueued during the ordering window, attempting to empty the queue completely. + * + * \pre The sink frontend must be constructed without spawning a dedicated thread + */ + void flush() + { + unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex()); + if (m_FeedingThreadID != thread::id() || m_DedicatedFeedingThread.joinable()) + { + // There is already a thread feeding records, let it do the job + m_FlushRequested = true; + queue_base_type::interrupt_dequeue(); + while (!m_StopRequested && m_FlushRequested) + m_BlockCond.wait(lock); + + // The condition may have been signalled when the feeding thread was finishing. + // In that case records may not have been flushed, and we do the flush ourselves. + if (m_FeedingThreadID != thread::id()) + return; + } + + m_FlushRequested = true; + + // Flush records ourselves. The guard releases the lock. + scoped_thread_id guard(lock, m_BlockCond, m_FeedingThreadID, m_StopRequested); + + do_feed_records(); + } + +private: +#ifndef BOOST_LOG_DOXYGEN_PASS + //! The method spawns record feeding thread + void start_feeding_thread() + { + boost::thread(boost::bind(&asynchronous_sink::run, this)).swap(m_DedicatedFeedingThread); + } + + //! The record feeding loop + void do_feed_records() + { + while (!m_StopRequested) + { + record_view rec; + bool dequeued = false; + if (!m_FlushRequested) + dequeued = queue_base_type::try_dequeue_ready(rec); + else + dequeued = queue_base_type::try_dequeue(rec); + + if (dequeued) + base_type::feed_record(rec, m_BackendMutex, *m_pBackend); + else + break; + } + + if (m_FlushRequested) + { + scoped_flag guard(base_type::frontend_mutex(), m_BlockCond, m_FlushRequested); + base_type::flush_backend(m_BackendMutex, *m_pBackend); + } + } +#endif // BOOST_LOG_DOXYGEN_PASS +}; + +#undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL + +} // namespace sinks + +BOOST_LOG_CLOSE_NAMESPACE // namespace log + +} // namespace boost + +#include <boost/log/detail/footer.hpp> + +#endif // BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_ |