# -*- coding: utf-8 -*- """ 任务触发条件匹配引擎 支持多种触发条件类型 """ import re import logging from typing import Dict, List, Optional from datetime import datetime logger = logging.getLogger(__name__) class TaskTriggerEngine: """任务触发引擎""" def __init__(self): # 触发器类型注册表 self.trigger_handlers = { 'customer_reply': self._check_customer_reply, 'customer_keyword': self._check_customer_keyword, 'customer_payment': self._check_customer_payment, 'customer_order': self._check_customer_order, 'time_reach': self._check_time_reach, 'custom_event': self._check_custom_event, 'specified_customer_reply': self._check_specified_customer_reply, # 新增:指定客户回复 } def check_trigger(self, message: str, trigger: dict, context: dict = None) -> bool: """ 检查消息是否匹配触发条件 Args: message: 客户消息内容 trigger: 触发条件配置 context: 上下文信息(客户 ID、店铺 ID 等) Returns: 是否匹配 """ trigger_type = trigger.get('type') if trigger_type not in self.trigger_handlers: logger.warning(f"未知触发类型:{trigger_type}") return False handler = self.trigger_handlers[trigger_type] try: return handler(message, trigger, context) except Exception as e: logger.error(f"触发条件检查失败:{e}") return False def _check_customer_reply(self, message: str, trigger: dict, context: dict = None) -> bool: """ 检查客户回复指定内容 触发条件: { "type": "customer_reply", "keyword": "好的" // 包含"好的"即匹配 } """ keyword = trigger.get('keyword') if not keyword: return False return keyword in message def _check_customer_keyword(self, message: str, trigger: dict, context: dict = None) -> bool: """ 检查客户说某关键词 触发条件: { "type": "customer_keyword", "keywords": ["好的", "可以", "行"] // 任一关键词匹配 } """ keywords = trigger.get('keywords', []) if isinstance(keywords, str): import json keywords = json.loads(keywords) if not keywords: return False return any(kw in message for kw in keywords) def _check_customer_payment(self, message: str, trigger: dict, context: dict = None) -> bool: """ 检查客户付款 触发条件: { "type": "customer_payment", "keywords": ["已付款", "拍下了", "已下单", "付款了"] } """ payment_keywords = trigger.get('keywords', [ '已付款', '拍下了', '已下单', '付款了', '已支付', '拍下付款' ]) return any(kw in message for kw in payment_keywords) def _check_customer_order(self, message: str, trigger: dict, context: dict = None) -> bool: """ 检查客户下单 触发条件: { "type": "customer_order", "keywords": ["下单了", "拍了", "购买了"] } """ order_keywords = trigger.get('keywords', [ '下单了', '拍了', '购买了', '买了' ]) return any(kw in message for kw in order_keywords) def _check_time_reach(self, message: str, trigger: dict, context: dict = None) -> bool: """ 检查到达指定时间 触发条件: { "type": "time_reach", "time": "2026-02-27 09:00:00" } """ target_time = trigger.get('time') if not target_time: return False try: target = datetime.fromisoformat(target_time) return datetime.now() >= target except Exception as e: logger.error(f"时间解析失败:{e}") return False def _check_custom_event(self, message: str, trigger: dict, context: dict = None) -> bool: """ 自定义事件触发 触发条件: { "type": "custom_event", "event_name": "customer_complaint" } """ # 预留自定义事件接口 logger.info(f"自定义事件触发:{trigger.get('event_name')}") return True def _check_specified_customer_reply(self, message: str, trigger: dict, context: dict = None) -> bool: """ 【新增】指定客户回复指定内容 触发条件: { "type": "specified_customer_reply", "customer_id": "customer_123", // 指定客户 ID "customer_name": "小明", // 可选:客户名称 "keyword": "好的", // 回复内容 "exact_match": false // 可选:是否精确匹配 } 匹配逻辑: 1. 检查当前消息的客户 ID 是否匹配 2. 检查消息内容是否包含关键词 """ # 1. 检查客户 ID 是否匹配 specified_customer_id = trigger.get('customer_id') specified_customer_name = trigger.get('customer_name') if context: current_customer_id = context.get('customer_id') current_customer_name = context.get('customer_name') # 客户 ID 匹配检查 if specified_customer_id and current_customer_id: if specified_customer_id != current_customer_id: return False # 客户名称匹配检查(可选) if specified_customer_name and current_customer_name: if specified_customer_name != current_customer_name: return False else: logger.warning("缺少上下文信息,无法检查指定客户") return False # 2. 检查回复内容 keyword = trigger.get('keyword') if not keyword: return False exact_match = trigger.get('exact_match', False) if exact_match: # 精确匹配:消息内容完全等于关键词 return message.strip() == keyword.strip() else: # 模糊匹配:消息包含关键词 return keyword in message # 单例 _trigger_engine: Optional[TaskTriggerEngine] = None def get_trigger_engine() -> TaskTriggerEngine: """获取触发引擎单例""" global _trigger_engine if _trigger_engine is None: _trigger_engine = TaskTriggerEngine() return _trigger_engine