import asyncio import json import re import time from collections import defaultdict from datetime import datetime from contextlib import suppress import websockets from .callbacks import post_tianwang_callback from .auto_draw import auto_draw_preview from .config import ( AUTO_DRAW_ENABLED, AUTO_QUOTE_WAIT_SECONDS, DECISION_TIMEOUT_SECONDS, IMAGE_MESSAGE_DEBOUNCE_SECONDS, MAX_CONCURRENT_TURNS, MESSAGE_DEBOUNCE_SECONDS, QINGJIAN_WS_URI, ) from .logger import setup_logger from .observability import activity_event, build_trace_id from .orchestrator import Orchestrator from .rules import extract_image_urls, prefilter_message from .runtime_switch import is_listen_only from .store import ConversationStore from .transfer_flow import transfer_to_human_flow from .email_push import push_chat_to_email from .wechat_push import push_chat_to_wechat class QingjianClient: def __init__(self) -> None: self.logger = setup_logger() self.uri = QINGJIAN_WS_URI self.reply_id = "tb001" self.websocket = None self.running = True self.orchestrator = Orchestrator() self.store = ConversationStore() self.pending_msgs: dict[str, list[dict]] = defaultdict(list) self.debounce_tasks: dict[str, asyncio.Task] = {} self.processing_tasks: dict[str, asyncio.Task] = {} self.turn_versions: dict[str, int] = defaultdict(int) self.customer_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock) self.turn_semaphore = asyncio.Semaphore(max(1, int(MAX_CONCURRENT_TURNS))) self.pending_images: dict[str, list[str]] = defaultdict(list) self.auto_quote_tasks: dict[str, asyncio.Task] = {} self.last_reply_key: dict[str, str] = {} self.first_msg_replied: set[str] = set() self.recent_outbound: list[tuple[str, str, str, float]] = [] self.recent_dialogue: dict[str, list[dict]] = defaultdict(list) @staticmethod def _customer_key(data: dict) -> str: return f"{data.get('acc_id','')}:{data.get('from_id','')}" @staticmethod def _msg_text(data: dict) -> str: return str(data.get("msg", "") or "").strip() @staticmethod def _extract_price_tokens(text: str) -> list[str]: s = str(text or "") if not s: return [] out: list[str] = [] out += re.findall(r"(?:¥|¥)\s*\d+(?:\.\d{1,2})?", s) out += re.findall(r"\d+(?:\.\d{1,2})?\s*元", s) # 去重保序 seen = set() uniq: list[str] = [] for x in out: k = x.strip() if k and k not in seen: seen.add(k) uniq.append(k) return uniq @staticmethod def _route_cn(route: str) -> str: return { "pre_sales": "售前", "quote": "报价", "after_sales": "售后", "risk": "风控", }.get(str(route or ""), "未知") @staticmethod def _status_text(route: str, action: str) -> str: a = str(action or "") if a == "quote": return "开始作图中" if a == "reply": return "已回复客户" if a == "transfer": return "已转人工" if a == "noop": return "仅监听中" if a == "update_state": return "状态已更新" return f"{QingjianClient._route_cn(route)}处理中" def _append_dialogue(self, key: str, role: str, text: str) -> None: t = str(text or "").strip() if not t: return self.recent_dialogue[key].append({"role": role, "text": t}) if len(self.recent_dialogue[key]) > 24: self.recent_dialogue[key] = self.recent_dialogue[key][-24:] @staticmethod def _parse_msg_ts(data: dict) -> float: # 兼容常见时间字段;解析失败时返回0,后续按入队顺序兜底 for key in ("timestamp", "msg_time", "send_time", "create_time", "time"): v = data.get(key) if v is None: continue if isinstance(v, (int, float)): return float(v) s = str(v).strip() if not s: continue # 纯数字时间戳 if re.fullmatch(r"\d{10,13}", s): n = float(s) return n / 1000.0 if len(s) == 13 else n # 常见日期格式 for fmt in ("%Y-%m-%d %H:%M:%S", "%Y/%m/%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y/%m/%d %H:%M"): try: return datetime.strptime(s, fmt).timestamp() except Exception: pass return 0.0 def _debounce_seconds(self, msg: str) -> float: if extract_image_urls(msg): return float(IMAGE_MESSAGE_DEBOUNCE_SECONDS) return float(MESSAGE_DEBOUNCE_SECONDS) async def send_message(self, message: dict) -> None: if not self.websocket: return await self.websocket.send(json.dumps(message, ensure_ascii=False)) self.logger.info("[发送] %s", message.get("msg", "")) async def send_reply(self, data: dict, text: str, trace_id: str = "-", turn_version: int | None = None) -> bool: text = str(text or "").strip() if not text: return False text = self._shorten_reply(text) key = self._customer_key(data) if turn_version is not None and self.turn_versions.get(key, 0) != turn_version: activity_event(self.logger, "send_reply_skipped", trace_id=trace_id, customer_id=data.get("from_id", "-"), reason="stale_turn") return False msg = { "msg_id": "", "acc_id": data.get("acc_id", ""), "msg": text, "from_id": data.get("from_id", ""), "from_name": data.get("from_name", data.get("from_id", "")), "cy_id": data.get("from_id", ""), "acc_type": data.get("acc_type", "AliWorkbench"), "msg_type": 0, "cy_name": data.get("from_name", data.get("from_id", "")), } activity_event(self.logger, "send_reply_attempt", trace_id=trace_id, customer_id=data.get("from_id", "-"), msg=text) await self.send_message(msg) self._append_dialogue(key, "assistant", text) try: self.store.append_event( key, "assistant_message", { "acc_id": data.get("acc_id", ""), "customer_id": data.get("from_id", ""), "msg_type": 0, "msg": text, "trace_id": trace_id, }, ) except Exception as e: self.logger.error("[入库] 客服消息写入失败: %s", e) self.recent_outbound.append((str(data.get("acc_id", "")), str(data.get("from_id", "")), text, time.monotonic())) if len(self.recent_outbound) > 200: self.recent_outbound = self.recent_outbound[-200:] activity_event(self.logger, "send_reply_success", trace_id=trace_id, customer_id=data.get("from_id", "-"), msg=text) return True async def _push_wechat_pair(self, data: dict, customer_msg: str, reply_msg: str, recent_dialogue: list[dict] | None = None) -> None: try: ok, reason = await push_chat_to_wechat( customer_name=str(data.get("from_name", "") or data.get("cy_name", "") or ""), customer_id=str(data.get("from_id", "") or ""), acc_id=str(data.get("acc_id", "") or ""), customer_msg=str(customer_msg or ""), reply_msg=str(reply_msg or ""), goods_name=str(data.get("goods_name", "") or ""), recent_dialogue=recent_dialogue or [], ) activity_event( self.logger, "wechat_push", customer_id=data.get("from_id", "-"), result="ok" if ok else "skip", reason=reason, ) except Exception as e: activity_event(self.logger, "wechat_push", customer_id=data.get("from_id", "-"), result="error", reason=str(e)) try: ok, reason = await push_chat_to_email( customer_name=str(data.get("from_name", "") or data.get("cy_name", "") or ""), customer_id=str(data.get("from_id", "") or ""), acc_id=str(data.get("acc_id", "") or ""), customer_msg=str(customer_msg or ""), reply_msg=str(reply_msg or ""), goods_name=str(data.get("goods_name", "") or ""), recent_dialogue=recent_dialogue or [], ) activity_event( self.logger, "email_push", customer_id=data.get("from_id", "-"), result="ok" if ok else "skip", reason=reason, ) except Exception as e: activity_event(self.logger, "email_push", customer_id=data.get("from_id", "-"), result="error", reason=str(e)) async def send_image(self, data: dict, image_url: str, trace_id: str = "-", turn_version: int | None = None) -> None: image_url = str(image_url or "").strip() if not image_url: return key = self._customer_key(data) if turn_version is not None and self.turn_versions.get(key, 0) != turn_version: activity_event(self.logger, "send_image_skipped", trace_id=trace_id, customer_id=data.get("from_id", "-"), reason="stale_turn") return msg = { "msg_id": "", "acc_id": data.get("acc_id", ""), "msg": image_url, "from_id": data.get("from_id", ""), "from_name": data.get("from_name", data.get("from_id", "")), "cy_id": data.get("from_id", ""), "acc_type": data.get("acc_type", "AliWorkbench"), "msg_type": 1, "cy_name": data.get("from_name", data.get("from_id", "")), } activity_event(self.logger, "send_image_attempt", trace_id=trace_id, customer_id=data.get("from_id", "-"), msg=image_url) await self.send_message(msg) self._append_dialogue(key, "assistant", f"[image]{image_url}") try: self.store.append_event( key, "assistant_message", { "acc_id": data.get("acc_id", ""), "customer_id": data.get("from_id", ""), "msg_type": 1, "msg": image_url, "trace_id": trace_id, }, ) except Exception as e: self.logger.error("[入库] 客服图片消息写入失败: %s", e) self.recent_outbound.append((str(data.get("acc_id", "")), str(data.get("from_id", "")), image_url, time.monotonic())) if len(self.recent_outbound) > 200: self.recent_outbound = self.recent_outbound[-200:] activity_event(self.logger, "send_image_success", trace_id=trace_id, customer_id=data.get("from_id", "-"), msg=image_url) @staticmethod def _clean_text(text: str) -> str: t = str(text or "").strip() t = re.sub(r"\s+", "", t) return t def _shorten_reply(self, text: str) -> str: t = str(text or "").strip() t = self._humanize_reply(t) # 只取首句,不做按字数硬截断,避免半句/残句 parts = re.split(r"[。!?!?]", t) head = next((p.strip() for p in parts if p and p.strip()), "") if not head: # 无句号时按逗号切第一分句 sub_parts = re.split(r"[,,;;::]", t) head = next((p.strip() for p in sub_parts if p and p.strip()), t) return head or t @staticmethod def _humanize_reply(text: str) -> str: t = str(text or "").strip() # 去AI腔常见口癖 t = re.sub(r"^(亲亲|宝子|宝贝|您好呀|您好哦)[,,]?\s*", "", t) t = t.replace("我这边", "我") t = t.replace("请问", "") t = t.replace("可以先帮您评估看看哦", "我先看下") t = t.replace("服务质量有保障", "质量没问题") t = t.replace("这个价格已经是很优惠的啦", "这价已经很低了") t = re.sub(r"(哈~|哦~|呀~|啦~)$", "", t) t = re.sub(r"\s+", "", t) return t @staticmethod def _is_invalid_ai_reply(text: str) -> bool: t = str(text or "").strip().lower() if not t: return True if "i noticed that you have interrupted me" in t: return True if t.startswith("action:") or t.startswith("{"): return True return False def _fallback_reply(self, action: str) -> str: if action == "transfer": return "我先给你转人工处理。" if action == "quote": return "我先给你找找看" return "你直接说需求就行" @staticmethod def _is_greeting_only(text: str) -> bool: t = re.sub(r"\s+", "", str(text or "")) if not t: return False greetings = { "你好", "您好", "在吗", "在不在", "哈喽", "hello", "hi", "嗨", "有人吗", "在", "在嘛", } return t.lower() in greetings def _is_outbound_echo(self, data: dict, msg: str) -> bool: """ 轻简可能会把我方刚发送文本回推为“收到消息”。 对“短时间完全相同文本”做回环拦截,兼容 acc/from 对调回推,避免无限对话。 """ in_acc = str(data.get("acc_id", "")) in_from = str(data.get("from_id", "")) in_msg = self._clean_text(msg) now = time.monotonic() if not in_msg: return False for out_acc, out_to, out_msg, ts in reversed(self.recent_outbound): if (now - ts) > 120: break if self._clean_text(out_msg) != in_msg: continue if (out_acc == in_acc and out_to == in_from) or (out_acc == in_from and out_to == in_acc): return True return False async def _handle_decision(self, data: dict, merged_msg: str, *, auto_quote: bool = False, turn_version: int | None = None) -> None: if is_listen_only(): activity_event( self.logger, "ai_reply_skipped", customer_id=data.get("from_id", "-"), reason="listen_only_mode", ) return key = self._customer_key(data) if turn_version is not None and self.turn_versions.get(key, 0) != turn_version: activity_event(self.logger, "decision_skipped", customer_id=data.get("from_id", "-"), reason="stale_turn") return trace_id = build_trace_id(data.get("acc_id", ""), data.get("from_id", ""), merged_msg) t0 = time.perf_counter() urls = extract_image_urls(merged_msg) if urls: for u in urls: if u not in self.pending_images[key]: self.pending_images[key].append(u) # 上下文优先从数据库回读,保证重启后也能恢复最近对话 try: recent_dialogue = self.store.get_recent_dialogue(key, limit=24) except Exception as e: self.logger.error("[入库] 读取最近对话失败: %s", e) recent_dialogue = self.recent_dialogue.get(key, []) context = { "customer_key": key, "acc_id": data.get("acc_id", ""), "customer_id": data.get("from_id", ""), "goods_name": data.get("goods_name", ""), "goods_order": data.get("goods_order", ""), "msg": merged_msg, "intent": "unknown", "pending_images": len(self.pending_images[key]), "pending_image_urls": self.pending_images[key][-5:], "current_image_urls": urls[-3:], "latest_image_url": (urls[-1] if urls else (self.pending_images[key][-1] if self.pending_images[key] else "")), "auto_quote_trigger": auto_quote, "last_reply": self.last_reply_key.get(key, ""), "recent_dialogue": recent_dialogue[-12:], } # 首条已快速回复“在的”后,纯问候不再重复走AI回复 if self._is_greeting_only(merged_msg) and self.last_reply_key.get(key) == "在的" and not urls and int(context["pending_images"]) == 0: activity_event( self.logger, "agent_process_skipped", trace_id=trace_id, customer_id=context["customer_id"], reason="greeting_already_replied", ) await post_tianwang_callback( "message_processed", data, extra={"trace_id": trace_id, "route": "pre_sales", "action": "noop", "reply": ""}, ) return try: rd = context.get("recent_dialogue", []) or [] rd_preview = " | ".join( [f"{str(x.get('role',''))}:{str(x.get('text',''))[:32]}" for x in rd[-8:] if isinstance(x, dict)] ) self.logger.info( "[AI上下文] customer=%s msg=%s pending_images=%s recent=%s", context["customer_id"], str(merged_msg or "")[:120], context["pending_images"], rd_preview, ) except Exception: pass activity_event(self.logger, "agent_process_start", trace_id=trace_id, customer_id=context["customer_id"], acc_id=context["acc_id"], intent=context["intent"]) route, decision, state = await self.orchestrator.decide(context) latency_ms = int((time.perf_counter() - t0) * 1000) status_text = self._status_text(route, decision.action) activity_event( self.logger, "agent_process_done", trace_id=trace_id, customer_id=context["customer_id"], route=route, route_cn=self._route_cn(route), action=decision.action, reason=status_text, raw_reason=decision.reason, latency_ms=latency_ms, after_sales_stage=state.get("after_sales_stage", "new"), ) # 价格日志(中文) price_hits = [] price_hits.extend(self._extract_price_tokens(merged_msg)) price_hits.extend(self._extract_price_tokens(decision.reply)) price_hits.extend(self._extract_price_tokens(decision.reason)) if price_hits: self.logger.info("[价格] 客户=%s 金额=%s", context["customer_id"], " | ".join(price_hits)) if decision.action == "transfer": text = (decision.transfer_msg or "").strip() if self._is_invalid_ai_reply(text): text = self._fallback_reply("transfer") ok_transfer, reason = await transfer_to_human_flow(self, data, transfer_msg=text, trace_id=trace_id) if not ok_transfer: self.logger.error("[转人工] 指令失败: %s", reason) sent = await self.send_reply(data, text, trace_id=trace_id, turn_version=turn_version) if sent: self.last_reply_key[key] = text await self._push_wechat_pair(data, merged_msg, text, recent_dialogue=recent_dialogue[-12:]) await post_tianwang_callback("message_processed", data, extra={"trace_id": trace_id, "route": route, "action": "transfer", "reply": text}) return if decision.action == "quote": analysis = state.get("last_image_quote_analysis", {}) if isinstance(state, dict) else {} can_do = str((analysis or {}).get("can_do", "")).lower() draw_allowed = can_do in {"", "yes", "partial"} if AUTO_DRAW_ENABLED and draw_allowed and self.pending_images.get(key): latest_image = self.pending_images[key][-1] activity_event( self.logger, "auto_draw_start", trace_id=trace_id, customer_id=context["customer_id"], image_url=latest_image, ) self.logger.info("[作图] 开始 customer=%s", context["customer_id"]) draw_res = await auto_draw_preview( image_url=latest_image, customer_id=context["customer_id"], requirement=merged_msg, ) if draw_res.get("ok"): preview_url = str(draw_res.get("url", "") or "") await self.send_image(data, preview_url, trace_id=trace_id, turn_version=turn_version) # 预览完成后清掉当前批次,避免同一图重复触发 self.pending_images[key].clear() activity_event( self.logger, "auto_draw_success", trace_id=trace_id, customer_id=context["customer_id"], preview_url=preview_url, ) self.logger.info("[作图] 成功 customer=%s url=%s", context["customer_id"], preview_url) await post_tianwang_callback( "message_processed", data, extra={"trace_id": trace_id, "route": route, "action": "quote", "reply": "", "auto_draw": True}, ) return activity_event( self.logger, "auto_draw_fail", trace_id=trace_id, customer_id=context["customer_id"], error=str(draw_res.get("error", "unknown")), ) if bool(draw_res.get("need_transfer")): tmsg = "这个我转人工给你看下" ok_transfer, reason = await transfer_to_human_flow(self, data, transfer_msg=tmsg, trace_id=trace_id) if not ok_transfer: self.logger.error("[转人工] 指令失败: %s", reason) sent = await self.send_reply(data, tmsg, trace_id=trace_id, turn_version=turn_version) if sent: self.last_reply_key[key] = tmsg await self._push_wechat_pair(data, merged_msg, tmsg, recent_dialogue=recent_dialogue[-12:]) await post_tianwang_callback( "message_processed", data, extra={"trace_id": trace_id, "route": route, "action": "transfer", "reply": tmsg}, ) return self.logger.error("[作图] 失败 customer=%s error=%s", context["customer_id"], draw_res.get("error", "unknown")) elif AUTO_DRAW_ENABLED and (not draw_allowed): self.logger.info("[作图] 已跳过: 识图结果不可做 can_do=%s customer=%s", can_do, context["customer_id"]) text = (decision.reply or "").strip() if self._is_invalid_ai_reply(text): text = self._fallback_reply("quote") if self.last_reply_key.get(key) != text: sent = await self.send_reply(data, text, trace_id=trace_id, turn_version=turn_version) if sent: self.last_reply_key[key] = text await self._push_wechat_pair(data, merged_msg, text, recent_dialogue=recent_dialogue[-12:]) await post_tianwang_callback("message_processed", data, extra={"trace_id": trace_id, "route": route, "action": "quote", "reply": text}) return if decision.action == "noop": await post_tianwang_callback("message_processed", data, extra={"trace_id": trace_id, "route": route, "action": "noop", "reply": ""}) return text = (decision.reply or "").strip() if self._is_invalid_ai_reply(text): text = self._fallback_reply("reply") if self.last_reply_key.get(key) != text: sent = await self.send_reply(data, text, trace_id=trace_id, turn_version=turn_version) if sent: self.last_reply_key[key] = text await self._push_wechat_pair(data, merged_msg, text, recent_dialogue=recent_dialogue[-12:]) await post_tianwang_callback("message_processed", data, extra={"trace_id": trace_id, "route": route, "action": "reply", "reply": text}) if self.pending_images[key] and key not in self.auto_quote_tasks: self.auto_quote_tasks[key] = asyncio.create_task(self._auto_quote_later(data)) async def _auto_quote_later(self, data: dict) -> None: key = self._customer_key(data) try: await asyncio.sleep(AUTO_QUOTE_WAIT_SECONDS) if self.pending_images.get(key): # 自动报价也走并发控制与客户互斥 async with self.turn_semaphore: async with self.customer_locks[key]: await asyncio.wait_for( self._handle_decision(data, "", auto_quote=True, turn_version=self.turn_versions.get(key, 0)), timeout=max(10, int(DECISION_TIMEOUT_SECONDS)), ) finally: self.auto_quote_tasks.pop(key, None) async def _flush_customer(self, key: str, turn_version: int | None = None) -> None: queue = self.pending_msgs.get(key, []) if not queue: return # 只处理当前快照,避免把处理中新增消息一并清掉 snapshot = list(queue) if not snapshot: return indexed = list(enumerate(snapshot)) indexed.sort(key=lambda it: (self._parse_msg_ts(it[1]), it[0])) ordered = [x for _, x in indexed] merged = "、".join([self._msg_text(x) for x in ordered if self._msg_text(x)]) data = ordered[-1] await self._handle_decision(data, merged, turn_version=turn_version) # 仅弹出已处理快照,保留处理中到达的新消息 cur = self.pending_msgs.get(key, []) n = min(len(snapshot), len(cur)) if n > 0: del cur[:n] if not cur: self.pending_msgs.pop(key, None) async def _run_customer_turn(self, key: str, turn_version: int) -> None: async with self.turn_semaphore: async with self.customer_locks[key]: await asyncio.wait_for( self._flush_customer(key, turn_version=turn_version), timeout=max(10, int(DECISION_TIMEOUT_SECONDS)), ) def _schedule_customer_turn(self, key: str) -> None: # 新消息来了就取消同客户旧任务,重新按最新消息计算 self.turn_versions[key] = int(self.turn_versions.get(key, 0)) + 1 turn_version = self.turn_versions[key] old = self.processing_tasks.get(key) if old and not old.done(): old.cancel() async def runner() -> None: try: await self._run_customer_turn(key, turn_version) except asyncio.CancelledError: activity_event(self.logger, "customer_turn_cancelled", customer_id=key.split(":")[-1], reason="newer_message") return except asyncio.TimeoutError: activity_event(self.logger, "customer_turn_timeout", customer_id=key.split(":")[-1], reason="decision_timeout") return except Exception as e: activity_event(self.logger, "customer_turn_error", customer_id=key.split(":")[-1], reason=str(e)) return finally: cur = self.processing_tasks.get(key) if cur is asyncio.current_task(): self.processing_tasks.pop(key, None) self.processing_tasks[key] = asyncio.create_task(runner()) async def _debounce_enqueue(self, data: dict) -> None: key = self._customer_key(data) msg = self._msg_text(data) self.pending_msgs[key].append(data) if key in self.debounce_tasks: self.debounce_tasks[key].cancel() wait_s = self._debounce_seconds(msg) activity_event(self.logger, "debounce_enqueue", customer_id=data.get("from_id", "-"), key=key, queue_size=len(self.pending_msgs[key]), wait_s=wait_s) async def later() -> None: try: await asyncio.sleep(wait_s) self._schedule_customer_turn(key) except asyncio.CancelledError: return finally: self.debounce_tasks.pop(key, None) self.debounce_tasks[key] = asyncio.create_task(later()) async def _on_message(self, raw: str) -> None: try: data = json.loads(raw) except Exception: self.logger.info("[非JSON] %s", raw) return msg_type = int(data.get("msg_type", 0) or 0) msg = self._msg_text(data) rule = prefilter_message(msg, msg_type) self.logger.info("[收消息] acc=%s from=%s type=%s msg=%s", data.get("acc_id", ""), data.get("from_id", ""), msg_type, msg) await post_tianwang_callback("message_received", data, extra={"msg_type": msg_type}) # 客户消息全量入库(监听模式也落库) try: customer_key = self._customer_key(data) self.store.append_event( customer_key, "customer_message", { "acc_id": data.get("acc_id", ""), "customer_id": data.get("from_id", ""), "msg_type": msg_type, "msg": msg, "raw_msg": data.get("msg", ""), "timestamp": data.get("timestamp", ""), }, ) except Exception as e: self.logger.error("[入库] 客户消息写入失败: %s", e) if self._is_outbound_echo(data, msg): activity_event( self.logger, "inbound_ignored", customer_id=data.get("from_id", "-"), reason="outbound_echo_loop_guard", ) return if rule.ignore: activity_event(self.logger, "inbound_ignored", customer_id=data.get("from_id", "-"), reason=rule.reason) return patched = dict(data) patched["msg"] = rule.normalized_msg or msg key = self._customer_key(patched) self._append_dialogue(key, "user", patched["msg"]) if is_listen_only(): activity_event( self.logger, "ai_reply_skipped", customer_id=patched.get("from_id", "-"), reason="listen_only_mode", ) return # 硬编码:每个客户首条消息先快速回复“在的” if key not in self.first_msg_replied: sent = await self.send_reply(patched, "在的") if sent: self.last_reply_key[key] = "在的" await self._push_wechat_pair(patched, patched["msg"], "在的", recent_dialogue=self.recent_dialogue.get(key, [])[-8:]) self.first_msg_replied.add(key) if msg_type == 1: await self._handle_decision(patched, patched["msg"]) return await self._debounce_enqueue(patched) async def _serve(self) -> None: while self.running: try: self.logger.info("[连接] %s", self.uri) async with websockets.connect(self.uri) as ws: self.websocket = ws self.logger.info("[连接成功]") async for raw in ws: await self._on_message(raw) except Exception as e: self.logger.info("[连接异常] %s", e) await asyncio.sleep(3) def run(self) -> None: asyncio.run(self._serve())