module.def(
"_compute_bucket_assignment_by_size",
- &::c10d::compute_bucket_assignment_by_size,
+ [](const std::vector<at::Tensor>& tensors,
+ const std::vector<size_t>& bucket_size_limits,
+ const std::vector<bool>& expect_sparse_gradient,
+ const std::vector<int64_t>& tensor_indices,
+ const c10::optional<std::shared_ptr<::c10d::Logger>>& logger) {
+ if (logger.has_value()) {
+ std::weak_ptr<::c10d::Logger> logger_weakref = logger.value();
+ return ::c10d::compute_bucket_assignment_by_size(tensors, bucket_size_limits, expect_sparse_gradient, tensor_indices, {logger_weakref});
+ } else {
+ return ::c10d::compute_bucket_assignment_by_size(tensors, bucket_size_limits, expect_sparse_gradient, tensor_indices, {});
+ }
+ },
py::arg("tensors"),
py::arg("bucket_size"),
py::arg("expect_sparse_gradient") = std::vector<bool>(),
py::arg("tensor_indices") = std::vector<int64_t>(),
+ py::arg("logger") = c10::optional<std::shared_ptr<::c10d::Logger>>{},
py::call_guard<py::gil_scoped_release>());
module.def(
"_verify_model_across_ranks",
- &::c10d::verify_replica0_across_processes,
+ [](const c10::intrusive_ptr<::c10d::ProcessGroup>& process_group,
+ const std::vector<std::vector<at::Tensor>>& model_replicas,
+ const c10::optional<std::shared_ptr<::c10d::Logger>>& logger) {
+ if (logger.has_value()) {
+ std::weak_ptr<::c10d::Logger> logger_weakref = logger.value();
+ ::c10d::verify_replica0_across_processes(process_group, model_replicas, {logger_weakref});
+ } else {
+ ::c10d::verify_replica0_across_processes(process_group, model_replicas, {});
+ }
+ },
py::arg("process_group"),
py::arg("replicas"),
+ py::arg("logger") = c10::optional<std::shared_ptr<::c10d::Logger>>{},
py::call_guard<py::gil_scoped_release>());
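A minimal usage sketch of the two bindings above from the Python side, assuming an already-initialized process group and a DistributedDataParallel instance `net` (whose constructor creates `net.logger`, as the tests below rely on); the 1 MiB bucket limit is an arbitrary illustrative value:

    import torch.distributed as dist

    tensors = [p.data for p in net.parameters()]
    # Without a logger, failures surface only as a RuntimeError.
    dist._compute_bucket_assignment_by_size(tensors, [1024 * 1024])
    # With a logger, the same failures are additionally recorded via the DDP logger.
    dist._compute_bucket_assignment_by_size(tensors, [1024 * 1024], logger=net.logger)
    dist._verify_model_across_ranks(
        net.process_group, [list(net.parameters())], net.logger)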
module.def(
rebuilt_params_,
bucket_size_limits,
expect_sparse_gradients_[0],
- rebuilt_param_indices_);
+ rebuilt_param_indices_,
+ logger_);
if (ddp_set_last_bucket_as_small) {
// Reverse again because buckets were rebuilt in the opposite of gradient
const std::vector<at::Tensor>& tensors,
const std::vector<size_t>& bucket_size_limits,
const std::vector<bool>& expect_sparse_gradient,
- const std::vector<int64_t>& tensor_indices) {
+ const std::vector<int64_t>& tensor_indices,
+ const c10::optional<std::weak_ptr<c10d::Logger>>& logger) {
// Either expect_sparse_gradient is not specified or it has as many elements
// as the vector with tensors.
TORCH_INTERNAL_ASSERT(
for (const auto i : c10::irange(tensors.size())) {
const auto& tensor = tensors[i];
- // TODO: This is not a reducer method so it does not have access to logger,
- // pass in logger directly here.
- TORCH_CHECK(!tensor.is_sparse(), "No support for sparse tensors.");
+ auto msg = std::string("No support for sparse tensors.");
+ if (logger.has_value()) {
+ REDUCER_CHECK(!tensor.is_sparse(), logger.value(), msg);
+ } else {
+ TORCH_CHECK(!tensor.is_sparse(), msg);
+ }
// when tensor_indices is empty, the index of tensors[i] assigned to
// bucket is i, otherwise the tensor index is tensor_indices[i].
// Verifies corresponding params in replica 0 have the same sizes/strides
// across processes.
void verify_replica0_across_processes(
- c10::intrusive_ptr<c10d::ProcessGroup> process_group,
- std::vector<std::vector<at::Tensor>> model_replicas) {
+ const c10::intrusive_ptr<c10d::ProcessGroup>& process_group,
+ const std::vector<std::vector<at::Tensor>>& model_replicas,
+ const c10::optional<std::weak_ptr<c10d::Logger>>& logger) {
size_t i = 0;
for (const auto& t : model_replicas[0]) {
i += 2 * t.dim();
// I'd like to include which process we are in the message,
// but ProcessGroup::getRank is not public!
for (const auto& sz : t.sizes()) {
- // TODO: pass in logger and use REDUCER_CHECK.
- TORCH_CHECK(
- sz == control_accessor[i++],
- "replicas[0][",
- p,
- "] in this process"
- " with sizes ",
- t.sizes(),
- " appears not to match sizes of the same param in process 0.");
+ auto msg = c10::str("replicas[0][", p, "] in this process",
+ " with sizes ",
+ t.sizes(),
+ " appears not to match sizes of the same param in process 0.");
+ if (logger.has_value()) {
+ REDUCER_CHECK(sz == control_accessor[i++], logger.value(), msg);
+ } else {
+ TORCH_CHECK(sz == control_accessor[i++], msg);
+ }
}
for (const auto& str : t.strides()) {
- // TODO: pass in logger and use REDUCER_CHECK.
- TORCH_CHECK(
- str == control_accessor[i++],
- "replicas[0][",
- p,
- "] in this process"
- " with strides ",
- t.strides(),
- " appears not to match strides of the same param in process 0.");
+ auto msg = c10::str("replicas[0][", p, "] in this process",
+ " with sizes ",
+ t.sizes(),
+ " appears not to match strides of the same param in process 0.");
+ if (logger.has_value()) {
+ REDUCER_CHECK(str == control_accessor[i++], logger.value(), msg)
+ } else {
+ TORCH_CHECK(str == control_accessor[i++], msg)
+ }
}
}
}
):
dist.scatter_object_list([], scatter_list, src=src_rank)
- @require_backend({"gloo", "nccl"})
- @require_backends_available({"gloo", "nccl"})
- @skip_if_lt_x_gpu(2)
- @skip_if_rocm
- def test_ddp_model_diff_across_ranks(self):
+ def _generate_sparse_tensors_for_bucket_assignment_test(self):
+ tensors = [
+ torch.empty([50], dtype=torch.float),
+ torch.empty([25], dtype=torch.double),
+ torch.empty([50], dtype=torch.float),
+ torch.empty([25], dtype=torch.double),
+ torch.empty([50], dtype=torch.float),
+ torch.empty([25], dtype=torch.double),
+ ]
+
+ tensors_sparse = [t.to_sparse() for t in tensors]
+ return tensors_sparse
+
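As a quick sanity check on the sizes involved (a sketch, not part of the test): each dense tensor above occupies 200 bytes, so the [400] byte bucket limit used below comfortably fits them; the RuntimeError exercised by these tests comes from the to_sparse() conversion, not from the size limit:

    import torch

    dense = [torch.empty([50], dtype=torch.float), torch.empty([25], dtype=torch.double)]
    for t in dense:
        # 50 * 4 bytes and 25 * 8 bytes both come to 200 bytes.
        assert t.numel() * t.element_size() == 200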
+ def _test_compute_bucket_assignment_by_size(self, use_logger):
group_gloo = dist.new_group(
timeout=timedelta(seconds=60), backend=dist.Backend.GLOO
)
backend=dist.get_backend(), timeout=timedelta(seconds=5)
)
torch.cuda.set_device(self.rank)
- # Creates network with different sized embedding table on different
- # ranks. This should throw an error during DDP init.
- net = EmbeddingNet(self.rank)
+
+ # Create a valid model. The DDP constructor below initializes the logger
+ # that we use later. We never actually run the model; we only need its logger.
+ net = EmbeddingNet(0)
+ net = torch.nn.parallel.DistributedDataParallel(
+ net.to(self.rank),
+ device_ids=[self.rank],
+ process_group=group_to_use,
+ )
+
+ # If we don't pass a logger, we can only check that an exception was thrown;
+ # with a logger, we additionally verify below that the error was logged.
+ expected_err = "No support for sparse tensors."
+ with self.assertRaisesRegex(RuntimeError, expected_err):
+ tensors_sparse = self._generate_sparse_tensors_for_bucket_assignment_test()
+ if use_logger:
+ result = dist._compute_bucket_assignment_by_size(
+ tensors_sparse,
+ [400],
+ logger=net.logger)
+ else:
+ result = dist._compute_bucket_assignment_by_size(tensors_sparse, [400])
+ if use_logger:
+ verify_ddp_error_logged(net, expected_err)
+
+ # Perform gloo-based barrier to ensure one rank doesn't exit test
+ # early which causes failure with Barrier.sync.
+ dist.barrier(group_gloo)
+
+ @require_backend({"gloo", "nccl"})
+ @require_backends_available({"gloo", "nccl"})
+ @skip_if_lt_x_gpu(2)
+ @skip_if_rocm
+ def test_compute_bucket_assignment_by_size_sparse_error_without_logger(self):
+ self._test_compute_bucket_assignment_by_size(use_logger=False)
+
+ @require_backend({"gloo", "nccl"})
+ @require_backends_available({"gloo", "nccl"})
+ @skip_if_lt_x_gpu(2)
+ @skip_if_rocm
+ def test_compute_bucket_assignment_by_size_sparse_error_with_logger(self):
+ self._test_compute_bucket_assignment_by_size(use_logger=True)
+
+ def _determine_expected_error_verify_model_across_rank(self, group_to_use):
# When running with NCCL backend, we don't expect an error on rank 0,
# rather, it will be taken down by NCCL_ASYNC_ERROR_HANDLING. When
# running with Gloo or with debug mode wrapper, we expect the error
is_detail_dbg_mode = (
dist._get_debug_mode() == dist._DistributedDebugLevel.DETAIL
)
- rank_0_ctx = (
- self.assertRaisesRegex(
- RuntimeError, "Caught collective operation timeout"
- )
- if dist.get_backend(group_to_use) == dist.Backend.NCCL
- and not is_detail_dbg_mode
- # Gloo can raise various exception messages, so just assert
- # Runtime error here.
- else self.assertRaises(RuntimeError)
+ if self.rank == 0:
+ if dist.get_backend(group_to_use) == dist.Backend.NCCL and not is_detail_dbg_mode:
+ expected_err = "Caught collective operation timeout"
+ ctx = self.assertRaisesRegex(RuntimeError, expected_err)
+ else:
+ expected_err = None
+ ctx = self.assertRaises(RuntimeError)
+ else:
+ expected_err = "appears not to match"
+ ctx = self.assertRaisesRegex(RuntimeError, expected_err)
+ return ctx, expected_err
+
+ def _test_verify_model_across_rank(self, use_logger):
+ group_gloo = dist.new_group(
+ timeout=timedelta(seconds=60), backend=dist.Backend.GLOO
)
- ctx = (
- rank_0_ctx
- if self.rank == 0
- else self.assertRaisesRegex(RuntimeError, "appears not to match")
+ # Set NCCL_BLOCKING_WAIT and use a new NCCL group to improve test
+ # determinism.
+ os.environ["NCCL_BLOCKING_WAIT"] = "1"
+ group_to_use = dist.new_group(
+ backend=dist.get_backend(), timeout=timedelta(seconds=5)
)
+ torch.cuda.set_device(self.rank)
+ ctx, expected_err = self._determine_expected_error_verify_model_across_rank(group_to_use)
+
+ # Create a valid model. The DDP constructor below initializes the logger that we use later.
+ net = EmbeddingNet(0)
+ net = torch.nn.parallel.DistributedDataParallel(
+ net.to(self.rank),
+ device_ids=[self.rank],
+ process_group=group_to_use,
+ )
+
+ # Modify the model so that the parameter shapes differ across ranks.
+ # This makes dist._verify_model_across_ranks below throw a RuntimeError,
+ # so we can check that the correct error is thrown and logged.
+ # We can't do this in the constructor above, otherwise the logger would
+ # not be properly initialized.
+ net.module.lin = nn.Linear(100 if self.rank == 0 else 10, 1)
+
+ # If we pass a logger, we can verify below that the expected error was logged.
+ with ctx:
+ if use_logger:
+ dist._verify_model_across_ranks(net.process_group, [list(net.parameters())], net.logger)
+ else:
+ dist._verify_model_across_ranks(net.process_group, [list(net.parameters())])
+ # Should only be run by rank 0, and blocking_wait catches and
+ # reports exception.
+ dist.barrier(group_to_use)
+
+ # Only check the logged error on non-zero ranks: on rank 0 the expected
+ # error (e.g. "Caught collective operation timeout") is not raised by the
+ # reducer and is therefore never logged.
+ if use_logger and self.rank != 0:
+ verify_ddp_error_logged(net, expected_err)
+
+ # Perform gloo-based barrier to ensure one rank doesn't exit test
+ # early which causes failure with Barrier.sync.
+ dist.barrier(group_gloo)
+
+ @require_backend({"gloo", "nccl"})
+ @require_backends_available({"gloo", "nccl"})
+ @skip_if_lt_x_gpu(2)
+ @skip_if_rocm
+ def test_verify_model_across_rank_with_logger(self):
+ self._test_verify_model_across_rank(use_logger=True)
+
+ @require_backend({"gloo", "nccl"})
+ @require_backends_available({"gloo", "nccl"})
+ @skip_if_lt_x_gpu(2)
+ @skip_if_rocm
+ def test_verify_model_across_rank_without_logger(self):
+ self._test_verify_model_across_rank(use_logger=False)
+
+ @require_backend({"gloo", "nccl"})
+ @require_backends_available({"gloo", "nccl"})
+ @skip_if_lt_x_gpu(2)
+ @skip_if_rocm
+ def test_ddp_model_diff_across_ranks(self):
+ group_gloo = dist.new_group(
+ timeout=timedelta(seconds=60), backend=dist.Backend.GLOO
+ )
+ # Set NCCL_BLOCKING_WAIT and use a new NCCL group to improve test
+ # determinism.
+ os.environ["NCCL_BLOCKING_WAIT"] = "1"
+ group_to_use = dist.new_group(
+ backend=dist.get_backend(), timeout=timedelta(seconds=5)
+ )
+ torch.cuda.set_device(self.rank)
+ ctx, expected_err = self._determine_expected_error_verify_model_across_rank(group_to_use)
+ # Creates network with different sized embedding table on different
+ # ranks. This should throw an error during DDP init.
+ net = EmbeddingNet(self.rank)
with ctx:
net = torch.nn.parallel.DistributedDataParallel(
net.to(self.rank),
# Should only be run by rank 0, and blocking_wait catches and
# reports exception.
dist.barrier(group_to_use)
+ # Can't use verify_ddp_error_logged here because net was never successfully constructed.
# Perform gloo-based barrier to ensure one rank doesn't exit test
# early which causes failure with Barrier.sync.