feat: post inbound and processed message callbacks to Tianwang endpoint

This commit is contained in:
2026-03-01 17:38:57 +08:00
parent 3972764c79
commit 5c1f33114f

View File

@@ -166,6 +166,11 @@ class QingjianAPIClient:
self._last_reply_sent_at: dict = {} # customer_key -> monotonic ts self._last_reply_sent_at: dict = {} # customer_key -> monotonic ts
self._inbound_log_seen: dict = {} # signature -> monotonic ts防重复写入 self._inbound_log_seen: dict = {} # signature -> monotonic ts防重复写入
self._outbound_log_seen: dict = {} # signature -> monotonic ts防重复写入 self._outbound_log_seen: dict = {} # signature -> monotonic ts防重复写入
self._tianwang_callback_url = (
os.getenv("TIANWANG_CALLBACK_URL", "").strip()
or "http://139.199.3.75:18789/api/callback"
)
self._tianwang_agent_name = os.getenv("TIANWANG_AGENT_NAME", "终结者").strip() or "终结者"
# 延迟加载任务模块(避免循环导入) # 延迟加载任务模块(避免循环导入)
self.task_scheduler = None self.task_scheduler = None
@@ -202,6 +207,49 @@ class QingjianAPIClient:
**kwargs, **kwargs,
) )
async def _post_tianwang_callback(self, event: str, data: dict, extra: Optional[Dict[str, Any]] = None):
"""将消息处理事件回调给天网。"""
if not self._tianwang_callback_url:
return
try:
import httpx
payload = {
"event": event,
"timestamp": datetime.now().isoformat(),
"agent_name": self._tianwang_agent_name,
"acc_id": str(data.get("acc_id", "") or ""),
"customer_id": str(data.get("from_id", "") or ""),
"customer_name": self.to_chinese(data.get("from_name", "") or data.get("cy_name", "")),
"msg_id": str(data.get("msg_id", "") or ""),
"msg_type": int(data.get("msg_type", 0) or 0),
"msg": self.to_chinese(data.get("msg", "") or ""),
"goods_name": self.to_chinese(data.get("goods_name", "") or ""),
"goods_order": self.to_chinese(data.get("goods_order", "") or ""),
}
if extra:
payload.update(extra)
async with httpx.AsyncClient(timeout=6) as client:
resp = await client.post(self._tianwang_callback_url, json=payload)
ok = 200 <= resp.status_code < 300
self._activity_log(
"tianwang_callback",
result="ok" if ok else "http_error",
event_name=event,
status_code=resp.status_code,
acc_id=payload["acc_id"],
customer_id=payload["customer_id"],
)
except Exception as e:
self._activity_log(
"tianwang_callback",
result="error",
event_name=event,
acc_id=str(data.get("acc_id", "") or ""),
customer_id=str(data.get("from_id", "") or ""),
error=str(e),
)
async def connect(self): async def connect(self):
"""连接WebSocket服务器""" """连接WebSocket服务器"""
@@ -411,6 +459,7 @@ class QingjianAPIClient:
logger.info(f"[{self.get_time()}] 空消息跳过from_id={from_id!r} acc_id={acc_id!r}") logger.info(f"[{self.get_time()}] 空消息跳过from_id={from_id!r} acc_id={acc_id!r}")
return return
self._log_inbound_once(data) self._log_inbound_once(data)
self._fire_and_forget(self._post_tianwang_callback("message_received", data))
# Gemini 店铺:不回复,直接跳过 # Gemini 店铺:不回复,直接跳过
goods_name = self.to_chinese(data.get('goods_name', '') or '') goods_name = self.to_chinese(data.get('goods_name', '') or '')
@@ -969,6 +1018,16 @@ class QingjianAPIClient:
customer_id=data.get("from_id", ""), customer_id=data.get("from_id", ""),
transfer_msg=response.transfer_msg, transfer_msg=response.transfer_msg,
) )
self._fire_and_forget(self._post_tianwang_callback(
"message_processed",
data,
extra={
"should_reply": bool(response.should_reply),
"need_transfer": True,
"agent_reply": response.reply or "",
"transfer_msg": response.transfer_msg or "",
},
))
await self.transfer_to_human(data, response.transfer_msg) await self.transfer_to_human(data, response.transfer_msg)
# 推送到企微:客户消息+转接回复成对 # 推送到企微:客户消息+转接回复成对
try: try:
@@ -1027,6 +1086,15 @@ class QingjianAPIClient:
) )
await self.send_reply(data, response.reply) await self.send_reply(data, response.reply)
await self._maybe_schedule_auto_quote(data) await self._maybe_schedule_auto_quote(data)
self._fire_and_forget(self._post_tianwang_callback(
"message_processed",
data,
extra={
"should_reply": True,
"need_transfer": bool(response.need_transfer),
"agent_reply": response.reply,
},
))
# 推送到企微:客户消息+AI回复成对 # 推送到企微:客户消息+AI回复成对
try: try:
from utils.wechat_chat_log import push_chat_to_wechat from utils.wechat_chat_log import push_chat_to_wechat
@@ -1048,6 +1116,15 @@ class QingjianAPIClient:
acc_id=data.get("acc_id", ""), acc_id=data.get("acc_id", ""),
customer_id=data.get("from_id", ""), customer_id=data.get("from_id", ""),
) )
self._fire_and_forget(self._post_tianwang_callback(
"message_processed",
data,
extra={
"should_reply": False,
"need_transfer": False,
"agent_reply": "",
},
))
except Exception as e: except Exception as e:
logger.error(f"Agent 处理失败: {e}") logger.error(f"Agent 处理失败: {e}")