From 2a81e8b8f1526b375d1e78402f91bf8fd82d2b68 Mon Sep 17 00:00:00 2001
From: Shen Li
Date: Fri, 10 Sep 2021 07:44:09 -0700
Subject: [PATCH] Let all_reduce_coalesced and all_gather_coalesced return
 Future objects (#64722)

Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/64722

`all_reduce_coalesced` and `all_gather_coalesced` have never been publicly
documented in our API docs, so I would assume the blast radius to be small.

The motivation for this change is to allow implementing `all_reduce_coalesced`
and `all_gather_coalesced` by reusing the `allreduce` and `allgather` C++
cores and performing the flatten and copy only on the Python side. With that,
we can then remove `all_reduce_coalesced` and `all_gather_coalesced` from the
C++ ProcessGroup APIs.

For the async mode, the copy-back logic after the communication will need to
be chained as a callback on the returned Future, and the chained child Future
will be used as the return value (otherwise, we would need to wrap the child
Future into another work handle). This PR tests whether we can directly
return a Future without breaking tests or internal use cases. If so, it will
make the consolidation a lot easier.

cc pietern mrshenli pritamdamania87 zhaojuanmao satgera rohan-varma gqchen
aazzolini osalpekar jiayisuse agolynski SciPioneer H-Huang mrzzd cbalioglu
gcramer23

Test Plan: Imported from OSS

Reviewed By: rohan-varma

Differential Revision: D30830994

Pulled By: mrshenli

fbshipit-source-id: dcde0ed9245e9e8fee357b3588b07d540a4b6318
---
 test/distributed/test_c10d_gloo.py    | 56 ++++++++++++++++++++++++++++------
 torch/distributed/distributed_c10d.py |  4 +--
 2 files changed, 48 insertions(+), 12 deletions(-)

diff --git a/test/distributed/test_c10d_gloo.py b/test/distributed/test_c10d_gloo.py
index 789d76e..15c3dc8 100644
--- a/test/distributed/test_c10d_gloo.py
+++ b/test/distributed/test_c10d_gloo.py
@@ -605,6 +605,10 @@ class ProcessGroupGlooTest(MultiProcessTestCase):
     def test_allreduce_coalesced_basics(self):
         self._test_allreduce_coalesced_basics(lambda t: t.clone())
 
+    def _expected_output(self, i):
+        ws = self.world_size
+        return 2 * [torch.tensor([(i * ws) + (ws * (ws - 1) / 2)])]
+
     def _test_allreduce_coalesced_stress(self, inputs):
         store = c10d.FileStore(self.file_name, self.world_size)
         pg = self._create_process_group_gloo(
@@ -618,17 +622,9 @@ class ProcessGroupGlooTest(MultiProcessTestCase):
             result = future_handle.value()
             # TODO(#38095): Replace assertEqualIgnoreType. See issue #38095
             self.assertEqualIgnoreType(
-                2
-                * [
-                    torch.tensor(
-                        [
-                            (i * self.world_size)
-                            + (self.world_size * (self.world_size - 1) / 2)
-                        ]
-                    )
-                ],
+                self._expected_output(i),
                 result,
-                msg="Mismatch in interation {}".format(i),
+                msg="Mismatch in iteration {}".format(i),
             )
 
     @requires_gloo()
@@ -637,6 +633,23 @@ class ProcessGroupGlooTest(MultiProcessTestCase):
         self._test_allreduce_coalesced_stress(inputs)
 
     @requires_gloo()
+    def test_allreduce_coalesced_async(self):
+        store = c10d.FileStore(self.file_name, self.world_size)
+        c10d.init_process_group(
+            backend="gloo", rank=self.rank, world_size=self.world_size, store=store
+        )
+
+        xs = [2 * [torch.tensor([i + self.rank])] for i in range(2)]
+        futs = [c10d.all_reduce_coalesced(x, async_op=True) for x in xs]
+        torch.futures.wait_all(futs)
+        for i, fut in enumerate(futs):
+            self.assertEqualIgnoreType(
+                self._expected_output(i),
+                fut.wait(),
+                msg="Mismatch in iteration {}".format(i),
+            )
+
+    @requires_gloo()
     def test_sparse_allreduce_checks(self):
         store = c10d.FileStore(self.file_name, self.world_size)
         pg = self._create_process_group_gloo(
@@ -1186,6 +1199,29 @@ class ProcessGroupGlooTest(MultiProcessTestCase):
             c10d.all_gather_coalesced(dummy_output_lists, dummy_input, pg)
 
     @requires_gloo()
+    def test_allgather_coalesced_async(self):
+        store = c10d.FileStore(self.file_name, self.world_size)
+        c10d.init_process_group(
+            backend="gloo", rank=self.rank, world_size=self.world_size, store=store
+        )
+
+        xxs = [2 * [torch.tensor([i + self.rank])] for i in range(2)]
+        yys = [[[torch.zeros_like(x) for x in xx] for _ in range(self.world_size)] for xx in xxs]
+        futs = [c10d.all_gather_coalesced(yy, xx, async_op=True) for xx, yy in zip(xxs, yys)]
+
+        # expected outputs
+        zzs = [[2 * [torch.tensor([i + r])] for r in range(self.world_size)] for i in range(2)]
+
+        torch.futures.wait_all(futs)
+        for yy, zz in zip(yys, zzs):
+            # one iteration
+            for y_out, z_out in zip(yy, zz):
+                # one output tensor list
+                for y, z in zip(y_out, z_out):
+                    # one tensor in output tensor list
+                    self.assertEqualIgnoreType(y, z)
+
+    @requires_gloo()
     def test_reduce_checks(self):
         store = c10d.FileStore(self.file_name, self.world_size)
         pg = pg = self._create_process_group_gloo(
diff --git a/torch/distributed/distributed_c10d.py b/torch/distributed/distributed_c10d.py
index 302114e..e0c2d89 100644
--- a/torch/distributed/distributed_c10d.py
+++ b/torch/distributed/distributed_c10d.py
@@ -1343,7 +1343,7 @@ def all_reduce_coalesced(tensors, op=ReduceOp.SUM, group=None, async_op=False):
         work = group.allreduce_coalesced(tensors, opts)
 
     if async_op:
-        return work
+        return work.get_future()
     else:
         work.wait()
 
@@ -2145,7 +2145,7 @@ def all_gather_coalesced(
         work = group.allgather_coalesced(output_tensor_lists, input_tensor_list)
 
     if async_op:
-        return work
+        return work.get_future()
     else:
         work.wait()
 
-- 
2.7.4
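
For illustration, the copy-back chaining described in the summary could look
roughly like the sketch below. This is not code from this patch: the helper
name `_all_reduce_coalesced_async` is hypothetical, and the sketch assumes an
already-initialized default process group plus the existing
`torch.distributed.all_reduce`, `Work.get_future()`, `Future.then()`, and the
flatten/unflatten helpers from `torch._utils`.

    import torch.distributed as dist
    from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors

    def _all_reduce_coalesced_async(tensors, op=dist.ReduceOp.SUM, group=None):
        # Flatten on the Python side and reuse the plain allreduce core.
        flat = _flatten_dense_tensors(tensors)
        work = dist.all_reduce(flat, op=op, group=group, async_op=True)
        fut = work.get_future()

        def _copy_back(fut):
            # Runs once the communication completes: unflatten the reduced
            # buffer and copy the results back into the caller's tensors.
            reduced = fut.value()[0]
            for t, r in zip(tensors, _unflatten_dense_tensors(reduced, tensors)):
                t.copy_(r)
            return tensors

        # Return the chained child Future so that waiting on it also waits for
        # the copy-back, with no extra work-handle wrapper needed.
        return fut.then(_copy_back)

A caller would then `wait()` on (or chain further callbacks onto) the returned
Future, just as the new async tests above do via `torch.futures.wait_all`.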