From 6b0ca8eae5d663ad3db560b428abcef465f09dbb Mon Sep 17 00:00:00 2001 From: Shen Li Date: Wed, 10 Apr 2019 20:30:46 -0700 Subject: [PATCH] Fix flaky store timeout test (#19114) Summary: ~Sometimes, `init_process_group()`, `store.get()`, and `destroy_process_group()` can take more than a few seconds. Hence, removing thread join timeout.~ The error was due to `Address already in use` when starting the TCP backend. The solution is to catch the error and report it to the `retry_on_address_already_in_use_error` decorator. Pull Request resolved: https://github.com/pytorch/pytorch/pull/19114 Reviewed By: ezyang Differential Revision: D14872680 Pulled By: mrshenli fbshipit-source-id: fc504d02853ca73f76288c0ade564ab20bc01f7e --- test/test_c10d.py | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/test/test_c10d.py b/test/test_c10d.py index 6b88fa9..f06dd88 100644 --- a/test/test_c10d.py +++ b/test/test_c10d.py @@ -504,16 +504,21 @@ class MultiProcessTestCase(TestCase): class TimeoutTest(TestCase): def _test_store_timeout(self, backend, init_method, c2p): - c10d.distributed_c10d.init_process_group( - backend=backend, init_method=init_method, world_size=1, rank=0, - timeout=timedelta(seconds=1)) - default_store = c10d.distributed_c10d._get_default_store() - tik = time.time() - with self.assertRaisesRegex(RuntimeError, "Timeout"): - default_store.get("nonexistent key") - tok = time.time() - c10d.destroy_process_group() - c2p.append(tok - tik) + try: + c10d.distributed_c10d.init_process_group( + backend=backend, init_method=init_method, world_size=1, rank=0, + timeout=timedelta(seconds=1)) + default_store = c10d.distributed_c10d._get_default_store() + tik = time.time() + with self.assertRaisesRegex(RuntimeError, "Timeout"): + default_store.get("nonexistent key") + tok = time.time() + c10d.destroy_process_group() + c2p.append(float(tok - tik)) + except RuntimeError as e: + # catch "Address already in use" error and report it to 
the main + # thread + c2p.append(e) def _init_methods(self): f = tempfile.NamedTemporaryFile(delete=False) @@ -532,8 +537,14 @@ class TimeoutTest(TestCase): t.join(5) self.assertEqual(1, len(c2p)) - # waiting time should be 1s, use 3s to rule out false alarm - self.assertGreater(3, c2p[0]) + if isinstance(c2p[0], float): + # waiting time should be 1s, use 3s to rule out false alarm + self.assertGreater(3, c2p[0]) + elif isinstance(c2p[0], RuntimeError): + # let @retry_on_address_already_in_use_error handle the error + raise c2p[0] + else: + raise RuntimeError("Unexpected type {}".format(type(c2p[0]))) @retry_on_address_already_in_use_error def test_default_store_timeout_nccl(self): -- 2.7.4