YOLO Detector

Detection wrapper around Ultralytics YOLO/RT-DETR for video frames.

Revised by wonstran on 11/11/2025 and 01/28/2026.

DetectorModel

Bases: str, Enum

Enum of available YOLO and RT-DETR model weights.

Each member represents a different model variant with its corresponding weight file name.

Detector

Detector(
    model: DetectorModel = DetectorModel.YOLO26x,
    weights: str | None = None,
    conf: float = 0.25,
    nms: float = 0.7,
    max_det: int = 300,
    device: str = "auto",
    half: bool = False,
)

A wrapper around Ultralytics detection models for running object detection on videos and selected frames.

This class loads a YOLO (v8, v11, 26) or RT-DETR model from a local models/ directory (or from a user-supplied .pt file) and provides convenience methods to:

  • detect objects frame by frame in a video and return results as a pandas DataFrame,
  • run detection only on specified frame indices,
  • process a batch of videos and save per-video detection text files, and
  • query basic video properties (FPS, frame count).

The detector automatically chooses an inference device (cuda, xpu, mps, or cpu) when device="auto", and it can optionally enable half-precision inference on GPU.
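The "auto" selection described above can be sketched as a priority scan over backend availability. This is a minimal illustration, not the detector's actual code; the availability flags here are stand-ins for the real torch checks:

```python
def pick_device(backend_available: dict[str, bool]) -> str:
    """Return the first available backend in the detector's priority order."""
    for backend in ("cuda", "xpu", "mps", "cpu"):
        if backend_available.get(backend, False):
            return backend
    return "cpu"

# Example: no CUDA or XPU, but Apple MPS is available
print(pick_device({"cuda": False, "xpu": False, "mps": True, "cpu": True}))  # mps
```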

Parameters:

  • model (DetectorModel): Built-in model weights to use (for example DetectorModel.YOLO26x). Default is DetectorModel.YOLO26x.
  • weights (str, optional): Custom model weights to load. If the path is relative, it is resolved under <module_dir>/models/. Default is None.
  • conf (float): Confidence threshold for detections. Default is 0.25.
  • nms (float): IoU / non-maximum suppression threshold. Default is 0.7.
  • max_det (int): Maximum number of detections per frame. Default is 300.
  • device ({"auto", "cuda", "xpu", "cpu", "mps"}): Inference device to use. If "auto", the detector picks the first available accelerator (cuda, then xpu, then mps) and falls back to CPU. Default is "auto".
  • half (bool): Whether to enable half-precision inference. Only effective on CUDA GPUs. Default is False.
Notes
  • The class expects model weight files to be located under <module_dir>/models/ when using the built-in weight names.
  • Returned detection tables typically contain the columns: frame, res, x, y, w, h, conf, class.
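Because the file written via iou_file is a headerless CSV, the documented column names must be supplied when loading it back. A minimal pandas sketch (the file contents below are illustrative):

```python
import io

import pandas as pd

DET_FIELDS = ["frame", "res", "x", "y", "w", "h", "conf", "class"]

# Stand-in for a saved "<video>_iou.txt" detection file (headerless CSV)
csv_text = "0,-1,10,20,30,40,0.88,0\n1,-1,12,22,31,41,0.91,2\n"

df = pd.read_csv(io.StringIO(csv_text), header=None, names=DET_FIELDS)
print(df[df["conf"] >= 0.9])  # keep only high-confidence detections
```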

Initialize a Detector for Ultralytics YOLO/RT-DETR models.

Parameters:

  • model (DetectorModel): Built-in model to use. Default is DetectorModel.YOLO26x.
  • weights (str, optional): Custom model weights to load. Default is None, which uses the built-in weights for the chosen model.
  • conf (float): Confidence threshold. Default is 0.25.
  • nms (float): IoU/NMS threshold. Default is 0.7.
  • max_det (int): Maximum detections per frame. Default is 300. In crowded scenes, you may want to increase this.
  • device ({"auto", "cuda", "xpu", "cpu", "mps"}): Inference device. Default is "auto".
  • half (bool): Whether to use half precision (CUDA only). Default is False.
Source code in src/dnt/detect/yolo/detector.py
def __init__(
    self,
    model: DetectorModel = DetectorModel.YOLO26x,
    weights: str | None = None,
    conf: float = 0.25,
    nms: float = 0.7,
    max_det: int = 300,
    device: str = "auto",
    half: bool = False,
):
    """Initialize a Detector for Ultralytics YOLO/RT-DETR models.

    Parameters
    ----------
    model : DetectorModel, optional
        Built-in model to use. Default is "yolo26x".
    weights : str, optional
        Customized model weights to load.
        Default is None, which means using the built-in weights in `model` choice.
    conf : float, optional
        Confidence threshold. Default is 0.25.
    nms : float, optional
        IoU/NMS threshold. Default is 0.7.
    max_det : int, optional
        Maximum detections per frame.
        Default is 300. In crowded scenes, you may want to increase this.
    device : {"auto", "cuda", "xpu", "cpu", "mps"}, optional
        Inference device. Default is "auto".
    half : bool, optional
        Whether to use half precision (GPU only). Default is False.

    """
    # Load model
    cwd = Path(__file__).parent.absolute()
    model_dir = cwd / "models"
    if not model_dir.exists():
        os.makedirs(model_dir)

    if weights:
        model_path = Path(weights) if os.path.isabs(weights) else model_dir / weights
    else:
        model_path = model_dir / f"{model.value}"

    # actually load model
    if ("yolo" in str(weights).lower()) or (model in YOLO_MODELS):
        self.model = YOLO(str(model_path))
    elif ("rtdetr" in str(weights).lower()) or (model in RTDETR_MODELS):
        self.model = RTDETR(str(model_path))
    else:
        raise ValueError(
            f"Cannot infer model family from model={model} and weights={weights!r}. "
            "Use a known DetectorModel or provide weights containing 'yolo' or 'rtdetr'."
        )
    self.conf = conf
    self.nms = nms
    self.max_det = max_det

    # device selection
    requested_device = str(device).lower().strip()
    requested_backend = requested_device.split(":", maxsplit=1)[0]
    valid_devices = {"auto", "cuda", "xpu", "mps", "cpu"}
    if requested_backend not in valid_devices:
        raise ValueError(
            f"Invalid device={device!r}. Choose one of {sorted(valid_devices)} or backend:index like 'cuda:0'."
        )

    backend_available = {
        "cuda": torch.cuda.is_available(),
        "xpu": hasattr(torch, "xpu") and hasattr(torch.xpu, "is_available") and torch.xpu.is_available(),
        "mps": hasattr(torch.backends, "mps") and torch.backends.mps.is_available(),
        "cpu": True,
    }

    if requested_backend == "auto":
        auto_priority = ("cuda", "xpu", "mps", "cpu")
        self.device = next(d for d in auto_priority if backend_available[d])
    else:
        self.device = requested_device if backend_available[requested_backend] else "cpu"

    # half precision only makes sense on GPU
    self.half = half and (self.device == "cuda")

detect

detect(
    input_video: str,
    iou_file: str | None = None,
    video_index: int | None = None,
    video_tot: int | None = None,
    start_frame: int | None = None,
    end_frame: int | None = None,
    verbose: bool = True,
    show: bool = False,
    message: str | None = None,
) -> pd.DataFrame

Run object detection on a video and return per-frame detections.

Parameters:

  • input_video (str): Path to the input video file. Required.
  • iou_file (str, optional): If provided, detection results are written to this file (CSV without header). Default is None.
  • video_index (int, optional): Index of this video in a batch, used only for progress display. Default is None.
  • video_tot (int, optional): Total number of videos in the batch, used only for progress display. Default is None.
  • start_frame (int, optional): Frame index to start detection from. If None or out of range, detection starts at frame 0. Default is None.
  • end_frame (int, optional): Frame index to stop detection at. If None or out of range, the last frame is used. Default is None.
  • verbose (bool): Whether to show a progress bar. Default is True.
  • show (bool): Whether to display the video frames with detections. Default is False.
  • message (str, optional): Optional message shown in the progress bar description. Default is None.

Returns:

  • pandas.DataFrame: DataFrame with columns frame, res, x, y, w, h, conf, class. If the video cannot be opened or no detections are found, an empty DataFrame with those columns is returned.
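The returned x, y, w, h columns are derived from the model's corner-format (x1, y1, x2, y2) boxes. The conversion can be sketched as follows, mirroring the integer casts used in detect (the coordinate values are toy data):

```python
import pandas as pd

# One detection box in corner format, as produced by the model
raw = pd.DataFrame({"x": [10.4], "y": [20.7], "x2": [40.9], "y2": [80.2]})

raw["w"] = (raw["x2"] - raw["x"]).astype(int)  # width from corner coordinates
raw["h"] = (raw["y2"] - raw["y"]).astype(int)  # height from corner coordinates
raw["x"] = raw["x"].astype(int)                # cast top-left corner last
raw["y"] = raw["y"].astype(int)
print(raw[["x", "y", "w", "h"]])
```

Note that astype(int) truncates toward zero rather than rounding, so widths are computed from the float corners before the corners themselves are cast.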

Source code in src/dnt/detect/yolo/detector.py
def detect(
    self,
    input_video: str,
    iou_file: str | None = None,
    video_index: int | None = None,
    video_tot: int | None = None,
    start_frame: int | None = None,
    end_frame: int | None = None,
    verbose: bool = True,
    show: bool = False,
    message: str | None = None,
) -> pd.DataFrame:
    """Run object detection on a video and return per-frame detections.

    Parameters
    ----------
    input_video : str
        Path to the input video file.
    iou_file : str, optional
        If provided, detection results are written to this file (CSV without header).
    video_index : int, optional
        Index of this video in a batch, used only for progress display.
    video_tot : int, optional
        Total number of videos in the batch, used only for progress display.
    start_frame : int, optional
        Frame index to start detection from. If None or out of range, starts at 0.
    end_frame : int, optional
        Frame index to stop detection at. If None or out of range, uses the last frame.
    verbose : bool, optional
        Whether to show a progress bar. Default is True.
    show : bool, optional
        Whether to display the video frames with detections. Default is False.
    message : str | None, optional
        Optional message shown in the progress bar description.
        Default is None.

    Returns
    -------
    pandas.DataFrame
        DataFrame with columns:
        `frame, res, x, y, w, h, conf, class`.
        If the video cannot be opened or no detections are found, an empty DataFrame
        with those columns is returned.

    """
    # validate path
    cap = cv2.VideoCapture(input_video)
    if not cap.isOpened():
        if verbose:
            print(f"Cannot open video: {input_video}")
        return pd.DataFrame(columns=self.DET_FIELDS)

    tot_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # normalize start_frame
    if start_frame is None or start_frame < 0 or start_frame >= tot_frames:
        start_frame = 0
    # normalize end_frame
    if end_frame is None or end_frame < 0 or end_frame >= tot_frames:
        end_frame = tot_frames - 1
    if start_frame > end_frame:
        cap.release()
        raise ValueError("start_frame must be less than or equal to end_frame.")

    frame_total = end_frame - start_frame + 1

    # Some codecs return 0 or -1 for frame count
    if verbose:
        if (video_index is not None) and (video_tot is not None):
            desc = f"Detecting {video_index} of {video_tot}"
        else:
            desc = "Detecting"

        if message is not None:
            desc += f" {message}"

        if tot_frames <= 0:
            pbar = tqdm(desc=desc, unit="frame")
        else:
            pbar = tqdm(total=frame_total, desc=desc, unit="frame")

    results: list[dict] = []
    frame_idx = start_frame
    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)

    win_name = "Detection (press q/ESC to quit)"
    if show:
        cv2.namedWindow(win_name, cv2.WINDOW_NORMAL)

    # optional FPS calc
    t0 = time()
    n_show = 0

    while cap.isOpened():
        pos_frame = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
        ret, frame = cap.read()
        if not ret:
            break

        if end_frame is not None and frame_idx > end_frame:
            break

        preds = self.model.predict(
            source=frame,
            conf=self.conf,
            iou=self.nms,
            max_det=self.max_det,
            device=self.device,
            half=self.half,
            verbose=False,
        )

        det = preds[0]
        boxes = det.boxes
        if boxes is not None and len(boxes) > 0:
            xyxy = boxes.xyxy.cpu().numpy()  # (N,4)
            confs = boxes.conf.cpu().numpy()  # (N,)
            clss = boxes.cls.cpu().numpy().astype(int)  # (N,)

            for (x1, y1, x2, y2), cf, c in zip(xyxy, confs, clss, strict=True):
                results.append({
                    "frame": pos_frame,
                    "res": -1,
                    "x": float(x1),
                    "y": float(y1),
                    "x2": float(x2),
                    "y2": float(y2),
                    "conf": float(cf),
                    "class": int(c),
                })

        if show:
            # Ultralytics built-in drawing (fast & clean)
            vis = det.plot()  # returns BGR image with boxes/labels

            # add simple overlay: frame index + FPS
            n_show += 1
            dt = time() - t0
            fps = n_show / dt if dt > 0 else 0.0
            cv2.putText(
                vis,
                f"frame={pos_frame}/{frame_total}  fps={fps:.1f}",
                (10, 25),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.7,
                (0, 255, 255),
                2,
            )

            cv2.imshow(win_name, vis)
            key = cv2.waitKey(1) & 0xFF
            if key == ord("q") or key == 27:  # q or ESC
                break

        if verbose and pbar is not None:
            pbar.update(1)

        frame_idx += 1

    cap.release()
    if verbose and pbar is not None:
        pbar.close()

    if not results:
        empty_df = pd.DataFrame(columns=self.DET_FIELDS)
        if iou_file:
            empty_df.to_csv(iou_file, index=False, header=False)
        return empty_df

    else:
        results_df = pd.DataFrame(results, columns=["frame", "res", "x", "y", "x2", "y2", "conf", "class"])
        results_df["w"] = (results_df["x2"] - results_df["x"]).astype(int)
        results_df["h"] = (results_df["y2"] - results_df["y"]).astype(int)
        results_df["x"] = results_df["x"].astype(int)
        results_df["y"] = results_df["y"].astype(int)
        results_df["conf"] = results_df["conf"].round(2)
        results_df = results_df[self.DET_FIELDS].reset_index(drop=True)

    if iou_file:
        folder = Path(iou_file).parent
        if not folder.exists():
            Path(folder).mkdir(parents=True, exist_ok=True)

        results_df.to_csv(iou_file, index=False, header=False)
        if verbose:
            print(f"Wrote detections to {iou_file}")

    return results_df

detect_frames

detect_frames(
    input_video: str,
    frames: list[int],
    verbose: bool = True,
) -> pd.DataFrame

Run object detection on specific frames of a video.

This method is useful when you don't need to process the entire video and only want detections for selected frame indices.

Parameters:

  • input_video (str): Path to the input video file. Required.
  • frames (list of int): List of frame indices to process. Required.
  • verbose (bool): Whether to show a progress bar. Default is True.

Returns:

  • pandas.DataFrame: DataFrame with columns frame, res, x, y, w, h, conf, class. If the video cannot be opened or no detections are found, an empty DataFrame with those columns is returned.
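A common way to build the frames argument is to sample every Nth frame of a clip. A small sketch (passing the result to Detector.detect_frames is assumed, not shown):

```python
def sample_frames(total_frames: int, step: int) -> list[int]:
    """Frame indices 0, step, 2*step, ... suitable for detect_frames."""
    return list(range(0, total_frames, step))

# e.g. one frame per second of a 150-frame clip recorded at 30 fps
frames = sample_frames(150, 30)
print(frames)  # [0, 30, 60, 90, 120]
```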

Source code in src/dnt/detect/yolo/detector.py
def detect_frames(
    self,
    input_video: str,
    frames: list[int],
    verbose: bool = True,
) -> pd.DataFrame:
    """Run object detection on specific frames of a video.

    This method is useful when you don't need to process the entire video and
    only want detections for selected frame indices.

    Parameters
    ----------
    input_video : str
        Path to the input video file.
    frames : list of int
        List of frame indices to process.
    verbose : bool, optional
        Whether to show a progress bar. Default is True.

    Returns
    -------
    pandas.DataFrame
        DataFrame with columns
        `frame, res, x, y, w, h, conf, class`.
        If the video cannot be opened or no detections are found, an empty
        DataFrame with those columns is returned.

    """
    # validate path
    if not os.path.exists(input_video):
        # return an empty, well-shaped DataFrame instead of None
        if verbose:
            print(f"{input_video} does not exist!")
        return pd.DataFrame(columns=self.DET_FIELDS)

    cap = cv2.VideoCapture(input_video)
    if not cap.isOpened():
        if verbose:
            print(f"Cannot open {input_video}")
        return pd.DataFrame(columns=self.DET_FIELDS)

    results: list[dict] = []

    pbar = tqdm(total=len(frames), unit=" frames") if verbose else None

    for pos_frame in frames:
        # move to target frame
        cap.set(cv2.CAP_PROP_POS_FRAMES, pos_frame)
        ret, frame = cap.read()
        if not ret:
            # e.g. frame index out of range
            continue

        preds = self.model.predict(
            frame,
            verbose=False,
            conf=self.conf,
            iou=self.nms,
            max_det=self.max_det,
            device=self.device,
            half=self.half,
        )
        det = preds[0]
        boxes = det.boxes
        if boxes is not None and len(boxes) > 0:
            xyxy = boxes.xyxy.cpu().numpy()  # (N,4)
            confs = boxes.conf.cpu().numpy()  # (N,)
            clss = boxes.cls.cpu().numpy().astype(int)  # (N,)

            for (x1, y1, x2, y2), cf, c in zip(xyxy, confs, clss, strict=True):
                results.append({
                    "frame": pos_frame,
                    "res": -1,
                    "x": float(x1),
                    "y": float(y1),
                    "x2": float(x2),
                    "y2": float(y2),
                    "conf": float(cf),
                    "class": int(c),
                })

        if pbar is not None:
            pbar.update()

    if pbar is not None:
        pbar.close()
    cap.release()

    # no detections at all

    if not results:
        return pd.DataFrame(columns=self.DET_FIELDS)

    df = pd.DataFrame(results)
    # compute width/height and round
    df["w"] = (df["x2"] - df["x"]).round(0)
    df["h"] = (df["y2"] - df["y"]).round(0)
    df["x"] = df["x"].round(1)
    df["y"] = df["y"].round(1)
    df["conf"] = df["conf"].round(2)
    df["class"] = df["class"].round(0).astype(int)

    df = df[self.DET_FIELDS].reset_index(drop=True)

    return df

detect_batch

detect_batch(
    input_videos: list[str],
    output_path: str | None = None,
    is_overwrite: bool = False,
    is_report: bool = True,
    verbose: bool = True,
    message: str | None = None,
) -> list[str]

Run detection on multiple videos and optionally write per-video output files.

Parameters:

  • input_videos (list of str): Paths to the input video files to be processed. Required.
  • output_path (str, optional): Directory where per-video detection files will be written. If None, detections are not written to disk and the returned list will be empty. Default is None.
  • is_overwrite (bool): If False (default), existing detection files with the same name are skipped. If True, they are regenerated.
  • is_report (bool): If True (default), existing detection files that were skipped are still included in the returned list.
  • verbose (bool): If True, prints progress messages. Default is True.
  • message (str, optional): Optional message shown in each progress bar description. Default is None.

Returns:

  • list of str: Paths to detection files that were created or already existed. If output_path is None, this will be an empty list.
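The per-video output name follows the pattern <basename>_iou.txt under output_path; this is also the name the overwrite check looks for. A minimal sketch of that naming rule:

```python
import os


def iou_file_for(input_video: str, output_path: str) -> str:
    """Detection file path that detect_batch writes for a given video."""
    base = os.path.splitext(os.path.basename(input_video))[0]
    return os.path.join(output_path, f"{base}_iou.txt")


# e.g. out/cam01_iou.txt (path separator depends on OS)
print(iou_file_for("/data/clips/cam01.mp4", "out"))
```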

Source code in src/dnt/detect/yolo/detector.py
def detect_batch(
    self,
    input_videos: list[str],
    output_path: str | None = None,
    is_overwrite: bool = False,
    is_report: bool = True,
    verbose: bool = True,
    message: str | None = None,
) -> list[str]:
    """Run detection on multiple videos and optionally write per-video output files.

    Parameters
    ----------
    input_videos : list of str
        Paths to the input video files to be processed.
    output_path : str, optional
        Directory where per-video detection files will be written. If None,
        detections are not written to disk and the returned list will be empty.
    is_overwrite : bool, optional
        If False (default), existing detection files with the same name will be
        skipped. If True, they will be regenerated.
    is_report : bool, optional
        If True (default), existing detection files (that were skipped) are still
        included in the returned list.
    verbose : bool, optional
        If True, prints progress messages. Default is True.
    message : str | None, optional
        Optional message shown in each progress bar description.
        Default is None.

    Returns
    -------
    list of str
        A list of paths to detection files that were created or already existed.
        If `output_path` is None, this will be an empty list.

    """
    results: list[str] = []
    total_videos = len(input_videos)

    for idx, input_video in enumerate(input_videos, start=1):
        # default: no output file
        iou_file = None

        # build output path / file name if requested
        if output_path is not None:
            Path(output_path).mkdir(parents=True, exist_ok=True)
            base_filename = os.path.splitext(os.path.basename(input_video))[0]
            iou_file = os.path.join(output_path, f"{base_filename}_iou.txt")

        # if we have an output file name, check overwrite logic
        if (iou_file is not None) and (not is_overwrite) and os.path.exists(iou_file):
            if is_report:
                results.append(iou_file)
            # skip processing this video
            continue

        # run detection (may write to iou_file if not None)
        self.detect(
            input_video=input_video,
            iou_file=iou_file,
            video_index=idx,
            video_tot=total_videos,
            verbose=verbose,
            message=message,
        )

        if iou_file is not None:
            results.append(iou_file)

    return results

get_fps staticmethod

get_fps(video: str) -> float

Return the frames-per-second (FPS) value of a video file.

Parameters:

  • video (str): Path to the video file. Required.

Returns:

  • float: FPS of the video. Returns 0.0 if the video cannot be opened.

Source code in src/dnt/detect/yolo/detector.py
@staticmethod
def get_fps(video: str) -> float:
    """Return the frames-per-second (FPS) value of a video file.

    Parameters
    ----------
    video : str
        Path to the video file.

    Returns
    -------
    float
        FPS of the video. Returns 0.0 if the video cannot be opened.

    """
    if not Path(video).exists():
        print(f"{video} does not exist!")
        return 0.0
    cap = cv2.VideoCapture(video)
    if not cap.isOpened():
        print(f"Failed to open the video: {video}")
        return 0.0

    fps = float(cap.get(cv2.CAP_PROP_FPS))
    cap.release()
    return fps

get_frames staticmethod

get_frames(video: str) -> int

Return the total number of frames in a video file.

Parameters:

  • video (str): Path to the video file. Required.

Returns:

  • int: Total frame count. Returns 0 if the video cannot be opened.
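Together, get_fps and get_frames give the clip duration. A small arithmetic sketch (the fps and frame-count values are illustrative, as if returned by the two static methods):

```python
def duration_seconds(frame_count: int, fps: float) -> float:
    """Clip length in seconds; 0.0 when fps is unknown (e.g. unopenable video)."""
    return frame_count / fps if fps > 0 else 0.0


# e.g. Detector.get_frames(...) -> 300 and Detector.get_fps(...) -> 30.0
print(duration_seconds(300, 30.0))  # 10.0
```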

Source code in src/dnt/detect/yolo/detector.py
@staticmethod
def get_frames(video: str) -> int:
    """Return the total number of frames in a video file.

    Parameters
    ----------
    video : str
        Path to the video file.

    Returns
    -------
    int
        Total frame count. Returns 0 if the video cannot be opened.

    """
    if not Path(video).exists():
        print(f"{video} does not exist!")
        return 0
    cap = cv2.VideoCapture(video)
    if not cap.isOpened():
        print(f"Failed to open the video: {video}")
        return 0

    frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    cap.release()
    return frames