"""Tracklet-based hierarchical tracker for high false alarm scenarios.

This tracker uses a two-stage approach optimized for scenarios with:

- High false alarm rate (100:1 or higher)
- Smooth real target motion with consistent velocity

Stage 1: Form high-confidence tracklets using strict association criteria.
Stage 2: Link tracklets based on velocity extrapolation and smoothness.
"""
import numpy as np
from scipy.optimize import linear_sum_assignment
class Tracklet:
    """High-confidence track segment with velocity consistency.

    A tracklet stores a position/frame history and a constant-velocity
    estimate. The history holds real detections (added through
    ``add_detection``) and, once a velocity is known, coasted predictions
    (added through ``mark_missed``) so that the history stays contiguous.

    Attributes
    ----------
    id
        Identifier supplied by the caller.
    positions : list of numpy.ndarray
        Position history (detections and coasted predictions).
    frames : list
        Frame number of each entry in ``positions``.
    velocity : numpy.ndarray or None
        Displacement per frame, ``None`` until a second detection arrives.
    hits : int
        Number of real detections.
    age : int
        Number of frames processed since creation (hits plus misses).
    consecutive_misses : int
        Misses since the most recent detection.
    """

    def __init__(self, detection_pos, frame, tracklet_id):
        self.id = tracklet_id
        # Copy so later mutation of the caller's array cannot alter history.
        self.positions = [np.array(detection_pos)]
        self.frames = [frame]
        self.velocity = None  # Computed once we have 2+ history points
        # Track quality metrics for "m out of n" logic
        self.hits = 1
        self.age = 1
        self.consecutive_misses = 0

    @staticmethod
    def _mean_velocity(positions, frames):
        """Return displacement per frame between the first and last point.

        Falls back to a zero vector when the points do not span a positive
        frame interval.
        """
        dt_total = frames[-1] - frames[0]
        if dt_total > 0:
            return (positions[-1] - positions[0]) / dt_total
        return np.zeros_like(positions[0], dtype=float)

    def add_detection(self, detection_pos, frame):
        """Add a detection to the tracklet and update the velocity estimate.

        The velocity is re-estimated from the (up to) three most recent
        history points.
        """
        self.positions.append(np.array(detection_pos))
        self.frames.append(frame)
        self.hits += 1
        # A hit is also a processed frame; without this the detection rate
        # (hits / age) could exceed 1.
        self.age += 1
        self.consecutive_misses = 0
        self.velocity = self._mean_velocity(self.positions[-3:],
                                            self.frames[-3:])

    def mark_missed(self, frame):
        """Record that no detection was associated at ``frame``.

        When a velocity is available, the predicted position is appended to
        the history so the tracklet stays contiguous in time.
        """
        self.consecutive_misses += 1
        self.age += 1
        pred_pos = self.predict_position(frame)
        if pred_pos is not None:
            self.positions.append(pred_pos)
            self.frames.append(frame)

    def detection_rate(self):
        """Return the fraction of processed frames with a detection."""
        if self.age == 0:
            return 0.0
        return self.hits / self.age

    def predict_position(self, target_frame):
        """Predict the position at ``target_frame`` using the velocity.

        Returns ``None`` when no velocity estimate exists yet.
        """
        if self.velocity is None or not self.positions:
            return None
        dt = target_frame - self.frames[-1]
        return self.positions[-1] + self.velocity * dt

    def extrapolate_forward(self, n_frames):
        """Predict the position ``n_frames`` after the last history frame.

        Returns ``None`` when no velocity estimate exists yet.
        """
        if self.velocity is None or not self.positions:
            return None
        return self.positions[-1] + self.velocity * n_frames

    def check_velocity_consistency(self, detection_pos, frame,
                                   max_velocity_change):
        """Check whether a detection is consistent with the tracklet velocity.

        Returns ``True`` when there is no velocity to compare against,
        ``False`` for a detection in the same frame as the last history
        point, and otherwise whether the implied velocity differs from the
        current estimate by less than ``max_velocity_change``.
        """
        if self.velocity is None or len(self.positions) < 2:
            # Can't check velocity with only one point
            return True
        dt = frame - self.frames[-1]
        if dt == 0:
            return False  # Same frame, can't associate
        new_velocity = (detection_pos - self.positions[-1]) / dt
        velocity_change = np.linalg.norm(new_velocity - self.velocity)
        return velocity_change < max_velocity_change

    def distance_to(self, detection_pos, frame):
        """Return the distance from the predicted position to a detection.

        Without a velocity estimate the last known position is used in
        place of a prediction.
        """
        pred = self.predict_position(frame)
        if pred is None:
            pred = self.positions[-1]
        return np.linalg.norm(detection_pos - pred)

    def get_start_velocity(self):
        """Return the velocity over the first (up to) three history points.

        Returns a zero vector when fewer than two points exist.
        """
        if len(self.positions) < 2:
            return np.zeros_like(self.positions[0], dtype=float)
        return self._mean_velocity(self.positions[:3], self.frames[:3])

    def get_end_velocity(self):
        """Return the current velocity estimate (zero vector if none)."""
        if self.velocity is None:
            return np.zeros_like(self.positions[0], dtype=float)
        return self.velocity
