doum
昨天 ce44d803b73a65b2cc31db5bcc662139029463d3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# -*- coding: utf-8 -*-
import logging
import os
import subprocess
import tempfile
from typing import List, Tuple
 
from app.video_io import get_ffmpeg_cmd
 
logger = logging.getLogger(__name__)
 
 
def sample_frames(video_path: str, sample_fps: float, duration: float) -> List[Tuple[float, str]]:
    """Extract still frames from a video at a fixed sampling rate.

    One ffmpeg process is run per sample time ``0, 1/sample_fps,
    2/sample_fps, ...`` up to and including ``duration``. Each frame is
    written as ``NNNNNN.jpg`` into a freshly created temporary directory
    (prefix ``snap_frames_``).

    Args:
        video_path: Path of the input video, passed to ffmpeg via ``-i``.
        sample_fps: Number of samples per second of video. Values ``<= 0``
            fall back to ``0.5``.
        duration: Last timestamp in seconds that may be sampled (inclusive).

    Returns:
        ``[(time_sec, jpg_path), ...]`` for every timestamp at which ffmpeg
        produced a non-empty file; ``time_sec`` is rounded to 3 decimals.
        Timestamps that yielded no file are skipped. Pass the result to
        ``cleanup_frames`` to delete the temporary directory.

    Raises:
        Whatever ``subprocess.run`` raises when the ffmpeg executable cannot
        be started (e.g. ``FileNotFoundError``); the temporary directory is
        removed before the exception propagates.
    """
    import shutil  # local import, same style as cleanup_frames

    if sample_fps <= 0:
        sample_fps = 0.5
    # Resolve the executable before creating the directory so that a failure
    # here cannot leave an orphaned temp dir behind.
    ffmpeg = get_ffmpeg_cmd("ffmpeg")
    out_dir = tempfile.mkdtemp(prefix="snap_frames_")
    frames: List[Tuple[float, str]] = []
    try:
        idx = 0
        t = 0.0
        while t <= duration:
            out_path = os.path.join(out_dir, f"{idx:06d}.jpg")
            cmd = [
                ffmpeg, "-y", "-ss", f"{t:.3f}",
                "-i", video_path,
                "-frames:v", "1", "-q:v", "2",
                out_path,
            ]
            logger.debug("ffmpeg cmd: %s", cmd)
            result = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            if os.path.isfile(out_path) and os.path.getsize(out_path) > 0:
                frames.append((round(t, 3), out_path))
            else:
                # Best-effort: a missing frame is skipped, but no longer silently.
                logger.warning(
                    "no frame extracted at t=%.3fs from %s (ffmpeg exit code %s)",
                    t, video_path, result.returncode,
                )
            idx += 1
            # Derive the timestamp from the index instead of accumulating
            # `t += step`, so rounding error does not build up and push the
            # final sample past `duration`.
            t = idx / sample_fps
    except BaseException:
        shutil.rmtree(out_dir, ignore_errors=True)
        raise
    if not frames:
        # cleanup_frames() locates the directory through the first frame path,
        # so an empty result would otherwise leak the directory.
        shutil.rmtree(out_dir, ignore_errors=True)
    return frames
 
 
def cleanup_frames(frames: List[Tuple[float, str]]) -> None:
    """Delete the directory that contains the sampled frame files.

    The directory is taken from the path stored in the first entry of
    ``frames``; it is removed recursively and removal errors are ignored.
    An empty ``frames`` list is a no-op.

    Args:
        frames: ``(time_sec, jpg_path)`` pairs, e.g. as returned by
            ``sample_frames``.
    """
    if frames:
        import shutil

        # Only the first entry is inspected to find the directory.
        shutil.rmtree(os.path.dirname(frames[0][1]), ignore_errors=True)