"""Multi-object tracker implementation for VISTA"""
import numpy as np
from scipy.optimize import linear_sum_assignment
class KalmanTrack:
    """A single target track driven by a constant-velocity Kalman filter."""

    def __init__(self, detection_pos, frame, process_noise, measurement_noise, track_id):
        """
        Start a new track from its first detection.

        The filter state is ``[x, vx, y, vy]`` (x = column, y = row), with
        both velocities initialised to zero.
        """
        self.id = track_id
        x0, y0 = detection_pos[0], detection_pos[1]
        self.state = np.array([[x0], [0], [y0], [0]])
        # Position uncertainty matches the sensor noise; velocity is
        # unobserved at birth, so it starts with a large variance.
        self.covar = np.diag([measurement_noise, 10.0, measurement_noise, 10.0])
        self.process_noise = process_noise
        self.measurement_noise = measurement_noise
        # Per-frame history (parallel lists of frame numbers and [x, y]).
        self.frames = [frame]
        self.positions = [detection_pos.copy()]
        # Bookkeeping used by the track manager.
        self.hits = 1    # detections associated so far
        self.misses = 0  # consecutive frames without a detection
        self.age = 1     # total frames since creation

    def predict(self, dt=1.0):
        """Propagate state and covariance one step under constant velocity."""
        F = np.array([
            [1.0, dt, 0.0, 0.0],
            [0.0, 1.0, 0.0, 0.0],
            [0.0, 0.0, 1.0, dt],
            [0.0, 0.0, 0.0, 1.0],
        ])
        # Piecewise white-acceleration noise for one position/velocity pair;
        # the same 2x2 block applies independently to the x and y axes.
        q_block = np.array([[dt**4 / 4, dt**3 / 2],
                            [dt**3 / 2, dt**2]])
        Q = np.zeros((4, 4))
        Q[0:2, 0:2] = q_block
        Q[2:4, 2:4] = q_block
        Q *= self.process_noise
        self.state = F @ self.state
        self.covar = F @ self.covar @ F.T + Q

    def update(self, detection_pos, frame):
        """Fuse a position-only measurement into the state estimate."""
        H = np.array([[1.0, 0.0, 0.0, 0.0],
                      [0.0, 0.0, 1.0, 0.0]])
        R = self.measurement_noise * np.eye(2)
        # Innovation (measurement residual) and its covariance.
        residual = detection_pos.reshape(2, 1) - H @ self.state
        S = H @ self.covar @ H.T + R
        gain = self.covar @ H.T @ np.linalg.inv(S)
        self.state = self.state + gain @ residual
        self.covar = (np.eye(4) - gain @ H) @ self.covar
        # Record the detection and reset the miss streak.
        self.frames.append(frame)
        self.positions.append(detection_pos.copy())
        self.hits += 1
        self.misses = 0

    def mark_missed(self, frame):
        """Record a frame with no associated detection, logging the prediction."""
        self.misses += 1
        # Coast: keep the history continuous with the predicted position.
        self.frames.append(frame)
        self.positions.append(self.get_predicted_position())

    def get_predicted_position(self):
        """Return the current (x, y) position estimate as a length-2 array."""
        return np.array([self.state[0, 0], self.state[2, 0]])

    def mahalanobis_distance(self, detection_pos):
        """Gating distance between a detection and the predicted measurement."""
        H = np.array([[1.0, 0.0, 0.0, 0.0],
                      [0.0, 0.0, 1.0, 0.0]])
        S = H @ self.covar @ H.T + self.measurement_noise * np.eye(2)
        residual = detection_pos.reshape(2, 1) - H @ self.state
        d_sq = residual.T @ np.linalg.inv(S) @ residual
        return float(np.sqrt(d_sq)[0, 0])
