import logging import asyncio import re import time import json from datetime import datetime from typing import Optional, List, Any, Dict from collections import deque from core.schema import StandardMessage, StandardResponse from core.adapters.qianniu_adapter import QianniuAdapter from core.pydantic_ai_agent_v2 import CustomerServiceBrain from core.events.event_bus import bus from core.repository import repo from db.pending_transfer_db import ( enqueue_pending_transfer, claim_due_pending_transfers, complete_pending_transfer, retry_pending_transfer, ) from services.dispatch_service import dispatch_service from services.service_auto_image_pipeline import auto_image_pipeline_service logger = logging.getLogger("cs_agent") # 配置常量 MSG_DEDUP_CAPACITY = 200 # 消息 ID 去重缓存容量 TRANSFER_COOLDOWN_SEC = 120 # 转接冷却时间(秒)—— 转接后2分钟内不再调用AI DEBOUNCE_SECONDS = 2.0 # 消息防抖延迟(秒) FIRST_GREETING_IDLE_SEC = 180 # 超过该空闲时间,视为新一轮对话 PENDING_TRANSFER_POLL_SECONDS = 30 PENDING_TRANSFER_RETRY_SECONDS = 60 TRANSFER_RETRY_WINDOW_SEC = 300 TRANSFER_RETRY_GAP_SEC = 45 # 转接后安抚话术池(轮换使用,避免复读) _TRANSFER_CALM_REPLIES = [ "我在帮你催了哈,稍等下", "已经转了哈,马上就来", "收到,设计师在赶来了哈", "好的亲,稍等一下哈", "在催了在催了,马上哈", ] _OUTBOUND_BLOCK_MARKERS = ( "【历史记录摘要】", "【详细记录】", "【订单摘要】", "【订单详情】", " last_transfer_time) self._last_transfer_time: Dict[str, float] = {} self._transfer_calm_idx: Dict[str, int] = {} # 安抚话术轮换索引 # 3. 防抖配置 self._debounce_seconds = DEBOUNCE_SECONDS self._debounce_tasks: Dict[str, asyncio.Task] = {} self._pending_messages: Dict[str, List[StandardMessage]] = {} self._user_locks: Dict[str, asyncio.Lock] = {} self._last_inbound_seen_time: Dict[str, float] = {} self._pending_transfer_task: Optional[asyncio.Task] = None self._last_retry_transfer_time: Dict[str, float] = {} self._auto_pipeline_jobs: Dict[str, float] = {} bus.subscribe("MESSAGE_OUTBOUND", self.handle_outbound_event) @staticmethod def _has_transfer_intent(text: str) -> bool: if not text: return False t = str(text) keywords = ("转人工", "转接", "人工客服", "人工", "设计师", "叫人", "找人") return any(k in t for k in keywords) def _get_user_lock(self, user_id: str) -> asyncio.Lock: if user_id not in self._user_locks: self._user_locks[user_id] = asyncio.Lock() return self._user_locks[user_id] def _should_run_pending_transfer_worker(self) -> bool: worker_id = getattr(self.ws_client, "worker_id", -1) if self.ws_client else -1 return worker_id in (-1, 0) def _ensure_background_tasks(self): if not self._should_run_pending_transfer_worker(): return if self._pending_transfer_task is None or self._pending_transfer_task.done(): self._pending_transfer_task = asyncio.create_task(self._process_pending_transfers_loop()) logger.info("[Orchestrator] 待转接轮询任务已启动") @staticmethod def _parse_history_ts(ts: Any) -> Optional[datetime]: text = str(ts or "").strip() if not text: return None for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M:%S.%f"): try: return datetime.strptime(text, fmt) except ValueError: continue return None def _find_stalled_transfer(self, history: List[dict]) -> Optional[dict]: if not history: return None last_transfer_idx = -1 for idx in range(len(history) - 1, -1, -1): item = history[idx] if item.get("role") == "assistant" and _TRANSFER_COMMAND_MARKER in str(item.get("content") or ""): last_transfer_idx = idx break if last_transfer_idx < 0: return None transfer_item = history[last_transfer_idx] transfer_at = self._parse_history_ts(transfer_item.get("timestamp")) if not transfer_at: return None elapsed = time.time() - transfer_at.timestamp() if elapsed < 0 or elapsed > TRANSFER_RETRY_WINDOW_SEC: return None after_transfer = history[last_transfer_idx + 1:] if not any(item.get("role") == "user" for item in after_transfer): return None for item in after_transfer: if item.get("role") != "assistant": continue content = str(item.get("content") or "") if _TRANSFER_COMMAND_MARKER not in content: return None return { "timestamp": transfer_at, "elapsed": elapsed, "content": str(transfer_item.get("content") or ""), } async def _retry_stalled_transfer_if_needed( self, session_key: str, user_id: str, platform: str, acc_id: str, acc_type: str, history: List[dict], ) -> Optional[StandardResponse]: stalled = self._find_stalled_transfer(history) if not stalled: return None last_retry_at = self._last_retry_transfer_time.get(session_key, 0.0) if time.time() - last_retry_at < TRANSFER_RETRY_GAP_SEC: logger.info( f"[Orchestrator] 转接补发冷却中,先不重复补转: user={user_id} acc={acc_id}" ) return None logger.info( f"[Orchestrator] 检测到疑似转接未接上,准备补发转接: " f"user={user_id} acc={acc_id} elapsed={stalled['elapsed']:.0f}s" ) designer_name = await dispatch_service.assign_designer(user_id=user_id) if not designer_name: logger.info(f"[Orchestrator] 补发转接失败,当前仍无可用设计师: user={user_id} acc={acc_id}") return None self._last_retry_transfer_time[session_key] = time.time() return StandardResponse( reply_content=f"正在为您转接|[转移会话],{designer_name},无原因", need_transfer=True, metadata={ "acc_id": acc_id, "acc_type": acc_type, "transfer_prelude": "我再帮您转一下哈", "retry_transfer": True, }, ) @staticmethod def _sanitize_outbound_text(text: str) -> str: if not text: return "" cleaned = str(text).strip() if _TRANSFER_COMMAND_RE.fullmatch(cleaned): return cleaned if _TRANSFER_COMMAND_MARKER in cleaned: logger.warning("[Orchestrator] 检测到混入正文的转接指令,替换为安全兜底回复") return "我在帮你看记录,稍等哈" if any(marker in cleaned for marker in _OUTBOUND_BLOCK_MARKERS): logger.warning("[Orchestrator] 拦截到内部内容外发,替换为安全兜底回复") return "我在帮你看记录,稍等哈" # 检查历史记录泄露模式 for pattern in _HISTORY_LEAK_PATTERNS: if re.search(pattern, cleaned): logger.warning(f"[Orchestrator] 检测到历史记录泄露模式: {pattern[:30]}...") return "我在帮你看记录,稍等哈" return cleaned @staticmethod def _sanitize_history_content_for_ai(text: str) -> str: cleaned = str(text or "").strip() if not cleaned: return "" if _TRANSFER_COMMAND_RE.fullmatch(cleaned): return "系统:之前已转接设计师" if "【历史记录摘要】" in cleaned or "【详细记录】" in cleaned: return "系统:刚刚查过历史记录" if "【订单摘要】" in cleaned or "【订单详情】" in cleaned: return "系统:刚刚查过订单记录" if _TRANSFER_COMMAND_MARKER in cleaned: cleaned = re.sub( r"正在为您转接\|\[转移会话\],[^,\r\n]+,[^\r\n]*", "系统:之前已转接设计师", cleaned, ) return cleaned def _sanitize_history_for_ai(self, history: List[dict]) -> List[dict]: sanitized = [] for item in history or []: normalized = dict(item) normalized["content"] = self._sanitize_history_content_for_ai(item.get("content", "")) sanitized.append(normalized) return sanitized @staticmethod def _extract_designer_name(transfer_cmd: str) -> str: text = str(transfer_cmd or "").strip() match = re.search(r"\[转移会话\],([^,]+),", text) return str(match.group(1)).strip() if match else "" @staticmethod def _infer_processing_intent(requirement_text: str, history: Optional[List[dict]] = None) -> str: combined_parts = [str(requirement_text or "").lower()] for item in history or []: if item.get("role") == "user": combined_parts.append(str(item.get("content") or "").lower()) combined = "\n".join(combined_parts) repair_keywords = ("修复", "高清", "清晰", "放大", "老照片") if any(k in combined for k in repair_keywords): return "repair" return "find_original" @staticmethod def _collect_recent_image_urls(history: List[dict], fallback_urls: Optional[List[str]] = None) -> List[str]: urls: List[str] = [] seen = set() def add_url(url: str): value = str(url or "").strip() if not value or value in seen: return seen.add(value) urls.append(value) for url in fallback_urls or []: add_url(url) for item in reversed(history or []): if item.get("role") != "user": continue raw_urls = item.get("image_urls") or [] if isinstance(raw_urls, str): for part in re.split(r"[\n#]+", raw_urls): add_url(part) elif isinstance(raw_urls, list): for part in raw_urls: add_url(part) content = str(item.get("content") or "") for match in re.findall(r"https?://[^\s#]+", content): add_url(match) if len(urls) >= 5: break return urls def _schedule_auto_pipeline( self, *, session_key: str, customer_id: str, acc_id: str, designer_name: str, requirement_text: str, history: List[dict], image_urls: Optional[List[str]] = None, ): resolved_urls = self._collect_recent_image_urls(history, image_urls) if not resolved_urls: logger.info(f"[Orchestrator] 自动处理跳过:未找到客户图片 user={customer_id} acc={acc_id}") return intent = self._infer_processing_intent(requirement_text, history) signature_src = f"{session_key}|{designer_name}|{intent}|{'|'.join(resolved_urls)}" signature = str(abs(hash(signature_src))) now = time.time() last_run = self._auto_pipeline_jobs.get(signature, 0.0) if now - last_run < 600: logger.info(f"[Orchestrator] 自动处理已在近期触发,跳过重复任务 user={customer_id} acc={acc_id}") return self._auto_pipeline_jobs[signature] = now async def _runner(): try: result = await auto_image_pipeline_service.process_and_notify( session_key=session_key, customer_id=customer_id, acc_id=acc_id, designer_name=designer_name, requirement_text=requirement_text, image_urls=resolved_urls, intent=intent, ) logger.info( f"[Orchestrator] 自动处理完成 user={customer_id} acc={acc_id} " f"ok={result.get('success')} uploaded={len(result.get('uploaded') or [])}" ) except Exception as e: logger.warning(f"[Orchestrator] 自动处理失败 user={customer_id} acc={acc_id}: {e}") asyncio.create_task(_runner()) async def on_raw_message_received(self, platform: str, raw_data: dict): """链路入口""" try: if platform != "qianniu": return self._ensure_background_tasks() std_msg, direction = await self.qianniu_adapter.translate_inbound(raw_data) # 关键修复:确保 user_id 绝不为空 user_id = std_msg.user_id or str(raw_data.get("cy_id") or raw_data.get("from_id") or "unknown") std_msg.user_id = user_id # 店铺隔离:同一客户在不同店铺的对话独立处理 session_key = f"{user_id}@{std_msg.acc_id}" # 订单消息 / 纯金额通知 / SKU信息:静默入库,不触发 AI 回复 msg_text = std_msg.content or "" is_order = "[系统订单信息]" in msg_text is_price_only = bool(re.match(r'^[\s\n]*金?额?[::]?\s*[\d.]+\s*元', msg_text.strip())) is_sku_only = bool(re.match(r'^[\s\n]*(备注[::]|数量[::]|款式[::]|定制[::])', msg_text.strip())) is_sku_amount = bool(re.match(r'^[\s\n]*金额[::]\s*[\d.]+元\s*●', msg_text.strip())) if is_order or is_price_only or is_sku_only or is_sku_amount: await self._handle_order_packet(platform, std_msg) logger.info(f"[订单消息] user={user_id} acc={std_msg.acc_id} 已入库更新状态") await repo.save_chat( platform, user_id, msg_text, "in", acc_id=std_msg.acc_id, msg_type=std_msg.msg_type, ) return preview = (std_msg.content or "").replace("\n", "\\n") if len(preview) > 120: preview = preview[:120] + "..." logger.info( f"[监听消息] dir={direction} user={user_id} acc={std_msg.acc_id} " f"type={std_msg.msg_type} images={len(std_msg.image_urls)} content={preview}" ) # 过滤心跳;图片消息即使暂时没拿到 URL,也不能直接丢掉 if std_msg.msg_type != 1 and not (std_msg.content or "").strip() and not std_msg.image_urls: return # 如果是商家人工回复,静默入库 if direction == "out": await repo.save_chat( platform, user_id, std_msg.content, "out", acc_id=std_msg.acc_id, msg_type=std_msg.msg_type, ) return # ID 去重 if std_msg.msg_id: if std_msg.msg_id in self._processed_msg_ids: return self._processed_msg_ids.append(std_msg.msg_id) # 同一轮对话只在第一句先发固定承接,不经过 AI now_ts = time.time() last_inbound_ts = self._last_inbound_seen_time.get(session_key, 0.0) self._last_inbound_seen_time[session_key] = now_ts if now_ts - last_inbound_ts >= FIRST_GREETING_IDLE_SEC: first_greet = StandardResponse( reply_content="在的 亲", metadata={"acc_id": std_msg.acc_id, "acc_type": std_msg.acc_type}, ) await self.qianniu_adapter.translate_outbound(first_greet, user_id) await repo.save_chat( platform, user_id, first_greet.reply_content, "out", acc_id=std_msg.acc_id, msg_type=first_greet.msg_type, ) # 进入防抖(使用 session_key 隔离不同店铺) if session_key in self._debounce_tasks: self._debounce_tasks[session_key].cancel() if session_key not in self._pending_messages: self._pending_messages[session_key] = [] self._pending_messages[session_key].append(std_msg) self._debounce_tasks[session_key] = asyncio.create_task(self._debounced_process(session_key, user_id, platform)) except Exception as e: logger.error(f"[Orchestrator] 处理失败: {e}") async def _handle_order_packet(self, platform: str, msg: StandardMessage): try: from core.order_helpers import parse_order_info from db.chat_log_db import upsert_order content = msg.content or "" info = parse_order_info(content) price_match = re.search(r"金额[::]\s*([\d\.]+)\s*元", content) if price_match: await repo.update_task_price(platform, msg.user_id, float(price_match.group(1))) if any(k in content for k in ["买家已付款", "卖家已发货"]): await repo.update_task_outcome(platform, msg.user_id, "deal_success") elif any(k in content for k in ["退款", "已关闭", "已取消"]): await repo.update_task_outcome(platform, msg.user_id, "refunded") # 结构化写入 customer_orders 表 order_id = info.get("order_id", "") if order_id and msg.user_id and msg.user_id != "unknown": title_match = re.search(r"商品标题[::]\s*([^\s]+(?:\s+[^\s订]+)*)", content) product_title = title_match.group(1).strip() if title_match else "" amount = float(info.get("amount", 0)) quantity = int(info.get("quantity", 0)) order_status = info.get("order_status", "") buyer_note = info.get("buyer_note", "") await asyncio.to_thread( upsert_order, customer_id=msg.user_id, order_id=order_id, order_status=order_status, acc_id=msg.acc_id, product_title=product_title, amount=amount, quantity=quantity, buyer_note=buyer_note, ) logger.info(f"[订单入库] user={msg.user_id} order={order_id} status={order_status} amount={amount}") except Exception as e: logger.warning(f"[Orchestrator] 订单消息处理异常: {e}") async def _analyze_images_background(self, session_key: str, image_urls: List[str], requirement_text: str = ""): """后台静默分析图片,存入用户数据库用于数据标定""" try: from services.service_image_analyzer import image_analyzer_service from db.customer_db import CustomerDatabase db = CustomerDatabase() profile = db.get_customer(session_key) for url in (image_urls or [])[:1]: try: result = await image_analyzer_service.analyze(url, customer_requirement=requirement_text) result_json = json.dumps(result, ensure_ascii=False) # 更新最近一次分析 profile.last_image_analysis = result_json profile.last_image_url = url profile.last_gemini_prompt = result.get("gemini_prompt", "") profile.last_aspect_ratio = result.get("aspect_ratio", "1:1") profile.last_perspective = result.get("perspective", "no") # 追加到历史记录(保留最近20条) if profile.image_analysis_history is None: profile.image_analysis_history = [] profile.image_analysis_history.append(result_json) if len(profile.image_analysis_history) > 20: profile.image_analysis_history = profile.image_analysis_history[-20:] # 更新复杂度历史 complexity = result.get("complexity", "normal") if profile.complexity_history is None: profile.complexity_history = [] profile.complexity_history.append(complexity) if len(profile.complexity_history) > 10: profile.complexity_history = profile.complexity_history[-10:] # 更新图片类型历史 proc_type = result.get("proc_type", "") if proc_type and profile.image_type_history is not None: if proc_type not in profile.image_type_history: profile.image_type_history.append(proc_type) logger.debug(f"[ImageAnalysis] session={session_key} 分析完成: {result.get('subject', '?')} | {complexity}") except Exception as e: logger.warning(f"[ImageAnalysis] 单张图片分析失败: {e}") continue # 保存更新 db.save_customer(profile) logger.info(f"[ImageAnalysis] session={session_key} 分析结果已保存到数据库") except Exception as e: logger.warning(f"[ImageAnalysis] 后台分析失败: {e}") async def _debounced_process(self, session_key: str, user_id: str, platform: str): try: # 记录开始时间(防抖前) process_start = time.time() await asyncio.sleep(self._debounce_seconds) async with self._get_user_lock(session_key): messages = self._pending_messages.pop(session_key, []) if not messages: return debounce_elapsed = time.time() - process_start logger.info(f"[计时] user={user_id} 防抖等待完成: {debounce_elapsed:.1f}s") # A. 合并与元数据修复(去重:同一防抖窗口内完全相同的内容只保留一条) seen_contents = set() unique_parts = [] for m in messages: c = (m.content or "").strip() if c and c not in seen_contents: seen_contents.add(c) unique_parts.append(c) combined_content = "\n".join(unique_parts) all_image_urls = [] acc_id = messages[-1].acc_id acc_type = messages[-1].acc_type for m in messages: for url in m.image_urls: if url not in all_image_urls: all_image_urls.append(url) # 防抖合并后的消息仍需有 msg_id,避免触发 StandardMessage 校验失败 merged_msg_id = messages[-1].msg_id if messages[-1].msg_id else f"merged-{user_id}-{int(time.time() * 1000)}" final_msg = StandardMessage( platform=platform, msg_id=merged_msg_id, user_id=user_id, content=combined_content, msg_type=messages[-1].msg_type, image_urls=all_image_urls, acc_id=acc_id, acc_type=acc_type ) # B. 持久化 db_start = time.time() db_content = combined_content if all_image_urls: db_content = f"【系统:已收到{len(all_image_urls)}张图】\n{combined_content}" await repo.save_chat( platform, user_id, db_content, "in", acc_id=acc_id, image_urls=all_image_urls, msg_type=final_msg.msg_type, ) db_elapsed = time.time() - db_start logger.info(f"[计时] user={user_id} 消息入库: {db_elapsed:.2f}s") # B2. 后台图片分析(不阻塞主流程,用于数据标定) if all_image_urls: asyncio.create_task(self._analyze_images_background(session_key, all_image_urls, combined_content)) history_start = time.time() history = await repo.get_chat_history(user_id, limit=12, acc_id=acc_id) history_elapsed = time.time() - history_start logger.info(f"[计时] user={user_id} 查询历史: {history_elapsed:.2f}s (共{len(history)}条)") ai_history = history[:-1] if history and history[-1].get("content") == db_content else history ai_history = self._sanitize_history_for_ai(ai_history) # C. 短时间追问且疑似没真正接上人工:优先补发一次转接 std_res = await self._retry_stalled_transfer_if_needed( session_key=session_key, user_id=user_id, platform=platform, acc_id=acc_id, acc_type=acc_type, history=history, ) # D. 冷却检查:转接成功后冷却期内,直接回安抚话术,不调AI last_transfer = self._last_transfer_time.get(session_key, 0) cooldown_elapsed = time.time() - last_transfer is_in_cooldown = cooldown_elapsed < TRANSFER_COOLDOWN_SEC if std_res is None and is_in_cooldown: idx = self._transfer_calm_idx.get(session_key, 0) calm_reply = _TRANSFER_CALM_REPLIES[idx % len(_TRANSFER_CALM_REPLIES)] self._transfer_calm_idx[session_key] = idx + 1 logger.info(f"[Orchestrator] 转接冷却中({cooldown_elapsed:.0f}s),直接安抚: {calm_reply}") std_res = StandardResponse( reply_content=calm_reply, metadata={"acc_id": acc_id, "acc_type": acc_type} ) if std_res is None: # E. 正常流程:调用AI思考 ai_start = time.time() std_res = await self.brain.think_and_reply(final_msg, history=ai_history) ai_elapsed = time.time() - ai_start total_elapsed = time.time() - process_start logger.info(f"[计时] user={user_id} AI思考: {ai_elapsed:.1f}s | 总耗时: {total_elapsed:.1f}s") # F. 发送并记录时间 if std_res.should_reply: std_res.reply_content = self._sanitize_outbound_text(std_res.reply_content) meta = dict(std_res.metadata or {}) meta.update({"acc_id": acc_id, "acc_type": acc_type}) std_res.metadata = meta # 转接场景:先发一句安抚话,再发转接指令 if "[转移会话]" in std_res.reply_content: designer_name = self._extract_designer_name(std_res.reply_content) transfer_prelude = str(std_res.metadata.get("transfer_prelude") or "").strip() greet = StandardResponse( reply_content=transfer_prelude or "收到,我叫设计师来看下哈", metadata={"acc_id": acc_id, "acc_type": acc_type} ) await self.qianniu_adapter.translate_outbound(greet, user_id) await repo.save_chat( platform, user_id, greet.reply_content, "out", acc_id=acc_id, msg_type=greet.msg_type, ) await asyncio.sleep(0.5) await self.qianniu_adapter.translate_outbound(std_res, user_id) await repo.save_chat( platform, user_id, std_res.reply_content, "out", acc_id=acc_id, msg_type=std_res.msg_type, ) if std_res.metadata.get("pending_transfer"): reason = str(std_res.metadata.get("pending_transfer_reason") or "").strip() if reason: pending_id = await asyncio.to_thread( enqueue_pending_transfer, customer_id=user_id, acc_id=acc_id, acc_type=acc_type, platform=platform, reason=reason, ) logger.info( f"[Orchestrator] 已加入待转接池: pending_id={pending_id} user={user_id} acc={acc_id}" ) if "[转移会话]" in std_res.reply_content: self._last_transfer_time[session_key] = time.time() self._schedule_auto_pipeline( session_key=session_key, customer_id=user_id, acc_id=acc_id, designer_name=self._extract_designer_name(std_res.reply_content), requirement_text=combined_content, history=history, image_urls=all_image_urls, ) except asyncio.CancelledError: pass except Exception as e: logger.exception(f"[Orchestrator] 处理失败: {e}") async def handle_outbound_event(self, user_id: str, platform: str, response: StandardResponse): if platform == "qianniu": if response and response.msg_type == 0: response.reply_content = self._sanitize_outbound_text(response.reply_content) await self.qianniu_adapter.translate_outbound(response, user_id) async def _process_pending_transfers_loop(self): while True: try: if not self.ws_client or not getattr(self.ws_client, "websocket", None): await asyncio.sleep(PENDING_TRANSFER_POLL_SECONDS) continue rows = await asyncio.to_thread(claim_due_pending_transfers, 10) if not rows: await asyncio.sleep(PENDING_TRANSFER_POLL_SECONDS) continue for row in rows: row_id = int(row["id"]) customer_id = str(row.get("customer_id") or "") acc_id = str(row.get("acc_id") or "") acc_type = str(row.get("acc_type") or "AliWorkbench") reason = str(row.get("reason") or "").strip() try: designer_name = await dispatch_service.assign_designer(user_id=customer_id) if not designer_name: await asyncio.to_thread( retry_pending_transfer, row_id, PENDING_TRANSFER_RETRY_SECONDS, "designer_unavailable", ) continue notify = StandardResponse( reply_content="设计师上线了,我给您转过去哈", metadata={"acc_id": acc_id, "acc_type": acc_type}, ) transfer = StandardResponse( reply_content=f"正在为您转接|[转移会话],{designer_name},无原因", need_transfer=True, metadata={"acc_id": acc_id, "acc_type": acc_type}, ) await self.qianniu_adapter.translate_outbound(notify, customer_id) await repo.save_chat( "qianniu", customer_id, notify.reply_content, "out", acc_id=acc_id, msg_type=notify.msg_type, ) await asyncio.sleep(0.5) await self.qianniu_adapter.translate_outbound(transfer, customer_id) await repo.save_chat( "qianniu", customer_id, transfer.reply_content, "out", acc_id=acc_id, msg_type=transfer.msg_type, ) self._last_transfer_time[f"{customer_id}@{acc_id}"] = time.time() history = await repo.get_chat_history(customer_id, limit=12, acc_id=acc_id) self._schedule_auto_pipeline( session_key=f"{customer_id}@{acc_id}", customer_id=customer_id, acc_id=acc_id, designer_name=designer_name, requirement_text=reason, history=history, ) await asyncio.to_thread(complete_pending_transfer, row_id) logger.info( f"[Orchestrator] 待转接自动完成: pending_id={row_id} user={customer_id} designer={designer_name} reason={reason}" ) except Exception as e: logger.warning(f"[Orchestrator] 待转接处理失败 pending_id={row_id}: {e}") await asyncio.to_thread( retry_pending_transfer, row_id, PENDING_TRANSFER_RETRY_SECONDS, str(e), ) except asyncio.CancelledError: break except Exception as e: logger.warning(f"[Orchestrator] 待转接轮询异常: {e}") await asyncio.sleep(PENDING_TRANSFER_RETRY_SECONDS) # 全局单例 orchestrator: Optional[SystemOrchestrator] = None def init_orchestrator(ws_client): global orchestrator orchestrator = SystemOrchestrator(ws_client) return orchestrator