"""
Track data model for representing temporal object trajectories.
This module defines the Track class, which represents a single object trajectory
across multiple frames with support for multiple coordinate systems (pixel, geodetic,
time-based), visualization styling, and data persistence.
"""
from dataclasses import dataclass, field
from typing import Optional
import numpy as np
from numpy.typing import NDArray
import pandas as pd
import pyqtgraph as pg
from PyQt6.QtCore import Qt
import uuid
from vista.utils.geodetic_mapping import map_geodetic_to_pixel
from vista.utils.time_mapping import map_times_to_frames
from vista.sensors.sensor import Sensor
@dataclass
class Track:
"""
Represents a single object trajectory across multiple frames.
A Track contains temporal position data for a moving object, with support for
multiple coordinate systems (pixel, geodetic, time-based) and rich visualization
options. Tracks can be created manually, loaded from files, or generated by
tracking algorithms.
Parameters
----------
name : str
Unique identifier for this track
frames : NDArray[np.int_]
Frame numbers where track positions are defined
rows : NDArray[np.float64]
Row (vertical) pixel coordinates for each frame
columns : NDArray[np.float64]
Column (horizontal) pixel coordinates for each frame
sensor : Sensor
Sensor object providing coordinate conversion capabilities
Attributes
----------
color : str, optional
Color for track visualization, by default 'g' (green)
marker : str, optional
Marker style for current position ('o', 's', 't', 'd', '+', 'x', 'star'),
by default 'o' (circle)
line_width : int, optional
Width of line connecting track points, by default 2
marker_size : int, optional
Size of current position marker, by default 12
visible : bool, optional
Whether track is visible in viewer, by default True
tail_length : int, optional
Number of previous frames to show (0 = all history), by default 0
complete : bool, optional
If True, show entire track regardless of current frame, by default False
show_line : bool, optional
Whether to draw line connecting track points, by default True
line_style : str, optional
Qt line style ('SolidLine', 'DashLine', 'DotLine', 'DashDotLine',
'DashDotDotLine'), by default 'SolidLine'
labels : set[str], optional
Set of text labels for categorizing/filtering tracks, by default empty set
extraction_metadata : dict, optional
Extraction metadata containing image chips and signal detection results.
Dictionary with keys: 'chip_size' (int), 'chips' (NDArray with shape
(n_points, diameter, diameter)), 'signal_masks' (boolean NDArray with
same shape), 'noise_stds' (NDArray with shape (n_points,)), by default None
Methods
-------
__getitem__(s)
Slice track by index slice or index array
get_times()
Get timestamps for each track point using sensor imagery times
from_dataframe(df, sensor, name)
Create Track from pandas DataFrame with coordinate conversion
length
Property that returns cumulative Euclidean distance along track
copy()
Create a deep copy of the track
to_csv(file)
Save track to CSV file
to_dataframe()
Convert track to pandas DataFrame
Notes
-----
- Track coordinates can be provided as pixel (row/col) or geodetic (lat/lon/alt)
- Times can be used instead of frames with automatic conversion via sensor
- The from_dataframe() method handles coordinate system conversions automatically
- Track length is computed lazily and cached for performance
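Examples
--------
A minimal construction sketch; ``sensor`` is assumed to be an already
configured Sensor instance and is not shown here:
>>> import numpy as np
>>> track = Track(  # doctest: +SKIP
...     name="target_1",
...     frames=np.array([0, 1, 2]),
...     rows=np.array([10.0, 12.5, 15.0]),
...     columns=np.array([20.0, 21.5, 23.0]),
...     sensor=sensor,
... )
>>> len(track)  # doctest: +SKIP
3
>>> len(track[0:2])  # doctest: +SKIP
2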
"""
name: str
frames: NDArray[np.int_]
rows: NDArray[np.float64]
columns: NDArray[np.float64]
sensor: Sensor
# Styling attributes
color: str = 'g' # Green by default
marker: str = 'o' # Circle by default
line_width: int = 2
marker_size: int = 12
visible: bool = True
tail_length: int = 0 # 0 means show all history, >0 means show only last N frames
complete: bool = False # If True, show the complete track regardless of current frame, overriding tail_length
show_line: bool = True # If True, show line connecting track points
line_style: str = 'SolidLine' # Line style: 'SolidLine', 'DashLine', 'DotLine', 'DashDotLine', 'DashDotDotLine'
labels: set[str] = field(default_factory=set) # Set of labels for this track
# Extraction metadata
extraction_metadata: Optional[dict] = None # Dict containing 'chip_size', 'chips', 'signal_masks', 'noise_stds'
# Uncertainty visualization (2D covariance matrix: [[C00, C01], [C01, C11]])
covariance_00: Optional[NDArray[np.float64]] = None # Row variance (C_row_row)
covariance_01: Optional[NDArray[np.float64]] = None # Row-column covariance (C_row_col)
covariance_11: Optional[NDArray[np.float64]] = None # Column variance (C_col_col)
show_uncertainty: bool = False # Whether to display uncertainty ellipses
# Private attributes
_length: Optional[float] = field(init=False, default=None)
# Performance optimization: cached data structures
_frame_index: dict = field(default=None, init=False, repr=False) # Frame number -> index
_cached_pen: object = field(default=None, init=False, repr=False) # Cached PyQtGraph pen
_cached_brush: object = field(default=None, init=False, repr=False) # Cached PyQtGraph brush
_pen_params: tuple = field(default=None, init=False, repr=False) # Parameters used for cached pen
_brush_params: tuple = field(default=None, init=False, repr=False) # Parameters used for cached brush
uuid: str = field(init=False, default=None)
def __post_init__(self):
self.uuid = str(uuid.uuid4())
def __eq__(self, other):
if not isinstance(other, Track):
return False
return self.uuid == other.uuid
def __getitem__(self, s):
if isinstance(s, (slice, np.ndarray)):
# Handle slices and NumPy index arrays
track_slice = self.copy()
track_slice.frames = track_slice.frames[s]
track_slice.rows = track_slice.rows[s]
track_slice.columns = track_slice.columns[s]
# Slice extraction metadata if present
if track_slice.extraction_metadata is not None:
track_slice.extraction_metadata = {
'chip_size': track_slice.extraction_metadata['chip_size'],
'chips': track_slice.extraction_metadata['chips'][s],
'signal_masks': track_slice.extraction_metadata['signal_masks'][s],
'noise_stds': track_slice.extraction_metadata['noise_stds'][s],
}
# Slice uncertainty data if present
if track_slice.covariance_00 is not None:
track_slice.covariance_00 = track_slice.covariance_00[s]
if track_slice.covariance_01 is not None:
track_slice.covariance_01 = track_slice.covariance_01[s]
if track_slice.covariance_11 is not None:
track_slice.covariance_11 = track_slice.covariance_11[s]
return track_slice
else:
raise TypeError(f"Invalid index type for Track: {type(s).__name__}; expected a slice or NumPy index array.")
def __len__(self):
return len(self.frames)
def __str__(self):
return self.__repr__()
def __repr__(self):
s = f"{self.__class__.__name__}({self.name})"
s += "\n" + len(s) * "-" + "\n"
s += str(self.to_dataframe())
return s
def _build_frame_index(self):
"""Build index mapping frame numbers to track indices for O(1) lookup."""
if self._frame_index is None:
self._frame_index = {}
for i, frame in enumerate(self.frames):
self._frame_index[frame] = i
def get_track_data_at_frame(self, frame_num):
"""
Get track position at a specific frame using O(1) cached lookup.
Parameters
----------
frame_num : int
Frame number to query
Returns
-------
tuple or None
(row, column) coordinates at this frame, or None if frame not in track
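Examples
--------
Illustrative only; assumes a track whose frame 1 position is row 12.5, column 21.5:
>>> track.get_track_data_at_frame(1)  # doctest: +SKIP
(12.5, 21.5)
>>> track.get_track_data_at_frame(99) is None  # doctest: +SKIP
True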
"""
self._build_frame_index()
idx = self._frame_index.get(frame_num)
if idx is not None:
return self.rows[idx], self.columns[idx]
return None
def get_visible_indices(self, current_frame):
"""
Get indices of track points that should be visible at the current frame.
Parameters
----------
current_frame : int
Current frame number
Returns
-------
NDArray or None
Array of indices for visible track points, or None if no points visible
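Examples
--------
Illustrative only; assumes a track with frames ``[0, 1, 2, 3]``, ``tail_length=2``,
and ``complete=False``, so querying frame 3 keeps only the most recent points:
>>> track.get_visible_indices(3)  # doctest: +SKIP
array([1, 2, 3])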
"""
if self.complete:
# Show entire track
return np.arange(len(self.frames))
# Find points up to current frame
mask = self.frames <= current_frame
if self.tail_length > 0:
# Only show last N frames
frame_diff = current_frame - self.frames
mask &= (frame_diff <= self.tail_length) & (frame_diff >= 0)
indices = np.where(mask)[0]
return indices if len(indices) > 0 else None
def invalidate_caches(self):
"""Invalidate cached data structures when track data changes."""
self._frame_index = None
self._cached_pen = None
self._cached_brush = None
self._pen_params = None
self._brush_params = None
self._length = None
def get_pen(self, width=None, style=None):
"""
Get cached PyQtGraph pen object, creating only if parameters changed.
Parameters
----------
width : int, optional
Line width override, uses self.line_width if None
style : str, optional
Line style override, uses self.line_style if None
Returns
-------
pg.mkPen
PyQtGraph pen object
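Examples
--------
Illustrative only; the pen object is rebuilt only when color, width, or style change:
>>> pen = track.get_pen()  # doctest: +SKIP
>>> pen is track.get_pen()  # doctest: +SKIP
True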
"""
actual_width = width if width is not None else self.line_width
actual_style = style if style is not None else self.line_style
# Map string style to Qt constant
style_map = {
'SolidLine': Qt.PenStyle.SolidLine,
'DashLine': Qt.PenStyle.DashLine,
'DotLine': Qt.PenStyle.DotLine,
'DashDotLine': Qt.PenStyle.DashDotLine,
'DashDotDotLine': Qt.PenStyle.DashDotDotLine,
}
qt_style = style_map.get(actual_style, Qt.PenStyle.SolidLine)
params = (self.color, actual_width, qt_style)
if self._pen_params != params:
self._cached_pen = pg.mkPen(color=self.color, width=actual_width, style=qt_style)
self._pen_params = params
return self._cached_pen
def get_brush(self):
"""
Get cached PyQtGraph brush object for marker fill, creating only if parameters changed.
Returns
-------
pg.mkBrush
PyQtGraph brush object
"""
params = (self.color,)
if self._brush_params != params:
self._cached_brush = pg.mkBrush(color=self.color)
self._brush_params = params
return self._cached_brush
def has_uncertainty(self) -> bool:
"""
Check if track has uncertainty data.
Returns
-------
bool
True if track has all three covariance matrix elements (C00, C01, C11), False otherwise
"""
return (self.covariance_00 is not None and
self.covariance_01 is not None and
self.covariance_11 is not None)
def get_uncertainty_ellipse_parameters(self) -> Optional[tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]]:
"""
Convert covariance matrix to ellipse parameters for visualization.
Computes the semi-major axis length, semi-minor axis length, and rotation angle
from the 2D covariance matrix at each track point.
Returns
-------
Optional[tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]]
Tuple of (semi_major_axis, semi_minor_axis, rotation_degrees) arrays, or None if no uncertainty data.
Rotation is in degrees, counter-clockwise from horizontal (positive column axis).
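Examples
--------
Worked sketch: for an isotropic covariance ``[[4, 0], [0, 4]]`` at every point,
both eigenvalues are 4, so the semi-axes are 2 pixels and the rotation is 0 degrees:
>>> semi_major, semi_minor, rotation = track.get_uncertainty_ellipse_parameters()  # doctest: +SKIP
>>> semi_major[0], semi_minor[0], rotation[0]  # doctest: +SKIP
(2.0, 2.0, 0.0)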
"""
if not self.has_uncertainty():
return None
# Eigenvalue decomposition of 2x2 covariance matrix
# For [[C00, C01], [C01, C11]], eigenvalues are:
# lambda = 0.5 * (C00 + C11 +/- sqrt((C00 - C11)^2 + 4*C01^2))
trace = self.covariance_00 + self.covariance_11
det = self.covariance_00 * self.covariance_11 - self.covariance_01**2
discriminant = np.sqrt(np.maximum((self.covariance_00 - self.covariance_11)**2 + 4 * self.covariance_01**2, 0))
lambda1 = 0.5 * (trace + discriminant) # Larger eigenvalue
lambda2 = 0.5 * (trace - discriminant) # Smaller eigenvalue
# Semi-axes are square roots of eigenvalues
semi_major = np.sqrt(np.maximum(lambda1, 0))
semi_minor = np.sqrt(np.maximum(lambda2, 0))
# Rotation angle (in degrees, counter-clockwise from horizontal)
# arctan2(2*C01, C00 - C11) gives twice the rotation angle
rotation_rad = 0.5 * np.arctan2(2 * self.covariance_01, self.covariance_00 - self.covariance_11)
rotation_deg = np.degrees(rotation_rad)
return semi_major, semi_minor, rotation_deg
def get_uncertainty_radius(self) -> Optional[NDArray[np.float64]]:
"""
Compute the geometric mean radius of uncertainty ellipses.
The geometric mean radius is computed as the fourth root of the covariance
matrix determinant: sqrt(sqrt(det(Cov))) = sqrt(sqrt(C00*C11 - C01^2)).
This represents the radius of a circle with the same area as the uncertainty ellipse.
Returns
-------
Optional[NDArray[np.float64]]
Array of geometric mean radii for each track point, or None if uncertainty data is not available
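Examples
--------
Worked sketch: with covariance ``[[4, 0], [0, 4]]`` the determinant is 16, and the
documented formula gives ``sqrt(sqrt(16)) = 2`` pixels:
>>> track.get_uncertainty_radius()[0]  # doctest: +SKIP
2.0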
"""
if not self.has_uncertainty():
return None
# Geometric mean radius = fourth root of det(covariance_matrix), the radius of a circle with the same area as the ellipse
det = self.covariance_00 * self.covariance_11 - self.covariance_01**2
return np.sqrt(np.sqrt(np.maximum(det, 0)))
def get_times(self) -> Optional[NDArray[np.datetime64]]:
"""
Get timestamps for each track point using sensor imagery times.
Matches track frames to sensor imagery frames and returns corresponding
timestamps. Returns NaT (Not-a-Time) for frames without matching imagery.
Returns
-------
NDArray[np.datetime64] or None
Array of timestamps with same length as track, or None if sensor
has no imagery times
Notes
-----
Uses binary search (searchsorted) for efficient frame matching.
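Examples
--------
Illustrative only; frames without matching sensor imagery come back as NaT:
>>> times = track.get_times()  # doctest: +SKIP
>>> times.dtype  # doctest: +SKIP
dtype('<M8[ns]')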
"""
sensor_imagery_frames, sensor_imagery_times = self.sensor.get_imagery_frames_and_times()
if len(sensor_imagery_times) < 1:
return None
# Find where each track_frame would be inserted in sensor_frames
indices = np.searchsorted(sensor_imagery_frames, self.frames)
# Create output array filled with NaT
track_times = np.full(len(self.frames), np.datetime64('NaT'), dtype='datetime64[ns]')
# Validate matches: clip indices to stay in bounds, then check that frame values actually match
clipped_indices = np.minimum(indices, len(sensor_imagery_frames) - 1)
valid_mask = (indices < len(sensor_imagery_frames)) & (sensor_imagery_frames[clipped_indices] == self.frames)
# Assign matching times
track_times[valid_mask] = sensor_imagery_times[clipped_indices[valid_mask]]
return track_times
@classmethod
def from_dataframe(cls, df: pd.DataFrame, sensor: Sensor, name: str = None):
"""
Create Track from DataFrame with automatic coordinate conversion.
Supports multiple input coordinate systems with automatic conversion:
- Frames or Times → Frames (Times require sensor imagery)
- Rows/Columns or Lat/Lon/Alt → Rows/Columns (Geodetic requires sensor)
Priority system: Frames > Times, Rows/Columns > Lat/Lon/Alt
Parameters
----------
df : pd.DataFrame
DataFrame containing track data with required columns based on
coordinate system (see Notes)
sensor : Sensor
Sensor object for coordinate conversions
name : str, optional
Track name, by default taken from df["Track"]
Returns
-------
Track
New Track object with converted coordinates
Raises
------
ValueError
If required columns are missing or coordinate conversion fails
Notes
-----
Required columns (one set from each group):
Temporal coordinates (choose one):
- "Frames" : Frame numbers (preferred)
- "Times" : Timestamps (requires sensor with imagery times)
Spatial coordinates (choose one):
- "Rows" and "Columns" : Pixel coordinates (preferred)
- "Latitude (deg)", "Longitude (deg)", "Altitude (km)" :
Geodetic coordinates (requires sensor with geolocation capability)
Optional styling columns:
- "Color", "Marker", "Line Width", "Marker Size", "Visible",
"Complete", "Show Line", "Line Style", "Tail Length", "Labels"
"""
if name is None:
name = df["Track"].iloc[0]
kwargs = {}
if "Color" in df.columns:
kwargs["color"] = df["Color"].iloc[0]
if "Marker" in df.columns:
kwargs["marker"] = df["Marker"].iloc[0]
if "Line Width" in df.columns:
kwargs["line_width"] = df["Line Width"].iloc[0]
if "Marker Size" in df.columns:
kwargs["marker_size"] = df["Marker Size"].iloc[0]
if "Tail Length" in df.columns:
kwargs["tail_length"] = df["Tail Length"].iloc[0]
if "Visible" in df.columns:
kwargs["visible"] = df["Visible"].iloc[0]
if "Complete" in df.columns:
kwargs["complete"] = df["Complete"].iloc[0]
if "Show Line" in df.columns:
kwargs["show_line"] = df["Show Line"].iloc[0]
if "Line Style" in df.columns:
kwargs["line_style"] = df["Line Style"].iloc[0]
if "Labels" in df.columns:
# Parse labels from comma-separated string
labels_str = df["Labels"].iloc[0]
if pd.notna(labels_str) and labels_str:
kwargs["labels"] = set(label.strip() for label in labels_str.split(','))
else:
kwargs["labels"] = set()
# Handle uncertainty data (optional) - covariance matrix elements
# Only populate if all three columns exist and all values are valid (not NaN)
if ("Covariance 00" in df.columns and "Covariance 01" in df.columns and
"Covariance 11" in df.columns):
cov_00 = df["Covariance 00"].to_numpy(dtype=np.float64)
cov_01 = df["Covariance 01"].to_numpy(dtype=np.float64)
cov_11 = df["Covariance 11"].to_numpy(dtype=np.float64)
# Only set covariance if all values are valid (no NaN or inf)
if (np.all(np.isfinite(cov_00)) and np.all(np.isfinite(cov_01)) and
np.all(np.isfinite(cov_11))):
kwargs["covariance_00"] = cov_00
kwargs["covariance_01"] = cov_01
kwargs["covariance_11"] = cov_11
# Handle times (optional)
times = None
if "Times" in df.columns:
# Parse times as datetime64
times = pd.to_datetime(df["Times"]).to_numpy()
# Determine frames - priority: Frames column > time-to-frame mapping
if "Frames" in df.columns:
# Frames take precedence
frames = df["Frames"].to_numpy()
elif times is not None:
sensor_imagery_frames, sensor_imagery_times = sensor.get_imagery_frames_and_times()
if len(sensor_imagery_times) == 0:
# Times are present but cannot be mapped to frames without sensor imagery times - raise error
raise ValueError(f"Track '{name}' has times but no frames. Sensor imagery times are required for time-to-frame mapping.")
# Map times to frames using the sensor imagery
frames = map_times_to_frames(times, sensor_imagery_times, sensor_imagery_frames)
else:
raise ValueError(f"Track '{name}' must have either 'Frames' or 'Times' column")
# Determine rows/columns - priority: Rows/Columns > geodetic-to-pixel mapping
if "Rows" in df.columns and "Columns" in df.columns:
# Row/Column take precedence
rows = df["Rows"].to_numpy()
columns = df["Columns"].to_numpy()
elif "Latitude (deg)" in df.columns and "Longitude (deg)" in df.columns and "Altitude (km)" in df.columns:
# Need geodetic-to-pixel conversion
if sensor is None:
raise ValueError(
f"Track '{name}' has geodetic coordinates (Lat/Lon/Alt) but no row/column. "
"Sensor required for geodetic-to-pixel mapping."
)
if not hasattr(sensor, 'can_geolocate') or not sensor.can_geolocate():
raise ValueError(
f"Track '{name}' has geodetic coordinates (Lat/Lon/Alt) but sensor '{sensor.name}' "
"does not support geolocation."
)
# Map geodetic to pixel using sensor
rows, columns = map_geodetic_to_pixel(
df["Latitude (deg)"].to_numpy(),
df["Longitude (deg)"].to_numpy(),
df["Altitude (km)"].to_numpy(),
frames,
sensor
)
else:
raise ValueError(
f"Track '{name}' must have either 'Rows' and 'Columns' columns, "
"or 'Latitude (deg)', 'Longitude (deg)', and 'Altitude (km)' columns"
)
# Enable show_uncertainty by default if uncertainty data is present
if ('covariance_00' in kwargs and 'covariance_01' in kwargs and
'covariance_11' in kwargs):
# Only set to True if not already explicitly set
if 'show_uncertainty' not in kwargs:
kwargs['show_uncertainty'] = True
return cls(
name = name,
frames = frames,
rows = rows,
columns = columns,
sensor = sensor,
**kwargs
)
@property
def length(self):
"""
Cumulative Euclidean distance along the track path.
Computes the sum of pixel distances between consecutive track points.
Result is cached for performance.
Returns
-------
float
Total track length in pixels, or 0.0 if track has fewer than 2 points
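Examples
--------
Worked sketch: two points at (row, column) = (0, 0) and (3, 4) are 5 pixels apart,
so the cumulative length is 5.0:
>>> track.length  # doctest: +SKIP
5.0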
"""
if self._length is None:
if len(self.rows) < 2:
self._length = 0.0
else:
self._length = np.sum(np.sqrt(np.diff(self.rows)**2 + np.diff(self.columns)**2))
return self._length
def copy(self):
"""
Create a deep copy of this track object.
Returns
-------
Track
New Track object with copied arrays and styling attributes
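Examples
--------
Illustrative only; the copy receives its own UUID, so UUID-based equality treats it
as a distinct track:
>>> duplicate = track.copy()  # doctest: +SKIP
>>> duplicate == track  # doctest: +SKIP
False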
"""
# Deep copy extraction metadata if present
extraction_metadata_copy = None
if self.extraction_metadata is not None:
extraction_metadata_copy = {
'chip_size': self.extraction_metadata['chip_size'],
'chips': self.extraction_metadata['chips'].copy(),
'signal_masks': self.extraction_metadata['signal_masks'].copy(),
'noise_stds': self.extraction_metadata['noise_stds'].copy(),
}
return self.__class__(
name = self.name,
frames = self.frames.copy(),
rows = self.rows.copy(),
columns = self.columns.copy(),
sensor = self.sensor,
color = self.color,
marker = self.marker,
line_width = self.line_width,
marker_size = self.marker_size,
visible = self.visible,
tail_length = self.tail_length,
complete = self.complete,
show_line = self.show_line,
line_style = self.line_style,
labels = self.labels.copy(),
extraction_metadata = extraction_metadata_copy,
covariance_00 = self.covariance_00.copy() if self.covariance_00 is not None else None,
covariance_01 = self.covariance_01.copy() if self.covariance_01 is not None else None,
covariance_11 = self.covariance_11.copy() if self.covariance_11 is not None else None,
show_uncertainty = self.show_uncertainty,
)
def to_dataframe(self) -> pd.DataFrame:
"""
Convert the track to a pandas DataFrame.
One row is produced per track point, containing position and styling columns,
plus geodetic, time, and uncertainty columns when the data is available.
Raises
------
ValueError
If geodetic or time conversion requires imagery data the sensor does not provide.
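Examples
--------
Illustrative only; assumes ``track`` is an existing Track whose sensor supports
geolocation:
>>> df = track.to_dataframe()  # doctest: +SKIP
>>> list(df.columns)[:4]  # doctest: +SKIP
['Track', 'Frames', 'Rows', 'Columns']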
"""
data = {
"Track": len(self)*[self.name],
"Frames": self.frames,
"Rows": self.rows,
"Columns": self.columns,
"Color": self.color,
"Marker": self.marker,
"Line Width": self.line_width,
"Marker Size": self.marker_size,
"Tail Length": self.tail_length,
"Visible": self.visible,
"Complete": self.complete,
"Show Line": self.show_line,
"Line Style": self.line_style,
"Labels": ', '.join(sorted(self.labels)) if self.labels else '',
}
# Include geolocation if possible
# Convert pixel coordinates to geodetic for each frame
latitudes = []
longitudes = []
altitudes = []
for i, frame in enumerate(self.frames):
# Convert single point
locations = self.sensor.pixel_to_geodetic(frame, np.array([self.rows[i]]), np.array([self.columns[i]]))
latitudes.append(locations.lat.deg[0])
longitudes.append(locations.lon.deg[0])
altitudes.append(locations.height.to('km').value[0])
data["Latitude (deg)"] = latitudes
data["Longitude (deg)"] = longitudes
data["Altitude (km)"] = altitudes
# Include times if possible
track_times = self.get_times()
if track_times is not None:
data["Times"] = pd.to_datetime(track_times).strftime('%Y-%m-%dT%H:%M:%S.%f')
# Include uncertainty data if present
if self.has_uncertainty():
data["Covariance 00"] = self.covariance_00
data["Covariance 01"] = self.covariance_01
data["Covariance 11"] = self.covariance_11
return pd.DataFrame(data)