diff --git a/core/websocket_client.py b/core/websocket_client.py index 171ce77..1f3c2e9 100755 --- a/core/websocket_client.py +++ b/core/websocket_client.py @@ -168,6 +168,8 @@ class QingjianAPIClient: self._legacy_fast_quote_enabled = os.getenv("LEGACY_FAST_IMAGE_QUOTE", "false").lower() in ("1", "true", "yes") self._system_inquiry_rules = self._load_system_inquiry_rules() self._last_reply_sent_at: dict = {} # customer_key -> monotonic ts + self._outbound_semantic_seen: dict = {} # customer_key -> {semantic_key: ts} + self._outbound_class_seen: dict = {} # customer_key -> {reply_class: ts} self._inbound_log_seen: dict = {} # signature -> monotonic ts(防重复写入) self._outbound_log_seen: dict = {} # signature -> monotonic ts(防重复写入) self._tianwang_callback_url = ( @@ -399,6 +401,96 @@ class QingjianAPIClient: except Exception: pass + @staticmethod + def _normalize_reply_semantic_key(text: str) -> str: + """把回复归一化为语义键,用于去重。""" + s = (text or "").strip().lower() + if not s: + return "" + for w in ("哈", "呀", "哦", "呢", "啦", "咯", "亲"): + s = s.replace(w, "") + s = re.sub(r"[,。!?、,.!?::;\s~\-—_]+", "", s) + return s[:200] + + @staticmethod + def _classify_outbound_reply(text: str) -> str: + s = (text or "").strip() + if not s: + return "empty" + if any(k in s for k in ("报价", "总价", "多少钱", "多少", "马上给你报价", "先给你报")): + return "quote" + if any(k in s for k in ("继续发图", "发完", "发图", "把图发", "先看图")): + return "collect" + if any(k in s for k in ("在吗", "你好", "在的", "在呢")): + return "greeting" + if any(k in s for k in ("转人工", "转接", "转给")): + return "transfer" + if any(k in s for k in ("稍等", "我先看", "看一下", "看下")): + return "ack" + return "general" + + def _outbound_arbiter(self, original_msg: dict, reply_content: str, trace_id: str) -> tuple[bool, str]: + """ + 统一出站裁决层: + 1) 语义去重(相同语义短窗口不重复); + 2) 同类回复节流(同类话术短窗口不重复)。 + """ + key = f"{original_msg.get('acc_id', '')}:{original_msg.get('from_id', '')}" + now_mono = time.monotonic() + sem_key = self._normalize_reply_semantic_key(reply_content) + reply_class = self._classify_outbound_reply(reply_content) + try: + sem_window = max(30, int(os.getenv("AI_OUTBOUND_SEMANTIC_DEDUPE_SECONDS", "180"))) + except Exception: + sem_window = 180 + try: + class_window = max(20, int(os.getenv("AI_OUTBOUND_CLASS_DEDUPE_SECONDS", "90"))) + except Exception: + class_window = 90 + + sem_bucket = self._outbound_semantic_seen.setdefault(key, {}) + cls_bucket = self._outbound_class_seen.setdefault(key, {}) + self._prune_seen(sem_bucket, now_mono, ttl_sec=max(sem_window * 2, 240)) + self._prune_seen(cls_bucket, now_mono, ttl_sec=max(class_window * 2, 180)) + + if sem_key and (now_mono - sem_bucket.get(sem_key, 0.0)) < sem_window: + self._activity_log( + "outbound_arbiter_block", + trace_id=trace_id, + acc_id=original_msg.get("acc_id", ""), + customer_id=original_msg.get("from_id", ""), + reason="semantic_duplicate", + semantic_key=sem_key[:80], + reply_class=reply_class, + msg=reply_content, + ) + return False, "semantic_duplicate" + + if reply_class in {"quote", "collect", "ack"} and (now_mono - cls_bucket.get(reply_class, 0.0)) < class_window: + self._activity_log( + "outbound_arbiter_block", + trace_id=trace_id, + acc_id=original_msg.get("acc_id", ""), + customer_id=original_msg.get("from_id", ""), + reason="class_duplicate", + reply_class=reply_class, + msg=reply_content, + ) + return False, "class_duplicate" + + if sem_key: + sem_bucket[sem_key] = now_mono + cls_bucket[reply_class] = now_mono + self._activity_log( + "outbound_arbiter_pass", + trace_id=trace_id, + acc_id=original_msg.get("acc_id", ""), + customer_id=original_msg.get("from_id", ""), + reply_class=reply_class, + semantic_key=sem_key[:80] if sem_key else "", + ) + return True, "pass" + async def receive_messages(self): """持续接收消息""" try: @@ -1935,6 +2027,14 @@ class QingjianAPIClient: logger.info(f"回复被AI质检拦截: {guard_reason}") return reply_content = checked_reply or str(reply_content) + pass_send, arbiter_reason = self._outbound_arbiter( + original_msg=original_msg, + reply_content=reply_content, + trace_id=trace_id, + ) + if not pass_send: + logger.info(f"回复被统一裁决层拦截: {arbiter_reason}") + return reply = { "msg_id": "",