1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| # -*- coding: utf-8 -*-
| import logging
| import os
| import subprocess
| import tempfile
| from typing import List, Tuple
|
| from app.video_io import get_ffmpeg_cmd
|
| logger = logging.getLogger(__name__)
|
|
| def sample_frames(video_path: str, sample_fps: float, duration: float) -> List[Tuple[float, str]]:
| """按 sample_fps 抽帧,返回 [(time_sec, jpg_path), ...]"""
| if sample_fps <= 0:
| sample_fps = 0.5
| step = 1.0 / sample_fps
| out_dir = tempfile.mkdtemp(prefix="snap_frames_")
| frames: List[Tuple[float, str]] = []
| t = 0.0
| idx = 0
| ffmpeg = get_ffmpeg_cmd("ffmpeg")
| while t <= duration:
| out_path = os.path.join(out_dir, f"{idx:06d}.jpg")
| cmd = [
| ffmpeg, "-y", "-ss", f"{t:.3f}",
| "-i", video_path,
| "-frames:v", "1", "-q:v", "2",
| out_path,
| ]
| print(f"cmd{cmd}")
| subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
| if os.path.isfile(out_path) and os.path.getsize(out_path) > 0:
| frames.append((round(t, 3), out_path))
| t += step
| idx += 1
| return frames
|
|
| def cleanup_frames(frames: List[Tuple[float, str]]) -> None:
| if not frames:
| return
| out_dir = os.path.dirname(frames[0][1])
| import shutil
| shutil.rmtree(out_dir, ignore_errors=True)
|
|