diff --git a/core/pydantic_ai_agent.py b/core/pydantic_ai_agent.py index aeee2e7..ca72e39 100755 --- a/core/pydantic_ai_agent.py +++ b/core/pydantic_ai_agent.py @@ -424,6 +424,15 @@ class CustomerServiceAgent: "收到了,我先看一下哈,稍等哈", "这张我收到了,我先看下,等我一下哈", "收到这张了,我先过一眼,稍等哈", + "我先看这张哈,稍等我一下", + "图我收到了,我先看一眼,马上回你哈", + "这张先记上了,我先看下细节,稍等哈", + "收到哈,我先过一遍这张,等我会儿", + "我先看这张效果,稍等一下哈", + "图到了,我先看下清晰度,稍等哈", + "这张我先看着,稍等我一下就回你", + "收到这张了,我先核一下细节,稍等哈", + "我先把这张看完,稍等我一会儿哈", ] return random.choice(first_ack) if not self.dynamic_collection_replies: diff --git a/core/websocket_client.py b/core/websocket_client.py index 4595dcd..051f69c 100755 --- a/core/websocket_client.py +++ b/core/websocket_client.py @@ -100,6 +100,7 @@ class QingjianAPIClient: self._agent_semaphore = asyncio.Semaphore(8) self._pending_images: dict = {} self._pending_image_tasks: dict = {} + self._auto_quote_tasks: dict = {} # customer_key -> asyncio.Task # 旧版“看图即报价”快速链路(默认关闭,避免与 Agent 批量收集逻辑并发打架) self._legacy_fast_quote_enabled = os.getenv("LEGACY_FAST_IMAGE_QUOTE", "false").lower() in ("1", "true", "yes") self._system_inquiry_rules = self._load_system_inquiry_rules() @@ -433,6 +434,8 @@ class QingjianAPIClient: 订单通知、付款相关消息不走防抖,立即处理。 """ msg_body = data.get('msg', '') + key = f"{data.get('acc_id','')}:{data.get('from_id','')}" + self._cancel_auto_quote_task(key, reason="new_inbound") # 以下情况跳过防抖,立即处理(后台执行,不阻塞接收循环) immediate_keywords = ["买家已付款", "已付款", "[系统订单信息]"] if any(kw in msg_body for kw in immediate_keywords): @@ -446,8 +449,6 @@ class QingjianAPIClient: self._fire_and_forget(self._agent_reply_serialized(data)) return - key = f"{data.get('acc_id','')}:{data.get('from_id','')}" - # 积攒消息 if key not in self._pending_msgs: self._pending_msgs[key] = [] @@ -936,6 +937,7 @@ class QingjianAPIClient: reply=response.reply, ) await self.send_reply(data, response.reply) + await self._maybe_schedule_auto_quote(data) # 推送到企微:客户消息+AI回复成对 try: from utils.wechat_chat_log import push_chat_to_wechat @@ -956,7 +958,7 @@ class QingjianAPIClient: acc_id=data.get("acc_id", ""), customer_id=data.get("from_id", ""), ) - + except Exception as e: logger.error(f"Agent 处理失败: {e}") self._activity_log( @@ -966,6 +968,87 @@ class QingjianAPIClient: error=str(e), ) + def _cancel_auto_quote_task(self, key: str, reason: str = ""): + task = self._auto_quote_tasks.get(key) + if task and not task.done(): + task.cancel() + self._activity_log("auto_quote_cancel", key=key, reason=reason or "unknown") + + async def _maybe_schedule_auto_quote(self, data: dict): + """ + 智能兜底:客户发图后若长时间不再补充消息,自动触发一次报价,避免会话卡住。 + """ + if not self.enable_agent or not self.agent: + return + try: + shop_type = _get_shop_type(data.get('acc_id', ''), self.to_chinese(data.get('goods_name', '') or '')) + if shop_type != "find_image": + return + cid = data.get('from_id', '') + key = self._customer_key(data) + state = self.agent._get_conversation_state(cid) + if not state or not getattr(state, "pending_image_urls", None): + self._cancel_auto_quote_task(key, reason="no_pending_images") + return + if state.quote_phase not in {"collecting", "waiting_result"}: + return + try: + idle_seconds = max(8, int(os.getenv("AUTO_QUOTE_IDLE_SECONDS", "18"))) + except Exception: + idle_seconds = 18 + + self._cancel_auto_quote_task(key, reason="reschedule") + + async def _delayed_auto_quote(capture_key: str, capture_data: dict, wait_s: int): + await asyncio.sleep(wait_s) + async with self._get_customer_lock(capture_key): + capture_cid = capture_data.get('from_id', '') + st = self.agent._get_conversation_state(capture_cid) + if not st or not st.pending_image_urls: + return + # 直接置为可报价,然后走“发完了,报价吧”触发既有报价链路 + self.agent._mark_quote_ready(st) + self.agent._sync_pending_quote_state(capture_cid, st) + self._activity_log( + "auto_quote_trigger", + key=capture_key, + pending_count=len(st.pending_image_urls), + wait_s=wait_s, + ) + notify_msg = CustomerMessage( + msg_id="auto_quote_idle_trigger", + acc_id=capture_data.get('acc_id', ''), + msg="发完了,报价吧", + from_id=capture_cid, + from_name=self.to_chinese(capture_data.get('from_name', '') or capture_data.get('cy_name', '')), + cy_id=capture_data.get('cy_id', ''), + acc_type=capture_data.get('acc_type', ''), + msg_type=0, + cy_name=self.to_chinese(capture_data.get('cy_name', '') or capture_data.get('from_name', '')), + goods_name=self.to_chinese(capture_data.get('goods_name', '')) if capture_data.get('goods_name') else None, + goods_order=self.to_chinese(capture_data.get('goods_order', '')) if capture_data.get('goods_order') else None, + ) + response = await self.agent.process_message(notify_msg) + if response.should_reply and response.reply and not response.need_transfer: + await self.send_reply(capture_data, response.reply) + self._activity_log( + "auto_quote_sent", + key=capture_key, + reply=response.reply, + ) + + task = asyncio.create_task(_delayed_auto_quote(key, dict(data), idle_seconds)) + self._auto_quote_tasks[key] = task + self._activity_log( + "auto_quote_scheduled", + key=key, + pending_count=len(state.pending_image_urls), + phase=state.quote_phase, + wait_s=idle_seconds, + ) + except Exception as e: + self._activity_log("auto_quote_schedule_error", error=str(e), key=self._customer_key(data)) + async def _analyze_multi_and_reply(self, data: dict, urls: list): try: from image.image_analyzer import image_analyzer