import contextlib
import csv
+import errno
import logging
+import Queue
import os
import re
import shutil
import sys
import tempfile
+import time
+import threading
import urllib
try:
from PIL import Image, ImageChops
DEFAULT_IMAGE_SUFFIX = '.png'
DEFAULT_IMAGES_SUBDIR = 'images'
+DEFAULT_NUM_WORKERS = 8
DISALLOWED_FILEPATH_CHAR_REGEX = re.compile('[^\w\-]')
KEY__DIFFERENCE_DATA__PERCEPTUAL_DIFF = 'perceptualDifference'
KEY__DIFFERENCE_DATA__WEIGHTED_DIFF = 'weightedDiffMeasure'
+# Special values within ImageDiffDB._diff_dict
+DIFFRECORD_FAILED = 'failed'
+DIFFRECORD_PENDING = 'pending'
+
+# TODO(epoger): Temporary(?) list to keep track of how many times we download
+# the same file in multiple threads.
+global_file_collisions = 0
+
class DiffRecord(object):
""" Record of differences between two images. """
"""Download this pair of images (unless we already have them on local disk),
and prepare a DiffRecord for them.
- TODO(epoger): Make this asynchronously download images, rather than blocking
- until the images have been downloaded and processed.
-
Args:
storage_root: root directory on local disk within which we store all
images
""" Calculates differences between image pairs, maintaining a database of
them for download."""
- def __init__(self, storage_root):
+ def __init__(self, storage_root, num_workers=DEFAULT_NUM_WORKERS):
"""
Args:
storage_root: string; root path within the DB will store all of its stuff
+ num_workers: integer; number of worker threads to spawn
"""
self._storage_root = storage_root
# Dictionary of DiffRecords, keyed by (expected_image_locator,
# actual_image_locator) tuples.
+ # Values can also be DIFFRECORD_PENDING, DIFFRECORD_FAILED.
self._diff_dict = {}
+ # Set up the queue for asynchronously loading DiffRecords, and start the
+ # worker threads reading from it.
+ self._tasks_queue = Queue.Queue(maxsize=2*num_workers)
+ self._workers = []
+ for i in range(num_workers):
+ worker = threading.Thread(target=self.worker, args=(i,))
+ worker.daemon = True
+ worker.start()
+ self._workers.append(worker)
+
+ def worker(self, worker_num):
+ """Launch a worker thread that pulls tasks off self._tasks_queue.
+
+ Args:
+ worker_num: (integer) which worker this is
+ """
+ while True:
+ params = self._tasks_queue.get()
+ key, expected_image_url, actual_image_url = params
+ try:
+ diff_record = DiffRecord(
+ self._storage_root,
+ expected_image_url=expected_image_url,
+ expected_image_locator=key[0],
+ actual_image_url=actual_image_url,
+ actual_image_locator=key[1])
+ except Exception:
+ logging.exception(
+ 'exception while creating DiffRecord for key %s' % str(key))
+ diff_record = DIFFRECORD_FAILED
+ self._diff_dict[key] = diff_record
+
def add_image_pair(self,
expected_image_url, expected_image_locator,
actual_image_url, actual_image_locator):
"""Download this pair of images (unless we already have them on local disk),
and prepare a DiffRecord for them.
- TODO(epoger): Make this asynchronously download images, rather than blocking
- until the images have been downloaded and processed.
- When we do that, we should probably add a new method that will block
- until all of the images have been downloaded and processed. Otherwise,
- we won't know when it's safe to start calling get_diff_record().
- jcgregorio notes: maybe just make ImageDiffDB thread-safe and create a
- thread-pool/worker queue at a higher level that just uses ImageDiffDB?
+ This method will block until the images are downloaded and DiffRecord is
+ available by calling get_diff_record().
Args:
expected_image_url: file or HTTP url from which we will download the
actual_image_locator: a unique ID string under which we will store the
actual image within storage_root (probably including a checksum to
guarantee uniqueness)
+
+ Raises:
+ Exception if we are unable to create a DiffRecord for this image pair.
"""
- expected_image_locator = _sanitize_locator(expected_image_locator)
- actual_image_locator = _sanitize_locator(actual_image_locator)
- key = (expected_image_locator, actual_image_locator)
+ key = _generate_key(expected_image_locator, actual_image_locator)
if not key in self._diff_dict:
try:
new_diff_record = DiffRecord(
new_diff_record = None
self._diff_dict[key] = new_diff_record
+ def add_image_pair_async(self,
+ expected_image_url, expected_image_locator,
+ actual_image_url, actual_image_locator):
+ """Download this pair of images (unless we already have them on local disk),
+ and prepare a DiffRecord for them.
+
+ This method will return quickly; calls to get_diff_record() will block
+ until the DiffRecord is available (or we have given up on creating it).
+
+ Args:
+ expected_image_url: file or HTTP url from which we will download the
+ expected image
+ expected_image_locator: a unique ID string under which we will store the
+ expected image within storage_root (probably including a checksum to
+ guarantee uniqueness)
+ actual_image_url: file or HTTP url from which we will download the
+ actual image
+ actual_image_locator: a unique ID string under which we will store the
+ actual image within storage_root (probably including a checksum to
+ guarantee uniqueness)
+ """
+ key = _generate_key(expected_image_locator, actual_image_locator)
+ if not key in self._diff_dict:
+ # If we have already requested a diff between these two images,
+ # we don't need to request it again.
+ #
+ # Threading note: If multiple threads called into this method with the
+ # same key at the same time, there will be multiple tasks on the queue
+ # with the same key. But that's OK; they will both complete successfully,
+ # and just waste a little time in the process. Nothing will break.
+ self._diff_dict[key] = DIFFRECORD_PENDING
+ self._tasks_queue.put((key, expected_image_url, actual_image_url))
+
def get_diff_record(self, expected_image_locator, actual_image_locator):
"""Returns the DiffRecord for this image pair.
- Raises a KeyError if we don't have a DiffRecord for this image pair.
+ Args:
+ expected_image_locator: a unique ID string under which we will store the
+ expected image within storage_root (probably including a checksum to
+ guarantee uniqueness)
+ actual_image_locator: a unique ID string under which we will store the
+ actual image within storage_root (probably including a checksum to
+ guarantee uniqueness)
+
+ Returns the DiffRecord for this image pair, or None if we were unable to
+ generate one.
"""
- key = (_sanitize_locator(expected_image_locator),
- _sanitize_locator(actual_image_locator))
- return self._diff_dict[key]
+ key = _generate_key(expected_image_locator, actual_image_locator)
+ diff_record = self._diff_dict[key]
+
+ # If we have no results yet, block until we do.
+ while diff_record == DIFFRECORD_PENDING:
+ time.sleep(1)
+ diff_record = self._diff_dict[key]
+
+ # Once we have the result...
+ if diff_record == DIFFRECORD_FAILED:
+ logging.error(
+ 'failed to create a DiffRecord for expected_image_locator=%s , '
+ 'actual_image_locator=%s' % (
+ expected_image_locator, actual_image_locator))
+ return None
+ else:
+ return diff_record
# Utility functions
Returns: a PIL image object
"""
+ global global_file_collisions
if not os.path.exists(local_filepath):
_mkdir_unless_exists(os.path.dirname(local_filepath))
with contextlib.closing(urllib.urlopen(url)) as url_handle:
- with open(local_filepath, 'wb') as file_handle:
+
+ # First download the file contents into a unique filename, and
+ # then rename that file. That way, if multiple threads are downloading
+ # the same filename at the same time, they won't interfere with each
+ # other (they will both download the file, and one will "win" in the end)
+ temp_filename = '%s-%d' % (local_filepath,
+ threading.current_thread().ident)
+ with open(temp_filename, 'wb') as file_handle:
shutil.copyfileobj(fsrc=url_handle, fdst=file_handle)
+
+ # Keep count of how many colliding downloads we encounter;
+ # if it's a large number, we may want to change our download strategy
+ # to minimize repeated downloads.
+ if os.path.exists(local_filepath):
+ global_file_collisions += 1
+ else:
+ os.rename(temp_filename, local_filepath)
+
return _open_image(local_filepath)
Args:
path: path on local disk
"""
- if not os.path.isdir(path):
+ try:
os.makedirs(path)
+ except OSError as e:
+ if e.errno == errno.EEXIST:
+ pass
def _sanitize_locator(locator):
return DISALLOWED_FILEPATH_CHAR_REGEX.sub('_', str(locator))
+def _generate_key(expected_image_locator, actual_image_locator):
+ """Returns a key suitable for looking up this image pair.
+
+ Args:
+ expected_image_locator: a unique ID string under which we will store the
+ expected image within storage_root (probably including a checksum to
+ guarantee uniqueness)
+ actual_image_locator: a unique ID string under which we will store the
+ actual image within storage_root (probably including a checksum to
+ guarantee uniqueness)
+ """
+ return (_sanitize_locator(expected_image_locator),
+ _sanitize_locator(actual_image_locator))
+
+
def _get_difference_locator(expected_image_locator, actual_image_locator):
"""Returns the locator string used to look up the diffs between expected_image
and actual_image.
self.extra_columns_dict = extra_columns
if not imageA_relative_url or not imageB_relative_url:
self._is_different = True
- self.diff_record = None
+ self._diff_record = None
+ self._diff_record_set = True
elif imageA_relative_url == imageB_relative_url:
self._is_different = False
- self.diff_record = None
+ self._diff_record = None
+ self._diff_record_set = True
else:
- # TODO(epoger): Rather than blocking until image_diff_db can read in
- # the image pair and generate diffs, it would be better to do it
- # asynchronously: tell image_diff_db to download a bunch of file pairs,
- # and only block later if we're still waiting for diff_records to come
- # back.
- self._is_different = True
- image_diff_db.add_image_pair(
+ # Tell image_diff_db to add this ImagePair.
+ # It will do so in a separate thread so as not to block this one;
+ # when you call self.get_diff_record(), it will block until the results
+ # are ready.
+ image_diff_db.add_image_pair_async(
expected_image_locator=imageA_relative_url,
expected_image_url=posixpath.join(base_url, imageA_relative_url),
actual_image_locator=imageB_relative_url,
actual_image_url=posixpath.join(base_url, imageB_relative_url))
- self.diff_record = image_diff_db.get_diff_record(
- expected_image_locator=imageA_relative_url,
- actual_image_locator=imageB_relative_url)
- if self.diff_record and self.diff_record.get_num_pixels_differing() == 0:
+ self._image_diff_db = image_diff_db
+ self._diff_record_set = False
+
+ def get_diff_record(self):
+ """Returns the DiffRecord associated with this ImagePair.
+
+ Returns None if the images are identical, or one is missing.
+ This method will block until the DiffRecord is available.
+ """
+ if not self._diff_record_set:
+ self._diff_record = self._image_diff_db.get_diff_record(
+ expected_image_locator=self.imageA_relative_url,
+ actual_image_locator=self.imageB_relative_url)
+ self._image_diff_db = None # release reference, no longer needed
+ if (self._diff_record and
+ self._diff_record.get_num_pixels_differing() == 0):
self._is_different = False
+ else:
+ self._is_different = True
+ self._diff_record_set = True
+ return self._diff_record
def as_dict(self):
"""Returns a dictionary describing this ImagePair.
KEY__IMAGE_A_URL: self.imageA_relative_url,
KEY__IMAGE_B_URL: self.imageB_relative_url,
}
- asdict[KEY__IS_DIFFERENT] = self._is_different
if self.expectations_dict:
asdict[KEY__EXPECTATIONS_DATA] = self.expectations_dict
if self.extra_columns_dict:
asdict[KEY__EXTRA_COLUMN_VALUES] = self.extra_columns_dict
- if self.diff_record and (self.diff_record.get_num_pixels_differing() > 0):
- asdict[KEY__DIFFERENCE_DATA] = self.diff_record.as_dict()
+ diff_record = self.get_diff_record()
+ if diff_record and (diff_record.get_num_pixels_differing() > 0):
+ asdict[KEY__DIFFERENCE_DATA] = diff_record.as_dict()
+ asdict[KEY__IS_DIFFERENT] = self._is_different
return asdict