def _associate_detections(active_tracklets, detections, frame,
                          initial_search_radius, max_velocity_change,
                          max_consecutive_misses):
    """Return ``{tracklet_index: detection_index}`` for accepted matches."""
    if not active_tracklets or not detections:
        return {}

    # Force a float matrix: with an integer radius np.full would infer an
    # integer dtype and silently truncate the distances stored below.
    cost_matrix = np.full((len(active_tracklets), len(detections)),
                          initial_search_radius * 2, dtype=float)
    for i, tracklet in enumerate(active_tracklets):
        # Only consider tracklets updated within max_consecutive_misses + 1
        # frames; the check does not depend on the detection.
        if frame - tracklet.frames[-1] > max_consecutive_misses + 1:
            continue
        for j, detection in enumerate(detections):
            dist = tracklet.distance_to(detection, frame)
            if dist >= initial_search_radius:
                continue
            # Velocity consistency (strict criterion)
            if not tracklet.check_velocity_consistency(
                    detection, frame, max_velocity_change):
                continue
            cost_matrix[i, j] = dist

    track_indices, det_indices = linear_sum_assignment(cost_matrix)
    # Pairs left at the filler cost were never gated in; discard them.
    return {
        int(i): int(j)
        for i, j in zip(track_indices, det_indices)
        if cost_matrix[i, j] < initial_search_radius
    }


def _retire_tracklets(active_tracklets, finished_tracklets,
                      max_consecutive_misses, min_tracklet_length,
                      min_detection_rate):
    """Return the surviving tracklets; append retired keepers to finished."""
    survivors = []
    for tracklet in active_tracklets:
        if tracklet.consecutive_misses > max_consecutive_misses:
            # Lost: keep it only if it collected enough detections.
            if tracklet.hits >= min_tracklet_length:
                finished_tracklets.append(tracklet)
            continue
        if (tracklet.age >= min_tracklet_length * 2
                and tracklet.detection_rate() < min_detection_rate):
            # Poor quality after the minimum age: drop without saving.
            continue
        survivors.append(tracklet)
    return survivors


def _link_tracklets(tracklets, max_linking_gap, linking_search_radius,
                    smoothness_weight):
    """Return chains (lists of tracklet indices) in temporal order."""
    candidates = []
    for i, tracklet_i in enumerate(tracklets):
        end_frame = tracklet_i.frames[-1]
        end_velocity = tracklet_i.get_end_velocity()
        for j, tracklet_j in enumerate(tracklets):
            if i == j:
                continue
            # Only link if j starts after i ends, within the allowed gap.
            frame_gap = tracklet_j.frames[0] - end_frame
            if frame_gap <= 0 or frame_gap > max_linking_gap:
                continue
            # Extrapolate tracklet_i forward to tracklet_j's start frame.
            pred_pos = tracklet_i.predict_position(tracklet_j.frames[0])
            if pred_pos is None:
                continue
            position_error = np.linalg.norm(pred_pos - tracklet_j.positions[0])
            if position_error > linking_search_radius:
                continue
            velocity_error = np.linalg.norm(
                end_velocity - tracklet_j.get_start_velocity())
            # Combined cost: position error + smoothness penalty
            cost = float(position_error + smoothness_weight * velocity_error)
            if np.isfinite(cost):
                candidates.append((cost, i, j))

    # Greedy cheapest-first selection. A tracklet may have at most one
    # successor and one predecessor, tracked separately so that a tracklet
    # can sit in the middle of a chain.
    candidates.sort()
    successor = {}
    has_predecessor = set()
    for _, i, j in candidates:
        if i in successor or j in has_predecessor:
            continue
        successor[i] = j
        has_predecessor.add(j)

    # Links always point forward in time, so following successors from each
    # head terminates.
    chains = []
    for head in range(len(tracklets)):
        if head in has_predecessor:
            continue
        chain = [head]
        while chain[-1] in successor:
            chain.append(successor[chain[-1]])
        chains.append(chain)
    return chains


