Let all_reduce_coalesced and all_gather_coalesced return Future objects (#64722)
authorShen Li <cs.shenli@gmail.com>
Fri, 10 Sep 2021 14:44:09 +0000 (07:44 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Fri, 10 Sep 2021 14:45:25 +0000 (07:45 -0700)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/64722

`all_reduce_coalesced` and `all_gather_coalesced` are never publicly
released in our API docs. So, I would assume the blast radius to be small.

The motivation for this change to allow implementing
`all_reduce_coalesced` and `all_gather_coalesced` by re-using `allreduce`
and `allgather` C++ cores and perform flatten and copy only on the Python
side. With that, we can then remove `all_reduce_coalesced` and
`all_gather_coalesced` from C++ ProcessGroup APIs. For the async mode,
the copy-back logic after the communication will need to be chained
as a callback on the returned Future and use the chained child Future
as the return value (otherwise, we will need to wrap the child Future
into another work handle). This PR tries to test if we can directly
return a Future without breaking tests and internal use cases. If yes,
it will make the consolidation a lot easier.

cc pietern mrshenli pritamdamania87 zhaojuanmao satgera rohan-varma gqchen aazzolini osalpekar jiayisuse agolynski SciPioneer H-Huang mrzzd cbalioglu gcramer23

Test Plan: Imported from OSS

Reviewed By: rohan-varma

Differential Revision: D30830994

Pulled By: mrshenli

fbshipit-source-id: dcde0ed9245e9e8fee357b3588b07d540a4b6318

test/distributed/test_c10d_gloo.py
torch/distributed/distributed_c10d.py

index 789d76e..15c3dc8 100644 (file)
@@ -605,6 +605,10 @@ class ProcessGroupGlooTest(MultiProcessTestCase):
     def test_allreduce_coalesced_basics(self):
         self._test_allreduce_coalesced_basics(lambda t: t.clone())
 
+    def _expected_output(self, i):
+        ws = self.world_size
+        return 2 * [torch.tensor([(i * ws) + (ws * (ws - 1) / 2)])]
+
     def _test_allreduce_coalesced_stress(self, inputs):
         store = c10d.FileStore(self.file_name, self.world_size)
         pg = self._create_process_group_gloo(
@@ -618,17 +622,9 @@ class ProcessGroupGlooTest(MultiProcessTestCase):
             result = future_handle.value()
             # TODO(#38095): Replace assertEqualIgnoreType. See issue #38095
             self.assertEqualIgnoreType(
-                2
-                * [
-                    torch.tensor(
-                        [
-                            (i * self.world_size)
-                            + (self.world_size * (self.world_size - 1) / 2)
-                        ]
-                    )
-                ],
+                self._expected_output(i),
                 result,
-                msg="Mismatch in interation {}".format(i),
+                msg="Mismatch in iteration {}".format(i),
             )
 
     @requires_gloo()
@@ -637,6 +633,23 @@ class ProcessGroupGlooTest(MultiProcessTestCase):
         self._test_allreduce_coalesced_stress(inputs)
 
     @requires_gloo()
+    def test_allreduce_coalesced_async(self):
+        store = c10d.FileStore(self.file_name, self.world_size)
+        c10d.init_process_group(
+            backend="gloo", rank=self.rank, world_size=self.world_size, store=store
+        )
+
+        xs = [2 * [torch.tensor([i + self.rank])] for i in range(2)]
+        futs = [c10d.all_reduce_coalesced(x, async_op=True) for x in xs]
+        torch.futures.wait_all(futs)
+        for i, fut in enumerate(futs):
+            self.assertEqualIgnoreType(
+                self._expected_output(i),
+                fut.wait(),
+                msg="Mismatch in iteration {}".format(i),
+            )
+
+    @requires_gloo()
     def test_sparse_allreduce_checks(self):
         store = c10d.FileStore(self.file_name, self.world_size)
         pg = self._create_process_group_gloo(
@@ -1186,6 +1199,29 @@ class ProcessGroupGlooTest(MultiProcessTestCase):
             c10d.all_gather_coalesced(dummy_output_lists, dummy_input, pg)
 
     @requires_gloo()
+    def test_allgather_coalesced_async(self):
+        store = c10d.FileStore(self.file_name, self.world_size)
+        c10d.init_process_group(
+            backend="gloo", rank=self.rank, world_size=self.world_size, store=store
+        )
+
+        xxs = [2 * [torch.tensor([i + self.rank])] for i in range(2)]
+        yys = [[[torch.zeros_like(x) for x in xx] for _ in range(self.world_size)] for xx in xxs]
+        futs = [c10d.all_gather_coalesced(yy, xx, async_op=True) for xx, yy in zip(xxs, yys)]
+
+        # expected outputs
+        zzs = [[2 * [torch.tensor([i + r])] for r in range(self.world_size)] for i in range(2)]
+
+        torch.futures.wait_all(futs)
+        for yy, zz in zip(yys, zzs):
+            # one iteration
+            for y_out, z_out in zip(yy, zz):
+                # one output tensor list
+                for y, z in zip(y_out, z_out):
+                    # one tensor in output tensor list
+                    self.assertEqualIgnoreType(y, z)
+
+    @requires_gloo()
     def test_reduce_checks(self):
         store = c10d.FileStore(self.file_name, self.world_size)
         pg = pg = self._create_process_group_gloo(
index 302114e..e0c2d89 100644 (file)
@@ -1343,7 +1343,7 @@ def all_reduce_coalesced(tensors, op=ReduceOp.SUM, group=None, async_op=False):
         work = group.allreduce_coalesced(tensors, opts)
 
     if async_op:
-        return work
+        return work.get_future()
     else:
         work.wait()
 
@@ -2145,7 +2145,7 @@ def all_gather_coalesced(
         work = group.allgather_coalesced(output_tensor_lists, input_tensor_list)
 
     if async_op:
-        return work
+        return work.get_future()
     else:
         work.wait()