diff --git a/qingjian_cs/app/client.py b/qingjian_cs/app/client.py index 3240818..6b5d325 100644 --- a/qingjian_cs/app/client.py +++ b/qingjian_cs/app/client.py @@ -26,6 +26,8 @@ from .rules import extract_image_urls, prefilter_message from .runtime_switch import is_listen_only from .store import ConversationStore from .transfer_flow import transfer_to_human_flow +from .email_push import push_chat_to_email +from .wechat_push import push_chat_to_wechat class QingjianClient: @@ -144,15 +146,15 @@ class QingjianClient: await self.websocket.send(json.dumps(message, ensure_ascii=False)) self.logger.info("[发送] %s", message.get("msg", "")) - async def send_reply(self, data: dict, text: str, trace_id: str = "-", turn_version: int | None = None) -> None: + async def send_reply(self, data: dict, text: str, trace_id: str = "-", turn_version: int | None = None) -> bool: text = str(text or "").strip() if not text: - return + return False text = self._shorten_reply(text) key = self._customer_key(data) if turn_version is not None and self.turn_versions.get(key, 0) != turn_version: activity_event(self.logger, "send_reply_skipped", trace_id=trace_id, customer_id=data.get("from_id", "-"), reason="stale_turn") - return + return False msg = { "msg_id": "", "acc_id": data.get("acc_id", ""), @@ -185,6 +187,47 @@ class QingjianClient: if len(self.recent_outbound) > 200: self.recent_outbound = self.recent_outbound[-200:] activity_event(self.logger, "send_reply_success", trace_id=trace_id, customer_id=data.get("from_id", "-"), msg=text) + return True + + async def _push_wechat_pair(self, data: dict, customer_msg: str, reply_msg: str, recent_dialogue: list[dict] | None = None) -> None: + try: + ok, reason = await push_chat_to_wechat( + customer_name=str(data.get("from_name", "") or data.get("cy_name", "") or ""), + customer_id=str(data.get("from_id", "") or ""), + acc_id=str(data.get("acc_id", "") or ""), + customer_msg=str(customer_msg or ""), + reply_msg=str(reply_msg or ""), + goods_name=str(data.get("goods_name", "") or ""), + recent_dialogue=recent_dialogue or [], + ) + activity_event( + self.logger, + "wechat_push", + customer_id=data.get("from_id", "-"), + result="ok" if ok else "skip", + reason=reason, + ) + except Exception as e: + activity_event(self.logger, "wechat_push", customer_id=data.get("from_id", "-"), result="error", reason=str(e)) + try: + ok, reason = await push_chat_to_email( + customer_name=str(data.get("from_name", "") or data.get("cy_name", "") or ""), + customer_id=str(data.get("from_id", "") or ""), + acc_id=str(data.get("acc_id", "") or ""), + customer_msg=str(customer_msg or ""), + reply_msg=str(reply_msg or ""), + goods_name=str(data.get("goods_name", "") or ""), + recent_dialogue=recent_dialogue or [], + ) + activity_event( + self.logger, + "email_push", + customer_id=data.get("from_id", "-"), + result="ok" if ok else "skip", + reason=reason, + ) + except Exception as e: + activity_event(self.logger, "email_push", customer_id=data.get("from_id", "-"), result="error", reason=str(e)) async def send_image(self, data: dict, image_url: str, trace_id: str = "-", turn_version: int | None = None) -> None: image_url = str(image_url or "").strip() @@ -387,8 +430,10 @@ class QingjianClient: ok_transfer, reason = await transfer_to_human_flow(self, data, transfer_msg=text, trace_id=trace_id) if not ok_transfer: self.logger.error("[转人工] 指令失败: %s", reason) - await self.send_reply(data, text, trace_id=trace_id, turn_version=turn_version) - self.last_reply_key[key] = text + sent = await self.send_reply(data, text, trace_id=trace_id, turn_version=turn_version) + if sent: + self.last_reply_key[key] = text + await self._push_wechat_pair(data, merged_msg, text, recent_dialogue=recent_dialogue[-12:]) await post_tianwang_callback("message_processed", data, extra={"trace_id": trace_id, "route": route, "action": "transfer", "reply": text}) return @@ -442,8 +487,10 @@ class QingjianClient: ok_transfer, reason = await transfer_to_human_flow(self, data, transfer_msg=tmsg, trace_id=trace_id) if not ok_transfer: self.logger.error("[转人工] 指令失败: %s", reason) - await self.send_reply(data, tmsg, trace_id=trace_id, turn_version=turn_version) - self.last_reply_key[key] = tmsg + sent = await self.send_reply(data, tmsg, trace_id=trace_id, turn_version=turn_version) + if sent: + self.last_reply_key[key] = tmsg + await self._push_wechat_pair(data, merged_msg, tmsg, recent_dialogue=recent_dialogue[-12:]) await post_tianwang_callback( "message_processed", data, @@ -457,8 +504,10 @@ class QingjianClient: if self._is_invalid_ai_reply(text): text = self._fallback_reply("quote") if self.last_reply_key.get(key) != text: - await self.send_reply(data, text, trace_id=trace_id, turn_version=turn_version) - self.last_reply_key[key] = text + sent = await self.send_reply(data, text, trace_id=trace_id, turn_version=turn_version) + if sent: + self.last_reply_key[key] = text + await self._push_wechat_pair(data, merged_msg, text, recent_dialogue=recent_dialogue[-12:]) await post_tianwang_callback("message_processed", data, extra={"trace_id": trace_id, "route": route, "action": "quote", "reply": text}) return @@ -470,8 +519,10 @@ class QingjianClient: if self._is_invalid_ai_reply(text): text = self._fallback_reply("reply") if self.last_reply_key.get(key) != text: - await self.send_reply(data, text, trace_id=trace_id, turn_version=turn_version) - self.last_reply_key[key] = text + sent = await self.send_reply(data, text, trace_id=trace_id, turn_version=turn_version) + if sent: + self.last_reply_key[key] = text + await self._push_wechat_pair(data, merged_msg, text, recent_dialogue=recent_dialogue[-12:]) await post_tianwang_callback("message_processed", data, extra={"trace_id": trace_id, "route": route, "action": "reply", "reply": text}) @@ -632,8 +683,10 @@ class QingjianClient: # 硬编码:每个客户首条消息先快速回复“在的” if key not in self.first_msg_replied: - await self.send_reply(patched, "在的") - self.last_reply_key[key] = "在的" + sent = await self.send_reply(patched, "在的") + if sent: + self.last_reply_key[key] = "在的" + await self._push_wechat_pair(patched, patched["msg"], "在的", recent_dialogue=self.recent_dialogue.get(key, [])[-8:]) self.first_msg_replied.add(key) if msg_type == 1: diff --git a/qingjian_cs/app/config.py b/qingjian_cs/app/config.py index bc1bee1..670ec1b 100644 --- a/qingjian_cs/app/config.py +++ b/qingjian_cs/app/config.py @@ -32,6 +32,14 @@ MYSQL_TABLE_PREFIX = os.getenv("MYSQL_TABLE_PREFIX", "qjcs_").strip() HTTP_HOST = os.getenv("HTTP_HOST", "127.0.0.1").strip() HTTP_PORT = int(os.getenv("HTTP_PORT", "6060")) TIANWANG_CALLBACK_URL = os.getenv("TIANWANG_CALLBACK_URL", "http://139.199.3.75:18789/api/callback").strip() +WECHAT_WEBHOOK = os.getenv("WECHAT_WEBHOOK", "").strip() + +EMAIL_NOTIFY_ENABLED = os.getenv("EMAIL_NOTIFY_ENABLED", "0").strip() in {"1", "true", "True", "yes", "on"} +SMTP_HOST = os.getenv("SMTP_HOST", "").strip() +SMTP_PORT = int(os.getenv("SMTP_PORT", "465")) +SMTP_USER = os.getenv("SMTP_USER", "").strip() +SMTP_PASSWORD = os.getenv("SMTP_PASSWORD", "").strip() +SMTP_TO = os.getenv("SMTP_TO", "").strip() AUTO_DRAW_ENABLED = os.getenv("AUTO_DRAW_ENABLED", "1").strip() in {"1", "true", "True", "yes", "on"} AUTO_DRAW_ENDPOINT = os.getenv("AUTO_DRAW_ENDPOINT", "").strip() diff --git a/qingjian_cs/app/email_push.py b/qingjian_cs/app/email_push.py new file mode 100644 index 0000000..2130639 --- /dev/null +++ b/qingjian_cs/app/email_push.py @@ -0,0 +1,97 @@ +from __future__ import annotations + +import asyncio +import smtplib +import time +from datetime import datetime +from email.header import Header +from email.mime.text import MIMEText +from typing import Any + +from .config import ( + EMAIL_NOTIFY_ENABLED, + SMTP_HOST, + SMTP_PASSWORD, + SMTP_PORT, + SMTP_TO, + SMTP_USER, +) + +_last_push: dict[tuple[str, str], tuple[str, str, float]] = {} + + +def _truncate(text: str, max_len: int = 220) -> str: + t = str(text or "").strip() + if len(t) <= max_len: + return t + return f"{t[:max_len]}..." + + +def _send_mail(subject: str, body: str) -> tuple[bool, str]: + if not EMAIL_NOTIFY_ENABLED: + return False, "email_disabled" + if not SMTP_HOST or not SMTP_USER or not SMTP_PASSWORD or not SMTP_TO: + return False, "email_config_incomplete" + + try: + msg = MIMEText(body, "plain", "utf-8") + msg["Subject"] = Header(subject, "utf-8") + msg["From"] = f"{Header('千牛AI通知', 'utf-8').encode()} <{SMTP_USER}>" + msg["To"] = SMTP_TO + + if int(SMTP_PORT) == 465: + with smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT, timeout=10) as server: + server.login(SMTP_USER, SMTP_PASSWORD) + server.sendmail(SMTP_USER, [SMTP_TO], msg.as_string()) + else: + with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=10) as server: + server.starttls() + server.login(SMTP_USER, SMTP_PASSWORD) + server.sendmail(SMTP_USER, [SMTP_TO], msg.as_string()) + return True, "ok" + except Exception as e: + return False, str(e) + + +async def push_chat_to_email( + *, + customer_name: str, + customer_id: str, + acc_id: str, + customer_msg: str, + reply_msg: str, + goods_name: str = "", + recent_dialogue: list[dict[str, Any]] | None = None, +) -> tuple[bool, str]: + if not EMAIL_NOTIFY_ENABLED: + return False, "email_disabled" + + key = (str(customer_id or ""), str(acc_id or "")) + now = time.time() + last = _last_push.get(key) + cm = str(customer_msg or "") + rm = str(reply_msg or "") + if last: + lcm, lrm, lts = last + if lcm == cm and lrm == rm and (now - lts) < 30: + return False, "dedup_30s" + _last_push[key] = (cm, rm, now) + + ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + subject = f"[千牛AI]{acc_id or '店铺'}-{customer_name or customer_id or '客户'}" + lines: list[str] = [f"时间: {ts}"] + lines.append(f"店铺: {acc_id or '未知店铺'}") + lines.append(f"客户: {customer_name or '-'} ({customer_id or '-'})") + if goods_name: + lines.append(f"商品: {_truncate(goods_name, 120)}") + lines.append("") + for item in (recent_dialogue or [])[-8:]: + role = "客户" if str(item.get("role", "")) == "user" else "客服" + txt = _truncate(str(item.get("text", "") or ""), 180) + if txt: + lines.append(f"{role}: {txt}") + lines.append(f"客户: {_truncate(cm, 220)}") + lines.append(f"客服: {_truncate(rm, 220)}") + body = "\n".join(lines) + return await asyncio.to_thread(_send_mail, subject, body) +