import asyncio import websockets import json import re import logging import random import secrets import time import hashlib from collections import deque from datetime import datetime from pathlib import Path from typing import Optional, Dict, Any, List # ========== 转接分组映射 ========== def _get_transfer_group(acc_id: str) -> str: """根据店铺 acc_id 获取转接分组 ID。不同店铺对应不同客服分组。""" from config.config import CONFIG_DIR config_path = CONFIG_DIR / "transfer_groups.json" default_group = "20252916034" try: if config_path.exists(): with open(config_path, "r", encoding="utf-8") as f: cfg = json.load(f) return cfg.get(acc_id, cfg.get("default", default_group)) except Exception: pass return default_group # ========== 日志配置(轮转:按大小 10MB,保留 7 份)========== def setup_logger(): from logging.handlers import RotatingFileHandler from config.config import LOG_DIR, LOG_MAX_BYTES, LOG_BACKUP_COUNT logger = logging.getLogger("cs_agent") logger.setLevel(logging.INFO) fmt = logging.Formatter("[%(asctime)s] %(message)s", datefmt="%H:%M:%S") ch = logging.StreamHandler() ch.setFormatter(fmt) logger.addHandler(ch) LOG_DIR.mkdir(exist_ok=True) today = datetime.now().strftime("%Y-%m-%d") fh = RotatingFileHandler( LOG_DIR / f"chat_{today}.log", maxBytes=LOG_MAX_BYTES, backupCount=LOG_BACKUP_COUNT, encoding="utf-8", ) fh.setFormatter(fmt) logger.addHandler(fh) return logger import os logger = setup_logger() from db.chat_log_db import log_message as _chat_log from utils.metrics_tracker import emit as metrics_emit # 导入 Agent 模块 try: from core.pydantic_ai_agent import CustomerServiceAgent, CustomerMessage, _get_shop_type from db.customer_db import db from core.workflow import workflow AGENT_AVAILABLE = True except Exception as e: AGENT_AVAILABLE = False workflow = None _get_shop_type = lambda acc_id, goods_name: "find_image" import traceback print(f"警告: Agent 模块导入失败: {e}") traceback.print_exc() print("将使用基础回复功能") class QingjianAPIClient: """轻简API WebSocket客户端""" def __init__(self, uri=None, enable_agent: bool = True): from config.config import QINGJIAN_WS_URI from config.config import IMAGE_MODULE_ENABLED from config.config import MESSAGE_DEBOUNCE_SECONDS self.uri = uri or QINGJIAN_WS_URI self.websocket = None self.running = True self.reply_id = "tb001" # 回复时使用的from_id self.last_msg = None # 保存最后一条消息 self.enable_agent = enable_agent and AGENT_AVAILABLE self.agent = None self._replied_msg_ids: deque = deque(maxlen=200) # 已回复消息ID,FIFO去重 # 消息防抖:同一客户连续发消息时,等待 N 秒后合并处理 self._DEBOUNCE_SECONDS = MESSAGE_DEBOUNCE_SECONDS if isinstance(MESSAGE_DEBOUNCE_SECONDS, int) else 8 self._adaptive_debounce_enabled = os.getenv("ADAPTIVE_DEBOUNCE_ENABLED", "true").lower() in ("1", "true", "yes") self._debounce_tasks: dict = {} # customer_key -> asyncio.Task self._pending_msgs: dict = {} # customer_key -> list[data] self._image_enabled = IMAGE_MODULE_ENABLED # 同客户消息串行:保证「发图→这个高清」等顺序,避免误判 self._customer_locks: dict = {} # customer_key -> asyncio.Lock # agent_reply 并发上限,防止 API 打满 self._agent_semaphore = asyncio.Semaphore(8) self._pending_images: dict = {} self._pending_image_tasks: dict = {} # 旧版“看图即报价”快速链路(默认关闭,避免与 Agent 批量收集逻辑并发打架) self._legacy_fast_quote_enabled = os.getenv("LEGACY_FAST_IMAGE_QUOTE", "false").lower() in ("1", "true", "yes") self._system_inquiry_rules = self._load_system_inquiry_rules() self._last_reply_sent_at: dict = {} # customer_key -> monotonic ts self._inbound_log_seen: dict = {} # signature -> monotonic ts(防重复写入) self._outbound_log_seen: dict = {} # signature -> monotonic ts(防重复写入) # 延迟加载任务模块(避免循环导入) self.task_scheduler = None self.task_manager = None self.trigger_engine = None # 多进程分片支持 self.shard_keys: set = set() # 本进程负责的客户 key 集合 self.worker_id = int(os.getenv('AI_CS_WORKER_ID', '0')) self.worker_count = max(1, int(os.getenv('AI_CS_WORKER_COUNT', '1'))) # 初始化 Agent if self.enable_agent: try: self.agent = CustomerServiceAgent() print(f"[{self.get_time()}] Agent 初始化成功") except Exception as e: print(f"[{self.get_time()}] Agent 初始化失败: {e}") self.enable_agent = False # 注册 workflow 消息发送回调(供图片AI完成后推送消息用) if workflow: workflow.register_send_callback(self._workflow_send) workflow.register_agent_notify_callback(self._workflow_agent_notify) async def connect(self): """连接WebSocket服务器""" while self.running: try: print(f"[{self.get_time()}] 正在连接轻简API {self.uri}...") async with websockets.connect(self.uri) as websocket: self.websocket = websocket from utils.health_check import set_qingjian_connected set_qingjian_connected(True) print(f"[{self.get_time()}] 连接成功!") if self.enable_agent: print(f"[{self.get_time()}] AI Agent 已启用,将自动处理消息") print(f"[{self.get_time()}] 等待接收消息...") # 持续接收消息 await self.receive_messages() except ConnectionRefusedError: from utils.health_check import set_qingjian_connected set_qingjian_connected(False) print(f"[{self.get_time()}] 连接被拒绝,请检查轻简软件是否已启动") except websockets.exceptions.InvalidURI: from utils.health_check import set_qingjian_connected set_qingjian_connected(False) print(f"[{self.get_time()}] URI格式错误") except Exception as e: from utils.health_check import set_qingjian_connected set_qingjian_connected(False) print(f"[{self.get_time()}] 连接错误: {e}") # 等待5秒后重连 if self.running: print(f"[{self.get_time()}] 5秒后尝试重连...") await asyncio.sleep(5) def _customer_key(self, data: dict) -> str: """同一店铺+客户 = 同一会话""" return f"{data.get('acc_id','')}:{data.get('from_id','')}" def _get_customer_lock(self, key: str) -> asyncio.Lock: if key not in self._customer_locks: self._customer_locks[key] = asyncio.Lock() return self._customer_locks[key] def _is_owned_by_this_worker(self, customer_key: str) -> bool: """ 多进程兜底路由: - 若显式分片存在,用显式分片; - 否则按 customer_key 哈希到固定 worker,避免多进程重复处理同一消息。 """ if self.shard_keys: return customer_key in self.shard_keys if self.worker_count <= 1: return True try: h = int(hashlib.md5(customer_key.encode("utf-8")).hexdigest()[:8], 16) return (h % self.worker_count) == self.worker_id except Exception: return self.worker_id == 0 async def _agent_reply_serialized(self, data: dict): """同客户串行 + 全局并发限制,再执行 agent_reply""" key = self._customer_key(data) async with self._get_customer_lock(key): async with self._agent_semaphore: await self.agent_reply(data) def _fire_and_forget(self, coro): """后台执行协程,不阻塞接收循环;异常会记录到日志""" task = asyncio.create_task(coro) def _done(t): if t.cancelled(): return exc = t.exception() if exc: logger.exception(f"后台任务异常: {exc}") task.add_done_callback(_done) @staticmethod def _prune_seen(seen: dict, now_mono: float, ttl_sec: float = 8.0): if len(seen) <= 2000: return stale = [k for k, t in seen.items() if (now_mono - t) > ttl_sec] for k in stale: seen.pop(k, None) def _log_inbound_once(self, data: dict): """统一记录入站消息,短窗口去重,避免多分支重复写库。""" try: cid = data.get("from_id", "") if not cid: return msg = self.to_chinese(data.get("msg", "") or "") acc_id = data.get("acc_id", "") mtype = int(data.get("msg_type", 0) or 0) now_mono = time.monotonic() sig = f"{acc_id}|{cid}|{mtype}|{msg}" last = self._inbound_log_seen.get(sig, 0.0) if (now_mono - last) < 2.0: return self._inbound_log_seen[sig] = now_mono self._prune_seen(self._inbound_log_seen, now_mono, ttl_sec=8.0) _chat_log( cid, msg, "in", customer_name=self.to_chinese(data.get("from_name", "") or data.get("cy_name", "")), acc_id=acc_id, platform=data.get("acc_type", ""), msg_type=mtype, ) except Exception: pass def _log_outbound_once(self, original_msg: dict, reply_content: str): """统一记录出站消息,短窗口去重,避免重复写库。""" try: cid = original_msg.get("from_id", "") if not cid or not reply_content: return acc_id = original_msg.get("acc_id", "") now_mono = time.monotonic() sig = f"{acc_id}|{cid}|{reply_content}" last = self._outbound_log_seen.get(sig, 0.0) if (now_mono - last) < 2.0: return self._outbound_log_seen[sig] = now_mono self._prune_seen(self._outbound_log_seen, now_mono, ttl_sec=8.0) _chat_log( cid, reply_content, "out", customer_name=self.to_chinese(original_msg.get("from_name", "") or original_msg.get("cy_name", "")), acc_id=acc_id, platform=original_msg.get("acc_type", ""), ) except Exception: pass async def receive_messages(self): """持续接收消息""" try: async for message in self.websocket: await self.handle_message(message) except websockets.exceptions.ConnectionClosed: from utils.health_check import set_qingjian_connected set_qingjian_connected(False) print(f"[{self.get_time()}] 连接已关闭") except Exception as e: from utils.health_check import set_qingjian_connected set_qingjian_connected(False) print(f"[{self.get_time()}] 接收消息错误: {e}") async def handle_message(self, message): """处理接收到的消息""" try: data = json.loads(message) # 多进程分片检查:确保同一客户只由一个 worker 处理 customer_key = self._customer_key(data) if not self._is_owned_by_this_worker(customer_key): return timestamp = self.get_time() # 保存最后一条消息用于回复 self.last_msg = data # 打印格式化的消息 print(f"\n{'='*50}") print(f"[{timestamp}] 收到新消息:") print(f"{'='*50}") print(f" 消息ID: {data.get('msg_id', 'N/A')}") print(f" 账号ID: {self.to_chinese(data.get('acc_id', 'N/A'))}") print(f" 发送者ID: {self.to_chinese(data.get('from_id', 'N/A'))}") print(f" 发送者名称: {self.to_chinese(data.get('from_name', 'N/A'))}") print(f" 会话ID: {self.to_chinese(data.get('cy_id', 'N/A'))}") print(f" 平台类型: {data.get('acc_type', 'N/A')}") print(f" 消息类型: {self.get_msg_type_name(data.get('msg_type', 0))}") print(f" 消息内容: {self.to_chinese(data.get('msg', 'N/A'))}") # 显示商品信息(如果有) if data.get('goods_name'): print(f" 商品名称: {self.to_chinese(data.get('goods_name', ''))}") if data.get('goods_order'): print(f" 订单信息: {self.to_chinese(data.get('goods_order', ''))}") print(f"{'='*50}\n") # 消息去重:同一条消息不重复处理 msg_id = data.get('msg_id', '') if msg_id and msg_id in self._replied_msg_ids: logger.info(f"重复消息,跳过: {msg_id}") return if msg_id: self._replied_msg_ids.append(msg_id) # deque 自动淘汰最旧的 # 空消息/无效消息过滤(N/A 或关键字段全为空) from_id = data.get('from_id', '') acc_id = data.get('acc_id', '') msg_body = data.get('msg', '') if not from_id or from_id == 'N/A' or not acc_id or acc_id == 'N/A': print(f"[{self.get_time()}] 空消息跳过(from_id={from_id!r} acc_id={acc_id!r})") return self._log_inbound_once(data) # Gemini 店铺:不回复,直接跳过 goods_name = self.to_chinese(data.get('goods_name', '') or '') if _get_shop_type(acc_id, goods_name) == "gemini_api": print(f"[{self.get_time()}] Gemini 店铺消息,跳过") try: from utils.wechat_chat_log import push_chat_to_wechat asyncio.create_task(push_chat_to_wechat( customer_name=self.to_chinese(data.get('from_name', '') or data.get('cy_name', '')), customer_id=data.get('from_id', ''), acc_id=data.get('acc_id', ''), customer_msg=self.to_chinese(data.get('msg', '')), reply_msg="", goods_name=goods_name, )) except Exception: pass return # 使用 Agent 自动回复(仅处理文本消息) if self.enable_agent: msg_type = data.get('msg_type', 0) if msg_type == 0: if self._is_transfer_msg(data): # 会话转交 → 主动打招呼 print(f"[{self.get_time()}] 收到转交消息,发送问候") greeting = self._pick_transfer_greeting() await self.send_reply(data, greeting) try: from utils.wechat_chat_log import push_chat_to_wechat asyncio.create_task(push_chat_to_wechat( customer_name=self.to_chinese(data.get('from_name', '') or data.get('cy_name', '')), customer_id=data.get('from_id', ''), acc_id=data.get('acc_id', ''), customer_msg=self.to_chinese(data.get('msg', '')), reply_msg=greeting, goods_name=self.to_chinese(data.get('goods_name', '') or ''), )) except Exception: pass elif self._is_shop_card(data): # 进店卡片:有历史对话就不回复,没有才打招呼(Gemini 已在上面统一跳过) cid = data.get('from_id', '') if self._has_chat_history(cid): print(f"[{self.get_time()}] 进店卡片(已有记录),跳过") else: print(f"[{self.get_time()}] 进店卡片(新客户),发送问候") greeting = "在呢,发图来我看看" await self.send_reply(data, greeting) try: from utils.wechat_chat_log import push_chat_to_wechat asyncio.create_task(push_chat_to_wechat( customer_name=self.to_chinese(data.get('from_name', '') or data.get('cy_name', '')), customer_id=data.get('from_id', ''), acc_id=data.get('acc_id', ''), customer_msg=self.to_chinese(data.get('msg', '')), reply_msg=greeting, goods_name=goods_name, )) except Exception: pass elif await self._handle_system_inquiry(data): print(f"[{self.get_time()}] 系统客服询单消息,已按规则处理") elif self._should_ignore(data): print(f"[{self.get_time()}] 系统通知,跳过回复") else: await self._debounce_agent_reply(data) elif msg_type == 1: # 图片消息直接处理,不走防抖(图片不会连续多发) await self.handle_image_message(data) except json.JSONDecodeError: print(f"[{timestamp}] 收到非JSON消息: {message}") async def _debounce_agent_reply(self, data: dict): """ 消息防抖:同一客户在 _DEBOUNCE_SECONDS 内的连续消息合并后再处理。 订单通知、图片URL、付款相关消息不走防抖,立即处理。 """ msg_body = data.get('msg', '') # 以下情况跳过防抖,立即处理(后台执行,不阻塞接收循环) immediate_keywords = ["买家已付款", "已付款", "[系统订单信息]"] if any(kw in msg_body for kw in immediate_keywords) or self._msg_has_image_url(msg_body): self._fire_and_forget(self._agent_reply_serialized(data)) return key = f"{data.get('acc_id','')}:{data.get('from_id','')}" # 积攒消息 if key not in self._pending_msgs: self._pending_msgs[key] = [] self._pending_msgs[key].append(msg_body) # 取消上一个等待任务(如果有) old_task = self._debounce_tasks.get(key) if old_task and not old_task.done(): old_task.cancel() debounce_seconds = self._pick_debounce_seconds(data, msg_body) # 创建新的延迟处理任务 async def _delayed(capture_key, capture_data, wait_s: float): await asyncio.sleep(wait_s) msgs = self._pending_msgs.pop(capture_key, []) if not msgs: return if len(msgs) == 1: merged_msg = msgs[0] else: merged_msg = "、".join(m for m in msgs if m.strip()) print(f"[{self.get_time()}] 防抖合并 {len(msgs)} 条消息: {merged_msg[:60]}") merged_data = dict(capture_data) merged_data['msg'] = merged_msg await self._agent_reply_serialized(merged_data) task = asyncio.create_task(_delayed(key, data, debounce_seconds)) self._debounce_tasks[key] = task @staticmethod def _rand_between(low: float, high: float) -> float: if high <= low: return float(low) # 使用 secrets 增强随机性,避免固定周期导致机械感 span = high - low return round(low + span * (secrets.randbelow(1000) / 1000.0), 2) def _guess_intent_for_debounce(self, msg: str) -> str: text = (msg or "").strip() if not text: return "unknown" if self._msg_has_image_url(text): return "image" try: from utils.intent_analyzer import detect_intent_keywords intent = detect_intent_keywords(text) except Exception: intent = "" if intent: return intent lower = text.lower() if any(k in lower for k in ["报价", "多少钱", "价格", "贵", "优惠"]): return "询价" if any(k in lower for k in ["做一下", "改一下", "需求", "门头", "上面的字", "处理"]): return "修改" if any(k in lower for k in ["在吗", "你好", "有人"]): return "打招呼" return "unknown" @staticmethod def _looks_like_requirement_text(msg: str) -> bool: text = (msg or "").strip().lower() if not text: return False req_kw = ( "做一下", "改一下", "处理一下", "这个字", "上面的字", "门头", "去背景", "抠图", "换色", "调色", "清晰", "高清", "尺寸", "比例", "横版", "竖版", "排版", "改字", "按这个做", "照这个做", "就这张", "看看做", "弄一下", ) return any(k in text for k in req_kw) def _pick_debounce_seconds(self, data: dict, msg: str) -> float: """意图驱动防抖:不同意图不同等待区间,并引入轻微随机。""" base = max(1.0, float(self._DEBOUNCE_SECONDS)) if not self._adaptive_debounce_enabled: return base intent = self._guess_intent_for_debounce(msg) is_req = self._looks_like_requirement_text(msg) has_img = self._msg_has_image_url(msg) # 区间策略:越明确、越短消息,等待越短;需求描述类稍长 if intent == "打招呼": low, high = 1.0, min(3.0, base) elif intent in ("询价", "砍价"): low, high = 2.0, min(5.0, base) elif intent in ("修改", "批量"): low, high = max(3.0, base * 0.65), min(18.0, base + 2.0) elif intent == "转接": low, high = 1.0, 2.5 else: low, high = max(2.0, base * 0.5), base # 发图后的需求描述,优先“多等一点”收集完整需求,减少半句回复 if is_req and not has_img: low = max(low, min(10.0, base * 0.8)) high = max(high, min(20.0, base + 4.0)) # 短句更快,长句稍慢,避免把连续半句拆开 text_len = len((msg or "").strip()) if text_len <= 4: high = min(high, max(low + 0.2, 2.5)) elif text_len >= 18: low = min(high, low + 0.6) wait_s = self._rand_between(low, high) logger.info(f"防抖等待 {wait_s}s | intent={intent} | len={text_len}") return wait_s def _msg_has_image_url(self, msg: str) -> bool: """判断文本消息里是否包含图片URL(客户粘贴了图片链接,可能带前缀文字如 有吗#*#https://...)""" if not msg: return False lower = msg.lower() image_exts = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp") image_hosts = ("alicdn.com", "imgextra", "taobao.com", "jd.com", "pinduoduo.com") if "http://" in lower or "https://" in lower: if any(ext in lower for ext in image_exts) or any(h in lower for h in image_hosts): return True return False def _msg_refers_images(self, msg: str) -> bool: """判断文本是否指代之前的图片(图一/图二/这张/那张/上面那张等)""" if not msg: return False refs = ( "图一", "图二", "第一张", "第二张", "这张", "那张", "这图", "那个图", "这个", "这个呢", "上面那张", "下面那张", "刚才那张", "上一张", "下一张", ) return any(r in msg for r in refs) def _extract_image_urls(self, msg: str) -> list: if not msg: return [] parts = [p.strip() for p in msg.split("#*#") if p.strip()] urls = [] for p in parts: if p.startswith("http://") or p.startswith("https://"): urls.append(p) if not urls and ("http://" in msg or "https://" in msg): tokens = re.findall(r'(https?://\S+)', msg) for t in tokens: if any(ext in t.lower() for ext in [".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"]): urls.append(t) return urls[:8] def _collect_recent_image_urls(self, customer_id: str, acc_id: str, max_count: int = 6) -> list: """从最近对话中回溯收集图片URL(优先买家消息),用于慢发或引用图片的场景""" urls, seen = [], set() try: from db.chat_log_db import get_recent_conversation recent = get_recent_conversation(customer_id=customer_id, acc_id=acc_id, limit=20) # 从最近到更早遍历,收集买家(in)消息中的图片链接 for m in reversed(recent): if m.get("direction") != "in": continue ms = m.get("message") or "" us = self._extract_image_urls(ms) for u in us: if u not in seen: seen.add(u) urls.append(u) if len(urls) >= max_count: return urls except Exception: pass return urls def _msg_is_requirement(self, msg: str) -> bool: if not msg: return False kws = ( "要", "抓到", "放到", "合成", "替换", "抠", "修", "高清", "尺寸", "横", "竖", "颜色", "去背景", "排版", "一样", "类似", "同款", "能不能做", "能做吗", "可以做吗", "做不做", "这个能做吗", "这个能不能做", ) return any(k in msg for k in kws) def _add_pending_images(self, key: str, urls: list, limit: int = 12): if not urls: return cur = self._pending_images.get(key) or [] for u in urls: if u not in cur: cur.append(u) if len(cur) >= limit: break self._pending_images[key] = cur async def _flush_pending_images(self, key: str, data: dict): urls = self._pending_images.get(key) or [] if not urls: return self._pending_images[key] = [] if len(urls) == 1: await self._analyze_single_and_reply(data, urls[0]) else: await self._analyze_multi_and_reply(data, urls) def _msg_is_price_inquiry(self, msg: str) -> bool: """判断是否是价格询问""" if not msg: return False patterns = ("多少钱", "多少一张", "一张多少钱", "画图多少", "报价", "给个价", "几块", "多少钱") return any(p in msg for p in patterns) def _detect_order_status(self, msg: str) -> str: if not msg: return "" s = msg if "买家已付款" in s or "已付款" in s: return "paid" if "[系统订单信息]" in s: if "等待买家付款" in s or "未付款" in s: return "waiting" return "order" return "" async def _analyze_single_and_reply(self, data: dict, url: str): try: from image.image_analyzer import image_analyzer r = await image_analyzer.analyze(url) if isinstance(r, dict) and r.get("success", False): if r.get("feasibility") == "no" or r.get("risk") == "high": note = str(r.get("note", "") or "") if "文字内容过于密集" in note or "密集文字" in note: reply = "这类文字太密的图我们这边不接单,抱歉哈。你要是简化后再发我可以继续看。" else: reply = "这张处理风险比较高,我这边先不直接接,建议转人工评估更稳。" await self.send_reply(data, reply) return from config.config import MIN_PRICE_FLOOR p = r.get("price_suggest", 20) floor_dyn = r.get("price_min", MIN_PRICE_FLOOR) floor = max(MIN_PRICE_FLOOR, int(floor_dyn) if isinstance(floor_dyn, (int, float)) else MIN_PRICE_FLOOR) p = max(floor, round(p / 5) * 5) try: from db.customer_db import db as _db _db.update_last_min_price(data.get('from_id',''), floor) except Exception: pass reply = f"这张按{p}元,满意再拍" else: # 识别失败时不做兜底报价,避免把未识别图片误判为可做 reply = "这张我这边暂时识别不稳定,先不乱报价。你可以换一张更清晰的,我再给你准报价。" await self.send_reply(data, reply) except Exception: pass async def agent_reply(self, data: dict): """使用 Agent 处理消息并回复""" try: msg_text = self.to_chinese(data.get('msg', '')) _cid = data.get('from_id', '') _name = self.to_chinese(data.get('from_name', '') or data.get('cy_name', '')) _plat = data.get('acc_type', '') _shop_type = _get_shop_type(data.get('acc_id', ''), self.to_chinese(data.get('goods_name', '') or '')) # 超大尺寸(米制)直接拒单,避免进入报价/处理流程 oversize_reply = self._oversize_reply_if_needed(msg_text) if oversize_reply: await self.send_reply(data, oversize_reply) return # 找图/修图店铺:统一走 Agent 的“收集需求后统一报价”流程,避免按单图快速报价 if self._legacy_fast_quote_enabled and _shop_type != "find_image": # 消息含图片URL:累积到待处理列表,先询问要求 if self._msg_has_image_url(msg_text): urls = self._extract_image_urls(msg_text) key = self._customer_key(data) self._add_pending_images(key, urls) await self.send_reply(data, "收到,我看看哈") old = self._pending_image_tasks.get(key) if old and not old.done(): old.cancel() async def _delay_flush(capture_key, capture_data): await asyncio.sleep(self._DEBOUNCE_SECONDS + 4) # 与同客户 agent_reply 串行,避免“延迟报价”和“当前追问”并发打架 async with self._get_customer_lock(capture_key): await self._flush_pending_images(capture_key, capture_data) task = asyncio.create_task(_delay_flush(key, data)) self._pending_image_tasks[key] = task return elif self._msg_refers_images(msg_text): urls = self._collect_recent_image_urls(_cid, data.get('acc_id', ''), max_count=6) if urls: key = self._customer_key(data) self._add_pending_images(key, urls) await self.send_reply(data, "稍等,我找找刚才那几张") await self._flush_pending_images(key, data) return else: status = self._detect_order_status(msg_text) if status == "paid": ack = "收到付款,我马上安排处理,有需要第一时间联系您" await self.send_reply(data, ack) return elif status in ("waiting", "order"): ack = "订单我看到了哈,方便的话请完成付款,我好安排处理" await self.send_reply(data, ack) return else: urls = self._extract_image_urls(msg_text) if len(urls) == 1: key = self._customer_key(data) self._add_pending_images(key, urls) await self.send_reply(data, "收到,我看看哈") return else: if self._msg_requests_external_contact(msg_text): reply = "这里沟通就可以哦,其他联系方式不方便" await self.send_reply(data, reply) try: from utils.wechat_chat_log import push_chat_to_wechat asyncio.create_task(push_chat_to_wechat( customer_name=_name, customer_id=_cid, acc_id=data.get('acc_id', ''), customer_msg=msg_text, reply_msg=reply, goods_name=self.to_chinese(data.get('goods_name', '') or ''), )) except Exception: pass return if self._msg_is_requirement(msg_text) or self._msg_is_price_inquiry(msg_text): key = self._customer_key(data) if self._pending_images.get(key): old = self._pending_image_tasks.get(key) if old and not old.done(): old.cancel() await self.send_reply(data, "稍等,我把刚才那几张一起看下") await self._flush_pending_images(key, data) return if self._msg_is_price_inquiry(msg_text): recent_urls = self._collect_recent_image_urls(_cid, data.get('acc_id', ''), max_count=6) if recent_urls: await self.send_reply(data, "稍等,我刚才那几张一起看下") if len(recent_urls) == 1: asyncio.create_task(self._analyze_single_and_reply(data, recent_urls[0])) else: asyncio.create_task(self._analyze_multi_and_reply(data, recent_urls)) return status = self._detect_order_status(msg_text) if status == "paid": ack = "收到付款,我马上安排处理,有需要第一时间联系您" await self.send_reply(data, ack) return elif status in ("waiting", "order"): ack = "订单我看到了哈,方便的话请完成付款,我好安排处理" await self.send_reply(data, ack) return # 构建 CustomerMessage customer_msg = CustomerMessage( msg_id=data.get('msg_id', ''), acc_id=data.get('acc_id', ''), msg=self.to_chinese(data.get('msg', '')), from_id=data.get('from_id', ''), from_name=self.to_chinese(data.get('from_name', '')), cy_id=data.get('cy_id', ''), acc_type=data.get('acc_type', ''), msg_type=data.get('msg_type', 0), cy_name=self.to_chinese(data.get('cy_name', '')), goods_name=self.to_chinese(data.get('goods_name', '')) if data.get('goods_name') else None, goods_order=self.to_chinese(data.get('goods_order', '')) if data.get('goods_order') else None ) # 先检查是否是 workflow 等待确认中的回复(如邮箱、确认/不满意) if workflow: workflow_reply = await workflow.handle_customer_reply( customer_id=data.get('from_id', ''), message=self.to_chinese(data.get('msg', '')), acc_id=data.get('acc_id', ''), acc_type=data.get('acc_type', 'AliWorkbench') ) if workflow_reply: logger.info(f"Workflow 回复: {workflow_reply}") await self.send_reply(data, workflow_reply) # 推送到企微:客户消息+回复成对 try: from utils.wechat_chat_log import push_chat_to_wechat asyncio.create_task(push_chat_to_wechat( customer_name=_name, customer_id=_cid, acc_id=data.get('acc_id', ''), customer_msg=msg_text, reply_msg=workflow_reply, goods_name=self.to_chinese(data.get('goods_name', '') or ''), )) except Exception: pass return logger.info("Agent 正在处理消息...") # 调用 Agent response = await self.agent.process_message(customer_msg) # 检查是否需要转接人工 if response.need_transfer: logger.info("Agent 决定转接人工") await self.transfer_to_human(data, response.transfer_msg) # 推送到企微:客户消息+转接回复成对 try: from utils.wechat_chat_log import push_chat_to_wechat asyncio.create_task(push_chat_to_wechat( customer_name=_name, customer_id=_cid, acc_id=data.get('acc_id', ''), customer_msg=msg_text, reply_msg=response.transfer_msg or "转接", goods_name=self.to_chinese(data.get('goods_name', '') or ''), )) except Exception: pass # 联系方式提取已由 Agent 的 update_contact_info 工具负责 # 此处仅做兜底:更新最后联系时间 customer_id = data.get('from_id', '') if customer_id: try: profile = db.get_customer(customer_id) profile.last_contact = datetime.now().isoformat() db.save_customer(profile) except Exception: pass # 保存对话摘要(异步,不阻塞回复) if response.should_reply and response.reply and customer_id: asyncio.create_task(self._save_conversation_summary( customer_id=customer_id, buyer_msg=self.to_chinese(data.get('msg', '')), agent_reply=response.reply, )) # 正常回复 if response.should_reply and response.reply: # 过滤 AI 误输出的"无需回复"类废话,避免发给客户 nonsense_patterns = [ "无需", "流程已完成", "不需要回复", "无需额外", "已完成", "无需回复", "不需要额外", "已经完成", "无需再", "操作已完成", "任务完成", "流程完成", "记录完成", "报价已", ] matched = [p for p in nonsense_patterns if p in response.reply] if matched: logger.warning(f"Agent 回复含无效内容,已拦截: {response.reply} ← 命中pattern: {matched}") else: # 模拟真人打字延迟,避免瞬间回复太机械 await asyncio.sleep(0.8) logger.info(f"Agent 回复: {response.reply}") await self.send_reply(data, response.reply) # 推送到企微:客户消息+AI回复成对 try: from utils.wechat_chat_log import push_chat_to_wechat asyncio.create_task(push_chat_to_wechat( customer_name=_name, customer_id=_cid, acc_id=data.get('acc_id', ''), customer_msg=msg_text, reply_msg=response.reply, goods_name=self.to_chinese(data.get('goods_name', '') or ''), )) except Exception: pass elif not response.need_transfer: logger.info("Agent 决定不回复此消息") except Exception as e: logger.error(f"Agent 处理失败: {e}") async def _analyze_multi_and_reply(self, data: dict, urls: list): try: from image.image_analyzer import image_analyzer def _detect_composite_request() -> bool: try: from db.chat_log_db import get_recent_conversation recent = get_recent_conversation( customer_id=data.get('from_id', ''), acc_id=data.get('acc_id', ''), limit=8 ) kw = ("抓到", "放到", "合成", "融合", "嵌到", "换到", "替换", "P到", "抠出来放到") for m in recent: msg = (m.get("message") or "") if any(k in msg for k in kw): return True except Exception: pass return False tasks = [image_analyzer.analyze(u) for u in urls] results = await asyncio.gather(*tasks, return_exceptions=True) # 先做风险分流:多图中只要出现不可做/高风险,不进入报价 unsafe = [] dense_text_reject = [] for i, r in enumerate(results, 1): if isinstance(r, dict) and r.get("success", False): if r.get("feasibility") == "no" or r.get("risk") == "high": unsafe.append(f"图{i}") note = str(r.get("note", "") or "") if "文字内容过于密集" in note or "密集文字" in note: dense_text_reject.append(f"图{i}") if unsafe: if dense_text_reject and len(dense_text_reject) == len(unsafe): reply = "这类文字太密的图我们这边不接单,抱歉哈。你要是简化后再发我可以继续看。" else: reply = f"这批里{'、'.join(unsafe)}处理风险较高,我这边先不直接接,建议转人工评估更稳。" await self.send_reply(data, reply) return pairs = [] for u, r in zip(urls, results): if isinstance(r, dict) and r.get("success", False): from config.config import MIN_PRICE_FLOOR floor_dyn = r.get("price_min", MIN_PRICE_FLOOR) floor = max(MIN_PRICE_FLOOR, int(floor_dyn) if isinstance(floor_dyn, (int, float)) else MIN_PRICE_FLOOR) ps = max(floor, round(r.get("price_suggest", 20) / 5) * 5) pairs.append((u, ps, r.get("category", ""), r.get("megapixels", 0.0))) try: if pairs: floors = [] for u, r in zip(urls, results): if isinstance(r, dict) and r.get("success", False): floor_dyn = r.get("price_min", MIN_PRICE_FLOOR) floor = max(MIN_PRICE_FLOOR, int(floor_dyn) if isinstance(floor_dyn, (int, float)) else MIN_PRICE_FLOOR) floors.append(floor) if floors: from db.customer_db import db as _db _db.update_last_min_price(data.get('from_id',''), min(floors)) except Exception: pass if not pairs: await self.send_reply(data, "这组图我这边暂时识别不稳定,先不乱报价。你可以换清晰图再发我。") return composite = _detect_composite_request() composite_fee = 5 if composite else 0 avg_raw = sum(p for _, p, _, _ in pairs) / len(pairs) from config.config import MIN_PRICE_FLOOR avg_price = max(MIN_PRICE_FLOOR, round((avg_raw + composite_fee) / 5) * 5) top_price = max(MIN_PRICE_FLOOR, max(pairs, key=lambda x: x[1])[1] + composite_fee) count = len(pairs) if composite: reply = f"这组{count}张我看了,按{avg_price}元一张;合成那张{top_price}元,满意再拍" else: reply = f"这组{count}张我看了,按{avg_price}元一张;复杂那张{top_price}元,满意再拍" await self.send_reply(data, reply) except Exception as e: logger.error(f"多图分析失败: {e}") try: await self.send_reply(data, "这组图我这边暂时识别异常,先不乱报价。你可以稍后再发我。") except Exception: pass def _msg_requests_external_contact(self, msg: str) -> bool: if not msg: return False lower = msg.lower() kws = ("加qq", "qq号", "vx", "微信", "加v", "联系方式", "私聊", "加一下", "加个", "手机号", "电话", "加群", "q q", "v 信") return any(k in lower for k in kws) @staticmethod def _extract_size_pairs_m(msg: str) -> list[tuple[float, float]]: """提取消息中的米制尺寸对,如 15*6.4米 / 15米*6.4 / 15x6.4m。""" if not msg: return [] s = (msg or "").lower().replace("×", "*").replace("x", "*") pairs = [] patterns = [ r'(\d+(?:\.\d+)?)\s*\*\s*(\d+(?:\.\d+)?)\s*(?:米|m)\b', r'(\d+(?:\.\d+)?)\s*(?:米|m)\s*\*\s*(\d+(?:\.\d+)?)\b', ] for p in patterns: for m in re.findall(p, s): try: a = float(m[0]) b = float(m[1]) if a > 0 and b > 0: pairs.append((a, b)) except Exception: continue return pairs def _oversize_reply_if_needed(self, msg: str) -> str: """ 检测超大尺寸需求并返回拒绝话术;未命中返回空字符串。 规则:最长边 > 阈值 或 面积 > 阈值。 """ try: from config.config import MAX_SERVICE_SIZE_LONGEST_METERS, MAX_SERVICE_SIZE_AREA_SQM longest_limit = float(MAX_SERVICE_SIZE_LONGEST_METERS) area_limit = float(MAX_SERVICE_SIZE_AREA_SQM) except Exception: longest_limit = 10.0 area_limit = 20.0 pairs = self._extract_size_pairs_m(msg) for w, h in pairs: longest = max(w, h) area = w * h if longest > longest_limit or area > area_limit: return ( f"{w:g}米*{h:g}米这个尺寸太大了,我们这边做不了。" "如果要做可以拆成几段小尺寸,我再给你按段评估。" ) return "" def _is_transfer_msg(self, data: dict) -> bool: """判断是否是会话转交消息(需要主动打招呼)""" msg = self.to_chinese(data.get('msg', '')) return '转交给' in msg or '转接给' in msg def _pick_transfer_greeting(self) -> str: """转接后问候话术:简短自然,随机避免机械感。""" choices = [ "在的亲,发图我看下", "在呢亲,有需求直接说", "我在的,您把要求发我", "在的哈,你说我这边看着处理", "在呢,图和需求发来我看看", ] return random.choice(choices) def _is_shop_card(self, data: dict) -> bool: """判断是否是进店卡片消息""" msg = self.to_chinese(data.get('msg', '')) return msg.startswith('[进店卡片]') or '我想咨询你们店的这个商品' in msg def _has_chat_history(self, customer_id: str) -> bool: """判断该客户是否已有聊天记录(内存历史或数据库均可)""" if not customer_id: return False # 先查内存对话历史(最快) if customer_id in self.agent.message_histories and self.agent.message_histories[customer_id]: return True # 再查数据库(重启后仍有记录) try: from db.chat_log_db import get_conversation msgs = get_conversation(customer_id, limit=1) return len(msgs) > 0 except Exception: return False def _load_system_inquiry_rules(self) -> Dict[str, Any]: """加载系统客服询单规则(全局 + 店铺覆盖)。""" from config.config import ( SYSTEM_INQUIRY_ENABLED, SYSTEM_INQUIRY_DEFAULT_ACTION, SYSTEM_INQUIRY_DEFAULT_REPLY, SYSTEM_INQUIRY_RULES_FILE, ) enabled_env = os.getenv("SYSTEM_INQUIRY_ENABLED") enabled = ( enabled_env.lower() in ("1", "true", "yes") if isinstance(enabled_env, str) else bool(SYSTEM_INQUIRY_ENABLED) ) action = (os.getenv("SYSTEM_INQUIRY_DEFAULT_ACTION") or SYSTEM_INQUIRY_DEFAULT_ACTION or "silent").strip().lower() reply = os.getenv("SYSTEM_INQUIRY_DEFAULT_REPLY") or SYSTEM_INQUIRY_DEFAULT_REPLY or "" rules_file = os.getenv("SYSTEM_INQUIRY_RULES_FILE") or str(SYSTEM_INQUIRY_RULES_FILE) defaults: Dict[str, Any] = { "enabled": bool(enabled), "default_action": action, "default_reply": reply, "sender_keywords": ["系统客服", "官方客服", "平台客服", "机器人客服", "商家客服系统"], "message_keywords": ["系统询单", "代客咨询", "平台代问", "系统代发", "客服询单"], "shops": {}, } try: p = Path(rules_file) if p.exists(): with p.open("r", encoding="utf-8") as f: loaded = json.load(f) if isinstance(loaded, dict): defaults.update(loaded) except Exception as e: logger.warning(f"系统询单规则加载失败,使用默认规则: {e}") return defaults @staticmethod def _normalize_kw_list(v: Any) -> List[str]: if not isinstance(v, list): return [] return [str(x).strip().lower() for x in v if str(x).strip()] def _resolve_system_inquiry_policy(self, acc_id: str) -> Dict[str, Any]: """根据店铺合并系统询单策略。""" from config.config import SYSTEM_INQUIRY_SHOPS rules = self._system_inquiry_rules or {} if not bool(rules.get("enabled", True)): return {"enabled": False} shops_env = os.getenv("SYSTEM_INQUIRY_SHOPS", SYSTEM_INQUIRY_SHOPS or "") shop_whitelist = [s.strip() for s in shops_env.split(",") if s.strip()] if shop_whitelist and (acc_id or "") not in shop_whitelist: return {"enabled": False} policy: Dict[str, Any] = { "enabled": True, "action": str(rules.get("default_action", "silent")).strip().lower(), "reply": str(rules.get("default_reply", "")).strip(), "sender_keywords": self._normalize_kw_list(rules.get("sender_keywords")), "message_keywords": self._normalize_kw_list(rules.get("message_keywords")), } shop_cfg = (rules.get("shops") or {}).get(acc_id or "", {}) if isinstance(shop_cfg, dict): if "enabled" in shop_cfg and not bool(shop_cfg.get("enabled", True)): return {"enabled": False} if shop_cfg.get("action"): policy["action"] = str(shop_cfg.get("action")).strip().lower() if shop_cfg.get("reply"): policy["reply"] = str(shop_cfg.get("reply")).strip() if isinstance(shop_cfg.get("sender_keywords"), list): policy["sender_keywords"] = self._normalize_kw_list(shop_cfg.get("sender_keywords")) if isinstance(shop_cfg.get("message_keywords"), list): policy["message_keywords"] = self._normalize_kw_list(shop_cfg.get("message_keywords")) if policy["action"] not in ("silent", "reply", "transfer"): policy["action"] = "silent" return policy def _match_system_inquiry(self, data: dict, policy: Dict[str, Any]) -> bool: """识别是否为系统客服询单消息。""" if not policy.get("enabled", False): return False from_name = self.to_chinese(data.get("from_name", "") or "").lower() from_id = str(data.get("from_id", "") or "").lower() msg = self.to_chinese(data.get("msg", "") or "").lower() sender_hits = 0 for kw in policy.get("sender_keywords", []): if kw and (kw in from_name or kw in from_id): sender_hits += 1 message_hits = 0 for kw in policy.get("message_keywords", []): if kw and kw in msg: message_hits += 1 # 优先看发送者特征;纯文本命中时至少要求两个关键词,降低误判风险 return sender_hits > 0 or message_hits >= 2 async def _handle_system_inquiry(self, data: dict) -> bool: """命中系统询单后按策略处理。""" acc_id = data.get("acc_id", "") policy = self._resolve_system_inquiry_policy(acc_id) if not self._match_system_inquiry(data, policy): return False customer_id = data.get("from_id", "") metrics_emit("system_inquiry_detected", customer_id=customer_id, acc_id=acc_id) action = policy.get("action", "silent") logger.info(f"系统询单命中 | 店铺:{acc_id} | 客户:{customer_id} | action:{action}") if action == "reply": reply = policy.get("reply") or "您好,这边已收到询单消息,稍后由人工客服跟进处理。" await self.send_reply(data, reply) metrics_emit("system_inquiry_auto_reply", customer_id=customer_id, acc_id=acc_id) return True if action == "transfer": await self.transfer_to_human(data, "系统询单转人工") metrics_emit("system_inquiry_transfer", customer_id=customer_id, acc_id=acc_id) return True metrics_emit("system_inquiry_ignored", customer_id=customer_id, acc_id=acc_id) return True def _should_ignore(self, data: dict) -> bool: """判断是否应该忽略该消息(不回复)""" msg = self.to_chinese(data.get('msg', '')) # 会话转交由 _is_transfer_msg 单独处理,这里不再忽略 ignore_patterns = [ '已转接', '接入会话', '结束会话', '会话已', '[系统消息]', '[系统通知]', ] for pattern in ignore_patterns: if pattern in msg: return True # 发送者是自己(店铺账号),避免回复自己发的消息 acc_id = data.get('acc_id', '') from_id = data.get('from_id', '') if acc_id and from_id and acc_id == from_id: return True return False def get_msg_type_name(self, msg_type): """获取消息类型名称""" types = { 0: "文本", 1: "图片", 2: "视频", 3: "文件" } return types.get(msg_type, f"未知({msg_type})") def _extract_and_save_customer_info(self, message: str, customer_id: str): """从消息中提取客户信息并保存""" if not message or not customer_id: return # 提取邮箱 email_pattern = r'[\w\.-]+@[\w\.-]+\.\w+' email_match = re.search(email_pattern, message) if email_match: db.update_email(customer_id, email_match.group()) # 提取手机号 phone_pattern = r'1[3-9]\d{9}' phone_match = re.search(phone_pattern, message) if phone_match: db.update_phone(customer_id, phone_match.group()) # 提取微信号 wechat_pattern = r'[Vv微信]+号[::]?\s*([\w-]+)' wechat_match = re.search(wechat_pattern, message) if wechat_match: db.update_wechat(customer_id, wechat_match.group(1)) # 提取预算关键词 budget_keywords = ['预算', '不超过', '最多', '便宜点', '便宜'] for keyword in budget_keywords: if keyword in message: db.add_personality_tag(customer_id, "关注价格") break # 提取性格关键词 personality_keywords = { '爽快': '爽快', '干脆': '爽快', '纠结': '纠结', '墨迹': '纠结', '砍价': '砍价', '贵': '砍价' } for keyword, tag in personality_keywords.items(): if keyword in message: db.add_personality_tag(customer_id, tag) # 更新最后联系时间 profile = db.get_customer(customer_id) profile.last_contact = datetime.now().isoformat() db.save_customer(profile) def to_chinese(self, text): """处理文本,安全地转换 unicode 转义""" if not isinstance(text, str): return text if '\\u' not in text: return text try: return json.loads(f'"{text}"') except Exception: return text async def handle_image_message(self, data: dict): """ 处理图片消息。 先回复"我找找",然后把图片URL作为消息内容交给 Agent(后台执行)。 Agent 会自主调用 analyze_image() 工具分析复杂度,再报价。 整个过程由 Agent 自主协调,无需外部干预。 不阻塞接收循环,可同时接收其他客户消息。 """ # 立刻回复,让客户感觉真人在操作 await self.send_reply(data, "我找找") # 把图片URL当作消息内容,交给 Agent 后台处理(图片分析约 12 秒,不阻塞新消息接收) image_data = dict(data) image_data['msg'] = f"[客户发来图片] {data.get('msg', '')}" image_data['msg_type'] = 0 # 转为文本消息,让 agent_reply 处理 self._fire_and_forget(self._agent_reply_serialized(image_data)) async def transfer_to_human(self, data: dict, transfer_msg: str = ""): """ 转接人工客服。 1. 优先从 designer_roster 轮询派单(在线设计师) 2. 无人在线或未配置时,回退到 config/transfer_groups.json 设计师在线状态:仅在转人工时按需查询,不轮询。 """ if not self.websocket: print(f"[{self.get_time()}] 错误: 未连接到服务器") return acc_id = data.get("acc_id", "") group_id = None # 1. 转人工时按需查询设计师在线状态(调用另一台 AI 的查询服务),再派单 try: from utils.designer_roster import poll_and_update_roster from db.designer_roster_db import get_transfer_group_for_shop await poll_and_update_roster() group_id = get_transfer_group_for_shop(acc_id) except Exception as e: logger.debug(f"设计师派单未启用或异常: {e}") # 2. 无人在线时企微提醒 if not group_id: try: from config.config import WECHAT_WEBHOOK if WECHAT_WEBHOOK: import httpx async with httpx.AsyncClient(timeout=5) as client: resp = await client.post(WECHAT_WEBHOOK, json={ "msgtype": "text", "text": {"content": "谁在线啊"} }) if resp.status_code != 200: logger.warning(f"企微提醒发送失败: {resp.status_code} {resp.text}") else: logger.debug("未配置 WECHAT_WEBHOOK,跳过企微提醒") except Exception as e: logger.warning(f"企微提醒发送异常: {e}") # 3. 回退到静态配置 if not group_id: group_id = _get_transfer_group(acc_id) # 先发一条提示语给客户 await self.send_reply(data, "亲,正在为您转接人工客服,请稍等~") cmd = f"话术|[转移会话],分组{group_id},无原因" await self.send_reply(data, cmd) print(f"[{self.get_time()}] 已发送转接请求 (店铺:{acc_id or '未知'} -> 分组:{group_id})") async def _save_conversation_summary(self, customer_id: str, buyer_msg: str, agent_reply: str): """用 AI 生成一句话对话摘要并持久化""" try: from db.customer_db import db from openai import AsyncOpenAI client = AsyncOpenAI( api_key=self.agent.api_key if self.agent else None, base_url=self.agent.base_url if self.agent else None, ) resp = await client.chat.completions.create( model=self.agent.model_name if self.agent else "gpt-4o-mini", messages=[ {"role": "system", "content": "用一句话(15字以内)总结这段对话的核心内容,只输出摘要文字。"}, {"role": "user", "content": f"买家:{buyer_msg}\n客服:{agent_reply}"}, ], max_tokens=30, temperature=0.3, ) summary = resp.choices[0].message.content.strip() db.save_conversation_summary(customer_id, summary) except Exception: pass # 摘要失败不影响主流程 async def _workflow_agent_notify( self, customer_id: str, acc_id: str, acc_type: str, system_hint: str, ): """图片处理完成后,让客服 AI 生成自然话术发给客户""" if not self.enable_agent or not self.agent: return try: from core.pydantic_ai_agent import CustomerMessage notify_msg = CustomerMessage( msg_id="workflow_notify", acc_id=acc_id, msg=system_hint, from_id=customer_id, from_name="", cy_id=customer_id, acc_type=acc_type, msg_type=0, cy_name="", ) response = await self.agent.process_message(notify_msg) if response.should_reply and response.reply: nonsense_patterns = [ "无需", "流程已完成", "不需要回复", "无需额外", "已完成", "无需回复", "不需要额外", "已经完成", "无需再", "操作已完成", "任务完成", "流程完成", "记录完成", "报价已", ] if not any(p in response.reply for p in nonsense_patterns): # 构造一个虚拟原始消息用于 send_reply fake_data = { "acc_id": acc_id, "from_id": customer_id, "from_name": "", "cy_id": customer_id, "acc_type": acc_type, } await asyncio.sleep(0.5) await self.send_reply(fake_data, response.reply) logger.info(f"[Workflow] AI 通知已发送: {response.reply}") except Exception as e: logger.error(f"[Workflow] AI 通知生成失败: {e}") async def _workflow_send( self, customer_id: str, acc_id: str, acc_type: str, content: str, msg_type: int = 0 ): """workflow 回调:图片AI完成后用此方法推送消息给客户""" msg = { "msg_id": "", "acc_id": acc_id, "msg": content, "from_id": customer_id, "from_name": customer_id, "cy_id": customer_id, "acc_type": acc_type, "msg_type": msg_type, "cy_name": customer_id } await self.send_message(msg) async def send_reply(self, original_msg, reply_content): """ 发送回复消息 Args: original_msg: 收到的原始消息字典 reply_content: 回复内容(文本或本地文件路径/http地址) """ if not self.websocket: print(f"[{self.get_time()}] 错误: 未连接到服务器") return # 同一客户外发限流:N 秒内最多 1 条 try: from config.config import OUTBOUND_PER_CUSTOMER_COOLDOWN_SECONDS cooldown = max(0, int(OUTBOUND_PER_CUSTOMER_COOLDOWN_SECONDS)) except Exception: cooldown = 5 if cooldown > 0: ckey = f"{original_msg.get('acc_id', '')}:{original_msg.get('from_id', '')}" now_mono = time.monotonic() last = self._last_reply_sent_at.get(ckey, 0.0) if (now_mono - last) < cooldown: logger.info( f"外发限流命中,跳过发送 | 客户:{ckey} | cooldown:{cooldown}s | msg:{str(reply_content)[:40]}" ) return self._last_reply_sent_at[ckey] = now_mono shop_id = original_msg.get("acc_id", "") # 根据轻简API文档: # from_id = 客户ID(收消息方) # cy_id = 非群聊时与 from_id 相同 customer_id = original_msg.get("from_id", "") customer_name = original_msg.get("from_name", "") reply = { "msg_id": "", "acc_id": shop_id, "msg": reply_content, "from_id": customer_id, "from_name": customer_name, "cy_id": customer_id, "acc_type": original_msg.get("acc_type", ""), "msg_type": 0, "cy_name": customer_name } self._log_outbound_once(original_msg, str(reply_content)) await self.send_message(reply) async def send_text(self, cy_id, acc_type, content): """ 主动发送文本消息 Args: cy_id: 会话ID(对方ID) acc_type: 平台类型 content: 消息内容 """ message = { "msg_id": "", "acc_id": "", "msg": content, "from_id": self.reply_id, "from_name": self.reply_id, "cy_id": cy_id, "acc_type": acc_type, "msg_type": 0, "cy_name": "" } await self.send_message(message) async def send_image(self, cy_id, acc_type, image_path): """ 主动发送图片消息 Args: cy_id: 会话ID(对方ID) acc_type: 平台类型 image_path: 图片本地路径或http地址 """ message = { "msg_id": "", "acc_id": "", "msg": image_path, "from_id": self.reply_id, "from_name": self.reply_id, "cy_id": cy_id, "acc_type": acc_type, "msg_type": 1, "cy_name": "" } await self.send_message(message) async def send_message(self, message): """发送消息到服务器""" if self.websocket and self.websocket.state == websockets.protocol.State.OPEN: try: msg_json = json.dumps(message, ensure_ascii=False) await self.websocket.send(msg_json) pretty = json.dumps(message, ensure_ascii=False, indent=2) print(f"[{self.get_time()}] 发送成功:\n{pretty}") except Exception as e: print(f"[{self.get_time()}] 发送失败: {e}") else: print(f"[{self.get_time()}] 错误: 连接未打开") async def auto_reply(self, data): """自动回复示例(已弃用,使用 agent_reply 替代)""" pass async def command_handler(self): """命令行交互""" print("\n命令帮助:") print(" reply <内容> - 回复最后一条消息") print(" text <平台> <内容> - 发送文本消息") print(" img <平台> <路径> - 发送图片") print(" setid - 设置回复ID") print(" agent on/off - 开启/关闭 Agent") print(" exit/quit - 退出\n") while self.running: try: loop = asyncio.get_running_loop() user_input = await loop.run_in_executor(None, input, "") parts = user_input.strip().split(maxsplit=1) if not parts: continue cmd = parts[0].lower() if cmd in ["exit", "quit", "q"]: print(f"[{self.get_time()}] 正在关闭...") self.running = False if self.websocket: await self.websocket.close() break elif cmd == "setid" and len(parts) > 1: self.reply_id = parts[1] print(f"[{self.get_time()}] 回复ID已设置为: {self.reply_id}") elif cmd == "agent" and len(parts) > 1: if parts[1].lower() == "on": self.enable_agent = True print(f"[{self.get_time()}] Agent 已开启") elif parts[1].lower() == "off": self.enable_agent = False print(f"[{self.get_time()}] Agent 已关闭") elif cmd == "reply" and len(parts) > 1: if self.last_msg: await self.send_reply(self.last_msg, parts[1]) else: print(f"[{self.get_time()}] 错误: 还没有收到任何消息") elif cmd == "text" and len(parts) > 1: # text cy_id acc_type content args = parts[1].split(maxsplit=2) if len(args) >= 3: await self.send_text(args[0], args[1], args[2]) else: print(f"[{self.get_time()}] 格式: text <内容>") elif cmd == "img" and len(parts) > 1: # img cy_id acc_type image_path args = parts[1].split(maxsplit=2) if len(args) >= 3: await self.send_image(args[0], args[1], args[2]) else: print(f"[{self.get_time()}] 格式: img <图片路径>") else: print(f"[{self.get_time()}] 未知命令: {cmd}") except Exception as e: print(f"[{self.get_time()}] 命令错误: {e}") def get_time(self): """获取当前时间字符串""" return datetime.now().strftime("%H:%M:%S") async def run(self): """运行客户端""" tasks = [self.connect(), self.command_handler()] # 启动邮件接收后台任务 try: from mail.email_receiver import email_receiver if email_receiver.username: print(f"[{self.get_time()}] 邮件接收已启动,监控: {email_receiver.username}") tasks.append(email_receiver.start()) else: print(f"[{self.get_time()}] 未配置邮件账号,跳过邮件接收") except Exception as e: print(f"[{self.get_time()}] 邮件接收模块加载失败: {e}") # 启动每日汇总定时任务 try: from utils.daily_summary import scheduler as daily_scheduler tasks.append(daily_scheduler()) print(f"[{self.get_time()}] 每日日报定时任务已启动") except Exception as e: print(f"[{self.get_time()}] 日报模块加载失败: {e}") # 设计师在线状态:转人工时按需查询,不再轮询 # 启动健康检查(轻简/企微断线告警) try: from utils.health_check import health_check_loop def _qingjian_ok(): return self.websocket is not None and not getattr(self.websocket, "closed", True) tasks.append(health_check_loop(_qingjian_ok)) print(f"[{self.get_time()}] 健康检查已启动") except Exception as e: print(f"[{self.get_time()}] 健康检查模块加载失败: {e}") # 每天早上8点发送启动消息到企微群 try: from utils.wechat_chat_log import morning_startup_scheduler tasks.append(morning_startup_scheduler()) print(f"[{self.get_time()}] 早8点企微启动消息已启动") except Exception as e: print(f"[{self.get_time()}] 企微启动消息模块加载失败: {e}") await asyncio.gather(*tasks) if __name__ == "__main__": import sys # 检查是否有 --no-agent 参数 enable_agent = "--no-agent" not in sys.argv client = QingjianAPIClient(enable_agent=enable_agent) try: asyncio.run(client.run()) except KeyboardInterrupt: print("\n已停止") async def _load_task_modules(self): """延迟加载任务模块,避免循环导入""" from core.task_scheduler import get_task_scheduler from core.task_trigger import get_trigger_engine from db.task_db.task_model import get_task_manager self.trigger_engine = get_trigger_engine() async def check_and_trigger_tasks(self, data: dict): """检查并触发匹配的任务""" try: customer_key = self._customer_key(data) customer_id = data.get('from_id') message = data.get('content', '') # 获取该客户的待触发任务 pending_tasks = self.task_manager.get_pending_tasks(customer_id) for task in pending_tasks: trigger = { 'type': task['trigger_type'], 'keyword': task['trigger_keyword'], 'keywords': task['trigger_keywords'] } # 检查是否匹配触发条件 if self.task_scheduler.check_trigger_match(message, trigger): logger.info(f"任务触发条件匹配:{task['task_id']}") # 异步执行任务 asyncio.create_task(self.task_scheduler.execute_task(task)) except Exception as e: logger.error(f"检查任务触发失败:{e}") async def _load_task_modules(self): """延迟加载任务模块,避免循环导入""" from core.task_scheduler import get_task_scheduler from core.task_trigger import get_trigger_engine from db.task_db.task_model import get_task_manager self.trigger_engine = get_trigger_engine() async def _load_task_modules(self): """延迟加载任务模块""" if self.task_scheduler is None: from core.task_scheduler import get_task_scheduler from core.task_trigger import get_trigger_engine from db.task_db.task_model import get_task_manager self.trigger_engine = get_trigger_engine() async def check_and_trigger_tasks_v2(self, data: dict): """增强版:检查并触发匹配的任务(支持指定客户)""" # 确保任务模块已加载 await self._load_task_modules() try: customer_key = self._customer_key(data) customer_id = data.get('from_id') customer_name = data.get('from_name') message = data.get('content', '') # 准备上下文 context = { 'customer_id': customer_id, 'customer_name': customer_name, 'acc_id': data.get('acc_id') } # 获取该客户的待触发任务 pending_tasks = self.task_manager.get_pending_tasks(customer_id) for task in pending_tasks: trigger = { 'type': task['trigger_type'], 'keyword': task['trigger_keyword'], 'keywords': task['trigger_keywords'], # 指定客户相关字段 'customer_id': task.get('specified_customer_id'), 'customer_name': task.get('specified_customer_name') } # 使用触发引擎检查是否匹配 if self.trigger_engine.check_trigger(message, trigger, context): logger.info(f"任务触发条件匹配:{task['task_id']} (客户:{customer_name}/{customer_id})") # 异步执行任务 asyncio.create_task(self.task_scheduler.execute_task(task)) except Exception as e: logger.error(f"检查任务触发失败:{e}")