from tensorflow.python.ops import sparse_ops
from tensorflow.python.ops import state_ops
from tensorflow.python.ops import variable_scope
-from tensorflow.python.ops.losses import losses
from tensorflow.python.platform import tf_logging
from tensorflow.python.training import device_setter as device_setter_lib
from tensorflow.python.training import training_util
-def replicate_model_fn(model_fn,
- optimizer_fn,
- loss_reduction=losses.Reduction.SUM,
- devices=None):
+def replicate_model_fn(model_fn, optimizer_fn, devices=None):
"""Replicate `Estimator.model_fn` over GPUs within a single host.
The given `model_fn` specifies a single forward pass of a model. To replicate
such a model over GPUs, each GPU gets its own instance of the forward pass
(a.k.a. a tower). The input features and labels get sharded into the chunks
- that correspond to the number of GPUs. Each tower computes a loss based
+ that correspond to the number of GPUs. Each tower computes its own loss based
on its input. For each such loss, gradients are computed. After that, the
- available losses are aggregated to form aggregated loss. Available
- gradients are summed. Then, they update weights using the specified
+ available losses are summed to form the aggregated loss. The per-tower
+ gradients are summed as well, and the summed gradients are applied to the
+ weights by the specified
optimizer.
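+ For example, with two GPUs a batch of four examples is sharded into two
+ chunks of two examples each, one per tower, and the two tower losses are
+ then summed into the single reported loss.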
If `devices` is `None`, then all available GPUs are going to be used for
replication.
On reduction algorithms:
Certain algorithms were chosen for aggregating results of computations on
multiple towers:
- - Losses from all towers are reduced according to `loss_reduction`.
+ - Losses from all towers are reduced using sum.
- Gradients are reduced using sum for each trainable variable.
- `eval_metric_ops` are reduced per metric using `reduce_mean`.
- `EstimatorSpec.predictions` and `EstimatorSpec.export_outputs` are
concatenated along the batch dimension.
optimizer_fn: a function that returns an optimizer instance. The function
may accept one `params` argument. This is the `params` argument as
defined by `Estimator`. See the `Estimator` documentation for details.
- loss_reduction: controls whether losses are summed or averaged.
devices: Optional list of devices to replicate the model across. This
argument can be used to replicate only on a subset of the available GPUs.
If `None`, then all available GPUs are going to be used for replication.
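+ Example (a minimal usage sketch; `my_model_fn`, `my_optimizer_fn` and the
+ `params` dict are hypothetical placeholders, not names from this module):
+
+ ```python
+ def my_model_fn(features, labels, mode, params):
+   # A single-tower forward pass that returns an `EstimatorSpec`.
+   ...
+
+ def my_optimizer_fn(params):
+   return tf.train.GradientDescentOptimizer(params['learning_rate'])
+
+ replicated_model_fn = replicate_model_fn(
+     my_model_fn, my_optimizer_fn, devices=['/gpu:0', '/gpu:1'])
+ estimator = tf.estimator.Estimator(
+     model_fn=replicated_model_fn, params=params)
+ ```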
return _replicate_model_fn_with_mode(
model_fn,
optimizer_fn,
- loss_reduction,
devices,
- # TODO(isaprykin): Query the system configuration to choose modes other
- # than `SHARED_LOCAL_PARAMETER_SERVER`, even though it is often
- # appropriate.
+ # TODO(isaprykin): Query system configuration to choose modes other than
+ # `SHARED_LOCAL_PARAMETER_SERVER`, even though it is often appropriate.
mode=_VariableDistributionMode.SHARED_LOCAL_PARAMETER_SERVER)
def _replicate_model_fn_with_mode(
model_fn,
optimizer_fn,
- loss_reduction=losses.Reduction.SUM,
devices=None,
mode=_VariableDistributionMode.SHARED_LOCAL_PARAMETER_SERVER):
"""A version of `replicate_model_fn` that allows to specify a `mode`."""
- if loss_reduction == losses.Reduction.NONE:
- raise ValueError('Tower losses need to be reduced in some way, yet {} '
- 'reduction is specified.'.format(loss_reduction))
if not devices:
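+ # Default to all local GPUs; fall back to local CPU devices if the host
+ # has no GPUs.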
devices = _get_local_devices('GPU') or _get_local_devices('CPU')
features=feature_shards,
labels=label_shards,
params=params,
- loss_reduction=loss_reduction,
config=config,
devices=devices,
local_ps_devices=ps_devices)
config,
devices,
local_ps_devices,
- loss_reduction=losses.Reduction.SUM,
name_scope_pattern=_DEFAULT_NAME_SCOPE_PATTERN):
"""Replicate the loss computation across devices."""
tower_specs = []
if labels:
labels_shard = labels[i]
- tower_spec = model_fn(
- mode=mode,
- features=features[i],
- labels=labels_shard,
- **optional_params)
- if loss_reduction != losses.Reduction.SUM:
- tower_spec = _scale_tower_loss(
- tower_spec, number_of_towers=len(devices))
- tower_specs.append(tower_spec)
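+ # Each tower applies the unmodified `model_fn` to its own shard of the
+ # input; the per-tower losses and gradients are aggregated by the callers.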
+ tower_specs.append(
+ model_fn(
+ mode=mode,
+ features=features[i],
+ labels=labels_shard,
+ **optional_params))
return tower_specs
return local_device_chooser
-def _scale_tower_loss(tower_spec, number_of_towers):
- """Scale down the loss for arriving at the average loss by summing."""
- if tower_spec.loss is None:
- return tower_spec
-
- estimator_spec = tower_spec._asdict()
- estimator_spec['loss'] = math_ops.div(
- estimator_spec['loss'], 1.0 * number_of_towers, name='averaged_loss')
- return model_fn_lib.EstimatorSpec(**estimator_spec)
-
-
def _minimize_towers(tower_specs, optimizer):
"""Aggregate and apply gradients for computed losses."""
grad_lists = {}
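+ # Map each trainable variable to the list of gradients computed for it on
+ # the individual towers; summing each list yields the gradient of the
+ # summed tower losses.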
from tensorflow.python.framework import test_util
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import control_flow_ops
-from tensorflow.python.ops import losses
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import metrics as metrics_lib
from tensorflow.python.ops import variable_scope
total_loss = (1.0 * 10 - 1.0) + (2.0 * 10 - 2.0)
self.assertEqual(total_loss, session.run(estimator_spec.loss))
- # derivative of loss = (1*c - 1) + (2*c - 2) is 3.
+ # The derivative of the loss with respect to c is 1 + 2 = 3.
# new value of c = 10 - learning rate * 3 = 7.0.
session.run(estimator_spec.train_op)
with variable_scope.variable_scope('', reuse=True):
c = variable_scope.get_variable('c', dtype=dtypes.float64)
self.assertEqual(7.0, session.run(c))
- def test_train_with_mean_reduction(self):
- features = np.array([[1.0], [2.0]])
- labels = np.array([[1.0], [2.0]])
-
- with self.test_session() as session:
- replicated_model_fn = replicate_model_fn.replicate_model_fn(
- self.model_fn,
- self.optimizer_fn,
- losses.Reduction.MEAN,
- devices=['/gpu:0', '/gpu:1'])
- estimator_spec = replicated_model_fn(
- features, labels, model_fn_lib.ModeKeys.TRAIN, self.params)
- session.run(variables.global_variables_initializer())
-
- # loss = feature * c - label
- total_loss = ((1.0 * 10 - 1.0) + (2.0 * 10 - 2.0)) / 2.0
- self.assertEqual(total_loss, session.run(estimator_spec.loss))
-
- # derivative of loss = (1*c - 1)/2 + (2*c - 2)/2 is 1.5.
- # It's the same computation as without mean reduction, but the
- # loss from every tower is scaled by 1/<number of towers>.
- # new value of c = 10 - learning rate * 1.5 = 8.5
- session.run(estimator_spec.train_op)
- with variable_scope.variable_scope('', reuse=True):
- c = variable_scope.get_variable('c', dtype=dtypes.float64)
- self.assertEqual(8.5, session.run(c))
-
def test_train_spec_with_optimizer_without_params(self):
def optimizer_fn_without_params():
self.assertEqual(0, auc)
self.assertNear(total_loss, session.run(estimator_spec.loss), 0.01)
- def test_eval_with_mean_reduction(self):
- features = np.array([[0.01], [0.002]])
- labels = np.array([[0.01], [0.02]])
-
- with self.test_session() as session:
- replicated_model_fn = replicate_model_fn.replicate_model_fn(
- self.model_fn,
- self.optimizer_fn,
- losses.Reduction.MEAN,
- devices=['/gpu:0', '/gpu:1'])
- estimator_spec = replicated_model_fn(
- features, labels, model_fn_lib.ModeKeys.EVAL, self.params)
- session.run(variables.local_variables_initializer())
- session.run(variables.global_variables_initializer())
-
- accuracy, a = estimator_spec.eval_metric_ops['accuracy']
- auc, b = estimator_spec.eval_metric_ops['auc']
-
- session.run([a, b])
- accuracy = session.run(accuracy)
- auc = session.run(auc)
-
- # loss[i] = features[i] * 10 - labels[i].
- # Accuracy is 0.0 (no match) in the first tower.
- # Accuracy is 1.0 (match) in the second tower, since the feature
- # times weight "c" happened to be equal to the label.
- total_loss = ((0.01 * 10 - 0.01) + (0.002 * 10 - 0.02)) / 2.0
-
- self.assertNear((0.0 + 1.0) / 2.0, accuracy, 0.01)
- self.assertEqual(0, auc)
- self.assertNear(total_loss, session.run(estimator_spec.loss), 0.01)
-
def test_predict(self):
features = np.array([[0.01], [0.002]])
labels = np.array([[0.01], [0.02]])
'probabilities': np.array([[0.1], [0.02]])
}, session.run(estimator_spec.predictions))
- def test_unsupported_loss_reduction(self):
- with self.assertRaisesRegexp(ValueError, ''):
- _ = replicate_model_fn.replicate_model_fn(
- self.model_fn, self.optimizer_fn, losses.Reduction.NONE)
-
class GetLossTowersTest(test_util.TensorFlowTestCase):
c = variable_scope.get_variable('c', dtype=dtypes.float64)
self.assertEqual(0.25, session.run(c))
- def test_gradients_are_computed_with_mean_reduction(self):
- with self.test_session() as session:
- tower_specs = replicate_model_fn._get_loss_towers(
- self.model_fn,
- mode=None,
- features=[[0.6], [1.6]],
- labels=[[0.6], [0.6]],
- params=None,
- loss_reduction=losses.Reduction.MEAN,
- config=None,
- devices=['/gpu:0', '/gpu:1'],
- local_ps_devices=['/gpu:0'],
- name_scope_pattern='test_tower_{}')
- session.run(variables.global_variables_initializer())
-
- self.assertEqual(len(tower_specs), 2)
-
- self.assertEqual('/device:GPU:0', tower_specs[0].loss.device)
- self.assertEqual('averaged_loss:0', tower_specs[0].loss.name)
- self.assertEqual(0.5, session.run(tower_specs[0].loss))
-
- self.assertEqual('/device:GPU:1', tower_specs[1].loss.device)
- self.assertEqual('test_tower_1/averaged_loss:0', tower_specs[1].loss.name)
- # The input batch for the second tower had a loss that is 1.0
- # bigger: 0.6 vs 1.6.
- self.assertEqual(1.0, session.run(tower_specs[1].loss))
-
- self.assertEqual(1, len(variables.global_variables()))
- self.assertEqual(1, len(variables.trainable_variables()))
-
- with variable_scope.variable_scope('', reuse=True):
- c = variable_scope.get_variable('c', dtype=dtypes.float64)
- self.assertEqual(0.25, session.run(c))
-
def test_variables_are_round_robined_correctly(self):
"""Test that creates multiple variables and tests round-robin placement."""