Files
tw/core/adapters/qianniu_adapter.py

209 lines
7.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import re
import logging
import json
from pathlib import Path
from typing import List, Tuple, Any
from core.adapters.base import BaseAdapter
from core.schema import StandardMessage, StandardResponse
# Module logger, registered under the fixed name "cs_agent".
logger = logging.getLogger("cs_agent")
# Literal substrings checked by QianniuAdapter._sanitize_outbound_text: if any
# of them occurs in outbound text, the whole text is replaced by a canned reply.
_OUTBOUND_BLOCK_MARKERS = (
"【历史记录摘要】",
"【详细记录】",
"【订单摘要】",
"【订单详情】",
"<think",
"think_never_used",
'[{"name":',
)
# Regular expressions checked (with re.search) by
# QianniuAdapter._sanitize_outbound_text after the literal markers above; a
# match on any of them also replaces the outbound text with the canned reply.
# NOTE(review): the character classes written as [:] contain only the ASCII
# colon; confirm whether the full-width colon was meant to be matched as well.
_HISTORY_LEAK_PATTERNS = [
# "[YYYY-MM-DD ...]" bracketed stamp followed by a speaker label and a colon.
r'\[\d{4}-\d{2}-\d{2}[^\]]*\]\s*(客户|客服)[:]',
# "[HH:MM:SS]" bracketed stamp followed by a speaker label and a colon.
r'\[\d{2}:\d{2}:\d{2}\]\s*(客户|客服|我)[:]',
# Phrases that talk about consulting the history/chat/conversation records.
r'(根据|查看|查询|翻看)(历史|聊天|对话)(记录|内容)',
r'历史(记录|对话|消息)(显示|表明|中)',
r'之前的(聊天|对话|记录)(中|里|显示)',
# A number followed by a "messages" counter word.
r'\d+条(历史|对话)?消息',
# An order-number label followed by a run of 10 or more digits.
r'订单号[:]\s*\d{10,}',
# Two "status/amount/quantity:" labels occurring in the same line.
r'(状态|金额|数量)[:].*(状态|金额|数量)[:]',
]
class QianniuAdapter(BaseAdapter):
    """Adapter for the Qianniu platform.

    Converts raw Qianniu payloads into ``StandardMessage`` objects and sends
    ``StandardResponse`` replies through a websocket client. Inbound
    translation also reports who produced the message: the customer or the
    merchant's human agent.
    """

    # Marker embedded in session-transfer messages; text containing it is
    # passed through the outbound sanitiser unchanged.
    _TRANSFER_MARKER = "[转移会话]"
    # Canned reply substituted for outbound text that looks like leaked
    # internal content.
    _SAFE_FALLBACK_REPLY = "我在帮你看记录,稍等哈"

    def __init__(self, ws_client=None):
        """Store the outbound client.

        Args:
            ws_client: Object whose awaitable ``send(...)`` delivers replies.
                When falsy, ``translate_outbound`` does nothing.
        """
        self.ws_client = ws_client
        # Transfer group used when the config file has no usable entry.
        self._default_group_id = "20252916034"

    def platform_id(self) -> str:
        """Return the identifier of this platform."""
        return "qianniu"

    def _resolve_group_id(self, acc_id: str) -> str:
        """Return the transfer group configured for ``acc_id``.

        Reads ``config/transfer_groups.json`` on every call. Falls back to the
        default group when the file is missing, unreadable, not a mapping, or
        when the entry for ``acc_id`` is absent, null or empty.
        """
        config_path = Path("config/transfer_groups.json")
        try:
            if config_path.exists():
                with open(config_path, "r", encoding="utf-8") as f:
                    cfg = json.load(f)
                group_id = cfg.get(acc_id)
                # A null/empty entry would otherwise end up verbatim in the
                # transfer command; non-string ids are normalised to str.
                if group_id is not None and group_id != "":
                    return str(group_id)
        except Exception as e:
            # Best effort: any config problem degrades to the default group.
            logger.warning(f"[QianniuAdapter] 读取转接分组配置失败: {e}")
        return self._default_group_id

    @staticmethod
    def _text_or_empty(value: Any) -> str:
        """Return ``str(value)``, mapping ``None`` to an empty string."""
        return "" if value is None else str(value)

    @staticmethod
    def _sanitize_outbound_text(content: str) -> str:
        """Return ``content`` made safe to send to a customer.

        Falsy input yields ``""``. Text containing the transfer marker is
        returned stripped but otherwise untouched. Text containing a blocked
        marker or matching a history-leak pattern is replaced by the canned
        fallback reply. Anything else is returned stripped.
        """
        if not content:
            return ""
        cleaned = str(content).strip()
        if QianniuAdapter._TRANSFER_MARKER in cleaned:
            return cleaned
        if any(marker in cleaned for marker in _OUTBOUND_BLOCK_MARKERS):
            logger.warning("[QianniuAdapter] 拦截到内部内容外发,替换为安全兜底回复")
            return QianniuAdapter._SAFE_FALLBACK_REPLY
        for pattern in _HISTORY_LEAK_PATTERNS:
            if re.search(pattern, cleaned):
                logger.warning(f"[QianniuAdapter] 检测到历史记录泄露模式: {pattern[:30]}...")
                return QianniuAdapter._SAFE_FALLBACK_REPLY
        return cleaned

    async def translate_inbound(self, raw: dict) -> Tuple[StandardMessage, str]:
        """Convert a raw payload into a ``StandardMessage`` plus its direction.

        Args:
            raw: Decoded payload; anything that is not a dict is treated as an
                empty payload.

        Returns:
            ``(message, direction)`` where direction is ``'in'`` (customer to
            merchant) or ``'out'`` (merchant's human agent replying).
        """
        if not isinstance(raw, dict):
            raw = {}
        acc_id = str(raw.get("acc_id") or raw.get("shop_id") or "")
        from_id = str(raw.get("from_id") or raw.get("cy_id") or "")
        msg_text = str(raw.get("msg") or raw.get("content") or "")
        raw_msg_type = self._safe_int(raw.get("msg_type"), 0)
        # URLs are collected before the placeholder below replaces empty text.
        image_urls = self._extract_inbound_image_urls(raw, msg_text)
        if raw_msg_type == 1 and not msg_text.strip():
            # Type-1 message without text: substitute an image placeholder.
            msg_text = "【系统:已收到图片消息】"

        # A sender id equal to the (non-empty) shop id means the merchant's
        # human agent wrote the message; the customer id is then taken from
        # the "cy_id" field.
        if acc_id != "" and from_id == acc_id:
            direction = "out"
            user_id = str(raw.get("cy_id") or "")
        else:
            direction = "in"
            user_id = from_id

        msg = StandardMessage(
            platform=self.platform_id(),
            # An explicit null must become "", not the literal string "None".
            msg_id=self._text_or_empty(raw.get("msg_id")),
            user_id=user_id,
            user_name=self._text_or_empty(raw.get("from_name")),
            content=msg_text,
            msg_type=raw_msg_type,
            image_urls=image_urls,
            acc_id=acc_id,
            acc_type=str(raw.get("acc_type") or "AliWorkbench"),
            raw_data=raw,
        )
        return msg, direction

    async def translate_outbound(self, res: StandardResponse, user_id: str) -> None:
        """Send ``res`` to ``user_id`` through the websocket client.

        Nothing is sent when there is no client, no response, or the response
        neither wants a reply nor needs a transfer. Text replies
        (``msg_type == 0``) are sanitised first. Send failures are logged, not
        raised.
        """
        if not self.ws_client:
            return
        if not res or (not res.should_reply and not res.need_transfer):
            return

        meta = res.metadata if isinstance(res.metadata, dict) else {}
        acc_id = meta.get("acc_id", "")
        acc_type = meta.get("acc_type", "AliWorkbench")

        # A missing reply (None) must not break the membership test below.
        reply_content = "" if res.reply_content is None else res.reply_content
        if self._TRANSFER_MARKER in reply_content:
            # The reply already carries a transfer command; send it as is.
            content = reply_content
        elif res.need_transfer:
            group_id = self._resolve_group_id(acc_id)
            content = f"正在为您转接|[转移会话],分组{group_id},无原因"
        else:
            content = reply_content

        if res.msg_type == 0:
            content = self._sanitize_outbound_text(content)

        try:
            logger.info(
                f"[REPLY->CUSTOMER] user={user_id} acc={acc_id} type={res.msg_type}\n{content}"
            )
            await self.ws_client.send(
                customer_id=user_id,
                acc_id=acc_id,
                acc_type=acc_type,
                content=content,
                msg_type=res.msg_type,
            )
        except Exception as e:
            logger.error(f"[QianniuAdapter] 发送失败: {e}")

    def _extract_urls(self, text: str) -> List[str]:
        """Return the distinct image-looking http(s) URLs in ``text``, in order.

        A URL qualifies when it contains a known image extension anywhere in
        it and shows no sign of swallowed card/JSON fragments.
        """
        if not text:
            return []
        image_exts = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp")
        # Fragments indicating the match ran into card/JSON text.
        polluted_markers = ("%22title%22", "%22topic%22", '"title":', '"topic":', "%7d")
        urls: List[str] = []
        seen = set()
        for candidate in re.findall(r'https?://[^\s#,"\'}\]]+', text):
            # Drop trailing punctuation that belongs to the surrounding text.
            url = str(candidate or "").strip().rstrip('\'".,;:!?)')
            lower = url.lower()
            if not any(ext in lower for ext in image_exts):
                continue
            if any(marker in lower for marker in polluted_markers):
                continue
            if url in seen:
                continue
            seen.add(url)
            urls.append(url)
        return urls

    @staticmethod
    def _safe_int(value: Any, default: int = 0) -> int:
        """Return ``int(value)``, or ``default`` when the conversion fails."""
        try:
            return int(value)
        except Exception:
            return default

    def _extract_inbound_image_urls(self, raw: dict, msg_text: str) -> List[str]:
        """Return the distinct image URLs of an inbound message.

        URLs found in ``msg_text`` come first, followed by those found in any
        string nested inside ``raw``; first occurrence wins.
        """
        # Both sources are already filtered by _extract_urls, so only an
        # order-preserving de-duplication is needed here.
        candidates = [
            *self._extract_urls(msg_text),
            *self._find_image_urls_in_obj(raw),
        ]
        return list(dict.fromkeys(candidates))

    def _find_image_urls_in_obj(self, obj: Any) -> List[str]:
        """Collect image URLs from every string nested inside ``obj``.

        Walks dict values and list/tuple/set items depth-first; other value
        types are ignored. The result may contain duplicates.
        """
        found: List[str] = []

        def walk(val: Any) -> None:
            if val is None:
                return
            if isinstance(val, str):
                found.extend(self._extract_urls(val))
            elif isinstance(val, dict):
                for item in val.values():
                    walk(item)
            elif isinstance(val, (list, tuple, set)):
                for item in val:
                    walk(item)

        walk(obj)
        return found