summaryrefslogtreecommitdiff
path: root/boost/graph/distributed/adjlist/redistribute.hpp
blob: 5401f1240af697c91bf05b23a6319de9e5e54c54 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
// Copyright (C) 2005-2006 The Trustees of Indiana University.

// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

//  Authors: Douglas Gregor
//           Andrew Lumsdaine

//
// Implements redistribution of vertices for a distributed adjacency
// list. This file should not be included by users. It will be
// included by the distributed adjacency list header.
//

#ifndef BOOST_GRAPH_USE_MPI
#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
#endif

#include <boost/pending/container_traits.hpp>

namespace boost { namespace detail { namespace parallel {

/* This structure contains a (vertex or edge) descriptor that is being
   moved from one processor to another. It contains the properties for
   that descriptor (if any).
 */
template<typename Descriptor, typename DescriptorProperty>
struct redistributed_descriptor : maybe_store_property<DescriptorProperty>
{
  typedef maybe_store_property<DescriptorProperty> inherited;

  redistributed_descriptor() { }

  redistributed_descriptor(const Descriptor& v, const DescriptorProperty& p)
    : inherited(p), descriptor(v) { }

  Descriptor descriptor;

private:
  friend class boost::serialization::access;

  template<typename Archiver>
  void serialize(Archiver& ar, unsigned int /*version*/)
  {
    ar & boost::serialization::base_object<inherited>(*this) 
       & unsafe_serialize(descriptor);
  }
};

/* Predicate that returns true if the target has migrated. */
template<typename VertexProcessorMap, typename Graph>
struct target_migrated_t
{
  typedef typename graph_traits<Graph>::vertex_descriptor Vertex;
  typedef typename graph_traits<Graph>::edge_descriptor Edge;

  target_migrated_t(VertexProcessorMap vertex_to_processor, const Graph& g)
    : vertex_to_processor(vertex_to_processor), g(g) { }

  bool operator()(Edge e) const
  {
    typedef global_descriptor<Vertex> DVertex;
    processor_id_type owner = get(edge_target_processor_id, g, e);
    return get(vertex_to_processor, DVertex(owner, target(e, g))) != owner;
  }

private:
  VertexProcessorMap vertex_to_processor;
  const Graph& g;
};

template<typename VertexProcessorMap, typename Graph>
inline target_migrated_t<VertexProcessorMap, Graph>
target_migrated(VertexProcessorMap vertex_to_processor, const Graph& g)
{ return target_migrated_t<VertexProcessorMap, Graph>(vertex_to_processor, g); }

/* Predicate that returns true if the source of an in-edge has migrated. */
template<typename VertexProcessorMap, typename Graph>
struct source_migrated_t
{
  typedef typename graph_traits<Graph>::vertex_descriptor Vertex;
  typedef typename graph_traits<Graph>::edge_descriptor Edge;

  source_migrated_t(VertexProcessorMap vertex_to_processor, const Graph& g)
    : vertex_to_processor(vertex_to_processor), g(g) { }

  bool operator()(stored_in_edge<Edge> e) const
  {
    return get(vertex_to_processor, DVertex(e.source_processor, source(e.e, g)))
      != e.source_processor;
  }

private:
  VertexProcessorMap vertex_to_processor;
  const Graph& g;
};

template<typename VertexProcessorMap, typename Graph>
inline source_migrated_t<VertexProcessorMap, Graph>
source_migrated(VertexProcessorMap vertex_to_processor, const Graph& g)
{ return source_migrated_t<VertexProcessorMap, Graph>(vertex_to_processor, g); }

/* Predicate that returns true if the target has migrated. */
template<typename VertexProcessorMap, typename Graph>
struct source_or_target_migrated_t
{
  typedef typename graph_traits<Graph>::edge_descriptor Edge;

  source_or_target_migrated_t(VertexProcessorMap vertex_to_processor,
                              const Graph& g)
    : vertex_to_processor(vertex_to_processor), g(g) { }

