diff --git a/core/agent_tools.py b/core/agent_tools.py index 8bccd47..ec08a1a 100644 --- a/core/agent_tools.py +++ b/core/agent_tools.py @@ -6,7 +6,7 @@ from pydantic import BaseModel, Field from pydantic_ai import RunContext from core.schema import StandardResponse from services.dispatch_service import dispatch_service -from db.chat_log_db import get_conversation +from db.chat_log_db import get_conversation, get_customer_orders logger = logging.getLogger("cs_agent") @@ -35,9 +35,53 @@ async def transfer_to_human_tool(ctx: RunContext[Any], reason: str = Field(descr return "ERROR_DESIGNER_BUSY:设计师暂时不在位,你告诉客户稍等,马上帮忙联系设计师。不要说下班。" -async def check_order_status_tool(ctx: RunContext[Any], customer_id: str = Field(description="客户ID")) -> str: - """查询订单状态。""" - return "我在帮你加急处理中,稍等哈。" +async def lookup_customer_orders_tool( + ctx: RunContext[Any], + customer_id: str = Field(description="客户ID,从当前对话上下文中获取"), +) -> str: + """ + 【订单查询工具】查询该客户的订单记录(订单号、状态、金额等)。 + 使用场景: + - 客户问"我的订单怎么样了"、"付款了"、"发货了吗" + - 客户提到订单号 + - 需要确认客户是否已付款 + 返回该客户的所有订单及其状态。 + """ + logger.info(f"[Tool] 查询客户订单: customer_id={customer_id}") + try: + rows = await asyncio.to_thread(get_customer_orders, customer_id, limit=10) + if not rows: + return f"该客户({customer_id})暂无订单记录。" + + lines = [] + for r in rows: + oid = r.get("order_id", "") + status = r.get("order_status", "") + amount = r.get("amount", 0) + qty = r.get("quantity", 0) + title = r.get("product_title", "") + note = r.get("buyer_note", "") + updated = str(r.get("updated_at", "")) + line = f"订单号:{oid} 状态:{status} 金额:{amount}元 数量:{qty} 商品:{title}" + if note: + line += f" 备注:{note}" + line += f" 更新时间:{updated}" + lines.append(line) + + has_paid = any("已付款" in r.get("order_status", "") for r in rows) + has_shipped = any("已发货" in r.get("order_status", "") for r in rows) + summary_parts = [f"共{len(rows)}条订单记录。"] + if has_paid: + summary_parts.append("客户已付款!") + if has_shipped: + summary_parts.append("已发货。") + + summary = " ".join(summary_parts) + return f"【订单摘要】{summary}\n\n【订单详情】\n" + "\n".join(lines) + + except Exception as e: + logger.error(f"[Tool] 查询订单失败: {e}") + return f"查询订单失败: {e}" async def lookup_chat_history_tool( @@ -92,6 +136,6 @@ async def lookup_chat_history_tool( def register_agent_tools(agent: Any): """注册工具""" agent.tool(transfer_to_human_tool) - agent.tool(check_order_status_tool) agent.tool(lookup_chat_history_tool) - logger.info("[Agent] 工具箱已更新:含转人工、订单查询、历史记录查询。") + agent.tool(lookup_customer_orders_tool) + logger.info("[Agent] 工具箱已更新:含转人工、历史记录查询、订单查询。") diff --git a/core/orchestrator.py b/core/orchestrator.py index 2d25f48..70fd4b5 100644 --- a/core/orchestrator.py +++ b/core/orchestrator.py @@ -123,13 +123,44 @@ class SystemOrchestrator: async def _handle_order_packet(self, platform: str, msg: StandardMessage): try: - price_match = re.search(r"订单金额:金额:\s*([\d\.]+)元", msg.content) - if price_match: await repo.update_task_price(platform, msg.user_id, float(price_match.group(1))) - # 判定成交结果(扩大范围:已付款 或 已发货 都视为成功,用于后期 AI 话术微调) - if any(k in msg.content for k in ["买家已付款", "卖家已发货"]): + from core.order_helpers import parse_order_info + from db.chat_log_db import upsert_order + + content = msg.content or "" + info = parse_order_info(content) + + price_match = re.search(r"金额[::]\s*([\d\.]+)\s*元", content) + if price_match: + await repo.update_task_price(platform, msg.user_id, float(price_match.group(1))) + + if any(k in content for k in ["买家已付款", "卖家已发货"]): await repo.update_task_outcome(platform, msg.user_id, "deal_success") - elif any(k in msg.content for k in ["退款", "已关闭", "已取消"]): + elif any(k in content for k in ["退款", "已关闭", "已取消"]): await repo.update_task_outcome(platform, msg.user_id, "refunded") + + # 结构化写入 customer_orders 表 + order_id = info.get("order_id", "") + if order_id and msg.user_id and msg.user_id != "unknown": + title_match = re.search(r"商品标题[::]\s*([^\s]+(?:\s+[^\s订]+)*)", content) + product_title = title_match.group(1).strip() if title_match else "" + amount = float(info.get("amount", 0)) + quantity = int(info.get("quantity", 0)) + order_status = info.get("order_status", "") + buyer_note = info.get("buyer_note", "") + + await asyncio.to_thread( + upsert_order, + customer_id=msg.user_id, + order_id=order_id, + order_status=order_status, + acc_id=msg.acc_id, + product_title=product_title, + amount=amount, + quantity=quantity, + buyer_note=buyer_note, + ) + logger.info(f"[订单入库] user={msg.user_id} order={order_id} status={order_status} amount={amount}") + except Exception as e: logger.warning(f"[Orchestrator] 订单消息处理异常: {e}") diff --git a/core/pydantic_ai_agent_v2.py b/core/pydantic_ai_agent_v2.py index e53be20..7cf87f1 100644 --- a/core/pydantic_ai_agent_v2.py +++ b/core/pydantic_ai_agent_v2.py @@ -72,6 +72,14 @@ class CustomerServiceBrain: "4. 【近期对话回顾】中显示客户之前已发过图或说过需求\n" "查到历史后,根据历史内容回复,绝对不要再重复问客户已经回答过的问题!\n\n" + "【订单查询工具】\n" + "你有一个 lookup_customer_orders_tool 工具,可以查询客户的订单记录。\n" + "以下情况你【必须】调用此工具:\n" + "1. 客户问'我付款了'、'订单怎么样了'、'发货了吗'\n" + "2. 客户提到订单号\n" + "3. 你需要确认客户是否已付款再决定如何回复\n" + "查到订单后,根据订单状态回复(已付款→'收到,马上安排';已发货→'已经发了哈')。\n\n" + "【核心逻辑】\n" "1. 业务:只聊高清修复和找原图。核心链路:引导发图 -> 问需求 -> 找设计师。\n" "2. **主动引导**:只有当客户【从未发过图】且没有历史图片记录时,才引导发图。\n" diff --git a/db/chat_log_db.py b/db/chat_log_db.py index e89e82f..d1c7407 100755 --- a/db/chat_log_db.py +++ b/db/chat_log_db.py @@ -143,6 +143,49 @@ def init_db(): except Exception: pass conn.execute("CREATE INDEX IF NOT EXISTS idx_acc ON chat_logs(acc_id)") + + # ---- customer_orders 表 ---- + if _is_mysql(): + conn.execute(""" + CREATE TABLE IF NOT EXISTS customer_orders ( + id INTEGER PRIMARY KEY AUTO_INCREMENT, + customer_id VARCHAR(128) NOT NULL, + acc_id VARCHAR(128) DEFAULT '', + order_id VARCHAR(64) NOT NULL, + order_status VARCHAR(64) DEFAULT '', + product_title VARCHAR(512) DEFAULT '', + amount DECIMAL(10,2) DEFAULT 0, + quantity INTEGER DEFAULT 0, + buyer_note TEXT DEFAULT '', + created_at DATETIME NOT NULL, + updated_at DATETIME NOT NULL + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + """) + idx_rows2 = conn.execute("SHOW INDEX FROM customer_orders").fetchall() + exists2 = {str(r.get("Key_name", "")) for r in idx_rows2} + if "idx_co_customer" not in exists2: + conn.execute("CREATE INDEX idx_co_customer ON customer_orders(customer_id)") + if "idx_co_order" not in exists2: + conn.execute("CREATE UNIQUE INDEX idx_co_order ON customer_orders(order_id, order_status)") + else: + conn.execute(""" + CREATE TABLE IF NOT EXISTS customer_orders ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + customer_id TEXT NOT NULL, + acc_id TEXT DEFAULT '', + order_id TEXT NOT NULL, + order_status TEXT DEFAULT '', + product_title TEXT DEFAULT '', + amount REAL DEFAULT 0, + quantity INTEGER DEFAULT 0, + buyer_note TEXT DEFAULT '', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ) + """) + conn.execute("CREATE INDEX IF NOT EXISTS idx_co_customer ON customer_orders(customer_id)") + conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_co_order ON customer_orders(order_id, order_status)") + conn.commit() @@ -351,3 +394,63 @@ def get_latest_messages(limit: int = 20) -> List[Dict]: ORDER BY id DESC LIMIT ? """), (limit,)).fetchall() return [dict(r) for r in rows] + + +# ========== 订单相关 ========== + +def upsert_order( + customer_id: str, + order_id: str, + order_status: str = "", + acc_id: str = "", + product_title: str = "", + amount: float = 0.0, + quantity: int = 0, + buyer_note: str = "", +): + """写入或更新一条订单记录(按 order_id + order_status 去重)""" + ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + with _get_conn() as conn: + if _is_mysql(): + conn.execute( + "INSERT INTO customer_orders " + "(customer_id, acc_id, order_id, order_status, product_title, amount, quantity, buyer_note, created_at, updated_at) " + "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) " + "ON DUPLICATE KEY UPDATE customer_id=VALUES(customer_id), acc_id=VALUES(acc_id), " + "product_title=VALUES(product_title), amount=VALUES(amount), quantity=VALUES(quantity), " + "buyer_note=VALUES(buyer_note), updated_at=VALUES(updated_at)", + (customer_id, acc_id, order_id, order_status, product_title, amount, quantity, buyer_note, ts, ts), + ) + else: + conn.execute( + _sql("INSERT OR REPLACE INTO customer_orders " + "(customer_id, acc_id, order_id, order_status, product_title, amount, quantity, buyer_note, created_at, updated_at) " + "VALUES (?,?,?,?,?,?,?,?,?,?)"), + (customer_id, acc_id, order_id, order_status, product_title, amount, quantity, buyer_note, ts, ts), + ) + conn.commit() + + +def get_customer_orders(customer_id: str, limit: int = 10) -> List[Dict]: + """查询某客户的订单记录(按时间倒序)""" + with _get_conn() as conn: + rows = conn.execute(_sql(""" + SELECT order_id, order_status, product_title, amount, quantity, buyer_note, created_at, updated_at + FROM customer_orders + WHERE customer_id = ? + ORDER BY updated_at DESC + LIMIT ? + """), (customer_id, limit)).fetchall() + return [dict(r) for r in rows] + + +def get_order_by_id(order_id: str) -> List[Dict]: + """按订单号查询所有状态变更记录""" + with _get_conn() as conn: + rows = conn.execute(_sql(""" + SELECT customer_id, order_id, order_status, product_title, amount, quantity, buyer_note, created_at, updated_at + FROM customer_orders + WHERE order_id = ? + ORDER BY updated_at ASC + """), (order_id,)).fetchall() + return [dict(r) for r in rows]