summaryrefslogtreecommitdiff
path: root/test/test_c10d.py
diff options
context:
space:
mode:
authorShen Li <cs.shenli@gmail.com>2019-04-10 20:30:46 -0700
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>2019-04-10 20:35:36 -0700
commit6b0ca8eae5d663ad3db560b428abcef465f09dbb (patch)
tree314a2269b3d7c6adf74ce2565dd6f4f8a257f46d /test/test_c10d.py
parent821b5f138a987807032a2fd908fe10a5be5439d9 (diff)
downloadpytorch-6b0ca8eae5d663ad3db560b428abcef465f09dbb.tar.gz
pytorch-6b0ca8eae5d663ad3db560b428abcef465f09dbb.tar.bz2
pytorch-6b0ca8eae5d663ad3db560b428abcef465f09dbb.zip
Fix flaky store timeout test (#19114)
Summary: ~Sometimes, `init_process_group()`, `store.get()`, and `destory_process_group()` can take more than a few seconds. Hence, removing thread join timeout.~ The error was due to `Address already in use` when starting TPC backend. The solution is to catch the error and report it to the `retry_on_address_already_in_use_error` decorator. Pull Request resolved: https://github.com/pytorch/pytorch/pull/19114 Reviewed By: ezyang Differential Revision: D14872680 Pulled By: mrshenli fbshipit-source-id: fc504d02853ca73f76288c0ade564ab20bc01f7e
Diffstat (limited to 'test/test_c10d.py')
-rw-r--r--test/test_c10d.py35
1 files changed, 23 insertions, 12 deletions
diff --git a/test/test_c10d.py b/test/test_c10d.py
index 6b88fa925b..f06dd88d45 100644
--- a/test/test_c10d.py
+++ b/test/test_c10d.py
@@ -504,16 +504,21 @@ class MultiProcessTestCase(TestCase):
class TimeoutTest(TestCase):
def _test_store_timeout(self, backend, init_method, c2p):
- c10d.distributed_c10d.init_process_group(
- backend=backend, init_method=init_method, world_size=1, rank=0,
- timeout=timedelta(seconds=1))
- default_store = c10d.distributed_c10d._get_default_store()
- tik = time.time()
- with self.assertRaisesRegex(RuntimeError, "Timeout"):
- default_store.get("nonexistent key")
- tok = time.time()
- c10d.destroy_process_group()
- c2p.append(tok - tik)
+ try:
+ c10d.distributed_c10d.init_process_group(
+ backend=backend, init_method=init_method, world_size=1, rank=0,
+ timeout=timedelta(seconds=1))
+ default_store = c10d.distributed_c10d._get_default_store()
+ tik = time.time()
+ with self.assertRaisesRegex(RuntimeError, "Timeout"):
+ default_store.get("nonexistent key")
+ tok = time.time()
+ c10d.destroy_process_group()
+ c2p.append(float(tok - tik))
+ except RuntimeError as e:
+ # catch "Address already in use" error and report it to the main
+ # thread
+ c2p.append(e)
def _init_methods(self):
f = tempfile.NamedTemporaryFile(delete=False)
@@ -532,8 +537,14 @@ class TimeoutTest(TestCase):
t.join(5)
self.assertEqual(1, len(c2p))
- # waiting time should be 1s, use 3s to rule out false alarm
- self.assertGreater(3, c2p[0])
+ if isinstance(c2p[0], float):
+ # waiting time should be 1s, use 3s to rule out false alarm
+ self.assertGreater(3, c2p[0])
+ elif isinstance(c2p[0], RuntimeError):
+ # let @retry_on_address_already_in_use_error handle the error
+ raise c2p[0]
+ else:
+ raise RuntimeError("Unexpected type {}".format(type(c2p[0])))
@retry_on_address_already_in_use_error
def test_default_store_timeout_nccl(self):