From 5210c8a86dc0f1a5d5a3c4b03ec9e36c026cb066 Mon Sep 17 00:00:00 2001 From: jimi <1847930177@qq.com> Date: Sat, 28 Feb 2026 19:38:34 +0800 Subject: [PATCH] fix: block dense-text table jobs and prevent duplicate quote races --- core/websocket_client.py | 126 ++++++++++++++++++++++++--------------- image/image_analyzer.py | 1 + 2 files changed, 80 insertions(+), 47 deletions(-) diff --git a/core/websocket_client.py b/core/websocket_client.py index bc876e4..396d144 100755 --- a/core/websocket_client.py +++ b/core/websocket_client.py @@ -100,6 +100,8 @@ class QingjianAPIClient: self._pending_image_tasks: dict = {} self._system_inquiry_rules = self._load_system_inquiry_rules() self._last_reply_sent_at: dict = {} # customer_key -> monotonic ts + self._inbound_log_seen: dict = {} # signature -> monotonic ts(防重复写入) + self._outbound_log_seen: dict = {} # signature -> monotonic ts(防重复写入) # 延迟加载任务模块(避免循环导入) self.task_scheduler = None @@ -206,6 +208,67 @@ class QingjianAPIClient: task.add_done_callback(_done) + @staticmethod + def _prune_seen(seen: dict, now_mono: float, ttl_sec: float = 8.0): + if len(seen) <= 2000: + return + stale = [k for k, t in seen.items() if (now_mono - t) > ttl_sec] + for k in stale: + seen.pop(k, None) + + def _log_inbound_once(self, data: dict): + """统一记录入站消息,短窗口去重,避免多分支重复写库。""" + try: + cid = data.get("from_id", "") + if not cid: + return + msg = self.to_chinese(data.get("msg", "") or "") + acc_id = data.get("acc_id", "") + mtype = int(data.get("msg_type", 0) or 0) + now_mono = time.monotonic() + sig = f"{acc_id}|{cid}|{mtype}|{msg}" + last = self._inbound_log_seen.get(sig, 0.0) + if (now_mono - last) < 2.0: + return + self._inbound_log_seen[sig] = now_mono + self._prune_seen(self._inbound_log_seen, now_mono, ttl_sec=8.0) + _chat_log( + cid, + msg, + "in", + customer_name=self.to_chinese(data.get("from_name", "") or data.get("cy_name", "")), + acc_id=acc_id, + platform=data.get("acc_type", ""), + msg_type=mtype, + ) + except Exception: + pass + + def _log_outbound_once(self, original_msg: dict, reply_content: str): + """统一记录出站消息,短窗口去重,避免重复写库。""" + try: + cid = original_msg.get("from_id", "") + if not cid or not reply_content: + return + acc_id = original_msg.get("acc_id", "") + now_mono = time.monotonic() + sig = f"{acc_id}|{cid}|{reply_content}" + last = self._outbound_log_seen.get(sig, 0.0) + if (now_mono - last) < 2.0: + return + self._outbound_log_seen[sig] = now_mono + self._prune_seen(self._outbound_log_seen, now_mono, ttl_sec=8.0) + _chat_log( + cid, + reply_content, + "out", + customer_name=self.to_chinese(original_msg.get("from_name", "") or original_msg.get("cy_name", "")), + acc_id=acc_id, + platform=original_msg.get("acc_type", ""), + ) + except Exception: + pass + async def receive_messages(self): """持续接收消息""" try: @@ -272,23 +335,12 @@ class QingjianAPIClient: if not from_id or from_id == 'N/A' or not acc_id or acc_id == 'N/A': print(f"[{self.get_time()}] 空消息跳过(from_id={from_id!r} acc_id={acc_id!r})") return + self._log_inbound_once(data) # Gemini 店铺:不回复,直接跳过 goods_name = self.to_chinese(data.get('goods_name', '') or '') if _get_shop_type(acc_id, goods_name) == "gemini_api": print(f"[{self.get_time()}] Gemini 店铺消息,跳过") - try: - _chat_log( - data.get('from_id', ''), - self.to_chinese(data.get('msg', '')), - "in", - customer_name=self.to_chinese(data.get('from_name', '') or data.get('cy_name', '')), - acc_id=data.get('acc_id', ''), - platform=data.get('acc_type', ''), - msg_type=data.get('msg_type', 0), - ) - except Exception: - pass try: from utils.wechat_chat_log import push_chat_to_wechat asyncio.create_task(push_chat_to_wechat( @@ -459,7 +511,10 @@ class QingjianAPIClient: def _msg_is_requirement(self, msg: str) -> bool: if not msg: return False - kws = ("要", "抓到", "放到", "合成", "替换", "抠", "修", "高清", "尺寸", "横", "竖", "颜色", "去背景", "排版", "一样", "类似", "同款") + kws = ( + "要", "抓到", "放到", "合成", "替换", "抠", "修", "高清", "尺寸", "横", "竖", "颜色", "去背景", "排版", "一样", "类似", "同款", + "能不能做", "能做吗", "可以做吗", "做不做", "这个能做吗", "这个能不能做", + ) return any(k in msg for k in kws) def _add_pending_images(self, key: str, urls: list, limit: int = 12): @@ -507,6 +562,14 @@ class QingjianAPIClient: from image.image_analyzer import image_analyzer r = await image_analyzer.analyze(url) if isinstance(r, dict) and r.get("success", False): + if r.get("feasibility") == "no" or r.get("risk") == "high": + note = str(r.get("note", "") or "") + if "文字内容过于密集" in note or "密集文字" in note: + reply = "这类文字太密的图我们这边不接单,抱歉哈。你要是简化后再发我可以继续看。" + else: + reply = "这张处理风险比较高,我这边先不直接接,建议转人工评估更稳。" + await self.send_reply(data, reply) + return from config.config import MIN_PRICE_FLOOR p = r.get("price_suggest", 20) floor_dyn = r.get("price_min", MIN_PRICE_FLOOR) @@ -521,17 +584,6 @@ class QingjianAPIClient: else: reply = "这张我看了,先按20元给你做" await self.send_reply(data, reply) - try: - _chat_log( - data.get('from_id', ''), - reply, - "out", - customer_name=self.to_chinese(data.get('from_name', '') or data.get('cy_name', '')), - acc_id=data.get('acc_id', ''), - platform=data.get('acc_type', '') - ) - except Exception: - pass except Exception: pass @@ -543,15 +595,6 @@ class QingjianAPIClient: _name = self.to_chinese(data.get('from_name', '') or data.get('cy_name', '')) _plat = data.get('acc_type', '') - # 记录客户来消息 - if _cid and msg_text: - try: - _chat_log(_cid, msg_text, "in", customer_name=_name, - acc_id=data.get('acc_id', ''), - platform=_plat, msg_type=data.get('msg_type', 0)) - except Exception: - pass - # 超大尺寸(米制)直接拒单,避免进入报价/处理流程 oversize_reply = self._oversize_reply_if_needed(msg_text) if oversize_reply: @@ -618,11 +661,11 @@ class QingjianAPIClient: if self._msg_is_requirement(msg_text) or self._msg_is_price_inquiry(msg_text): key = self._customer_key(data) if self._pending_images.get(key): - await self.send_reply(data, "稍等,我把刚才那几张一起看下") - await self._flush_pending_images(key, data) old = self._pending_image_tasks.get(key) if old and not old.done(): old.cancel() + await self.send_reply(data, "稍等,我把刚才那几张一起看下") + await self._flush_pending_images(key, data) return if self._msg_is_price_inquiry(msg_text): recent_urls = self._collect_recent_image_urls(_cid, data.get('acc_id', ''), max_count=6) @@ -826,17 +869,6 @@ class QingjianAPIClient: else: reply = f"这组{count}张我看了,按{avg_price}元一张;复杂那张{top_price}元,满意再拍" await self.send_reply(data, reply) - try: - _chat_log( - data.get('from_id', ''), - reply, - "out", - customer_name=self.to_chinese(data.get('from_name', '') or data.get('cy_name', '')), - acc_id=data.get('acc_id', ''), - platform=data.get('acc_type', '') - ) - except Exception: - pass except Exception as e: logger.error(f"多图分析失败: {e}") try: @@ -1361,7 +1393,7 @@ class QingjianAPIClient: "msg_type": 0, "cy_name": customer_name } - + self._log_outbound_once(original_msg, str(reply_content)) await self.send_message(reply) async def send_text(self, cy_id, acc_type, content): diff --git a/image/image_analyzer.py b/image/image_analyzer.py index 9ef16ef..fa5a71e 100755 --- a/image/image_analyzer.py +++ b/image/image_analyzer.py @@ -241,6 +241,7 @@ class ImageAnalyzer: DENSE_TEXT_SUBJECT_KEYWORDS = ( "宣传栏", "公告栏", "展板", "海报墙", "通知栏", "知识栏", "制度牌", "公示栏", "墙报", "密密麻麻", + "表格", "检索表", "配伍表", "药物配伍", "课程表", "流程表", "说明表", "数据表", "word wall", "poster wall", "bulletin board", )