==============================================================================*/
#include <deque>
+#include "tensorflow/core/common_runtime/process_function_library_runtime.h"
#include "tensorflow/core/framework/function.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/resource_op_kernel.h"
class FunctionBufferResourceHandleOp : public OpKernel {
public:
explicit FunctionBufferResourceHandleOp(OpKernelConstruction* ctx)
- : OpKernel(ctx) {
+ : OpKernel(ctx), flib_def_(nullptr), pflr_(nullptr) {
OP_REQUIRES_OK(ctx, ctx->GetAttr("f", &func_));
OP_REQUIRES_OK(ctx, ctx->GetAttr("buffer_size", &buffer_size_));
OP_REQUIRES_OK(ctx, ctx->GetAttr("container", &container_));
OP_REQUIRES_OK(ctx, ctx->GetAttr("thread_pool_size", &thread_pool_size_));
}
+ ~FunctionBufferResourceHandleOp() override {
+ if (cinfo_.resource_is_private_to_kernel()) {
+ if (!cinfo_.resource_manager()
+ ->Delete<FunctionBufferingResource>(cinfo_.container(),
+ cinfo_.name())
+ .ok()) {
+ // Do nothing; the resource can have been deleted by session resets.
+ }
+ }
+ }
+
void Compute(OpKernelContext* ctx) override {
const Tensor* string_arg;
OP_REQUIRES_OK(ctx, ctx->input("string_arg", &string_arg));
const string& source_device = ctx->device()->name();
- ContainerInfo cinfo;
- OP_REQUIRES_OK(ctx, cinfo.Init(ctx->resource_manager(), def()));
- // Create the resource.
- FunctionBufferingResource* buffer;
- OP_REQUIRES_OK(
- ctx, ctx->resource_manager()->LookupOrCreate<FunctionBufferingResource>(
- cinfo.container(), cinfo.name(), &buffer,
- [lib, &source_device, &target_device, func_args,
- this](FunctionBufferingResource** ptr) {
- *ptr = new FunctionBufferingResource(
- lib, func_, buffer_size_, source_device, target_device,
- func_args, thread_pool_size_);
- return Status::OK();
- }));
- OP_REQUIRES_OK(ctx, buffer->Instantiate());
+ mutex_lock l(mu_);
+ if (!initialized_) {
+ OP_REQUIRES_OK(ctx, cinfo_.Init(ctx->resource_manager(), def()));
+ FunctionLibraryRuntime* clone_lib;
+ OP_REQUIRES_OK(ctx, lib->Clone(&flib_def_, &pflr_, &clone_lib));
+ // Create the resource.
+ FunctionBufferingResource* buffer;
+ OP_REQUIRES_OK(
+ ctx,
+ ctx->resource_manager()->LookupOrCreate<FunctionBufferingResource>(
+ cinfo_.container(), cinfo_.name(), &buffer,
+ [clone_lib, &source_device, &target_device, func_args,
+ this](FunctionBufferingResource** ptr) {
+ *ptr = new FunctionBufferingResource(
+ clone_lib, func_, buffer_size_, source_device,
+ target_device, func_args, thread_pool_size_);
+ return Status::OK();
+ }));
+ OP_REQUIRES_OK(ctx, buffer->Instantiate());
+ initialized_ = true;
+ }
OP_REQUIRES_OK(ctx, MakeResourceHandleToOutput(
- ctx, 0, cinfo.container(), cinfo.name(),
+ ctx, 0, cinfo_.container(), cinfo_.name(),
MakeTypeIndex<FunctionBufferingResource>()));
}
private:
+ mutex mu_;
+ ContainerInfo cinfo_ GUARDED_BY(mu_);
+ bool initialized_ GUARDED_BY(mu_) = false;
+ std::unique_ptr<FunctionLibraryDefinition> flib_def_;
+ std::unique_ptr<ProcessFunctionLibraryRuntime> pflr_;
NameAttrList func_;
int64 buffer_size_;
string container_;