Source code for vista.widgets.algorithms.detectors.base_detector_widget
"""Base classes for detector widgets to reduce code duplication"""
import traceback
import numpy as np
from PyQt6.QtCore import QThread, pyqtSignal, QSettings
from PyQt6.QtWidgets import (QDialog, QHBoxLayout, QLabel, QMessageBox, QProgressBar, QPushButton, QVBoxLayout)
from vista.detections.detector import Detector
from vista.widgets.utils.algorithm_utils import create_aoi_selector, create_frame_range_spinboxes
[docs]
class BaseDetectorProcessingThread(QThread):
"""Base worker thread for running detector algorithms in background"""
# Signals
progress_updated = pyqtSignal(int, int) # (current_frame, total_frames)
processing_complete = pyqtSignal(object) # Emits Detector object
error_occurred = pyqtSignal(str) # Emits error message
[docs]
def __init__(self, imagery, algorithm_class, algorithm_params, aoi=None,
start_frame=0, end_frame=None, detector_name=None, default_color='r',
default_marker='o', default_marker_size=12):
"""
Initialize the processing thread.
Parameters
----------
imagery : Imagery
Imagery object to process
algorithm_class : type
Detector algorithm class to instantiate (e.g., SimpleThreshold)
algorithm_params : dict
Dictionary of parameters to pass to algorithm constructor
aoi : AOI, optional
AOI object to process subset of imagery, by default None
start_frame : int, optional
Starting frame index, by default 0
end_frame : int, optional
Ending frame index exclusive, by default None for all frames
detector_name : str, optional
Name for the detector, by default None (auto-generated)
default_color : str, optional
Default color for detections, by default 'r'
default_marker : str, optional
Default marker for detections, by default 'o'
default_marker_size : int, optional
Default marker size, by default 12
"""
super().__init__()
self.imagery = imagery
self.algorithm_class = algorithm_class
self.algorithm_params = algorithm_params
self.aoi = aoi
self.start_frame = start_frame
self.end_frame = end_frame if end_frame is not None else len(imagery.frames)
self.detector_name = detector_name
self.default_color = default_color
self.default_marker = default_marker
self.default_marker_size = default_marker_size
self._cancelled = False
[docs]
def cancel(self):
"""Request cancellation of the processing operation"""
self._cancelled = True
[docs]
def run(self):
"""Execute the detector algorithm in background thread"""
try:
# Determine the region to process
if self.aoi:
# Create temporary imagery object for the cropped region
temp_imagery = self.imagery.get_aoi(self.aoi)
else:
# Process frame range of imagery
temp_imagery = self.imagery
# Apply frame range
temp_imagery = temp_imagery[self.start_frame:self.end_frame]
# Create the algorithm instance
algorithm = self.algorithm_class(**self.algorithm_params)
# Process all frames
num_frames = len(temp_imagery)
all_frames = []
all_rows = []
all_columns = []
for i in range(num_frames):
if self._cancelled:
return # Exit early if cancelled
# Get current frame
image = temp_imagery.images[i]
frame_number = temp_imagery.frames[i]
# Call the algorithm to get detections for this frame
rows, columns = algorithm(image)
# Apply offsets to detection coordinates
rows = rows + temp_imagery.row_offset
columns = columns + temp_imagery.column_offset
# Store results
for row, col in zip(rows, columns):
all_frames.append(frame_number)
all_rows.append(row)
all_columns.append(col)
# Emit progress
self.progress_updated.emit(i + 1, num_frames)
if self._cancelled:
return # Exit early if cancelled
# Convert to numpy arrays
all_frames = np.array(all_frames, dtype=np.int_)
all_rows = np.array(all_rows)
all_columns = np.array(all_columns)
# Create Detector object
if self.detector_name is None:
detector_name = f"{self.imagery.name} {algorithm.name}"
if self.aoi:
detector_name += f" (AOI: {self.aoi.name})"
else:
detector_name = self.detector_name
detector = Detector(
name=detector_name,
frames=all_frames,
rows=all_rows,
columns=all_columns,
sensor=self.imagery.sensor,
color=self.default_color,
marker=self.default_marker,
marker_size=self.default_marker_size,
visible=True
)
# Emit the detector
self.processing_complete.emit(detector)
except Exception as e:
# Get full traceback
tb_str = traceback.format_exc()
error_msg = f"Error processing detections: {str(e)}\n\nTraceback:\n{tb_str}"
self.error_occurred.emit(error_msg)
[docs]
class BaseDetectorWidget(QDialog):
"""Base configuration widget for detector algorithms"""
# Signal emitted when processing is complete
detector_processed = pyqtSignal(object) # Emits Detector object
[docs]
def __init__(self, parent=None, imagery=None, aois=None, algorithm_class=None,
settings_name="BaseDetector", window_title="Detector",
description="", default_color='r', default_marker='o',
default_marker_size=12):
"""
Initialize the base detector configuration widget.
Parameters
----------
parent : QWidget, optional
Parent widget, by default None
imagery : Imagery, optional
Imagery object to process, by default None
aois : list of AOI, optional
List of AOI objects to choose from, by default None
algorithm_class : type, optional
Detector algorithm class (e.g., SimpleThreshold), by default None
settings_name : str, optional
Name for QSettings storage, by default "BaseDetector"
window_title : str, optional
Window title, by default "Detector"
description : str, optional
HTML description text for the detector, by default ""
default_color : str, optional
Default color for detections, by default 'r'
default_marker : str, optional
Default marker for detections, by default 'o'
default_marker_size : int, optional
Default marker size, by default 12
"""
super().__init__(parent)
self.imagery = imagery
self.aois = aois if aois is not None else []
self.algorithm_class = algorithm_class
self.processing_thread = None
self.settings = QSettings("VISTA", settings_name)
self.description = description
# Detection styling defaults
self.default_color = default_color
self.default_marker = default_marker
self.default_marker_size = default_marker_size
self.setWindowTitle(window_title)
self.setModal(True)
self.setMinimumWidth(400)
self.init_ui()
self.load_settings()
[docs]
def init_ui(self):
"""Initialize the user interface"""
layout = QVBoxLayout()
# Information label
if self.description:
info_label = QLabel(self.description)
info_label.setWordWrap(True)
layout.addWidget(info_label)
# AOI selection
aoi_layout = QHBoxLayout()
aoi_label = QLabel("Process Region:")
aoi_label.setToolTip(
"Select an Area of Interest (AOI) to process only a subset of the imagery.\n"
"Detections will have coordinates in the full image frame."
)
self.aoi_combo = create_aoi_selector(self.aois)
self.aoi_combo.setToolTip(aoi_label.toolTip())
aoi_layout.addWidget(aoi_label)
aoi_layout.addWidget(self.aoi_combo)
aoi_layout.addStretch()
layout.addLayout(aoi_layout)
# Algorithm-specific parameters (to be added by subclasses)
self.add_algorithm_parameters(layout)
# Frame range selection
start_frame_layout = QHBoxLayout()
start_frame_label = QLabel("Start Frame:")
self.start_frame_spinbox, self.end_frame_spinbox = create_frame_range_spinboxes()
start_frame_layout.addWidget(start_frame_label)
start_frame_layout.addWidget(self.start_frame_spinbox)
start_frame_layout.addStretch()
layout.addLayout(start_frame_layout)
end_frame_layout = QHBoxLayout()
end_frame_label = QLabel("End Frame:")
end_frame_layout.addWidget(end_frame_label)
end_frame_layout.addWidget(self.end_frame_spinbox)
end_frame_layout.addStretch()
layout.addLayout(end_frame_layout)
# Progress bar
self.progress_bar = QProgressBar()
self.progress_bar.setVisible(False)
layout.addWidget(self.progress_bar)
# Button layout
button_layout = QHBoxLayout()
button_layout.addStretch()
self.run_button = QPushButton("Run")
self.run_button.clicked.connect(self.run_algorithm)
button_layout.addWidget(self.run_button)
self.cancel_button = QPushButton("Cancel")
self.cancel_button.clicked.connect(self.cancel_processing)
self.cancel_button.setVisible(False)
button_layout.addWidget(self.cancel_button)
self.close_button = QPushButton("Close")
self.close_button.clicked.connect(self.close)
button_layout.addWidget(self.close_button)
layout.addLayout(button_layout)
self.setLayout(layout)
[docs]
def add_algorithm_parameters(self, layout):
"""
Add algorithm-specific parameters to the layout.
Override this method in subclasses to add custom parameters.
Parameters
----------
layout : QVBoxLayout
QVBoxLayout to add parameters to
"""
pass
[docs]
def load_settings(self):
"""
Load previously saved settings.
Override this method in subclasses to load custom parameters.
"""
self.start_frame_spinbox.setValue(self.settings.value("start_frame", 0, type=int))
self.end_frame_spinbox.setValue(self.settings.value("end_frame", 999999, type=int))
[docs]
def save_settings(self):
"""
Save current settings for next time.
Override this method in subclasses to save custom parameters.
"""
self.settings.setValue("start_frame", self.start_frame_spinbox.value())
self.settings.setValue("end_frame", self.end_frame_spinbox.value())
[docs]
def build_algorithm_params(self):
"""
Build parameter dictionary for the algorithm.
Override this method in subclasses to add custom parameters.
Returns
-------
dict
Dictionary of algorithm parameters
"""
return {}
[docs]
def validate_parameters(self):
"""
Validate algorithm parameters before running.
Override this method in subclasses for custom validation.
Returns
-------
tuple of (bool, str)
Tuple containing (is_valid, error_message)
"""
return True, ""
[docs]
def run_algorithm(self):
"""Start processing the imagery with the configured parameters"""
# Get common parameter values
selected_aoi = self.aoi_combo.currentData() # Get the AOI object (or None)
start_frame = self.start_frame_spinbox.value()
end_frame = min(self.end_frame_spinbox.value(), len(self.imagery.frames))
# Get algorithm-specific parameters
algorithm_params = self.build_algorithm_params()
# Validate parameters
is_valid, error_message = self.validate_parameters()
if not is_valid:
QMessageBox.warning(self, "Invalid Parameters", error_message, QMessageBox.StandardButton.Ok)
return
# Save settings for next time
self.save_settings()
# Update UI for processing state
self.run_button.setEnabled(False)
self.close_button.setEnabled(False)
self.cancel_button.setVisible(True)
self.progress_bar.setVisible(True)
self.progress_bar.setValue(0)
self.progress_bar.setMaximum(end_frame - start_frame)
# Disable all parameter widgets
self.set_parameters_enabled(False)
# Create and start processing thread
self.processing_thread = BaseDetectorProcessingThread(
self.imagery, self.algorithm_class, algorithm_params, selected_aoi,
start_frame, end_frame, None, self.default_color, self.default_marker,
self.default_marker_size
)
self.processing_thread.progress_updated.connect(self.on_progress_updated)
self.processing_thread.processing_complete.connect(self.on_processing_complete)
self.processing_thread.error_occurred.connect(self.on_error_occurred)
self.processing_thread.finished.connect(self.on_thread_finished)
self.processing_thread.start()
[docs]
def set_parameters_enabled(self, enabled):
"""
Enable or disable parameter widgets.
Override to handle custom parameter widgets.
Parameters
----------
enabled : bool
True to enable, False to disable
"""
self.aoi_combo.setEnabled(enabled)
self.start_frame_spinbox.setEnabled(enabled)
self.end_frame_spinbox.setEnabled(enabled)
[docs]
def cancel_processing(self):
"""Cancel the ongoing processing"""
if self.processing_thread:
self.processing_thread.cancel()
self.cancel_button.setEnabled(False)
self.cancel_button.setText("Cancelling...")
[docs]
def on_progress_updated(self, current, total):
"""
Handle progress updates from the processing thread.
Parameters
----------
current : int
Current frame number being processed
total : int
Total number of frames to process
"""
self.progress_bar.setValue(current)
[docs]
def on_processing_complete(self, detector):
"""
Handle successful completion of processing.
Parameters
----------
detector : Detector
The resulting Detector object containing all detections
"""
# Emit signal with detector
self.detector_processed.emit(detector)
# Show success message
num_detections = len(detector.frames)
QMessageBox.information(
self,
"Processing Complete",
f"Successfully processed imagery.\n\n"
f"Detector: {detector.name}\n"
f"Total detections: {num_detections}",
QMessageBox.StandardButton.Ok
)
# Close the dialog
self.accept()
[docs]
def on_error_occurred(self, error_message):
"""
Handle errors from the processing thread.
Parameters
----------
error_message : str
Error message string, potentially including traceback information
"""
# Create message box with detailed text
msg_box = QMessageBox(self)
msg_box.setIcon(QMessageBox.Icon.Critical)
msg_box.setWindowTitle("Processing Error")
# Split error message to show brief summary and full traceback
if "\n\nTraceback:\n" in error_message:
summary, full_traceback = error_message.split("\n\nTraceback:\n", 1)
msg_box.setText(summary)
msg_box.setDetailedText(f"Traceback:\n{full_traceback}")
else:
msg_box.setText(error_message)
msg_box.setStandardButtons(QMessageBox.StandardButton.Ok)
msg_box.exec()
# Reset UI
self.reset_ui()
[docs]
def on_thread_finished(self):
"""Handle thread completion (cleanup)"""
if self.processing_thread:
self.processing_thread.deleteLater()
self.processing_thread = None
# If we're still here (not closed by success), reset UI
if self.isVisible():
self.reset_ui()
[docs]
def reset_ui(self):
"""Reset UI to initial state"""
self.run_button.setEnabled(True)
self.close_button.setEnabled(True)
self.cancel_button.setVisible(False)
self.cancel_button.setEnabled(True)
self.cancel_button.setText("Cancel")
self.progress_bar.setVisible(False)
self.set_parameters_enabled(True)
[docs]
def closeEvent(self, event):
"""
Handle dialog close event.
Parameters
----------
event : QCloseEvent
Close event to accept or ignore
"""
if self.processing_thread and self.processing_thread.isRunning():
reply = QMessageBox.question(
self,
"Processing in Progress",
"Processing is still in progress. Are you sure you want to cancel and close?",
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No,
QMessageBox.StandardButton.No
)
if reply == QMessageBox.StandardButton.Yes:
self.cancel_processing()
# Wait for thread to finish
if self.processing_thread:
self.processing_thread.wait()
event.accept()
else:
event.ignore()
else:
event.accept()