From e69eb81785bbdfdf7d208d859fb4e292c0f775eb Mon Sep 17 00:00:00 2001 From: Max Xiang Date: Mon, 8 Dec 2025 16:34:35 +0100 Subject: [PATCH 01/10] fix pipeline visualization Signed-off-by: Max Xiang --- .../backend/src/services/model_service.py | 13 ++-- .../backend/src/utils/visualization.py | 74 +++++++++---------- application/backend/src/workers/inference.py | 1 + 3 files changed, 41 insertions(+), 47 deletions(-) diff --git a/application/backend/src/services/model_service.py b/application/backend/src/services/model_service.py index 62d904257e..2f91cb5712 100644 --- a/application/backend/src/services/model_service.py +++ b/application/backend/src/services/model_service.py @@ -253,6 +253,7 @@ async def predict_image( image_bytes: bytes, cached_models: dict[UUID, OpenVINOInferencer] | None = None, device: str | None = None, + is_bgr: bool = False, ) -> PredictionResponse: """Run prediction on an image using the specified model. @@ -264,6 +265,7 @@ async def predict_image( image_bytes: Raw image bytes from uploaded file cached_models: Optional dict to cache loaded models (for performance) device: Optional string indicating the device to use for inference + is_bgr: Whether the image is in BGR format Returns: PredictionResponse: Structured prediction results @@ -285,20 +287,19 @@ async def predict_image( # Run entire prediction pipeline in a single thread # This includes image processing, model inference, and result processing - response_data = await asyncio.to_thread(cls._run_prediction_pipeline, inference_model, image_bytes) + response_data = await asyncio.to_thread(cls._run_prediction_pipeline, inference_model, image_bytes, is_bgr) return PredictionResponse(**response_data) @staticmethod - def _run_prediction_pipeline(inference_model: OpenVINOInferencer, image_bytes: bytes) -> dict: + def _run_prediction_pipeline(inference_model: OpenVINOInferencer, image_bytes: bytes, is_bgr: bool = False) -> dict: """Run the complete prediction pipeline in a single thread.""" # Process image npd = np.frombuffer(image_bytes, np.uint8) - bgr_image = cv2.imdecode(npd, -1) - if bgr_image is None: + image = cv2.imdecode(npd, -1) + if image is None: raise ValueError("Failed to decode image") - - numpy_image = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB) + numpy_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) if is_bgr else image # Run prediction pred = inference_model.predict(numpy_image) diff --git a/application/backend/src/utils/visualization.py b/application/backend/src/utils/visualization.py index e4a99ad9fc..469db107f9 100644 --- a/application/backend/src/utils/visualization.py +++ b/application/backend/src/utils/visualization.py @@ -7,7 +7,7 @@ import numpy as np from loguru import logger -from pydantic_models import PredictionResponse +from pydantic_models import PredictionLabel, PredictionResponse class Visualizer: @@ -46,7 +46,7 @@ def overlay_predictions( def overlay_anomaly_heatmap( base_image: np.ndarray, prediction: PredictionResponse, - threshold_value: int = 128, + threshold: float = 0.5, alpha: float = 0.25, ) -> np.ndarray: """Overlay the anomaly heatmap onto the image. 
@@ -58,45 +58,30 @@ def overlay_anomaly_heatmap( - Blend onto the base image using alpha """ try: - anomaly_map_base64 = prediction.anomaly_map - result = base_image.copy() - try: - anomaly_png_bytes = base64.b64decode(anomaly_map_base64) - anomaly_np = np.frombuffer(anomaly_png_bytes, dtype=np.uint8) - anomaly_img = cv2.imdecode(anomaly_np, cv2.IMREAD_UNCHANGED) - except Exception: - return result + # Decode anomaly map + anomaly_bytes = base64.b64decode(prediction.anomaly_map) + anomaly_img = cv2.imdecode(np.frombuffer(anomaly_bytes, dtype=np.uint8), cv2.IMREAD_GRAYSCALE) if anomaly_img is None: - return result - - try: - if anomaly_img.ndim == 3 and anomaly_img.shape[2] > 1: - anomaly_gray = cv2.cvtColor(anomaly_img, cv2.COLOR_BGR2GRAY) - else: - anomaly_gray = anomaly_img + return base_image - if anomaly_gray.dtype != np.uint8: - anomaly_gray = anomaly_gray.astype(np.uint8) + # Resize to match base image + h, w = base_image.shape[:2] + anomaly_gray = cv2.resize(anomaly_img, (w, h)) - heatmap = cv2.applyColorMap(anomaly_gray, cv2.COLORMAP_JET) - heatmap_resized = cv2.resize(heatmap, (result.shape[1], result.shape[0])) + # Apply colormap and create threshold mask + heatmap = cv2.applyColorMap(anomaly_gray, cv2.COLORMAP_JET) + mask = anomaly_gray >= (threshold * 255) - mask_gray = cv2.resize(anomaly_gray, (result.shape[1], result.shape[0])) - mask_bool = mask_gray >= threshold_value - - masked_heatmap = np.zeros_like(heatmap_resized) - try: - masked_heatmap[mask_bool] = heatmap_resized[mask_bool] - except Exception as e: - logger.debug(f"Failed to apply heatmap mask: {e}") + # Create masked heatmap (only show where above threshold) + masked_heatmap = np.zeros_like(heatmap) + masked_heatmap[mask] = heatmap[mask] - result = cv2.addWeighted(result, 1.0, masked_heatmap, alpha, 0) - except Exception as e: - logger.debug(f"Failed to overlay heatmap: {e}") - return result + # Blend onto base image + result = base_image.copy() + return cv2.addWeighted(result, 1.0, masked_heatmap, alpha, 0) except Exception as e: - logger.debug(f"Failed in overlay_anomaly_heatmap: {e}") + logger.debug(f"Failed to overlay heatmap: {e}") return base_image @staticmethod @@ -104,20 +89,27 @@ def draw_prediction_label( base_image: np.ndarray, prediction: PredictionResponse, *, - position: tuple[int, int] = (10, 20), - font_scale: float = 2.0, - thickness: int = 3, - text_color: tuple[int, int, int] = (0, 255, 0), - background_color: tuple[int, int, int] = (0, 0, 0), + position: tuple[int, int] = (5, 5), + font_scale: float = 1.0, + thickness: int = 2, ) -> np.ndarray: """Draw the prediction label with a background rectangle for readability.""" + alpha = 0.85 + text_color = (36, 37, 40) + green = (139, 174, 70) + red = (255, 86, 98) + background_color: tuple[int, int, int] = green if prediction.label == PredictionLabel.NORMAL else red try: - label_text = f"{prediction.label.value} ({prediction.score:.3f})" + label_text = f"{prediction.label.value} {int(prediction.score * 100)}%" result = base_image.copy() font = cv2.FONT_HERSHEY_SIMPLEX (text_w, text_h), _ = cv2.getTextSize(label_text, font, font_scale, thickness) x, y = position[0], position[1] + text_h - cv2.rectangle(result, (x - 8, y - text_h - 8), (x - 8 + text_w + 16, y + 8), background_color, -1) + # Create overlay for transparent background + overlay = result.copy() + cv2.rectangle(overlay, (x - 8, y - text_h - 8), (x - 8 + text_w + 16, y + 8), background_color[::-1], -1) + result = cv2.addWeighted(result, 1.0 - alpha, overlay, alpha, 0) + cv2.putText(result, 
label_text, (x, y), font, font_scale, text_color, thickness, cv2.LINE_AA) return result except Exception as e: diff --git a/application/backend/src/workers/inference.py b/application/backend/src/workers/inference.py index 3984b18bfd..95074ad302 100644 --- a/application/backend/src/workers/inference.py +++ b/application/backend/src/workers/inference.py @@ -190,6 +190,7 @@ async def _run_inference(self, image_bytes: bytes) -> Any | None: self._loaded_model.model, image_bytes, self._cached_models, # type: ignore[arg-type] + is_bgr=True, ) except Exception as e: logger.error(f"Inference failed: {e}", exc_info=True) From c885465be22844edaa99e125f2cc91ddc1c646b6 Mon Sep 17 00:00:00 2001 From: Max Xiang Date: Mon, 8 Dec 2025 17:15:09 +0100 Subject: [PATCH 02/10] fix tests Signed-off-by: Max Xiang --- application/backend/src/pydantic_models/job.py | 1 + application/backend/src/services/training_service.py | 5 ++++- .../backend/tests/unit/services/test_training_service.py | 8 ++++---- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/application/backend/src/pydantic_models/job.py b/application/backend/src/pydantic_models/job.py index 7373f37b06..1f99cf017c 100644 --- a/application/backend/src/pydantic_models/job.py +++ b/application/backend/src/pydantic_models/job.py @@ -64,3 +64,4 @@ class TrainJobPayload(BaseModel): model_name: str device: str | None = Field(default=None) dataset_snapshot_id: str | None = Field(default=None) # used because UUID is not JSON serializable + max_epochs: int = Field(default=200, ge=1) diff --git a/application/backend/src/services/training_service.py b/application/backend/src/services/training_service.py index 0fe5b3bcc5..ec54009ed6 100644 --- a/application/backend/src/services/training_service.py +++ b/application/backend/src/services/training_service.py @@ -67,6 +67,7 @@ async def _run_training_job(cls, job: Job, job_service: JobService) -> Model | N device = job.payload.get("device") snapshot_id_ = job.payload.get("dataset_snapshot_id") snapshot_id = UUID(snapshot_id_) if snapshot_id_ else None + max_epochs = job.payload.get("max_epochs", 200) if model_name is None: raise ValueError(f"Job {job.id} payload must contain 'model_name'") @@ -105,6 +106,7 @@ async def _run_training_job(cls, job: Job, job_service: JobService) -> Model | N model=model, device=device, synchronization_parameters=synchronization_parameters, + max_epochs=max_epochs, dataset_root=dataset_root, ) @@ -155,6 +157,7 @@ def _train_model( model: Model, synchronization_parameters: ProgressSyncParams, dataset_root: str, + max_epochs: int, device: str | None = None, ) -> Model | None: """ @@ -208,7 +211,7 @@ def _train_model( default_root_dir=model.export_path, logger=[trackio, tensorboard], devices=[0], # Only single GPU training is supported for now - max_epochs=10, + max_epochs=max_epochs, callbacks=[GetiInspectProgressCallback(synchronization_parameters)], accelerator=training_device, ) diff --git a/application/backend/tests/unit/services/test_training_service.py b/application/backend/tests/unit/services/test_training_service.py index cc5f9438b7..ce54de1c7c 100644 --- a/application/backend/tests/unit/services/test_training_service.py +++ b/application/backend/tests/unit/services/test_training_service.py @@ -231,7 +231,7 @@ def test_train_pending_job_cleanup_on_failure( with patch("services.training_service.asyncio.to_thread") as mock_to_thread: # Mock the training to succeed first, setting export_path, then fail def mock_train_model( - cls, model, synchronization_parameters: 
ProgressSyncParams, device=None, dataset_root=None + cls, model, synchronization_parameters: ProgressSyncParams, device=None, dataset_root=None, max_epochs=1 ): model.export_path = "/path/to/model" raise Exception("Training failed") @@ -263,7 +263,7 @@ def test_train_model_success( # Call the method with patch.object(TrainingService, "_compute_export_size", return_value=123): result = TrainingService._train_model( - fxt_model, synchronization_parameters=ProgressSyncParams(), dataset_root="/tmp/dataset" + fxt_model, synchronization_parameters=ProgressSyncParams(), dataset_root="/tmp/dataset", max_epochs=42 ) # Verify the result @@ -282,7 +282,7 @@ def test_train_model_success( assert call_args[1]["default_root_dir"] == "/path/to/model" assert "logger" in call_args[1] assert len(call_args[1]["logger"]) == 2 # trackio and tensorboard - assert call_args[1]["max_epochs"] == 10 + assert call_args[1]["max_epochs"] == 42 fxt_mock_anomalib_components["engine"].fit.assert_called_once_with( model=fxt_mock_anomalib_components["anomalib_model"], datamodule=fxt_mock_anomalib_components["folder"] @@ -332,7 +332,7 @@ def test_train_model_cancelled_before_start( sync_params.set_cancel_training_event() result = TrainingService._train_model( - fxt_model, synchronization_parameters=sync_params, dataset_root="/tmp/dataset" + fxt_model, synchronization_parameters=sync_params, dataset_root="/tmp/dataset", max_epochs=1 ) assert result is None From b4a6fdf5c9b24834990d9f7fb1cc8e2c056a14bf Mon Sep 17 00:00:00 2001 From: Max Xiang Date: Tue, 9 Dec 2025 09:50:10 +0100 Subject: [PATCH 03/10] make max_epochs optional Signed-off-by: Max Xiang --- application/backend/src/pydantic_models/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/application/backend/src/pydantic_models/job.py b/application/backend/src/pydantic_models/job.py index 1f99cf017c..10b40bea59 100644 --- a/application/backend/src/pydantic_models/job.py +++ b/application/backend/src/pydantic_models/job.py @@ -64,4 +64,4 @@ class TrainJobPayload(BaseModel): model_name: str device: str | None = Field(default=None) dataset_snapshot_id: str | None = Field(default=None) # used because UUID is not JSON serializable - max_epochs: int = Field(default=200, ge=1) + max_epochs: int | None = Field(default=200, ge=1) From 48ce89d734e75b7a77c7184bf62c75029721a29b Mon Sep 17 00:00:00 2001 From: Max Xiang Date: Tue, 9 Dec 2025 10:08:19 +0100 Subject: [PATCH 04/10] make max_epochs optional Signed-off-by: Max Xiang --- application/backend/src/pydantic_models/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/application/backend/src/pydantic_models/job.py b/application/backend/src/pydantic_models/job.py index 10b40bea59..3e3d10d99e 100644 --- a/application/backend/src/pydantic_models/job.py +++ b/application/backend/src/pydantic_models/job.py @@ -64,4 +64,4 @@ class TrainJobPayload(BaseModel): model_name: str device: str | None = Field(default=None) dataset_snapshot_id: str | None = Field(default=None) # used because UUID is not JSON serializable - max_epochs: int | None = Field(default=200, ge=1) + max_epochs: int | None = Field(default=None, ge=1) From e107d95483980e930473ac057d96bbf7fdf25906 Mon Sep 17 00:00:00 2001 From: Max Xiang Date: Tue, 9 Dec 2025 13:15:43 +0100 Subject: [PATCH 05/10] add pipeline overlay on/off Signed-off-by: Max Xiang --- .../src/alembic/versions/7a213a27d666_initial_schema.py | 1 + application/backend/src/db/schema.py | 1 + application/backend/src/pydantic_models/pipeline.py | 1 + 
.../backend/src/repositories/mappers/pipeline_mapper.py | 2 ++ application/backend/src/services/pipeline_service.py | 2 +- application/backend/src/workers/inference.py | 9 ++++++++- 6 files changed, 14 insertions(+), 2 deletions(-) diff --git a/application/backend/src/alembic/versions/7a213a27d666_initial_schema.py b/application/backend/src/alembic/versions/7a213a27d666_initial_schema.py index f0c647c92b..d797c32fcb 100644 --- a/application/backend/src/alembic/versions/7a213a27d666_initial_schema.py +++ b/application/backend/src/alembic/versions/7a213a27d666_initial_schema.py @@ -136,6 +136,7 @@ def upgrade() -> None: sa.Column("sink_id", sa.Text(), nullable=True), sa.Column("model_id", sa.Text(), nullable=True), sa.Column("inference_device", sa.String(length=64), nullable=True), + sa.Column("overlay", sa.Boolean(), nullable=True), sa.Column("is_running", sa.Boolean(), nullable=False), sa.Column("is_active", sa.Boolean(), nullable=False), sa.Column("data_collection_policies", sa.JSON(), nullable=False), diff --git a/application/backend/src/db/schema.py b/application/backend/src/db/schema.py index e12de57e75..a0706024ce 100644 --- a/application/backend/src/db/schema.py +++ b/application/backend/src/db/schema.py @@ -90,6 +90,7 @@ class PipelineDB(Base): sink_id: Mapped[str | None] = mapped_column(Text, ForeignKey("sinks.id", ondelete="RESTRICT")) model_id: Mapped[str | None] = mapped_column(Text, ForeignKey("models.id", ondelete="RESTRICT")) inference_device: Mapped[str | None] = mapped_column(String(64), nullable=True) + overlay: Mapped[bool | None] = mapped_column(Boolean, default=True) is_running: Mapped[bool] = mapped_column(Boolean, default=False) is_active: Mapped[bool] = mapped_column(Boolean, default=False) data_collection_policies: Mapped[list] = mapped_column(JSON, nullable=False, default=list) diff --git a/application/backend/src/pydantic_models/pipeline.py b/application/backend/src/pydantic_models/pipeline.py index bc4efe3050..5cbcd59473 100644 --- a/application/backend/src/pydantic_models/pipeline.py +++ b/application/backend/src/pydantic_models/pipeline.py @@ -48,6 +48,7 @@ class Pipeline(BaseModel): ) # ID of the model, used for DB mapping, not exposed in API status: PipelineStatus = PipelineStatus.IDLE # Current status of the pipeline inference_device: str | None = Field(default=None) + overlay: bool | None = Field(default=None) # TODO: can be confused with status.is_running / is_active, consider refactoring is_running: bool | None = Field(default=None, exclude=True) # If set will overwrite status diff --git a/application/backend/src/repositories/mappers/pipeline_mapper.py b/application/backend/src/repositories/mappers/pipeline_mapper.py index c47092427e..ac98efdeae 100644 --- a/application/backend/src/repositories/mappers/pipeline_mapper.py +++ b/application/backend/src/repositories/mappers/pipeline_mapper.py @@ -25,6 +25,7 @@ def from_schema(pipeline_db: PipelineDB) -> Pipeline: source_id=UUID(pipeline_db.source_id) if pipeline_db.source_id else None, status=PipelineStatus.from_bool(pipeline_db.is_running, pipeline_db.is_active), inference_device=pipeline_db.inference_device.upper() if pipeline_db.inference_device else None, + overlay=pipeline_db.overlay, ) @staticmethod @@ -38,4 +39,5 @@ def to_schema(pipeline: Pipeline) -> PipelineDB: is_running=pipeline.status.is_running, is_active=pipeline.status.is_active, inference_device=pipeline.inference_device.upper() if pipeline.inference_device else None, + overlay=pipeline.overlay, ) diff --git 
a/application/backend/src/services/pipeline_service.py b/application/backend/src/services/pipeline_service.py index 8e7d303242..4a4a68d013 100644 --- a/application/backend/src/services/pipeline_service.py +++ b/application/backend/src/services/pipeline_service.py @@ -74,7 +74,7 @@ async def update_pipeline(self, project_id: UUID, partial_config: dict) -> Pipel await self._notify_pipeline_changed() # Intentionally call activate_model on status change regardless of whether a model exists. self._model_service.activate_model() - if updated.inference_device != pipeline.inference_device: + if updated.inference_device != pipeline.inference_device or updated.overlay != pipeline.overlay: # reload model on device change self._model_service.activate_model() return updated diff --git a/application/backend/src/workers/inference.py b/application/backend/src/workers/inference.py index 95074ad302..e8105eab06 100644 --- a/application/backend/src/workers/inference.py +++ b/application/backend/src/workers/inference.py @@ -12,6 +12,7 @@ if TYPE_CHECKING: import multiprocessing as mp + from collections.abc import Callable from multiprocessing.synchronize import Event as EventClass from multiprocessing.synchronize import Lock @@ -57,6 +58,7 @@ def __init__( self._cached_models: dict[Any, object] = {} self._model_check_interval: float = 5.0 # seconds between model refresh checks self._is_passthrough_mode: bool = False + self._overlay = True def setup(self) -> None: super().setup() @@ -71,6 +73,7 @@ async def _get_active_model(self) -> LoadedModel | None: self._is_passthrough_mode = pipeline is None or ( pipeline.status.is_active and not pipeline.status.is_running ) + self._overlay = pipeline.overlay if pipeline and pipeline.overlay is not None else self._overlay logger.info(f"Passthrough mode {'activated' if self._is_passthrough_mode else 'disabled'}.") if pipeline is None or pipeline.model is None: return None @@ -224,7 +227,11 @@ async def _process_frame_with_model(self, stream_data: StreamData) -> None: raise RuntimeError("Inference failed or model became unavailable") # Build visualization - vis_frame: np.ndarray = Visualizer.overlay_predictions(frame, prediction_response) + overlays: list[Callable] = [] + if self._overlay: + overlays.append(Visualizer.overlay_anomaly_heatmap) + overlays.append(Visualizer.draw_prediction_label) + vis_frame: np.ndarray = Visualizer.overlay_predictions(frame, prediction_response, *overlays) # Package inference data stream_data.inference_data = InferenceData( From d1ae1fb70e745130798af22023d3331eab1600fb Mon Sep 17 00:00:00 2001 From: Max Xiang Date: Tue, 9 Dec 2025 14:04:58 +0100 Subject: [PATCH 06/10] make fps more reactive Signed-off-by: Max Xiang --- .../ui/src/features/inspect/stream/fps/fps.component.tsx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/application/ui/src/features/inspect/stream/fps/fps.component.tsx b/application/ui/src/features/inspect/stream/fps/fps.component.tsx index 727d6c1c39..716753a719 100644 --- a/application/ui/src/features/inspect/stream/fps/fps.component.tsx +++ b/application/ui/src/features/inspect/stream/fps/fps.component.tsx @@ -19,10 +19,10 @@ export const Fps = ({ projectId }: FpsProp) => { 'get', '/api/projects/{project_id}/pipeline/metrics', { params: { path: { project_id: projectId } } }, - { enabled: isRunning } + { enabled: isRunning, refetchInterval: 2_000, } ); - const requestsPerSecond = metrics?.inference.throughput.avg_requests_per_second; + const requestsPerSecond = metrics?.inference.latency.latest_ms ? 
1000 / metrics.inference.latency.latest_ms : undefined; if (isEmpty(metrics) || isNil(requestsPerSecond)) { return null; From 0bbd23cefabf1c6bf4f09588e80bf2d8b0696a9b Mon Sep 17 00:00:00 2001 From: Max Xiang Date: Tue, 9 Dec 2025 14:29:28 +0100 Subject: [PATCH 07/10] address comment Signed-off-by: Max Xiang --- .../backend/src/entities/base_opencv_stream.py | 2 +- .../backend/src/entities/images_folder_stream.py | 1 + application/backend/src/services/model_service.py | 11 ++++------- application/backend/src/workers/inference.py | 1 - 4 files changed, 6 insertions(+), 9 deletions(-) diff --git a/application/backend/src/entities/base_opencv_stream.py b/application/backend/src/entities/base_opencv_stream.py index f2f0fffa3b..3cd08030a5 100644 --- a/application/backend/src/entities/base_opencv_stream.py +++ b/application/backend/src/entities/base_opencv_stream.py @@ -44,7 +44,7 @@ def _read_frame(self) -> np.ndarray: ret, frame = self.cap.read() if not ret: return self._handle_read_failure() - return frame + return cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) def _handle_read_failure(self) -> np.ndarray: """Handle frame read failure. Override in subclasses for specific behavior.""" diff --git a/application/backend/src/entities/images_folder_stream.py b/application/backend/src/entities/images_folder_stream.py index 7e73baa4f7..1922741947 100644 --- a/application/backend/src/entities/images_folder_stream.py +++ b/application/backend/src/entities/images_folder_stream.py @@ -98,6 +98,7 @@ def get_data(self) -> StreamData | None: if image is None: # Image cannot be loaded return None + image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) return StreamData( frame_data=image, timestamp=os.path.getmtime(file), diff --git a/application/backend/src/services/model_service.py b/application/backend/src/services/model_service.py index 2f91cb5712..09f6b62408 100644 --- a/application/backend/src/services/model_service.py +++ b/application/backend/src/services/model_service.py @@ -253,7 +253,6 @@ async def predict_image( image_bytes: bytes, cached_models: dict[UUID, OpenVINOInferencer] | None = None, device: str | None = None, - is_bgr: bool = False, ) -> PredictionResponse: """Run prediction on an image using the specified model. 
@@ -265,7 +264,6 @@ async def predict_image( image_bytes: Raw image bytes from uploaded file cached_models: Optional dict to cache loaded models (for performance) device: Optional string indicating the device to use for inference - is_bgr: Whether the image is in BGR format Returns: PredictionResponse: Structured prediction results @@ -287,19 +285,18 @@ async def predict_image( # Run entire prediction pipeline in a single thread # This includes image processing, model inference, and result processing - response_data = await asyncio.to_thread(cls._run_prediction_pipeline, inference_model, image_bytes, is_bgr) + response_data = await asyncio.to_thread(cls._run_prediction_pipeline, inference_model, image_bytes) return PredictionResponse(**response_data) @staticmethod - def _run_prediction_pipeline(inference_model: OpenVINOInferencer, image_bytes: bytes, is_bgr: bool = False) -> dict: + def _run_prediction_pipeline(inference_model: OpenVINOInferencer, image_bytes: bytes) -> dict: """Run the complete prediction pipeline in a single thread.""" # Process image npd = np.frombuffer(image_bytes, np.uint8) - image = cv2.imdecode(npd, -1) - if image is None: + numpy_image = cv2.imdecode(npd, -1) + if numpy_image is None: raise ValueError("Failed to decode image") - numpy_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) if is_bgr else image # Run prediction pred = inference_model.predict(numpy_image) diff --git a/application/backend/src/workers/inference.py b/application/backend/src/workers/inference.py index e8105eab06..e18a77d7f8 100644 --- a/application/backend/src/workers/inference.py +++ b/application/backend/src/workers/inference.py @@ -193,7 +193,6 @@ async def _run_inference(self, image_bytes: bytes) -> Any | None: self._loaded_model.model, image_bytes, self._cached_models, # type: ignore[arg-type] - is_bgr=True, ) except Exception as e: logger.error(f"Inference failed: {e}", exc_info=True) From 83a53cd0b0ca4e54ba8818f039bf5bc4a50a2aa0 Mon Sep 17 00:00:00 2001 From: "Colorado, Camilo" Date: Tue, 9 Dec 2025 17:11:08 +0100 Subject: [PATCH 08/10] enable/disable pipeline overlay Signed-off-by: Colorado, Camilo --- .../inspect/stream/fps/fps.component.tsx | 6 +- .../features/inspect/stream/fps/fps.test.tsx | 5 +- .../pipeline-switch.component.tsx | 41 +--- .../pipeline-switch/pipeline-switch.test.tsx | 182 +++--------------- .../src/features/inspect/toolbar/toolbar.tsx | 3 + 5 files changed, 44 insertions(+), 193 deletions(-) diff --git a/application/ui/src/features/inspect/stream/fps/fps.component.tsx b/application/ui/src/features/inspect/stream/fps/fps.component.tsx index 716753a719..d59002e089 100644 --- a/application/ui/src/features/inspect/stream/fps/fps.component.tsx +++ b/application/ui/src/features/inspect/stream/fps/fps.component.tsx @@ -19,10 +19,12 @@ export const Fps = ({ projectId }: FpsProp) => { 'get', '/api/projects/{project_id}/pipeline/metrics', { params: { path: { project_id: projectId } } }, - { enabled: isRunning, refetchInterval: 2_000, } + { enabled: isRunning, refetchInterval: 2_000 } ); - const requestsPerSecond = metrics?.inference.latency.latest_ms ? 1000 / metrics.inference.latency.latest_ms : undefined; + const requestsPerSecond = metrics?.inference.latency.latest_ms + ? 
1000 / metrics.inference.latency.latest_ms + : undefined; if (isEmpty(metrics) || isNil(requestsPerSecond)) { return null; diff --git a/application/ui/src/features/inspect/stream/fps/fps.test.tsx b/application/ui/src/features/inspect/stream/fps/fps.test.tsx index cda3567304..c5ab411bd4 100644 --- a/application/ui/src/features/inspect/stream/fps/fps.test.tsx +++ b/application/ui/src/features/inspect/stream/fps/fps.test.tsx @@ -37,12 +37,13 @@ describe('Fps', () => { ); }; + it('renders FPS value when metrics are available', async () => { const metricsConfig = cloneDeep(getMockedMetrics({})); - metricsConfig.inference.throughput.avg_requests_per_second = 25; + metricsConfig.inference.latency.latest_ms = 25; renderFps({ metricsConfig }); - expect(await screen.findByText(/25/)).toBeVisible(); + expect(await screen.findByText(/40/)).toBeVisible(); }); it('renders nothing if metrics are missing', async () => { diff --git a/application/ui/src/features/inspect/toolbar/pipeline-switch/pipeline-switch.component.tsx b/application/ui/src/features/inspect/toolbar/pipeline-switch/pipeline-switch.component.tsx index 750f4e55f4..028fe45298 100644 --- a/application/ui/src/features/inspect/toolbar/pipeline-switch/pipeline-switch.component.tsx +++ b/application/ui/src/features/inspect/toolbar/pipeline-switch/pipeline-switch.component.tsx @@ -1,34 +1,18 @@ -import { usePipeline, useProjectIdentifier, useRunPipeline, useStopPipeline } from '@geti-inspect/hooks'; +import { usePatchPipeline, usePipeline, useProjectIdentifier } from '@geti-inspect/hooks'; import { Flex, Switch } from '@geti/ui'; -import isEmpty from 'lodash-es/isEmpty'; -import { useWebRTCConnection } from 'src/components/stream/web-rtc-connection-provider'; - -import { isStatusActive } from '../../utils'; import classes from './pipeline-switch.module.scss'; export const PipelineSwitch = () => { const { projectId } = useProjectIdentifier(); - const stopPipeline = useStopPipeline(projectId); - const { status, start } = useWebRTCConnection(); - const { data: pipeline, isLoading } = usePipeline(); - - const runPipeline = useRunPipeline({ - onSuccess: async () => { - await start(); - }, - }); - - const isSinkMissing = isEmpty(pipeline.sink?.id); - const isModelMissing = isEmpty(pipeline.model?.id); - const isPipelineActive = isStatusActive(pipeline.status); - const isWebRtcConnecting = status === 'connecting'; - const isProcessing = runPipeline.isPending || stopPipeline.isPending; + const patchPipeline = usePatchPipeline(projectId); + const { data: pipeline } = usePipeline(); - const handleChange = (isSelected: boolean) => { - const handler = isSelected ? runPipeline.mutate : stopPipeline.mutate; + const hasOverlay = pipeline?.overlay ?? 
false; + const isPipelineStopped = pipeline?.status !== 'running'; - handler({ params: { path: { project_id: projectId } } }); + const handleChange = () => { + patchPipeline.mutateAsync({ params: { path: { project_id: projectId } }, body: { overlay: !hasOverlay } }); }; return ( @@ -36,15 +20,8 @@ export const PipelineSwitch = () => { Enabled diff --git a/application/ui/src/features/inspect/toolbar/pipeline-switch/pipeline-switch.test.tsx b/application/ui/src/features/inspect/toolbar/pipeline-switch/pipeline-switch.test.tsx index 7671979608..8722b05ef9 100644 --- a/application/ui/src/features/inspect/toolbar/pipeline-switch/pipeline-switch.test.tsx +++ b/application/ui/src/features/inspect/toolbar/pipeline-switch/pipeline-switch.test.tsx @@ -8,33 +8,17 @@ import { http } from 'src/api/utils'; import { server } from 'src/msw-node-setup'; import { getMockedPipeline } from '../../../../../mocks/mock-pipeline'; -import { useWebRTCConnection, WebRTCConnectionState } from '../../../../components/stream/web-rtc-connection-provider'; import { PipelineSwitch } from './pipeline-switch.component'; -vi.mock('../../../../components/stream/web-rtc-connection-provider', () => ({ - useWebRTCConnection: vi.fn(), -})); - describe('PipelineSwitch', () => { - const renderApp = ({ - webRtcConfig = {}, - pipelineConfig = {}, - }: { - webRtcConfig?: Partial; - pipelineConfig?: Partial; - } = {}) => { - vi.mocked(useWebRTCConnection).mockReturnValue({ - status: 'idle', - stop: vi.fn(), - start: vi.fn(), - webRTCConnectionRef: { current: null }, - ...webRtcConfig, - }); - + const renderApp = ({ pipelineConfig = {} }: { pipelineConfig?: Partial } = {}) => { server.use( - http.get('/api/projects/{project_id}/pipeline', ({ response }) => - response(200).json(getMockedPipeline(pipelineConfig)) - ) + http.get('/api/projects/{project_id}/pipeline', () => { + if (pipelineConfig === null) { + return HttpResponse.json(null); + } + return HttpResponse.json(getMockedPipeline(pipelineConfig)); + }) ); return render( @@ -48,150 +32,34 @@ describe('PipelineSwitch', () => { ); }; - beforeEach(() => { - vi.clearAllMocks(); - }); - - describe('Switch rendering', () => { - it('renders the switch with "Enabled" label', async () => { - renderApp(); - - expect(await screen.findByText('Enabled')).toBeVisible(); - }); - - it('switch is selected when pipeline status is "running"', async () => { - renderApp({ pipelineConfig: { status: 'running' } }); - - expect(await screen.findByRole('switch')).toBeChecked(); - }); - - it('switch is not selected when pipeline status is not "running"', async () => { - renderApp({ pipelineConfig: { status: 'idle' } }); + it('disables button if no pipeline', async () => { + renderApp({ pipelineConfig: { status: 'idle' } }); - expect(await screen.findByRole('switch')).not.toBeChecked(); - }); + expect(await screen.findByRole('switch')).toBeDisabled(); }); - describe('Switch disabled states', () => { - it('model is missing', async () => { - renderApp({ pipelineConfig: { model: undefined } }); - - expect(await screen.findByRole('switch')).toBeDisabled(); - }); - - it('model id is empty', async () => { - renderApp({ pipelineConfig: { model: { id: '' } as SchemaPipeline['model'] } }); - - expect(await screen.findByRole('switch')).toBeDisabled(); - }); - - it('pipeline status is not active', async () => { - renderApp({ pipelineConfig: { status: 'idle' } }); - - expect(await screen.findByRole('switch')).toBeDisabled(); - }); - - it('WebRTC is connecting', async () => { - renderApp({ webRtcConfig: { status: 
'connecting' } }); - - expect(await screen.findByRole('switch')).toBeDisabled(); - }); - - it('source is missing', async () => { - renderApp({ pipelineConfig: { source: undefined } }); - - expect(await screen.findByRole('switch')).toBeEnabled(); - }); - - it('sink is missing', async () => { - renderApp({ pipelineConfig: { sink: undefined } }); + it('renders enabled switch if pipeline is running', async () => { + renderApp({ pipelineConfig: { status: 'running' } }); - expect(await screen.findByRole('switch')).toBeDisabled(); - }); - - it('both source and sink are missing', async () => { - renderApp({ pipelineConfig: { source: undefined, sink: undefined } }); - - expect(await screen.findByRole('switch')).toBeDisabled(); - }); - - it('is enabled when pipeline is active and all required components are present', async () => { - renderApp({ pipelineConfig: { status: 'active' } }); - - expect(await screen.findByRole('switch')).toBeEnabled(); - }); - - it('is enabled when pipeline status is "running"', async () => { - renderApp({ pipelineConfig: { status: 'running' } }); - - expect(await screen.findByRole('switch')).toBeEnabled(); - }); + expect(await screen.findByRole('switch')).toBeEnabled(); }); - describe('Switch interactions', () => { - it('calls runPipeline when switch is turned on', async () => { - const mockStart = vi.fn(); - const runPipelineSpy = vi.fn(); - - server.use( - http.post('/api/projects/{project_id}/pipeline:run', () => { - runPipelineSpy(); - return HttpResponse.json({}, { status: 204 }); - }) - ); - - renderApp({ - pipelineConfig: { status: 'active' }, - webRtcConfig: { start: mockStart }, - }); - - expect(await screen.findByRole('switch')).not.toBeChecked(); + it('calls API and toggles switch', async () => { + const requestSpy = vi.fn(); - await userEvent.click(await screen.findByRole('switch')); - - await waitFor(() => { - expect(runPipelineSpy).toHaveBeenCalled(); - expect(mockStart).toHaveBeenCalled(); - }); - }); - - it('stops inference when switch is turned off', async () => { - const mockStart = vi.fn(); - const stopPipelineSpy = vi.fn(); - - server.use( - http.post('/api/projects/{project_id}/pipeline:stop', () => { - stopPipelineSpy(); - return HttpResponse.json({}, { status: 204 }); - }) - ); - - renderApp({ pipelineConfig: { status: 'running' }, webRtcConfig: { start: mockStart } }); - - expect(await screen.findByRole('switch')).toBeChecked(); - - await userEvent.click(await screen.findByRole('switch')); - - await waitFor(() => { - expect(stopPipelineSpy).toHaveBeenCalled(); - expect(mockStart).not.toHaveBeenCalled(); - }); - }); - - it('starts WebRTC connection after successful runPipeline', async () => { - const mockStart = vi.fn(); - - server.use( - http.post('/api/projects/{project_id}/pipeline:run', () => HttpResponse.json({}, { status: 204 })) - ); + server.use( + http.patch('/api/projects/{project_id}/pipeline', async ({ response }) => { + requestSpy(); + return response(200).json(getMockedPipeline()); + }) + ); - renderApp({ pipelineConfig: { status: 'active' }, webRtcConfig: { start: mockStart } }); + renderApp({ pipelineConfig: { status: 'running' } }); - await userEvent.click(await screen.findByRole('switch')); + await userEvent.click(await screen.findByRole('switch')); - await waitFor(() => { - expect(mockStart).toHaveBeenCalled(); - }); + await waitFor(() => { + expect(requestSpy).toHaveBeenCalled(); }); }); }); diff --git a/application/ui/src/features/inspect/toolbar/toolbar.tsx b/application/ui/src/features/inspect/toolbar/toolbar.tsx index 
5ee4e4ae61..4de5d7b885 100644 --- a/application/ui/src/features/inspect/toolbar/toolbar.tsx +++ b/application/ui/src/features/inspect/toolbar/toolbar.tsx @@ -5,6 +5,7 @@ import { dimensionValue, Divider, Flex, View } from '@geti/ui'; import { InferenceDevices } from './inference-devices/inference-devices.component'; import { PipelineConfiguration } from './pipeline-configuration.component'; +import { PipelineSwitch } from './pipeline-switch/pipeline-switch.component'; export const Toolbar = () => { return ( @@ -20,6 +21,8 @@ export const Toolbar = () => { + + From fa24f0d02e29528f9abba015c56bcde71f05c21f Mon Sep 17 00:00:00 2001 From: Max Xiang Date: Tue, 9 Dec 2025 21:38:23 +0100 Subject: [PATCH 09/10] COLOR_RGB2BGR Signed-off-by: Max Xiang --- application/backend/src/entities/base_opencv_stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/application/backend/src/entities/base_opencv_stream.py b/application/backend/src/entities/base_opencv_stream.py index 3cd08030a5..504ddb584f 100644 --- a/application/backend/src/entities/base_opencv_stream.py +++ b/application/backend/src/entities/base_opencv_stream.py @@ -44,7 +44,7 @@ def _read_frame(self) -> np.ndarray: ret, frame = self.cap.read() if not ret: return self._handle_read_failure() - return cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) + return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) def _handle_read_failure(self) -> np.ndarray: """Handle frame read failure. Override in subclasses for specific behavior.""" From d54c591661379c380222bc2e17e3ae9b70945abc Mon Sep 17 00:00:00 2001 From: Max Xiang Date: Wed, 10 Dec 2025 09:12:02 +0100 Subject: [PATCH 10/10] enabled -> overlay Signed-off-by: Max Xiang --- .../toolbar/pipeline-switch/pipeline-switch.component.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/application/ui/src/features/inspect/toolbar/pipeline-switch/pipeline-switch.component.tsx b/application/ui/src/features/inspect/toolbar/pipeline-switch/pipeline-switch.component.tsx index 028fe45298..5485da9ae6 100644 --- a/application/ui/src/features/inspect/toolbar/pipeline-switch/pipeline-switch.component.tsx +++ b/application/ui/src/features/inspect/toolbar/pipeline-switch/pipeline-switch.component.tsx @@ -23,7 +23,7 @@ export const PipelineSwitch = () => { isSelected={hasOverlay} isDisabled={patchPipeline.isPending || isPipelineStopped} > - Enabled + Overlay );
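
---

Taken together, the series lands two user-facing knobs: an optional max_epochs field on TrainJobPayload and an overlay flag on the pipeline that switches the heatmap and label visualization on or off. The UI toggles the flag through a partial PATCH of the pipeline config, as exercised in pipeline-switch.test.tsx, and pipeline_service.py reloads the model when the flag changes. Below is a minimal sketch of the same toggle from a script; the base URL, the placeholder project UUID, and the absence of auth are assumptions for illustration, not something these patches confirm.

    # Sketch: toggle the new pipeline `overlay` flag over HTTP.
    # Assumed (not confirmed by the patches): backend at http://localhost:8000,
    # no authentication, and a placeholder project UUID.
    import httpx

    BASE_URL = "http://localhost:8000"  # assumed host/port
    PROJECT_ID = "00000000-0000-0000-0000-000000000000"  # hypothetical project UUID


    def set_overlay(enabled: bool) -> dict:
        """Toggle prediction overlays via PATCH /api/projects/{project_id}/pipeline.

        Mirrors the UI call: a partial body such as {"overlay": false} is enough,
        since update_pipeline() accepts a partial config dict.
        """
        resp = httpx.patch(
            f"{BASE_URL}/api/projects/{PROJECT_ID}/pipeline",
            json={"overlay": enabled},
        )
        resp.raise_for_status()
        return resp.json()


    if __name__ == "__main__":
        # Stream raw frames without the heatmap/label overlay.
        pipeline = set_overlay(False)
        print(pipeline.get("overlay"))

The same partial-PATCH shape would carry the other fields touched by this series (for example inference_device); per patch 05, a change to either overlay or inference_device triggers a model reload on the backend.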