diff --git a/api/http_server.py b/api/http_server.py index f99bdef..2148fe5 100644 --- a/api/http_server.py +++ b/api/http_server.py @@ -275,6 +275,32 @@ def health_check(): }) +@app.route('/api/metrics', methods=['GET']) +def metrics_dashboard(): + """ + 运行与业务看板 + Query: + - hours: 统计窗口小时数,默认24 + """ + try: + hours = int(request.args.get('hours', 24)) + if hours <= 0: + hours = 24 + from utils.metrics_tracker import get_dashboard + data = get_dashboard(hours=hours) + return jsonify({ + 'code': 200, + 'message': 'OK', + 'data': data, + }) + except Exception as e: + logger.error(f"指标看板异常:{e}") + return jsonify({ + 'code': 500, + 'message': f'服务器错误:{str(e)}' + }), 500 + + def start_http_server(host='0.0.0.0', port=6060, debug=False): """启动 HTTP 服务器""" global task_manager, task_scheduler diff --git a/config/README.md b/config/README.md index 455ca9d..1eae01d 100755 --- a/config/README.md +++ b/config/README.md @@ -62,3 +62,31 @@ python scripts/init_designer_roster.py list # 查看当前数据 ### 对接方要求(外部 AI 服务) 对端需实现:企微群消息 → 解析「上线」/「下线」→ 存库 → 提供 GET `/online` 接口,按上述格式返回在线名单。 + +--- + +## system_inquiry_rules.json + +按店铺识别“系统客服询单”消息(非普通买家咨询),并配置处理策略。 + +```json +{ + "enabled": true, + "default_action": "silent", + "default_reply": "您好,这边已收到询单消息,稍后由人工客服跟进处理。", + "sender_keywords": ["系统客服", "官方客服", "平台客服", "机器人客服"], + "message_keywords": ["询单", "代客咨询", "平台代问", "系统代发"], + "shops": { + "test_shop": { + "enabled": true, + "action": "reply", + "reply": "收到,已登记,稍后人工给您回。", + "sender_keywords": ["系统客服"], + "message_keywords": ["询单"] + } + } +} +``` + +- `action` 支持: `silent` / `reply` / `transfer` +- `shops.` 可覆盖全局规则(店铺维度) diff --git a/config/config.py b/config/config.py index e8fb4f1..10f5419 100755 --- a/config/config.py +++ b/config/config.py @@ -40,6 +40,7 @@ IMAGE_MODULE_ENABLED = os.getenv("IMAGE_MODULE_ENABLED", "false").lower() in ("1 # ========== 防抖配置 ========== MESSAGE_DEBOUNCE_SECONDS = int(os.getenv("MESSAGE_DEBOUNCE_SECONDS", "8")) +OUTBOUND_PER_CUSTOMER_COOLDOWN_SECONDS = int(os.getenv("OUTBOUND_PER_CUSTOMER_COOLDOWN_SECONDS", "5")) # ========== AI 上下文加载 ========== CHAT_CONTEXT_LIMIT = int(os.getenv("CHAT_CONTEXT_LIMIT", "30")) @@ -47,3 +48,27 @@ CHAT_CONTEXT_TRUNCATE_LEN = int(os.getenv("CHAT_CONTEXT_TRUNCATE_LEN", "160")) # ========== 报价底线 ========== MIN_PRICE_FLOOR = int(os.getenv("MIN_PRICE_FLOOR", "15")) +MAX_SERVICE_SIZE_LONGEST_METERS = float(os.getenv("MAX_SERVICE_SIZE_LONGEST_METERS", "10")) +MAX_SERVICE_SIZE_AREA_SQM = float(os.getenv("MAX_SERVICE_SIZE_AREA_SQM", "20")) + +# ========== 批量报价/自动处理 ========== +BATCH_MAX_IMAGES = int(os.getenv("BATCH_MAX_IMAGES", "12")) # 单次最多处理图片数 +BATCH_ANALYZE_CONCURRENCY = int(os.getenv("BATCH_ANALYZE_CONCURRENCY", "3")) # 批量识别并发数 + +# ========== 灰度开关 ========== +FEATURE_BATCH_QUOTE_ENABLED = os.getenv("FEATURE_BATCH_QUOTE_ENABLED", "true").lower() in ("1", "true", "yes") +FEATURE_BATCH_QUOTE_PERCENT = int(os.getenv("FEATURE_BATCH_QUOTE_PERCENT", "100")) # 0-100 +FEATURE_BATCH_QUOTE_SHOPS = os.getenv("FEATURE_BATCH_QUOTE_SHOPS", "") # 逗号分隔 acc_id 白名单,空=全量 + +# ========== 系统客服询单识别 ========== +SYSTEM_INQUIRY_ENABLED = os.getenv("SYSTEM_INQUIRY_ENABLED", "true").lower() in ("1", "true", "yes") +SYSTEM_INQUIRY_DEFAULT_ACTION = os.getenv("SYSTEM_INQUIRY_DEFAULT_ACTION", "silent").strip().lower() +SYSTEM_INQUIRY_DEFAULT_REPLY = os.getenv( + "SYSTEM_INQUIRY_DEFAULT_REPLY", + "您好,这边已收到询单消息,稍后由人工客服跟进处理。", +) +SYSTEM_INQUIRY_SHOPS = os.getenv("SYSTEM_INQUIRY_SHOPS", "") # 逗号分隔店铺白名单,空=全店铺 +SYSTEM_INQUIRY_RULES_FILE = os.getenv( + "SYSTEM_INQUIRY_RULES_FILE", + str(CONFIG_DIR / "system_inquiry_rules.json"), +) diff --git a/config/system_inquiry_rules.json b/config/system_inquiry_rules.json new file mode 100644 index 0000000..4f5920d --- /dev/null +++ b/config/system_inquiry_rules.json @@ -0,0 +1,20 @@ +{ + "enabled": true, + "default_action": "silent", + "default_reply": "您好,这边已收到询单消息,稍后由人工客服跟进处理。", + "sender_keywords": [ + "系统客服", + "官方客服", + "平台客服", + "机器人客服", + "商家客服系统" + ], + "message_keywords": [ + "系统询单", + "代客咨询", + "平台代问", + "系统代发", + "客服询单" + ], + "shops": {} +} diff --git a/core/pydantic_ai_agent.py b/core/pydantic_ai_agent.py index 477c656..dc036f9 100755 --- a/core/pydantic_ai_agent.py +++ b/core/pydantic_ai_agent.py @@ -8,13 +8,16 @@ import os import glob import asyncio -from typing import Optional, Dict +import random +import hashlib +from typing import Optional, Dict, List, Any from datetime import datetime -from pydantic import BaseModel +from pydantic import BaseModel, Field from pydantic_ai import Agent, RunContext from pydantic_ai.models.openai import OpenAIChatModel from pydantic_ai.providers.openai import OpenAIProvider from dotenv import load_dotenv +from utils.metrics_tracker import emit as metrics_emit load_dotenv() @@ -87,6 +90,8 @@ class ConversationState(BaseModel): order_status: Optional[str] = None # 订单状态 discount_count: int = 0 # 让价次数 image_count: int = 0 # 图片数量 + pending_image_urls: List[str] = Field(default_factory=list) # 待统一报价图片 + pending_requirements: List[str] = Field(default_factory=list) # 待统一报价需求 last_update: str = "" last_reply_at: Optional[datetime] = None # 最后一次回复客户的时间 @@ -150,6 +155,12 @@ def load_skill_md(skills_dir: str = "skills") -> str: class CustomerServiceAgent: """客服 Agent - 支持 SKILL.md + 工作流""" + C_RESET = "\033[0m" + C_PROMPT = "\033[96m" # cyan + C_THINK = "\033[95m" # magenta + C_TOOL = "\033[93m" # yellow + C_REPLY = "\033[92m" # green + C_MUTED = "\033[90m" # gray def __init__(self, skills_dir: str = "skills"): self.api_key = os.getenv("OPENAI_API_KEY") @@ -218,6 +229,23 @@ class CustomerServiceAgent: # 注册工具 self._register_tools() + @staticmethod + def _log_block(title: str, content: str): + """统一的控制台分层日志输出。""" + print(f"{CustomerServiceAgent.C_PROMPT}[{title}]{CustomerServiceAgent.C_RESET}") + print(content) + print(f"{CustomerServiceAgent.C_MUTED}────────────────────{CustomerServiceAgent.C_RESET}") + + @staticmethod + def _normalize_reply_text(text: Optional[str]) -> str: + """清洗模型输出,避免把占位词直接发给客户。""" + if text is None: + return "" + cleaned = str(text).strip() + if cleaned.lower() in {"无", "none", "null", "n/a"}: + return "" + return cleaned + def _register_tools(self): """注册所有 Tool,让 Agent 可以主动调用""" @@ -802,11 +830,15 @@ class CustomerServiceAgent: self.message_histories.pop(customer_id, None) except Exception: pass + # 进程内状态为空时,尝试从持久化恢复 + if not state.pending_image_urls and not state.pending_requirements: + self._restore_pending_quote_state(customer_id, state) else: self.conversations[customer_id] = ConversationState( customer_id=customer_id, last_update=now.isoformat() ) + self._restore_pending_quote_state(customer_id, self.conversations[customer_id]) # 定期清理长期不活跃客户(超过 7 天) self._cleanup_inactive(now) @@ -827,6 +859,63 @@ class CustomerServiceAgent: self.conversations.pop(cid, None) self.message_histories.pop(cid, None) + def _sync_pending_quote_state(self, customer_id: str, state: ConversationState): + """把待报价队列同步到客户库,避免重启丢失。""" + try: + from db.customer_db import db + db.update_pending_quote_state( + customer_id, + state.pending_image_urls, + state.pending_requirements, + ) + except Exception: + pass + + def _restore_pending_quote_state(self, customer_id: str, state: ConversationState): + """从客户库恢复待报价队列。""" + try: + from db.customer_db import db + profile = db.get_customer(customer_id) + state.pending_image_urls = list(getattr(profile, "pending_quote_images", []) or []) + state.pending_requirements = list(getattr(profile, "pending_quote_requirements", []) or []) + state.image_count = len(state.pending_image_urls) + except Exception: + pass + + def _build_reject_message(self, reason: str = "") -> str: + templates = [ + "这类图文字内容太密了,我们这边不接这单哈,建议精简后再发我看看。", + "这种密集文字/宣传栏类图片暂时做不了,抱歉啦,换一版简化内容我可以继续帮你看。", + "这张文字信息太多,处理风险高,我们先不接,您可以先筛重点文字再发我。", + ] + msg = random.choice(templates) + if reason: + msg += f"({reason})" + return msg + + def _is_batch_quote_enabled(self, customer_id: str, acc_id: str) -> bool: + """灰度开关:按店铺白名单 + 客户哈希百分比控制新策略是否生效。""" + try: + from config.config import ( + FEATURE_BATCH_QUOTE_ENABLED, + FEATURE_BATCH_QUOTE_PERCENT, + FEATURE_BATCH_QUOTE_SHOPS, + ) + if not FEATURE_BATCH_QUOTE_ENABLED: + return False + pct = max(0, min(100, int(FEATURE_BATCH_QUOTE_PERCENT))) + if pct == 0: + return False + shops = [s.strip() for s in (FEATURE_BATCH_QUOTE_SHOPS or "").split(",") if s.strip()] + if shops and (acc_id or "") not in shops: + return False + if pct >= 100: + return True + h = int(hashlib.md5((customer_id or "").encode("utf-8")).hexdigest()[:8], 16) % 100 + return h < pct + except Exception: + return True + def _detect_stage(self, message: str) -> str: """检测售前/售后""" # 系统订单通知不属于售后,单独处理 @@ -845,7 +934,7 @@ class CustomerServiceAgent: 核心原则:快、准、狠。**回复要像真人聊天,自然多变,禁止套模板、背台词。** 【你拥有的工具,按需调用】 -- analyze_image(url):收到图片必须调用,分析复杂度获取报价依据 +- analyze_image(url):客户确认“图片发完”后调用,分析复杂度用于统一报价 - process_image_gemini(customer_id):客户付款或说「安排/处理」时调用,走完整流程 - remove_background(image_url):只要去背景时单独调用 - perspective_correct(image_url):只要透视矫正时调用(需白底图) @@ -864,9 +953,9 @@ class CustomerServiceAgent: 【报价规则】 - 价格必须为5的整数倍(10/15/20/25/30),禁止报12、17、23等 - 客户只是文字询价,没发图 → 自然引导发图,不报价 -- 收到图片 → 立刻调用 analyze_image() → 工具返回结果后【必须】立刻回复客户报价 +- 收到图片先收集,不立刻报单张价;等客户明确“发完了/统一报价”后,再统一报价 - 报价和推成交的话术要自然多变,跟着客户语气走,不要每次都一样 -- analyze_image 工具调用完成后,你的下一句话一定是报价,不能是内部说明 +- 客户确认发完后,分析完成的下一句话必须是明确报价 - 报价后立刻推成交,不等客户反应 【文字加价规则】⚠️ 重要 @@ -1215,6 +1304,7 @@ class CustomerServiceAgent: async def process_message(self, message: CustomerMessage) -> AgentResponse: """处理客户消息并生成回复""" + metrics_emit("inbound_msg", customer_id=message.from_id, acc_id=message.acc_id) # 获取或创建对话状态 state = self._get_conversation_state(message.from_id) @@ -1267,6 +1357,62 @@ class CustomerServiceAgent: print(f"[Agent] 订单通知静默({pay_status or order_status}),跳过回复") return AgentResponse(reply="", should_reply=False, need_transfer=False) + # 找图店:先收集图片和需求,等客户确认“发完”后统一报价 + customer_text, _ = self._split_customer_text(message.msg) + shop_type = _get_shop_type(message.acc_id or "", message.goods_name or "") + if shop_type == "find_image" and self._is_batch_quote_enabled(message.from_id, message.acc_id): + incoming_urls = self._extract_image_urls(customer_text) + text_without_urls = self._strip_urls_from_text(customer_text) + + if incoming_urls: + for u in incoming_urls: + if u not in state.pending_image_urls: + state.pending_image_urls.append(u) + if text_without_urls: + self._append_requirement(state, text_without_urls) + state.image_count = len(state.pending_image_urls) + self._sync_pending_quote_state(message.from_id, state) + + if self._is_batch_finish_signal(customer_text): + quote_res = await self._quote_pending_images(state, message) + reply_text = quote_res.get("reply", "") + need_transfer = bool(quote_res.get("need_transfer")) + state.last_reply_at = datetime.now() + print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {reply_text}") + return AgentResponse( + reply=reply_text, + should_reply=not need_transfer, + need_transfer=need_transfer, + transfer_msg=TRANSFER_MESSAGE if need_transfer else "", + ) + + ack = self._build_collect_ack(len(state.pending_image_urls)) + state.last_reply_at = datetime.now() + print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {ack}") + return AgentResponse(reply=ack, should_reply=True, need_transfer=False) + + if state.pending_image_urls: + if text_without_urls: + self._append_requirement(state, text_without_urls) + self._sync_pending_quote_state(message.from_id, state) + if self._is_batch_finish_signal(customer_text): + quote_res = await self._quote_pending_images(state, message) + reply_text = quote_res.get("reply", "") + need_transfer = bool(quote_res.get("need_transfer")) + state.last_reply_at = datetime.now() + print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {reply_text}") + return AgentResponse( + reply=reply_text, + should_reply=not need_transfer, + need_transfer=need_transfer, + transfer_msg=TRANSFER_MESSAGE if need_transfer else "", + ) + + remind = self._build_collect_remind(len(state.pending_image_urls)) + state.last_reply_at = datetime.now() + print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {remind}") + return AgentResponse(reply=remind, should_reply=True, need_transfer=False) + # 构建提示词(包含对话状态 + 客户画像) user_prompt = self._build_prompt(message, state) @@ -1300,7 +1446,7 @@ class CustomerServiceAgent: # 取出该客户的历史对话,传给 AI 保持上下文 history = self.message_histories.get(message.from_id, []) - print(f"[Agent] ── 发送给AI的提示词 ──\n{user_prompt}\n────────────────────") + self._log_block("PROMPT->AI 前置提示词", user_prompt) try: msg_lower = message.msg.lower() @@ -1323,7 +1469,7 @@ class CustomerServiceAgent: result = await target_agent.run(user_prompt, deps=deps, message_history=history) # 更新历史,最多保留最近 30 条消息防止 token 超限 self.message_histories[message.from_id] = result.all_messages()[-30:] - reply_text = result.output + reply_text = self._normalize_reply_text(result.output) # 拦截超低杀价:客户报价低于底线时,统一礼貌拒绝 try: from config.config import MIN_PRICE_FLOOR @@ -1371,16 +1517,17 @@ class CustomerServiceAgent: for part in getattr(msg, 'parts', []): part_type = type(part).__name__ if 'ToolCall' in part_type: - print(f"[Agent] 工具调用: {getattr(part, 'tool_name', '')}({getattr(part, 'args', '')})") + print(f"{self.C_TOOL}[THINK/TOOL_CALL]{self.C_RESET} {getattr(part, 'tool_name', '')}({getattr(part, 'args', '')})") elif 'ToolReturn' in part_type: ret = str(getattr(part, 'content', ''))[:120] - print(f"[Agent] 工具返回: {ret}") + print(f"{self.C_TOOL}[THINK/TOOL_RETURN]{self.C_RESET} {ret}") - print(f"[Agent] AI原始输出: {repr(reply_text)}") + print(f"{self.C_THINK}[THINK/RAW_OUTPUT]{self.C_RESET} {repr(reply_text)}") except Exception as e: err_str = str(e) print(f"[Agent] AI 调用失败: {e},使用兜底回复") + metrics_emit("ai_call_failed", customer_id=message.from_id, acc_id=message.acc_id) if "AccountOverdueError" in err_str or "overdue" in err_str.lower(): asyncio.create_task(_notify_wechat_overdue()) else: @@ -1392,6 +1539,8 @@ class CustomerServiceAgent: tag="AI异常" )) reply_text = None + else: + metrics_emit("ai_call_success", customer_id=message.from_id, acc_id=message.acc_id) # AI 失败兜底:给一个不出错的万能回复 if not reply_text: @@ -1434,6 +1583,7 @@ class CustomerServiceAgent: if reply_text and any(kw in reply_text for kw in transfer_keywords): need_transfer = True transfer_msg = TRANSFER_MESSAGE + metrics_emit("transfer_to_human", customer_id=message.from_id, acc_id=message.acc_id) # 未成交记录:客户表达放弃且已报价过(转人工不记录) customer_text, _ = self._split_customer_text(message.msg) @@ -1451,6 +1601,9 @@ class CustomerServiceAgent: # 记录本次回复时间,供冷却期判断 if should_reply: state.last_reply_at = datetime.now() + print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {reply_text}") + else: + print(f"{self.C_MUTED}[REPLY->CUSTOMER]{self.C_RESET} <静默/不发送>") return AgentResponse(reply=reply_text, should_reply=should_reply, need_transfer=need_transfer, transfer_msg=transfer_msg) @@ -1461,6 +1614,7 @@ class CustomerServiceAgent: if numbers: price = round(int(numbers[0]) / 5) * 5 # 强制为5的整数倍 state.last_price = price + metrics_emit("quote_generated", customer_id=state.customer_id, price=price) # 持久化到客户数据库,重启后仍可读取 try: from db.customer_db import db @@ -1698,24 +1852,323 @@ class CustomerServiceAgent: def _extract_image_url(self, msg: str) -> str: """从消息中提取图片URL,兼容纯URL和 text#*#url 两种格式""" + urls = self._extract_image_urls(msg) + return urls[0] if urls else "" + + def _extract_image_urls(self, msg: str) -> List[str]: + """提取消息中的所有图片URL(去重保序)。""" + import re + if not msg: + return [] + image_exts = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp") + image_hosts = ("alicdn.com", "imgextra", "taobao.com", "jd.com", "pinduoduo.com", "suning.com") + candidates = re.findall(r'https?://[^\s#]+', msg) + urls: List[str] = [] + for u in candidates: + low = u.lower() + if any(ext in low for ext in image_exts) or any(h in low for h in image_hosts): + if u not in urls: + urls.append(u) + return urls + + def _strip_urls_from_text(self, msg: str) -> str: + """去掉 URL 后的纯文本,用于提取额外需求。""" import re if not msg: return "" - # 处理 "有吗#*#https://..." 格式 - if "#*#" in msg: - parts = msg.split("#*#", 1) - candidate = parts[1].strip() - if candidate.startswith(("http://", "https://")): - return candidate - # 纯URL或URL在任意位置 - m = re.search(r'https?://\S+', msg) - if m: - url = m.group() - image_exts = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp") - image_hosts = ("alicdn.com", "imgextra", "taobao.com", "jd.com", "pinduoduo.com", "suning.com") - if any(ext in url.lower() for ext in image_exts) or any(h in url.lower() for h in image_hosts): - return url - return "" + text = re.sub(r'https?://\S+', ' ', msg) + text = text.replace("#*#", " ").strip() + text = re.sub(r'\s+', ' ', text) + return text.strip(",,。.!!??;;:: ") + + def _is_batch_finish_signal(self, text: str) -> bool: + """客户是否表达“图发完了,可以统一报价”。""" + if not text: + return False + finish_keywords = [ + "发完了", "都发完了", "发齐了", "齐了", "先这些", "就这些", "全部", "一起报", "统一报价", + "总共多少钱", "一共多少钱", "打包价", "总价", "报价吧", "报个总价", "给个总价", + ] + return any(k in text for k in finish_keywords) + + def _build_collect_ack(self, count: int) -> str: + templates = [ + "收到,这边先记下了(已收{n}张)。你继续发,等你发完我再一起给你打包报价。", + "好的,当前这批先收到了(第{n}张)。还有图就继续发,发齐我一次性给你总价。", + "没问题,已记录到第{n}张。你把需求和图片都发完,我统一给你报更合适的价格。", + ] + return random.choice(templates).format(n=count) + + def _build_collect_remind(self, count: int) -> str: + templates = [ + "需求我记下了(当前共{n}张图)。你继续发齐,发完回我“发完了”,我一次性给你总价。", + "好的,这条需求也加上了(现在{n}张)。等你说发完,我立刻统一报价。", + "收到,这个要求我也记住了(共{n}张)。你发完我就给你打包价。", + ] + return random.choice(templates).format(n=count) + + def _append_requirement(self, state: ConversationState, text: str): + """追加需求并做去重/截断,减少上下文噪音。""" + t = (text or "").strip() + if not t: + return + t = t[:120] + if state.pending_requirements and state.pending_requirements[-1] == t: + return + if t in state.pending_requirements[-5:]: + return + state.pending_requirements.append(t) + if len(state.pending_requirements) > 20: + state.pending_requirements = state.pending_requirements[-20:] + + def _calc_requirement_surcharge(self, requirements: List[str]) -> Dict[str, Any]: + """ + 把客户补充需求做成结构化加价,避免纯靠模型自由发挥导致价格波动。 + 返回: + {"extra": int, "hits": List[str]} + """ + text = " ".join(requirements or []) + rules = [ + (["分层", "psd", "源文件"], 30, "分层/源文件"), + (["去背景", "抠图", "透明底", "白底"], 5, "去背景"), + (["换背景", "换场景", "合成"], 10, "合成/换背景"), + (["改字", "改文字", "替换文字", "排版"], 10, "改文字/排版"), + (["调色", "改色", "换色", "配色"], 5, "调色"), + (["多版本", "多个版本", "两版", "三版"], 10, "多版本"), + (["加急", "今天要", "马上要", "尽快"], 10, "加急"), + ] + total = 0 + hits: List[str] = [] + for keywords, fee, label in rules: + if any(k in text for k in keywords): + total += fee + hits.append(f"{label}+{fee}") + # 防止需求加价过高,做个上限保护 + total = min(total, 60) + # 金额统一 5 的倍数 + total = round(total / 5) * 5 + return {"extra": total, "hits": hits} + + def _build_batch_quote_reply( + self, + results: List[Tuple[str, Dict[str, Any]]], + total_suggest: int, + bundle_price: int, + req_fee: Dict[str, Any], + ) -> str: + """构建分图明细 + 单条总报价可选项回复。""" + complexity_map = { + "simple": "简单", + "normal": "常规", + "complex": "复杂", + "hard": "高难", + } + detail_lines: List[str] = [] + for i, (_, r) in enumerate(results, 1): + p = int(r.get("price_suggest", 20) or 20) + cx = complexity_map.get(str(r.get("complexity", "normal")), "常规") + reason = str(r.get("reason", "常规处理")).replace("\n", " ").strip() + if len(reason) > 18: + reason = reason[:18] + "..." + detail_lines.append(f"图{i}:{p}元({cx},{reason})") + + extra = int(req_fee.get("extra", 0) or 0) + single_total = round((total_suggest + extra) / 5) * 5 + req_hit = "、".join(req_fee.get("hits", [])) if req_fee.get("hits") else "" + + lines = ["先给你分图报下:"] + lines.extend(detail_lines) + if req_hit: + lines.append(f"需求加价:+{extra}元({req_hit})") + option_line = f"可选:A按单张做共{single_total}元;B打包一起做{bundle_price}元(更划算)。" + lines.append(option_line) + lines.append("你定一个方案,我这边马上安排。") + return "\n".join(lines) + + async def _quote_pending_images(self, state: ConversationState, message: CustomerMessage) -> Dict[str, Any]: + """ + 批量识别待处理图片并统一处理: + - find_image 意图且可自动处理:直接 Gemini 处理 + 上传图绘 + 回链接 + - 高风险/不可做:转人工 + - 其他:统一报价 + """ + from image.image_analyzer import image_analyzer + + urls = list(state.pending_image_urls) + if not urls: + return {"reply": "你先把图片发我,我看完再给你统一报价。", "need_transfer": False} + try: + from config.config import BATCH_MAX_IMAGES, BATCH_ANALYZE_CONCURRENCY + max_images = max(1, int(BATCH_MAX_IMAGES)) + analyze_concurrency = max(1, int(BATCH_ANALYZE_CONCURRENCY)) + except Exception: + max_images = 12 + analyze_concurrency = 3 + if len(urls) > max_images: + return { + "reply": f"这次图片有点多({len(urls)}张),我先按前{max_images}张处理报价,剩下的下一批继续发我。", + "need_transfer": False, + } + urls = urls[:max_images] + + sem = asyncio.Semaphore(analyze_concurrency) + async def _analyze_one(url: str): + async with sem: + try: + r = await image_analyzer.analyze(url) + except Exception: + r = { + "complexity": "normal", + "reason": "识别异常,按常规估价", + "price_min": 15, + "price_max": 25, + "price_suggest": 20, + "success": False, + } + return url, r + + results = list(await asyncio.gather(*[_analyze_one(u) for u in urls])) + for url, r in results: + # 与单图流程一致:识别后写入 workflow 任务 + try: + from core.workflow import workflow + await workflow.image_analysis_result( + customer_id=message.from_id, + image_url=url, + complexity=r.get("complexity", "normal"), + acc_id=message.acc_id, + acc_type=message.acc_type, + gemini_prompt=r.get("gemini_prompt", ""), + aspect_ratio=r.get("aspect_ratio", "1:1"), + perspective=r.get("perspective", "no"), + proc_type=r.get("proc_type", ""), + subject=r.get("subject", ""), + quality=r.get("quality", ""), + ) + except Exception as e: + print(f"[Agent] Workflow 批量任务创建失败: {e}") + + total_min = sum(int(r.get("price_min", 15) or 15) for _, r in results) + total_max = sum(int(r.get("price_max", 25) or 25) for _, r in results) + total_suggest = sum(int(r.get("price_suggest", 20) or 20) for _, r in results) + req_fee = self._calc_requirement_surcharge(state.pending_requirements) + + # 打包优惠:2 张减 5,3 张及以上按 9 折(四舍五入到 5 元) + if len(results) == 2: + bundle_price = max(10, total_suggest - 5) + elif len(results) >= 3: + bundle_price = max(10, round(total_suggest * 0.9 / 5) * 5) + else: + bundle_price = total_suggest + bundle_price += int(req_fee.get("extra", 0) or 0) + bundle_price = round(bundle_price / 5) * 5 + + # 先分流:高风险/不可做 -> 转人工 + unsafe = [] + dense_text_reject = [] + for i, (_, r) in enumerate(results, 1): + if r.get("feasibility") == "no" or r.get("risk") == "high": + unsafe.append(f"图{i}") + note = str(r.get("note", "") or "") + if "文字内容过于密集" in note or "密集文字" in note: + dense_text_reject.append(f"图{i}") + if unsafe: + state.pending_image_urls.clear() + state.pending_requirements.clear() + self._sync_pending_quote_state(message.from_id, state) + if dense_text_reject and len(dense_text_reject) == len(unsafe): + return { + "reply": self._build_reject_message("文字密集类图片暂不接单"), + "need_transfer": False, + } + return { + "reply": f"这批里{'、'.join(unsafe)}处理风险较高,我先帮你转人工设计师跟进会更稳妥。", + "need_transfer": True, + } + + # 查找图片意图:直接自动处理并返回图绘链接 + intent_text = (message.msg or "") + " " + " ".join(state.pending_requirements[-5:]) + workflow_type, _ = self.workflow_router.detect_workflow(intent_text) + if workflow_type == "find_image": + links = [] + try: + from image.image_processor import image_processor + from utils.image_queue import run_with_queue + for idx, (url, r) in enumerate(results, 1): + req_parts = [f"complexity:{r.get('complexity', 'normal')}"] + if r.get("gemini_prompt"): + req_parts.append(f"prompt:{r.get('gemini_prompt')}") + if r.get("aspect_ratio"): + req_parts.append(f"ratio:{r.get('aspect_ratio')}") + if r.get("perspective") and r.get("perspective") != "no": + req_parts.append(f"perspective:{r.get('perspective')}") + if r.get("proc_type"): + req_parts.append(f"proc_type:{r.get('proc_type')}") + if r.get("subject"): + req_parts.append(f"subject:{r.get('subject')}") + if r.get("quality"): + req_parts.append(f"quality:{r.get('quality')}") + + process_res = await run_with_queue(image_processor.process_image( + url, + "enhance", + requirements="|".join(req_parts), + gemini_prompt=r.get("gemini_prompt", ""), + aspect_ratio=r.get("aspect_ratio", "1:1"), + perspective=r.get("perspective", "no"), + proc_type=r.get("proc_type", ""), + subject=r.get("subject", ""), + quality=r.get("quality", ""), + )) + if not process_res.get("success"): + raise RuntimeError(process_res.get("message", "图片处理失败")) + + ok, link, _ = await upload_to_tuhui( + process_res["result_path"], + title=f"客户{message.from_id[-4:]}-图片{idx}", + description="AI自动处理结果", + price=max(10, int(r.get("price_suggest", 20) or 20) + int(req_fee.get("extra", 0) or 0) // max(1, len(results))), + ) + if not ok: + raise RuntimeError(str(link)) + links.append(link) + except Exception as e: + print(f"[Agent] 自动处理并上传失败,回退统一报价: {e}") + else: + lines = [f"这批我先给你处理好了,按打包 {bundle_price} 元。"] + for i, link in enumerate(links, 1): + lines.append(f"链接{i}:{link}") + lines.append("你先看下效果,没问题我就按这个标准继续给你做。") + state.last_price = bundle_price + try: + from db.customer_db import db + db.update_last_price(message.from_id, bundle_price) + except Exception: + pass + state.pending_image_urls.clear() + state.pending_requirements.clear() + self._sync_pending_quote_state(message.from_id, state) + return {"reply": "\n".join(lines), "need_transfer": False} + + reply_text = self._build_batch_quote_reply( + results=results, + total_suggest=total_suggest, + bundle_price=bundle_price, + req_fee=req_fee, + ) + + state.last_price = bundle_price + try: + from db.customer_db import db + db.update_last_price(message.from_id, bundle_price) + except Exception: + pass + + # 清空待报价队列(本轮已统一报价) + state.pending_image_urls.clear() + state.pending_requirements.clear() + self._sync_pending_quote_state(message.from_id, state) + return {"reply": reply_text, "need_transfer": False} def _split_customer_text(self, msg: str) -> tuple: """ @@ -1823,11 +2276,11 @@ class CustomerServiceAgent: if shop_type == "gemini_api": prompt += "\n【Gemini API 店铺】客户问账号/pro/续费/套餐等,按 API 客服自然回复,不要求发图。" elif image_url: - prompt += f"\n客户发来图片(URL: {image_url})。必须:① 调用 analyze_image('{image_url}') ② 拿到结果后直接回复报价,话术自然多变。分析完必须回复,不能不回复。" + prompt += "\n客户在继续发图阶段:先确认“已收图”,并引导客户把图和要求一次发完;等客户明确“发完了/统一报价”后再统一报价。" elif any(kw in customer_text for kw in price_keywords): last_url = self._extract_image_url(msg_content) if last_url: - prompt += f"\n客户在询问上面那张图的价格,图片URL是 {last_url}。调用 analyze_image('{last_url}') 后直接回复报价,不能不回复。" + prompt += "\n客户在询问价格:若客户已确认发完,则给总报价;若还在发图,先引导发完后统一报价。" else: prompt += "\n客户在询问价格但未发图,回复「发图来我看看」。" elif any(kw in customer_text for kw in progress_keywords): diff --git a/core/websocket_client.py b/core/websocket_client.py index 7641507..c504e6b 100755 --- a/core/websocket_client.py +++ b/core/websocket_client.py @@ -3,10 +3,12 @@ import websockets import json import re import logging +import random +import time from collections import deque from datetime import datetime from pathlib import Path -from typing import Optional +from typing import Optional, Dict, Any, List # ========== 转接分组映射 ========== def _get_transfer_group(acc_id: str) -> str: @@ -49,6 +51,7 @@ import os logger = setup_logger() from db.chat_log_db import log_message as _chat_log +from utils.metrics_tracker import emit as metrics_emit # 导入 Agent 模块 try: @@ -94,6 +97,8 @@ class QingjianAPIClient: self._agent_semaphore = asyncio.Semaphore(8) self._pending_images: dict = {} self._pending_image_tasks: dict = {} + self._system_inquiry_rules = self._load_system_inquiry_rules() + self._last_reply_sent_at: dict = {} # customer_key -> monotonic ts # 延迟加载任务模块(避免循环导入) self.task_scheduler = None @@ -289,7 +294,7 @@ class QingjianAPIClient: if self._is_transfer_msg(data): # 会话转交 → 主动打招呼 print(f"[{self.get_time()}] 收到转交消息,发送问候") - greeting = "在呢,发图来我看看" + greeting = self._pick_transfer_greeting() await self.send_reply(data, greeting) try: from utils.wechat_chat_log import push_chat_to_wechat @@ -324,6 +329,8 @@ class QingjianAPIClient: )) except Exception: pass + elif await self._handle_system_inquiry(data): + print(f"[{self.get_time()}] 系统客服询单消息,已按规则处理") elif self._should_ignore(data): print(f"[{self.get_time()}] 系统通知,跳过回复") else: @@ -529,6 +536,12 @@ class QingjianAPIClient: except Exception: pass + # 超大尺寸(米制)直接拒单,避免进入报价/处理流程 + oversize_reply = self._oversize_reply_if_needed(msg_text) + if oversize_reply: + await self.send_reply(data, oversize_reply) + return + # 消息含图片URL:累积到待处理列表,先询问要求 if self._msg_has_image_url(msg_text): urls = self._extract_image_urls(msg_text) @@ -543,6 +556,7 @@ class QingjianAPIClient: await self._flush_pending_images(capture_key, capture_data) task = asyncio.create_task(_delay_flush(key, data)) self._pending_image_tasks[key] = task + return elif self._msg_refers_images(msg_text): urls = self._collect_recent_image_urls(_cid, data.get('acc_id', ''), max_count=6) if urls: @@ -550,20 +564,24 @@ class QingjianAPIClient: self._add_pending_images(key, urls) await self.send_reply(data, "稍等,我找找刚才那几张") await self._flush_pending_images(key, data) + return else: status = self._detect_order_status(msg_text) if status == "paid": ack = "收到付款,我马上安排处理,有需要第一时间联系您" await self.send_reply(data, ack) + return elif status in ("waiting", "order"): ack = "订单我看到了哈,方便的话请完成付款,我好安排处理" await self.send_reply(data, ack) + return else: urls = self._extract_image_urls(msg_text) if len(urls) == 1: key = self._customer_key(data) self._add_pending_images(key, urls) await self.send_reply(data, "收到,我看看哈") + return else: if self._msg_requests_external_contact(msg_text): reply = "这里沟通就可以哦,其他联系方式不方便" @@ -603,9 +621,11 @@ class QingjianAPIClient: if status == "paid": ack = "收到付款,我马上安排处理,有需要第一时间联系您" await self.send_reply(data, ack) + return elif status in ("waiting", "order"): ack = "订单我看到了哈,方便的话请完成付款,我好安排处理" await self.send_reply(data, ack) + return # 构建 CustomerMessage customer_msg = CustomerMessage( @@ -813,11 +833,68 @@ class QingjianAPIClient: lower = msg.lower() kws = ("加qq", "qq号", "vx", "微信", "加v", "联系方式", "私聊", "加一下", "加个", "手机号", "电话", "加群", "q q", "v 信") return any(k in lower for k in kws) + + @staticmethod + def _extract_size_pairs_m(msg: str) -> list[tuple[float, float]]: + """提取消息中的米制尺寸对,如 15*6.4米 / 15米*6.4 / 15x6.4m。""" + if not msg: + return [] + s = (msg or "").lower().replace("×", "*").replace("x", "*") + pairs = [] + patterns = [ + r'(\d+(?:\.\d+)?)\s*\*\s*(\d+(?:\.\d+)?)\s*(?:米|m)\b', + r'(\d+(?:\.\d+)?)\s*(?:米|m)\s*\*\s*(\d+(?:\.\d+)?)\b', + ] + for p in patterns: + for m in re.findall(p, s): + try: + a = float(m[0]) + b = float(m[1]) + if a > 0 and b > 0: + pairs.append((a, b)) + except Exception: + continue + return pairs + + def _oversize_reply_if_needed(self, msg: str) -> str: + """ + 检测超大尺寸需求并返回拒绝话术;未命中返回空字符串。 + 规则:最长边 > 阈值 或 面积 > 阈值。 + """ + try: + from config.config import MAX_SERVICE_SIZE_LONGEST_METERS, MAX_SERVICE_SIZE_AREA_SQM + longest_limit = float(MAX_SERVICE_SIZE_LONGEST_METERS) + area_limit = float(MAX_SERVICE_SIZE_AREA_SQM) + except Exception: + longest_limit = 10.0 + area_limit = 20.0 + + pairs = self._extract_size_pairs_m(msg) + for w, h in pairs: + longest = max(w, h) + area = w * h + if longest > longest_limit or area > area_limit: + return ( + f"{w:g}米*{h:g}米这个尺寸太大了,我们这边做不了。" + "如果要做可以拆成几段小尺寸,我再给你按段评估。" + ) + return "" def _is_transfer_msg(self, data: dict) -> bool: """判断是否是会话转交消息(需要主动打招呼)""" msg = self.to_chinese(data.get('msg', '')) return '转交给' in msg or '转接给' in msg + def _pick_transfer_greeting(self) -> str: + """转接后问候话术:简短自然,随机避免机械感。""" + choices = [ + "在的亲,发图我看下", + "在呢亲,有需求直接说", + "我在的,您把要求发我", + "在的哈,你说我这边看着处理", + "在呢,图和需求发来我看看", + ] + return random.choice(choices) + def _is_shop_card(self, data: dict) -> bool: """判断是否是进店卡片消息""" msg = self.to_chinese(data.get('msg', '')) @@ -838,6 +915,130 @@ class QingjianAPIClient: except Exception: return False + def _load_system_inquiry_rules(self) -> Dict[str, Any]: + """加载系统客服询单规则(全局 + 店铺覆盖)。""" + from config.config import ( + SYSTEM_INQUIRY_ENABLED, + SYSTEM_INQUIRY_DEFAULT_ACTION, + SYSTEM_INQUIRY_DEFAULT_REPLY, + SYSTEM_INQUIRY_RULES_FILE, + ) + enabled_env = os.getenv("SYSTEM_INQUIRY_ENABLED") + enabled = ( + enabled_env.lower() in ("1", "true", "yes") + if isinstance(enabled_env, str) + else bool(SYSTEM_INQUIRY_ENABLED) + ) + action = (os.getenv("SYSTEM_INQUIRY_DEFAULT_ACTION") or SYSTEM_INQUIRY_DEFAULT_ACTION or "silent").strip().lower() + reply = os.getenv("SYSTEM_INQUIRY_DEFAULT_REPLY") or SYSTEM_INQUIRY_DEFAULT_REPLY or "" + rules_file = os.getenv("SYSTEM_INQUIRY_RULES_FILE") or str(SYSTEM_INQUIRY_RULES_FILE) + defaults: Dict[str, Any] = { + "enabled": bool(enabled), + "default_action": action, + "default_reply": reply, + "sender_keywords": ["系统客服", "官方客服", "平台客服", "机器人客服", "商家客服系统"], + "message_keywords": ["系统询单", "代客咨询", "平台代问", "系统代发", "客服询单"], + "shops": {}, + } + try: + p = Path(rules_file) + if p.exists(): + with p.open("r", encoding="utf-8") as f: + loaded = json.load(f) + if isinstance(loaded, dict): + defaults.update(loaded) + except Exception as e: + logger.warning(f"系统询单规则加载失败,使用默认规则: {e}") + return defaults + + @staticmethod + def _normalize_kw_list(v: Any) -> List[str]: + if not isinstance(v, list): + return [] + return [str(x).strip().lower() for x in v if str(x).strip()] + + def _resolve_system_inquiry_policy(self, acc_id: str) -> Dict[str, Any]: + """根据店铺合并系统询单策略。""" + from config.config import SYSTEM_INQUIRY_SHOPS + + rules = self._system_inquiry_rules or {} + if not bool(rules.get("enabled", True)): + return {"enabled": False} + + shops_env = os.getenv("SYSTEM_INQUIRY_SHOPS", SYSTEM_INQUIRY_SHOPS or "") + shop_whitelist = [s.strip() for s in shops_env.split(",") if s.strip()] + if shop_whitelist and (acc_id or "") not in shop_whitelist: + return {"enabled": False} + + policy: Dict[str, Any] = { + "enabled": True, + "action": str(rules.get("default_action", "silent")).strip().lower(), + "reply": str(rules.get("default_reply", "")).strip(), + "sender_keywords": self._normalize_kw_list(rules.get("sender_keywords")), + "message_keywords": self._normalize_kw_list(rules.get("message_keywords")), + } + shop_cfg = (rules.get("shops") or {}).get(acc_id or "", {}) + if isinstance(shop_cfg, dict): + if "enabled" in shop_cfg and not bool(shop_cfg.get("enabled", True)): + return {"enabled": False} + if shop_cfg.get("action"): + policy["action"] = str(shop_cfg.get("action")).strip().lower() + if shop_cfg.get("reply"): + policy["reply"] = str(shop_cfg.get("reply")).strip() + if isinstance(shop_cfg.get("sender_keywords"), list): + policy["sender_keywords"] = self._normalize_kw_list(shop_cfg.get("sender_keywords")) + if isinstance(shop_cfg.get("message_keywords"), list): + policy["message_keywords"] = self._normalize_kw_list(shop_cfg.get("message_keywords")) + if policy["action"] not in ("silent", "reply", "transfer"): + policy["action"] = "silent" + return policy + + def _match_system_inquiry(self, data: dict, policy: Dict[str, Any]) -> bool: + """识别是否为系统客服询单消息。""" + if not policy.get("enabled", False): + return False + + from_name = self.to_chinese(data.get("from_name", "") or "").lower() + from_id = str(data.get("from_id", "") or "").lower() + msg = self.to_chinese(data.get("msg", "") or "").lower() + + sender_hits = 0 + for kw in policy.get("sender_keywords", []): + if kw and (kw in from_name or kw in from_id): + sender_hits += 1 + message_hits = 0 + for kw in policy.get("message_keywords", []): + if kw and kw in msg: + message_hits += 1 + + # 优先看发送者特征;纯文本命中时至少要求两个关键词,降低误判风险 + return sender_hits > 0 or message_hits >= 2 + + async def _handle_system_inquiry(self, data: dict) -> bool: + """命中系统询单后按策略处理。""" + acc_id = data.get("acc_id", "") + policy = self._resolve_system_inquiry_policy(acc_id) + if not self._match_system_inquiry(data, policy): + return False + + customer_id = data.get("from_id", "") + metrics_emit("system_inquiry_detected", customer_id=customer_id, acc_id=acc_id) + action = policy.get("action", "silent") + logger.info(f"系统询单命中 | 店铺:{acc_id} | 客户:{customer_id} | action:{action}") + + if action == "reply": + reply = policy.get("reply") or "您好,这边已收到询单消息,稍后由人工客服跟进处理。" + await self.send_reply(data, reply) + metrics_emit("system_inquiry_auto_reply", customer_id=customer_id, acc_id=acc_id) + return True + if action == "transfer": + await self.transfer_to_human(data, "系统询单转人工") + metrics_emit("system_inquiry_transfer", customer_id=customer_id, acc_id=acc_id) + return True + + metrics_emit("system_inquiry_ignored", customer_id=customer_id, acc_id=acc_id) + return True + def _should_ignore(self, data: dict) -> bool: """判断是否应该忽略该消息(不回复)""" msg = self.to_chinese(data.get('msg', '')) @@ -1107,6 +1308,23 @@ class QingjianAPIClient: if not self.websocket: print(f"[{self.get_time()}] 错误: 未连接到服务器") return + + # 同一客户外发限流:N 秒内最多 1 条 + try: + from config.config import OUTBOUND_PER_CUSTOMER_COOLDOWN_SECONDS + cooldown = max(0, int(OUTBOUND_PER_CUSTOMER_COOLDOWN_SECONDS)) + except Exception: + cooldown = 5 + if cooldown > 0: + ckey = f"{original_msg.get('acc_id', '')}:{original_msg.get('from_id', '')}" + now_mono = time.monotonic() + last = self._last_reply_sent_at.get(ckey, 0.0) + if (now_mono - last) < cooldown: + logger.info( + f"外发限流命中,跳过发送 | 客户:{ckey} | cooldown:{cooldown}s | msg:{str(reply_content)[:40]}" + ) + return + self._last_reply_sent_at[ckey] = now_mono shop_id = original_msg.get("acc_id", "") diff --git a/db/customer_db.py b/db/customer_db.py index 6d89c63..b3e8725 100755 --- a/db/customer_db.py +++ b/db/customer_db.py @@ -65,6 +65,8 @@ class CustomerProfile: last_gemini_prompt: str = "" # 最近一次图片的 Gemini 处理提示词 last_aspect_ratio: str = "1:1" # 最近一次图片的建议输出比例 last_perspective: str = "no" # 最近一次图片的透视状态 + pending_quote_images: List[str] = None # 待统一报价图片队列(持久化) + pending_quote_requirements: List[str] = None # 待统一报价需求队列(持久化) # 当前任务状态 processing_status: str = "" # 待处理/处理中/等待确认/已完成 @@ -148,6 +150,10 @@ class CustomerProfile: self.image_type_history = [] if self.upsell_opportunity is None: self.upsell_opportunity = [] + if self.pending_quote_images is None: + self.pending_quote_images = [] + if self.pending_quote_requirements is None: + self.pending_quote_requirements = [] class CustomerDatabase: @@ -423,6 +429,24 @@ class CustomerDatabase: profile.last_perspective = perspective self.save_customer(profile) + def update_pending_quote_state( + self, + customer_id: str, + images: List[str], + requirements: List[str] + ): + """持久化收图阶段状态,防止服务重启丢失。""" + profile = self.get_customer(customer_id) + profile.pending_quote_images = list(images or []) + profile.pending_quote_requirements = list(requirements or []) + self.save_customer(profile) + + def clear_pending_quote_state(self, customer_id: str): + profile = self.get_customer(customer_id) + profile.pending_quote_images = [] + profile.pending_quote_requirements = [] + self.save_customer(profile) + def update_processing_status(self, customer_id: str, status: str, image_url: str = "", expected_done_at: str = ""): """更新当前任务处理状态""" profile = self.get_customer(customer_id) diff --git a/image/image_analyzer.py b/image/image_analyzer.py index a38cdcc..9ef16ef 100755 --- a/image/image_analyzer.py +++ b/image/image_analyzer.py @@ -239,6 +239,11 @@ class ImageAnalyzer: # 最短等待时间(秒):即使AI极快返回,也等这么久,看起来像真人在找 MIN_WAIT_SECONDS = 4 + DENSE_TEXT_SUBJECT_KEYWORDS = ( + "宣传栏", "公告栏", "展板", "海报墙", "通知栏", "知识栏", "制度牌", "公示栏", "墙报", "密密麻麻", + "word wall", "poster wall", "bulletin board", + ) + async def analyze(self, image_path: str) -> dict: """ 异步分析图片复杂度(使用火山引擎 /responses 接口)。 @@ -519,6 +524,8 @@ class ImageAnalyzer: sensitive = p(content, "敏感内容:", "敏感内容:").lower().strip() flatness = p(content, "平整度:", "平整度:").lower().strip() # flat|mild|rough has_text = p(content, "含文字:", "含文字:").lower().strip() + text_amount = p(content, "文字数量:", "文字数量:").strip() + text_layer_need = p(content, "文字分层需求:", "文字分层需求:").lower().strip() has_face = p(content, "含人脸:", "含人脸:").lower().strip() has_shadow = p(content, "阴影:", "阴影:").lower().strip() reason = p(content, "原因:", "原因:") @@ -534,11 +541,22 @@ class ImageAnalyzer: if has_face not in ("yes", "no"): has_face = "no" + valid_text_amounts = {"none", "少量 (1-10 字)", "中量 (11-50 字)", "大量 (51-200 字)", "极多 (200 字以上)"} + if text_amount not in valid_text_amounts: + text_amount = "none" + if text_layer_need not in ("yes", "no"): + text_layer_need = "no" if risk not in ("none", "low", "high"): risk = "none" if perspective not in ("no", "mild", "strong"): perspective = "no" + # 识别“密集文字场景”关键词(中文 + 英文兜底) + dense_text_scene = any( + kw in ((subject or "") + " " + (proc_type or "") + " " + (reason or "")).lower() + for kw in self.DENSE_TEXT_SUBJECT_KEYWORDS + ) + # 校验比例合法性 valid_ratios = {"1:1", "9:16", "16:9", "3:4", "4:3", "3:2", "2:3", "5:4", "4:5"} if aspect_ratio not in valid_ratios: @@ -556,11 +574,11 @@ class ImageAnalyzer: if has_text == "yes": if complexity == "simple": # 简单但含文字 → 提升到 normal 价格 - price_min, price_max = self.PRICE_MAP["normal"] + price_min, price_max, _ = self.PRICE_MAP["normal"] reason = "含文字,需精细处理" elif complexity == "normal": # normal 含文字 → 提升到 complex 价格 - price_min, price_max = self.PRICE_MAP["complex"] + price_min, price_max, _ = self.PRICE_MAP["complex"] reason = "含文字,需精细处理" # complex/hard 保持原价,已经够高 # 建议报价:complex/hard 取固定值,simple/normal 取中间,且必须为5的整数倍 @@ -605,6 +623,14 @@ class ImageAnalyzer: price_suggest = 80 reason += " | 大量文字分层" + # 硬规则:密集文字类图片不接单(知识宣传栏/公告栏等) + if text_amount in ["大量 (51-200 字)", "极多 (200 字以上)"] or dense_text_scene: + feasibility = "no" + risk = "high" + note = "文字内容过于密集(如宣传栏/公告栏),暂不接单处理" + reason = (reason or "文字密集") + " | 密集文字场景不接单" + price_suggest = 0 + # 确保是 5 的倍数 price_suggest = round(price_suggest / 5) * 5 @@ -649,6 +675,10 @@ class ImageAnalyzer: def _fallback(self, reason: str) -> dict: """识别失败时的默认结果(返回 normal,让人工判断)""" print(f"[ImageAnalyzer] 识别失败,使用默认值: {reason}") + text_amount = "none" + text_layer_need = "no" + text_surcharge = 0 + layer_surcharge = 0 return { "complexity": "normal", "reason": reason, diff --git a/services/service_gemini.py b/services/service_gemini.py index c4b9ead..dee77ec 100755 --- a/services/service_gemini.py +++ b/services/service_gemini.py @@ -14,11 +14,19 @@ import time from datetime import datetime from pathlib import Path import logging +from dotenv import load_dotenv +from utils.metrics_tracker import emit as metrics_emit from utils.service_base import BaseService logger = logging.getLogger(__name__) +load_dotenv() +GEMINI_IMAGE_MODEL = os.getenv("GEMINI_IMAGE_MODEL", "gemini-3.1-flash-image-preview") +GEMINI_IMAGE_FALLBACK_MODEL = os.getenv("GEMINI_IMAGE_FALLBACK_MODEL", "gemini-2.5-flash-image") +GEMINI_IMAGE_SIZE = os.getenv("GEMINI_IMAGE_SIZE", "1K") +GEMINI_THINKING_LEVEL = os.getenv("GEMINI_THINKING_LEVEL", "MINIMAL") +GEMINI_PERSON_GENERATION = os.getenv("GEMINI_PERSON_GENERATION", "") class GeminiExtractV2Service(BaseService): @@ -46,17 +54,26 @@ class GeminiExtractV2Service(BaseService): "name": "西风接口$0.014", "api_key": "sk-uRuvzLfIHsc3BiHZ2cyebk0cYsZ8NR9rLL326QqXCKIy9EpK", "api_url": "https://api.apiqik.online/v1beta/models", - "api_model": "gemini-2.5-flash-image", # 更稳定的模型 - "max_retries": 2, # 贵接口少重试 + "api_model": GEMINI_IMAGE_MODEL, + "max_retries": 2, "cost": "中", "use_gemini_format": True # 使用Gemini原生API格式 }, + { + "name": "西风接口Fallback", + "api_key": "sk-uRuvzLfIHsc3BiHZ2cyebk0cYsZ8NR9rLL326QqXCKIy9EpK", + "api_url": "https://api.apiqik.online/v1beta/models", + "api_model": GEMINI_IMAGE_FALLBACK_MODEL, + "max_retries": 1, + "cost": "中", + "use_gemini_format": True + }, { "name": "最贵的", "api_key": "sk-8i7uYE0RtnQwDImV8a5f7014DcAb46F6BcEb72Df92218aC8", "api_url": "https://api.laozhang.ai/v1/chat/completions", - "api_model": "gemini-2.5-flash-image-preview", + "api_model": GEMINI_IMAGE_MODEL, "max_retries": 1, "cost": "高" } @@ -90,6 +107,9 @@ class GeminiExtractV2Service(BaseService): output_path: str, custom_prompt: str = None, aspect_ratio: str = "1:1", + image_size: str = "", + person_generation: str = "", + thinking_level: str = "", ) -> tuple[bool, str, dict]: """ 使用多API配置进行印花图案提取 @@ -113,6 +133,7 @@ class GeminiExtractV2Service(BaseService): # 按优先级逐个尝试API配置 for config_index, config in enumerate(self.API_CONFIGS): logger.info(f"尝试使用API: {config['name']} (成本: {config['cost']})") + metrics_emit("gemini_request", model=config.get("api_model", ""), provider=config.get("name", "")) # 对每个API配置进行重试 for attempt in range(config['max_retries']): @@ -126,12 +147,25 @@ class GeminiExtractV2Service(BaseService): headers = { "Content-Type": "application/json" } - + # 有效比例列表(Auto 不传 aspectRatio) valid_ratios = {"1:1", "9:16", "16:9", "3:4", "4:3", "3:2", "2:3", "5:4", "4:5"} + valid_sizes = {"1K", "2K", "4K"} + valid_thinking = {"MINIMAL", "LOW", "MEDIUM", "HIGH"} image_config = {} if aspect_ratio in valid_ratios: image_config["aspectRatio"] = aspect_ratio + size_val = (image_size or GEMINI_IMAGE_SIZE or "").upper().strip() + if size_val in valid_sizes: + image_config["imageSize"] = size_val + person_val = (person_generation or GEMINI_PERSON_GENERATION or "").strip() + if person_val: + # 中转接口若支持该字段会生效;不设置时不发送,保证兼容 + image_config["personGeneration"] = person_val + thinking_val = (thinking_level or GEMINI_THINKING_LEVEL or "").upper().strip() + thinking_config = {} + if thinking_val in valid_thinking: + thinking_config["thinkingLevel"] = thinking_val data = { "contents": [ @@ -153,9 +187,13 @@ class GeminiExtractV2Service(BaseService): "generationConfig": { "responseModalities": ["IMAGE"], **({"imageConfig": image_config} if image_config else {}), + **({"thinkingConfig": thinking_config} if thinking_config else {}), } } - logger.info(f"Gemini 生成配置: 比例={aspect_ratio} 格式=JPEG") + logger.info( + f"Gemini 生成配置: 比例={aspect_ratio} 尺寸={image_config.get('imageSize', '默认')} " + f"person={image_config.get('personGeneration', '默认')} thinking={thinking_config.get('thinkingLevel', '默认')}" + ) else: # OpenAI兼容格式 api_url = config['api_url'] @@ -212,6 +250,25 @@ class GeminiExtractV2Service(BaseService): continue result = await response.json() + # Gemini 偶发只返回文本不返回图片:NO_IMAGE 时快速重试/降级 + if config.get('use_gemini_format', False): + finish_reason = "" + try: + finish_reason = ( + (result.get("candidates") or [{}])[0].get("finishReason", "") + ) + except Exception: + finish_reason = "" + if finish_reason == "NO_IMAGE": + logger.warning( + f"{config['name']} 返回 NO_IMAGE (模型={config.get('api_model')}),第{attempt + 1}次" + ) + metrics_emit("gemini_no_image", model=config.get("api_model", ""), provider=config.get("name", "")) + if attempt == config['max_retries'] - 1: + logger.warning(f"{config['name']} NO_IMAGE 重试已用完,切换下一个配置") + break + await asyncio.sleep(1 + attempt) + continue except (aiohttp.ClientError, asyncio.TimeoutError, AssertionError) as e: logger.error(f"{config['name']} 网络连接错误 (第{attempt + 1}次): {str(e)}") @@ -235,6 +292,7 @@ class GeminiExtractV2Service(BaseService): if success: logger.info(f"使用 {config['name']} 成功完成印花提取") + metrics_emit("gemini_success", model=config.get("api_model", ""), provider=config.get("name", "")) try: from utils.api_cost_tracker import record record("gemini_extract", count=1) @@ -506,4 +564,4 @@ if __name__ == "__main__": await service.cleanup() - asyncio.run(test()) \ No newline at end of file + asyncio.run(test()) diff --git a/tests/test_batch_quote_reply_format.py b/tests/test_batch_quote_reply_format.py new file mode 100644 index 0000000..421755f --- /dev/null +++ b/tests/test_batch_quote_reply_format.py @@ -0,0 +1,66 @@ +import unittest +from unittest.mock import AsyncMock, patch + +from core.pydantic_ai_agent import CustomerMessage, CustomerServiceAgent + + +class BatchQuoteReplyFormatTest(unittest.IsolatedAsyncioTestCase): + async def test_batch_reply_contains_per_image_and_options(self): + agent = CustomerServiceAgent() + cid = "__batch_quote_case__" + st = agent._get_conversation_state(cid) + st.pending_image_urls = ["https://img.alicdn.com/a.jpg", "https://img.alicdn.com/b.jpg"] + st.pending_requirements = ["去背景", "加急"] + + msg = CustomerMessage( + msg_id="m-batch-1", + acc_id="test_shop", + msg="发完了,统一报价", + from_id=cid, + from_name="t", + cy_id=cid, + acc_type="AliWorkbench", + msg_type=0, + cy_name="t", + goods_name="专业找图", + goods_order="", + ) + + fake_r1 = { + "complexity": "normal", + "reason": "常规处理", + "price_min": 15, + "price_max": 25, + "price_suggest": 20, + "feasibility": "yes", + "risk": "low", + "aspect_ratio": "1:1", + "perspective": "no", + } + fake_r2 = { + "complexity": "complex", + "reason": "细节较多", + "price_min": 20, + "price_max": 30, + "price_suggest": 25, + "feasibility": "yes", + "risk": "low", + "aspect_ratio": "1:1", + "perspective": "no", + } + + with patch("image.image_analyzer.image_analyzer.analyze", new=AsyncMock(side_effect=[fake_r1, fake_r2])): + with patch("core.workflow.workflow.image_analysis_result", new=AsyncMock(return_value=None)): + res = await agent._quote_pending_images(st, msg) + + self.assertFalse(res.get("need_transfer", False)) + reply = res.get("reply", "") + self.assertIn("图1", reply) + self.assertIn("图2", reply) + self.assertIn("可选", reply) + self.assertIn("打包", reply) + self.assertIn("共", reply) + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/tests/test_outbound_cooldown.py b/tests/test_outbound_cooldown.py new file mode 100644 index 0000000..358f768 --- /dev/null +++ b/tests/test_outbound_cooldown.py @@ -0,0 +1,40 @@ +import os +import unittest + +from websockets.protocol import State + +from core.websocket_client import QingjianAPIClient + + +class _DummyWS: + def __init__(self): + self.state = State.OPEN + self.sent = [] + + async def send(self, msg_json: str): + self.sent.append(msg_json) + + +class OutboundCooldownTest(unittest.IsolatedAsyncioTestCase): + def setUp(self): + os.environ["OUTBOUND_PER_CUSTOMER_COOLDOWN_SECONDS"] = "5" + + async def test_skip_second_reply_within_cooldown(self): + c = QingjianAPIClient(enable_agent=False) + c.websocket = _DummyWS() + msg = { + "acc_id": "shop_a", + "from_id": "u001", + "from_name": "u001", + "acc_type": "AliWorkbench", + } + await c.send_reply(msg, "第一条") + await c.send_reply(msg, "第二条") + self.assertEqual(len(c.websocket.sent), 1) + + def tearDown(self): + os.environ.pop("OUTBOUND_PER_CUSTOMER_COOLDOWN_SECONDS", None) + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/tests/test_oversize_guard.py b/tests/test_oversize_guard.py new file mode 100644 index 0000000..dfb7791 --- /dev/null +++ b/tests/test_oversize_guard.py @@ -0,0 +1,34 @@ +import os +import unittest + +from core.websocket_client import QingjianAPIClient + + +class OversizeGuardTest(unittest.TestCase): + def setUp(self): + os.environ["MAX_SERVICE_SIZE_LONGEST_METERS"] = "10" + os.environ["MAX_SERVICE_SIZE_AREA_SQM"] = "20" + + def test_extract_size_pairs(self): + c = QingjianAPIClient(enable_agent=False) + pairs = c._extract_size_pairs_m("15*6.4米 高度") + self.assertTrue(len(pairs) >= 1) + self.assertEqual(pairs[0], (15.0, 6.4)) + + def test_oversize_hits(self): + c = QingjianAPIClient(enable_agent=False) + r = c._oversize_reply_if_needed("15*6.4米") + self.assertIn("做不了", r) + + def test_normal_size_not_hit(self): + c = QingjianAPIClient(enable_agent=False) + r = c._oversize_reply_if_needed("2.4*1.2米") + self.assertEqual(r, "") + + def tearDown(self): + os.environ.pop("MAX_SERVICE_SIZE_LONGEST_METERS", None) + os.environ.pop("MAX_SERVICE_SIZE_AREA_SQM", None) + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/tests/test_regression_pipeline.py b/tests/test_regression_pipeline.py new file mode 100644 index 0000000..faf1ef9 --- /dev/null +++ b/tests/test_regression_pipeline.py @@ -0,0 +1,76 @@ +import os +import unittest +from unittest.mock import AsyncMock + +from core.pydantic_ai_agent import CustomerServiceAgent, CustomerMessage +from db.customer_db import db + + +class RegressionPipelineTest(unittest.IsolatedAsyncioTestCase): + def setUp(self): + self.customer_id = "__regression_test_customer__" + db.clear_pending_quote_state(self.customer_id) + os.environ["FEATURE_BATCH_QUOTE_ENABLED"] = "true" + os.environ["FEATURE_BATCH_QUOTE_PERCENT"] = "100" + os.environ["FEATURE_BATCH_QUOTE_SHOPS"] = "" + + async def test_collect_images_then_ack(self): + agent = CustomerServiceAgent() + msg = CustomerMessage( + msg_id="m1", + acc_id="test_shop", + msg="https://img.alicdn.com/a.jpg#*#https://img.alicdn.com/b.jpg", + from_id=self.customer_id, + from_name="t", + cy_id=self.customer_id, + acc_type="AliWorkbench", + msg_type=0, + cy_name="t", + goods_name="专业找图", + goods_order="", + ) + resp = await agent.process_message(msg) + self.assertTrue(resp.should_reply) + self.assertIn("张", resp.reply) + st = agent._get_conversation_state(self.customer_id) + self.assertEqual(len(st.pending_image_urls), 2) + + async def test_finish_signal_triggers_batch_quote(self): + agent = CustomerServiceAgent() + st = agent._get_conversation_state(self.customer_id) + st.pending_image_urls = ["https://img.alicdn.com/a.jpg"] + st.pending_requirements = ["去背景"] + agent._sync_pending_quote_state(self.customer_id, st) + agent._quote_pending_images = AsyncMock(return_value={"reply": "打包15元,确认我就安排", "need_transfer": False}) + + msg = CustomerMessage( + msg_id="m2", + acc_id="test_shop", + msg="发完了,报价吧", + from_id=self.customer_id, + from_name="t", + cy_id=self.customer_id, + acc_type="AliWorkbench", + msg_type=0, + cy_name="t", + goods_name="专业找图", + goods_order="", + ) + resp = await agent.process_message(msg) + self.assertTrue(resp.should_reply) + self.assertIn("15", resp.reply) + agent._quote_pending_images.assert_awaited() + + async def test_pending_state_restore(self): + db.update_pending_quote_state(self.customer_id, ["u1", "u2"], ["r1"]) + agent = CustomerServiceAgent() + st = agent._get_conversation_state(self.customer_id) + self.assertEqual(st.pending_image_urls, ["u1", "u2"]) + self.assertEqual(st.pending_requirements, ["r1"]) + + def tearDown(self): + db.clear_pending_quote_state(self.customer_id) + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/tests/test_system_inquiry_rules.py b/tests/test_system_inquiry_rules.py new file mode 100644 index 0000000..3cb19b4 --- /dev/null +++ b/tests/test_system_inquiry_rules.py @@ -0,0 +1,70 @@ +import os +import unittest +from unittest.mock import AsyncMock + +from core.websocket_client import QingjianAPIClient + + +class SystemInquiryRulesTest(unittest.IsolatedAsyncioTestCase): + def setUp(self): + self.rules = { + "enabled": True, + "default_action": "silent", + "default_reply": "已收到", + "sender_keywords": ["系统客服", "官方客服"], + "message_keywords": ["系统询单", "代客咨询"], + "shops": { + "shop_reply": { + "enabled": True, + "action": "reply", + "reply": "店铺回复模板", + "sender_keywords": ["机器人客服"], + "message_keywords": ["询单"], + } + }, + } + os.environ["SYSTEM_INQUIRY_ENABLED"] = "true" + os.environ["SYSTEM_INQUIRY_SHOPS"] = "" + + async def test_detect_by_sender_keyword(self): + client = QingjianAPIClient(enable_agent=False) + client._system_inquiry_rules = self.rules + policy = client._resolve_system_inquiry_policy("shop_a") + data = {"acc_id": "shop_a", "from_name": "平台系统客服", "from_id": "kefu001", "msg": "你好"} + self.assertTrue(client._match_system_inquiry(data, policy)) + + async def test_shop_rule_reply_action(self): + client = QingjianAPIClient(enable_agent=False) + client._system_inquiry_rules = self.rules + client.send_reply = AsyncMock() + client.transfer_to_human = AsyncMock() + + data = { + "acc_id": "shop_reply", + "from_name": "机器人客服A", + "from_id": "robot_01", + "msg": "有个询单请处理", + "acc_type": "AliWorkbench", + } + handled = await client._handle_system_inquiry(data) + self.assertTrue(handled) + client.send_reply.assert_awaited_once() + client.transfer_to_human.assert_not_awaited() + + async def test_shop_whitelist_blocks_other_shops(self): + os.environ["SYSTEM_INQUIRY_SHOPS"] = "shop_only" + client = QingjianAPIClient(enable_agent=False) + client._system_inquiry_rules = self.rules + client.send_reply = AsyncMock() + data = {"acc_id": "shop_other", "from_name": "系统客服", "from_id": "sys_1", "msg": "系统询单"} + handled = await client._handle_system_inquiry(data) + self.assertFalse(handled) + client.send_reply.assert_not_awaited() + + def tearDown(self): + for k in ("SYSTEM_INQUIRY_ENABLED", "SYSTEM_INQUIRY_SHOPS"): + os.environ.pop(k, None) + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/tests/test_transfer_greeting_context.py b/tests/test_transfer_greeting_context.py new file mode 100644 index 0000000..504c41e --- /dev/null +++ b/tests/test_transfer_greeting_context.py @@ -0,0 +1,20 @@ +import unittest + +from core.websocket_client import QingjianAPIClient + + +class TransferGreetingContextTest(unittest.TestCase): + def test_transfer_greeting_is_non_empty(self): + c = QingjianAPIClient(enable_agent=False) + text = c._pick_transfer_greeting() + self.assertTrue(isinstance(text, str) and len(text) > 0) + + def test_transfer_greeting_contains_presence_phrase(self): + c = QingjianAPIClient(enable_agent=False) + for _ in range(10): + text = c._pick_transfer_greeting() + self.assertTrue(("在" in text) or ("我在" in text)) + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/utils/metrics_tracker.py b/utils/metrics_tracker.py new file mode 100644 index 0000000..0a415a8 --- /dev/null +++ b/utils/metrics_tracker.py @@ -0,0 +1,104 @@ +# -*- coding: utf-8 -*- +""" +轻量指标采集器 +- 事件落盘到 JSONL +- 提供近 N 小时聚合,给 /api/metrics 使用 +""" +from __future__ import annotations + +import json +from datetime import datetime, timedelta +from pathlib import Path +from typing import Dict, Any + +ROOT = Path(__file__).resolve().parent.parent +METRICS_FILE = ROOT / "config" / ".runtime_metrics.jsonl" + + +def _now_iso() -> str: + return datetime.now().isoformat(timespec="seconds") + + +def emit(event: str, **fields: Any): + """记录一条事件,失败不抛异常。""" + try: + METRICS_FILE.parent.mkdir(parents=True, exist_ok=True) + payload = {"ts": _now_iso(), "event": event} + payload.update(fields or {}) + with METRICS_FILE.open("a", encoding="utf-8") as f: + f.write(json.dumps(payload, ensure_ascii=False) + "\n") + except Exception: + pass + + +def _iter_recent(hours: int = 24): + if not METRICS_FILE.exists(): + return [] + cutoff = datetime.now() - timedelta(hours=hours) + items = [] + try: + with METRICS_FILE.open("r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + obj = json.loads(line) + ts = datetime.fromisoformat(obj.get("ts", "")) + except Exception: + continue + if ts >= cutoff: + items.append(obj) + except Exception: + return [] + return items + + +def get_runtime_summary(hours: int = 24) -> Dict[str, Any]: + """近 N 小时运行指标。""" + rows = _iter_recent(hours=hours) + counts: Dict[str, int] = {} + for r in rows: + e = r.get("event", "unknown") + counts[e] = counts.get(e, 0) + 1 + + inbound = counts.get("inbound_msg", 0) + transfer = counts.get("transfer_to_human", 0) + quote = counts.get("quote_generated", 0) + ai_fail = counts.get("ai_call_failed", 0) + gemini_req = counts.get("gemini_request", 0) + no_image = counts.get("gemini_no_image", 0) + + return { + "window_hours": hours, + "counts": counts, + "rates": { + "transfer_rate": round((transfer / inbound) * 100, 2) if inbound else 0.0, + "quote_rate": round((quote / inbound) * 100, 2) if inbound else 0.0, + "ai_fail_rate": round((ai_fail / inbound) * 100, 2) if inbound else 0.0, + "no_image_rate": round((no_image / gemini_req) * 100, 2) if gemini_req else 0.0, + }, + } + + +def get_dashboard(hours: int = 24) -> Dict[str, Any]: + """业务+运行看板聚合。""" + runtime = get_runtime_summary(hours=hours) + try: + from db.deal_outcome_db import get_daily_summary + daily = get_daily_summary() + deal_ok = int(daily.get("成交数", 0)) + deal_fail = int(daily.get("未成交数", 0)) + total = deal_ok + deal_fail + conversion = round((deal_ok / total) * 100, 2) if total else 0.0 + except Exception: + deal_ok = deal_fail = 0 + conversion = 0.0 + return { + "runtime": runtime, + "business": { + "deal_success": deal_ok, + "deal_fail": deal_fail, + "conversion_rate": conversion, + }, + }