summaryrefslogtreecommitdiff
path: root/boost/mpi/communicator.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'boost/mpi/communicator.hpp')
-rw-r--r--boost/mpi/communicator.hpp483
1 files changed, 70 insertions, 413 deletions
diff --git a/boost/mpi/communicator.hpp b/boost/mpi/communicator.hpp
index 6e55b167f9..ff889d00a3 100644
--- a/boost/mpi/communicator.hpp
+++ b/boost/mpi/communicator.hpp
@@ -34,9 +34,6 @@
// For (de-)serializing skeletons and content
#include <boost/mpi/skeleton_and_content_fwd.hpp>
-// For (de-)serializing arrays
-#include <boost/serialization/array.hpp>
-
#include <boost/mpi/detail/point_to_point.hpp>
#include <boost/mpi/status.hpp>
#include <boost/mpi/request.hpp>
@@ -802,23 +799,7 @@ class BOOST_MPI_DECL communicator
* Split the communicator into multiple, disjoint communicators
* each of which is based on a particular color. This is a
* collective operation that returns a new communicator that is a
- * subgroup of @p this. This routine is functionally equivalent to
- * @c MPI_Comm_split.
- *
- * @param color The color of this process. All processes with the
- * same @p color value will be placed into the same group.
- *
- * @returns A new communicator containing all of the processes in
- * @p this that have the same @p color.
- */
- communicator split(int color) const;
-
- /**
- * Split the communicator into multiple, disjoint communicators
- * each of which is based on a particular color. This is a
- * collective operation that returns a new communicator that is a
- * subgroup of @p this. This routine is functionally equivalent to
- * @c MPI_Comm_split.
+ * subgroup of @p this.
*
* @param color The color of this process. All processes with the
* same @p color value will be placed into the same group.
@@ -833,6 +814,7 @@ class BOOST_MPI_DECL communicator
* @p this that have the same @p color.
*/
communicator split(int color, int key) const;
+ communicator split(int color) const;
/**
* Determine if the communicator is in fact an intercommunicator
@@ -1157,11 +1139,15 @@ inline bool operator!=(const communicator& comm1, const communicator& comm2)
return !(comm1 == comm2);
}
+}} // boost::mpi
/************************************************************************
* Implementation details *
************************************************************************/
+#include <boost/mpi/detail/request_handlers.hpp>
+
+namespace boost { namespace mpi {
/**
* INTERNAL ONLY (using the same 'end' name might be considerd unfortunate
*/
@@ -1308,6 +1294,7 @@ template<typename T>
void
communicator::send_impl(int dest, int tag, const T& value, mpl::true_) const
{
+ // received by recv or trivial handler.
BOOST_MPI_CHECK_RESULT(MPI_Send,
(const_cast<T*>(&value), 1, get_mpi_datatype<T>(value),
dest, tag, MPI_Comm(*this)));
@@ -1355,26 +1342,37 @@ communicator::array_send_impl(int dest, int tag, const T* values, int n,
mpl::false_) const
{
packed_oarchive oa(*this);
- oa << n << boost::serialization::make_array(values, n);
+ T const* v = values;
+ while (v < values+n) {
+ oa << *v++;
+ }
send(dest, tag, oa);
}
template<typename T, typename A>
void communicator::send_vector(int dest, int tag,
- const std::vector<T,A>& value, mpl::true_ true_type) const
+ const std::vector<T,A>& values, mpl::true_ primitive) const
{
- // send the vector size
- typename std::vector<T,A>::size_type size = value.size();
- send(dest, tag, size);
- // send the data
- this->array_send_impl(dest, tag, value.data(), size, true_type);
+#if defined(BOOST_MPI_USE_IMPROBE)
+ array_send_impl(dest, tag, values.data(), values.size(), primitive);
+#else
+ {
+ // non blocking recv by legacy_dynamic_primitive_array_handler
+ // blocking recv by recv_vector(source,tag,value,primitive)
+ // send the vector size
+ typename std::vector<T,A>::size_type size = values.size();
+ send(dest, tag, size);
+ // send the data
+ this->array_send_impl(dest, tag, values.data(), size, primitive);
+ }
+#endif
}
template<typename T, typename A>
void communicator::send_vector(int dest, int tag,
- const std::vector<T,A>& value, mpl::false_ false_type) const
+ const std::vector<T,A>& value, mpl::false_ primitive) const
{
- this->send_impl(dest, tag, value, false_type);
+ this->send_impl(dest, tag, value, primitive);
}
template<typename T, typename A>
@@ -1396,7 +1394,6 @@ template<typename T>
status communicator::recv_impl(int source, int tag, T& value, mpl::true_) const
{
status stat;
-
BOOST_MPI_CHECK_RESULT(MPI_Recv,
(const_cast<T*>(&value), 1,
get_mpi_datatype<T>(value),
@@ -1444,38 +1441,42 @@ status
communicator::array_recv_impl(int source, int tag, T* values, int n,
mpl::false_) const
{
- // Receive the message
packed_iarchive ia(*this);
status stat = recv(source, tag, ia);
-
- // Determine how much data we are going to receive
- int count;
- ia >> count;
-
- // Deserialize the data in the message
- boost::serialization::array_wrapper<T> arr(values, count > n? n : count);
- ia >> arr;
-
- if (count > n) {
- boost::throw_exception(
- std::range_error("communicator::recv: message receive overflow"));
+ T* v = values;
+ while (v != values+n) {
+ ia >> *v++;
}
-
- stat.m_count = count;
+ stat.m_count = n;
return stat;
}
template<typename T, typename A>
status communicator::recv_vector(int source, int tag,
- std::vector<T,A>& value, mpl::true_ true_type) const
+ std::vector<T,A>& values, mpl::true_ primitive) const
{
- // receive the vector size
- typename std::vector<T,A>::size_type size = 0;
- recv(source, tag, size);
- // size the vector
- value.resize(size);
- // receive the data
- return this->array_recv_impl(source, tag, value.data(), size, true_type);
+#if defined(BOOST_MPI_USE_IMPROBE)
+ {
+ MPI_Message msg;
+ status stat;
+ BOOST_MPI_CHECK_RESULT(MPI_Mprobe, (source,tag,*this,&msg,&stat.m_status));
+ int count;
+ BOOST_MPI_CHECK_RESULT(MPI_Get_count, (&stat.m_status,get_mpi_datatype<T>(),&count));
+ values.resize(count);
+ BOOST_MPI_CHECK_RESULT(MPI_Mrecv, (values.data(), count, get_mpi_datatype<T>(), &msg, &stat.m_status));
+ return stat;
+ }
+#else
+ {
+ // receive the vector size
+ typename std::vector<T,A>::size_type size = 0;
+ recv(source, tag, size);
+ // size the vector
+ values.resize(size);
+ // receive the data
+ return this->array_recv_impl(source, tag, values.data(), size, primitive);
+ }
+#endif
}
template<typename T, typename A>
@@ -1542,12 +1543,7 @@ template<typename T>
request
communicator::isend_impl(int dest, int tag, const T& value, mpl::true_) const
{
- request req;
- BOOST_MPI_CHECK_RESULT(MPI_Isend,
- (const_cast<T*>(&value), 1,
- get_mpi_datatype<T>(value),
- dest, tag, MPI_Comm(*this), &req.m_requests[0]));
- return req;
+ return request::make_trivial_send(*this, dest, tag, value);
}
// We're sending a type that does not have an associated MPI
@@ -1560,7 +1556,7 @@ communicator::isend_impl(int dest, int tag, const T& value, mpl::false_) const
shared_ptr<packed_oarchive> archive(new packed_oarchive(*this));
*archive << value;
request result = isend(dest, tag, *archive);
- result.m_data = archive;
+ result.preserve(archive);
return result;
}
@@ -1581,16 +1577,9 @@ request communicator::isend(int dest, int tag, const std::vector<T,A>& values) c
template<typename T, class A>
request
communicator::isend_vector(int dest, int tag, const std::vector<T,A>& values,
- mpl::true_) const
+ mpl::true_ primitive) const
{
- std::size_t size = values.size();
- request req = this->isend_impl(dest, tag, size, mpl::true_());
- BOOST_MPI_CHECK_RESULT(MPI_Isend,
- (const_cast<T*>(values.data()), size,
- get_mpi_datatype<T>(),
- dest, tag, MPI_Comm(*this), &req.m_requests[1]));
- return req;
-
+ return request::make_dynamic_primitive_array_send(*this, dest, tag, values);
}
template<typename T, class A>
@@ -1606,12 +1595,7 @@ request
communicator::array_isend_impl(int dest, int tag, const T* values, int n,
mpl::true_) const
{
- request req;
- BOOST_MPI_CHECK_RESULT(MPI_Isend,
- (const_cast<T*>(values), n,
- get_mpi_datatype<T>(*values),
- dest, tag, MPI_Comm(*this), &req.m_requests[0]));
- return req;
+ return request::make_trivial_send(*this, dest, tag, values, n);
}
template<typename T>
@@ -1620,9 +1604,12 @@ communicator::array_isend_impl(int dest, int tag, const T* values, int n,
mpl::false_) const
{
shared_ptr<packed_oarchive> archive(new packed_oarchive(*this));
- *archive << n << boost::serialization::make_array(values, n);
+ T const* v = values;
+ while (v < values+n) {
+ *archive << *v++;
+ }
request result = isend(dest, tag, *archive);
- result.m_data = archive;
+ result.preserve(archive);
return result;
}
@@ -1634,323 +1621,20 @@ request communicator::isend(int dest, int tag, const T* values, int n) const
return array_isend_impl(dest, tag, values, n, is_mpi_datatype<T>());
}
-namespace detail {
- /**
- * Internal data structure that stores everything required to manage
- * the receipt of serialized data via a request object.
- */
- template<typename T>
- struct serialized_irecv_data
- {
- serialized_irecv_data(const communicator& comm, int source, int tag,
- T& value)
- : comm(comm), source(source), tag(tag), ia(comm), value(value)
- {
- }
-
- void deserialize(status& stat)
- {
- ia >> value;
- stat.m_count = 1;
- }
-
- communicator comm;
- int source;
- int tag;
- std::size_t count;
- packed_iarchive ia;
- T& value;
- };
-
- template<>
- struct serialized_irecv_data<packed_iarchive>
- {
- serialized_irecv_data(const communicator& comm, int source, int tag,
- packed_iarchive& ia)
- : comm(comm), source(source), tag(tag), ia(ia) { }
-
- void deserialize(status&) { /* Do nothing. */ }
-
- communicator comm;
- int source;
- int tag;
- std::size_t count;
- packed_iarchive& ia;
- };
-
- /**
- * Internal data structure that stores everything required to manage
- * the receipt of an array of serialized data via a request object.
- */
- template<typename T>
- struct serialized_array_irecv_data
- {
- serialized_array_irecv_data(const communicator& comm, int source, int tag,
- T* values, int n)
- : comm(comm), source(source), tag(tag), ia(comm), values(values), n(n)
- {
- }
-
- void deserialize(status& stat);
-
- communicator comm;
- int source;
- int tag;
- std::size_t count;
- packed_iarchive ia;
- T* values;
- int n;
- };
-
- template<typename T>
- void serialized_array_irecv_data<T>::deserialize(status& stat)
- {
- // Determine how much data we are going to receive
- int count;
- ia >> count;
-
- // Deserialize the data in the message
- boost::serialization::array_wrapper<T> arr(values, count > n? n : count);
- ia >> arr;
-
- if (count > n) {
- boost::throw_exception(
- std::range_error("communicator::recv: message receive overflow"));
- }
-
- stat.m_count = count;
- }
-
- /**
- * Internal data structure that stores everything required to manage
- * the receipt of an array of primitive data but unknown size.
- * Such an array can have been send with blocking operation and so must
- * be compatible with the (size_t,raw_data[]) format.
- */
- template<typename T, class A>
- struct dynamic_array_irecv_data
- {
- BOOST_STATIC_ASSERT_MSG(is_mpi_datatype<T>::value, "Can only be specialized for MPI datatypes.");
-
- dynamic_array_irecv_data(const communicator& comm, int source, int tag,
- std::vector<T,A>& values)
- : comm(comm), source(source), tag(tag), count(-1), values(values)
- {
- }
-
- communicator comm;
- int source;
- int tag;
- std::size_t count;
- std::vector<T,A>& values;
- };
-
-}
-
-template<typename T>
-optional<status>
-request::handle_serialized_irecv(request* self, request_action action)
-{
- typedef detail::serialized_irecv_data<T> data_t;
- shared_ptr<data_t> data = static_pointer_cast<data_t>(self->m_data);
-
- if (action == ra_wait) {
- status stat;
- if (self->m_requests[1] == MPI_REQUEST_NULL) {
- // Wait for the count message to complete
- BOOST_MPI_CHECK_RESULT(MPI_Wait,
- (self->m_requests, &stat.m_status));
- // Resize our buffer and get ready to receive its data
- data->ia.resize(data->count);
- BOOST_MPI_CHECK_RESULT(MPI_Irecv,
- (data->ia.address(), data->ia.size(), MPI_PACKED,
- stat.source(), stat.tag(),
- MPI_Comm(data->comm), self->m_requests + 1));
- }
-
- // Wait until we have received the entire message
- BOOST_MPI_CHECK_RESULT(MPI_Wait,
- (self->m_requests + 1, &stat.m_status));
-
- data->deserialize(stat);
- return stat;
- } else if (action == ra_test) {
- status stat;
- int flag = 0;
-
- if (self->m_requests[1] == MPI_REQUEST_NULL) {
- // Check if the count message has completed
- BOOST_MPI_CHECK_RESULT(MPI_Test,
- (self->m_requests, &flag, &stat.m_status));
- if (flag) {
- // Resize our buffer and get ready to receive its data
- data->ia.resize(data->count);
- BOOST_MPI_CHECK_RESULT(MPI_Irecv,
- (data->ia.address(), data->ia.size(),MPI_PACKED,
- stat.source(), stat.tag(),
- MPI_Comm(data->comm), self->m_requests + 1));
- } else
- return optional<status>(); // We have not finished yet
- }
-
- // Check if we have received the message data
- BOOST_MPI_CHECK_RESULT(MPI_Test,
- (self->m_requests + 1, &flag, &stat.m_status));
- if (flag) {
- data->deserialize(stat);
- return stat;
- } else
- return optional<status>();
- } else {
- return optional<status>();
- }
-}
-
-template<typename T>
-optional<status>
-request::handle_serialized_array_irecv(request* self, request_action action)
-{
- typedef detail::serialized_array_irecv_data<T> data_t;
- shared_ptr<data_t> data = static_pointer_cast<data_t>(self->m_data);
-
- if (action == ra_wait) {
- status stat;
- if (self->m_requests[1] == MPI_REQUEST_NULL) {
- // Wait for the count message to complete
- BOOST_MPI_CHECK_RESULT(MPI_Wait,
- (self->m_requests, &stat.m_status));
- // Resize our buffer and get ready to receive its data
- data->ia.resize(data->count);
- BOOST_MPI_CHECK_RESULT(MPI_Irecv,
- (data->ia.address(), data->ia.size(), MPI_PACKED,
- stat.source(), stat.tag(),
- MPI_Comm(data->comm), self->m_requests + 1));
- }
-
- // Wait until we have received the entire message
- BOOST_MPI_CHECK_RESULT(MPI_Wait,
- (self->m_requests + 1, &stat.m_status));
-
- data->deserialize(stat);
- return stat;
- } else if (action == ra_test) {
- status stat;
- int flag = 0;
-
- if (self->m_requests[1] == MPI_REQUEST_NULL) {
- // Check if the count message has completed
- BOOST_MPI_CHECK_RESULT(MPI_Test,
- (self->m_requests, &flag, &stat.m_status));
- if (flag) {
- // Resize our buffer and get ready to receive its data
- data->ia.resize(data->count);
- BOOST_MPI_CHECK_RESULT(MPI_Irecv,
- (data->ia.address(), data->ia.size(),MPI_PACKED,
- stat.source(), stat.tag(),
- MPI_Comm(data->comm), self->m_requests + 1));
- } else
- return optional<status>(); // We have not finished yet
- }
-
- // Check if we have received the message data
- BOOST_MPI_CHECK_RESULT(MPI_Test,
- (self->m_requests + 1, &flag, &stat.m_status));
- if (flag) {
- data->deserialize(stat);
- return stat;
- } else
- return optional<status>();
- } else {
- return optional<status>();
- }
-}
-
-template<typename T, class A>
-optional<status>
-request::handle_dynamic_primitive_array_irecv(request* self, request_action action)
-{
- typedef detail::dynamic_array_irecv_data<T,A> data_t;
- shared_ptr<data_t> data = static_pointer_cast<data_t>(self->m_data);
-
- if (action == ra_wait) {
- status stat;
- if (self->m_requests[1] == MPI_REQUEST_NULL) {
- // Wait for the count message to complete
- BOOST_MPI_CHECK_RESULT(MPI_Wait,
- (self->m_requests, &stat.m_status));
- // Resize our buffer and get ready to receive its data
- data->values.resize(data->count);
- BOOST_MPI_CHECK_RESULT(MPI_Irecv,
- (&(data->values[0]), data->values.size(), get_mpi_datatype<T>(),
- stat.source(), stat.tag(),
- MPI_Comm(data->comm), self->m_requests + 1));
- }
-
- // Wait until we have received the entire message
- BOOST_MPI_CHECK_RESULT(MPI_Wait,
- (self->m_requests + 1, &stat.m_status));
- return stat;
- } else if (action == ra_test) {
- status stat;
- int flag = 0;
-
- if (self->m_requests[1] == MPI_REQUEST_NULL) {
- // Check if the count message has completed
- BOOST_MPI_CHECK_RESULT(MPI_Test,
- (self->m_requests, &flag, &stat.m_status));
- if (flag) {
- // Resize our buffer and get ready to receive its data
- data->values.resize(data->count);
- BOOST_MPI_CHECK_RESULT(MPI_Irecv,
- (&(data->values[0]), data->values.size(),MPI_PACKED,
- stat.source(), stat.tag(),
- MPI_Comm(data->comm), self->m_requests + 1));
- } else
- return optional<status>(); // We have not finished yet
- }
-
- // Check if we have received the message data
- BOOST_MPI_CHECK_RESULT(MPI_Test,
- (self->m_requests + 1, &flag, &stat.m_status));
- if (flag) {
- return stat;
- } else
- return optional<status>();
- } else {
- return optional<status>();
- }
-}
-
// We're receiving a type that has an associated MPI datatype, so we
// map directly to that datatype.
template<typename T>
request
communicator::irecv_impl(int source, int tag, T& value, mpl::true_) const
{
- request req;
- BOOST_MPI_CHECK_RESULT(MPI_Irecv,
- (const_cast<T*>(&value), 1,
- get_mpi_datatype<T>(value),
- source, tag, MPI_Comm(*this), &req.m_requests[0]));
- return req;
+ return request::make_trivial_recv(*this, source, tag, value);
}
template<typename T>
request
communicator::irecv_impl(int source, int tag, T& value, mpl::false_) const
{
- typedef detail::serialized_irecv_data<T> data_t;
- shared_ptr<data_t> data(new data_t(*this, source, tag, value));
- request req;
- req.m_data = data;
- req.m_handler = request::handle_serialized_irecv<T>;
-
- BOOST_MPI_CHECK_RESULT(MPI_Irecv,
- (&data->count, 1,
- get_mpi_datatype<std::size_t>(data->count),
- source, tag, MPI_Comm(*this), &req.m_requests[0]));
-
- return req;
+ return request::make_serialized(*this, source, tag, value);
}
template<typename T>
@@ -1965,12 +1649,7 @@ request
communicator::array_irecv_impl(int source, int tag, T* values, int n,
mpl::true_) const
{
- request req;
- BOOST_MPI_CHECK_RESULT(MPI_Irecv,
- (const_cast<T*>(values), n,
- get_mpi_datatype<T>(*values),
- source, tag, MPI_Comm(*this), &req.m_requests[0]));
- return req;
+ return request::make_trivial_recv(*this, source, tag, values, n);
}
template<typename T>
@@ -1978,37 +1657,15 @@ request
communicator::array_irecv_impl(int source, int tag, T* values, int n,
mpl::false_) const
{
- typedef detail::serialized_array_irecv_data<T> data_t;
- shared_ptr<data_t> data(new data_t(*this, source, tag, values, n));
- request req;
- req.m_data = data;
- req.m_handler = request::handle_serialized_array_irecv<T>;
-
- BOOST_MPI_CHECK_RESULT(MPI_Irecv,
- (&data->count, 1,
- get_mpi_datatype<std::size_t>(data->count),
- source, tag, MPI_Comm(*this), &req.m_requests[0]));
-
- return req;
+ return request::make_serialized_array(*this, source, tag, values, n);
}
template<typename T, class A>
request
communicator::irecv_vector(int source, int tag, std::vector<T,A>& values,
- mpl::true_) const
+ mpl::true_ primitive) const
{
- typedef detail::dynamic_array_irecv_data<T,A> data_t;
- shared_ptr<data_t> data(new data_t(*this, source, tag, values));
- request req;
- req.m_data = data;
- req.m_handler = request::handle_dynamic_primitive_array_irecv<T,A>;
-
- BOOST_MPI_CHECK_RESULT(MPI_Irecv,
- (&data->count, 1,
- get_mpi_datatype<std::size_t>(data->count),
- source, tag, MPI_Comm(*this), &req.m_requests[0]));
-
- return req;
+ return request::make_dynamic_primitive_array_recv(*this, source, tag, values);
}
template<typename T, class A>