Files
tw/core/agent_tools.py

190 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
import asyncio
import re
from datetime import datetime
from typing import List, Optional, Dict, Any
from pydantic import BaseModel, Field
from pydantic_ai import RunContext
from core.schema import StandardResponse
from services.dispatch_service import dispatch_service
from db.chat_log_db import get_conversation, get_customer_orders
logger = logging.getLogger("cs_agent")
_TRANSFER_COMMAND_RE = re.compile(r"^\s*正在为您转接\|\[转移会话\],[^,\r\n]+,[^\r\n]*\s*$")
_HISTORY_NOISE_PREFIXES = (
"[系统订单信息]",
"[进店卡片]",
"【系统:已收到",
"金额:",
"定制:",
)
def _is_plain_transfer_command(text: str) -> bool:
return bool(_TRANSFER_COMMAND_RE.fullmatch(str(text or "").strip()))
def _normalize_history_message(message: str, role: str) -> str:
text = str(message or "").strip()
if not text:
return ""
if _is_plain_transfer_command(text):
return "已转接设计师"
if role == "客服" and "[转移会话]" in text:
return "已尝试转接设计师"
return text
def _extract_need_snippet(message: str) -> str:
text = str(message or "").strip()
if not text:
return ""
if any(text.startswith(prefix) for prefix in _HISTORY_NOISE_PREFIXES):
return ""
if "http://" in text or "https://" in text:
return ""
return text[:60]
class TransferSuccessException(Exception):
"""转接成功后抛出此异常,用于提前终止 AI 处理流程"""
def __init__(self, transfer_cmd: str):
self.transfer_cmd = transfer_cmd
super().__init__(transfer_cmd)
async def transfer_to_human_tool(ctx: RunContext[Any], reason: str = Field(description="转人工的原因")) -> str:
"""
【核心工具】执行转人工逻辑。
获取设计师姓名并生成精准转接指令。
"""
logger.info(f"[Tool] 尝试呼叫设计师接手: {reason}")
designer_name = await dispatch_service.assign_designer()
if designer_name:
magic_cmd = f"正在为您转接|[转移会话],{designer_name},无原因"
logger.info(f"[Tool] 成功呼叫设计师: {designer_name},立即触发转接")
# 抛出异常以提前终止 AI 后续处理,节省等待时间
raise TransferSuccessException(magic_cmd)
else:
hour = datetime.now().hour
logger.warning(f"[Tool] 派单失败:设计师们不在位 (当前{hour}点)")
if 0 <= hour < 9:
return "ERROR_DESIGNER_NOT_STARTED现在设计师还没上班你告诉客户需求记下了上班后第一时间处理。不要说下班。"
elif 22 <= hour or hour < 1:
return "ERROR_DESIGNER_OFFLINE设计师已下班你告诉客户需求记下了明天第一时间回复。"
else:
return "ERROR_DESIGNER_BUSY设计师暂时不在位你告诉客户稍等马上帮忙联系设计师。不要说下班。"
async def lookup_customer_orders_tool(
ctx: RunContext[Any],
customer_id: str = Field(description="客户ID从当前对话上下文中获取"),
) -> str:
"""
【订单查询工具】查询该客户的订单记录(订单号、状态、金额等)。
使用场景:
- 客户问"我的订单怎么样了""付款了""发货了吗"
- 客户提到订单号
- 需要确认客户是否已付款
返回该客户的所有订单及其状态。
"""
logger.info(f"[Tool] 查询客户订单: customer_id={customer_id}")
try:
rows = await asyncio.to_thread(get_customer_orders, customer_id, limit=10)
if not rows:
return f"该客户({customer_id})暂无订单记录。"
lines = []
for r in rows:
oid = r.get("order_id", "")
status = r.get("order_status", "")
amount = r.get("amount", 0)
qty = r.get("quantity", 0)
title = r.get("product_title", "")
note = r.get("buyer_note", "")
updated = str(r.get("updated_at", ""))
line = f"订单号:{oid} 状态:{status} 金额:{amount}元 数量:{qty} 商品:{title}"
if note:
line += f" 备注:{note}"
line += f" 更新时间:{updated}"
lines.append(line)
has_paid = any("已付款" in r.get("order_status", "") for r in rows)
has_shipped = any("已发货" in r.get("order_status", "") for r in rows)
summary_parts = [f"{len(rows)}条订单记录。"]
if has_paid:
summary_parts.append("客户已付款!")
if has_shipped:
summary_parts.append("已发货。")
summary = " ".join(summary_parts)
return f"【订单摘要】{summary}\n\n【订单详情】\n" + "\n".join(lines)
except Exception as e:
logger.error(f"[Tool] 查询订单失败: {e}")
return f"查询订单失败: {e}"
async def lookup_chat_history_tool(
ctx: RunContext[Any],
customer_id: str = Field(description="客户ID从当前对话上下文中获取"),
num_messages: int = Field(default=30, description="要查询的历史消息条数默认30条"),
) -> str:
"""
【历史记录查询工具】查询该客户的历史聊天记录。
使用场景:
- 客户说"之前聊过""上次""你看聊天记录""我发过图了"等暗示有历史对话时
- 客户第二次来访、追问进度、催单时
- 你不确定客户之前是否发过图或说过需求时
必须先调用此工具回顾历史,再回复客户,避免重复要求客户发图。
"""
logger.info(f"[Tool] 查询历史记录: customer_id={customer_id}, limit={num_messages}")
try:
rows = await asyncio.to_thread(get_conversation, customer_id, limit=num_messages)
if not rows:
return f"该客户({customer_id})暂无历史聊天记录。"
lines = []
has_images = False
customer_needs = []
for r in rows:
role = "客户" if r["direction"] == "in" else "客服"
ts = str(r.get("timestamp", ""))
msg = _normalize_history_message(r.get("message", ""), role)
line = f"[{ts}] {role}{msg}"
lines.append(line)
if r["direction"] == "in":
msg_type = int(r.get("msg_type") or 0)
raw_message = str(r.get("message", "") or "")
image_urls = str(r.get("image_urls", "") or "").strip()
if msg_type == 1 or image_urls or ("已收到" in msg and "" in msg):
has_images = True
need_text = _extract_need_snippet(raw_message)
if need_text and any(k in need_text for k in ["找原图", "修复", "高清", "去背景", "抠图", "做衣服", "打印", "大图", "素材"]):
customer_needs.append(need_text)
summary_parts = [f"{len(rows)}条历史消息。"]
if has_images:
summary_parts.append("⚠️ 客户之前已经发过图片!不要再让客户发图!")
if customer_needs:
summary_parts.append(f"客户曾表达的需求:{''.join(customer_needs[:3])}")
summary = " ".join(summary_parts)
history_text = "\n".join(lines[-30:])
return f"【历史记录摘要】{summary}\n\n【详细记录】\n{history_text}"
except Exception as e:
logger.error(f"[Tool] 查询历史记录失败: {e}")
return f"查询历史记录失败: {e}"
def register_agent_tools(agent: Any):
"""注册工具"""
agent.tool(transfer_to_human_tool)
agent.tool(lookup_chat_history_tool)
agent.tool(lookup_customer_orders_tool)
logger.info("[Agent] 工具箱已更新:含转人工、历史记录查询、订单查询。")