def run_tracklet_tracker(detectors, config):
    """
    Run tracklet-based hierarchical tracker on detections.

    This tracker is optimized for high false alarm scenarios where real tracks
    move smoothly. It uses a two-stage approach:

    Stage 1: Form high-confidence tracklets with strict association criteria
        - Small search radius for initial associations
        - Velocity consistency checking
        - "M out of N" approach: allows small detection gaps
        - Per-frame optimal assignment (Hungarian algorithm)

    Stage 2: Link tracklets into tracks
        - Velocity extrapolation for gap filling
        - Smoothness scoring based on velocity/position consistency
        - Greedy cheapest-first selection of links; each tracklet gets at
          most one predecessor and one successor, so chains of several
          tracklets can form

    Parameters
    ----------
    detectors : list of Detector
        Objects exposing parallel ``frames``, ``rows`` and ``columns``
        sequences.
    config : dict
        Dictionary containing tracker configuration:
        - tracker_name: Name for the resulting tracker (not used by this
          function)
        - initial_search_radius: Max distance for tracklet formation (default: 10.0)
        - max_velocity_change: Max velocity change for tracklet formation (default: 5.0)
        - min_tracklet_length: Minimum detections for valid tracklet (default: 3)
        - max_consecutive_misses: Max consecutive missed detections in Stage 1 (default: 2)
        - min_detection_rate: Minimum detection rate (hits/age) for tracklets (default: 0.6)
        - max_linking_gap: Maximum frame gap to link tracklets (default: 10)
        - linking_search_radius: Max distance for tracklet linking (default: 30.0)
        - smoothness_weight: Weight for smoothness in linking cost (default: 1.0)
        - min_track_length: Minimum number of points (tracklet history
          entries) for a final track (default: 5)

    Returns
    -------
    list of dict
        List of track data dictionaries, each containing:
        - 'frames': numpy array of frame numbers
        - 'rows': numpy array of row coordinates
        - 'columns': numpy array of column coordinates
    """
    # Extract configuration with smart defaults
    initial_search_radius = config.get('initial_search_radius', 10.0)
    max_velocity_change = config.get('max_velocity_change', 5.0)
    min_tracklet_length = config.get('min_tracklet_length', 3)
    max_consecutive_misses = config.get('max_consecutive_misses', 2)
    min_detection_rate = config.get('min_detection_rate', 0.6)
    max_linking_gap = config.get('max_linking_gap', 10)
    linking_search_radius = config.get('linking_search_radius', 30.0)
    smoothness_weight = config.get('smoothness_weight', 1.0)
    min_track_length = config.get('min_track_length', 5)

    # Collect all detections by frame; positions are stored as (x, y) =
    # (column, row).
    detections_by_frame = {}
    for detector in detectors:
        for i, frame in enumerate(detector.frames):
            pos = np.array([detector.columns[i], detector.rows[i]])
            detections_by_frame.setdefault(frame, []).append(pos)
    if not detections_by_frame:
        return []

    # ========================================================================
    # STAGE 1: Form high-confidence tracklets with strict criteria
    # ========================================================================
    active_tracklets = []
    finished_tracklets = []
    next_tracklet_id = 1
    for frame in sorted(detections_by_frame):
        detections = detections_by_frame[frame]
        matches = _associate_detections(
            active_tracklets, detections, frame, initial_search_radius,
            max_velocity_change, max_consecutive_misses)

        for tracklet_idx, det_idx in matches.items():
            active_tracklets[tracklet_idx].add_detection(
                detections[det_idx], frame)
        # Unassigned tracklets coast for this frame (not removed immediately).
        for tracklet_idx, tracklet in enumerate(active_tracklets):
            if tracklet_idx not in matches:
                tracklet.mark_missed(frame)

        # "m out of n" filtering
        active_tracklets = _retire_tracklets(
            active_tracklets, finished_tracklets, max_consecutive_misses,
            min_tracklet_length, min_detection_rate)

        # Every unassigned detection seeds a new tracklet.
        assigned_detections = set(matches.values())
        for det_idx, detection in enumerate(detections):
            if det_idx not in assigned_detections:
                active_tracklets.append(
                    Tracklet(detection, frame, next_tracklet_id))
                next_tracklet_id += 1

    # Finish remaining active tracklets
    finished_tracklets.extend(
        tracklet for tracklet in active_tracklets
        if tracklet.hits >= min_tracklet_length)

    # ========================================================================
    # STAGE 2: Link tracklets based on velocity extrapolation and smoothness
    # ========================================================================
    if not finished_tracklets:
        return []
    chains = _link_tracklets(finished_tracklets, max_linking_gap,
                             linking_search_radius, smoothness_weight)

    # ========================================================================
    # Convert linked tracklets to track data dictionaries
    # ========================================================================
    track_data_list = []
    for chain in chains:
        members = [finished_tracklets[idx] for idx in chain]
        all_positions = [pos for t in members for pos in t.positions]
        all_frames = [frm for t in members for frm in t.frames]
        # Filter by minimum track length
        if len(all_positions) < min_track_length:
            continue
        positions = np.array(all_positions)
        frames_array = np.array(all_frames, dtype=np.int_)
        # Sort by frame; stable so equal frames keep their original order.
        sort_idx = np.argsort(frames_array, kind='stable')
        positions = positions[sort_idx]
        track_data_list.append({
            'frames': frames_array[sort_idx],
            'rows': positions[:, 1],  # y
            'columns': positions[:, 0],  # x
        })
    return track_data_list