Remove old implementation of the adaptive shared batcher, the in flight batches imple...
authorA. Unique TensorFlower <gardener@tensorflow.org>
Tue, 27 Feb 2018 02:05:59 +0000 (18:05 -0800)
committerGunhan Gulsoy <gunan@google.com>
Tue, 27 Feb 2018 22:33:33 +0000 (14:33 -0800)
PiperOrigin-RevId: 187111685

tensorflow/core/kernels/batching_util/adaptive_shared_batch_scheduler.h
tensorflow/core/kernels/batching_util/adaptive_shared_batch_scheduler_test.cc

index 25c5f9c..661ed23 100644 (file)
@@ -50,43 +50,26 @@ class ASBSQueue;
 // track of a number of queues (one per model or model version) which are
 // continuously enqueuing requests. The scheduler groups the requests into
 // batches which it periodically sends off for processing (see
-// shared_batch_scheduler.h for more details). The AdaptiveSharedBatchScheduler
-// prioritizes batches by age (i.e. the batch's oldest request) irrespective of
-// queue or batch size.
+// shared_batch_scheduler.h for more details). AdaptiveSharedBatchScheduler
+// (ASBS) prioritizes batches by age (i.e. the batch's oldest request)
+// irrespective of queue or batch size.
 //
-// The scheduling decision currently exists in two flavors, controlled by the
-// option use_in_flight_batches_implementation. It is expected that setting this
-// option to true will give universally better results; after a period of
-// testing to confirm, the old implementation will be removed.
-//
-// If use_in_flight_batches_implementation is set to true, the scheduler
-// limits the number of batches which can be processed concurrently.  If a new
-// batch is created, and the number of in flight batches is below the limit,
-// the next (i.e. oldest) batch is immediately scheduled.  Similarly, when a
-// batch finishes processing, the limit is rechecked, and another batch may be
-// scheduled.  To avoid the need to carefully tune the limit for workload,
-// model type, platform, etc, it is dynamically adjusted in order to provide the
-// lowest latency.
-//
-// If use_in_flight_batches_implementation is set to false, the scheduler will
-// process the oldest batch at an adjustable rate, regardless of batch size.
-// The user can provide feedback to help set this rate to achieve some goal
-// (i.e. minimize overall latency, limit cpu usage, etc). The rate (or rather,
-// the corresponding period) is adjusted each time a batch is processed, using
-// an exponentially weighted moving average to smooth noisy feedback:
-// ewma_feedback = ((N - 1) * ewma_feedback + feedback()) / N
-// period *= (1 + K * emwa_feedback)
+// ASBS tries to keep the system busy by maintaining an adjustable number of
+// concurrently processed batches.  If a new batch is created, and the number of
+// in flight batches is below the target, the next (i.e. oldest) batch is
+// immediately scheduled.  Similarly, when a batch finishes processing, the
+// target is rechecked, and another batch may be scheduled.  To avoid the need
+// to carefully tune the target for workload, model type, platform, etc, it is
+// dynamically adjusted in order to provide the lowest average latency.
 //
 // Some potential use cases:
 // Hardware Accelerators (GPUs & TPUs) - If some phase of batch processing
 //   involves serial processing by a device, from a latency perspective it is
 //   desirable to keep the device evenly loaded, avoiding the need to wait for
 //   the device to process prior batches.
-//   feedback = num_pending_on_device() - desired_pending.
 // CPU utilization - If the batch processing is cpu dominated, you can reap
 //   latency gains when underutilized by increasing the processing rate, but
 //   back the rate off when the load increases to avoid overload.
-//   feedback = cpu_rate() - desired_cpu_rate.
 
 template <typename TaskType>
 class AdaptiveSharedBatchScheduler
@@ -101,13 +84,17 @@ class AdaptiveSharedBatchScheduler
   struct Options {
     // The name to use for the pool of batch threads.
     string thread_pool_name = {"batch_threads"};
-    // Number of batch processing threads; equivalently the maximum number of
-    // concurrently running batches.
+    // Number of batch processing threads - the maximum value of
+    // in_flight_batches_limit_.  It is recommended that this value be set by
+    // running the system under load, observing the learned value for
+    // in_flight_batches_limit_, and setting this maximum to ~ 2x the value.
+    // Under low load, in_flight_batches_limit_ has no substantial effect on
+    // latency and therefore undergoes a random walk.  Unreasonably large values
+    // for num_batch_threads allows for large in_flight_batches_limit_, which
+    // will harm latency for some time once load increases again.
     int64 num_batch_threads = port::NumSchedulableCPUs();
     // The environment to use (typically only overridden by test code).
     Env* env = Env::Default();
-    // Which implementation to use (described in class comments above).
-    bool use_in_flight_batches_implementation = false;
     // Initial limit for number of batches being concurrently processed.
     // Non-integer values correspond to probabilistic limits - i.e. a value of
     // 3.2 results in an actual cap of 3 80% of the time, and 4 20% of the time.
@@ -116,28 +103,6 @@ class AdaptiveSharedBatchScheduler
     // numbers will give less noisy latency measurements, but will be less
     // responsive to changes in workload.
     int64 batches_to_average_over = 1000;
-
-    // TODO(kte): remove the rate based implementation and corresponding options
-    // below once testing confirms the superiority of the in flight batches
-    // implementation.
-    // Initial batch scheduling period in microseconds. Will be altered for
-    // non-zero rate_feedback.
-    double initial_scheduling_period_micros = 500;
-    // Minimum batch scheduling period in microseconds. Recommend setting this
-    // value greater than 0, otherwise it may take a while to recover from a
-    // sustained time of negative scheduling_period_feedback (which may occur
-    // under low load).
-    double min_scheduling_period_micros = 100;
-    // Maximum batch scheduling period in microseconds.
-    double max_scheduling_period_micros = 10000;
-    // Feedback function used to modify the scheduling period each time a batch
-    // is scheduled.  Should return values roughly O(1), with positive values
-    // resulting in an increased period.
-    std::function<double()> scheduling_period_feedback{[] { return 0.; }};
-    // To handle potentially noisy scheduling_period_feedback, the period is
-    // adjusted using an exponentially weighted moving average over the previous
-    // feedback_smoothing_batches batches.  Must be greater than 0.
-    int64 feedback_smoothing_batches = 10;
   };
 
   // Ownership is shared between the caller of Create() and any queues created
