Break FileSystem's dependency on ThreadPool.
authorA. Unique TensorFlower <gardener@tensorflow.org>
Fri, 30 Mar 2018 18:32:58 +0000 (11:32 -0700)
committerTensorFlower Gardener <gardener@tensorflow.org>
Fri, 30 Mar 2018 18:35:15 +0000 (11:35 -0700)
PiperOrigin-RevId: 191092932

18 files changed:
tensorflow/contrib/android/asset_manager_filesystem.cc
tensorflow/contrib/android/asset_manager_filesystem.h
tensorflow/contrib/makefile/proto_text_cc_files.txt
tensorflow/core/BUILD
tensorflow/core/platform/file_system.cc
tensorflow/core/platform/file_system.h
tensorflow/core/platform/file_system_helper.cc [new file with mode: 0644]
tensorflow/core/platform/file_system_helper.h [new file with mode: 0644]
tensorflow/core/platform/hadoop/hadoop_file_system.cc
tensorflow/core/platform/hadoop/hadoop_file_system.h
tensorflow/core/platform/null_file_system.h
tensorflow/core/platform/posix/posix_file_system.cc
tensorflow/core/platform/posix/posix_file_system.h
tensorflow/core/platform/s3/s3_file_system.cc
tensorflow/core/platform/s3/s3_file_system.h
tensorflow/core/platform/windows/windows_file_system.cc
tensorflow/core/util/memmapped_file_system.cc
tensorflow/core/util/memmapped_file_system.h

index 380a652..fe2d13e 100644 (file)
@@ -19,6 +19,7 @@ limitations under the License.
 
 #include "tensorflow/core/lib/strings/str_util.h"
 #include "tensorflow/core/platform/env.h"
