"""
Sampled sensor with interpolated position and geodetic conversion capabilities.
This module defines the SampledSensor class, which extends the base Sensor class to provide
position retrieval via interpolation/extrapolation from discrete position samples. It also
supports geodetic coordinate conversion using ARF (Attitude Reference Frame) polynomials and
radiometric gain calibration.
"""
import h5py
from astropy.coordinates import EarthLocation
from astropy import units
from dataclasses import dataclass
from scipy.interpolate import interp1d
from typing import Optional, Tuple, Union
import numpy as np
from numpy.typing import NDArray
from vista.sensors.sensor import Sensor
from vista.transforms import cartesian_to_spherical, evaluate_2d_polynomial, get_arf_transform, los_to_earth, spherical_to_cartesian
[docs]
@dataclass(eq=False)
class SampledSensor(Sensor):
"""
Sensor implementation using sampled position data with interpolation/extrapolation.
SampledSensor stores discrete position samples at known times and provides
position estimates at arbitrary times through interpolation (within the time range)
or extrapolation (outside the time range). For single-position sensors, the same
position is returned for all query times.
Attributes
----------
positions : NDArray[np.float64]
Sensor positions as (3, N) array where N is the number of samples.
Each column contains [x, y, z] ECEF coordinates in kilometers.
Required - will raise ValueError in __post_init__ if not provided.
times : NDArray[np.datetime64]
Times corresponding to each position sample. Must have length N.
Required - will raise ValueError in __post_init__ if not provided.
frames : NDArray[np.int64]
Sensor frames numbers corresponding to each time sample. Must have length N.
Required - will raise ValueError in __post_init__ if not provided.
radiometric_gain : NDArray, optional
1D array of multiplicative factors for each frame to convert from counts to
irradiance in units of kW/km²/sr.
pointing : NDArray[np.float64], optional
Sensor pointing unit vectors in ECEF coordinates. Shape: (3, num_frames).
Each column is the direction the sensor is pointing for that frame.
poly_pixel_to_arf_azimuth : NDArray[np.float64], optional
Polynomial coefficients for converting (column, row) to ARF azimuth (radians).
Shape: (num_frames, num_coeffs) where num_coeffs depends on polynomial order.
poly_pixel_to_arf_elevation : NDArray[np.float64], optional
Polynomial coefficients for converting (column, row) to ARF elevation (radians).
Shape: (num_frames, num_coeffs) where num_coeffs depends on polynomial order.
poly_arf_to_row : NDArray[np.float64], optional
Polynomial coefficients for converting (azimuth, elevation) to row.
Shape: (num_frames, num_coeffs) where num_coeffs depends on polynomial order.
poly_arf_to_col : NDArray[np.float64], optional
Polynomial coefficients for converting (azimuth, elevation) to column.
Shape: (num_frames, num_coeffs) where num_coeffs depends on polynomial order.
Methods
-------
get_positions(times)
Return interpolated/extrapolated sensor positions for given times
Notes
-----
- Duplicate times in the input are automatically removed during initialization
- For 2+ unique samples: uses linear interpolation within range, linear extrapolation outside
- For 1 sample: returns the same position for all query times (stationary sensor)
- Positions must be (3, N) arrays with x, y, z in each column
- All coordinates are in ECEF Cartesian frame with units of kilometers
- ARF (Attitude Reference Frame) is a local coordinate system where the X-axis
points along the sensor pointing direction
Examples
--------
>>> import numpy as np
>>> # Create sensor with multiple position samples
>>> positions = np.array([[1000, 1100, 1200],
... [2000, 2100, 2200],
... [3000, 3100, 3200]]) # (3, 3) array
>>> times = np.array(['2024-01-01T00:00:00',
... '2024-01-01T00:01:00',
... '2024-01-01T00:02:00'], dtype='datetime64')
>>> sensor = SampledSensor(positions=positions, times=times)
>>> # Get interpolated position
>>> query_times = np.array(['2024-01-01T00:00:30'], dtype='datetime64')
>>> pos = sensor.get_positions(query_times)
>>> pos.shape
(3, 1)
>>> # Create stationary sensor with single position
>>> positions_static = np.array([[1000], [2000], [3000]]) # (3, 1) array
>>> times_static = np.array(['2024-01-01T00:00:00'], dtype='datetime64')
>>> sensor_static = SampledSensor(positions=positions_static, times=times_static)
>>> # Returns same position for any query time
>>> pos = sensor_static.get_positions(query_times)
"""
positions: Optional[NDArray[np.float64]] = None
times: Optional[NDArray[np.datetime64]] = None
frames: Optional[NDArray[np.int64]] = None
radiometric_gain: Optional[NDArray] = None
pointing: Optional[NDArray[np.float64]] = None
poly_pixel_to_arf_azimuth: Optional[NDArray[np.float64]] = None
poly_pixel_to_arf_elevation: Optional[NDArray[np.float64]] = None
poly_arf_to_row: Optional[NDArray[np.float64]] = None
poly_arf_to_col: Optional[NDArray[np.float64]] = None
[docs]
def __post_init__(self):
"""
Validate inputs and remove duplicate times.
Ensures positions and times have compatible shapes and removes any
duplicate time entries along with their corresponding positions.
Raises
------
ValueError
If positions or times are not provided, or if they have incompatible shapes.
"""
# Call parent's __post_init__ to increment instance counter
super().__post_init__()
# Validate required fields
if self.positions is None:
raise ValueError("positions is required for SampledSensor")
if self.times is None:
raise ValueError("times is required for SampledSensor")
if self.frames is None:
raise ValueError("frame numbers are required for SampledSensor")
# Validate shape of positions
if self.positions.ndim != 2 or self.positions.shape[0] != 3:
raise ValueError(f"positions must be a (3, N) array, got shape {self.positions.shape}")
# Validate that times and positions have matching counts
n_positions = self.positions.shape[1]
n_times = len(self.times)
if n_positions != n_times:
raise ValueError(f"Number of positions ({n_positions}) must match number of times ({n_times})")
# Remove duplicate times and corresponding positions
unique_times, unique_indices = np.unique(self.times, return_index=True)
if len(unique_times) < len(self.times):
# Duplicates were found, keep only unique entries
self.times = unique_times
self.positions = self.positions[:, unique_indices]
[docs]
def can_geolocate(self) -> bool:
"""
Check if sensor can convert pixels to geodetic coordinates and vice versa.
Returns
-------
bool
True if sensor has all required ARF geolocation data: pointing vectors
and both forward (pixel→ARF) and reverse (ARF→pixel) polynomials.
"""
return (self.pointing is not None and
self.poly_pixel_to_arf_azimuth is not None and
self.poly_pixel_to_arf_elevation is not None and
self.poly_arf_to_row is not None and
self.poly_arf_to_col is not None)
[docs]
def get_positions(self, times: NDArray[np.datetime64]) -> NDArray[np.float64]:
"""
Return sensor positions for given times via interpolation/extrapolation.
Parameters
----------
times : NDArray[np.datetime64]
Array of times for which to retrieve sensor positions
Returns
-------
NDArray[np.float64]
Sensor positions as (3, N) array where N is the number of query times.
Each column contains [x, y, z] coordinates in ECEF frame (km).
Notes
-----
- For sensors with 1 sample: returns the single position for all times
- For sensors with 2+ samples: uses linear interpolation within the time
range and linear extrapolation outside the range
"""
# Convert query times to numeric values (nanoseconds since epoch)
query_times_ns = times.astype('datetime64[ns]').astype(np.float64)
# Handle single-position case (stationary sensor)
if self.positions.shape[1] == 1:
# Return the same position for all query times
return np.tile(self.positions, (1, len(times)))
# Multi-position case: use interpolation/extrapolation
# Convert sample times to numeric values
sample_times_ns = self.times.astype('datetime64[ns]').astype(np.float64)
# Create interpolators for each coordinate (x, y, z)
# fill_value='extrapolate' enables linear extrapolation outside the range
interpolated_positions = np.zeros((3, len(times)))
for i in range(3):
interpolator = interp1d(
sample_times_ns,
self.positions[i, :],
kind='linear',
fill_value='extrapolate'
)
interpolated_positions[i, :] = interpolator(query_times_ns)
return interpolated_positions
def _pixel_to_geodetic_single_frame(self, frame_idx: int, rows: np.ndarray, columns: np.ndarray) -> np.ndarray:
"""
Convert pixel coordinates to ECEF for a single frame index.
Parameters
----------
frame_idx : int
Index into self.frames (NOT the frame number itself)
rows : np.ndarray
Row pixel coordinates
columns : np.ndarray
Column pixel coordinates
Returns
-------
np.ndarray
ECEF intersections with shape (3, N). NaN for off-Earth pixels.
"""
# Get polynomial coefficients for this frame
az_coeffs = self.poly_pixel_to_arf_azimuth[frame_idx]
el_coeffs = self.poly_pixel_to_arf_elevation[frame_idx]
# Evaluate polynomials: pixel → ARF angles (radians)
azimuth = evaluate_2d_polynomial(az_coeffs, columns, rows)
elevation = evaluate_2d_polynomial(el_coeffs, columns, rows)
# Convert ARF spherical → ARF Cartesian unit vectors
arf_vectors = spherical_to_cartesian(azimuth, elevation)
# Get sensor position for this frame
if self.positions.shape[1] == 1:
sensor_pos = self.positions[:, 0]
else:
time_idx = min(frame_idx, len(self.times) - 1)
sensor_pos = self.get_positions(self.times[time_idx:time_idx + 1])[:, 0]
sensor_pointing = self.pointing[:, frame_idx]
# Get ARF transform and invert (transpose for orthonormal matrix)
arf_to_ecef = get_arf_transform(sensor_pos, sensor_pointing).T
# Transform ARF → ECEF line-of-sight vectors
ecef_vectors = arf_to_ecef @ arf_vectors
# Ray-cast to Earth (returns NaN for non-intersecting rays)
_, intersections = los_to_earth(sensor_pos, ecef_vectors)
# Ensure intersections is 2D (3, N) even for single point
if intersections.ndim == 1:
intersections = intersections.reshape(3, 1)
return intersections
[docs]
def pixel_to_geodetic(self, frame: Union[int, np.ndarray], rows: np.ndarray, columns: np.ndarray):
"""
Convert pixel coordinates to geodetic coordinates using ARF polynomials.
Uses ARF (Attitude Reference Frame) polynomials to map (row, column) pixel
coordinates to geodetic coordinates by ray-casting to the Earth's surface.
Pixels that do not intersect Earth will have NaN coordinates.
Parameters
----------
frame : int or np.ndarray
Frame number(s) for which to perform the conversion. If an array,
must have the same length as rows/columns and each element specifies
the frame for the corresponding pixel coordinate.
rows : np.ndarray
Array of row pixel coordinates
columns : np.ndarray
Array of column pixel coordinates
Returns
-------
EarthLocation
Astropy EarthLocation object(s) with geodetic coordinates.
Returns NaN coordinates for pixels that do not intersect Earth.
Returns zero coordinates if polynomials are not available or frame not found.
Notes
-----
- Requires ARF polynomials and pointing vectors to be defined
- Frame must exist in self.frames array
- Off-Earth pixels will have NaN lat/lon/height values
"""
# If no polynomial coefficients provided, return zeros
if not self.can_geolocate() or self.frames is None:
invalid = np.zeros_like(rows, dtype=np.float64)
return EarthLocation.from_geocentric(x=invalid, y=invalid, z=invalid, unit=units.km)
# Handle array of frames: group by unique frame for efficient batch processing
if isinstance(frame, np.ndarray):
all_intersections = np.full((3, len(rows)), np.nan)
for uframe in np.unique(frame):
# Find sensor frame index
sensor_mask = self.frames == uframe
if not np.any(sensor_mask):
continue # Unknown frame, leave as NaN
frame_idx = np.where(sensor_mask)[0][0]
# Gather pixels belonging to this frame
point_mask = frame == uframe
intersections = self._pixel_to_geodetic_single_frame(
frame_idx, rows[point_mask], columns[point_mask]
)
all_intersections[:, point_mask] = intersections
return EarthLocation.from_geocentric(
x=all_intersections[0] * units.km,
y=all_intersections[1] * units.km,
z=all_intersections[2] * units.km
)
# Single frame path (original fast path)
frame_mask = self.frames == frame
if not np.any(frame_mask):
invalid = np.zeros_like(rows, dtype=np.float64)
return EarthLocation.from_geocentric(x=invalid, y=invalid, z=invalid, unit=units.km)
frame_idx = np.where(frame_mask)[0][0]
intersections = self._pixel_to_geodetic_single_frame(frame_idx, rows, columns)
return EarthLocation.from_geocentric(
x=intersections[0] * units.km,
y=intersections[1] * units.km,
z=intersections[2] * units.km
)
def _geodetic_to_pixel_single_frame(self, frame_idx: int, target_ecef: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Convert ECEF target positions to pixel coordinates for a single frame index.
Parameters
----------
frame_idx : int
Index into self.frames (NOT the frame number itself)
target_ecef : np.ndarray
ECEF coordinates with shape (3, N)
Returns
-------
rows : np.ndarray
Row pixel coordinates
columns : np.ndarray
Column pixel coordinates
"""
# Get sensor position for this frame
if self.positions.shape[1] == 1:
sensor_pos = self.positions[:, 0]
else:
time_idx = min(frame_idx, len(self.times) - 1)
sensor_pos = self.get_positions(self.times[time_idx:time_idx + 1])[:, 0]
# Compute line-of-sight vectors from sensor to targets
los_vectors = target_ecef - sensor_pos.reshape(3, 1)
los_norms = np.linalg.norm(los_vectors, axis=0, keepdims=True)
los_vectors = los_vectors / los_norms
# Get sensor pointing and compute ECEF → ARF transform
sensor_pointing = self.pointing[:, frame_idx]
ecef_to_arf = get_arf_transform(sensor_pos, sensor_pointing)
# Transform ECEF LOS → ARF Cartesian
arf_vectors = ecef_to_arf @ los_vectors
# Convert ARF Cartesian → spherical (azimuth, elevation in radians)
azimuth, elevation = cartesian_to_spherical(arf_vectors)
# Get polynomial coefficients for this frame
row_coeffs = self.poly_arf_to_row[frame_idx]
col_coeffs = self.poly_arf_to_col[frame_idx]
# Evaluate polynomials: ARF angles → pixel coordinates
rows = evaluate_2d_polynomial(row_coeffs, azimuth, elevation)
columns = evaluate_2d_polynomial(col_coeffs, azimuth, elevation)
return rows, columns
[docs]
def geodetic_to_pixel(self, frame: Union[int, np.ndarray], loc: EarthLocation) -> Tuple[np.ndarray, np.ndarray]:
"""
Convert geodetic coordinates to pixel coordinates using ARF polynomials.
Uses ARF (Attitude Reference Frame) polynomials to map geodetic coordinates
(latitude, longitude, altitude) to (row, column) pixel coordinates. This
method properly handles targets at any altitude, not just ground level.
Parameters
----------
frame : int or np.ndarray
Frame number(s) for which to perform the conversion. If an array,
must have the same length as loc and each element specifies the frame
for the corresponding location.
loc : EarthLocation
Astropy EarthLocation object(s) containing geodetic coordinates
Returns
-------
rows : np.ndarray
Array of row pixel coordinates (zeros if polynomials unavailable)
columns : np.ndarray
Array of column pixel coordinates (zeros if polynomials unavailable)
Notes
-----
- Requires ARF polynomials and pointing vectors to be defined
- Frame must exist in self.frames array
- Returns zero coordinates if polynomials are not available or frame not found
- Properly handles targets at any altitude (not limited to ground level)
"""
# If no polynomial coefficients provided, return zeros
if not self.can_geolocate() or self.frames is None:
try:
n_points = len(loc.lat)
except TypeError:
n_points = 1
invalid = np.zeros(n_points)
return invalid, invalid
# Convert geodetic → ECEF Cartesian (km) once for all points
target_ecef = np.array([
loc.geocentric[0].to(units.km).value,
loc.geocentric[1].to(units.km).value,
loc.geocentric[2].to(units.km).value
])
if target_ecef.ndim == 1:
target_ecef = target_ecef.reshape(3, 1)
# Handle array of frames: group by unique frame for efficient batch processing
if isinstance(frame, np.ndarray):
n_points = target_ecef.shape[1]
all_rows = np.zeros(n_points, dtype=np.float64)
all_cols = np.zeros(n_points, dtype=np.float64)
for uframe in np.unique(frame):
sensor_mask = self.frames == uframe
if not np.any(sensor_mask):
continue # Unknown frame, leave as zeros
frame_idx = np.where(sensor_mask)[0][0]
point_mask = frame == uframe
r, c = self._geodetic_to_pixel_single_frame(
frame_idx, target_ecef[:, point_mask]
)
all_rows[point_mask] = r
all_cols[point_mask] = c
return all_rows, all_cols
# Single frame path (original fast path)
frame_mask = self.frames == frame
if not np.any(frame_mask):
invalid = np.zeros(target_ecef.shape[1])
return invalid, invalid.copy()
frame_idx = np.where(frame_mask)[0][0]
return self._geodetic_to_pixel_single_frame(frame_idx, target_ecef)
[docs]
def to_hdf5(self, group: h5py.Group):
"""
Save sampled sensor data to an HDF5 group.
Parameters
----------
group : h5py.Group
HDF5 group to write sensor data to (typically sensors/<sensor_name>/)
Notes
-----
This method extends the base Sensor.to_hdf5() by adding:
- Position data (positions, times) in position/ subgroup
- Geolocation polynomials in geolocation/ subgroup
- Radiometric gain values in radiometric/ subgroup
"""
# Call parent to save base radiometric data
super().to_hdf5(group)
# Override sensor type
group.attrs['sensor_type'] = 'SampledSensor'
# Save position data
if self.positions is not None and self.times is not None:
position_group = group.create_group('position')
position_group.create_dataset('positions', data=self.positions)
# Convert times to unix nanoseconds
unix_nanoseconds = self.times.astype('datetime64[ns]').astype(np.int64)
position_group.create_dataset('unix_nanoseconds', data=unix_nanoseconds)
# Save ARF geolocation polynomials
if self.can_geolocate():
geolocation_group = group.create_group('geolocation')
geolocation_group.create_dataset('poly_pixel_to_arf_azimuth', data=self.poly_pixel_to_arf_azimuth)
geolocation_group.create_dataset('poly_pixel_to_arf_elevation', data=self.poly_pixel_to_arf_elevation)
geolocation_group.create_dataset('poly_arf_to_row', data=self.poly_arf_to_row)
geolocation_group.create_dataset('poly_arf_to_col', data=self.poly_arf_to_col)
geolocation_group.create_dataset('pointing', data=self.pointing)
geolocation_group.create_dataset('frames', data=self.frames)
# Save radiometric gain (extend radiometric group if exists, or create it)
if self.radiometric_gain is not None:
if 'radiometric' in group:
radiometric_group = group['radiometric']
else:
radiometric_group = group.create_group('radiometric')
radiometric_group.create_dataset('radiometric_gain', data=self.radiometric_gain)
radiometric_group.create_dataset('radiometric_gain_frames', data=self.frames)