import logging import asyncio import re import time import json from typing import Optional, List, Any, Dict from collections import deque from core.schema import StandardMessage, StandardResponse from core.adapters.qianniu_adapter import QianniuAdapter from core.pydantic_ai_agent_v2 import CustomerServiceBrain from core.events.event_bus import bus from core.repository import repo logger = logging.getLogger("cs_agent") # 配置常量 MSG_DEDUP_CAPACITY = 200 # 消息 ID 去重缓存容量 TRANSFER_COOLDOWN_SEC = 120 # 转接冷却时间(秒)—— 转接后2分钟内不再调用AI DEBOUNCE_SECONDS = 2.0 # 消息防抖延迟(秒) # 转接后安抚话术池(轮换使用,避免复读) _TRANSFER_CALM_REPLIES = [ "我在帮你催了哈,稍等下", "已经转了哈,马上就来", "收到,设计师在赶来了哈", "好的亲,稍等一下哈", "在催了在催了,马上哈", ] _OUTBOUND_BLOCK_MARKERS = ( "【历史记录摘要】", "【详细记录】", "【订单摘要】", "【订单详情】", " last_transfer_time) self._last_transfer_time: Dict[str, float] = {} self._transfer_calm_idx: Dict[str, int] = {} # 安抚话术轮换索引 # 3. 防抖配置 self._debounce_seconds = DEBOUNCE_SECONDS self._debounce_tasks: Dict[str, asyncio.Task] = {} self._pending_messages: Dict[str, List[StandardMessage]] = {} self._user_locks: Dict[str, asyncio.Lock] = {} bus.subscribe("MESSAGE_OUTBOUND", self.handle_outbound_event) @staticmethod def _has_transfer_intent(text: str) -> bool: if not text: return False t = str(text) keywords = ("转人工", "转接", "人工客服", "人工", "设计师", "叫人", "找人") return any(k in t for k in keywords) def _get_user_lock(self, user_id: str) -> asyncio.Lock: if user_id not in self._user_locks: self._user_locks[user_id] = asyncio.Lock() return self._user_locks[user_id] @staticmethod def _sanitize_outbound_text(text: str) -> str: if not text: return "" cleaned = str(text).strip() if "[转移会话]" in cleaned: return cleaned if any(marker in cleaned for marker in _OUTBOUND_BLOCK_MARKERS): logger.warning("[Orchestrator] 拦截到内部内容外发,替换为安全兜底回复") return "我在帮你看记录,稍等哈" # 检查历史记录泄露模式 for pattern in _HISTORY_LEAK_PATTERNS: if re.search(pattern, cleaned): logger.warning(f"[Orchestrator] 检测到历史记录泄露模式: {pattern[:30]}...") return "我在帮你看记录,稍等哈" return cleaned async def on_raw_message_received(self, platform: str, raw_data: dict): """链路入口""" try: if platform != "qianniu": return std_msg, direction = await self.qianniu_adapter.translate_inbound(raw_data) # 关键修复:确保 user_id 绝不为空 user_id = std_msg.user_id or str(raw_data.get("cy_id") or raw_data.get("from_id") or "unknown") std_msg.user_id = user_id # 店铺隔离:同一客户在不同店铺的对话独立处理 session_key = f"{user_id}@{std_msg.acc_id}" # 订单消息 / 纯金额通知 / SKU信息:静默入库,不触发 AI 回复 msg_text = std_msg.content or "" is_order = "[系统订单信息]" in msg_text is_price_only = bool(re.match(r'^[\s\n]*金?额?[::]?\s*[\d.]+\s*元', msg_text.strip())) is_sku_only = bool(re.match(r'^[\s\n]*(备注[::]|数量[::]|款式[::]|定制[::])', msg_text.strip())) is_sku_amount = bool(re.match(r'^[\s\n]*金额[::]\s*[\d.]+元\s*●', msg_text.strip())) if is_order or is_price_only or is_sku_only or is_sku_amount: await self._handle_order_packet(platform, std_msg) logger.info(f"[订单消息] user={user_id} acc={std_msg.acc_id} 已入库更新状态") await repo.save_chat(platform, user_id, msg_text, "in", acc_id=std_msg.acc_id) return preview = (std_msg.content or "").replace("\n", "\\n") if len(preview) > 120: preview = preview[:120] + "..." logger.info( f"[监听消息] dir={direction} user={user_id} acc={std_msg.acc_id} " f"type={std_msg.msg_type} images={len(std_msg.image_urls)} content={preview}" ) # 过滤心跳 if not (std_msg.content or "").strip() and not std_msg.image_urls: return # 如果是商家人工回复,静默入库 if direction == "out": await repo.save_chat(platform, user_id, std_msg.content, "out", acc_id=std_msg.acc_id) return # ID 去重 if std_msg.msg_id: if std_msg.msg_id in self._processed_msg_ids: return self._processed_msg_ids.append(std_msg.msg_id) # 进入防抖(使用 session_key 隔离不同店铺) if session_key in self._debounce_tasks: self._debounce_tasks[session_key].cancel() if session_key not in self._pending_messages: self._pending_messages[session_key] = [] self._pending_messages[session_key].append(std_msg) self._debounce_tasks[session_key] = asyncio.create_task(self._debounced_process(session_key, user_id, platform)) except Exception as e: logger.error(f"[Orchestrator] 处理失败: {e}") async def _handle_order_packet(self, platform: str, msg: StandardMessage): try: from core.order_helpers import parse_order_info from db.chat_log_db import upsert_order content = msg.content or "" info = parse_order_info(content) price_match = re.search(r"金额[::]\s*([\d\.]+)\s*元", content) if price_match: await repo.update_task_price(platform, msg.user_id, float(price_match.group(1))) if any(k in content for k in ["买家已付款", "卖家已发货"]): await repo.update_task_outcome(platform, msg.user_id, "deal_success") elif any(k in content for k in ["退款", "已关闭", "已取消"]): await repo.update_task_outcome(platform, msg.user_id, "refunded") # 结构化写入 customer_orders 表 order_id = info.get("order_id", "") if order_id and msg.user_id and msg.user_id != "unknown": title_match = re.search(r"商品标题[::]\s*([^\s]+(?:\s+[^\s订]+)*)", content) product_title = title_match.group(1).strip() if title_match else "" amount = float(info.get("amount", 0)) quantity = int(info.get("quantity", 0)) order_status = info.get("order_status", "") buyer_note = info.get("buyer_note", "") await asyncio.to_thread( upsert_order, customer_id=msg.user_id, order_id=order_id, order_status=order_status, acc_id=msg.acc_id, product_title=product_title, amount=amount, quantity=quantity, buyer_note=buyer_note, ) logger.info(f"[订单入库] user={msg.user_id} order={order_id} status={order_status} amount={amount}") except Exception as e: logger.warning(f"[Orchestrator] 订单消息处理异常: {e}") async def _analyze_images_background(self, session_key: str, image_urls: List[str]): """后台静默分析图片,存入用户数据库用于数据标定""" try: from services.service_image_analyzer import image_analyzer_service from db.customer_db import CustomerDatabase db = CustomerDatabase() profile = db.get_customer(session_key) for url in image_urls: try: result = await image_analyzer_service.analyze(url) result_json = json.dumps(result, ensure_ascii=False) # 更新最近一次分析 profile.last_image_analysis = result_json profile.last_image_url = url profile.last_gemini_prompt = result.get("gemini_prompt", "") profile.last_aspect_ratio = result.get("aspect_ratio", "1:1") profile.last_perspective = result.get("perspective", "no") # 追加到历史记录(保留最近20条) if profile.image_analysis_history is None: profile.image_analysis_history = [] profile.image_analysis_history.append(result_json) if len(profile.image_analysis_history) > 20: profile.image_analysis_history = profile.image_analysis_history[-20:] # 更新复杂度历史 complexity = result.get("complexity", "normal") if profile.complexity_history is None: profile.complexity_history = [] profile.complexity_history.append(complexity) if len(profile.complexity_history) > 10: profile.complexity_history = profile.complexity_history[-10:] # 更新图片类型历史 proc_type = result.get("proc_type", "") if proc_type and profile.image_type_history is not None: if proc_type not in profile.image_type_history: profile.image_type_history.append(proc_type) logger.debug(f"[ImageAnalysis] session={session_key} 分析完成: {result.get('subject', '?')} | {complexity}") except Exception as e: logger.warning(f"[ImageAnalysis] 单张图片分析失败: {e}") continue # 保存更新 db.save_customer(profile) logger.info(f"[ImageAnalysis] session={session_key} 分析结果已保存到数据库") except Exception as e: logger.warning(f"[ImageAnalysis] 后台分析失败: {e}") async def _debounced_process(self, session_key: str, user_id: str, platform: str): try: # 记录开始时间(防抖前) process_start = time.time() await asyncio.sleep(self._debounce_seconds) async with self._get_user_lock(session_key): messages = self._pending_messages.pop(session_key, []) if not messages: return debounce_elapsed = time.time() - process_start logger.info(f"[计时] user={user_id} 防抖等待完成: {debounce_elapsed:.1f}s") # A. 合并与元数据修复(去重:同一防抖窗口内完全相同的内容只保留一条) seen_contents = set() unique_parts = [] for m in messages: c = (m.content or "").strip() if c and c not in seen_contents: seen_contents.add(c) unique_parts.append(c) combined_content = "\n".join(unique_parts) all_image_urls = [] acc_id = messages[-1].acc_id acc_type = messages[-1].acc_type for m in messages: for url in m.image_urls: if url not in all_image_urls: all_image_urls.append(url) # 防抖合并后的消息仍需有 msg_id,避免触发 StandardMessage 校验失败 merged_msg_id = messages[-1].msg_id if messages[-1].msg_id else f"merged-{user_id}-{int(time.time() * 1000)}" final_msg = StandardMessage( platform=platform, msg_id=merged_msg_id, user_id=user_id, content=combined_content, msg_type=messages[-1].msg_type, image_urls=all_image_urls, acc_id=acc_id, acc_type=acc_type ) # B. 持久化 db_start = time.time() db_content = combined_content if all_image_urls: db_content = f"【系统:已收到{len(all_image_urls)}张图】\n{combined_content}" await repo.save_chat(platform, user_id, db_content, "in", acc_id=acc_id, image_urls=all_image_urls) db_elapsed = time.time() - db_start logger.info(f"[计时] user={user_id} 消息入库: {db_elapsed:.2f}s") # B2. 后台图片分析(不阻塞主流程,用于数据标定) if all_image_urls: asyncio.create_task(self._analyze_images_background(session_key, all_image_urls)) # C. 冷却检查:转接成功后冷却期内,直接回安抚话术,不调AI last_transfer = self._last_transfer_time.get(session_key, 0) cooldown_elapsed = time.time() - last_transfer is_in_cooldown = cooldown_elapsed < TRANSFER_COOLDOWN_SEC if is_in_cooldown: idx = self._transfer_calm_idx.get(session_key, 0) calm_reply = _TRANSFER_CALM_REPLIES[idx % len(_TRANSFER_CALM_REPLIES)] self._transfer_calm_idx[session_key] = idx + 1 logger.info(f"[Orchestrator] 转接冷却中({cooldown_elapsed:.0f}s),直接安抚: {calm_reply}") std_res = StandardResponse( reply_content=calm_reply, metadata={"acc_id": acc_id, "acc_type": acc_type} ) else: # D. 正常流程:调用AI思考 history_start = time.time() history = await repo.get_chat_history(user_id, limit=10, acc_id=acc_id) if history and history[-1].get('content') == db_content: history = history[:-1] history_elapsed = time.time() - history_start logger.info(f"[计时] user={user_id} 查询历史: {history_elapsed:.2f}s (共{len(history)}条)") ai_start = time.time() std_res = await self.brain.think_and_reply(final_msg, history=history) ai_elapsed = time.time() - ai_start total_elapsed = time.time() - process_start logger.info(f"[计时] user={user_id} AI思考: {ai_elapsed:.1f}s | 总耗时: {total_elapsed:.1f}s") # E. 发送并记录时间 if std_res.should_reply: std_res.reply_content = self._sanitize_outbound_text(std_res.reply_content) std_res.metadata = {"acc_id": acc_id, "acc_type": acc_type} # 转接场景:先发一句安抚话,再发转接指令 if "[转移会话]" in std_res.reply_content: greet = StandardResponse( reply_content="收到,我叫设计师来看下哈", metadata={"acc_id": acc_id, "acc_type": acc_type} ) await self.qianniu_adapter.translate_outbound(greet, user_id) await repo.save_chat(platform, user_id, greet.reply_content, "out", acc_id=acc_id) await asyncio.sleep(0.5) await self.qianniu_adapter.translate_outbound(std_res, user_id) await repo.save_chat(platform, user_id, std_res.reply_content, "out", acc_id=acc_id) if "[转移会话]" in std_res.reply_content: self._last_transfer_time[session_key] = time.time() except asyncio.CancelledError: pass except Exception as e: logger.exception(f"[Orchestrator] 处理失败: {e}") async def handle_outbound_event(self, user_id: str, platform: str, response: StandardResponse): if platform == "qianniu": if response and response.msg_type == 0: response.reply_content = self._sanitize_outbound_text(response.reply_content) await self.qianniu_adapter.translate_outbound(response, user_id) # 全局单例 orchestrator: Optional[SystemOrchestrator] = None def init_orchestrator(ws_client): global orchestrator orchestrator = SystemOrchestrator(ws_client) return orchestrator