diff --git a/core/agent_tools.py b/core/agent_tools.py index aebfde5..70d412a 100644 --- a/core/agent_tools.py +++ b/core/agent_tools.py @@ -1,5 +1,6 @@ import logging import asyncio +import re from datetime import datetime from typing import List, Optional, Dict, Any from pydantic import BaseModel, Field @@ -10,6 +11,41 @@ from db.chat_log_db import get_conversation, get_customer_orders logger = logging.getLogger("cs_agent") +_TRANSFER_COMMAND_RE = re.compile(r"^\s*正在为您转接\|\[转移会话\],[^,\r\n]+,[^\r\n]*\s*$") +_HISTORY_NOISE_PREFIXES = ( + "[系统订单信息]", + "[进店卡片]", + "【系统:已收到", + "金额:", + "定制:", +) + + +def _is_plain_transfer_command(text: str) -> bool: + return bool(_TRANSFER_COMMAND_RE.fullmatch(str(text or "").strip())) + + +def _normalize_history_message(message: str, role: str) -> str: + text = str(message or "").strip() + if not text: + return "" + if _is_plain_transfer_command(text): + return "已转接设计师" + if role == "客服" and "[转移会话]" in text: + return "已尝试转接设计师" + return text + + +def _extract_need_snippet(message: str) -> str: + text = str(message or "").strip() + if not text: + return "" + if any(text.startswith(prefix) for prefix in _HISTORY_NOISE_PREFIXES): + return "" + if "http://" in text or "https://" in text: + return "" + return text[:60] + class TransferSuccessException(Exception): """转接成功后抛出此异常,用于提前终止 AI 处理流程""" @@ -117,16 +153,18 @@ async def lookup_chat_history_tool( for r in rows: role = "客户" if r["direction"] == "in" else "客服" ts = str(r.get("timestamp", "")) - msg = r.get("message", "") + msg = _normalize_history_message(r.get("message", ""), role) line = f"[{ts}] {role}:{msg}" lines.append(line) if r["direction"] == "in": msg_type = int(r.get("msg_type") or 0) + raw_message = str(r.get("message", "") or "") image_urls = str(r.get("image_urls", "") or "").strip() if msg_type == 1 or image_urls or ("已收到" in msg and "图" in msg): has_images = True - if any(k in msg for k in ["找原图", "修复", "高清", "去背景", "抠图", "做衣服", "打印"]): - customer_needs.append(msg[:60]) + need_text = _extract_need_snippet(raw_message) + if need_text and any(k in need_text for k in ["找原图", "修复", "高清", "去背景", "抠图", "做衣服", "打印", "大图", "素材"]): + customer_needs.append(need_text) summary_parts = [f"共{len(rows)}条历史消息。"] if has_images: diff --git a/core/orchestrator.py b/core/orchestrator.py index c431b38..ee88589 100644 --- a/core/orchestrator.py +++ b/core/orchestrator.py @@ -51,6 +51,7 @@ _OUTBOUND_BLOCK_MARKERS = ( ) _TRANSFER_COMMAND_MARKER = "[转移会话]" +_TRANSFER_COMMAND_RE = re.compile(r"^\s*正在为您转接\|\[转移会话\],[^,\r\n]+,[^\r\n]*\s*$") # 历史记录格式检测模式(AI 转述历史时容易泄露) _HISTORY_LEAK_PATTERNS = [ @@ -213,8 +214,11 @@ class SystemOrchestrator: if not text: return "" cleaned = str(text).strip() - if "[转移会话]" in cleaned: + if _TRANSFER_COMMAND_RE.fullmatch(cleaned): return cleaned + if _TRANSFER_COMMAND_MARKER in cleaned: + logger.warning("[Orchestrator] 检测到混入正文的转接指令,替换为安全兜底回复") + return "我在帮你看记录,稍等哈" if any(marker in cleaned for marker in _OUTBOUND_BLOCK_MARKERS): logger.warning("[Orchestrator] 拦截到内部内容外发,替换为安全兜底回复") return "我在帮你看记录,稍等哈" @@ -225,6 +229,33 @@ class SystemOrchestrator: return "我在帮你看记录,稍等哈" return cleaned + @staticmethod + def _sanitize_history_content_for_ai(text: str) -> str: + cleaned = str(text or "").strip() + if not cleaned: + return "" + if _TRANSFER_COMMAND_RE.fullmatch(cleaned): + return "系统:之前已转接设计师" + if "【历史记录摘要】" in cleaned or "【详细记录】" in cleaned: + return "系统:刚刚查过历史记录" + if "【订单摘要】" in cleaned or "【订单详情】" in cleaned: + return "系统:刚刚查过订单记录" + if _TRANSFER_COMMAND_MARKER in cleaned: + cleaned = re.sub( + r"正在为您转接\|\[转移会话\],[^,\r\n]+,[^\r\n]*", + "系统:之前已转接设计师", + cleaned, + ) + return cleaned + + def _sanitize_history_for_ai(self, history: List[dict]) -> List[dict]: + sanitized = [] + for item in history or []: + normalized = dict(item) + normalized["content"] = self._sanitize_history_content_for_ai(item.get("content", "")) + sanitized.append(normalized) + return sanitized + @staticmethod def _extract_designer_name(transfer_cmd: str) -> str: text = str(transfer_cmd or "").strip() @@ -559,6 +590,7 @@ class SystemOrchestrator: history_elapsed = time.time() - history_start logger.info(f"[计时] user={user_id} 查询历史: {history_elapsed:.2f}s (共{len(history)}条)") ai_history = history[:-1] if history and history[-1].get("content") == db_content else history + ai_history = self._sanitize_history_for_ai(ai_history) # C. 短时间追问且疑似没真正接上人工:优先补发一次转接 std_res = await self._retry_stalled_transfer_if_needed( diff --git a/core/pydantic_ai_agent_v2.py b/core/pydantic_ai_agent_v2.py index dbaad48..ccf1604 100644 --- a/core/pydantic_ai_agent_v2.py +++ b/core/pydantic_ai_agent_v2.py @@ -26,6 +26,7 @@ _INTERNAL_TOOL_MARKERS = ( "【订单摘要】", "【订单详情】", ) +_TRANSFER_COMMAND_RE = re.compile(r"^\s*正在为您转接\|\[转移会话\],[^,\r\n]+,[^\r\n]*\s*$") # 历史记录格式检测模式(AI 转述历史时容易泄露) _HISTORY_LEAK_PATTERNS = [ @@ -109,6 +110,13 @@ def _sanitize_reply_text(reply_text: str) -> str: text = re.sub(r'[\[\]]{2,}', '', text) text = text.strip() + if _TRANSFER_COMMAND_RE.fullmatch(text): + return text + + if "[转移会话]" in text: + logger.warning("[Brain] 拦截到混入正文的转接指令,降级为安全兜底回复") + return "我在帮你看记录,稍等哈" + # 检查固定标记 if any(marker in text for marker in _INTERNAL_TOOL_MARKERS): logger.warning("[Brain] 拦截到工具原文泄露,降级为安全兜底回复") diff --git a/core/repository.py b/core/repository.py index 11e17f3..09428d2 100644 --- a/core/repository.py +++ b/core/repository.py @@ -18,6 +18,7 @@ _OUTBOUND_BLOCK_MARKERS = ( "think_never_used", '[{"name":', ) +_TRANSFER_COMMAND_RE = re.compile(r"^\s*正在为您转接\|\[转移会话\],[^,\r\n]+,[^\r\n]*\s*$") _HISTORY_LEAK_PATTERNS = [ r'\[\d{4}-\d{2}-\d{2}[^\]]*\]\s*(客户|客服)[::]', @@ -35,8 +36,11 @@ def _sanitize_outbound_archive_text(content: str) -> str: if not content: return "" cleaned = str(content).strip() - if "[转移会话]" in cleaned: + if _TRANSFER_COMMAND_RE.fullmatch(cleaned): return cleaned + if "[转移会话]" in cleaned: + logger.warning("[Repository] 检测到混入正文的转接指令,拦截出站入库") + return "我在帮你看记录,稍等哈" if any(marker in cleaned for marker in _OUTBOUND_BLOCK_MARKERS): logger.warning("[Repository] 拦截到内部内容写入外发记录,替换为安全兜底回复") return "我在帮你看记录,稍等哈"