"""Adapter between raw Qianniu payloads and the project's standard message types."""

import json
import logging
import re
from pathlib import Path
from typing import Any, List, Tuple

from core.adapters.base import BaseAdapter
from core.schema import StandardMessage, StandardResponse

logger = logging.getLogger("cs_agent")

# Substrings that mark internal-only content; a reply containing any of them
# is replaced with a safe fallback before it is sent to the customer.
_OUTBOUND_BLOCK_MARKERS = (
    "【历史记录摘要】",
    "【详细记录】",
    "【订单摘要】",
    "【订单详情】",
    # NOTE(review): the reviewed text was cut off in the middle of the next
    # marker. Any further markers, the `_HISTORY_LEAK_PATTERNS` definition
    # (still referenced by `_sanitize_outbound_text`), the original class
    # header and any constructor were in the lost span. Restore them from
    # version control before relying on this module.
)


class QianniuAdapter(BaseAdapter):
    """Translate messages between the Qianniu platform and the standard schema.

    NOTE(review): the class name is taken from the "[QianniuAdapter]" log
    prefix used throughout; the original header was not visible. The methods
    read `self.ws_client` and `self._default_group_id`, which are presumably
    set by a constructor that was in the lost span - confirm.
    """

    def platform_id(self) -> str:
        """Return the platform identifier used in `StandardMessage.platform`."""
        return "qianniu"

    def _resolve_group_id(self, acc_id: str) -> str:
        """Look up the transfer group configured for *acc_id*.

        Reads ``config/transfer_groups.json`` on every call. Falls back to
        ``self._default_group_id`` when the file is missing, unreadable, not
        a JSON object, or has no entry for *acc_id*.
        """
        try:
            config_path = Path("config/transfer_groups.json")
            if config_path.exists():
                with open(config_path, "r", encoding="utf-8") as f:
                    cfg = json.load(f)
                if not isinstance(cfg, dict):
                    # Report the real problem instead of an opaque
                    # "'list' object has no attribute 'get'".
                    raise ValueError(
                        f"expected a JSON object, got {type(cfg).__name__}"
                    )
                return cfg.get(acc_id, self._default_group_id)
        except Exception as e:
            # Deliberately best-effort: a broken config must never prevent
            # the transfer itself, so log and use the default group.
            logger.warning(f"[QianniuAdapter] 读取转接分组配置失败: {e}")
        return self._default_group_id

    @staticmethod
    def _sanitize_outbound_text(content: str) -> str:
        """Return *content* made safe to send to a customer.

        Empty input yields ``""``. Text carrying the transfer command
        ``[转移会话]`` is passed through (stripped) untouched. Text containing
        an internal block marker or matching a history-leak pattern is
        replaced with a fixed fallback reply. Anything else is returned
        stripped.
        """
        if not content:
            return ""
        cleaned = str(content).strip()
        if "[转移会话]" in cleaned:
            # Transfer commands must reach the platform verbatim.
            return cleaned
        if any(marker in cleaned for marker in _OUTBOUND_BLOCK_MARKERS):
            logger.warning("[QianniuAdapter] 拦截到内部内容外发,替换为安全兜底回复")
            return "我在帮你看记录,稍等哈"
        for pattern in _HISTORY_LEAK_PATTERNS:
            if re.search(pattern, cleaned):
                logger.warning(
                    "[QianniuAdapter] 检测到历史记录泄露模式: %s...", pattern[:30]
                )
                return "我在帮你看记录,稍等哈"
        return cleaned

    async def translate_inbound(self, raw: dict) -> Tuple[StandardMessage, str]:
        """Convert a raw inbound payload into a standard message.

        Returns:
            ``(message, direction)`` where direction is ``"in"`` (customer
            wrote to the shop) or ``"out"`` (a human agent of the shop
            replied from the back office).
        """
        if not isinstance(raw, dict):
            raw = {}

        acc_id = str(raw.get("acc_id") or raw.get("shop_id") or "")
        from_id = str(raw.get("from_id") or raw.get("cy_id") or "")
        msg_text = str(raw.get("msg") or raw.get("content") or "")
        raw_msg_type = self._safe_int(raw.get("msg_type"), 0)
        image_urls = self._extract_inbound_image_urls(raw, msg_text)
        if raw_msg_type == 1 and not msg_text.strip():
            # msg_type 1 with no text: substitute a placeholder so the
            # message content is never blank.
            msg_text = "【系统:已收到图片消息】"

        # Direction heuristic: a sender id equal to the (non-empty) shop id
        # means the shop itself is speaking, i.e. a manual agent reply.
        direction = "in"
        user_id = from_id
        if from_id == acc_id and acc_id != "":
            direction = "out"
            # In that case the customer id is carried in `cy_id` instead.
            user_id = str(raw.get("cy_id") or "")

        # An explicit null must become "", not the literal text "None".
        msg_id = raw.get("msg_id")
        from_name = raw.get("from_name")

        msg = StandardMessage(
            platform=self.platform_id(),
            msg_id="" if msg_id is None else str(msg_id),
            user_id=user_id,
            user_name="" if from_name is None else str(from_name),
            content=msg_text,
            msg_type=raw_msg_type,
            image_urls=image_urls,
            acc_id=acc_id,
            acc_type=str(raw.get("acc_type") or "AliWorkbench"),
            raw_data=raw,
        )
        return msg, direction

    async def translate_outbound(self, res: StandardResponse, user_id: str) -> None:
        """Send *res* to customer *user_id* through ``self.ws_client``.

        Does nothing when there is no client, no response, or the response
        asks for neither a reply nor a transfer. A transfer request without
        an explicit ``[转移会话]`` command in the reply is turned into one
        using the group configured for the account. Text replies
        (``msg_type == 0``) are sanitized first. Send failures are logged,
        not raised.
        """
        if not self.ws_client:
            return
        if not res or (not res.should_reply and not res.need_transfer):
            return

        meta = res.metadata if isinstance(res.metadata, dict) else {}
        acc_id = meta.get("acc_id", "")
        acc_type = meta.get("acc_type", "AliWorkbench")

        reply = res.reply_content
        # Guard the substring test: a transfer-only response may carry no
        # reply text at all, and `in None` would raise TypeError.
        has_transfer_cmd = isinstance(reply, str) and "[转移会话]" in reply

        if res.need_transfer and not has_transfer_cmd:
            group_id = self._resolve_group_id(acc_id)
            content = f"正在为您转接|[转移会话],分组{group_id},无原因"
        else:
            content = reply

        if res.msg_type == 0:
            content = self._sanitize_outbound_text(content)

        try:
            logger.info(
                f"[REPLY->CUSTOMER] user={user_id} acc={acc_id} type={res.msg_type}\n{content}"
            )
            await self.ws_client.send(
                customer_id=user_id,
                acc_id=acc_id,
                acc_type=acc_type,
                content=content,
                msg_type=res.msg_type,
            )
        except Exception as e:
            logger.error(f"[QianniuAdapter] 发送失败: {e}")

    def _extract_urls(self, text: str) -> List[str]:
        """Return the distinct image URLs found in *text*, in order.

        A URL qualifies when it contains one of the known image extensions
        and does not look like a fragment of an embedded card/JSON payload.
        """
        if not text:
            return []
        image_exts = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp")
        junk_markers = ("%22title%22", "%22topic%22", '"title":', '"topic":', "%7d")
        candidates = re.findall(r'https?://[^\s#,"\'}\]]+', text)
        urls: List[str] = []
        seen = set()
        for candidate in candidates:
            # Drop trailing punctuation picked up from surrounding prose.
            url = candidate.strip().rstrip('\'".,;:!?)')
            lower = url.lower()
            if not any(ext in lower for ext in image_exts):
                continue
            # Skip pseudo image links polluted by card/JSON text.
            if any(marker in lower for marker in junk_markers):
                continue
            if url in seen:
                continue
            seen.add(url)
            urls.append(url)
        return urls

    @staticmethod
    def _safe_int(value: Any, default: int = 0) -> int:
        """Return ``int(value)``, or *default* when the conversion fails."""
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            # None / non-numeric text / NaN / infinity.
            return default

    def _extract_inbound_image_urls(self, raw: dict, msg_text: str) -> List[str]:
        """Collect distinct image URLs from the message text and the raw payload.

        URLs found in *msg_text* come first, followed by those found anywhere
        inside *raw*; first occurrence wins.
        """
        # Both sources already pass through `_extract_urls`, so every entry
        # is a validated, non-empty URL; only cross-source dedupe remains.
        merged = [
            *self._extract_urls(msg_text),
            *self._find_image_urls_in_obj(raw),
        ]
        return list(dict.fromkeys(merged))

    def _find_image_urls_in_obj(self, obj: Any) -> List[str]:
        """Recursively gather image URLs from every string inside *obj*.

        Walks dict values and list/tuple/set items; other value types are
        ignored. The result may contain duplicates across different strings.
        """
        found: List[str] = []

        def walk(val: Any) -> None:
            if val is None:
                return
            if isinstance(val, str):
                found.extend(self._extract_urls(val))
                return
            if isinstance(val, dict):
                for item in val.values():
                    walk(item)
                return
            if isinstance(val, (list, tuple, set)):
                for item in val:
                    walk(item)

        walk(obj)
        return found