From 9d0276be410bb9108ec064ce3b677f780e6971d8 Mon Sep 17 00:00:00 2001 From: jimi <1847930177@qq.com> Date: Mon, 2 Mar 2026 11:09:26 +0800 Subject: [PATCH] feat: enforce full AI outbound generation and reduce template replies --- core/agent_pre_rules.py | 5 +- core/ai_reply_flow.py | 60 +++++- core/pydantic_ai_agent.py | 58 ++++-- core/websocket_client.py | 301 +++++++++++++++++++++++++++++- tests/test_regression_pipeline.py | 25 +++ 5 files changed, 415 insertions(+), 34 deletions(-) diff --git a/core/agent_pre_rules.py b/core/agent_pre_rules.py index 1231f03..7db22a5 100644 --- a/core/agent_pre_rules.py +++ b/core/agent_pre_rules.py @@ -70,10 +70,9 @@ class AgentPreRuleService: ) async def _rule_pred_meaningless_short_text(self, ctx: RuleContext) -> bool: - from core.pydantic_ai_agent import _is_meaningless_short_text - message = ctx.get("message") - return _is_meaningless_short_text(message.msg) + state = ctx.get("state") + return self.agent._should_handle_as_meaningless_short_text(state, message.msg) async def _rule_act_meaningless_short_text(self, ctx: RuleContext) -> RuleResult: from core.pydantic_ai_agent import AgentResponse diff --git a/core/ai_reply_flow.py b/core/ai_reply_flow.py index 5f031c1..71f5f7e 100644 --- a/core/ai_reply_flow.py +++ b/core/ai_reply_flow.py @@ -1,7 +1,7 @@ from __future__ import annotations import logging -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Optional, Tuple from core.post_ops import negotiation_strategy_reply logger = logging.getLogger("cs_agent") @@ -10,7 +10,43 @@ if TYPE_CHECKING: from core.pydantic_ai_agent import AgentDeps, ConversationState, CustomerMessage, CustomerServiceAgent -def select_target_agent(agent: "CustomerServiceAgent", message: "CustomerMessage", state: "ConversationState"): +def _select_agent_by_intent( + agent: "CustomerServiceAgent", + message: "CustomerMessage", + state: "ConversationState", +) -> Tuple[Optional[Any], str]: + """ + AI 意图优先路由;识别不到时返回 (None, "intent:none"),由关键词兜底。 + """ + try: + from utils.intent_analyzer import detect_intent + + decision = detect_intent(message.msg or "") + intent = (decision.intent or "").strip() + source = decision.source or "none" + score = float(decision.score or 0.0) + except Exception: + intent, source, score = "", "error", 0.0 + + if not intent: + return None, "intent:none" + + if intent in ("询价", "砍价"): + return agent.agent_pricing, f"intent:{intent}|src:{source}|score:{score:.3f}" + if intent in ("修改", "加急"): + return agent.agent_processing, f"intent:{intent}|src:{source}|score:{score:.3f}" + if intent == "售后": + return agent.agent_after_sale, f"intent:{intent}|src:{source}|score:{score:.3f}" + if intent == "转接": + return agent.agent_after_sale, f"intent:{intent}|src:{source}|score:{score:.3f}" + if intent in ("打招呼", "批量", "发图"): + target = agent.agent_after_sale if state.stage == "售后" else agent.agent + return target, f"intent:{intent}|src:{source}|score:{score:.3f}" + + return None, f"intent:unmapped:{intent}|src:{source}|score:{score:.3f}" + + +def select_target_agent(agent: "CustomerServiceAgent", message: "CustomerMessage", state: "ConversationState") -> Tuple[Any, str]: msg_lower = message.msg.lower() pricing_kw = ["多少钱", "多少一张", "报价", "给个价", "几块", "价位", "能便宜点吗"] processing_kw = ["安排", "处理一下", "开始做", "做一下", "尽快", "加急", "付款了", "已付款"] @@ -45,18 +81,23 @@ def select_target_agent(agent: "CustomerServiceAgent", message: "CustomerMessage "卫星地图", ] target_agent = agent.agent_after_sale if state.stage == "售后" else agent.agent + + ai_target, ai_reason = _select_agent_by_intent(agent, message, state) + if ai_target is not None: + return ai_target, ai_reason + risk_hit = any(k in msg_lower for k in risk_kw) or agent._is_political_inquiry(message.msg) or agent._is_map_inquiry(message.msg) if risk_hit: - return agent.agent_risk + return agent.agent_risk, "keyword:risk" if any(k in message.msg for k in order_markers): - return agent.agent_order + return agent.agent_order, "keyword:order" if any(k in msg_lower for k in processing_kw): - return agent.agent_processing + return agent.agent_processing, "keyword:processing" if any(k in msg_lower for k in pricing_kw): - return agent.agent_pricing + return agent.agent_pricing, "keyword:pricing" if any(k in msg_lower for k in similar_kw): - return agent.agent_similar - return target_agent + return agent.agent_similar, "keyword:similar" + return target_agent, "fallback:default" async def execute_ai_turn( @@ -68,7 +109,8 @@ async def execute_ai_turn( deps: "AgentDeps", history: list, ) -> str: - target_agent = select_target_agent(agent, message, state) + target_agent, route_reason = select_target_agent(agent, message, state) + logger.info("[路由] %s", route_reason) result = await target_agent.run(user_prompt, deps=deps, message_history=history) agent.message_histories[message.from_id] = result.all_messages()[-30:] reply_text = agent._colloquialize_reply(agent._normalize_reply_text(result.output)) diff --git a/core/pydantic_ai_agent.py b/core/pydantic_ai_agent.py index 9d10ea2..2ec899a 100755 --- a/core/pydantic_ai_agent.py +++ b/core/pydantic_ai_agent.py @@ -490,23 +490,9 @@ class CustomerServiceAgent: """ 收图阶段回复默认走 AI 改写,失败时回退到固定模板。 """ - # 首张收图先承接“我看一下”,避免机械地立刻催“发完统一报价”。 + first_image_ack = "收到,我先看一下哈,稍等哈。" if scene == "collect_ack" and len(state.pending_image_urls) == 1: - first_ack = [ - "收到了,我先看一下哈,稍等哈", - "这张我收到了,我先看下,稍等我一下哈", - "收到这张了,我先过一眼,稍等哈", - "我先看这张哈,稍等我一下", - "图我收到了,我先看一眼,稍等我回你哈", - "这张先记上了,我先看下细节,稍等哈", - "收到哈,我先过一遍这张,稍等我会儿", - "我先看这张效果,稍等一下哈", - "图到了,我先看下清晰度,稍等哈", - "这张我先看着,稍等我一下就回你", - "收到这张了,我先核一下细节,稍等哈", - "我先把这张看完,稍等我一会儿哈", - ] - return random.choice(first_ack) + fallback = first_image_ack if not self.dynamic_collection_replies: return fallback try: @@ -525,7 +511,7 @@ class CustomerServiceAgent: f"客户原话: {message.msg}\n" f"当前已收图片数: {len(state.pending_image_urls)}\n" f"当前需求摘要: {pending_req}\n" - "输出要求: 不超过2句话,像真人店主聊天。" + "输出要求: 不超过2句话,像真人店主聊天;避免复用固定模板句。" ) result = await self.agent_natural_reply.run(user_prompt, deps=deps, message_history=history) self.message_histories[message.from_id] = result.all_messages()[-30:] @@ -695,6 +681,44 @@ class CustomerServiceAgent: clean = msg.strip().rstrip("!!??。.~~") return clean in self._COOLDOWN_PATTERNS + def _should_handle_as_meaningless_short_text(self, state: ConversationState, msg: str) -> bool: + """ + 无意义短句仅在“非业务处理中”生效,避免误拦截真实推进消息。 + 例如:已在收图/待报价阶段时,客户发“好的/在吗”不应直接 ping。 + """ + customer_text, _ = self._split_customer_text(msg or "") + text = (customer_text or "").strip() + if not _is_meaningless_short_text(text): + return False + if self._extract_image_urls(text): + return False + if (getattr(state, "pending_image_urls", None) or []): + return False + if getattr(state, "quote_phase", "idle") in {"collecting", "ready_to_quote", "waiting_result"}: + return False + return True + + async def build_auto_quote_reply(self, state: ConversationState, message: CustomerMessage) -> AgentResponse: + """ + 自动报价内部入口:不走 process_message,避免伪造客户语句污染上下文。 + """ + quote_res = await self._quote_pending_images(state, message) + reply_text = self._colloquialize_reply(quote_res.get("reply", "")) + reply_text = await self._rewrite_reply_with_ai( + message=message, + state=state, + reply=reply_text, + scene="batch_quote_reply", + ) + need_transfer = bool(quote_res.get("need_transfer")) + state.last_reply_at = datetime.now() + return AgentResponse( + reply=reply_text, + should_reply=not need_transfer, + need_transfer=need_transfer, + transfer_msg=TRANSFER_MESSAGE if need_transfer else "", + ) + async def process_message(self, message: CustomerMessage) -> AgentResponse: """处理客户消息并生成回复。""" return await process_incoming_message(self, message) diff --git a/core/websocket_client.py b/core/websocket_client.py index 1f3c2e9..b00867a 100755 --- a/core/websocket_client.py +++ b/core/websocket_client.py @@ -8,7 +8,7 @@ import secrets import time import hashlib from collections import deque -from datetime import datetime +from datetime import datetime, timedelta from pathlib import Path from typing import Optional, Dict, Any, List from utils.observability import emit_activity, build_trace_id @@ -170,6 +170,8 @@ class QingjianAPIClient: self._last_reply_sent_at: dict = {} # customer_key -> monotonic ts self._outbound_semantic_seen: dict = {} # customer_key -> {semantic_key: ts} self._outbound_class_seen: dict = {} # customer_key -> {reply_class: ts} + self._outbound_template_seen: dict = {} # customer_key -> {template_family: ts} + self._unreplied_followup_sent: dict = {} # customer_key -> monotonic ts(补偿消息节流) self._inbound_log_seen: dict = {} # signature -> monotonic ts(防重复写入) self._outbound_log_seen: dict = {} # signature -> monotonic ts(防重复写入) self._tianwang_callback_url = ( @@ -179,6 +181,7 @@ class QingjianAPIClient: self._tianwang_agent_name = os.getenv("TIANWANG_AGENT_NAME", "终结者").strip() or "终结者" self._reply_guard_enabled = os.getenv("AI_REPLY_GUARD_ENABLED", "true").lower() in ("1", "true", "yes") self._reply_guard_verbose = os.getenv("AI_REPLY_GUARD_VERBOSE", "false").lower() in ("1", "true", "yes") + self._force_ai_generate_reply = os.getenv("FORCE_AI_GENERATE_ALL_REPLIES", "true").lower() in ("1", "true", "yes") # 延迟加载任务模块(避免循环导入) self.task_scheduler = None @@ -429,6 +432,22 @@ class QingjianAPIClient: return "ack" return "general" + @staticmethod + def _template_family(reply: str) -> str: + """识别高频模板家族,做疲劳抑制。""" + s = (reply or "").strip() + if not s: + return "" + if "需求我记上了" in s and "继续发图" in s: + return "collect_remind" + if ("这批图过一遍" in s or "收齐了" in s or "收好了" in s) and ("总价" in s or "报价" in s): + return "quote_defer" + if "图片收到了" in s and "继续发" in s: + return "collect_ack" + if "好嘞,你稍等下,我这边看一下" in s: + return "fallback_ack" + return "" + def _outbound_arbiter(self, original_msg: dict, reply_content: str, trace_id: str) -> tuple[bool, str]: """ 统一出站裁决层: @@ -447,11 +466,17 @@ class QingjianAPIClient: class_window = max(20, int(os.getenv("AI_OUTBOUND_CLASS_DEDUPE_SECONDS", "90"))) except Exception: class_window = 90 + try: + template_window = max(120, int(os.getenv("AI_OUTBOUND_TEMPLATE_FATIGUE_SECONDS", "600"))) + except Exception: + template_window = 600 sem_bucket = self._outbound_semantic_seen.setdefault(key, {}) cls_bucket = self._outbound_class_seen.setdefault(key, {}) + tpl_bucket = self._outbound_template_seen.setdefault(key, {}) self._prune_seen(sem_bucket, now_mono, ttl_sec=max(sem_window * 2, 240)) self._prune_seen(cls_bucket, now_mono, ttl_sec=max(class_window * 2, 180)) + self._prune_seen(tpl_bucket, now_mono, ttl_sec=max(template_window * 2, 1200)) if sem_key and (now_mono - sem_bucket.get(sem_key, 0.0)) < sem_window: self._activity_log( @@ -466,6 +491,19 @@ class QingjianAPIClient: ) return False, "semantic_duplicate" + template_family = self._template_family(reply_content) + if template_family and (now_mono - tpl_bucket.get(template_family, 0.0)) < template_window: + self._activity_log( + "outbound_arbiter_block", + trace_id=trace_id, + acc_id=original_msg.get("acc_id", ""), + customer_id=original_msg.get("from_id", ""), + reason="template_fatigue", + template_family=template_family, + msg=reply_content, + ) + return False, "template_fatigue" + if reply_class in {"quote", "collect", "ack"} and (now_mono - cls_bucket.get(reply_class, 0.0)) < class_window: self._activity_log( "outbound_arbiter_block", @@ -481,16 +519,199 @@ class QingjianAPIClient: if sem_key: sem_bucket[sem_key] = now_mono cls_bucket[reply_class] = now_mono + if template_family: + tpl_bucket[template_family] = now_mono self._activity_log( "outbound_arbiter_pass", trace_id=trace_id, acc_id=original_msg.get("acc_id", ""), customer_id=original_msg.get("from_id", ""), reply_class=reply_class, + template_family=template_family, semantic_key=sem_key[:80] if sem_key else "", ) return True, "pass" + async def _unreplied_followup_loop(self): + """ + 定时补偿:对“最后一条是客户消息且长时间未回复”的会话,补发一次自然跟进。 + """ + if not self.enable_agent or not self.agent: + return + while self.running: + try: + await asyncio.sleep(max(30, int(os.getenv("UNREPLIED_FOLLOWUP_SCAN_SECONDS", "90")))) + await self._scan_and_send_unreplied_followups() + except asyncio.CancelledError: + break + except Exception as e: + self._activity_log("unreplied_followup_loop_error", error=str(e)) + + async def _scan_and_send_unreplied_followups(self): + from db import chat_log_db as cdb + try: + idle_minutes = max(5, int(os.getenv("UNREPLIED_FOLLOWUP_IDLE_MINUTES", "12"))) + max_age_minutes = max(idle_minutes, int(os.getenv("UNREPLIED_FOLLOWUP_MAX_AGE_MINUTES", "180"))) + followup_cd = max(300, int(os.getenv("UNREPLIED_FOLLOWUP_COOLDOWN_SECONDS", "3600"))) + limit = max(10, int(os.getenv("UNREPLIED_FOLLOWUP_LIMIT", "40"))) + except Exception: + idle_minutes, max_age_minutes, followup_cd, limit = 12, 180, 3600, 40 + + now = datetime.now() + window_start = (now - timedelta(minutes=max_age_minutes)).strftime("%Y-%m-%d %H:%M:%S") + conn = None + try: + conn = cdb._get_conn() + rows = conn.execute( + cdb._sql( + """ + SELECT acc_id, customer_id, MAX(id) AS last_id + FROM chat_logs + WHERE timestamp >= ? + GROUP BY acc_id, customer_id + ORDER BY MAX(id) DESC + LIMIT ? + """ + ), + (window_start, limit * 6), + ).fetchall() + sessions = [dict(r) for r in rows] + sent = 0 + for s in sessions: + if sent >= limit: + break + acc_id = str(s.get("acc_id", "") or "") + cid = str(s.get("customer_id", "") or "") + if not acc_id or not cid: + continue + ckey = f"{acc_id}:{cid}" + if not self._is_owned_by_this_worker(ckey): + continue + last = conn.execute( + cdb._sql( + """ + SELECT id, direction, message, timestamp, customer_name, acc_id, platform + FROM chat_logs + WHERE acc_id = ? AND customer_id = ? + ORDER BY id DESC + LIMIT 1 + """ + ), + (acc_id, cid), + ).fetchone() + if not last: + continue + last = dict(last) + if str(last.get("direction", "")) != "in": + continue + last_ts = last.get("timestamp") + if isinstance(last_ts, datetime): + last_dt = last_ts + else: + last_dt = datetime.strptime(str(last_ts)[:19], "%Y-%m-%d %H:%M:%S") + idle_s = (now - last_dt).total_seconds() + if idle_s < idle_minutes * 60 or idle_s > max_age_minutes * 60: + continue + now_mono = time.monotonic() + if (now_mono - self._unreplied_followup_sent.get(ckey, 0.0)) < followup_cd: + continue + # 避免对明显结束语/确认语再次打扰 + last_msg = str(last.get("message", "") or "").strip().lower() + if last_msg in {"好的", "好", "ok", "收到", "嗯", "哦"}: + continue + + followup = await self._compose_ai_scene_reply( + original_msg={ + "acc_id": acc_id, + "from_id": cid, + "from_name": self.to_chinese(last.get("customer_name", "") or cid), + "acc_type": str(last.get("platform", "") or "AliWorkbench"), + "msg": str(last.get("message", "") or ""), + }, + scene="unreplied_followup", + intent_hint="客户上一条消息还没接上,先自然承接并请对方补一句当前要处理的图或要求。", + fallback="刚看到你消息了,我在的。你把要处理的图或要求再发我一下,我马上接着看。", + ) + fake = { + "acc_id": acc_id, + "from_id": cid, + "from_name": self.to_chinese(last.get("customer_name", "") or cid), + "cy_id": cid, + "cy_name": self.to_chinese(last.get("customer_name", "") or cid), + "acc_type": str(last.get("platform", "") or "AliWorkbench"), + "msg": str(last.get("message", "") or ""), + "msg_type": 0, + } + await self.send_reply(fake, followup) + self._unreplied_followup_sent[ckey] = now_mono + sent += 1 + self._activity_log( + "unreplied_followup_sent", + acc_id=acc_id, + customer_id=cid, + idle_seconds=int(idle_s), + last_msg=str(last.get("message", "") or "")[:120], + reply=followup, + ) + finally: + try: + if conn: + conn.close() + except Exception: + pass + + async def _compose_ai_scene_reply( + self, + *, + original_msg: dict, + scene: str, + intent_hint: str, + fallback: str, + ) -> str: + """ + 场景化 AI 直接生成回复(不依赖固定模板)。 + """ + if not self.enable_agent or not self.agent or not AgentDeps: + return fallback + try: + deps = AgentDeps( + msg_id=str(original_msg.get("msg_id", "") or f"{scene}_gen"), + acc_id=str(original_msg.get("acc_id", "") or ""), + from_id=str(original_msg.get("from_id", "") or ""), + platform=str(original_msg.get("acc_type", "") or ""), + ) + customer_msg = self.to_chinese(str(original_msg.get("msg", "") or "")) + prompt = ( + "你是淘宝客服,直接生成一条发给客户的话。\n" + f"场景: {scene}\n" + f"意图: {intent_hint}\n" + f"客户原话: {customer_msg}\n" + "要求: 1-2句,自然口语,不要模板腔,不要新增价格/承诺;只输出最终回复。\n" + ) + result = await self.agent.agent_natural_reply.run(prompt, deps=deps, message_history=[]) + out = str(getattr(result, "output", "") or "").strip() + if not out: + return fallback + if out.startswith("话术|") or "[转移会话]" in out or "TRANSFER_REQUESTED" in out: + return fallback + self._activity_log( + "ai_scene_reply_generated", + acc_id=str(original_msg.get("acc_id", "") or ""), + customer_id=str(original_msg.get("from_id", "") or ""), + scene=scene, + generated=out[:160], + ) + return out + except Exception as e: + self._activity_log( + "ai_scene_reply_error", + acc_id=str(original_msg.get("acc_id", "") or ""), + customer_id=str(original_msg.get("from_id", "") or ""), + scene=scene, + error=str(e), + ) + return fallback + async def receive_messages(self): """持续接收消息""" try: @@ -1296,7 +1517,7 @@ class QingjianAPIClient: return # 标记本批次已自动触发,避免同内容循环“马上报价”。 self._auto_quote_done_sig[capture_key] = capture_sig - # 直接置为可报价,然后走“发完了,报价吧”触发既有报价链路 + # 直接置为可报价,走内部自动报价入口(不伪造客户语句)。 self.agent._mark_quote_ready(st) self.agent._sync_pending_quote_state(capture_cid, st) self._activity_log( @@ -1308,7 +1529,7 @@ class QingjianAPIClient: notify_msg = CustomerMessage( msg_id="auto_quote_idle_trigger", acc_id=capture_data.get('acc_id', ''), - msg="发完了,报价吧", + msg="__AUTO_QUOTE_INTERNAL_TRIGGER__", from_id=capture_cid, from_name=self.to_chinese(capture_data.get('from_name', '') or capture_data.get('cy_name', '')), cy_id=capture_data.get('cy_id', ''), @@ -1318,7 +1539,7 @@ class QingjianAPIClient: goods_name=self.to_chinese(capture_data.get('goods_name', '')) if capture_data.get('goods_name') else None, goods_order=self.to_chinese(capture_data.get('goods_order', '')) if capture_data.get('goods_order') else None, ) - response = await self.agent.process_message(notify_msg) + response = await self.agent.build_auto_quote_reply(st, notify_msg) if response.should_reply and response.reply and not response.need_transfer: await self.send_reply(capture_data, response.reply) self._activity_log( @@ -1636,7 +1857,12 @@ class QingjianAPIClient: logger.info(f"系统询单命中 | 店铺:{acc_id} | 客户:{customer_id} | action:{action}") if action == "reply": - reply = policy.get("reply") or "您好,这边已收到询单消息,稍后由人工客服跟进处理。" + reply = await self._compose_ai_scene_reply( + original_msg=data, + scene="system_inquiry_reply", + intent_hint="这是系统客服询单消息,简短确认已收到并说明会跟进即可。", + fallback=(policy.get("reply") or "您好,这边已收到询单消息,稍后由人工客服跟进处理。"), + ) await self.send_reply(data, reply) metrics_emit("system_inquiry_auto_reply", customer_id=customer_id, acc_id=acc_id) return True @@ -1976,6 +2202,10 @@ class QingjianAPIClient: return reply_content = self._colloquialize_outbound_reply(reply_content) + reply_content = await self._ai_generate_outbound_reply( + original_msg=original_msg, + reply_content=str(reply_content or ""), + ) # 同一客户外发限流:N 秒内最多 1 条 try: @@ -2058,6 +2288,62 @@ class QingjianAPIClient: reply["_trace_id"] = trace_id await self.send_message(reply) + async def _ai_generate_outbound_reply(self, original_msg: dict, reply_content: str) -> str: + """ + 强制全量 AI 出站生成层: + - 所有普通文本外发先由 AI 生成最终话术; + - 控制命令/纯链接/转接指令直接绕过。 + """ + text = (reply_content or "").strip() + if not text: + return text + if text.startswith("话术|") or "[转移会话]" in text or "TRANSFER_REQUESTED" in text: + return text + if re.fullmatch(r"https?://\S+", text): + return text + if not self._force_ai_generate_reply or not self.enable_agent or not self.agent or not AgentDeps: + return text + try: + deps = AgentDeps( + msg_id=str(original_msg.get("msg_id", "") or "outbound_generate"), + acc_id=str(original_msg.get("acc_id", "") or ""), + from_id=str(original_msg.get("from_id", "") or ""), + platform=str(original_msg.get("acc_type", "") or ""), + ) + customer_msg = self.to_chinese(str(original_msg.get("msg", "") or "")) + prompt = ( + "你是淘宝客服外发文案生成器。请根据“回复意图草稿”生成最终发给客户的话。\n" + "要求:\n" + "1) 保留原意,不新增价格/承诺/流程;\n" + "2) 自然像真人聊天,不用固定模板句;\n" + "3) 1-2句;\n" + "4) 只输出最终回复文本。\n\n" + f"客户原话: {customer_msg}\n" + f"回复意图草稿: {text}\n" + ) + result = await self.agent.agent_natural_reply.run(prompt, deps=deps, message_history=[]) + out = str(getattr(result, "output", "") or "").strip() + if not out: + return text + if out.startswith("话术|") or "[转移会话]" in out: + return text + self._activity_log( + "ai_generate_reply", + acc_id=str(original_msg.get("acc_id", "") or ""), + customer_id=str(original_msg.get("from_id", "") or ""), + draft=text[:160], + generated=out[:160], + ) + return out + except Exception as e: + self._activity_log( + "ai_generate_reply_error", + acc_id=str(original_msg.get("acc_id", "") or ""), + customer_id=str(original_msg.get("from_id", "") or ""), + error=str(e), + ) + return text + def _colloquialize_outbound_reply(self, text: Any) -> Any: """统一外发口语化处理,避免机械话术。""" if not isinstance(text, str): @@ -2393,6 +2679,11 @@ class QingjianAPIClient: except Exception as e: logger.info(f"[{self.get_time()}] 企微启动消息模块加载失败: {e}") + # 未回复会话补偿(可关闭) + if os.getenv("UNREPLIED_FOLLOWUP_ENABLED", "true").lower() in ("1", "true", "yes"): + tasks.append(self._unreplied_followup_loop()) + logger.info(f"[{self.get_time()}] 未回复会话补偿任务已启动") + await asyncio.gather(*tasks) diff --git a/tests/test_regression_pipeline.py b/tests/test_regression_pipeline.py index 7950e60..319b57c 100644 --- a/tests/test_regression_pipeline.py +++ b/tests/test_regression_pipeline.py @@ -432,6 +432,31 @@ class RegressionPipelineTest(unittest.IsolatedAsyncioTestCase): self.assertTrue(resp.should_reply) self.assertIn(resp.reply, ("嗯咯", "嗯啦", "嗯", "哦")) + async def test_meaningless_short_text_not_ping_when_collecting(self): + agent = CustomerServiceAgent() + st = agent._get_conversation_state(self.customer_id) + st.pending_image_urls = ["https://img.alicdn.com/a.jpg"] + st.quote_phase = "collecting" + agent._sync_pending_quote_state(self.customer_id, st) + + msg = CustomerMessage( + msg_id="m-meaningless-collecting", + acc_id="test_shop", + msg="好的", + from_id=self.customer_id, + from_name="t", + cy_id=self.customer_id, + acc_type="AliWorkbench", + msg_type=0, + cy_name="t", + goods_name="专业找图", + goods_order="", + ) + resp = await agent.process_message(msg) + self.assertTrue(resp.should_reply) + self.assertTrue((resp.reply or "").strip()) + self.assertNotIn(resp.reply, ("嗯咯", "嗯啦", "嗯", "哦")) + def tearDown(self): db.clear_pending_quote_state(self.customer_id)