fix: block leaked history summaries in replies

This commit is contained in:
2026-03-11 18:33:17 +08:00
parent 2c003e9a7d
commit ebca1eaff6
4 changed files with 87 additions and 5 deletions

View File

@@ -1,5 +1,6 @@
import logging import logging
import asyncio import asyncio
import re
from datetime import datetime from datetime import datetime
from typing import List, Optional, Dict, Any from typing import List, Optional, Dict, Any
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
@@ -10,6 +11,41 @@ from db.chat_log_db import get_conversation, get_customer_orders
logger = logging.getLogger("cs_agent") logger = logging.getLogger("cs_agent")
# Matches text that consists solely of one transfer command: optional
# surrounding whitespace, the literal prefix "正在为您转接|[转移会话],", a non-empty
# field containing no comma or line break, a comma, and then the remainder of
# the line (no line breaks allowed).
_TRANSFER_COMMAND_RE = re.compile(r"^\s*正在为您转接\|\[转移会话\],[^,\r\n]+,[^\r\n]*\s*$")
# Message prefixes that `_extract_need_snippet` treats as noise: a message
# starting with any of these yields no customer-need snippet.
_HISTORY_NOISE_PREFIXES = (
"[系统订单信息]",
"[进店卡片]",
"【系统:已收到",
"金额:",
"定制:",
)
def _is_plain_transfer_command(text: str) -> bool:
    """Tell whether *text* is nothing but a single transfer command.

    The input is coerced to ``str`` (falsy values become the empty string),
    stripped of surrounding whitespace, and then matched in full against
    ``_TRANSFER_COMMAND_RE``.
    """
    candidate = str(text or "").strip()
    return _TRANSFER_COMMAND_RE.fullmatch(candidate) is not None
def _normalize_history_message(message: str, role: str) -> str:
    """Rewrite transfer commands in a history message as short neutral notes.

    Args:
        message: Raw message text; falsy values are treated as empty.
        role: Speaker label for the message; only the value "客服" triggers
            the partial-transfer rewrite.

    Returns:
        "" for an empty message, a fixed note when the message is a pure
        transfer command, a different fixed note when a "客服" message merely
        contains the transfer marker, and the stripped text otherwise.
    """
    text = str(message or "").strip()
    # Empty text falls through both checks and is returned unchanged as "".
    if text and _is_plain_transfer_command(text):
        return "已转接设计师"
    if text and role == "客服" and "[转移会话]" in text:
        return "已尝试转接设计师"
    return text
def _extract_need_snippet(message: str) -> str:
    """Return up to the first 60 characters of a message, or "" if it is noise.

    A message is discarded when it is empty after stripping, starts with one
    of ``_HISTORY_NOISE_PREFIXES``, or contains an ``http://`` / ``https://``
    link.
    """
    text = str(message or "").strip()
    starts_with_noise = text.startswith(_HISTORY_NOISE_PREFIXES)
    contains_link = "http://" in text or "https://" in text
    if not text or starts_with_noise or contains_link:
        return ""
    return text[:60]
