diff --git a/core/orchestrator.py b/core/orchestrator.py index d1d4607..aef54c6 100644 --- a/core/orchestrator.py +++ b/core/orchestrator.py @@ -10,6 +10,13 @@ from core.adapters.qianniu_adapter import QianniuAdapter from core.pydantic_ai_agent_v2 import CustomerServiceBrain from core.events.event_bus import bus from core.repository import repo +from db.pending_transfer_db import ( + enqueue_pending_transfer, + claim_due_pending_transfers, + complete_pending_transfer, + retry_pending_transfer, +) +from services.dispatch_service import dispatch_service logger = logging.getLogger("cs_agent") @@ -17,6 +24,8 @@ logger = logging.getLogger("cs_agent") MSG_DEDUP_CAPACITY = 200 # 消息 ID 去重缓存容量 TRANSFER_COOLDOWN_SEC = 120 # 转接冷却时间(秒)—— 转接后2分钟内不再调用AI DEBOUNCE_SECONDS = 2.0 # 消息防抖延迟(秒) +PENDING_TRANSFER_POLL_SECONDS = 30 +PENDING_TRANSFER_RETRY_SECONDS = 60 # 转接后安抚话术池(轮换使用,避免复读) _TRANSFER_CALM_REPLIES = [ @@ -70,6 +79,7 @@ class SystemOrchestrator: self._debounce_tasks: Dict[str, asyncio.Task] = {} self._pending_messages: Dict[str, List[StandardMessage]] = {} self._user_locks: Dict[str, asyncio.Lock] = {} + self._pending_transfer_task: Optional[asyncio.Task] = None bus.subscribe("MESSAGE_OUTBOUND", self.handle_outbound_event) @@ -86,6 +96,17 @@ class SystemOrchestrator: self._user_locks[user_id] = asyncio.Lock() return self._user_locks[user_id] + def _should_run_pending_transfer_worker(self) -> bool: + worker_id = getattr(self.ws_client, "worker_id", -1) if self.ws_client else -1 + return worker_id in (-1, 0) + + def _ensure_background_tasks(self): + if not self._should_run_pending_transfer_worker(): + return + if self._pending_transfer_task is None or self._pending_transfer_task.done(): + self._pending_transfer_task = asyncio.create_task(self._process_pending_transfers_loop()) + logger.info("[Orchestrator] 待转接轮询任务已启动") + @staticmethod def _sanitize_outbound_text(text: str) -> str: if not text: @@ -107,6 +128,7 @@ class SystemOrchestrator: """链路入口""" try: if platform != "qianniu": return + self._ensure_background_tasks() std_msg, direction = await self.qianniu_adapter.translate_inbound(raw_data) @@ -343,7 +365,9 @@ class SystemOrchestrator: # E. 发送并记录时间 if std_res.should_reply: std_res.reply_content = self._sanitize_outbound_text(std_res.reply_content) - std_res.metadata = {"acc_id": acc_id, "acc_type": acc_type} + meta = dict(std_res.metadata or {}) + meta.update({"acc_id": acc_id, "acc_type": acc_type}) + std_res.metadata = meta # 转接场景:先发一句安抚话,再发转接指令 if "[转移会话]" in std_res.reply_content: @@ -358,6 +382,21 @@ class SystemOrchestrator: await self.qianniu_adapter.translate_outbound(std_res, user_id) await repo.save_chat(platform, user_id, std_res.reply_content, "out", acc_id=acc_id) + if std_res.metadata.get("pending_transfer"): + reason = str(std_res.metadata.get("pending_transfer_reason") or "").strip() + if reason: + pending_id = await asyncio.to_thread( + enqueue_pending_transfer, + customer_id=user_id, + acc_id=acc_id, + acc_type=acc_type, + platform=platform, + reason=reason, + ) + logger.info( + f"[Orchestrator] 已加入待转接池: pending_id={pending_id} user={user_id} acc={acc_id}" + ) + if "[转移会话]" in std_res.reply_content: self._last_transfer_time[session_key] = time.time() @@ -370,6 +409,71 @@ class SystemOrchestrator: response.reply_content = self._sanitize_outbound_text(response.reply_content) await self.qianniu_adapter.translate_outbound(response, user_id) + async def _process_pending_transfers_loop(self): + while True: + try: + if not self.ws_client or not getattr(self.ws_client, "websocket", None): + await asyncio.sleep(PENDING_TRANSFER_POLL_SECONDS) + continue + + rows = await asyncio.to_thread(claim_due_pending_transfers, 10) + if not rows: + await asyncio.sleep(PENDING_TRANSFER_POLL_SECONDS) + continue + + for row in rows: + row_id = int(row["id"]) + customer_id = str(row.get("customer_id") or "") + acc_id = str(row.get("acc_id") or "") + acc_type = str(row.get("acc_type") or "AliWorkbench") + reason = str(row.get("reason") or "").strip() + + try: + designer_name = await dispatch_service.assign_designer(user_id=customer_id) + if not designer_name: + await asyncio.to_thread( + retry_pending_transfer, + row_id, + PENDING_TRANSFER_RETRY_SECONDS, + "designer_unavailable", + ) + continue + + notify = StandardResponse( + reply_content="设计师上线了,我给您转过去哈", + metadata={"acc_id": acc_id, "acc_type": acc_type}, + ) + transfer = StandardResponse( + reply_content=f"正在为您转接|[转移会话],{designer_name},无原因", + need_transfer=True, + metadata={"acc_id": acc_id, "acc_type": acc_type}, + ) + + await self.qianniu_adapter.translate_outbound(notify, customer_id) + await repo.save_chat("qianniu", customer_id, notify.reply_content, "out", acc_id=acc_id) + await asyncio.sleep(0.5) + await self.qianniu_adapter.translate_outbound(transfer, customer_id) + await repo.save_chat("qianniu", customer_id, transfer.reply_content, "out", acc_id=acc_id) + + self._last_transfer_time[f"{customer_id}@{acc_id}"] = time.time() + await asyncio.to_thread(complete_pending_transfer, row_id) + logger.info( + f"[Orchestrator] 待转接自动完成: pending_id={row_id} user={customer_id} designer={designer_name} reason={reason}" + ) + except Exception as e: + logger.warning(f"[Orchestrator] 待转接处理失败 pending_id={row_id}: {e}") + await asyncio.to_thread( + retry_pending_transfer, + row_id, + PENDING_TRANSFER_RETRY_SECONDS, + str(e), + ) + except asyncio.CancelledError: + break + except Exception as e: + logger.warning(f"[Orchestrator] 待转接轮询异常: {e}") + await asyncio.sleep(PENDING_TRANSFER_RETRY_SECONDS) + # 全局单例 orchestrator: Optional[SystemOrchestrator] = None def init_orchestrator(ws_client): diff --git a/core/pydantic_ai_agent_v2.py b/core/pydantic_ai_agent_v2.py index fdb2b64..213915d 100644 --- a/core/pydantic_ai_agent_v2.py +++ b/core/pydantic_ai_agent_v2.py @@ -3,6 +3,7 @@ import re import hashlib import logging import time +import json from typing import List, Optional, Any, Dict from pydantic_ai import Agent, RunContext from pydantic_ai.models.openai import OpenAIChatModel @@ -236,6 +237,8 @@ class CustomerServiceBrain: logger.info(f"[Brain] AI处理完成,总耗时{elapsed:.1f}s") # ===== 详细日志:AI 的思考过程和工具调用 ===== + pending_transfer_reason = "" + pending_transfer_error = "" try: all_msgs = result.all_messages() for idx, m in enumerate(all_msgs): @@ -247,9 +250,20 @@ class CustomerServiceBrain: tool_name = getattr(part, 'tool_name', '?') tool_args = getattr(part, 'args', {}) logger.info(f"[AI思考] 步骤{idx+1} 调用工具: {tool_name}({tool_args})") + if tool_name == "transfer_to_human_tool": + if isinstance(tool_args, str): + try: + tool_args = json.loads(tool_args) + except Exception: + tool_args = {"reason": tool_args} + if isinstance(tool_args, dict): + pending_transfer_reason = str(tool_args.get("reason") or "").strip() elif part_kind == 'tool-return': content = str(getattr(part, 'content', ''))[:200] logger.info(f"[AI思考] 步骤{idx+1} 工具返回: {content}") + full_content = str(getattr(part, 'content', '')) + if full_content.startswith("ERROR_DESIGNER_"): + pending_transfer_error = full_content elif part_kind == 'text': content = str(getattr(part, 'content', ''))[:150] if content.strip(): @@ -298,7 +312,13 @@ class CustomerServiceBrain: return StandardResponse( reply_content=reply_text, need_transfer=need_transfer, - metadata={"acc_id": msg.acc_id, "acc_type": msg.acc_type} + metadata={ + "acc_id": msg.acc_id, + "acc_type": msg.acc_type, + "pending_transfer": bool(pending_transfer_error and pending_transfer_reason), + "pending_transfer_reason": pending_transfer_reason, + "pending_transfer_error": pending_transfer_error, + } ) except Exception as e: diff --git a/db/pending_transfer_db.py b/db/pending_transfer_db.py new file mode 100644 index 0000000..417ef98 --- /dev/null +++ b/db/pending_transfer_db.py @@ -0,0 +1,162 @@ +# -*- coding: utf-8 -*- +""" +待转接队列(本地 SQLite) +用于在设计师不在线时暂存转接请求,待设计师上线后自动转接。 +""" + +import os +import sqlite3 +from datetime import datetime, timedelta +from typing import List, Dict + +_DB_PATH = os.path.join(os.path.dirname(__file__), "pending_transfer.db") + + +def _get_conn() -> sqlite3.Connection: + conn = sqlite3.connect(_DB_PATH) + conn.row_factory = sqlite3.Row + return conn + + +def init_db(): + os.makedirs(os.path.dirname(_DB_PATH), exist_ok=True) + with _get_conn() as conn: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS pending_transfers ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + customer_id TEXT NOT NULL, + acc_id TEXT DEFAULT '', + acc_type TEXT DEFAULT '', + platform TEXT DEFAULT 'qianniu', + reason TEXT DEFAULT '', + status TEXT NOT NULL DEFAULT 'pending', + retry_count INTEGER NOT NULL DEFAULT 0, + next_retry_at TEXT NOT NULL, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + completed_at TEXT DEFAULT '', + last_error TEXT DEFAULT '' + ) + """ + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_pending_status_retry ON pending_transfers(status, next_retry_at)" + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_pending_customer_acc ON pending_transfers(customer_id, acc_id)" + ) + conn.commit() + + +init_db() + + +def enqueue_pending_transfer( + customer_id: str, + acc_id: str, + acc_type: str, + platform: str, + reason: str, +) -> int: + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + with _get_conn() as conn: + row = conn.execute( + """ + SELECT id + FROM pending_transfers + WHERE customer_id = ? AND acc_id = ? AND status IN ('pending', 'processing') + ORDER BY id DESC + LIMIT 1 + """, + (customer_id, acc_id), + ).fetchone() + + if row: + conn.execute( + """ + UPDATE pending_transfers + SET acc_type = ?, platform = ?, reason = ?, status = 'pending', + next_retry_at = ?, updated_at = ?, last_error = '' + WHERE id = ? + """, + (acc_type, platform, reason, now, now, row["id"]), + ) + conn.commit() + return int(row["id"]) + + cur = conn.execute( + """ + INSERT INTO pending_transfers + (customer_id, acc_id, acc_type, platform, reason, status, retry_count, next_retry_at, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, 'pending', 0, ?, ?, ?) + """, + (customer_id, acc_id, acc_type, platform, reason, now, now, now), + ) + conn.commit() + return int(cur.lastrowid) + + +def claim_due_pending_transfers(limit: int = 10) -> List[Dict]: + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + with _get_conn() as conn: + conn.execute("BEGIN IMMEDIATE") + rows = conn.execute( + """ + SELECT * + FROM pending_transfers + WHERE status = 'pending' AND next_retry_at <= ? + ORDER BY created_at ASC, id ASC + LIMIT ? + """, + (now, limit), + ).fetchall() + + ids = [int(r["id"]) for r in rows] + if ids: + placeholders = ",".join("?" for _ in ids) + conn.execute( + f""" + UPDATE pending_transfers + SET status = 'processing', updated_at = ? + WHERE id IN ({placeholders}) + """, + (now, *ids), + ) + conn.commit() + return [dict(r) for r in rows] + + +def complete_pending_transfer(row_id: int): + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + with _get_conn() as conn: + conn.execute( + """ + UPDATE pending_transfers + SET status = 'completed', completed_at = ?, updated_at = ?, last_error = '' + WHERE id = ? + """, + (now, now, row_id), + ) + conn.commit() + + +def retry_pending_transfer(row_id: int, delay_seconds: int = 60, error: str = ""): + now = datetime.now() + next_retry = now + timedelta(seconds=max(delay_seconds, 5)) + now_s = now.strftime("%Y-%m-%d %H:%M:%S") + next_s = next_retry.strftime("%Y-%m-%d %H:%M:%S") + with _get_conn() as conn: + conn.execute( + """ + UPDATE pending_transfers + SET status = 'pending', + retry_count = retry_count + 1, + next_retry_at = ?, + updated_at = ?, + last_error = ? + WHERE id = ? + """, + (next_s, now_s, error[:500], row_id), + ) + conn.commit()