import re import logging import json from pathlib import Path from typing import List, Tuple from core.adapters.base import BaseAdapter from core.schema import StandardMessage, StandardResponse logger = logging.getLogger("cs_agent") class QianniuAdapter(BaseAdapter): """ 千牛适配器:支持识别消息来源(客户 vs 商家人工)。 """ def __init__(self, ws_client=None): self.ws_client = ws_client self._default_group_id = "20252916034" def platform_id(self) -> str: return "qianniu" def _resolve_group_id(self, acc_id: str) -> str: try: config_path = Path("config/transfer_groups.json") if config_path.exists(): with open(config_path, "r", encoding="utf-8") as f: cfg = json.load(f) return cfg.get(acc_id, self._default_group_id) except Exception as e: logger.warning(f"[QianniuAdapter] 读取转接分组配置失败: {e}") return self._default_group_id async def translate_inbound(self, raw: dict) -> Tuple[StandardMessage, str]: """ 返回: (标准消息, 消息方向) direction: 'in' (客户发给商家), 'out' (商家人工在后台回复) """ if not isinstance(raw, dict): raw = {} acc_id = str(raw.get("acc_id") or raw.get("shop_id") or "") from_id = str(raw.get("from_id") or raw.get("cy_id") or "") msg_text = str(raw.get("msg") or raw.get("content") or "") # 判断方向:如果 from_id 包含了店铺名或 acc_id,通常说明是商家自己在说话 # 或者逆向接口通常有一个特定的标识,这里我们做一个通用的逻辑判断 direction = "in" user_id = from_id # 逻辑:如果发送者 ID 等于 店铺 ID,说明是【商家人工回复】 if from_id == acc_id and acc_id != "": direction = "out" # 此时 cy_id (客户ID) 通常在另一个字段里 user_id = str(raw.get("cy_id") or "") msg = StandardMessage( platform=self.platform_id(), msg_id=str(raw.get("msg_id", "")), user_id=user_id, user_name=str(raw.get("from_name", "")), content=msg_text, image_urls=self._extract_urls(msg_text), acc_id=acc_id, acc_type=str(raw.get("acc_type") or "AliWorkbench"), raw_data=raw ) return msg, direction async def translate_outbound(self, res: StandardResponse, user_id: str): if not self.ws_client: return if not res or (not res.should_reply and not res.need_transfer): return meta = res.metadata if isinstance(res.metadata, dict) else {} acc_id = meta.get("acc_id", "") acc_type = meta.get("acc_type", "AliWorkbench") if "[转移会话]" in res.reply_content: content = res.reply_content elif res.need_transfer: group_id = self._resolve_group_id(acc_id) content = f"正在为您转接|[转移会话],分组{group_id},无原因" else: content = res.reply_content try: logger.info( f"[REPLY->CUSTOMER] user={user_id} acc={acc_id} type={res.msg_type}\n{content}" ) await self.ws_client.send(customer_id=user_id, acc_id=acc_id, acc_type=acc_type, content=content, msg_type=res.msg_type) except Exception as e: logger.error(f"[QianniuAdapter] 发送失败: {e}") def _extract_urls(self, text: str) -> List[str]: if not text: return [] image_exts = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp") candidates = re.findall(r'https?://[^\s#]+', text) return [u for u in candidates if any(ext in u.lower() for ext in image_exts)]