@@ -171,17 +136,11 @@ class AdaptiveSharedBatchScheduler
 
   explicit AdaptiveSharedBatchScheduler(const Options& options);
 
-  // Batch scheduling function which runs every scheduling_period_ microseconds.
-  // Only used when options_.use_in_flight_batches_implementation == false.
-  void ProcessOneBatch();
-
   // Tracks processing latency and adjusts in_flight_batches_limit to minimize.
-  // Only used when options_.use_in_flight_batches_implementation == true.
   void CallbackWrapper(const internal::ASBSBatch<TaskType>* batch,
                        BatchProcessor callback);
 
   // Schedules batch if in_flight_batches_limit_ is not met.
-  // Only used when options_.use_in_flight_batches_implementation == true.
   void MaybeScheduleNextBatch() EXCLUSIVE_LOCKS_REQUIRED(mu_);
 
   // Notifies scheduler of non-empty batch which is eligible for processing.
@@ -212,41 +171,22 @@ class AdaptiveSharedBatchScheduler
 
   mutex mu_;
 
-  // Responsible for running ProcessOneBatch. PeriodicFunction was used in order
-  // to check for deletion so that the thread can be shut down.
-  // Only used when options_.use_in_flight_batches_implementation == false.
-  std::unique_ptr<PeriodicFunction> scheduling_thread_;
-
   // Responsible for running the batch processing callbacks.
   std::unique_ptr<thread::ThreadPool> batch_thread_pool_;
 
-  // Time interval in microseconds between successive ProcessOneBatch calls.
-  // Only used when options_.use_in_flight_batches_implementation == false.
-  double scheduling_period_;
-
-  // Exponentially weighted moving average of
-  // options_.scheduling_period_feedback() evaluated in each ProcessOneBatch
-  // call.
-  // Only used when options_.use_in_flight_batches_implementation == false.
-  double ewma_feedback_ = 0;
-
   // Limit on number of batches which can be concurrently processed.
   // Non-integer values correspond to probabilistic limits - i.e. a value of 3.2
   // results in an actual cap of 3 80% of the time, and 4 20% of the time.
-  // Only used when options_.use_in_flight_batches_implementation == true.
   double in_flight_batches_limit_ GUARDED_BY(mu_);
 
   // Number of batches currently being processed.
-  // Only used when options_.use_in_flight_batches_implementation == true.
   int64 in_flight_batches_ GUARDED_BY(mu_) = 0;
 
   // RNG engine and distribution.
-  // Only used when options_.use_in_flight_batches_implementation == true.
   std::default_random_engine rand_engine_;
   std::uniform_real_distribution<double> rand_double_;
 
   // Fields controlling the dynamic adjustment of in_flight_batches_limit_.
-  // Only used when options_.use_in_flight_batches_implementation == true.
   // Number of batches since the last in_flight_batches_limit_ adjustment.
   int64 batch_count_ GUARDED_BY(mu_) = 0;
   // Sum of processing latency for batches counted by batch_count_.
@@ -348,32 +288,6 @@ Status AdaptiveSharedBatchScheduler<TaskType>::Create(
     return errors::InvalidArgument("num_batch_threads must be positive; was ",
                                    options.num_batch_threads);
   }
