import json import random import re def to_chinese_text(text): """处理文本,安全地转换 unicode 转义。""" if not isinstance(text, str): return text if "\\u" not in text: return text try: return json.loads(f'"{text}"') except Exception: return text def is_transfer_msg(client, data: dict) -> bool: msg = to_chinese_text(data.get("msg", "")) return "转交给" in msg or "转接给" in msg def pick_transfer_greeting() -> str: choices = [ "在的亲,发图我看下", "在呢亲,有需求直接说", "我在的,您把要求发我", "在的哈,你说我这边看着处理", "在呢,图和需求发来我看看", ] return random.choice(choices) def is_shop_card(client, data: dict) -> bool: msg = to_chinese_text(data.get("msg", "")) return msg.startswith("[进店卡片]") or "我想咨询你们店的这个商品" in msg def extract_customer_text_from_shop_card_msg(client, msg: str) -> str: text = to_chinese_text(msg or "").strip() if not text: return "" parts = [p.strip() for p in text.split("#*#") if p and p.strip()] kept = [] for part in parts: if part.startswith("[进店卡片]") or "我想咨询你们店的这个商品" in part: continue kept.append(part) if kept: return " ".join(kept).strip() stripped = re.sub(r"\[进店卡片\][^\n\r]*", "", text).strip() stripped = stripped.replace("我想咨询你们店的这个商品", "").strip(",。,#* ") return stripped def has_chat_history(customer_id: str, acc_id: str = "") -> bool: if not customer_id: return False try: from db.chat_log_db import get_recent_conversation msgs = get_recent_conversation(customer_id, acc_id=acc_id, limit=1) return len(msgs) > 0 except Exception: return False def should_ignore(client, data: dict) -> bool: msg = to_chinese_text(data.get("msg", "")) ignore_patterns = [ "已转接", "接入会话", "结束会话", "会话已", "[系统消息]", "[系统通知]", ] for pattern in ignore_patterns: if pattern in msg: return True acc_id = data.get("acc_id", "") from_id = data.get("from_id", "") if acc_id and from_id and acc_id == from_id: return True return False def get_msg_type_name(msg_type): types = { 0: "文本", 1: "图片", 2: "视频", 3: "文件", } return types.get(msg_type, f"未知({msg_type})")