InstantiateOptions options_copy(options);
options_copy.target = device_name_;
const string key = Canonicalize(function_name, attrs, options_copy);
- *handle = parent_->GetHandle(key);
- if (*handle != kInvalidHandle) {
+
+ {
mutex_lock l(mu_);
- items_[parent_->GetHandleOnDevice(device_name_, *handle)]->Ref();
- return Status::OK();
+ *handle = parent_->GetHandle(key);
+ if (*handle != kInvalidHandle) {
+ FunctionLibraryRuntime::LocalHandle handle_on_device =
+ parent_->GetHandleOnDevice(device_name_, *handle);
+ if (handle_on_device == kInvalidLocalHandle) {
+ return errors::Internal("LocalHandle not found for handle ", *handle,
+ ".");
+ }
+ auto item_handle = items_.find(handle_on_device);
+ if (item_handle == items_.end()) {
+ return errors::Internal("LocalHandle ", handle_on_device,
+ " for handle ", *handle,
+ " not found in items.");
+ }
+ item_handle->second->Ref();
+ return Status::OK();
+ }
}
Status s;
}
LocalHandle h = parent_->GetHandleOnDevice(device_name_, handle);
+ CHECK_NE(h, kInvalidLocalHandle);
mutex_lock l(mu_);
CHECK_EQ(1, items_.count(h));
Item* item = items_[h];
with self.assertRaises(errors.OutOfRangeError):
sess.run(get_next)
+ def testParallelFilters(self):
+ dataset = dataset_ops.Dataset.range(10).filter(
+ lambda x: math_ops.equal(x % 2, 0))
+ iterators = [dataset.make_one_shot_iterator() for _ in range(10)]
+ next_elements = [iterator.get_next() for iterator in iterators]
+ with self.test_session() as sess:
+ self.assertEqual([0 for _ in range(10)], sess.run(next_elements))
+
class FilterDatasetBenchmark(test.Benchmark):