From: Mingsheng Hong Date: Sat, 28 Apr 2018 15:55:08 +0000 (-0700) Subject: This is Part 1 of Swift<->TF sends/recvs: support sending tensors from TF to X-Git-Tag: upstream/v1.9.0_rc1~190^2^2~10 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=4c5699582aa368edfbe058d770407a558729f305;p=platform%2Fupstream%2Ftensorflow.git This is Part 1 of Swift<->TF sends/recvs: support sending tensors from TF to Swift via direct session. The changes are: 1. Added an experimental TF C API TF_DequeueNamedTensor() to consume the queued tensors from a dequeue op. One use case is for the Swift host program to consume tensors sent by TF, where the queue is a Fifo queue managed by TF. Enqueuing tensors are done by running an enqueue op in a graph. The queued tensors are not persisted, and will be lost if the process/machine dies. The queue has a bounded capacity, to prevent producer from being unboundedly ahead of consumer. while caller of TF_DequeueNamedTensor() could have run the Fifo dequeue op directly, the extra level of indirection provided by this API allows us to more easily switch the queuing impl to another mechanism. If and once we stabilize on the Fifo queue based impl, we can remove this API. 2. Added a new S4TF runtime API _TFCReceiveTensorHandle() that receives a tensor via TF_DequeueNamedTensor(). 3. To support tensor receives in host program, taught PartitionCloner in TFPartition to insert SIL code to call _TFCReceiveTensorHandle(). 4. To support tensor sends in accelerator program, taught TFGraphLowering in generate QueueEnqueueV2 nodes in the TF graphs, with appropriate control dependence to make sure these nodes get executed. a) The enqueue produces no output tensor, and is executed only for its side effect. To ensure it is executed properly, control dependence is wired up. The general design is: before a TF_Function (can be a top level function or the body function of a while op) produces an output tensor OT, make OT control dependent on the enqueue op, so that enqueue gets run before the function returns. b) If tensor send occurs in a while loop body, the body logic currently gets lowered in 3 places: the while op cond function, the while op body function, and the ops at the same level as the while op itself (for running the last loop iteration). In this case, the correct TFGraph lowering is to run the enqueue in the last 2 out of the 3 places above. After this CL, the dual versions of the above (dequeuing via an op, and enqueuing via C API) will be added. PiperOrigin-RevId: 194658511 --- diff --git a/tensorflow/c/c_api_experimental.cc b/tensorflow/c/c_api_experimental.cc index d3916bc..82dbd3c 100644 --- a/tensorflow/c/c_api_experimental.cc +++ b/tensorflow/c/c_api_experimental.cc @@ -8368,3 +8368,42 @@ TF_Operation* TF_MakeFileBasedIteratorGetNextWithDatasets( return getnext_node; #endif } + +TF_Tensor* TF_DequeueNamedTensor(TF_Session* session, int tensor_id, + TF_Status* status) { + assert(session); + { + tensorflow::mutex_lock c(session->graph->mu); + VLOG(1) << "Dequeuing named tensor with id " << tensor_id + << ", with input graph: " + << session->graph->graph.ToGraphDefDebug().DebugString(); + } + + TF_Operation* dequeue_op = TF_GraphOperationByName( + session->graph, + tensorflow::strings::StrCat("fifo_queue_dequeue_", tensor_id).c_str()); + if (dequeue_op == nullptr) { + status->status = tensorflow::errors::Internal( + "Unable to find the dequeue node in the TF graph."); + return nullptr; + } + + VLOG(1) << "Running the dequeue op"; + TF_Output output{dequeue_op, 0}; + TF_Tensor* ret; + TF_SessionRun(session, /*run_options*/ nullptr, + // input related parameters + /*inputs*/ nullptr, /*input_values*/ nullptr, /*ninputs*/ 0, + // output related parameters + /*outputs*/ &output, /*output_values*/ &ret, + /*noutputs*/ 1, + /*targets*/ nullptr, /*ntargets*/ 0, + /*run_metadata*/ nullptr, status); + if (VLOG_IS_ON(1) && status->status.ok()) { + tensorflow::Tensor tensor; + if (tensorflow::TF_TensorToTensor(ret, &tensor).ok()) { + VLOG(1) << "Dequeued tensor content: " << tensor.DebugString(); + } + } + return ret; +} diff --git a/tensorflow/c/c_api_experimental.h b/tensorflow/c/c_api_experimental.h index 88cb173..e6757c0 100644 --- a/tensorflow/c/c_api_experimental.h +++ b/tensorflow/c/c_api_experimental.h @@ -86,6 +86,16 @@ TF_CAPI_EXPORT extern TF_Operation* TF_MakeFileBasedIteratorGetNextWithDatasets( TF_Graph* graph, const char* file_path, int batch_size, unsigned char is_mnist, TF_Status* status); +// On success, dequeues a tensor from a TF-managed FifoQueue given by +// `tensor_id`, associated with `session`. Caller must call TF_DeleteTensor() +// over the returned tensor. If the queue is empty, this call is blocked. +// +// Tensors are enqueued via the corresponding TF enqueue op. +// TODO(hongm): Add support for `timeout_ms`. +TF_CAPI_EXPORT extern TF_Tensor* TF_DequeueNamedTensor(TF_Session* session, + int tensor_id, + TF_Status* status); + #ifdef __cplusplus } /* end extern "C" */ #endif