265 lines
7.8 KiB
Python
265 lines
7.8 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
设计师派单系统数据库
|
|
存储派单记录和任务队列
|
|
"""
|
|
|
|
import sqlite3
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
import uuid
|
|
|
|
# 数据库路径
|
|
DB_PATH = Path("/root/tuhui/backend/dispatch.db")
|
|
|
|
def init_db():
|
|
"""初始化数据库"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
|
|
# 创建派单任务表
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS dispatch_tasks (
|
|
id TEXT PRIMARY KEY,
|
|
task_name TEXT NOT NULL,
|
|
task_description TEXT,
|
|
task_type TEXT DEFAULT 'design',
|
|
priority INTEGER DEFAULT 1,
|
|
status TEXT NOT NULL CHECK (status IN ('pending', 'assigned', 'in_progress', 'completed', 'cancelled')),
|
|
assigned_to TEXT,
|
|
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
assigned_at DATETIME,
|
|
started_at DATETIME,
|
|
completed_at DATETIME,
|
|
deadline DATETIME,
|
|
metadata TEXT
|
|
)
|
|
''')
|
|
|
|
# 创建派单历史记录表
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS dispatch_history (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
task_id TEXT NOT NULL,
|
|
designer_name TEXT NOT NULL,
|
|
action TEXT NOT NULL,
|
|
action_time DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
notes TEXT
|
|
)
|
|
''')
|
|
|
|
# 创建设计师工作量统计表
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS designer_workload (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
designer_name TEXT UNIQUE NOT NULL,
|
|
total_tasks INTEGER DEFAULT 0,
|
|
completed_tasks INTEGER DEFAULT 0,
|
|
pending_tasks INTEGER DEFAULT 0,
|
|
last_updated DATETIME DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
''');
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
print("✅ 派单数据库初始化完成")
|
|
|
|
def create_task(task_name: str, description: str = "", task_type: str = "design", priority: int = 1, deadline: str = None) -> str:
|
|
"""创建派单任务"""
|
|
task_id = str(uuid.uuid4())[:8]
|
|
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute('''
|
|
INSERT INTO dispatch_tasks (id, task_name, task_description, task_type, priority, status, deadline)
|
|
VALUES (?, ?, ?, ?, ?, 'pending', ?)
|
|
''', (task_id, task_name, description, task_type, priority, deadline))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
print(f"✅ 已创建任务:{task_id} - {task_name}")
|
|
return task_id
|
|
|
|
def assign_task(task_id: str, designer_name: str):
|
|
"""分配任务给设计师"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
|
|
# 更新任务状态
|
|
cursor.execute('''
|
|
UPDATE dispatch_tasks
|
|
SET status = 'assigned', assigned_to = ?, assigned_at = ?
|
|
WHERE id = ?
|
|
''', (designer_name, datetime.now(), task_id))
|
|
|
|
# 记录历史
|
|
cursor.execute('''
|
|
INSERT INTO dispatch_history (task_id, designer_name, action, notes)
|
|
VALUES (?, ?, 'assigned', ?)
|
|
''', (task_id, designer_name, f"任务分配给 {designer_name}"))
|
|
|
|
# 更新工作量
|
|
cursor.execute('''
|
|
INSERT OR REPLACE INTO designer_workload (designer_name, pending_tasks, total_tasks, last_updated)
|
|
VALUES (
|
|
?,
|
|
COALESCE((SELECT pending_tasks FROM designer_workload WHERE designer_name = ?), 0) + 1,
|
|
COALESCE((SELECT total_tasks FROM designer_workload WHERE designer_name = ?), 0) + 1,
|
|
?
|
|
)
|
|
''', (designer_name, designer_name, designer_name, datetime.now()))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
print(f"✅ 任务 {task_id} 已分配给 {designer_name}")
|
|
|
|
def get_pending_tasks(limit: int = 10) -> list:
|
|
"""获取待分配任务"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute('''
|
|
SELECT id, task_name, task_description, task_type, priority, created_at, deadline
|
|
FROM dispatch_tasks
|
|
WHERE status = 'pending'
|
|
ORDER BY priority DESC, created_at ASC
|
|
LIMIT ?
|
|
''', (limit,))
|
|
|
|
tasks = cursor.fetchall()
|
|
conn.close()
|
|
|
|
return [
|
|
{
|
|
"id": row[0],
|
|
"task_name": row[1],
|
|
"description": row[2],
|
|
"type": row[3],
|
|
"priority": row[4],
|
|
"created_at": row[5],
|
|
"deadline": row[6]
|
|
}
|
|
for row in tasks
|
|
]
|
|
|
|
def get_task_status(task_id: str) -> dict:
|
|
"""查询任务状态"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute('''
|
|
SELECT id, task_name, status, assigned_to, created_at, assigned_at, completed_at
|
|
FROM dispatch_tasks
|
|
WHERE id = ?
|
|
''', (task_id,))
|
|
|
|
row = cursor.fetchone()
|
|
conn.close()
|
|
|
|
if row:
|
|
return {
|
|
"task_id": row[0],
|
|
"task_name": row[1],
|
|
"status": row[2],
|
|
"assigned_to": row[3],
|
|
"created_at": row[4],
|
|
"assigned_at": row[5],
|
|
"completed_at": row[6]
|
|
}
|
|
return None
|
|
|
|
def complete_task(task_id: str, notes: str = ""):
|
|
"""完成任务"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
|
|
# 获取任务信息
|
|
cursor.execute('SELECT assigned_to FROM dispatch_tasks WHERE id = ?', (task_id,))
|
|
row = cursor.fetchone()
|
|
|
|
if row and row[0]:
|
|
designer_name = row[0]
|
|
|
|
# 更新任务状态
|
|
cursor.execute('''
|
|
UPDATE dispatch_tasks
|
|
SET status = 'completed', completed_at = ?
|
|
WHERE id = ?
|
|
''', (datetime.now(), task_id))
|
|
|
|
# 记录历史
|
|
cursor.execute('''
|
|
INSERT INTO dispatch_history (task_id, designer_name, action, notes)
|
|
VALUES (?, ?, 'completed', ?)
|
|
''', (task_id, designer_name, notes))
|
|
|
|
# 更新工作量
|
|
cursor.execute('''
|
|
UPDATE designer_workload
|
|
SET pending_tasks = pending_tasks - 1,
|
|
completed_tasks = completed_tasks + 1,
|
|
last_updated = ?
|
|
WHERE designer_name = ?
|
|
''', (datetime.now(), designer_name))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
print(f"✅ 任务 {task_id} 已完成")
|
|
|
|
def get_designer_workload() -> list:
|
|
"""获取设计师工作量统计"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute('''
|
|
SELECT designer_name, total_tasks, completed_tasks, pending_tasks, last_updated
|
|
FROM designer_workload
|
|
ORDER BY total_tasks DESC
|
|
''')
|
|
|
|
workload = cursor.fetchall()
|
|
conn.close()
|
|
|
|
return [
|
|
{
|
|
"designer": row[0],
|
|
"total": row[1],
|
|
"completed": row[2],
|
|
"pending": row[3],
|
|
"last_updated": row[4]
|
|
}
|
|
for row in workload
|
|
]
|
|
|
|
if __name__ == "__main__":
|
|
# 初始化数据库
|
|
init_db()
|
|
|
|
# 创建测试任务
|
|
task1 = create_task("海报设计", "设计一个产品宣传海报", "design", priority=2)
|
|
task2 = create_task("Logo 设计", "设计公司 Logo", "design", priority=3)
|
|
task3 = create_task("详情页设计", "电商产品详情页", "design", priority=1)
|
|
|
|
# 分配任务
|
|
assign_task(task1, "橘子")
|
|
assign_task(task2, "婷婷")
|
|
|
|
# 查询待分配任务
|
|
print("\n📋 待分配任务:")
|
|
for task in get_pending_tasks():
|
|
print(f" {task['id']} - {task['task_name']} (优先级:{task['priority']})")
|
|
|
|
# 查询任务状态
|
|
print("\n📊 任务状态:")
|
|
status = get_task_status(task1)
|
|
print(f" {status}")
|
|
|
|
# 查询工作量
|
|
print("\n💼 设计师工作量:")
|
|
for w in get_designer_workload():
|
|
print(f" {w['designer']}: 总{w['total']} 完成{w['completed']} 待处理{w['pending']}")
|