def test_allreduce_coalesced_basics(self):
self._test_allreduce_coalesced_basics(lambda t: t.clone())
+ def _expected_output(self, i):
+ ws = self.world_size
+ return 2 * [torch.tensor([(i * ws) + (ws * (ws - 1) / 2)])]
+
def _test_allreduce_coalesced_stress(self, inputs):
store = c10d.FileStore(self.file_name, self.world_size)
pg = self._create_process_group_gloo(
result = future_handle.value()
# TODO(#38095): Replace assertEqualIgnoreType. See issue #38095
self.assertEqualIgnoreType(
- 2
- * [
- torch.tensor(
- [
- (i * self.world_size)
- + (self.world_size * (self.world_size - 1) / 2)
- ]
- )
- ],
+ self._expected_output(i),
result,
- msg="Mismatch in interation {}".format(i),
+ msg="Mismatch in iteration {}".format(i),
)
@requires_gloo()
self._test_allreduce_coalesced_stress(inputs)
@requires_gloo()
def test_allreduce_coalesced_async(self):
    """Async coalesced all-reduce: launch several ops, wait on all futures,
    then check each result against ``_expected_output``."""
    store = c10d.FileStore(self.file_name, self.world_size)
    c10d.init_process_group(
        backend="gloo", rank=self.rank, world_size=self.world_size, store=store
    )

    # Two iterations; each coalesces the same tensor twice.
    inputs = [2 * [torch.tensor([i + self.rank])] for i in range(2)]
    futures = [
        c10d.all_reduce_coalesced(tensors, async_op=True) for tensors in inputs
    ]
    torch.futures.wait_all(futures)
    for i, future in enumerate(futures):
        self.assertEqualIgnoreType(
            self._expected_output(i),
            future.wait(),
            msg="Mismatch in iteration {}".format(i),
        )
+
+ @requires_gloo()
def test_sparse_allreduce_checks(self):
store = c10d.FileStore(self.file_name, self.world_size)
pg = self._create_process_group_gloo(
c10d.all_gather_coalesced(dummy_output_lists, dummy_input, pg)
@requires_gloo()
def test_allgather_coalesced_async(self):
    """Async coalesced all-gather: each rank contributes ``2 * [tensor([i + rank])]``
    per iteration; every rank must end up with every rank's tensors."""
    store = c10d.FileStore(self.file_name, self.world_size)
    c10d.init_process_group(
        backend="gloo", rank=self.rank, world_size=self.world_size, store=store
    )

    inputs = [2 * [torch.tensor([i + self.rank])] for i in range(2)]
    # One output slot (a list of zeroed tensors) per rank, per iteration.
    outputs = [
        [[torch.zeros_like(t) for t in tensors] for _ in range(self.world_size)]
        for tensors in inputs
    ]
    futures = [
        c10d.all_gather_coalesced(out, tensors, async_op=True)
        for tensors, out in zip(inputs, outputs)
    ]

    # expected outputs
    expected = [
        [2 * [torch.tensor([i + r])] for r in range(self.world_size)]
        for i in range(2)
    ]

    torch.futures.wait_all(futures)
    for gathered, want in zip(outputs, expected):
        # one iteration
        for rank_got, rank_want in zip(gathered, want):
            # one output tensor list
            for got_tensor, want_tensor in zip(rank_got, rank_want):
                # one tensor in output tensor list
                self.assertEqualIgnoreType(got_tensor, want_tensor)
+
+ @requires_gloo()
def test_reduce_checks(self):
store = c10d.FileStore(self.file_name, self.world_size)
pg = pg = self._create_process_group_gloo(