class TransferSuccessException(Exception): class TransferSuccessException(Exception):
"""转接成功后抛出此异常,用于提前终止 AI 处理流程""" """转接成功后抛出此异常,用于提前终止 AI 处理流程"""
@@ -117,16 +153,18 @@ async def lookup_chat_history_tool(
for r in rows: for r in rows:
role = "客户" if r["direction"] == "in" else "客服" role = "客户" if r["direction"] == "in" else "客服"
ts = str(r.get("timestamp", "")) ts = str(r.get("timestamp", ""))
msg = r.get("message", "") msg = _normalize_history_message(r.get("message", ""), role)
line = f"[{ts}] {role}{msg}" line = f"[{ts}] {role}{msg}"
lines.append(line) lines.append(line)
if r["direction"] == "in": if r["direction"] == "in":
msg_type = int(r.get("msg_type") or 0) msg_type = int(r.get("msg_type") or 0)
raw_message = str(r.get("message", "") or "")
image_urls = str(r.get("image_urls", "") or "").strip() image_urls = str(r.get("image_urls", "") or "").strip()
if msg_type == 1 or image_urls or ("已收到" in msg and "" in msg): if msg_type == 1 or image_urls or ("已收到" in msg and "" in msg):
has_images = True has_images = True
if any(k in msg for k in ["找原图", "修复", "高清", "去背景", "抠图", "做衣服", "打印"]): need_text = _extract_need_snippet(raw_message)
customer_needs.append(msg[:60]) if need_text and any(k in need_text for k in ["找原图", "修复", "高清", "去背景", "抠图", "做衣服", "打印", "大图", "素材"]):
customer_needs.append(need_text)
summary_parts = [f"{len(rows)}条历史消息。"] summary_parts = [f"{len(rows)}条历史消息。"]
if has_images: if has_images:

View File

@@ -51,6 +51,7 @@ _OUTBOUND_BLOCK_MARKERS = (
) )
_TRANSFER_COMMAND_MARKER = "[转移会话]" _TRANSFER_COMMAND_MARKER = "[转移会话]"
# Matches text that is exactly one transfer command: the literal prefix
# "正在为您转接|[转移会话],", a non-empty comma-free field, a comma, and the rest of
# the line, with optional surrounding whitespace and no line breaks.
_TRANSFER_COMMAND_RE = re.compile(r"^\s*正在为您转接\|\[转移会话\],[^,\r\n]+,[^\r\n]*\s*$")
# 历史记录格式检测模式(AI 转述历史时容易泄露) # 历史记录格式检测模式(AI 转述历史时容易泄露)
_HISTORY_LEAK_PATTERNS = [ _HISTORY_LEAK_PATTERNS = [
@@ -213,8 +214,11 @@ class SystemOrchestrator:
if not text: if not text:
return "" return ""
cleaned = str(text).strip() cleaned = str(text).strip()
if "[转移会话]" in cleaned: if _TRANSFER_COMMAND_RE.fullmatch(cleaned):
return cleaned return cleaned
if _TRANSFER_COMMAND_MARKER in cleaned:
logger.warning("[Orchestrator] 检测到混入正文的转接指令,替换为安全兜底回复")
return "我在帮你看记录,稍等哈"
if any(marker in cleaned for marker in _OUTBOUND_BLOCK_MARKERS): if any(marker in cleaned for marker in _OUTBOUND_BLOCK_MARKERS):
logger.warning("[Orchestrator] 拦截到内部内容外发,替换为安全兜底回复") logger.warning("[Orchestrator] 拦截到内部内容外发,替换为安全兜底回复")
return "我在帮你看记录,稍等哈" return "我在帮你看记录,稍等哈"
@@ -225,6 +229,33 @@ class SystemOrchestrator:
return "我在帮你看记录,稍等哈" return "我在帮你看记录,稍等哈"
return cleaned return cleaned
@staticmethod
def _sanitize_history_content_for_ai(text: str) -> str:
    """Replace internal payloads in one history entry with short system notes.

    Args:
        text: Content of a single history entry; falsy values are treated
            as empty.

    Returns:
        "" for empty content; a fixed note when the content is a pure
        transfer command, or contains a history-lookup or order-lookup
        marker; otherwise the stripped content with any embedded transfer
        command substituted by the transfer note.
    """
    cleaned = str(text or "").strip()
    if not cleaned:
        return ""
    if _TRANSFER_COMMAND_RE.fullmatch(cleaned):
        return "系统:之前已转接设计师"
    if any(tag in cleaned for tag in ("【历史记录摘要】", "【详细记录】")):
        return "系统:刚刚查过历史记录"
    if any(tag in cleaned for tag in ("【订单摘要】", "【订单详情】")):
        return "系统:刚刚查过订单记录"
    if _TRANSFER_COMMAND_MARKER not in cleaned:
        return cleaned
    # The command is mixed into surrounding text: rewrite only the command
    # portion (it extends to the end of its line) and keep the rest.
    inline_command = r"正在为您转接\|\[转移会话\],[^,\r\n]+,[^\r\n]*"
    return re.sub(inline_command, "系统:之前已转接设计师", cleaned)
def _sanitize_history_for_ai(self, history: List[dict]) -> List[dict]:
    """Return shallow copies of the history entries with sanitized content.

    Each entry is copied, and its "content" value (defaulting to "" when the
    key is absent) is passed through ``_sanitize_history_content_for_ai``.
    The input entries are not mutated; a falsy *history* yields an empty list.
    """
    return [
        {**entry, "content": self._sanitize_history_content_for_ai(entry.get("content", ""))}
        for entry in history or []
    ]
@staticmethod @staticmethod
def _extract_designer_name(transfer_cmd: str) -> str: def _extract_designer_name(transfer_cmd: str) -> str:
text = str(transfer_cmd or "").strip() text = str(transfer_cmd or "").strip()
@@ -559,6 +590,7 @@ class SystemOrchestrator:
history_elapsed = time.time() - history_start history_elapsed = time.time() - history_start
logger.info(f"[计时] user={user_id} 查询历史: {history_elapsed:.2f}s (共{len(history)}条)") logger.info(f"[计时] user={user_id} 查询历史: {history_elapsed:.2f}s (共{len(history)}条)")
ai_history = history[:-1] if history and history[-1].get("content") == db_content else history ai_history = history[:-1] if history and history[-1].get("content") == db_content else history
ai_history = self._sanitize_history_for_ai(ai_history)
# C. 短时间追问且疑似没真正接上人工:优先补发一次转接 # C. 短时间追问且疑似没真正接上人工:优先补发一次转接
std_res = await self._retry_stalled_transfer_if_needed( std_res = await self._retry_stalled_transfer_if_needed(

View File

@@ -26,6 +26,7 @@ _INTERNAL_TOOL_MARKERS = (
"【订单摘要】", "【订单摘要】",
"【订单详情】", "【订单详情】",
) )
# Matches text that is exactly one transfer command: the literal prefix
# "正在为您转接|[转移会话],", a non-empty comma-free field, a comma, and the rest of
# the line, with optional surrounding whitespace and no line breaks.
_TRANSFER_COMMAND_RE = re.compile(r"^\s*正在为您转接\|\[转移会话\],[^,\r\n]+,[^\r\n]*\s*$")
# 历史记录格式检测模式(AI 转述历史时容易泄露) # 历史记录格式检测模式(AI 转述历史时容易泄露)
_HISTORY_LEAK_PATTERNS = [ _HISTORY_LEAK_PATTERNS = [
@@ -109,6 +110,13 @@ def _sanitize_reply_text(reply_text: str) -> str:
text = re.sub(r'[\[\]]{2,}', '', text) text = re.sub(r'[\[\]]{2,}', '', text)
text = text.strip() text = text.strip()
if _TRANSFER_COMMAND_RE.fullmatch(text):
return text
if "[转移会话]" in text:
logger.warning("[Brain] 拦截到混入正文的转接指令,降级为安全兜底回复")
return "我在帮你看记录,稍等哈"
# 检查固定标记 # 检查固定标记
if any(marker in text for marker in _INTERNAL_TOOL_MARKERS): if any(marker in text for marker in _INTERNAL_TOOL_MARKERS):
logger.warning("[Brain] 拦截到工具原文泄露,降级为安全兜底回复") logger.warning("[Brain] 拦截到工具原文泄露,降级为安全兜底回复")

View File

@@ -18,6 +18,7 @@ _OUTBOUND_BLOCK_MARKERS = (
"think_never_used", "think_never_used",
'[{"name":', '[{"name":',
) )
# Matches text that is exactly one transfer command: the literal prefix
# "正在为您转接|[转移会话],", a non-empty comma-free field, a comma, and the rest of
# the line, with optional surrounding whitespace and no line breaks.
_TRANSFER_COMMAND_RE = re.compile(r"^\s*正在为您转接\|\[转移会话\],[^,\r\n]+,[^\r\n]*\s*$")
_HISTORY_LEAK_PATTERNS = [ _HISTORY_LEAK_PATTERNS = [
r'\[\d{4}-\d{2}-\d{2}[^\]]*\]\s*(客户|客服)[:]', r'\[\d{4}-\d{2}-\d{2}[^\]]*\]\s*(客户|客服)[:]',
@@ -35,8 +36,11 @@ def _sanitize_outbound_archive_text(content: str) -> str:
if not content: if not content:
return "" return ""
cleaned = str(content).strip() cleaned = str(content).strip()
if "[转移会话]" in cleaned: if _TRANSFER_COMMAND_RE.fullmatch(cleaned):
return cleaned return cleaned
if "[转移会话]" in cleaned:
logger.warning("[Repository] 检测到混入正文的转接指令,拦截出站入库")
return "我在帮你看记录,稍等哈"
if any(marker in cleaned for marker in _OUTBOUND_BLOCK_MARKERS): if any(marker in cleaned for marker in _OUTBOUND_BLOCK_MARKERS):
logger.warning("[Repository] 拦截到内部内容写入外发记录,替换为安全兜底回复") logger.warning("[Repository] 拦截到内部内容写入外发记录,替换为安全兜底回复")
return "我在帮你看记录,稍等哈" return "我在帮你看记录,稍等哈"