Pull changes from prefetching_ops to support dicts in prefetching_ops_v2 in distribut...
author    Priya Gupta <priyag@google.com>
          Fri, 6 Apr 2018 19:36:20 +0000 (12:36 -0700)
committer TensorFlower Gardener <gardener@tensorflow.org>
          Fri, 6 Apr 2018 19:38:42 +0000 (12:38 -0700)
Also update the README to reflect support for dictionaries.

PiperOrigin-RevId: 191924990
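
For illustration only (not part of this change): a minimal sketch of the dictionary-valued input case that prefetching_ops_v2 now handles, assuming the contrib MirroredStrategy / Estimator wiring of this TF 1.x era; the input_fn, feature names, and num_gpus value are made up.

import numpy as np
import tensorflow as tf

def input_fn():
  # Dataset elements are (features, labels) where features is a dict; with this
  # change such dicts survive prefetching to the GPUs, so prefetch_on_device no
  # longer needs to be turned off.
  features = {"x": np.random.rand(64, 2).astype(np.float32)}
  labels = np.random.rand(64, 1).astype(np.float32)
  return tf.data.Dataset.from_tensor_slices((features, labels)).repeat().batch(8)

distribution = tf.contrib.distribute.MirroredStrategy(num_gpus=2)
run_config = tf.estimator.RunConfig(train_distribute=distribution)
estimator = tf.estimator.LinearRegressor(
    feature_columns=[tf.feature_column.numeric_column("x", shape=[2])],
    config=run_config)
estimator.train(input_fn=input_fn, steps=10)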

tensorflow/contrib/data/python/kernel_tests/prefetching_ops_test.py
tensorflow/contrib/distribute/README.md
tensorflow/contrib/distribute/python/estimator_integration_test.py
tensorflow/contrib/distribute/python/prefetching_ops_v2.py

index 4b50260..b08132c 100644
--- a/tensorflow/contrib/data/python/kernel_tests/prefetching_ops_test.py
+++ b/tensorflow/contrib/data/python/kernel_tests/prefetching_ops_test.py
@@ -28,6 +28,7 @@ from tensorflow.python.framework import dtypes
 from tensorflow.python.framework import errors
 from tensorflow.python.framework import function
 from tensorflow.python.framework import ops
+from tensorflow.python.framework import sparse_tensor
 from tensorflow.python.framework import test_util
 from tensorflow.python.ops import resource_variable_ops
 from tensorflow.python.platform import test
@@ -265,6 +266,43 @@ class PrefetchToDeviceTest(test.TestCase):
       with self.assertRaises(errors.OutOfRangeError):
         sess.run(next_element)
 
+  def testPrefetchSparseTensorsToDevice(self):
+    def make_tensor(i):
+      return sparse_tensor.SparseTensorValue(
+          indices=[[0, 0]], values=(i*[1]), dense_shape=[2, 2])
+    host_dataset = dataset_ops.Dataset.range(10).map(make_tensor)
+
+    device_dataset = host_dataset.apply(
+        prefetching_ops.prefetch_to_device("/cpu:1"))
+
+    # NOTE(mrry): This device block creates the "host" dataset and iterator on
+    # /cpu:0, and ensures that the prefetching is across devices. In typical use
+    # this would not be necessary, because the GPU device would not support any
+    # of the dataset-related ops.
+    with ops.device("/cpu:0"):
+      iterator = device_dataset.make_one_shot_iterator()
+
+    self.assertEqual(host_dataset.output_types, device_dataset.output_types)
+    self.assertEqual(host_dataset.output_types, iterator.output_types)
+    self.assertEqual(host_dataset.output_shapes, device_dataset.output_shapes)
+    self.assertEqual(host_dataset.output_shapes, iterator.output_shapes)
+    self.assertEqual(host_dataset.output_classes, device_dataset.output_classes)
+    self.assertEqual(host_dataset.output_classes, iterator.output_classes)
+
+    next_element = iterator.get_next()
+    self.assertEqual(dtypes.int64, next_element.dtype)
+
+    worker_config = config_pb2.ConfigProto()
+    worker_config.device_count["CPU"] = 2
+    with self.test_session(config=worker_config) as sess:
+      for i in range(10):
+        actual = sess.run(next_element)
+        self.assertAllEqual([i], actual.values)
+        self.assertAllEqual([[0, 0]], actual.indices)
+        self.assertAllEqual([2, 2], actual.dense_shape)
+      with self.assertRaises(errors.OutOfRangeError):
+        sess.run(next_element)
+
   def testPrefetchToDeviceGpu(self):
     if not test_util.is_gpu_available():
       self.skipTest("No GPU available")
index 28483f4..14de1e8 100644
--- a/tensorflow/contrib/distribute/README.md
+++ b/tensorflow/contrib/distribute/README.md
@@ -117,7 +117,7 @@ in the input function gives a solid boost in performance. When using
 This feature is in early stages and there are a lot of improvements forthcoming:
 
 * Metrics are not yet supported during distributed training.
-* Summaries are currently computed in every tower.
+* Summaries are only computed in the first tower in `MirroredStrategy`.
 * Evaluation is not yet distributed.
 * Eager support is in the works; performance can be more challenging with eager
 execution.
@@ -129,10 +129,6 @@ effective batch size will be `num_gpus * batch_size`. Therefore, consider
 adjusting your learning rate or batch size according to the number of GPUs.
 We are working on addressing this limitation by splitting each batch across GPUs
 instead.
-* Dictionaries inside dataset in the input are not supported when prefetching
-on GPUs is turned on. (If you need to use dictionaries in the dataset, turn off
-prefetching on GPUs by passing param `prefetch_on_device=False` to
-`MirroredStrategy`)
 * PartitionedVariables are not supported yet.
 
 ## What's next?
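
As a side note on the effective-batch-size caveat kept in the README hunk above, the arithmetic is simply the per-tower batch size times the number of GPUs; a tiny illustrative sketch (the linear learning-rate scaling is one common heuristic, not something this commit prescribes):

num_gpus = 4
per_tower_batch_size = 64  # batch size set inside the input_fn
effective_batch_size = num_gpus * per_tower_batch_size  # 256 examples per step
base_learning_rate = 0.01
scaled_learning_rate = base_learning_rate * num_gpus  # 0.04, linear scaling heuristic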
index 2b49b8f..c5a520a 100644
--- a/tensorflow/contrib/distribute/python/estimator_integration_test.py
+++ b/tensorflow/contrib/distribute/python/estimator_integration_test.py
@@ -61,7 +61,7 @@ class DNNLinearCombinedClassifierIntegrationTest(test.TestCase,
           mode=['graph'],
           distribution=[
               combinations.one_device_strategy,
-              combinations.mirrored_strategy_without_prefetch
+              combinations.mirrored_strategy_with_gpu_and_cpu
           ]))
   def test_complete_flow_with_mode(self, distribution):
     label_dimension = 2
index e1ddf3c..dfcbb85 100644
--- a/tensorflow/contrib/distribute/python/prefetching_ops_v2.py
+++ b/tensorflow/contrib/distribute/python/prefetching_ops_v2.py
@@ -45,10 +45,12 @@ class _PrefetchToDeviceIterator(object):
 
     @function.Defun(dtypes.string)
     def _prefetch_fn(handle):
+      """Prefetches one element from `input_iterator`."""
       remote_iterator = iterator_ops.Iterator.from_string_handle(
           handle, input_iterator.output_types, input_iterator.output_shapes,
           input_iterator.output_classes)
-      return remote_iterator.get_next()
+      ret = remote_iterator.get_next()
+      return nest.flatten(sparse.serialize_sparse_tensors(ret))
 
     target_device = gen_dataset_ops.iterator_get_device(
         input_iterator._iterator_resource)
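
A standalone sketch of the serialize-then-flatten pattern the new return statement above relies on. It assumes the data.util helpers used by the contrib prefetching ops are what `sparse` and `nest` refer to in this file; the example element is made up. The idea: a Defun has to return a flat list of dense tensors, so any SparseTensor is serialized to a string tensor first, and the nested structure is then flattened.

# Conceptual sketch of the pattern used in _prefetch_fn above (assumed import
# paths; example values are made up).
from tensorflow.python.data.util import nest
from tensorflow.python.data.util import sparse
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import sparse_tensor

element = {
    "dense": constant_op.constant([1, 2, 3]),
    "sparse": sparse_tensor.SparseTensor(
        indices=[[0, 0]], values=[42], dense_shape=[2, 2]),
}
# SparseTensors become serialized string tensors; dense tensors pass through.
serialized = sparse.serialize_sparse_tensors(element)
# The nested structure (here a dict) is flattened to a list of plain Tensors,
# which is a shape a Defun can return.
flat = nest.flatten(serialized)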