diff options
author | Anas Nashif <anas.nashif@intel.com> | 2012-10-30 12:57:26 -0700 |
---|---|---|
committer | Anas Nashif <anas.nashif@intel.com> | 2012-10-30 12:57:26 -0700 |
commit | 1a78a62555be32868418fe52f8e330c9d0f95d5a (patch) | |
tree | d3765a80e7d3b9640ec2e930743630cd6b9fce2b /boost/property_map/parallel/impl/distributed_property_map.ipp | |
download | boost-1a78a62555be32868418fe52f8e330c9d0f95d5a.tar.gz boost-1a78a62555be32868418fe52f8e330c9d0f95d5a.tar.bz2 boost-1a78a62555be32868418fe52f8e330c9d0f95d5a.zip |
Imported Upstream version 1.49.0upstream/1.49.0
Diffstat (limited to 'boost/property_map/parallel/impl/distributed_property_map.ipp')
-rw-r--r-- | boost/property_map/parallel/impl/distributed_property_map.ipp | 432 |
1 files changed, 432 insertions, 0 deletions
diff --git a/boost/property_map/parallel/impl/distributed_property_map.ipp b/boost/property_map/parallel/impl/distributed_property_map.ipp new file mode 100644 index 0000000000..fa95a5bd43 --- /dev/null +++ b/boost/property_map/parallel/impl/distributed_property_map.ipp @@ -0,0 +1,432 @@ +// Copyright (C) 2004-2006 The Trustees of Indiana University. + +// Use, modification and distribution is subject to the Boost Software +// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +// Authors: Douglas Gregor +// Nick Edmonds +// Andrew Lumsdaine +#include <boost/assert.hpp> +#include <boost/property_map/parallel/distributed_property_map.hpp> +#include <boost/graph/parallel/detail/untracked_pair.hpp> +#include <boost/type_traits/is_base_and_derived.hpp> +#include <boost/bind.hpp> +#include <boost/graph/parallel/simple_trigger.hpp> + +#ifndef BOOST_GRAPH_USE_MPI +#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included" +#endif + +namespace boost { namespace parallel { + +template<typename ProcessGroup, typename GlobalMap, typename StorageMap> +template<typename Reduce> +PBGL_DISTRIB_PMAP +::distributed_property_map(const ProcessGroup& pg, const GlobalMap& global, + const StorageMap& pm, const Reduce& reduce) + : data(new data_t(pg, global, pm, reduce, Reduce::non_default_resolver)) +{ + typedef handle_message<Reduce> Handler; + + data->ghost_cells.reset(new ghost_cells_type()); + data->reset = &data_t::template do_reset<Reduce>; + data->process_group.replace_handler(Handler(data, reduce)); + data->process_group.template get_receiver<Handler>() + ->setup_triggers(data->process_group); +} + +template<typename ProcessGroup, typename GlobalMap, typename StorageMap> +PBGL_DISTRIB_PMAP::~distributed_property_map() { } + +template<typename ProcessGroup, typename GlobalMap, typename StorageMap> +template<typename Reduce> +void +PBGL_DISTRIB_PMAP::set_reduce(const Reduce& reduce) +{ + typedef handle_message<Reduce> Handler; + data->process_group.replace_handler(Handler(data, reduce)); + Handler* handler = data->process_group.template get_receiver<Handler>(); + BOOST_ASSERT(handler); + handler->setup_triggers(data->process_group); + data->get_default_value = reduce; + data->has_default_resolver = Reduce::non_default_resolver; + int model = data->model; + data->reset = &data_t::template do_reset<Reduce>; + set_consistency_model(model); +} + +template<typename ProcessGroup, typename GlobalMap, typename StorageMap> +void PBGL_DISTRIB_PMAP::prune_ghost_cells() const +{ + if (data->max_ghost_cells == 0) + return; + + while (data->ghost_cells->size() > data->max_ghost_cells) { + // Evict the last ghost cell + + if (data->model & cm_flush) { + // We need to flush values when we evict them. + boost::parallel::detail::untracked_pair<key_type, value_type> const& victim + = data->ghost_cells->back(); + send(data->process_group, get(data->global, victim.first).first, + property_map_put, victim); + } + + // Actually remove the ghost cell + data->ghost_cells->pop_back(); + } +} + +template<typename ProcessGroup, typename GlobalMap, typename StorageMap> +typename PBGL_DISTRIB_PMAP::value_type& +PBGL_DISTRIB_PMAP::cell(const key_type& key, bool request_if_missing) const +{ + // Index by key + ghost_cells_key_index_type const& key_index + = data->ghost_cells->template get<1>(); + + // Search for the ghost cell by key, and project back to the sequence + iterator ghost_cell + = data->ghost_cells->template project<0>(key_index.find(key)); + if (ghost_cell == data->ghost_cells->end()) { + value_type value; + if (data->has_default_resolver) + // Since we have a default resolver, use it to create a default + // value for this ghost cell. + value = data->get_default_value(key); + else if (request_if_missing) + // Request the actual value of this key from its owner + send_oob_with_reply(data->process_group, get(data->global, key).first, + property_map_get, key, value); + else + value = value_type(); + + // Create a ghost cell containing the new value + ghost_cell + = data->ghost_cells->push_front(std::make_pair(key, value)).first; + + // If we need to, prune the ghost cells + if (data->max_ghost_cells > 0) + prune_ghost_cells(); + } else if (data->max_ghost_cells > 0) + // Put this cell at the beginning of the MRU list + data->ghost_cells->relocate(data->ghost_cells->begin(), ghost_cell); + + return const_cast<value_type&>(ghost_cell->second); +} + +template<typename ProcessGroup, typename GlobalMap, typename StorageMap> +template<typename Reduce> +void +PBGL_DISTRIB_PMAP +::handle_message<Reduce>::operator()(process_id_type source, int tag) +{ + BOOST_ASSERT(false); +} + +template<typename ProcessGroup, typename GlobalMap, typename StorageMap> +template<typename Reduce> +void +PBGL_DISTRIB_PMAP::handle_message<Reduce>:: +handle_put(int /*source*/, int /*tag*/, + const boost::parallel::detail::untracked_pair<key_type, value_type>& req, trigger_receive_context) +{ + using boost::get; + + shared_ptr<data_t> data(data_ptr); + + owner_local_pair p = get(data->global, req.first); + BOOST_ASSERT(p.first == process_id(data->process_group)); + + detail::maybe_put(data->storage, p.second, + reduce(req.first, + get(data->storage, p.second), + req.second)); +} + +template<typename ProcessGroup, typename GlobalMap, typename StorageMap> +template<typename Reduce> +typename PBGL_DISTRIB_PMAP::value_type +PBGL_DISTRIB_PMAP::handle_message<Reduce>:: +handle_get(int source, int /*tag*/, const key_type& key, + trigger_receive_context) +{ + using boost::get; + + shared_ptr<data_t> data(data_ptr); + BOOST_ASSERT(data); + + owner_local_pair p = get(data->global, key); + return get(data->storage, p.second); +} + +template<typename ProcessGroup, typename GlobalMap, typename StorageMap> +template<typename Reduce> +void +PBGL_DISTRIB_PMAP::handle_message<Reduce>:: +handle_multiget(int source, int tag, const std::vector<key_type>& keys, + trigger_receive_context) +{ + shared_ptr<data_t> data(data_ptr); + BOOST_ASSERT(data); + + typedef boost::parallel::detail::untracked_pair<key_type, value_type> key_value; + std::vector<key_value> results; + std::size_t n = keys.size(); + results.reserve(n); + + using boost::get; + + for (std::size_t i = 0; i < n; ++i) { + local_key_type local_key = get(data->global, keys[i]).second; + results.push_back(key_value(keys[i], get(data->storage, local_key))); + } + send(data->process_group, source, property_map_multiget_reply, results); +} + +template<typename ProcessGroup, typename GlobalMap, typename StorageMap> +template<typename Reduce> +void +PBGL_DISTRIB_PMAP::handle_message<Reduce>:: +handle_multiget_reply + (int source, int tag, + const std::vector<boost::parallel::detail::untracked_pair<key_type, value_type> >& msg, + trigger_receive_context) +{ + shared_ptr<data_t> data(data_ptr); + BOOST_ASSERT(data); + + // Index by key + ghost_cells_key_index_type const& key_index + = data->ghost_cells->template get<1>(); + + std::size_t n = msg.size(); + for (std::size_t i = 0; i < n; ++i) { + // Search for the ghost cell by key, and project back to the sequence + iterator position + = data->ghost_cells->template project<0>(key_index.find(msg[i].first)); + + if (position != data->ghost_cells->end()) + const_cast<value_type&>(position->second) = msg[i].second; + } +} + +template<typename ProcessGroup, typename GlobalMap, typename StorageMap> +template<typename Reduce> +void +PBGL_DISTRIB_PMAP::handle_message<Reduce>:: +handle_multiput + (int source, int tag, + const std::vector<unsafe_pair<local_key_type, value_type> >& values, + trigger_receive_context) +{ + using boost::get; + + shared_ptr<data_t> data(data_ptr); + BOOST_ASSERT(data); + + std::size_t n = values.size(); + for (std::size_t i = 0; i < n; ++i) { + local_key_type local_key = values[i].first; + value_type local_value = get(data->storage, local_key); + detail::maybe_put(data->storage, values[i].first, + reduce(values[i].first, + local_value, + values[i].second)); + } +} + +template<typename ProcessGroup, typename GlobalMap, typename StorageMap> +template<typename Reduce> +void +PBGL_DISTRIB_PMAP::handle_message<Reduce>:: +setup_triggers(process_group_type& pg) +{ + using boost::graph::parallel::simple_trigger; + + simple_trigger(pg, property_map_put, this, &handle_message::handle_put); + simple_trigger(pg, property_map_get, this, &handle_message::handle_get); + simple_trigger(pg, property_map_multiget, this, + &handle_message::handle_multiget); + simple_trigger(pg, property_map_multiget_reply, this, + &handle_message::handle_multiget_reply); + simple_trigger(pg, property_map_multiput, this, + &handle_message::handle_multiput); +} + +template<typename ProcessGroup, typename GlobalMap, typename StorageMap> +void +PBGL_DISTRIB_PMAP +::on_synchronize::operator()() +{ + int stage=0; // we only get called at the start now + shared_ptr<data_t> data(data_ptr); + BOOST_ASSERT(data); + + // Determine in which stage backward consistency messages should be sent. + int backward_stage = -1; + if (data->model & cm_backward) { + if (data->model & cm_flush) backward_stage = 1; + else backward_stage = 0; + } + + // Flush results in first stage + if (stage == 0 && data->model & cm_flush) + data->flush(); + + // Backward consistency + if (stage == backward_stage && !(data->model & (cm_clear | cm_reset))) + data->refresh_ghost_cells(); + + // Optionally clear results + if (data->model & cm_clear) + data->clear(); + + // Optionally reset results + if (data->model & cm_reset) { + if (data->reset) ((*data).*data->reset)(); + } +} + + +template<typename ProcessGroup, typename GlobalMap, typename StorageMap> +void +PBGL_DISTRIB_PMAP::set_consistency_model(int model) +{ + data->model = model; + + int stages = 1; + bool need_on_synchronize = (model != cm_forward); + + // Backward consistency is a two-stage process. + if (model & cm_backward) { + if (model & cm_flush) stages = 3; + else stages = 2; + + // For backward consistency to work, we absolutely cannot throw + // away any ghost cells. + data->max_ghost_cells = 0; + } + + // attach the on_synchronize handler. + if (need_on_synchronize) + data->process_group.replace_on_synchronize_handler(on_synchronize(data)); +} + +template<typename ProcessGroup, typename GlobalMap, typename StorageMap> +void +PBGL_DISTRIB_PMAP::set_max_ghost_cells(std::size_t max_ghost_cells) +{ + if ((data->model & cm_backward) && max_ghost_cells > 0) + boost::throw_exception(std::runtime_error("distributed_property_map::set_max_ghost_cells: " + "cannot limit ghost-cell usage with a backward " + "consistency model")); + + if (max_ghost_cells == 1) + // It is not safe to have only 1 ghost cell; the cell() method + // will fail. + max_ghost_cells = 2; + + data->max_ghost_cells = max_ghost_cells; + prune_ghost_cells(); +} + +template<typename ProcessGroup, typename GlobalMap, typename StorageMap> +void PBGL_DISTRIB_PMAP::clear() +{ + data->clear(); +} + +template<typename ProcessGroup, typename GlobalMap, typename StorageMap> +void PBGL_DISTRIB_PMAP::data_t::clear() +{ + ghost_cells->clear(); +} + +template<typename ProcessGroup, typename GlobalMap, typename StorageMap> +void PBGL_DISTRIB_PMAP::reset() +{ + if (data->reset) ((*data).*data->reset)(); +} + +template<typename ProcessGroup, typename GlobalMap, typename StorageMap> +void PBGL_DISTRIB_PMAP::flush() +{ + data->flush(); +} + +template<typename ProcessGroup, typename GlobalMap, typename StorageMap> +void PBGL_DISTRIB_PMAP::data_t::refresh_ghost_cells() +{ + using boost::get; + + std::vector<std::vector<key_type> > keys; + keys.resize(num_processes(process_group)); + + // Collect the set of keys for which we will request values + for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i) + keys[get(global, i->first).first].push_back(i->first); + + // Send multiget requests to each of the other processors + typedef typename ProcessGroup::process_size_type process_size_type; + process_size_type n = num_processes(process_group); + process_id_type id = process_id(process_group); + for (process_size_type p = (id + 1) % n ; p != id ; p = (p + 1) % n) { + if (!keys[p].empty()) + send(process_group, p, property_map_multiget, keys[p]); + } +} + +template<typename ProcessGroup, typename GlobalMap, typename StorageMap> +void PBGL_DISTRIB_PMAP::data_t::flush() +{ + using boost::get; + + int n = num_processes(process_group); + std::vector<std::vector<unsafe_pair<local_key_type, value_type> > > values; + values.resize(n); + + // Collect all of the flushed values + for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i) { + std::pair<int, local_key_type> g = get(global, i->first); + values[g.first].push_back(std::make_pair(g.second, i->second)); + } + + // Transmit flushed values + for (int p = 0; p < n; ++p) { + if (!values[p].empty()) + send(process_group, p, property_map_multiput, values[p]); + } +} + +template<typename ProcessGroup, typename GlobalMap, typename StorageMap> +void PBGL_DISTRIB_PMAP::do_synchronize() +{ + if (data->model & cm_backward) { + synchronize(data->process_group); + return; + } + + // Request refreshes of the values of our ghost cells + data->refresh_ghost_cells(); + + // Allows all of the multigets to get to their destinations + synchronize(data->process_group); + + // Allows all of the multiget responses to get to their destinations + synchronize(data->process_group); +} + +template<typename ProcessGroup, typename GlobalMap, typename StorageMap> +template<typename Resolver> +void PBGL_DISTRIB_PMAP::data_t::do_reset() +{ + Resolver* resolver = get_default_value.template target<Resolver>(); + BOOST_ASSERT(resolver); + + for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i) + const_cast<value_type&>(i->second) = (*resolver)(i->first); +} + +} } // end namespace boost::parallel |