1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
import unittest
import torch
import torch.cuda.nccl as nccl
import torch.cuda
from common_utils import TestCase, run_tests, IS_WINDOWS, load_tests
from common_cuda import TEST_CUDA, TEST_MULTIGPU
# Number of CUDA devices reported by torch, sampled once at import time and
# shared by the tests below.
nGPUs = torch.cuda.device_count()

# common_utils.load_tests is the hook used to filter tests automatically for
# sharding on sandcastle. Re-binding the name to itself counts as a use, which
# silences the flake "unused import" warning.
load_tests = load_tests

if not TEST_CUDA:
    print('CUDA not available, skipping tests')
    # With the base class swapped for ``object``, classes deriving from
    # TestCase below are plain classes (presumably so the runner does not
    # pick them up as test cases -- confirm against common_utils.TestCase).
    TestCase = object  # noqa: F811
class TestNCCL(TestCase):
    """Tests for the collective wrappers exposed by ``torch.cuda.nccl``.

    Every collective test builds one float tensor per GPU (``nGPUs`` of
    them), runs the collective, and compares the device results against a
    reference computed from the host copies of the same data. All tests are
    skipped on Windows; the collective tests are also skipped unless
    ``TEST_MULTIGPU`` is set.
    """

    @staticmethod
    def _host_sum(tensors):
        """Return the element-wise sum of ``tensors`` as a new tensor.

        The inputs are accumulated in list order into a zero-filled tensor
        shaped like the first one, so none of the inputs is modified.
        """
        total = torch.zeros_like(tensors[0])
        for tensor in tensors:
            total.add_(tensor)
        return total

    @unittest.skipIf(IS_WINDOWS, "NCCL doesn't support Windows")
    def test_unique_id(self):
        """``nccl.unique_id()`` returns a ``bytes`` value longer than one byte."""
        uid = nccl.unique_id()
        self.assertIsInstance(uid, bytes)
        self.assertGreater(len(uid), 1)

    @unittest.skipIf(IS_WINDOWS, "NCCL doesn't support Windows")
    @unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
    def test_broadcast(self):
        """After ``nccl.broadcast`` every tensor equals the first tensor's data."""
        expected = torch.FloatTensor(128).uniform_()
        # The first tensor carries the data; the remaining devices get
        # uninitialized buffers that the broadcast is expected to overwrite.
        tensors = [expected.cuda()]
        for device in range(1, nGPUs):
            with torch.cuda.device(device):
                tensors.append(torch.cuda.FloatTensor(128))

        nccl.broadcast(tensors)
        for tensor in tensors:
            self.assertEqual(tensor, expected)

    @unittest.skipIf(IS_WINDOWS, "NCCL doesn't support Windows")
    @unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
    def test_reduce(self):
        """After ``nccl.reduce`` the tensor on device 0 holds the sum of all inputs."""
        host_inputs = [torch.FloatTensor(128).uniform_() for _ in range(nGPUs)]
        expected = self._host_sum(host_inputs)
        tensors = [t.cuda(i) for i, t in enumerate(host_inputs)]

        nccl.reduce(tensors)
        # Only the first tensor is checked; the others are not inspected.
        self.assertEqual(tensors[0], expected)

    @unittest.skipIf(IS_WINDOWS, "NCCL doesn't support Windows")
    @unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
    def test_all_reduce(self):
        """After ``nccl.all_reduce`` every tensor holds the sum of all inputs."""
        host_inputs = [torch.FloatTensor(128).uniform_() for _ in range(nGPUs)]
        expected = self._host_sum(host_inputs)
        tensors = [t.cuda(i) for i, t in enumerate(host_inputs)]

        nccl.all_reduce(tensors)
        for tensor in tensors:
            self.assertEqual(tensor, expected)

    @unittest.skipIf(IS_WINDOWS, "NCCL doesn't support Windows")
    @unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
    def test_all_gather(self):
        """After ``nccl.all_gather`` every output is the concatenation of all inputs."""
        host_inputs = [torch.FloatTensor(128).uniform_() for _ in range(nGPUs)]
        expected = torch.cat(host_inputs, 0)
        inputs = [t.cuda(i) for i, t in enumerate(host_inputs)]
        # One uninitialized output per device, large enough for all inputs.
        outputs = [torch.cuda.FloatTensor(128 * nGPUs, device=i)
                   for i in range(nGPUs)]

        nccl.all_gather(inputs, outputs)
        for tensor in outputs:
            self.assertEqual(tensor, expected)

    @unittest.skipIf(IS_WINDOWS, "NCCL doesn't support Windows")
    @unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
    def test_reduce_scatter(self):
        """After ``nccl.reduce_scatter`` output ``i`` is chunk ``i`` of the summed inputs."""
        out_size = 32
        in_size = out_size * nGPUs
        host_inputs = [torch.FloatTensor(in_size).uniform_()
                       for _ in range(nGPUs)]
        # Row i of the reshaped sum is the chunk expected on device i.
        expected = self._host_sum(host_inputs).view(nGPUs, out_size)
        inputs = [t.cuda(i) for i, t in enumerate(host_inputs)]
        outputs = [torch.cuda.FloatTensor(out_size, device=i)
                   for i in range(nGPUs)]

        nccl.reduce_scatter(inputs, outputs)
        for i, output in enumerate(outputs):
            self.assertEqual(output, expected[i])
# Run the suite through common_utils.run_tests only when this file is
# executed as a script, not when it is imported.
if __name__ == '__main__':
    run_tests()
|