Files
tw/core/pydantic_ai_agent_v2.py

244 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import re
import hashlib
import logging
from typing import List, Optional, Any, Dict
from pydantic_ai import Agent, RunContext
from pydantic_ai.models.openai import OpenAIChatModel
from pydantic_ai.providers.openai import OpenAIProvider
from core.schema import StandardMessage, StandardResponse
from core.agent_tools import register_agent_tools
logger = logging.getLogger("cs_agent")
from core.skill_manager import skill_manager
_INTERNAL_TOOL_MARKERS = (
"【历史记录摘要】",
"【详细记录】",
"【订单摘要】",
"【订单详情】",
)
def _clip(text: str, limit: int = 1200) -> str:
if text is None:
return ""
text = str(text)
if len(text) <= limit:
return text
return f"{text[:limit]}...(截断, 共{len(text)}字)"
def _fmt_time(ts: Any) -> str:
s = str(ts or "").strip()
if not s:
return "--:--:--"
if " " in s:
return s.split(" ", 1)[1]
return s
def _sanitize_reply_text(reply_text: str) -> str:
if not reply_text:
return ""
text = str(reply_text)
text = re.sub(r'\[\]<\|[^|]+\|>', '', text)
text = re.sub(r'<\|[^|]*\|>', '', text)
text = re.sub(r'\[Function[^\]]*\]', '', text)
text = re.sub(r'\[/?Tool[^\]]*\]', '', text)
text = re.sub(r'</?tool[_\-]?[^>]*>', '', text, flags=re.IGNORECASE)
text = re.sub(r'<think[_a-zA-Z0-9]*[^>]*>.*?</think[_a-zA-Z0-9]*[^>]*>', '', text, flags=re.DOTALL)
text = re.sub(r'<think[_a-zA-Z0-9]*[^>]*>.*', '', text, flags=re.DOTALL)
text = re.sub(r'</?think[_a-zA-Z0-9]*[^>]*>', '', text)
text = re.sub(r'```[^`]*```', '', text, flags=re.DOTALL)
text = re.sub(r'AgentRunResult\([^)]*\)', '', text)
text = re.sub(r'\[/?[A-Z][a-zA-Z]*(?:Call|End|Start|Result|Return)[^\]]*\]', '', text)
text = re.sub(r'^\s*\[\s*\{.*$', '', text, flags=re.DOTALL)
text = re.sub(r'^\s*[\[{].*"name"\s*:.*$', '', text, flags=re.DOTALL)
text = re.sub(r'[\[\]]{2,}', '', text)
text = text.strip()
if any(marker in text for marker in _INTERNAL_TOOL_MARKERS):
logger.warning("[Brain] 拦截到工具原文泄露,降级为安全兜底回复")
return "我在帮你看记录,稍等哈"
return text.strip()
class CustomerServiceBrain:
"""
重构后的单一 Agent 大脑:
【全能终极版】统一称呼为“设计师”,支持下线安抚。
"""
def __init__(self, model_name: str = None):
self.api_key = os.getenv("OPENAI_API_KEY")
self.base_url = os.getenv("OPENAI_BASE_URL")
self.model_name = model_name or os.getenv("OPENAI_MODEL", "gpt-4o-mini")
model = OpenAIChatModel(
model_name=self.model_name,
provider=OpenAIProvider(api_key=self.api_key, base_url=self.base_url)
)
exclude_names = os.getenv("SKILL_EXCLUDE_FROM_PROMPT", "pricing-skill")
excluded_skills = [s.strip().lower() for s in exclude_names.split(",") if s.strip()]
all_skills = skill_manager.get_all_skills_text(exclude=excluded_skills)
logger.info(f"[SkillManager] 已从提示词排除技能: {excluded_skills}")
# --- 统一口径后的 System Prompt ---
system_prompt = (
"你是一位专注【高清修复】和【找原图】的专业店主。性格干脆,说话自然、专业。\n\n"
"【统一称呼规范 - 第一人称原则】\n"
"1. 你就是店主本人,未转接设计师之前,所有回复必须用第一人称:'''我这边'\n"
"2. 例如:客户问进度 → '我在看哈,稍等';客户催 → '我帮你催下哈'\n"
"3. 只有在需要转接时才提'设计师''我叫设计师来看下哈'\n"
"4. 严禁使用'师傅''客服''专员'等词汇。\n\n"
"【★★★ 历史记录查询 - 最高优先级 ★★★】\n"
"你有一个 lookup_chat_history_tool 工具,可以查询客户的完整历史聊天记录。\n"
"以下情况你【必须】先调用此工具查历史,再回复:\n"
"1. 客户说'之前聊过''上次''你看聊天记录''我发过了''前面发了'\n"
"2. 客户追问进度:'做好了吗''多久能好''怎么样了'\n"
"3. 客户表达不满或困惑:'''你瞎么''搞笑''说过了'\n"
"4. 【近期对话回顾】中显示客户之前已发过图或说过需求\n"
"查到历史后,根据历史内容回复,绝对不要再重复问客户已经回答过的问题!\n\n"
"【订单查询工具】\n"
"你有一个 lookup_customer_orders_tool 工具,可以查询客户的订单记录。\n"
"以下情况你【必须】调用此工具:\n"
"1. 客户问'我付款了''订单怎么样了''发货了吗'\n"
"2. 客户提到订单号\n"
"3. 你需要确认客户是否已付款再决定如何回复\n"
"查到订单后,根据订单状态回复(已付款→'收到,马上安排';已发货→'已经发了哈')。\n\n"
"【核心逻辑】\n"
"1. 业务:只聊高清修复和找原图。核心链路:引导发图 -> 问需求 -> 找设计师。\n"
"2. **主动引导**:只有当客户【从未发过图】且没有历史图片记录时,才引导发图。\n"
"3. **非业务问题**:如果客户问招聘、合作、闲聊等与做图无关的话题,礼貌拒绝。\n"
"4. **客户说没有参考图**:直接转人工:'好的,我这就叫设计师帮您找哈'\n"
"5. **客户问尺寸/能否打印/退款**:直接转人工:'这个设计师帮您看下哈'\n"
"6. **转接时机(严格两步)**:必须同时满足【有图】+【客户明确说了要找原图/修复/具体要求】才能转接。\n"
" 客户只发了图但没说需求 → 必须先问'亲亲这张是找原图还是修复哈?'\n"
" 客户说了'有吗''能找吗' → 这不算明确需求,要追问'是要找原图还是高清修复呢?'\n"
"7. **下线安抚**只有工具返回ERROR时才能提设计师不在。根据错误码区分\n"
" - ERROR_DESIGNER_NOT_STARTED → 说'还没上班,记下了上班马上处理'(严禁说下班)\n"
" - ERROR_DESIGNER_OFFLINE → 说'下班了,需求记下明天回'\n"
" - ERROR_DESIGNER_BUSY → 说'稍等,我帮你联系下'(严禁说下班)\n"
"8. 正在转接中:如果系统提示已在转接,回:'已经在帮你催了哈,稍等下!'\n"
"9. **每次转接必须调用工具**:不要猜测,每次都重新调用。\n\n"
"【情绪识别与应急转人工】\n"
"当客户出现以下信号时,立即调用转人工工具,不要继续机械回复:\n"
"- 愤怒/辱骂:'''垃圾''投诉''差评''骗子'\n"
"- 反复质疑:'你是机器人吗''搞笑''你瞎么''说了多少遍'\n"
"- 连续不满客户连续2条以上表达不满'''...'、质问语气)\n"
"转人工话术:'亲亲抱歉,我马上叫设计师亲自来处理哈'\n\n"
"【确认短句收尾规则 - 千牛要求最后一句必须是客服说的】\n"
"客户说'''''好的''''ok''''知道了'等确认短句时,\n"
"必须回一句自然的收尾,但严禁复读'嗯咯'!根据上下文选择合适的收尾:\n"
"- 如果刚谈完需求/报价 → '有问题随时找我哈'\n"
"- 如果刚说了等设计师 → '好的,有消息马上告诉你'\n"
"- 如果是闲聊结束 → '好嘞~'\n"
"每次收尾话术不能重复,要自然变化。\n\n"
"【必杀令 - 严格遵守】\n"
"1. 每句回复严禁超过15个字语气淘宝亲切风多用''''\n"
"2. 严禁报价,严禁复读图片已收到的情况。\n"
"3. 必须原样输出工具返回的'正在为您转接|'指令。\n"
"4. **严禁**说'在呢铁子'!只能说'在呢''在呢亲'\n"
"5. **严禁**连续两次回复相同或相似内容!回顾你最近说过的话,换一种说法。\n"
"6. **严禁**输出任何代码、标记、括号等乱码!只输出自然语言。\n"
"7. **严禁**自己臆造'下班'只有工具返回ERROR才能说下班。\n"
"8. **严禁**在客户已发过图的情况下还说'先发图来看看'!先查历史确认。\n\n"
f"业务参考:\n{all_skills}"
)
self.agent = Agent(model=model, system_prompt=system_prompt)
register_agent_tools(self.agent)
async def think_and_reply(self, msg: StandardMessage, history: Optional[List[dict]] = None) -> StandardResponse:
if history is None:
history = []
try:
user_content = msg.content or ""
# 客户已发图:告知 AI 图已收到,引导问需求,但不要直接转接
if msg.image_urls:
user_content = (
f"【系统通知:客户已发送 {len(msg.image_urls)} 张图片,图已收到不要再让客户发图。"
f"你现在必须先问客户:这张是找原图还是高清修复?有什么具体要求?"
f"等客户明确回答后才能转接,严禁跳过问需求直接转接!】\n{user_content}"
)
recent_context = ""
if history:
lines = []
for h in history[-6:]:
role = "客户" if h.get("role") == "user" else ""
content = h.get("content", "")
lines.append(f"[{_fmt_time(h.get('timestamp'))}] {role}{content}")
recent_context = "【近期对话回顾】\n" + "\n".join(lines) + "\n----------------\n"
full_input = f"【当前客户ID{msg.user_id}\n{recent_context}现在的对话:{user_content}"
logger.info(
f"[PROMPT->AI] user={msg.user_id} acc={msg.acc_id} images={len(msg.image_urls)}\n"
f"{_clip(full_input)}"
)
result = await self.agent.run(full_input, message_history=history)
# --- 转接指令:直接从工具返回截获,不经过 AI 二次加工 ---
transfer_cmd = ""
for m in result.all_messages():
if hasattr(m, 'parts'):
for part in m.parts:
if getattr(part, 'part_kind', '') == 'tool-return':
content = str(getattr(part, 'content', ''))
if "[转移会话]" in content:
transfer_cmd = content
if transfer_cmd:
logger.info(f"[Brain] 工具返回转接指令直接发送跳过AI加工: {transfer_cmd[:60]}")
return StandardResponse(
reply_content=transfer_cmd,
need_transfer=True,
metadata={"acc_id": msg.acc_id, "acc_type": msg.acc_type}
)
# --- 非转接场景:取 AI 的正常回复 ---
reply_text = ""
raw_output = getattr(result, 'output', None) or getattr(result, 'data', None)
if isinstance(raw_output, str):
reply_text = raw_output
# 清理模型泄露的内部标记/工具原文
reply_text = _sanitize_reply_text(reply_text)
# 过滤"在呢铁子"
if "在呢铁子" in reply_text:
reply_text = reply_text.replace("在呢铁子", "在呢亲")
if not reply_text:
reply_text = "稍等我看看。"
logger.info(f"[THINK/RAW_OUTPUT] user={msg.user_id}\n{_clip(reply_text)}")
need_transfer = "[转移会话]" in reply_text
return StandardResponse(
reply_content=reply_text,
need_transfer=need_transfer,
metadata={"acc_id": msg.acc_id, "acc_type": msg.acc_type}
)
except Exception as e:
logger.error(f"[Brain Error]: {e}")
return StandardResponse(reply_content="好哒,我在看图,稍等回你哈。", metadata={"acc_id": msg.acc_id})