Files
tw/db/pending_transfer_db.py

175 lines
5.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
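Pending-transfer queue (local SQLite).
Temporarily stores transfer requests while the designer is offline, so they can be transferred automatically once the designer comes back online.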
"""
import os
import sqlite3
from datetime import datetime, timedelta
from typing import List, Dict
# Location of the SQLite database file: "pending_transfer.db" in the same
# directory as this module.
_DB_PATH = os.path.join(os.path.dirname(__file__), "pending_transfer.db")
def _get_conn() -> sqlite3.Connection:
    """Open a new SQLite connection to the queue database.

    The connection's ``row_factory`` is set to ``sqlite3.Row`` so that
    fetched rows can be read by column name (e.g. ``row["id"]``).

    Returns:
        A newly opened connection to ``_DB_PATH``. Nothing in this function
        closes it.
    """
    connection = sqlite3.connect(_DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the ``pending_transfers`` table and its indexes if missing.

    Ensures the directory that holds the database file exists, then issues
    ``CREATE ... IF NOT EXISTS`` statements, so calling this repeatedly is
    harmless for an already-initialised database.
    """
    db_dir = os.path.dirname(_DB_PATH)
    # os.makedirs("") raises FileNotFoundError. An empty dirname means the
    # database sits in the current working directory, which already exists.
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)

    conn = _get_conn()
    try:
        # "with conn" commits on success and rolls back on error, but it does
        # not close the connection; the finally clause below does that.
        with conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS pending_transfers (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    customer_id TEXT NOT NULL,
                    acc_id TEXT DEFAULT '',
                    acc_type TEXT DEFAULT '',
                    platform TEXT DEFAULT 'qianniu',
                    reason TEXT DEFAULT '',
                    status TEXT NOT NULL DEFAULT 'pending',
                    retry_count INTEGER NOT NULL DEFAULT 0,
                    next_retry_at TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    updated_at TEXT NOT NULL,
                    completed_at TEXT DEFAULT '',
                    last_error TEXT DEFAULT ''
                )
                """
            )
            # Supports the "due rows" lookup (status + next_retry_at).
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_pending_status_retry ON pending_transfers(status, next_retry_at)"
            )
            # Supports the duplicate check by (customer_id, acc_id).
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_pending_customer_acc ON pending_transfers(customer_id, acc_id)"
            )
    finally:
        conn.close()
# Run schema setup when the module is imported so the table and indexes exist
# before any of the queue functions below are called.
init_db()
def enqueue_pending_transfer(
    customer_id: str,
    acc_id: str,
    acc_type: str,
    platform: str,
    reason: str,
) -> int:
    """Queue a transfer request, or refresh the existing open one.

    If an open row (status ``'pending'`` or ``'processing'``) already exists
    for the same ``customer_id`` and ``acc_id``, the newest such row is
    updated in place: ``acc_type``, ``platform`` and ``reason`` are
    overwritten, the status is set back to ``'pending'``, ``next_retry_at``
    and ``updated_at`` are set to the current time and ``last_error`` is
    cleared. Its ``retry_count`` is left unchanged. Otherwise a new row is
    inserted with ``retry_count`` 0 that is due immediately.

    Timestamps are naive local time formatted as ``YYYY-MM-DD HH:MM:SS``.

    Args:
        customer_id: Customer the transfer is for.
        acc_id: Account id stored with the request.
        acc_type: Account type stored with the request.
        platform: Platform name stored with the request.
        reason: Free-text reason stored with the request.

    Returns:
        The id of the row that was updated or inserted.
    """
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    conn = _get_conn()
    try:
        # "with conn" commits on normal exit (including the early return) and
        # rolls back on error; it does not close the connection.
        with conn:
            # Take the write lock before the duplicate check so two concurrent
            # callers cannot both miss the existing row and insert twice.
            conn.execute("BEGIN IMMEDIATE")
            existing = conn.execute(
                """
                SELECT id
                FROM pending_transfers
                WHERE customer_id = ? AND acc_id = ? AND status IN ('pending', 'processing')
                ORDER BY id DESC
                LIMIT 1
                """,
                (customer_id, acc_id),
            ).fetchone()

            if existing:
                conn.execute(
                    """
                    UPDATE pending_transfers
                    SET acc_type = ?, platform = ?, reason = ?, status = 'pending',
                        next_retry_at = ?, updated_at = ?, last_error = ''
                    WHERE id = ?
                    """,
                    (acc_type, platform, reason, now, now, existing["id"]),
                )
                return int(existing["id"])

            cur = conn.execute(
                """
                INSERT INTO pending_transfers
                    (customer_id, acc_id, acc_type, platform, reason, status, retry_count, next_retry_at, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, 'pending', 0, ?, ?, ?)
                """,
                (customer_id, acc_id, acc_type, platform, reason, now, now, now),
            )
            return int(cur.lastrowid)
    finally:
        conn.close()
def claim_due_pending_transfers(limit: int = 10) -> List[Dict]:
    """Claim up to ``limit`` due rows by marking them ``'processing'``.

    A row is due when its status is ``'pending'`` and ``next_retry_at`` is
    not later than the current local time. Rows are taken oldest first
    (``created_at``, then ``id``). Selection and status change happen inside
    one ``BEGIN IMMEDIATE`` transaction.

    Args:
        limit: Maximum number of rows to claim.

    Returns:
        The claimed rows as plain dicts. They are snapshots read before the
        status update, so their ``status`` value is still ``'pending'`` and
        ``updated_at`` is the previous value.
    """
    # The fixed-width "YYYY-MM-DD HH:MM:SS" format makes plain string
    # comparison in SQL equivalent to chronological comparison.
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    conn = _get_conn()
    try:
        # "with conn" commits on success and rolls back on error; it does not
        # close the connection, hence the finally clause.
        with conn:
            conn.execute("BEGIN IMMEDIATE")
            rows = conn.execute(
                """
                SELECT *
                FROM pending_transfers
                WHERE status = 'pending' AND next_retry_at <= ?
                ORDER BY created_at ASC, id ASC
                LIMIT ?
                """,
                (now, limit),
            ).fetchall()

            ids = [int(r["id"]) for r in rows]
            if ids:
                # Only "?" placeholders are interpolated into the SQL text;
                # the id values themselves are bound as parameters.
                placeholders = ",".join("?" for _ in ids)
                conn.execute(
                    f"""
                    UPDATE pending_transfers
                    SET status = 'processing', updated_at = ?
                    WHERE id IN ({placeholders})
                    """,
                    (now, *ids),
                )
        return [dict(r) for r in rows]
    finally:
        conn.close()
def complete_pending_transfer(row_id: int):
    """Mark a queued transfer as completed.

    Sets ``status`` to ``'completed'``, stamps ``completed_at`` and
    ``updated_at`` with the current local time and clears ``last_error``.
    If no row has the given id, nothing is changed.

    Args:
        row_id: ``id`` of the ``pending_transfers`` row to complete.
    """
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    conn = _get_conn()
    try:
        # "with conn" commits on success and rolls back on error; closing the
        # connection is done separately in the finally clause.
        with conn:
            conn.execute(
                """
                UPDATE pending_transfers
                SET status = 'completed', completed_at = ?, updated_at = ?, last_error = ''
                WHERE id = ?
                """,
                (now, now, row_id),
            )
    finally:
        conn.close()
def retry_pending_transfer(row_id: int, delay_seconds: int = 60, error: str = ""):
    """Put a transfer back into the queue for a later retry.

    Sets ``status`` back to ``'pending'``, increments ``retry_count``,
    schedules ``next_retry_at`` and records the error text.

    Args:
        row_id: ``id`` of the ``pending_transfers`` row to reschedule.
        delay_seconds: Seconds to wait before the row becomes due again.
            Values below 5 are raised to 5.
        error: Description of the failure. ``None`` is stored as an empty
            string, non-string values are converted with ``str()``, and the
            text is truncated to 500 characters.
    """
    now = datetime.now()
    next_retry = now + timedelta(seconds=max(delay_seconds, 5))
    now_s = now.strftime("%Y-%m-%d %H:%M:%S")
    next_s = next_retry.strftime("%Y-%m-%d %H:%M:%S")
    # Callers may pass None or an exception object; slicing those directly
    # would raise TypeError, so normalise to text first.
    error_text = str(error or "")[:500]

    conn = _get_conn()
    try:
        # "with conn" commits on success and rolls back on error; closing the
        # connection is done separately in the finally clause.
        with conn:
            conn.execute(
                """
                UPDATE pending_transfers
                SET status = 'pending',
                    retry_count = retry_count + 1,
                    next_retry_at = ?,
                    updated_at = ?,
                    last_error = ?
                WHERE id = ?
                """,
                (next_s, now_s, error_text, row_id),
            )
    finally:
        conn.close()
def count_open_pending_transfers() -> int:
    """Return the number of open rows in the queue.

    A row counts as open when its status is ``'pending'`` or
    ``'processing'``.

    Returns:
        The count as an int; 0 when there are no open rows.
    """
    conn = _get_conn()
    try:
        row = conn.execute(
            """
            SELECT COUNT(*) AS cnt
            FROM pending_transfers
            WHERE status IN ('pending', 'processing')
            """
        ).fetchone()
    finally:
        # Read-only query: nothing to commit, but the connection must still
        # be closed explicitly.
        conn.close()
    return int(row["cnt"] or 0) if row else 0