Source code for vista.tracks.track

"""
Track data model for representing temporal object trajectories.

This module defines the Track class, which represents a single object trajectory
across multiple frames with support for multiple coordinate systems (pixel, geodetic,
time-based), visualization styling, and data persistence.
"""
from dataclasses import dataclass, field
from typing import Optional
import numpy as np
from numpy.typing import NDArray
import pandas as pd
import pyqtgraph as pg
from PyQt6.QtCore import Qt
import uuid
from vista.utils.geodetic_mapping import map_geodetic_to_pixel
from vista.utils.time_mapping import map_times_to_frames
from vista.sensors.sensor import Sensor


[docs] @dataclass class Track: """ Represents a single object trajectory across multiple frames. A Track contains temporal position data for a moving object, with support for multiple coordinate systems (pixel, geodetic, time-based) and rich visualization options. Tracks can be created manually, loaded from files, or generated by tracking algorithms. Parameters ---------- name : str Unique identifier for this track frames : NDArray[np.int_] Frame numbers where track positions are defined rows : NDArray[np.float64] Row (vertical) pixel coordinates for each frame columns : NDArray[np.float64] Column (horizontal) pixel coordinates for each frame sensor : Sensor Sensor object providing coordinate conversion capabilities Attributes ---------- color : str, optional Color for track visualization, by default 'g' (green) marker : str, optional Marker style for current position ('o', 's', 't', 'd', '+', 'x', 'star'), by default 'o' (circle) line_width : int, optional Width of line connecting track points, by default 2 marker_size : int, optional Size of current position marker, by default 12 visible : bool, optional Whether track is visible in viewer, by default True tail_length : int, optional Number of previous frames to show (0 = all history), by default 0 complete : bool, optional If True, show entire track regardless of current frame, by default False show_line : bool, optional Whether to draw line connecting track points, by default True line_style : str, optional Qt line style ('SolidLine', 'DashLine', 'DotLine', 'DashDotLine', 'DashDotDotLine'), by default 'SolidLine' labels : set[str], optional Set of text labels for categorizing/filtering tracks, by default empty set extraction_metadata : dict, optional Extraction metadata containing image chips and signal detection results. Dictionary with keys: 'chip_size' (int), 'chips' (NDArray with shape (n_points, diameter, diameter)), 'signal_masks' (boolean NDArray with same shape), 'noise_stds' (NDArray with shape (n_points,)), by default None Methods ------- __getitem__(slice) Slice track by frame range get_times() Get timestamps for each track point using sensor imagery times from_dataframe(df, sensor, name) Create Track from pandas DataFrame with coordinate conversion length Property that returns cumulative Euclidean distance along track copy() Create a deep copy of the track to_csv(file) Save track to CSV file to_dataframe() Convert track to pandas DataFrame Notes ----- - Track coordinates can be provided as pixel (row/col) or geodetic (lat/lon/alt) - Times can be used instead of frames with automatic conversion via sensor - The from_dataframe() method handles coordinate system conversions automatically - Track length is computed lazily and cached for performance """ name: str frames: NDArray[np.int_] rows: NDArray[np.float64] columns: NDArray[np.float64] sensor: Sensor # Styling attributes color: str = 'g' # Green by default marker: str = 'o' # Circle by default line_width: int = 2 marker_size: int = 12 visible: bool = True tail_length: int = 0 # 0 means show all history, >0 means show only last N frames complete: bool = False # If True, show complete track regardless of current frame and override tail_length show_line: bool = True # If True, show line connecting track points line_style: str = 'SolidLine' # Line style: 'SolidLine', 'DashLine', 'DotLine', 'DashDotLine', 'DashDotDotLine' labels: set[str] = field(default_factory=set) # Set of labels for this track # Extraction metadata extraction_metadata: Optional[dict] = None # Dict containing 'chip_size', 'chips', 'signal_masks', 'noise_stds' # Uncertainty visualization (2D covariance matrix: [[C00, C01], [C01, C11]]) covariance_00: Optional[NDArray[np.float64]] = None # Row variance (C_row_row) covariance_01: Optional[NDArray[np.float64]] = None # Row-column covariance (C_row_col) covariance_11: Optional[NDArray[np.float64]] = None # Column variance (C_col_col) show_uncertainty: bool = False # Whether to display uncertainty ellipses # Private attributes _length: int = field(init=False, default=None) # Performance optimization: cached data structures _frame_index: dict = field(default=None, init=False, repr=False) # Frame number -> index _cached_pen: object = field(default=None, init=False, repr=False) # Cached PyQtGraph pen _cached_brush: object = field(default=None, init=False, repr=False) # Cached PyQtGraph brush _pen_params: tuple = field(default=None, init=False, repr=False) # Parameters used for cached pen _brush_params: tuple = field(default=None, init=False, repr=False) # Parameters used for cached brush uuid: str = field(init=None, default=None) def __post_init__(self): self.uuid = uuid.uuid4() def __eq__(self, other): if not isinstance(other, Track): return False return self.uuid == other.uuid
[docs] def __getitem__(self, s): if isinstance(s, slice) or isinstance(s, np.ndarray): # Handle slice objects track_slice = self.copy() track_slice.frames = track_slice.frames[s] track_slice.rows = track_slice.rows[s] track_slice.columns = track_slice.columns[s] # Slice extraction metadata if present if track_slice.extraction_metadata is not None: track_slice.extraction_metadata = { 'chip_size': track_slice.extraction_metadata['chip_size'], 'chips': track_slice.extraction_metadata['chips'][s], 'signal_masks': track_slice.extraction_metadata['signal_masks'][s], 'noise_stds': track_slice.extraction_metadata['noise_stds'][s], } # Slice uncertainty data if present if track_slice.covariance_00 is not None: track_slice.covariance_00 = track_slice.covariance_00[s] if track_slice.covariance_01 is not None: track_slice.covariance_01 = track_slice.covariance_01[s] if track_slice.covariance_11 is not None: track_slice.covariance_11 = track_slice.covariance_11[s] return track_slice else: raise TypeError("Invalid index or slice type.")
[docs] def __len__(self): return len(self.frames)
def __str__(self): return self.__repr__() def __repr__(self): s = f"{self.__class__.__name__}({self.name})" s += "\n" + len(s) * "-" + "\n" s += str(self.to_dataframe()) return s def _build_frame_index(self): """Build index mapping frame numbers to track indices for O(1) lookup.""" if self._frame_index is None: self._frame_index = {} for i, frame in enumerate(self.frames): self._frame_index[frame] = i
[docs] def get_track_data_at_frame(self, frame_num): """ Get track position at a specific frame using O(1) cached lookup. Parameters ---------- frame_num : int Frame number to query Returns ------- tuple or None (row, column) coordinates at this frame, or None if frame not in track """ self._build_frame_index() idx = self._frame_index.get(frame_num) if idx is not None: return self.rows[idx], self.columns[idx] return None
[docs] def get_visible_indices(self, current_frame): """ Get indices of track points that should be visible at the current frame. Parameters ---------- current_frame : int Current frame number Returns ------- NDArray or None Array of indices for visible track points, or None if no points visible """ if self.complete: # Show entire track return np.arange(len(self.frames)) # Find points up to current frame mask = self.frames <= current_frame if self.tail_length > 0: # Only show last N frames frame_diff = current_frame - self.frames mask &= (frame_diff <= self.tail_length) & (frame_diff >= 0) indices = np.where(mask)[0] return indices if len(indices) > 0 else None
[docs] def invalidate_caches(self): """Invalidate cached data structures when track data changes.""" self._frame_index = None self._cached_pen = None self._cached_brush = None self._pen_params = None self._brush_params = None self._length = None
[docs] def get_pen(self, width=None, style=None): """ Get cached PyQtGraph pen object, creating only if parameters changed. Parameters ---------- width : int, optional Line width override, uses self.line_width if None style : str, optional Line style override, uses self.line_style if None Returns ------- pg.mkPen PyQtGraph pen object """ actual_width = width if width is not None else self.line_width actual_style = style if style is not None else self.line_style # Map string style to Qt constant style_map = { 'SolidLine': Qt.PenStyle.SolidLine, 'DashLine': Qt.PenStyle.DashLine, 'DotLine': Qt.PenStyle.DotLine, 'DashDotLine': Qt.PenStyle.DashDotLine, 'DashDotDotLine': Qt.PenStyle.DashDotDotLine, } qt_style = style_map.get(actual_style, Qt.PenStyle.SolidLine) params = (self.color, actual_width, qt_style) if self._pen_params != params: self._cached_pen = pg.mkPen(color=self.color, width=actual_width, style=qt_style) self._pen_params = params return self._cached_pen
[docs] def get_brush(self): """ Get cached PyQtGraph brush object for marker fill, creating only if parameters changed. Returns ------- pg.mkBrush PyQtGraph brush object """ params = (self.color,) if self._brush_params != params: self._cached_brush = pg.mkBrush(color=self.color) self._brush_params = params return self._cached_brush
[docs] def has_uncertainty(self) -> bool: """ Check if track has uncertainty data. Returns ------- bool True if track has all three covariance matrix elements (C00, C01, C11), False otherwise """ return (self.covariance_00 is not None and self.covariance_01 is not None and self.covariance_11 is not None)
[docs] def get_uncertainty_ellipse_parameters(self) -> Optional[tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]]: """ Convert covariance matrix to ellipse parameters for visualization. Computes the semi-major axis length, semi-minor axis length, and rotation angle from the 2D covariance matrix at each track point. Returns ------- Optional[tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]] Tuple of (semi_major_axis, semi_minor_axis, rotation_degrees) arrays, or None if no uncertainty data. Rotation is in degrees, counter-clockwise from horizontal (positive column axis). """ if not self.has_uncertainty(): return None # Eigenvalue decomposition of 2x2 covariance matrix # For [[C00, C01], [C01, C11]], eigenvalues are: # lambda = 0.5 * (C00 + C11 +/- sqrt((C00 - C11)^2 + 4*C01^2)) trace = self.covariance_00 + self.covariance_11 det = self.covariance_00 * self.covariance_11 - self.covariance_01**2 discriminant = np.sqrt(np.maximum((self.covariance_00 - self.covariance_11)**2 + 4 * self.covariance_01**2, 0)) lambda1 = 0.5 * (trace + discriminant) # Larger eigenvalue lambda2 = 0.5 * (trace - discriminant) # Smaller eigenvalue # Semi-axes are square roots of eigenvalues semi_major = np.sqrt(np.maximum(lambda1, 0)) semi_minor = np.sqrt(np.maximum(lambda2, 0)) # Rotation angle (in degrees, counter-clockwise from horizontal) # arctan2(2*C01, C00 - C11) gives twice the rotation angle rotation_rad = 0.5 * np.arctan2(2 * self.covariance_01, self.covariance_00 - self.covariance_11) rotation_deg = np.degrees(rotation_rad) return semi_major, semi_minor, rotation_deg
[docs] def get_uncertainty_radius(self) -> Optional[NDArray[np.float64]]: """ Compute the geometric mean radius of uncertainty ellipses. The geometric mean radius is computed as the fourth root of the covariance matrix determinant: sqrt(sqrt(det(Cov))) = sqrt(sqrt(C00*C11 - C01^2)). This represents the radius of a circle with the same area as the uncertainty ellipse. Returns ------- Optional[NDArray[np.float64]] Array of geometric mean radii for each track point, or None if uncertainty data is not available """ if not self.has_uncertainty(): return None # Geometric mean radius = sqrt(det(covariance_matrix)) det = self.covariance_00 * self.covariance_11 - self.covariance_01**2 return np.sqrt(np.maximum(det, 0))
[docs] def get_times(self) -> NDArray[np.datetime64]: """ Get timestamps for each track point using sensor imagery times. Matches track frames to sensor imagery frames and returns corresponding timestamps. Returns NaT (Not-a-Time) for frames without matching imagery. Returns ------- NDArray[np.datetime64] or None Array of timestamps with same length as track, or None if sensor has no imagery times Notes ----- Uses binary search (searchsorted) for efficient frame matching. """ sensor_imagery_frames, sensor_imagery_times = self.sensor.get_imagery_frames_and_times() if len(sensor_imagery_times) < 1: return None # Find where each track_frame would be inserted in sensor_frames indices = np.searchsorted(sensor_imagery_frames, self.frames) # Create output array filled with NaT track_times = np.full(len(self.frames), np.datetime64('NaT'), dtype='datetime64[ns]') # Validate matches: check if indices are in bounds and values actually match valid_mask = (indices < len(sensor_imagery_frames)) & (sensor_imagery_frames[indices] == self.frames) # Assign matching times track_times[valid_mask] = sensor_imagery_times[indices[valid_mask]] return track_times
[docs] @classmethod def from_dataframe(cls, df: pd.DataFrame, sensor: Sensor, name: str = None): """ Create Track from DataFrame with automatic coordinate conversion. Supports multiple input coordinate systems with automatic conversion: - Frames or Times → Frames (Times require sensor imagery) - Rows/Columns or Lat/Lon/Alt → Rows/Columns (Geodetic requires sensor) Priority system: Frames > Times, Rows/Columns > Lat/Lon/Alt Parameters ---------- df : pd.DataFrame DataFrame containing track data with required columns based on coordinate system (see Notes) sensor : Sensor Sensor object for coordinate conversions name : str, optional Track name, by default taken from df["Track"] Returns ------- Track New Track object with converted coordinates Raises ------ ValueError If required columns are missing or coordinate conversion fails Notes ----- Required columns (one set from each group): Temporal coordinates (choose one): - "Frames" : Frame numbers (preferred) - "Times" : Timestamps (requires sensor with imagery times) Spatial coordinates (choose one): - "Rows" and "Columns" : Pixel coordinates (preferred) - "Latitude (deg)", "Longitude (deg)", "Altitude (km)" : Geodetic coordinates (requires sensor with geolocation capability) Optional styling columns: - "Color", "Marker", "Line Width", "Marker Size", "Visible", "Complete", "Show Line", "Line Style", "Tail Length", "Labels" """ if name is None: name = df["Track"][0] kwargs = {} if "Color" in df.columns: kwargs["color"] = df["Color"].iloc[0] if "Marker" in df.columns: kwargs["marker"] = df["Marker"].iloc[0] if "Line Width" in df.columns: kwargs["line_width"] = df["Line Width"].iloc[0] if "Marker Size" in df.columns: kwargs["marker_size"] = df["Marker Size"].iloc[0] if "Tail Length" in df.columns: kwargs["tail_length"] = df["Tail Length"].iloc[0] if "Visible" in df.columns: kwargs["visible"] = df["Visible"].iloc[0] if "Complete" in df.columns: kwargs["complete"] = df["Complete"].iloc[0] if "Show Line" in df.columns: kwargs["show_line"] = df["Show Line"].iloc[0] if "Line Style" in df.columns: kwargs["line_style"] = df["Line Style"].iloc[0] if "Labels" in df.columns: # Parse labels from comma-separated string labels_str = df["Labels"].iloc[0] if pd.notna(labels_str) and labels_str: kwargs["labels"] = set(label.strip() for label in labels_str.split(',')) else: kwargs["labels"] = set() # Handle uncertainty data (optional) - covariance matrix elements # Only populate if all three columns exist and all values are valid (not NaN) if ("Covariance 00" in df.columns and "Covariance 01" in df.columns and "Covariance 11" in df.columns): cov_00 = df["Covariance 00"].to_numpy(dtype=np.float64) cov_01 = df["Covariance 01"].to_numpy(dtype=np.float64) cov_11 = df["Covariance 11"].to_numpy(dtype=np.float64) # Only set covariance if all values are valid (no NaN or inf) if (np.all(np.isfinite(cov_00)) and np.all(np.isfinite(cov_01)) and np.all(np.isfinite(cov_11))): kwargs["covariance_00"] = cov_00 kwargs["covariance_01"] = cov_01 kwargs["covariance_11"] = cov_11 # Handle times (optional) times = None if "Times" in df.columns: # Parse times as datetime64 times = pd.to_datetime(df["Times"]).to_numpy() # Determine frames - priority: Frames column > time-to-frame mapping if "Frames" in df.columns: # Frames take precedence frames = df["Frames"].to_numpy() elif times is not None: sensor_imagery_frames, sensor_imagery_times = sensor.get_imagery_frames_and_times() if len(sensor_imagery_times) == 0: # Times present but no cannot map to frames using sensor - raise error raise ValueError(f"Track '{name}' has times but no frames. Sensor imagery times are required for time-to-frame mapping.") # Map times to frames using the sensor imagery frames = map_times_to_frames(times, sensor_imagery_times, sensor_imagery_frames) else: raise ValueError(f"Track '{name}' must have either 'Frames' or 'Times' column") # Determine rows/columns - priority: Rows/Columns > geodetic-to-pixel mapping if "Rows" in df.columns and "Columns" in df.columns: # Row/Column take precedence rows = df["Rows"].to_numpy() columns = df["Columns"].to_numpy() elif "Latitude (deg)" in df.columns and "Longitude (deg)" in df.columns and "Altitude (km)" in df.columns: # Need geodetic-to-pixel conversion if sensor is None: raise ValueError( f"Track '{name}' has geodetic coordinates (Lat/Lon/Alt) but no row/column. " "Sensor required for geodetic-to-pixel mapping." ) if not hasattr(sensor, 'can_geolocate') or not sensor.can_geolocate(): raise ValueError( f"Track '{name}' has geodetic coordinates (Lat/Lon/Alt) but sensor '{sensor.name}' " "does not support geolocation." ) # Map geodetic to pixel using sensor rows, columns = map_geodetic_to_pixel( df["Latitude (deg)"].to_numpy(), df["Longitude (deg)"].to_numpy(), df["Altitude (km)"].to_numpy(), frames, sensor ) else: raise ValueError( f"Track '{name}' must have either 'Rows' and 'Columns' columns, " "or 'Latitude', 'Longitude', and 'Altitude' columns" ) # Enable show_uncertainty by default if uncertainty data is present if ('covariance_00' in kwargs and 'covariance_01' in kwargs and 'covariance_11' in kwargs): # Only set to True if not already explicitly set if 'show_uncertainty' not in kwargs: kwargs['show_uncertainty'] = True return cls( name = name, frames = frames, rows = rows, columns = columns, sensor = sensor, **kwargs )
@property def length(self): """ Cumulative Euclidean distance along the track path. Computes the sum of pixel distances between consecutive track points. Result is cached for performance. Returns ------- float Total track length in pixels, or 0.0 if track has fewer than 2 points """ if self._length is None: if len(self.rows) < 2: self._length = 0.0 else: self._length = np.sum(np.sqrt(np.diff(self.rows)**2 + np.diff(self.columns)**2)) return self._length
[docs] def copy(self): """ Create a deep copy of this track object. Returns ------- Track New Track object with copied arrays and styling attributes """ # Deep copy extraction metadata if present extraction_metadata_copy = None if self.extraction_metadata is not None: extraction_metadata_copy = { 'chip_size': self.extraction_metadata['chip_size'], 'chips': self.extraction_metadata['chips'].copy(), 'signal_masks': self.extraction_metadata['signal_masks'].copy(), 'noise_stds': self.extraction_metadata['noise_stds'].copy(), } return self.__class__( name = self.name, frames = self.frames.copy(), rows = self.rows.copy(), columns = self.columns.copy(), sensor = self.sensor, color = self.color, marker = self.marker, line_width = self.line_width, marker_size = self.marker_size, visible = self.visible, tail_length = self.tail_length, complete = self.complete, show_line = self.show_line, line_style = self.line_style, labels = self.labels.copy(), extraction_metadata = extraction_metadata_copy, covariance_00 = self.covariance_00.copy() if self.covariance_00 is not None else None, covariance_01 = self.covariance_01.copy() if self.covariance_01 is not None else None, covariance_11 = self.covariance_11.copy() if self.covariance_11 is not None else None, show_uncertainty = self.show_uncertainty, )
[docs] def to_dataframe(self) -> pd.DataFrame: """Convert track to DataFrame Raises: ValueError: If geolocation/time requested but imagery is missing required data """ data = { "Track": len(self)*[self.name], "Frames": self.frames, "Rows": self.rows, "Columns": self.columns, "Color": self.color, "Marker": self.marker, "Line Width": self.line_width, "Marker Size": self.marker_size, "Tail Length": self.tail_length, "Visible": self.visible, "Complete": self.complete, "Show Line": self.show_line, "Line Style": self.line_style, "Labels": ', '.join(sorted(self.labels)) if self.labels else '', } # Include geolocation if possible # Convert pixel coordinates to geodetic for each frame latitudes = [] longitudes = [] altitudes = [] for i, frame in enumerate(self.frames): # Convert single point locations = self.sensor.pixel_to_geodetic(frame, np.array([self.rows[i]]), np.array([self.columns[i]])) latitudes.append(locations.lat.deg[0]) longitudes.append(locations.lon.deg[0]) altitudes.append(locations.height.to('km').value[0]) data["Latitude (deg)"] = latitudes data["Longitude (deg)"] = longitudes data["Altitude (km)"] = altitudes # Include times if possible track_times = self.get_times() if track_times is not None: data["Times"] = pd.to_datetime(track_times).strftime('%Y-%m-%dT%H:%M:%S.%f') # Include uncertainty data if present if self.has_uncertainty(): data["Covariance 00"] = self.covariance_00 data["Covariance 01"] = self.covariance_01 data["Covariance 11"] = self.covariance_11 return pd.DataFrame(data)