"""HTTP API for the Tuhui designer dispatch system.

Exposes endpoints to create dispatch tasks, list pending tasks, assign a task
to a designer, mark a task completed, and report per-designer workload.  Task
data lives in the SQLite file at ``DB_PATH``; designer online status is read
from the SQLite file at ``DESIGNER_DB_PATH``.
"""

import os
import secrets
import sqlite3
import uuid
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Iterator, List, Optional

from fastapi import Depends, FastAPI, Header, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

# Database paths
DB_PATH = Path("/root/tuhui/backend/dispatch.db")
DESIGNER_DB_PATH = Path("/root/tuhui/backend/designer_status.db")

# API key.  Read from the TUHUI_DISPATCH_API_KEY environment variable when it
# is set and non-empty; otherwise fall back to the historical built-in value so
# existing deployments keep working.
API_KEY = os.environ.get("TUHUI_DISPATCH_API_KEY") or "tuhui_dispatch_key_2026"

app = FastAPI(
    title="图汇派单系统 API",
    description="专业的设计师派单管理系统",
    version="1.0.0",
)

# CORS configuration
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; consider restricting allow_origins to known front-ends.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# ========== Data models ==========

class TaskCreate(BaseModel):
    """Request body for creating a dispatch task."""

    task_name: str
    description: str = ""
    task_type: str = "design"
    priority: int = 1
    deadline: Optional[str] = None


class TaskAssign(BaseModel):
    """Request body for assigning a task to a designer."""

    designer_name: str
    notes: Optional[str] = ""


class TaskComplete(BaseModel):
    """Request body for marking a task as completed."""

    notes: Optional[str] = ""


# ========== Authentication dependency ==========

def verify_api_key(x_api_key: str = Header(..., alias="X-API-Key")):
    """Validate the ``X-API-Key`` request header.

    Returns the supplied key when it matches ``API_KEY``.

    Raises:
        HTTPException: 401 when the key does not match.
    """
    # Constant-time comparison on bytes: avoids leaking the key through timing
    # and avoids the TypeError compare_digest raises for non-ASCII str input.
    supplied = x_api_key.encode("utf-8")
    expected = API_KEY.encode("utf-8")
    if not secrets.compare_digest(supplied, expected):
        raise HTTPException(status_code=401, detail="Invalid API Key")
    return x_api_key


# ========== Database helpers ==========

def get_db_connection(db_path: Path):
    """Open a SQLite connection to *db_path* whose rows support name lookup."""
    conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row
    return conn


@contextmanager
def _db_session(db_path: Path, *, exclusive_write: bool = False) -> Iterator[sqlite3.Connection]:
    """Yield a connection that is committed on success and always closed.

    On any exception (including ``HTTPException`` raised by a handler) the
    transaction is rolled back before the connection is closed.

    When *exclusive_write* is true the session starts with ``BEGIN IMMEDIATE``
    so the write lock is taken before the first read; this keeps a
    read-check-then-update sequence from racing with another writer.
    """
    conn = get_db_connection(db_path)
    try:
        if exclusive_write:
            conn.execute("BEGIN IMMEDIATE")
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def _db_timestamp(moment: datetime) -> str:
    """Format *moment* as the text stored in the database.

    ``isoformat(" ")`` is the same text the sqlite3 module's default datetime
    adapter produced; binding the string explicitly avoids relying on that
    adapter, which is deprecated since Python 3.12.
    """
    return moment.isoformat(" ")


def _fetch_pending_tasks(limit: int) -> List[dict]:
    """Return up to *limit* pending tasks, highest priority then oldest first."""
    with _db_session(DB_PATH) as conn:
        rows = conn.execute(
            '''
            SELECT id, task_name, task_description, task_type, priority, created_at, deadline
            FROM dispatch_tasks
            WHERE status = 'pending'
            ORDER BY priority DESC, created_at ASC
            LIMIT ?
            ''',
            (limit,),
        ).fetchall()
    return [
        {
            "task_id": row["id"],
            "task_name": row["task_name"],
            "description": row["task_description"],
            "type": row["task_type"],
            "priority": row["priority"],
            "created_at": row["created_at"],
            "deadline": row["deadline"],
        }
        for row in rows
    ]


def get_online_designers() -> List[str]:
    """Return the ``real_name`` of every designer whose status is 'online'."""
    with _db_session(DESIGNER_DB_PATH) as conn:
        rows = conn.execute(
            "SELECT real_name FROM designer_status WHERE status = 'online'"
        ).fetchall()
    return [row["real_name"] for row in rows]


