import logging import asyncio from datetime import datetime from typing import List, Optional, Dict, Any from pydantic import BaseModel, Field from pydantic_ai import RunContext from core.schema import StandardResponse from services.dispatch_service import dispatch_service from db.chat_log_db import get_conversation, get_customer_orders logger = logging.getLogger("cs_agent") async def transfer_to_human_tool(ctx: RunContext[Any], reason: str = Field(description="转人工的原因")) -> str: """ 【核心工具】执行转人工逻辑。 获取设计师姓名并生成精准转接指令。 """ logger.info(f"[Tool] 尝试呼叫设计师接手: {reason}") designer_name = await dispatch_service.assign_designer() if designer_name: magic_cmd = f"正在为您转接|[转移会话],{designer_name},无原因" logger.info(f"[Tool] 成功呼叫设计师: {designer_name}") return magic_cmd else: hour = datetime.now().hour logger.warning(f"[Tool] 派单失败:设计师们不在位 (当前{hour}点)") if 0 <= hour < 9: return "ERROR_DESIGNER_NOT_STARTED:现在设计师还没上班,你告诉客户需求记下了,上班后第一时间处理。不要说下班。" elif 22 <= hour or hour < 1: return "ERROR_DESIGNER_OFFLINE:设计师已下班,你告诉客户需求记下了,明天第一时间回复。" else: return "ERROR_DESIGNER_BUSY:设计师暂时不在位,你告诉客户稍等,马上帮忙联系设计师。不要说下班。" async def lookup_customer_orders_tool( ctx: RunContext[Any], customer_id: str = Field(description="客户ID,从当前对话上下文中获取"), ) -> str: """ 【订单查询工具】查询该客户的订单记录(订单号、状态、金额等)。 使用场景: - 客户问"我的订单怎么样了"、"付款了"、"发货了吗" - 客户提到订单号 - 需要确认客户是否已付款 返回该客户的所有订单及其状态。 """ logger.info(f"[Tool] 查询客户订单: customer_id={customer_id}") try: rows = await asyncio.to_thread(get_customer_orders, customer_id, limit=10) if not rows: return f"该客户({customer_id})暂无订单记录。" lines = [] for r in rows: oid = r.get("order_id", "") status = r.get("order_status", "") amount = r.get("amount", 0) qty = r.get("quantity", 0) title = r.get("product_title", "") note = r.get("buyer_note", "") updated = str(r.get("updated_at", "")) line = f"订单号:{oid} 状态:{status} 金额:{amount}元 数量:{qty} 商品:{title}" if note: line += f" 备注:{note}" line += f" 更新时间:{updated}" lines.append(line) has_paid = any("已付款" in r.get("order_status", "") for r in rows) has_shipped = any("已发货" in r.get("order_status", "") for r in rows) summary_parts = [f"共{len(rows)}条订单记录。"] if has_paid: summary_parts.append("客户已付款!") if has_shipped: summary_parts.append("已发货。") summary = " ".join(summary_parts) return f"【订单摘要】{summary}\n\n【订单详情】\n" + "\n".join(lines) except Exception as e: logger.error(f"[Tool] 查询订单失败: {e}") return f"查询订单失败: {e}" async def lookup_chat_history_tool( ctx: RunContext[Any], customer_id: str = Field(description="客户ID,从当前对话上下文中获取"), num_messages: int = Field(default=30, description="要查询的历史消息条数,默认30条"), ) -> str: """ 【历史记录查询工具】查询该客户的历史聊天记录。 使用场景: - 客户说"之前聊过"、"上次"、"你看聊天记录"、"我发过图了"等暗示有历史对话时 - 客户第二次来访、追问进度、催单时 - 你不确定客户之前是否发过图或说过需求时 必须先调用此工具回顾历史,再回复客户,避免重复要求客户发图。 """ logger.info(f"[Tool] 查询历史记录: customer_id={customer_id}, limit={num_messages}") try: rows = await asyncio.to_thread(get_conversation, customer_id, limit=num_messages) if not rows: return f"该客户({customer_id})暂无历史聊天记录。" lines = [] has_images = False customer_needs = [] for r in rows: role = "客户" if r["direction"] == "in" else "客服" ts = str(r.get("timestamp", "")) msg = r.get("message", "") line = f"[{ts}] {role}:{msg}" lines.append(line) if r["direction"] == "in": if "已收到" in msg and "图" in msg: has_images = True if any(k in msg for k in ["找原图", "修复", "高清", "去背景", "抠图", "做衣服", "打印"]): customer_needs.append(msg[:60]) summary_parts = [f"共{len(rows)}条历史消息。"] if has_images: summary_parts.append("⚠️ 客户之前已经发过图片!不要再让客户发图!") if customer_needs: summary_parts.append(f"客户曾表达的需求:{';'.join(customer_needs[:3])}") summary = " ".join(summary_parts) history_text = "\n".join(lines[-30:]) return f"【历史记录摘要】{summary}\n\n【详细记录】\n{history_text}" except Exception as e: logger.error(f"[Tool] 查询历史记录失败: {e}") return f"查询历史记录失败: {e}" def register_agent_tools(agent: Any): """注册工具""" agent.tool(transfer_to_human_tool) agent.tool(lookup_chat_history_tool) agent.tool(lookup_customer_orders_tool) logger.info("[Agent] 工具箱已更新:含转人工、历史记录查询、订单查询。")