#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 设计师派单系统数据库 存储派单记录和任务队列 """ import sqlite3 from datetime import datetime from pathlib import Path import uuid # 数据库路径 DB_PATH = Path("/root/tuhui/backend/dispatch.db") def init_db(): """初始化数据库""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() # 创建派单任务表 cursor.execute(''' CREATE TABLE IF NOT EXISTS dispatch_tasks ( id TEXT PRIMARY KEY, task_name TEXT NOT NULL, task_description TEXT, task_type TEXT DEFAULT 'design', priority INTEGER DEFAULT 1, status TEXT NOT NULL CHECK (status IN ('pending', 'assigned', 'in_progress', 'completed', 'cancelled')), assigned_to TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, assigned_at DATETIME, started_at DATETIME, completed_at DATETIME, deadline DATETIME, metadata TEXT ) ''') # 创建派单历史记录表 cursor.execute(''' CREATE TABLE IF NOT EXISTS dispatch_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, task_id TEXT NOT NULL, designer_name TEXT NOT NULL, action TEXT NOT NULL, action_time DATETIME DEFAULT CURRENT_TIMESTAMP, notes TEXT ) ''') # 创建设计师工作量统计表 cursor.execute(''' CREATE TABLE IF NOT EXISTS designer_workload ( id INTEGER PRIMARY KEY AUTOINCREMENT, designer_name TEXT UNIQUE NOT NULL, total_tasks INTEGER DEFAULT 0, completed_tasks INTEGER DEFAULT 0, pending_tasks INTEGER DEFAULT 0, last_updated DATETIME DEFAULT CURRENT_TIMESTAMP ) '''); conn.commit() conn.close() print("✅ 派单数据库初始化完成") def create_task(task_name: str, description: str = "", task_type: str = "design", priority: int = 1, deadline: str = None) -> str: """创建派单任务""" task_id = str(uuid.uuid4())[:8] conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(''' INSERT INTO dispatch_tasks (id, task_name, task_description, task_type, priority, status, deadline) VALUES (?, ?, ?, ?, ?, 'pending', ?) ''', (task_id, task_name, description, task_type, priority, deadline)) conn.commit() conn.close() print(f"✅ 已创建任务:{task_id} - {task_name}") return task_id def assign_task(task_id: str, designer_name: str): """分配任务给设计师""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() # 更新任务状态 cursor.execute(''' UPDATE dispatch_tasks SET status = 'assigned', assigned_to = ?, assigned_at = ? WHERE id = ? ''', (designer_name, datetime.now(), task_id)) # 记录历史 cursor.execute(''' INSERT INTO dispatch_history (task_id, designer_name, action, notes) VALUES (?, ?, 'assigned', ?) ''', (task_id, designer_name, f"任务分配给 {designer_name}")) # 更新工作量 cursor.execute(''' INSERT OR REPLACE INTO designer_workload (designer_name, pending_tasks, total_tasks, last_updated) VALUES ( ?, COALESCE((SELECT pending_tasks FROM designer_workload WHERE designer_name = ?), 0) + 1, COALESCE((SELECT total_tasks FROM designer_workload WHERE designer_name = ?), 0) + 1, ? ) ''', (designer_name, designer_name, designer_name, datetime.now())) conn.commit() conn.close() print(f"✅ 任务 {task_id} 已分配给 {designer_name}") def get_pending_tasks(limit: int = 10) -> list: """获取待分配任务""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(''' SELECT id, task_name, task_description, task_type, priority, created_at, deadline FROM dispatch_tasks WHERE status = 'pending' ORDER BY priority DESC, created_at ASC LIMIT ? ''', (limit,)) tasks = cursor.fetchall() conn.close() return [ { "id": row[0], "task_name": row[1], "description": row[2], "type": row[3], "priority": row[4], "created_at": row[5], "deadline": row[6] } for row in tasks ] def get_task_status(task_id: str) -> dict: """查询任务状态""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(''' SELECT id, task_name, status, assigned_to, created_at, assigned_at, completed_at FROM dispatch_tasks WHERE id = ? ''', (task_id,)) row = cursor.fetchone() conn.close() if row: return { "task_id": row[0], "task_name": row[1], "status": row[2], "assigned_to": row[3], "created_at": row[4], "assigned_at": row[5], "completed_at": row[6] } return None def complete_task(task_id: str, notes: str = ""): """完成任务""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() # 获取任务信息 cursor.execute('SELECT assigned_to FROM dispatch_tasks WHERE id = ?', (task_id,)) row = cursor.fetchone() if row and row[0]: designer_name = row[0] # 更新任务状态 cursor.execute(''' UPDATE dispatch_tasks SET status = 'completed', completed_at = ? WHERE id = ? ''', (datetime.now(), task_id)) # 记录历史 cursor.execute(''' INSERT INTO dispatch_history (task_id, designer_name, action, notes) VALUES (?, ?, 'completed', ?) ''', (task_id, designer_name, notes)) # 更新工作量 cursor.execute(''' UPDATE designer_workload SET pending_tasks = pending_tasks - 1, completed_tasks = completed_tasks + 1, last_updated = ? WHERE designer_name = ? ''', (datetime.now(), designer_name)) conn.commit() conn.close() print(f"✅ 任务 {task_id} 已完成") def get_designer_workload() -> list: """获取设计师工作量统计""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(''' SELECT designer_name, total_tasks, completed_tasks, pending_tasks, last_updated FROM designer_workload ORDER BY total_tasks DESC ''') workload = cursor.fetchall() conn.close() return [ { "designer": row[0], "total": row[1], "completed": row[2], "pending": row[3], "last_updated": row[4] } for row in workload ] if __name__ == "__main__": # 初始化数据库 init_db() # 创建测试任务 task1 = create_task("海报设计", "设计一个产品宣传海报", "design", priority=2) task2 = create_task("Logo 设计", "设计公司 Logo", "design", priority=3) task3 = create_task("详情页设计", "电商产品详情页", "design", priority=1) # 分配任务 assign_task(task1, "橘子") assign_task(task2, "婷婷") # 查询待分配任务 print("\n📋 待分配任务:") for task in get_pending_tasks(): print(f" {task['id']} - {task['task_name']} (优先级:{task['priority']})") # 查询任务状态 print("\n📊 任务状态:") status = get_task_status(task1) print(f" {status}") # 查询工作量 print("\n💼 设计师工作量:") for w in get_designer_workload(): print(f" {w['designer']}: 总{w['total']} 完成{w['completed']} 待处理{w['pending']}")