Files
tw/core/task_trigger.py
ZuoWei a6c42d505a feat: 完整功能部署 v1.0
新增功能:
- 天网协作系统 (HTTP API 端口 6060)
- 三种工作流 (查找图片/处理图片/转人工派单)
- 图片任务数据库 (支持客户后续增加需求)
- 图绘派单系统集成 (API: 8005)
- 文字检测与加价 (60-80 元高价值订单)
- 风险评估与接单判断
- 作图失败自动转人工

新增文档:
- 项目功能汇总.md
- 三种工作流功能说明.md
- 文字加价功能说明.md
- 风险评估功能说明.md
- 图片任务数据库功能说明.md
- 图绘派单系统集成说明.md
- 作图失败转接人工说明.md
- DEPLOYMENT.md
- TIANWANG_INTEGRATION.md

核心修改:
- core/pydantic_ai_agent.py
- core/workflow.py
- core/websocket_client.py
- image/image_analyzer.py
- services/service_tuhui_dispatch.py
- db/image_tasks_db.py

版本:v1.0
日期:2026-02-28
2026-02-28 11:20:40 +08:00

212 lines
6.9 KiB
Python

# -*- coding: utf-8 -*-
"""
任务触发条件匹配引擎
支持多种触发条件类型
"""
import re
import logging
from typing import Dict, List, Optional
from datetime import datetime
logger = logging.getLogger(__name__)
class TaskTriggerEngine:
"""任务触发引擎"""
def __init__(self):
# 触发器类型注册表
self.trigger_handlers = {
'customer_reply': self._check_customer_reply,
'customer_keyword': self._check_customer_keyword,
'customer_payment': self._check_customer_payment,
'customer_order': self._check_customer_order,
'time_reach': self._check_time_reach,
'custom_event': self._check_custom_event,
'specified_customer_reply': self._check_specified_customer_reply, # 新增:指定客户回复
}
def check_trigger(self, message: str, trigger: dict, context: dict = None) -> bool:
"""
检查消息是否匹配触发条件
Args:
message: 客户消息内容
trigger: 触发条件配置
context: 上下文信息(客户 ID、店铺 ID 等)
Returns:
是否匹配
"""
trigger_type = trigger.get('type')
if trigger_type not in self.trigger_handlers:
logger.warning(f"未知触发类型:{trigger_type}")
return False
handler = self.trigger_handlers[trigger_type]
try:
return handler(message, trigger, context)
except Exception as e:
logger.error(f"触发条件检查失败:{e}")
return False
def _check_customer_reply(self, message: str, trigger: dict, context: dict = None) -> bool:
"""
检查客户回复指定内容
触发条件:
{
"type": "customer_reply",
"keyword": "好的" // 包含"好的"即匹配
}
"""
keyword = trigger.get('keyword')
if not keyword:
return False
return keyword in message
def _check_customer_keyword(self, message: str, trigger: dict, context: dict = None) -> bool:
"""
检查客户说某关键词
触发条件:
{
"type": "customer_keyword",
"keywords": ["好的", "可以", ""] // 任一关键词匹配
}
"""
keywords = trigger.get('keywords', [])
if isinstance(keywords, str):
import json
keywords = json.loads(keywords)
if not keywords:
return False
return any(kw in message for kw in keywords)
def _check_customer_payment(self, message: str, trigger: dict, context: dict = None) -> bool:
"""
检查客户付款
触发条件:
{
"type": "customer_payment",
"keywords": ["已付款", "拍下了", "已下单", "付款了"]
}
"""
payment_keywords = trigger.get('keywords', [
'已付款', '拍下了', '已下单', '付款了', '已支付', '拍下付款'
])
return any(kw in message for kw in payment_keywords)
def _check_customer_order(self, message: str, trigger: dict, context: dict = None) -> bool:
"""
检查客户下单
触发条件:
{
"type": "customer_order",
"keywords": ["下单了", "拍了", "购买了"]
}
"""
order_keywords = trigger.get('keywords', [
'下单了', '拍了', '购买了', '买了'
])
return any(kw in message for kw in order_keywords)
def _check_time_reach(self, message: str, trigger: dict, context: dict = None) -> bool:
"""
检查到达指定时间
触发条件:
{
"type": "time_reach",
"time": "2026-02-27 09:00:00"
}
"""
target_time = trigger.get('time')
if not target_time:
return False
try:
target = datetime.fromisoformat(target_time)
return datetime.now() >= target
except Exception as e:
logger.error(f"时间解析失败:{e}")
return False
def _check_custom_event(self, message: str, trigger: dict, context: dict = None) -> bool:
"""
自定义事件触发
触发条件:
{
"type": "custom_event",
"event_name": "customer_complaint"
}
"""
# 预留自定义事件接口
logger.info(f"自定义事件触发:{trigger.get('event_name')}")
return True
def _check_specified_customer_reply(self, message: str, trigger: dict, context: dict = None) -> bool:
"""
【新增】指定客户回复指定内容
触发条件:
{
"type": "specified_customer_reply",
"customer_id": "customer_123", // 指定客户 ID
"customer_name": "小明", // 可选:客户名称
"keyword": "好的", // 回复内容
"exact_match": false // 可选:是否精确匹配
}
匹配逻辑:
1. 检查当前消息的客户 ID 是否匹配
2. 检查消息内容是否包含关键词
"""
# 1. 检查客户 ID 是否匹配
specified_customer_id = trigger.get('customer_id')
specified_customer_name = trigger.get('customer_name')
if context:
current_customer_id = context.get('customer_id')
current_customer_name = context.get('customer_name')
# 客户 ID 匹配检查
if specified_customer_id and current_customer_id:
if specified_customer_id != current_customer_id:
return False
# 客户名称匹配检查(可选)
if specified_customer_name and current_customer_name:
if specified_customer_name != current_customer_name:
return False
else:
logger.warning("缺少上下文信息,无法检查指定客户")
return False
# 2. 检查回复内容
keyword = trigger.get('keyword')
if not keyword:
return False
exact_match = trigger.get('exact_match', False)
if exact_match:
# 精确匹配:消息内容完全等于关键词
return message.strip() == keyword.strip()
else:
# 模糊匹配:消息包含关键词
return keyword in message
# 单例
_trigger_engine: Optional[TaskTriggerEngine] = None
def get_trigger_engine() -> TaskTriggerEngine:
"""获取触发引擎单例"""
global _trigger_engine
if _trigger_engine is None:
_trigger_engine = TaskTriggerEngine()
return _trigger_engine