This commit is contained in:
2026-03-06 14:25:10 +08:00
parent afb2b78c15
commit f06bfb1fa0
6 changed files with 289 additions and 145 deletions

View File

@@ -1,39 +1,97 @@
import logging
import asyncio
from datetime import datetime
from typing import List, Optional, Dict, Any
from pydantic import BaseModel, Field
from pydantic_ai import RunContext
from core.schema import StandardResponse
from services.dispatch_service import dispatch_service
from db.chat_log_db import get_conversation
logger = logging.getLogger("cs_agent")
async def transfer_to_human_tool(ctx: RunContext[Any], reason: str = Field(description="转人工的原因")) -> str:
"""
【核心工具】执行转人工逻辑。
获取设计师姓名并生成精准转接指令。
"""
logger.info(f"[Tool] 尝试呼叫设计师接手: {reason}")
# 1. 尝试派单获取设计师姓名
designer_name = await dispatch_service.assign_designer()
if designer_name:
# 2. 有设计师在线:生成标准转接指令 (必须包含 [转移会话] 且格式正确)
magic_cmd = f"正在为您转接|[转移会话],{designer_name},无原因"
logger.info(f"[Tool] 成功呼叫设计师: {designer_name}")
return magic_cmd
else:
# 3. 设计师下线:返回特定信号
logger.warning("[Tool] 派单失败:设计师们已下线或不在位")
return "ERROR_NO_DESIGNER_ONLINE"
hour = datetime.now().hour
logger.warning(f"[Tool] 派单失败:设计师们不在位 (当前{hour}点)")
if 0 <= hour < 9:
return "ERROR_DESIGNER_NOT_STARTED现在设计师还没上班你告诉客户需求记下了上班后第一时间处理。不要说下班。"
elif 22 <= hour or hour < 1:
return "ERROR_DESIGNER_OFFLINE设计师已下班你告诉客户需求记下了明天第一时间回复。"
else:
return "ERROR_DESIGNER_BUSY设计师暂时不在位你告诉客户稍等马上帮忙联系设计师。不要说下班。"
async def check_order_status_tool(ctx: RunContext[Any], customer_id: str = Field(description="客户ID")) -> str:
"""查询订单状态。"""
return "设计师正在后台加急处理中,稍等哈。"
return "我在帮你加急处理中,稍等哈。"
async def lookup_chat_history_tool(
ctx: RunContext[Any],
customer_id: str = Field(description="客户ID从当前对话上下文中获取"),
num_messages: int = Field(default=30, description="要查询的历史消息条数默认30条"),
) -> str:
"""
【历史记录查询工具】查询该客户的历史聊天记录。
使用场景:
- 客户说"之前聊过""上次""你看聊天记录""我发过图了"等暗示有历史对话时
- 客户第二次来访、追问进度、催单时
- 你不确定客户之前是否发过图或说过需求时
必须先调用此工具回顾历史,再回复客户,避免重复要求客户发图。
"""
logger.info(f"[Tool] 查询历史记录: customer_id={customer_id}, limit={num_messages}")
try:
rows = await asyncio.to_thread(get_conversation, customer_id, limit=num_messages)
if not rows:
return f"该客户({customer_id})暂无历史聊天记录。"
lines = []
has_images = False
customer_needs = []
for r in rows:
role = "客户" if r["direction"] == "in" else "客服"
ts = str(r.get("timestamp", ""))
msg = r.get("message", "")
line = f"[{ts}] {role}{msg}"
lines.append(line)
if r["direction"] == "in":
if "已收到" in msg and "" in msg:
has_images = True
if any(k in msg for k in ["找原图", "修复", "高清", "去背景", "抠图", "做衣服", "打印"]):
customer_needs.append(msg[:60])
summary_parts = [f"{len(rows)}条历史消息。"]
if has_images:
summary_parts.append("⚠️ 客户之前已经发过图片!不要再让客户发图!")
if customer_needs:
summary_parts.append(f"客户曾表达的需求:{''.join(customer_needs[:3])}")
summary = " ".join(summary_parts)
history_text = "\n".join(lines[-30:])
return f"【历史记录摘要】{summary}\n\n【详细记录】\n{history_text}"
except Exception as e:
logger.error(f"[Tool] 查询历史记录失败: {e}")
return f"查询历史记录失败: {e}"
def register_agent_tools(agent: Any):
"""注册工具"""
agent.tool(transfer_to_human_tool)
agent.tool(check_order_status_tool)
logger.info("[Agent] 工具箱已更新:称呼统一为“设计师”。")
agent.tool(lookup_chat_history_tool)
logger.info("[Agent] 工具箱已更新:含转人工、订单查询、历史记录查询。")

