#!/usr/bin/env python3
|
# -*- coding: utf-8 -*-
|
"""从 annotations.jsonl 生成 labels.csv(帧路径, label, task)。"""
|
import argparse
|
import csv
|
import json
|
import os
|
import random
|
import subprocess
|
import sys
|
from pathlib import Path
|
|
import yaml
|
|
ROOT = Path(__file__).resolve().parent.parent
|
|
|
def load_config(path):
|
with open(path, encoding="utf-8") as f:
|
cfg = yaml.safe_load(f)
|
base = Path(path).resolve().parent
|
for key in ("annotations_jsonl", "frames_dir", "labels_csv"):
|
p = cfg["data"][key]
|
if not os.path.isabs(p):
|
cfg["data"][key] = str((base / p).resolve())
|
cfg["data"]["output_dir"] = cfg["export"]["output_dir"]
|
if not os.path.isabs(cfg["export"]["output_dir"]):
|
cfg["export"]["output_dir"] = str((base / cfg["export"]["output_dir"]).resolve())
|
return cfg
|
|
|
def ffprobe_duration(video_path, ffmpeg_dir=""):
|
ffprobe = "ffprobe"
|
if ffmpeg_dir:
|
ffprobe = os.path.join(ffmpeg_dir, "ffprobe.exe" if os.name == "nt" else "ffprobe")
|
cmd = [
|
ffprobe, "-v", "error", "-show_entries", "format=duration",
|
"-of", "default=noprint_wrappers=1:nokey=1", video_path,
|
]
|
try:
|
out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL, text=True).strip()
|
return float(out) if out else 0.0
|
except Exception:
|
return 0.0
|
|
|
def extract_frame_at(video_path, out_path, sec, ffmpeg_dir=""):
|
ffmpeg = "ffmpeg"
|
if ffmpeg_dir:
|
ffmpeg = os.path.join(ffmpeg_dir, "ffmpeg.exe" if os.name == "nt" else "ffmpeg")
|
os.makedirs(os.path.dirname(out_path), exist_ok=True)
|
cmd = [
|
ffmpeg, "-y", "-ss", str(sec), "-i", video_path,
|
"-frames:v", "1", "-q:v", "2", out_path,
|
]
|
subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False)
|
|
|
def process_video(item, cfg, ffmpeg_dir):
|
media_id = item["media_id"]
|
video_path = item["video_path"]
|
if not os.path.isfile(video_path) and not video_path.startswith("http"):
|
print(f"跳过 media_id={media_id}: 视频不存在 {video_path}")
|
return []
|
|
local_video = video_path
|
tmp_video = None
|
if video_path.startswith("http"):
|
import httpx
|
tmp_video = os.path.join(cfg["data"]["frames_dir"], f"_tmp_{media_id}.mp4")
|
os.makedirs(cfg["data"]["frames_dir"], exist_ok=True)
|
with httpx.stream("GET", video_path, timeout=600.0) as r:
|
r.raise_for_status()
|
with open(tmp_video, "wb") as f:
|
for chunk in r.iter_bytes():
|
f.write(chunk)
|
local_video = tmp_video
|
|
duration = ffprobe_duration(local_video, ffmpeg_dir)
|
if duration <= 0:
|
duration = max(item.get("handover_time_sec", 600), 600)
|
|
sample_fps = cfg["sampling"]["sample_fps"]
|
win = cfg["sampling"]["positive_window_sec"]
|
sf_t = float(item["storefront_time_sec"])
|
ho_t = float(item["handover_time_sec"])
|
|
rows = []
|
t = 0.0
|
step = 1.0 / sample_fps
|
other_count = 0
|
other_budget = max(1, int(duration * sample_fps / cfg["sampling"]["other_downsample_ratio"]))
|
while t <= duration:
|
rel = f"{media_id}/{int(t * 1000)}.jpg"
|
out_path = os.path.join(cfg["data"]["frames_dir"], rel)
|
is_sf = abs(t - sf_t) <= win
|
is_ho = abs(t - ho_t) <= win
|
need_other = not is_sf and not is_ho and other_count < other_budget
|
if is_sf or is_ho or need_other:
|
extract_frame_at(local_video, out_path, t, ffmpeg_dir)
|
if os.path.isfile(out_path):
|
split = item.get("split", "train")
|
sf_label = 1 if is_sf else 0
|
ho_label = 1 if is_ho else 0
|
rows.append((rel, sf_label, "storefront", split))
|
rows.append((rel, ho_label, "handover", split))
|
if need_other:
|
other_count += 1
|
t += step
|
if tmp_video and os.path.isfile(tmp_video):
|
os.remove(tmp_video)
|
return rows
|
|
|
def main():
|
parser = argparse.ArgumentParser()
|
parser.add_argument("-c", "--config", default=str(Path(__file__).parent / "config.yaml"))
|
parser.add_argument("--ffmpeg-dir", default=os.environ.get("FFMPEG_DIR", ""))
|
parser.add_argument("--limit", type=int, default=0)
|
args = parser.parse_args()
|
cfg = load_config(args.config)
|
os.makedirs(cfg["data"]["frames_dir"], exist_ok=True)
|
|
items = []
|
with open(cfg["data"]["annotations_jsonl"], encoding="utf-8") as f:
|
for line in f:
|
line = line.strip()
|
if line:
|
items.append(json.loads(line))
|
if args.limit:
|
items = items[: args.limit]
|
|
all_rows = []
|
for i, item in enumerate(items):
|
if item.get("storefront_time_sec", 0) <= 0 or item.get("handover_time_sec", 0) <= 0:
|
print(f"跳过未标注 media_id={item.get('media_id')}")
|
continue
|
print(f"[{i+1}/{len(items)}] media_id={item['media_id']}")
|
all_rows.extend(process_video(item, cfg, args.ffmpeg_dir))
|
|
with open(cfg["data"]["labels_csv"], "w", newline="", encoding="utf-8") as f:
|
w = csv.writer(f)
|
w.writerow(["frame_path", "label", "task", "split"])
|
for row in all_rows:
|
w.writerow(row)
|
print(f"写入 {len(all_rows)} 行 -> {cfg['data']['labels_csv']}")
|
|
|
if __name__ == "__main__":
|
main()
|