import asyncio
import hashlib
import json
import os
import traceback
from collections import deque
from datetime import datetime
from typing import Optional, Dict, Any, List

from utils.observability import emit_activity
from core.websocket_agent_reply_flow import handle_agent_reply_flow
from core.websocket_quote_flow import handle_single_image_quote, handle_multi_image_quote
from core.websocket_debounce_flow import (
    debounce_agent_reply,
    pick_debounce_seconds,
    guess_intent_for_debounce,
    looks_like_requirement_text,
    rand_between,
    msg_has_image_url,
    msg_refers_images,
    extract_image_urls,
    collect_recent_image_urls,
)
from core.websocket_auto_quote_flow import (
    cancel_auto_quote_task,
    build_auto_quote_signature,
    schedule_auto_quote,
)
from core.websocket_system_inquiry_flow import (
    load_system_inquiry_rules,
    normalize_kw_list,
    resolve_system_inquiry_policy,
    match_system_inquiry,
    handle_system_inquiry,
)
from core.websocket_transfer_flow import transfer_to_human_flow
from core.websocket_outbound_arbiter_flow import (
    normalize_reply_semantic_key,
    classify_outbound_reply,
    template_family,
    outbound_arbiter,
)
from core.websocket_followup_flow import (
    unreplied_followup_loop,
    scan_and_send_unreplied_followups,
    compose_ai_scene_reply,
)
from core.websocket_outbound_flow import (
    send_reply_flow,
    ai_generate_outbound_reply,
    ai_guard_outbound_reply,
    colloquialize_outbound_reply,
)
from core.websocket_runtime_flow import command_handler_flow, run_client_flow
from core.websocket_workflow_flow import workflow_agent_notify_flow, workflow_send_flow
from core.websocket_connection_flow import connect_flow, receive_messages_flow, handle_message_flow
from core.websocket_send_flow import send_text_flow, send_image_flow, send_message_flow
from core.websocket_callback_flow import post_tianwang_callback_flow
from core.websocket_customer_profile_flow import extract_and_save_customer_info_flow
from core.websocket_message_utils_flow import (
    is_transfer_msg,
    pick_transfer_greeting,
    is_shop_card,
    extract_customer_text_from_shop_card_msg,
    has_chat_history,
    should_ignore,
    get_msg_type_name,
    to_chinese_text,
)
from core.websocket_dispatch_flow import dispatch_assign_once_flow
from core.websocket_image_entry_flow import handle_image_message_flow
from core.websocket_misc_rules_flow import (
    msg_is_price_inquiry,
    detect_order_status,
    msg_requests_external_contact,
    extract_size_pairs_m,
    oversize_reply_if_needed,
)
from core.websocket_summary_flow import save_conversation_summary_flow
from core.websocket_helpers_flow import (
    fire_and_forget,
    prune_seen,
    log_inbound_once,
    log_outbound_once,
    build_customer_message,
    touch_customer_last_contact,
    push_chat_to_wechat_safe,
)
from core.websocket_logger_setup import setup_logger

# Module logger; created before the remaining project imports are executed.
logger = setup_logger()

from db.chat_log_db import log_message as _chat_log  # noqa: E402
from utils.metrics_tracker import emit as metrics_emit  # noqa: E402


# ========== Transfer group mapping ==========
def _get_transfer_group(acc_id: str) -> str:
    """Return the transfer group ID configured for the shop ``acc_id``.

    The mapping is read from ``transfer_groups.json`` inside ``CONFIG_DIR``.
    Lookup order: the entry for ``acc_id``, then the file's ``"default"``
    entry, then the built-in default group. Any error while reading or
    parsing the file is logged at debug level and the built-in default is
    returned.
    """
    # Imported lazily, at call time rather than at module import time.
    from config.config import CONFIG_DIR

    config_path = CONFIG_DIR / "transfer_groups.json"
    default_group = "20252916034"
    try:
        if config_path.exists():
            with open(config_path, "r", encoding="utf-8") as f:
                cfg = json.load(f)
            return cfg.get(acc_id, cfg.get("default", default_group))
    except Exception:
        logger.debug("读取转接分组配置失败,使用默认分组", exc_info=True)
    return default_group


