lib_ might get destroyed when there are two different graphs using the same FunctionBufferingResource, so clone the function library runtime instead of holding on to the caller's lib_ pointer.
author	Rohan Jain <rohanj@google.com>
	Tue, 6 Mar 2018 19:26:18 +0000 (11:26 -0800)
committer	TensorFlower Gardener <gardener@tensorflow.org>
	Tue, 6 Mar 2018 19:35:40 +0000 (11:35 -0800)
Also fixes the LookupOrCreate call in the handle op so that it runs only once, during initialization.

PiperOrigin-RevId: 188052319
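
For context, below is a self-contained sketch (plain C++, not the TensorFlow sources; all names are illustrative) of the pattern the change introduces in FunctionBufferResourceHandleOp: the first Compute() call clones the runtime the caller passed in and creates the buffering resource exactly once under a mutex, so later calls from other graphs reuse the same handle instead of caching a lib_ pointer that those graphs may destroy.

    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <string>

    // A long-lived resource that owns its own copy ("clone") of the runtime it
    // depends on, instead of pointing at something a caller may destroy.
    class BufferingResourceSketch {
     public:
      explicit BufferingResourceSketch(std::shared_ptr<const std::string> lib)
          : lib_(std::move(lib)) {}
      const std::string& lib() const { return *lib_; }

     private:
      std::shared_ptr<const std::string> lib_;
    };

    // Analogue of the handle op: initialization happens at most once, guarded
    // by a mutex, and every later Compute() returns the existing resource.
    class HandleOpSketch {
     public:
      BufferingResourceSketch* Compute(const std::string& caller_lib) {
        std::lock_guard<std::mutex> l(mu_);
        if (!initialized_) {
          // Clone the caller's runtime rather than keeping a pointer to it.
          auto clone = std::make_shared<const std::string>(caller_lib);
          resource_ = std::make_unique<BufferingResourceSketch>(clone);
          initialized_ = true;
        }
        return resource_.get();
      }

     private:
      std::mutex mu_;
      bool initialized_ = false;
      std::unique_ptr<BufferingResourceSketch> resource_;
    };

    int main() {
      HandleOpSketch op;
      BufferingResourceSketch* a = op.Compute("runtime from graph 1");
      BufferingResourceSketch* b = op.Compute("runtime from graph 2");
      // Same resource both times, still backed by the clone made on first use.
      std::cout << (a == b) << " " << a->lib() << std::endl;
      return 0;
    }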

tensorflow/contrib/data/kernels/BUILD
tensorflow/contrib/data/kernels/prefetching_kernels.cc
tensorflow/contrib/data/python/ops/prefetching_ops.py
tensorflow/core/BUILD

diff --git a/tensorflow/contrib/data/kernels/BUILD b/tensorflow/contrib/data/kernels/BUILD
index 9bd6a42..c87da7d 100644
@@ -10,6 +10,7 @@ cc_library(
     name = "prefetching_kernels",
     srcs = ["prefetching_kernels.cc"],
     deps = [
+        "//tensorflow/core:core_cpu_headers_lib",
         "//tensorflow/core:framework_headers_lib",
         "//third_party/eigen3",
         "@protobuf_archive//:protobuf_headers",
diff --git a/tensorflow/contrib/data/kernels/prefetching_kernels.cc b/tensorflow/contrib/data/kernels/prefetching_kernels.cc
index d3df14b..c0155e8 100644
@@ -14,6 +14,7 @@ limitations under the License.
 ==============================================================================*/
 #include <deque>
 
+#include "tensorflow/core/common_runtime/process_function_library_runtime.h"
 #include "tensorflow/core/framework/function.h"
 #include "tensorflow/core/framework/op_kernel.h"
 #include "tensorflow/core/framework/resource_op_kernel.h"
@@ -241,7 +242,7 @@ class FunctionBufferingResource : public ResourceBase {
 class FunctionBufferResourceHandleOp : public OpKernel {
  public:
   explicit FunctionBufferResourceHandleOp(OpKernelConstruction* ctx)
-      : OpKernel(ctx) {
+      : OpKernel(ctx), flib_def_(nullptr), pflr_(nullptr) {
     OP_REQUIRES_OK(ctx, ctx->GetAttr("f", &func_));
     OP_REQUIRES_OK(ctx, ctx->GetAttr("buffer_size", &buffer_size_));
     OP_REQUIRES_OK(ctx, ctx->GetAttr("container", &container_));
@@ -249,6 +250,17 @@ class FunctionBufferResourceHandleOp : public OpKernel {
     OP_REQUIRES_OK(ctx, ctx->GetAttr("thread_pool_size", &thread_pool_size_));
   }
 
+  ~FunctionBufferResourceHandleOp() override {
+    if (cinfo_.resource_is_private_to_kernel()) {
+      if (!cinfo_.resource_manager()
+               ->Delete<FunctionBufferingResource>(cinfo_.container(),
+                                                   cinfo_.name())
+               .ok()) {
+        // Do nothing; the resource can have been deleted by session resets.
+      }
+    }
+  }
+
   void Compute(OpKernelContext* ctx) override {
     const Tensor* string_arg;
     OP_REQUIRES_OK(ctx, ctx->input("string_arg", &string_arg));
@@ -267,28 +279,39 @@ class FunctionBufferResourceHandleOp : public OpKernel {
 
     const string& source_device = ctx->device()->name();
 
-    ContainerInfo cinfo;
-    OP_REQUIRES_OK(ctx, cinfo.Init(ctx->resource_manager(), def()));
-    // Create the resource.
-    FunctionBufferingResource* buffer;
-    OP_REQUIRES_OK(
-        ctx, ctx->resource_manager()->LookupOrCreate<FunctionBufferingResource>(
-                 cinfo.container(), cinfo.name(), &buffer,
-                 [lib, &source_device, &target_device, func_args,
-                  this](FunctionBufferingResource** ptr) {
-                   *ptr = new FunctionBufferingResource(
-                       lib, func_, buffer_size_, source_device, target_device,
-                       func_args, thread_pool_size_);
-                   return Status::OK();
-                 }));
-    OP_REQUIRES_OK(ctx, buffer->Instantiate());
+    mutex_lock l(mu_);
+    if (!initialized_) {
+      OP_REQUIRES_OK(ctx, cinfo_.Init(ctx->resource_manager(), def()));
+      FunctionLibraryRuntime* clone_lib;
+      OP_REQUIRES_OK(ctx, lib->Clone(&flib_def_, &pflr_, &clone_lib));
+      // Create the resource.
+      FunctionBufferingResource* buffer;
+      OP_REQUIRES_OK(
+          ctx,
+          ctx->resource_manager()->LookupOrCreate<FunctionBufferingResource>(
+              cinfo_.container(), cinfo_.name(), &buffer,
+              [clone_lib, &source_device, &target_device, func_args,
+               this](FunctionBufferingResource** ptr) {
+                *ptr = new FunctionBufferingResource(
+                    clone_lib, func_, buffer_size_, source_device,
+                    target_device, func_args, thread_pool_size_);
+                return Status::OK();
+              }));
+      OP_REQUIRES_OK(ctx, buffer->Instantiate());
+      initialized_ = true;
+    }
 
     OP_REQUIRES_OK(ctx, MakeResourceHandleToOutput(
-                            ctx, 0, cinfo.container(), cinfo.name(),
+                            ctx, 0, cinfo_.container(), cinfo_.name(),
                             MakeTypeIndex<FunctionBufferingResource>()));
   }
 
  private:
+  mutex mu_;
+  ContainerInfo cinfo_ GUARDED_BY(mu_);
+  bool initialized_ GUARDED_BY(mu_) = false;
+  std::unique_ptr<FunctionLibraryDefinition> flib_def_;
+  std::unique_ptr<ProcessFunctionLibraryRuntime> pflr_;
   NameAttrList func_;
   int64 buffer_size_;
   string container_;
diff --git a/tensorflow/contrib/data/python/ops/prefetching_ops.py b/tensorflow/contrib/data/python/ops/prefetching_ops.py
index 96a9e9e..7059b35 100644
@@ -25,12 +25,14 @@ from tensorflow.contrib.data.python.ops import gen_dataset_ops
 # method and provides a get_next() that calls the prefetch op.
 def function_buffering_resource(string_arg,
                                 target_device,
-                                shared_name,
                                 f,
                                 buffer_size,
                                 thread_pool_size=1,
                                 container="",
+                                shared_name=None,
                                 name=None):
+  if shared_name is None:
+    shared_name = ""
   return gen_dataset_ops.function_buffering_resource(
       string_arg=string_arg,
       target_device=target_device,
diff --git a/tensorflow/core/BUILD b/tensorflow/core/BUILD
index b7f84a4..619899a 100644
@@ -1874,6 +1874,13 @@ cc_header_only_library(
     ],
 )
 
+cc_header_only_library(
+    name = "core_cpu_headers_lib",
+    deps = [
+        ":core_cpu_lib",
+    ],
+)
+
 tf_cuda_library(
     name = "framework_internal_impl",
     srcs = FRAMEWORK_INTERNAL_PRIVATE_HEADERS + [