Source code for vista.algorithms.enhancement.coadd
"""Coaddition algorithm for enhancing slowly moving objects by summing frames over a running window"""
import numpy as np
from vista.imagery.imagery import Imagery
[docs]
class Coaddition:
"""
Enhancement algorithm that sums imagery over a running window.
Useful for highlighting slowly moving objects by integrating signal
over multiple frames. The algorithm maintains a running sum of frames
within a sliding window.
"""
name = "Coaddition"
[docs]
def __init__(self, imagery: Imagery, window_size: int):
"""
Initialize the Coaddition algorithm.
Parameters
----------
imagery : Imagery
Imagery object to process
window_size : int
Number of frames to sum in the running window
"""
self.imagery = imagery
self.window_size = window_size
self.current_frame_idx = 0
# Validate window size
if window_size < 1:
raise ValueError("Window size must be at least 1")
if window_size > len(imagery):
raise ValueError(f"Window size ({window_size}) cannot exceed number of frames ({len(imagery)})")
[docs]
def __call__(self):
"""
Process the next frame and return the coadded result.
Returns
-------
tuple
(frame_index, coadded_frame) where coadded_frame is the sum
of frames within the window centered on the current frame.
"""
if self.current_frame_idx >= len(self.imagery):
raise StopIteration("No more frames to process")
# Calculate window bounds
# Center the window on the current frame
half_window = self.window_size // 2
# Calculate start and end indices, clamping to valid range
start_idx = max(0, self.current_frame_idx - half_window)
end_idx = min(len(self.imagery), self.current_frame_idx + half_window + 1)
# Ensure we always get window_size frames if possible
# If we're at the beginning, extend to the right
if start_idx == 0 and end_idx - start_idx < self.window_size:
end_idx = min(len(self.imagery), self.window_size)
# If we're at the end, extend to the left
elif end_idx == len(self.imagery) and end_idx - start_idx < self.window_size:
start_idx = max(0, end_idx - self.window_size)
# Sum frames in the window
coadded_frame = np.sum(self.imagery.images[start_idx:end_idx], axis=0, dtype=np.float32)
# Get the current frame index
frame_idx = self.current_frame_idx
# Move to next frame
self.current_frame_idx += 1
return frame_idx, coadded_frame
[docs]
def __len__(self):
"""Return the number of frames to process"""
return len(self.imagery)
[docs]
class DecimatingCoaddition:
"""
Enhancement algorithm that sums imagery over non-overlapping windows.
Unlike the streaming Coaddition which produces an output for every input frame,
this decimating version produces one output frame per window. This reduces the
output frame count by a factor of window_size.
For example, with 10 input frames (0-9) and window_size=3:
- Window 1: sum frames 0, 1, 2 → output at frame index 1 (center)
- Window 2: sum frames 3, 4, 5 → output at frame index 4 (center)
- Window 3: sum frames 6, 7, 8 → output at frame index 7 (center)
- Remaining frame 9 is discarded (incomplete window)
"""
name = "Decimating Coaddition"
[docs]
def __init__(self, imagery: Imagery, window_size: int):
"""
Initialize the Decimating Coaddition algorithm.
Parameters
----------
imagery : Imagery
Imagery object to process
window_size : int
Number of frames to sum in each non-overlapping window
"""
self.imagery = imagery
self.window_size = window_size
self.current_window_idx = 0
# Validate window size
if window_size < 1:
raise ValueError("Window size must be at least 1")
if window_size > len(imagery):
raise ValueError(f"Window size ({window_size}) cannot exceed number of frames ({len(imagery)})")
# Calculate number of complete windows
self.num_windows = len(imagery) // window_size
[docs]
def __call__(self):
"""
Process the next window and return the coadded result.
Returns
-------
tuple
(output_frame_index, coadded_frame) where output_frame_index is the
center frame of the window in the original imagery coordinates, and
coadded_frame is the sum of all frames in the window.
"""
if self.current_window_idx >= self.num_windows:
raise StopIteration("No more windows to process")
# Calculate window bounds in original imagery coordinates
start_idx = self.current_window_idx * self.window_size
end_idx = start_idx + self.window_size
# Sum frames in the window
coadded_frame = np.sum(self.imagery.images[start_idx:end_idx], axis=0, dtype=np.float32)
# Output frame index is the center of the window
# For window_size=3 starting at 0: center is 1
# For window_size=3 starting at 3: center is 4
output_frame_idx = start_idx + (self.window_size - 1) // 2
# Move to next window
self.current_window_idx += 1
return output_frame_idx, coadded_frame
[docs]
def __len__(self):
"""Return the number of output frames (complete windows)"""
return self.num_windows
[docs]
def get_output_frame_indices(self):
"""
Get the list of output frame indices in original imagery coordinates.
Returns
-------
list[int]
List of frame indices that will have output values
"""
return [i * self.window_size + (self.window_size - 1) // 2 for i in range(self.num_windows)]