-  if (options.min_scheduling_period_micros < 0) {
-    return errors::InvalidArgument(
-        "min_scheduling_period_micros must be >= 0; was ",
-        options.min_scheduling_period_micros);
-  }
-  if (options.min_scheduling_period_micros >
-      options.initial_scheduling_period_micros) {
-    return errors::InvalidArgument(
-        "initial_scheduling_period_micros (",
-        options.initial_scheduling_period_micros,
-        ") must be >= min_scheduling_period_micros (",
-        options.min_scheduling_period_micros, ")");
-  }
-  if (options.initial_scheduling_period_micros >
-      options.max_scheduling_period_micros) {
-    return errors::InvalidArgument(
-        "initial_scheduling_period_micros (",
-        options.initial_scheduling_period_micros,
-        ") must be <= max_scheduling_period_micros (",
-        options.max_scheduling_period_micros, ")");
-  }
-  if (options.feedback_smoothing_batches < 1) {
-    return errors::InvalidArgument(
-        "feedback_smoothing_batches must be positive; was ",
-        options.feedback_smoothing_batches);
-  }
   if (options.initial_in_flight_batches_limit > options.num_batch_threads) {
     return errors::InvalidArgument(
         "initial_in_flight_batches_limit (",
@@ -401,20 +315,12 @@ template <typename TaskType>
 AdaptiveSharedBatchScheduler<TaskType>::AdaptiveSharedBatchScheduler(
     const Options& options)
     : options_(options),
-      scheduling_period_(options.initial_scheduling_period_micros),
       in_flight_batches_limit_(options.initial_in_flight_batches_limit),
       rand_double_(0.0, 1.0) {
   std::random_device device;
   rand_engine_.seed(device());
-  PeriodicFunction::Options opts;
-  opts.thread_name_prefix = "scheduling_thread";
-  opts.env = GetEnv();
   batch_thread_pool_.reset(new thread::ThreadPool(
       GetEnv(), options.thread_pool_name, options.num_batch_threads));
-  if (!options.use_in_flight_batches_implementation) {
-    scheduling_thread_.reset(
-        new PeriodicFunction([this] { ProcessOneBatch(); }, 0, opts));
-  }
 }
 
 template <typename TaskType>
@@ -443,9 +349,7 @@ void AdaptiveSharedBatchScheduler<TaskType>::AddBatch(
     const internal::ASBSBatch<TaskType>* batch) {
   mutex_lock l(mu_);
   batches_.push(batch);
-  if (options_.use_in_flight_batches_implementation) {
-    MaybeScheduleNextBatch();
-  }
+  MaybeScheduleNextBatch();
 }
 
 template <typename TaskType>
@@ -524,44 +428,6 @@ void AdaptiveSharedBatchScheduler<TaskType>::CallbackWrapper(
 }
 
 template <typename TaskType>
-void AdaptiveSharedBatchScheduler<TaskType>::ProcessOneBatch() {
-  static const double kFeedbackMultiplier = .001;
-  const internal::ASBSBatch<TaskType>* batch = nullptr;
-  BatchProcessor callback;
-  const int64 start_time_micros = GetEnv()->NowMicros();
-  {
-    mutex_lock l(mu_);
-    if (!batches_.empty()) {
-      batch = batches_.top();
-      batches_.pop();
-      callback = queues_and_callbacks_[batch->queue()];
-    }
-  }
-  if (batch != nullptr) {
-    double feedback = options_.scheduling_period_feedback();
-    const int64 N = options_.feedback_smoothing_batches;
-    ewma_feedback_ = ((N - 1) * ewma_feedback_ + feedback) / N;
-    scheduling_period_ *= (1 + kFeedbackMultiplier * ewma_feedback_);
-    if (scheduling_period_ < options_.min_scheduling_period_micros) {
-      scheduling_period_ = options_.min_scheduling_period_micros;
-    } else if (scheduling_period_ > options_.max_scheduling_period_micros) {
-      scheduling_period_ = options_.max_scheduling_period_micros;
-    }
-    // Queue may destroy itself after ReleaseBatch is called.
-    batch->queue()->ReleaseBatch(batch);
-    batch_thread_pool_->Schedule([callback, batch] {
-      callback(std::unique_ptr<Batch<TaskType>>(
-          const_cast<internal::ASBSBatch<TaskType>*>(batch)));
-    });
-  }
-  const int64 sleep_time =
-      scheduling_period_ - (GetEnv()->NowMicros() - start_time_micros);
-  if (sleep_time > 0) {
-    GetEnv()->SleepForMicroseconds(sleep_time);
-  }
-}
-
-template <typename TaskType>
 bool AdaptiveSharedBatchScheduler<TaskType>::BatchCompare::operator()(
     const internal::ASBSBatch<TaskType>* a,
     const internal::ASBSBatch<TaskType>* b) {
index 8ae8ca0..1092342 100644 (file)
@@ -64,59 +64,6 @@ std::unique_ptr<Thread> CreateFakeClockAdvancerThread(
       }));
 }
 
