Jaliyae/chunk buffer fix (#17409)
authorJaliya Ekanayake <jaliyaek@microsoft.com>
Sat, 23 Feb 2019 16:46:24 +0000 (08:46 -0800)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Sat, 23 Feb 2019 16:48:53 +0000 (08:48 -0800)
Summary:
The chunk buffer could hang when no data is read and the buffer size is lower than the chunk size. We detected this while running with a larger dataset, hence the fix. I added a test to mimic the situation and validated that the fix works. Thank you, Xueyun, for finding this issue.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/17409

Differential Revision: D14198546

Pulled By: soumith

fbshipit-source-id: b8ca43b0400deaae2ebb6601fdc65b47f32b0554

test/cpp/api/dataloader.cpp
torch/csrc/api/include/torch/data/datasets/chunk.h

index 2c1717c..9601911 100644 (file)
@@ -1843,3 +1843,39 @@ TEST(DataLoaderTest, CanAccessChunkSamplerWithChunkDataSet) {
   // 3 chunks, and when exhausted the value is already incremented.
   ASSERT_EQ(chunk_sampler.index(), 3);
 }
+
+TEST(DataLoaderTest, ChunkDatasetDoesNotHang) {
+  const size_t prefetch_count = 2;
+  const size_t batch_size = 5;
+  // this will make the preloaders wait until the `get_batch()` calls.
+  const size_t cache_size = 10;
+
+  DummyChunkDataReader data_reader;
+  samplers::SequentialSampler sampler(0);
+  datasets::SharedBatchDataset<datasets::ChunkDataset<
+      DummyChunkDataReader,
+      samplers::SequentialSampler,
+      samplers::SequentialSampler>>
+      dataset = datasets::make_shared_dataset<datasets::ChunkDataset<
+          DummyChunkDataReader,
+          samplers::SequentialSampler,
+          samplers::SequentialSampler>>(
+          data_reader,
+          sampler,
+          sampler,
+          datasets::ChunkDatasetOptions(
+              prefetch_count, batch_size, cache_size));
+
+  samplers::SequentialSampler& chunk_sampler = dataset->chunk_sampler();
+
+  auto data_loader = torch::data::make_data_loader(
+      dataset.map(transforms::BatchLambda<std::vector<int>, int>(
+          [](std::vector<int> batch) {
+            return std::accumulate(batch.begin(), batch.end(), 0);
+          })),
+      DataLoaderOptions(batch_size).workers(0));
+  // Simply creates the iterator without iterating over it. The chunk
+  // preloaders are waiting to fill the batch buffer, but it is not being
+  // drained. We still need to exit cleanly.
+  auto iterator = data_loader->begin();
+}
index 1507dc3..8326816 100644 (file)
@@ -293,6 +293,10 @@ class ChunkDataset final
         running_preloaders_(0) {}
 
   virtual ~ChunkDataset() {
+    // stop batch buffer first.
+    if (batch_buffer_) {
+      batch_buffer_->stop();
+    }
     free_workers();
   }
 
@@ -317,6 +321,10 @@ class ChunkDataset final
   /// This will clear any internal state and starts the internal prefetching
   /// mechanism for the chunk dataset.
   void reset() override {
+    // We need this to support partial data reads via the dataloader iterator.
+    if (batch_buffer_) {
+      batch_buffer_->stop();
+    }
     // free workers from previous reset if there is any.
     free_workers();
     preload_threads_.clear();