rebaseline_server: periodically log tasks_queue size
authorepoger <epoger@google.com>
Tue, 12 Aug 2014 15:30:09 +0000 (08:30 -0700)
committerCommit bot <commit-bot@chromium.org>
Tue, 12 Aug 2014 15:30:10 +0000 (08:30 -0700)
Without this, it's hard to tell whether the server is stuck (or why it's taking so long).

NOTRY=true
R=stephana@google.com, rmistry@google.com

Author: epoger@google.com

Review URL: https://codereview.chromium.org/454953002

gm/rebaseline_server/compare_rendered_pictures.py
gm/rebaseline_server/imagediffdb.py

index 5b5a129..85dff27 100755 (executable)
@@ -215,6 +215,9 @@ class RenderedPicturesComparisons(results.BaseComparisons):
             results.KEY__RESULT_TYPE__NOCOMPARISON,
         ])
 
+    logging.info('Starting to add imagepairs to queue.')
+    self._image_diff_db.log_queue_size_if_changed(limit_verbosity=False)
+
     union_dict_paths = sorted(set(setA_dicts.keys() + setB_dicts.keys()))
     num_union_dict_paths = len(union_dict_paths)
     dict_num = 0
@@ -275,6 +278,9 @@ class RenderedPicturesComparisons(results.BaseComparisons):
             if result_type != results.KEY__RESULT_TYPE__SUCCEEDED:
               failing_image_pairs.add_image_pair(one_imagepair)
 
+    logging.info('Finished adding imagepairs to queue.')
+    self._image_diff_db.log_queue_size_if_changed(limit_verbosity=False)
+
     if self._prefetch_only:
       return None
     else:
index 1f93940..167134a 100644 (file)
@@ -55,6 +55,9 @@ KEY__DIFFERENCES__PERCEPTUAL_DIFF = 'perceptualDifference'
 _DIFFRECORD_FAILED = 'failed'
 _DIFFRECORD_PENDING = 'pending'
 
+# How often to report tasks_queue size
+QUEUE_LOGGING_GRANULARITY = 1000
+
 # Temporary variable to keep track of how many times we download
 # the same file in multiple threads.
 # TODO(epoger): Delete this, once we see that the number stays close to 0.
@@ -240,6 +243,10 @@ class ImageDiffDB(object):
     self._storage_root = storage_root
     self._gs = gs
 
+    # Mechanism for reporting queue size periodically.
+    self._last_queue_size_reported = None
+    self._queue_size_report_lock = threading.RLock()
+
     # Dictionary of DiffRecords, keyed by (expected_image_locator,
     # actual_image_locator) tuples.
     # Values can also be _DIFFRECORD_PENDING, _DIFFRECORD_FAILED.
@@ -270,6 +277,29 @@ class ImageDiffDB(object):
       worker.start()
       self._workers.append(worker)
 
+  def log_queue_size_if_changed(self, limit_verbosity=True):
+    """Log the size of self._tasks_queue, if it has changed since the last call.
+
+    Reports the current queue size, using log.info(), unless the queue is the
+    same size as the last time we reported it.
+
+    Args:
+      limit_verbosity: if True, only log if the queue size is a multiple of
+          QUEUE_LOGGING_GRANULARITY
+    """
+    # Acquire the lock, to synchronize access to self._last_queue_size_reported
+    self._queue_size_report_lock.acquire()
+    try:
+      size = self._tasks_queue.qsize()
+      if size == self._last_queue_size_reported:
+        return
+      if limit_verbosity and (size % QUEUE_LOGGING_GRANULARITY != 0):
+        return
+      logging.info('tasks_queue size is %d' % size)
+      self._last_queue_size_reported = size
+    finally:
+      self._queue_size_report_lock.release()
+
   def worker(self, worker_num):
     """Launch a worker thread that pulls tasks off self._tasks_queue.
 
@@ -277,6 +307,7 @@ class ImageDiffDB(object):
       worker_num: (integer) which worker this is
     """
     while True:
+      self.log_queue_size_if_changed()
       params = self._tasks_queue.get()
       key, expected_image_url, actual_image_url = params
       try:
@@ -343,6 +374,7 @@ class ImageDiffDB(object):
 
     if must_add_to_queue:
       self._tasks_queue.put((key, expected_image_url, actual_image_url))
+      self.log_queue_size_if_changed()
 
   def get_diff_record(self, expected_image_locator, actual_image_locator):
     """Returns the DiffRecord for this image pair.