From 1f28dc4630ebe7b99763b8a9b754dfbcf9c266a8 Mon Sep 17 00:00:00 2001 From: jimi <1847930177@qq.com> Date: Tue, 3 Mar 2026 13:03:52 +0800 Subject: [PATCH] fix: prevent stale-turn double replies and suppress thinking noise --- qingjian_cs/app/client.py | 45 +++++++++++++++++++++++++++------------ qingjian_cs/app/logger.py | 2 ++ 2 files changed, 33 insertions(+), 14 deletions(-) diff --git a/qingjian_cs/app/client.py b/qingjian_cs/app/client.py index 0541942..1f8aae9 100644 --- a/qingjian_cs/app/client.py +++ b/qingjian_cs/app/client.py @@ -39,6 +39,7 @@ class QingjianClient: self.pending_msgs: dict[str, list[dict]] = defaultdict(list) self.debounce_tasks: dict[str, asyncio.Task] = {} self.processing_tasks: dict[str, asyncio.Task] = {} + self.turn_versions: dict[str, int] = defaultdict(int) self.customer_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock) self.turn_semaphore = asyncio.Semaphore(max(1, int(MAX_CONCURRENT_TURNS))) self.pending_images: dict[str, list[str]] = defaultdict(list) @@ -141,12 +142,15 @@ class QingjianClient: await self.websocket.send(json.dumps(message, ensure_ascii=False)) self.logger.info("[发送] %s", message.get("msg", "")) - async def send_reply(self, data: dict, text: str, trace_id: str = "-") -> None: + async def send_reply(self, data: dict, text: str, trace_id: str = "-", turn_version: int | None = None) -> None: text = str(text or "").strip() if not text: return text = self._shorten_reply(text) key = self._customer_key(data) + if turn_version is not None and self.turn_versions.get(key, 0) != turn_version: + activity_event(self.logger, "send_reply_skipped", trace_id=trace_id, customer_id=data.get("from_id", "-"), reason="stale_turn") + return msg = { "msg_id": "", "acc_id": data.get("acc_id", ""), @@ -180,11 +184,14 @@ class QingjianClient: self.recent_outbound = self.recent_outbound[-200:] activity_event(self.logger, "send_reply_success", trace_id=trace_id, customer_id=data.get("from_id", "-"), msg=text) - async def send_image(self, data: dict, image_url: str, trace_id: str = "-") -> None: + async def send_image(self, data: dict, image_url: str, trace_id: str = "-", turn_version: int | None = None) -> None: image_url = str(image_url or "").strip() if not image_url: return key = self._customer_key(data) + if turn_version is not None and self.turn_versions.get(key, 0) != turn_version: + activity_event(self.logger, "send_image_skipped", trace_id=trace_id, customer_id=data.get("from_id", "-"), reason="stale_turn") + return msg = { "msg_id": "", "acc_id": data.get("acc_id", ""), @@ -286,7 +293,7 @@ class QingjianClient: return True return False - async def _handle_decision(self, data: dict, merged_msg: str, *, auto_quote: bool = False) -> None: + async def _handle_decision(self, data: dict, merged_msg: str, *, auto_quote: bool = False, turn_version: int | None = None) -> None: if is_listen_only(): activity_event( self.logger, @@ -296,6 +303,9 @@ class QingjianClient: ) return key = self._customer_key(data) + if turn_version is not None and self.turn_versions.get(key, 0) != turn_version: + activity_event(self.logger, "decision_skipped", customer_id=data.get("from_id", "-"), reason="stale_turn") + return trace_id = build_trace_id(data.get("acc_id", ""), data.get("from_id", ""), merged_msg) t0 = time.perf_counter() @@ -358,13 +368,16 @@ class QingjianClient: text = (decision.transfer_msg or "").strip() if self._is_invalid_ai_reply(text): text = self._fallback_reply("transfer") - await self.send_reply(data, text, trace_id=trace_id) + await self.send_reply(data, text, trace_id=trace_id, turn_version=turn_version) self.last_reply_key[key] = text await post_tianwang_callback("message_processed", data, extra={"trace_id": trace_id, "route": route, "action": "transfer", "reply": text}) return if decision.action == "quote": - if AUTO_DRAW_ENABLED and self.pending_images.get(key): + analysis = state.get("last_image_quote_analysis", {}) if isinstance(state, dict) else {} + can_do = str((analysis or {}).get("can_do", "")).lower() + draw_allowed = can_do in {"", "yes", "partial"} + if AUTO_DRAW_ENABLED and draw_allowed and self.pending_images.get(key): latest_image = self.pending_images[key][-1] activity_event( self.logger, @@ -381,7 +394,7 @@ class QingjianClient: ) if draw_res.get("ok"): preview_url = str(draw_res.get("url", "") or "") - await self.send_image(data, preview_url, trace_id=trace_id) + await self.send_image(data, preview_url, trace_id=trace_id, turn_version=turn_version) # 预览完成后清掉当前批次,避免同一图重复触发 self.pending_images[key].clear() activity_event( @@ -406,11 +419,13 @@ class QingjianClient: error=str(draw_res.get("error", "unknown")), ) self.logger.error("[作图] 失败 customer=%s error=%s", context["customer_id"], draw_res.get("error", "unknown")) + elif AUTO_DRAW_ENABLED and (not draw_allowed): + self.logger.info("[作图] 已跳过: 识图结果不可做 can_do=%s customer=%s", can_do, context["customer_id"]) text = (decision.reply or "").strip() if self._is_invalid_ai_reply(text): text = self._fallback_reply("quote") if self.last_reply_key.get(key) != text: - await self.send_reply(data, text, trace_id=trace_id) + await self.send_reply(data, text, trace_id=trace_id, turn_version=turn_version) self.last_reply_key[key] = text await post_tianwang_callback("message_processed", data, extra={"trace_id": trace_id, "route": route, "action": "quote", "reply": text}) return @@ -423,7 +438,7 @@ class QingjianClient: if self._is_invalid_ai_reply(text): text = self._fallback_reply("reply") if self.last_reply_key.get(key) != text: - await self.send_reply(data, text, trace_id=trace_id) + await self.send_reply(data, text, trace_id=trace_id, turn_version=turn_version) self.last_reply_key[key] = text await post_tianwang_callback("message_processed", data, extra={"trace_id": trace_id, "route": route, "action": "reply", "reply": text}) @@ -440,13 +455,13 @@ class QingjianClient: async with self.turn_semaphore: async with self.customer_locks[key]: await asyncio.wait_for( - self._handle_decision(data, "", auto_quote=True), + self._handle_decision(data, "", auto_quote=True, turn_version=self.turn_versions.get(key, 0)), timeout=max(10, int(DECISION_TIMEOUT_SECONDS)), ) finally: self.auto_quote_tasks.pop(key, None) - async def _flush_customer(self, key: str) -> None: + async def _flush_customer(self, key: str, turn_version: int | None = None) -> None: queue = self.pending_msgs.get(key, []) if not queue: return @@ -459,7 +474,7 @@ class QingjianClient: ordered = [x for _, x in indexed] merged = "、".join([self._msg_text(x) for x in ordered if self._msg_text(x)]) data = ordered[-1] - await self._handle_decision(data, merged) + await self._handle_decision(data, merged, turn_version=turn_version) # 仅弹出已处理快照,保留处理中到达的新消息 cur = self.pending_msgs.get(key, []) n = min(len(snapshot), len(cur)) @@ -468,23 +483,25 @@ class QingjianClient: if not cur: self.pending_msgs.pop(key, None) - async def _run_customer_turn(self, key: str) -> None: + async def _run_customer_turn(self, key: str, turn_version: int) -> None: async with self.turn_semaphore: async with self.customer_locks[key]: await asyncio.wait_for( - self._flush_customer(key), + self._flush_customer(key, turn_version=turn_version), timeout=max(10, int(DECISION_TIMEOUT_SECONDS)), ) def _schedule_customer_turn(self, key: str) -> None: # 新消息来了就取消同客户旧任务,重新按最新消息计算 + self.turn_versions[key] = int(self.turn_versions.get(key, 0)) + 1 + turn_version = self.turn_versions[key] old = self.processing_tasks.get(key) if old and not old.done(): old.cancel() async def runner() -> None: try: - await self._run_customer_turn(key) + await self._run_customer_turn(key, turn_version) except asyncio.CancelledError: activity_event(self.logger, "customer_turn_cancelled", customer_id=key.split(":")[-1], reason="newer_message") return diff --git a/qingjian_cs/app/logger.py b/qingjian_cs/app/logger.py index bf478a1..d46595c 100644 --- a/qingjian_cs/app/logger.py +++ b/qingjian_cs/app/logger.py @@ -15,6 +15,8 @@ class _StreamColorizer: if not data: return out = str(data) + if "Unsupported block type thinking" in out: + return if "\x1b[" in out: self.stream.write(out) return