+#include "tensorflow/core/platform/file_system_helper.h"
 
 namespace tensorflow {
 namespace {
@@ -243,6 +244,11 @@ bool AssetManagerFileSystem::DirectoryExists(const std::string& fname) {
   return AAssetDir_getNextFileName(dir.get()) != NULL;
 }
 
+Status AssetManagerFileSystem::GetMatchingPaths(const string& pattern,
+                                                std::vector<string>* results) {
+  return internal::GetMatchingPaths(this, Env::Default(), pattern, results);
+}
+
 Status AssetManagerFileSystem::NewWritableFile(
     const string& fname, std::unique_ptr<WritableFile>* result) {
   return errors::Unimplemented("Asset storage is read only.");
index 665304b..a87ff42 100644 (file)
@@ -66,6 +66,9 @@ class AssetManagerFileSystem : public FileSystem {
   Status DeleteDir(const string& d) override;
   Status RenameFile(const string& s, const string& t) override;
 
+  Status GetMatchingPaths(const string& pattern,
+                          std::vector<string>* results) override;
+
  private:
   string RemoveAssetPrefix(const string& name);
 
index 77c936d..76428bc 100644 (file)
@@ -12,6 +12,7 @@ tensorflow/core/platform/posix/env.cc
 tensorflow/core/platform/posix/load_library.cc
 tensorflow/core/platform/posix/env_time.cc
 tensorflow/core/platform/file_system.cc
+tensorflow/core/platform/file_system_helper.cc
 tensorflow/core/platform/env.cc
 tensorflow/core/platform/env_time.cc
 tensorflow/core/platform/setround.cc
index 21f7866..7d5ae1c 100644 (file)
@@ -349,6 +349,7 @@ cc_library(
         "platform/env.h",
         "platform/env_time.h",
         "platform/file_system.h",
+        "platform/file_system_helper.h",
         "platform/fingerprint.h",
         "platform/init_main.h",
         "platform/logging.h",
index a2f42f4..b55e94d 100644 (file)
@@ -18,7 +18,6 @@ limitations under the License.
 #include <deque>
 
 #include "tensorflow/core/lib/core/errors.h"
-#include "tensorflow/core/lib/core/threadpool.h"
 #include "tensorflow/core/lib/io/path.h"
 #include "tensorflow/core/lib/strings/str_util.h"
 #include "tensorflow/core/lib/strings/strcat.h"
@@ -28,28 +27,6 @@ limitations under the License.
 
 namespace tensorflow {
 
-namespace {
-
-constexpr int kNumThreads = 8;
-
-// Run a function in parallel using a ThreadPool, but skip the ThreadPool
-// on the iOS platform due to its problems with more than a few threads.
-void ForEach(int first, int last, const std::function<void(int)>& f) {
-#if TARGET_OS_IPHONE
-  for (int i = first; i < last; i++) {
-    f(i);
-  }
-#else
-  int num_threads = std::min(kNumThreads, last - first);
-  thread::ThreadPool threads(Env::Default(), "ForEach", num_threads);
-  for (int i = first; i < last; i++) {
-    threads.Schedule([f, i] { f(i); });
-  }
-#endif
-}
-
-}  // anonymous namespace
-
 FileSystem::~FileSystem() {}
 
 string FileSystem::TranslateName(const string& name) const {
@@ -94,76 +71,6 @@ bool FileSystem::FilesExist(const std::vector<string>& files,
   return result;
 }
 
-Status FileSystem::GetMatchingPaths(const string& pattern,
-                                    std::vector<string>* results) {
-  results->clear();
-  // Find the fixed prefix by looking for the first wildcard.
-  string fixed_prefix = pattern.substr(0, pattern.find_first_of("*?[\\"));
-  string eval_pattern = pattern;
-  std::vector<string> all_files;
-  string dir = io::Dirname(fixed_prefix).ToString();
-  // If dir is empty then we need to fix up fixed_prefix and eval_pattern to
-  // include . as the top level directory.
-  if (dir.empty()) {
-    dir = ".";
-    fixed_prefix = io::JoinPath(dir, fixed_prefix);
-    eval_pattern = io::JoinPath(dir, pattern);
-  }
-
-  // Setup a BFS to explore everything under dir.
-  std::deque<string> dir_q;
-  dir_q.push_back(dir);
-  Status ret;  // Status to return.
-  // children_dir_status holds is_dir status for children. It can have three
-  // possible values: OK for true; FAILED_PRECONDITION for false; CANCELLED
-  // if we don't calculate IsDirectory (we might do that because there isn't
-  // any point in exploring that child path).
-  std::vector<Status> children_dir_status;
-  while (!dir_q.empty()) {
-    string current_dir = dir_q.front();
-    dir_q.pop_front();
-    std::vector<string> children;
-    Status s = GetChildren(current_dir, &children);
-    ret.Update(s);
-    if (children.empty()) continue;
-    // This IsDirectory call can be expensive for some FS. Parallelizing it.
-    children_dir_status.resize(children.size());
-    ForEach(0, children.size(),
-            [this, &current_dir, &children, &fixed_prefix,
-             &children_dir_status](int i) {
-              const string child_path = io::JoinPath(current_dir, children[i]);
-              // In case the child_path doesn't start with the fixed_prefix then
-              // we don't need to explore this path.
-              if (!str_util::StartsWith(child_path, fixed_prefix)) {
-                children_dir_status[i] = Status(tensorflow::error::CANCELLED,
-                                                "Operation not needed");
-              } else {
-                children_dir_status[i] = IsDirectory(child_path);
-              }
-            });
-    for (int i = 0; i < children.size(); ++i) {
-      const string child_path = io::JoinPath(current_dir, children[i]);
-      // If the IsDirectory call was cancelled we bail.
-      if (children_dir_status[i].code() == tensorflow::error::CANCELLED) {
-        continue;
-      }
-      // If the child is a directory add it to the queue.
-      if (children_dir_status[i].ok()) {
-        dir_q.push_back(child_path);
-      }
-      all_files.push_back(child_path);
-    }
-  }
-
-  // Match all obtained files to the input pattern.
-  for (const auto& f : all_files) {
-    if (Env::Default()->MatchPath(f, eval_pattern)) {
-      results->push_back(f);
-    }
-  }
-  return ret;
-}
-
 Status FileSystem::DeleteRecursively(const string& dirname,
                                      int64* undeleted_files,
                                      int64* undeleted_dirs) {
index 8f99766..077b1d7 100644 (file)
@@ -138,10 +138,8 @@ class FileSystem {
   ///  * OK - no errors
   ///  * UNIMPLEMENTED - Some underlying functions (like GetChildren) are not
   ///                    implemented
-  /// The default implementation uses a combination of GetChildren, MatchPath
-  /// and IsDirectory.
   virtual Status GetMatchingPaths(const string& pattern,
-                                  std::vector<string>* results);
+                                  std::vector<string>* results) = 0;
 
   /// \brief Obtains statistics for the given path.
   virtual Status Stat(const string& fname, FileStatistics* stat) = 0;
diff --git a/tensorflow/core/platform/file_system_helper.cc b/tensorflow/core/platform/file_system_helper.cc
new file mode 100644 (file)
index 0000000..22c5057
--- /dev/null
@@ -0,0 +1,126 @@
+/* Copyright 2018 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#include "tensorflow/core/platform/file_system_helper.h"
+
+#include <deque>
+#include <string>
+#include <vector>
+
+#include "tensorflow/core/lib/core/status.h"
+#include "tensorflow/core/lib/core/threadpool.h"
+#include "tensorflow/core/lib/io/path.h"
+#include "tensorflow/core/lib/strings/str_util.h"
+#include "tensorflow/core/platform/env.h"
+#include "tensorflow/core/platform/file_system.h"
+#include "tensorflow/core/platform/platform.h"
+
+namespace tensorflow {
+namespace internal {
+
+namespace {
+
+constexpr int kNumThreads = 8;
+
+// Run a function in parallel using a ThreadPool, but skip the ThreadPool
+// on the iOS platform due to its problems with more than a few threads.
+void ForEach(int first, int last, const std::function<void(int)>& f) {
+#if TARGET_OS_IPHONE
+  for (int i = first; i < last; i++) {
+    f(i);
+  }
+#else
+  int num_threads = std::min(kNumThreads, last - first);
+  thread::ThreadPool threads(Env::Default(), "ForEach", num_threads);
+  for (int i = first; i < last; i++) {
+    threads.Schedule([f, i] { f(i); });
+  }
+#endif
+}
+
+}  // namespace
+
+Status GetMatchingPaths(FileSystem* fs, Env* env, const string& pattern,
+                        std::vector<string>* results) {
+  results->clear();
+  // Find the fixed prefix by looking for the first wildcard.
+  string fixed_prefix = pattern.substr(0, pattern.find_first_of("*?[\\"));
+  string eval_pattern = pattern;
+  std::vector<string> all_files;
+  string dir = io::Dirname(fixed_prefix).ToString();
+  // If dir is empty then we need to fix up fixed_prefix and eval_pattern to
+  // include . as the top level directory.
+  if (dir.empty()) {
+    dir = ".";
+    fixed_prefix = io::JoinPath(dir, fixed_prefix);
+    eval_pattern = io::JoinPath(dir, pattern);
+  }
+
+  // Setup a BFS to explore everything under dir.
+  std::deque<string> dir_q;
+  dir_q.push_back(dir);
+  Status ret;  // Status to return.
+  // children_dir_status holds is_dir status for children. It can have three
+  // possible values: OK for true; FAILED_PRECONDITION for false; CANCELLED
+  // if we don't calculate IsDirectory (we might do that because there isn't
+  // any point in exploring that child path).
+  std::vector<Status> children_dir_status;
+  while (!dir_q.empty()) {
+    string current_dir = dir_q.front();
+    dir_q.pop_front();
+    std::vector<string> children;
+    Status s = fs->GetChildren(current_dir, &children);
+    ret.Update(s);
+    if (children.empty()) continue;
+    // This IsDirectory call can be expensive for some FS. Parallelizing it.
+    children_dir_status.resize(children.size());
+    ForEach(0, children.size(),
+            [fs, &current_dir, &children, &fixed_prefix,
+             &children_dir_status](int i) {
+              const string child_path = io::JoinPath(current_dir, children[i]);
+              // In case the child_path doesn't start with the fixed_prefix then
+              // we don't need to explore this path.
+              if (!str_util::StartsWith(child_path, fixed_prefix)) {
+                children_dir_status[i] = Status(tensorflow::error::CANCELLED,
+                                                "Operation not needed");
+              } else {
+                children_dir_status[i] = fs->IsDirectory(child_path);
+              }
+            });
+    for (int i = 0; i < children.size(); ++i) {
+      const string child_path = io::JoinPath(current_dir, children[i]);
+      // If the IsDirectory call was cancelled we bail.
+      if (children_dir_status[i].code() == tensorflow::error::CANCELLED) {
+        continue;
+      }
+      // If the child is a directory add it to the queue.
+      if (children_dir_status[i].ok()) {
+        dir_q.push_back(child_path);
+      }
+      all_files.push_back(child_path);
+    }
+  }
+
+  // Match all obtained files to the input pattern.
+  for (const auto& f : all_files) {
+    if (env->MatchPath(f, eval_pattern)) {
+      results->push_back(f);
+    }
+  }
+  return ret;
+}
+
+}  // namespace internal
+}  // namespace tensorflow
diff --git a/tensorflow/core/platform/file_system_helper.h b/tensorflow/core/platform/file_system_helper.h
new file mode 100644 (file)
index 0000000..8d812b0
--- /dev/null
@@ -0,0 +1,51 @@
+/* Copyright 2018 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#ifndef TENSORFLOW_CORE_PLATFORM_FILE_SYSTEM_HELPER_H_
+#define TENSORFLOW_CORE_PLATFORM_FILE_SYSTEM_HELPER_H_
+
+#include <string>
+#include <vector>
+
+#include "tensorflow/core/lib/core/status.h"
+
+namespace tensorflow {
+
+class FileSystem;
+class Env;
+
+namespace internal {
+
+// Given a pattern, stores in 'results' the set of paths (in the given file
+// system) that match that pattern.
+//
+// This helper may be used by implementations of FileSystem::GetMatchingPaths()
+// in order to provide parallel scanning of subdirectories (except on iOS).
+//
+// Arguments:
+//   fs: may not be null and will be used to identify directories and list
+//       their contents.
+//   env: may not be null and will be used to check if a match has been found.
+//   pattern: see FileSystem::GetMatchingPaths() for details.
+//   results: will be cleared and may not be null.
+//
+// Returns an error status if any call to 'fs' failed.
+Status GetMatchingPaths(FileSystem* fs, Env* env, const string& pattern,
+                        std::vector<string>* results);
+
+}  // namespace internal
+}  // namespace tensorflow
+
+#endif  // TENSORFLOW_CORE_PLATFORM_FILE_SYSTEM_HELPER_H_
index 7486329..9a71fbe 100644 (file)
@@ -22,6 +22,7 @@ limitations under the License.
 #include "tensorflow/core/lib/strings/strcat.h"
 #include "tensorflow/core/platform/env.h"
 #include "tensorflow/core/platform/file_system.h"
+#include "tensorflow/core/platform/file_system_helper.h"
 #include "tensorflow/core/platform/logging.h"
 #include "tensorflow/core/platform/mutex.h"
 #include "tensorflow/core/platform/posix/error.h"
@@ -396,6 +397,11 @@ Status HadoopFileSystem::GetChildren(const string& dir,
   return Status::OK();
 }
 
+Status HadoopFileSystem::GetMatchingPaths(const string& pattern,
+                                          std::vector<string>* results) {
+  return internal::GetMatchingPaths(this, Env::Default(), pattern, results);
+}
+
 Status HadoopFileSystem::DeleteFile(const string& fname) {
   hdfsFS fs = nullptr;
   TF_RETURN_IF_ERROR(Connect(fname, &fs));
index 5f2b222..6af7a69 100644 (file)
@@ -49,6 +49,9 @@ class HadoopFileSystem : public FileSystem {
 
   Status GetChildren(const string& dir, std::vector<string>* result) override;
 
+  Status GetMatchingPaths(const string& pattern,
+                          std::vector<string>* results) override;
+
   Status DeleteFile(const string& fname) override;
 
   Status CreateDir(const string& name) override;
index 008e6d5..420abc1 100644 (file)
@@ -22,6 +22,7 @@ limitations under the License.
 
 #include "tensorflow/core/platform/env.h"
 #include "tensorflow/core/platform/file_system.h"
+#include "tensorflow/core/platform/file_system_helper.h"
 
 namespace tensorflow {
 
@@ -65,6 +66,11 @@ class NullFileSystem : public FileSystem {
     return errors::Unimplemented("GetChildren unimplemented");
   }
 
+  Status GetMatchingPaths(const string& pattern,
+                          std::vector<string>* results) override {
+    return internal::GetMatchingPaths(this, Env::Default(), pattern, results);
+  }
+
   Status DeleteFile(const string& fname) override {
     return errors::Unimplemented("DeleteFile unimplemented");
   }
index 9a80215..47bfa02 100644 (file)
@@ -31,6 +31,7 @@ limitations under the License.
 #include "tensorflow/core/lib/core/status.h"
 #include "tensorflow/core/lib/strings/strcat.h"
 #include "tensorflow/core/platform/env.h"
+#include "tensorflow/core/platform/file_system_helper.h"
 #include "tensorflow/core/platform/logging.h"
 #include "tensorflow/core/platform/posix/error.h"
 #include "tensorflow/core/platform/posix/posix_file_system.h"
@@ -225,6 +226,11 @@ Status PosixFileSystem::GetChildren(const string& dir,
   return Status::OK();
 }
 
+Status PosixFileSystem::GetMatchingPaths(const string& pattern,
+                                         std::vector<string>* results) {
+  return internal::GetMatchingPaths(this, Env::Default(), pattern, results);
+}
+
 Status PosixFileSystem::DeleteFile(const string& fname) {
   Status result;
   if (unlink(TranslateName(fname).c_str()) != 0) {
index 98ffa43..e8898d0 100644 (file)
@@ -47,6 +47,9 @@ class PosixFileSystem : public FileSystem {
 
   Status Stat(const string& fname, FileStatistics* stats) override;
 
+  Status GetMatchingPaths(const string& pattern,
+                          std::vector<string>* results) override;
+
   Status DeleteFile(const string& fname) override;
 
   Status CreateDir(const string& name) override;
index 301fcb9..ee42369 100644 (file)
@@ -15,6 +15,7 @@ limitations under the License.
 #include "tensorflow/core/platform/s3/s3_file_system.h"
 #include "tensorflow/core/lib/io/path.h"
 #include "tensorflow/core/lib/strings/str_util.h"
+#include "tensorflow/core/platform/file_system_helper.h"
 #include "tensorflow/core/platform/mutex.h"
 #include "tensorflow/core/platform/s3/aws_logging.h"
 #include "tensorflow/core/platform/s3/s3_crypto.h"
@@ -497,6 +498,11 @@ Status S3FileSystem::Stat(const string& fname, FileStatistics* stats) {
   return Status::OK();
 }
 
+Status S3FileSystem::GetMatchingPaths(const string& pattern,
+                                      std::vector<string>* results) {
+  return internal::GetMatchingPaths(this, Env::Default(), pattern, results);
+}
+
 Status S3FileSystem::DeleteFile(const string& fname) {
   string bucket, object;
   TF_RETURN_IF_ERROR(ParseS3Path(fname, false, &bucket, &object));
index 31264be..5d0565b 100644 (file)
@@ -46,6 +46,9 @@ class S3FileSystem : public FileSystem {
 
   Status Stat(const string& fname, FileStatistics* stat) override;
 
+  Status GetMatchingPaths(const string& pattern,
+                          std::vector<string>* results) override;
+
   Status DeleteFile(const string& fname) override;
 
   Status CreateDir(const string& name) override;
index 682e46e..dc2efbe 100644 (file)
@@ -28,6 +28,7 @@ limitations under the License.
 #include "tensorflow/core/lib/core/error_codes.pb.h"
 #include "tensorflow/core/lib/strings/strcat.h"
 #include "tensorflow/core/platform/env.h"
+#include "tensorflow/core/platform/file_system_helper.h"
 #include "tensorflow/core/platform/logging.h"
 #include "tensorflow/core/platform/posix/error.h"
 #include "tensorflow/core/platform/windows/error.h"
@@ -494,7 +495,8 @@ Status WindowsFileSystem::GetMatchingPaths(const string& pattern,
   // but no code appears to rely on this behavior.
   string converted_pattern(pattern);
   std::replace(converted_pattern.begin(), converted_pattern.end(), '\\', '/');
-  TF_RETURN_IF_ERROR(FileSystem::GetMatchingPaths(converted_pattern, results));
+  TF_RETURN_IF_ERROR(internal::GetMatchingPaths(this, Env::Default(),
+                                                converted_pattern, results));
   for (string& result : *results) {
     std::replace(result.begin(), result.end(), '/', '\\');
   }
index a0f43d2..ea0a381 100644 (file)
@@ -157,6 +157,12 @@ Status MemmappedFileSystem::GetChildren(const string& filename,
   return errors::Unimplemented("memmapped format doesn't support GetChildren");
 }
 
+Status MemmappedFileSystem::GetMatchingPaths(const string& pattern,
+                                             std::vector<string>* results) {
+  return errors::Unimplemented(
+      "memmapped format doesn't support GetMatchingPaths");
+}
+
 Status MemmappedFileSystem::DeleteFile(const string& filename) {
   return errors::Unimplemented("memmapped format doesn't support DeleteFile");
 }
index 541587a..76cc491 100644 (file)
@@ -85,6 +85,8 @@ class MemmappedFileSystem : public FileSystem {
   Status NewAppendableFile(const string& fname,
                            std::unique_ptr<WritableFile>* result) override;
   Status GetChildren(const string& dir, std::vector<string>* r) override;
+  Status GetMatchingPaths(const string& pattern,
+                          std::vector<string>* results) override;
   Status DeleteFile(const string& f) override;
   Status CreateDir(const string& d) override;
   Status DeleteDir(const string& d) override;