Merged
@@ -136,6 +136,7 @@ def upgrade() -> None:
sa.Column("sink_id", sa.Text(), nullable=True),
sa.Column("model_id", sa.Text(), nullable=True),
sa.Column("inference_device", sa.String(length=64), nullable=True),
sa.Column("overlay", sa.Boolean(), nullable=True),
sa.Column("is_running", sa.Boolean(), nullable=False),
sa.Column("is_active", sa.Boolean(), nullable=False),
sa.Column("data_collection_policies", sa.JSON(), nullable=False),
1 change: 1 addition & 0 deletions application/backend/src/db/schema.py
@@ -90,6 +90,7 @@ class PipelineDB(Base):
sink_id: Mapped[str | None] = mapped_column(Text, ForeignKey("sinks.id", ondelete="RESTRICT"))
model_id: Mapped[str | None] = mapped_column(Text, ForeignKey("models.id", ondelete="RESTRICT"))
inference_device: Mapped[str | None] = mapped_column(String(64), nullable=True)
overlay: Mapped[bool | None] = mapped_column(Boolean, default=True)
is_running: Mapped[bool] = mapped_column(Boolean, default=False)
is_active: Mapped[bool] = mapped_column(Boolean, default=False)
data_collection_policies: Mapped[list] = mapped_column(JSON, nullable=False, default=list)
2 changes: 1 addition & 1 deletion application/backend/src/entities/base_opencv_stream.py
@@ -44,7 +44,7 @@ def _read_frame(self) -> np.ndarray:
ret, frame = self.cap.read()
if not ret:
return self._handle_read_failure()
- return frame
+ return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

def _handle_read_failure(self) -> np.ndarray:
"""Handle frame read failure. Override in subclasses for specific behavior."""
1 change: 1 addition & 0 deletions application/backend/src/entities/images_folder_stream.py
@@ -98,6 +98,7 @@ def get_data(self) -> StreamData | None:
if image is None:
# Image cannot be loaded
return None
image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
return StreamData(
frame_data=image,
timestamp=os.path.getmtime(file),
1 change: 1 addition & 0 deletions application/backend/src/pydantic_models/job.py
@@ -64,3 +64,4 @@ class TrainJobPayload(BaseModel):
model_name: str
device: str | None = Field(default=None)
dataset_snapshot_id: str | None = Field(default=None) # used because UUID is not JSON serializable
max_epochs: int | None = Field(default=None, ge=1)
Contributor
How are we planning on dealing with the rest of the configurable parameters?

Contributor Author
We don't for the time being. We're likely going to remove this once we have a proper design for configurable parameters.
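
For reference, a minimal sketch of how a client might fill in the extended payload — the field names mirror TrainJobPayload above, while the concrete values and the idea of assembling it as a plain dict are illustrative assumptions, not part of this PR:

```python
# Hypothetical train-job payload using the new optional field.
# Omitting "max_epochs" falls back to the server-side default
# (200 in _run_training_job below); when provided it must be >= 1.
train_job_payload = {
    "model_name": "Padim",        # placeholder model name
    "device": "gpu",              # optional
    "dataset_snapshot_id": None,  # optional, stringified UUID when set
    "max_epochs": 50,             # newly added optional field
}
```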

1 change: 1 addition & 0 deletions application/backend/src/pydantic_models/pipeline.py
@@ -48,6 +48,7 @@ class Pipeline(BaseModel):
) # ID of the model, used for DB mapping, not exposed in API
status: PipelineStatus = PipelineStatus.IDLE # Current status of the pipeline
inference_device: str | None = Field(default=None)
overlay: bool | None = Field(default=None)

# TODO: can be confused with status.is_running / is_active, consider refactoring
is_running: bool | None = Field(default=None, exclude=True) # If set will overwrite status
@@ -25,6 +25,7 @@ def from_schema(pipeline_db: PipelineDB) -> Pipeline:
source_id=UUID(pipeline_db.source_id) if pipeline_db.source_id else None,
status=PipelineStatus.from_bool(pipeline_db.is_running, pipeline_db.is_active),
inference_device=pipeline_db.inference_device.upper() if pipeline_db.inference_device else None,
overlay=pipeline_db.overlay,
)

@staticmethod
@@ -38,4 +39,5 @@ def to_schema(pipeline: Pipeline) -> PipelineDB:
is_running=pipeline.status.is_running,
is_active=pipeline.status.is_active,
inference_device=pipeline.inference_device.upper() if pipeline.inference_device else None,
overlay=pipeline.overlay,
)
6 changes: 2 additions & 4 deletions application/backend/src/services/model_service.py
@@ -294,12 +294,10 @@ def _run_prediction_pipeline(inference_model: OpenVINOInferencer, image_bytes: b
"""Run the complete prediction pipeline in a single thread."""
# Process image
npd = np.frombuffer(image_bytes, np.uint8)
- bgr_image = cv2.imdecode(npd, -1)
- if bgr_image is None:
+ numpy_image = cv2.imdecode(npd, -1)
+ if numpy_image is None:
raise ValueError("Failed to decode image")

- numpy_image = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB)

# Run prediction
pred = inference_model.predict(numpy_image)

2 changes: 1 addition & 1 deletion application/backend/src/services/pipeline_service.py
@@ -74,7 +74,7 @@ async def update_pipeline(self, project_id: UUID, partial_config: dict) -> Pipel
await self._notify_pipeline_changed()
# Intentionally call activate_model on status change regardless of whether a model exists.
self._model_service.activate_model()
- if updated.inference_device != pipeline.inference_device:
+ if updated.inference_device != pipeline.inference_device or updated.overlay != pipeline.overlay:
# reload model on device change
self._model_service.activate_model()
return updated
5 changes: 4 additions & 1 deletion application/backend/src/services/training_service.py
@@ -67,6 +67,7 @@ async def _run_training_job(cls, job: Job, job_service: JobService) -> Model | N
device = job.payload.get("device")
snapshot_id_ = job.payload.get("dataset_snapshot_id")
snapshot_id = UUID(snapshot_id_) if snapshot_id_ else None
max_epochs = job.payload.get("max_epochs", 200)

if model_name is None:
raise ValueError(f"Job {job.id} payload must contain 'model_name'")
@@ -105,6 +106,7 @@ async def _run_training_job(cls, job: Job, job_service: JobService) -> Model | N
model=model,
device=device,
synchronization_parameters=synchronization_parameters,
max_epochs=max_epochs,

Should we add this to the model training dialog? cc @ashwinvaidya17, @MarkRedeman

Contributor
It would be nice if we can already expose this (perhaps next to the picker of the training device from the new designs).

However, there are some edge cases we would need to deal with. Some models will overwrite the max epochs to be 1, since they only need to do a single pass.
We could hardcode the UI to not show the input for those models, but that might get messy.

Contributor
Let's expose this and just have a note saying something like, "some models only extract features so max_epochs will be overridden for those models".

dataset_root=dataset_root,
)

@@ -155,6 +157,7 @@ def _train_model(
model: Model,
synchronization_parameters: ProgressSyncParams,
dataset_root: str,
max_epochs: int,
device: str | None = None,
) -> Model | None:
"""
@@ -208,7 +211,7 @@ def _train_model(
default_root_dir=model.export_path,
logger=[trackio, tensorboard],
devices=[0], # Only single GPU training is supported for now
- max_epochs=10,
+ max_epochs=max_epochs,
callbacks=[GetiInspectProgressCallback(synchronization_parameters)],
accelerator=training_device,
)
74 changes: 33 additions & 41 deletions application/backend/src/utils/visualization.py
@@ -7,7 +7,7 @@
import numpy as np
from loguru import logger

- from pydantic_models import PredictionResponse
+ from pydantic_models import PredictionLabel, PredictionResponse


class Visualizer:
@@ -46,7 +46,7 @@ def overlay_predictions(
def overlay_anomaly_heatmap(
Contributor
Something for the future, but maybe we should consolidate the visualization methods here and the ones in anomalib. This way we won't have to maintain two separate visualizers.

base_image: np.ndarray,
prediction: PredictionResponse,
- threshold_value: int = 128,
+ threshold: float = 0.5,
alpha: float = 0.25,
) -> np.ndarray:
"""Overlay the anomaly heatmap onto the image.
@@ -58,66 +58,58 @@ def overlay_anomaly_heatmap(
- Blend onto the base image using alpha
"""
try:
- anomaly_map_base64 = prediction.anomaly_map
- result = base_image.copy()
- try:
- anomaly_png_bytes = base64.b64decode(anomaly_map_base64)
- anomaly_np = np.frombuffer(anomaly_png_bytes, dtype=np.uint8)
- anomaly_img = cv2.imdecode(anomaly_np, cv2.IMREAD_UNCHANGED)
- except Exception:
- return result
+ # Decode anomaly map
+ anomaly_bytes = base64.b64decode(prediction.anomaly_map)
+ anomaly_img = cv2.imdecode(np.frombuffer(anomaly_bytes, dtype=np.uint8), cv2.IMREAD_GRAYSCALE)

if anomaly_img is None:
- return result

- try:
- if anomaly_img.ndim == 3 and anomaly_img.shape[2] > 1:
- anomaly_gray = cv2.cvtColor(anomaly_img, cv2.COLOR_BGR2GRAY)
- else:
- anomaly_gray = anomaly_img
+ return base_image

- if anomaly_gray.dtype != np.uint8:
- anomaly_gray = anomaly_gray.astype(np.uint8)
+ # Resize to match base image
+ h, w = base_image.shape[:2]
+ anomaly_gray = cv2.resize(anomaly_img, (w, h))

- heatmap = cv2.applyColorMap(anomaly_gray, cv2.COLORMAP_JET)
- heatmap_resized = cv2.resize(heatmap, (result.shape[1], result.shape[0]))
+ # Apply colormap and create threshold mask
+ heatmap = cv2.applyColorMap(anomaly_gray, cv2.COLORMAP_JET)
+ mask = anomaly_gray >= (threshold * 255)

- mask_gray = cv2.resize(anomaly_gray, (result.shape[1], result.shape[0]))
- mask_bool = mask_gray >= threshold_value

- masked_heatmap = np.zeros_like(heatmap_resized)
- try:
- masked_heatmap[mask_bool] = heatmap_resized[mask_bool]
- except Exception as e:
- logger.debug(f"Failed to apply heatmap mask: {e}")
+ # Create masked heatmap (only show where above threshold)
+ masked_heatmap = np.zeros_like(heatmap)
+ masked_heatmap[mask] = heatmap[mask]

- result = cv2.addWeighted(result, 1.0, masked_heatmap, alpha, 0)
- except Exception as e:
- logger.debug(f"Failed to overlay heatmap: {e}")
- return result
+ # Blend onto base image
+ result = base_image.copy()
+ return cv2.addWeighted(result, 1.0, masked_heatmap, alpha, 0)
except Exception as e:
- logger.debug(f"Failed in overlay_anomaly_heatmap: {e}")
+ logger.debug(f"Failed to overlay heatmap: {e}")
return base_image

@staticmethod
def draw_prediction_label(
base_image: np.ndarray,
prediction: PredictionResponse,
*,
- position: tuple[int, int] = (10, 20),
- font_scale: float = 2.0,
- thickness: int = 3,
- text_color: tuple[int, int, int] = (0, 255, 0),
- background_color: tuple[int, int, int] = (0, 0, 0),
+ position: tuple[int, int] = (5, 5),
+ font_scale: float = 1.0,
+ thickness: int = 2,
) -> np.ndarray:
"""Draw the prediction label with a background rectangle for readability."""
+ alpha = 0.85
+ text_color = (36, 37, 40)
+ green = (139, 174, 70)
+ red = (255, 86, 98)
+ background_color: tuple[int, int, int] = green if prediction.label == PredictionLabel.NORMAL else red
try:
- label_text = f"{prediction.label.value} ({prediction.score:.3f})"
+ label_text = f"{prediction.label.value} {int(prediction.score * 100)}%"
result = base_image.copy()
font = cv2.FONT_HERSHEY_SIMPLEX
(text_w, text_h), _ = cv2.getTextSize(label_text, font, font_scale, thickness)
x, y = position[0], position[1] + text_h
- cv2.rectangle(result, (x - 8, y - text_h - 8), (x - 8 + text_w + 16, y + 8), background_color, -1)
+ # Create overlay for transparent background
+ overlay = result.copy()
+ cv2.rectangle(overlay, (x - 8, y - text_h - 8), (x - 8 + text_w + 16, y + 8), background_color[::-1], -1)
+ result = cv2.addWeighted(result, 1.0 - alpha, overlay, alpha, 0)

cv2.putText(result, label_text, (x, y), font, font_scale, text_color, thickness, cv2.LINE_AA)
return result
except Exception as e:
9 changes: 8 additions & 1 deletion application/backend/src/workers/inference.py
@@ -12,6 +12,7 @@

if TYPE_CHECKING:
import multiprocessing as mp
from collections.abc import Callable
from multiprocessing.synchronize import Event as EventClass
from multiprocessing.synchronize import Lock

@@ -57,6 +58,7 @@ def __init__(
self._cached_models: dict[Any, object] = {}
self._model_check_interval: float = 5.0 # seconds between model refresh checks
self._is_passthrough_mode: bool = False
self._overlay = True

def setup(self) -> None:
super().setup()
@@ -71,6 +73,7 @@ async def _get_active_model(self) -> LoadedModel | None:
self._is_passthrough_mode = pipeline is None or (
pipeline.status.is_active and not pipeline.status.is_running
)
self._overlay = pipeline.overlay if pipeline and pipeline.overlay is not None else self._overlay
logger.info(f"Passthrough mode {'activated' if self._is_passthrough_mode else 'disabled'}.")
if pipeline is None or pipeline.model is None:
return None
@@ -223,7 +226,11 @@ async def _process_frame_with_model(self, stream_data: StreamData) -> None:
raise RuntimeError("Inference failed or model became unavailable")

# Build visualization
- vis_frame: np.ndarray = Visualizer.overlay_predictions(frame, prediction_response)
+ overlays: list[Callable] = []
+ if self._overlay:
+ overlays.append(Visualizer.overlay_anomaly_heatmap)
+ overlays.append(Visualizer.draw_prediction_label)
+ vis_frame: np.ndarray = Visualizer.overlay_predictions(frame, prediction_response, *overlays)

# Package inference data
stream_data.inference_data = InferenceData(
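To make the overlay wiring above easier to follow, here is a minimal sketch of how overlay_predictions could apply the callables the worker passes in. This is an assumption for illustration — the actual signature of Visualizer.overlay_predictions is not shown in this diff; the only grounded fact is that both helpers take an image and a PredictionResponse as their first two arguments:

```python
# Sketch only (assumed behaviour, not part of this PR): apply each overlay
# callable in order, threading the image through.
def overlay_predictions(frame, prediction, *overlays):
    result = frame
    for overlay in overlays:
        result = overlay(result, prediction)  # e.g. heatmap first, then the label badge
    return result
```

With the overlay flag disabled, the list stays empty and the frame passes through unchanged.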
@@ -231,7 +231,7 @@ def test_train_pending_job_cleanup_on_failure(
with patch("services.training_service.asyncio.to_thread") as mock_to_thread:
# Mock the training to succeed first, setting export_path, then fail
def mock_train_model(
- cls, model, synchronization_parameters: ProgressSyncParams, device=None, dataset_root=None
+ cls, model, synchronization_parameters: ProgressSyncParams, device=None, dataset_root=None, max_epochs=1
):
model.export_path = "/path/to/model"
raise Exception("Training failed")
@@ -263,7 +263,7 @@ def test_train_model_success(
# Call the method
with patch.object(TrainingService, "_compute_export_size", return_value=123):
result = TrainingService._train_model(
- fxt_model, synchronization_parameters=ProgressSyncParams(), dataset_root="/tmp/dataset"
+ fxt_model, synchronization_parameters=ProgressSyncParams(), dataset_root="/tmp/dataset", max_epochs=42
)

# Verify the result
@@ -282,7 +282,7 @@ def test_train_model_success(
assert call_args[1]["default_root_dir"] == "/path/to/model"
assert "logger" in call_args[1]
assert len(call_args[1]["logger"]) == 2 # trackio and tensorboard
- assert call_args[1]["max_epochs"] == 10
+ assert call_args[1]["max_epochs"] == 42

fxt_mock_anomalib_components["engine"].fit.assert_called_once_with(
model=fxt_mock_anomalib_components["anomalib_model"], datamodule=fxt_mock_anomalib_components["folder"]
@@ -332,7 +332,7 @@ def test_train_model_cancelled_before_start(
sync_params.set_cancel_training_event()

result = TrainingService._train_model(
- fxt_model, synchronization_parameters=sync_params, dataset_root="/tmp/dataset"
+ fxt_model, synchronization_parameters=sync_params, dataset_root="/tmp/dataset", max_epochs=1
)

assert result is None
@@ -19,10 +19,12 @@ export const Fps = ({ projectId }: FpsProp) => {
'get',
'/api/projects/{project_id}/pipeline/metrics',
{ params: { path: { project_id: projectId } } },
- { enabled: isRunning }
+ { enabled: isRunning, refetchInterval: 2_000 }
);

- const requestsPerSecond = metrics?.inference.throughput.avg_requests_per_second;
+ const requestsPerSecond = metrics?.inference.latency.latest_ms
+     ? 1000 / metrics.inference.latency.latest_ms
+     : undefined;

if (isEmpty(metrics) || isNil(requestsPerSecond)) {
return null;
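In other words, FPS is now derived from the latest latency sample rather than the averaged throughput metric: a latest_ms of 25 yields 1000 / 25 = 40 FPS, which is exactly what the updated test below asserts.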
5 changes: 3 additions & 2 deletions application/ui/src/features/inspect/stream/fps/fps.test.tsx
@@ -37,12 +37,13 @@ describe('Fps', () => {
</QueryClientProvider>
);
};

it('renders FPS value when metrics are available', async () => {
const metricsConfig = cloneDeep(getMockedMetrics({}));
- metricsConfig.inference.throughput.avg_requests_per_second = 25;
+ metricsConfig.inference.latency.latest_ms = 25;

renderFps({ metricsConfig });
- expect(await screen.findByText(/25/)).toBeVisible();
+ expect(await screen.findByText(/40/)).toBeVisible();
});

it('renders nothing if metrics are missing', async () => {
@@ -1,52 +1,29 @@
- import { usePipeline, useProjectIdentifier, useRunPipeline, useStopPipeline } from '@geti-inspect/hooks';
+ import { usePatchPipeline, usePipeline, useProjectIdentifier } from '@geti-inspect/hooks';
import { Flex, Switch } from '@geti/ui';
- import isEmpty from 'lodash-es/isEmpty';
- import { useWebRTCConnection } from 'src/components/stream/web-rtc-connection-provider';

- import { isStatusActive } from '../../utils';

import classes from './pipeline-switch.module.scss';

export const PipelineSwitch = () => {
const { projectId } = useProjectIdentifier();
- const stopPipeline = useStopPipeline(projectId);
- const { status, start } = useWebRTCConnection();
- const { data: pipeline, isLoading } = usePipeline();

- const runPipeline = useRunPipeline({
- onSuccess: async () => {
- await start();
- },
- });

- const isSinkMissing = isEmpty(pipeline.sink?.id);
- const isModelMissing = isEmpty(pipeline.model?.id);
- const isPipelineActive = isStatusActive(pipeline.status);
- const isWebRtcConnecting = status === 'connecting';
- const isProcessing = runPipeline.isPending || stopPipeline.isPending;
+ const patchPipeline = usePatchPipeline(projectId);
+ const { data: pipeline } = usePipeline();

- const handleChange = (isSelected: boolean) => {
- const handler = isSelected ? runPipeline.mutate : stopPipeline.mutate;
+ const hasOverlay = pipeline?.overlay ?? false;
+ const isPipelineStopped = pipeline?.status !== 'running';

- handler({ params: { path: { project_id: projectId } } });
+ const handleChange = () => {
+ patchPipeline.mutateAsync({ params: { path: { project_id: projectId } }, body: { overlay: !hasOverlay } });
};

return (
<Flex>
<Switch
UNSAFE_className={classes.switch}
onChange={handleChange}
- isSelected={pipeline.status === 'running'}
- isDisabled={
- isLoading ||
- isProcessing ||
- isSinkMissing ||
- isModelMissing ||
- !isPipelineActive ||
- isWebRtcConnecting
- }
+ isSelected={hasOverlay}
+ isDisabled={patchPipeline.isPending || isPipelineStopped}
>
- Enabled
+ Overlay
</Switch>
</Flex>
);
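Tying the UI change back to the backend: toggling the switch now issues a pipeline PATCH whose body carries only the overlay flag, which update_pipeline above treats like a device change and reloads the model. A hedged sketch of that request — the exact route is an assumption inferred from the metrics path used in fps.tsx, and the host and port are placeholders:

```python
# Illustrative only: route and host are assumptions; the body shape matches
# the usePatchPipeline call above and the Pipeline.overlay field.
import requests

project_id = "00000000-0000-0000-0000-000000000000"  # placeholder project ID
resp = requests.patch(
    f"http://localhost:8000/api/projects/{project_id}/pipeline",
    json={"overlay": True},
)
resp.raise_for_status()
```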