From dff4a8baaa14113dbd35cc83272cac90e0475399 Mon Sep 17 00:00:00 2001 From: jimi <1847930177@qq.com> Date: Sun, 1 Mar 2026 15:02:27 +0800 Subject: [PATCH] refactor: split order handling and ai routing flow from agent --- core/ai_reply_flow.py | 135 ++++++++++++++++++++++++++++++++++++++ core/order_flow.py | 59 +++++++++++++++++ core/pydantic_ai_agent.py | 135 ++++---------------------------------- 3 files changed, 207 insertions(+), 122 deletions(-) create mode 100644 core/ai_reply_flow.py create mode 100644 core/order_flow.py diff --git a/core/ai_reply_flow.py b/core/ai_reply_flow.py new file mode 100644 index 0000000..b3bbe28 --- /dev/null +++ b/core/ai_reply_flow.py @@ -0,0 +1,135 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from core.pydantic_ai_agent import AgentDeps, ConversationState, CustomerMessage, CustomerServiceAgent + + +def select_target_agent(agent: "CustomerServiceAgent", message: "CustomerMessage", state: "ConversationState"): + msg_lower = message.msg.lower() + pricing_kw = ["多少钱", "多少一张", "报价", "给个价", "几块", "价位", "能便宜点吗"] + processing_kw = ["安排", "处理一下", "开始做", "做一下", "尽快", "加急", "付款了", "已付款"] + similar_kw = ["有一样的", "有一样吗", "一样的吗", "类似的", "类似的吗", "同款", "相似", "类似吗"] + order_markers = ["[系统订单信息]", "订单状态", "买家已付款"] + risk_kw = [ + "黄色", + "擦边", + "色情", + "涉黄", + "涉政", + "政治", + "裸", + "不雅", + "天安门", + "政治人物", + "政治事件", + "领导人", + "党政", + "习近平", + "毛泽东", + "邓小平", + "江泽民", + "胡锦涛", + "特朗普", + "拜登", + "普京", + "泽连斯基", + "地图", + "地形图", + "行政区划图", + "卫星地图", + ] + target_agent = agent.agent_after_sale if state.stage == "售后" else agent.agent + risk_hit = any(k in msg_lower for k in risk_kw) or agent._is_political_inquiry(message.msg) or agent._is_map_inquiry(message.msg) + if risk_hit: + return agent.agent_risk + if any(k in message.msg for k in order_markers): + return agent.agent_order + if any(k in msg_lower for k in processing_kw): + return agent.agent_processing + if any(k in msg_lower for k in pricing_kw): + return agent.agent_pricing + if any(k in msg_lower for k in similar_kw): + return agent.agent_similar + return target_agent + + +async def execute_ai_turn( + agent: "CustomerServiceAgent", + *, + message: "CustomerMessage", + state: "ConversationState", + user_prompt: str, + deps: "AgentDeps", + history: list, +) -> str: + target_agent = select_target_agent(agent, message, state) + result = await target_agent.run(user_prompt, deps=deps, message_history=history) + agent.message_histories[message.from_id] = result.all_messages()[-30:] + reply_text = agent._colloquialize_reply(agent._normalize_reply_text(result.output)) + + strategy_reply = agent._negotiation_strategy_reply(message.msg, state) + if strategy_reply: + reply_text = strategy_reply + + try: + from config.config import MIN_PRICE_FLOOR + import re + + offer = None + m = re.search(r"(\d{1,4})\s*(?:元|块|块钱|元钱)\b", message.msg) + if m: + offer = int(m.group(1)) + else: + m2 = re.search(r"(?:能|可以|可否|能否)\s*(\d{1,4})\b", message.msg) + offer = int(m2.group(1)) if m2 else None + st = agent._get_conversation_state(message.from_id) + floor = st.last_min_price if isinstance(st.last_min_price, int) and st.last_min_price > 0 else MIN_PRICE_FLOOR + if offer is not None and offer < floor: + reply_text = "不好意思" + except Exception: + pass + + try: + from config.config import MIN_PRICE_FLOOR + import re + + st = agent._get_conversation_state(message.from_id) + floor = st.last_min_price if isinstance(st.last_min_price, int) and st.last_min_price > 0 else MIN_PRICE_FLOOR + + def _adjust(text: str) -> str: + def _repl(m: Any): + num = int(m.group(1)) + adj = max(floor, round(num / 5) * 5) + return m.group(0).replace(str(num), str(adj)) + + patterns = [ + r"按(\d{1,4})元", + r"报价[::]\s*(\d{1,4})\s*元", + r"(\d{1,4})\s*元一张", + r"打包(\d{1,4})\s*元", + ] + t = text + for p in patterns: + t = re.sub(p, _repl, t) + return t + + reply_text = _adjust(reply_text or "") + except Exception: + pass + + for msg in result.new_messages(): + for part in getattr(msg, "parts", []): + part_type = type(part).__name__ + if "ToolCall" in part_type: + print( + f"{agent.C_TOOL}[THINK/TOOL_CALL]{agent.C_RESET} " + f"{getattr(part, 'tool_name', '')}({getattr(part, 'args', '')})" + ) + elif "ToolReturn" in part_type: + ret = str(getattr(part, "content", ""))[:120] + print(f"{agent.C_TOOL}[THINK/TOOL_RETURN]{agent.C_RESET} {ret}") + + print(f"{agent.C_THINK}[THINK/RAW_OUTPUT]{agent.C_RESET} {repr(reply_text)}") + return reply_text diff --git a/core/order_flow.py b/core/order_flow.py new file mode 100644 index 0000000..f5df9ad --- /dev/null +++ b/core/order_flow.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +import asyncio +from typing import TYPE_CHECKING, Optional + +if TYPE_CHECKING: + from core.pydantic_ai_agent import AgentResponse, ConversationState, CustomerMessage, CustomerServiceAgent + + +async def handle_order_notification( + agent: "CustomerServiceAgent", + *, + message: "CustomerMessage", + state: "ConversationState", +) -> Optional["AgentResponse"]: + """Handle system order notifications before normal AI dialogue.""" + from core.pydantic_ai_agent import AgentResponse + + if "系统订单信息" not in message.msg and "订单状态" not in message.msg: + return None + + _, order_block = agent._split_customer_text(message.msg) + customer_text, _ = agent._split_customer_text(message.msg) + order = agent._parse_order_info(order_block or message.msg) + pay_status = order.get("pay_status", "") + order_status = order.get("order_status", "") + + paid_keywords = ["等待发货", "已付款", "付款成功", "买家已付款"] + is_paid = any(kw in pay_status or kw in order_status for kw in paid_keywords) + + if is_paid: + asyncio.create_task(agent._check_order_amount(message.from_id, order, message.acc_id)) + asyncio.create_task( + agent._record_deal_success( + message.from_id, + message.from_name, + message.acc_id, + message.acc_type, + order, + state, + ) + ) + try: + from core.workflow import workflow + + asyncio.create_task( + workflow.trigger_processing_on_payment( + customer_id=message.from_id, + acc_id=message.acc_id, + acc_type=message.acc_type, + ) + ) + except Exception as e: + print(f"[Agent] 触发作图失败: {e}") + elif not customer_text: + print(f"[Agent] 订单通知静默({pay_status or order_status}),跳过回复") + return AgentResponse(reply="", should_reply=False, need_transfer=False) + + return None diff --git a/core/pydantic_ai_agent.py b/core/pydantic_ai_agent.py index 98b6384..fa20a37 100755 --- a/core/pydantic_ai_agent.py +++ b/core/pydantic_ai_agent.py @@ -27,6 +27,8 @@ from core.quote_state_machine import QuoteStateMachine from services.risk_service import RiskService from core.agent_pre_rules import AgentPreRuleService from core.find_image_flow import handle_find_image_batch_flow +from core.order_flow import handle_order_notification +from core.ai_reply_flow import execute_ai_turn load_dotenv() @@ -1790,41 +1792,9 @@ class CustomerServiceAgent: state.last_update = datetime.now().isoformat() - # 订单通知前置处理 - if "系统订单信息" in message.msg or "订单状态" in message.msg: - _, order_block = self._split_customer_text(message.msg) - customer_text, _ = self._split_customer_text(message.msg) - order = self._parse_order_info(order_block or message.msg) - pay_status = order.get("pay_status", "") - order_status = order.get("order_status", "") - - paid_keywords = ["等待发货", "已付款", "付款成功", "买家已付款"] - is_paid = any(kw in pay_status or kw in order_status for kw in paid_keywords) - - if is_paid: - # 订单金额核查:对比报价和实付金额 - asyncio.create_task(self._check_order_amount( - message.from_id, order, message.acc_id - )) - # 成交记录:写入数据库供日报分析 - asyncio.create_task(self._record_deal_success( - message.from_id, message.from_name, message.acc_id, message.acc_type, - order, state - )) - # 已付款:触发 Gemini 作图 - try: - from core.workflow import workflow - asyncio.create_task(workflow.trigger_processing_on_payment( - customer_id=message.from_id, - acc_id=message.acc_id, - acc_type=message.acc_type, - )) - except Exception as e: - print(f"[Agent] 触发作图失败: {e}") - elif not customer_text: - # 非付款 + 没有客户文字 → 直接静默,不调用 AI - print(f"[Agent] 订单通知静默({pay_status or order_status}),跳过回复") - return AgentResponse(reply="", should_reply=False, need_transfer=False) + order_response = await handle_order_notification(self, message=message, state=state) + if isinstance(order_response, AgentResponse): + return order_response customer_text, _ = self._split_customer_text(message.msg) shop_type = _get_shop_type(message.acc_id or "", message.goods_name or "") @@ -1874,93 +1844,14 @@ class CustomerServiceAgent: self._log_block("PROMPT->AI 前置提示词", user_prompt) try: - msg_lower = message.msg.lower() - pricing_kw = ["多少钱", "多少一张", "报价", "给个价", "几块", "价位", "能便宜点吗"] - processing_kw = ["安排", "处理一下", "开始做", "做一下", "尽快", "加急", "付款了", "已付款"] - similar_kw = ["有一样的", "有一样吗", "一样的吗", "类似的", "类似的吗", "同款", "相似", "类似吗"] - order_markers = ["[系统订单信息]", "订单状态", "买家已付款"] - risk_kw = [ - "黄色", "擦边", "色情", "涉黄", "涉政", "政治", "裸", "不雅", - "天安门", "政治人物", "政治事件", "领导人", "党政", - "习近平", "毛泽东", "邓小平", "江泽民", "胡锦涛", - "特朗普", "拜登", "普京", "泽连斯基", - "地图", "地形图", "行政区划图", "卫星地图", - ] - target_agent = self.agent_after_sale if state.stage == "售后" else self.agent - risk_hit = any(k in msg_lower for k in risk_kw) or self._is_political_inquiry(message.msg) or self._is_map_inquiry(message.msg) - if risk_hit: - target_agent = self.agent_risk - elif any(k in message.msg for k in order_markers): - target_agent = self.agent_order - elif any(k in msg_lower for k in processing_kw): - target_agent = self.agent_processing - elif any(k in msg_lower for k in pricing_kw): - target_agent = self.agent_pricing - elif any(k in msg_lower for k in similar_kw): - target_agent = self.agent_similar - result = await target_agent.run(user_prompt, deps=deps, message_history=history) - # 更新历史,最多保留最近 30 条消息防止 token 超限 - self.message_histories[message.from_id] = result.all_messages()[-30:] - reply_text = self._colloquialize_reply(self._normalize_reply_text(result.output)) - - # 价格谈判与信任建立固定策略(避免只回“最低了/先拍下”) - strategy_reply = self._negotiation_strategy_reply(message.msg, state) - if strategy_reply: - reply_text = strategy_reply - # 拦截超低杀价:客户报价低于底线时,统一礼貌拒绝 - try: - from config.config import MIN_PRICE_FLOOR - import re - offer = None - m = re.search(r'(\d{1,4})\s*(?:元|块|块钱|元钱)\b', message.msg) - if m: - offer = int(m.group(1)) - else: - m2 = re.search(r'(?:能|可以|可否|能否)\s*(\d{1,4})\b', message.msg) - offer = int(m2.group(1)) if m2 else None - st = self._get_conversation_state(message.from_id) - floor = st.last_min_price if isinstance(st.last_min_price, int) and st.last_min_price > 0 else MIN_PRICE_FLOOR - if offer is not None and offer < floor: - reply_text = "不好意思" - except Exception: - pass - # 降限:若AI在回复中给出小于底线的报价,提升到>=底线且为5的倍数 - try: - from config.config import MIN_PRICE_FLOOR - st = self._get_conversation_state(message.from_id) - floor = st.last_min_price if isinstance(st.last_min_price, int) and st.last_min_price > 0 else MIN_PRICE_FLOOR - def _adjust(text: str) -> str: - import re - def _repl(m): - num = int(m.group(1)) - adj = max(floor, round(num / 5) * 5) - return m.group(0).replace(str(num), str(adj)) - patterns = [ - r'按(\d{1,4})元', - r'报价[::]\s*(\d{1,4})\s*元', - r'(\d{1,4})\s*元一张', - r'打包(\d{1,4})\s*元', - ] - t = text - for p in patterns: - t = re.sub(p, _repl, t) - return t - reply_text = _adjust(reply_text or "") - except Exception: - pass - - # 打印工具调用记录 - for msg in result.new_messages(): - for part in getattr(msg, 'parts', []): - part_type = type(part).__name__ - if 'ToolCall' in part_type: - print(f"{self.C_TOOL}[THINK/TOOL_CALL]{self.C_RESET} {getattr(part, 'tool_name', '')}({getattr(part, 'args', '')})") - elif 'ToolReturn' in part_type: - ret = str(getattr(part, 'content', ''))[:120] - print(f"{self.C_TOOL}[THINK/TOOL_RETURN]{self.C_RESET} {ret}") - - print(f"{self.C_THINK}[THINK/RAW_OUTPUT]{self.C_RESET} {repr(reply_text)}") - + reply_text = await execute_ai_turn( + self, + message=message, + state=state, + user_prompt=user_prompt, + deps=deps, + history=history, + ) except Exception as e: err_str = str(e) print(f"[Agent] AI 调用失败: {e},使用兜底回复")