Source code for vista.algorithms.background_removal.robust_pca

"""Robust PCA background subtraction for VISTA

This module implements Principal Component Pursuit (PCP) to decompose image sequences
into low-rank (background) and sparse (foreground) components.

Reference:
Candès, E. J., Li, X., Ma, Y., & Wright, J. (2011).
Robust principal component analysis?
Journal of the ACM (JACM), 58(3), 1-37.
"""
import numpy as np


[docs] def shrinkage_operator(X, tau): """ Soft-thresholding (shrinkage) operator for sparse component. Parameters ---------- X : ndarray Input matrix tau : float Threshold parameter Returns ------- ndarray Thresholded matrix """ return np.sign(X) * np.maximum(np.abs(X) - tau, 0)
[docs] def singular_value_threshold(X, tau): """ Singular Value Thresholding (SVT) operator for low-rank component. Parameters ---------- X : ndarray Input matrix tau : float Threshold parameter Returns ------- ndarray Low-rank approximation of X """ U, s, Vt = np.linalg.svd(X, full_matrices=False) s_thresh = shrinkage_operator(s, tau) return U @ np.diag(s_thresh) @ Vt
[docs] def robust_pca_inexact_alm(M, lambda_param=None, mu=None, tol=1e-7, max_iter=1000, callback=None): """ Robust PCA using Inexact Augmented Lagrange Multiplier method. Decomposes M = L + S where: - L is low-rank (background) - S is sparse (foreground/moving objects) Solves: minimize ||L||_* + λ||S||_1 subject to L + S = M Parameters ---------- M : ndarray Input matrix (each column is a vectorized image frame) lambda_param : float, optional Sparsity parameter, by default 1/sqrt(max(m,n)) mu : float, optional Augmented Lagrangian parameter, by default auto tol : float, optional Convergence tolerance, by default 1e-7 max_iter : int, optional Maximum iterations, by default 1000 callback : callable, optional Optional callback function called after each iteration. Called with (iteration, max_iter, rel_error). Should return False to cancel processing. Returns ------- L : ndarray Low-rank component (background) S : ndarray Sparse component (foreground) """ m, n = M.shape # Default parameters if lambda_param is None: lambda_param = 1.0 / np.sqrt(max(m, n)) if mu is None: mu = 0.25 / np.abs(M).mean() # Initialize L = np.zeros_like(M) S = np.zeros_like(M) Y = np.zeros_like(M) # Lagrange multiplier # Precompute norm for convergence check norm_M = np.linalg.norm(M, 'fro') for iteration in range(max_iter): # Update L: Singular value thresholding L = singular_value_threshold(M - S + Y / mu, 1.0 / mu) # Update S: Soft thresholding S = shrinkage_operator(M - L + Y / mu, lambda_param / mu) # Update Y: Lagrange multiplier Y = Y + mu * (M - L - S) # Check convergence residual = M - L - S rel_error = np.linalg.norm(residual, 'fro') / norm_M # Call progress callback if provided if callback is not None: if not callback(iteration + 1, max_iter, rel_error): # Callback returned False - user requested cancellation raise InterruptedError("Processing cancelled by user") if rel_error < tol: break return L, S
[docs] def run_robust_pca(images, lambda_param=None, tol=1e-7, max_iter=1000, callback=None): """ Apply Robust PCA background subtraction to a 3D array of images. Parameters ---------- images : ndarray 3D numpy array (num_frames, height, width) containing image data lambda_param : float, optional Sparsity parameter, by default auto = 1/sqrt(max(m,n)) tol : float, optional Convergence tolerance, by default 1e-7 max_iter : int, optional Maximum iterations, by default 1000 callback : callable, optional Optional callback function called after each iteration. Called with (iteration, max_iter, rel_error). Should return False to cancel processing. Returns ------- tuple of (ndarray, ndarray) (background_images, foreground_images) where: - background_images: Low-rank background component (same shape as input) - foreground_images: Sparse foreground component (same shape as input) """ # Get dimensions num_frames, height, width = images.shape # Reshape images into matrix: each column is a vectorized frame M = images.reshape(num_frames, height * width).T # Apply Robust PCA L, S = robust_pca_inexact_alm( M, lambda_param=lambda_param, mu=None, tol=tol, max_iter=max_iter, callback=callback ) # Reshape back to image sequences background_images = L.T.reshape(num_frames, height, width).astype(np.float32) foreground_images = S.T.reshape(num_frames, height, width).astype(np.float32) return background_images, foreground_images