from __future__ import annotations

import asyncio
import time
from datetime import datetime
from typing import Any

import requests

from .config import WECHAT_WEBHOOK

# Seconds during which an identical (customer message, reply) pair for the
# same customer/shop key is not pushed again.
_DEDUP_WINDOW_S = 30
# Once the dedup cache holds more keys than this, expired entries are evicted
# so the cache cannot grow without bound in a long-running process.
_DEDUP_MAX_ENTRIES = 1024
# Messages whose UTF-8 encoding exceeds _CONTENT_MAX_BYTES are cut down to
# _CONTENT_KEEP_BYTES and suffixed with an ellipsis marker.
_CONTENT_MAX_BYTES = 3800
_CONTENT_KEEP_BYTES = 3700

# (customer_id, acc_id) -> (customer_msg, reply_msg, time.time() of last push)
_last_push: dict[tuple[str, str], tuple[str, str, float]] = {}


def _truncate(text: str, max_len: int = 220) -> str:
    """Return *text* stripped, cut to *max_len* characters plus "..." if longer.

    Falsy input (``None``, ``""``) yields an empty string.
    """
    t = str(text or "").strip()
    if len(t) <= max_len:
        return t
    return f"{t[:max_len]}..."


def _post_markdown(content: str, timeout_s: int = 6) -> tuple[bool, str]:
    """POST *content* as a markdown message to ``WECHAT_WEBHOOK``.

    Never raises: every failure is reported through the return value.

    Returns:
        ``(True, "ok")`` on success, otherwise ``(False, reason)`` where
        *reason* is ``"no_webhook"``, ``"http_<status>"``,
        ``"errcode_<code>[:<errmsg>]"`` or the text of the raised exception.
    """
    if not WECHAT_WEBHOOK:
        return False, "no_webhook"

    payload = {"msgtype": "markdown", "markdown": {"content": content}}
    try:
        resp = requests.post(WECHAT_WEBHOOK, json=payload, timeout=timeout_s)
    except Exception as e:  # best-effort notifier: report, do not propagate
        return False, str(e)

    if resp.status_code != 200:
        return False, f"http_{resp.status_code}"

    # The webhook can answer HTTP 200 while rejecting the message; the
    # rejection is then only visible as a non-zero "errcode" in the JSON body.
    try:
        body = resp.json()
    except ValueError:
        # Not JSON: nothing more to inspect, keep treating HTTP 200 as success.
        return True, "ok"

    if isinstance(body, dict):
        errcode = body.get("errcode")
        if errcode not in (None, 0):
            errmsg = str(body.get("errmsg") or "")
            reason = f"errcode_{errcode}"
            return False, f"{reason}:{errmsg}" if errmsg else reason
    return True, "ok"


def _prune_last_push(now: float) -> None:
    """Evict expired dedup entries once the cache exceeds its size threshold.

    Only entries at least ``_DEDUP_WINDOW_S`` old are removed; those can no
    longer match the dedup check, so eviction never changes a push decision.
    """
    if len(_last_push) <= _DEDUP_MAX_ENTRIES:
        return
    expired = [
        key
        for key, (_, _, pushed_at) in _last_push.items()
        if now - pushed_at >= _DEDUP_WINDOW_S
    ]
    for key in expired:
        del _last_push[key]


def _build_content(
    *,
    customer_name: str,
    customer_id: str,
    acc_id: str,
    customer_msg: str,
    reply_msg: str,
    goods_name: str,
    recent_dialogue: list[dict[str, Any]] | None,
) -> str:
    """Render the markdown body of a push message, clipped to the byte limit."""
    ts = datetime.now().strftime("%H:%M")
    shop = acc_id or "未知店铺"
    lines: list[str] = [f"**📩 {ts} | {shop}**"]
    if goods_name:
        lines.append(f"**商品** {_truncate(goods_name, 80)}")
    lines.append(
        f"**客户** {(customer_name or customer_id or '客户')[:20]} ({customer_id or '-'})"
    )
    lines.append("")

    # Only the last 8 dialogue turns are included; empty turns are skipped.
    for item in (recent_dialogue or [])[-8:]:
        role = "客户" if str(item.get("role", "")) == "user" else "客服"
        txt = _truncate(str(item.get("text", "") or ""), 120)
        if txt:
            lines.append(f"{role}:{txt}")

    lines.append(f"客户:{_truncate(customer_msg, 120)}")
    lines.append(f"客服:{_truncate(reply_msg, 180)}")

    content = "\n".join(lines)
    encoded = content.encode("utf-8")
    if len(encoded) > _CONTENT_MAX_BYTES:
        # Cut on bytes; errors="ignore" drops a multi-byte character that the
        # cut may have split in half.
        clipped = encoded[:_CONTENT_KEEP_BYTES].decode("utf-8", errors="ignore")
        content = clipped + "\n...(略)"
    return content


async def push_chat_to_wechat(
    *,
    customer_name: str,
    customer_id: str,
    acc_id: str,
    customer_msg: str,
    reply_msg: str,
    goods_name: str = "",
    recent_dialogue: list[dict[str, Any]] | None = None,
) -> tuple[bool, str]:
    """Push one customer/reply exchange to the WeChat webhook as markdown.

    An exchange identical to the previous one for the same
    ``(customer_id, acc_id)`` pair is suppressed for ``_DEDUP_WINDOW_S``
    seconds. The blocking HTTP call runs in a worker thread via
    ``asyncio.to_thread``.

    Args:
        customer_name: Display name; falls back to *customer_id*, then "客户".
        customer_id: Customer identifier, part of the dedup key.
        acc_id: Shop/account identifier, part of the dedup key.
        customer_msg: Latest customer message.
        reply_msg: Reply sent to the customer.
        goods_name: Optional product name shown in the header.
        recent_dialogue: Optional earlier turns; each item is read through
            its ``"role"`` and ``"text"`` keys, and ``role == "user"`` is
            rendered as the customer.

    Returns:
        ``(True, "ok")`` when the message was sent, otherwise
        ``(False, reason)`` with *reason* being ``"no_webhook"``,
        ``"dedup_30s"`` or a failure reason from ``_post_markdown``.
    """
    if not WECHAT_WEBHOOK:
        return False, "no_webhook"

    key = (str(customer_id or ""), str(acc_id or ""))
    now = time.time()
    cm = str(customer_msg or "")
    rm = str(reply_msg or "")

    last = _last_push.get(key)
    if last is not None:
        last_cm, last_rm, last_ts = last
        if last_cm == cm and last_rm == rm and (now - last_ts) < _DEDUP_WINDOW_S:
            return False, "dedup_30s"

    _prune_last_push(now)
    # Recorded before the send is awaited, so an identical push arriving
    # while this one is still in flight is suppressed as well.
    _last_push[key] = (cm, rm, now)

    content = _build_content(
        customer_name=customer_name,
        customer_id=customer_id,
        acc_id=acc_id,
        customer_msg=cm,
        reply_msg=rm,
        goods_name=goods_name,
        recent_dialogue=recent_dialogue,
    )
    return await asyncio.to_thread(_post_markdown, content)