feat: add simulator page and image quote analyzer
Some checks failed
Pre-commit / run (ubuntu-latest) (push) Has been cancelled
Deploy Sphinx documentation to Pages / build_en (ubuntu-latest, 3.10) (push) Has been cancelled
Deploy Sphinx documentation to Pages / build_zh (ubuntu-latest, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (macos-15, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (macos-15, 3.11) (push) Has been cancelled
Python Unittest Coverage / test (macos-15, 3.12) (push) Has been cancelled
Python Unittest Coverage / test (ubuntu-latest, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (ubuntu-latest, 3.11) (push) Has been cancelled
Python Unittest Coverage / test (ubuntu-latest, 3.12) (push) Has been cancelled
Python Unittest Coverage / test (windows-latest, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (windows-latest, 3.11) (push) Has been cancelled
Python Unittest Coverage / test (windows-latest, 3.12) (push) Has been cancelled

This commit is contained in:
2026-03-03 12:41:28 +08:00
parent 919c70789e
commit b663c7acbf
6 changed files with 605 additions and 7 deletions

View File

@@ -4,6 +4,7 @@ import re
import time
from collections import defaultdict
from datetime import datetime
from contextlib import suppress
import websockets
@@ -12,6 +13,8 @@ from .auto_draw import auto_draw_preview
from .config import (
AUTO_DRAW_ENABLED,
AUTO_QUOTE_WAIT_SECONDS,
DECISION_TIMEOUT_SECONDS,
MAX_CONCURRENT_TURNS,
MESSAGE_DEBOUNCE_SECONDS,
QINGJIAN_WS_URI,
)
@@ -20,6 +23,7 @@ from .observability import activity_event, build_trace_id
from .orchestrator import Orchestrator
from .rules import extract_image_urls, prefilter_message
from .runtime_switch import is_listen_only
from .store import ConversationStore
class QingjianClient:
@@ -30,9 +34,13 @@ class QingjianClient:
self.websocket = None
self.running = True
self.orchestrator = Orchestrator()
self.store = ConversationStore()
self.pending_msgs: dict[str, list[dict]] = defaultdict(list)
self.debounce_tasks: dict[str, asyncio.Task] = {}
self.processing_tasks: dict[str, asyncio.Task] = {}
self.customer_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
self.turn_semaphore = asyncio.Semaphore(max(1, int(MAX_CONCURRENT_TURNS)))
self.pending_images: dict[str, list[str]] = defaultdict(list)
self.auto_quote_tasks: dict[str, asyncio.Task] = {}
self.last_reply_key: dict[str, str] = {}
@@ -153,6 +161,20 @@ class QingjianClient:
activity_event(self.logger, "send_reply_attempt", trace_id=trace_id, customer_id=data.get("from_id", "-"), msg=text)
await self.send_message(msg)
self._append_dialogue(key, "assistant", text)
try:
self.store.append_event(
key,
"assistant_message",
{
"acc_id": data.get("acc_id", ""),
"customer_id": data.get("from_id", ""),
"msg_type": 0,
"msg": text,
"trace_id": trace_id,
},
)
except Exception as e:
self.logger.error("[入库] 客服消息写入失败: %s", e)
self.recent_outbound.append((str(data.get("acc_id", "")), str(data.get("from_id", "")), text, time.monotonic()))
if len(self.recent_outbound) > 200:
self.recent_outbound = self.recent_outbound[-200:]
@@ -177,6 +199,20 @@ class QingjianClient:
activity_event(self.logger, "send_image_attempt", trace_id=trace_id, customer_id=data.get("from_id", "-"), msg=image_url)
await self.send_message(msg)
self._append_dialogue(key, "assistant", f"[image]{image_url}")
try:
self.store.append_event(
key,
"assistant_message",
{
"acc_id": data.get("acc_id", ""),
"customer_id": data.get("from_id", ""),
"msg_type": 1,
"msg": image_url,
"trace_id": trace_id,
},
)
except Exception as e:
self.logger.error("[入库] 客服图片消息写入失败: %s", e)
self.recent_outbound.append((str(data.get("acc_id", "")), str(data.get("from_id", "")), image_url, time.monotonic()))
if len(self.recent_outbound) > 200:
self.recent_outbound = self.recent_outbound[-200:]
@@ -269,6 +305,13 @@ class QingjianClient:
if u not in self.pending_images[key]:
self.pending_images[key].append(u)
# 上下文优先从数据库回读,保证重启后也能恢复最近对话
try:
recent_dialogue = self.store.get_recent_dialogue(key, limit=24)
except Exception as e:
self.logger.error("[入库] 读取最近对话失败: %s", e)
recent_dialogue = self.recent_dialogue.get(key, [])
context = {
"customer_key": key,
"acc_id": data.get("acc_id", ""),
@@ -278,9 +321,12 @@ class QingjianClient:
"msg": merged_msg,
"intent": "unknown",
"pending_images": len(self.pending_images[key]),
"pending_image_urls": self.pending_images[key][-5:],
"current_image_urls": urls[-3:],
"latest_image_url": (urls[-1] if urls else (self.pending_images[key][-1] if self.pending_images[key] else "")),
"auto_quote_trigger": auto_quote,
"last_reply": self.last_reply_key.get(key, ""),
"recent_dialogue": self.recent_dialogue.get(key, [])[-12:],
"recent_dialogue": recent_dialogue[-12:],
}
activity_event(self.logger, "agent_process_start", trace_id=trace_id, customer_id=context["customer_id"], acc_id=context["acc_id"], intent=context["intent"])
@@ -390,7 +436,13 @@ class QingjianClient:
try:
await asyncio.sleep(AUTO_QUOTE_WAIT_SECONDS)
if self.pending_images.get(key):
await self._handle_decision(data, "", auto_quote=True)
# 自动报价也走并发控制与客户互斥
async with self.turn_semaphore:
async with self.customer_locks[key]:
await asyncio.wait_for(
self._handle_decision(data, "", auto_quote=True),
timeout=max(10, int(DECISION_TIMEOUT_SECONDS)),
)
finally:
self.auto_quote_tasks.pop(key, None)
@@ -398,13 +450,56 @@ class QingjianClient:
queue = self.pending_msgs.get(key, [])
if not queue:
return
indexed = list(enumerate(queue))
# 只处理当前快照,避免把处理中新增消息一并清掉
snapshot = list(queue)
if not snapshot:
return
indexed = list(enumerate(snapshot))
indexed.sort(key=lambda it: (self._parse_msg_ts(it[1]), it[0]))
ordered = [x for _, x in indexed]
merged = "".join([self._msg_text(x) for x in ordered if self._msg_text(x)])
data = ordered[-1]
self.pending_msgs[key].clear()
await self._handle_decision(data, merged)
# 仅弹出已处理快照,保留处理中到达的新消息
cur = self.pending_msgs.get(key, [])
n = min(len(snapshot), len(cur))
if n > 0:
del cur[:n]
if not cur:
self.pending_msgs.pop(key, None)
async def _run_customer_turn(self, key: str) -> None:
async with self.turn_semaphore:
async with self.customer_locks[key]:
await asyncio.wait_for(
self._flush_customer(key),
timeout=max(10, int(DECISION_TIMEOUT_SECONDS)),
)
def _schedule_customer_turn(self, key: str) -> None:
# 新消息来了就取消同客户旧任务,重新按最新消息计算
old = self.processing_tasks.get(key)
if old and not old.done():
old.cancel()
async def runner() -> None:
try:
await self._run_customer_turn(key)
except asyncio.CancelledError:
activity_event(self.logger, "customer_turn_cancelled", customer_id=key.split(":")[-1], reason="newer_message")
return
except asyncio.TimeoutError:
activity_event(self.logger, "customer_turn_timeout", customer_id=key.split(":")[-1], reason="decision_timeout")
return
except Exception as e:
activity_event(self.logger, "customer_turn_error", customer_id=key.split(":")[-1], reason=str(e))
return
finally:
cur = self.processing_tasks.get(key)
if cur is asyncio.current_task():
self.processing_tasks.pop(key, None)
self.processing_tasks[key] = asyncio.create_task(runner())
async def _debounce_enqueue(self, data: dict) -> None:
key = self._customer_key(data)
@@ -420,7 +515,7 @@ class QingjianClient:
async def later() -> None:
try:
await asyncio.sleep(wait_s)
await self._flush_customer(key)
self._schedule_customer_turn(key)
except asyncio.CancelledError:
return
finally:
@@ -441,6 +536,23 @@ class QingjianClient:
self.logger.info("[收消息] acc=%s from=%s type=%s msg=%s", data.get("acc_id", ""), data.get("from_id", ""), msg_type, msg)
await post_tianwang_callback("message_received", data, extra={"msg_type": msg_type})
# 客户消息全量入库(监听模式也落库)
try:
customer_key = self._customer_key(data)
self.store.append_event(
customer_key,
"customer_message",
{
"acc_id": data.get("acc_id", ""),
"customer_id": data.get("from_id", ""),
"msg_type": msg_type,
"msg": msg,
"raw_msg": data.get("msg", ""),
"timestamp": data.get("timestamp", ""),
},
)
except Exception as e:
self.logger.error("[入库] 客户消息写入失败: %s", e)
if self._is_outbound_echo(data, msg):
activity_event(