diff options
Diffstat (limited to 'boost/mpi/communicator.hpp')
-rw-r--r-- | boost/mpi/communicator.hpp | 483 |
1 files changed, 70 insertions, 413 deletions
diff --git a/boost/mpi/communicator.hpp b/boost/mpi/communicator.hpp index 6e55b167f9..ff889d00a3 100644 --- a/boost/mpi/communicator.hpp +++ b/boost/mpi/communicator.hpp @@ -34,9 +34,6 @@ // For (de-)serializing skeletons and content #include <boost/mpi/skeleton_and_content_fwd.hpp> -// For (de-)serializing arrays -#include <boost/serialization/array.hpp> - #include <boost/mpi/detail/point_to_point.hpp> #include <boost/mpi/status.hpp> #include <boost/mpi/request.hpp> @@ -802,23 +799,7 @@ class BOOST_MPI_DECL communicator * Split the communicator into multiple, disjoint communicators * each of which is based on a particular color. This is a * collective operation that returns a new communicator that is a - * subgroup of @p this. This routine is functionally equivalent to - * @c MPI_Comm_split. - * - * @param color The color of this process. All processes with the - * same @p color value will be placed into the same group. - * - * @returns A new communicator containing all of the processes in - * @p this that have the same @p color. - */ - communicator split(int color) const; - - /** - * Split the communicator into multiple, disjoint communicators - * each of which is based on a particular color. This is a - * collective operation that returns a new communicator that is a - * subgroup of @p this. This routine is functionally equivalent to - * @c MPI_Comm_split. + * subgroup of @p this. * * @param color The color of this process. All processes with the * same @p color value will be placed into the same group. @@ -833,6 +814,7 @@ class BOOST_MPI_DECL communicator * @p this that have the same @p color. */ communicator split(int color, int key) const; + communicator split(int color) const; /** * Determine if the communicator is in fact an intercommunicator @@ -1157,11 +1139,15 @@ inline bool operator!=(const communicator& comm1, const communicator& comm2) return !(comm1 == comm2); } +}} // boost::mpi /************************************************************************ * Implementation details * ************************************************************************/ +#include <boost/mpi/detail/request_handlers.hpp> + +namespace boost { namespace mpi { /** * INTERNAL ONLY (using the same 'end' name might be considerd unfortunate */ @@ -1308,6 +1294,7 @@ template<typename T> void communicator::send_impl(int dest, int tag, const T& value, mpl::true_) const { + // received by recv or trivial handler. BOOST_MPI_CHECK_RESULT(MPI_Send, (const_cast<T*>(&value), 1, get_mpi_datatype<T>(value), dest, tag, MPI_Comm(*this))); @@ -1355,26 +1342,37 @@ communicator::array_send_impl(int dest, int tag, const T* values, int n, mpl::false_) const { packed_oarchive oa(*this); - oa << n << boost::serialization::make_array(values, n); + T const* v = values; + while (v < values+n) { + oa << *v++; + } send(dest, tag, oa); } template<typename T, typename A> void communicator::send_vector(int dest, int tag, - const std::vector<T,A>& value, mpl::true_ true_type) const + const std::vector<T,A>& values, mpl::true_ primitive) const { - // send the vector size - typename std::vector<T,A>::size_type size = value.size(); - send(dest, tag, size); - // send the data - this->array_send_impl(dest, tag, value.data(), size, true_type); +#if defined(BOOST_MPI_USE_IMPROBE) + array_send_impl(dest, tag, values.data(), values.size(), primitive); +#else + { + // non blocking recv by legacy_dynamic_primitive_array_handler + // blocking recv by recv_vector(source,tag,value,primitive) + // send the vector size + typename std::vector<T,A>::size_type size = values.size(); + send(dest, tag, size); + // send the data + this->array_send_impl(dest, tag, values.data(), size, primitive); + } +#endif } template<typename T, typename A> void communicator::send_vector(int dest, int tag, - const std::vector<T,A>& value, mpl::false_ false_type) const + const std::vector<T,A>& value, mpl::false_ primitive) const { - this->send_impl(dest, tag, value, false_type); + this->send_impl(dest, tag, value, primitive); } template<typename T, typename A> @@ -1396,7 +1394,6 @@ template<typename T> status communicator::recv_impl(int source, int tag, T& value, mpl::true_) const { status stat; - BOOST_MPI_CHECK_RESULT(MPI_Recv, (const_cast<T*>(&value), 1, get_mpi_datatype<T>(value), @@ -1444,38 +1441,42 @@ status communicator::array_recv_impl(int source, int tag, T* values, int n, mpl::false_) const { - // Receive the message packed_iarchive ia(*this); status stat = recv(source, tag, ia); - - // Determine how much data we are going to receive - int count; - ia >> count; - - // Deserialize the data in the message - boost::serialization::array_wrapper<T> arr(values, count > n? n : count); - ia >> arr; - - if (count > n) { - boost::throw_exception( - std::range_error("communicator::recv: message receive overflow")); + T* v = values; + while (v != values+n) { + ia >> *v++; } - - stat.m_count = count; + stat.m_count = n; return stat; } template<typename T, typename A> status communicator::recv_vector(int source, int tag, - std::vector<T,A>& value, mpl::true_ true_type) const + std::vector<T,A>& values, mpl::true_ primitive) const { - // receive the vector size - typename std::vector<T,A>::size_type size = 0; - recv(source, tag, size); - // size the vector - value.resize(size); - // receive the data - return this->array_recv_impl(source, tag, value.data(), size, true_type); +#if defined(BOOST_MPI_USE_IMPROBE) + { + MPI_Message msg; + status stat; + BOOST_MPI_CHECK_RESULT(MPI_Mprobe, (source,tag,*this,&msg,&stat.m_status)); + int count; + BOOST_MPI_CHECK_RESULT(MPI_Get_count, (&stat.m_status,get_mpi_datatype<T>(),&count)); + values.resize(count); + BOOST_MPI_CHECK_RESULT(MPI_Mrecv, (values.data(), count, get_mpi_datatype<T>(), &msg, &stat.m_status)); + return stat; + } +#else + { + // receive the vector size + typename std::vector<T,A>::size_type size = 0; + recv(source, tag, size); + // size the vector + values.resize(size); + // receive the data + return this->array_recv_impl(source, tag, values.data(), size, primitive); + } +#endif } template<typename T, typename A> @@ -1542,12 +1543,7 @@ template<typename T> request communicator::isend_impl(int dest, int tag, const T& value, mpl::true_) const { - request req; - BOOST_MPI_CHECK_RESULT(MPI_Isend, - (const_cast<T*>(&value), 1, - get_mpi_datatype<T>(value), - dest, tag, MPI_Comm(*this), &req.m_requests[0])); - return req; + return request::make_trivial_send(*this, dest, tag, value); } // We're sending a type that does not have an associated MPI @@ -1560,7 +1556,7 @@ communicator::isend_impl(int dest, int tag, const T& value, mpl::false_) const shared_ptr<packed_oarchive> archive(new packed_oarchive(*this)); *archive << value; request result = isend(dest, tag, *archive); - result.m_data = archive; + result.preserve(archive); return result; } @@ -1581,16 +1577,9 @@ request communicator::isend(int dest, int tag, const std::vector<T,A>& values) c template<typename T, class A> request communicator::isend_vector(int dest, int tag, const std::vector<T,A>& values, - mpl::true_) const + mpl::true_ primitive) const { - std::size_t size = values.size(); - request req = this->isend_impl(dest, tag, size, mpl::true_()); - BOOST_MPI_CHECK_RESULT(MPI_Isend, - (const_cast<T*>(values.data()), size, - get_mpi_datatype<T>(), - dest, tag, MPI_Comm(*this), &req.m_requests[1])); - return req; - + return request::make_dynamic_primitive_array_send(*this, dest, tag, values); } template<typename T, class A> @@ -1606,12 +1595,7 @@ request communicator::array_isend_impl(int dest, int tag, const T* values, int n, mpl::true_) const { - request req; - BOOST_MPI_CHECK_RESULT(MPI_Isend, - (const_cast<T*>(values), n, - get_mpi_datatype<T>(*values), - dest, tag, MPI_Comm(*this), &req.m_requests[0])); - return req; + return request::make_trivial_send(*this, dest, tag, values, n); } template<typename T> @@ -1620,9 +1604,12 @@ communicator::array_isend_impl(int dest, int tag, const T* values, int n, mpl::false_) const { shared_ptr<packed_oarchive> archive(new packed_oarchive(*this)); - *archive << n << boost::serialization::make_array(values, n); + T const* v = values; + while (v < values+n) { + *archive << *v++; + } request result = isend(dest, tag, *archive); - result.m_data = archive; + result.preserve(archive); return result; } @@ -1634,323 +1621,20 @@ request communicator::isend(int dest, int tag, const T* values, int n) const return array_isend_impl(dest, tag, values, n, is_mpi_datatype<T>()); } -namespace detail { - /** - * Internal data structure that stores everything required to manage - * the receipt of serialized data via a request object. - */ - template<typename T> - struct serialized_irecv_data - { - serialized_irecv_data(const communicator& comm, int source, int tag, - T& value) - : comm(comm), source(source), tag(tag), ia(comm), value(value) - { - } - - void deserialize(status& stat) - { - ia >> value; - stat.m_count = 1; - } - - communicator comm; - int source; - int tag; - std::size_t count; - packed_iarchive ia; - T& value; - }; - - template<> - struct serialized_irecv_data<packed_iarchive> - { - serialized_irecv_data(const communicator& comm, int source, int tag, - packed_iarchive& ia) - : comm(comm), source(source), tag(tag), ia(ia) { } - - void deserialize(status&) { /* Do nothing. */ } - - communicator comm; - int source; - int tag; - std::size_t count; - packed_iarchive& ia; - }; - - /** - * Internal data structure that stores everything required to manage - * the receipt of an array of serialized data via a request object. - */ - template<typename T> - struct serialized_array_irecv_data - { - serialized_array_irecv_data(const communicator& comm, int source, int tag, - T* values, int n) - : comm(comm), source(source), tag(tag), ia(comm), values(values), n(n) - { - } - - void deserialize(status& stat); - - communicator comm; - int source; - int tag; - std::size_t count; - packed_iarchive ia; - T* values; - int n; - }; - - template<typename T> - void serialized_array_irecv_data<T>::deserialize(status& stat) - { - // Determine how much data we are going to receive - int count; - ia >> count; - - // Deserialize the data in the message - boost::serialization::array_wrapper<T> arr(values, count > n? n : count); - ia >> arr; - - if (count > n) { - boost::throw_exception( - std::range_error("communicator::recv: message receive overflow")); - } - - stat.m_count = count; - } - - /** - * Internal data structure that stores everything required to manage - * the receipt of an array of primitive data but unknown size. - * Such an array can have been send with blocking operation and so must - * be compatible with the (size_t,raw_data[]) format. - */ - template<typename T, class A> - struct dynamic_array_irecv_data - { - BOOST_STATIC_ASSERT_MSG(is_mpi_datatype<T>::value, "Can only be specialized for MPI datatypes."); - - dynamic_array_irecv_data(const communicator& comm, int source, int tag, - std::vector<T,A>& values) - : comm(comm), source(source), tag(tag), count(-1), values(values) - { - } - - communicator comm; - int source; - int tag; - std::size_t count; - std::vector<T,A>& values; - }; - -} - -template<typename T> -optional<status> -request::handle_serialized_irecv(request* self, request_action action) -{ - typedef detail::serialized_irecv_data<T> data_t; - shared_ptr<data_t> data = static_pointer_cast<data_t>(self->m_data); - - if (action == ra_wait) { - status stat; - if (self->m_requests[1] == MPI_REQUEST_NULL) { - // Wait for the count message to complete - BOOST_MPI_CHECK_RESULT(MPI_Wait, - (self->m_requests, &stat.m_status)); - // Resize our buffer and get ready to receive its data - data->ia.resize(data->count); - BOOST_MPI_CHECK_RESULT(MPI_Irecv, - (data->ia.address(), data->ia.size(), MPI_PACKED, - stat.source(), stat.tag(), - MPI_Comm(data->comm), self->m_requests + 1)); - } - - // Wait until we have received the entire message - BOOST_MPI_CHECK_RESULT(MPI_Wait, - (self->m_requests + 1, &stat.m_status)); - - data->deserialize(stat); - return stat; - } else if (action == ra_test) { - status stat; - int flag = 0; - - if (self->m_requests[1] == MPI_REQUEST_NULL) { - // Check if the count message has completed - BOOST_MPI_CHECK_RESULT(MPI_Test, - (self->m_requests, &flag, &stat.m_status)); - if (flag) { - // Resize our buffer and get ready to receive its data - data->ia.resize(data->count); - BOOST_MPI_CHECK_RESULT(MPI_Irecv, - (data->ia.address(), data->ia.size(),MPI_PACKED, - stat.source(), stat.tag(), - MPI_Comm(data->comm), self->m_requests + 1)); - } else - return optional<status>(); // We have not finished yet - } - - // Check if we have received the message data - BOOST_MPI_CHECK_RESULT(MPI_Test, - (self->m_requests + 1, &flag, &stat.m_status)); - if (flag) { - data->deserialize(stat); - return stat; - } else - return optional<status>(); - } else { - return optional<status>(); - } -} - -template<typename T> -optional<status> -request::handle_serialized_array_irecv(request* self, request_action action) -{ - typedef detail::serialized_array_irecv_data<T> data_t; - shared_ptr<data_t> data = static_pointer_cast<data_t>(self->m_data); - - if (action == ra_wait) { - status stat; - if (self->m_requests[1] == MPI_REQUEST_NULL) { - // Wait for the count message to complete - BOOST_MPI_CHECK_RESULT(MPI_Wait, - (self->m_requests, &stat.m_status)); - // Resize our buffer and get ready to receive its data - data->ia.resize(data->count); - BOOST_MPI_CHECK_RESULT(MPI_Irecv, - (data->ia.address(), data->ia.size(), MPI_PACKED, - stat.source(), stat.tag(), - MPI_Comm(data->comm), self->m_requests + 1)); - } - - // Wait until we have received the entire message - BOOST_MPI_CHECK_RESULT(MPI_Wait, - (self->m_requests + 1, &stat.m_status)); - - data->deserialize(stat); - return stat; - } else if (action == ra_test) { - status stat; - int flag = 0; - - if (self->m_requests[1] == MPI_REQUEST_NULL) { - // Check if the count message has completed - BOOST_MPI_CHECK_RESULT(MPI_Test, - (self->m_requests, &flag, &stat.m_status)); - if (flag) { - // Resize our buffer and get ready to receive its data - data->ia.resize(data->count); - BOOST_MPI_CHECK_RESULT(MPI_Irecv, - (data->ia.address(), data->ia.size(),MPI_PACKED, - stat.source(), stat.tag(), - MPI_Comm(data->comm), self->m_requests + 1)); - } else - return optional<status>(); // We have not finished yet - } - - // Check if we have received the message data - BOOST_MPI_CHECK_RESULT(MPI_Test, - (self->m_requests + 1, &flag, &stat.m_status)); - if (flag) { - data->deserialize(stat); - return stat; - } else - return optional<status>(); - } else { - return optional<status>(); - } -} - -template<typename T, class A> -optional<status> -request::handle_dynamic_primitive_array_irecv(request* self, request_action action) -{ - typedef detail::dynamic_array_irecv_data<T,A> data_t; - shared_ptr<data_t> data = static_pointer_cast<data_t>(self->m_data); - - if (action == ra_wait) { - status stat; - if (self->m_requests[1] == MPI_REQUEST_NULL) { - // Wait for the count message to complete - BOOST_MPI_CHECK_RESULT(MPI_Wait, - (self->m_requests, &stat.m_status)); - // Resize our buffer and get ready to receive its data - data->values.resize(data->count); - BOOST_MPI_CHECK_RESULT(MPI_Irecv, - (&(data->values[0]), data->values.size(), get_mpi_datatype<T>(), - stat.source(), stat.tag(), - MPI_Comm(data->comm), self->m_requests + 1)); - } - - // Wait until we have received the entire message - BOOST_MPI_CHECK_RESULT(MPI_Wait, - (self->m_requests + 1, &stat.m_status)); - return stat; - } else if (action == ra_test) { - status stat; - int flag = 0; - - if (self->m_requests[1] == MPI_REQUEST_NULL) { - // Check if the count message has completed - BOOST_MPI_CHECK_RESULT(MPI_Test, - (self->m_requests, &flag, &stat.m_status)); - if (flag) { - // Resize our buffer and get ready to receive its data - data->values.resize(data->count); - BOOST_MPI_CHECK_RESULT(MPI_Irecv, - (&(data->values[0]), data->values.size(),MPI_PACKED, - stat.source(), stat.tag(), - MPI_Comm(data->comm), self->m_requests + 1)); - } else - return optional<status>(); // We have not finished yet - } - - // Check if we have received the message data - BOOST_MPI_CHECK_RESULT(MPI_Test, - (self->m_requests + 1, &flag, &stat.m_status)); - if (flag) { - return stat; - } else - return optional<status>(); - } else { - return optional<status>(); - } -} - // We're receiving a type that has an associated MPI datatype, so we // map directly to that datatype. template<typename T> request communicator::irecv_impl(int source, int tag, T& value, mpl::true_) const { - request req; - BOOST_MPI_CHECK_RESULT(MPI_Irecv, - (const_cast<T*>(&value), 1, - get_mpi_datatype<T>(value), - source, tag, MPI_Comm(*this), &req.m_requests[0])); - return req; + return request::make_trivial_recv(*this, source, tag, value); } template<typename T> request communicator::irecv_impl(int source, int tag, T& value, mpl::false_) const { - typedef detail::serialized_irecv_data<T> data_t; - shared_ptr<data_t> data(new data_t(*this, source, tag, value)); - request req; - req.m_data = data; - req.m_handler = request::handle_serialized_irecv<T>; - - BOOST_MPI_CHECK_RESULT(MPI_Irecv, - (&data->count, 1, - get_mpi_datatype<std::size_t>(data->count), - source, tag, MPI_Comm(*this), &req.m_requests[0])); - - return req; + return request::make_serialized(*this, source, tag, value); } template<typename T> @@ -1965,12 +1649,7 @@ request communicator::array_irecv_impl(int source, int tag, T* values, int n, mpl::true_) const { - request req; - BOOST_MPI_CHECK_RESULT(MPI_Irecv, - (const_cast<T*>(values), n, - get_mpi_datatype<T>(*values), - source, tag, MPI_Comm(*this), &req.m_requests[0])); - return req; + return request::make_trivial_recv(*this, source, tag, values, n); } template<typename T> @@ -1978,37 +1657,15 @@ request communicator::array_irecv_impl(int source, int tag, T* values, int n, mpl::false_) const { - typedef detail::serialized_array_irecv_data<T> data_t; - shared_ptr<data_t> data(new data_t(*this, source, tag, values, n)); - request req; - req.m_data = data; - req.m_handler = request::handle_serialized_array_irecv<T>; - - BOOST_MPI_CHECK_RESULT(MPI_Irecv, - (&data->count, 1, - get_mpi_datatype<std::size_t>(data->count), - source, tag, MPI_Comm(*this), &req.m_requests[0])); - - return req; + return request::make_serialized_array(*this, source, tag, values, n); } template<typename T, class A> request communicator::irecv_vector(int source, int tag, std::vector<T,A>& values, - mpl::true_) const + mpl::true_ primitive) const { - typedef detail::dynamic_array_irecv_data<T,A> data_t; - shared_ptr<data_t> data(new data_t(*this, source, tag, values)); - request req; - req.m_data = data; - req.m_handler = request::handle_dynamic_primitive_array_irecv<T,A>; - - BOOST_MPI_CHECK_RESULT(MPI_Irecv, - (&data->count, 1, - get_mpi_datatype<std::size_t>(data->count), - source, tag, MPI_Comm(*this), &req.m_requests[0])); - - return req; + return request::make_dynamic_primitive_array_recv(*this, source, tag, values); } template<typename T, class A> |