# ========== API endpoints ==========

@app.get("/health")
def health_check():
    """Health check."""
    return {"status": "ok", "timestamp": datetime.now().isoformat()}


@app.get("/online/designers")
def get_online_designers_api():
    """
    Get the list of online designers

    Returns the names of all designers that are currently online
    """
    designers = get_online_designers()
    return {
        "online_count": len(designers),
        "online_users": designers,
        "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }


@app.post("/tasks", tags=["任务管理"])
def create_task(task: TaskCreate, api_key: str = Depends(verify_api_key)):
    """
    Create a dispatch task

    - **task_name**: task name
    - **description**: task description
    - **task_type**: task type (design/design_review/other)
    - **priority**: priority (1-5, 5 is highest)
    - **deadline**: deadline (optional)
    """
    # Short id: the first 8 characters of a random UUID.
    task_id = str(uuid.uuid4())[:8]

    with _db_session(DB_PATH) as conn:
        conn.execute(
            '''
            INSERT INTO dispatch_tasks (id, task_name, task_description, task_type, priority, status, deadline)
            VALUES (?, ?, ?, ?, ?, 'pending', ?)
            ''',
            (task_id, task.task_name, task.description, task.task_type, task.priority, task.deadline),
        )

    return {
        "task_id": task_id,
        "task_name": task.task_name,
        "status": "pending",
        "created_at": datetime.now().isoformat(),
    }


@app.get("/tasks/pending", tags=["任务管理"])
def get_pending_tasks(limit: int = Query(10, ge=1), api_key: str = Depends(verify_api_key)):
    """
    Get the list of tasks waiting to be assigned

    - **limit**: maximum number of tasks to return (default 10, must be >= 1)
    """
    # limit is validated as >= 1 because SQLite treats a negative LIMIT as
    # "no limit", which would return the whole table.
    tasks = _fetch_pending_tasks(limit)
    return {
        "total": len(tasks),
        "tasks": tasks,
    }


@app.post("/tasks/{task_id}/assign", tags=["任务管理"])
def assign_task(task_id: str, assign_data: TaskAssign, api_key: str = Depends(verify_api_key)):
    """
    Assign a task to a designer

    - **task_id**: task ID
    - **designer_name**: designer name
    - **notes**: notes (optional)

    Responds 404 when the task does not exist and 400 when it is not pending.
    """
    now = datetime.now()
    stamp = _db_timestamp(now)
    designer = assign_data.designer_name

    # exclusive_write: take the write lock before the status check so two
    # concurrent requests cannot both see 'pending' and both assign the task.
    with _db_session(DB_PATH, exclusive_write=True) as conn:
        cursor = conn.cursor()

        # Check that the task exists and is still pending
        cursor.execute('SELECT id, status FROM dispatch_tasks WHERE id = ?', (task_id,))
        task = cursor.fetchone()
        if not task:
            raise HTTPException(status_code=404, detail="Task not found")
        if task["status"] != "pending":
            raise HTTPException(status_code=400, detail=f"Task status is {task['status']}, cannot assign")

        # Update the task status
        cursor.execute(
            '''
            UPDATE dispatch_tasks
            SET status = 'assigned', assigned_to = ?, assigned_at = ?
            WHERE id = ?
            ''',
            (designer, stamp, task_id),
        )

        # Record history
        cursor.execute(
            '''
            INSERT INTO dispatch_history (task_id, designer_name, action, notes)
            VALUES (?, ?, 'assigned', ?)
            ''',
            (task_id, designer, assign_data.notes),
        )

        # Update workload.  UPDATE first and INSERT only when no row exists:
        # INSERT OR REPLACE would delete the existing row and thereby reset
        # columns not listed in the statement (e.g. completed_tasks).
        cursor.execute(
            '''
            UPDATE designer_workload
            SET pending_tasks = COALESCE(pending_tasks, 0) + 1,
                total_tasks = COALESCE(total_tasks, 0) + 1,
                last_updated = ?
            WHERE designer_name = ?
            ''',
            (stamp, designer),
        )
        if cursor.rowcount == 0:
            cursor.execute(
                '''
                INSERT INTO designer_workload (designer_name, pending_tasks, total_tasks, last_updated)
                VALUES (?, 1, 1, ?)
                ''',
                (designer, stamp),
            )

    return {
        "task_id": task_id,
        "assigned_to": designer,
        "status": "assigned",
        "assigned_at": now.isoformat(),
    }


@app.get("/tasks/{task_id}", tags=["任务管理"])
def get_task_status(task_id: str, api_key: str = Depends(verify_api_key)):
    """
    Query the status of a task

    Responds 404 when the task does not exist.
    """
    with _db_session(DB_PATH) as conn:
        row = conn.execute(
            '''
            SELECT id, task_name, status, assigned_to, created_at, assigned_at, completed_at, deadline
            FROM dispatch_tasks
            WHERE id = ?
            ''',
            (task_id,),
        ).fetchone()

    if not row:
        raise HTTPException(status_code=404, detail="Task not found")

    return {
        "task_id": row["id"],
        "task_name": row["task_name"],
        "status": row["status"],
        "assigned_to": row["assigned_to"],
        "created_at": row["created_at"],
        "assigned_at": row["assigned_at"],
        "completed_at": row["completed_at"],
        "deadline": row["deadline"],
    }


@app.post("/tasks/{task_id}/complete", tags=["任务管理"])
def complete_task(task_id: str, complete_data: TaskComplete, api_key: str = Depends(verify_api_key)):
    """
    Complete a task

    - **task_id**: task ID
    - **notes**: completion notes (optional)

    Responds 404 when the task does not exist and 400 when its status is
    neither 'assigned' nor 'in_progress'.
    """
    now = datetime.now()
    stamp = _db_timestamp(now)

    # exclusive_write: serialize the status check with the update so the same
    # task cannot be completed (and counted) twice by concurrent requests.
    with _db_session(DB_PATH, exclusive_write=True) as conn:
        cursor = conn.cursor()

        # Fetch the task
        cursor.execute('SELECT assigned_to, status FROM dispatch_tasks WHERE id = ?', (task_id,))
        row = cursor.fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Task not found")
        if row["status"] not in ("assigned", "in_progress"):
            raise HTTPException(status_code=400, detail=f"Task status is {row['status']}, cannot complete")

        designer_name = row["assigned_to"]

        # Update the task status
        cursor.execute(
            '''
            UPDATE dispatch_tasks
            SET status = 'completed', completed_at = ?
            WHERE id = ?
            ''',
            (stamp, task_id),
        )

        # Record history
        cursor.execute(
            '''
            INSERT INTO dispatch_history (task_id, designer_name, action, notes)
            VALUES (?, ?, 'completed', ?)
            ''',
            (task_id, designer_name, complete_data.notes),
        )

        # Update workload.  COALESCE treats a NULL counter as 0 and MAX keeps
        # pending_tasks from dropping below zero.
        cursor.execute(
            '''
            UPDATE designer_workload
            SET pending_tasks = MAX(COALESCE(pending_tasks, 0) - 1, 0),
                completed_tasks = COALESCE(completed_tasks, 0) + 1,
                last_updated = ?
            WHERE designer_name = ?
            ''',
            (stamp, designer_name),
        )

    return {
        "task_id": task_id,
        "status": "completed",
        "completed_at": now.isoformat(),
    }


@app.get("/designers/workload", tags=["设计师管理"])
def get_designer_workload(api_key: str = Depends(verify_api_key)):
    """
    Get per-designer workload statistics, ordered by total tasks descending
    """
    with _db_session(DB_PATH) as conn:
        rows = conn.execute(
            '''
            SELECT designer_name, total_tasks, completed_tasks, pending_tasks, last_updated
            FROM designer_workload
            ORDER BY total_tasks DESC
            '''
        ).fetchall()

    workload = [
        {
            "designer": row["designer_name"],
            "total_tasks": row["total_tasks"],
            "completed_tasks": row["completed_tasks"],
            "pending_tasks": row["pending_tasks"],
            "last_updated": row["last_updated"],
        }
        for row in rows
    ]

    return {
        "total_designers": len(workload),
        "designers": workload,
    }


@app.get("/dispatch/queue", tags=["派单队列"])
def get_dispatch_queue(api_key: str = Depends(verify_api_key)):
    """
    Get the dispatch queue (pending tasks + online designers)

    Main endpoint for polling-based dispatch
    """
    # Pending tasks (at most 20)
    pending_tasks = _fetch_pending_tasks(20)

    # Online designers
    online_designers = get_online_designers()

    return {
        "timestamp": datetime.now().isoformat(),
        "pending_tasks": {
            "count": len(pending_tasks),
            "tasks": pending_tasks,
        },
        "online_designers": {
            "count": len(online_designers),
            "designers": online_designers,
        },
        # Suggest the first online designer only when there is work to hand out.
        "suggestion": online_designers[0] if online_designers and pending_tasks else None,
    }


# ========== Entry point ==========

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8001)