refactor: unify workflow/websocket logging and extract conversation state store

This commit is contained in:
2026-03-01 16:35:39 +08:00
parent 8dd5a11b4b
commit 4a07f9c726
4 changed files with 219 additions and 181 deletions

View File

@@ -67,9 +67,9 @@ except Exception as e:
workflow = None
_get_shop_type = lambda acc_id, goods_name: "find_image"
import traceback
print(f"警告: Agent 模块导入失败: {e}")
logger.info(f"警告: Agent 模块导入失败: {e}")
traceback.print_exc()
print("将使用基础回复功能")
logger.info("将使用基础回复功能")
class QingjianAPIClient:
@@ -123,9 +123,9 @@ class QingjianAPIClient:
if self.enable_agent:
try:
self.agent = CustomerServiceAgent()
print(f"[{self.get_time()}] Agent 初始化成功")
logger.info(f"[{self.get_time()}] Agent 初始化成功")
except Exception as e:
print(f"[{self.get_time()}] Agent 初始化失败: {e}")
logger.info(f"[{self.get_time()}] Agent 初始化失败: {e}")
self.enable_agent = False
# 注册 workflow 消息发送回调供图片AI完成后推送消息用
@@ -149,15 +149,15 @@ class QingjianAPIClient:
"""连接WebSocket服务器"""
while self.running:
try:
print(f"[{self.get_time()}] 正在连接轻简API {self.uri}...")
logger.info(f"[{self.get_time()}] 正在连接轻简API {self.uri}...")
async with websockets.connect(self.uri) as websocket:
self.websocket = websocket
from utils.health_check import set_qingjian_connected
set_qingjian_connected(True)
print(f"[{self.get_time()}] 连接成功!")
logger.info(f"[{self.get_time()}] 连接成功!")
if self.enable_agent:
print(f"[{self.get_time()}] AI Agent 已启用,将自动处理消息")
print(f"[{self.get_time()}] 等待接收消息...")
logger.info(f"[{self.get_time()}] AI Agent 已启用,将自动处理消息")
logger.info(f"[{self.get_time()}] 等待接收消息...")
# 持续接收消息
await self.receive_messages()
@@ -165,19 +165,19 @@ class QingjianAPIClient:
except ConnectionRefusedError:
from utils.health_check import set_qingjian_connected
set_qingjian_connected(False)
print(f"[{self.get_time()}] 连接被拒绝,请检查轻简软件是否已启动")
logger.info(f"[{self.get_time()}] 连接被拒绝,请检查轻简软件是否已启动")
except websockets.exceptions.InvalidURI:
from utils.health_check import set_qingjian_connected
set_qingjian_connected(False)
print(f"[{self.get_time()}] URI格式错误")
logger.info(f"[{self.get_time()}] URI格式错误")
except Exception as e:
from utils.health_check import set_qingjian_connected
set_qingjian_connected(False)
print(f"[{self.get_time()}] 连接错误: {e}")
logger.info(f"[{self.get_time()}] 连接错误: {e}")
# 等待5秒后重连
if self.running:
print(f"[{self.get_time()}] 5秒后尝试重连...")
logger.info(f"[{self.get_time()}] 5秒后尝试重连...")
await asyncio.sleep(5)
def _customer_key(self, data: dict) -> str:
@@ -295,11 +295,11 @@ class QingjianAPIClient:
except websockets.exceptions.ConnectionClosed:
from utils.health_check import set_qingjian_connected
set_qingjian_connected(False)
print(f"[{self.get_time()}] 连接已关闭")
logger.info(f"[{self.get_time()}] 连接已关闭")
except Exception as e:
from utils.health_check import set_qingjian_connected
set_qingjian_connected(False)
print(f"[{self.get_time()}] 接收消息错误: {e}")
logger.info(f"[{self.get_time()}] 接收消息错误: {e}")
async def handle_message(self, message):
"""处理接收到的消息"""
@@ -317,25 +317,25 @@ class QingjianAPIClient:
self.last_msg = data
# 打印格式化的消息
print(f"\n{'='*50}")
print(f"[{timestamp}] 收到新消息:")
print(f"{'='*50}")
print(f" 消息ID: {data.get('msg_id', 'N/A')}")
print(f" 账号ID: {self.to_chinese(data.get('acc_id', 'N/A'))}")
print(f" 发送者ID: {self.to_chinese(data.get('from_id', 'N/A'))}")
print(f" 发送者名称: {self.to_chinese(data.get('from_name', 'N/A'))}")
print(f" 会话ID: {self.to_chinese(data.get('cy_id', 'N/A'))}")
print(f" 平台类型: {data.get('acc_type', 'N/A')}")
print(f" 消息类型: {self.get_msg_type_name(data.get('msg_type', 0))}")
print(f" 消息内容: {self.to_chinese(data.get('msg', 'N/A'))}")
logger.info(f"\n{'='*50}")
logger.info(f"[{timestamp}] 收到新消息:")
logger.info(f"{'='*50}")
logger.info(f" 消息ID: {data.get('msg_id', 'N/A')}")
logger.info(f" 账号ID: {self.to_chinese(data.get('acc_id', 'N/A'))}")
logger.info(f" 发送者ID: {self.to_chinese(data.get('from_id', 'N/A'))}")
logger.info(f" 发送者名称: {self.to_chinese(data.get('from_name', 'N/A'))}")
logger.info(f" 会话ID: {self.to_chinese(data.get('cy_id', 'N/A'))}")
logger.info(f" 平台类型: {data.get('acc_type', 'N/A')}")
logger.info(f" 消息类型: {self.get_msg_type_name(data.get('msg_type', 0))}")
logger.info(f" 消息内容: {self.to_chinese(data.get('msg', 'N/A'))}")
# 显示商品信息(如果有)
if data.get('goods_name'):
print(f" 商品名称: {self.to_chinese(data.get('goods_name', ''))}")
logger.info(f" 商品名称: {self.to_chinese(data.get('goods_name', ''))}")
if data.get('goods_order'):
print(f" 订单信息: {self.to_chinese(data.get('goods_order', ''))}")
logger.info(f" 订单信息: {self.to_chinese(data.get('goods_order', ''))}")
print(f"{'='*50}\n")
logger.info(f"{'='*50}\n")
# 消息去重:同一条消息不重复处理
msg_id = data.get('msg_id', '')
@@ -350,14 +350,14 @@ class QingjianAPIClient:
acc_id = data.get('acc_id', '')
msg_body = data.get('msg', '')
if not from_id or from_id == 'N/A' or not acc_id or acc_id == 'N/A':
print(f"[{self.get_time()}] 空消息跳过from_id={from_id!r} acc_id={acc_id!r}")
logger.info(f"[{self.get_time()}] 空消息跳过from_id={from_id!r} acc_id={acc_id!r}")
return
self._log_inbound_once(data)
# Gemini 店铺:不回复,直接跳过
goods_name = self.to_chinese(data.get('goods_name', '') or '')
if _get_shop_type(acc_id, goods_name) == "gemini_api":
print(f"[{self.get_time()}] Gemini 店铺消息,跳过")
logger.info(f"[{self.get_time()}] Gemini 店铺消息,跳过")
try:
from utils.wechat_chat_log import push_chat_to_wechat
asyncio.create_task(push_chat_to_wechat(
@@ -378,7 +378,7 @@ class QingjianAPIClient:
if msg_type == 0:
if self._is_transfer_msg(data):
# 会话转交 → 主动打招呼
print(f"[{self.get_time()}] 收到转交消息,发送问候")
logger.info(f"[{self.get_time()}] 收到转交消息,发送问候")
greeting = self._pick_transfer_greeting()
await self.send_reply(data, greeting)
try:
@@ -397,9 +397,9 @@ class QingjianAPIClient:
# 进店卡片有历史对话就不回复没有才打招呼Gemini 已在上面统一跳过)
cid = data.get('from_id', '')
if self._has_chat_history(cid):
print(f"[{self.get_time()}] 进店卡片(已有记录),跳过")
logger.info(f"[{self.get_time()}] 进店卡片(已有记录),跳过")
else:
print(f"[{self.get_time()}] 进店卡片(新客户),发送问候")
logger.info(f"[{self.get_time()}] 进店卡片(新客户),发送问候")
greeting = "在呢,发图来我看看"
await self.send_reply(data, greeting)
try:
@@ -415,9 +415,9 @@ class QingjianAPIClient:
except Exception:
pass
elif await self._handle_system_inquiry(data):
print(f"[{self.get_time()}] 系统客服询单消息,已按规则处理")
logger.info(f"[{self.get_time()}] 系统客服询单消息,已按规则处理")
elif self._should_ignore(data):
print(f"[{self.get_time()}] 系统通知,跳过回复")
logger.info(f"[{self.get_time()}] 系统通知,跳过回复")
else:
await self._debounce_agent_reply(data)
elif msg_type == 1:
@@ -425,7 +425,7 @@ class QingjianAPIClient:
await self.handle_image_message(data)
except json.JSONDecodeError:
print(f"[{timestamp}] 收到非JSON消息: {message}")
logger.info(f"[{timestamp}] 收到非JSON消息: {message}")
async def _debounce_agent_reply(self, data: dict):
"""
@@ -476,7 +476,7 @@ class QingjianAPIClient:
merged_msg = msgs[0]
else:
merged_msg = "".join(m for m in msgs if m.strip())
print(f"[{self.get_time()}] 防抖合并 {len(msgs)} 条消息: {merged_msg[:60]}")
logger.info(f"[{self.get_time()}] 防抖合并 {len(msgs)} 条消息: {merged_msg[:60]}")
self._activity_log(
"debounce_flush",
key=capture_key,
@@ -1514,7 +1514,7 @@ class QingjianAPIClient:
设计师在线状态:仅在转人工时按需查询,不轮询。
"""
if not self.websocket:
print(f"[{self.get_time()}] 错误: 未连接到服务器")
logger.info(f"[{self.get_time()}] 错误: 未连接到服务器")
return
acc_id = data.get("acc_id", "")
@@ -1569,14 +1569,14 @@ class QingjianAPIClient:
if assigned_to:
cmd = f"正在为你转接人工|[转移会话],{assigned_to},无原因"
await self.send_reply(data, cmd)
print(f"[{self.get_time()}] 已发送转接请求 (店铺:{acc_id or '未知'} -> 设计师:{assigned_to})")
logger.info(f"[{self.get_time()}] 已发送转接请求 (店铺:{acc_id or '未知'} -> 设计师:{assigned_to})")
return
if not group_id:
group_id = _get_transfer_group(acc_id)
cmd = f"话术|[转移会话],分组{group_id},无原因"
await self.send_reply(data, cmd)
print(f"[{self.get_time()}] 已发送转接请求 (店铺:{acc_id or '未知'} -> 分组:{group_id})")
logger.info(f"[{self.get_time()}] 已发送转接请求 (店铺:{acc_id or '未知'} -> 分组:{group_id})")
async def _save_conversation_summary(self, customer_id: str, buyer_msg: str, agent_reply: str):
"""用 AI 生成一句话对话摘要并持久化"""
@@ -1678,7 +1678,7 @@ class QingjianAPIClient:
"""
trace_id = original_msg.get("_trace_id", "")
if not self.websocket:
print(f"[{self.get_time()}] 错误: 未连接到服务器")
logger.info(f"[{self.get_time()}] 错误: 未连接到服务器")
self._activity_log(
"send_reply_skipped",
trace_id=trace_id,
@@ -1839,7 +1839,7 @@ class QingjianAPIClient:
msg_json = json.dumps(message, ensure_ascii=False)
await self.websocket.send(msg_json)
pretty = json.dumps(message, ensure_ascii=False, indent=2)
print(f"[{self.get_time()}] 发送成功:\n{pretty}")
logger.info(f"[{self.get_time()}] 发送成功:\n{pretty}")
self._activity_log(
"send_message_success",
trace_id=message.get("_trace_id", ""),
@@ -1849,7 +1849,7 @@ class QingjianAPIClient:
msg=message.get("msg", ""),
)
except Exception as e:
print(f"[{self.get_time()}] 发送失败: {e}")
logger.info(f"[{self.get_time()}] 发送失败: {e}")
self._activity_log(
"send_message_error",
trace_id=message.get("_trace_id", ""),
@@ -1858,7 +1858,7 @@ class QingjianAPIClient:
error=str(e),
)
else:
print(f"[{self.get_time()}] 错误: 连接未打开")
logger.info(f"[{self.get_time()}] 错误: 连接未打开")
self._activity_log(
"send_message_skipped",
trace_id=message.get("_trace_id", ""),
@@ -1873,13 +1873,13 @@ class QingjianAPIClient:
async def command_handler(self):
"""命令行交互"""
print("\n命令帮助:")
print(" reply <内容> - 回复最后一条消息")
print(" text <id> <平台> <内容> - 发送文本消息")
print(" img <id> <平台> <路径> - 发送图片")
print(" setid <id> - 设置回复ID")
print(" agent on/off - 开启/关闭 Agent")
print(" exit/quit - 退出\n")
logger.info("\n命令帮助:")
logger.info(" reply <内容> - 回复最后一条消息")
logger.info(" text <id> <平台> <内容> - 发送文本消息")
logger.info(" img <id> <平台> <路径> - 发送图片")
logger.info(" setid <id> - 设置回复ID")
logger.info(" agent on/off - 开启/关闭 Agent")
logger.info(" exit/quit - 退出\n")
while self.running:
try:
@@ -1893,7 +1893,7 @@ class QingjianAPIClient:
cmd = parts[0].lower()
if cmd in ["exit", "quit", "q"]:
print(f"[{self.get_time()}] 正在关闭...")
logger.info(f"[{self.get_time()}] 正在关闭...")
self.running = False
if self.websocket:
await self.websocket.close()
@@ -1901,21 +1901,21 @@ class QingjianAPIClient:
elif cmd == "setid" and len(parts) > 1:
self.reply_id = parts[1]
print(f"[{self.get_time()}] 回复ID已设置为: {self.reply_id}")
logger.info(f"[{self.get_time()}] 回复ID已设置为: {self.reply_id}")
elif cmd == "agent" and len(parts) > 1:
if parts[1].lower() == "on":
self.enable_agent = True
print(f"[{self.get_time()}] Agent 已开启")
logger.info(f"[{self.get_time()}] Agent 已开启")
elif parts[1].lower() == "off":
self.enable_agent = False
print(f"[{self.get_time()}] Agent 已关闭")
logger.info(f"[{self.get_time()}] Agent 已关闭")
elif cmd == "reply" and len(parts) > 1:
if self.last_msg:
await self.send_reply(self.last_msg, parts[1])
else:
print(f"[{self.get_time()}] 错误: 还没有收到任何消息")
logger.info(f"[{self.get_time()}] 错误: 还没有收到任何消息")
elif cmd == "text" and len(parts) > 1:
# text cy_id acc_type content
@@ -1923,7 +1923,7 @@ class QingjianAPIClient:
if len(args) >= 3:
await self.send_text(args[0], args[1], args[2])
else:
print(f"[{self.get_time()}] 格式: text <cy_id> <acc_type> <内容>")
logger.info(f"[{self.get_time()}] 格式: text <cy_id> <acc_type> <内容>")
elif cmd == "img" and len(parts) > 1:
# img cy_id acc_type image_path
@@ -1931,13 +1931,13 @@ class QingjianAPIClient:
if len(args) >= 3:
await self.send_image(args[0], args[1], args[2])
else:
print(f"[{self.get_time()}] 格式: img <cy_id> <acc_type> <图片路径>")
logger.info(f"[{self.get_time()}] 格式: img <cy_id> <acc_type> <图片路径>")
else:
print(f"[{self.get_time()}] 未知命令: {cmd}")
logger.info(f"[{self.get_time()}] 未知命令: {cmd}")
except Exception as e:
print(f"[{self.get_time()}] 命令错误: {e}")
logger.info(f"[{self.get_time()}] 命令错误: {e}")
def get_time(self):
"""获取当前时间字符串"""
@@ -1951,20 +1951,20 @@ class QingjianAPIClient:
try:
from mail.email_receiver import email_receiver
if email_receiver.username:
print(f"[{self.get_time()}] 邮件接收已启动,监控: {email_receiver.username}")
logger.info(f"[{self.get_time()}] 邮件接收已启动,监控: {email_receiver.username}")
tasks.append(email_receiver.start())
else:
print(f"[{self.get_time()}] 未配置邮件账号,跳过邮件接收")
logger.info(f"[{self.get_time()}] 未配置邮件账号,跳过邮件接收")
except Exception as e:
print(f"[{self.get_time()}] 邮件接收模块加载失败: {e}")
logger.info(f"[{self.get_time()}] 邮件接收模块加载失败: {e}")
# 启动每日汇总定时任务
try:
from utils.daily_summary import scheduler as daily_scheduler
tasks.append(daily_scheduler())
print(f"[{self.get_time()}] 每日日报定时任务已启动")
logger.info(f"[{self.get_time()}] 每日日报定时任务已启动")
except Exception as e:
print(f"[{self.get_time()}] 日报模块加载失败: {e}")
logger.info(f"[{self.get_time()}] 日报模块加载失败: {e}")
# 设计师在线状态:转人工时按需查询,不再轮询
@@ -1974,17 +1974,17 @@ class QingjianAPIClient:
def _qingjian_ok():
return self.websocket is not None and not getattr(self.websocket, "closed", True)
tasks.append(health_check_loop(_qingjian_ok))
print(f"[{self.get_time()}] 健康检查已启动")
logger.info(f"[{self.get_time()}] 健康检查已启动")
except Exception as e:
print(f"[{self.get_time()}] 健康检查模块加载失败: {e}")
logger.info(f"[{self.get_time()}] 健康检查模块加载失败: {e}")
# 每天早上8点发送启动消息到企微群
try:
from utils.wechat_chat_log import morning_startup_scheduler
tasks.append(morning_startup_scheduler())
print(f"[{self.get_time()}] 早8点企微启动消息已启动")
logger.info(f"[{self.get_time()}] 早8点企微启动消息已启动")
except Exception as e:
print(f"[{self.get_time()}] 企微启动消息模块加载失败: {e}")
logger.info(f"[{self.get_time()}] 企微启动消息模块加载失败: {e}")
await asyncio.gather(*tasks)
@@ -1999,7 +1999,7 @@ if __name__ == "__main__":
try:
asyncio.run(client.run())
except KeyboardInterrupt:
print("\n已停止")
logger.info("\n已停止")
async def _load_task_modules(self):
@@ -2091,3 +2091,4 @@ async def check_and_trigger_tasks_v2(self, data: dict):
except Exception as e:
logger.error(f"检查任务触发失败:{e}")