import copy import math import multiprocessing import os import sys import tempfile import time import unittest from datetime import timedelta from functools import wraps from collections import namedtuple import torch import common_utils as common from torch import nn import torch.nn.functional as F import torch.distributed as c10d from torch.nn.parallel import DistributedDataParallel from common_utils import TestCase, load_tests, run_tests from common_utils import retry_on_address_already_in_use_error # load_tests from common_utils is used to automatically filter tests for # sharding on sandcastle. This line silences flake warnings load_tests = load_tests if not c10d.is_available(): print('c10d not available, skipping tests') sys.exit(0) TIMEOUT_DEFAULT = 30 TIMEOUT_OVERRIDE = {} TestSkip = namedtuple('TestSkip', 'exit_code, message') TEST_SKIPS = { "multi-gpu": TestSkip(75, "Need at least 2 CUDA devices"), "nccl": TestSkip(76, "c10d not compiled with NCCL support"), "known_issues": TestSkip(77, "Test skipped due to known issues") } def skip_if_not_multigpu(func): """Multi-GPU tests requires at least 2 GPUS. Skip if this is not met.""" @wraps(func) def wrapper(*args, **kwargs): if torch.cuda.is_available() and torch.cuda.device_count() >= 2: return func(*args, **kwargs) sys.exit(TEST_SKIPS['multi-gpu'].exit_code) return wrapper def skip_if_not_nccl(func): """Skips a test if NCCL is not available (for c10d).""" @wraps(func) def wrapper(*args, **kwargs): if hasattr(c10d, "ProcessGroupNCCL"): return func(*args, **kwargs) sys.exit(TEST_SKIPS['nccl'].exit_code) return wrapper def skip_for_known_issues(func): """Skips a test due to known issues (for c10d).""" @wraps(func) def wrapper(*args, **kwargs): sys.exit(TEST_SKIPS['known_issues'].exit_code) return wrapper def get_timeout(test_id): return TIMEOUT_OVERRIDE.get(test_id.split('.')[-1], TIMEOUT_DEFAULT) def gpus_for_rank(world_size): """Multigpu tests are designed to simulate the multi nodes with multi GPUs on each node. Nccl backend requires equal #GPUs in each process. On a single node, all visible GPUs are evenly divided to subsets, each process only uses a subset. """ visible_devices = list(range(torch.cuda.device_count())) gpus_per_process = torch.cuda.device_count() // world_size gpus_for_rank = [] for rank in range(world_size): gpus_for_rank.append(visible_devices[rank * gpus_per_process: (rank + 1) * gpus_per_process]) return gpus_for_rank def simple_reduce_tests(rank, world_size): return [ ( c10d.ReduceOp.SUM, torch.Tensor([rank + 1.0]), torch.Tensor([float(world_size * (world_size + 1) / 2)]), ), ( c10d.ReduceOp.PRODUCT, torch.Tensor([rank + 1.0]), torch.Tensor([float(math.factorial(world_size))]), ), ( c10d.ReduceOp.MIN, torch.Tensor([rank + 1.0]), torch.Tensor([1.0]), ), ( c10d.ReduceOp.MAX, torch.Tensor([rank + 1.0]), torch.Tensor([world_size]), ), ] def simple_multi_input_reduce_tests(rank, world_size): return [ ( c10d.ReduceOp.SUM, [torch.Tensor([2 * rank + 0.0]), torch.Tensor([2 * rank + 1.0])], torch.Tensor([float(world_size * (2 * world_size - 1))]), ), ( c10d.ReduceOp.PRODUCT, [torch.Tensor([2 * rank + 1.0]), torch.Tensor([2 * rank + 2.0])], torch.Tensor([float(math.factorial(2 * world_size))]), ), ( c10d.ReduceOp.MIN, [torch.Tensor([2 * rank + 1.0]), torch.Tensor([2 * rank + 2.0])], torch.Tensor([1.0]), ), ( c10d.ReduceOp.MAX, [torch.Tensor([2 * rank + 1.0]), torch.Tensor([2 * rank + 2.0])], torch.Tensor([2 * world_size]), ), ] class StoreTestBase(object): def _create_store(self, i): raise RuntimeError("not implemented") def _test_set_get(self, fs): fs.add("key", 1) fs.add("key", 2) fs.add("key", 3) fs.set("key0", "value0") fs.add("key3", 1) fs.set("key1", "value1") fs.add("key3", 2) fs.set("key2", "value2") fs.add("key3", 3) fs.add("key3", 4) fs.add("key3", 5) fs.add("key3", 6) self.assertEqual(b"6", fs.get("key")) self.assertEqual(b"value0", fs.get("key0")) self.assertEqual(b"value1", fs.get("key1")) self.assertEqual(b"value2", fs.get("key2")) self.assertEqual(b"21", fs.get("key3")) def test_set_get(self): self._test_set_get(self._create_store()) class FileStoreTest(TestCase, StoreTestBase): def setUp(self): self.file = tempfile.NamedTemporaryFile(delete=False) def tearDown(self): pass def _create_store(self): store = c10d.FileStore(self.file.name, 1) store.set_timeout(timedelta(seconds=300)) return store class PrefixFileStoreTest(TestCase, StoreTestBase): def setUp(self): self.file = tempfile.NamedTemporaryFile(delete=False) self.filestore = c10d.FileStore(self.file.name, 1) self.prefix = "test_prefix" self.filestore.set_timeout(timedelta(seconds=300)) def tearDown(self): pass def _create_store(self): return c10d.PrefixStore(self.prefix, self.filestore) def create_tcp_store(addr): """ Creates a TCP store. Retries if the chosen port is already in use. """ ports = [] for _ in range(10): try: port = common.find_free_port() ports.append(port) return c10d.TCPStore(addr, port, 1, True) except RuntimeError as error: if str(error) == "Address already in use": continue raise raise RuntimeError("Unable to find free port (tried %s)" % ", ".join(ports)) class TCPStoreTest(TestCase, StoreTestBase): def _create_store(self): store = create_tcp_store('localhost') store.set_timeout(timedelta(seconds=300)) return store def test_address_already_in_use(self): with self.assertRaisesRegex(RuntimeError, "^Address already in use$"): addr = 'localhost' port = common.find_free_port() # Use noqa to silence flake8. # Need to store in an unused variable here to ensure the first # object is not destroyed before the second object is created. store1 = c10d.TCPStore(addr, port, 1, True) # noqa: F841 store2 = c10d.TCPStore(addr, port, 1, True) # noqa: F841 class PrefixTCPStoreTest(TestCase, StoreTestBase): def setUp(self): self.tcpstore = create_tcp_store('localhost') self.prefix = "test_prefix" self.tcpstore.set_timeout(timedelta(seconds=300)) def _create_store(self): return c10d.PrefixStore(self.prefix, self.tcpstore) class RendezvousTest(TestCase): def test_unknown_handler(self): with self.assertRaisesRegex(RuntimeError, "^No rendezvous handler"): c10d.rendezvous('invalid://') class RendezvousEnvTest(TestCase): @retry_on_address_already_in_use_error def test_common_errors(self): # TODO remove this hack if not hasattr(c10d, "ProcessGroupNCCL"): raise unittest.SkipTest("C10D is not built with NCCL process group," " skipping test") vars = { "WORLD_SIZE": "1", "RANK": "0", "MASTER_ADDR": "127.0.0.1", "MASTER_PORT": common.find_free_port(), } class Env(object): def __init__(self, vars): self.vars = vars def __enter__(self): for key, value in self.vars.items(): os.environ[key] = str(value) def __exit__(self, type, value, traceback): for key in self.vars.keys(): del os.environ[key] def without(d, key): d = d.copy() d.pop(key) return d def withouts(d, keys): d = d.copy() for key in keys: d.pop(key) return d with Env(without(vars, 'WORLD_SIZE')): with self.assertRaisesRegex(ValueError, 'WORLD_SIZE expected'): gen = c10d.rendezvous('env://') next(gen) c10d.init_process_group(backend='nccl', world_size=1) self.assertEqual(c10d.get_rank(), 0) self.assertEqual(c10d.get_world_size(), 1) c10d.destroy_process_group() with Env(without(vars, 'RANK')): with self.assertRaisesRegex(ValueError, 'RANK expected'): gen = c10d.rendezvous('env://') next(gen) c10d.init_process_group(backend='nccl', rank=0) self.assertEqual(c10d.get_rank(), 0) self.assertEqual(c10d.get_world_size(), 1) c10d.destroy_process_group() with Env(withouts(vars, ['RANK', 'WORLD_SIZE'])): c10d.init_process_group(backend='nccl', rank=0, world_size=1) self.assertEqual(c10d.get_rank(), 0) self.assertEqual(c10d.get_world_size(), 1) c10d.destroy_process_group() with Env(vars): c10d.init_process_group(backend='nccl') self.assertEqual(c10d.get_rank(), 0) self.assertEqual(c10d.get_world_size(), 1) c10d.destroy_process_group() with Env(without(vars, 'MASTER_ADDR')): with self.assertRaisesRegex(ValueError, 'MASTER_ADDR expected'): gen = c10d.rendezvous('env://') next(gen) with Env(without(vars, 'MASTER_PORT')): with self.assertRaisesRegex(ValueError, 'MASTER_PORT expected'): gen = c10d.rendezvous('env://') next(gen) with Env(without(vars, 'WORLD_SIZE')): gen = c10d.rendezvous('env://?world_size={}'.format(1)) _, _, size = next(gen) self.assertEqual(size, 1) with Env(without(vars, 'RANK')): gen = c10d.rendezvous('env://?rank={}'.format(0)) _, rank, _ = next(gen) self.assertEqual(rank, 0) with Env(withouts(vars, ['RANK', 'WORLD_SIZE'])): gen = c10d.rendezvous('env://?rank={}&world_size={}'.format(0, 1)) _, rank, size = next(gen) self.assertEqual(rank, 0) self.assertEqual(size, 1) @retry_on_address_already_in_use_error def test_nominal(self): os.environ['WORLD_SIZE'] = '1' os.environ['MASTER_ADDR'] = '127.0.0.1' os.environ['MASTER_PORT'] = str(common.find_free_port()) # Single rank os.environ['RANK'] = '0' gen0 = c10d.rendezvous('env://') store0, rank0, size0 = next(gen0) self.assertEqual(0, rank0) self.assertEqual(1, size0) store0.set("key0", "value0") # check with get self.assertEqual(b"value0", store0.get("key0")) class RendezvousFileTest(TestCase): def test_common_errors(self): with self.assertRaisesRegex(ValueError, 'path missing'): gen = c10d.rendezvous('file://?rank=0&world_size=1') next(gen) with self.assertRaisesRegex(ValueError, 'rank parameter missing'): gen = c10d.rendezvous('file:///tmp/foo?world_size=1') next(gen) with self.assertRaisesRegex(ValueError, 'size parameter missing'): gen = c10d.rendezvous('file:///tmp/foo?rank=0') next(gen) def test_nominal(self): with tempfile.NamedTemporaryFile(delete=False) as file: url = 'file://%s?world_size=%d' % (file.name, 2) gen0 = c10d.rendezvous(url + "&rank=0") store0, rank0, size0 = next(gen0) self.assertEqual(0, rank0) self.assertEqual(2, size0) gen1 = c10d.rendezvous(url + "&rank=1") store1, rank1, size1 = next(gen1) self.assertEqual(1, rank1) self.assertEqual(2, size1) # Set value on both stores store0.set("key0", "value0") store1.set("key1", "value1") # Cross check with get self.assertEqual(b"value0", store1.get("key0")) self.assertEqual(b"value1", store0.get("key1")) class RendezvousTCPTest(TestCase): def test_common_errors(self): with self.assertRaisesRegex(ValueError, 'port number missing'): gen = c10d.rendezvous('tcp://127.0.0.1?rank=0&world_size=1') next(gen) with self.assertRaisesRegex(ValueError, 'rank parameter missing'): gen = c10d.rendezvous('tcp://127.0.0.1:23456?world_size=1') next(gen) with self.assertRaisesRegex(ValueError, 'size parameter missing'): gen = c10d.rendezvous('tcp://127.0.0.1:23456?rank=0') next(gen) @retry_on_address_already_in_use_error def test_nominal(self): addr = 'localhost' port = common.find_free_port() url = 'tcp://%s:%d?world_size=%d' % (addr, port, 1) gen0 = c10d.rendezvous(url + "&rank=0") store0, rank0, size0 = next(gen0) self.assertEqual(0, rank0) self.assertEqual(1, size0) # Set value on the single store store0.set("key0", "value0") # check with get self.assertEqual(b"value0", store0.get("key0")) class MultiProcessTestCase(TestCase): MAIN_PROCESS_RANK = -1 @property def world_size(self): return 4 @staticmethod def join_or_run(fn): @wraps(fn) def wrapper(self): if self.rank == self.MAIN_PROCESS_RANK: self._join_processes(fn) else: fn(self) return wrapper # The main process spawns N subprocesses that run the test. # This function patches overwrites every test function to either # assume the role of the main process and join its subprocesses, # or run the underlying test function. @classmethod def setUpClass(cls): for attr in dir(cls): if attr.startswith('test'): fn = getattr(cls, attr) setattr(cls, attr, cls.join_or_run(fn)) def setUp(self): self.rank = self.MAIN_PROCESS_RANK self.file = tempfile.NamedTemporaryFile(delete=False) self.processes = [self._spawn_process(rank) for rank in range(int(self.world_size))] def tearDown(self): for p in self.processes: p.terminate() def _spawn_process(self, rank): name = 'process ' + str(rank) process = multiprocessing.Process(target=self._run, name=name, args=(rank,)) process.start() return process def _run(self, rank): self.rank = rank # self.id() == e.g. '__main__.TestDistributed.test_get_rank' # We're retreiving a corresponding test and executing it. getattr(self, self.id().split(".")[2])() sys.exit(0) def _join_processes(self, fn): timeout = get_timeout(self.id()) start_time = time.time() for p in self.processes: p.join(timeout) elapsed_time = time.time() - start_time self._check_return_codes(elapsed_time) def _check_return_codes(self, elapsed_time): """ Checks that the return codes of all spawned processes match, and skips tests if they returned a return code indicating a skipping condition. """ first_process = self.processes[0] for i, p in enumerate(self.processes): if p.exitcode is None: raise RuntimeError('Process {} terminated or timed out after {} seconds'.format(i, elapsed_time)) self.assertEqual(p.exitcode, first_process.exitcode) for skip in TEST_SKIPS.values(): if first_process.exitcode == skip.exit_code: raise unittest.SkipTest(skip.message) self.assertEqual(first_process.exitcode, 0) @property def is_master(self): return self.rank == 0 class ProcessGroupGlooTest(MultiProcessTestCase): def opts(self, threads=2): opts = c10d.ProcessGroupGloo.Options() opts.devices = [c10d.ProcessGroupGloo.create_tcp_device(interface="lo")] opts.timeout = 1.0 opts.threads = threads return opts def test_broadcast_checks(self): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts()) t1 = torch.zeros([1], dtype=torch.float32) t2 = torch.zeros([1], dtype=torch.float64) t3 = torch.zeros([2], dtype=torch.float32) with self.assertRaisesRegex(ValueError, "invalid root rank"): opts = c10d.BroadcastOptions() opts.rootRank = -1 opts.rootTensor = 0 pg.broadcast([t1], opts) with self.assertRaisesRegex(ValueError, "invalid root rank"): opts = c10d.BroadcastOptions() opts.rootRank = self.world_size opts.rootTensor = 0 pg.broadcast([t1], opts) with self.assertRaisesRegex(ValueError, "invalid root tensor"): opts = c10d.BroadcastOptions() opts.rootRank = self.rank opts.rootTensor = -1 pg.broadcast([t1], opts) with self.assertRaisesRegex(ValueError, "invalid root tensor"): opts = c10d.BroadcastOptions() opts.rootRank = self.rank opts.rootTensor = 1 pg.broadcast([t1], opts) with self.assertRaisesRegex(ValueError, "invalid root tensor"): opts = c10d.BroadcastOptions() opts.rootRank = self.rank opts.rootTensor = 0 pg.broadcast([], opts) with self.assertRaisesRegex(ValueError, "invalid tensor type"): opts = c10d.BroadcastOptions() opts.rootRank = self.rank opts.rootTensor = 0 pg.broadcast([t1, t2], opts) with self.assertRaisesRegex(ValueError, "invalid tensor size"): opts = c10d.BroadcastOptions() opts.rootRank = self.rank opts.rootTensor = 0 pg.broadcast([t1, t3], opts) def _test_broadcast_basics(self, fn): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts()) def broadcast(xs, rootRank, rootTensor): opts = c10d.BroadcastOptions() opts.rootRank = rootRank opts.rootTensor = rootTensor work = pg.broadcast(xs, opts) work.wait() # Every rank is root once for i in range(self.world_size): # Run with 1 input tensor x = fn(torch.Tensor([self.rank])) broadcast([x], i, 0) self.assertEqual(torch.Tensor([i]), x) # Run with 2 input tensors num = 2 for j in range(num): xs = [ fn(torch.Tensor([self.rank * num + 0.0])), fn(torch.Tensor([self.rank * num + 1.0])), ] broadcast(xs, i, j) self.assertEqual(torch.Tensor([i * num + j]), xs[0]) self.assertEqual(torch.Tensor([i * num + j]), xs[1]) # Test overloaded convenience function x = torch.Tensor([self.rank + 1.0]) work = pg.broadcast(x, root=0) work.wait() self.assertEqual(torch.Tensor([1.0]), x) def test_broadcast_basics(self): self._test_broadcast_basics(lambda t: t.clone()) @skip_if_not_multigpu def test_broadcast_basics_cuda(self): self._test_broadcast_basics(lambda t: t.clone().cuda()) def _test_broadcast_stress(self, inputs): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts(threads=8)) work_handles = [ pg.broadcast(inputs[i], root=(i % self.world_size)) for i in range(len(inputs)) ] for i, work_handle in enumerate(work_handles): work_handle.wait() self.assertEqual( torch.Tensor([ (i * self.world_size) + (i % self.world_size) ]), inputs[i], "Mismatch in iteration %d" % i, ) def test_broadcast_stress(self): inputs = [torch.Tensor([i * self.world_size + self.rank]) for i in range(1000)] self._test_broadcast_stress(inputs) @skip_if_not_multigpu def test_broadcast_stress_cuda(self): inputs = [torch.Tensor([i * self.world_size + self.rank]).cuda() for i in range(1000)] self._test_broadcast_stress(inputs) def test_allreduce_checks(self): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts()) t1 = torch.zeros([1], dtype=torch.float32) t2 = torch.zeros([1], dtype=torch.float64) t3 = torch.zeros([2], dtype=torch.float32) with self.assertRaisesRegex(ValueError, "requires non-empty tensor list"): opts = c10d.AllreduceOptions() pg.allreduce([], opts) with self.assertRaisesRegex(ValueError, "invalid tensor type"): opts = c10d.AllreduceOptions() pg.allreduce([t1, t2], opts) with self.assertRaisesRegex(ValueError, "invalid tensor size"): opts = c10d.AllreduceOptions() pg.allreduce([t1, t3], opts) def _test_allreduce_basics(self, fn): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts()) # Single input tests tests = simple_reduce_tests(self.rank, self.world_size) for (op, input, output) in tests: opts = c10d.AllreduceOptions() opts.reduceOp = op tensor = fn(input) work = pg.allreduce([tensor], opts) work.wait() self.assertEqual(output, tensor) # Multi input tests tests = simple_multi_input_reduce_tests(self.rank, self.world_size) for (op, inputs, output) in tests: opts = c10d.AllreduceOptions() opts.reduceOp = op tensors = [fn(input) for input in inputs] work = pg.allreduce(tensors, opts) work.wait() for tensor in tensors: self.assertEqual(output, tensor) # Test overloaded convenience function (defaults to using sum) x = fn(torch.Tensor([self.rank + 1.0])) work = pg.allreduce(x) work.wait() self.assertEqual(torch.Tensor([float(self.world_size * (self.world_size + 1) / 2)]), x) def test_allreduce_basics(self): self._test_allreduce_basics(lambda t: t.clone()) @skip_if_not_multigpu def test_allreduce_basics_cuda(self): self._test_allreduce_basics(lambda t: t.clone().cuda()) def _test_allreduce_stress(self, inputs): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts(threads=8)) work_handles = [pg.allreduce(inputs[i]) for i in range(len(inputs))] for i, work_handle in enumerate(work_handles): work_handle.wait() self.assertEqual( torch.Tensor([ (i * self.world_size) + (self.world_size * (self.world_size - 1) / 2) ]), inputs[i], "Mismatch in iteration %d" % i, ) def test_allreduce_stress(self): inputs = [torch.Tensor([i + self.rank]) for i in range(1000)] self._test_allreduce_stress(inputs) @skip_if_not_multigpu def test_allreduce_stress_cuda(self): inputs = [torch.Tensor([i + self.rank]).cuda() for i in range(1000)] self._test_allreduce_stress(inputs) def test_scatter_checks(self): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts()) t1 = torch.zeros([1], dtype=torch.float32) t2 = torch.zeros([1], dtype=torch.float64) t3 = torch.zeros([2], dtype=torch.float32) with self.assertRaisesRegex(ValueError, "invalid root rank"): opts = c10d.ScatterOptions() opts.rootRank = -1 pg.scatter([t1], [], opts) with self.assertRaisesRegex(ValueError, "invalid root rank"): opts = c10d.ScatterOptions() opts.rootRank = self.world_size pg.scatter([t1], [], opts) with self.assertRaisesRegex(ValueError, "requires a single-element output tensor list"): opts = c10d.ScatterOptions() opts.rootRank = 0 pg.scatter([], [], opts) with self.assertRaisesRegex(ValueError, "requires a single-element output tensor list"): opts = c10d.ScatterOptions() opts.rootRank = 0 pg.scatter([t1, t1], [], opts) with self.assertRaisesRegex(ValueError, "requires a single-element input list"): opts = c10d.ScatterOptions() opts.rootRank = self.rank pg.scatter([t1], [], opts) with self.assertRaisesRegex(ValueError, "requires a single-element input list"): opts = c10d.ScatterOptions() opts.rootRank = self.rank pg.scatter([t1], [[t1] * self.world_size, [t1] * self.world_size], opts) with self.assertRaisesRegex(ValueError, "requires a single-element input list"): opts = c10d.ScatterOptions() opts.rootRank = self.rank pg.scatter([t1], [[t1] * (self.world_size - 1)], opts) with self.assertRaisesRegex(ValueError, "requires a single-element input list"): opts = c10d.ScatterOptions() opts.rootRank = self.rank pg.scatter([t1], [[t1] * (self.world_size + 1)], opts) with self.assertRaisesRegex(ValueError, "requires a single-element input list"): opts = c10d.ScatterOptions() opts.rootRank = self.rank pg.scatter([t1], [[t1] * (self.world_size + 1)], opts) with self.assertRaisesRegex(ValueError, "invalid tensor type"): opts = c10d.ScatterOptions() opts.rootRank = self.rank pg.scatter([t1], [[t2] * self.world_size], opts) with self.assertRaisesRegex(ValueError, "invalid tensor size"): opts = c10d.ScatterOptions() opts.rootRank = self.rank pg.scatter([t1], [[t3] * self.world_size], opts) with self.assertRaisesRegex(ValueError, "requires empty input on non-root"): opts = c10d.ScatterOptions() opts.rootRank = (self.rank + 1) % self.world_size pg.scatter([t1], [[t1] * self.world_size], opts) def _test_scatter_basics(self, fn): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts()) # Preallocate tensors for input/output input = [fn(torch.Tensor([self.rank])) for _ in range(self.world_size)] outputs = [fn(torch.Tensor([-1])) for _ in range(self.world_size)] # Take turns being the scatter root and accumulate work items work = [] for i in range(self.world_size): opts = c10d.ScatterOptions() opts.rootRank = i if i == self.rank: work.append(pg.scatter([outputs[i]], [input], opts)) else: work.append(pg.scatter([outputs[i]], [], opts)) # Wait for work to complete for i in range(self.world_size): work[i].wait() self.assertEqual(torch.Tensor([i]), outputs[i]) def test_scatter_basics(self): self._test_scatter_basics(lambda t: t.clone()) @skip_if_not_multigpu def test_scatter_basics_cuda(self): self._test_scatter_basics(lambda t: t.clone().cuda()) def _test_scatter_stress(self, inputs, fn): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts(threads=8)) outputs = [ [fn(torch.Tensor([-1])) for _ in range(self.world_size)] for _ in range(len(inputs)) ] work_handles = [] for i in range(len(inputs)): for root in range(self.world_size): opts = c10d.ScatterOptions() opts.rootRank = root if root == self.rank: work = pg.scatter([outputs[i][root]], [[fn(e) for e in inputs[i]]], opts) else: work = pg.scatter([outputs[i][root]], [], opts) work_handles.append(work) for i, work_handle in enumerate(work_handles): work_handle.wait() iter = i // self.world_size root = i % self.world_size self.assertEqual( torch.Tensor([iter + root]), outputs[iter][root], "Mismatch in iteration %d for rank %d" % (iter, root) ) def test_scatter_stress(self): inputs = [ [torch.Tensor([i + self.rank]) for _ in range(self.world_size)] for i in range(1000) ] self._test_scatter_stress(inputs, lambda t: t.clone()) @unittest.skip("Test is flaky, see https://github.com/pytorch/pytorch/issues/15963") @skip_if_not_multigpu def test_scatter_stress_cuda(self): inputs = [ [torch.Tensor([i + self.rank]) for _ in range(self.world_size)] for i in range(1000) ] self._test_scatter_stress(inputs, lambda t: t.clone().cuda()) def test_gather_checks(self): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts()) t1 = torch.zeros([1], dtype=torch.float32) t2 = torch.zeros([1], dtype=torch.float64) t3 = torch.zeros([2], dtype=torch.float32) with self.assertRaisesRegex(ValueError, "invalid root rank"): opts = c10d.GatherOptions() opts.rootRank = -1 pg.gather([], [t1], opts) with self.assertRaisesRegex(ValueError, "invalid root rank"): opts = c10d.GatherOptions() opts.rootRank = self.world_size pg.gather([], [t1], opts) with self.assertRaisesRegex(ValueError, "requires a single-element input tensor list"): opts = c10d.GatherOptions() opts.rootRank = 0 pg.gather([], [], opts) with self.assertRaisesRegex(ValueError, "requires a single-element input tensor list"): opts = c10d.GatherOptions() opts.rootRank = 0 pg.gather([], [t1, t1], opts) with self.assertRaisesRegex(ValueError, "requires a single-element output list"): opts = c10d.GatherOptions() opts.rootRank = self.rank pg.gather([], [t1], opts) with self.assertRaisesRegex(ValueError, "requires a single-element output list"): opts = c10d.GatherOptions() opts.rootRank = self.rank pg.gather([[t1] * self.world_size, [t1] * self.world_size], [t1], opts) with self.assertRaisesRegex(ValueError, "requires a single-element output list"): opts = c10d.GatherOptions() opts.rootRank = self.rank pg.gather([[t1] * (self.world_size - 1)], [t1], opts) with self.assertRaisesRegex(ValueError, "requires a single-element output list"): opts = c10d.GatherOptions() opts.rootRank = self.rank pg.gather([[t1] * (self.world_size + 1)], [t1], opts) with self.assertRaisesRegex(ValueError, "invalid tensor type"): opts = c10d.GatherOptions() opts.rootRank = self.rank pg.gather([[t2] * self.world_size], [t1], opts) with self.assertRaisesRegex(ValueError, "invalid tensor size"): opts = c10d.GatherOptions() opts.rootRank = self.rank pg.gather([[t3] * self.world_size], [t1], opts) with self.assertRaisesRegex(ValueError, "requires empty output on non-root"): opts = c10d.GatherOptions() opts.rootRank = (self.rank + 1) % self.world_size pg.gather([[t1] * self.world_size], [t1], opts) def _test_gather_basics(self, fn): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts()) # Preallocate tensors for input/output input = [fn(torch.Tensor([self.rank]))] outputs = [fn(torch.Tensor([-1])) for _ in range(self.world_size)] # Take turns being the gather root and accumulate work items work = [] for i in range(self.world_size): opts = c10d.GatherOptions() opts.rootRank = i if i == self.rank: work.append(pg.gather([outputs], input, opts)) else: work.append(pg.gather([], input, opts)) # Wait for work to complete expected = [torch.Tensor([rank]) for rank in range(self.world_size)] for i in range(self.world_size): work[i].wait() if i == self.rank: self.assertEqual(expected, outputs) def test_gather_basics(self): self._test_gather_basics(lambda t: t.clone()) @skip_if_not_multigpu def test_gather_basics_cuda(self): self._test_gather_basics(lambda t: t.clone().cuda()) def _test_gather_stress(self, inputs, fn): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts(threads=8)) work_handles = [] outputs = [ [ [fn(torch.Tensor([-1])) for _ in range(self.world_size)] ] for _ in range(len(inputs)) ] expected_outputs = [ [ [torch.Tensor([i + j]) for j in range(self.world_size)] ] for i in range(len(inputs)) ] for i in range(len(inputs)): for root in range(self.world_size): opts = c10d.GatherOptions() opts.rootRank = root if root == self.rank: work = pg.gather(outputs[i], [fn(inputs[i])], opts) else: work = pg.gather([], [fn(inputs[i])], opts) work_handles.append(work) for i, work_handle in enumerate(work_handles): work_handle.wait() iter = i // self.world_size root = i % self.world_size if root == self.rank: self.assertEqual( expected_outputs[iter], outputs[iter], "Mismatch in iteration %d for root %d" % (iter, root) ) def test_gather_stress(self): inputs = [torch.Tensor([i + self.rank]) for i in range(1000)] self._test_gather_stress(inputs, lambda t: t.clone()) @skip_if_not_multigpu def test_gather_stress_cuda(self): inputs = [torch.Tensor([i + self.rank]).cuda() for i in range(1000)] self._test_gather_stress(inputs, lambda t: t.clone().cuda()) def test_allgather_checks(self): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts()) t1 = torch.zeros([1], dtype=torch.float32) t2 = torch.zeros([1], dtype=torch.float64) t3 = torch.zeros([2], dtype=torch.float32) with self.assertRaisesRegex(ValueError, "requires non-empty input tensor list"): pg.allgather([], []) with self.assertRaisesRegex(ValueError, "requires input/output tensor lists to have the same length"): pg.allgather([], [t1]) with self.assertRaisesRegex(ValueError, "requires input/output tensor lists to have the same length"): pg.allgather([[t1] * self.world_size, [t1] * self.world_size], [t1]) with self.assertRaisesRegex(ValueError, "invalid output tensor list"): pg.allgather([[t1] * (self.world_size - 1)], [t1]) with self.assertRaisesRegex(ValueError, "invalid output tensor list"): pg.allgather([[t1] * (self.world_size + 1)], [t1]) with self.assertRaisesRegex(ValueError, "invalid tensor type"): pg.allgather([[t1, t1] * (self.world_size), [t1, t1] * (self.world_size)], [t1, t2]) with self.assertRaisesRegex(ValueError, "invalid tensor size"): pg.allgather([[t1, t1] * (self.world_size), [t1, t1] * (self.world_size)], [t1, t3]) with self.assertRaisesRegex(ValueError, "invalid tensor type"): pg.allgather([([t1, t2] * (self.world_size))[:self.world_size]], [t1]) with self.assertRaisesRegex(ValueError, "invalid tensor size"): pg.allgather([([t1, t3] * (self.world_size))[:self.world_size]], [t1]) def _test_allgather_basics(self, fn): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts()) # Run with N input tensor per rank for n in [1, 2, 3]: input = [ fn(torch.Tensor([n * self.rank + i])) for i in range(n) ] output = [ [ fn(torch.Tensor([-1])) for _ in range(n * self.world_size) ] for _ in range(n) ] expected_output = [ [ torch.Tensor([i]) for i in range(n * self.world_size) ] for _ in range(n) ] work = pg.allgather(output, input) work.wait() self.assertEqual(expected_output, output) def test_allgather_basics(self): self._test_allgather_basics(lambda t: t.clone()) @skip_if_not_multigpu def test_allgather_basics_cuda(self): self._test_allgather_basics(lambda t: t.clone().cuda()) def _test_allgather_stress(self, inputs, fn): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts(threads=8)) work_handles = [] outputs = [ [ [fn(torch.Tensor([-1])) for _ in range(self.world_size)] ] for _ in range(len(inputs)) ] expected_outputs = [ [ [torch.Tensor([i + j]) for j in range(self.world_size)] ] for i in range(len(inputs)) ] for i in range(len(inputs)): work = pg.allgather(outputs[i], [fn(inputs[i])]) work_handles.append(work) for i, work_handle in enumerate(work_handles): work_handle.wait() self.assertEqual( expected_outputs[i], outputs[i], "Mismatch in iteration %d" % i ) def test_allgather_stress(self): inputs = [torch.Tensor([i + self.rank]) for i in range(1000)] self._test_allgather_stress(inputs, lambda t: t.clone()) @skip_if_not_multigpu def test_allgather_stress_cuda(self): inputs = [torch.Tensor([i + self.rank]).cuda() for i in range(1000)] self._test_allgather_stress(inputs, lambda t: t.clone().cuda()) def test_reduce_checks(self): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts()) t1 = torch.zeros([1], dtype=torch.float32) with self.assertRaisesRegex(ValueError, "invalid root rank"): opts = c10d.ReduceOptions() opts.rootRank = -1 opts.rootTensor = 0 pg.reduce([t1], opts) with self.assertRaisesRegex(ValueError, "invalid root rank"): opts = c10d.ReduceOptions() opts.rootRank = self.world_size opts.rootTensor = 0 pg.reduce([t1], opts) with self.assertRaisesRegex(ValueError, "invalid root tensor"): opts = c10d.ReduceOptions() opts.rootRank = self.rank opts.rootTensor = 1 pg.reduce([t1], opts) with self.assertRaisesRegex(ValueError, "requires a single-element tensor list"): opts = c10d.ReduceOptions() opts.rootRank = self.rank opts.rootTensor = 0 pg.reduce([t1, t1], opts) def _test_reduce_basics(self, fn): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts()) for (op, input, output) in simple_reduce_tests(self.rank, self.world_size): for root in range(self.world_size): opts = c10d.ReduceOptions() opts.reduceOp = op opts.rootRank = root tmp = fn(input) work = pg.reduce([tmp], opts) work.wait() if root == self.rank: self.assertEqual(output, tmp) def test_reduce_basics(self): self._test_reduce_basics(lambda t: t.clone()) @skip_if_not_multigpu def test_reduce_basics_cuda(self): self._test_reduce_basics(lambda t: t.clone().cuda()) def _test_reduce_stress(self, inputs): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts(threads=8)) work_handles = [] outputs = [] for i in range(len(inputs)): for root in range(self.world_size): opts = c10d.ReduceOptions() opts.rootRank = root tmp = inputs[i].clone() outputs.append(tmp) work = pg.reduce([tmp], opts) work_handles.append(work) for i, work_handle in enumerate(work_handles): work_handle.wait() iter = i // self.world_size root = i % self.world_size if root == self.rank: self.assertEqual( torch.Tensor([ (iter * self.world_size) + (self.world_size * (self.world_size - 1) / 2) ]), outputs[i], "Mismatch in iteration %d with root rank %d" % (iter, root), ) def test_reduce_stress(self): inputs = [torch.Tensor([i + self.rank]) for i in range(1000)] self._test_reduce_stress(inputs) @skip_if_not_multigpu def test_reduce_stress_cuda(self): inputs = [torch.Tensor([i + self.rank]).cuda() for i in range(1000)] self._test_reduce_stress(inputs) def test_send_recv_all_to_all(self): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts()) # Preallocate tensors for input/output inputs = [torch.Tensor([self.rank]) for _ in range(self.world_size)] outputs = [torch.Tensor([-1]) for _ in range(self.world_size)] # Issue sends send_work = [] for i in range(self.world_size): if i == self.rank: continue send_work.append(pg.send([inputs[i]], i, 0)) # Issue recvs recv_work = [] for i in range(self.world_size): if i == self.rank: continue recv_work.append(pg.recv([outputs[i]], i, 0)) # Wait for sends to complete for work in send_work: work.wait() self.assertTrue(work.is_completed()) # Wait for recvs to complete for work in recv_work: work.wait() self.assertTrue(work.is_completed()) # Test that every output other than our own contains the respective rank for i in range(self.world_size): if i == self.rank: continue self.assertEqual(torch.Tensor([i]), outputs[i]) def test_timeout_kwarg(self): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupGloo( store, self.rank, self.world_size, timeout=timedelta(seconds=0.5)) # Wait on barrier pg.barrier().wait() # Sleep on one of the processes to trigger barrier timeout if self.rank == 0: time.sleep(1.0) # The barrier will now time out with self.assertRaisesRegex(RuntimeError, " (Timed out|closed) "): pg.barrier().wait() def test_barrier_implies_wait(self): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size) # Kick off allreduce operations size = (100, 100) num = 16 tensors = [torch.full(size, float(i)) for i in range(num)] for tensor in tensors: # Note: leak the returned work handle pg.allreduce(tensor) # Barrier should ensure all previous work has completed pg.barrier().wait() for i, tensor in enumerate(tensors): self.assertEqual(torch.full(size, float(i * self.world_size)), tensor) class ProcessGroupNCCLTest(TestCase): MAIN_PROCESS_RANK = 0 def setUp(self): if not hasattr(c10d, "ProcessGroupNCCL"): raise unittest.SkipTest("C10D is not built with NCCL process group," " skipping test") self.rank = self.MAIN_PROCESS_RANK self.world_size = 1 self.file = tempfile.NamedTemporaryFile(delete=False) self.num_gpus = torch.cuda.device_count() if self.num_gpus < 2: raise unittest.SkipTest("NCCL test requires 2+ GPUs") def tearDown(self): pass def test_broadcast_ops(self): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupNCCL(store, self.rank, self.world_size) def broadcast(xs, rootRank, rootTensor): opts = c10d.BroadcastOptions() opts.rootRank = rootRank opts.rootTensor = rootTensor work = pg.broadcast(xs, opts) work.wait() # for every root tensor for rt in range(self.num_gpus): tensors = [] for i in range(self.num_gpus): tensors.append(torch.Tensor([i]).cuda(i)) broadcast(tensors, self.rank, rt) for i in range(self.num_gpus): self.assertEqual(tensors[i], tensors[rt]) def test_allreduce_ops(self): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupNCCL(store, self.rank, self.world_size) def allreduce(tensors, op): opts = c10d.AllreduceOptions() opts.reduceOp = op work = pg.allreduce(tensors, opts) work.wait() # Sum tensors = [] for i in range(self.num_gpus): tensors.append(torch.Tensor([i + 1]).cuda(i)) allreduce(tensors, c10d.ReduceOp.SUM) for i in range(self.num_gpus): self.assertEqual( torch.Tensor([float(self.num_gpus * (self.num_gpus + 1) / 2)]), tensors[i]) # Product tensors = [] for i in range(self.num_gpus): tensors.append(torch.Tensor([i + 1]).cuda(i)) allreduce(tensors, c10d.ReduceOp.PRODUCT) for i in range(self.num_gpus): self.assertEqual( torch.Tensor([float(math.factorial(self.num_gpus))]), tensors[i]) # Min tensors = [] for i in range(self.num_gpus): tensors.append(torch.Tensor([i + 1]).cuda(i)) allreduce(tensors, c10d.ReduceOp.MIN) for i in range(self.num_gpus): self.assertEqual(torch.Tensor([1.0]), tensors[i]) # Max tensors = [] for i in range(self.num_gpus): tensors.append(torch.Tensor([i + 1]).cuda(i)) allreduce(tensors, c10d.ReduceOp.MAX) for i in range(self.num_gpus): self.assertEqual(torch.Tensor([self.num_gpus]), tensors[i]) def test_reduce_ops(self): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupNCCL(store, self.rank, self.world_size) def reduce(xs, rootRank, rootTensor): opts = c10d.ReduceOptions() opts.rootRank = rootRank opts.rootTensor = rootTensor work = pg.reduce(xs, opts) work.wait() # for every root tensor for rt in range(self.num_gpus): tensors = [] for i in range(self.num_gpus): tensors.append(torch.Tensor([i + 1]).cuda(i)) reduce(tensors, self.rank, rt) self.assertEqual( torch.Tensor([float(self.num_gpus * (self.num_gpus + 1) / 2)]), tensors[rt]) def test_allgather_ops(self): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupNCCL(store, self.rank, self.world_size) def allgather(output_ts, input_ts): work = pg.allgather(output_ts, input_ts) work.wait() tensors = [] output_ts = [[] for _ in range(self.num_gpus)] for idx, ls in enumerate(output_ts): for _ in range(self.world_size * self.num_gpus): ls.append(torch.Tensor([0]).cuda(idx)) for i in range(self.num_gpus): tensors.append(torch.Tensor([i]).cuda(i)) allgather(output_ts, tensors) # Verification for device_ts in output_ts: for s_idx, t in enumerate(device_ts): self.assertEqual(torch.Tensor([s_idx]), t) def test_barrier(self): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupNCCL(store, self.rank, self.world_size) def allreduce(tensors): opts = c10d.AllreduceOptions() work = pg.allreduce(tensors, opts) return work # Making the collective to operate on # 1, 2, 3, 4, .... self.num_gpus GPUs tensors_list = [[] for _ in range(2, self.num_gpus + 1)] for i in range(2, self.num_gpus + 1): for j in range(i): tensors_list[i - 2].append(torch.Tensor([j + 1]).cuda(j)) works = [] for tensors in tensors_list: work = allreduce(tensors) works.append(work) # Barrier will ensure that all previous work is completed pg.barrier().wait() for i in range(2, self.num_gpus + 1): for j in range(i): self.assertEqual( torch.Tensor([float(i * (i + 1) / 2)]), tensors_list[i - 2][j]) class Net(nn.Module): def __init__(self): super(Net, self).__init__() self.fc1 = nn.Linear(2, 10, bias=False) self.fc2 = nn.Linear(10, 50, bias=False) self.fc3 = nn.Linear(50, 4, bias=False) self.relu = nn.ReLU() def forward(self, x): x = self.relu(self.fc1(x)) x = self.relu(self.fc2(x)) x = self.fc3(x) return F.softmax(x, dim=1) class DistributedDataParallelTest(MultiProcessTestCase): def tearDown(self): # DistributedDataParallel test doesn't seem to call FileStore destructor # TODO: investigate this test and the test is known to have issues # Use this hack to remove files for that test try: os.remove(self.file.name) except OSError: pass @property def world_size(self): return 2 def _test_ddp_with_process_group(self, process_group, gpus): model = Net() ddp_model = DistributedDataParallel( copy.deepcopy(model).cuda(gpus[0]), device_ids=gpus, process_group=process_group, bucket_cap_mb=0.001) model.cuda(gpus[0]) local_batch_size = len(gpus) global_batch_size = self.world_size * local_batch_size input = torch.randn(global_batch_size, 2).cuda(gpus[0]) target = torch.randn(global_batch_size, 4).cuda(gpus[0]) def step_model(model, input, target): model.train() output = model(input) loss = F.mse_loss(output, target) loss.backward() def update_parameters(model): for param in model.parameters(): param.data -= param.grad param.grad = None # check two model parameters over 2 iterations for iteration in range(2): # single cpu/gpu training step_model(model, input, target) # DDP training, DDP scatters subsets of input_cpu to nodes/GPUs step_model(ddp_model, input[self.rank * local_batch_size: (self.rank + 1) * local_batch_size], target[self.rank * local_batch_size: (self.rank + 1) * local_batch_size]) # Update weights and run a second iteration to shake out errors update_parameters(model) update_parameters(ddp_model) self.assertEqual(len(list(model.parameters())), len(list(ddp_model.parameters()))) for i, j in zip(model.parameters(), ddp_model.parameters()): self.assertEqual(i, j) # Shuffle the input so that DDP input is different torch.manual_seed(1337 + iteration) input = input[torch.randperm(global_batch_size)] @skip_if_not_multigpu def test_gloo_backend(self): store = c10d.FileStore(self.file.name, self.world_size) options = c10d.ProcessGroupGloo.Options() options.devices = [c10d.ProcessGroupGloo.create_tcp_device(interface="lo")] process_group = c10d.ProcessGroupGloo(store, self.rank, self.world_size, options) gpus = gpus_for_rank(self.world_size)[self.rank] self._test_ddp_with_process_group(process_group, gpus) self._test_ddp_with_process_group(process_group, list(map(lambda i: torch.device('cuda:' + str(i)), gpus))) @skip_if_not_multigpu @skip_if_not_nccl def test_nccl_backend(self): store = c10d.FileStore(self.file.name, self.world_size) process_group = c10d.ProcessGroupNCCL(store, self.rank, self.world_size) gpus = gpus_for_rank(self.world_size)[self.rank] self._test_ddp_with_process_group(process_group, gpus) self._test_ddp_with_process_group(process_group, list(map(lambda i: torch.device('cuda:' + str(i)), gpus))) @skip_if_not_multigpu @skip_if_not_nccl @skip_for_known_issues def test_dist_broadcast_coalesced_nccl(self): store = c10d.FileStore(self.file.name, self.world_size) process_group = c10d.ProcessGroupNCCL(store, self.rank, self.world_size) device = torch.device('cuda') for fine_grained in [False, True]: target = torch.arange(60, dtype=torch.float16, device=device).chunk(5) target += torch.arange(60, dtype=torch.float32, device=device).chunk(5) target += torch.arange(60, dtype=torch.float16, device=device).chunk(5) target += torch.arange(60, dtype=torch.float64, device=device).chunk(5) target += torch.arange(60, dtype=torch.float16, device=device).chunk(5) target += torch.arange(60, dtype=torch.float32, device=device).chunk(5) if self.is_master: # All processes should have these tensors in the end. tensors = target else: # Non-master processes start with empty tensors and should be # filled with the tensors from the master. tensors = torch.zeros(60, dtype=torch.float16, device=device).chunk(5) tensors += torch.zeros(60, dtype=torch.float32, device=device).chunk(5) tensors += torch.zeros(60, dtype=torch.float16, device=device).chunk(5) tensors += torch.zeros(60, dtype=torch.float64, device=device).chunk(5) tensors += torch.zeros(60, dtype=torch.float16, device=device).chunk(5) tensors += torch.zeros(60, dtype=torch.float32, device=device).chunk(5) c10d._dist_broadcast_coalesced( process_group, tensors, buffer_size=256, fine_grained=fine_grained) self.assertEqual(tensors, target) @skip_if_not_multigpu def test_dist_broadcast_coalesced_gloo(self): store = c10d.FileStore(self.file.name, self.world_size) options = c10d.ProcessGroupGloo.Options() options.devices = [c10d.ProcessGroupGloo.create_tcp_device(interface="lo")] process_group = c10d.ProcessGroupGloo(store, self.rank, self.world_size, options) device = torch.device('cuda') for fine_grained in [False, True]: target = torch.arange(60, dtype=torch.float16, device=device).chunk(5) target += torch.arange(60, dtype=torch.float32, device=device).chunk(5) target += torch.arange(60, dtype=torch.float16, device=device).chunk(5) target += torch.arange(60, dtype=torch.float64, device=device).chunk(5) target += torch.arange(60, dtype=torch.float16, device=device).chunk(5) target += torch.arange(60, dtype=torch.float32, device=device).chunk(5) if self.is_master: # All processes should have these tensors in the end. tensors = target else: # Non-master processes start with empty tensors and should be # filled with the tensors from the master. tensors = torch.zeros(60, dtype=torch.float16, device=device).chunk(5) tensors += torch.zeros(60, dtype=torch.float32, device=device).chunk(5) tensors += torch.zeros(60, dtype=torch.float16, device=device).chunk(5) tensors += torch.zeros(60, dtype=torch.float64, device=device).chunk(5) tensors += torch.zeros(60, dtype=torch.float16, device=device).chunk(5) tensors += torch.zeros(60, dtype=torch.float32, device=device).chunk(5) c10d._dist_broadcast_coalesced( process_group, tensors, buffer_size=128, fine_grained=fine_grained) self.assertEqual(tensors, target) @skip_if_not_multigpu def test_sync_params_no_buffers(self): store = c10d.FileStore(self.file.name, self.world_size) options = c10d.ProcessGroupGloo.Options() options.devices = [c10d.ProcessGroupGloo.create_tcp_device(interface="lo")] process_group = c10d.ProcessGroupGloo(store, self.rank, self.world_size, options) # Use all available devices on every process here (data is small, so should be fine). devices = gpus_for_rank(self.world_size)[self.rank] target = torch.arange(10, dtype=torch.float64, device='cuda:{}'.format(devices[0])).chunk(5) parameter_data = [target] parameter_data += [torch.zeros(10, device=torch.device('cuda', d)).chunk(5) for d in devices[1:]] buffer_data = [[]] * len(parameter_data) c10d._sync_params( process_group, parameter_data=parameter_data, buffer_data=buffer_data, devices=devices, broadcast_bucket_size=10, broadcast_buffers=False) for device_data in parameter_data: for i, parameter in enumerate(device_data): self.assertEqual(parameter, target[i]) @skip_if_not_multigpu def test_sync_params_with_buffers(self): store = c10d.FileStore(self.file.name, self.world_size) options = c10d.ProcessGroupGloo.Options() options.devices = [c10d.ProcessGroupGloo.create_tcp_device(interface="lo")] process_group = c10d.ProcessGroupGloo(store, self.rank, self.world_size, options) devices = gpus_for_rank(self.world_size)[self.rank] target = torch.arange(10, dtype=torch.float64, device='cuda:{}'.format(devices[0])).chunk(5) parameter_data = [target] parameter_data += [torch.zeros(10, device=torch.device('cuda', d)).chunk(5) for d in devices[1:]] # sync_params should do a dist_broadcast for buffers, so we only populate the master buffers and # then check that other processes' tensors end up matching. if self.is_master: buffer_data = [target] buffer_data += [torch.zeros(10, device=torch.device('cuda', d)).chunk(5) for d in devices[1:]] else: buffer_data = [torch.zeros(10, device=torch.device('cuda', d)).chunk(5) for d in devices] c10d._sync_params( process_group, parameter_data=parameter_data, buffer_data=buffer_data, devices=devices, broadcast_bucket_size=10, broadcast_buffers=True) for device_data in parameter_data: for i, parameter in enumerate(device_data): self.assertEqual(parameter, target[i]) for device_data in buffer_data: for i, buffer in enumerate(device_data): self.assertEqual(buffer, target[i]) @skip_if_not_multigpu @skip_if_not_nccl def test_fp16(self): store = c10d.FileStore(self.file.name, self.world_size) process_group = c10d.ProcessGroupNCCL(store, self.rank, self.world_size) gpus = gpus_for_rank(self.world_size)[self.rank] model = nn.Linear(1, 1, bias=False).cuda(gpus[0]).half() nn.init.constant_(model.weight, 1) ddp_model = DistributedDataParallel( model, device_ids=[gpus[0]], process_group=process_group, bucket_cap_mb=0.001, ) # Input 2**15, so that the gradients will overflow with a # world_size of 2, unless we normalize the gradient by the # world_size before the reduction input = torch.Tensor([[2**15]]).cuda(gpus[0]).half() # Step model ddp_model.train() output = ddp_model(input) loss = output.sum() loss.backward() self.assertFalse( any(torch.isinf(p.grad).any() for p in ddp_model.parameters()) ) @skip_if_not_nccl @skip_if_not_multigpu def test_queue_reduction(self): # Set up process group. store = c10d.FileStore(self.file.name, self.world_size) process_group = c10d.ProcessGroupNCCL(store, self.rank, self.world_size) # Get this process' split of devices. devices = gpus_for_rank(self.world_size)[self.rank] grads_batch = [(torch.ones(10, device=torch.device('cuda', d)) * (self.rank + 1)).chunk(5) for d in devices] work, local_grad_sum = c10d._queue_reduction(process_group, grads_batch, devices) # The first return value should be the allreduce work item. self.assertTrue(isinstance(work, c10d.Work)) # The second return value will be the finished allreduced gradients. self.assertTrue(isinstance(local_grad_sum, torch.Tensor)) # Wait for the allreduce to finish. work.wait() # The expected result of the allreduce should be the average self.assertEqual(local_grad_sum, torch.ones(10) * (self.world_size + 1) * len(devices) / 2.0) @skip_if_not_nccl @skip_if_not_multigpu def test_sync_reduction(self): # Set up process group. store = c10d.FileStore(self.file.name, self.world_size) process_group = c10d.ProcessGroupNCCL(store, self.rank, self.world_size) # Get this process' split of devices. devices = gpus_for_rank(self.world_size)[self.rank] grads_batch = [(torch.ones(10, device=torch.device('cuda', d)) * (self.rank + 1)).chunk(5) for d in devices] work, local_grad_sum = c10d._queue_reduction(process_group, grads_batch, devices) c10d._sync_reduction(work, grads_batch[0], local_grad_sum) # The expected result of the allreduce should be the average self.assertEqual(grads_batch[0], (torch.ones(10) * (self.world_size + 1) * len(devices) / 2.0).chunk(5)) if __name__ == '__main__': assert not torch.cuda._initialized, "test_distributed must not have initialized CUDA context on main process" run_tests()