Files
tw/core/pydantic_ai_agent_v2.py

492 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import re
import hashlib
import logging
import time
import json
from typing import List, Optional, Any, Dict
from pydantic_ai import Agent, RunContext
from pydantic_ai.models.openai import OpenAIChatModel
from pydantic_ai.providers.openai import OpenAIProvider
from core.schema import StandardMessage, StandardResponse
from core.agent_tools import register_agent_tools, TransferSuccessException
logger = logging.getLogger("cs_agent")
# 日志详细程度:设置环境变量 AI_LOG_LEVEL=debug 可获得完整日志
_LOG_FULL_PROMPT = os.getenv("AI_LOG_LEVEL", "").lower() == "debug"
_LOG_CLIP_LIMIT = int(os.getenv("AI_LOG_CLIP", "2000")) # 日志截断长度
from core.skill_manager import skill_manager
_INTERNAL_TOOL_MARKERS = (
"【历史记录摘要】",
"【详细记录】",
"【订单摘要】",
"【订单详情】",
)
_TRANSFER_COMMAND_RE = re.compile(r"^\s*正在为您转接\|\[转移会话\],[^,\r\n]+,[^\r\n]*\s*$")
# 历史记录格式检测模式AI 转述历史时容易泄露)
_HISTORY_LEAK_PATTERNS = [
r'\[\d{4}-\d{2}-\d{2}[^\]]*\]\s*(客户|客服)[:]', # [2026-03-07 12:00:00] 客户:
r'\[\d{2}:\d{2}:\d{2}\]\s*(客户|客服|我)[:]', # [12:00:00] 客户:
r'(根据|查看|查询|翻看)(历史|聊天|对话)(记录|内容)', # 根据历史记录
r'历史(记录|对话|消息)(显示|表明|中)', # 历史记录显示
r'之前的(聊天|对话|记录)(中|里|显示)', # 之前的聊天中
r'\d+条(历史|对话)?消息', # 共30条历史消息
r'订单号[:]\s*\d{10,}', # 订单号:xxxxxxxxxx
r'(状态|金额|数量)[:].*(状态|金额|数量)[:]', # 状态:xxx 金额:xxx 连续出现
]
_FIND_ORIGINAL_INTENT_KEYWORDS = (
"找图",
"找原图",
"原图",
"素材",
"大图",
"源图",
)
_FIND_ORIGINAL_QUESTION_KEYWORDS = (
"有吗",
"有没",
"有没有",
"能找吗",
"找得到吗",
"能不能找到",
"能找到吗",
)
_REPAIR_INTENT_KEYWORDS = (
"修复",
"高清修复",
"高清",
"清晰",
"清楚",
"变清晰",
"修清楚",
"放大清晰",
)
_IMAGE_ALREADY_SENT_HINT_KEYWORDS = (
"上面不是发了吗",
"上面不是有吗",
"我不是发了吗",
"前面不是发了吗",
"前面发了",
"上面发了",
"我发过了",
"不是发了吗",
"都发了",
"你没看到吗",
"聊天记录里有",
"上面有图",
)
_PAYMENT_LINK_REQUEST_KEYWORDS = (
"付款链接",
"支付链接",
"拍单链接",
"下单链接",
"付款吧",
"发我链接",
"发个链接",
"发下链接",
"发一下链接",
"给我链接",
)
def _clip(text: str, limit: int = 1200) -> str:
if text is None:
return ""
text = str(text)
if len(text) <= limit:
return text
return f"{text[:limit]}...(截断, 共{len(text)}字)"
def _fmt_time(ts: Any) -> str:
s = str(ts or "").strip()
if not s:
return "--:--:--"
if " " in s:
return s.split(" ", 1)[1]
return s
def _sanitize_reply_text(reply_text: str) -> str:
if not reply_text:
return ""
text = str(reply_text)
text = re.sub(r'\[\]<\|[^|]+\|>', '', text)
text = re.sub(r'<\|[^|]*\|>', '', text)
text = re.sub(r'\[Function[^\]]*\]', '', text)
text = re.sub(r'\[/?Tool[^\]]*\]', '', text)
text = re.sub(r'</?tool[_\-]?[^>]*>', '', text, flags=re.IGNORECASE)
text = re.sub(r'<think[_a-zA-Z0-9]*[^>]*>.*?</think[_a-zA-Z0-9]*[^>]*>', '', text, flags=re.DOTALL)
text = re.sub(r'<think[_a-zA-Z0-9]*[^>]*>.*', '', text, flags=re.DOTALL)
text = re.sub(r'</?think[_a-zA-Z0-9]*[^>]*>', '', text)
text = re.sub(r'```[^`]*```', '', text, flags=re.DOTALL)
text = re.sub(r'AgentRunResult\([^)]*\)', '', text)
text = re.sub(r'\[/?[A-Z][a-zA-Z]*(?:Call|End|Start|Result|Return)[^\]]*\]', '', text)
text = re.sub(r'^\s*\[\s*\{.*$', '', text, flags=re.DOTALL)
text = re.sub(r'^\s*[\[{].*"name"\s*:.*$', '', text, flags=re.DOTALL)
text = re.sub(r'[\[\]]{2,}', '', text)
text = text.strip()
if _TRANSFER_COMMAND_RE.fullmatch(text):
return text
if "[转移会话]" in text:
logger.warning("[Brain] 拦截到混入正文的转接指令,降级为安全兜底回复")
return "我在帮你看记录,稍等哈"
# 检查固定标记
if any(marker in text for marker in _INTERNAL_TOOL_MARKERS):
logger.warning("[Brain] 拦截到工具原文泄露,降级为安全兜底回复")
return "我在帮你看记录,稍等哈"
# 检查历史记录泄露模式AI 转述历史内容)
for pattern in _HISTORY_LEAK_PATTERNS:
if re.search(pattern, text):
logger.warning(f"[Brain] 检测到历史记录泄露模式: {pattern[:30]}...")
return "我在帮你看记录,稍等哈"
return text.strip()
def _normalize_text(text: Any) -> str:
return str(text or "").strip().lower()
def _infer_image_intent(current_text: str, history: Optional[List[dict]] = None) -> str:
text = _normalize_text(current_text)
recent_user_text = "\n".join(
_normalize_text(h.get("content", ""))
for h in (history or [])[-6:]
if h.get("role") == "user"
)
combined = f"{recent_user_text}\n{text}"
if any(k in combined for k in _REPAIR_INTENT_KEYWORDS):
return "repair"
if any(k in combined for k in _FIND_ORIGINAL_INTENT_KEYWORDS):
return "find_original"
if any(k in text for k in _FIND_ORIGINAL_QUESTION_KEYWORDS):
return "find_original"
return ""
def _history_has_customer_image(history: Optional[List[dict]] = None) -> bool:
for item in history or []:
if item.get("role") != "user":
continue
msg_type = int(item.get("msg_type") or 0)
image_urls = item.get("image_urls") or []
if isinstance(image_urls, str):
image_urls = [part for part in image_urls.splitlines() if part.strip()]
content = str(item.get("content") or "")
if msg_type == 1 or image_urls or ("已收到" in content and "" in content):
return True
return False
def _customer_claims_image_already_sent(current_text: str, history: Optional[List[dict]] = None) -> bool:
text = _normalize_text(current_text)
if not text or not _history_has_customer_image(history):
return False
return any(keyword in text for keyword in _IMAGE_ALREADY_SENT_HINT_KEYWORDS)
def _requests_payment_link(current_text: str, history: Optional[List[dict]] = None) -> bool:
text = _normalize_text(current_text)
if not text:
return False
if any(keyword in text for keyword in _PAYMENT_LINK_REQUEST_KEYWORDS):
return True
return ("付款" in text or "支付" in text or "拍单" in text or "下单" in text) and "链接" in text
class CustomerServiceBrain:
"""
重构后的单一 Agent 大脑:
【全能终极版】统一称呼为“设计师”,支持下线安抚。
"""
def __init__(self, model_name: str = None):
self.api_key = os.getenv("OPENAI_API_KEY")
self.base_url = os.getenv("OPENAI_BASE_URL")
self.model_name = model_name or os.getenv("OPENAI_MODEL", "gpt-4o-mini")
model = OpenAIChatModel(
model_name=self.model_name,
provider=OpenAIProvider(api_key=self.api_key, base_url=self.base_url)
)
exclude_names = os.getenv("SKILL_EXCLUDE_FROM_PROMPT", "pricing-skill")
excluded_skills = [s.strip().lower() for s in exclude_names.split(",") if s.strip()]
all_skills = skill_manager.get_all_skills_text(exclude=excluded_skills)
logger.info(f"[SkillManager] 已从提示词排除技能: {excluded_skills}")
# --- 统一口径后的 System Prompt ---
system_prompt = (
"你是一位专注【高清修复】和【找原图】的专业店主。性格干脆,说话自然、专业。\n\n"
"【统一称呼规范 - 第一人称原则】\n"
"1. 你就是店主本人,未转接设计师之前,所有回复必须用第一人称:'''我这边'\n"
"2. 例如:客户问进度 → '我在看哈,稍等';客户催 → '我帮你催下哈'\n"
"3. 只有在需要转接时才提'设计师''我叫设计师来看下哈'\n"
"4. 严禁使用'师傅''客服''专员'等词汇。\n\n"
"【★★★ 历史记录查询 - 最高优先级 ★★★】\n"
"你有一个 lookup_chat_history_tool 工具,可以查询客户的完整历史聊天记录。\n"
"以下情况你【必须】先调用此工具查历史,再回复:\n"
"1. 客户说'之前聊过''上次''你看聊天记录''我发过了''前面发了'\n"
"2. 客户追问进度:'做好了吗''多久能好''怎么样了'\n"
"3. 客户表达不满或困惑:'''你瞎么''搞笑''说过了'\n"
"4. 【近期对话回顾】中显示客户之前已发过图或说过需求\n"
"查到历史后,根据历史内容回复,绝对不要再重复问客户已经回答过的问题!\n\n"
"【订单查询工具】\n"
"你有一个 lookup_customer_orders_tool 工具,可以查询客户的订单记录。\n"
"以下情况你【必须】调用此工具:\n"
"1. 客户问'我付款了''订单怎么样了''发货了吗'\n"
"2. 客户提到订单号\n"
"3. 你需要确认客户是否已付款再决定如何回复\n"
"查到订单后,根据订单状态回复(已付款→'收到,马上安排';已发货→'已经发了哈')。\n\n"
"【核心逻辑】\n"
"1. 业务:只聊高清修复和找原图。核心链路:引导发图 -> 问需求 -> 找设计师。\n"
"2. **主动引导**:只有当客户【从未发过图】且没有历史图片记录时,才引导发图。\n"
"2.1 **消息延迟安抚**:如果客户说'上面不是发了吗''我发过了''你没看到吗',说明他在提醒你图早就发过了。\n"
" 这时先道歉,类似'不好意思哈,刚刚消息慢了点',再承接后续;严禁让客户重发图片。\n"
"3. **非业务问题**:如果客户问招聘、合作、闲聊等与做图无关的话题,礼貌拒绝。\n"
"4. **客户说没有参考图**:直接转人工:'好的,我这就叫设计师帮您找哈'\n"
"5. **客户问尺寸/能否打印/退款**:直接转人工:'这个设计师帮您看下哈'\n"
"6. **付款链接特判**:客户明确说'发付款链接''支付链接''拍单链接''下单链接'时,视为强成交信号,必须立即调用转人工工具;严禁只回复'直接下单'\n"
"7. **转接时机(严格两步)**:除付款链接特判外,必须同时满足【有图】+【客户明确或可直接判断的需求】才能转接。\n"
" 客户只发了图但没说需求 → 先问'亲亲这张是找原图还是修复哈?'\n"
" 客户说了'有吗''能找吗''找图''找原图''有大图吗' → 直接按【找原图】意图处理,不要重复追问。\n"
" 客户说了'修复''高清''清晰点''放大清晰' → 直接按【高清修复】意图处理,不要重复追问。\n"
"8. **下线安抚**只有工具返回ERROR时才能提设计师不在。根据错误码区分\n"
" - ERROR_DESIGNER_NOT_STARTED → 说'还没上班,记下了上班马上处理'(严禁说下班)\n"
" - ERROR_DESIGNER_OFFLINE → 说'下班了,需求记下明天回'\n"
" - ERROR_DESIGNER_BUSY → 说'稍等,我帮你联系下'(严禁说下班)\n"
"9. 正在转接中:如果系统提示已在转接,回:'已经在帮你催了哈,稍等下!'\n"
"10. **每次转接必须调用工具**:不要猜测,每次都重新调用。\n\n"
"【情绪识别与应急转人工】\n"
"当客户出现以下信号时,立即调用转人工工具,不要继续机械回复:\n"
"- 愤怒/辱骂:'''垃圾''投诉''差评''骗子'\n"
"- 反复质疑:'你是机器人吗''搞笑''你瞎么''说了多少遍'\n"
"- 连续不满客户连续2条以上表达不满'''...'、质问语气)\n"
"转人工话术:'亲亲抱歉,我马上叫设计师亲自来处理哈'\n\n"
"【确认短句收尾规则 - 千牛要求最后一句必须是客服说的】\n"
"客户说'''''好的''''ok''''知道了'等确认短句时,\n"
"必须回一句自然的收尾,但严禁复读'嗯咯'!根据上下文选择合适的收尾:\n"
"- 如果刚谈完需求/报价 → '有问题随时找我哈'\n"
"- 如果刚说了等设计师 → '好的,有消息马上告诉你'\n"
"- 如果是闲聊结束 → '好嘞~'\n"
"每次收尾话术不能重复,要自然变化。\n\n"
"【必杀令 - 严格遵守】\n"
"1. 每句回复严禁超过15个字语气淘宝亲切风多用''''\n"
"2. 严禁报价,严禁复读图片已收到的情况。\n"
"3. 必须原样输出工具返回的'正在为您转接|'指令。\n"
"4. **严禁**说'在呢铁子'!只能说'在呢''在呢亲'\n"
"5. **严禁**连续两次回复相同或相似内容!回顾你最近说过的话,换一种说法。\n"
"6. **严禁**输出任何代码、标记、括号等乱码!只输出自然语言。\n"
"7. **严禁**自己臆造'下班'只有工具返回ERROR才能说下班。\n"
"8. **严禁**在客户已发过图的情况下还说'先发图来看看'!先查历史确认。\n\n"
f"业务参考:\n{all_skills}"
)
self.agent = Agent(model=model, system_prompt=system_prompt)
register_agent_tools(self.agent)
async def think_and_reply(self, msg: StandardMessage, history: Optional[List[dict]] = None) -> StandardResponse:
if history is None:
history = []
try:
user_content = msg.content or ""
if _requests_payment_link(user_content, history):
user_content = (
"【系统通知:客户正在明确索要付款/支付链接,这是强成交信号。"
"不要只回复'直接下单''平台拍单',必须立即调用转人工工具转接设计师跟进付款。】\n"
f"{user_content}"
)
# 客户已发图:告知 AI 图已收到,引导问需求,但不要直接转接
has_image_message = bool(msg.image_urls) or msg.msg_type == 1
if not has_image_message and _customer_claims_image_already_sent(user_content, history):
inferred_intent = _infer_image_intent(user_content, history)
if inferred_intent == "find_original":
next_step = "客户当前更像是在问找原图,别再问他有没有发图。"
elif inferred_intent == "repair":
next_step = "客户当前更像是在问高清修复,别再问他有没有发图。"
else:
next_step = "如果客户需求还不明确,只问这是找原图还是修复,不要让客户重发。"
user_content = (
"【系统通知:客户是在提醒你他上面已经发过图片了,可能刚刚网络或消息同步有点慢。"
"回复时先简短道歉,表示现在已经看到图了,再继续正常承接。"
f"{next_step}\n{user_content}"
)
if has_image_message:
image_count = max(len(msg.image_urls), 1)
if user_content.startswith("【系统:已收到图片消息"):
user_content = ""
inferred_intent = _infer_image_intent(user_content, history)
if inferred_intent == "find_original":
logger.info(f"[Brain] 已根据客户表述推断为找原图意图: user={msg.user_id}")
user_content = (
f"【系统通知:客户已发送 {image_count} 张图片,图已收到不要再让客户发图。"
f"系统判断客户当前意图是【找原图】;像'有吗''能找吗''找图'都算找原图意图。"
f"不要再追问'找原图还是高清修复',直接按找原图流程继续;如果信息足够就直接转接。】\n{user_content}"
)
elif inferred_intent == "repair":
logger.info(f"[Brain] 已根据客户表述推断为高清修复意图: user={msg.user_id}")
user_content = (
f"【系统通知:客户已发送 {image_count} 张图片,图已收到不要再让客户发图。"
f"系统判断客户当前意图是【高清修复】;像'修复''高清''清晰点'都算修复意图。"
f"不要再追问'找原图还是高清修复',直接按高清修复流程继续;如果信息足够就直接转接。】\n{user_content}"
)
else:
user_content = (
f"【系统通知:客户已发送 {image_count} 张图片,图已收到不要再让客户发图。"
f"你现在必须先问客户:这张是找原图还是高清修复?有什么具体要求?"
f"等客户明确回答后才能转接,严禁跳过问需求直接转接!】\n{user_content}"
)
recent_context = ""
if history:
lines = []
for h in history[-6:]:
role = "客户" if h.get("role") == "user" else ""
content = h.get("content", "")
lines.append(f"[{_fmt_time(h.get('timestamp'))}] {role}{content}")
recent_context = "【近期对话回顾】\n" + "\n".join(lines) + "\n----------------\n"
full_input = f"【当前客户ID{msg.user_id}\n{recent_context}现在的对话:{user_content}"
start_time = time.time()
# ===== 详细日志:发给 AI 的提示词 =====
logger.info(f"[AI提示词] user={msg.user_id} acc={msg.acc_id} images={len(msg.image_urls)}\n{full_input}")
if history:
history_preview = "\n".join([f" {h.get('role','?')}: {str(h.get('content',''))[:50]}" for h in history[-4:]])
logger.info(f"[AI历史上下文] 共{len(history)}条:\n{history_preview}")
# 尝试运行 AI捕获转接成功异常以提前终止
try:
result = await self.agent.run(full_input, message_history=history)
except TransferSuccessException as e:
# 转接工具成功后立即返回,无需等待 AI 继续生成
elapsed = time.time() - start_time
logger.info(f"[Brain] 转接成功(提前终止,耗时{elapsed:.1f}s: {e.transfer_cmd[:60]}")
return StandardResponse(
reply_content=e.transfer_cmd,
need_transfer=True,
metadata={"acc_id": msg.acc_id, "acc_type": msg.acc_type}
)
elapsed = time.time() - start_time
logger.info(f"[Brain] AI处理完成总耗时{elapsed:.1f}s")
# ===== 详细日志AI 的思考过程和工具调用 =====
pending_transfer_reason = ""
pending_transfer_error = ""
try:
all_msgs = result.all_messages()
for idx, m in enumerate(all_msgs):
msg_kind = getattr(m, 'kind', type(m).__name__)
if hasattr(m, 'parts'):
for part in m.parts:
part_kind = getattr(part, 'part_kind', '')
if part_kind == 'tool-call':
tool_name = getattr(part, 'tool_name', '?')
tool_args = getattr(part, 'args', {})
logger.info(f"[AI思考] 步骤{idx+1} 调用工具: {tool_name}({tool_args})")
if tool_name == "transfer_to_human_tool":
if isinstance(tool_args, str):
try:
tool_args = json.loads(tool_args)
except Exception:
tool_args = {"reason": tool_args}
if isinstance(tool_args, dict):
pending_transfer_reason = str(tool_args.get("reason") or "").strip()
elif part_kind == 'tool-return':
content = str(getattr(part, 'content', ''))[:200]
logger.info(f"[AI思考] 步骤{idx+1} 工具返回: {content}")
full_content = str(getattr(part, 'content', ''))
if full_content.startswith("ERROR_DESIGNER_"):
pending_transfer_error = full_content
elif part_kind == 'text':
content = str(getattr(part, 'content', ''))[:150]
if content.strip():
logger.info(f"[AI思考] 步骤{idx+1} 文本输出: {content}")
except Exception as log_err:
logger.debug(f"[AI思考日志] 解析失败: {log_err}")
# --- 转接指令:直接从工具返回截获,不经过 AI 二次加工 ---
transfer_cmd = ""
for m in result.all_messages():
if hasattr(m, 'parts'):
for part in m.parts:
if getattr(part, 'part_kind', '') == 'tool-return':
content = str(getattr(part, 'content', ''))
if "[转移会话]" in content:
transfer_cmd = content
if transfer_cmd:
logger.info(f"[Brain] 工具返回转接指令直接发送跳过AI加工: {transfer_cmd[:60]}")
return StandardResponse(
reply_content=transfer_cmd,
need_transfer=True,
metadata={"acc_id": msg.acc_id, "acc_type": msg.acc_type}
)
# --- 非转接场景:取 AI 的正常回复 ---
reply_text = ""
raw_output = getattr(result, 'output', None) or getattr(result, 'data', None)
if isinstance(raw_output, str):
reply_text = raw_output
# 清理模型泄露的内部标记/工具原文
reply_text = _sanitize_reply_text(reply_text)
# 过滤"在呢铁子"
if "在呢铁子" in reply_text:
reply_text = reply_text.replace("在呢铁子", "在呢亲")
if not reply_text:
reply_text = "稍等我看看。"
logger.info(f"[THINK/RAW_OUTPUT] user={msg.user_id}\n{_clip(reply_text)}")
need_transfer = "[转移会话]" in reply_text
return StandardResponse(
reply_content=reply_text,
need_transfer=need_transfer,
metadata={
"acc_id": msg.acc_id,
"acc_type": msg.acc_type,
"pending_transfer": bool(pending_transfer_error and pending_transfer_reason),
"pending_transfer_reason": pending_transfer_reason,
"pending_transfer_error": pending_transfer_error,
}
)
except Exception as e:
logger.error(f"[Brain Error]: {e}")
return StandardResponse(reply_content="好哒,我在看图,稍等回你哈。", metadata={"acc_id": msg.acc_id})