diff --git a/core/orchestrator.py b/core/orchestrator.py index 35856c8..0fd9f4e 100644 --- a/core/orchestrator.py +++ b/core/orchestrator.py @@ -3,6 +3,7 @@ import asyncio import re import time import json +from datetime import datetime from typing import Optional, List, Any, Dict from collections import deque from core.schema import StandardMessage, StandardResponse @@ -26,6 +27,8 @@ TRANSFER_COOLDOWN_SEC = 120 # 转接冷却时间(秒)—— 转接后2 DEBOUNCE_SECONDS = 2.0 # 消息防抖延迟(秒) PENDING_TRANSFER_POLL_SECONDS = 30 PENDING_TRANSFER_RETRY_SECONDS = 60 +TRANSFER_RETRY_WINDOW_SEC = 300 +TRANSFER_RETRY_GAP_SEC = 45 # 转接后安抚话术池(轮换使用,避免复读) _TRANSFER_CALM_REPLIES = [ @@ -46,6 +49,8 @@ _OUTBOUND_BLOCK_MARKERS = ( '[{"name":', ) +_TRANSFER_COMMAND_MARKER = "[转移会话]" + # 历史记录格式检测模式(AI 转述历史时容易泄露) _HISTORY_LEAK_PATTERNS = [ r'\[\d{4}-\d{2}-\d{2}[^\]]*\]\s*(客户|客服)[::]', # [2026-03-07 12:00:00] 客户: @@ -80,6 +85,7 @@ class SystemOrchestrator: self._pending_messages: Dict[str, List[StandardMessage]] = {} self._user_locks: Dict[str, asyncio.Lock] = {} self._pending_transfer_task: Optional[asyncio.Task] = None + self._last_retry_transfer_time: Dict[str, float] = {} bus.subscribe("MESSAGE_OUTBOUND", self.handle_outbound_event) @@ -107,6 +113,99 @@ class SystemOrchestrator: self._pending_transfer_task = asyncio.create_task(self._process_pending_transfers_loop()) logger.info("[Orchestrator] 待转接轮询任务已启动") + @staticmethod + def _parse_history_ts(ts: Any) -> Optional[datetime]: + text = str(ts or "").strip() + if not text: + return None + for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M:%S.%f"): + try: + return datetime.strptime(text, fmt) + except ValueError: + continue + return None + + def _find_stalled_transfer(self, history: List[dict]) -> Optional[dict]: + if not history: + return None + + last_transfer_idx = -1 + for idx in range(len(history) - 1, -1, -1): + item = history[idx] + if item.get("role") == "assistant" and _TRANSFER_COMMAND_MARKER in str(item.get("content") or ""): + last_transfer_idx = idx + break + + if last_transfer_idx < 0: + return None + + transfer_item = history[last_transfer_idx] + transfer_at = self._parse_history_ts(transfer_item.get("timestamp")) + if not transfer_at: + return None + + elapsed = time.time() - transfer_at.timestamp() + if elapsed < 0 or elapsed > TRANSFER_RETRY_WINDOW_SEC: + return None + + after_transfer = history[last_transfer_idx + 1:] + if not any(item.get("role") == "user" for item in after_transfer): + return None + + for item in after_transfer: + if item.get("role") != "assistant": + continue + content = str(item.get("content") or "") + if _TRANSFER_COMMAND_MARKER not in content: + return None + + return { + "timestamp": transfer_at, + "elapsed": elapsed, + "content": str(transfer_item.get("content") or ""), + } + + async def _retry_stalled_transfer_if_needed( + self, + session_key: str, + user_id: str, + platform: str, + acc_id: str, + acc_type: str, + history: List[dict], + ) -> Optional[StandardResponse]: + stalled = self._find_stalled_transfer(history) + if not stalled: + return None + + last_retry_at = self._last_retry_transfer_time.get(session_key, 0.0) + if time.time() - last_retry_at < TRANSFER_RETRY_GAP_SEC: + logger.info( + f"[Orchestrator] 转接补发冷却中,先不重复补转: user={user_id} acc={acc_id}" + ) + return None + + logger.info( + f"[Orchestrator] 检测到疑似转接未接上,准备补发转接: " + f"user={user_id} acc={acc_id} elapsed={stalled['elapsed']:.0f}s" + ) + designer_name = await dispatch_service.assign_designer(user_id=user_id) + if not designer_name: + logger.info(f"[Orchestrator] 补发转接失败,当前仍无可用设计师: user={user_id} acc={acc_id}") + return None + + self._last_retry_transfer_time[session_key] = time.time() + return StandardResponse( + reply_content=f"正在为您转接|[转移会话],{designer_name},无原因", + need_transfer=True, + metadata={ + "acc_id": acc_id, + "acc_type": acc_type, + "transfer_prelude": "我再帮您转一下哈", + "retry_transfer": True, + }, + ) + @staticmethod def _sanitize_outbound_text(text: str) -> str: if not text: @@ -357,12 +456,28 @@ class SystemOrchestrator: if all_image_urls: asyncio.create_task(self._analyze_images_background(session_key, all_image_urls)) - # C. 冷却检查:转接成功后冷却期内,直接回安抚话术,不调AI + history_start = time.time() + history = await repo.get_chat_history(user_id, limit=12, acc_id=acc_id) + history_elapsed = time.time() - history_start + logger.info(f"[计时] user={user_id} 查询历史: {history_elapsed:.2f}s (共{len(history)}条)") + ai_history = history[:-1] if history and history[-1].get("content") == db_content else history + + # C. 短时间追问且疑似没真正接上人工:优先补发一次转接 + std_res = await self._retry_stalled_transfer_if_needed( + session_key=session_key, + user_id=user_id, + platform=platform, + acc_id=acc_id, + acc_type=acc_type, + history=history, + ) + + # D. 冷却检查:转接成功后冷却期内,直接回安抚话术,不调AI last_transfer = self._last_transfer_time.get(session_key, 0) cooldown_elapsed = time.time() - last_transfer is_in_cooldown = cooldown_elapsed < TRANSFER_COOLDOWN_SEC - if is_in_cooldown: + if std_res is None and is_in_cooldown: idx = self._transfer_calm_idx.get(session_key, 0) calm_reply = _TRANSFER_CALM_REPLIES[idx % len(_TRANSFER_CALM_REPLIES)] self._transfer_calm_idx[session_key] = idx + 1 @@ -371,21 +486,16 @@ class SystemOrchestrator: reply_content=calm_reply, metadata={"acc_id": acc_id, "acc_type": acc_type} ) - else: - # D. 正常流程:调用AI思考 - history_start = time.time() - history = await repo.get_chat_history(user_id, limit=10, acc_id=acc_id) - if history and history[-1].get('content') == db_content: history = history[:-1] - history_elapsed = time.time() - history_start - logger.info(f"[计时] user={user_id} 查询历史: {history_elapsed:.2f}s (共{len(history)}条)") - + + if std_res is None: + # E. 正常流程:调用AI思考 ai_start = time.time() - std_res = await self.brain.think_and_reply(final_msg, history=history) + std_res = await self.brain.think_and_reply(final_msg, history=ai_history) ai_elapsed = time.time() - ai_start total_elapsed = time.time() - process_start logger.info(f"[计时] user={user_id} AI思考: {ai_elapsed:.1f}s | 总耗时: {total_elapsed:.1f}s") - # E. 发送并记录时间 + # F. 发送并记录时间 if std_res.should_reply: std_res.reply_content = self._sanitize_outbound_text(std_res.reply_content) meta = dict(std_res.metadata or {}) @@ -394,8 +504,9 @@ class SystemOrchestrator: # 转接场景:先发一句安抚话,再发转接指令 if "[转移会话]" in std_res.reply_content: + transfer_prelude = str(std_res.metadata.get("transfer_prelude") or "").strip() greet = StandardResponse( - reply_content="收到,我叫设计师来看下哈", + reply_content=transfer_prelude or "收到,我叫设计师来看下哈", metadata={"acc_id": acc_id, "acc_type": acc_type} ) await self.qianniu_adapter.translate_outbound(greet, user_id)