From 8dd5a11b4be8b5ff4ca8efb39198ddf878bdceb7 Mon Sep 17 00:00:00 2001 From: jimi <1847930177@qq.com> Date: Sun, 1 Mar 2026 16:29:52 +0800 Subject: [PATCH] refactor: unify core pipeline logging with cs_agent logger --- core/agent_pre_rules.py | 6 ++++-- core/agent_tools.py | 13 +++++++++++-- core/ai_reply_flow.py | 14 +++++++++----- core/context_helpers.py | 5 ++++- core/find_image_flow.py | 23 +++++++++++++---------- core/image_workflow_router.py | 11 +++++++---- core/message_orchestrator.py | 5 ++++- core/order_flow.py | 7 +++++-- core/post_ops.py | 10 ++++++---- core/pydantic_ai_agent.py | 34 +++++++++++++--------------------- core/reply_finalize_flow.py | 9 ++++++--- 11 files changed, 82 insertions(+), 55 deletions(-) diff --git a/core/agent_pre_rules.py b/core/agent_pre_rules.py index 508ddf6..1231f03 100644 --- a/core/agent_pre_rules.py +++ b/core/agent_pre_rules.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging import random from datetime import datetime from typing import TYPE_CHECKING, Optional @@ -108,7 +109,7 @@ class AgentPreRuleService: state = ctx.get("state") trace_id = ctx.get("trace_id", "") elapsed = int((datetime.now() - state.last_reply_at).total_seconds()) if state.last_reply_at else 0 - print(f"[Agent] 冷却期静默(距上次回复 {elapsed}s):{message.msg!r}") + logger.info("[Agent] 冷却期静默(距上次回复 %ss):%r", elapsed, message.msg) self.agent._activity_log( "agent_cooldown_silent", trace_id=trace_id, @@ -183,7 +184,7 @@ class AgentPreRuleService: scene="risk_reject", ) state.last_reply_at = datetime.now() - print(f"{self.agent.C_REPLY}[REPLY->CUSTOMER]{self.agent.C_RESET} {reply}") + logger.info("[REPLY->CUSTOMER] %s", reply) self.agent._activity_log( "agent_risk_reject", trace_id=trace_id, @@ -198,3 +199,4 @@ class AgentPreRuleService: action="agent_risk_reject", payload={"response": AgentResponse(reply=reply, should_reply=True, need_transfer=False)}, ) +logger = logging.getLogger("cs_agent") diff --git a/core/agent_tools.py b/core/agent_tools.py index f4d848b..76c16fb 100644 --- a/core/agent_tools.py +++ b/core/agent_tools.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging from typing import Any from pydantic_ai import RunContext @@ -8,6 +9,8 @@ from db.customer_risk_db import risk_db from services.service_tuhui_upload import upload_to_tuhui from core.order_helpers import parse_order_info +logger = logging.getLogger("cs_agent") + def register_tools(agent) -> None: """注册所有 Tool,让 Agent 可以主动调用。""" @@ -65,9 +68,15 @@ def register_tools(agent) -> None: subject=result.get("subject", ""), quality=result.get("quality", ""), ) - print(f"[Agent] Workflow 任务已创建 | 客户: {ctx.deps.from_id} | 比例: {result.get('aspect_ratio')} | 透视: {result.get('perspective')} | 图片: {image_url[:60]}...") + logger.info( + "[Agent] Workflow 任务已创建 | 客户: %s | 比例: %s | 透视: %s | 图片: %s...", + ctx.deps.from_id, + result.get("aspect_ratio"), + result.get("perspective"), + image_url[:60], + ) except Exception as e: - print(f"[Agent] Workflow 任务创建失败: {e}") + logger.exception("[Agent] Workflow 任务创建失败: %s", e) # 组装给 AI 的分析报告 risk = result.get("risk", "none") diff --git a/core/ai_reply_flow.py b/core/ai_reply_flow.py index d23e57f..5f031c1 100644 --- a/core/ai_reply_flow.py +++ b/core/ai_reply_flow.py @@ -1,8 +1,11 @@ from __future__ import annotations +import logging from typing import TYPE_CHECKING, Any from core.post_ops import negotiation_strategy_reply +logger = logging.getLogger("cs_agent") + if TYPE_CHECKING: from core.pydantic_ai_agent import AgentDeps, ConversationState, CustomerMessage, CustomerServiceAgent @@ -124,13 +127,14 @@ async def execute_ai_turn( for part in getattr(msg, "parts", []): part_type = type(part).__name__ if "ToolCall" in part_type: - print( - f"{agent.C_TOOL}[THINK/TOOL_CALL]{agent.C_RESET} " - f"{getattr(part, 'tool_name', '')}({getattr(part, 'args', '')})" + logger.info( + "[THINK/TOOL_CALL] %s(%s)", + getattr(part, "tool_name", ""), + getattr(part, "args", ""), ) elif "ToolReturn" in part_type: ret = str(getattr(part, "content", ""))[:120] - print(f"{agent.C_TOOL}[THINK/TOOL_RETURN]{agent.C_RESET} {ret}") + logger.info("[THINK/TOOL_RETURN] %s", ret) - print(f"{agent.C_THINK}[THINK/RAW_OUTPUT]{agent.C_RESET} {repr(reply_text)}") + logger.info("[THINK/RAW_OUTPUT] %r", reply_text) return reply_text diff --git a/core/context_helpers.py b/core/context_helpers.py index 596c5a3..3a2d53b 100644 --- a/core/context_helpers.py +++ b/core/context_helpers.py @@ -1,9 +1,12 @@ from __future__ import annotations import os +import logging from collections import Counter from datetime import datetime +logger = logging.getLogger("cs_agent") + def calc_avg_complexity(complexity_history: list) -> str: """计算平均复杂度。""" @@ -153,7 +156,7 @@ def get_customer_profile_context(agent, customer_id: str) -> str: return "\n".join(lines) except Exception as e: - print(f"[Agent] 获取客户画像失败: {e}") + logger.exception("[Agent] 获取客户画像失败: %s", e) return "" diff --git a/core/find_image_flow.py b/core/find_image_flow.py index 227b5fb..f282118 100644 --- a/core/find_image_flow.py +++ b/core/find_image_flow.py @@ -1,8 +1,11 @@ from __future__ import annotations +import logging from datetime import datetime from typing import TYPE_CHECKING, Optional +logger = logging.getLogger("cs_agent") + if TYPE_CHECKING: from core.pydantic_ai_agent import AgentResponse, ConversationState, CustomerMessage, CustomerServiceAgent @@ -55,7 +58,7 @@ async def handle_find_image_batch_flow( fallback=defer_fallback, ) state.last_reply_at = datetime.now() - print(f"{agent.C_REPLY}[REPLY->CUSTOMER]{agent.C_RESET} {defer_reply}") + logger.info("[REPLY->CUSTOMER] %s", defer_reply) return AgentResponse(reply=defer_reply, should_reply=True, need_transfer=False) quote_res = await agent._quote_pending_images(state, message) reply_text = agent._colloquialize_reply(quote_res.get("reply", "")) @@ -67,7 +70,7 @@ async def handle_find_image_batch_flow( ) need_transfer = bool(quote_res.get("need_transfer")) state.last_reply_at = datetime.now() - print(f"{agent.C_REPLY}[REPLY->CUSTOMER]{agent.C_RESET} {reply_text}") + logger.info("[REPLY->CUSTOMER] %s", reply_text) return AgentResponse( reply=reply_text, should_reply=not need_transfer, @@ -89,7 +92,7 @@ async def handle_find_image_batch_flow( fallback=ack_fallback, ) state.last_reply_at = datetime.now() - print(f"{agent.C_REPLY}[REPLY->CUSTOMER]{agent.C_RESET} {ack}") + logger.info("[REPLY->CUSTOMER] %s", ack) return AgentResponse(reply=ack, should_reply=True, need_transfer=False) if not state.pending_image_urls: @@ -118,7 +121,7 @@ async def handle_find_image_batch_flow( fallback=clarify_fallback, ) state.last_reply_at = datetime.now() - print(f"{agent.C_REPLY}[REPLY->CUSTOMER]{agent.C_RESET} {clarify}") + logger.info("[REPLY->CUSTOMER] %s", clarify) return AgentResponse(reply=clarify, should_reply=True, need_transfer=False) if state.quote_phase == "ready_to_quote" and state.quote_ready_turns <= 0 and short_intent in {"progress_query", "ack", "finish_signal"}: @@ -132,7 +135,7 @@ async def handle_find_image_batch_flow( ) need_transfer = bool(quote_res.get("need_transfer")) state.last_reply_at = datetime.now() - print(f"{agent.C_REPLY}[REPLY->CUSTOMER]{agent.C_RESET} {reply_text}") + logger.info("[REPLY->CUSTOMER] %s", reply_text) return AgentResponse( reply=reply_text, should_reply=not need_transfer, @@ -150,7 +153,7 @@ async def handle_find_image_batch_flow( fallback=progress_fallback, ) state.last_reply_at = datetime.now() - print(f"{agent.C_REPLY}[REPLY->CUSTOMER]{agent.C_RESET} {progress}") + logger.info("[REPLY->CUSTOMER] %s", progress) return AgentResponse(reply=progress, should_reply=True, need_transfer=False) if agent._needs_clarification_in_collecting(text_without_urls): @@ -163,7 +166,7 @@ async def handle_find_image_batch_flow( fallback=ask_fallback, ) state.last_reply_at = datetime.now() - print(f"{agent.C_REPLY}[REPLY->CUSTOMER]{agent.C_RESET} {ask}") + logger.info("[REPLY->CUSTOMER] %s", ask) return AgentResponse(reply=ask, should_reply=True, need_transfer=False) if agent._is_batch_finish_intent( text=customer_text, @@ -182,7 +185,7 @@ async def handle_find_image_batch_flow( fallback=defer_fallback, ) state.last_reply_at = datetime.now() - print(f"{agent.C_REPLY}[REPLY->CUSTOMER]{agent.C_RESET} {defer_reply}") + logger.info("[REPLY->CUSTOMER] %s", defer_reply) return AgentResponse(reply=defer_reply, should_reply=True, need_transfer=False) quote_res = await agent._quote_pending_images(state, message) reply_text = agent._colloquialize_reply(quote_res.get("reply", "")) @@ -194,7 +197,7 @@ async def handle_find_image_batch_flow( ) need_transfer = bool(quote_res.get("need_transfer")) state.last_reply_at = datetime.now() - print(f"{agent.C_REPLY}[REPLY->CUSTOMER]{agent.C_RESET} {reply_text}") + logger.info("[REPLY->CUSTOMER] %s", reply_text) return AgentResponse( reply=reply_text, should_reply=not need_transfer, @@ -211,5 +214,5 @@ async def handle_find_image_batch_flow( fallback=remind_fallback, ) state.last_reply_at = datetime.now() - print(f"{agent.C_REPLY}[REPLY->CUSTOMER]{agent.C_RESET} {remind}") + logger.info("[REPLY->CUSTOMER] %s", remind) return AgentResponse(reply=remind, should_reply=True, need_transfer=False) diff --git a/core/image_workflow_router.py b/core/image_workflow_router.py index ff68a48..7098b0a 100644 --- a/core/image_workflow_router.py +++ b/core/image_workflow_router.py @@ -1,7 +1,10 @@ from __future__ import annotations +import logging from typing import Any +logger = logging.getLogger("cs_agent") + async def handle_image_workflow(*, workflow_router: Any, message: str, data: dict, image_urls: list) -> bool: """处理图片工作流(根据客户说的话判断执行哪种工作流)。""" @@ -15,10 +18,10 @@ async def handle_image_workflow(*, workflow_router: Any, message: str, data: dic acc_type = data.get("acc_type", "AliWorkbench") image_url = image_urls[0] - print(f"[Agent] 检测到工作流类型:{workflow_type} (置信度:{confidence})") + logger.info("[Agent] 检测到工作流类型:%s (置信度:%s)", workflow_type, confidence) if workflow_type == "find_image": - print(f"[Agent] 执行查找图片工作流 | 客户:{customer_id}") + logger.info("[Agent] 执行查找图片工作流 | 客户:%s", customer_id) from core.workflow import workflow return await workflow.find_image_workflow( @@ -28,7 +31,7 @@ async def handle_image_workflow(*, workflow_router: Any, message: str, data: dic acc_type=acc_type, ) if workflow_type == "process_image": - print(f"[Agent] 执行处理图片工作流 | 客户:{customer_id}") + logger.info("[Agent] 执行处理图片工作流 | 客户:%s", customer_id) from core.workflow import workflow return await workflow.process_image_workflow( @@ -38,7 +41,7 @@ async def handle_image_workflow(*, workflow_router: Any, message: str, data: dic acc_type=acc_type, ) if workflow_type == "transfer_human": - print(f"[Agent] 执行转人工派单工作流 | 客户:{customer_id}") + logger.info("[Agent] 执行转人工派单工作流 | 客户:%s", customer_id) from core.workflow import workflow return await workflow.transfer_to_designer_workflow( diff --git a/core/message_orchestrator.py b/core/message_orchestrator.py index 76bc8f4..d9b5b54 100644 --- a/core/message_orchestrator.py +++ b/core/message_orchestrator.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import logging from typing import Any from core.ai_reply_flow import execute_ai_turn @@ -11,6 +12,8 @@ from core.reply_finalize_flow import finalize_ai_reply from utils.metrics_tracker import emit as metrics_emit from utils.observability import build_trace_id +logger = logging.getLogger("cs_agent") + async def process_incoming_message(agent: Any, message: Any) -> Any: """主消息处理编排:预处理 -> 业务流 -> AI -> 收尾。""" @@ -72,7 +75,7 @@ async def process_incoming_message(agent: Any, message: Any) -> Any: ) except Exception as e: err_str = str(e) - print(f"[Agent] AI 调用失败: {e},使用兜底回复") + logger.exception("[Agent] AI 调用失败,使用兜底回复: %s", err_str) agent._activity_log("agent_ai_error", customer_id=message.from_id, acc_id=message.acc_id, error=err_str) metrics_emit("ai_call_failed", customer_id=message.from_id, acc_id=message.acc_id) if "AccountOverdueError" in err_str or "overdue" in err_str.lower(): diff --git a/core/order_flow.py b/core/order_flow.py index 7a16b70..d3e323e 100644 --- a/core/order_flow.py +++ b/core/order_flow.py @@ -1,10 +1,13 @@ from __future__ import annotations import asyncio +import logging from typing import TYPE_CHECKING, Optional from core.post_ops import record_deal_success from core.order_helpers import parse_order_info +logger = logging.getLogger("cs_agent") + if TYPE_CHECKING: from core.pydantic_ai_agent import AgentResponse, ConversationState, CustomerMessage, CustomerServiceAgent @@ -53,9 +56,9 @@ async def handle_order_notification( ) ) except Exception as e: - print(f"[Agent] 触发作图失败: {e}") + logger.exception("[Agent] 触发作图失败: %s", e) elif not customer_text: - print(f"[Agent] 订单通知静默({pay_status or order_status}),跳过回复") + logger.info("[Agent] 订单通知静默(%s),跳过回复", pay_status or order_status) return AgentResponse(reply="", should_reply=False, need_transfer=False) return None diff --git a/core/post_ops.py b/core/post_ops.py index 751946f..98f4b06 100644 --- a/core/post_ops.py +++ b/core/post_ops.py @@ -1,11 +1,13 @@ from __future__ import annotations +import logging import re from typing import Any from utils.metrics_tracker import emit as metrics_emit CASE_LIBRARY_LINK = "https://www.yuque.com/zuowei-dfvpq/kge0in/mynala0g35b8cec5" +logger = logging.getLogger("cs_agent") def detect_price(reply: str, state: Any) -> None: @@ -102,9 +104,9 @@ async def record_deal_success( db.clear_quote_no_convert(customer_id) except Exception: pass - print(f"[Agent] 成交记录: {customer_id} {reason} {amount}元") + logger.info("[Agent] 成交记录: %s %s %s元", customer_id, reason, amount) except Exception as e: - print(f"[Agent] 成交记录失败: {e}") + logger.exception("[Agent] 成交记录失败: %s", e) async def record_deal_fail( @@ -128,9 +130,9 @@ async def record_deal_fail( platform=platform or "", ) db.mark_quote_no_convert(customer_id) - print(f"[Agent] 未成交记录: {customer_id} {reason}") + logger.info("[Agent] 未成交记录: %s %s", customer_id, reason) except Exception as e: - print(f"[Agent] 未成交记录失败: {e}") + logger.exception("[Agent] 未成交记录失败: %s", e) async def auto_tag(message: Any, state: Any) -> None: diff --git a/core/pydantic_ai_agent.py b/core/pydantic_ai_agent.py index ec94b79..cef58b4 100755 --- a/core/pydantic_ai_agent.py +++ b/core/pydantic_ai_agent.py @@ -22,15 +22,10 @@ from pydantic_ai.models.openai import OpenAIChatModel from pydantic_ai.providers.openai import OpenAIProvider from dotenv import load_dotenv from utils.metrics_tracker import emit as metrics_emit -from utils.observability import emit_activity, build_trace_id +from utils.observability import emit_activity from core.quote_state_machine import QuoteStateMachine from services.risk_service import RiskService from core.agent_pre_rules import AgentPreRuleService -from core.find_image_flow import handle_find_image_batch_flow -from core.order_flow import handle_order_notification -from core.ai_reply_flow import execute_ai_turn -from core.reply_finalize_flow import finalize_ai_reply -from core.prompt_flow import build_prompt_bundle from core.order_helpers import parse_order_info, order_instruction as build_order_instruction from core.collection_intent_helpers import ( append_requirement, @@ -81,7 +76,6 @@ load_dotenv() from services.service_tuhui_upload import upload_to_tuhui from core.workflow_router import get_workflow_router -from core.workflow_router import get_workflow_router # ========== 企业微信通知 ========== _WECHAT_WEBHOOK = os.getenv("WECHAT_WEBHOOK", "") @@ -91,7 +85,7 @@ logger = logging.getLogger("cs_agent") async def _notify_wechat(content: str, tag: str = "通知"): """发送企业微信 markdown 通知,任何异常都发""" if not _WECHAT_WEBHOOK: - print(f"[{tag}] 未配置 WECHAT_WEBHOOK,跳过推送") + logger.info("[%s] 未配置 WECHAT_WEBHOOK,跳过推送", tag) return try: import httpx @@ -102,11 +96,11 @@ async def _notify_wechat(content: str, tag: str = "通知"): }) data = resp.json() if data.get("errcode") == 0: - print(f"[{tag}] 企业微信推送成功 ✓") + logger.info("[%s] 企业微信推送成功", tag) else: - print(f"[{tag}] 企业微信推送失败: {data}") + logger.warning("[%s] 企业微信推送失败: %s", tag, data) except Exception as e: - print(f"[{tag}] 企业微信发送异常: {e}") + logger.exception("[%s] 企业微信发送异常: %s", tag, e) async def _notify_wechat_overdue(): @@ -246,7 +240,7 @@ def load_skill_map(skills_dir: str = "skills") -> Dict[str, str]: else: skill_map[skill_name] = content except Exception as e: - print(f"警告: 读取 {skill_file} 失败: {e}") + logger.warning("读取技能文件失败: %s | err=%s", skill_file, e) return skill_map @@ -441,9 +435,7 @@ class CustomerServiceAgent: @staticmethod def _log_block(title: str, content: str): """统一的控制台分层日志输出。""" - print(f"{CustomerServiceAgent.C_PROMPT}[{title}]{CustomerServiceAgent.C_RESET}") - print(content) - print(f"{CustomerServiceAgent.C_MUTED}────────────────────{CustomerServiceAgent.C_RESET}") + logger.info("[%s]\n%s\n--------------------", title, content) @staticmethod def _normalize_reply_text(text: Optional[str]) -> str: @@ -783,7 +775,7 @@ class CustomerServiceAgent: return paid = float(m.group()) - print(f"[Agent] 订单金额核查:报价 {quoted}元 vs 实付 {paid}元(客户 {customer_id})") + logger.info("[Agent] 订单金额核查:报价 %s元 vs 实付 %s元(客户 %s)", quoted, paid, customer_id) # 实付金额明显低于报价(低于报价的 60%)才预警 if paid < quoted * 0.6: @@ -795,10 +787,10 @@ class CustomerServiceAgent: f"实付:{paid}元\n" f"差额:{quoted - paid:.1f}元 — 请人工核查" ) - print(f"[Agent] {msg}") + logger.warning("[Agent] %s", msg) await self._notify_wechat(msg) except Exception as e: - print(f"[Agent] 订单金额核查失败: {e}") + logger.exception("[Agent] 订单金额核查失败: %s", e) def _extract_image_url(self, msg: str) -> str: """从消息中提取图片URL,兼容纯URL和 text#*#url 两种格式""" @@ -887,7 +879,7 @@ class CustomerServiceAgent: quality=r.get("quality", ""), ) except Exception as e: - print(f"[Agent] Workflow 批量任务创建失败: {e}") + logger.exception("[Agent] Workflow 批量任务创建失败: %s", e) _calc_requirement_surcharge = staticmethod(calc_requirement_surcharge) _build_batch_quote_reply = staticmethod(build_batch_quote_reply) @@ -945,7 +937,7 @@ class CustomerServiceAgent: raise RuntimeError(str(link)) links.append(link) except Exception as e: - print(f"[Agent] 找图自动处理失败,回退需求澄清: {e}") + logger.exception("[Agent] 找图自动处理失败,回退需求澄清: %s", e) return { "reply": "这种可以做类似款。你先说下具体需求:要几张、是否改字、尺寸比例、交付格式(单图/打包链接),我按需求给你直接做。", "need_transfer": False, @@ -1064,7 +1056,7 @@ async def test_agent(): ) response = await agent.process_message(test_msg) - print(f"回复内容: {response.reply}") + logger.info("回复内容: %s", response.reply) if __name__ == "__main__": diff --git a/core/reply_finalize_flow.py b/core/reply_finalize_flow.py index bca5a00..0d2fc20 100644 --- a/core/reply_finalize_flow.py +++ b/core/reply_finalize_flow.py @@ -1,12 +1,15 @@ from __future__ import annotations import asyncio +import logging from datetime import datetime from typing import TYPE_CHECKING from utils.metrics_tracker import emit as metrics_emit from core.post_ops import auto_tag, detect_discount, detect_price, record_deal_fail +logger = logging.getLogger("cs_agent") + if TYPE_CHECKING: from core.pydantic_ai_agent import AgentResponse, ConversationState, CustomerMessage, CustomerServiceAgent @@ -25,7 +28,7 @@ async def finalize_ai_reply( blocked, fallback = should_block_reply(reply_text) if blocked: - print("[Agent] 敏感词拦截,使用兜底回复") + logger.warning("[Agent] 敏感词拦截,使用兜底回复") reply_text = fallback or "好的,您稍等,我帮您确认一下" except Exception: pass @@ -88,9 +91,9 @@ async def finalize_ai_reply( if should_reply: state.last_reply_at = datetime.now() - print(f"{agent.C_REPLY}[REPLY->CUSTOMER]{agent.C_RESET} {reply_text}") + logger.info("[REPLY->CUSTOMER] %s", reply_text) else: - print(f"{agent.C_MUTED}[REPLY->CUSTOMER]{agent.C_RESET} <静默/不发送>") + logger.info("[REPLY->CUSTOMER] <静默/不发送>") agent._activity_log( "agent_outbound_decision",