diff options
author | Jane Wang <janewang@fb.com> | 2018-12-11 21:03:13 -0800 |
---|---|---|
committer | Facebook Github Bot <facebook-github-bot@users.noreply.github.com> | 2018-12-11 21:21:10 -0800 |
commit | f8455ed754196cd554cbaaaaa440dd178383154d (patch) | |
tree | a5467795d4578ab5b4acb2d32ca2db8b17b8d4ff /test/test_c10d.py | |
parent | 3fa53da61a82a19193619f48c3bfcf91d5eb20c0 (diff) | |
download | pytorch-f8455ed754196cd554cbaaaaa440dd178383154d.tar.gz pytorch-f8455ed754196cd554cbaaaaa440dd178383154d.tar.bz2 pytorch-f8455ed754196cd554cbaaaaa440dd178383154d.zip |
add gloo support for gather on GPU (#14916)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/14916
as titled
Reviewed By: pietern
Differential Revision: D13267832
fbshipit-source-id: 3b89d08af93f74941f17ff892c33fc2a4a023c19
Diffstat (limited to 'test/test_c10d.py')
-rw-r--r-- | test/test_c10d.py | 57 |
1 files changed, 54 insertions, 3 deletions
diff --git a/test/test_c10d.py b/test/test_c10d.py
index bf28d2d843..329a7be8db 100644
--- a/test/test_c10d.py
+++ b/test/test_c10d.py
@@ -929,13 +929,13 @@ class ProcessGroupGlooTest(MultiProcessTestCase):
             opts.rootRank = (self.rank + 1) % self.world_size
             pg.gather([[t1] * self.world_size], [t1], opts)

-    def test_gather_basics(self):
+    def _test_gather_basics(self, fn):
         store = c10d.FileStore(self.file.name, self.world_size)
         pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())

         # Preallocate tensors for input/output
-        input = [torch.Tensor([self.rank])]
-        outputs = [torch.Tensor([-1]) for _ in range(self.world_size)]
+        input = [fn(torch.Tensor([self.rank]))]
+        outputs = [fn(torch.Tensor([-1])) for _ in range(self.world_size)]

         # Take turns being the gather root and accumulate work items
         work = []
@@ -954,6 +954,57 @@ class ProcessGroupGlooTest(MultiProcessTestCase):
             if i == self.rank:
                 self.assertEqual(expected, outputs)

+    def test_gather_basics(self):
+        self._test_gather_basics(lambda t: t.clone())
+
+    @skip_if_not_multigpu
+    def test_gather_basics_cuda(self):
+        self._test_gather_basics(lambda t: t.clone().cuda())
+
+    def _test_gather_stress(self, inputs, fn):
+        store = c10d.FileStore(self.file.name, self.world_size)
+        pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts(threads=8))
+        work_handles = []
+        outputs = [
+            [
+                [fn(torch.Tensor([-1])) for _ in range(self.world_size)]
+            ] for _ in range(len(inputs))
+        ]
+        expected_outputs = [
+            [
+                [torch.Tensor([i + j]) for j in range(self.world_size)]
+            ] for i in range(len(inputs))
+        ]
+        for i in range(len(inputs)):
+            for root in range(self.world_size):
+                opts = c10d.GatherOptions()
+                opts.rootRank = root
+                if root == self.rank:
+                    work = pg.gather(outputs[i], [fn(inputs[i])], opts)
+                else:
+                    work = pg.gather([], [fn(inputs[i])], opts)
+                work_handles.append(work)
+
+        for i, work_handle in enumerate(work_handles):
+            work_handle.wait()
+            iter = i // self.world_size
+            root = i % self.world_size
+            if root == self.rank:
+                self.assertEqual(
+                    expected_outputs[iter],
+                    outputs[iter],
+                    "Mismatch in iteration %d for root %d" % (iter, root)
+                )
+
+    def test_gather_stress(self):
+        inputs = [torch.Tensor([i + self.rank]) for i in range(1000)]
+        self._test_gather_stress(inputs, lambda t: t.clone())
+
+    @skip_if_not_multigpu
+    def test_gather_stress_cuda(self):
+        inputs = [torch.Tensor([i + self.rank]).cuda() for i in range(1000)]
+        self._test_gather_stress(inputs, lambda t: t.clone().cuda())
+
     def test_allgather_checks(self):
         store = c10d.FileStore(self.file.name, self.world_size)
         pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())