from tensorflow.python.framework import errors
from tensorflow.python.framework import function
from tensorflow.python.framework import ops
+from tensorflow.python.framework import sparse_tensor
from tensorflow.python.framework import test_util
from tensorflow.python.ops import resource_variable_ops
from tensorflow.python.platform import test
with self.assertRaises(errors.OutOfRangeError):
sess.run(next_element)
+ def testPrefetchSparseTensorsToDevice(self):  # Sparse elements must come through prefetch_to_device unchanged.
+ def make_tensor(i):  # Maps range index i to a 2x2 sparse value holding i at position [0, 0].
+ return sparse_tensor.SparseTensorValue(
+ indices=[[0, 0]], values=(i*[1]), dense_shape=[2, 2])
+ host_dataset = dataset_ops.Dataset.range(10).map(make_tensor)
+
+ device_dataset = host_dataset.apply(
+ prefetching_ops.prefetch_to_device("/cpu:1"))
+
+ # NOTE(mrry): This device block creates the "host" dataset and iterator on
+ # /cpu:0, and ensures that the prefetching is across devices. In typical use
+ # this would not be necessary, because the GPU device would not support any
+ # of the dataset-related ops.
+ with ops.device("/cpu:0"):
+ iterator = device_dataset.make_one_shot_iterator()
+
+ self.assertEqual(host_dataset.output_types, device_dataset.output_types)  # Types, shapes and classes must match across host dataset, device dataset and iterator.
+ self.assertEqual(host_dataset.output_types, iterator.output_types)
+ self.assertEqual(host_dataset.output_shapes, device_dataset.output_shapes)
+ self.assertEqual(host_dataset.output_shapes, iterator.output_shapes)
+ self.assertEqual(host_dataset.output_classes, device_dataset.output_classes)
+ self.assertEqual(host_dataset.output_classes, iterator.output_classes)
+
+ next_element = iterator.get_next()
+ self.assertEqual(dtypes.int64, next_element.dtype)
+
+ worker_config = config_pb2.ConfigProto()
+ worker_config.device_count["CPU"] = 2  # NOTE(review): presumably required so that "/cpu:1" exists in the session - confirm.
+ with self.test_session(config=worker_config) as sess:
+ for i in range(10):
+ actual = sess.run(next_element)
+ self.assertAllEqual([i], actual.values)  # The single stored value is the range index itself.
+ self.assertAllEqual([[0, 0]], actual.indices)
+ self.assertAllEqual([2, 2], actual.dense_shape)
+ with self.assertRaises(errors.OutOfRangeError):  # All ten elements were consumed above, so one more fetch must hit end of input.
+ sess.run(next_element)
+
def testPrefetchToDeviceGpu(self):
if not test_util.is_gpu_available():
self.skipTest("No GPU available")
This feature is in early stages and there are a lot of improvements forthcoming:
* Metrics are not yet supported during distributed training.
-* Summaries are currently computed in every tower.
+* With `MirroredStrategy`, summaries are computed only in the first tower.
* Evaluation is not yet distributed.
* Eager support is in the works; performance can be more challenging with eager
execution.
adjusting your learning rate or batch size according to the number of GPUs.
We are working on addressing this limitation by splitting each batch across GPUs
instead.
-* Dictionaries inside dataset in the input are not supported when prefetching
-on GPUs is turned on. (If you need to use dictionaries in the dataset, turn off
-prefetching on GPUs by passing param `prefetch_on_device=False` to
-`MirroredStrategy`)
* PartitionedVariables are not supported yet.
## What's next?
@function.Defun(dtypes.string)
def _prefetch_fn(handle):
+ """Prefetches one element from `input_iterator`."""
remote_iterator = iterator_ops.Iterator.from_string_handle(
handle, input_iterator.output_types, input_iterator.output_shapes,
input_iterator.output_classes)  # Rebuilds an iterator from the string handle using input_iterator's types, shapes and classes.
- return remote_iterator.get_next()
+ ret = remote_iterator.get_next()
+ return nest.flatten(sparse.serialize_sparse_tensors(ret))  # NOTE(review): presumably serializes sparse components so a flat list of dense tensors is returned - confirm against the sparse helper.
target_device = gen_dataset_ops.iterator_get_device(
input_iterator._iterator_resource)