srcs = ["dataset.cc"],
hdrs = ["dataset.h"],
deps = [
+ "//tensorflow/core:core_cpu",
"//tensorflow/core:framework",
"//tensorflow/core:graph",
"//tensorflow/core:lib",
const Tensor& first_element = batch_elements[0][component_index];
TensorShape batch_component_shape({num_batch_elements});
batch_component_shape.AppendShape(first_element.shape());
- Tensor batch_component(cpu_allocator(), first_element.dtype(),
+ Tensor batch_component(ctx->allocator({}), first_element.dtype(),
batch_component_shape);
// Build the output tuple component by copying one slice
// from each input element in the batch.
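// A sketch of that loop, assuming the `batch_util::CopyElementToSlice`
// helper from tensorflow/core/util/batch_util.h:
//
//   for (size_t i = 0; i < num_batch_elements; ++i) {
//     TF_RETURN_IF_ERROR(batch_util::CopyElementToSlice(
//         batch_elements[i][component_index], &batch_component, i));
//   }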
limitations under the License.
==============================================================================*/
#include "tensorflow/core/kernels/data/dataset.h"
+#include "tensorflow/core/common_runtime/device.h"
#include "tensorflow/core/graph/graph_def_builder.h"
#include "tensorflow/core/graph/node_builder.h"
MakeDataset(ctx, input, another_input, output);
}
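+// Returns the Allocator for iterator outputs. If no explicit Allocator was
+// supplied in `Params`, falls back to the device allocator owned by the
+// FunctionLibraryRuntime's device.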
+Allocator* IteratorContext::allocator(AllocatorAttributes attrs) {
+  if (params_.allocator) {
+    return params_.allocator;
+  }
+  return params_.lib->device()->GetAllocator(attrs);
+}
+
const char GraphDatasetBase::kDatasetGraphKey[] = "_DATASET_GRAPH";
const char GraphDatasetBase::kDatasetGraphOutputNodeKey[] =
"_DATASET_GRAPH_OUTPUT_NODE";
// The FunctionLibraryRuntime object to be used to make function calls.
FunctionLibraryRuntime* lib = nullptr;
std::shared_ptr<const FunctionLibraryDefinition> function_library = nullptr;
+
+ // If non-null, the Allocator to be used to allocate the output of an
+ // iterator; if null, the allocator is obtained from `lib->device()`.
+ Allocator* allocator = nullptr;
};
explicit IteratorContext(Params params) : params_(std::move(params)) {}
void set_lib(FunctionLibraryRuntime* lib) { params_.lib = lib; }
+ Allocator* allocator(AllocatorAttributes attrs);
+
private:
Params params_;
};
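// Example (hypothetical iterator code): with the context threaded through,
// an output tensor is allocated from `ctx->allocator({})` rather than the
// global `cpu_allocator()`:
//
//   Tensor value(ctx->allocator({}), DT_INT64, TensorShape({}));
//   value.scalar<int64>()() = 42;
//   out_tensors->emplace_back(std::move(value));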
// Determine the size of the output tensors:
// * dense_shape will be [`row_ndims + 1`].
- Tensor dense_shape(cpu_allocator(), DT_INT64, {row_ndims + 1});
+ Tensor dense_shape(ctx->allocator({}), DT_INT64, {row_ndims + 1});
auto dense_shape_vec = dense_shape.vec<int64>();
for (size_t i = 0; i < row_ndims; ++i) {
if (row_shape.dim_size(i) == -1) {
// * indices will be [`total_elements`, `row_ndims + 1`].
// * values will be [`total_elements`].
- Tensor indices(cpu_allocator(), DT_INT64,
+ Tensor indices(ctx->allocator({}), DT_INT64,
{total_elements, row_ndims + 1});
Tensor values(
- cpu_allocator(),
+ ctx->allocator({}),
DatasetIterator<Dataset<T>>::dataset()->input_->output_dtypes()[0],
{total_elements});
auto indices_matrix = indices.matrix<int64>();
TensorShape component_shape(
batch_results_[current_batch_index_].output[i].shape());
component_shape.set_dim(0, num_elements);
- Tensor component(cpu_allocator(), output[i].dtype(),
+ Tensor component(ctx->allocator({}), output[i].dtype(),
component_shape);
TF_RETURN_IF_ERROR(
CopyPartialBatch(&component, output[i], num_elements));
return Status::OK();
}
- void EnsureOutputAllocated(BatchResult* batch_result,
+ void EnsureOutputAllocated(IteratorContext* ctx,
+ BatchResult* batch_result,
const std::vector<Tensor>& return_values) {
mutex_lock l(batch_result->mu);
if (batch_result->output_allocated) {
for (size_t i = 0; i < num_components; ++i) {
TensorShape component_shape({dataset()->batch_size_});
component_shape.AppendShape(return_values[i].shape());
- Tensor component(cpu_allocator(), return_values[i].dtype(),
+ Tensor component(ctx->allocator({}), return_values[i].dtype(),
component_shape);
batch_result->output.emplace_back(std::move(component));
}
dataset()->captured_func_->RunAsync(
ctx, std::move(input_element), &result->return_values,
[this, ctx, result, batch_result, offset](Status ret_status) {
- delete ctx;
result->status.Update(ret_status);
if (ret_status.ok()) {
- EnsureOutputAllocated(batch_result,
+ EnsureOutputAllocated(ctx, batch_result,
result->return_values);
const size_t num_components =
result->return_values.size();
}
}
}
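+ // `ctx` is still used by `EnsureOutputAllocated()` above, so it is
+ // deleted at the end of the callback rather than on entry.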
+ delete ctx;
// NOTE(mrry): We clear the return values here to release
// any memory associated with them and to parallelize the
// destruction of the tensors (which can be surprisingly
// 2. Copy each batch element to the appropriate location in
// the output component tensor.
- Tensor batch_component(cpu_allocator(),
+ Tensor batch_component(ctx->allocator({}),
output_dtypes()[component_index],
batch_component_shape);
TF_RETURN_IF_ERROR(SetElementZero(
std::vector<Tensor>* out_tensors,
bool* end_of_sequence) override {
mutex_lock l(mu_);
- Tensor value_tensor(cpu_allocator(), DT_INT64, {});
+ Tensor value_tensor(ctx->allocator({}), DT_INT64, {});
value_tensor.scalar<int64>()() = Random();
out_tensors->emplace_back(std::move(value_tensor));
*end_of_sequence = false;
*end_of_sequence = true;
return Status::OK();
}
- Tensor value_tensor(cpu_allocator(), DT_INT64, {});
+ Tensor value_tensor(ctx->allocator({}), DT_INT64, {});
value_tensor.scalar<int64>()() = next_;
out_tensors->emplace_back(std::move(value_tensor));
*end_of_sequence = false;
if (s.ok()) {
// Produce the line as output.
- Tensor line_tensor(cpu_allocator(), DT_STRING, {});
+ Tensor line_tensor(ctx->allocator({}), DT_STRING, {});
line_tensor.scalar<string>()() = line_contents;
out_tensors->emplace_back(std::move(line_tensor));
*end_of_sequence = false;
TF_RETURN_IF_ERROR(
input_buffer_->ReadNBytes(dataset()->record_bytes_, &record));
// Produce the record as output.
- Tensor record_tensor(cpu_allocator(), DT_STRING, {});
+ Tensor record_tensor(ctx->allocator({}), DT_STRING, {});
record_tensor.scalar<string>()() = record;
out_tensors->emplace_back(std::move(record_tensor));
*end_of_sequence = false;
do {
// We are currently processing a file, so try to read the next record.
if (reader_) {
- Tensor result_tensor(cpu_allocator(), DT_STRING, {});
+ Tensor result_tensor(ctx->allocator({}), DT_STRING, {});
Status s = reader_->ReadRecord(&result_tensor.scalar<string>()());
if (s.ok()) {
out_tensors->emplace_back(std::move(result_tensor));
deps = [
"//tensorflow/core:framework",
"//tensorflow/core:lib",
+ "//tensorflow/core/kernels/data:dataset",
"//tensorflow/core/lib/db:sqlite",
],
)
namespace tensorflow {
+class IteratorContext;
+
namespace sql {
// This interface allows a user to connect to a database, execute a query, and
// iterate over the result set, putting the results into an output tensor.
// If there are no more rows in the result set, then instead `true` will be
// stored in `*end_of_sequence`, and the content of `*out_tensors` will be
// undefined.
- virtual Status GetNext(std::vector<Tensor>* out_tensors,
+ virtual Status GetNext(IteratorContext* ctx, std::vector<Tensor>* out_tensors,
bool* end_of_sequence) = 0;
};
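// Example (hypothetical caller): reading one row through this interface,
// where `conn` is a `std::unique_ptr<QueryConnection>` and `ctx` is the
// current `IteratorContext*`:
//
//   std::vector<Tensor> row;
//   bool end_of_sequence = false;
//   TF_RETURN_IF_ERROR(conn->GetNext(ctx, &row, &end_of_sequence));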
#include "tensorflow/core/kernels/data/sql/sqlite_query_connection.h"
#include "tensorflow/core/framework/register_types.h"
+#include "tensorflow/core/kernels/data/dataset.h"
#include "tensorflow/core/lib/strings/stringprintf.h"
namespace tensorflow {
return Status::OK();
}
-Status SqliteQueryConnection::GetNext(std::vector<Tensor>* out_tensors,
+Status SqliteQueryConnection::GetNext(IteratorContext* ctx,
+ std::vector<Tensor>* out_tensors,
bool* end_of_sequence) {
if (!stmt_) TF_RETURN_IF_ERROR(PrepareQuery());
TF_RETURN_IF_ERROR(stmt_.Step(end_of_sequence));
if (!*end_of_sequence) {
for (int i = 0; i < column_count_; i++) {
DataType dt = output_types_[i];
- Tensor tensor(cpu_allocator(), dt, {});
+ // Allocate a scalar tensor for this column from the iterator's allocator.
+ Tensor tensor(ctx->allocator({}), dt, {});
FillTensorWithResultSetEntry(dt, i, &tensor);
out_tensors->emplace_back(std::move(tensor));
}
Status Open(const string& data_source_name, const string& query,
const DataTypeVector& output_types) override;
Status Close() override;
- Status GetNext(std::vector<Tensor>* out_tensors,
+ Status GetNext(IteratorContext* ctx, std::vector<Tensor>* out_tensors,
bool* end_of_sequence) override;
private:
}
}
- Status GetNextInternal(IteratorContext* /*ctx*/,
+ Status GetNextInternal(IteratorContext* ctx,
std::vector<Tensor>* out_tensors,
bool* end_of_sequence) override {
mutex_lock l(mu_);
return s;
}
}
- return query_connection_->GetNext(out_tensors, end_of_sequence);
+ return query_connection_->GetNext(ctx, out_tensors, end_of_sequence);
}
private:
out_tensors->reserve(dataset()->tensors_.size());
for (int i = 0; i < dataset()->tensors_.size(); ++i) {
const Tensor& t = dataset()->tensors_[i];
- Tensor t_slice(cpu_allocator(), t.dtype(),
+ Tensor t_slice(ctx->allocator({}), t.dtype(),
TensorShape(dataset()->shapes_[i].dim_sizes()));
TF_RETURN_IF_ERROR(batch_util::CopySliceToElement(t, &t_slice, i_));
out_tensors->emplace_back(std::move(t_slice));
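// `batch_util::CopySliceToElement(t, &t_slice, i_)` copies the `i_`-th
// slice of `t` along its first dimension into `t_slice`, so each call
// materializes one element of the stored tensor as an output.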