From 5a73aa34d2c2225183c1284a2d6135d146341ce3 Mon Sep 17 00:00:00 2001 From: jimi <1847930177@qq.com> Date: Sat, 28 Feb 2026 22:54:00 +0800 Subject: [PATCH] feat: adaptive debounce and intent-driven quote trigger tuning --- core/pydantic_ai_agent.py | 116 ++++++++++++++++++++++++++---- core/websocket_client.py | 89 ++++++++++++++++++++++- tests/test_regression_pipeline.py | 52 ++++++++++++++ 3 files changed, 239 insertions(+), 18 deletions(-) diff --git a/core/pydantic_ai_agent.py b/core/pydantic_ai_agent.py index c9f3d2b..18bc574 100755 --- a/core/pydantic_ai_agent.py +++ b/core/pydantic_ai_agent.py @@ -1487,7 +1487,11 @@ class CustomerServiceAgent: state.image_count = len(state.pending_image_urls) self._sync_pending_quote_state(message.from_id, state) - if self._is_batch_finish_signal(customer_text): + if self._is_batch_finish_intent( + text=customer_text, + state=state, + has_incoming_urls=bool(incoming_urls), + ): quote_res = await self._quote_pending_images(state, message) reply_text = quote_res.get("reply", "") need_transfer = bool(quote_res.get("need_transfer")) @@ -1509,7 +1513,11 @@ class CustomerServiceAgent: if text_without_urls: self._append_requirement(state, text_without_urls) self._sync_pending_quote_state(message.from_id, state) - if self._is_batch_finish_signal(customer_text): + if self._is_batch_finish_intent( + text=customer_text, + state=state, + has_incoming_urls=False, + ): quote_res = await self._quote_pending_images(state, message) reply_text = quote_res.get("reply", "") need_transfer = bool(quote_res.get("need_transfer")) @@ -2057,23 +2065,86 @@ class CustomerServiceAgent: ] return any(k in text for k in finish_keywords) + def _is_batch_finish_intent( + self, + text: str, + state: ConversationState, + has_incoming_urls: bool, + ) -> bool: + """ + 语义结束识别: + - 显式口令:发完了/统一报价 + - 隐式意图:询价/砍价 + - 单图需求明确:如“这个门头上面的字做一下”可直接进入报价 + """ + if not text: + return False + if self._is_batch_finish_signal(text): + return True + if has_incoming_urls: + return False + if not state.pending_image_urls: + return False + + # 意图识别:询价/砍价通常意味着“可以报价了” + try: + from utils.intent_analyzer import detect_intent_embedding, detect_intent_keywords + intent = detect_intent_embedding(text) or detect_intent_keywords(text) + except Exception: + intent = "" + if intent in ("询价", "砍价"): + return True + + msg = (text or "").strip() + if not msg: + return False + # 单图场景:客户给出明确加工指令,可直接报价 + single_image_action_kw = ( + "做一下", "改一下", "处理一下", "就这张", "按这个做", "照这个做", + "这个门头", "上面的字", "这个字", "这个图做", "能做吗", + ) + multi_image_finish_kw = ( + "就这些", "就这几张", "按这几张", "这几张一起做", "一起做一下", + "先按这些", "先按这几张", "直接报价", "现在报价", "看下报价", + "先报个总价", "总价多少", "一起多少钱", "先做这几张", + ) + hold_kw = ("还有", "再发", "先等", "稍后", "等会", "回头") + if len(state.pending_image_urls) == 1: + if any(k in msg for k in single_image_action_kw) and not any(k in msg for k in hold_kw): + return True + elif len(state.pending_image_urls) >= 2: + if any(k in msg for k in multi_image_finish_kw) and not any(k in msg for k in hold_kw): + return True + + return False + def _build_collect_ack(self, count: int) -> str: if count <= 1: - return "收到这张图了,你还有图就继续发;发完我再一起给你报价。" + one_templates = [ + "这张收到啦,还有图就继续发,我一起给你看。", + "图我看到了,后面还有就接着发,最后我一口价给你。", + "收到这张了,你有其他图也发来,我统一帮你算。", + ] + return random.choice(one_templates) templates = [ - "收到,这边先记下了(已收{n}张)。你继续发,等你发完我再一起给你打包报价。", - "好的,当前这批先收到了(第{n}张)。还有图就继续发,发齐我一次性给你总价。", - "没问题,已记录到第{n}张。你把需求和图片都发完,我统一给你报更合适的价格。", + "这几张我都收到了(现在{n}张)。还有的话继续发,我一起给你报。", + "好嘞,先看到{n}张了。你可以继续发,或者直接说“就这些”我现在就报价。", + "收到哈(共{n}张)。你还要补图就继续发,不补的话我现在也可以直接给价。", ] return random.choice(templates).format(n=count) def _build_collect_remind(self, count: int) -> str: if count <= 1: - return "需求我记下了。你如果还有图继续发,发完回我“发完了”,我给你报价。" + one_templates = [ + "这个要求我记住了。你还有图就继续发,不补图我就按这张给你报价。", + "明白,这个需求我加上了。你继续发图也行,想直接报价也可以。", + "收到,我按你这个要求做。你要是就这张,我现在就能给你报价。", + ] + return random.choice(one_templates) templates = [ - "需求我记下了(当前共{n}张图)。你继续发齐,发完回我“发完了”,我一次性给你总价。", - "好的,这条需求也加上了(现在{n}张)。等你说发完,我立刻统一报价。", - "收到,这个要求我也记住了(共{n}张)。你发完我就给你打包价。", + "需求我记下了(当前{n}张)。你继续补图,或者直接说“就这些”我现在报价。", + "好,这个要求也加上了(现在{n}张)。不再补图的话我立刻给你打包价。", + "收到(共{n}张)。你还发就继续,不发的话我现在就给总价。", ] return random.choice(templates).format(n=count) @@ -2149,19 +2220,34 @@ class CustomerServiceAgent: # 单图时不要使用“分图/这批/A-B方案”措辞,避免客户误解为多图。 if len(results) == 1: line = detail_lines[0].replace("图1:", "这张:") - lines = [f"给你报下这张:{line.split(':', 1)[1]}"] + heads = [ + "这张我看过了,先给你报下:", + "这张可以做,价格给你报下:", + "看了这张图,报价如下:", + ] + lines = [f"{random.choice(heads)}{line.split(':', 1)[1]}"] if req_hit: lines.append(f"按你的需求另加{extra}元({req_hit})。") - lines.append(f"这张做下来共{single_total}元,可以的话我马上安排。") + tails = [ + f"这张做下来共{single_total}元,定了我马上开工。", + f"合下来是{single_total}元,你点头我这边立刻安排。", + f"总价{single_total}元,可以的话我现在就给你做。", + ] + lines.append(random.choice(tails)) return "\n".join(lines) - lines = ["先给你分图报下:"] + heads = [ + "我先按这几张给你报一下:", + "这几张我都看过了,价格给你列一下:", + "我把每张价格先给你说清楚:", + ] + lines = [random.choice(heads)] lines.extend(detail_lines) if req_hit: lines.append(f"需求加价:+{extra}元({req_hit})") - option_line = f"可选:A按单张做共{single_total}元;B打包一起做{bundle_price}元(更划算)。" + option_line = f"可选:按单张做(共{single_total}元),或打包做({bundle_price}元,会更省一点)。" lines.append(option_line) - lines.append("你定一个方案,我这边马上安排。") + lines.append("你定一个,我这边马上开工。") return "\n".join(lines) def _prepare_batch_intake(self, state: ConversationState) -> Dict[str, Any]: diff --git a/core/websocket_client.py b/core/websocket_client.py index 2a067ed..60c3417 100755 --- a/core/websocket_client.py +++ b/core/websocket_client.py @@ -4,6 +4,7 @@ import json import re import logging import random +import secrets import time import hashlib from collections import deque @@ -88,6 +89,7 @@ class QingjianAPIClient: # 消息防抖:同一客户连续发消息时,等待 N 秒后合并处理 self._DEBOUNCE_SECONDS = MESSAGE_DEBOUNCE_SECONDS if isinstance(MESSAGE_DEBOUNCE_SECONDS, int) else 8 + self._adaptive_debounce_enabled = os.getenv("ADAPTIVE_DEBOUNCE_ENABLED", "true").lower() in ("1", "true", "yes") self._debounce_tasks: dict = {} # customer_key -> asyncio.Task self._pending_msgs: dict = {} # customer_key -> list[data] self._image_enabled = IMAGE_MODULE_ENABLED @@ -436,9 +438,11 @@ class QingjianAPIClient: if old_task and not old_task.done(): old_task.cancel() + debounce_seconds = self._pick_debounce_seconds(data, msg_body) + # 创建新的延迟处理任务 - async def _delayed(capture_key, capture_data): - await asyncio.sleep(self._DEBOUNCE_SECONDS) + async def _delayed(capture_key, capture_data, wait_s: float): + await asyncio.sleep(wait_s) msgs = self._pending_msgs.pop(capture_key, []) if not msgs: return @@ -451,9 +455,88 @@ class QingjianAPIClient: merged_data['msg'] = merged_msg await self._agent_reply_serialized(merged_data) - task = asyncio.create_task(_delayed(key, data)) + task = asyncio.create_task(_delayed(key, data, debounce_seconds)) self._debounce_tasks[key] = task + @staticmethod + def _rand_between(low: float, high: float) -> float: + if high <= low: + return float(low) + # 使用 secrets 增强随机性,避免固定周期导致机械感 + span = high - low + return round(low + span * (secrets.randbelow(1000) / 1000.0), 2) + + def _guess_intent_for_debounce(self, msg: str) -> str: + text = (msg or "").strip() + if not text: + return "unknown" + if self._msg_has_image_url(text): + return "image" + try: + from utils.intent_analyzer import detect_intent_keywords + intent = detect_intent_keywords(text) + except Exception: + intent = "" + if intent: + return intent + lower = text.lower() + if any(k in lower for k in ["报价", "多少钱", "价格", "贵", "优惠"]): + return "询价" + if any(k in lower for k in ["做一下", "改一下", "需求", "门头", "上面的字", "处理"]): + return "修改" + if any(k in lower for k in ["在吗", "你好", "有人"]): + return "打招呼" + return "unknown" + + @staticmethod + def _looks_like_requirement_text(msg: str) -> bool: + text = (msg or "").strip().lower() + if not text: + return False + req_kw = ( + "做一下", "改一下", "处理一下", "这个字", "上面的字", "门头", "去背景", "抠图", + "换色", "调色", "清晰", "高清", "尺寸", "比例", "横版", "竖版", "排版", "改字", + "按这个做", "照这个做", "就这张", "看看做", "弄一下", + ) + return any(k in text for k in req_kw) + + def _pick_debounce_seconds(self, data: dict, msg: str) -> float: + """意图驱动防抖:不同意图不同等待区间,并引入轻微随机。""" + base = max(1.0, float(self._DEBOUNCE_SECONDS)) + if not self._adaptive_debounce_enabled: + return base + + intent = self._guess_intent_for_debounce(msg) + is_req = self._looks_like_requirement_text(msg) + has_img = self._msg_has_image_url(msg) + # 区间策略:越明确、越短消息,等待越短;需求描述类稍长 + if intent == "打招呼": + low, high = 1.0, min(3.0, base) + elif intent in ("询价", "砍价"): + low, high = 2.0, min(5.0, base) + elif intent in ("修改", "批量"): + low, high = max(3.0, base * 0.65), min(18.0, base + 2.0) + elif intent == "转接": + low, high = 1.0, 2.5 + else: + low, high = max(2.0, base * 0.5), base + + # 发图后的需求描述,优先“多等一点”收集完整需求,减少半句回复 + if is_req and not has_img: + low = max(low, min(10.0, base * 0.8)) + high = max(high, min(20.0, base + 4.0)) + + # 短句更快,长句稍慢,避免把连续半句拆开 + text_len = len((msg or "").strip()) + if text_len <= 4: + high = min(high, max(low + 0.2, 2.5)) + elif text_len >= 18: + low = min(high, low + 0.6) + + wait_s = self._rand_between(low, high) + logger.info(f"防抖等待 {wait_s}s | intent={intent} | len={text_len}") + return wait_s + def _msg_has_image_url(self, msg: str) -> bool: """判断文本消息里是否包含图片URL(客户粘贴了图片链接,可能带前缀文字如 有吗#*#https://...)""" if not msg: diff --git a/tests/test_regression_pipeline.py b/tests/test_regression_pipeline.py index faf1ef9..14dad9f 100644 --- a/tests/test_regression_pipeline.py +++ b/tests/test_regression_pipeline.py @@ -61,6 +61,58 @@ class RegressionPipelineTest(unittest.IsolatedAsyncioTestCase): self.assertIn("15", resp.reply) agent._quote_pending_images.assert_awaited() + async def test_single_image_requirement_intent_triggers_quote(self): + agent = CustomerServiceAgent() + st = agent._get_conversation_state(self.customer_id) + st.pending_image_urls = ["https://img.alicdn.com/a.jpg"] + st.pending_requirements = [] + agent._sync_pending_quote_state(self.customer_id, st) + agent._quote_pending_images = AsyncMock(return_value={"reply": "这张20元,定了我马上做", "need_transfer": False}) + + msg = CustomerMessage( + msg_id="m3", + acc_id="test_shop", + msg="这个门头上面的字做一下", + from_id=self.customer_id, + from_name="t", + cy_id=self.customer_id, + acc_type="AliWorkbench", + msg_type=0, + cy_name="t", + goods_name="专业找图", + goods_order="", + ) + resp = await agent.process_message(msg) + self.assertTrue(resp.should_reply) + self.assertIn("20", resp.reply) + agent._quote_pending_images.assert_awaited() + + async def test_multi_image_finish_intent_triggers_quote(self): + agent = CustomerServiceAgent() + st = agent._get_conversation_state(self.customer_id) + st.pending_image_urls = ["https://img.alicdn.com/a.jpg", "https://img.alicdn.com/b.jpg"] + st.pending_requirements = ["改字"] + agent._sync_pending_quote_state(self.customer_id, st) + agent._quote_pending_images = AsyncMock(return_value={"reply": "两张打包45,定了我就开做", "need_transfer": False}) + + msg = CustomerMessage( + msg_id="m4", + acc_id="test_shop", + msg="就这几张,先按这些报个价", + from_id=self.customer_id, + from_name="t", + cy_id=self.customer_id, + acc_type="AliWorkbench", + msg_type=0, + cy_name="t", + goods_name="专业找图", + goods_order="", + ) + resp = await agent.process_message(msg) + self.assertTrue(resp.should_reply) + self.assertIn("45", resp.reply) + agent._quote_pending_images.assert_awaited() + async def test_pending_state_restore(self): db.update_pending_quote_state(self.customer_id, ["u1", "u2"], ["r1"]) agent = CustomerServiceAgent()