def run_kalman_tracker(detectors, config):
    """
    Run a Kalman filter tracker on detections using a custom implementation.

    Association is global nearest neighbour: per frame, a Hungarian
    assignment over Mahalanobis distances, gated at ``gating_distance``.

    Parameters
    ----------
    detectors : list of Detector
        Detector objects providing parallel ``frames``, ``rows`` and
        ``columns`` sequences of point detections.
    config : dict
        Dictionary containing tracker configuration:

        - tracker_name: Name for the resulting tracker (not read here)
        - process_noise: Process noise for constant velocity model
        - measurement_noise: Measurement noise covariance
        - gating_distance: Mahalanobis distance threshold for gating
        - min_detections: Minimum detections required to initiate track
        - delete_threshold: Covariance trace threshold for track deletion

    Returns
    -------
    list of dict
        List of track data dictionaries (tracks with >= 2 points), each
        containing:

        - 'frames': numpy array of frame numbers
        - 'rows': numpy array of row coordinates
        - 'columns': numpy array of column coordinates
    """
    process_noise = config['process_noise']
    measurement_noise = config['measurement_noise']
    gating_distance = config['gating_distance']
    min_detections = config['min_detections']
    delete_threshold = config['delete_threshold']

    detections_by_frame = _collect_detections_by_frame(detectors)
    frames = sorted(detections_by_frame)

    # Track management.
    active_tracks = []     # confirmed KalmanTrack objects
    tentative_tracks = []  # newly initiated tracks awaiting confirmation
    finished_tracks = []   # deleted-but-valid tracks kept for output
    next_track_id = 1

    for frame in frames:
        # Every key of detections_by_frame is created together with its
        # first detection, so `detections` is never empty here.
        detections = detections_by_frame[frame]

        # 1) Predict all tracks forward one frame and age them.
        #    NOTE(review): dt is fixed at 1.0 even across frame-number gaps
        #    (frames with zero detections are skipped entirely) — confirm
        #    detectors report every frame.
        for track in active_tracks + tentative_tracks:
            track.predict()
            track.age += 1

        # 2) Associate detections to tracks (GNN via Hungarian algorithm).
        all_tracks = active_tracks + tentative_tracks
        associated_detections = set()
        if all_tracks:
            # Infeasible (out-of-gate) pairs get a cost above the gate so
            # the solver still yields a complete assignment; such pairs are
            # rejected after solving.
            cost_matrix = np.full((len(all_tracks), len(detections)),
                                  gating_distance * 2)
            for i, track in enumerate(all_tracks):
                for j, detection in enumerate(detections):
                    dist = track.mahalanobis_distance(detection)
                    if dist < gating_distance:
                        cost_matrix[i, j] = dist
            track_indices, detection_indices = linear_sum_assignment(cost_matrix)
            associated_tracks = set()
            for track_idx, det_idx in zip(track_indices, detection_indices):
                if cost_matrix[track_idx, det_idx] < gating_distance:
                    all_tracks[track_idx].update(detections[det_idx], frame)
                    associated_detections.add(det_idx)
                    associated_tracks.add(track_idx)
            # Coast every track that received no detection this frame.
            for i, track in enumerate(all_tracks):
                if i not in associated_tracks:
                    track.mark_missed(frame)

        # 3) Initiate new tentative tracks from unassociated detections.
        for i, detection in enumerate(detections):
            if i not in associated_detections:
                tentative_tracks.append(KalmanTrack(
                    detection, frame, process_noise,
                    measurement_noise, next_track_id))
                next_track_id += 1

        # 4) Promote tentative tracks that have accumulated enough hits.
        promoted = [t for t in tentative_tracks if t.hits >= min_detections]
        for track in promoted:
            tentative_tracks.remove(track)
            active_tracks.append(track)

        # 5) Delete confirmed tracks with too many consecutive misses
        #    (3x min_detections) or covariance grown past the threshold;
        #    keep those with enough detections for the output.
        stale = [t for t in active_tracks
                 if t.misses > min_detections * 3
                 or np.trace(t.covar) > delete_threshold]
        for track in stale:
            active_tracks.remove(track)
            if track.hits >= min_detections:
                finished_tracks.append(track)

        # 6) Drop tentative tracks that failed to confirm quickly enough.
        tentative_tracks = [t for t in tentative_tracks
                            if not (t.age > min_detections * 2
                                    and t.hits < min_detections)]

    # Include both still-active and already-finished tracks in the output.
    return _tracks_to_data(active_tracks + finished_tracks)


def _collect_detections_by_frame(detectors):
    """Group all detections as [column, row] arrays keyed by frame number."""
    detections_by_frame = {}
    for detector in detectors:
        for i, frame in enumerate(detector.frames):
            # Store as [column, row] == [x, y] to match the filter state.
            detections_by_frame.setdefault(frame, []).append(
                np.array([detector.columns[i], detector.rows[i]]))
    return detections_by_frame


def _tracks_to_data(tracks):
    """Convert tracks with at least two points into output dictionaries."""
    track_data_list = []
    for track in tracks:
        if len(track.frames) < 2:
            continue
        positions = np.array(track.positions)
        track_data_list.append({
            'frames': np.array(track.frames, dtype=np.int_),
            'rows': positions[:, 1],     # y
            'columns': positions[:, 0],  # x
        })
    return track_data_list