Files
tw2/qingjian_cs/app/client.py
jimi 31c74e661e
Some checks failed
Pre-commit / run (ubuntu-latest) (push) Has been cancelled
Deploy Sphinx documentation to Pages / build_en (ubuntu-latest, 3.10) (push) Has been cancelled
Deploy Sphinx documentation to Pages / build_zh (ubuntu-latest, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (macos-15, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (macos-15, 3.11) (push) Has been cancelled
Python Unittest Coverage / test (macos-15, 3.12) (push) Has been cancelled
Python Unittest Coverage / test (ubuntu-latest, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (ubuntu-latest, 3.11) (push) Has been cancelled
Python Unittest Coverage / test (ubuntu-latest, 3.12) (push) Has been cancelled
Python Unittest Coverage / test (windows-latest, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (windows-latest, 3.11) (push) Has been cancelled
Python Unittest Coverage / test (windows-latest, 3.12) (push) Has been cancelled
feat: add email notification channel for chat replies
2026-03-03 14:24:58 +08:00

713 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import json
import re
import time
from collections import defaultdict
from datetime import datetime
from contextlib import suppress
import websockets
from .callbacks import post_tianwang_callback
from .auto_draw import auto_draw_preview
from .config import (
AUTO_DRAW_ENABLED,
AUTO_QUOTE_WAIT_SECONDS,
DECISION_TIMEOUT_SECONDS,
IMAGE_MESSAGE_DEBOUNCE_SECONDS,
MAX_CONCURRENT_TURNS,
MESSAGE_DEBOUNCE_SECONDS,
QINGJIAN_WS_URI,
)
from .logger import setup_logger
from .observability import activity_event, build_trace_id
from .orchestrator import Orchestrator
from .rules import extract_image_urls, prefilter_message
from .runtime_switch import is_listen_only
from .store import ConversationStore
from .transfer_flow import transfer_to_human_flow
from .email_push import push_chat_to_email
from .wechat_push import push_chat_to_wechat
class QingjianClient:
def __init__(self) -> None:
self.logger = setup_logger()
self.uri = QINGJIAN_WS_URI
self.reply_id = "tb001"
self.websocket = None
self.running = True
self.orchestrator = Orchestrator()
self.store = ConversationStore()
self.pending_msgs: dict[str, list[dict]] = defaultdict(list)
self.debounce_tasks: dict[str, asyncio.Task] = {}
self.processing_tasks: dict[str, asyncio.Task] = {}
self.turn_versions: dict[str, int] = defaultdict(int)
self.customer_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
self.turn_semaphore = asyncio.Semaphore(max(1, int(MAX_CONCURRENT_TURNS)))
self.pending_images: dict[str, list[str]] = defaultdict(list)
self.auto_quote_tasks: dict[str, asyncio.Task] = {}
self.last_reply_key: dict[str, str] = {}
self.first_msg_replied: set[str] = set()
self.recent_outbound: list[tuple[str, str, str, float]] = []
self.recent_dialogue: dict[str, list[dict]] = defaultdict(list)
@staticmethod
def _customer_key(data: dict) -> str:
return f"{data.get('acc_id','')}:{data.get('from_id','')}"
@staticmethod
def _msg_text(data: dict) -> str:
return str(data.get("msg", "") or "").strip()
@staticmethod
def _extract_price_tokens(text: str) -> list[str]:
s = str(text or "")
if not s:
return []
out: list[str] = []
out += re.findall(r"(?:¥|¥)\s*\d+(?:\.\d{1,2})?", s)
out += re.findall(r"\d+(?:\.\d{1,2})?\s*元", s)
# 去重保序
seen = set()
uniq: list[str] = []
for x in out:
k = x.strip()
if k and k not in seen:
seen.add(k)
uniq.append(k)
return uniq
@staticmethod
def _route_cn(route: str) -> str:
return {
"pre_sales": "售前",
"quote": "报价",
"after_sales": "售后",
"risk": "风控",
}.get(str(route or ""), "未知")
@staticmethod
def _status_text(route: str, action: str) -> str:
a = str(action or "")
if a == "quote":
return "开始作图中"
if a == "reply":
return "已回复客户"
if a == "transfer":
return "已转人工"
if a == "noop":
return "仅监听中"
if a == "update_state":
return "状态已更新"
return f"{QingjianClient._route_cn(route)}处理中"
def _append_dialogue(self, key: str, role: str, text: str) -> None:
t = str(text or "").strip()
if not t:
return
self.recent_dialogue[key].append({"role": role, "text": t})
if len(self.recent_dialogue[key]) > 24:
self.recent_dialogue[key] = self.recent_dialogue[key][-24:]
@staticmethod
def _parse_msg_ts(data: dict) -> float:
# 兼容常见时间字段解析失败时返回0后续按入队顺序兜底
for key in ("timestamp", "msg_time", "send_time", "create_time", "time"):
v = data.get(key)
if v is None:
continue
if isinstance(v, (int, float)):
return float(v)
s = str(v).strip()
if not s:
continue
# 纯数字时间戳
if re.fullmatch(r"\d{10,13}", s):
n = float(s)
return n / 1000.0 if len(s) == 13 else n
# 常见日期格式
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y/%m/%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y/%m/%d %H:%M"):
try:
return datetime.strptime(s, fmt).timestamp()
except Exception:
pass
return 0.0
def _debounce_seconds(self, msg: str) -> float:
if extract_image_urls(msg):
return float(IMAGE_MESSAGE_DEBOUNCE_SECONDS)
return float(MESSAGE_DEBOUNCE_SECONDS)
async def send_message(self, message: dict) -> None:
if not self.websocket:
return
await self.websocket.send(json.dumps(message, ensure_ascii=False))
self.logger.info("[发送] %s", message.get("msg", ""))
async def send_reply(self, data: dict, text: str, trace_id: str = "-", turn_version: int | None = None) -> bool:
text = str(text or "").strip()
if not text:
return False
text = self._shorten_reply(text)
key = self._customer_key(data)
if turn_version is not None and self.turn_versions.get(key, 0) != turn_version:
activity_event(self.logger, "send_reply_skipped", trace_id=trace_id, customer_id=data.get("from_id", "-"), reason="stale_turn")
return False
msg = {
"msg_id": "",
"acc_id": data.get("acc_id", ""),
"msg": text,
"from_id": data.get("from_id", ""),
"from_name": data.get("from_name", data.get("from_id", "")),
"cy_id": data.get("from_id", ""),
"acc_type": data.get("acc_type", "AliWorkbench"),
"msg_type": 0,
"cy_name": data.get("from_name", data.get("from_id", "")),
}
activity_event(self.logger, "send_reply_attempt", trace_id=trace_id, customer_id=data.get("from_id", "-"), msg=text)
await self.send_message(msg)
self._append_dialogue(key, "assistant", text)
try:
self.store.append_event(
key,
"assistant_message",
{
"acc_id": data.get("acc_id", ""),
"customer_id": data.get("from_id", ""),
"msg_type": 0,
"msg": text,
"trace_id": trace_id,
},
)
except Exception as e:
self.logger.error("[入库] 客服消息写入失败: %s", e)
self.recent_outbound.append((str(data.get("acc_id", "")), str(data.get("from_id", "")), text, time.monotonic()))
if len(self.recent_outbound) > 200:
self.recent_outbound = self.recent_outbound[-200:]
activity_event(self.logger, "send_reply_success", trace_id=trace_id, customer_id=data.get("from_id", "-"), msg=text)
return True
async def _push_wechat_pair(self, data: dict, customer_msg: str, reply_msg: str, recent_dialogue: list[dict] | None = None) -> None:
try:
ok, reason = await push_chat_to_wechat(
customer_name=str(data.get("from_name", "") or data.get("cy_name", "") or ""),
customer_id=str(data.get("from_id", "") or ""),
acc_id=str(data.get("acc_id", "") or ""),
customer_msg=str(customer_msg or ""),
reply_msg=str(reply_msg or ""),
goods_name=str(data.get("goods_name", "") or ""),
recent_dialogue=recent_dialogue or [],
)
activity_event(
self.logger,
"wechat_push",
customer_id=data.get("from_id", "-"),
result="ok" if ok else "skip",
reason=reason,
)
except Exception as e:
activity_event(self.logger, "wechat_push", customer_id=data.get("from_id", "-"), result="error", reason=str(e))
try:
ok, reason = await push_chat_to_email(
customer_name=str(data.get("from_name", "") or data.get("cy_name", "") or ""),
customer_id=str(data.get("from_id", "") or ""),
acc_id=str(data.get("acc_id", "") or ""),
customer_msg=str(customer_msg or ""),
reply_msg=str(reply_msg or ""),
goods_name=str(data.get("goods_name", "") or ""),
recent_dialogue=recent_dialogue or [],
)
activity_event(
self.logger,
"email_push",
customer_id=data.get("from_id", "-"),
result="ok" if ok else "skip",
reason=reason,
)
except Exception as e:
activity_event(self.logger, "email_push", customer_id=data.get("from_id", "-"), result="error", reason=str(e))
async def send_image(self, data: dict, image_url: str, trace_id: str = "-", turn_version: int | None = None) -> None:
image_url = str(image_url or "").strip()
if not image_url:
return
key = self._customer_key(data)
if turn_version is not None and self.turn_versions.get(key, 0) != turn_version:
activity_event(self.logger, "send_image_skipped", trace_id=trace_id, customer_id=data.get("from_id", "-"), reason="stale_turn")
return
msg = {
"msg_id": "",
"acc_id": data.get("acc_id", ""),
"msg": image_url,
"from_id": data.get("from_id", ""),
"from_name": data.get("from_name", data.get("from_id", "")),
"cy_id": data.get("from_id", ""),
"acc_type": data.get("acc_type", "AliWorkbench"),
"msg_type": 1,
"cy_name": data.get("from_name", data.get("from_id", "")),
}
activity_event(self.logger, "send_image_attempt", trace_id=trace_id, customer_id=data.get("from_id", "-"), msg=image_url)
await self.send_message(msg)
self._append_dialogue(key, "assistant", f"[image]{image_url}")
try:
self.store.append_event(
key,
"assistant_message",
{
"acc_id": data.get("acc_id", ""),
"customer_id": data.get("from_id", ""),
"msg_type": 1,
"msg": image_url,
"trace_id": trace_id,
},
)
except Exception as e:
self.logger.error("[入库] 客服图片消息写入失败: %s", e)
self.recent_outbound.append((str(data.get("acc_id", "")), str(data.get("from_id", "")), image_url, time.monotonic()))
if len(self.recent_outbound) > 200:
self.recent_outbound = self.recent_outbound[-200:]
activity_event(self.logger, "send_image_success", trace_id=trace_id, customer_id=data.get("from_id", "-"), msg=image_url)
@staticmethod
def _clean_text(text: str) -> str:
t = str(text or "").strip()
t = re.sub(r"\s+", "", t)
return t
def _shorten_reply(self, text: str) -> str:
t = str(text or "").strip()
t = self._humanize_reply(t)
# 只取首句,不做按字数硬截断,避免半句/残句
parts = re.split(r"[。!?!?]", t)
head = next((p.strip() for p in parts if p and p.strip()), "")
if not head:
# 无句号时按逗号切第一分句
sub_parts = re.split(r"[,;:]", t)
head = next((p.strip() for p in sub_parts if p and p.strip()), t)
return head or t
@staticmethod
def _humanize_reply(text: str) -> str:
t = str(text or "").strip()
# 去AI腔常见口癖
t = re.sub(r"^(亲亲|宝子|宝贝|您好呀|您好哦)[,]?\s*", "", t)
t = t.replace("我这边", "")
t = t.replace("请问", "")
t = t.replace("可以先帮您评估看看哦", "我先看下")
t = t.replace("服务质量有保障", "质量没问题")
t = t.replace("这个价格已经是很优惠的啦", "这价已经很低了")
t = re.sub(r"(哈~|哦~|呀~|啦~)$", "", t)
t = re.sub(r"\s+", "", t)
return t
@staticmethod
def _is_invalid_ai_reply(text: str) -> bool:
t = str(text or "").strip().lower()
if not t:
return True
if "i noticed that you have interrupted me" in t:
return True
if t.startswith("action:") or t.startswith("{"):
return True
return False
def _fallback_reply(self, action: str) -> str:
if action == "transfer":
return "我先给你转人工处理。"
return "我先看看"
def _is_outbound_echo(self, data: dict, msg: str) -> bool:
"""
轻简可能会把我方刚发送文本回推为“收到消息”。
对“短时间完全相同文本”做回环拦截,兼容 acc/from 对调回推,避免无限对话。
"""
in_acc = str(data.get("acc_id", ""))
in_from = str(data.get("from_id", ""))
in_msg = self._clean_text(msg)
now = time.monotonic()
if not in_msg:
return False
for out_acc, out_to, out_msg, ts in reversed(self.recent_outbound):
if (now - ts) > 120:
break
if self._clean_text(out_msg) != in_msg:
continue
if (out_acc == in_acc and out_to == in_from) or (out_acc == in_from and out_to == in_acc):
return True
return False
async def _handle_decision(self, data: dict, merged_msg: str, *, auto_quote: bool = False, turn_version: int | None = None) -> None:
if is_listen_only():
activity_event(
self.logger,
"ai_reply_skipped",
customer_id=data.get("from_id", "-"),
reason="listen_only_mode",
)
return
key = self._customer_key(data)
if turn_version is not None and self.turn_versions.get(key, 0) != turn_version:
activity_event(self.logger, "decision_skipped", customer_id=data.get("from_id", "-"), reason="stale_turn")
return
trace_id = build_trace_id(data.get("acc_id", ""), data.get("from_id", ""), merged_msg)
t0 = time.perf_counter()
urls = extract_image_urls(merged_msg)
if urls:
for u in urls:
if u not in self.pending_images[key]:
self.pending_images[key].append(u)
# 上下文优先从数据库回读,保证重启后也能恢复最近对话
try:
recent_dialogue = self.store.get_recent_dialogue(key, limit=24)
except Exception as e:
self.logger.error("[入库] 读取最近对话失败: %s", e)
recent_dialogue = self.recent_dialogue.get(key, [])
context = {
"customer_key": key,
"acc_id": data.get("acc_id", ""),
"customer_id": data.get("from_id", ""),
"goods_name": data.get("goods_name", ""),
"goods_order": data.get("goods_order", ""),
"msg": merged_msg,
"intent": "unknown",
"pending_images": len(self.pending_images[key]),
"pending_image_urls": self.pending_images[key][-5:],
"current_image_urls": urls[-3:],
"latest_image_url": (urls[-1] if urls else (self.pending_images[key][-1] if self.pending_images[key] else "")),
"auto_quote_trigger": auto_quote,
"last_reply": self.last_reply_key.get(key, ""),
"recent_dialogue": recent_dialogue[-12:],
}
try:
rd = context.get("recent_dialogue", []) or []
rd_preview = " | ".join(
[f"{str(x.get('role',''))}:{str(x.get('text',''))[:32]}" for x in rd[-8:] if isinstance(x, dict)]
)
self.logger.info(
"[AI上下文] customer=%s msg=%s pending_images=%s recent=%s",
context["customer_id"],
str(merged_msg or "")[:120],
context["pending_images"],
rd_preview,
)
except Exception:
pass
activity_event(self.logger, "agent_process_start", trace_id=trace_id, customer_id=context["customer_id"], acc_id=context["acc_id"], intent=context["intent"])
route, decision, state = await self.orchestrator.decide(context)
latency_ms = int((time.perf_counter() - t0) * 1000)
status_text = self._status_text(route, decision.action)
activity_event(
self.logger,
"agent_process_done",
trace_id=trace_id,
customer_id=context["customer_id"],
route=route,
route_cn=self._route_cn(route),
action=decision.action,
reason=status_text,
raw_reason=decision.reason,
latency_ms=latency_ms,
after_sales_stage=state.get("after_sales_stage", "new"),
)
# 价格日志(中文)
price_hits = []
price_hits.extend(self._extract_price_tokens(merged_msg))
price_hits.extend(self._extract_price_tokens(decision.reply))
price_hits.extend(self._extract_price_tokens(decision.reason))
if price_hits:
self.logger.info("[价格] 客户=%s 金额=%s", context["customer_id"], " | ".join(price_hits))
if decision.action == "transfer":
text = (decision.transfer_msg or "").strip()
if self._is_invalid_ai_reply(text):
text = self._fallback_reply("transfer")
ok_transfer, reason = await transfer_to_human_flow(self, data, transfer_msg=text, trace_id=trace_id)
if not ok_transfer:
self.logger.error("[转人工] 指令失败: %s", reason)
sent = await self.send_reply(data, text, trace_id=trace_id, turn_version=turn_version)
if sent:
self.last_reply_key[key] = text
await self._push_wechat_pair(data, merged_msg, text, recent_dialogue=recent_dialogue[-12:])
await post_tianwang_callback("message_processed", data, extra={"trace_id": trace_id, "route": route, "action": "transfer", "reply": text})
return
if decision.action == "quote":
analysis = state.get("last_image_quote_analysis", {}) if isinstance(state, dict) else {}
can_do = str((analysis or {}).get("can_do", "")).lower()
draw_allowed = can_do in {"", "yes", "partial"}
if AUTO_DRAW_ENABLED and draw_allowed and self.pending_images.get(key):
latest_image = self.pending_images[key][-1]
activity_event(
self.logger,
"auto_draw_start",
trace_id=trace_id,
customer_id=context["customer_id"],
image_url=latest_image,
)
self.logger.info("[作图] 开始 customer=%s", context["customer_id"])
draw_res = await auto_draw_preview(
image_url=latest_image,
customer_id=context["customer_id"],
requirement=merged_msg,
)
if draw_res.get("ok"):
preview_url = str(draw_res.get("url", "") or "")
await self.send_image(data, preview_url, trace_id=trace_id, turn_version=turn_version)
# 预览完成后清掉当前批次,避免同一图重复触发
self.pending_images[key].clear()
activity_event(
self.logger,
"auto_draw_success",
trace_id=trace_id,
customer_id=context["customer_id"],
preview_url=preview_url,
)
self.logger.info("[作图] 成功 customer=%s url=%s", context["customer_id"], preview_url)
await post_tianwang_callback(
"message_processed",
data,
extra={"trace_id": trace_id, "route": route, "action": "quote", "reply": "", "auto_draw": True},
)
return
activity_event(
self.logger,
"auto_draw_fail",
trace_id=trace_id,
customer_id=context["customer_id"],
error=str(draw_res.get("error", "unknown")),
)
if bool(draw_res.get("need_transfer")):
tmsg = "这个我转人工给你看下"
ok_transfer, reason = await transfer_to_human_flow(self, data, transfer_msg=tmsg, trace_id=trace_id)
if not ok_transfer:
self.logger.error("[转人工] 指令失败: %s", reason)
sent = await self.send_reply(data, tmsg, trace_id=trace_id, turn_version=turn_version)
if sent:
self.last_reply_key[key] = tmsg
await self._push_wechat_pair(data, merged_msg, tmsg, recent_dialogue=recent_dialogue[-12:])
await post_tianwang_callback(
"message_processed",
data,
extra={"trace_id": trace_id, "route": route, "action": "transfer", "reply": tmsg},
)
return
self.logger.error("[作图] 失败 customer=%s error=%s", context["customer_id"], draw_res.get("error", "unknown"))
elif AUTO_DRAW_ENABLED and (not draw_allowed):
self.logger.info("[作图] 已跳过: 识图结果不可做 can_do=%s customer=%s", can_do, context["customer_id"])
text = (decision.reply or "").strip()
if self._is_invalid_ai_reply(text):
text = self._fallback_reply("quote")
if self.last_reply_key.get(key) != text:
sent = await self.send_reply(data, text, trace_id=trace_id, turn_version=turn_version)
if sent:
self.last_reply_key[key] = text
await self._push_wechat_pair(data, merged_msg, text, recent_dialogue=recent_dialogue[-12:])
await post_tianwang_callback("message_processed", data, extra={"trace_id": trace_id, "route": route, "action": "quote", "reply": text})
return
if decision.action == "noop":
await post_tianwang_callback("message_processed", data, extra={"trace_id": trace_id, "route": route, "action": "noop", "reply": ""})
return
text = (decision.reply or "").strip()
if self._is_invalid_ai_reply(text):
text = self._fallback_reply("reply")
if self.last_reply_key.get(key) != text:
sent = await self.send_reply(data, text, trace_id=trace_id, turn_version=turn_version)
if sent:
self.last_reply_key[key] = text
await self._push_wechat_pair(data, merged_msg, text, recent_dialogue=recent_dialogue[-12:])
await post_tianwang_callback("message_processed", data, extra={"trace_id": trace_id, "route": route, "action": "reply", "reply": text})
if self.pending_images[key] and key not in self.auto_quote_tasks:
self.auto_quote_tasks[key] = asyncio.create_task(self._auto_quote_later(data))
async def _auto_quote_later(self, data: dict) -> None:
key = self._customer_key(data)
try:
await asyncio.sleep(AUTO_QUOTE_WAIT_SECONDS)
if self.pending_images.get(key):
# 自动报价也走并发控制与客户互斥
async with self.turn_semaphore:
async with self.customer_locks[key]:
await asyncio.wait_for(
self._handle_decision(data, "", auto_quote=True, turn_version=self.turn_versions.get(key, 0)),
timeout=max(10, int(DECISION_TIMEOUT_SECONDS)),
)
finally:
self.auto_quote_tasks.pop(key, None)
async def _flush_customer(self, key: str, turn_version: int | None = None) -> None:
queue = self.pending_msgs.get(key, [])
if not queue:
return
# 只处理当前快照,避免把处理中新增消息一并清掉
snapshot = list(queue)
if not snapshot:
return
indexed = list(enumerate(snapshot))
indexed.sort(key=lambda it: (self._parse_msg_ts(it[1]), it[0]))
ordered = [x for _, x in indexed]
merged = "".join([self._msg_text(x) for x in ordered if self._msg_text(x)])
data = ordered[-1]
await self._handle_decision(data, merged, turn_version=turn_version)
# 仅弹出已处理快照,保留处理中到达的新消息
cur = self.pending_msgs.get(key, [])
n = min(len(snapshot), len(cur))
if n > 0:
del cur[:n]
if not cur:
self.pending_msgs.pop(key, None)
async def _run_customer_turn(self, key: str, turn_version: int) -> None:
async with self.turn_semaphore:
async with self.customer_locks[key]:
await asyncio.wait_for(
self._flush_customer(key, turn_version=turn_version),
timeout=max(10, int(DECISION_TIMEOUT_SECONDS)),
)
def _schedule_customer_turn(self, key: str) -> None:
# 新消息来了就取消同客户旧任务,重新按最新消息计算
self.turn_versions[key] = int(self.turn_versions.get(key, 0)) + 1
turn_version = self.turn_versions[key]
old = self.processing_tasks.get(key)
if old and not old.done():
old.cancel()
async def runner() -> None:
try:
await self._run_customer_turn(key, turn_version)
except asyncio.CancelledError:
activity_event(self.logger, "customer_turn_cancelled", customer_id=key.split(":")[-1], reason="newer_message")
return
except asyncio.TimeoutError:
activity_event(self.logger, "customer_turn_timeout", customer_id=key.split(":")[-1], reason="decision_timeout")
return
except Exception as e:
activity_event(self.logger, "customer_turn_error", customer_id=key.split(":")[-1], reason=str(e))
return
finally:
cur = self.processing_tasks.get(key)
if cur is asyncio.current_task():
self.processing_tasks.pop(key, None)
self.processing_tasks[key] = asyncio.create_task(runner())
async def _debounce_enqueue(self, data: dict) -> None:
key = self._customer_key(data)
msg = self._msg_text(data)
self.pending_msgs[key].append(data)
if key in self.debounce_tasks:
self.debounce_tasks[key].cancel()
wait_s = self._debounce_seconds(msg)
activity_event(self.logger, "debounce_enqueue", customer_id=data.get("from_id", "-"), key=key, queue_size=len(self.pending_msgs[key]), wait_s=wait_s)
async def later() -> None:
try:
await asyncio.sleep(wait_s)
self._schedule_customer_turn(key)
except asyncio.CancelledError:
return
finally:
self.debounce_tasks.pop(key, None)
self.debounce_tasks[key] = asyncio.create_task(later())
async def _on_message(self, raw: str) -> None:
try:
data = json.loads(raw)
except Exception:
self.logger.info("[非JSON] %s", raw)
return
msg_type = int(data.get("msg_type", 0) or 0)
msg = self._msg_text(data)
rule = prefilter_message(msg, msg_type)
self.logger.info("[收消息] acc=%s from=%s type=%s msg=%s", data.get("acc_id", ""), data.get("from_id", ""), msg_type, msg)
await post_tianwang_callback("message_received", data, extra={"msg_type": msg_type})
# 客户消息全量入库(监听模式也落库)
try:
customer_key = self._customer_key(data)
self.store.append_event(
customer_key,
"customer_message",
{
"acc_id": data.get("acc_id", ""),
"customer_id": data.get("from_id", ""),
"msg_type": msg_type,
"msg": msg,
"raw_msg": data.get("msg", ""),
"timestamp": data.get("timestamp", ""),
},
)
except Exception as e:
self.logger.error("[入库] 客户消息写入失败: %s", e)
if self._is_outbound_echo(data, msg):
activity_event(
self.logger,
"inbound_ignored",
customer_id=data.get("from_id", "-"),
reason="outbound_echo_loop_guard",
)
return
if rule.ignore:
activity_event(self.logger, "inbound_ignored", customer_id=data.get("from_id", "-"), reason=rule.reason)
return
patched = dict(data)
patched["msg"] = rule.normalized_msg or msg
key = self._customer_key(patched)
self._append_dialogue(key, "user", patched["msg"])
if is_listen_only():
activity_event(
self.logger,
"ai_reply_skipped",
customer_id=patched.get("from_id", "-"),
reason="listen_only_mode",
)
return
# 硬编码:每个客户首条消息先快速回复“在的”
if key not in self.first_msg_replied:
sent = await self.send_reply(patched, "在的")
if sent:
self.last_reply_key[key] = "在的"
await self._push_wechat_pair(patched, patched["msg"], "在的", recent_dialogue=self.recent_dialogue.get(key, [])[-8:])
self.first_msg_replied.add(key)
if msg_type == 1:
await self._handle_decision(patched, patched["msg"])
return
await self._debounce_enqueue(patched)
async def _serve(self) -> None:
while self.running:
try:
self.logger.info("[连接] %s", self.uri)
async with websockets.connect(self.uri) as ws:
self.websocket = ws
self.logger.info("[连接成功]")
async for raw in ws:
await self._on_message(raw)
except Exception as e:
self.logger.info("[连接异常] %s", e)
await asyncio.sleep(3)
def run(self) -> None:
asyncio.run(self._serve())