doum
昨天 ce44d803b73a65b2cc31db5bcc662139029463d3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""从 MySQL collection_media 导出待标注视频清单(CSV + JSONL 模板)。"""
import argparse
import csv
import json
import os
import sys
 
try:
    import pymysql
except ImportError:
    pymysql = None
 
 
def export_from_mysql(host, port, user, password, database, ftp_prefix, limit, out_csv, out_jsonl):
    """Export rows of ``collection_media`` as an annotation task list.

    Selects at most ``limit`` rows, newest ``id`` first, that satisfy
    ``isdeleted = 0``, ``download_status = 1``, ``media_type = 0`` and have a
    non-empty ``file_path_local``. Two files are written:

    * ``out_csv``: one row per media item with blank annotation columns,
      encoded as UTF-8 with BOM.
    * ``out_jsonl``: one JSON template object per line with placeholder
      annotation values.

    Args:
        host, port, user, password, database: MySQL connection settings,
            passed straight to ``pymysql.connect``.
        ftp_prefix: Base URL prepended to every media path.
        limit: Maximum number of rows to export (bound to ``LIMIT %s``).
        out_csv: Destination path of the CSV task list.
        out_jsonl: Destination path of the JSONL template.

    Raises:
        SystemExit: With code 1 when ``pymysql`` is not installed.
    """
    if pymysql is None:
        print("请安装 pymysql: py -m pip install pymysql", file=sys.stderr)
        sys.exit(1)

    sql = """
        SELECT id, file_name, file_path_local, start_time, end_time, recorder_sn
        FROM collection_media
        WHERE isdeleted = 0 AND download_status = 1 AND media_type = 0
          AND file_path_local IS NOT NULL AND file_path_local != ''
        ORDER BY id DESC
        LIMIT %s
    """
    conn = pymysql.connect(
        host=host, port=port, user=user, password=password,
        database=database, charset="utf8mb4",
    )
    # Close the connection even when the query fails, so it is never leaked.
    try:
        with conn.cursor() as cur:
            cur.execute(sql, (limit,))
            rows = cur.fetchall()
    finally:
        conn.close()

    # The media folder sits between the URL prefix and the stored file path.
    media_folder = os.environ.get("COLLECTION_MEDIA_FOLDER", "/collection_media/")
    if not media_folder.endswith("/"):
        media_folder += "/"
    url_base = ftp_prefix.rstrip("/") + "/" + media_folder.lstrip("/")

    # Fixed column order, so the CSV header is written even with zero rows.
    fieldnames = [
        "media_id", "file_name", "video_path", "recorder_sn", "driver_date",
        "storefront_time_sec", "handover_time_sec", "store_type",
        "has_voice_marker", "split", "notes",
    ]

    records = []
    # Assumes the default cursor, which yields one tuple per row in SELECT order.
    for media_id, file_name, file_path_local, start_time, _end_time, recorder_sn in rows:
        driver_date = ""
        if start_time:
            # Assumes start_time is datetime-like (has strftime) -- TODO confirm
            # against the column type.
            driver_date = f"{recorder_sn or 'unknown'}_{start_time.strftime('%Y%m%d')}"
        records.append({
            "media_id": media_id,
            "file_name": file_name or "",
            "video_path": url_base + file_path_local.lstrip("/"),
            "recorder_sn": recorder_sn or "",
            "driver_date": driver_date,
            "storefront_time_sec": "",
            "handover_time_sec": "",
            "store_type": "",
            "has_voice_marker": "",
            "split": "",
            "notes": "",
        })

    # The two outputs may live in different directories; create both.
    for path in (out_csv, out_jsonl):
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)

    with open(out_csv, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(records)

    with open(out_jsonl, "w", encoding="utf-8") as f:
        for r in records:
            template = {
                "media_id": r["media_id"],
                "video_path": r["video_path"],
                "storefront_time_sec": 0.0,
                "handover_time_sec": 0.0,
                "store_type": "",
                "has_voice_marker": False,
                "driver_date": r["driver_date"],
                "split": "train",
                "notes": "TODO: 填写标注",
            }
            f.write(json.dumps(template, ensure_ascii=False) + "\n")

    print(f"导出 {len(records)} 条 -> {out_csv}, {out_jsonl}")
 
 
def _csv_seconds(value):
    """Parse a CSV time cell into float seconds; blank or missing cells give 0.0."""
    text = (value or "").strip()
    return float(text) if text else 0.0


def export_from_csv(in_csv, out_jsonl):
    """Convert a filled-in annotation CSV into a JSONL annotation file.

    Each CSV row becomes one JSON object per line in ``out_jsonl``. Columns
    that are missing or blank fall back to defaults: ``0.0`` for the two time
    fields, ``False`` for ``has_voice_marker``, ``"train"`` for ``split`` and
    ``""`` for the remaining text fields. ``video_url`` is accepted as an
    alternative column name for ``video_path``.

    Args:
        in_csv: Path of the annotated CSV, read as UTF-8 with optional BOM.
        out_jsonl: Destination path; its parent directory is created if needed.

    Raises:
        KeyError: If the CSV has no ``media_id`` column.
        ValueError: If ``media_id`` or a time cell is not numeric.
    """
    with open(in_csv, newline="", encoding="utf-8-sig") as f:
        records = list(csv.DictReader(f))

    os.makedirs(os.path.dirname(out_jsonl) or ".", exist_ok=True)
    with open(out_jsonl, "w", encoding="utf-8") as f:
        for r in records:
            # DictReader fills columns missing from a short row with None, so
            # use `or` fallbacks; a .get() default only covers absent keys.
            flag = str(r.get("has_voice_marker") or "").strip().lower()
            item = {
                "media_id": int(r["media_id"]),
                "video_path": r.get("video_path") or r.get("video_url") or "",
                "storefront_time_sec": _csv_seconds(r.get("storefront_time_sec")),
                "handover_time_sec": _csv_seconds(r.get("handover_time_sec")),
                "store_type": r.get("store_type") or "",
                "has_voice_marker": flag in ("1", "true", "yes"),
                "driver_date": r.get("driver_date") or "",
                "split": r.get("split") or "train",
                "notes": r.get("notes") or "",
            }
            f.write(json.dumps(item, ensure_ascii=False) + "\n")
    print(f"转换 {len(records)} 条 -> {out_jsonl}")
 
 
def main(argv=None):
    """Command-line entry point: parse options and run one export mode.

    ``--from-csv`` converts a filled-in CSV to JSONL and takes precedence over
    ``--mysql``, which exports a fresh task list from the database. Connection
    options default to the ``MYSQL_*`` environment variables and the URL prefix
    to ``FTP_RESOURCE_PREFIX``.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Raises:
        SystemExit: With code 1 when neither ``--mysql`` nor ``--from-csv`` is
            given; argparse itself exits with code 2 on invalid options.
    """
    parser = argparse.ArgumentParser(description="导出 collection_media 待标注清单")
    parser.add_argument("--mysql", action="store_true", help="从 MySQL 读取")
    parser.add_argument("--host", default=os.environ.get("MYSQL_HOST", "127.0.0.1"))
    # Pass the raw string as the default: argparse applies type=int to string
    # defaults only when the option is not given, so a malformed MYSQL_PORT
    # becomes a normal usage error and an explicit --port still works.
    parser.add_argument("--port", type=int, default=os.environ.get("MYSQL_PORT", "3306"))
    parser.add_argument("--user", default=os.environ.get("MYSQL_USER", "root"))
    parser.add_argument("--password", default=os.environ.get("MYSQL_PASSWORD", ""))
    parser.add_argument("--database", default=os.environ.get("MYSQL_DATABASE", "wuhuyancao"))
    parser.add_argument("--ftp-prefix", default=os.environ.get("FTP_RESOURCE_PREFIX", "http://127.0.0.1/files"))
    parser.add_argument("--limit", type=int, default=200)
    parser.add_argument("--out-csv", default="data/annotation_tasks.csv")
    parser.add_argument("--out-jsonl", default="data/annotations_template.jsonl")
    parser.add_argument("--from-csv", help="从已填 CSV 转 JSONL")
    args = parser.parse_args(argv)

    if args.from_csv:
        export_from_csv(args.from_csv, args.out_jsonl)
    elif args.mysql:
        export_from_mysql(
            args.host, args.port, args.user, args.password, args.database,
            args.ftp_prefix, args.limit, args.out_csv, args.out_jsonl,
        )
    else:
        print("请指定 --mysql 或 --from-csv", file=sys.stderr)
        sys.exit(1)
 
 
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()