Source code for vista.algorithms.background_removal.static_subspace_background_removal

"""Static (non-sliding) subspace background removal for VISTA

This module implements a non-sliding subspace background estimator. A low-rank
background subspace is computed once via truncated SVD from a user-specified
set of background frames (typically frames without transient events), then
each target frame is projected onto that subspace to estimate its background
contribution.

Compared to the sliding-window variant, this is appropriate for non-moving
transient events where one or more quiescent periods can be designated for
background modeling.
"""
import numpy as np

from vista.algorithms.background_removal.subspace_background_removal import (
    _find_knee_numpy,
    _images_to_tiles,
    _tiles_to_image,
)


[docs] def static_subspace_background_removal(background_images, target_images, rank=5, tile_size=None, callback=None): """ Remove background from target imagery using a fixed low-rank subspace modeled from a separate set of background frames. A truncated SVD of ``background_images`` defines the low-rank background subspace, and each target frame is projected onto that subspace to estimate its background contribution. When ``tile_size`` is provided, each frame is divided into non-overlapping square tiles that are processed independently, reducing the per-SVD matrix size. Parameters ---------- background_images : numpy.ndarray 3D array of shape (num_background_frames, height, width) used to model the background. Typically frames without transient events. target_images : numpy.ndarray 3D array of shape (num_target_frames, height, width) to which the background removal is applied. Must share height and width with ``background_images``. rank : int or None, optional Number of singular values to retain for the background subspace, by default 5. If None, automatically selected via knee detection. tile_size : int or None, optional Size of square tiles for processing, by default None (no tiling). When provided, images are divided into ``tile_size`` x ``tile_size`` tiles and each tile is processed independently. Recommended values: 32, 64, or 128. callback : callable, optional Called after each target frame with (frame_processed, total_frames). Should return False to cancel processing. Returns ------- tuple of (numpy.ndarray, numpy.ndarray) (background, foreground) arrays with the shape and dtype of ``target_images``. Raises ------ InterruptedError If the callback returns False (user cancellation). ValueError If the height/width of ``background_images`` and ``target_images`` do not match, or if ``background_images`` is empty. """ if background_images.ndim != 3 or target_images.ndim != 3: raise ValueError("background_images and target_images must be 3D arrays.") if (background_images.shape[1] != target_images.shape[1] or background_images.shape[2] != target_images.shape[2]): raise ValueError( "background_images and target_images must have the same height and width." ) if background_images.shape[0] == 0: raise ValueError("background_images must contain at least one frame.") num_bg = background_images.shape[0] num_tgt, height, width = target_images.shape if tile_size is not None: bg_tiles, orig_h, orig_w = _images_to_tiles(background_images, tile_size) tgt_tiles, _, _ = _images_to_tiles(target_images, tile_size) num_tiles = bg_tiles.shape[1] # Pre-compute the truncated subspace for each tile truncated_bases = [] for ti in range(num_tiles): ref_data = bg_tiles[:, ti, :].T # (tile_pixels, num_bg) U, S, _ = np.linalg.svd(ref_data, full_matrices=False) if rank is None: k = _find_knee_numpy(S) else: k = min(rank, num_bg) truncated_bases.append(U[:, :k]) bg_out_tiles = np.zeros_like(tgt_tiles) fg_out_tiles = np.zeros_like(tgt_tiles) for t in range(num_tgt): for ti in range(num_tiles): U_k = truncated_bases[ti] current = tgt_tiles[t, ti, :] coeffs = U_k.T @ current projection = U_k @ coeffs bg_out_tiles[t, ti, :] = projection fg_out_tiles[t, ti, :] = current - projection if callback is not None: if not callback(t + 1, num_tgt): raise InterruptedError("Processing cancelled by user") background = _tiles_to_image(bg_out_tiles, tile_size, orig_h, orig_w) foreground = _tiles_to_image(fg_out_tiles, tile_size, orig_h, orig_w) return background, foreground # Non-tiled path: process full flattened frames num_pixels = height * width flat_bg = background_images.reshape(num_bg, num_pixels) flat_tgt = target_images.reshape(num_tgt, num_pixels) ref_data = flat_bg.T # (num_pixels, num_bg) U, S, _ = np.linalg.svd(ref_data, full_matrices=False) if rank is None: k = _find_knee_numpy(S) else: k = min(rank, num_bg) U_k = U[:, :k] bg_flat = np.zeros_like(flat_tgt) fg_flat = np.zeros_like(flat_tgt) for t in range(num_tgt): current = flat_tgt[t] coeffs = U_k.T @ current projection = U_k @ coeffs bg_flat[t] = projection fg_flat[t] = current - projection if callback is not None: if not callback(t + 1, num_tgt): raise InterruptedError("Processing cancelled by user") return (bg_flat.reshape(num_tgt, height, width), fg_flat.reshape(num_tgt, height, width))