from __future__ import annotations import asyncio from typing import Any from core.ai_reply_flow import execute_ai_turn from core.find_image_flow import handle_find_image_batch_flow from core.order_flow import handle_order_notification from core.prompt_flow import build_prompt_bundle from core.reply_finalize_flow import finalize_ai_reply from utils.metrics_tracker import emit as metrics_emit from utils.observability import build_trace_id async def process_incoming_message(agent: Any, message: Any) -> Any: """主消息处理编排:预处理 -> 业务流 -> AI -> 收尾。""" trace_id = build_trace_id(message.acc_id, message.from_id, message.msg_id, message.msg[:64]) agent._activity_log( "agent_inbound", trace_id=trace_id, acc_id=message.acc_id, customer_id=message.from_id, msg=message.msg, msg_type=message.msg_type, ) metrics_emit("inbound_msg", customer_id=message.from_id, acc_id=message.acc_id) state = agent._get_conversation_state(message.from_id) pre_response = await agent.pre_rule_service.run(message=message, state=state, trace_id=trace_id) if pre_response is not None: return pre_response new_stage = agent._detect_stage(message.msg) if new_stage != state.stage: state.stage = new_stage from datetime import datetime state.last_update = datetime.now().isoformat() order_response = await handle_order_notification(agent, message=message, state=state) if order_response is not None: return order_response customer_text, _ = agent._split_customer_text(message.msg) shop_type = agent._get_shop_type(message.acc_id or "", message.goods_name or "") flow_response = await handle_find_image_batch_flow( agent, message=message, state=state, customer_text=customer_text, shop_type=shop_type, ) if flow_response is not None: return flow_response prompt_bundle = build_prompt_bundle(agent, message=message, state=state) user_prompt = prompt_bundle.user_prompt deps = prompt_bundle.deps history = prompt_bundle.history agent._log_block("PROMPT->AI 前置提示词", user_prompt) try: reply_text = await execute_ai_turn( agent, message=message, state=state, user_prompt=user_prompt, deps=deps, history=history, ) except Exception as e: err_str = str(e) print(f"[Agent] AI 调用失败: {e},使用兜底回复") agent._activity_log("agent_ai_error", customer_id=message.from_id, acc_id=message.acc_id, error=err_str) metrics_emit("ai_call_failed", customer_id=message.from_id, acc_id=message.acc_id) if "AccountOverdueError" in err_str or "overdue" in err_str.lower(): asyncio.create_task(agent._notify_wechat_overdue()) else: asyncio.create_task( agent._notify_wechat( f"⚠️ **AI调用异常**\n" f"客户:{message.from_id}\n" f"店铺:{message.acc_id}\n" f"错误:{err_str[:200]}", tag="AI异常", ) ) reply_text = None else: metrics_emit("ai_call_success", customer_id=message.from_id, acc_id=message.acc_id) if not reply_text: fallback_text = await agent._rewrite_reply_with_ai( message=message, state=state, reply="好嘞,你稍等下,我这边看一下", scene="fallback_reply", ) from core.pydantic_ai_agent import AgentResponse return AgentResponse(reply=fallback_text, should_reply=True, need_transfer=False) return await finalize_ai_reply( agent, message=message, state=state, reply_text=reply_text, )