Files
tuhui.cloud/backend/dispatch_api.py
2026-03-08 19:28:32 +08:00

390 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import secrets
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Optional, List

from fastapi import FastAPI, HTTPException, Header, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
# SQLite database files used by this service: dispatch data and designer status.
DB_PATH = Path("/root/tuhui/backend/dispatch.db")
DESIGNER_DB_PATH = Path("/root/tuhui/backend/designer_status.db")
# API key that clients must send in the X-API-Key header.
# It is read from the TUHUI_DISPATCH_API_KEY environment variable so the secret
# does not have to live in source control. The historical literal stays as the
# fallback (also used when the variable is set but empty) so existing
# deployments keep working unchanged.
API_KEY = os.environ.get("TUHUI_DISPATCH_API_KEY") or "tuhui_dispatch_key_2026"
# The ASGI application object; title/description/version feed the generated docs.
app = FastAPI(
    version="1.0.0",
    title="图汇派单系统 API",
    description="专业的设计师派单管理系统",
)
# CORS configuration: every origin, method and header is accepted.
# NOTE(review): the CORS specification does not permit a literal wildcard
# origin on credentialed responses. Authentication in this file is done via the
# X-API-Key header, so confirm whether allow_credentials is actually required.
_cors_settings = dict(
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(CORSMiddleware, **_cors_settings)
# ========== Data models ==========
class TaskCreate(BaseModel):
    """Request body accepted when creating a dispatch task."""

    # Name of the task (the only required field).
    task_name: str
    # Free-form description; empty string when omitted.
    description: str = ""
    # Task category; the create endpoint documents design/design_review/other.
    task_type: str = "design"
    # Priority; pending-task queries sort by this value in descending order.
    priority: int = 1
    # Optional deadline, kept as the string supplied by the client.
    deadline: Optional[str] = None
class TaskAssign(BaseModel):
    """Request body accepted when assigning a task to a designer."""

    # Name of the designer who receives the task (required).
    designer_name: str
    # Optional remark recorded in the dispatch history.
    notes: Optional[str] = ""
class TaskComplete(BaseModel):
    """Request body accepted when marking a task as completed."""

    # Optional completion remark recorded in the dispatch history.
    notes: Optional[str] = ""
# ========== Authentication dependency ==========
def verify_api_key(x_api_key: str = Header(..., alias="X-API-Key")):
    """Validate the ``X-API-Key`` request header against the configured key.

    Args:
        x_api_key: Value of the ``X-API-Key`` header (required).

    Returns:
        The supplied key, unchanged, when it matches ``API_KEY``.

    Raises:
        HTTPException: 401 when the supplied key does not match.
    """
    # Constant-time comparison, so response timing does not reveal how much of
    # the key matched. Bytes are compared because compare_digest rejects str
    # values containing non-ASCII characters.
    supplied = x_api_key.encode("utf-8")
    expected = API_KEY.encode("utf-8")
    if not secrets.compare_digest(supplied, expected):
        raise HTTPException(status_code=401, detail="Invalid API Key")
    return x_api_key
# ========== Database helpers ==========
def get_db_connection(db_path: Path) -> sqlite3.Connection:
    """Open a SQLite connection whose rows can be indexed by column name.

    Args:
        db_path: Location of the database file; converted with ``str()``.

    Returns:
        A new connection with ``row_factory`` set to ``sqlite3.Row``. The
        caller is responsible for closing it.
    """
    location = str(db_path)
    connection = sqlite3.connect(location)
    # sqlite3.Row enables row["column"] access in addition to positional access.
    connection.row_factory = sqlite3.Row
    return connection
def get_online_designers() -> List[str]:
    """Return the ``real_name`` of every designer whose status is ``'online'``.

    Reads the ``designer_status`` table of the database at
    ``DESIGNER_DB_PATH``.

    Returns:
        The names in the order the query returns them (no ORDER BY is applied).
    """
    conn = get_db_connection(DESIGNER_DB_PATH)
    try:
        rows = conn.execute(
            "SELECT real_name FROM designer_status WHERE status = 'online'"
        ).fetchall()
    finally:
        # Close even when the query raises so the connection is never leaked.
        conn.close()
    return [row["real_name"] for row in rows]
# ========== API endpoints ==========
@app.get("/health")
def health_check():
    """Health check: report an ok status together with the current server time."""
    now_iso = datetime.now().isoformat()
    return dict(status="ok", timestamp=now_iso)
# NOTE(review): unlike the task endpoints, this route declares no API-key
# dependency - confirm that exposing designer names publicly is intended.
@app.get("/online/designers")
def get_online_designers_api():
    """
    Get the list of online designers.

    Returns the names of all designers that are currently online, how many
    there are, and the time the list was produced.
    """
    names = get_online_designers()
    stamp = f"{datetime.now():%Y-%m-%d %H:%M:%S}"
    payload = {"online_count": len(names), "online_users": names}
    payload["update_time"] = stamp
    return payload
@app.post("/tasks", tags=["任务管理"])
def create_task(task: TaskCreate, api_key: str = Depends(verify_api_key)):
    """
    Create a dispatch task.

    - **task_name**: task name
    - **description**: task description
    - **task_type**: task type (design/design_review/other)
    - **priority**: priority (1-5, 5 is highest)
    - **deadline**: deadline (optional)

    The task is stored with status `pending`. Returns the generated task id,
    the task name, the status and the creation time.
    """
    import uuid

    conn = get_db_connection(DB_PATH)
    try:
        # The id is only 8 hex characters, so collisions are possible once many
        # tasks exist. Retry with a fresh id if the insert violates a
        # constraint; re-raise if it still fails on the last attempt.
        for attempts_left in (2, 1, 0):
            task_id = uuid.uuid4().hex[:8]
            try:
                conn.execute('''
                    INSERT INTO dispatch_tasks (id, task_name, task_description, task_type, priority, status, deadline)
                    VALUES (?, ?, ?, ?, ?, 'pending', ?)
                ''', (task_id, task.task_name, task.description, task.task_type, task.priority, task.deadline))
            except sqlite3.IntegrityError:
                if attempts_left == 0:
                    raise
                continue
            break
        conn.commit()
    finally:
        # Close even when the insert raises so the connection is never leaked.
        conn.close()
    return {
        "task_id": task_id,
        "task_name": task.task_name,
        "status": "pending",
        "created_at": datetime.now().isoformat()
    }
@app.get("/tasks/pending", tags=["任务管理"])
def get_pending_tasks(limit: int = 10, api_key: str = Depends(verify_api_key)):
    """
    Get the list of tasks waiting to be assigned.

    - **limit**: maximum number of tasks to return (default 10)

    Tasks are ordered by priority (highest first), then by creation time
    (oldest first). A negative limit is rejected with HTTP 400.
    """
    # SQLite treats a negative LIMIT as "no limit", which would silently
    # return the whole table; reject it explicitly instead.
    if limit < 0:
        raise HTTPException(status_code=400, detail="limit must be non-negative")

    conn = get_db_connection(DB_PATH)
    try:
        rows = conn.execute('''
            SELECT id, task_name, task_description, task_type, priority, created_at, deadline
            FROM dispatch_tasks
            WHERE status = 'pending'
            ORDER BY priority DESC, created_at ASC
            LIMIT ?
        ''', (limit,)).fetchall()
    finally:
        # Close even when the query raises so the connection is never leaked.
        conn.close()

    tasks = [
        {
            "task_id": row["id"],
            "task_name": row["task_name"],
            "description": row["task_description"],
            "type": row["task_type"],
            "priority": row["priority"],
            "created_at": row["created_at"],
            "deadline": row["deadline"]
        }
        for row in rows
    ]
    return {
        "total": len(tasks),
        "tasks": tasks
    }
@app.post("/tasks/{task_id}/assign", tags=["任务管理"])
def assign_task(task_id: str, assign_data: TaskAssign, api_key: str = Depends(verify_api_key)):
    """
    Assign a task to a designer.

    - **task_id**: task ID
    - **designer_name**: designer name
    - **notes**: remark (optional)

    Marks the task as `assigned`, appends a row to the dispatch history and
    increments the designer's pending/total counters, all in one transaction.
    Responds with 404 when the task does not exist and 400 when it is not
    pending.
    """
    # One timestamp for every write and for the response, so they agree.
    now = datetime.now()
    # Bind the time as text: passing datetime objects relies on sqlite3's
    # default adapter, deprecated since Python 3.12. isoformat(" ") yields the
    # same text that adapter produced, so stored values keep their format.
    now_db = now.isoformat(" ")
    designer = assign_data.designer_name

    conn = get_db_connection(DB_PATH)
    try:
        cursor = conn.cursor()
        # Check that the task exists and is still pending.
        cursor.execute('SELECT id, status FROM dispatch_tasks WHERE id = ?', (task_id,))
        task = cursor.fetchone()
        if task is None:
            raise HTTPException(status_code=404, detail="Task not found")
        if task["status"] != "pending":
            raise HTTPException(status_code=400, detail=f"Task status is {task['status']}, cannot assign")

        # Guard the update on the status as well, so that two concurrent
        # requests cannot both assign the same task.
        cursor.execute('''
            UPDATE dispatch_tasks
            SET status = 'assigned', assigned_to = ?, assigned_at = ?
            WHERE id = ? AND status = 'pending'
        ''', (designer, now_db, task_id))
        if cursor.rowcount == 0:
            raise HTTPException(status_code=400, detail="Task is no longer pending, cannot assign")

        # Record the assignment in the history table.
        cursor.execute('''
            INSERT INTO dispatch_history (task_id, designer_name, action, notes)
            VALUES (?, ?, 'assigned', ?)
        ''', (task_id, designer, assign_data.notes))

        # Update the workload counters. Update in place first and insert only
        # when the designer has no row yet: INSERT OR REPLACE would delete the
        # existing row and thereby reset columns it does not list, such as
        # completed_tasks.
        cursor.execute('''
            UPDATE designer_workload
            SET pending_tasks = COALESCE(pending_tasks, 0) + 1,
                total_tasks = COALESCE(total_tasks, 0) + 1,
                last_updated = ?
            WHERE designer_name = ?
        ''', (now_db, designer))
        if cursor.rowcount == 0:
            cursor.execute('''
                INSERT INTO designer_workload (designer_name, pending_tasks, total_tasks, last_updated)
                VALUES (?, 1, 1, ?)
            ''', (designer, now_db))

        conn.commit()
    finally:
        # Closing without a commit discards any partial changes, and the
        # connection is released on every path, including the error paths.
        conn.close()
    return {
        "task_id": task_id,
        "assigned_to": designer,
        "status": "assigned",
        "assigned_at": now.isoformat()
    }
@app.get("/tasks/{task_id}", tags=["任务管理"])
def get_task_status(task_id: str, api_key: str = Depends(verify_api_key)):
    """
    Query the status of a task.

    Returns the task's name, status, assignee and its created/assigned/
    completed/deadline values. Responds with 404 when no task has this id.
    """
    conn = get_db_connection(DB_PATH)
    try:
        row = conn.execute('''
            SELECT id, task_name, status, assigned_to, created_at, assigned_at, completed_at, deadline
            FROM dispatch_tasks
            WHERE id = ?
        ''', (task_id,)).fetchone()
    finally:
        # Close even when the query raises so the connection is never leaked.
        conn.close()
    if row is None:
        raise HTTPException(status_code=404, detail="Task not found")
    return {
        "task_id": row["id"],
        "task_name": row["task_name"],
        "status": row["status"],
        "assigned_to": row["assigned_to"],
        "created_at": row["created_at"],
        "assigned_at": row["assigned_at"],
        "completed_at": row["completed_at"],
        "deadline": row["deadline"]
    }
@app.post("/tasks/{task_id}/complete", tags=["任务管理"])
def complete_task(task_id: str, complete_data: TaskComplete, api_key: str = Depends(verify_api_key)):
    """
    Complete a task.

    - **task_id**: task ID
    - **notes**: completion remark (optional)

    Marks the task as `completed`, appends a row to the dispatch history and
    moves one unit from the designer's pending counter to the completed
    counter, all in one transaction. Responds with 404 when the task does not
    exist and 400 when it is neither `assigned` nor `in_progress`.
    """
    # One timestamp for every write and for the response, so they agree.
    now = datetime.now()
    # Bind the time as text: passing datetime objects relies on sqlite3's
    # default adapter, deprecated since Python 3.12. isoformat(" ") yields the
    # same text that adapter produced, so stored values keep their format.
    now_db = now.isoformat(" ")

    conn = get_db_connection(DB_PATH)
    try:
        cursor = conn.cursor()
        # Fetch the task's assignee and current status.
        cursor.execute('SELECT assigned_to, status FROM dispatch_tasks WHERE id = ?', (task_id,))
        row = cursor.fetchone()
        if row is None:
            raise HTTPException(status_code=404, detail="Task not found")
        if row["status"] not in ("assigned", "in_progress"):
            raise HTTPException(status_code=400, detail=f"Task status is {row['status']}, cannot complete")
        designer_name = row["assigned_to"]

        # Guard the update on the status as well, so that two concurrent
        # requests cannot both complete the task and double-count it.
        cursor.execute('''
            UPDATE dispatch_tasks
            SET status = 'completed', completed_at = ?
            WHERE id = ? AND status IN ('assigned', 'in_progress')
        ''', (now_db, task_id))
        if cursor.rowcount == 0:
            raise HTTPException(status_code=400, detail="Task status changed, cannot complete")

        # Record the completion in the history table.
        cursor.execute('''
            INSERT INTO dispatch_history (task_id, designer_name, action, notes)
            VALUES (?, ?, 'completed', ?)
        ''', (task_id, designer_name, complete_data.notes))

        # Update the workload counters. The pending counter is clamped at zero
        # so it can never go negative, and NULL counters are treated as zero
        # so that the arithmetic does not yield NULL.
        cursor.execute('''
            UPDATE designer_workload
            SET pending_tasks = MAX(COALESCE(pending_tasks, 0) - 1, 0),
                completed_tasks = COALESCE(completed_tasks, 0) + 1,
                last_updated = ?
            WHERE designer_name = ?
        ''', (now_db, designer_name))

        conn.commit()
    finally:
        # Closing without a commit discards any partial changes, and the
        # connection is released on every path, including the error paths.
        conn.close()
    return {
        "task_id": task_id,
        "status": "completed",
        "completed_at": now.isoformat()
    }
@app.get("/designers/workload", tags=["设计师管理"])
def get_designer_workload(api_key: str = Depends(verify_api_key)):
    """
    Get workload statistics for designers.

    Returns one entry per row of the workload table, ordered by total task
    count (highest first), together with the number of entries.
    """
    conn = get_db_connection(DB_PATH)
    try:
        rows = conn.execute('''
            SELECT designer_name, total_tasks, completed_tasks, pending_tasks, last_updated
            FROM designer_workload
            ORDER BY total_tasks DESC
        ''').fetchall()
    finally:
        # Close even when the query raises so the connection is never leaked.
        conn.close()

    workload = [
        {
            "designer": row["designer_name"],
            "total_tasks": row["total_tasks"],
            "completed_tasks": row["completed_tasks"],
            "pending_tasks": row["pending_tasks"],
            "last_updated": row["last_updated"]
        }
        for row in rows
    ]
    return {
        "total_designers": len(workload),
        "designers": workload
    }
@app.get("/dispatch/queue", tags=["派单队列"])
def get_dispatch_queue(api_key: str = Depends(verify_api_key)):
    """
    Get the dispatch queue (pending tasks + online designers).

    Main endpoint intended for polling by the dispatcher. Returns up to 20
    pending tasks (highest priority first, then oldest first), the online
    designers, and a suggestion: the first online designer when there is at
    least one pending task and one online designer, otherwise null.
    """
    # Fetch the pending tasks.
    conn = get_db_connection(DB_PATH)
    try:
        rows = conn.execute('''
            SELECT id, task_name, task_description, task_type, priority, created_at, deadline
            FROM dispatch_tasks
            WHERE status = 'pending'
            ORDER BY priority DESC, created_at ASC
            LIMIT 20
        ''').fetchall()
    finally:
        # Close even when the query raises so the connection is never leaked.
        conn.close()

    pending_tasks = [
        {
            "task_id": row["id"],
            "task_name": row["task_name"],
            "description": row["task_description"],
            "type": row["task_type"],
            "priority": row["priority"],
            "created_at": row["created_at"],
            "deadline": row["deadline"]
        }
        for row in rows
    ]
    # Fetch the online designers.
    online_designers = get_online_designers()
    return {
        "timestamp": datetime.now().isoformat(),
        "pending_tasks": {
            "count": len(pending_tasks),
            "tasks": pending_tasks
        },
        "online_designers": {
            "count": len(online_designers),
            "designers": online_designers
        },
        "suggestion": online_designers[0] if online_designers and pending_tasks else None
    }
# ========== Entry point ==========
if __name__ == "__main__":
    # When run as a script, serve the app with uvicorn on all interfaces, port 8001.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8001)