#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 简单轮询派单系统(数据库版) - 自动轮询派单队列 - 轮流分配设计师(数据库持久化) - 自动重试机制 """ import requests import sqlite3 import time from datetime import datetime from pathlib import Path # 配置 API_BASE = "http://1.12.50.92:8005" API_KEY = "tuhui_dispatch_key_2026" HEADERS = {"X-API-Key": API_KEY} DB_PATH = Path("/root/tuhui/backend/dispatch.db") # 轮询间隔(秒) POLL_INTERVAL = 10 MAX_RETRIES = 3 def log(message: str): """打印日志""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"[{timestamp}] {message}") def init_db(): """初始化数据库表""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() # 创建轮询历史表 cursor.execute(''' CREATE TABLE IF NOT EXISTS dispatch_rotation ( id INTEGER PRIMARY KEY AUTOINCREMENT, designer_name TEXT NOT NULL, task_id TEXT NOT NULL, assigned_at DATETIME DEFAULT CURRENT_TIMESTAMP ) ''') conn.commit() conn.close() def get_recent_assignments(limit: int = 20) -> list: """获取最近分配的设计师(数据库)""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(''' SELECT designer_name FROM dispatch_rotation ORDER BY assigned_at DESC LIMIT ? ''', (limit,)) designers = [row[0] for row in cursor.fetchall()] conn.close() return designers def record_assignment(designer_name: str, task_id: str): """记录分配历史(数据库)""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(''' INSERT INTO dispatch_rotation (designer_name, task_id) VALUES (?, ?) ''', (designer_name, task_id)) conn.commit() conn.close() def get_dispatch_queue() -> dict: """获取派单队列""" try: response = requests.get(f"{API_BASE}/dispatch/queue", headers=HEADERS, timeout=5) if response.status_code == 200: return response.json() except Exception as e: log(f"❌ 获取队列失败:{e}") return None def select_designer(online_designers: list) -> str: """ 轮流选择设计师 找最近没被分配过的设计师 """ if not online_designers: return None if len(online_designers) == 1: return online_designers[0] # 获取最近分配记录 recent = get_recent_assignments(limit=20) # 找最近没被分配的 for designer in online_designers: if designer not in recent: return designer # 如果都被分配过了,选第一个 return online_designers[0] def assign_task(task_id: str, designer: str) -> bool: """分配任务""" for attempt in range(MAX_RETRIES): try: response = requests.post( f"{API_BASE}/tasks/{task_id}/assign", headers=HEADERS, json={ "designer_name": designer, "notes": "系统自动派单" }, timeout=5 ) if response.status_code == 200: # 记录到数据库 record_assignment(designer, task_id) return True else: log(f"❌ 派单失败:{response.text}") except Exception as e: log(f"❌ 网络错误:{e}") if attempt < MAX_RETRIES - 1: time.sleep(2) return False def poll_and_dispatch(): """主轮询函数""" # 初始化数据库 init_db() log("🚀 轮询派单系统启动(数据库版)") log(f"轮询间隔:{POLL_INTERVAL}秒") log(f"数据库:{DB_PATH}") while True: try: # 获取派单队列 queue = get_dispatch_queue() if not queue: time.sleep(POLL_INTERVAL) continue pending_count = queue["pending_tasks"]["count"] online_count = queue["online_designers"]["count"] tasks = queue["pending_tasks"]["tasks"] designers = queue["online_designers"]["designers"] # 没任务 if pending_count == 0: log("📭 暂无待分配任务") time.sleep(POLL_INTERVAL) continue # 没人在线 if online_count == 0: log("😴 暂无设计师在线") time.sleep(POLL_INTERVAL) continue log(f"📋 待分配:{pending_count} 个 | 👥 在线:{online_count} 人 - {', '.join(designers)}") # 分配任务 assigned = 0 for task in tasks: designer = select_designer(designers) if designer: success = assign_task(task["task_id"], designer) if success: log(f"✅ {task['task_name']} → {designer}") assigned += 1 time.sleep(0.5) else: break log(f"📊 本轮分配:{assigned}/{pending_count} 个") except Exception as e: log(f"❌ 异常:{e}") time.sleep(POLL_INTERVAL) if __name__ == "__main__": try: poll_and_dispatch() except KeyboardInterrupt: log("👋 已停止")