doum
2 天以前 ce44d803b73a65b2cc31db5bcc662139029463d3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""从 annotations.jsonl 生成 labels.csv(帧路径, label, task)。"""
import argparse
import csv
import json
import os
import random
import subprocess
import sys
from pathlib import Path
 
import yaml
 
ROOT = Path(__file__).resolve().parent.parent
 
 
def load_config(path):
    with open(path, encoding="utf-8") as f:
        cfg = yaml.safe_load(f)
    base = Path(path).resolve().parent
    for key in ("annotations_jsonl", "frames_dir", "labels_csv"):
        p = cfg["data"][key]
        if not os.path.isabs(p):
            cfg["data"][key] = str((base / p).resolve())
    cfg["data"]["output_dir"] = cfg["export"]["output_dir"]
    if not os.path.isabs(cfg["export"]["output_dir"]):
        cfg["export"]["output_dir"] = str((base / cfg["export"]["output_dir"]).resolve())
    return cfg
 
 
def ffprobe_duration(video_path, ffmpeg_dir=""):
    ffprobe = "ffprobe"
    if ffmpeg_dir:
        ffprobe = os.path.join(ffmpeg_dir, "ffprobe.exe" if os.name == "nt" else "ffprobe")
    cmd = [
        ffprobe, "-v", "error", "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1", video_path,
    ]
    try:
        out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL, text=True).strip()
        return float(out) if out else 0.0
    except Exception:
        return 0.0
 
 
def extract_frame_at(video_path, out_path, sec, ffmpeg_dir=""):
    ffmpeg = "ffmpeg"
    if ffmpeg_dir:
        ffmpeg = os.path.join(ffmpeg_dir, "ffmpeg.exe" if os.name == "nt" else "ffmpeg")
    os.makedirs(os.path.dirname(out_path), exist_ok=True)
    cmd = [
        ffmpeg, "-y", "-ss", str(sec), "-i", video_path,
        "-frames:v", "1", "-q:v", "2", out_path,
    ]
    subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False)
 
 
def process_video(item, cfg, ffmpeg_dir):
    media_id = item["media_id"]
    video_path = item["video_path"]
    if not os.path.isfile(video_path) and not video_path.startswith("http"):
        print(f"跳过 media_id={media_id}: 视频不存在 {video_path}")
        return []
 
    local_video = video_path
    tmp_video = None
    if video_path.startswith("http"):
        import httpx
        tmp_video = os.path.join(cfg["data"]["frames_dir"], f"_tmp_{media_id}.mp4")
        os.makedirs(cfg["data"]["frames_dir"], exist_ok=True)
        with httpx.stream("GET", video_path, timeout=600.0) as r:
            r.raise_for_status()
            with open(tmp_video, "wb") as f:
                for chunk in r.iter_bytes():
                    f.write(chunk)
        local_video = tmp_video
 
    duration = ffprobe_duration(local_video, ffmpeg_dir)
    if duration <= 0:
        duration = max(item.get("handover_time_sec", 600), 600)
 
    sample_fps = cfg["sampling"]["sample_fps"]
    win = cfg["sampling"]["positive_window_sec"]
    sf_t = float(item["storefront_time_sec"])
    ho_t = float(item["handover_time_sec"])
 
    rows = []
    t = 0.0
    step = 1.0 / sample_fps
    other_count = 0
    other_budget = max(1, int(duration * sample_fps / cfg["sampling"]["other_downsample_ratio"]))
    while t <= duration:
        rel = f"{media_id}/{int(t * 1000)}.jpg"
        out_path = os.path.join(cfg["data"]["frames_dir"], rel)
        is_sf = abs(t - sf_t) <= win
        is_ho = abs(t - ho_t) <= win
        need_other = not is_sf and not is_ho and other_count < other_budget
        if is_sf or is_ho or need_other:
            extract_frame_at(local_video, out_path, t, ffmpeg_dir)
            if os.path.isfile(out_path):
                split = item.get("split", "train")
                sf_label = 1 if is_sf else 0
                ho_label = 1 if is_ho else 0
                rows.append((rel, sf_label, "storefront", split))
                rows.append((rel, ho_label, "handover", split))
                if need_other:
                    other_count += 1
        t += step
    if tmp_video and os.path.isfile(tmp_video):
        os.remove(tmp_video)
    return rows
 
 
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--config", default=str(Path(__file__).parent / "config.yaml"))
    parser.add_argument("--ffmpeg-dir", default=os.environ.get("FFMPEG_DIR", ""))
    parser.add_argument("--limit", type=int, default=0)
    args = parser.parse_args()
    cfg = load_config(args.config)
    os.makedirs(cfg["data"]["frames_dir"], exist_ok=True)
 
    items = []
    with open(cfg["data"]["annotations_jsonl"], encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                items.append(json.loads(line))
    if args.limit:
        items = items[: args.limit]
 
    all_rows = []
    for i, item in enumerate(items):
        if item.get("storefront_time_sec", 0) <= 0 or item.get("handover_time_sec", 0) <= 0:
            print(f"跳过未标注 media_id={item.get('media_id')}")
            continue
        print(f"[{i+1}/{len(items)}] media_id={item['media_id']}")
        all_rows.extend(process_video(item, cfg, args.ffmpeg_dir))
 
    with open(cfg["data"]["labels_csv"], "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["frame_path", "label", "task", "split"])
        for row in all_rows:
            w.writerow(row)
    print(f"写入 {len(all_rows)} 行 -> {cfg['data']['labels_csv']}")
 
 
if __name__ == "__main__":
    main()