View File

@@ -15,9 +15,18 @@ logger = logging.getLogger("cs_agent")
# 配置常量
MSG_DEDUP_CAPACITY = 200 # 消息 ID 去重缓存容量
TRANSFER_COOLDOWN_SEC = 60 # 转接冷却时间(秒)
TRANSFER_COOLDOWN_SEC = 120 # 转接冷却时间(秒)—— 转接后2分钟内不再调用AI
DEBOUNCE_SECONDS = 2.0 # 消息防抖延迟(秒)
# 转接后安抚话术池(轮换使用,避免复读)
_TRANSFER_CALM_REPLIES = [
"我在帮你催了哈,稍等下",
"已经转了哈,马上就来",
"收到,设计师在赶来了哈",
"好的亲,稍等一下哈",
"在催了在催了,马上哈",
]
class SystemOrchestrator:
"""
全系统总编排:具备转接冷却、防抖合并、多消息去重、以及精准日志。
@@ -30,8 +39,9 @@ class SystemOrchestrator:
# 1. 消息 ID 去重
self._processed_msg_ids = deque(maxlen=MSG_DEDUP_CAPACITY)
# 2. 转接冷却存储 (customer_id -> last_transfer_time)
# 2. 转接冷却存储 (session_key -> last_transfer_time)
self._last_transfer_time: Dict[str, float] = {}
self._transfer_calm_idx: Dict[str, int] = {} # 安抚话术轮换索引
# 3. 防抖配置
self._debounce_seconds = DEBOUNCE_SECONDS
@@ -68,12 +78,13 @@ class SystemOrchestrator:
# 店铺隔离:同一客户在不同店铺的对话独立处理
session_key = f"{user_id}@{std_msg.acc_id}"
# 订单消息 / 纯金额通知:静默入库,不触发 AI 回复
# 订单消息 / 纯金额通知 / SKU信息:静默入库,不触发 AI 回复
msg_text = std_msg.content or ""
is_order = "[系统订单信息]" in msg_text
is_price_only = bool(re.match(r'^[\s\n]*金?额?[:]?\s*[\d.]+\s*元', msg_text.strip()))
is_sku_only = bool(re.match(r'^[\s\n]*(备注[:]|数量[:]|款式[:])', msg_text.strip()))
if is_order or is_price_only or is_sku_only:
is_sku_only = bool(re.match(r'^[\s\n]*(备注[:]|数量[:]|款式[:]|定制[:])', msg_text.strip()))
is_sku_amount = bool(re.match(r'^[\s\n]*金额[:]\s*[\d.]+元\s*●', msg_text.strip()))
if is_order or is_price_only or is_sku_only or is_sku_amount:
await self._handle_order_packet(platform, std_msg)
logger.info(f"[订单消息] user={user_id} acc={std_msg.acc_id} 已入库更新状态")
await repo.save_chat(platform, user_id, msg_text, "in", acc_id=std_msg.acc_id)
@@ -222,22 +233,25 @@ class SystemOrchestrator:
if all_image_urls:
asyncio.create_task(self._analyze_images_background(session_key, all_image_urls))
# C. 冷却检查:如果转接冷却期内发过转接,告诉大脑"已处于转接中"
is_in_cooldown = (time.time() - self._last_transfer_time.get(session_key, 0)) < TRANSFER_COOLDOWN_SEC
# D. 思考
history = await repo.get_chat_history(user_id, limit=10, acc_id=acc_id)
if history and history[-1].get('content') == db_content: history = history[:-1]
# C. 冷却检查:转接成功后冷却期内直接回安抚话术不调AI
last_transfer = self._last_transfer_time.get(session_key, 0)
cooldown_elapsed = time.time() - last_transfer
is_in_cooldown = cooldown_elapsed < TRANSFER_COOLDOWN_SEC
# 冷却期内:禁止再发转接指令,避免反复转接
if is_in_cooldown:
final_msg.content = (
"【系统:设计师已收到转接通知正在赶来,严禁再次调用转人工工具!"
"客户再问就回'设计师正在看了哈,稍等一下',换着说不要重复】\n"
+ final_msg.content
idx = self._transfer_calm_idx.get(session_key, 0)
calm_reply = _TRANSFER_CALM_REPLIES[idx % len(_TRANSFER_CALM_REPLIES)]
self._transfer_calm_idx[session_key] = idx + 1
logger.info(f"[Orchestrator] 转接冷却中({cooldown_elapsed:.0f}s),直接安抚: {calm_reply}")
std_res = StandardResponse(
reply_content=calm_reply,
metadata={"acc_id": acc_id, "acc_type": acc_type}
)
std_res = await self.brain.think_and_reply(final_msg, history=history)
else:
# D. 正常流程调用AI思考
history = await repo.get_chat_history(user_id, limit=10, acc_id=acc_id)
if history and history[-1].get('content') == db_content: history = history[:-1]
std_res = await self.brain.think_and_reply(final_msg, history=history)
# E. 发送并记录时间
if std_res.should_reply:

View File

@@ -56,31 +56,61 @@ class CustomerServiceBrain:
# --- 统一口径后的 System Prompt ---
system_prompt = (
"你是一位专注【高清修复】和【找原图】的专业店主。性格干脆,说话自然、专业。\n\n"
"【统一称呼规范】\n"
"1. 严禁使用'师傅''客服''专员'等词汇!必须统一称为【设计师】\n"
"2. 未转接前,用第一人称(我/我这边)。例如:'我叫设计师看下'\n\n"
"【统一称呼规范 - 第一人称原则\n"
"1. 你就是店主本人,未转接设计师之前,所有回复必须用第一人称:'''我这边'\n"
"2. 例如:客户问进度 → '我在看哈,稍等';客户催 → '我帮你催下哈'\n"
"3. 只有在需要转接时才提'设计师''我叫设计师来看下哈'\n"
"4. 严禁使用'师傅''客服''专员'等词汇。\n\n"
"【★★★ 历史记录查询 - 最高优先级 ★★★】\n"
"你有一个 lookup_chat_history_tool 工具,可以查询客户的完整历史聊天记录。\n"
"以下情况你【必须】先调用此工具查历史,再回复:\n"
"1. 客户说'之前聊过''上次''你看聊天记录''我发过了''前面发了'\n"
"2. 客户追问进度:'做好了吗''多久能好''怎么样了'\n"
"3. 客户表达不满或困惑:'''你瞎么''搞笑''说过了'\n"
"4. 【近期对话回顾】中显示客户之前已发过图或说过需求\n"
"查到历史后,根据历史内容回复,绝对不要再重复问客户已经回答过的问题!\n\n"
"【核心逻辑】\n"
"1. 业务:只聊高清修复和找原图。核心链路:引导发图 -> 问需求 -> 找设计师。\n"
"2. **主动引导(关键)**:如果客户【没发图】就问能不能做、问收费,你必须回:'亲亲先发图我看下哈'\n"
"3. **非业务问题**:如果客户问招聘、合作、闲聊等与做图无关的话题,礼貌拒绝'亲亲咱这边只做图哦,暂不招人哈'\n"
"4. **客户说没有参考图**如果客户明确说'没有图''找不到''想让你们帮找'直接转人工:'好的,我这就叫设计师帮您找哈'\n"
"5. **客户问尺寸/能否打印/退款**这类问题需要设计师判断,直接转人工:'这个设计师帮您看下哈'\n"
"6. 转接时机:收到图片并明确需求后,立即调用转人工工具,并告知:'收到,正在呼叫设计师核价,稍等哈'\n"
"7. **下线安抚(重要)**:只有当【本次】工具返回 'ERROR_NO_DESIGNER_ONLINE' 时才能说下班。不能根据历史对话或自己猜测说下班!\n"
"8. 正在转接中:如果系统提示已在转接,回:'设计师正在赶来,我再帮你催下哈!'\n"
"9. **每次转接必须调用工具**:不要根据之前的结果猜测,每次需要转接都必须重新调用工具检查设计师是否在线。\n\n"
"2. **主动引导**:只有当客户【从未发过图】且没有历史图片记录时,才引导发图\n"
"3. **非业务问题**:如果客户问招聘、合作、闲聊等与做图无关的话题,礼貌拒绝。\n"
"4. **客户说没有参考图**:直接转人工:'好的,我这就叫设计师帮您找哈'\n"
"5. **客户问尺寸/能否打印/退款**:直接转人工:'这个设计师帮您看下哈'\n"
"6. 转接时机:收到图片并明确需求后,立即调用转人工工具。\n"
"7. **下线安抚**:只有工具返回ERROR时才能提设计师不在。根据错误码区分\n"
" - ERROR_DESIGNER_NOT_STARTED → 说'还没上班,记下了上班马上处理'(严禁说下班)\n"
" - ERROR_DESIGNER_OFFLINE → 说'下班了,需求记下明天回'\n"
" - ERROR_DESIGNER_BUSY → 说'稍等,我帮你联系下'(严禁说下班)\n"
"8. 正在转接中:如果系统提示已在转接,回:'已经在帮你催了哈,稍等下!'\n"
"9. **每次转接必须调用工具**:不要猜测,每次都重新调用。\n\n"
"【情绪识别与应急转人工】\n"
"当客户出现以下信号时,立即调用转人工工具,不要继续机械回复:\n"
"- 愤怒/辱骂:'''垃圾''投诉''差评''骗子'\n"
"- 反复质疑:'你是机器人吗''搞笑''你瞎么''说了多少遍'\n"
"- 连续不满客户连续2条以上表达不满'''...'、质问语气)\n"
"转人工话术:'亲亲抱歉,我马上叫设计师亲自来处理哈'\n\n"
"【确认短句收尾规则 - 千牛要求最后一句必须是客服说的】\n"
"客户说'''''好的''''ok''''知道了'等确认短句时,\n"
"必须回一句自然的收尾,但严禁复读'嗯咯'!根据上下文选择合适的收尾:\n"
"- 如果刚谈完需求/报价 → '有问题随时找我哈'\n"
"- 如果刚说了等设计师 → '好的,有消息马上告诉你'\n"
"- 如果是闲聊结束 → '好嘞~'\n"
"每次收尾话术不能重复,要自然变化。\n\n"
"【必杀令 - 严格遵守】\n"
"1. 每句回复严禁超过15个字语气淘宝亲切风多用''''\n"
"2. 严禁报价,严禁复读图片已收到的情况。\n"
"3. 必须原样输出工具返回的'正在为您转接|'指令。\n"
"4. **严禁**说'在呢铁子'!只能说'在呢''在呢亲'\n"
"5. **严禁**重复发送相同内容!如果刚说过的话,换一种说法。\n"
"5. **严禁**连续两次回复相同或相似内容!回顾你最近说过的话,换一种说法。\n"
"6. **严禁**输出任何代码、标记、括号等乱码!只输出自然语言。\n"
"7. **严禁**自己臆造'下班'只有工具返回ERROR才能说下班。\n\n"
"7. **严禁**自己臆造'下班'只有工具返回ERROR才能说下班。\n"
"8. **严禁**在客户已发过图的情况下还说'先发图来看看'!先查历史确认。\n\n"
f"业务参考:\n{all_skills}"
)
@@ -109,7 +139,7 @@ class CustomerServiceBrain:
lines.append(f"[{_fmt_time(h.get('timestamp'))}] {role}{content}")
recent_context = "【近期对话回顾】\n" + "\n".join(lines) + "\n----------------\n"
full_input = f"{recent_context}现在的对话:{user_content}"
full_input = f"【当前客户ID{msg.user_id}\n{recent_context}现在的对话:{user_content}"
logger.info(
f"[PROMPT->AI] user={msg.user_id} acc={msg.acc_id} images={len(msg.image_urls)}\n"
f"{_clip(full_input)}"
@@ -117,29 +147,29 @@ class CustomerServiceBrain:
result = await self.agent.run(full_input, message_history=history)
# --- 终极修复:强制截获工具返回的转接指令 ---
reply_text = ""
# pydantic-ai 1.x 使用 result.output旧版 0.x 使用 result.data
raw_output = getattr(result, 'output', None) or getattr(result, 'data', None)
if isinstance(raw_output, str):
reply_text = raw_output
# 暴力扫描所有消息片段,寻找转接暗号
found_magic = ""
# --- 转接指令:直接从工具返回截获,不经过 AI 二次加工 ---
transfer_cmd = ""
for m in result.all_messages():
if hasattr(m, 'parts'):
for part in m.parts:
# 检查是否是工具返回片段
if getattr(part, 'part_kind', '') == 'tool-return':
content = str(getattr(part, 'content', ''))
if "[转移会话]" in content:
found_magic = content
# 如果 AI 弄丢了暗号,我们强行给它补回来
if found_magic and "[转移会话]" not in reply_text:
logger.info(f"[Brain] 检测到 AI 弄丢了转接暗号,正在强制恢复: {found_magic[:30]}...")
reply_text = found_magic
# ----------------------------------------
transfer_cmd = content
if transfer_cmd:
logger.info(f"[Brain] 工具返回转接指令直接发送跳过AI加工: {transfer_cmd[:60]}")
return StandardResponse(
reply_content=transfer_cmd,
need_transfer=True,
metadata={"acc_id": msg.acc_id, "acc_type": msg.acc_type}
)
# --- 非转接场景:取 AI 的正常回复 ---
reply_text = ""
raw_output = getattr(result, 'output', None) or getattr(result, 'data', None)
if isinstance(raw_output, str):
reply_text = raw_output
# 清理模型泄露的内部标记/乱码(覆盖所有已知格式)
reply_text = re.sub(r'\[\]<\|[^|]+\|>', '', reply_text)
@@ -147,9 +177,9 @@ class CustomerServiceBrain:
reply_text = re.sub(r'\[Function[^\]]*\]', '', reply_text)
reply_text = re.sub(r'\[/?Tool[^\]]*\]', '', reply_text)
reply_text = re.sub(r'</?tool[_\-]?[^>]*>', '', reply_text, flags=re.IGNORECASE)
reply_text = re.sub(r'<think[^>]*>.*?</think[^>]*>', '', reply_text, flags=re.DOTALL)
reply_text = re.sub(r'<think[^>]*>.*', '', reply_text, flags=re.DOTALL)
reply_text = re.sub(r'</?think[^>]*>', '', reply_text)
reply_text = re.sub(r'<think[_a-zA-Z0-9]*[^>]*>.*?</think[_a-zA-Z0-9]*[^>]*>', '', reply_text, flags=re.DOTALL)
reply_text = re.sub(r'<think[_a-zA-Z0-9]*[^>]*>.*', '', reply_text, flags=re.DOTALL)
reply_text = re.sub(r'</?think[_a-zA-Z0-9]*[^>]*>', '', reply_text)
reply_text = re.sub(r'```[^`]*```', '', reply_text)
reply_text = re.sub(r'\{["\'][^}]+\}', '', reply_text)
reply_text = re.sub(r'AgentRunResult\([^)]*\)', '', reply_text)
@@ -176,4 +206,4 @@ class CustomerServiceBrain:
except Exception as e:
logger.error(f"[Brain Error]: {e}")
return StandardResponse(reply_content="好哒,设计师正在看图,稍等回你。", metadata={"acc_id": msg.acc_id})
return StandardResponse(reply_content="好哒,在看图,稍等回你", metadata={"acc_id": msg.acc_id})

View File

@@ -1,19 +0,0 @@
"你是一位专注【高清修复】和【找原图】的专业店主。性格干脆,说话高端、专业。\n\n"
"【统一称呼规范】\n"
"1. 严禁使用'师傅'、'客服'、'专员'等词汇!\n"
"2. 必须统一称呼为【设计师】。比如:'找设计师看下'、'设计师马上来'、'等设计师核价'。\n\n"
"【核心逻辑】\n"
"1. 业务:只聊高清修复和找原图。引导发图 -> 问需求 -> 找设计师。\n"
"2. 下线安抚:如果工具返回 'ERROR_NO_DESIGNER_ONLINE',说明设计师们【下班/下线】了。回:'亲亲,设计师现在下班啦,需求我先记下,明天第一时间回您哈!'。\n"
"3. 正在转接中:如果看到系统提示已在转接,回:'设计师正在赶来,我再帮你催下哈!'。\n\n"
"4. 没转接时:引导发图 -> 问需求 -> 调工具转人工。\n\n
"5. 语气:淘宝亲切风,多用'亲亲'、'铁子'。每句回复【严禁超过15字】\n\n"
"【必杀令】\n"
"1. 每句回复严禁超过15个字\n"
"2. 严禁报价,严禁复读图片已收到的情况。\n"
"3. 必须原样输出工具返回的'正在为您转接|'指令。\n\n"
f"业务参考:\n{all_skills}"