-TEST(AdaptiveSharedBatchSchedulerTest, Basic) {
-  for (const bool delete_scheduler_early : {false, true}) {
-    for (const bool delete_queue_1_early : {false, true}) {
-      int queue_0_tasks = 0;
-      auto queue_0_callback =
-          [&queue_0_tasks](std::unique_ptr<Batch<FakeTask>> batch) {
-            ASSERT_TRUE(batch->IsClosed());
-            EXPECT_GT(batch->num_tasks(), 0);
-            for (int i = 0; i < batch->num_tasks(); i++) {
-              queue_0_tasks += batch->task(i).size();
-            }
-          };
-      int queue_1_tasks = 0;
-      auto queue_1_callback =
-          [&queue_1_tasks](std::unique_ptr<Batch<FakeTask>> batch) {
-            ASSERT_TRUE(batch->IsClosed());
-            EXPECT_GT(batch->num_tasks(), 0);
-            for (int i = 0; i < batch->num_tasks(); i++) {
-              queue_1_tasks += batch->task(i).size();
-            }
-          };
-      {
-        std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
-        TF_ASSERT_OK(
-            AdaptiveSharedBatchScheduler<FakeTask>::Create({}, &scheduler));
-
-        // Create two queues.
-        std::unique_ptr<BatchScheduler<FakeTask>> queue_0;
-        TF_ASSERT_OK(scheduler->AddQueue({}, queue_0_callback, &queue_0));
-        std::unique_ptr<BatchScheduler<FakeTask>> queue_1;
-        TF_ASSERT_OK(scheduler->AddQueue({}, queue_1_callback, &queue_1));
-
-        if (delete_scheduler_early) {
-          // Delete our copy of the scheduler. The queues should keep it alive
-          // under the covers.
-          scheduler = nullptr;
-        }
-        // Submit tasks to the two queues, and (optionally) remove the queues.
-        TF_ASSERT_OK(ScheduleTask(1, queue_0.get()));
-        TF_ASSERT_OK(ScheduleTask(2, queue_1.get()));
-        TF_ASSERT_OK(ScheduleTask(3, queue_0.get()));
-        TF_ASSERT_OK(ScheduleTask(4, queue_1.get()));
-        if (delete_queue_1_early) {
-          queue_1 = nullptr;
-        }
-        TF_ASSERT_OK(ScheduleTask(5, queue_0.get()));
-      }
-      EXPECT_EQ(queue_0_tasks, 9);
-      EXPECT_EQ(queue_1_tasks, 6);
-    }
-  }
-}
-
 TEST(AdaptiveSharedBatchSchedulerTest, BadOptions) {
   using Scheduler = AdaptiveSharedBatchScheduler<FakeTask>;
   std::shared_ptr<Scheduler> scheduler;
@@ -124,24 +71,6 @@ TEST(AdaptiveSharedBatchSchedulerTest, BadOptions) {
   options.num_batch_threads = 0;
   EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
   options = Scheduler::Options();
-  options.min_scheduling_period_micros = 50;
-  options.max_scheduling_period_micros = 100;
-  options.initial_scheduling_period_micros = 1;
-  EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
-  options = Scheduler::Options();
-  options.min_scheduling_period_micros = 50;
-  options.max_scheduling_period_micros = 100;
-  options.initial_scheduling_period_micros = 1000;
-  EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
-  options = Scheduler::Options();
-  options.min_scheduling_period_micros = 100;
-  options.max_scheduling_period_micros = 50;
-  options.initial_scheduling_period_micros = 75;
-  EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
-  options = Scheduler::Options();
-  options.feedback_smoothing_batches = 0;
-  EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
-  options = Scheduler::Options();
   options.initial_in_flight_batches_limit = 0.5;
   EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
   options = Scheduler::Options();
@@ -153,301 +82,8 @@ TEST(AdaptiveSharedBatchSchedulerTest, BadOptions) {
   EXPECT_FALSE(Scheduler::Create(options, &scheduler).ok());
 }
 
-TEST(AdaptiveSharedBatchSchedulerTest, ObeysQueueOptions) {
-  test_util::FakeClockEnv env(Env::Default());
-  Notification start_teardown, stop_teardown;
-  std::unique_ptr<Thread> teardown_thread =
-      CreateFakeClockAdvancerThread(&env, &start_teardown, &stop_teardown);
-  {
-    AdaptiveSharedBatchScheduler<FakeTask>::Options options;
-    options.initial_scheduling_period_micros = 1000;
-    options.env = &env;
-    std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
-    TF_ASSERT_OK(
-        AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
-    std::unique_ptr<BatchScheduler<FakeTask>> queue_0;
-    std::unique_ptr<BatchScheduler<FakeTask>> queue_1;
-    int queue_0_tasks = 0;
-    int queue_1_tasks = 0;
-    auto queue_0_callback = [&queue_0_tasks,
-                             &env](std::unique_ptr<Batch<FakeTask>> batch) {
-      ASSERT_TRUE(batch->IsClosed());
-      EXPECT_GT(batch->num_tasks(), 0);
-      for (int i = 0; i < batch->num_tasks(); i++) {
-        queue_0_tasks += batch->task(i).size();
-      }
-      env.SleepForMicroseconds(1);
-    };
-    auto queue_1_callback = [&queue_1_tasks,
-                             &env](std::unique_ptr<Batch<FakeTask>> batch) {
-      ASSERT_TRUE(batch->IsClosed());
-      EXPECT_GT(batch->num_tasks(), 0);
-      for (int i = 0; i < batch->num_tasks(); i++) {
-        queue_1_tasks += batch->task(i).size();
-      }
-      env.SleepForMicroseconds(1);
-    };
-    AdaptiveSharedBatchScheduler<FakeTask>::QueueOptions queue_options;
-    queue_options.max_batch_size = 10;
-    queue_options.max_enqueued_batches = 0;
-    // Queue must have max_enqueued_batchs > 1.
-    EXPECT_FALSE(
-        scheduler->AddQueue(queue_options, queue_0_callback, &queue_0).ok());
-    queue_options.max_enqueued_batches = 2;
-    TF_ASSERT_OK(
-        scheduler->AddQueue(queue_options, queue_0_callback, &queue_0));
-    EXPECT_EQ(10, queue_0->max_task_size());
-    queue_options.max_batch_size = 0;
-    // Queue must have max_batch_size > 0.
-    EXPECT_FALSE(
-        scheduler->AddQueue(queue_options, queue_1_callback, &queue_1).ok());
-    queue_options.max_batch_size = 2;
-    queue_options.max_enqueued_batches = 1;
-    TF_ASSERT_OK(
-        scheduler->AddQueue(queue_options, queue_1_callback, &queue_1));
-
-    // Wait for scheduling_thread to sleep.
-    env.BlockUntilThreadsAsleep(1);
-    // Task larger than max_batch_size shouldn't schedule.
-    EXPECT_FALSE(ScheduleTask(15, queue_0.get()).ok());
-    TF_ASSERT_OK(ScheduleTask(5, queue_0.get()));
-    TF_ASSERT_OK(ScheduleTask(5, queue_0.get()));
-    env.AdvanceByMicroseconds(1);
-
-    // Task larger than max_batch_size shouldn't schedule.
-    EXPECT_FALSE(ScheduleTask(3, queue_1.get()).ok());
-    TF_ASSERT_OK(ScheduleTask(1, queue_1.get()));
-    TF_ASSERT_OK(ScheduleTask(1, queue_1.get()));
-    env.AdvanceByMicroseconds(1);
-    // Exceeds max_enqueued_batches, shouldn't schedule.
-    EXPECT_FALSE(ScheduleTask(1, queue_1.get()).ok());
-
-    TF_ASSERT_OK(ScheduleTask(5, queue_0.get()));
-    // Exceeds max_enqueued_batches, shouldn't schedule.
-    EXPECT_FALSE(ScheduleTask(6, queue_0.get()).ok());
-    TF_ASSERT_OK(ScheduleTask(4, queue_0.get()));
-
-    // Batches should be processed in order from oldest to newest.
-    env.AdvanceByMicroseconds(1000);
-    env.BlockUntilThreadsAsleep(2);
-    EXPECT_EQ(queue_0_tasks, 10);
-    EXPECT_EQ(queue_1_tasks, 0);
-
-    env.AdvanceByMicroseconds(1000);
-    env.BlockUntilThreadsAsleep(2);
-    EXPECT_EQ(queue_0_tasks, 10);
-    EXPECT_EQ(queue_1_tasks, 2);
-
-    env.AdvanceByMicroseconds(1000);
-    env.BlockUntilThreadsAsleep(2);
-    EXPECT_EQ(queue_0_tasks, 19);
-    EXPECT_EQ(queue_1_tasks, 2);
-    start_teardown.Notify();
-  }
-  stop_teardown.Notify();
-}
-
-TEST(AdaptiveSharedBatchSchedulerTest, RateFeedback) {
-  test_util::FakeClockEnv env(Env::Default());
-  Notification start_teardown, stop_teardown;
-  std::unique_ptr<Thread> teardown_thread =
-      CreateFakeClockAdvancerThread(&env, &start_teardown, &stop_teardown);
-  {
-    double feedback = 0;
-    AdaptiveSharedBatchScheduler<FakeTask>::Options options;
-    options.initial_scheduling_period_micros = 1000;
-    options.min_scheduling_period_micros = 200;
-    options.max_scheduling_period_micros = 2000;
-    options.env = &env;
-    options.scheduling_period_feedback = [&feedback] { return feedback; };
-    options.feedback_smoothing_batches = 1;
-    std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
-    TF_ASSERT_OK(
-        AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
-    std::unique_ptr<BatchScheduler<FakeTask>> queue;
-    int scheduled_items = 0;
-    auto queue_callback = [&scheduled_items,
-                           &env](std::unique_ptr<Batch<FakeTask>> batch) {
-      ASSERT_TRUE(batch->IsClosed());
-      EXPECT_GT(batch->num_tasks(), 0);
-      scheduled_items = 0;
-      for (int i = 0; i < batch->num_tasks(); i++) {
-        scheduled_items += batch->task(i).size();
-      }
-      env.SleepForMicroseconds(1);
-    };
-
-    TF_ASSERT_OK(scheduler->AddQueue({}, queue_callback, &queue));
-
-    // Wait for scheduling_thread to sleep.
-    env.BlockUntilThreadsAsleep(1);
-    // Enqueue 6 batches.
-    for (int i = 0; i < 6; i++) {
-      TF_ASSERT_OK(ScheduleTask(900 + i, queue.get()));
-      env.AdvanceByMicroseconds(1);
-    }
-    feedback = -500;
-    env.AdvanceByMicroseconds(994);
-    env.BlockUntilThreadsAsleep(2);  // scheduling period = 500 usec.
-    EXPECT_EQ(scheduled_items, 900);
-    env.AdvanceByMicroseconds(500);
-    env.BlockUntilThreadsAsleep(2);  // scheduling period = 250 usec.
-    EXPECT_EQ(scheduled_items, 901);
-    feedback = 0;
-    env.AdvanceByMicroseconds(250);
-    env.BlockUntilThreadsAsleep(2);  // scheduling period = 250 usec.
-    EXPECT_EQ(scheduled_items, 902);
-    feedback = 10000;  // large feedback should hit max_scheduling_period.
-    env.AdvanceByMicroseconds(250);
-    env.BlockUntilThreadsAsleep(2);  // scheduling period = 2000 usec.
-    EXPECT_EQ(scheduled_items, 903);
-    feedback = -10000;  // large feedback should hit min_scheduling_period.
-    env.AdvanceByMicroseconds(1999);
-    // No callback scheduled, only scheduling thread sleeping.
-    env.BlockUntilThreadsAsleep(1);
-    EXPECT_EQ(scheduled_items, 903);
-    env.AdvanceByMicroseconds(1);
-    env.BlockUntilThreadsAsleep(2);  // scheduling period = 200 usec.
-    EXPECT_EQ(scheduled_items, 904);
-    env.AdvanceByMicroseconds(200);
-    env.BlockUntilThreadsAsleep(2);
-    EXPECT_EQ(scheduled_items, 905);
-    start_teardown.Notify();
-  }
-  stop_teardown.Notify();
-}
-
-TEST(AdaptiveSharedBatchSchedulerTest, FeedbackSmoothing) {
-  test_util::FakeClockEnv env(Env::Default());
-  Notification start_teardown, stop_teardown;
-  std::unique_ptr<Thread> teardown_thread =
-      CreateFakeClockAdvancerThread(&env, &start_teardown, &stop_teardown);
-  {
-    double feedback = 0;
-    AdaptiveSharedBatchScheduler<FakeTask>::Options options;
-    options.initial_scheduling_period_micros = 1000;
-    options.env = &env;
-    options.scheduling_period_feedback = [&feedback] { return feedback; };
-    options.feedback_smoothing_batches = 3;
-    std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
-    TF_ASSERT_OK(
-        AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
-    std::unique_ptr<BatchScheduler<FakeTask>> queue;
-    int scheduled_items = 0;
-    auto queue_callback = [&scheduled_items,
-                           &env](std::unique_ptr<Batch<FakeTask>> batch) {
-      ASSERT_TRUE(batch->IsClosed());
-      EXPECT_GT(batch->num_tasks(), 0);
-      scheduled_items = 0;
-      for (int i = 0; i < batch->num_tasks(); i++) {
-        scheduled_items += batch->task(i).size();
-      }
-      env.SleepForMicroseconds(1);
-    };
-
-    TF_ASSERT_OK(scheduler->AddQueue({}, queue_callback, &queue));
-
-    // Wait for scheduling_thread to sleep.
-    env.BlockUntilThreadsAsleep(1);
-    // Enqueue 4 batches.
-    for (int i = 0; i < 4; i++) {
-      TF_ASSERT_OK(ScheduleTask(900 + i, queue.get()));
-      env.AdvanceByMicroseconds(1);
-    }
-    feedback = -300;
-    env.AdvanceByMicroseconds(996);
-    env.BlockUntilThreadsAsleep(2);
-    // ewma_feedback = 100, scheduling_period = 900.
-    EXPECT_EQ(scheduled_items, 900);
-    env.AdvanceByMicroseconds(899);
-    // No callback scheduled, only scheduling thread sleeping.
-    env.BlockUntilThreadsAsleep(1);
-    EXPECT_EQ(scheduled_items, 900);
-    env.AdvanceByMicroseconds(1);
-    env.BlockUntilThreadsAsleep(2);
-    // ewma_feedback = 167, scheduling_period = 750.
-    EXPECT_EQ(scheduled_items, 901);
-    env.AdvanceByMicroseconds(749);
-    // No callback scheduled, only scheduling thread sleeping.
-    env.BlockUntilThreadsAsleep(1);
-    EXPECT_EQ(scheduled_items, 901);
-    feedback = 1000 / 3.;
-    env.AdvanceByMicroseconds(1);
-    env.BlockUntilThreadsAsleep(2);
-    // emwa_feedback = 0, scheduling_period = 750.
-    EXPECT_EQ(scheduled_items, 902);
-    env.AdvanceByMicroseconds(749);
-    // No callback scheduled, only scheduling thread sleeping.
-    env.BlockUntilThreadsAsleep(1);
-    EXPECT_EQ(scheduled_items, 902);
-    env.AdvanceByMicroseconds(1);
-    env.BlockUntilThreadsAsleep(2);
-    EXPECT_EQ(scheduled_items, 903);
-    start_teardown.Notify();
-  }
-  stop_teardown.Notify();
-}
-
-TEST(AdaptiveSharedBatchSchedulerTest, QueueCapacityInfo) {
-  test_util::FakeClockEnv env(Env::Default());
-  Notification start_teardown, stop_teardown;
-  std::unique_ptr<Thread> teardown_thread =
-      CreateFakeClockAdvancerThread(&env, &start_teardown, &stop_teardown);
-  {
-    AdaptiveSharedBatchScheduler<FakeTask>::Options options;
-    options.initial_scheduling_period_micros = 1000;
-    options.env = &env;
-    std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
-    TF_ASSERT_OK(
-        AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
-    std::unique_ptr<BatchScheduler<FakeTask>> queue;
-    int scheduled_items = 0;
-    auto queue_callback = [&scheduled_items,
-                           &env](std::unique_ptr<Batch<FakeTask>> batch) {
-      ASSERT_TRUE(batch->IsClosed());
-      EXPECT_GT(batch->num_tasks(), 0);
-      scheduled_items = 0;
-      for (int i = 0; i < batch->num_tasks(); i++) {
-        scheduled_items += batch->task(i).size();
-      }
-      env.SleepForMicroseconds(1);
-    };
-    AdaptiveSharedBatchScheduler<FakeTask>::QueueOptions queue_options;
-    queue_options.max_batch_size = 10;
-    queue_options.max_enqueued_batches = 10;
-    TF_ASSERT_OK(scheduler->AddQueue(queue_options, queue_callback, &queue));
-
-    // Wait for scheduling_thread to sleep.
-    env.BlockUntilThreadsAsleep(1);
-    // Enqueue 3 tasks.
-    EXPECT_EQ(queue->NumEnqueuedTasks(), 0);
-    EXPECT_EQ(queue->SchedulingCapacity(), 100);
-    TF_ASSERT_OK(ScheduleTask(5, queue.get()));
-    EXPECT_EQ(queue->NumEnqueuedTasks(), 1);
-    EXPECT_EQ(queue->SchedulingCapacity(), 95);
-    env.AdvanceByMicroseconds(1);
-    TF_ASSERT_OK(ScheduleTask(6, queue.get()));
-    EXPECT_EQ(queue->NumEnqueuedTasks(), 2);
-    EXPECT_EQ(queue->SchedulingCapacity(), 84);
-    env.AdvanceByMicroseconds(1);
-    TF_ASSERT_OK(ScheduleTask(1, queue.get()));
-    EXPECT_EQ(queue->NumEnqueuedTasks(), 3);
-    EXPECT_EQ(queue->SchedulingCapacity(), 83);
-
-    env.AdvanceByMicroseconds(998);
-    env.BlockUntilThreadsAsleep(2);
-    EXPECT_EQ(scheduled_items, 5);
-    env.AdvanceByMicroseconds(1000);
-    env.BlockUntilThreadsAsleep(2);
-    EXPECT_EQ(scheduled_items, 7);
-    start_teardown.Notify();
-  }
-  stop_teardown.Notify();
-}
-
-TEST(AdaptiveSharedBatchSchedulerTest, InFlightBatchesImplementation) {
+TEST(AdaptiveSharedBatchSchedulerTest, InFlightBatchesLimit) {
   AdaptiveSharedBatchScheduler<FakeTask>::Options options;
-  options.use_in_flight_batches_implementation = true;
   options.initial_in_flight_batches_limit = 2;
   options.batches_to_average_over = 1000;
   mutex mu;
@@ -476,7 +112,7 @@ TEST(AdaptiveSharedBatchSchedulerTest, InFlightBatchesImplementation) {
   std::unique_ptr<BatchScheduler<FakeTask>> queue;
   TF_ASSERT_OK(scheduler->AddQueue({}, queue_callback, &queue));
 
-  // Enqueue 3 batches.
+  // Enqueue 3 tasks, should result in 3 batches.
   for (int i = 0; i < 3; i++) {
     TF_ASSERT_OK(ScheduleTask(100, queue.get()));
   }
@@ -490,7 +126,6 @@ TEST(AdaptiveSharedBatchSchedulerTest, InFlightBatchesLimitTuning) {
   {
     AdaptiveSharedBatchScheduler<FakeTask>::Options options;
     options.env = &env;
-    options.use_in_flight_batches_implementation = true;
     options.initial_in_flight_batches_limit = 2;
     options.batches_to_average_over = 1;
     auto queue_callback = [&env](std::unique_ptr<Batch<FakeTask>> batch) {
@@ -544,6 +179,125 @@ TEST(AdaptiveSharedBatchSchedulerTest, InFlightBatchesLimitTuning) {
   }
   stop_teardown.Notify();
 }
+
+TEST(AdaptiveSharedBatchSchedulerTest, DeleteQueue) {
+  AdaptiveSharedBatchScheduler<FakeTask>::Options options;
+  options.initial_in_flight_batches_limit = 1;
+  options.batches_to_average_over = 1000;
+  mutex mu;
+  int processed_batches = 0;
+  Notification finish_processing;
+  auto queue_callback = [&mu, &processed_batches, &finish_processing](
+                            std::unique_ptr<Batch<FakeTask>> batch) {
+    ASSERT_TRUE(batch->IsClosed());
+    EXPECT_GT(batch->num_tasks(), 0);
+    finish_processing.WaitForNotification();
+    mu.lock();
+    processed_batches++;
+    mu.unlock();
+  };
+
+  std::unique_ptr<Thread> queue_deleter;
+  std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
+  TF_ASSERT_OK(
+      AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
+  std::unique_ptr<BatchScheduler<FakeTask>> queue;
+  TF_ASSERT_OK(scheduler->AddQueue({}, queue_callback, &queue));
+
+  // Enqueue 2 tasks, should result in 2 batches.
+  for (int i = 0; i < 2; i++) {
+    TF_ASSERT_OK(ScheduleTask(100, queue.get()));
+  }
+  // Delete queue, should be kept alive until empty.
+  queue_deleter.reset(Env::Default()->StartThread(
+      {}, "QueueDeleterThread", [&queue, &mu, &processed_batches] {
+        queue.reset();
+        mutex_lock l(mu);
+        EXPECT_EQ(processed_batches, 2);
+      }));
+  // Give queue_deleter thread time to delete queue.
+  Env::Default()->SleepForMicroseconds(1000);
+  finish_processing.Notify();
+}
+
+TEST(AdaptiveSharedBatchSchedulerTest, DeleteScheduler) {
+  AdaptiveSharedBatchScheduler<FakeTask>::Options options;
+  options.initial_in_flight_batches_limit = 1;
+  options.batches_to_average_over = 1000;
+  mutex mu;
+  int processed_batches = 0;
+  Notification finish_processing;
+  auto queue_callback = [&mu, &processed_batches, &finish_processing](
+                            std::unique_ptr<Batch<FakeTask>> batch) {
+    ASSERT_TRUE(batch->IsClosed());
+    EXPECT_GT(batch->num_tasks(), 0);
+    finish_processing.WaitForNotification();
+    mu.lock();
+    processed_batches++;
+    mu.unlock();
+  };
+
+  std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
+  TF_ASSERT_OK(
+      AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
+  std::unique_ptr<BatchScheduler<FakeTask>> queue;
+  TF_ASSERT_OK(scheduler->AddQueue({}, queue_callback, &queue));
+
+  // Enqueue 2 tasks, should result in 2 batches.
+  for (int i = 0; i < 2; i++) {
+    TF_ASSERT_OK(ScheduleTask(100, queue.get()));
+  }
+  // Delete scheduler, should be kept alive until queues are empty.
+  scheduler.reset();
+  finish_processing.Notify();
+  while (true) {
+    mutex_lock l(mu);
+    if (processed_batches == 2) break;
+  }
+}
+
+TEST(AdaptiveSharedBatchSchedulerTest, QueueCapacityInfo) {
+  AdaptiveSharedBatchScheduler<FakeTask>::Options options;
+  options.initial_in_flight_batches_limit = 1;
+  options.batches_to_average_over = 1000;
+  mutex mu;
+  int processed_batches = 0;
+  Notification finish_processing;
+  auto queue_callback = [&mu, &processed_batches, &finish_processing](
+                            std::unique_ptr<Batch<FakeTask>> batch) {
+    ASSERT_TRUE(batch->IsClosed());
+    EXPECT_GT(batch->num_tasks(), 0);
+    mu.lock();
+    int batch_num = ++processed_batches;
+    mu.unlock();
+    if (batch_num == 1) {
+      finish_processing.WaitForNotification();
+    }
+  };
+  std::shared_ptr<AdaptiveSharedBatchScheduler<FakeTask>> scheduler;
+  TF_ASSERT_OK(
+      AdaptiveSharedBatchScheduler<FakeTask>::Create(options, &scheduler));
+  std::unique_ptr<BatchScheduler<FakeTask>> queue;
+  TF_ASSERT_OK(scheduler->AddQueue({}, queue_callback, &queue));
+
+  // Enqueue 2 tasks, should result in 2 batches.
+  for (int i = 0; i < 2; i++) {
+    TF_ASSERT_OK(ScheduleTask(100, queue.get()));
+  }
+  // First batch was immediately processed, no longer counts as enqueued.
+  EXPECT_EQ(queue->NumEnqueuedTasks(), 1);
+  EXPECT_EQ(queue->SchedulingCapacity(), 9 * 1000 + 900);
+  // Enqueue 2 more tasks, should fall in same batch.
+  TF_ASSERT_OK(ScheduleTask(100, queue.get()));
+  TF_ASSERT_OK(ScheduleTask(200, queue.get()));
+  EXPECT_EQ(queue->NumEnqueuedTasks(), 3);
+  EXPECT_EQ(queue->SchedulingCapacity(), 9 * 1000 + 600);
+  // Enqueue 1 more task, should create new batch.
+  TF_ASSERT_OK(ScheduleTask(700, queue.get()));
+  EXPECT_EQ(queue->NumEnqueuedTasks(), 4);
+  EXPECT_EQ(queue->SchedulingCapacity(), 8 * 1000 + 300);
+  finish_processing.Notify();
+}
 }  // namespace anonymous
 }  // namespace serving
 }  // namespace tensorflow