# -*- coding: utf-8 -*- """ 客服对话推送到企业微信群 - 客户消息与AI回复成对发送,保持上下文 """ import asyncio import os from datetime import datetime import httpx from dotenv import load_dotenv load_dotenv() _last_push: dict[tuple[str, str], tuple[str, str, float]] = {} def _get_webhook() -> str: """优先从 config 读取,与健康检查/日报保持一致""" try: from config.config import WECHAT_WEBHOOK return WECHAT_WEBHOOK or os.getenv("WECHAT_WEBHOOK", "") except Exception: return os.getenv("WECHAT_WEBHOOK", "") def _truncate(text: str, max_len: int = 200) -> str: """截断过长内容""" if not text: return "" text = str(text).strip() if len(text) > max_len: return text[:max_len] + "..." return text def _get_recent_conversation(customer_id: str, acc_id: str, last_n: int = 8) -> list: """获取近期对话(同店铺),保持连贯上下文""" try: from db.chat_log_db import get_recent_conversation return get_recent_conversation(customer_id, acc_id, limit=last_n) except Exception: return [] async def push_chat_to_wechat( customer_name: str, customer_id: str, acc_id: str, customer_msg: str, reply_msg: str, goods_name: str = "", ): """ 将客户消息与AI回复推送到企业微信群,附带近期对话保持连贯。 """ webhook = _get_webhook() if not webhook: return # 去重:同一客户+店铺,若客户消息与回复完全相同且在窗口期内,则跳过 try: import time key = (customer_id or "", acc_id or "") now = time.time() last = _last_push.get(key) if last: last_customer_msg, last_reply_msg, last_ts = last if (last_customer_msg or "") == (customer_msg or "") and (last_reply_msg or "") == (reply_msg or ""): if now - last_ts < 30: return _last_push[key] = ((customer_msg or ""), (reply_msg or ""), now) except Exception: pass reply_msg = _truncate(reply_msg, 300) ts = datetime.now().strftime("%H:%M") shop = acc_id or "未知店铺" name = (customer_name or customer_id or "客户")[:12] lines = [f"**📩 {ts} | {shop}**"] if goods_name: lines.append(f"**商品** {_truncate(goods_name, 80)}") if customer_id: lines.append(f"**客户ID** {customer_id}") lines.append("") # 附带近期对话,保持连贯 recent = _get_recent_conversation(customer_id, acc_id, last_n=8) for m in recent: role = customer_id if m.get("direction") == "in" else "客服" msg = _truncate((m.get("message") or "").strip(), 120) if msg: lines.append(f"{role}:{msg}") # 当前回复(可能已在 recent 中有客户消息,客服回复是新的) lines.append(f"客服:{reply_msg or '(无回复)'}") content = "\n".join(lines) enc = content.encode("utf-8") if len(enc) > 3800: content = enc[:3750].decode("utf-8", errors="ignore") + "\n...(略)" try: async with httpx.AsyncClient(timeout=8) as client: resp = await client.post( webhook, json={"msgtype": "markdown", "markdown": {"content": content}}, ) data = resp.json() if data.get("errcode") == 0: pass # 成功静默 else: print(f"[WechatChatLog] 推送失败: {data}") except Exception as e: print(f"[WechatChatLog] 推送异常: {e}") async def send_morning_startup(): """每天早上8点发送客服启动消息到企微群""" webhook = _get_webhook() if not webhook: return ts = datetime.now().strftime("%Y-%m-%d %H:%M") content = f"**☀️ 客服已启动**\n{ts}" try: async with httpx.AsyncClient(timeout=8) as client: await client.post( webhook, json={"msgtype": "markdown", "markdown": {"content": content}}, ) print(f"[WechatChatLog] 早8点启动消息已发送") except Exception as e: print(f"[WechatChatLog] 启动消息发送失败: {e}") async def morning_startup_scheduler(): """每天 8:00 发送启动消息""" print("[WechatChatLog] 早8点启动消息定时任务已启动") sent_today = None while True: now = datetime.now() today = now.strftime("%Y-%m-%d") if now.hour == 8 and now.minute == 0 and sent_today != today: sent_today = today await send_morning_startup() await asyncio.sleep(30)