fix: prevent stale-turn double replies and suppress thinking noise
Some checks failed
Pre-commit / run (ubuntu-latest) (push) Has been cancelled
Deploy Sphinx documentation to Pages / build_en (ubuntu-latest, 3.10) (push) Has been cancelled
Deploy Sphinx documentation to Pages / build_zh (ubuntu-latest, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (macos-15, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (macos-15, 3.11) (push) Has been cancelled
Python Unittest Coverage / test (macos-15, 3.12) (push) Has been cancelled
Python Unittest Coverage / test (ubuntu-latest, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (ubuntu-latest, 3.11) (push) Has been cancelled
Python Unittest Coverage / test (ubuntu-latest, 3.12) (push) Has been cancelled
Python Unittest Coverage / test (windows-latest, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (windows-latest, 3.11) (push) Has been cancelled
Python Unittest Coverage / test (windows-latest, 3.12) (push) Has been cancelled

This commit is contained in:
2026-03-03 13:03:52 +08:00
parent 36e9082d33
commit 1f28dc4630
2 changed files with 33 additions and 14 deletions

View File

@@ -39,6 +39,7 @@ class QingjianClient:
self.pending_msgs: dict[str, list[dict]] = defaultdict(list) self.pending_msgs: dict[str, list[dict]] = defaultdict(list)
self.debounce_tasks: dict[str, asyncio.Task] = {} self.debounce_tasks: dict[str, asyncio.Task] = {}
self.processing_tasks: dict[str, asyncio.Task] = {} self.processing_tasks: dict[str, asyncio.Task] = {}
self.turn_versions: dict[str, int] = defaultdict(int)
self.customer_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock) self.customer_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
self.turn_semaphore = asyncio.Semaphore(max(1, int(MAX_CONCURRENT_TURNS))) self.turn_semaphore = asyncio.Semaphore(max(1, int(MAX_CONCURRENT_TURNS)))
self.pending_images: dict[str, list[str]] = defaultdict(list) self.pending_images: dict[str, list[str]] = defaultdict(list)
@@ -141,12 +142,15 @@ class QingjianClient:
await self.websocket.send(json.dumps(message, ensure_ascii=False)) await self.websocket.send(json.dumps(message, ensure_ascii=False))
self.logger.info("[发送] %s", message.get("msg", "")) self.logger.info("[发送] %s", message.get("msg", ""))
async def send_reply(self, data: dict, text: str, trace_id: str = "-") -> None: async def send_reply(self, data: dict, text: str, trace_id: str = "-", turn_version: int | None = None) -> None:
text = str(text or "").strip() text = str(text or "").strip()
if not text: if not text:
return return
text = self._shorten_reply(text) text = self._shorten_reply(text)
key = self._customer_key(data) key = self._customer_key(data)
if turn_version is not None and self.turn_versions.get(key, 0) != turn_version:
activity_event(self.logger, "send_reply_skipped", trace_id=trace_id, customer_id=data.get("from_id", "-"), reason="stale_turn")
return
msg = { msg = {
"msg_id": "", "msg_id": "",
"acc_id": data.get("acc_id", ""), "acc_id": data.get("acc_id", ""),
@@ -180,11 +184,14 @@ class QingjianClient:
self.recent_outbound = self.recent_outbound[-200:] self.recent_outbound = self.recent_outbound[-200:]
activity_event(self.logger, "send_reply_success", trace_id=trace_id, customer_id=data.get("from_id", "-"), msg=text) activity_event(self.logger, "send_reply_success", trace_id=trace_id, customer_id=data.get("from_id", "-"), msg=text)
async def send_image(self, data: dict, image_url: str, trace_id: str = "-") -> None: async def send_image(self, data: dict, image_url: str, trace_id: str = "-", turn_version: int | None = None) -> None:
image_url = str(image_url or "").strip() image_url = str(image_url or "").strip()
if not image_url: if not image_url:
return return
key = self._customer_key(data) key = self._customer_key(data)
if turn_version is not None and self.turn_versions.get(key, 0) != turn_version:
activity_event(self.logger, "send_image_skipped", trace_id=trace_id, customer_id=data.get("from_id", "-"), reason="stale_turn")
return
msg = { msg = {
"msg_id": "", "msg_id": "",
"acc_id": data.get("acc_id", ""), "acc_id": data.get("acc_id", ""),
@@ -286,7 +293,7 @@ class QingjianClient:
return True return True
return False return False
async def _handle_decision(self, data: dict, merged_msg: str, *, auto_quote: bool = False) -> None: async def _handle_decision(self, data: dict, merged_msg: str, *, auto_quote: bool = False, turn_version: int | None = None) -> None:
if is_listen_only(): if is_listen_only():
activity_event( activity_event(
self.logger, self.logger,
@@ -296,6 +303,9 @@ class QingjianClient:
) )
return return
key = self._customer_key(data) key = self._customer_key(data)
if turn_version is not None and self.turn_versions.get(key, 0) != turn_version:
activity_event(self.logger, "decision_skipped", customer_id=data.get("from_id", "-"), reason="stale_turn")
return
trace_id = build_trace_id(data.get("acc_id", ""), data.get("from_id", ""), merged_msg) trace_id = build_trace_id(data.get("acc_id", ""), data.get("from_id", ""), merged_msg)
t0 = time.perf_counter() t0 = time.perf_counter()
@@ -358,13 +368,16 @@ class QingjianClient:
text = (decision.transfer_msg or "").strip() text = (decision.transfer_msg or "").strip()
if self._is_invalid_ai_reply(text): if self._is_invalid_ai_reply(text):
text = self._fallback_reply("transfer") text = self._fallback_reply("transfer")
await self.send_reply(data, text, trace_id=trace_id) await self.send_reply(data, text, trace_id=trace_id, turn_version=turn_version)
self.last_reply_key[key] = text self.last_reply_key[key] = text
await post_tianwang_callback("message_processed", data, extra={"trace_id": trace_id, "route": route, "action": "transfer", "reply": text}) await post_tianwang_callback("message_processed", data, extra={"trace_id": trace_id, "route": route, "action": "transfer", "reply": text})
return return
if decision.action == "quote": if decision.action == "quote":
if AUTO_DRAW_ENABLED and self.pending_images.get(key): analysis = state.get("last_image_quote_analysis", {}) if isinstance(state, dict) else {}
can_do = str((analysis or {}).get("can_do", "")).lower()
draw_allowed = can_do in {"", "yes", "partial"}
if AUTO_DRAW_ENABLED and draw_allowed and self.pending_images.get(key):
latest_image = self.pending_images[key][-1] latest_image = self.pending_images[key][-1]
activity_event( activity_event(
self.logger, self.logger,
@@ -381,7 +394,7 @@ class QingjianClient:
) )
if draw_res.get("ok"): if draw_res.get("ok"):
preview_url = str(draw_res.get("url", "") or "") preview_url = str(draw_res.get("url", "") or "")
await self.send_image(data, preview_url, trace_id=trace_id) await self.send_image(data, preview_url, trace_id=trace_id, turn_version=turn_version)
# 预览完成后清掉当前批次,避免同一图重复触发 # 预览完成后清掉当前批次,避免同一图重复触发
self.pending_images[key].clear() self.pending_images[key].clear()
activity_event( activity_event(
@@ -406,11 +419,13 @@ class QingjianClient:
error=str(draw_res.get("error", "unknown")), error=str(draw_res.get("error", "unknown")),
) )
self.logger.error("[作图] 失败 customer=%s error=%s", context["customer_id"], draw_res.get("error", "unknown")) self.logger.error("[作图] 失败 customer=%s error=%s", context["customer_id"], draw_res.get("error", "unknown"))
elif AUTO_DRAW_ENABLED and (not draw_allowed):
self.logger.info("[作图] 已跳过: 识图结果不可做 can_do=%s customer=%s", can_do, context["customer_id"])
text = (decision.reply or "").strip() text = (decision.reply or "").strip()
if self._is_invalid_ai_reply(text): if self._is_invalid_ai_reply(text):
text = self._fallback_reply("quote") text = self._fallback_reply("quote")
if self.last_reply_key.get(key) != text: if self.last_reply_key.get(key) != text:
await self.send_reply(data, text, trace_id=trace_id) await self.send_reply(data, text, trace_id=trace_id, turn_version=turn_version)
self.last_reply_key[key] = text self.last_reply_key[key] = text
await post_tianwang_callback("message_processed", data, extra={"trace_id": trace_id, "route": route, "action": "quote", "reply": text}) await post_tianwang_callback("message_processed", data, extra={"trace_id": trace_id, "route": route, "action": "quote", "reply": text})
return return
@@ -423,7 +438,7 @@ class QingjianClient:
if self._is_invalid_ai_reply(text): if self._is_invalid_ai_reply(text):
text = self._fallback_reply("reply") text = self._fallback_reply("reply")
if self.last_reply_key.get(key) != text: if self.last_reply_key.get(key) != text:
await self.send_reply(data, text, trace_id=trace_id) await self.send_reply(data, text, trace_id=trace_id, turn_version=turn_version)
self.last_reply_key[key] = text self.last_reply_key[key] = text
await post_tianwang_callback("message_processed", data, extra={"trace_id": trace_id, "route": route, "action": "reply", "reply": text}) await post_tianwang_callback("message_processed", data, extra={"trace_id": trace_id, "route": route, "action": "reply", "reply": text})
@@ -440,13 +455,13 @@ class QingjianClient:
async with self.turn_semaphore: async with self.turn_semaphore:
async with self.customer_locks[key]: async with self.customer_locks[key]:
await asyncio.wait_for( await asyncio.wait_for(
self._handle_decision(data, "", auto_quote=True), self._handle_decision(data, "", auto_quote=True, turn_version=self.turn_versions.get(key, 0)),
timeout=max(10, int(DECISION_TIMEOUT_SECONDS)), timeout=max(10, int(DECISION_TIMEOUT_SECONDS)),
) )
finally: finally:
self.auto_quote_tasks.pop(key, None) self.auto_quote_tasks.pop(key, None)
async def _flush_customer(self, key: str) -> None: async def _flush_customer(self, key: str, turn_version: int | None = None) -> None:
queue = self.pending_msgs.get(key, []) queue = self.pending_msgs.get(key, [])
if not queue: if not queue:
return return
@@ -459,7 +474,7 @@ class QingjianClient:
ordered = [x for _, x in indexed] ordered = [x for _, x in indexed]
merged = "".join([self._msg_text(x) for x in ordered if self._msg_text(x)]) merged = "".join([self._msg_text(x) for x in ordered if self._msg_text(x)])
data = ordered[-1] data = ordered[-1]
await self._handle_decision(data, merged) await self._handle_decision(data, merged, turn_version=turn_version)
# 仅弹出已处理快照,保留处理中到达的新消息 # 仅弹出已处理快照,保留处理中到达的新消息
cur = self.pending_msgs.get(key, []) cur = self.pending_msgs.get(key, [])
n = min(len(snapshot), len(cur)) n = min(len(snapshot), len(cur))
@@ -468,23 +483,25 @@ class QingjianClient:
if not cur: if not cur:
self.pending_msgs.pop(key, None) self.pending_msgs.pop(key, None)
async def _run_customer_turn(self, key: str) -> None: async def _run_customer_turn(self, key: str, turn_version: int) -> None:
async with self.turn_semaphore: async with self.turn_semaphore:
async with self.customer_locks[key]: async with self.customer_locks[key]:
await asyncio.wait_for( await asyncio.wait_for(
self._flush_customer(key), self._flush_customer(key, turn_version=turn_version),
timeout=max(10, int(DECISION_TIMEOUT_SECONDS)), timeout=max(10, int(DECISION_TIMEOUT_SECONDS)),
) )
def _schedule_customer_turn(self, key: str) -> None: def _schedule_customer_turn(self, key: str) -> None:
# 新消息来了就取消同客户旧任务,重新按最新消息计算 # 新消息来了就取消同客户旧任务,重新按最新消息计算
self.turn_versions[key] = int(self.turn_versions.get(key, 0)) + 1
turn_version = self.turn_versions[key]
old = self.processing_tasks.get(key) old = self.processing_tasks.get(key)
if old and not old.done(): if old and not old.done():
old.cancel() old.cancel()
async def runner() -> None: async def runner() -> None:
try: try:
await self._run_customer_turn(key) await self._run_customer_turn(key, turn_version)
except asyncio.CancelledError: except asyncio.CancelledError:
activity_event(self.logger, "customer_turn_cancelled", customer_id=key.split(":")[-1], reason="newer_message") activity_event(self.logger, "customer_turn_cancelled", customer_id=key.split(":")[-1], reason="newer_message")
return return

View File

@@ -15,6 +15,8 @@ class _StreamColorizer:
if not data: if not data:
return return
out = str(data) out = str(data)
if "Unsupported block type thinking" in out:
return
if "\x1b[" in out: if "\x1b[" in out:
self.stream.write(out) self.stream.write(out)
return return