Files
tuhui.cloud/backend/dispatch_system.py
2026-03-08 19:28:32 +08:00

265 lines
7.8 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Database layer for the designer dispatch system.
Stores dispatch records and the task queue.
"""
import sqlite3
import uuid
from contextlib import closing
from datetime import datetime
from pathlib import Path
from typing import Optional
# Path of the SQLite database file opened by every function in this module.
# NOTE(review): hard-coded absolute path under /root - presumably specific to
# one deployment; confirm before running elsewhere.
DB_PATH = Path("/root/tuhui/backend/dispatch.db")
def init_db():
    """Create the dispatch database schema if it does not exist yet.

    Opens the SQLite file at ``DB_PATH`` and issues three
    ``CREATE TABLE IF NOT EXISTS`` statements, so calling this repeatedly
    is harmless:

    * ``dispatch_tasks`` - one row per task; ``status`` is restricted by a
      CHECK constraint to pending / assigned / in_progress / completed /
      cancelled.
    * ``dispatch_history`` - one row per recorded action on a task.
    * ``designer_workload`` - per-designer counters, unique on
      ``designer_name``.

    Prints a confirmation message once the schema has been committed.

    Raises:
        sqlite3.Error: if the database cannot be opened or a statement fails.
    """
    # closing() releases the connection even when a statement raises.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        cursor = conn.cursor()

        # Dispatch task table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS dispatch_tasks (
                id TEXT PRIMARY KEY,
                task_name TEXT NOT NULL,
                task_description TEXT,
                task_type TEXT DEFAULT 'design',
                priority INTEGER DEFAULT 1,
                status TEXT NOT NULL CHECK (status IN ('pending', 'assigned', 'in_progress', 'completed', 'cancelled')),
                assigned_to TEXT,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                assigned_at DATETIME,
                started_at DATETIME,
                completed_at DATETIME,
                deadline DATETIME,
                metadata TEXT
            )
        ''')

        # Dispatch history table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS dispatch_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                task_id TEXT NOT NULL,
                designer_name TEXT NOT NULL,
                action TEXT NOT NULL,
                action_time DATETIME DEFAULT CURRENT_TIMESTAMP,
                notes TEXT
            )
        ''')

        # Per-designer workload counters
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS designer_workload (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                designer_name TEXT UNIQUE NOT NULL,
                total_tasks INTEGER DEFAULT 0,
                completed_tasks INTEGER DEFAULT 0,
                pending_tasks INTEGER DEFAULT 0,
                last_updated DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        conn.commit()

    print("✅ 派单数据库初始化完成")
def create_task(task_name: str, description: str = "", task_type: str = "design", priority: int = 1, deadline: Optional[str] = None) -> str:
    """Insert a new task with status ``'pending'`` and return its ID.

    Args:
        task_name: Stored in ``task_name`` (the column is NOT NULL).
        description: Stored in ``task_description``.
        task_type: Stored in ``task_type``.
        priority: Stored in ``priority``.
        deadline: Stored as-is in ``deadline``; ``None`` leaves it NULL.

    Returns:
        The generated task ID: the first 8 characters of a random UUID4.

    Raises:
        sqlite3.IntegrityError: if the row violates a constraint other than
            an ID collision, or if every generated ID collided.
        sqlite3.Error: for any other database failure.
    """
    # An 8-hex-character ID carries only 32 bits, so a clash with an existing
    # primary key is possible; retry a few times with a fresh ID.
    max_attempts = 5

    with closing(sqlite3.connect(DB_PATH)) as conn:
        cursor = conn.cursor()
        for attempts_left in range(max_attempts - 1, -1, -1):
            task_id = str(uuid.uuid4())[:8]
            try:
                cursor.execute('''
                    INSERT INTO dispatch_tasks (id, task_name, task_description, task_type, priority, status, deadline)
                    VALUES (?, ?, ?, ?, ?, 'pending', ?)
                ''', (task_id, task_name, description, task_type, priority, deadline))
            except sqlite3.IntegrityError:
                conn.rollback()
                # Only an ID that is already taken is worth retrying; any
                # other constraint failure is propagated unchanged.
                collided = cursor.execute(
                    'SELECT 1 FROM dispatch_tasks WHERE id = ?', (task_id,)
                ).fetchone() is not None
                if not collided or attempts_left == 0:
                    raise
            else:
                break
        conn.commit()

    print(f"✅ 已创建任务:{task_id} - {task_name}")
    return task_id
def assign_task(task_id: str, designer_name: str):
    """Assign a task to a designer and update the bookkeeping tables.

    On success the task row gets ``status = 'assigned'``, ``assigned_to``
    and ``assigned_at``; an ``'assigned'`` row is added to
    ``dispatch_history``; and the designer's ``pending_tasks`` and
    ``total_tasks`` counters are each incremented (the workload row is
    created on first assignment).

    If no task with ``task_id`` exists, nothing is written and a warning is
    printed instead of the success message.

    Args:
        task_id: ID of the task to assign.
        designer_name: Designer receiving the task.

    Raises:
        sqlite3.Error: if a database statement fails.
    """
    # Store the timestamp as text in the same "YYYY-MM-DD HH:MM:SS[.ffffff]"
    # form sqlite3's default datetime adapter produced; that adapter is
    # deprecated since Python 3.12.
    # NOTE(review): this is local time, while columns filled by
    # CURRENT_TIMESTAMP are UTC - confirm which one consumers expect.
    now = datetime.now().isoformat(sep=" ")

    with closing(sqlite3.connect(DB_PATH)) as conn:
        cursor = conn.cursor()

        # Update the task row
        cursor.execute('''
            UPDATE dispatch_tasks
            SET status = 'assigned', assigned_to = ?, assigned_at = ?
            WHERE id = ?
        ''', (designer_name, now, task_id))
        if cursor.rowcount == 0:
            # Unknown task: do not record history or workload for it.
            print(f"⚠️ 任务 {task_id} 不存在，无法分配")
            return

        # Record history
        cursor.execute('''
            INSERT INTO dispatch_history (task_id, designer_name, action, notes)
            VALUES (?, ?, 'assigned', ?)
        ''', (task_id, designer_name, f"任务分配给 {designer_name}"))

        # Update workload in place. INSERT OR REPLACE would delete the
        # existing row and reset completed_tasks to its default of 0.
        cursor.execute('''
            UPDATE designer_workload
            SET pending_tasks = pending_tasks + 1,
                total_tasks = total_tasks + 1,
                last_updated = ?
            WHERE designer_name = ?
        ''', (now, designer_name))
        if cursor.rowcount == 0:
            cursor.execute('''
                INSERT INTO designer_workload (designer_name, pending_tasks, total_tasks, last_updated)
                VALUES (?, 1, 1, ?)
            ''', (designer_name, now))

        # NOTE(review): re-assigning a task that already has a designer does
        # not decrement the previous designer's pending_tasks.
        conn.commit()

    print(f"✅ 任务 {task_id} 已分配给 {designer_name}")
def get_pending_tasks(limit: int = 10) -> list:
    """Return tasks whose status is ``'pending'``, most urgent first.

    Rows are ordered by ``priority`` descending, then ``created_at``
    ascending, and capped at ``limit`` rows.

    Args:
        limit: Value bound to the SQL ``LIMIT`` clause.

    Returns:
        A list of dicts with the keys ``id``, ``task_name``,
        ``description``, ``type``, ``priority``, ``created_at`` and
        ``deadline``. Empty when no task is pending.

    Raises:
        sqlite3.Error: if the query fails.
    """
    # closing() releases the connection even when the query raises.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        rows = conn.execute('''
            SELECT id, task_name, task_description, task_type, priority, created_at, deadline
            FROM dispatch_tasks
            WHERE status = 'pending'
            ORDER BY priority DESC, created_at ASC
            LIMIT ?
        ''', (limit,)).fetchall()

    # Output keys, in the same order as the selected columns.
    keys = ("id", "task_name", "description", "type", "priority", "created_at", "deadline")
    return [dict(zip(keys, row)) for row in rows]
def get_task_status(task_id: str) -> Optional[dict]:
    """Look up a single task and return its status summary.

    Args:
        task_id: ID of the task to look up.

    Returns:
        A dict with the keys ``task_id``, ``task_name``, ``status``,
        ``assigned_to``, ``created_at``, ``assigned_at`` and
        ``completed_at``, or ``None`` when no task has that ID.

    Raises:
        sqlite3.Error: if the query fails.
    """
    # closing() releases the connection even when the query raises.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        row = conn.execute('''
            SELECT id, task_name, status, assigned_to, created_at, assigned_at, completed_at
            FROM dispatch_tasks
            WHERE id = ?
        ''', (task_id,)).fetchone()

    if row is None:
        return None

    # Output keys, in the same order as the selected columns.
    keys = ("task_id", "task_name", "status", "assigned_to", "created_at", "assigned_at", "completed_at")
    return dict(zip(keys, row))
def complete_task(task_id: str, notes: str = ""):
    """Mark an assigned task as completed and update the bookkeeping tables.

    On success the task row gets ``status = 'completed'`` and
    ``completed_at``; a ``'completed'`` row carrying ``notes`` is added to
    ``dispatch_history``; and the assigned designer's workload row has
    ``pending_tasks`` decremented (never below 0) and ``completed_tasks``
    incremented.

    Nothing is written, and a warning is printed instead of the success
    message, when the task does not exist, has no designer assigned, or is
    already completed.

    Args:
        task_id: ID of the task to complete.
        notes: Free-text note stored with the history row.

    Raises:
        sqlite3.Error: if a database statement fails.
    """
    # Store the timestamp as text in the same "YYYY-MM-DD HH:MM:SS[.ffffff]"
    # form sqlite3's default datetime adapter produced; that adapter is
    # deprecated since Python 3.12.
    now = datetime.now().isoformat(sep=" ")

    with closing(sqlite3.connect(DB_PATH)) as conn:
        cursor = conn.cursor()

        # Fetch the assignee and current status
        cursor.execute('SELECT assigned_to, status FROM dispatch_tasks WHERE id = ?', (task_id,))
        row = cursor.fetchone()
        if row is None or not row[0]:
            print(f"⚠️ 任务 {task_id} 不存在或尚未分配，无法完成")
            return

        designer_name, current_status = row
        if current_status == 'completed':
            # Completing twice would count the task twice in the workload.
            print(f"⚠️ 任务 {task_id} 已是完成状态，未重复统计")
            return

        # Update the task row
        cursor.execute('''
            UPDATE dispatch_tasks
            SET status = 'completed', completed_at = ?
            WHERE id = ?
        ''', (now, task_id))

        # Record history
        cursor.execute('''
            INSERT INTO dispatch_history (task_id, designer_name, action, notes)
            VALUES (?, ?, 'completed', ?)
        ''', (task_id, designer_name, notes))

        # Update workload; MAX(..., 0) keeps the pending counter non-negative.
        cursor.execute('''
            UPDATE designer_workload
            SET pending_tasks = MAX(pending_tasks - 1, 0),
                completed_tasks = completed_tasks + 1,
                last_updated = ?
            WHERE designer_name = ?
        ''', (now, designer_name))

        conn.commit()

    print(f"✅ 任务 {task_id} 已完成")
def get_designer_workload() -> list:
    """Return the workload counters of every designer.

    Returns:
        A list of dicts with the keys ``designer``, ``total``,
        ``completed``, ``pending`` and ``last_updated``, ordered by total
        task count descending. Empty when no designer has a workload row.

    Raises:
        sqlite3.Error: if the query fails.
    """
    # closing() releases the connection even when the query raises.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        rows = conn.execute('''
            SELECT designer_name, total_tasks, completed_tasks, pending_tasks, last_updated
            FROM designer_workload
            ORDER BY total_tasks DESC
        ''').fetchall()

    # Output keys, in the same order as the selected columns.
    keys = ("designer", "total", "completed", "pending", "last_updated")
    return [dict(zip(keys, row)) for row in rows]
if __name__ == "__main__":
    # Demo run: build the schema, seed sample tasks, then print the results.
    init_db()

    # Three sample tasks; only the first two are handed to a designer below,
    # so the third stays in the pending queue.
    poster_id = create_task("海报设计", "设计一个产品宣传海报", "design", priority=2)
    logo_id = create_task("Logo 设计", "设计公司 Logo", "design", priority=3)
    create_task("详情页设计", "电商产品详情页", "design", priority=1)

    for sample_id, designer in ((poster_id, "橘子"), (logo_id, "婷婷")):
        assign_task(sample_id, designer)

    # Tasks still waiting for a designer
    print("\n📋 待分配任务:")
    for task in get_pending_tasks():
        print(f" {task['id']} - {task['task_name']} (优先级:{task['priority']})")

    # Status of the first sample task
    print("\n📊 任务状态:")
    status = get_task_status(poster_id)
    print(f" {status}")

    # Per-designer counters
    print("\n💼 设计师工作量:")
    for w in get_designer_workload():
        print(f" {w['designer']}: 总{w['total']} 完成{w['completed']} 待处理{w['pending']}")