elif self.blender_type == "multiband":
self.blender = cv.detail_MultiBandBlender()
- self.blender.setNumBands((np.log(blend_width) /
- np.log(2.) - 1.).astype(np.int))
+ self.blender.setNumBands(int((np.log(blend_width) /
+ np.log(2.) - 1.)))
elif self.blender_type == "feather":
self.blender = cv.detail_FeatherBlender()
result_mask = None
result, result_mask = self.blender.blend(result, result_mask)
result = cv.convertScaleAbs(result)
- return result
+ return result, result_mask
+
+ @classmethod
+ def create_panorama(cls, imgs, masks, corners, sizes):
+ blender = cls("no")
+ blender.prepare(corners, sizes)
+ for img, mask, corner in zip(imgs, masks, corners):
+ blender.feed(img, mask, corner)
+ return blender.blend()
--- /dev/null
+from collections import namedtuple
+import cv2 as cv
+
+from .blender import Blender
+from .stitching_error import StitchingError
+
+
+class Rectangle(namedtuple('Rectangle', 'x y width height')):
+ __slots__ = ()
+
+ @property
+ def area(self):
+ return self.width * self.height
+
+ @property
+ def corner(self):
+ return (self.x, self.y)
+
+ @property
+ def size(self):
+ return (self.width, self.height)
+
+ @property
+ def x2(self):
+ return self.x + self.width
+
+ @property
+ def y2(self):
+ return self.y + self.height
+
+ def times(self, x):
+ return Rectangle(*(int(round(i*x)) for i in self))
+
+ def draw_on(self, img, color=(0, 0, 255), size=1):
+ if len(img.shape) == 2:
+ img = cv.cvtColor(img, cv.COLOR_GRAY2RGB)
+ start_point = (self.x, self.y)
+ end_point = (self.x2-1, self.y2-1)
+ cv.rectangle(img, start_point, end_point, color, size)
+ return img
+
+
+class Cropper:
+
+ DEFAULT_CROP = False
+
+ def __init__(self, crop=DEFAULT_CROP):
+ self.do_crop = crop
+ self.overlapping_rectangles = []
+ self.cropping_rectangles = []
+
+ def prepare(self, imgs, masks, corners, sizes):
+ if self.do_crop:
+ mask = self.estimate_panorama_mask(imgs, masks, corners, sizes)
+ self.compile_numba_functionality()
+ lir = self.estimate_largest_interior_rectangle(mask)
+ corners = self.get_zero_center_corners(corners)
+ rectangles = self.get_rectangles(corners, sizes)
+ self.overlapping_rectangles = self.get_overlaps(
+ rectangles, lir)
+ self.intersection_rectangles = self.get_intersections(
+ rectangles, self.overlapping_rectangles)
+
+ def crop_images(self, imgs, aspect=1):
+ for idx, img in enumerate(imgs):
+ yield self.crop_img(img, idx, aspect)
+
+ def crop_img(self, img, idx, aspect=1):
+ if self.do_crop:
+ intersection_rect = self.intersection_rectangles[idx]
+ scaled_intersection_rect = intersection_rect.times(aspect)
+ cropped_img = self.crop_rectangle(img, scaled_intersection_rect)
+ return cropped_img
+ return img
+
+ def crop_rois(self, corners, sizes, aspect=1):
+ if self.do_crop:
+ scaled_overlaps = \
+ [r.times(aspect) for r in self.overlapping_rectangles]
+ cropped_corners = [r.corner for r in scaled_overlaps]
+ cropped_corners = self.get_zero_center_corners(cropped_corners)
+ cropped_sizes = [r.size for r in scaled_overlaps]
+ return cropped_corners, cropped_sizes
+ return corners, sizes
+
+ @staticmethod
+ def estimate_panorama_mask(imgs, masks, corners, sizes):
+ _, mask = Blender.create_panorama(imgs, masks, corners, sizes)
+ return mask
+
+ def compile_numba_functionality(self):
+ # numba functionality is only imported if cropping
+ # is explicitely desired
+ try:
+ import numba
+ except ModuleNotFoundError:
+ raise StitchingError("Numba is needed for cropping but not installed")
+ from .largest_interior_rectangle import largest_interior_rectangle
+ self.largest_interior_rectangle = largest_interior_rectangle
+
+ def estimate_largest_interior_rectangle(self, mask):
+ lir = self.largest_interior_rectangle(mask)
+ lir = Rectangle(*lir)
+ return lir
+
+ @staticmethod
+ def get_zero_center_corners(corners):
+ min_corner_x = min([corner[0] for corner in corners])
+ min_corner_y = min([corner[1] for corner in corners])
+ return [(x - min_corner_x, y - min_corner_y) for x, y in corners]
+
+ @staticmethod
+ def get_rectangles(corners, sizes):
+ rectangles = []
+ for corner, size in zip(corners, sizes):
+ rectangle = Rectangle(*corner, *size)
+ rectangles.append(rectangle)
+ return rectangles
+
+ @staticmethod
+ def get_overlaps(rectangles, lir):
+ return [Cropper.get_overlap(r, lir) for r in rectangles]
+
+ @staticmethod
+ def get_overlap(rectangle1, rectangle2):
+ x1 = max(rectangle1.x, rectangle2.x)
+ y1 = max(rectangle1.y, rectangle2.y)
+ x2 = min(rectangle1.x2, rectangle2.x2)
+ y2 = min(rectangle1.y2, rectangle2.y2)
+ if x2 < x1 or y2 < y1:
+ raise StitchingError("Rectangles do not overlap!")
+ return Rectangle(x1, y1, x2-x1, y2-y1)
+
+ @staticmethod
+ def get_intersections(rectangles, overlapping_rectangles):
+ return [Cropper.get_intersection(r, overlap_r) for r, overlap_r
+ in zip(rectangles, overlapping_rectangles)]
+
+ @staticmethod
+ def get_intersection(rectangle, overlapping_rectangle):
+ x = abs(overlapping_rectangle.x - rectangle.x)
+ y = abs(overlapping_rectangle.y - rectangle.y)
+ width = overlapping_rectangle.width
+ height = overlapping_rectangle.height
+ return Rectangle(x, y, width, height)
+
+ @staticmethod
+ def crop_rectangle(img, rectangle):
+ return img[rectangle.y:rectangle.y2, rectangle.x:rectangle.x2]
self.matcher = cv.detail_AffineBestOf2NearestMatcher(**kwargs)
elif range_width == -1:
"""https://docs.opencv.org/4.x/d4/d26/classcv_1_1detail_1_1BestOf2NearestMatcher.html""" # noqa
- self.matcher = cv.detail.BestOf2NearestMatcher_create(**kwargs)
+ self.matcher = cv.detail_BestOf2NearestMatcher(**kwargs)
else:
"""https://docs.opencv.org/4.x/d8/d72/classcv_1_1detail_1_1BestOf2NearestRangeMatcher.html""" # noqa
- self.matcher = cv.detail.BestOf2NearestRangeMatcher_create(
+ self.matcher = cv.detail_BestOf2NearestRangeMatcher(
range_width, **kwargs
)
import cv2 as cv
-from .megapix_downscaler import MegapixDownscaler
+from .megapix_scaler import MegapixDownscaler
from .stitching_error import StitchingError
class ImageHandler:
def resize_to_low_resolution(self, medium_imgs=None):
if medium_imgs and self.scales_set:
- return self.resize_medium_to_low(medium_imgs)
+ return self.resize_imgs_by_scaler(medium_imgs, self.low_scaler)
return self.read_and_resize_imgs(self.low_scaler)
def resize_to_final_resolution(self):
for img, size in self.input_images():
yield self.resize_img_by_scaler(scaler, size, img)
- def resize_medium_to_low(self, medium_imgs):
+ def resize_imgs_by_scaler(self, medium_imgs, scaler):
for img, size in zip(medium_imgs, self.img_sizes):
- yield self.resize_img_by_scaler(self.low_scaler, size, img)
+ yield self.resize_img_by_scaler(scaler, size, img)
@staticmethod
def resize_img_by_scaler(scaler, size, img):
def get_final_to_low_ratio(self):
return self.low_scaler.scale / self.final_scaler.scale
+
+ def get_low_to_final_ratio(self):
+ return self.final_scaler.scale / self.low_scaler.scale
+
+ def get_final_img_sizes(self):
+ return [self.final_scaler.get_scaled_img_size(sz)
+ for sz in self.img_sizes]
+
+ def get_low_img_sizes(self):
+ return [self.low_scaler.get_scaled_img_size(sz)
+ for sz in self.img_sizes]
--- /dev/null
+import numpy as np
+import numba as nb
+import cv2 as cv
+
+from .stitching_error import StitchingError
+
+
+def largest_interior_rectangle(cells):
+ outline = get_outline(cells)
+ adjacencies = adjacencies_all_directions(cells)
+ s_map, _, saddle_candidates_map = create_maps(outline, adjacencies)
+ lir1 = biggest_span_in_span_map(s_map)
+
+ candidate_cells = cells_of_interest(saddle_candidates_map)
+ s_map = span_map(adjacencies[0], adjacencies[2], candidate_cells)
+ lir2 = biggest_span_in_span_map(s_map)
+
+ lir = biggest_rectangle(lir1, lir2)
+ return lir
+
+
+def get_outline(cells):
+ contours, hierarchy = \
+ cv.findContours(cells, cv.RETR_TREE, cv.CHAIN_APPROX_NONE)
+ # TODO support multiple contours
+ # test that only one regular contour exists
+ if not hierarchy.shape == (1, 1, 4) or not np.all(hierarchy == -1):
+ raise StitchingError("Invalid Contour. Try without cropping.")
+ contour = contours[0][:, 0, :]
+ x_values = contour[:, 0].astype("uint32", order="C")
+ y_values = contour[:, 1].astype("uint32", order="C")
+ return x_values, y_values
+
+
+@nb.njit('uint32[:,::1](uint8[:,::1], boolean)', parallel=True, cache=True)
+def horizontal_adjacency(cells, direction):
+ result = np.zeros(cells.shape, dtype=np.uint32)
+ for y in nb.prange(cells.shape[0]):
+ span = 0
+ if direction:
+ iterator = range(cells.shape[1]-1, -1, -1)
+ else:
+ iterator = range(cells.shape[1])
+ for x in iterator:
+ if cells[y, x] > 0:
+ span += 1
+ else:
+ span = 0
+ result[y, x] = span
+ return result
+
+
+@nb.njit('uint32[:,::1](uint8[:,::1], boolean)', parallel=True, cache=True)
+def vertical_adjacency(cells, direction):
+ result = np.zeros(cells.shape, dtype=np.uint32)
+ for x in nb.prange(cells.shape[1]):
+ span = 0
+ if direction:
+ iterator = range(cells.shape[0]-1, -1, -1)
+ else:
+ iterator = range(cells.shape[0])
+ for y in iterator:
+ if cells[y, x] > 0:
+ span += 1
+ else:
+ span = 0
+ result[y, x] = span
+ return result
+
+
+@nb.njit(cache=True)
+def adjacencies_all_directions(cells):
+ h_left2right = horizontal_adjacency(cells, 1)
+ h_right2left = horizontal_adjacency(cells, 0)
+ v_top2bottom = vertical_adjacency(cells, 1)
+ v_bottom2top = vertical_adjacency(cells, 0)
+ return h_left2right, h_right2left, v_top2bottom, v_bottom2top
+
+
+@nb.njit('uint32(uint32[:])', cache=True)
+def predict_vector_size(array):
+ zero_indices = np.where(array == 0)[0]
+ if len(zero_indices) == 0:
+ if len(array) == 0:
+ return 0
+ return len(array)
+ return zero_indices[0]
+
+
+@nb.njit('uint32[:](uint32[:,::1], uint32, uint32)', cache=True)
+def h_vector_top2bottom(h_adjacency, x, y):
+ vector_size = predict_vector_size(h_adjacency[y:, x])
+ h_vector = np.zeros(vector_size, dtype=np.uint32)
+ h = np.Inf
+ for p in range(vector_size):
+ h = np.minimum(h_adjacency[y+p, x], h)
+ h_vector[p] = h
+ h_vector = np.unique(h_vector)[::-1]
+ return h_vector
+
+
+@nb.njit('uint32[:](uint32[:,::1], uint32, uint32)', cache=True)
+def h_vector_bottom2top(h_adjacency, x, y):
+ vector_size = predict_vector_size(np.flip(h_adjacency[:y+1, x]))
+ h_vector = np.zeros(vector_size, dtype=np.uint32)
+ h = np.Inf
+ for p in range(vector_size):
+ h = np.minimum(h_adjacency[y-p, x], h)
+ h_vector[p] = h
+ h_vector = np.unique(h_vector)[::-1]
+ return h_vector
+
+
+@nb.njit(cache=True)
+def h_vectors_all_directions(h_left2right, h_right2left, x, y):
+ h_l2r_t2b = h_vector_top2bottom(h_left2right, x, y)
+ h_r2l_t2b = h_vector_top2bottom(h_right2left, x, y)
+ h_l2r_b2t = h_vector_bottom2top(h_left2right, x, y)
+ h_r2l_b2t = h_vector_bottom2top(h_right2left, x, y)
+ return h_l2r_t2b, h_r2l_t2b, h_l2r_b2t, h_r2l_b2t
+
+
+@nb.njit('uint32[:](uint32[:,::1], uint32, uint32)', cache=True)
+def v_vector_left2right(v_adjacency, x, y):
+ vector_size = predict_vector_size(v_adjacency[y, x:])
+ v_vector = np.zeros(vector_size, dtype=np.uint32)
+ v = np.Inf
+ for q in range(vector_size):
+ v = np.minimum(v_adjacency[y, x+q], v)
+ v_vector[q] = v
+ v_vector = np.unique(v_vector)[::-1]
+ return v_vector
+
+
+@nb.njit('uint32[:](uint32[:,::1], uint32, uint32)', cache=True)
+def v_vector_right2left(v_adjacency, x, y):
+ vector_size = predict_vector_size(np.flip(v_adjacency[y, :x+1]))
+ v_vector = np.zeros(vector_size, dtype=np.uint32)
+ v = np.Inf
+ for q in range(vector_size):
+ v = np.minimum(v_adjacency[y, x-q], v)
+ v_vector[q] = v
+ v_vector = np.unique(v_vector)[::-1]
+ return v_vector
+
+
+@nb.njit(cache=True)
+def v_vectors_all_directions(v_top2bottom, v_bottom2top, x, y):
+ v_l2r_t2b = v_vector_left2right(v_top2bottom, x, y)
+ v_r2l_t2b = v_vector_right2left(v_top2bottom, x, y)
+ v_l2r_b2t = v_vector_left2right(v_bottom2top, x, y)
+ v_r2l_b2t = v_vector_right2left(v_bottom2top, x, y)
+ return v_l2r_t2b, v_r2l_t2b, v_l2r_b2t, v_r2l_b2t
+
+
+@nb.njit('uint32[:,:](uint32[:], uint32[:])', cache=True)
+def spans(h_vector, v_vector):
+ spans = np.stack((h_vector, v_vector[::-1]), axis=1)
+ return spans
+
+
+@nb.njit('uint32[:](uint32[:,:])', cache=True)
+def biggest_span(spans):
+ if len(spans) == 0:
+ return np.array([0, 0], dtype=np.uint32)
+ areas = spans[:, 0] * spans[:, 1]
+ biggest_span_index = np.where(areas == np.amax(areas))[0][0]
+ return spans[biggest_span_index]
+
+
+@nb.njit(cache=True)
+def spans_all_directions(h_vectors, v_vectors):
+ span_l2r_t2b = spans(h_vectors[0], v_vectors[0])
+ span_r2l_t2b = spans(h_vectors[1], v_vectors[1])
+ span_l2r_b2t = spans(h_vectors[2], v_vectors[2])
+ span_r2l_b2t = spans(h_vectors[3], v_vectors[3])
+ return span_l2r_t2b, span_r2l_t2b, span_l2r_b2t, span_r2l_b2t
+
+
+@nb.njit(cache=True)
+def get_n_directions(spans_all_directions):
+ n_directions = 1
+ for spans in spans_all_directions:
+ all_x_1 = np.all(spans[:, 0] == 1)
+ all_y_1 = np.all(spans[:, 1] == 1)
+ if not all_x_1 and not all_y_1:
+ n_directions += 1
+ return n_directions
+
+
+@nb.njit(cache=True)
+def get_xy_array(x, y, spans, mode=0):
+ """0 - flip none, 1 - flip x, 2 - flip y, 3 - flip both"""
+ xy = spans.copy()
+ xy[:, 0] = x
+ xy[:, 1] = y
+ if mode == 1:
+ xy[:, 0] = xy[:, 0] - spans[:, 0] + 1
+ if mode == 2:
+ xy[:, 1] = xy[:, 1] - spans[:, 1] + 1
+ if mode == 3:
+ xy[:, 0] = xy[:, 0] - spans[:, 0] + 1
+ xy[:, 1] = xy[:, 1] - spans[:, 1] + 1
+ return xy
+
+
+@nb.njit(cache=True)
+def get_xy_arrays(x, y, spans_all_directions):
+ xy_l2r_t2b = get_xy_array(x, y, spans_all_directions[0], 0)
+ xy_r2l_t2b = get_xy_array(x, y, spans_all_directions[1], 1)
+ xy_l2r_b2t = get_xy_array(x, y, spans_all_directions[2], 2)
+ xy_r2l_b2t = get_xy_array(x, y, spans_all_directions[3], 3)
+ return xy_l2r_t2b, xy_r2l_t2b, xy_l2r_b2t, xy_r2l_b2t
+
+
+@nb.njit(cache=True)
+def point_on_outline(x, y, outline):
+ x_vals, y_vals = outline
+ x_true = x_vals == x
+ y_true = y_vals == y
+ both_true = np.logical_and(x_true, y_true)
+ return np.any(both_true)
+
+
+@nb.njit('Tuple((uint32[:,:,::1], uint8[:,::1], uint8[:,::1]))'
+ '(UniTuple(uint32[:], 2), UniTuple(uint32[:,::1], 4))',
+ parallel=True, cache=True)
+def create_maps(outline, adjacencies):
+ x_values, y_values = outline
+ h_left2right, h_right2left, v_top2bottom, v_bottom2top = adjacencies
+
+ shape = h_left2right.shape
+ span_map = np.zeros(shape + (2,), "uint32")
+ direction_map = np.zeros(shape, "uint8")
+ saddle_candidates_map = np.zeros(shape, "uint8")
+
+ for idx in nb.prange(len(x_values)):
+ x, y = x_values[idx], y_values[idx]
+ h_vectors = h_vectors_all_directions(h_left2right, h_right2left, x, y)
+ v_vectors = v_vectors_all_directions(v_top2bottom, v_bottom2top, x, y)
+ span_arrays = spans_all_directions(h_vectors, v_vectors)
+ n = get_n_directions(span_arrays)
+ direction_map[y, x] = n
+ xy_arrays = get_xy_arrays(x, y, span_arrays)
+ for direction_idx in range(4):
+ xy_array = xy_arrays[direction_idx]
+ span_array = span_arrays[direction_idx]
+ for span_idx in range(span_array.shape[0]):
+ x, y = xy_array[span_idx][0], xy_array[span_idx][1]
+ w, h = span_array[span_idx][0], span_array[span_idx][1]
+ if w*h > span_map[y, x, 0] * span_map[y, x, 1]:
+ span_map[y, x, :] = np.array([w, h], "uint32")
+ if n == 3 and not point_on_outline(x, y, outline):
+ saddle_candidates_map[y, x] = np.uint8(255)
+
+ return span_map, direction_map, saddle_candidates_map
+
+
+def cells_of_interest(cells):
+ y_vals, x_vals = cells.nonzero()
+ x_vals = x_vals.astype("uint32", order="C")
+ y_vals = y_vals.astype("uint32", order="C")
+ return x_vals, y_vals
+
+
+@nb.njit('uint32[:, :, :]'
+ '(uint32[:,::1], uint32[:,::1], UniTuple(uint32[:], 2))',
+ parallel=True, cache=True)
+def span_map(h_adjacency_left2right,
+ v_adjacency_top2bottom,
+ cells_of_interest):
+
+ x_values, y_values = cells_of_interest
+
+ span_map = np.zeros(h_adjacency_left2right.shape + (2,), dtype=np.uint32)
+
+ for idx in nb.prange(len(x_values)):
+ x, y = x_values[idx], y_values[idx]
+ h_vector = h_vector_top2bottom(h_adjacency_left2right, x, y)
+ v_vector = v_vector_left2right(v_adjacency_top2bottom, x, y)
+ s = spans(h_vector, v_vector)
+ s = biggest_span(s)
+ span_map[y, x, :] = s
+
+ return span_map
+
+
+@nb.njit('uint32[:](uint32[:, :, :])', cache=True)
+def biggest_span_in_span_map(span_map):
+ areas = span_map[:, :, 0] * span_map[:, :, 1]
+ largest_rectangle_indices = np.where(areas == np.amax(areas))
+ x = largest_rectangle_indices[1][0]
+ y = largest_rectangle_indices[0][0]
+ span = span_map[y, x]
+ return np.array([x, y, span[0], span[1]], dtype=np.uint32)
+
+
+def biggest_rectangle(*args):
+ biggest_rect = np.array([0, 0, 0, 0], dtype=np.uint32)
+ for rect in args:
+ if rect[2] * rect[3] > biggest_rect[2] * biggest_rect[3]:
+ biggest_rect = rect
+ return biggest_rect
+++ /dev/null
-from .megapix_scaler import MegapixScaler
-
-
-class MegapixDownscaler(MegapixScaler):
-
- @staticmethod
- def force_downscale(scale):
- return min(1.0, scale)
-
- def set_scale(self, scale):
- scale = self.force_downscale(scale)
- super().set_scale(scale)
width = int(round(img_size[0] * self.scale))
height = int(round(img_size[1] * self.scale))
return (width, height)
+
+
+class MegapixDownscaler(MegapixScaler):
+
+ @staticmethod
+ def force_downscale(scale):
+ return min(1.0, scale)
+
+ def set_scale(self, scale):
+ scale = self.force_downscale(scale)
+ super().set_scale(scale)
+++ /dev/null
-import statistics
-
-
-def estimate_final_panorama_dimensions(cameras, warper, img_handler):
- medium_to_final_ratio = img_handler.get_medium_to_final_ratio()
-
- panorama_scale_determined_on_medium_img = \
- estimate_panorama_scale(cameras)
-
- panorama_scale = (panorama_scale_determined_on_medium_img *
- medium_to_final_ratio)
- panorama_corners = []
- panorama_sizes = []
-
- for size, camera in zip(img_handler.img_sizes, cameras):
- width, height = img_handler.final_scaler.get_scaled_img_size(size)
- roi = warper.warp_roi(width, height, camera, panorama_scale, medium_to_final_ratio)
- panorama_corners.append(roi[0:2])
- panorama_sizes.append(roi[2:4])
-
- return panorama_scale, panorama_corners, panorama_sizes
-
-
-def estimate_panorama_scale(cameras):
- focals = [cam.focal for cam in cameras]
- panorama_scale = statistics.median(focals)
- return panorama_scale
return cv.dilate(seam_lines, kernel)
@staticmethod
- def blend_seam_masks(seam_masks, corners, sizes, colors=[
+ def blend_seam_masks(seam_masks, corners, sizes):
+ imgs = colored_img_generator(sizes)
+ blended_seam_masks, _ = \
+ Blender.create_panorama(imgs, seam_masks, corners, sizes)
+ return blended_seam_masks
+
+
+def colored_img_generator(sizes, colors=(
(255, 000, 000), # Blue
(000, 000, 255), # Red
(000, 255, 000), # Green
(128, 128, 255), # Pink
(128, 128, 128), # Gray
(000, 000, 128), # Brown
- (000, 128, 255)] # Orange
+ (000, 128, 255)) # Orange
):
-
- blender = Blender("no")
- blender.prepare(corners, sizes)
-
- for idx, (seam_mask, size, corner) in enumerate(
- zip(seam_masks, sizes, corners)):
- if idx+1 > len(colors):
- raise ValueError("Not enough default colors! Pass additional "
- "colors to \"colors\" parameter")
- one_color_img = create_img_by_size(size, colors[idx])
- blender.feed(one_color_img, seam_mask, corner)
-
- return blender.blend()
+ for idx, size in enumerate(sizes):
+ if idx+1 > len(colors):
+ raise ValueError("Not enough default colors! Pass additional "
+ "colors to \"colors\" parameter")
+ yield create_img_by_size(size, colors[idx])
def create_img_by_size(size, color=(0, 0, 0)):
from .camera_adjuster import CameraAdjuster
from .camera_wave_corrector import WaveCorrector
from .warper import Warper
-from .panorama_estimation import estimate_final_panorama_dimensions
+from .cropper import Cropper
from .exposure_error_compensator import ExposureErrorCompensator
from .seam_finder import SeamFinder
from .blender import Blender
"wave_correct_kind": WaveCorrector.DEFAULT_WAVE_CORRECTION,
"warper_type": Warper.DEFAULT_WARP_TYPE,
"low_megapix": ImageHandler.DEFAULT_LOW_MEGAPIX,
+ "crop": Cropper.DEFAULT_CROP,
"compensator": ExposureErrorCompensator.DEFAULT_COMPENSATOR,
"nr_feeds": ExposureErrorCompensator.DEFAULT_NR_FEEDS,
"block_size": ExposureErrorCompensator.DEFAULT_BLOCK_SIZE,
CameraAdjuster(args.adjuster, args.refinement_mask)
self.wave_corrector = WaveCorrector(args.wave_correct_kind)
self.warper = Warper(args.warper_type)
+ self.cropper = Cropper(args.crop)
self.compensator = \
ExposureErrorCompensator(args.compensator, args.nr_feeds,
args.block_size)
def stitch(self, img_names):
self.initialize_registration(img_names)
-
imgs = self.resize_medium_resolution()
features = self.find_features(imgs)
matches = self.match_features(features)
cameras = self.estimate_camera_parameters(features, matches)
cameras = self.refine_camera_parameters(features, matches, cameras)
cameras = self.perform_wave_correction(cameras)
- panorama_scale, panorama_corners, panorama_sizes = \
- self.estimate_final_panorama_dimensions(cameras)
-
- self.initialize_composition(panorama_corners, panorama_sizes)
+ self.estimate_scale(cameras)
imgs = self.resize_low_resolution(imgs)
- imgs = self.warp_low_resolution_images(imgs, cameras, panorama_scale)
- self.estimate_exposure_errors(imgs)
- seam_masks = self.find_seam_masks(imgs)
+ imgs, masks, corners, sizes = self.warp_low_resolution(imgs, cameras)
+ self.prepare_cropper(imgs, masks, corners, sizes)
+ imgs, masks, corners, sizes = \
+ self.crop_low_resolution(imgs, masks, corners, sizes)
+ self.estimate_exposure_errors(corners, imgs, masks)
+ seam_masks = self.find_seam_masks(imgs, corners, masks)
imgs = self.resize_final_resolution()
- imgs = self.warp_final_resolution_images(imgs, cameras, panorama_scale)
- imgs = self.compensate_exposure_errors(imgs)
+ imgs, masks, corners, sizes = self.warp_final_resolution(imgs, cameras)
+ imgs, masks, corners, sizes = \
+ self.crop_final_resolution(imgs, masks, corners, sizes)
+ self.set_masks(masks)
+ imgs = self.compensate_exposure_errors(corners, imgs)
seam_masks = self.resize_seam_masks(seam_masks)
- self.blend_images(imgs, seam_masks)
+ self.initialize_composition(corners, sizes)
+ self.blend_images(imgs, seam_masks, corners)
return self.create_final_panorama()
def initialize_registration(self, img_names):
def perform_wave_correction(self, cameras):
return self.wave_corrector.correct(cameras)
- def estimate_final_panorama_dimensions(self, cameras):
- return estimate_final_panorama_dimensions(cameras, self.warper,
- self.img_handler)
-
- def initialize_composition(self, corners, sizes):
- if self.timelapser.do_timelapse:
- self.timelapser.initialize(corners, sizes)
- else:
- self.blender.prepare(corners, sizes)
+ def estimate_scale(self, cameras):
+ self.warper.set_scale(cameras)
def resize_low_resolution(self, imgs=None):
return list(self.img_handler.resize_to_low_resolution(imgs))
- def warp_low_resolution_images(self, imgs, cameras, final_scale):
+ def warp_low_resolution(self, imgs, cameras):
+ sizes = self.img_handler.get_low_img_sizes()
camera_aspect = self.img_handler.get_medium_to_low_ratio()
- scale = final_scale * self.img_handler.get_final_to_low_ratio()
- return list(self.warp_images(imgs, cameras, scale, camera_aspect))
+ imgs, masks, corners, sizes = \
+ self.warp(imgs, cameras, sizes, camera_aspect)
+ return list(imgs), list(masks), corners, sizes
- def warp_final_resolution_images(self, imgs, cameras, scale):
+ def warp_final_resolution(self, imgs, cameras):
+ sizes = self.img_handler.get_final_img_sizes()
camera_aspect = self.img_handler.get_medium_to_final_ratio()
- return self.warp_images(imgs, cameras, scale, camera_aspect)
+ return self.warp(imgs, cameras, sizes, camera_aspect)
+
+ def warp(self, imgs, cameras, sizes, aspect=1):
+ imgs = self.warper.warp_images(imgs, cameras, aspect)
+ masks = self.warper.create_and_warp_masks(sizes, cameras, aspect)
+ corners, sizes = self.warper.warp_rois(sizes, cameras, aspect)
+ return imgs, masks, corners, sizes
+
+ def prepare_cropper(self, imgs, masks, corners, sizes):
+ self.cropper.prepare(imgs, masks, corners, sizes)
- def warp_images(self, imgs, cameras, scale, aspect=1):
- self._masks = []
- self._corners = []
- for img_warped, mask_warped, corner in \
- self.warper.warp_images_and_image_masks(
- imgs, cameras, scale, aspect
- ):
- self._masks.append(mask_warped)
- self._corners.append(corner)
- yield img_warped
+ def crop_low_resolution(self, imgs, masks, corners, sizes):
+ imgs, masks, corners, sizes = self.crop(imgs, masks, corners, sizes)
+ return list(imgs), list(masks), corners, sizes
- def estimate_exposure_errors(self, imgs):
- self.compensator.feed(self._corners, imgs, self._masks)
+ def crop_final_resolution(self, imgs, masks, corners, sizes):
+ lir_aspect = self.img_handler.get_low_to_final_ratio()
+ return self.crop(imgs, masks, corners, sizes, lir_aspect)
- def find_seam_masks(self, imgs):
- return self.seam_finder.find(imgs, self._corners, self._masks)
+ def crop(self, imgs, masks, corners, sizes, aspect=1):
+ masks = self.cropper.crop_images(masks, aspect)
+ imgs = self.cropper.crop_images(imgs, aspect)
+ corners, sizes = self.cropper.crop_rois(corners, sizes, aspect)
+ return imgs, masks, corners, sizes
+
+ def estimate_exposure_errors(self, corners, imgs, masks):
+ self.compensator.feed(corners, imgs, masks)
+
+ def find_seam_masks(self, imgs, corners, masks):
+ return self.seam_finder.find(imgs, corners, masks)
def resize_final_resolution(self):
return self.img_handler.resize_to_final_resolution()
- def compensate_exposure_errors(self, imgs):
- for idx, img in enumerate(imgs):
- yield self.compensator.apply(idx, self._corners[idx],
- img, self._masks[idx])
+ def compensate_exposure_errors(self, corners, imgs):
+ for idx, (corner, img) in enumerate(zip(corners, imgs)):
+ yield self.compensator.apply(idx, corner, img, self.get_mask(idx))
def resize_seam_masks(self, seam_masks):
for idx, seam_mask in enumerate(seam_masks):
- yield SeamFinder.resize(seam_mask, self._masks[idx])
+ yield SeamFinder.resize(seam_mask, self.get_mask(idx))
+
+ def set_masks(self, mask_generator):
+ self.masks = mask_generator
+ self.mask_index = -1
+
+ def get_mask(self, idx):
+ if idx == self.mask_index + 1:
+ self.mask_index += 1
+ self.mask = next(self.masks)
+ return self.mask
+ elif idx == self.mask_index:
+ return self.mask
+ else:
+ raise StitchingError("Invalid Mask Index!")
+
+ def initialize_composition(self, corners, sizes):
+ if self.timelapser.do_timelapse:
+ self.timelapser.initialize(corners, sizes)
+ else:
+ self.blender.prepare(corners, sizes)
- def blend_images(self, imgs, masks):
- for idx, (img, mask) in enumerate(zip(imgs, masks)):
+ def blend_images(self, imgs, masks, corners):
+ for idx, (img, mask, corner) in enumerate(zip(imgs, masks, corners)):
if self.timelapser.do_timelapse:
self.timelapser.process_and_save_frame(
- self.img_handler.img_names[idx], img, self._corners[idx]
+ self.img_handler.img_names[idx], img, corner
)
else:
- self.blender.feed(img, mask, self._corners[idx])
+ self.blender.feed(img, mask, corner)
def create_final_panorama(self):
if not self.timelapser.do_timelapse:
- return self.blender.blend()
+ panorama, _ = self.blender.blend()
+ return panorama
@staticmethod
def validate_kwargs(kwargs):
for arg in kwargs:
if arg not in Stitcher.DEFAULT_SETTINGS:
raise StitchingError("Invalid Argument: " + arg)
-
- def collect_garbage(self):
- del self.img_handler.img_names, self.img_handler.img_sizes,
- del self._corners, self._masks
indices = cv.detail.leaveBiggestComponent(features,
pairwise_matches,
self.confidence_threshold)
- indices_as_list = [int(idx) for idx in list(indices[:, 0])]
- if len(indices_as_list) < 2:
+ if len(indices) < 2:
raise StitchingError("No match exceeds the "
"given confidence theshold.")
- return indices_as_list
+ return indices
@staticmethod
def subset_list(list_to_subset, indices):
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__),
'..', '..')))
-from opencv_stitching.megapix_scaler import MegapixScaler
-from opencv_stitching.megapix_downscaler import MegapixDownscaler
-#%%
+from opencv_stitching.megapix_scaler import MegapixScaler, MegapixDownscaler
+# %%
class TestScaler(unittest.TestCase):
class TestStitcher(unittest.TestCase):
+ @unittest.skip("skip performance test (not needed in every run)")
def test_performance(self):
print("Run new Stitcher class:")
stitcher.stitch(["boat5.jpg", "boat2.jpg",
"boat3.jpg", "boat4.jpg",
"boat1.jpg", "boat6.jpg"])
- stitcher.collect_garbage()
_, peak_memory = tracemalloc.get_traced_memory()
tracemalloc.stop()
indices_to_delete = subsetter.get_indices_to_delete(len(img_names),
indices)
- self.assertEqual(indices, [2, 3, 4])
- self.assertEqual(indices_to_delete, [0, 1])
+ np.testing.assert_array_equal(indices, np.array([2, 3, 4]))
+ np.testing.assert_array_equal(indices_to_delete, np.array([0, 1]))
subsetted_image_names = subsetter.subset_list(img_names, indices)
self.assertEqual(subsetted_image_names,
class TestStitcher(unittest.TestCase):
def test_stitcher_aquaduct(self):
- stitcher = Stitcher(n_features=250)
+ stitcher = Stitcher(nfeatures=250)
result = stitcher.stitch(["s1.jpg", "s2.jpg"])
cv.imwrite("result.jpg", result)
"wave_correct_kind": "no",
"finder": "dp_colorgrad",
"compensator": "no",
- "conf_thresh": 0.3}
+ "confidence_threshold": 0.3}
stitcher = Stitcher(**settings)
result = stitcher.stitch(["boat5.jpg", "boat2.jpg",
settings = {"warper_type": "compressedPlaneA2B1",
"finder": "dp_colorgrad",
"compensator": "channel_blocks",
- "conf_thresh": 0.3}
+ "confidence_threshold": 0.3}
stitcher = Stitcher(**settings)
result = stitcher.stitch(["boat5.jpg", "boat2.jpg",
atol=max_image_shape_derivation)
def test_stitcher_boat_aquaduct_subset(self):
- settings = {"final_megapix": 1}
+ settings = {"final_megapix": 1, "crop": True}
stitcher = Stitcher(**settings)
result = stitcher.stitch(["boat5.jpg",
max_image_shape_derivation = 100
np.testing.assert_allclose(result.shape[:2],
- (839, 3384),
+ (705, 3374),
atol=max_image_shape_derivation)
def test_stitcher_budapest(self):
+from statistics import median
+
import cv2 as cv
import numpy as np
DEFAULT_WARP_TYPE = 'spherical'
- def __init__(self, warper_type=DEFAULT_WARP_TYPE, scale=1):
+ def __init__(self, warper_type=DEFAULT_WARP_TYPE):
self.warper_type = warper_type
- self.warper = cv.PyRotationWarper(warper_type, scale)
- self.scale = scale
+ self.scale = None
+
+ def set_scale(self, cameras):
+ focals = [cam.focal for cam in cameras]
+ self.scale = median(focals)
- def warp_images_and_image_masks(self, imgs, cameras, scale=None, aspect=1):
- self.update_scale(scale)
+ def warp_images(self, imgs, cameras, aspect=1):
for img, camera in zip(imgs, cameras):
- yield self.warp_image_and_image_mask(img, camera, scale, aspect)
+ yield self.warp_image(img, camera, aspect)
- def warp_image_and_image_mask(self, img, camera, scale=None, aspect=1):
- self.update_scale(scale)
- corner, img_warped = self.warp_image(img, camera, aspect)
- mask = 255 * np.ones((img.shape[0], img.shape[1]), np.uint8)
- _, mask_warped = self.warp_image(mask, camera, aspect, mask=True)
- return img_warped, mask_warped, corner
+ def warp_image(self, img, camera, aspect=1):
+ warper = cv.PyRotationWarper(self.warper_type, self.scale*aspect)
+ _, warped_image = warper.warp(img,
+ Warper.get_K(camera, aspect),
+ camera.R,
+ cv.INTER_LINEAR,
+ cv.BORDER_REFLECT)
+ return warped_image
- def warp_image(self, image, camera, aspect=1, mask=False):
- if mask:
- interp_mode = cv.INTER_NEAREST
- border_mode = cv.BORDER_CONSTANT
- else:
- interp_mode = cv.INTER_LINEAR
- border_mode = cv.BORDER_REFLECT
+ def create_and_warp_masks(self, sizes, cameras, aspect=1):
+ for size, camera in zip(sizes, cameras):
+ yield self.create_and_warp_mask(size, camera, aspect)
- corner, warped_image = self.warper.warp(image,
- Warper.get_K(camera, aspect),
- camera.R,
- interp_mode,
- border_mode)
- return corner, warped_image
+ def create_and_warp_mask(self, size, camera, aspect=1):
+ warper = cv.PyRotationWarper(self.warper_type, self.scale*aspect)
+ mask = 255 * np.ones((size[1], size[0]), np.uint8)
+ _, warped_mask = warper.warp(mask,
+ Warper.get_K(camera, aspect),
+ camera.R,
+ cv.INTER_NEAREST,
+ cv.BORDER_CONSTANT)
+ return warped_mask
- def warp_roi(self, width, height, camera, scale=None, aspect=1):
- self.update_scale(scale)
- roi = (width, height)
- K = Warper.get_K(camera, aspect)
- return self.warper.warpRoi(roi, K, camera.R)
+ def warp_rois(self, sizes, cameras, aspect=1):
+ roi_corners = []
+ roi_sizes = []
+ for size, camera in zip(sizes, cameras):
+ roi = self.warp_roi(size, camera, aspect)
+ roi_corners.append(roi[0:2])
+ roi_sizes.append(roi[2:4])
+ return roi_corners, roi_sizes
- def update_scale(self, scale):
- if scale is not None and scale != self.scale:
- self.warper = cv.PyRotationWarper(self.warper_type, scale) # setScale not working: https://docs.opencv.org/4.x/d5/d76/classcv_1_1PyRotationWarper.html#a90b000bb75f95294f9b0b6ec9859eb55
- self.scale = scale
+ def warp_roi(self, size, camera, aspect=1):
+ warper = cv.PyRotationWarper(self.warper_type, self.scale*aspect)
+ K = Warper.get_K(camera, aspect)
+ return warper.warpRoi(size, K, camera.R)
@staticmethod
def get_K(camera, aspect=1):
from opencv_stitching.camera_adjuster import CameraAdjuster
from opencv_stitching.camera_wave_corrector import WaveCorrector
from opencv_stitching.warper import Warper
+from opencv_stitching.cropper import Cropper
from opencv_stitching.exposure_error_compensator import ExposureErrorCompensator # noqa
from opencv_stitching.seam_finder import SeamFinder
from opencv_stitching.blender import Blender
type=int, dest='range_width'
)
parser.add_argument(
- '--try_use_gpu',
- action='store',
- default=False,
+ '--try_use_gpu', action='store', default=False,
help="Try to use CUDA. The default value is no. "
"All default values are for CPU mode.",
type=bool, dest='try_use_gpu'
type=float, dest='low_megapix'
)
parser.add_argument(
+ '--crop', action='store', default=Cropper.DEFAULT_CROP,
+ help="Crop black borders around images caused by warping using the "
+ "largest interior rectangle. "
+ "Default is '%s'." % Cropper.DEFAULT_CROP,
+ type=bool, dest='crop'
+)
+parser.add_argument(
'--compensator', action='store',
default=ExposureErrorCompensator.DEFAULT_COMPENSATOR,
help="Exposure compensation method. "