From 067be1aa75d9065b6cc57ba6316fc17544a9fdf1 Mon Sep 17 00:00:00 2001 From: Mingsheng Hong Date: Sat, 5 May 2018 07:10:58 -0700 Subject: [PATCH] Part 2 of Swift<->TF sends/recvs: support receiving tensors in TF from Swift via direct session. The changes are: 1. Added a TF experimental C API for Swift host to enqueue a tensor for sending to TF. Again, the C APIs can be removed once the Fifo-queue based design proves stable later. 2. TFLowerGraph is extended to generate Fifo related nodes for TF to receive tensors. This is similar to the extension for TF to send tensors. 3. TFPartition is extended to support host send (createHostSend()), which does the tensor send via a new protocol method TensorSendableReceivable.sendToDevice(). The main complexity is in sending a scalar, where a new protocol method TensorSendableReceivable.createScalarTensor() is called to first create a tensor out of it, and then send it over to TF. Also removed code for protocol conformance on AccelerableByTensorFlow. Instead have compiler look up that conformance from the SILFunction on sending/receiving tensors. AccelerableByTensorFlow could be removed from the compiler-known protocol list now, but we'll defer that till things can stabilized more (in the past this protocol has been added to and removed from the list at different times). PiperOrigin-RevId: 195539436 --- tensorflow/c/c_api_experimental.cc | 48 ++++++++++++++++++++++++++++++++++++++ tensorflow/c/c_api_experimental.h | 23 ++++++++++++++++-- 2 files changed, 69 insertions(+), 2 deletions(-) diff --git a/tensorflow/c/c_api_experimental.cc b/tensorflow/c/c_api_experimental.cc index 82dbd3c..95b04f9 100644 --- a/tensorflow/c/c_api_experimental.cc +++ b/tensorflow/c/c_api_experimental.cc @@ -8407,3 +8407,51 @@ TF_Tensor* TF_DequeueNamedTensor(TF_Session* session, int tensor_id, } return ret; } + +void TF_EnqueueNamedTensor(TF_Session* session, int tensor_id, + TF_Tensor* tensor, TF_Status* status) { + assert(session); + { + tensorflow::mutex_lock c(session->graph->mu); + if (VLOG_IS_ON(1)) { + VLOG(1) << "Enqueuing named tensor with id " << tensor_id + << ", with input graph: " + << session->graph->graph.ToGraphDefDebug().DebugString(); + tensorflow::Tensor internal_tensor; + if (tensorflow::TF_TensorToTensor(tensor, &internal_tensor).ok()) { + VLOG(1) << "Enqueu'ing tensor content: " + << internal_tensor.DebugString(); + } + } + } + + TF_Operation* enqueue_op = TF_GraphOperationByName( + session->graph, + tensorflow::strings::StrCat("fifo_queue_enqueue_", tensor_id).c_str()); + if (enqueue_op == nullptr) { + status->status = tensorflow::errors::Internal( + "Unable to find the enqueue node in the TF graph."); + return; + } + + TF_Operation* placeholder_op = TF_GraphOperationByName( + session->graph, + tensorflow::strings::StrCat("arg_tensor_enqueue_", tensor_id).c_str()); + if (placeholder_op == nullptr) { + status->status = tensorflow::errors::Internal( + "Unable to find the placeholder node as input to enqueue in the TF " + "graph."); + return; + } + + VLOG(1) << "Running the enqueue op"; + TF_Output input{placeholder_op, 0}; + TF_SessionRun(session, /*run_options*/ nullptr, + // input related parameters + /*inputs*/ &input, /*input_values*/ &tensor, /*ninputs*/ 1, + // output related parameters + /*outputs*/ nullptr, /*output_values*/ nullptr, /*noutputs*/ 0, + /*targets*/ &enqueue_op, /*ntargets*/ 1, + /*run_metadata*/ nullptr, status); + VLOG(1) << "Enqueuing is done."; +} diff --git a/tensorflow/c/c_api_experimental.h b/tensorflow/c/c_api_experimental.h index e6757c0..20bdace 100644 --- a/tensorflow/c/c_api_experimental.h +++ b/tensorflow/c/c_api_experimental.h @@ -87,8 +87,11 @@ TF_CAPI_EXPORT extern TF_Operation* TF_MakeFileBasedIteratorGetNextWithDatasets( unsigned char is_mnist, TF_Status* status); // On success, dequeues a tensor from a TF-managed FifoQueue given by -// `tensor_id`, associated with `session`. Caller must call TF_DeleteTensor() -// over the returned tensor. If the queue is empty, this call is blocked. +// `tensor_id`, associated with `session`. There must be a graph node named +// "fifo_queue_dequeue_", to be executed by this API call. + +// Caller must call TF_DeleteTensor() over the returned tensor. If the queue is +// empty, this call is blocked. // // Tensors are enqueued via the corresponding TF enqueue op. // TODO(hongm): Add support for `timeout_ms`. @@ -96,6 +99,22 @@ TF_CAPI_EXPORT extern TF_Tensor* TF_DequeueNamedTensor(TF_Session* session, int tensor_id, TF_Status* status); +// On success, enqueues `tensor` into a TF-managed FifoQueue given by +// `tensor_id`, associated with `session`. There must be a graph node named +// "fifo_queue_enqueue_", to be executed by this API call. It reads +// from a placeholder node "arg_tensor_enqueue_". +// +// `tensor` is still owned by the caller. This call will be blocked if the queue +// has reached its capacity, and will be unblocked when the queued tensors again +// drop below the capacity due to dequeuing. +// +// Tensors are dequeued via the corresponding TF dequeue op. +// TODO(hongm): Add support for `timeout_ms`. +TF_CAPI_EXPORT extern void TF_EnqueueNamedTensor(TF_Session* session, + int tensor_id, + TF_Tensor* tensor, + TF_Status* status); + #ifdef __cplusplus } /* end extern "C" */ #endif -- 2.7.4