diff options
author | Shen Li <cs.shenli@gmail.com> | 2019-04-10 20:30:46 -0700 |
---|---|---|
committer | Facebook Github Bot <facebook-github-bot@users.noreply.github.com> | 2019-04-10 20:35:36 -0700 |
commit | 6b0ca8eae5d663ad3db560b428abcef465f09dbb (patch) | |
tree | 314a2269b3d7c6adf74ce2565dd6f4f8a257f46d /test | |
parent | 821b5f138a987807032a2fd908fe10a5be5439d9 (diff) | |
download | pytorch-6b0ca8eae5d663ad3db560b428abcef465f09dbb.tar.gz pytorch-6b0ca8eae5d663ad3db560b428abcef465f09dbb.tar.bz2 pytorch-6b0ca8eae5d663ad3db560b428abcef465f09dbb.zip |
Fix flaky store timeout test (#19114)
Summary:
~Sometimes, `init_process_group()`, `store.get()`, and `destory_process_group()` can take more than a few seconds. Hence, removing thread join timeout.~
The error was due to `Address already in use` when starting TPC backend. The solution is to catch the error and report it to the `retry_on_address_already_in_use_error` decorator.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/19114
Reviewed By: ezyang
Differential Revision: D14872680
Pulled By: mrshenli
fbshipit-source-id: fc504d02853ca73f76288c0ade564ab20bc01f7e
Diffstat (limited to 'test')
-rw-r--r-- | test/test_c10d.py | 35 |
1 files changed, 23 insertions, 12 deletions
diff --git a/test/test_c10d.py b/test/test_c10d.py index 6b88fa925b..f06dd88d45 100644 --- a/test/test_c10d.py +++ b/test/test_c10d.py @@ -504,16 +504,21 @@ class MultiProcessTestCase(TestCase): class TimeoutTest(TestCase): def _test_store_timeout(self, backend, init_method, c2p): - c10d.distributed_c10d.init_process_group( - backend=backend, init_method=init_method, world_size=1, rank=0, - timeout=timedelta(seconds=1)) - default_store = c10d.distributed_c10d._get_default_store() - tik = time.time() - with self.assertRaisesRegex(RuntimeError, "Timeout"): - default_store.get("nonexistent key") - tok = time.time() - c10d.destroy_process_group() - c2p.append(tok - tik) + try: + c10d.distributed_c10d.init_process_group( + backend=backend, init_method=init_method, world_size=1, rank=0, + timeout=timedelta(seconds=1)) + default_store = c10d.distributed_c10d._get_default_store() + tik = time.time() + with self.assertRaisesRegex(RuntimeError, "Timeout"): + default_store.get("nonexistent key") + tok = time.time() + c10d.destroy_process_group() + c2p.append(float(tok - tik)) + except RuntimeError as e: + # catch "Address already in use" error and report it to the main + # thread + c2p.append(e) def _init_methods(self): f = tempfile.NamedTemporaryFile(delete=False) @@ -532,8 +537,14 @@ class TimeoutTest(TestCase): t.join(5) self.assertEqual(1, len(c2p)) - # waiting time should be 1s, use 3s to rule out false alarm - self.assertGreater(3, c2p[0]) + if isinstance(c2p[0], float): + # waiting time should be 1s, use 3s to rule out false alarm + self.assertGreater(3, c2p[0]) + elif isinstance(c2p[0], RuntimeError): + # let @retry_on_address_already_in_use_error handle the error + raise c2p[0] + else: + raise RuntimeError("Unexpected type {}".format(type(c2p[0]))) @retry_on_address_already_in_use_error def test_default_store_timeout_nccl(self): |