Files
tw2/qingjian_cs/app/client.py
jimi 2c09fcf9e6
Some checks failed
Pre-commit / run (ubuntu-latest) (push) Has been cancelled
Deploy Sphinx documentation to Pages / build_en (ubuntu-latest, 3.10) (push) Has been cancelled
Deploy Sphinx documentation to Pages / build_zh (ubuntu-latest, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (macos-15, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (macos-15, 3.11) (push) Has been cancelled
Python Unittest Coverage / test (macos-15, 3.12) (push) Has been cancelled
Python Unittest Coverage / test (ubuntu-latest, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (ubuntu-latest, 3.11) (push) Has been cancelled
Python Unittest Coverage / test (ubuntu-latest, 3.12) (push) Has been cancelled
Python Unittest Coverage / test (windows-latest, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (windows-latest, 3.11) (push) Has been cancelled
Python Unittest Coverage / test (windows-latest, 3.12) (push) Has been cancelled
fix: prevent outbound echo loops and reduce AgentScope warning noise
2026-03-02 18:59:09 +08:00

251 lines
9.9 KiB
Python

import asyncio
import json
import time
from collections import defaultdict
import websockets
from .callbacks import post_tianwang_callback
from .config import AUTO_QUOTE_WAIT_SECONDS, MESSAGE_DEBOUNCE_SECONDS, QINGJIAN_WS_URI
from .logger import setup_logger
from .observability import activity_event, build_trace_id
from .orchestrator import Orchestrator
from .rules import detect_intent, extract_image_urls, prefilter_message
class QingjianClient:
    """WebSocket client that feeds Qingjian customer messages to the orchestrator.

    Inbound frames are parsed as JSON, pre-filtered, debounced per customer
    (messages with ``msg_type == 1`` bypass the debounce) and handed to
    ``Orchestrator.decide``; the resulting decision is turned into outbound
    replies over the same WebSocket.

    All per-customer state is keyed by ``"<acc_id>:<from_id>"`` (see
    ``_customer_key``).
    """

    # An inbound text identical to one we sent within this many seconds is
    # treated as the platform echoing our own message back to us.
    OUTBOUND_ECHO_WINDOW_SECONDS = 120.0
    # Upper bound on remembered outbound texts per customer.
    OUTBOUND_ECHO_HISTORY = 8
    # Pause between two connection attempts.
    RECONNECT_DELAY_SECONDS = 3.0

    def __init__(self) -> None:
        self.logger = setup_logger()
        self.uri = QINGJIAN_WS_URI
        self.reply_id = "tb001"
        # Live connection, or None while disconnected.
        self.websocket = None
        self.running = True
        self.orchestrator = Orchestrator()
        # Messages waiting for the debounce window of each customer to elapse.
        self.pending_msgs: dict[str, list[dict]] = defaultdict(list)
        # The debounce task currently *sleeping* for each customer.
        self.debounce_tasks: dict[str, asyncio.Task] = {}
        # Image URLs collected per customer.
        # NOTE(review): nothing in this class ever clears these lists, so the
        # count passed to the orchestrator only grows — confirm that is intended.
        self.pending_images: dict[str, list[str]] = defaultdict(list)
        self.auto_quote_tasks: dict[str, asyncio.Task] = {}
        # Last text sent per customer; used to avoid repeating the same reply.
        self.last_reply_key: dict[str, str] = {}
        # Recently sent (text, monotonic timestamp) pairs per customer, oldest
        # first. A history (rather than only the last text) is kept because one
        # decision may send several replies back-to-back.
        self.recent_outbound: dict[str, list[tuple[str, float]]] = defaultdict(list)

    @staticmethod
    def _customer_key(data: dict) -> str:
        """Return the ``"<acc_id>:<from_id>"`` key identifying a conversation."""
        return f"{data.get('acc_id','')}:{data.get('from_id','')}"

    @staticmethod
    def _msg_text(data: dict) -> str:
        """Return the stripped ``msg`` field of *data*, or ``""`` if missing/None."""
        return str(data.get("msg", "") or "").strip()

    def _debounce_seconds(self, msg: str) -> float:
        """Return how long to wait for follow-up messages after *msg*.

        The wait depends on the detected intent; unknown intents fall back to
        ``MESSAGE_DEBOUNCE_SECONDS``.
        """
        intent = detect_intent(msg)
        if intent == "image":
            return 2.5
        if intent in {"pricing", "finish_or_quote_trigger"}:
            return 2.0
        if intent == "greeting":
            return 1.5
        return float(MESSAGE_DEBOUNCE_SECONDS)

    async def send_message(self, message: dict) -> bool:
        """Serialize *message* as JSON and send it over the WebSocket.

        Returns:
            True if the message was handed to the socket, False if there is
            currently no connection (nothing is sent in that case).
        """
        if not self.websocket:
            return False
        await self.websocket.send(json.dumps(message, ensure_ascii=False))
        self.logger.info("[发送] %s", message.get("msg", ""))
        return True

    def _remember_outbound(self, key: str, text: str) -> None:
        """Record *text* as just sent to customer *key*, pruning stale entries."""
        now = time.monotonic()
        window = self.OUTBOUND_ECHO_WINDOW_SECONDS
        history = [
            (sent_text, ts)
            for sent_text, ts in self.recent_outbound.get(key, [])
            if (now - ts) <= window
        ]
        history.append((text, now))
        self.recent_outbound[key] = history[-self.OUTBOUND_ECHO_HISTORY:]

    async def send_reply(self, data: dict, text: str, trace_id: str = "-") -> None:
        """Send *text* as a reply to the customer described by *data*.

        Blank text is ignored. The text is remembered for the echo guard and
        reported as a success only if it was actually sent.
        """
        text = str(text or "").strip()
        if not text:
            return
        customer_id = data.get("from_id", "")
        display_name = data.get("from_name", customer_id)
        msg = {
            "msg_id": "",
            "acc_id": data.get("acc_id", ""),
            "msg": text,
            "from_id": customer_id,
            "from_name": display_name,
            "cy_id": customer_id,
            "acc_type": data.get("acc_type", "AliWorkbench"),
            "msg_type": 0,
            "cy_name": display_name,
        }
        activity_event(self.logger, "send_reply_attempt", trace_id=trace_id, customer_id=data.get("from_id", "-"), msg=text)
        sent = await self.send_message(msg)
        if not sent:
            # Not connected: do not pretend the text went out, otherwise the
            # echo guard would later swallow a genuine customer message.
            activity_event(self.logger, "send_reply_skipped", trace_id=trace_id, customer_id=data.get("from_id", "-"), reason="websocket_not_connected")
            return
        self._remember_outbound(self._customer_key(data), text)
        activity_event(self.logger, "send_reply_success", trace_id=trace_id, customer_id=data.get("from_id", "-"), msg=text)

    def _is_outbound_echo(self, data: dict, msg: str) -> bool:
        """Return True if *msg* looks like our own reply pushed back to us.

        Qingjian may deliver text we just sent as a "received" message. Any
        inbound text that exactly matches something sent to the same customer
        within ``OUTBOUND_ECHO_WINDOW_SECONDS`` is treated as such an echo, so
        that the bot does not end up talking to itself forever.
        """
        candidate = str(msg or "").strip()
        now = time.monotonic()
        window = self.OUTBOUND_ECHO_WINDOW_SECONDS
        history = self.recent_outbound.get(self._customer_key(data), ())
        return any(
            sent_text == candidate and (now - ts) <= window
            for sent_text, ts in history
        )

    async def _report_processed(self, data: dict, trace_id: str, route, action: str, reply: str) -> None:
        """Post the "message_processed" callback for a finished decision."""
        await post_tianwang_callback(
            "message_processed",
            data,
            extra={"trace_id": trace_id, "route": route, "action": action, "reply": reply},
        )

    async def _handle_decision(self, data: dict, merged_msg: str, *, auto_quote: bool = False) -> None:
        """Ask the orchestrator what to do with *merged_msg* and act on it.

        Args:
            data: The inbound message dict the reply is addressed from.
            merged_msg: Text to decide on (possibly several debounced messages).
            auto_quote: True when triggered by the auto-quote timer rather
                than by a customer message.
        """
        key = self._customer_key(data)
        trace_id = build_trace_id(data.get("acc_id", ""), data.get("from_id", ""), merged_msg)
        t0 = time.perf_counter()

        # Collect image URLs, keeping first-seen order and skipping duplicates.
        images = self.pending_images[key]
        for url in extract_image_urls(merged_msg) or ():
            if url not in images:
                images.append(url)

        context = {
            "customer_key": key,
            "acc_id": data.get("acc_id", ""),
            "customer_id": data.get("from_id", ""),
            "goods_name": data.get("goods_name", ""),
            "goods_order": data.get("goods_order", ""),
            "msg": merged_msg,
            "intent": detect_intent(merged_msg),
            "pending_images": len(images),
            "auto_quote_trigger": auto_quote,
            "last_reply": self.last_reply_key.get(key, ""),
        }
        activity_event(self.logger, "agent_process_start", trace_id=trace_id, customer_id=context["customer_id"], acc_id=context["acc_id"], intent=context["intent"])
        route, decision, state = await self.orchestrator.decide(context)
        latency_ms = int((time.perf_counter() - t0) * 1000)
        activity_event(
            self.logger,
            "agent_process_done",
            trace_id=trace_id,
            customer_id=context["customer_id"],
            route=route,
            action=decision.action,
            reason=decision.reason,
            latency_ms=latency_ms,
            after_sales_stage=state.get("after_sales_stage", "new"),
        )

        if decision.action == "transfer":
            text = decision.transfer_msg or "我给你转接人工处理哈。"
            await self.send_reply(data, text, trace_id=trace_id)
            self.last_reply_key[key] = text
            await self._report_processed(data, trace_id, route, "transfer", text)
            return

        if decision.action == "quote":
            if decision.reply:
                await self.send_reply(data, decision.reply, trace_id=trace_id)
            quote_text = "图我收齐了,我这边看完马上给你报具体价格。"
            # Do not repeat the quote notice if it was the last thing we said.
            if self.last_reply_key.get(key) != quote_text:
                await self.send_reply(data, quote_text, trace_id=trace_id)
                self.last_reply_key[key] = quote_text
            await self._report_processed(data, trace_id, route, "quote", quote_text)
            return

        if decision.action == "noop":
            await self._report_processed(data, trace_id, route, "noop", "")
            return

        # Any other action is treated as a plain reply.
        text = decision.reply or "收到,我先看一下哈,稍等哈。"
        if self.last_reply_key.get(key) != text:
            await self.send_reply(data, text, trace_id=trace_id)
            self.last_reply_key[key] = text
        await self._report_processed(data, trace_id, route, "reply", text)

        # Images are waiting and no timer is running yet: schedule an
        # automatic quote in case the customer never says they are done.
        if self.pending_images[key] and key not in self.auto_quote_tasks:
            self.auto_quote_tasks[key] = asyncio.create_task(self._auto_quote_later(data))

    async def _auto_quote_later(self, data: dict) -> None:
        """After ``AUTO_QUOTE_WAIT_SECONDS``, trigger a quote if images are pending."""
        key = self._customer_key(data)
        try:
            await asyncio.sleep(AUTO_QUOTE_WAIT_SECONDS)
            if self.pending_images.get(key):
                await self._handle_decision(data, "发完了,报价吧", auto_quote=True)
        except Exception:
            # Runs as a fire-and-forget task: log here, nobody awaits it.
            self.logger.exception("[自动报价异常] key=%s", key)
        finally:
            self.auto_quote_tasks.pop(key, None)

    async def _flush_customer(self, key: str) -> None:
        """Merge all queued messages of customer *key* and process them once."""
        # Take ownership of the queue; the defaultdict recreates it on demand.
        queue = self.pending_msgs.pop(key, None)
        if not queue:
            return
        # NOTE(review): texts are joined without a separator, so two adjacent
        # URLs would fuse into one string — confirm extract_image_urls copes.
        merged = "".join(text for text in map(self._msg_text, queue) if text)
        # The newest message supplies the addressing fields for the reply.
        await self._handle_decision(queue[-1], merged)

    async def _debounce_enqueue(self, data: dict) -> None:
        """Queue *data* and (re)start the debounce timer of its customer."""
        key = self._customer_key(data)
        msg = self._msg_text(data)
        self.pending_msgs[key].append(data)

        # Only a task that is still sleeping is registered here (see below),
        # so cancelling it merely restarts the wait.
        previous = self.debounce_tasks.pop(key, None)
        if previous is not None:
            previous.cancel()

        wait_s = self._debounce_seconds(msg)
        activity_event(self.logger, "debounce_enqueue", customer_id=data.get("from_id", "-"), key=key, queue_size=len(self.pending_msgs[key]), wait_s=wait_s)

        async def later() -> None:
            try:
                await asyncio.sleep(wait_s)
            except asyncio.CancelledError:
                # Superseded by a newer message. The registry already points at
                # the replacement task, so it must not be touched here.
                return
            # Deregister *before* flushing: a message arriving while the
            # orchestrator is working must start a new timer instead of
            # cancelling this flush, whose queue has already been drained.
            if self.debounce_tasks.get(key) is asyncio.current_task():
                del self.debounce_tasks[key]
            try:
                await self._flush_customer(key)
            except Exception:
                # Fire-and-forget task: log here, nobody awaits it.
                self.logger.exception("[防抖处理异常] key=%s", key)

        self.debounce_tasks[key] = asyncio.create_task(later())

    async def _on_message(self, raw: str) -> None:
        """Parse one inbound frame and route it to the decision pipeline."""
        try:
            data = json.loads(raw)
        except (TypeError, ValueError, RecursionError):
            self.logger.info("[非JSON] %s", raw)
            return
        if not isinstance(data, dict):
            # Valid JSON but not an object: there are no fields to read.
            self.logger.info("[忽略非对象JSON] %s", raw)
            return

        try:
            msg_type = int(data.get("msg_type", 0) or 0)
        except (TypeError, ValueError):
            # Unparseable type: fall back to the same default as a missing one.
            msg_type = 0
        msg = self._msg_text(data)
        rule = prefilter_message(msg, msg_type)
        self.logger.info("[收消息] acc=%s from=%s type=%s msg=%s", data.get("acc_id", ""), data.get("from_id", ""), msg_type, msg)
        await post_tianwang_callback("message_received", data, extra={"msg_type": msg_type})

        if self._is_outbound_echo(data, msg):
            activity_event(
                self.logger,
                "inbound_ignored",
                customer_id=data.get("from_id", "-"),
                reason="outbound_echo_loop_guard",
            )
            return
        if rule.ignore:
            activity_event(self.logger, "inbound_ignored", customer_id=data.get("from_id", "-"), reason=rule.reason)
            return

        # Work on a copy so the normalized text does not alter the parsed frame.
        patched = dict(data)
        patched["msg"] = rule.normalized_msg or msg
        if msg_type == 1:
            # Type-1 messages are processed immediately, without debouncing.
            await self._handle_decision(patched, patched["msg"])
            return
        await self._debounce_enqueue(patched)

    async def _serve(self) -> None:
        """Connect, dispatch inbound frames, and reconnect until stopped."""
        while self.running:
            try:
                self.logger.info("[连接] %s", self.uri)
                async with websockets.connect(self.uri) as ws:
                    self.websocket = ws
                    self.logger.info("[连接成功]")
                    async for raw in ws:
                        try:
                            await self._on_message(raw)
                        except Exception:
                            # One bad message must not cost us the connection.
                            self.logger.exception("[消息处理异常]")
            except Exception as e:
                self.logger.info("[连接异常] %s", e)
            finally:
                # Never leave a closed socket behind for send_message to use.
                self.websocket = None
            if self.running:
                # Also pause after a clean close, so a server that accepts and
                # immediately closes cannot drive a tight reconnect loop.
                await asyncio.sleep(self.RECONNECT_DELAY_SECONDS)

    def run(self) -> None:
        """Run the client until the event loop is interrupted."""
        asyncio.run(self._serve())