diff --git a/core/websocket_client.py b/core/websocket_client.py index 74a718b..7cc5d09 100755 --- a/core/websocket_client.py +++ b/core/websocket_client.py @@ -166,6 +166,11 @@ class QingjianAPIClient: self._last_reply_sent_at: dict = {} # customer_key -> monotonic ts self._inbound_log_seen: dict = {} # signature -> monotonic ts(防重复写入) self._outbound_log_seen: dict = {} # signature -> monotonic ts(防重复写入) + self._tianwang_callback_url = ( + os.getenv("TIANWANG_CALLBACK_URL", "").strip() + or "http://139.199.3.75:18789/api/callback" + ) + self._tianwang_agent_name = os.getenv("TIANWANG_AGENT_NAME", "终结者").strip() or "终结者" # 延迟加载任务模块(避免循环导入) self.task_scheduler = None @@ -202,6 +207,49 @@ class QingjianAPIClient: **kwargs, ) + async def _post_tianwang_callback(self, event: str, data: dict, extra: Optional[Dict[str, Any]] = None): + """将消息处理事件回调给天网。""" + if not self._tianwang_callback_url: + return + try: + import httpx + + payload = { + "event": event, + "timestamp": datetime.now().isoformat(), + "agent_name": self._tianwang_agent_name, + "acc_id": str(data.get("acc_id", "") or ""), + "customer_id": str(data.get("from_id", "") or ""), + "customer_name": self.to_chinese(data.get("from_name", "") or data.get("cy_name", "")), + "msg_id": str(data.get("msg_id", "") or ""), + "msg_type": int(data.get("msg_type", 0) or 0), + "msg": self.to_chinese(data.get("msg", "") or ""), + "goods_name": self.to_chinese(data.get("goods_name", "") or ""), + "goods_order": self.to_chinese(data.get("goods_order", "") or ""), + } + if extra: + payload.update(extra) + async with httpx.AsyncClient(timeout=6) as client: + resp = await client.post(self._tianwang_callback_url, json=payload) + ok = 200 <= resp.status_code < 300 + self._activity_log( + "tianwang_callback", + result="ok" if ok else "http_error", + event_name=event, + status_code=resp.status_code, + acc_id=payload["acc_id"], + customer_id=payload["customer_id"], + ) + except Exception as e: + self._activity_log( + "tianwang_callback", + result="error", + event_name=event, + acc_id=str(data.get("acc_id", "") or ""), + customer_id=str(data.get("from_id", "") or ""), + error=str(e), + ) + async def connect(self): """连接WebSocket服务器""" @@ -411,6 +459,7 @@ class QingjianAPIClient: logger.info(f"[{self.get_time()}] 空消息跳过(from_id={from_id!r} acc_id={acc_id!r})") return self._log_inbound_once(data) + self._fire_and_forget(self._post_tianwang_callback("message_received", data)) # Gemini 店铺:不回复,直接跳过 goods_name = self.to_chinese(data.get('goods_name', '') or '') @@ -969,6 +1018,16 @@ class QingjianAPIClient: customer_id=data.get("from_id", ""), transfer_msg=response.transfer_msg, ) + self._fire_and_forget(self._post_tianwang_callback( + "message_processed", + data, + extra={ + "should_reply": bool(response.should_reply), + "need_transfer": True, + "agent_reply": response.reply or "", + "transfer_msg": response.transfer_msg or "", + }, + )) await self.transfer_to_human(data, response.transfer_msg) # 推送到企微:客户消息+转接回复成对 try: @@ -1027,6 +1086,15 @@ class QingjianAPIClient: ) await self.send_reply(data, response.reply) await self._maybe_schedule_auto_quote(data) + self._fire_and_forget(self._post_tianwang_callback( + "message_processed", + data, + extra={ + "should_reply": True, + "need_transfer": bool(response.need_transfer), + "agent_reply": response.reply, + }, + )) # 推送到企微:客户消息+AI回复成对 try: from utils.wechat_chat_log import push_chat_to_wechat @@ -1048,6 +1116,15 @@ class QingjianAPIClient: acc_id=data.get("acc_id", ""), customer_id=data.get("from_id", ""), ) + self._fire_and_forget(self._post_tianwang_callback( + "message_processed", + data, + extra={ + "should_reply": False, + "need_transfer": False, + "agent_reply": "", + }, + )) except Exception as e: logger.error(f"Agent 处理失败: {e}")