From 4a07f9c726701706c91388d83d4ee5f7a3f2daab Mon Sep 17 00:00:00 2001 From: jimi <1847930177@qq.com> Date: Sun, 1 Mar 2026 16:35:39 +0800 Subject: [PATCH] refactor: unify workflow/websocket logging and extract conversation state store --- core/conversation_state_store.py | 95 +++++++++++++++++++++ core/pydantic_ai_agent.py | 93 ++++---------------- core/websocket_client.py | 141 ++++++++++++++++--------------- core/workflow.py | 71 ++++++++-------- 4 files changed, 219 insertions(+), 181 deletions(-) create mode 100644 core/conversation_state_store.py diff --git a/core/conversation_state_store.py b/core/conversation_state_store.py new file mode 100644 index 0000000..78a4cf2 --- /dev/null +++ b/core/conversation_state_store.py @@ -0,0 +1,95 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Any + +from core.quote_state_machine import QuoteStateMachine + + +def refresh_quote_phase(state: Any, phase_hint: str = "") -> None: + """统一维护收图报价状态机。""" + QuoteStateMachine().refresh(state, phase_hint=phase_hint) + + +def sync_pending_quote_state(agent: Any, customer_id: str, state: Any) -> None: + """把待报价队列同步到客户库,避免重启丢失。""" + try: + refresh_quote_phase(state) + from db.customer_db import db + + db.update_pending_quote_state( + customer_id, + state.pending_image_urls, + state.pending_requirements, + ) + except Exception: + pass + + +def restore_pending_quote_state(customer_id: str, state: Any) -> None: + """从客户库恢复待报价队列。""" + try: + from db.customer_db import db + + profile = db.get_customer(customer_id) + state.pending_image_urls = list(getattr(profile, "pending_quote_images", []) or []) + state.pending_requirements = list(getattr(profile, "pending_quote_requirements", []) or []) + state.image_count = len(state.pending_image_urls) + refresh_quote_phase(state) + except Exception: + pass + + +def cleanup_inactive(conversations: dict, message_histories: dict, now: datetime) -> None: + """清理超过 7 天没有消息的对话状态,释放内存。""" + if len(conversations) % 100 != 0: + return + expired = [ + cid + for cid, state in conversations.items() + if state.last_update and (now - datetime.fromisoformat(state.last_update)).days > 7 + ] + for cid in expired: + conversations.pop(cid, None) + message_histories.pop(cid, None) + + +def get_conversation_state(agent: Any, customer_id: str) -> Any: + """获取或创建对话状态,超时自动重置。""" + now = datetime.now() + + if customer_id in agent.conversations: + state = agent.conversations[customer_id] + if state.last_update: + try: + last = datetime.fromisoformat(state.last_update) + hours = (now - last).total_seconds() / 3600 + if hours > agent.CONVERSATION_TIMEOUT_HOURS: + state.stage = "售前" + state.discount_count = 0 + agent.message_histories.pop(customer_id, None) + except Exception: + pass + if not state.pending_image_urls and not state.pending_requirements: + restore_pending_quote_state(customer_id, state) + else: + agent.conversations[customer_id] = agent.ConversationStateClass( + customer_id=customer_id, + last_update=now.isoformat(), + ) + restore_pending_quote_state(customer_id, agent.conversations[customer_id]) + + cleanup_inactive(agent.conversations, agent.message_histories, now) + return agent.conversations[customer_id] + + +def should_defer_batch_quote(agent: Any, state: Any, mark_ready: bool = False) -> bool: + """批量报价延后控制。""" + agent.quote_state_machine.delay_turns = max(0, int(agent.batch_quote_delay_turns)) + return agent.quote_state_machine.should_defer_batch_quote(state, mark_ready=mark_ready) + + +def mark_quote_ready(agent: Any, state: Any) -> None: + """仅标记 ready 状态,不消费等待轮次。""" + agent.quote_state_machine.delay_turns = max(0, int(agent.batch_quote_delay_turns)) + agent.quote_state_machine.mark_ready(state) diff --git a/core/pydantic_ai_agent.py b/core/pydantic_ai_agent.py index cef58b4..9d10ea2 100755 --- a/core/pydantic_ai_agent.py +++ b/core/pydantic_ai_agent.py @@ -71,6 +71,15 @@ from core.batch_quote_helpers import ( from core.prompt_builder import build_prompt as build_agent_prompt, split_customer_text from core.image_workflow_router import handle_image_workflow as route_image_workflow from core.message_orchestrator import process_incoming_message +from core.conversation_state_store import ( + get_conversation_state as load_conversation_state, + mark_quote_ready as state_mark_quote_ready, + refresh_quote_phase as state_refresh_quote_phase, + should_defer_batch_quote as state_should_defer_batch_quote, + sync_pending_quote_state as state_sync_pending_quote_state, + restore_pending_quote_state as state_restore_pending_quote_state, + cleanup_inactive as state_cleanup_inactive, +) load_dotenv() @@ -285,6 +294,7 @@ class CustomerServiceAgent: # 对话状态管理 self.conversations: Dict[str, ConversationState] = {} + self.ConversationStateClass = ConversationState # 多轮对话历史(PydanticAI ModelMessage 列表,按客户ID存储) self.message_histories: Dict[str, list] = {} self.evolution_candidate = self._load_evolution_candidate() @@ -584,95 +594,26 @@ class CustomerServiceAgent: CONVERSATION_TIMEOUT_HOURS = 12 def _get_conversation_state(self, customer_id: str) -> ConversationState: - """获取或创建对话状态,超时自动重置""" - now = datetime.now() - - if customer_id in self.conversations: - state = self.conversations[customer_id] - # 超过 12 小时没有消息,重置阶段和压价次数 - if state.last_update: - try: - last = datetime.fromisoformat(state.last_update) - hours = (now - last).total_seconds() / 3600 - if hours > self.CONVERSATION_TIMEOUT_HOURS: - state.stage = "售前" - state.discount_count = 0 - # 同时清理对话历史,避免发送过期上下文 - self.message_histories.pop(customer_id, None) - except Exception: - pass - # 进程内状态为空时,尝试从持久化恢复 - if not state.pending_image_urls and not state.pending_requirements: - self._restore_pending_quote_state(customer_id, state) - else: - self.conversations[customer_id] = ConversationState( - customer_id=customer_id, - last_update=now.isoformat() - ) - self._restore_pending_quote_state(customer_id, self.conversations[customer_id]) - - # 定期清理长期不活跃客户(超过 7 天) - self._cleanup_inactive(now) - - return self.conversations[customer_id] + return load_conversation_state(self, customer_id) def _cleanup_inactive(self, now: datetime): - """清理超过 7 天没有消息的对话状态,释放内存""" - # 每 100 次调用清理一次,避免每次都遍历 - if len(self.conversations) % 100 != 0: - return - expired = [ - cid for cid, state in self.conversations.items() - if state.last_update and - (now - datetime.fromisoformat(state.last_update)).days > 7 - ] - for cid in expired: - self.conversations.pop(cid, None) - self.message_histories.pop(cid, None) + state_cleanup_inactive(self.conversations, self.message_histories, now) def _sync_pending_quote_state(self, customer_id: str, state: ConversationState): - """把待报价队列同步到客户库,避免重启丢失。""" - try: - self._refresh_quote_phase(state) - from db.customer_db import db - db.update_pending_quote_state( - customer_id, - state.pending_image_urls, - state.pending_requirements, - ) - except Exception: - pass + state_sync_pending_quote_state(self, customer_id, state) def _restore_pending_quote_state(self, customer_id: str, state: ConversationState): - """从客户库恢复待报价队列。""" - try: - from db.customer_db import db - profile = db.get_customer(customer_id) - state.pending_image_urls = list(getattr(profile, "pending_quote_images", []) or []) - state.pending_requirements = list(getattr(profile, "pending_quote_requirements", []) or []) - state.image_count = len(state.pending_image_urls) - self._refresh_quote_phase(state) - except Exception: - pass + state_restore_pending_quote_state(customer_id, state) @staticmethod def _refresh_quote_phase(state: ConversationState, phase_hint: str = ""): - """统一维护收图报价状态机。""" - QuoteStateMachine().refresh(state, phase_hint=phase_hint) + state_refresh_quote_phase(state, phase_hint=phase_hint) def _should_defer_batch_quote(self, state: ConversationState, mark_ready: bool = False) -> bool: - """ - 批量报价延后控制: - - 首次进入 ready_to_quote 时按配置等待 N 轮 - - 等待轮次归零后,本轮即可报价 - """ - self.quote_state_machine.delay_turns = max(0, int(self.batch_quote_delay_turns)) - return self.quote_state_machine.should_defer_batch_quote(state, mark_ready=mark_ready) + return state_should_defer_batch_quote(self, state, mark_ready=mark_ready) def _mark_quote_ready(self, state: ConversationState): - """仅标记 ready 状态,不消费等待轮次。""" - self.quote_state_machine.delay_turns = max(0, int(self.batch_quote_delay_turns)) - self.quote_state_machine.mark_ready(state) + state_mark_quote_ready(self, state) def _build_reject_message(self, reason: str = "") -> str: templates = [ diff --git a/core/websocket_client.py b/core/websocket_client.py index 723e373..a822228 100755 --- a/core/websocket_client.py +++ b/core/websocket_client.py @@ -67,9 +67,9 @@ except Exception as e: workflow = None _get_shop_type = lambda acc_id, goods_name: "find_image" import traceback - print(f"警告: Agent 模块导入失败: {e}") + logger.info(f"警告: Agent 模块导入失败: {e}") traceback.print_exc() - print("将使用基础回复功能") + logger.info("将使用基础回复功能") class QingjianAPIClient: @@ -123,9 +123,9 @@ class QingjianAPIClient: if self.enable_agent: try: self.agent = CustomerServiceAgent() - print(f"[{self.get_time()}] Agent 初始化成功") + logger.info(f"[{self.get_time()}] Agent 初始化成功") except Exception as e: - print(f"[{self.get_time()}] Agent 初始化失败: {e}") + logger.info(f"[{self.get_time()}] Agent 初始化失败: {e}") self.enable_agent = False # 注册 workflow 消息发送回调(供图片AI完成后推送消息用) @@ -149,15 +149,15 @@ class QingjianAPIClient: """连接WebSocket服务器""" while self.running: try: - print(f"[{self.get_time()}] 正在连接轻简API {self.uri}...") + logger.info(f"[{self.get_time()}] 正在连接轻简API {self.uri}...") async with websockets.connect(self.uri) as websocket: self.websocket = websocket from utils.health_check import set_qingjian_connected set_qingjian_connected(True) - print(f"[{self.get_time()}] 连接成功!") + logger.info(f"[{self.get_time()}] 连接成功!") if self.enable_agent: - print(f"[{self.get_time()}] AI Agent 已启用,将自动处理消息") - print(f"[{self.get_time()}] 等待接收消息...") + logger.info(f"[{self.get_time()}] AI Agent 已启用,将自动处理消息") + logger.info(f"[{self.get_time()}] 等待接收消息...") # 持续接收消息 await self.receive_messages() @@ -165,19 +165,19 @@ class QingjianAPIClient: except ConnectionRefusedError: from utils.health_check import set_qingjian_connected set_qingjian_connected(False) - print(f"[{self.get_time()}] 连接被拒绝,请检查轻简软件是否已启动") + logger.info(f"[{self.get_time()}] 连接被拒绝,请检查轻简软件是否已启动") except websockets.exceptions.InvalidURI: from utils.health_check import set_qingjian_connected set_qingjian_connected(False) - print(f"[{self.get_time()}] URI格式错误") + logger.info(f"[{self.get_time()}] URI格式错误") except Exception as e: from utils.health_check import set_qingjian_connected set_qingjian_connected(False) - print(f"[{self.get_time()}] 连接错误: {e}") + logger.info(f"[{self.get_time()}] 连接错误: {e}") # 等待5秒后重连 if self.running: - print(f"[{self.get_time()}] 5秒后尝试重连...") + logger.info(f"[{self.get_time()}] 5秒后尝试重连...") await asyncio.sleep(5) def _customer_key(self, data: dict) -> str: @@ -295,11 +295,11 @@ class QingjianAPIClient: except websockets.exceptions.ConnectionClosed: from utils.health_check import set_qingjian_connected set_qingjian_connected(False) - print(f"[{self.get_time()}] 连接已关闭") + logger.info(f"[{self.get_time()}] 连接已关闭") except Exception as e: from utils.health_check import set_qingjian_connected set_qingjian_connected(False) - print(f"[{self.get_time()}] 接收消息错误: {e}") + logger.info(f"[{self.get_time()}] 接收消息错误: {e}") async def handle_message(self, message): """处理接收到的消息""" @@ -317,25 +317,25 @@ class QingjianAPIClient: self.last_msg = data # 打印格式化的消息 - print(f"\n{'='*50}") - print(f"[{timestamp}] 收到新消息:") - print(f"{'='*50}") - print(f" 消息ID: {data.get('msg_id', 'N/A')}") - print(f" 账号ID: {self.to_chinese(data.get('acc_id', 'N/A'))}") - print(f" 发送者ID: {self.to_chinese(data.get('from_id', 'N/A'))}") - print(f" 发送者名称: {self.to_chinese(data.get('from_name', 'N/A'))}") - print(f" 会话ID: {self.to_chinese(data.get('cy_id', 'N/A'))}") - print(f" 平台类型: {data.get('acc_type', 'N/A')}") - print(f" 消息类型: {self.get_msg_type_name(data.get('msg_type', 0))}") - print(f" 消息内容: {self.to_chinese(data.get('msg', 'N/A'))}") + logger.info(f"\n{'='*50}") + logger.info(f"[{timestamp}] 收到新消息:") + logger.info(f"{'='*50}") + logger.info(f" 消息ID: {data.get('msg_id', 'N/A')}") + logger.info(f" 账号ID: {self.to_chinese(data.get('acc_id', 'N/A'))}") + logger.info(f" 发送者ID: {self.to_chinese(data.get('from_id', 'N/A'))}") + logger.info(f" 发送者名称: {self.to_chinese(data.get('from_name', 'N/A'))}") + logger.info(f" 会话ID: {self.to_chinese(data.get('cy_id', 'N/A'))}") + logger.info(f" 平台类型: {data.get('acc_type', 'N/A')}") + logger.info(f" 消息类型: {self.get_msg_type_name(data.get('msg_type', 0))}") + logger.info(f" 消息内容: {self.to_chinese(data.get('msg', 'N/A'))}") # 显示商品信息(如果有) if data.get('goods_name'): - print(f" 商品名称: {self.to_chinese(data.get('goods_name', ''))}") + logger.info(f" 商品名称: {self.to_chinese(data.get('goods_name', ''))}") if data.get('goods_order'): - print(f" 订单信息: {self.to_chinese(data.get('goods_order', ''))}") + logger.info(f" 订单信息: {self.to_chinese(data.get('goods_order', ''))}") - print(f"{'='*50}\n") + logger.info(f"{'='*50}\n") # 消息去重:同一条消息不重复处理 msg_id = data.get('msg_id', '') @@ -350,14 +350,14 @@ class QingjianAPIClient: acc_id = data.get('acc_id', '') msg_body = data.get('msg', '') if not from_id or from_id == 'N/A' or not acc_id or acc_id == 'N/A': - print(f"[{self.get_time()}] 空消息跳过(from_id={from_id!r} acc_id={acc_id!r})") + logger.info(f"[{self.get_time()}] 空消息跳过(from_id={from_id!r} acc_id={acc_id!r})") return self._log_inbound_once(data) # Gemini 店铺:不回复,直接跳过 goods_name = self.to_chinese(data.get('goods_name', '') or '') if _get_shop_type(acc_id, goods_name) == "gemini_api": - print(f"[{self.get_time()}] Gemini 店铺消息,跳过") + logger.info(f"[{self.get_time()}] Gemini 店铺消息,跳过") try: from utils.wechat_chat_log import push_chat_to_wechat asyncio.create_task(push_chat_to_wechat( @@ -378,7 +378,7 @@ class QingjianAPIClient: if msg_type == 0: if self._is_transfer_msg(data): # 会话转交 → 主动打招呼 - print(f"[{self.get_time()}] 收到转交消息,发送问候") + logger.info(f"[{self.get_time()}] 收到转交消息,发送问候") greeting = self._pick_transfer_greeting() await self.send_reply(data, greeting) try: @@ -397,9 +397,9 @@ class QingjianAPIClient: # 进店卡片:有历史对话就不回复,没有才打招呼(Gemini 已在上面统一跳过) cid = data.get('from_id', '') if self._has_chat_history(cid): - print(f"[{self.get_time()}] 进店卡片(已有记录),跳过") + logger.info(f"[{self.get_time()}] 进店卡片(已有记录),跳过") else: - print(f"[{self.get_time()}] 进店卡片(新客户),发送问候") + logger.info(f"[{self.get_time()}] 进店卡片(新客户),发送问候") greeting = "在呢,发图来我看看" await self.send_reply(data, greeting) try: @@ -415,9 +415,9 @@ class QingjianAPIClient: except Exception: pass elif await self._handle_system_inquiry(data): - print(f"[{self.get_time()}] 系统客服询单消息,已按规则处理") + logger.info(f"[{self.get_time()}] 系统客服询单消息,已按规则处理") elif self._should_ignore(data): - print(f"[{self.get_time()}] 系统通知,跳过回复") + logger.info(f"[{self.get_time()}] 系统通知,跳过回复") else: await self._debounce_agent_reply(data) elif msg_type == 1: @@ -425,7 +425,7 @@ class QingjianAPIClient: await self.handle_image_message(data) except json.JSONDecodeError: - print(f"[{timestamp}] 收到非JSON消息: {message}") + logger.info(f"[{timestamp}] 收到非JSON消息: {message}") async def _debounce_agent_reply(self, data: dict): """ @@ -476,7 +476,7 @@ class QingjianAPIClient: merged_msg = msgs[0] else: merged_msg = "、".join(m for m in msgs if m.strip()) - print(f"[{self.get_time()}] 防抖合并 {len(msgs)} 条消息: {merged_msg[:60]}") + logger.info(f"[{self.get_time()}] 防抖合并 {len(msgs)} 条消息: {merged_msg[:60]}") self._activity_log( "debounce_flush", key=capture_key, @@ -1514,7 +1514,7 @@ class QingjianAPIClient: 设计师在线状态:仅在转人工时按需查询,不轮询。 """ if not self.websocket: - print(f"[{self.get_time()}] 错误: 未连接到服务器") + logger.info(f"[{self.get_time()}] 错误: 未连接到服务器") return acc_id = data.get("acc_id", "") @@ -1569,14 +1569,14 @@ class QingjianAPIClient: if assigned_to: cmd = f"正在为你转接人工|[转移会话],{assigned_to},无原因" await self.send_reply(data, cmd) - print(f"[{self.get_time()}] 已发送转接请求 (店铺:{acc_id or '未知'} -> 设计师:{assigned_to})") + logger.info(f"[{self.get_time()}] 已发送转接请求 (店铺:{acc_id or '未知'} -> 设计师:{assigned_to})") return if not group_id: group_id = _get_transfer_group(acc_id) cmd = f"话术|[转移会话],分组{group_id},无原因" await self.send_reply(data, cmd) - print(f"[{self.get_time()}] 已发送转接请求 (店铺:{acc_id or '未知'} -> 分组:{group_id})") + logger.info(f"[{self.get_time()}] 已发送转接请求 (店铺:{acc_id or '未知'} -> 分组:{group_id})") async def _save_conversation_summary(self, customer_id: str, buyer_msg: str, agent_reply: str): """用 AI 生成一句话对话摘要并持久化""" @@ -1678,7 +1678,7 @@ class QingjianAPIClient: """ trace_id = original_msg.get("_trace_id", "") if not self.websocket: - print(f"[{self.get_time()}] 错误: 未连接到服务器") + logger.info(f"[{self.get_time()}] 错误: 未连接到服务器") self._activity_log( "send_reply_skipped", trace_id=trace_id, @@ -1839,7 +1839,7 @@ class QingjianAPIClient: msg_json = json.dumps(message, ensure_ascii=False) await self.websocket.send(msg_json) pretty = json.dumps(message, ensure_ascii=False, indent=2) - print(f"[{self.get_time()}] 发送成功:\n{pretty}") + logger.info(f"[{self.get_time()}] 发送成功:\n{pretty}") self._activity_log( "send_message_success", trace_id=message.get("_trace_id", ""), @@ -1849,7 +1849,7 @@ class QingjianAPIClient: msg=message.get("msg", ""), ) except Exception as e: - print(f"[{self.get_time()}] 发送失败: {e}") + logger.info(f"[{self.get_time()}] 发送失败: {e}") self._activity_log( "send_message_error", trace_id=message.get("_trace_id", ""), @@ -1858,7 +1858,7 @@ class QingjianAPIClient: error=str(e), ) else: - print(f"[{self.get_time()}] 错误: 连接未打开") + logger.info(f"[{self.get_time()}] 错误: 连接未打开") self._activity_log( "send_message_skipped", trace_id=message.get("_trace_id", ""), @@ -1873,13 +1873,13 @@ class QingjianAPIClient: async def command_handler(self): """命令行交互""" - print("\n命令帮助:") - print(" reply <内容> - 回复最后一条消息") - print(" text <平台> <内容> - 发送文本消息") - print(" img <平台> <路径> - 发送图片") - print(" setid - 设置回复ID") - print(" agent on/off - 开启/关闭 Agent") - print(" exit/quit - 退出\n") + logger.info("\n命令帮助:") + logger.info(" reply <内容> - 回复最后一条消息") + logger.info(" text <平台> <内容> - 发送文本消息") + logger.info(" img <平台> <路径> - 发送图片") + logger.info(" setid - 设置回复ID") + logger.info(" agent on/off - 开启/关闭 Agent") + logger.info(" exit/quit - 退出\n") while self.running: try: @@ -1893,7 +1893,7 @@ class QingjianAPIClient: cmd = parts[0].lower() if cmd in ["exit", "quit", "q"]: - print(f"[{self.get_time()}] 正在关闭...") + logger.info(f"[{self.get_time()}] 正在关闭...") self.running = False if self.websocket: await self.websocket.close() @@ -1901,21 +1901,21 @@ class QingjianAPIClient: elif cmd == "setid" and len(parts) > 1: self.reply_id = parts[1] - print(f"[{self.get_time()}] 回复ID已设置为: {self.reply_id}") + logger.info(f"[{self.get_time()}] 回复ID已设置为: {self.reply_id}") elif cmd == "agent" and len(parts) > 1: if parts[1].lower() == "on": self.enable_agent = True - print(f"[{self.get_time()}] Agent 已开启") + logger.info(f"[{self.get_time()}] Agent 已开启") elif parts[1].lower() == "off": self.enable_agent = False - print(f"[{self.get_time()}] Agent 已关闭") + logger.info(f"[{self.get_time()}] Agent 已关闭") elif cmd == "reply" and len(parts) > 1: if self.last_msg: await self.send_reply(self.last_msg, parts[1]) else: - print(f"[{self.get_time()}] 错误: 还没有收到任何消息") + logger.info(f"[{self.get_time()}] 错误: 还没有收到任何消息") elif cmd == "text" and len(parts) > 1: # text cy_id acc_type content @@ -1923,7 +1923,7 @@ class QingjianAPIClient: if len(args) >= 3: await self.send_text(args[0], args[1], args[2]) else: - print(f"[{self.get_time()}] 格式: text <内容>") + logger.info(f"[{self.get_time()}] 格式: text <内容>") elif cmd == "img" and len(parts) > 1: # img cy_id acc_type image_path @@ -1931,13 +1931,13 @@ class QingjianAPIClient: if len(args) >= 3: await self.send_image(args[0], args[1], args[2]) else: - print(f"[{self.get_time()}] 格式: img <图片路径>") + logger.info(f"[{self.get_time()}] 格式: img <图片路径>") else: - print(f"[{self.get_time()}] 未知命令: {cmd}") + logger.info(f"[{self.get_time()}] 未知命令: {cmd}") except Exception as e: - print(f"[{self.get_time()}] 命令错误: {e}") + logger.info(f"[{self.get_time()}] 命令错误: {e}") def get_time(self): """获取当前时间字符串""" @@ -1951,20 +1951,20 @@ class QingjianAPIClient: try: from mail.email_receiver import email_receiver if email_receiver.username: - print(f"[{self.get_time()}] 邮件接收已启动,监控: {email_receiver.username}") + logger.info(f"[{self.get_time()}] 邮件接收已启动,监控: {email_receiver.username}") tasks.append(email_receiver.start()) else: - print(f"[{self.get_time()}] 未配置邮件账号,跳过邮件接收") + logger.info(f"[{self.get_time()}] 未配置邮件账号,跳过邮件接收") except Exception as e: - print(f"[{self.get_time()}] 邮件接收模块加载失败: {e}") + logger.info(f"[{self.get_time()}] 邮件接收模块加载失败: {e}") # 启动每日汇总定时任务 try: from utils.daily_summary import scheduler as daily_scheduler tasks.append(daily_scheduler()) - print(f"[{self.get_time()}] 每日日报定时任务已启动") + logger.info(f"[{self.get_time()}] 每日日报定时任务已启动") except Exception as e: - print(f"[{self.get_time()}] 日报模块加载失败: {e}") + logger.info(f"[{self.get_time()}] 日报模块加载失败: {e}") # 设计师在线状态:转人工时按需查询,不再轮询 @@ -1974,17 +1974,17 @@ class QingjianAPIClient: def _qingjian_ok(): return self.websocket is not None and not getattr(self.websocket, "closed", True) tasks.append(health_check_loop(_qingjian_ok)) - print(f"[{self.get_time()}] 健康检查已启动") + logger.info(f"[{self.get_time()}] 健康检查已启动") except Exception as e: - print(f"[{self.get_time()}] 健康检查模块加载失败: {e}") + logger.info(f"[{self.get_time()}] 健康检查模块加载失败: {e}") # 每天早上8点发送启动消息到企微群 try: from utils.wechat_chat_log import morning_startup_scheduler tasks.append(morning_startup_scheduler()) - print(f"[{self.get_time()}] 早8点企微启动消息已启动") + logger.info(f"[{self.get_time()}] 早8点企微启动消息已启动") except Exception as e: - print(f"[{self.get_time()}] 企微启动消息模块加载失败: {e}") + logger.info(f"[{self.get_time()}] 企微启动消息模块加载失败: {e}") await asyncio.gather(*tasks) @@ -1999,7 +1999,7 @@ if __name__ == "__main__": try: asyncio.run(client.run()) except KeyboardInterrupt: - print("\n已停止") + logger.info("\n已停止") async def _load_task_modules(self): @@ -2091,3 +2091,4 @@ async def check_and_trigger_tasks_v2(self, data: dict): except Exception as e: logger.error(f"检查任务触发失败:{e}") + diff --git a/core/workflow.py b/core/workflow.py index fa325d9..495700a 100755 --- a/core/workflow.py +++ b/core/workflow.py @@ -6,7 +6,7 @@ - 图片AI接入点:调用 workflow.image_ai_submit_result(task_id, result_url) - 消息回调接口:通过 register_send_callback 注入发送函数 """ -import asyncio +import asyncio`r`nimport logging import os import uuid from enum import Enum @@ -14,7 +14,7 @@ from typing import Optional, Dict, Callable, Awaitable, Any, List from datetime import datetime from dataclasses import dataclass, field -_WECHAT_WEBHOOK = os.getenv("WECHAT_WEBHOOK", "") +_WECHAT_WEBHOOK = os.getenv("WECHAT_WEBHOOK", "")`r`nlogger = logging.getLogger("cs_agent") async def _wechat_notify(content: str): @@ -30,11 +30,11 @@ async def _wechat_notify(content: str): }) data = resp.json() if data.get("errcode") == 0: - print(f"[Workflow通知] 企业微信推送成功 ✓") + logger.info(f"[Workflow通知] 企业微信推送成功 ✓") else: - print(f"[Workflow通知] 企业微信推送失败: {data}") + logger.info(f"[Workflow通知] 企业微信推送失败: {data}") except Exception as e: - print(f"[Workflow通知] 推送异常: {e}") + logger.info(f"[Workflow通知] 推送异常: {e}") from db.customer_db import db @@ -142,7 +142,7 @@ class CustomerServiceWorkflow: if requirements: db.add_requirement(customer_id, requirements) - print(f"[Workflow] 创建任务 {task_id} | 客户: {customer_name} | 操作: {operation}") + logger.info(f"[Workflow] 创建任务 {task_id} | 客户: {customer_name} | 操作: {operation}") return task_id def get_task(self, task_id: str) -> Optional[ImageTask]: @@ -216,7 +216,7 @@ class CustomerServiceWorkflow: requirements=requirements, ) - print(f"[Workflow] 图片识别完成 | 客户:{customer_id} | 复杂度:{complexity} | 建议报价:{price_hint}") + logger.info(f"[Workflow] 图片识别完成 | 客户:{customer_id} | 复杂度:{complexity} | 建议报价:{price_hint}") # 通知客服AI报价(把识别结果注入消息,让AI根据结果报价) if self._send_message: @@ -266,10 +266,10 @@ class CustomerServiceWorkflow: if not task: # 内存任务丢失(重启场景)→ 从客户档案重建 - print(f"[Workflow] 付款触发:内存无任务,尝试从客户档案重建 | 客户: {customer_id}") + logger.info(f"[Workflow] 付款触发:内存无任务,尝试从客户档案重建 | 客户: {customer_id}") task = await self._rebuild_task_from_profile(customer_id, acc_id, acc_type) if not task: - print(f"[Workflow] 付款触发:客户 {customer_id} 无图片记录,无法重建任务,跳过") + logger.info(f"[Workflow] 付款触发:客户 {customer_id} 无图片记录,无法重建任务,跳过") await _wechat_notify( f"⚠️ **付款但无图片**\n" f"客户:{customer_id}\n" @@ -279,11 +279,11 @@ class CustomerServiceWorkflow: return False if task.status not in (TaskStatus.PENDING,): - print(f"[Workflow] 付款触发:任务 {task.task_id[:8]}... 状态={task.status.value},跳过") + logger.info(f"[Workflow] 付款触发:任务 {task.task_id[:8]}... 状态={task.status.value},跳过") return False task.operation = task.operation or "enhance" - print(f"[Workflow] 付款确认,启动 Gemini 处理 | 客户: {customer_id} | 任务: {task.task_id[:8]}...") + logger.info(f"[Workflow] 付款确认,启动 Gemini 处理 | 客户: {customer_id} | 任务: {task.task_id[:8]}...") asyncio.create_task(self._auto_process(task.task_id, acc_id=acc_id, acc_type=acc_type)) return True @@ -325,10 +325,10 @@ class CustomerServiceWorkflow: ) self.tasks[task_id] = task self.customer_active_task[customer_id] = task_id - print(f"[Workflow] 任务已重建 | 客户: {customer_id} | 图片: {image_url[:60]}...") + logger.info(f"[Workflow] 任务已重建 | 客户: {customer_id} | 图片: {image_url[:60]}...") return task except Exception as e: - print(f"[Workflow] 任务重建失败: {e}") + logger.info(f"[Workflow] 任务重建失败: {e}") return None @staticmethod @@ -367,7 +367,7 @@ class CustomerServiceWorkflow: if revision_note: gemini_prompt = (gemini_prompt or "") + f"\n【客户修改要求】{revision_note}" - print(f"[Workflow] Gemini 开始处理 | 任务: {task_id[:8]}... | 比例: {aspect_ratio} | 透视: {perspective} | 图片: {task.original_image}") + logger.info(f"[Workflow] Gemini 开始处理 | 任务: {task_id[:8]}... | 比例: {aspect_ratio} | 透视: {perspective} | 图片: {task.original_image}") try: from image.image_processor import image_processor from utils.image_queue import run_with_queue @@ -387,7 +387,7 @@ class CustomerServiceWorkflow: qa_score = result.get("qa_score", 0) qa_pass = result.get("qa_pass", True) qa_issue = result.get("qa_issue", "") - print(f"[Workflow] Gemini 处理完成 | 任务: {task_id[:8]}... | 质检: {qa_score}分 | 尝试: {attempts}次") + logger.info(f"[Workflow] Gemini 处理完成 | 任务: {task_id[:8]}... | 质检: {qa_score}分 | 尝试: {attempts}次") # 质检未通过(已达重试上限,保留结果但人工跟进) if not qa_pass: @@ -408,7 +408,7 @@ class CustomerServiceWorkflow: ) else: err_msg = result['message'] - print(f"[Workflow] Gemini 处理失败: {err_msg}") + logger.info(f"[Workflow] Gemini 处理失败: {err_msg}") task.update_status(TaskStatus.FAILED) # 企业微信预警 await _wechat_notify( @@ -428,7 +428,7 @@ class CustomerServiceWorkflow: msg_type=0, ) except Exception as e: - print(f"[Workflow] 自动处理异常: {e}") + logger.info(f"[Workflow] 自动处理异常: {e}") task.update_status(TaskStatus.FAILED) await _wechat_notify( f"⚠️ **Workflow处理异常**\n" @@ -459,13 +459,13 @@ class CustomerServiceWorkflow: """ task = self.tasks.get(task_id) if not task: - print(f"[Workflow] 任务不存在: {task_id}") + logger.info(f"[Workflow] 任务不存在: {task_id}") return False task.result_url = result_url task.update_status(TaskStatus.AWAITING_CONFIRM) - print(f"[Workflow] 任务 {task_id} 处理完成,发送给客户确认") + logger.info(f"[Workflow] 任务 {task_id} 处理完成,发送给客户确认") # 先发结果图片 if self._send_message: @@ -573,7 +573,7 @@ class CustomerServiceWorkflow: ) return result.get("success", False) except Exception as e: - print(f"[Workflow] 邮件发送失败: {e}") + logger.info(f"[Workflow] 邮件发送失败: {e}") await _wechat_notify( f"⚠️ **邮件发送失败**\n" f"客户:{task.customer_id}\n" @@ -621,17 +621,17 @@ class CustomerServiceWorkflow: # 尝试从数据库加载 db_task = self.db.get_task(task_id) if db_task: - print(f"[Workflow] 从数据库加载任务:{task_id[:8]}...") + logger.info(f"[Workflow] 从数据库加载任务:{task_id[:8]}...") # 可以在这里重建内存任务 else: - print(f"[Workflow] 任务不存在:{task_id}") + logger.info(f"[Workflow] 任务不存在:{task_id}") return False # 添加到数据库 success = self.db.add_customer_note(task_id, requirement, changed_by) if success: - print(f"[Workflow] 客户添加需求:{task_id[:8]}... | {requirement}") + logger.info(f"[Workflow] 客户添加需求:{task_id[:8]}... | {requirement}") # 如果任务还在待处理状态,通知 AI 客服 if task and task.status.value == 'pending': @@ -664,12 +664,12 @@ class CustomerServiceWorkflow: if not task: db_task = self.db.get_task(task_id) if not db_task: - print(f"[Workflow] 任务不存在:{task_id}") + logger.info(f"[Workflow] 任务不存在:{task_id}") return False # 检查状态,已处理完成的不允许修改 if task and task.status.value in ['completed', 'processing']: - print(f"[Workflow] 任务已开始处理,不允许修改操作:{task_id}") + logger.info(f"[Workflow] 任务已开始处理,不允许修改操作:{task_id}") if self._send_message: await self._send_message( customer_id=customer_id, @@ -685,7 +685,7 @@ class CustomerServiceWorkflow: if success and task: task.operation = new_operation - print(f"[Workflow] 修改操作类型:{task_id[:8]}... -> {new_operation}") + logger.info(f"[Workflow] 修改操作类型:{task_id[:8]}... -> {new_operation}") if self._send_message: await self._send_message( @@ -720,7 +720,7 @@ class CustomerServiceWorkflow: bool: 是否成功 """ try: - print(f"[Workflow] 启动查找图片工作流 | 客户:{customer_id}") + logger.info(f"[Workflow] 启动查找图片工作流 | 客户:{customer_id}") # 1. 创建任务 task_id = self.create_image_task( @@ -754,7 +754,7 @@ class CustomerServiceWorkflow: msg_type=0, ) - print(f"[Workflow] 查找图片完成 | 客户:{customer_id} | URL: {tuhui_url}") + logger.info(f"[Workflow] 查找图片完成 | 客户:{customer_id} | URL: {tuhui_url}") return True except Exception as e: @@ -777,7 +777,7 @@ class CustomerServiceWorkflow: bool: 是否成功 """ try: - print(f"[Workflow] 启动处理图片工作流 | 客户:{customer_id}") + logger.info(f"[Workflow] 启动处理图片工作流 | 客户:{customer_id}") # 1. 创建任务 task_id = self.create_image_task( @@ -803,7 +803,7 @@ class CustomerServiceWorkflow: # 3. 启动处理 await self.trigger_processing_on_payment(customer_id, acc_id, acc_type) - print(f"[Workflow] 处理图片已启动 | 客户:{customer_id}") + logger.info(f"[Workflow] 处理图片已启动 | 客户:{customer_id}") return True except Exception as e: @@ -828,7 +828,7 @@ class CustomerServiceWorkflow: bool: 是否成功 """ try: - print(f"[Workflow] 启动转人工派单工作流 | 客户:{customer_id} | 原因:{reason}") + logger.info(f"[Workflow] 启动转人工派单工作流 | 客户:{customer_id} | 原因:{reason}") # 1. 创建任务 task_id = self.create_image_task( @@ -864,7 +864,7 @@ class CustomerServiceWorkflow: f"请安排设计师上线" ) - print(f"[Workflow] 无人在线 | 客户:{customer_id}") + logger.info(f"[Workflow] 无人在线 | 客户:{customer_id}") return False # 3. 派单给在线设计师 @@ -885,7 +885,7 @@ class CustomerServiceWorkflow: msg_type=0, ) - print(f"[Workflow] 已派单给设计师:{designer} | 客户:{customer_id}") + logger.info(f"[Workflow] 已派单给设计师:{designer} | 客户:{customer_id}") return True except Exception as e: @@ -901,7 +901,7 @@ class CustomerServiceWorkflow: """ try: designers = await self.dispatch_client.get_online_designers() - print(f"[Workflow] 查询在线设计师:{len(designers)}人在线 | {designers}") + logger.info(f"[Workflow] 查询在线设计师:{len(designers)}人在线 | {designers}") return designers except Exception as e: @@ -945,7 +945,7 @@ class CustomerServiceWorkflow: ) if success: - print(f"[Workflow] 派单成功:{dispatch_task_id} → {designer_name} | 客户:{customer_id}") + logger.info(f"[Workflow] 派单成功:{dispatch_task_id} → {designer_name} | 客户:{customer_id}") # 企业微信通知 await _wechat_notify( @@ -969,3 +969,4 @@ class CustomerServiceWorkflow: # ========== 全局实例 ========== workflow = CustomerServiceWorkflow() +