  bool operator()(Edge e) const
  {
    return get(vertex_to_processor, source(e, g)) != source(e, g).owner
      || get(vertex_to_processor, target(e, g)) != target(e, g).owner;
  }

private:
  VertexProcessorMap vertex_to_processor;
  const Graph& g;
};

template<typename VertexProcessorMap, typename Graph>
inline source_or_target_migrated_t<VertexProcessorMap, Graph>
source_or_target_migrated(VertexProcessorMap vertex_to_processor,
const Graph& g)
{
  typedef source_or_target_migrated_t<VertexProcessorMap, Graph> result_type;
  return result_type(vertex_to_processor, g);
}

} } // end of namespace detail::parallel

template<PBGL_DISTRIB_ADJLIST_TEMPLATE_PARMS>
template<typename VertexProcessorMap>
void
PBGL_DISTRIB_ADJLIST_TYPE
::request_in_neighbors(vertex_descriptor v,
                       VertexProcessorMap vertex_to_processor,
                       bidirectionalS)
{
  BGL_FORALL_INEDGES_T(v, e, *this, graph_type)
    request(vertex_to_processor, source(e, *this));
}

template<PBGL_DISTRIB_ADJLIST_TEMPLATE_PARMS>
template<typename VertexProcessorMap>
void
PBGL_DISTRIB_ADJLIST_TYPE
::remove_migrated_in_edges(vertex_descriptor v,
                           VertexProcessorMap vertex_to_processor,
                           bidirectionalS)
{
  graph_detail::erase_if(get(vertex_in_edges, base())[v.local],
                         source_migrated(vertex_to_processor, base()));
}

template<PBGL_DISTRIB_ADJLIST_TEMPLATE_PARMS>
template<typename VertexProcessorMap>
void
PBGL_DISTRIB_ADJLIST_TYPE
::redistribute(VertexProcessorMap vertex_to_processor)
{
  using boost::parallel::inplace_all_to_all;

  // When we have stable descriptors, we only move those descriptors
  // that actually need to be moved. Otherwise, we essentially have to
  // regenerate the entire graph.
  const bool has_stable_descriptors =
    is_same<typename config_type::vertex_list_selector, listS>::value
    || is_same<typename config_type::vertex_list_selector, setS>::value
    || is_same<typename config_type::vertex_list_selector, multisetS>::value;

  typedef detail::parallel::redistributed_descriptor<vertex_descriptor, 
                                                     vertex_property_type>
    redistributed_vertex;
  typedef detail::parallel::redistributed_descriptor<edge_descriptor, 
                                                     edge_property_type>
    redistributed_edge;
  typedef std::pair<vertices_size_type, edges_size_type> num_relocated_pair;

  vertex_iterator vi, vi_end;
  edge_iterator ei, ei_end;

  process_group_type pg = process_group();

  // Initial synchronization makes sure that we have all of our ducks
  // in a row. We don't want any outstanding add/remove messages
  // coming in mid-redistribution!
  synchronize(process_group_);

  // We cannot cope with eviction of ghost cells
  vertex_to_processor.set_max_ghost_cells(0);

  process_id_type p = num_processes(pg);

  // Send vertices and edges to the processor where they will
  // actually reside.  This requires O(|V| + |E|) communication
  std::vector<std::vector<redistributed_vertex> > redistributed_vertices(p);
  std::vector<std::vector<redistributed_edge> > redistributed_edges(p);

  // Build the sets of relocated vertices for each process and then do
  // an all-to-all transfer.
  for (boost::tie(vi, vi_end) = vertices(*this); vi != vi_end; ++vi) {
    if (!has_stable_descriptors
        || get(vertex_to_processor, *vi) != vi->owner) {
      redistributed_vertices[get(vertex_to_processor, *vi)]
        .push_back(redistributed_vertex(*vi, get(vertex_all_t(), base(),
                                                 vi->local)));
    }

    // When our descriptors are stable, we need to determine which
    // adjacent descriptors are stable to determine which edges will
    // be removed.
    if (has_stable_descriptors) {
      BGL_FORALL_OUTEDGES_T(*vi, e, *this, graph_type)
        request(vertex_to_processor, target(e, *this));
      request_in_neighbors(*vi, vertex_to_processor, directed_selector());
    }
  }

  inplace_all_to_all(pg, redistributed_vertices);

  // If we have stable descriptors, we need to know where our neighbor
  // vertices are moving.
  if (has_stable_descriptors)
    synchronize(vertex_to_processor);

  // Build the sets of relocated edges for each process and then do
  // an all-to-all transfer.
  for (boost::tie(ei, ei_end) = edges(*this); ei != ei_end; ++ei) {
    vertex_descriptor src = source(*ei, *this);
    vertex_descriptor tgt = target(*ei, *this);
    if (!has_stable_descriptors
        || get(vertex_to_processor, src) != src.owner
        || get(vertex_to_processor, tgt) != tgt.owner)
      redistributed_edges[get(vertex_to_processor, source(*ei, *this))]
        .push_back(redistributed_edge(*ei, split_edge_property(get(edge_all_t(), base(),
                                                                   ei->local))));
  }
  inplace_all_to_all(pg, redistributed_edges);

  // A mapping from old vertex descriptors to new vertex
  // descriptors. This is an STL map partly because I'm too lazy to
  // build a real property map (which is hard in the general case) but
  // also because it won't try to look in the graph itself, because
  // the keys are all vertex descriptors that have been invalidated.
  std::map<vertex_descriptor, vertex_descriptor> old_to_new_vertex_map;

  if (has_stable_descriptors) {
    // Clear out all vertices and edges that will have moved. There
    // are several stages to this.

    // First, eliminate all outgoing edges from the (local) vertices
    // that have been moved or whose targets have been moved.
    BGL_FORALL_VERTICES_T(v, *this, graph_type) {
      if (get(vertex_to_processor, v) != v.owner) {
        clear_out_edges(v.local, base());
        clear_in_edges_local(v, directed_selector());
      } else {
        remove_out_edge_if(v.local,
                           target_migrated(vertex_to_processor, base()),
                           base());
        remove_migrated_in_edges(v, vertex_to_processor, directed_selector());
      }
    }

    // Next, eliminate locally-stored edges that have migrated (for
    // undirected graphs).
    graph_detail::erase_if(local_edges_,
                           source_or_target_migrated(vertex_to_processor, *this));

    // Eliminate vertices that have migrated
    for (boost::tie(vi, vi_end) = vertices(*this); vi != vi_end; /* in loop */) {
      if (get(vertex_to_processor, *vi) != vi->owner)
        remove_vertex((*vi++).local, base());
      else {
        // Add the identity relation for vertices that have not migrated
        old_to_new_vertex_map[*vi] = *vi;
        ++vi;
      }
    }
  } else {
    // Clear out the local graph: the entire graph is in transit
    clear();
  }

  // Add the new vertices to the graph. When we do so, update the old
  // -> new vertex mapping both locally and for the owner of the "old"
  // vertex.
  {
    typedef std::pair<vertex_descriptor, vertex_descriptor> mapping_pair;
    std::vector<std::vector<mapping_pair> > mappings(p);

    for (process_id_type src = 0; src < p; ++src) {
      for (typename std::vector<redistributed_vertex>::iterator vi =
             redistributed_vertices[src].begin();
           vi != redistributed_vertices[src].end(); ++vi) {
        vertex_descriptor new_vertex =
            add_vertex(vi->get_property(), *this);
        old_to_new_vertex_map[vi->descriptor] = new_vertex;
        mappings[vi->descriptor.owner].push_back(mapping_pair(vi->descriptor,
                                                              new_vertex));
      }

      redistributed_vertices[src].clear();
    }

    inplace_all_to_all(pg, mappings);

    // Add the mappings we were sent into the old->new map.
    for (process_id_type src = 0; src < p; ++src)
      old_to_new_vertex_map.insert(mappings[src].begin(), mappings[src].end());
  }

  // Get old->new vertex mappings for all of the vertices we need to
  // know about.

  // TBD: An optimization here might involve sending the
  // request-response pairs without an explicit request step (for
  // bidirectional and undirected graphs). However, it may not matter
  // all that much given the cost of redistribution.
  {
    std::vector<std::vector<vertex_descriptor> > vertex_map_requests(p);
    std::vector<std::vector<vertex_descriptor> > vertex_map_responses(p);

    // We need to know about all of the vertices incident on edges
    // that have been relocated to this processor. Tell each processor
    // what each other processor needs to know.
    for (process_id_type src = 0; src < p; ++src)
      for (typename std::vector<redistributed_edge>::iterator ei =
             redistributed_edges[src].begin();
           ei != redistributed_edges[src].end(); ++ei) {
        vertex_descriptor need_vertex = target(ei->descriptor, *this);
        if (old_to_new_vertex_map.find(need_vertex)
            == old_to_new_vertex_map.end())
          {
            old_to_new_vertex_map[need_vertex] = need_vertex;
            vertex_map_requests[need_vertex.owner].push_back(need_vertex);
          }
      }
    inplace_all_to_all(pg,
                       vertex_map_requests,
                       vertex_map_responses);

    // Process the requests made for vertices we own. Then perform yet
    // another all-to-all swap. This one matches the requests we've
    // made to the responses we were given.
    for (process_id_type src = 0; src < p; ++src)
      for (typename std::vector<vertex_descriptor>::iterator vi =
             vertex_map_responses[src].begin();
           vi != vertex_map_responses[src].end(); ++vi)
        *vi = old_to_new_vertex_map[*vi];
    inplace_all_to_all(pg, vertex_map_responses);

    // Matching the requests to the responses, update the old->new
    // vertex map for all of the vertices we will need to know.
    for (process_id_type src = 0; src < p; ++src) {
      typedef typename std::vector<vertex_descriptor>::size_type size_type;
      for (size_type i = 0; i < vertex_map_requests[src].size(); ++i) {
        old_to_new_vertex_map[vertex_map_requests[src][i]] =
          vertex_map_responses[src][i];
      }
    }
  }

  // Add edges to the graph by mapping the source and target.
  for (process_id_type src = 0; src < p; ++src) {
    for (typename std::vector<redistributed_edge>::iterator ei =
           redistributed_edges[src].begin();
         ei != redistributed_edges[src].end(); ++ei) {
      add_edge(old_to_new_vertex_map[source(ei->descriptor, *this)],
               old_to_new_vertex_map[target(ei->descriptor, *this)],
               ei->get_property(),
               *this);
    }

    redistributed_edges[src].clear();
  }

  // Be sure that edge-addition messages are received now, completing
  // the graph.
  synchronize(process_group_);

  this->distribution().clear();

  detail::parallel::maybe_initialize_vertex_indices(vertices(base()), 
                                                    get(vertex_index, base()));
}

} // end namespace boost