From 7da2448d6276370edde67ef22582613d819de2c3 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 3 Dec 2018 09:37:16 -0800 Subject: [PATCH] Fix multi-argument allreduce in ProcessGroupGloo (#14688) Summary: If multiple arguments are specified to c10d allreduce, they are interpreted as if they are expanding the ranks in the process group. Therefore, not only is every argument to allreduce an input that must be considered, it is also an output. The problem that this commit fixes is that they were not correctly considered as outputs. The upstream problem is tracked in facebookincubator/gloo#152. Once this is fixed there we can remove the copies that this commit adds. This fixes #14676. Pull Request resolved: https://github.com/pytorch/pytorch/pull/14688 Differential Revision: D13294405 Pulled By: pietern fbshipit-source-id: 078a2a0a0ff12d051392461438f1496201ec3cb9 --- test/test_c10d.py | 47 +++++++++++++++++++++++++++++++++---- torch/lib/c10d/ProcessGroupGloo.cpp | 14 ++++++++++- 2 files changed, 56 insertions(+), 5 deletions(-) diff --git a/test/test_c10d.py b/test/test_c10d.py index 0a44010..19479ce 100644 --- a/test/test_c10d.py +++ b/test/test_c10d.py @@ -116,6 +116,31 @@ def simple_reduce_tests(rank, world_size): ] +def simple_multi_input_reduce_tests(rank, world_size): + return [ + ( + c10d.ReduceOp.SUM, + [torch.Tensor([2 * rank + 0.0]), torch.Tensor([2 * rank + 1.0])], + torch.Tensor([float(world_size * (2 * world_size - 1))]), + ), + ( + c10d.ReduceOp.PRODUCT, + [torch.Tensor([2 * rank + 1.0]), torch.Tensor([2 * rank + 2.0])], + torch.Tensor([float(math.factorial(2 * world_size))]), + ), + ( + c10d.ReduceOp.MIN, + [torch.Tensor([2 * rank + 1.0]), torch.Tensor([2 * rank + 2.0])], + torch.Tensor([1.0]), + ), + ( + c10d.ReduceOp.MAX, + [torch.Tensor([2 * rank + 1.0]), torch.Tensor([2 * rank + 2.0])], + torch.Tensor([2 * world_size]), + ), + ] + + class StoreTestBase(object): def _create_store(self, i): raise RuntimeError("not implemented") @@ -639,13 +664,27 @@ class ProcessGroupGlooTest(MultiProcessTestCase): def _test_allreduce_basics(self, fn): store = c10d.FileStore(self.file.name, self.world_size) pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts()) - for (op, input, output) in simple_reduce_tests(self.rank, self.world_size): + + # Single input tests + tests = simple_reduce_tests(self.rank, self.world_size) + for (op, input, output) in tests: + opts = c10d.AllreduceOptions() + opts.reduceOp = op + tensor = fn(input) + work = pg.allreduce([tensor], opts) + work.wait() + self.assertEqual(output, tensor) + + # Multi input tests + tests = simple_multi_input_reduce_tests(self.rank, self.world_size) + for (op, inputs, output) in tests: opts = c10d.AllreduceOptions() opts.reduceOp = op - tmp = fn(input) - work = pg.allreduce([tmp], opts) + tensors = [fn(input) for input in inputs] + work = pg.allreduce(tensors, opts) work.wait() - self.assertEqual(output, tmp) + for tensor in tensors: + self.assertEqual(output, tensor) # Test overloaded convenience function (defaults to using sum) x = fn(torch.Tensor([self.rank + 1.0])) diff --git a/torch/lib/c10d/ProcessGroupGloo.cpp b/torch/lib/c10d/ProcessGroupGloo.cpp index 4b07edb..1ce9244 100644 --- a/torch/lib/c10d/ProcessGroupGloo.cpp +++ b/torch/lib/c10d/ProcessGroupGloo.cpp @@ -461,6 +461,14 @@ class AsyncAllreduceWork : public ProcessGroupGloo::AsyncWork { void run() override { allreduce(inputs); + + // Only the first output in the tensor list contains the results. + // See https://github.com/facebookincubator/gloo/issues/152. + // The contents is the same for every entry in the tensor list, so + // we can use the first entry as the source of the copy below. + for (size_t i = 1; i < inputs.size(); i++) { + inputs[i].copy_(inputs[0]); + } } template @@ -510,10 +518,14 @@ class AsyncAllreduceCUDAWork : public AsyncAllreduceWork { allreduce(tmp); // Kick off copy back to the CUDA tensors. + // Only the first output in the tensor list contains the results. + // See https://github.com/facebookincubator/gloo/issues/152. + // The contents is the same for every entry in the tensor list, so + // we can use the first entry as the source of the copy below. at::cuda::OptionalCUDAStreamGuard stream_guard; for (size_t i = 0; i < inputs.size(); i++) { stream_guard.reset_stream(streams[i]); - inputs[i].copy_(tmp[i], /* non_blocking */ true); + inputs[i].copy_(tmp[0], /* non_blocking */ true); events[i].record(streams[i]); } } -- 2.7.4