Files
tuhui/backend/smart_dispatch_poller.py
2026-03-08 19:28:32 +08:00

215 lines
7.0 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
智能轮询派单系统
- 自动轮询派单队列
- 负载均衡:轮流分配 + 工作量感知
- 自动重试机制
- 详细日志记录
"""
import requests
import time
import json
from datetime import datetime
from collections import deque
# 配置
API_BASE = "http://1.12.50.92:8005"
API_KEY = "tuhui_dispatch_key_2026"
HEADERS = {"X-API-Key": API_KEY}
# 轮询间隔(秒)
POLL_INTERVAL = 10
MAX_RETRIES = 3
RETRY_DELAY = 3
# 负载均衡:记录最近分配的设计师(轮询分配)
recent_assignments = deque(maxlen=10)
def log(message: str, level: str = "INFO"):
"""打印日志"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] [{level}] {message}")
def get_online_designers() -> list:
"""获取在线设计师"""
try:
response = requests.get(f"{API_BASE}/online/designers", headers=HEADERS, timeout=5)
if response.status_code == 200:
data = response.json()
return data["online_users"]
except Exception as e:
log(f"获取在线设计师失败:{e}", "ERROR")
return []
def get_pending_tasks(limit: int = 10) -> list:
"""获取待分配任务"""
try:
response = requests.get(f"{API_BASE}/tasks/pending", headers=HEADERS, timeout=5, params={"limit": limit})
if response.status_code == 200:
data = response.json()
return data["tasks"]
except Exception as e:
log(f"获取待分配任务失败:{e}", "ERROR")
return []
def get_designer_workload() -> dict:
"""获取设计师工作量"""
try:
response = requests.get(f"{API_BASE}/designers/workload", headers=HEADERS, timeout=5)
if response.status_code == 200:
data = response.json()
# 转换为字典:设计师 -> 待处理任务数
return {d["designer"]: d["pending_tasks"] for d in data["designers"]}
except Exception as e:
log(f"获取工作量失败:{e}", "ERROR")
return {}
def select_designer_smart(online_designers: list) -> str:
"""
智能选择设计师(负载均衡)
策略:
1. 优先选最近没被分配的设计师(轮询)
2. 如果都有任务,选工作量最少的
3. 如果都没数据,随机选
"""
if not online_designers:
return None
if len(online_designers) == 1:
return online_designers[0]
# 策略 1找最近没被分配的
for designer in online_designers:
if designer not in recent_assignments:
log(f"选择设计师(轮询):{designer}")
return designer
# 策略 2找工作量最少的
workload = get_designer_workload()
min_workload = float('inf')
selected = None
for designer in online_designers:
pending = workload.get(designer, 0)
if pending < min_workload:
min_workload = pending
selected = designer
if selected:
log(f"选择设计师(工作量最少:{min_workload}{selected}")
return selected
# 策略 3随机选第一个
log(f"选择设计师(默认):{online_designers[0]}")
return online_designers[0]
def assign_task_with_retry(task_id: str, designer: str, max_retries: int = MAX_RETRIES) -> bool:
"""带重试的分配任务"""
for attempt in range(max_retries):
try:
response = requests.post(
f"{API_BASE}/tasks/{task_id}/assign",
headers=HEADERS,
json={
"designer_name": designer,
"notes": "系统自动派单"
},
timeout=5
)
if response.status_code == 200:
log(f"✅ 任务 {task_id} 已分配给 {designer}", "SUCCESS")
# 记录分配历史
recent_assignments.append(designer)
return True
else:
log(f"派单失败 (尝试 {attempt+1}/{max_retries}): {response.text}", "ERROR")
except Exception as e:
log(f"网络错误 (尝试 {attempt+1}/{max_retries}): {e}", "ERROR")
if attempt < max_retries - 1:
time.sleep(RETRY_DELAY)
return False
def poll_and_dispatch():
"""主轮询函数"""
log("🚀 智能轮询派单系统启动", "START")
log(f"轮询间隔:{POLL_INTERVAL}")
log(f"API 地址:{API_BASE}")
consecutive_errors = 0
max_consecutive_errors = 10
while True:
try:
# 1. 获取待分配任务
tasks = get_pending_tasks(limit=20)
if not tasks:
log("📭 暂无待分配任务")
consecutive_errors = 0
time.sleep(POLL_INTERVAL)
continue
log(f"📋 待分配任务:{len(tasks)}")
# 2. 获取在线设计师
online_designers = get_online_designers()
if not online_designers:
log("😴 暂无设计师在线")
consecutive_errors = 0
time.sleep(POLL_INTERVAL)
continue
log(f"👥 在线设计师:{len(online_designers)} 人 - {', '.join(online_designers)}")
# 3. 按优先级排序任务
tasks.sort(key=lambda x: x["priority"], reverse=True)
# 4. 智能分配
assigned_count = 0
for task in tasks:
designer = select_designer_smart(online_designers)
if designer:
success = assign_task_with_retry(task["task_id"], designer)
if success:
assigned_count += 1
# 短暂等待,避免并发冲突
time.sleep(0.5)
else:
log(f"❌ 任务 {task['task_id']} 分配失败", "ERROR")
break # 停止分配,下次轮询再试
else:
log("⚠️ 无法选择设计师", "ERROR")
break
log(f"📊 本轮分配:{assigned_count}/{len(tasks)} 个任务", "SUCCESS")
consecutive_errors = 0
except Exception as e:
log(f"❌ 轮询异常:{e}", "ERROR")
consecutive_errors += 1
if consecutive_errors >= max_consecutive_errors:
log(f"⚠️ 连续 {consecutive_errors} 次错误,等待 30 秒...", "WARNING")
time.sleep(30)
consecutive_errors = 0
else:
time.sleep(POLL_INTERVAL)
# 正常轮询间隔
time.sleep(POLL_INTERVAL)
if __name__ == "__main__":
try:
poll_and_dispatch()
except KeyboardInterrupt:
log("👋 轮询派单系统已停止", "INFO")