import os import re import hashlib import logging import time import json from typing import List, Optional, Any, Dict from pydantic_ai import Agent, RunContext from pydantic_ai.models.openai import OpenAIChatModel from pydantic_ai.providers.openai import OpenAIProvider from core.schema import StandardMessage, StandardResponse from core.agent_tools import register_agent_tools, TransferSuccessException logger = logging.getLogger("cs_agent") # 日志详细程度:设置环境变量 AI_LOG_LEVEL=debug 可获得完整日志 _LOG_FULL_PROMPT = os.getenv("AI_LOG_LEVEL", "").lower() == "debug" _LOG_CLIP_LIMIT = int(os.getenv("AI_LOG_CLIP", "2000")) # 日志截断长度 from core.skill_manager import skill_manager _INTERNAL_TOOL_MARKERS = ( "【历史记录摘要】", "【详细记录】", "【订单摘要】", "【订单详情】", ) _TRANSFER_COMMAND_RE = re.compile(r"^\s*正在为您转接\|\[转移会话\],[^,\r\n]+,[^\r\n]*\s*$") # 历史记录格式检测模式(AI 转述历史时容易泄露) _HISTORY_LEAK_PATTERNS = [ r'\[\d{4}-\d{2}-\d{2}[^\]]*\]\s*(客户|客服)[::]', # [2026-03-07 12:00:00] 客户: r'\[\d{2}:\d{2}:\d{2}\]\s*(客户|客服|我)[::]', # [12:00:00] 客户: r'(根据|查看|查询|翻看)(历史|聊天|对话)(记录|内容)', # 根据历史记录 r'历史(记录|对话|消息)(显示|表明|中)', # 历史记录显示 r'之前的(聊天|对话|记录)(中|里|显示)', # 之前的聊天中 r'共\d+条(历史|对话)?消息', # 共30条历史消息 r'订单号[::]\s*\d{10,}', # 订单号:xxxxxxxxxx r'(状态|金额|数量)[::].*(状态|金额|数量)[::]', # 状态:xxx 金额:xxx 连续出现 ] _FIND_ORIGINAL_INTENT_KEYWORDS = ( "找图", "找原图", "原图", "素材", "大图", "源图", ) _FIND_ORIGINAL_QUESTION_KEYWORDS = ( "有吗", "有没", "有没有", "能找吗", "找得到吗", "能不能找到", "能找到吗", ) _REPAIR_INTENT_KEYWORDS = ( "修复", "高清修复", "高清", "清晰", "清楚", "变清晰", "修清楚", "放大清晰", ) _IMAGE_ALREADY_SENT_HINT_KEYWORDS = ( "上面不是发了吗", "上面不是有吗", "我不是发了吗", "前面不是发了吗", "前面发了", "上面发了", "我发过了", "不是发了吗", "都发了", "你没看到吗", "聊天记录里有", "上面有图", ) _PAYMENT_LINK_REQUEST_KEYWORDS = ( "付款链接", "支付链接", "拍单链接", "下单链接", "付款吧", "发我链接", "发个链接", "发下链接", "发一下链接", "给我链接", ) def _clip(text: str, limit: int = 1200) -> str: if text is None: return "" text = str(text) if len(text) <= limit: return text return f"{text[:limit]}...(截断, 共{len(text)}字)" def _fmt_time(ts: Any) -> str: s = str(ts or "").strip() if not s: return "--:--:--" if " " in s: return s.split(" ", 1)[1] return s def _sanitize_reply_text(reply_text: str) -> str: if not reply_text: return "" text = str(reply_text) text = re.sub(r'\[\]<\|[^|]+\|>', '', text) text = re.sub(r'<\|[^|]*\|>', '', text) text = re.sub(r'\[Function[^\]]*\]', '', text) text = re.sub(r'\[/?Tool[^\]]*\]', '', text) text = re.sub(r']*>', '', text, flags=re.IGNORECASE) text = re.sub(r']*>.*?]*>', '', text, flags=re.DOTALL) text = re.sub(r']*>.*', '', text, flags=re.DOTALL) text = re.sub(r']*>', '', text) text = re.sub(r'```[^`]*```', '', text, flags=re.DOTALL) text = re.sub(r'AgentRunResult\([^)]*\)', '', text) text = re.sub(r'\[/?[A-Z][a-zA-Z]*(?:Call|End|Start|Result|Return)[^\]]*\]', '', text) text = re.sub(r'^\s*\[\s*\{.*$', '', text, flags=re.DOTALL) text = re.sub(r'^\s*[\[{].*"name"\s*:.*$', '', text, flags=re.DOTALL) text = re.sub(r'[\[\]]{2,}', '', text) text = text.strip() if _TRANSFER_COMMAND_RE.fullmatch(text): return text if "[转移会话]" in text: logger.warning("[Brain] 拦截到混入正文的转接指令,降级为安全兜底回复") return "我在帮你看记录,稍等哈" # 检查固定标记 if any(marker in text for marker in _INTERNAL_TOOL_MARKERS): logger.warning("[Brain] 拦截到工具原文泄露,降级为安全兜底回复") return "我在帮你看记录,稍等哈" # 检查历史记录泄露模式(AI 转述历史内容) for pattern in _HISTORY_LEAK_PATTERNS: if re.search(pattern, text): logger.warning(f"[Brain] 检测到历史记录泄露模式: {pattern[:30]}...") return "我在帮你看记录,稍等哈" return text.strip() def _normalize_text(text: Any) -> str: return str(text or "").strip().lower() def _infer_image_intent(current_text: str, history: Optional[List[dict]] = None) -> str: text = _normalize_text(current_text) recent_user_text = "\n".join( _normalize_text(h.get("content", "")) for h in (history or [])[-6:] if h.get("role") == "user" ) combined = f"{recent_user_text}\n{text}" if any(k in combined for k in _REPAIR_INTENT_KEYWORDS): return "repair" if any(k in combined for k in _FIND_ORIGINAL_INTENT_KEYWORDS): return "find_original" if any(k in text for k in _FIND_ORIGINAL_QUESTION_KEYWORDS): return "find_original" return "" def _history_has_customer_image(history: Optional[List[dict]] = None) -> bool: for item in history or []: if item.get("role") != "user": continue msg_type = int(item.get("msg_type") or 0) image_urls = item.get("image_urls") or [] if isinstance(image_urls, str): image_urls = [part for part in image_urls.splitlines() if part.strip()] content = str(item.get("content") or "") if msg_type == 1 or image_urls or ("已收到" in content and "图" in content): return True return False def _customer_claims_image_already_sent(current_text: str, history: Optional[List[dict]] = None) -> bool: text = _normalize_text(current_text) if not text or not _history_has_customer_image(history): return False return any(keyword in text for keyword in _IMAGE_ALREADY_SENT_HINT_KEYWORDS) def _requests_payment_link(current_text: str, history: Optional[List[dict]] = None) -> bool: text = _normalize_text(current_text) if not text: return False if any(keyword in text for keyword in _PAYMENT_LINK_REQUEST_KEYWORDS): return True return ("付款" in text or "支付" in text or "拍单" in text or "下单" in text) and "链接" in text class CustomerServiceBrain: """ 重构后的单一 Agent 大脑: 【全能终极版】统一称呼为“设计师”,支持下线安抚。 """ def __init__(self, model_name: str = None): self.api_key = os.getenv("OPENAI_API_KEY") self.base_url = os.getenv("OPENAI_BASE_URL") self.model_name = model_name or os.getenv("OPENAI_MODEL", "gpt-4o-mini") model = OpenAIChatModel( model_name=self.model_name, provider=OpenAIProvider(api_key=self.api_key, base_url=self.base_url) ) exclude_names = os.getenv("SKILL_EXCLUDE_FROM_PROMPT", "pricing-skill") excluded_skills = [s.strip().lower() for s in exclude_names.split(",") if s.strip()] all_skills = skill_manager.get_all_skills_text(exclude=excluded_skills) logger.info(f"[SkillManager] 已从提示词排除技能: {excluded_skills}") # --- 统一口径后的 System Prompt --- system_prompt = ( "你是一位专注【高清修复】和【找原图】的专业店主。性格干脆,说话自然、专业。\n\n" "【统一称呼规范 - 第一人称原则】\n" "1. 你就是店主本人,未转接设计师之前,所有回复必须用第一人称:'我'、'我这边'。\n" "2. 例如:客户问进度 → '我在看哈,稍等';客户催 → '我帮你催下哈'。\n" "3. 只有在需要转接时才提'设计师':'我叫设计师来看下哈'。\n" "4. 严禁使用'师傅'、'客服'、'专员'等词汇。\n\n" "【★★★ 历史记录查询 - 最高优先级 ★★★】\n" "你有一个 lookup_chat_history_tool 工具,可以查询客户的完整历史聊天记录。\n" "以下情况你【必须】先调用此工具查历史,再回复:\n" "1. 客户说'之前聊过'、'上次'、'你看聊天记录'、'我发过了'、'前面发了'等\n" "2. 客户追问进度:'做好了吗'、'多久能好'、'怎么样了'\n" "3. 客户表达不满或困惑:'?'、'你瞎么'、'搞笑'、'说过了'\n" "4. 【近期对话回顾】中显示客户之前已发过图或说过需求\n" "查到历史后,根据历史内容回复,绝对不要再重复问客户已经回答过的问题!\n\n" "【订单查询工具】\n" "你有一个 lookup_customer_orders_tool 工具,可以查询客户的订单记录。\n" "以下情况你【必须】调用此工具:\n" "1. 客户问'我付款了'、'订单怎么样了'、'发货了吗'\n" "2. 客户提到订单号\n" "3. 你需要确认客户是否已付款再决定如何回复\n" "查到订单后,根据订单状态回复(已付款→'收到,马上安排';已发货→'已经发了哈')。\n\n" "【核心逻辑】\n" "1. 业务:只聊高清修复和找原图。核心链路:引导发图 -> 问需求 -> 找设计师。\n" "2. **主动引导**:只有当客户【从未发过图】且没有历史图片记录时,才引导发图。\n" "2.1 **消息延迟安抚**:如果客户说'上面不是发了吗'、'我发过了'、'你没看到吗',说明他在提醒你图早就发过了。\n" " 这时先道歉,类似'不好意思哈,刚刚消息慢了点',再承接后续;严禁让客户重发图片。\n" "3. **非业务问题**:如果客户问招聘、合作、闲聊等与做图无关的话题,礼貌拒绝。\n" "4. **客户说没有参考图**:直接转人工:'好的,我这就叫设计师帮您找哈'。\n" "5. **客户问尺寸/能否打印/退款**:直接转人工:'这个设计师帮您看下哈'。\n" "6. **付款链接特判**:客户明确说'发付款链接'、'支付链接'、'拍单链接'、'下单链接'时,视为强成交信号,必须立即调用转人工工具;严禁只回复'直接下单'。\n" "7. **转接时机(严格两步)**:除付款链接特判外,必须同时满足【有图】+【客户明确或可直接判断的需求】才能转接。\n" " 客户只发了图但没说需求 → 先问'亲亲这张是找原图还是修复哈?'\n" " 客户说了'有吗'、'能找吗'、'找图'、'找原图'、'有大图吗' → 直接按【找原图】意图处理,不要重复追问。\n" " 客户说了'修复'、'高清'、'清晰点'、'放大清晰' → 直接按【高清修复】意图处理,不要重复追问。\n" "8. **下线安抚**:只有工具返回ERROR时才能提设计师不在。根据错误码区分:\n" " - ERROR_DESIGNER_NOT_STARTED → 说'还没上班,记下了上班马上处理'(严禁说下班)\n" " - ERROR_DESIGNER_OFFLINE → 说'下班了,需求记下明天回'\n" " - ERROR_DESIGNER_BUSY → 说'稍等,我帮你联系下'(严禁说下班)\n" "9. 正在转接中:如果系统提示已在转接,回:'已经在帮你催了哈,稍等下!'。\n" "10. **每次转接必须调用工具**:不要猜测,每次都重新调用。\n\n" "【情绪识别与应急转人工】\n" "当客户出现以下信号时,立即调用转人工工具,不要继续机械回复:\n" "- 愤怒/辱骂:'滚'、'垃圾'、'投诉'、'差评'、'骗子'\n" "- 反复质疑:'你是机器人吗'、'搞笑'、'你瞎么'、'说了多少遍'\n" "- 连续不满:客户连续2条以上表达不满(如'?'、'...'、质问语气)\n" "转人工话术:'亲亲抱歉,我马上叫设计师亲自来处理哈'\n\n" "【确认短句收尾规则 - 千牛要求最后一句必须是客服说的】\n" "客户说'嗯'、'好'、'好的'、'行'、'ok'、'哦'、'知道了'等确认短句时,\n" "必须回一句自然的收尾,但严禁复读'嗯咯'!根据上下文选择合适的收尾:\n" "- 如果刚谈完需求/报价 → '有问题随时找我哈'\n" "- 如果刚说了等设计师 → '好的,有消息马上告诉你'\n" "- 如果是闲聊结束 → '好嘞~'\n" "每次收尾话术不能重复,要自然变化。\n\n" "【必杀令 - 严格遵守】\n" "1. 每句回复严禁超过15个字!语气淘宝亲切风,多用'哈'、'呢'。\n" "2. 严禁报价,严禁复读图片已收到的情况。\n" "3. 必须原样输出工具返回的'正在为您转接|'指令。\n" "4. **严禁**说'在呢铁子'!只能说'在呢'或'在呢亲'。\n" "5. **严禁**连续两次回复相同或相似内容!回顾你最近说过的话,换一种说法。\n" "6. **严禁**输出任何代码、标记、括号等乱码!只输出自然语言。\n" "7. **严禁**自己臆造'下班'!只有工具返回ERROR才能说下班。\n" "8. **严禁**在客户已发过图的情况下还说'先发图来看看'!先查历史确认。\n\n" f"业务参考:\n{all_skills}" ) self.agent = Agent(model=model, system_prompt=system_prompt) register_agent_tools(self.agent) async def think_and_reply(self, msg: StandardMessage, history: Optional[List[dict]] = None) -> StandardResponse: if history is None: history = [] try: user_content = msg.content or "" if _requests_payment_link(user_content, history): user_content = ( "【系统通知:客户正在明确索要付款/支付链接,这是强成交信号。" "不要只回复'直接下单'或'平台拍单',必须立即调用转人工工具转接设计师跟进付款。】\n" f"{user_content}" ) # 客户已发图:告知 AI 图已收到,引导问需求,但不要直接转接 has_image_message = bool(msg.image_urls) or msg.msg_type == 1 if not has_image_message and _customer_claims_image_already_sent(user_content, history): inferred_intent = _infer_image_intent(user_content, history) if inferred_intent == "find_original": next_step = "客户当前更像是在问找原图,别再问他有没有发图。" elif inferred_intent == "repair": next_step = "客户当前更像是在问高清修复,别再问他有没有发图。" else: next_step = "如果客户需求还不明确,只问这是找原图还是修复,不要让客户重发。" user_content = ( "【系统通知:客户是在提醒你他上面已经发过图片了,可能刚刚网络或消息同步有点慢。" "回复时先简短道歉,表示现在已经看到图了,再继续正常承接。" f"{next_step}】\n{user_content}" ) if has_image_message: image_count = max(len(msg.image_urls), 1) if user_content.startswith("【系统:已收到图片消息"): user_content = "" inferred_intent = _infer_image_intent(user_content, history) if inferred_intent == "find_original": logger.info(f"[Brain] 已根据客户表述推断为找原图意图: user={msg.user_id}") user_content = ( f"【系统通知:客户已发送 {image_count} 张图片,图已收到不要再让客户发图。" f"系统判断客户当前意图是【找原图】;像'有吗'、'能找吗'、'找图'都算找原图意图。" f"不要再追问'找原图还是高清修复',直接按找原图流程继续;如果信息足够就直接转接。】\n{user_content}" ) elif inferred_intent == "repair": logger.info(f"[Brain] 已根据客户表述推断为高清修复意图: user={msg.user_id}") user_content = ( f"【系统通知:客户已发送 {image_count} 张图片,图已收到不要再让客户发图。" f"系统判断客户当前意图是【高清修复】;像'修复'、'高清'、'清晰点'都算修复意图。" f"不要再追问'找原图还是高清修复',直接按高清修复流程继续;如果信息足够就直接转接。】\n{user_content}" ) else: user_content = ( f"【系统通知:客户已发送 {image_count} 张图片,图已收到不要再让客户发图。" f"你现在必须先问客户:这张是找原图还是高清修复?有什么具体要求?" f"等客户明确回答后才能转接,严禁跳过问需求直接转接!】\n{user_content}" ) recent_context = "" if history: lines = [] for h in history[-6:]: role = "客户" if h.get("role") == "user" else "我" content = h.get("content", "") lines.append(f"[{_fmt_time(h.get('timestamp'))}] {role}:{content}") recent_context = "【近期对话回顾】\n" + "\n".join(lines) + "\n----------------\n" full_input = f"【当前客户ID:{msg.user_id}】\n{recent_context}现在的对话:{user_content}" start_time = time.time() # ===== 详细日志:发给 AI 的提示词 ===== logger.info(f"[AI提示词] user={msg.user_id} acc={msg.acc_id} images={len(msg.image_urls)}\n{full_input}") if history: history_preview = "\n".join([f" {h.get('role','?')}: {str(h.get('content',''))[:50]}" for h in history[-4:]]) logger.info(f"[AI历史上下文] 共{len(history)}条:\n{history_preview}") # 尝试运行 AI,捕获转接成功异常以提前终止 try: result = await self.agent.run(full_input, message_history=history) except TransferSuccessException as e: # 转接工具成功后立即返回,无需等待 AI 继续生成 elapsed = time.time() - start_time logger.info(f"[Brain] 转接成功(提前终止,耗时{elapsed:.1f}s): {e.transfer_cmd[:60]}") return StandardResponse( reply_content=e.transfer_cmd, need_transfer=True, metadata={"acc_id": msg.acc_id, "acc_type": msg.acc_type} ) elapsed = time.time() - start_time logger.info(f"[Brain] AI处理完成,总耗时{elapsed:.1f}s") # ===== 详细日志:AI 的思考过程和工具调用 ===== pending_transfer_reason = "" pending_transfer_error = "" try: all_msgs = result.all_messages() for idx, m in enumerate(all_msgs): msg_kind = getattr(m, 'kind', type(m).__name__) if hasattr(m, 'parts'): for part in m.parts: part_kind = getattr(part, 'part_kind', '') if part_kind == 'tool-call': tool_name = getattr(part, 'tool_name', '?') tool_args = getattr(part, 'args', {}) logger.info(f"[AI思考] 步骤{idx+1} 调用工具: {tool_name}({tool_args})") if tool_name == "transfer_to_human_tool": if isinstance(tool_args, str): try: tool_args = json.loads(tool_args) except Exception: tool_args = {"reason": tool_args} if isinstance(tool_args, dict): pending_transfer_reason = str(tool_args.get("reason") or "").strip() elif part_kind == 'tool-return': content = str(getattr(part, 'content', ''))[:200] logger.info(f"[AI思考] 步骤{idx+1} 工具返回: {content}") full_content = str(getattr(part, 'content', '')) if full_content.startswith("ERROR_DESIGNER_"): pending_transfer_error = full_content elif part_kind == 'text': content = str(getattr(part, 'content', ''))[:150] if content.strip(): logger.info(f"[AI思考] 步骤{idx+1} 文本输出: {content}") except Exception as log_err: logger.debug(f"[AI思考日志] 解析失败: {log_err}") # --- 转接指令:直接从工具返回截获,不经过 AI 二次加工 --- transfer_cmd = "" for m in result.all_messages(): if hasattr(m, 'parts'): for part in m.parts: if getattr(part, 'part_kind', '') == 'tool-return': content = str(getattr(part, 'content', '')) if "[转移会话]" in content: transfer_cmd = content if transfer_cmd: logger.info(f"[Brain] 工具返回转接指令,直接发送(跳过AI加工): {transfer_cmd[:60]}") return StandardResponse( reply_content=transfer_cmd, need_transfer=True, metadata={"acc_id": msg.acc_id, "acc_type": msg.acc_type} ) # --- 非转接场景:取 AI 的正常回复 --- reply_text = "" raw_output = getattr(result, 'output', None) or getattr(result, 'data', None) if isinstance(raw_output, str): reply_text = raw_output # 清理模型泄露的内部标记/工具原文 reply_text = _sanitize_reply_text(reply_text) # 过滤"在呢铁子" if "在呢铁子" in reply_text: reply_text = reply_text.replace("在呢铁子", "在呢亲") if not reply_text: reply_text = "稍等我看看。" logger.info(f"[THINK/RAW_OUTPUT] user={msg.user_id}\n{_clip(reply_text)}") need_transfer = "[转移会话]" in reply_text return StandardResponse( reply_content=reply_text, need_transfer=need_transfer, metadata={ "acc_id": msg.acc_id, "acc_type": msg.acc_type, "pending_transfer": bool(pending_transfer_error and pending_transfer_reason), "pending_transfer_reason": pending_transfer_reason, "pending_transfer_error": pending_transfer_error, } ) except Exception as e: logger.error(f"[Brain Error]: {e}") return StandardResponse(reply_content="好哒,我在看图,稍等回你哈。", metadata={"acc_id": msg.acc_id})