#!/usr/bin/env python3
|
# -*- coding: utf-8 -*-
|
"""从 MySQL collection_media 导出待标注视频清单(CSV + JSONL 模板)。"""
|
import argparse
|
import csv
|
import json
|
import os
|
import sys
|
|
try:
|
import pymysql
|
except ImportError:
|
pymysql = None
|
|
|
# Column order of the annotation CSV. Kept as a fixed constant so the header
# row is written even when the query returns no rows.
_ANNOTATION_CSV_FIELDS = (
    "media_id",
    "file_name",
    "video_path",
    "recorder_sn",
    "driver_date",
    "storefront_time_sec",
    "handover_time_sec",
    "store_type",
    "has_voice_marker",
    "split",
    "notes",
)


def export_from_mysql(
    host, port, user, password, database, ftp_prefix, limit, out_csv, out_jsonl
):
    """Export a list of media rows to annotate from ``collection_media``.

    Selects up to ``limit`` rows (highest ``id`` first) that satisfy
    ``isdeleted = 0``, ``download_status = 1``, ``media_type = 0`` and have a
    non-empty ``file_path_local``, then writes two files:

    * ``out_csv``: a UTF-8-with-BOM CSV task list whose annotation columns
      are left empty for a human to fill in.
    * ``out_jsonl``: one JSON object per row, pre-filled with placeholder
      annotation values.

    The URL of each item is ``ftp_prefix`` + the media folder (environment
    variable ``COLLECTION_MEDIA_FOLDER``, default ``/collection_media/``) +
    the row's ``file_path_local``, joined with single slashes.

    Args:
        host: MySQL server host.
        port: MySQL server port.
        user: MySQL user name.
        password: MySQL password.
        database: Name of the database containing ``collection_media``.
        ftp_prefix: Base URL under which the media files are served.
        limit: Maximum number of rows to export.
        out_csv: Destination path of the CSV task list.
        out_jsonl: Destination path of the JSONL template.

    Raises:
        SystemExit: With status 1 when ``pymysql`` is not installed.
    """
    if pymysql is None:
        print("请安装 pymysql: py -m pip install pymysql", file=sys.stderr)
        sys.exit(1)

    sql = """
        SELECT id, file_name, file_path_local, start_time, end_time, recorder_sn
        FROM collection_media
        WHERE isdeleted = 0 AND download_status = 1 AND media_type = 0
          AND file_path_local IS NOT NULL AND file_path_local != ''
        ORDER BY id DESC
        LIMIT %s
    """
    conn = pymysql.connect(
        host=host, port=port, user=user, password=password,
        database=database, charset="utf8mb4",
    )
    # try/finally guarantees the connection is released even when the query
    # fails; an explicit close() works the same on every PyMySQL version.
    try:
        with conn.cursor() as cur:
            cur.execute(sql, (limit,))
            rows = cur.fetchall()
    finally:
        conn.close()

    media_folder = os.environ.get("COLLECTION_MEDIA_FOLDER", "/collection_media/")
    if not media_folder.endswith("/"):
        media_folder += "/"
    # Loop-invariant part of every URL: "<prefix>/<media folder>/".
    url_base = ftp_prefix.rstrip("/") + "/" + media_folder.lstrip("/")

    records = []
    for row in rows:
        # end_time is selected by the query but not used in the export.
        media_id, file_name, file_path_local, start_time, _end_time, recorder_sn = row
        driver_date = ""
        if start_time:
            # NOTE(review): assumes start_time is returned as a datetime/date
            # object (it must provide strftime) - confirm the column type.
            driver_date = f"{recorder_sn or 'unknown'}_{start_time.strftime('%Y%m%d')}"
        records.append({
            "media_id": media_id,
            "file_name": file_name or "",
            "video_path": url_base + file_path_local.lstrip("/"),
            "recorder_sn": recorder_sn or "",
            "driver_date": driver_date,
            "storefront_time_sec": "",
            "handover_time_sec": "",
            "store_type": "",
            "has_voice_marker": "",
            "split": "",
            "notes": "",
        })

    # Both outputs may live in different, not-yet-existing directories.
    for out_path in (out_csv, out_jsonl):
        os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)

    # utf-8-sig writes a BOM at the start of the file.
    with open(out_csv, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.DictWriter(f, fieldnames=_ANNOTATION_CSV_FIELDS)
        writer.writeheader()
        writer.writerows(records)

    with open(out_jsonl, "w", encoding="utf-8") as f:
        for r in records:
            template = {
                "media_id": r["media_id"],
                "video_path": r["video_path"],
                "storefront_time_sec": 0.0,
                "handover_time_sec": 0.0,
                "store_type": "",
                "has_voice_marker": False,
                "driver_date": r["driver_date"],
                "split": "train",
                "notes": "TODO: 填写标注",
            }
            f.write(json.dumps(template, ensure_ascii=False) + "\n")

    print(f"导出 {len(records)} 条 -> {out_csv}, {out_jsonl}")
|
|
|
def _is_blank_csv_row(row):
    """Return True when every cell of a ``csv.DictReader`` row is empty."""
    for value in row.values():
        # Surplus cells beyond the header are collected by DictReader as a list.
        cells = value if isinstance(value, list) else [value]
        if any(cell and cell.strip() for cell in cells):
            return False
    return True


def _annotation_item_from_row(row, where):
    """Convert one filled-in CSV row into an annotation dict.

    ``where`` is a "file:line" label used only in error messages.
    """

    def text(key):
        # DictReader yields None for cells missing from a short row.
        return row.get(key) or ""

    def seconds(key):
        value = text(key).strip()
        if not value:
            return 0.0
        try:
            return float(value)
        except ValueError as err:
            raise ValueError(f"{where}: invalid {key} {value!r}") from err

    raw_id = row["media_id"]
    try:
        media_id = int((raw_id or "").strip())
    except ValueError as err:
        raise ValueError(f"{where}: invalid media_id {raw_id!r}") from err

    return {
        "media_id": media_id,
        "video_path": text("video_path") or text("video_url"),
        "storefront_time_sec": seconds("storefront_time_sec"),
        "handover_time_sec": seconds("handover_time_sec"),
        "store_type": text("store_type"),
        "has_voice_marker": text("has_voice_marker").strip().lower() in ("1", "true", "yes"),
        "driver_date": text("driver_date"),
        "split": text("split").strip() or "train",
        "notes": text("notes"),
    }


def export_from_csv(in_csv, out_jsonl):
    """Convert a filled-in annotation CSV into a JSONL file.

    Each non-blank CSV row becomes one JSON object with the keys
    ``media_id`` (int), ``video_path`` (falls back to the ``video_url``
    column), ``storefront_time_sec`` / ``handover_time_sec`` (float, 0.0 when
    empty), ``store_type``, ``has_voice_marker`` (True for "1"/"true"/"yes",
    case-insensitive), ``driver_date``, ``split`` ("train" when empty) and
    ``notes``. Rows whose cells are all empty are skipped.

    All rows are converted before the output file is opened, so invalid
    input never truncates an existing ``out_jsonl``.

    Args:
        in_csv: Path of the CSV to read (UTF-8, optional BOM).
        out_jsonl: Path of the JSONL file to write; its parent directory is
            created when missing.

    Raises:
        KeyError: If the CSV has no ``media_id`` column.
        ValueError: If ``media_id`` or a time column cannot be parsed; the
            message names the file and line.
    """
    items = []
    with open(in_csv, newline="", encoding="utf-8-sig") as f:
        reader = csv.DictReader(f)
        for row in reader:
            if _is_blank_csv_row(row):
                continue
            items.append(_annotation_item_from_row(row, f"{in_csv}:{reader.line_num}"))

    os.makedirs(os.path.dirname(out_jsonl) or ".", exist_ok=True)
    with open(out_jsonl, "w", encoding="utf-8") as f:
        for item in items:
            f.write(json.dumps(item, ensure_ascii=False) + "\n")
    print(f"转换 {len(items)} 条 -> {out_jsonl}")
|
|
|
def main():
    """Parse command-line options and run the requested export.

    ``--from-csv`` takes precedence over ``--mysql`` when both are given.
    When neither is given, a message is printed to stderr and the process
    exits with status 1. The MySQL connection options and the FTP prefix
    take their defaults from the ``MYSQL_*`` / ``FTP_RESOURCE_PREFIX``
    environment variables.
    """
    env = os.environ.get
    cli = argparse.ArgumentParser(description="导出 collection_media 待标注清单")
    option = cli.add_argument

    option("--mysql", action="store_true", help="从 MySQL 读取")
    option("--host", default=env("MYSQL_HOST", "127.0.0.1"))
    option("--port", type=int, default=int(env("MYSQL_PORT", "3306")))
    option("--user", default=env("MYSQL_USER", "root"))
    option("--password", default=env("MYSQL_PASSWORD", ""))
    option("--database", default=env("MYSQL_DATABASE", "wuhuyancao"))
    option("--ftp-prefix", default=env("FTP_RESOURCE_PREFIX", "http://127.0.0.1/files"))
    option("--limit", type=int, default=200)
    option("--out-csv", default="data/annotation_tasks.csv")
    option("--out-jsonl", default="data/annotations_template.jsonl")
    option("--from-csv", help="从已填 CSV 转 JSONL")

    opts = cli.parse_args()

    # CSV -> JSONL conversion wins when both modes are requested.
    if opts.from_csv:
        export_from_csv(opts.from_csv, opts.out_jsonl)
        return

    if opts.mysql:
        export_from_mysql(
            opts.host,
            opts.port,
            opts.user,
            opts.password,
            opts.database,
            opts.ftp_prefix,
            opts.limit,
            opts.out_csv,
            opts.out_jsonl,
        )
        return

    # No mode selected: report the usage error and fail.
    print("请指定 --mysql 或 --from-csv", file=sys.stderr)
    sys.exit(1)
|
|
|
# Script entry point: run the CLI only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|