Files
tw/core/workflow_router.py
ZuoWei a6c42d505a feat: 完整功能部署 v1.0
新增功能:
- 天网协作系统 (HTTP API 端口 6060)
- 三种工作流 (查找图片/处理图片/转人工派单)
- 图片任务数据库 (支持客户后续增加需求)
- 图绘派单系统集成 (API: 8005)
- 文字检测与加价 (60-80 元高价值订单)
- 风险评估与接单判断
- 作图失败自动转人工

新增文档:
- 项目功能汇总.md
- 三种工作流功能说明.md
- 文字加价功能说明.md
- 风险评估功能说明.md
- 图片任务数据库功能说明.md
- 图绘派单系统集成说明.md
- 作图失败转接人工说明.md
- DEPLOYMENT.md
- TIANWANG_INTEGRATION.md

核心修改:
- core/pydantic_ai_agent.py
- core/workflow.py
- core/websocket_client.py
- image/image_analyzer.py
- services/service_tuhui_dispatch.py
- db/image_tasks_db.py

版本:v1.0
日期:2026-02-28
2026-02-28 11:20:40 +08:00

94 lines
2.8 KiB
Python

# -*- coding: utf-8 -*-
"""
工作流程路由器
根据客户说的话判断执行哪种工作流
"""
import logging
from typing import Tuple, Optional
import re
logger = logging.getLogger(__name__)
class WorkflowRouter:
"""工作流程路由器"""
def __init__(self):
# 查找图片关键词
self.find_keywords = [
"找一下", "找图", "找原图", "帮我找", "能找到吗",
"有吗", "有没有", "找到", "搜一下"
]
# 处理图片关键词
self.process_keywords = [
"做一下", "处理一下", "安排", "开始做",
"弄一下", "修一下", "P 一下", "P 图"
]
# 无法处理关键词
self.unable_keywords = [
"做不了", "处理不了", "弄不了", "无法处理",
"做不到", "搞不定"
]
def detect_workflow(self, message: str) -> Tuple[str, str]:
"""
检测工作流程类型
Args:
message: 客户消息
Returns:
(workflow_type, confidence)
workflow_type: "find_image" / "process_image" / "transfer_human"
confidence: 置信度 0-1
"""
message_lower = message.lower()
# 1. 检测查找图片
for kw in self.find_keywords:
if kw in message_lower:
logger.info(f"检测到查找图片意图:{kw}")
return ("find_image", 0.9)
# 2. 检测处理图片
for kw in self.process_keywords:
if kw in message_lower:
logger.info(f"检测到处理图片意图:{kw}")
return ("process_image", 0.9)
# 3. 检测无法处理
for kw in self.unable_keywords:
if kw in message_lower:
logger.info(f"检测到无法处理:{kw}")
return ("transfer_human", 0.9)
# 默认返回未知
return ("unknown", 0.5)
def should_find_image(self, message: str) -> bool:
"""是否应该查找图片"""
workflow_type, _ = self.detect_workflow(message)
return workflow_type == "find_image"
def should_process_image(self, message: str) -> bool:
"""是否应该处理图片"""
workflow_type, _ = self.detect_workflow(message)
return workflow_type == "process_image"
def should_transfer_human(self, message: str) -> bool:
"""是否应该转人工"""
workflow_type, _ = self.detect_workflow(message)
return workflow_type == "transfer_human"
# 单例
_router: Optional[WorkflowRouter] = None
def get_workflow_router() -> WorkflowRouter:
"""获取工作流程路由器单例"""
global _router
if _router is None:
_router = WorkflowRouter()
return _router