#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Migrate the remaining SQLite business databases to MySQL, keeping primary keys.

Source file -> tables copied (this mirrors ``MAPPINGS`` below):

- db/deal_outcome_db/outcomes.db  -> deal_outcomes
- db/designer_roster_db/roster.db -> designers / designer_shops /
  designer_online / round_robin
- db/image_tasks.db               -> image_tasks / task_requirement_changes
- db/task_db/tasks.db             -> tasks

Rows are written with ``REPLACE INTO`` using every source column, so the
primary-key values stored in SQLite are carried over unchanged.

NOTE(review): an earlier version of this header listed ``requirement_history``
(for image_tasks.db) and ``task_logs`` (for tasks.db) as targets. Neither name
appears in ``MAPPINGS``, so they are NOT migrated by this script -- confirm
whether ``MAPPINGS`` or the old header reflected the intent.
"""
from __future__ import annotations

import argparse
import sqlite3
import sys
from pathlib import Path
from typing import List, Dict

import pymysql

# Each entry names one SQLite file and the tables to copy from it. The target
# MySQL table has the same name as the source table. Paths are relative, so
# they resolve against the current working directory.
MAPPINGS = [
    {"sqlite": Path("db/deal_outcome_db/outcomes.db"), "tables": ["deal_outcomes"]},
    {
        "sqlite": Path("db/designer_roster_db/roster.db"),
        "tables": ["designers", "designer_shops", "designer_online", "round_robin"],
    },
    {"sqlite": Path("db/image_tasks.db"), "tables": ["image_tasks", "task_requirement_changes"]},
    {"sqlite": Path("db/task_db/tasks.db"), "tables": ["tasks"]},
]


def _quote_ident(name: str, quote: str) -> str:
    """Wrap *name* in *quote*, doubling any quote character embedded in it."""
    return f"{quote}{name.replace(quote, quote * 2)}{quote}"


def mysql_conn(host: str, port: int, user: str, password: str, database: str):
    """Open a MySQL connection with utf8mb4, autocommit off and dict cursors.

    The caller is responsible for committing and for closing the connection.
    """
    return pymysql.connect(
        host=host,
        port=port,
        user=user,
        password=password,
        database=database,
        charset="utf8mb4",
        autocommit=False,
        cursorclass=pymysql.cursors.DictCursor,
    )


def sqlite_table_exists(conn: sqlite3.Connection, table: str) -> bool:
    """Return True if ``sqlite_master`` lists a table named *table*."""
    row = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
        (table,),
    ).fetchone()
    return row is not None


def sqlite_fetch_all(conn: sqlite3.Connection, table: str) -> List[sqlite3.Row]:
    """Return every row of *table* as ``sqlite3.Row`` objects.

    Side effect: sets ``conn.row_factory`` to ``sqlite3.Row`` on the passed
    connection. The whole table is loaded into memory at once.
    """
    conn.row_factory = sqlite3.Row
    # Quote the identifier so table names that collide with SQL keywords work.
    return conn.execute(f"SELECT * FROM {_quote_ident(table, chr(34))}").fetchall()


def _clear_table(cur, quoted_table: str) -> bool:
    """Empty the MySQL table; return False (after warning) if it cannot be done.

    TRUNCATE is tried first and DELETE is the fallback, as some accounts or
    table setups reject TRUNCATE.
    """
    try:
        cur.execute(f"TRUNCATE TABLE {quoted_table}")
        return True
    except pymysql.MySQLError as truncate_err:
        try:
            cur.execute(f"DELETE FROM {quoted_table}")
            return True
        except pymysql.MySQLError as delete_err:
            # Best effort: the table is skipped, but no longer silently.
            print(
                f"[WARN] cannot clear {quoted_table}, skipping table: "
                f"TRUNCATE failed ({truncate_err}); DELETE failed ({delete_err})",
                file=sys.stderr,
            )
            return False


def migrate_table(mysql, rows: List[sqlite3.Row], table: str, truncate_target: bool) -> int:
    """Write *rows* into the MySQL table *table* and commit.

    Args:
        mysql: Open MySQL connection (as returned by ``mysql_conn``).
        rows: Source rows; the column list is taken from the first row's keys.
        table: Target table name.
        truncate_target: If true, empty the target table before writing.

    Returns:
        Number of rows written. 0 when *rows* is empty, or when
        *truncate_target* is set but the table could not be cleared.

    Raises:
        Any error raised while writing or committing; the open transaction is
        rolled back first.

    NOTE(review): when *rows* is empty the function returns before the
    truncate step, so an empty source table leaves existing target rows in
    place even with ``truncate_target=True`` -- confirm this is intended.
    """
    if not rows:
        return 0

    cols = list(rows[0].keys())
    target = _quote_ident(table, "`")
    # Backtick-quote columns so names that are MySQL reserved words still work.
    col_sql = ", ".join(_quote_ident(c, "`") for c in cols)
    val_sql = ", ".join(["%s"] * len(cols))
    sql = f"REPLACE INTO {target} ({col_sql}) VALUES ({val_sql})"
    values = [tuple(r[c] for c in cols) for r in rows]

    try:
        with mysql.cursor() as cur:
            if truncate_target and not _clear_table(cur, target):
                mysql.rollback()
                return 0
            cur.executemany(sql, values)
        mysql.commit()
    except Exception:
        # Undo a partial write (and a preceding DELETE). A successful TRUNCATE
        # cannot be undone because MySQL commits it implicitly.
        mysql.rollback()
        raise
    return len(values)


def main():
    """Parse CLI arguments and copy every mapped SQLite table into MySQL.

    SQLite files or tables that do not exist are skipped with a notice on
    stderr. Per-table counts and the grand total are printed on stdout.
    """
    p = argparse.ArgumentParser(description="迁移剩余 SQLite 业务库到 MySQL")
    p.add_argument("--host", required=True)
    p.add_argument("--port", type=int, default=3306)
    p.add_argument("--user", required=True)
    p.add_argument("--password", required=True)
    p.add_argument("--database", required=True)
    p.add_argument("--truncate-target", action="store_true")
    args = p.parse_args()

    total = 0
    # Explicit try/finally rather than ``with``: the meaning of a PyMySQL
    # connection used as a context manager differs between releases.
    mconn = mysql_conn(args.host, args.port, args.user, args.password, args.database)
    try:
        for item in MAPPINGS:
            sp = item["sqlite"]
            if not sp.exists():
                print(f"[SKIP] {sp} not found", file=sys.stderr)
                continue
            sconn = sqlite3.connect(str(sp))
            try:
                for table in item["tables"]:
                    if not sqlite_table_exists(sconn, table):
                        print(f"[SKIP] {sp}::{table} not found", file=sys.stderr)
                        continue
                    rows = sqlite_fetch_all(sconn, table)
                    n = migrate_table(mconn, rows, table, truncate_target=args.truncate_target)
                    total += n
                    print(f"[MIGRATE] {sp}::{table} -> {n}")
            finally:
                sconn.close()
    finally:
        mconn.close()
    print(f"[DONE] migrated total rows: {total}")


if __name__ == "__main__":
    main()