# Placeholders for the optional Agent stack. Without them a failed import
# below would leave these names undefined and the class body (which mentions
# them) could not even be evaluated, defeating the fallback mode.
CustomerServiceAgent = None
CustomerMessage = None
db = None

# Import the Agent modules.
try:
    from core.pydantic_ai_agent import CustomerServiceAgent, CustomerMessage, AgentDeps, _get_shop_type
    from db.customer_db import db
    from core.workflow import workflow
    AGENT_AVAILABLE = True
except Exception as e:
    AGENT_AVAILABLE = False
    workflow = None
    AgentDeps = None

    def _get_shop_type(acc_id, goods_name):
        """Fallback shop-type resolver used when the Agent import failed."""
        return "find_image"

    logger.info(f"警告: Agent 模块导入失败: {e}")
    traceback.print_exc()
    logger.info("将使用基础回复功能")


class QingjianAPIClient:
    """WebSocket client for the Qingjian API.

    Most methods are thin adapters that forward to the ``core.websocket_*``
    flow functions, passing this instance as the shared state holder.
    """

    def __init__(self, uri=None, enable_agent: bool = True):
        """Set up connection state, throttling tables and the optional Agent.

        Args:
            uri: WebSocket URI; falls back to ``QINGJIAN_WS_URI`` when falsy.
            enable_agent: Request Agent-based replies. Only honoured when the
                Agent modules were imported successfully.
        """
        from config.config import QINGJIAN_WS_URI
        from config.config import IMAGE_MODULE_ENABLED
        from config.config import MESSAGE_DEBOUNCE_SECONDS

        self.uri = uri or QINGJIAN_WS_URI
        self.websocket = None
        self.running = True
        self.reply_id = "tb001"  # from_id used when replying
        self.last_msg = None  # most recent message
        self.enable_agent = enable_agent and AGENT_AVAILABLE
        self.logger = logger
        self.AgentDeps = AgentDeps
        self.agent = None
        # IDs of already-replied messages, FIFO de-duplication.
        self._replied_msg_ids: deque = deque(maxlen=200)

        # Message debounce: when one customer sends several messages in a
        # row, wait N seconds and handle them together.
        # NOTE(review): any non-int config value (e.g. a float) falls back
        # to 8 here — confirm that is intended.
        self._DEBOUNCE_SECONDS = MESSAGE_DEBOUNCE_SECONDS if isinstance(MESSAGE_DEBOUNCE_SECONDS, int) else 8
        self._adaptive_debounce_enabled = os.getenv("ADAPTIVE_DEBOUNCE_ENABLED", "true").lower() in ("1", "true", "yes")
        self._debounce_tasks: dict = {}  # customer_key -> asyncio.Task
        self._pending_msgs: dict = {}  # customer_key -> list[data]
        self._image_enabled = IMAGE_MODULE_ENABLED

        # Per-customer serialization: keeps message order (e.g. an image
        # followed by a remark about it) and avoids misjudgement.
        self._customer_locks: dict = {}  # customer_key -> asyncio.Lock
        # Concurrency cap for agent_reply so the API is not saturated.
        self._agent_semaphore = asyncio.Semaphore(8)

        self._pending_images: dict = {}
        self._pending_image_tasks: dict = {}
        self._auto_quote_tasks: dict = {}  # customer_key -> asyncio.Task
        # customer_key -> signature (auto-trigger only once per identical batch)
        self._auto_quote_done_sig: dict = {}
        # Legacy "quote as soon as an image arrives" fast path (off by
        # default so it does not race with the Agent's batch collection).
        self._legacy_fast_quote_enabled = os.getenv("LEGACY_FAST_IMAGE_QUOTE", "false").lower() in ("1", "true", "yes")
        self._system_inquiry_rules = self._load_system_inquiry_rules()

        self._last_reply_sent_at: dict = {}  # customer_key -> monotonic ts
        self._outbound_semantic_seen: dict = {}  # customer_key -> {semantic_key: ts}
        self._outbound_class_seen: dict = {}  # customer_key -> {reply_class: ts}
        self._outbound_template_seen: dict = {}  # customer_key -> {template_family: ts}
        # customer_key -> monotonic ts (throttle for compensating follow-ups)
        self._unreplied_followup_sent: dict = {}
        self._inbound_log_seen: dict = {}  # signature -> monotonic ts (prevents duplicate writes)
        self._outbound_log_seen: dict = {}  # signature -> monotonic ts (prevents duplicate writes)

        self._tianwang_callback_url = (
            os.getenv("TIANWANG_CALLBACK_URL", "").strip()
            or "http://139.199.3.75:18789/api/callback"
        )
        self._tianwang_agent_name = os.getenv("TIANWANG_AGENT_NAME", "终结者").strip() or "终结者"
        self._reply_guard_enabled = os.getenv("AI_REPLY_GUARD_ENABLED", "true").lower() in ("1", "true", "yes")
        self._reply_guard_verbose = os.getenv("AI_REPLY_GUARD_VERBOSE", "false").lower() in ("1", "true", "yes")
        self._force_ai_generate_reply = os.getenv("FORCE_AI_GENERATE_ALL_REPLIES", "true").lower() in ("1", "true", "yes")

        # Task modules are loaded lazily (avoids circular imports).
        self.task_scheduler = None
        self.task_manager = None
        self.trigger_engine = None

        # Multi-process sharding support.
        self.shard_keys: set = set()  # customer keys owned by this process
        self.worker_id = int(os.getenv('AI_CS_WORKER_ID', '0'))
        self.worker_count = max(1, int(os.getenv('AI_CS_WORKER_COUNT', '1')))

        # Initialize the Agent.
        if self.enable_agent:
            try:
                self.agent = CustomerServiceAgent()
                logger.info(f"[{self.get_time()}] Agent 初始化成功")
            except Exception as e:
                logger.info(f"[{self.get_time()}] Agent 初始化失败: {e}")
                self.enable_agent = False

        # Register the workflow callbacks (used to push messages once the
        # image AI has finished).
        if workflow:
            workflow.register_send_callback(self._workflow_send)
            workflow.register_agent_notify_callback(self._workflow_agent_notify)

    def _activity_log(self, event: str, **kwargs):
        """Emit a unified activity log entry so a chain can be found by event.

        ``trace_id``, ``customer_id`` and ``result`` are taken out of
        ``kwargs`` and stringified; remaining keyword arguments are passed on.
        """
        emit_activity(
            logger,
            event=event,
            trace_id=str(kwargs.pop("trace_id", "")),
            customer_id=str(kwargs.pop("customer_id", "")),
            result=str(kwargs.pop("result", "ok")),
            **kwargs,
        )

    async def _post_tianwang_callback(self, event: str, data: dict, extra: Optional[Dict[str, Any]] = None):
        """Delegate to ``post_tianwang_callback_flow``."""
        await post_tianwang_callback_flow(self, event, data, extra=extra)

    async def connect(self):
        """Delegate to ``connect_flow``."""
        await connect_flow(self)

    def _customer_key(self, data: dict) -> str:
        """Build the conversation key: same shop + same customer = same session."""
        return f"{data.get('acc_id','')}:{data.get('from_id','')}"

    def _get_customer_lock(self, key: str) -> asyncio.Lock:
        """Return the lock for ``key``, creating it on first use."""
        if key not in self._customer_locks:
            self._customer_locks[key] = asyncio.Lock()
        return self._customer_locks[key]

    def _is_owned_by_this_worker(self, customer_key: str) -> bool:
        """Decide whether this worker process should handle ``customer_key``.

        Multi-process fallback routing:
        - if explicit shards are set, membership in them decides;
        - otherwise the key is hashed onto a fixed worker so several
          processes do not handle the same message.
        """
        if self.shard_keys:
            return customer_key in self.shard_keys
        if self.worker_count <= 1:
            return True
        try:
            # First 8 hex digits of the MD5 digest give a stable 32-bit bucket.
            h = int(hashlib.md5(customer_key.encode("utf-8")).hexdigest()[:8], 16)
            return (h % self.worker_count) == self.worker_id
        except Exception:
            # If hashing fails, only worker 0 claims the key.
            return self.worker_id == 0

    async def _agent_reply_serialized(self, data: dict):
        """Run ``agent_reply`` serialized per customer under the global cap."""
        key = self._customer_key(data)
        async with self._get_customer_lock(key):
            async with self._agent_semaphore:
                await self.agent_reply(data)

    def _fire_and_forget(self, coro):
        """Delegate to ``fire_and_forget``."""
        fire_and_forget(self, coro)

    @staticmethod
    def _prune_seen(seen: dict, now_mono: float, ttl_sec: float = 8.0):
        """Delegate to ``prune_seen``."""
        prune_seen(seen, now_mono, ttl_sec=ttl_sec)

    def _log_inbound_once(self, data: dict):
        """Delegate to ``log_inbound_once`` with the chat-log writer."""
        log_inbound_once(self, data, _chat_log)

    def _log_outbound_once(self, original_msg: dict, reply_content: str):
        """Delegate to ``log_outbound_once`` with the chat-log writer."""
        log_outbound_once(self, original_msg, reply_content, _chat_log)

    # The annotation is a string so the class body still evaluates when the
    # Agent import failed and ``CustomerMessage`` is only a placeholder.
    def _build_customer_message(self, data: dict) -> "CustomerMessage":
        """Delegate to ``build_customer_message``."""
        return build_customer_message(self, data, CustomerMessage)

    def _touch_customer_last_contact(self, customer_id: str):
        """Delegate to ``touch_customer_last_contact`` with the customer DB."""
        touch_customer_last_contact(self, customer_id, db)

    def _push_chat_to_wechat_safe(
        self,
        *,
        data: dict,
        customer_msg: str,
        reply_msg: str,
        tag: str,
        goods_name: str = "",
    ) -> None:
        """Delegate to ``push_chat_to_wechat_safe``."""
        push_chat_to_wechat_safe(
            self,
            data=data,
            customer_msg=customer_msg,
            reply_msg=reply_msg,
            tag=tag,
            goods_name=goods_name,
        )

    @staticmethod
    def _normalize_reply_semantic_key(text: str) -> str:
        """Delegate to ``normalize_reply_semantic_key``."""
        return normalize_reply_semantic_key(text)

    @staticmethod
    def _classify_outbound_reply(text: str) -> str:
        """Delegate to ``classify_outbound_reply``."""
        return classify_outbound_reply(text)

    @staticmethod
    def _template_family(reply: str) -> str:
        """Delegate to ``template_family``."""
        return template_family(reply)

    def _outbound_arbiter(self, original_msg: dict, reply_content: str, trace_id: str) -> tuple[bool, str]:
        """Delegate to ``outbound_arbiter``."""
        return outbound_arbiter(self, original_msg, reply_content, trace_id)

    async def _unreplied_followup_loop(self):
        """Delegate to ``unreplied_followup_loop``."""
        await unreplied_followup_loop(self)

    async def _scan_and_send_unreplied_followups(self):
        """Delegate to ``scan_and_send_unreplied_followups``."""
        await scan_and_send_unreplied_followups(self)

    async def _compose_ai_scene_reply(
        self,
        *,
        original_msg: dict,
        scene: str,
        intent_hint: str,
        fallback: str,
    ) -> str:
        """Delegate to ``compose_ai_scene_reply``."""
        return await compose_ai_scene_reply(
            self,
            original_msg=original_msg,
            scene=scene,
            intent_hint=intent_hint,
            fallback=fallback,
        )

    async def receive_messages(self):
        """Delegate to ``receive_messages_flow``."""
        await receive_messages_flow(self)

    async def handle_message(self, message):
        """Delegate to ``handle_message_flow``."""
        await handle_message_flow(self, message, shop_type_resolver=_get_shop_type)

    async def _debounce_agent_reply(self, data: dict):
        """Delegate to ``debounce_agent_reply``."""
        await debounce_agent_reply(self, data)

    @staticmethod
    def _rand_between(low: float, high: float) -> float:
        """Delegate to ``rand_between``."""
        return rand_between(low, high)

    def _guess_intent_for_debounce(self, msg: str) -> str:
        """Delegate to ``guess_intent_for_debounce``."""
        return guess_intent_for_debounce(self, msg)

    @staticmethod
    def _looks_like_requirement_text(msg: str) -> bool:
        """Delegate to ``looks_like_requirement_text``."""
        return looks_like_requirement_text(msg)

    def _pick_debounce_seconds(self, data: dict, msg: str) -> float:
        """Delegate to ``pick_debounce_seconds``."""
        return pick_debounce_seconds(self, data, msg)

    def _msg_has_image_url(self, msg: str) -> bool:
        """Delegate to ``msg_has_image_url``."""
        return msg_has_image_url(msg)

    def _msg_refers_images(self, msg: str) -> bool:
        """Delegate to ``msg_refers_images``."""
        return msg_refers_images(msg)

    def _extract_image_urls(self, msg: str) -> list:
        """Delegate to ``extract_image_urls``."""
        return extract_image_urls(msg)

    def _collect_recent_image_urls(self, customer_id: str, acc_id: str, max_count: int = 6) -> list:
        """Delegate to ``collect_recent_image_urls``."""
        return collect_recent_image_urls(self, customer_id, acc_id, max_count=max_count)

    def _msg_is_requirement(self, msg: str) -> bool:
        """Return True when ``msg`` contains any requirement keyword.

        An empty or ``None`` message is never a requirement.
        """
        if not msg:
            return False
        kws = (
            "要", "抓到", "放到", "合成", "替换", "抠", "修", "高清", "尺寸",
            "横", "竖", "颜色", "去背景", "排版", "一样", "类似", "同款",
            "能不能做", "能做吗", "可以做吗", "做不做", "这个能做吗", "这个能不能做",
        )
        return any(k in msg for k in kws)

    def _add_pending_images(self, key: str, urls: list, limit: int = 12):
        """Queue new image URLs for ``key``, skipping duplicates.

        Args:
            key: Customer key the images belong to.
            urls: Candidate URLs; nothing happens when empty.
            limit: Maximum number of queued URLs per key.
        """
        if not urls:
            return
        cur = self._pending_images.get(key) or []
        for u in urls:
            # Check the cap before appending so an already-full queue can
            # never grow past ``limit``.
            if len(cur) >= limit:
                break
            if u not in cur:
                cur.append(u)
        self._pending_images[key] = cur

    async def _flush_pending_images(self, key: str, data: dict):
        """Analyze and clear the images queued for ``key``.

        One queued URL goes to the single-image path, several to the
        multi-image path; an empty queue is a no-op.
        """
        urls = self._pending_images.get(key) or []
        if not urls:
            return
        # Reset the queue before awaiting the analysis.
        self._pending_images[key] = []
        if len(urls) == 1:
            await self._analyze_single_and_reply(data, urls[0])
        else:
            await self._analyze_multi_and_reply(data, urls)

    def _msg_is_price_inquiry(self, msg: str) -> bool:
        """Delegate to ``msg_is_price_inquiry``."""
        return msg_is_price_inquiry(msg)

    def _detect_order_status(self, msg: str) -> str:
        """Delegate to ``detect_order_status``."""
        return detect_order_status(msg)

    async def _analyze_single_and_reply(self, data: dict, url: str):
        """Delegate to ``handle_single_image_quote``."""
        await handle_single_image_quote(self, data, url)

    async def agent_reply(self, data: dict):
        """Handle a message with the Agent and reply."""
        await handle_agent_reply_flow(
            self,
            data,
            workflow=workflow,
            shop_type_resolver=_get_shop_type,
        )

    def _cancel_auto_quote_task(self, key: str, reason: str = ""):
        """Delegate to ``cancel_auto_quote_task``."""
        cancel_auto_quote_task(self, key, reason=reason)

    @staticmethod
    def _build_auto_quote_signature(state: Any) -> str:
        """Delegate to ``build_auto_quote_signature``."""
        return build_auto_quote_signature(state)

    async def _maybe_schedule_auto_quote(self, data: dict):
        """Delegate to ``schedule_auto_quote``."""
        await schedule_auto_quote(self, data, shop_type_resolver=_get_shop_type)

    async def _analyze_multi_and_reply(self, data: dict, urls: list):
        """Delegate to ``handle_multi_image_quote``."""
        await handle_multi_image_quote(self, data, urls)

    def _msg_requests_external_contact(self, msg: str) -> bool:
        """Delegate to ``msg_requests_external_contact``."""
        return msg_requests_external_contact(msg)

    @staticmethod
    def _extract_size_pairs_m(msg: str) -> list[tuple[float, float]]:
        """Delegate to ``extract_size_pairs_m``."""
        return extract_size_pairs_m(msg)

    def _oversize_reply_if_needed(self, msg: str) -> str:
        """Delegate to ``oversize_reply_if_needed``."""
        return oversize_reply_if_needed(msg)

    def _is_transfer_msg(self, data: dict) -> bool:
        """Delegate to ``is_transfer_msg``."""
        return is_transfer_msg(self, data)

    def _pick_transfer_greeting(self) -> str:
        """Delegate to ``pick_transfer_greeting``."""
        return pick_transfer_greeting()

    def _is_shop_card(self, data: dict) -> bool:
        """Delegate to ``is_shop_card``."""
        return is_shop_card(self, data)

    def _extract_customer_text_from_shop_card_msg(self, msg: str) -> str:
        """Delegate to ``extract_customer_text_from_shop_card_msg``."""
        return extract_customer_text_from_shop_card_msg(self, msg)

    def _has_chat_history(self, customer_id: str, acc_id: str = "") -> bool:
        """Delegate to ``has_chat_history``."""
        return has_chat_history(customer_id, acc_id=acc_id)

    def _load_system_inquiry_rules(self) -> Dict[str, Any]:
        """Delegate to ``load_system_inquiry_rules``."""
        return load_system_inquiry_rules()

    @staticmethod
    def _normalize_kw_list(v: Any) -> List[str]:
        """Delegate to ``normalize_kw_list``."""
        return normalize_kw_list(v)

    def _resolve_system_inquiry_policy(self, acc_id: str) -> Dict[str, Any]:
        """Delegate to ``resolve_system_inquiry_policy``."""
        return resolve_system_inquiry_policy(self, acc_id)

    def _match_system_inquiry(self, data: dict, policy: Dict[str, Any]) -> bool:
        """Delegate to ``match_system_inquiry``."""
        return match_system_inquiry(self, data, policy)

    async def _handle_system_inquiry(self, data: dict) -> bool:
        """Delegate to ``handle_system_inquiry``."""
        return await handle_system_inquiry(self, data)

    def _should_ignore(self, data: dict) -> bool:
        """Delegate to ``should_ignore``."""
        return should_ignore(self, data)

    def get_msg_type_name(self, msg_type):
        """Delegate to the module-level ``get_msg_type_name``."""
        return get_msg_type_name(msg_type)

    def _extract_and_save_customer_info(self, message: str, customer_id: str):
        """Delegate to ``extract_and_save_customer_info_flow`` with the customer DB."""
        extract_and_save_customer_info_flow(self, message, customer_id, db)

    def to_chinese(self, text):
        """Delegate to ``to_chinese_text``."""
        return to_chinese_text(text)

    async def handle_image_message(self, data: dict):
        """Delegate to ``handle_image_message_flow``."""
        await handle_image_message_flow(self, data)

    async def _dispatch_assign_once(self) -> Dict[str, Any]:
        """Delegate to ``dispatch_assign_once_flow``."""
        return await dispatch_assign_once_flow(self)

    async def transfer_to_human(self, data: dict, transfer_msg: str = ""):
        """Delegate to ``transfer_to_human_flow`` using ``_get_transfer_group``."""
        await transfer_to_human_flow(
            self,
            data,
            transfer_msg=transfer_msg,
            transfer_group_resolver=_get_transfer_group,
        )

    async def _save_conversation_summary(self, customer_id: str, buyer_msg: str, agent_reply: str):
        """Delegate to ``save_conversation_summary_flow``."""
        await save_conversation_summary_flow(self, customer_id, buyer_msg, agent_reply)

    async def _workflow_agent_notify(
        self,
        customer_id: str,
        acc_id: str,
        acc_type: str,
        system_hint: str,
    ):
        """Workflow callback; delegates to ``workflow_agent_notify_flow``."""
        await workflow_agent_notify_flow(self, customer_id, acc_id, acc_type, system_hint)

    async def _workflow_send(
        self,
        customer_id: str,
        acc_id: str,
        acc_type: str,
        content: str,
        msg_type: int = 0
    ):
        """Workflow callback; delegates to ``workflow_send_flow``."""
        await workflow_send_flow(self, customer_id, acc_id, acc_type, content, msg_type=msg_type)

    async def send_reply(self, original_msg, reply_content):
        """Delegate to ``send_reply_flow``."""
        await send_reply_flow(self, original_msg, reply_content)

    async def _ai_generate_outbound_reply(self, original_msg: dict, reply_content: str) -> str:
        """Delegate to ``ai_generate_outbound_reply``."""
        return await ai_generate_outbound_reply(self, original_msg, reply_content)

    def _colloquialize_outbound_reply(self, text: Any) -> Any:
        """Delegate to ``colloquialize_outbound_reply``."""
        return colloquialize_outbound_reply(text)

    async def _ai_guard_outbound_reply(self, original_msg: dict, reply_content: str) -> tuple[bool, str, str]:
        """Delegate to ``ai_guard_outbound_reply``."""
        return await ai_guard_outbound_reply(self, original_msg, reply_content)

    async def send_text(self, cy_id, acc_type, content):
        """Delegate to ``send_text_flow``."""
        await send_text_flow(self, cy_id, acc_type, content)

    async def send_image(self, cy_id, acc_type, image_path):
        """Delegate to ``send_image_flow``."""
        await send_image_flow(self, cy_id, acc_type, image_path)

    async def send_message(self, message):
        """Delegate to ``send_message_flow``."""
        await send_message_flow(self, message)

    async def auto_reply(self, data):
        """Auto-reply example (deprecated; use ``agent_reply`` instead)."""
        pass

    async def command_handler(self):
        """Delegate to ``command_handler_flow``."""
        await command_handler_flow(self)

    def get_time(self):
        """Return the current local time formatted as ``HH:MM:SS``."""
        return datetime.now().strftime("%H:%M:%S")

    async def run(self):
        """Delegate to ``run_client_flow``."""
        await run_client_flow(self)


if __name__ == "__main__":
    import sys

    # The Agent is enabled unless --no-agent is passed on the command line.
    enable_agent = "--no-agent" not in sys.argv
    client = QingjianAPIClient(enable_agent=enable_agent)
    try:
        asyncio.run(client.run())
    except KeyboardInterrupt:
        logger.info("\n已停止")