diff options
Diffstat (limited to 'boost/log/sinks/unbounded_ordering_queue.hpp')
-rw-r--r-- | boost/log/sinks/unbounded_ordering_queue.hpp | 245 |
1 files changed, 245 insertions, 0 deletions
diff --git a/boost/log/sinks/unbounded_ordering_queue.hpp b/boost/log/sinks/unbounded_ordering_queue.hpp new file mode 100644 index 0000000000..f67e629a3e --- /dev/null +++ b/boost/log/sinks/unbounded_ordering_queue.hpp @@ -0,0 +1,245 @@ +/* + * Copyright Andrey Semashev 2007 - 2014. + * Distributed under the Boost Software License, Version 1.0. + * (See accompanying file LICENSE_1_0.txt or copy at + * http://www.boost.org/LICENSE_1_0.txt) + */ +/*! + * \file unbounded_ordering_queue.hpp + * \author Andrey Semashev + * \date 24.07.2011 + * + * The header contains implementation of unbounded ordering record queueing strategy for + * the asynchronous sink frontend. + */ + +#ifndef BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_ +#define BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_ + +#include <boost/log/detail/config.hpp> + +#ifdef BOOST_HAS_PRAGMA_ONCE +#pragma once +#endif + +#if defined(BOOST_LOG_NO_THREADS) +#error Boost.Log: This header content is only supported in multithreaded environment +#endif + +#include <queue> +#include <vector> +#include <boost/cstdint.hpp> +#include <boost/thread/locks.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/condition_variable.hpp> +#include <boost/thread/thread_time.hpp> +#include <boost/date_time/posix_time/posix_time_types.hpp> +#include <boost/log/detail/timestamp.hpp> +#include <boost/log/detail/enqueued_record.hpp> +#include <boost/log/keywords/order.hpp> +#include <boost/log/keywords/ordering_window.hpp> +#include <boost/log/core/record_view.hpp> +#include <boost/log/detail/header.hpp> + +namespace boost { + +BOOST_LOG_OPEN_NAMESPACE + +namespace sinks { + +/*! + * \brief Unbounded ordering log record queueing strategy + * + * The \c unbounded_ordering_queue class is intended to be used with + * the \c asynchronous_sink frontend as a log record queueing strategy. + * + * This strategy provides the following properties to the record queueing mechanism: + * + * \li The queue has no size limits. + * \li The queue has a fixed latency window. This means that each log record put + * into the queue will normally not be dequeued for a certain period of time. + * \li The queue performs stable record ordering within the latency window. + * The ordering predicate can be specified in the \c OrderT template parameter. + * + * Since this queue has no size limits, it may grow uncontrollably if sink backends + * dequeue log records not fast enough. When this is an issue, it is recommended to + * use one of the bounded strategies. + */ +template< typename OrderT > +class unbounded_ordering_queue +{ +private: + typedef boost::mutex mutex_type; + typedef sinks::aux::enqueued_record enqueued_record; + + typedef std::priority_queue< + enqueued_record, + std::vector< enqueued_record >, + enqueued_record::order< OrderT > + > queue_type; + +private: + //! Ordering window duration, in milliseconds + const uint64_t m_ordering_window; + //! Synchronization mutex + mutex_type m_mutex; + //! Condition for blocking + condition_variable m_cond; + //! Thread-safe queue + queue_type m_queue; + //! Interruption flag + bool m_interruption_requested; + +public: + /*! + * Returns ordering window size specified during initialization + */ + posix_time::time_duration get_ordering_window() const + { + return posix_time::milliseconds(m_ordering_window); + } + + /*! + * Returns default ordering window size. + * The default window size is specific to the operating system thread scheduling mechanism. + */ + static posix_time::time_duration get_default_ordering_window() + { + // The main idea behind this parameter is that the ordering window should be large enough + // to allow the frontend to order records from different threads on an attribute + // that contains system time. Thus this value should be: + // * No less than the minimum time resolution quant that Boost.DateTime provides on the current OS. + // For instance, on Windows it defaults to around 15-16 ms. + // * No less than thread switching quant on the current OS. For now 30 ms is large enough window size to + // switch threads on any known OS. It can be tuned for other platforms as needed. + return posix_time::milliseconds(30); + } + +protected: + //! Initializing constructor + template< typename ArgsT > + explicit unbounded_ordering_queue(ArgsT const& args) : + m_ordering_window(args[keywords::ordering_window || &unbounded_ordering_queue::get_default_ordering_window].total_milliseconds()), + m_queue(args[keywords::order]), + m_interruption_requested(false) + { + } + + //! Enqueues log record to the queue + void enqueue(record_view const& rec) + { + lock_guard< mutex_type > lock(m_mutex); + enqueue_unlocked(rec); + } + + //! Attempts to enqueue log record to the queue + bool try_enqueue(record_view const& rec) + { + unique_lock< mutex_type > lock(m_mutex, try_to_lock); + if (lock.owns_lock()) + { + enqueue_unlocked(rec); + return true; + } + else + return false; + } + + //! Attempts to dequeue a log record ready for processing from the queue, does not block if no log records are ready to be processed + bool try_dequeue_ready(record_view& rec) + { + lock_guard< mutex_type > lock(m_mutex); + if (!m_queue.empty()) + { + const boost::log::aux::timestamp now = boost::log::aux::get_timestamp(); + enqueued_record const& elem = m_queue.top(); + if (static_cast< uint64_t >((now - elem.m_timestamp).milliseconds()) >= m_ordering_window) + { + // We got a new element + rec = elem.m_record; + m_queue.pop(); + return true; + } + } + + return false; + } + + //! Attempts to dequeue log record from the queue, does not block. + bool try_dequeue(record_view& rec) + { + lock_guard< mutex_type > lock(m_mutex); + if (!m_queue.empty()) + { + enqueued_record const& elem = m_queue.top(); + rec = elem.m_record; + m_queue.pop(); + return true; + } + + return false; + } + + //! Dequeues log record from the queue, blocks if no log records are ready to be processed + bool dequeue_ready(record_view& rec) + { + unique_lock< mutex_type > lock(m_mutex); + while (!m_interruption_requested) + { + if (!m_queue.empty()) + { + const boost::log::aux::timestamp now = boost::log::aux::get_timestamp(); + enqueued_record const& elem = m_queue.top(); + const uint64_t difference = (now - elem.m_timestamp).milliseconds(); + if (difference >= m_ordering_window) + { + // We got a new element + rec = elem.m_record; + m_queue.pop(); + return true; + } + else + { + // Wait until the element becomes ready to be processed + m_cond.timed_wait(lock, posix_time::milliseconds(m_ordering_window - difference)); + } + } + else + { + // Wait for an element to come + m_cond.wait(lock); + } + } + m_interruption_requested = false; + + return false; + } + + //! Wakes a thread possibly blocked in the \c dequeue method + void interrupt_dequeue() + { + lock_guard< mutex_type > lock(m_mutex); + m_interruption_requested = true; + m_cond.notify_one(); + } + +private: + //! Enqueues a log record + void enqueue_unlocked(record_view const& rec) + { + const bool was_empty = m_queue.empty(); + m_queue.push(enqueued_record(rec)); + if (was_empty) + m_cond.notify_one(); + } +}; + +} // namespace sinks + +BOOST_LOG_CLOSE_NAMESPACE // namespace log + +} // namespace boost + +#include <boost/log/detail/footer.hpp> + +#endif // BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_ |