#!/usr/bin/env python3
|
# -*- coding: utf-8 -*-
|
"""将 Label Studio 导出 JSON 转为训练用 JSONL。"""
|
import argparse
import json
import os
import sys
|
|
|
def convert(in_path, out_path, default_split="train"):
    """Convert a Label Studio JSON export into a JSONL annotation file.

    The input may be a list of tasks, a wrapper object holding the task list
    under ``"tasks"`` or ``"data"``, or a single task object. A task is written
    only when both a ``storefront`` and a ``handover`` timeline label are found
    in its annotations; all other tasks are skipped silently.

    Each output line is a JSON object with the keys ``media_id``,
    ``video_path``, ``storefront_time_sec``, ``handover_time_sec``,
    ``store_type``, ``has_voice_marker``, ``driver_date``, ``split`` and
    ``notes``.

    Args:
        in_path: Path of the Label Studio JSON export to read.
        out_path: Path of the JSONL file to write. Missing parent directories
            are created.
        default_split: Value for ``split`` when a task does not provide one.

    Raises:
        OSError: If the input cannot be read or the output cannot be written.
        json.JSONDecodeError: If the input is not valid JSON.
        ValueError: If a task's media ID cannot be converted to ``int``.
    """
    with open(in_path, encoding="utf-8") as f:
        tasks = json.load(f)
    if isinstance(tasks, dict):
        # A wrapper object carries the task *list* under "tasks" or "data".
        # A single exported task also has a "data" key, but there it is a dict
        # of task fields; iterating that dict would yield its string keys, so
        # only a non-empty list counts as a task list.
        task_list = None
        for key in ("tasks", "data"):
            candidate = tasks.get(key)
            if isinstance(candidate, list) and candidate:
                task_list = candidate
                break
        tasks = task_list if task_list is not None else [tasks]

    # The default output lives in a sub-directory that may not exist yet.
    out_dir = os.path.dirname(out_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    count = 0
    with open(out_path, "w", encoding="utf-8") as out:
        for task in tasks:
            # Task fields live under "data" when present, else on the task.
            data = task.get("data") or task
            # Compare against None/"" instead of relying on truthiness so that
            # a legitimate ID of 0 is kept.
            media_id = data.get("media_id")
            if media_id in (None, ""):
                media_id = data.get("id")
            video_path = data.get("video_path") or data.get("video")

            storefront, handover = _marker_start_times(
                task.get("annotations") or []
            )
            if storefront is None or handover is None:
                continue

            item = {
                # Without any ID, fall back to the running output index.
                "media_id": count if media_id in (None, "") else int(media_id),
                "video_path": video_path,
                "storefront_time_sec": round(storefront, 2),
                "handover_time_sec": round(handover, 2),
                "store_type": data.get("store_type", ""),
                "has_voice_marker": bool(data.get("has_voice_marker")),
                "driver_date": data.get("driver_date", ""),
                "split": data.get("split") or default_split,
                "notes": data.get("notes", ""),
            }
            out.write(json.dumps(item, ensure_ascii=False) + "\n")
            count += 1
    print(f"转换 {count} 条 -> {out_path}")


def _marker_start_times(annotations):
    """Return the ``(storefront, handover)`` start values from *annotations*.

    Only ``timelinelabels`` results that have at least one range are
    considered, and only the first range of each result is read. If a label
    appears in several results, the last one wins. A label that never appears
    yields ``None``.
    """
    # NOTE(review): the values are written out as "*_time_sec", but Label
    # Studio's TimelineLabels may report range boundaries as frame indices;
    # confirm the unit of "start" against the labeling configuration.
    storefront = handover = None
    for annotation in annotations:
        for result in annotation.get("result") or []:
            if result.get("type") != "timelinelabels":
                continue
            value = result.get("value") or {}
            labels = value.get("timelinelabels") or []
            ranges = value.get("ranges") or []
            if not ranges:
                continue
            start = float(ranges[0].get("start", 0))
            if "storefront" in labels:
                storefront = start
            if "handover" in labels:
                handover = start
    return storefront, handover
|
|
|
def main():
    """Parse the command-line arguments and run the conversion."""
    # (flags, keyword options) for every supported command-line argument.
    arg_specs = (
        (("input",), {"help": "Label Studio 导出 JSON"}),
        (("-o", "--output"), {"default": "data/annotations.jsonl"}),
        (("--split",), {"default": "train"}),
    )
    cli = argparse.ArgumentParser()
    for flags, options in arg_specs:
        cli.add_argument(*flags, **options)
    opts = cli.parse_args()
    convert(opts.input, opts.output, opts.split)
|
|
|
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|