Source code for vista.sensors.sampled_sensor

"""
Sampled sensor with interpolated position and geodetic conversion capabilities.

This module defines the SampledSensor class, which extends the base Sensor class to provide
position retrieval via interpolation/extrapolation from discrete position samples. It also
supports geodetic coordinate conversion using ARF (Attitude Reference Frame) polynomials and
radiometric gain calibration.
"""

import h5py
from astropy.coordinates import EarthLocation
from astropy import units
from dataclasses import dataclass
from scipy.interpolate import interp1d
from typing import Optional, Tuple
import numpy as np
from numpy.typing import NDArray

from vista.sensors.sensor import Sensor
from vista.transforms import cartesian_to_spherical, evaluate_2d_polynomial, get_arf_transform, los_to_earth, spherical_to_cartesian


[docs] @dataclass class SampledSensor(Sensor): """ Sensor implementation using sampled position data with interpolation/extrapolation. SampledSensor stores discrete position samples at known times and provides position estimates at arbitrary times through interpolation (within the time range) or extrapolation (outside the time range). For single-position sensors, the same position is returned for all query times. Attributes ---------- positions : NDArray[np.float64] Sensor positions as (3, N) array where N is the number of samples. Each column contains [x, y, z] ECEF coordinates in kilometers. Required - will raise ValueError in __post_init__ if not provided. times : NDArray[np.datetime64] Times corresponding to each position sample. Must have length N. Required - will raise ValueError in __post_init__ if not provided. frames : NDArray[np.int64] Sensor frames numbers corresponding to each time sample. Must have length N. Required - will raise ValueError in __post_init__ if not provided. radiometric_gain : NDArray, optional 1D array of multiplicative factors for each frame to convert from counts to irradiance in units of kW/km²/sr. pointing : NDArray[np.float64], optional Sensor pointing unit vectors in ECEF coordinates. Shape: (3, num_frames). Each column is the direction the sensor is pointing for that frame. poly_pixel_to_arf_azimuth : NDArray[np.float64], optional Polynomial coefficients for converting (column, row) to ARF azimuth (radians). Shape: (num_frames, num_coeffs) where num_coeffs depends on polynomial order. poly_pixel_to_arf_elevation : NDArray[np.float64], optional Polynomial coefficients for converting (column, row) to ARF elevation (radians). Shape: (num_frames, num_coeffs) where num_coeffs depends on polynomial order. poly_arf_to_row : NDArray[np.float64], optional Polynomial coefficients for converting (azimuth, elevation) to row. Shape: (num_frames, num_coeffs) where num_coeffs depends on polynomial order. poly_arf_to_col : NDArray[np.float64], optional Polynomial coefficients for converting (azimuth, elevation) to column. Shape: (num_frames, num_coeffs) where num_coeffs depends on polynomial order. Methods ------- get_positions(times) Return interpolated/extrapolated sensor positions for given times Notes ----- - Duplicate times in the input are automatically removed during initialization - For 2+ unique samples: uses linear interpolation within range, linear extrapolation outside - For 1 sample: returns the same position for all query times (stationary sensor) - Positions must be (3, N) arrays with x, y, z in each column - All coordinates are in ECEF Cartesian frame with units of kilometers - ARF (Attitude Reference Frame) is a local coordinate system where the X-axis points along the sensor pointing direction Examples -------- >>> import numpy as np >>> # Create sensor with multiple position samples >>> positions = np.array([[1000, 1100, 1200], ... [2000, 2100, 2200], ... [3000, 3100, 3200]]) # (3, 3) array >>> times = np.array(['2024-01-01T00:00:00', ... '2024-01-01T00:01:00', ... '2024-01-01T00:02:00'], dtype='datetime64') >>> sensor = SampledSensor(positions=positions, times=times) >>> # Get interpolated position >>> query_times = np.array(['2024-01-01T00:00:30'], dtype='datetime64') >>> pos = sensor.get_positions(query_times) >>> pos.shape (3, 1) >>> # Create stationary sensor with single position >>> positions_static = np.array([[1000], [2000], [3000]]) # (3, 1) array >>> times_static = np.array(['2024-01-01T00:00:00'], dtype='datetime64') >>> sensor_static = SampledSensor(positions=positions_static, times=times_static) >>> # Returns same position for any query time >>> pos = sensor_static.get_positions(query_times) """ positions: Optional[NDArray[np.float64]] = None times: Optional[NDArray[np.datetime64]] = None frames: Optional[NDArray[np.int64]] = None radiometric_gain: Optional[NDArray] = None pointing: Optional[NDArray[np.float64]] = None poly_pixel_to_arf_azimuth: Optional[NDArray[np.float64]] = None poly_pixel_to_arf_elevation: Optional[NDArray[np.float64]] = None poly_arf_to_row: Optional[NDArray[np.float64]] = None poly_arf_to_col: Optional[NDArray[np.float64]] = None
[docs] def __post_init__(self): """ Validate inputs and remove duplicate times. Ensures positions and times have compatible shapes and removes any duplicate time entries along with their corresponding positions. Raises ------ ValueError If positions or times are not provided, or if they have incompatible shapes. """ # Call parent's __post_init__ to increment instance counter super().__post_init__() # Validate required fields if self.positions is None: raise ValueError("positions is required for SampledSensor") if self.times is None: raise ValueError("times is required for SampledSensor") if self.frames is None: raise ValueError("frame numbers are required for SampledSensor") # Validate shape of positions if self.positions.ndim != 2 or self.positions.shape[0] != 3: raise ValueError(f"positions must be a (3, N) array, got shape {self.positions.shape}") # Validate that times and positions have matching counts n_positions = self.positions.shape[1] n_times = len(self.times) if n_positions != n_times: raise ValueError(f"Number of positions ({n_positions}) must match number of times ({n_times})") # Remove duplicate times and corresponding positions unique_times, unique_indices = np.unique(self.times, return_index=True) if len(unique_times) < len(self.times): # Duplicates were found, keep only unique entries self.times = unique_times self.positions = self.positions[:, unique_indices]
[docs] def can_geolocate(self) -> bool: """ Check if sensor can convert pixels to geodetic coordinates and vice versa. Returns ------- bool True if sensor has all required ARF geolocation data: pointing vectors and both forward (pixel→ARF) and reverse (ARF→pixel) polynomials. """ return (self.pointing is not None and self.poly_pixel_to_arf_azimuth is not None and self.poly_pixel_to_arf_elevation is not None and self.poly_arf_to_row is not None and self.poly_arf_to_col is not None)
[docs] def get_positions(self, times: NDArray[np.datetime64]) -> NDArray[np.float64]: """ Return sensor positions for given times via interpolation/extrapolation. Parameters ---------- times : NDArray[np.datetime64] Array of times for which to retrieve sensor positions Returns ------- NDArray[np.float64] Sensor positions as (3, N) array where N is the number of query times. Each column contains [x, y, z] coordinates in ECEF frame (km). Notes ----- - For sensors with 1 sample: returns the single position for all times - For sensors with 2+ samples: uses linear interpolation within the time range and linear extrapolation outside the range """ # Convert query times to numeric values (nanoseconds since epoch) query_times_ns = times.astype('datetime64[ns]').astype(np.float64) # Handle single-position case (stationary sensor) if self.positions.shape[1] == 1: # Return the same position for all query times return np.tile(self.positions, (1, len(times))) # Multi-position case: use interpolation/extrapolation # Convert sample times to numeric values sample_times_ns = self.times.astype('datetime64[ns]').astype(np.float64) # Create interpolators for each coordinate (x, y, z) # fill_value='extrapolate' enables linear extrapolation outside the range interpolated_positions = np.zeros((3, len(times))) for i in range(3): interpolator = interp1d( sample_times_ns, self.positions[i, :], kind='linear', fill_value='extrapolate' ) interpolated_positions[i, :] = interpolator(query_times_ns) return interpolated_positions
[docs] def pixel_to_geodetic(self, frame: int, rows: np.ndarray, columns: np.ndarray): """ Convert pixel coordinates to geodetic coordinates using ARF polynomials. Uses ARF (Attitude Reference Frame) polynomials to map (row, column) pixel coordinates to geodetic coordinates by ray-casting to the Earth's surface. Pixels that do not intersect Earth will have NaN coordinates. Parameters ---------- frame : int Frame number for which to perform the conversion rows : np.ndarray Array of row pixel coordinates columns : np.ndarray Array of column pixel coordinates Returns ------- EarthLocation Astropy EarthLocation object(s) with geodetic coordinates. Returns NaN coordinates for pixels that do not intersect Earth. Returns zero coordinates if polynomials are not available or frame not found. Notes ----- - Requires ARF polynomials and pointing vectors to be defined - Frame must exist in self.frames array - Off-Earth pixels will have NaN lat/lon/height values """ # If no polynomial coefficients provided, return zeros if not self.can_geolocate() or self.frames is None: invalid = np.zeros_like(rows, dtype=np.float64) return EarthLocation.from_geocentric(x=invalid, y=invalid, z=invalid, unit=units.km) # Find frame index in sensor's frame array frame_mask = self.frames == frame if not np.any(frame_mask): # Frame not found in sensor calibration, return zeros invalid = np.zeros_like(rows, dtype=np.float64) return EarthLocation.from_geocentric(x=invalid, y=invalid, z=invalid, unit=units.km) frame_idx = np.where(frame_mask)[0][0] # Get polynomial coefficients for this frame az_coeffs = self.poly_pixel_to_arf_azimuth[frame_idx] el_coeffs = self.poly_pixel_to_arf_elevation[frame_idx] # Evaluate polynomials: pixel → ARF angles (radians) azimuth = evaluate_2d_polynomial(az_coeffs, columns, rows) elevation = evaluate_2d_polynomial(el_coeffs, columns, rows) # Convert ARF spherical → ARF Cartesian unit vectors arf_vectors = spherical_to_cartesian(azimuth, elevation) # Get sensor position for this frame # For stationary sensors (single position sample), use that position directly if self.positions.shape[1] == 1: sensor_pos = self.positions[:, 0] else: # For moving sensors, interpolate position at the frame's time # Use min to avoid index out of bounds for stationary sensor with single time time_idx = min(frame_idx, len(self.times) - 1) sensor_pos = self.get_positions(self.times[time_idx:time_idx + 1])[:, 0] sensor_pointing = self.pointing[:, frame_idx] # Get ARF transform and invert (transpose for orthonormal matrix) arf_to_ecef = get_arf_transform(sensor_pos, sensor_pointing).T # Transform ARF → ECEF line-of-sight vectors ecef_vectors = arf_to_ecef @ arf_vectors # Ray-cast to Earth (returns NaN for non-intersecting rays) _, intersections = los_to_earth(sensor_pos, ecef_vectors) # Ensure intersections is 2D (3, N) even for single point # los_to_earth squeezes single-point results to (3,) if intersections.ndim == 1: intersections = intersections.reshape(3, 1) # Convert ECEF intersection → geodetic (NaN intersections remain NaN) return EarthLocation.from_geocentric( x=intersections[0] * units.km, y=intersections[1] * units.km, z=intersections[2] * units.km )
[docs] def geodetic_to_pixel(self, frame: int, loc: EarthLocation) -> Tuple[np.ndarray, np.ndarray]: """ Convert geodetic coordinates to pixel coordinates using ARF polynomials. Uses ARF (Attitude Reference Frame) polynomials to map geodetic coordinates (latitude, longitude, altitude) to (row, column) pixel coordinates. This method properly handles targets at any altitude, not just ground level. Parameters ---------- frame : int Frame number for which to perform the conversion loc : EarthLocation Astropy EarthLocation object(s) containing geodetic coordinates Returns ------- rows : np.ndarray Array of row pixel coordinates (zeros if polynomials unavailable) columns : np.ndarray Array of column pixel coordinates (zeros if polynomials unavailable) Notes ----- - Requires ARF polynomials and pointing vectors to be defined - Frame must exist in self.frames array - Returns zero coordinates if polynomials are not available or frame not found - Properly handles targets at any altitude (not limited to ground level) """ # If no polynomial coefficients provided, return zeros if not self.can_geolocate() or self.frames is None: # Handle both scalar and array EarthLocation try: n_points = len(loc.lat) except TypeError: n_points = 1 invalid = np.zeros(n_points) return invalid, invalid # Find frame index in sensor's frame array frame_mask = self.frames == frame if not np.any(frame_mask): # Frame not found in sensor calibration, return zeros try: n_points = len(loc.lat) except TypeError: n_points = 1 invalid = np.zeros(n_points) return invalid, invalid frame_idx = np.where(frame_mask)[0][0] # Convert geodetic → ECEF Cartesian (km) target_ecef = np.array([ loc.geocentric[0].to(units.km).value, loc.geocentric[1].to(units.km).value, loc.geocentric[2].to(units.km).value ]) # Ensure target_ecef is 2D (3, N) even for single point if target_ecef.ndim == 1: target_ecef = target_ecef.reshape(3, 1) # Get sensor position for this frame # For stationary sensors (single position sample), use that position directly if self.positions.shape[1] == 1: sensor_pos = self.positions[:, 0] else: # For moving sensors, interpolate position at the frame's time time_idx = min(frame_idx, len(self.times) - 1) sensor_pos = self.get_positions(self.times[time_idx:time_idx + 1])[:, 0] # Compute line-of-sight vectors from sensor to targets los_vectors = target_ecef - sensor_pos.reshape(3, 1) los_norms = np.linalg.norm(los_vectors, axis=0, keepdims=True) los_vectors = los_vectors / los_norms # Get sensor pointing and compute ECEF → ARF transform sensor_pointing = self.pointing[:, frame_idx] ecef_to_arf = get_arf_transform(sensor_pos, sensor_pointing) # Transform ECEF LOS → ARF Cartesian arf_vectors = ecef_to_arf @ los_vectors # Convert ARF Cartesian → spherical (azimuth, elevation in radians) azimuth, elevation = cartesian_to_spherical(arf_vectors) # Get polynomial coefficients for this frame row_coeffs = self.poly_arf_to_row[frame_idx] col_coeffs = self.poly_arf_to_col[frame_idx] # Evaluate polynomials: ARF angles → pixel coordinates rows = evaluate_2d_polynomial(row_coeffs, azimuth, elevation) columns = evaluate_2d_polynomial(col_coeffs, azimuth, elevation) return rows, columns
[docs] def to_hdf5(self, group: h5py.Group): """ Save sampled sensor data to an HDF5 group. Parameters ---------- group : h5py.Group HDF5 group to write sensor data to (typically sensors/<sensor_name>/) Notes ----- This method extends the base Sensor.to_hdf5() by adding: - Position data (positions, times) in position/ subgroup - Geolocation polynomials in geolocation/ subgroup - Radiometric gain values in radiometric/ subgroup """ # Call parent to save base radiometric data super().to_hdf5(group) # Override sensor type group.attrs['sensor_type'] = 'SampledSensor' # Save position data if self.positions is not None and self.times is not None: position_group = group.create_group('position') position_group.create_dataset('positions', data=self.positions) # Convert times to unix nanoseconds unix_nanoseconds = self.times.astype('datetime64[ns]').astype(np.int64) position_group.create_dataset('unix_nanoseconds', data=unix_nanoseconds) # Save ARF geolocation polynomials if self.can_geolocate(): geolocation_group = group.create_group('geolocation') geolocation_group.create_dataset('poly_pixel_to_arf_azimuth', data=self.poly_pixel_to_arf_azimuth) geolocation_group.create_dataset('poly_pixel_to_arf_elevation', data=self.poly_pixel_to_arf_elevation) geolocation_group.create_dataset('poly_arf_to_row', data=self.poly_arf_to_row) geolocation_group.create_dataset('poly_arf_to_col', data=self.poly_arf_to_col) geolocation_group.create_dataset('pointing', data=self.pointing) geolocation_group.create_dataset('frames', data=self.frames) # Save radiometric gain (extend radiometric group if exists, or create it) if self.radiometric_gain is not None: if 'radiometric' in group: radiometric_group = group['radiometric'] else: radiometric_group = group.create_group('radiometric') radiometric_group.create_dataset('radiometric_gain', data=self.radiometric_gain) radiometric_group.create_dataset('radiometric_gain_frames', data=self.frames)