author    Jane Wang <janewang@fb.com>  2018-12-11 21:03:13 -0800
committer Facebook Github Bot <facebook-github-bot@users.noreply.github.com>  2018-12-11 21:21:10 -0800
commit    f8455ed754196cd554cbaaaaa440dd178383154d (patch)
tree      a5467795d4578ab5b4acb2d32ca2db8b17b8d4ff /test/test_c10d.py
parent    3fa53da61a82a19193619f48c3bfcf91d5eb20c0 (diff)
add gloo support for gather on GPU (#14916)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/14916

As titled.

Reviewed By: pietern

Differential Revision: D13267832

fbshipit-source-id: 3b89d08af93f74941f17ff892c33fc2a4a023c19
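For context, a minimal sketch of the gather call this change enables on CUDA tensors, mirroring the argument shapes the tests below exercise. This is a sketch only: it assumes the `c10d` bindings as imported by this test file, and `store`, `rank`, and `world_size` are placeholders for per-process setup.

    # Sketch, not the committed code. Assumes: import torch and the c10d
    # bindings used by test_c10d.py; store/rank/world_size set up per process,
    # e.g. store = c10d.FileStore("/tmp/c10d_store", world_size).
    pg = c10d.ProcessGroupGloo(store, rank, world_size)

    opts = c10d.GatherOptions()
    opts.rootRank = 0
    input = [torch.Tensor([rank]).cuda()]  # each rank contributes one CUDA tensor

    if rank == 0:
        # The root passes a single-element list holding one output slot per rank.
        outputs = [torch.Tensor([-1]).cuda() for _ in range(world_size)]
        work = pg.gather([outputs], input, opts)
    else:
        # Non-root ranks pass an empty output list.
        work = pg.gather([], input, opts)
    work.wait()  # on rank 0, outputs now holds tensors [0], [1], ..., [world_size - 1]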
Diffstat (limited to 'test/test_c10d.py')
-rw-r--r--  test/test_c10d.py  57
1 file changed, 54 insertions(+), 3 deletions(-)
diff --git a/test/test_c10d.py b/test/test_c10d.py
index bf28d2d843..329a7be8db 100644
--- a/test/test_c10d.py
+++ b/test/test_c10d.py
@@ -929,13 +929,13 @@ class ProcessGroupGlooTest(MultiProcessTestCase):
             opts.rootRank = (self.rank + 1) % self.world_size
             pg.gather([[t1] * self.world_size], [t1], opts)
 
-    def test_gather_basics(self):
+    def _test_gather_basics(self, fn):
         store = c10d.FileStore(self.file.name, self.world_size)
         pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
 
         # Preallocate tensors for input/output
-        input = [torch.Tensor([self.rank])]
-        outputs = [torch.Tensor([-1]) for _ in range(self.world_size)]
+        input = [fn(torch.Tensor([self.rank]))]
+        outputs = [fn(torch.Tensor([-1])) for _ in range(self.world_size)]
 
         # Take turns being the gather root and accumulate work items
         work = []
@@ -954,6 +954,57 @@ class ProcessGroupGlooTest(MultiProcessTestCase):
             if i == self.rank:
                 self.assertEqual(expected, outputs)
 
+    def test_gather_basics(self):
+        self._test_gather_basics(lambda t: t.clone())
+
+    @skip_if_not_multigpu
+    def test_gather_basics_cuda(self):
+        self._test_gather_basics(lambda t: t.clone().cuda())
+
+    def _test_gather_stress(self, inputs, fn):
+        store = c10d.FileStore(self.file.name, self.world_size)
+        pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts(threads=8))
+        work_handles = []
+        outputs = [
+            [
+                [fn(torch.Tensor([-1])) for _ in range(self.world_size)]
+            ] for _ in range(len(inputs))
+        ]
+        expected_outputs = [
+            [
+                [torch.Tensor([i + j]) for j in range(self.world_size)]
+            ] for i in range(len(inputs))
+        ]
+        for i in range(len(inputs)):
+            for root in range(self.world_size):
+                opts = c10d.GatherOptions()
+                opts.rootRank = root
+                if root == self.rank:
+                    work = pg.gather(outputs[i], [fn(inputs[i])], opts)
+                else:
+                    work = pg.gather([], [fn(inputs[i])], opts)
+                work_handles.append(work)
+
+        for i, work_handle in enumerate(work_handles):
+            work_handle.wait()
+            iter = i // self.world_size
+            root = i % self.world_size
+            if root == self.rank:
+                self.assertEqual(
+                    expected_outputs[iter],
+                    outputs[iter],
+                    "Mismatch in iteration %d for root %d" % (iter, root)
+                )
+
+    def test_gather_stress(self):
+        inputs = [torch.Tensor([i + self.rank]) for i in range(1000)]
+        self._test_gather_stress(inputs, lambda t: t.clone())
+
+    @skip_if_not_multigpu
+    def test_gather_stress_cuda(self):
+        inputs = [torch.Tensor([i + self.rank]).cuda() for i in range(1000)]
+        self._test_gather_stress(inputs, lambda t: t.clone().cuda())
+
     def test_allgather_checks(self):
         store = c10d.FileStore(self.file.name, self.world_size)
         pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())