From b1f5f433959406c7aad634c05e85ccd62fd06e87 Mon Sep 17 00:00:00 2001 From: Yong Tang Date: Wed, 7 Feb 2018 16:46:31 -0800 Subject: [PATCH] Support CopyFile with streaming (#12658) * Support CopyFile with streaming This fix tries to address the issue raised in 12641 where it was not possible to have CopyFile with streaming. The original implementation copies the whole content of the file into a string buffer and then writes it to the target file. This could be an issue if the file is larger than the memory of the host. This fix streams the CopyFile operation. This fix fixes 12641. Signed-off-by: Yong Tang * Use sendfile for CopyFile implementation in Linux Signed-off-by: Yong Tang * Merge CopyFile for same fs and different fs Signed-off-by: Yong Tang * `sendfile64` -> `sendfile` to fix Android build Signed-off-by: Yong Tang * Add sendfile processing for Darwin This commit adds sendfile processing for OSX Darwin. Signed-off-by: Yong Tang * Not using sendfile in MacOSX Signed-off-by: Yong Tang * Address review feedback Signed-off-by: Yong Tang * Remove the size check and test OUT_OF_RANGE instead. Signed-off-by: Yong Tang * Small fixes Signed-off-by: Yong Tang * Rename CopyFile to FileSystemCopyFile to fix Windows build errors Signed-off-by: Yong Tang --- tensorflow/core/platform/env.cc | 37 +++++++++++ tensorflow/core/platform/env.h | 8 +++ tensorflow/core/platform/file_system.cc | 4 ++ tensorflow/core/platform/file_system.h | 3 + .../core/platform/posix/posix_file_system.cc | 72 ++++++++++++++++++++++ tensorflow/core/platform/posix/posix_file_system.h | 2 + tensorflow/python/lib/io/file_io.i | 16 ++--- 7 files changed, 131 insertions(+), 11 deletions(-) diff --git a/tensorflow/core/platform/env.cc b/tensorflow/core/platform/env.cc index 1bcca12..12509c2 100644 --- a/tensorflow/core/platform/env.cc +++ b/tensorflow/core/platform/env.cc @@ -44,6 +44,9 @@ limitations under the License. 
namespace tensorflow { +// 128KB scratch buffer used by FileSystemCopyFile for each read +constexpr size_t kCopyFileBufferSize = 128 * 1024; + class FileSystemRegistryImpl : public FileSystemRegistry { public: Status Register(const string& scheme, Factory factory) override; @@ -278,6 +281,17 @@ Status Env::RenameFile(const string& src, const string& target) { return src_fs->RenameFile(src, target); } +Status Env::CopyFile(const string& src, const string& target) { + FileSystem* src_fs; + FileSystem* target_fs; + TF_RETURN_IF_ERROR(GetFileSystemForFile(src, &src_fs)); + TF_RETURN_IF_ERROR(GetFileSystemForFile(target, &target_fs)); + if (src_fs == target_fs) { + return src_fs->CopyFile(src, target); + } + return FileSystemCopyFile(src_fs, src, target_fs, target); +} + string Env::GetExecutablePath() { char exe_path[PATH_MAX] = {0}; #ifdef __APPLE__ @@ -406,6 +420,29 @@ Status WriteStringToFile(Env* env, const string& fname, return s; } +Status FileSystemCopyFile(FileSystem* src_fs, const string& src, + FileSystem* target_fs, const string& target) { + std::unique_ptr<RandomAccessFile> src_file; + TF_RETURN_IF_ERROR(src_fs->NewRandomAccessFile(src, &src_file)); + + std::unique_ptr<WritableFile> target_file; + TF_RETURN_IF_ERROR(target_fs->NewWritableFile(target, &target_file)); + + uint64 offset = 0; + std::unique_ptr<char[]> scratch(new char[kCopyFileBufferSize]); + Status s = Status::OK(); + while (s.ok()) { + StringPiece result; + s = src_file->Read(offset, kCopyFileBufferSize, &result, scratch.get()); + if (!(s.ok() || s.code() == error::OUT_OF_RANGE)) { + return s; + } + TF_RETURN_IF_ERROR(target_file->Append(result)); + offset += result.size(); + } + return target_file->Close(); +} + // A ZeroCopyInputStream on a RandomAccessFile. namespace { class FileStream : public ::tensorflow::protobuf::io::ZeroCopyInputStream { diff --git a/tensorflow/core/platform/env.h b/tensorflow/core/platform/env.h index 34aaf3f..4ce4e0b 100644 --- a/tensorflow/core/platform/env.h +++ b/tensorflow/core/platform/env.h @@ -214,6 +214,9 @@ class Env { /// replaced. 
Status RenameFile(const string& src, const string& target); + /// \brief Copies `src` to `target`, possibly across file systems. + Status CopyFile(const string& src, const string& target); + /// \brief Returns the absolute path of the current executable. It resolves /// symlinks if there is any. string GetExecutablePath(); @@ -381,6 +384,11 @@ struct ThreadOptions { size_t guard_size = 0; // 0: use system default value }; +/// A utility routine: copy contents of `src` in file system `src_fs` +/// to `target` in file system `target_fs`. +Status FileSystemCopyFile(FileSystem* src_fs, const string& src, + FileSystem* target_fs, const string& target); + /// A utility routine: reads contents of named file into `*data` Status ReadFileToString(Env* env, const string& fname, string* data); diff --git a/tensorflow/core/platform/file_system.cc b/tensorflow/core/platform/file_system.cc index b9866cf..271d73f 100644 --- a/tensorflow/core/platform/file_system.cc +++ b/tensorflow/core/platform/file_system.cc @@ -265,4 +265,8 @@ Status FileSystem::RecursivelyCreateDir(const string& dirname) { return Status::OK(); } +Status FileSystem::CopyFile(const string& src, const string& target) { + return FileSystemCopyFile(this, src, this, target); +} + } // namespace tensorflow diff --git a/tensorflow/core/platform/file_system.h b/tensorflow/core/platform/file_system.h index d32efce..3085b69 100644 --- a/tensorflow/core/platform/file_system.h +++ b/tensorflow/core/platform/file_system.h @@ -189,6 +189,9 @@ class FileSystem { /// \brief Overwrites the target if it exists. virtual Status RenameFile(const string& src, const string& target) = 0; + /// \brief Copies `src` to `target` within this file system. + virtual Status CopyFile(const string& src, const string& target); + /// \brief Translate an URI to a filename for the FileSystem implementation. 
/// /// The implementation in this class cleans up the path, removing diff --git a/tensorflow/core/platform/posix/posix_file_system.cc b/tensorflow/core/platform/posix/posix_file_system.cc index fb7a5a9..9a80215 100644 --- a/tensorflow/core/platform/posix/posix_file_system.cc +++ b/tensorflow/core/platform/posix/posix_file_system.cc @@ -18,6 +18,9 @@ limitations under the License. #include <fcntl.h> #include <stdio.h> #include <sys/mman.h> +#if !defined(__APPLE__) +#include <sys/sendfile.h> +#endif #include <sys/stat.h> #include <sys/time.h> #include <sys/types.h> @@ -34,6 +37,9 @@ limitations under the License. namespace tensorflow { +// 128KB of copy buffer +constexpr size_t kPosixCopyFileBufferSize = 128 * 1024; + // pread() based random-access class PosixRandomAccessFile : public RandomAccessFile { private: @@ -276,4 +282,70 @@ Status PosixFileSystem::RenameFile(const string& src, const string& target) { return result; } +Status PosixFileSystem::CopyFile(const string& src, const string& target) { + string translated_src = TranslateName(src); + struct stat sbuf; + if (stat(translated_src.c_str(), &sbuf) != 0) { + return IOError(src, errno); + } + int src_fd = open(translated_src.c_str(), O_RDONLY); + if (src_fd < 0) { + return IOError(src, errno); + } + string translated_target = TranslateName(target); + // O_WRONLY | O_CREAT | O_TRUNC: + // Open file for write, create it if missing, and truncate it if it exists. 
+ // S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH: + // Create the file with permission of 0644 + int target_fd = open(translated_target.c_str(), O_WRONLY | O_CREAT | O_TRUNC, + S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); + if (target_fd < 0) { + close(src_fd); + return IOError(target, errno); + } + int rc = 0; + off_t offset = 0; + std::unique_ptr<char[]> buffer(new char[kPosixCopyFileBufferSize]); + while (offset < sbuf.st_size) { + // Use uint64 so that the comparison with SSIZE_MAX is safe + uint64 chunk = sbuf.st_size - offset; + if (chunk > SSIZE_MAX) { + chunk = SSIZE_MAX; + } +#if defined(__linux__) && !defined(__ANDROID__) + rc = sendfile(target_fd, src_fd, &offset, static_cast<size_t>(chunk)); +#else + if (chunk > kPosixCopyFileBufferSize) { + chunk = kPosixCopyFileBufferSize; + } + rc = pread(src_fd, buffer.get(), static_cast<size_t>(chunk), offset); + if (rc <= 0) { + break; + } + rc = write(target_fd, buffer.get(), static_cast<size_t>(rc)); + offset += (rc > 0) ? rc : 0; +#endif + if (rc <= 0) { + break; + } + } + + Status result = Status::OK(); + if (rc < 0) { + result = IOError(target, errno); + } + + // Close both descriptors, keeping the first error encountered + rc = close(target_fd); + if (rc < 0 && result == Status::OK()) { + result = IOError(target, errno); + } + rc = close(src_fd); + if (rc < 0 && result == Status::OK()) { + result = IOError(src, errno); + } + + return result; +} + } // namespace tensorflow diff --git a/tensorflow/core/platform/posix/posix_file_system.h b/tensorflow/core/platform/posix/posix_file_system.h index fe050fd..98ffa43 100644 --- a/tensorflow/core/platform/posix/posix_file_system.h +++ b/tensorflow/core/platform/posix/posix_file_system.h @@ -56,6 +56,8 @@ class PosixFileSystem : public FileSystem { Status GetFileSize(const string& fname, uint64* size) override; Status RenameFile(const string& src, const string& target) override; + + Status CopyFile(const string& src, const string& target) override; }; Status IOError(const string& context, int err_number); diff --git a/tensorflow/python/lib/io/file_io.i b/tensorflow/python/lib/io/file_io.i 
index c0c4e03..891a7b0 100644 --- a/tensorflow/python/lib/io/file_io.i +++ b/tensorflow/python/lib/io/file_io.i @@ -110,21 +110,15 @@ void RecursivelyCreateDir(const string& dirname, TF_Status* out_status) { } } -void CopyFile(const string& oldpath, const string& newpath, bool overwrite, +void CopyFile(const string& src, const string& target, bool overwrite, TF_Status* out_status) { - // If overwrite is false and the newpath file exists then it's an error. - if (!overwrite && tensorflow::Env::Default()->FileExists(newpath).ok()) { + // If overwrite is false and the target file exists then it's an error. + if (!overwrite && tensorflow::Env::Default()->FileExists(target).ok()) { TF_SetStatus(out_status, TF_ALREADY_EXISTS, "file already exists"); return; } - string file_content; - tensorflow::Status status = ReadFileToString(tensorflow::Env::Default(), - oldpath, &file_content); - if (!status.ok()) { - Set_TF_Status_from_Status(out_status, status); - return; - } - status = WriteStringToFile(tensorflow::Env::Default(), newpath, file_content); + tensorflow::Status status = + tensorflow::Env::Default()->CopyFile(src, target); if (!status.ok()) { Set_TF_Status_from_Status(out_status, status); } -- 2.7.4