175 lines
5.3 KiB
Python
175 lines
5.3 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
待转接队列(本地 SQLite)
|
||
用于在设计师不在线时暂存转接请求,待设计师上线后自动转接。
|
||
"""
|
||
|
||
import os
|
||
import sqlite3
|
||
from datetime import datetime, timedelta
|
||
from typing import List, Dict
|
||
|
||
_DB_PATH = os.path.join(os.path.dirname(__file__), "pending_transfer.db")
|
||
|
||
|
||
def _get_conn() -> sqlite3.Connection:
|
||
conn = sqlite3.connect(_DB_PATH)
|
||
conn.row_factory = sqlite3.Row
|
||
return conn
|
||
|
||
|
||
def init_db():
|
||
os.makedirs(os.path.dirname(_DB_PATH), exist_ok=True)
|
||
with _get_conn() as conn:
|
||
conn.execute(
|
||
"""
|
||
CREATE TABLE IF NOT EXISTS pending_transfers (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
customer_id TEXT NOT NULL,
|
||
acc_id TEXT DEFAULT '',
|
||
acc_type TEXT DEFAULT '',
|
||
platform TEXT DEFAULT 'qianniu',
|
||
reason TEXT DEFAULT '',
|
||
status TEXT NOT NULL DEFAULT 'pending',
|
||
retry_count INTEGER NOT NULL DEFAULT 0,
|
||
next_retry_at TEXT NOT NULL,
|
||
created_at TEXT NOT NULL,
|
||
updated_at TEXT NOT NULL,
|
||
completed_at TEXT DEFAULT '',
|
||
last_error TEXT DEFAULT ''
|
||
)
|
||
"""
|
||
)
|
||
conn.execute(
|
||
"CREATE INDEX IF NOT EXISTS idx_pending_status_retry ON pending_transfers(status, next_retry_at)"
|
||
)
|
||
conn.execute(
|
||
"CREATE INDEX IF NOT EXISTS idx_pending_customer_acc ON pending_transfers(customer_id, acc_id)"
|
||
)
|
||
conn.commit()
|
||
|
||
|
||
init_db()
|
||
|
||
|
||
def enqueue_pending_transfer(
|
||
customer_id: str,
|
||
acc_id: str,
|
||
acc_type: str,
|
||
platform: str,
|
||
reason: str,
|
||
) -> int:
|
||
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
with _get_conn() as conn:
|
||
row = conn.execute(
|
||
"""
|
||
SELECT id
|
||
FROM pending_transfers
|
||
WHERE customer_id = ? AND acc_id = ? AND status IN ('pending', 'processing')
|
||
ORDER BY id DESC
|
||
LIMIT 1
|
||
""",
|
||
(customer_id, acc_id),
|
||
).fetchone()
|
||
|
||
if row:
|
||
conn.execute(
|
||
"""
|
||
UPDATE pending_transfers
|
||
SET acc_type = ?, platform = ?, reason = ?, status = 'pending',
|
||
next_retry_at = ?, updated_at = ?, last_error = ''
|
||
WHERE id = ?
|
||
""",
|
||
(acc_type, platform, reason, now, now, row["id"]),
|
||
)
|
||
conn.commit()
|
||
return int(row["id"])
|
||
|
||
cur = conn.execute(
|
||
"""
|
||
INSERT INTO pending_transfers
|
||
(customer_id, acc_id, acc_type, platform, reason, status, retry_count, next_retry_at, created_at, updated_at)
|
||
VALUES (?, ?, ?, ?, ?, 'pending', 0, ?, ?, ?)
|
||
""",
|
||
(customer_id, acc_id, acc_type, platform, reason, now, now, now),
|
||
)
|
||
conn.commit()
|
||
return int(cur.lastrowid)
|
||
|
||
|
||
def claim_due_pending_transfers(limit: int = 10) -> List[Dict]:
|
||
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
with _get_conn() as conn:
|
||
conn.execute("BEGIN IMMEDIATE")
|
||
rows = conn.execute(
|
||
"""
|
||
SELECT *
|
||
FROM pending_transfers
|
||
WHERE status = 'pending' AND next_retry_at <= ?
|
||
ORDER BY created_at ASC, id ASC
|
||
LIMIT ?
|
||
""",
|
||
(now, limit),
|
||
).fetchall()
|
||
|
||
ids = [int(r["id"]) for r in rows]
|
||
if ids:
|
||
placeholders = ",".join("?" for _ in ids)
|
||
conn.execute(
|
||
f"""
|
||
UPDATE pending_transfers
|
||
SET status = 'processing', updated_at = ?
|
||
WHERE id IN ({placeholders})
|
||
""",
|
||
(now, *ids),
|
||
)
|
||
conn.commit()
|
||
return [dict(r) for r in rows]
|
||
|
||
|
||
def complete_pending_transfer(row_id: int):
|
||
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
with _get_conn() as conn:
|
||
conn.execute(
|
||
"""
|
||
UPDATE pending_transfers
|
||
SET status = 'completed', completed_at = ?, updated_at = ?, last_error = ''
|
||
WHERE id = ?
|
||
""",
|
||
(now, now, row_id),
|
||
)
|
||
conn.commit()
|
||
|
||
|
||
def retry_pending_transfer(row_id: int, delay_seconds: int = 60, error: str = ""):
|
||
now = datetime.now()
|
||
next_retry = now + timedelta(seconds=max(delay_seconds, 5))
|
||
now_s = now.strftime("%Y-%m-%d %H:%M:%S")
|
||
next_s = next_retry.strftime("%Y-%m-%d %H:%M:%S")
|
||
with _get_conn() as conn:
|
||
conn.execute(
|
||
"""
|
||
UPDATE pending_transfers
|
||
SET status = 'pending',
|
||
retry_count = retry_count + 1,
|
||
next_retry_at = ?,
|
||
updated_at = ?,
|
||
last_error = ?
|
||
WHERE id = ?
|
||
""",
|
||
(next_s, now_s, error[:500], row_id),
|
||
)
|
||
conn.commit()
|
||
|
||
|
||
def count_open_pending_transfers() -> int:
|
||
with _get_conn() as conn:
|
||
row = conn.execute(
|
||
"""
|
||
SELECT COUNT(*) AS cnt
|
||
FROM pending_transfers
|
||
WHERE status IN ('pending', 'processing')
|
||
"""
|
||
).fetchone()
|
||
return int(row["cnt"] or 0) if row else 0
|