results.KEY__RESULT_TYPE__NOCOMPARISON,
])
+ logging.info('Starting to add imagepairs to queue.')
+ self._image_diff_db.log_queue_size_if_changed(limit_verbosity=False)
+
union_dict_paths = sorted(set(setA_dicts.keys() + setB_dicts.keys()))
num_union_dict_paths = len(union_dict_paths)
dict_num = 0
if result_type != results.KEY__RESULT_TYPE__SUCCEEDED:
failing_image_pairs.add_image_pair(one_imagepair)
+ logging.info('Finished adding imagepairs to queue.')
+ self._image_diff_db.log_queue_size_if_changed(limit_verbosity=False)
+
if self._prefetch_only:
return None
else:
_DIFFRECORD_FAILED = 'failed'
_DIFFRECORD_PENDING = 'pending'
+# Granularity of tasks_queue size reports: when verbosity is limited, the
+# size is logged only if it is a multiple of this value.
+QUEUE_LOGGING_GRANULARITY = 1000
+
# Temporary variable to keep track of how many times we download
# the same file in multiple threads.
# TODO(epoger): Delete this, once we see that the number stays close to 0.
self._storage_root = storage_root
self._gs = gs
+ # Mechanism for reporting queue size periodically.
+ self._last_queue_size_reported = None
+ self._queue_size_report_lock = threading.RLock()
+
# Dictionary of DiffRecords, keyed by (expected_image_locator,
# actual_image_locator) tuples.
# Values can also be _DIFFRECORD_PENDING, _DIFFRECORD_FAILED.
worker.start()
self._workers.append(worker)
def log_queue_size_if_changed(self, limit_verbosity=True):
    """Log the size of self._tasks_queue, if it differs from the last report.

    Reports the current queue size using logging.info(), unless the queue is
    the same size as the last time it was reported.

    Args:
        limit_verbosity: if True, only log when the queue size is a multiple
            of QUEUE_LOGGING_GRANULARITY.
    """
    # Hold the lock for the whole check-and-update, to synchronize access to
    # self._last_queue_size_reported.  The with-statement releases the lock
    # on every exit path, including the early returns below.
    with self._queue_size_report_lock:
        size = self._tasks_queue.qsize()
        if size == self._last_queue_size_reported:
            return
        if limit_verbosity and (size % QUEUE_LOGGING_GRANULARITY != 0):
            return
        # Pass size as a logging argument so the message is only formatted
        # if the record is actually emitted.
        logging.info('tasks_queue size is %d', size)
        self._last_queue_size_reported = size
+
def worker(self, worker_num):
"""Launch a worker thread that pulls tasks off self._tasks_queue.
worker_num: (integer) which worker this is
"""
while True:
+ self.log_queue_size_if_changed()
params = self._tasks_queue.get()
key, expected_image_url, actual_image_url = params
try:
if must_add_to_queue:
self._tasks_queue.put((key, expected_image_url, actual_image_url))
+ self.log_queue_size_if_changed()
def get_diff_record(self, expected_image_locator, actual_image_locator):
"""Returns the DiffRecord for this image pair.