From a6b7bf198241aa2500806fa28403d7fbcad898b3 Mon Sep 17 00:00:00 2001 From: jimi <1847930177@qq.com> Date: Sun, 1 Mar 2026 16:21:22 +0800 Subject: [PATCH] refactor: extract process_message orchestration from agent --- core/message_orchestrator.py | 110 +++++++++++++++++++++++++++++++++++ core/pydantic_ai_agent.py | 103 +++----------------------------- 2 files changed, 117 insertions(+), 96 deletions(-) create mode 100644 core/message_orchestrator.py diff --git a/core/message_orchestrator.py b/core/message_orchestrator.py new file mode 100644 index 0000000..76bc8f4 --- /dev/null +++ b/core/message_orchestrator.py @@ -0,0 +1,110 @@ +from __future__ import annotations + +import asyncio +from typing import Any + +from core.ai_reply_flow import execute_ai_turn +from core.find_image_flow import handle_find_image_batch_flow +from core.order_flow import handle_order_notification +from core.prompt_flow import build_prompt_bundle +from core.reply_finalize_flow import finalize_ai_reply +from utils.metrics_tracker import emit as metrics_emit +from utils.observability import build_trace_id + + +async def process_incoming_message(agent: Any, message: Any) -> Any: + """主消息处理编排:预处理 -> 业务流 -> AI -> 收尾。""" + trace_id = build_trace_id(message.acc_id, message.from_id, message.msg_id, message.msg[:64]) + agent._activity_log( + "agent_inbound", + trace_id=trace_id, + acc_id=message.acc_id, + customer_id=message.from_id, + msg=message.msg, + msg_type=message.msg_type, + ) + metrics_emit("inbound_msg", customer_id=message.from_id, acc_id=message.acc_id) + + state = agent._get_conversation_state(message.from_id) + pre_response = await agent.pre_rule_service.run(message=message, state=state, trace_id=trace_id) + if pre_response is not None: + return pre_response + + new_stage = agent._detect_stage(message.msg) + if new_stage != state.stage: + state.stage = new_stage + + from datetime import datetime + + state.last_update = datetime.now().isoformat() + + order_response = await handle_order_notification(agent, message=message, state=state) + if order_response is not None: + return order_response + + customer_text, _ = agent._split_customer_text(message.msg) + shop_type = agent._get_shop_type(message.acc_id or "", message.goods_name or "") + flow_response = await handle_find_image_batch_flow( + agent, + message=message, + state=state, + customer_text=customer_text, + shop_type=shop_type, + ) + if flow_response is not None: + return flow_response + + prompt_bundle = build_prompt_bundle(agent, message=message, state=state) + user_prompt = prompt_bundle.user_prompt + deps = prompt_bundle.deps + history = prompt_bundle.history + + agent._log_block("PROMPT->AI 前置提示词", user_prompt) + + try: + reply_text = await execute_ai_turn( + agent, + message=message, + state=state, + user_prompt=user_prompt, + deps=deps, + history=history, + ) + except Exception as e: + err_str = str(e) + print(f"[Agent] AI 调用失败: {e},使用兜底回复") + agent._activity_log("agent_ai_error", customer_id=message.from_id, acc_id=message.acc_id, error=err_str) + metrics_emit("ai_call_failed", customer_id=message.from_id, acc_id=message.acc_id) + if "AccountOverdueError" in err_str or "overdue" in err_str.lower(): + asyncio.create_task(agent._notify_wechat_overdue()) + else: + asyncio.create_task( + agent._notify_wechat( + f"⚠️ **AI调用异常**\n" + f"客户:{message.from_id}\n" + f"店铺:{message.acc_id}\n" + f"错误:{err_str[:200]}", + tag="AI异常", + ) + ) + reply_text = None + else: + metrics_emit("ai_call_success", customer_id=message.from_id, acc_id=message.acc_id) + + if not reply_text: + fallback_text = await agent._rewrite_reply_with_ai( + message=message, + state=state, + reply="好嘞,你稍等下,我这边看一下", + scene="fallback_reply", + ) + from core.pydantic_ai_agent import AgentResponse + + return AgentResponse(reply=fallback_text, should_reply=True, need_transfer=False) + + return await finalize_ai_reply( + agent, + message=message, + state=state, + reply_text=reply_text, + ) diff --git a/core/pydantic_ai_agent.py b/core/pydantic_ai_agent.py index 7302cbc..ec94b79 100755 --- a/core/pydantic_ai_agent.py +++ b/core/pydantic_ai_agent.py @@ -75,6 +75,7 @@ from core.batch_quote_helpers import ( ) from core.prompt_builder import build_prompt as build_agent_prompt, split_customer_text from core.image_workflow_router import handle_image_workflow as route_image_workflow +from core.message_orchestrator import process_incoming_message load_dotenv() @@ -729,6 +730,9 @@ class CustomerServiceAgent: _is_political_inquiry = staticmethod(is_political_inquiry) _is_map_inquiry = staticmethod(is_map_inquiry) + _get_shop_type = staticmethod(_get_shop_type) + _notify_wechat = staticmethod(_notify_wechat) + _notify_wechat_overdue = staticmethod(_notify_wechat_overdue) _calc_avg_complexity = staticmethod(calc_avg_complexity) _get_conversation_context = staticmethod(get_conversation_context) @@ -759,101 +763,8 @@ class CustomerServiceAgent: return clean in self._COOLDOWN_PATTERNS async def process_message(self, message: CustomerMessage) -> AgentResponse: - """处理客户消息并生成回复""" - trace_id = build_trace_id(message.acc_id, message.from_id, message.msg_id, message.msg[:64]) - self._activity_log( - "agent_inbound", - trace_id=trace_id, - acc_id=message.acc_id, - customer_id=message.from_id, - msg=message.msg, - msg_type=message.msg_type, - ) - metrics_emit("inbound_msg", customer_id=message.from_id, acc_id=message.acc_id) - # 获取或创建对话状态 - state = self._get_conversation_state(message.from_id) - pre_response = await self.pre_rule_service.run(message=message, state=state, trace_id=trace_id) - if isinstance(pre_response, AgentResponse): - return pre_response - - # 检测售前/售后 - new_stage = self._detect_stage(message.msg) - if new_stage != state.stage: - state.stage = new_stage - - state.last_update = datetime.now().isoformat() - - order_response = await handle_order_notification(self, message=message, state=state) - if isinstance(order_response, AgentResponse): - return order_response - - customer_text, _ = self._split_customer_text(message.msg) - shop_type = _get_shop_type(message.acc_id or "", message.goods_name or "") - flow_response = await handle_find_image_batch_flow( - self, - message=message, - state=state, - customer_text=customer_text, - shop_type=shop_type, - ) - if isinstance(flow_response, AgentResponse): - return flow_response - - prompt_bundle = build_prompt_bundle(self, message=message, state=state) - user_prompt = prompt_bundle.user_prompt - deps = prompt_bundle.deps - history = prompt_bundle.history - - self._log_block("PROMPT->AI 前置提示词", user_prompt) - - try: - reply_text = await execute_ai_turn( - self, - message=message, - state=state, - user_prompt=user_prompt, - deps=deps, - history=history, - ) - except Exception as e: - err_str = str(e) - print(f"[Agent] AI 调用失败: {e},使用兜底回复") - self._activity_log("agent_ai_error", customer_id=message.from_id, acc_id=message.acc_id, error=err_str) - metrics_emit("ai_call_failed", customer_id=message.from_id, acc_id=message.acc_id) - if "AccountOverdueError" in err_str or "overdue" in err_str.lower(): - asyncio.create_task(_notify_wechat_overdue()) - else: - asyncio.create_task(_notify_wechat( - f"⚠️ **AI调用异常**\n" - f"客户:{message.from_id}\n" - f"店铺:{message.acc_id}\n" - f"错误:{err_str[:200]}", - tag="AI异常" - )) - reply_text = None - else: - metrics_emit("ai_call_success", customer_id=message.from_id, acc_id=message.acc_id) - - # AI 失败兜底:给一个不出错的万能回复 - if not reply_text: - fallback_text = await self._rewrite_reply_with_ai( - message=message, - state=state, - reply="好嘞,你稍等下,我这边看一下", - scene="fallback_reply", - ) - return AgentResponse( - reply=fallback_text, - should_reply=True, - need_transfer=False - ) - - return await finalize_ai_reply( - self, - message=message, - state=state, - reply_text=reply_text, - ) + """处理客户消息并生成回复。""" + return await process_incoming_message(self, message) async def _check_order_amount(self, customer_id: str, order: dict, acc_id: str): """核查订单实付金额是否与报价一致,异常时企业微信预警""" @@ -885,7 +796,7 @@ class CustomerServiceAgent: f"差额:{quoted - paid:.1f}元 — 请人工核查" ) print(f"[Agent] {msg}") - await _notify_wechat(msg) + await self._notify_wechat(msg) except Exception as e: print(f"[Agent] 订单金额核查失败: {e}")