"""PydanticAI Agent 模块 架构:单 Agent + 多 Tool 模式 - Agent 负责对话逻辑和决策 - Tool 负责具体能力:看图/查客户/转接 - AI 自主决定何时调用哪个工具,时序自然,不需要外部协调 """ import os import glob import asyncio import random import hashlib from typing import Optional, Dict, List, Any from datetime import datetime from pydantic import BaseModel, Field from pydantic_ai import Agent, RunContext from pydantic_ai.models.openai import OpenAIChatModel from pydantic_ai.providers.openai import OpenAIProvider from dotenv import load_dotenv from utils.metrics_tracker import emit as metrics_emit load_dotenv() from services.service_tuhui_upload import upload_to_tuhui from core.workflow_router import get_workflow_router from core.workflow_router import get_workflow_router # ========== 企业微信通知 ========== _WECHAT_WEBHOOK = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cc88bdef-a13f-4d7e-bdb6-ee51b68b8205" async def _notify_wechat(content: str, tag: str = "通知"): """发送企业微信 markdown 通知,任何异常都发""" if not _WECHAT_WEBHOOK: print(f"[{tag}] 未配置 WECHAT_WEBHOOK,跳过推送") return try: import httpx async with httpx.AsyncClient(timeout=10) as client: resp = await client.post(_WECHAT_WEBHOOK, json={ "msgtype": "markdown", "markdown": {"content": content} }) data = resp.json() if data.get("errcode") == 0: print(f"[{tag}] 企业微信推送成功 ✓") else: print(f"[{tag}] 企业微信推送失败: {data}") except Exception as e: print(f"[{tag}] 企业微信发送异常: {e}") async def _notify_wechat_overdue(): """API 欠费时发企业微信通知""" await _notify_wechat( "⚠️ **火山引擎 API 欠费**,客服AI已停止响应,请立即充值!\n" "地址:https://console.volcengine.com/ark" ) # ========== 转接常量 ========== TRANSFER_MESSAGE = "话术|[转移会话],分组20252916034,无原因" # ========== 数据模型 ========== class CustomerMessage(BaseModel): """客户消息模型""" msg_id: str acc_id: str msg: str from_id: str from_name: str cy_id: str acc_type: str msg_type: int cy_name: str goods_name: Optional[str] = None goods_order: Optional[str] = None class ConversationState(BaseModel): """对话状态""" customer_id: str stage: str = "售前" # 售前/售后 last_price: Optional[int] = None # 最后报价 last_min_price: Optional[int] = None # 最近图片的最低价 last_order_id: Optional[str] = None # 订单号 order_status: Optional[str] = None # 订单状态 discount_count: int = 0 # 让价次数 image_count: int = 0 # 图片数量 pending_image_urls: List[str] = Field(default_factory=list) # 待统一报价图片 pending_requirements: List[str] = Field(default_factory=list) # 待统一报价需求 last_update: str = "" last_reply_at: Optional[datetime] = None # 最后一次回复客户的时间 class AgentDeps(BaseModel): """Agent 依赖项 - 用于传递上下文""" msg_id: str acc_id: str from_id: str platform: str class AgentResponse(BaseModel): """Agent 回复模型""" reply: str should_reply: bool = True need_transfer: bool = False # 是否需要转人工 transfer_msg: str = "" # 转接消息 def _get_shop_type(acc_id: str = "", goods_name: str = "") -> str: """根据 acc_id 或 goods_name 判断店铺类型,返回 gemini_api / find_image / default""" try: from config.config import CONFIG_DIR import json cfg_path = CONFIG_DIR / "shop_prompts.json" if not cfg_path.exists(): return "find_image" with open(cfg_path, "r", encoding="utf-8") as f: cfg = json.load(f) shops = cfg.get("shops", {}) goods_kw = cfg.get("goods_keywords", {}) type_hints = cfg.get("type_hints", {}) # 优先按 acc_id if acc_id and acc_id in shops: return shops[acc_id].get("type", "find_image") # 按商品名关键词 goods_lower = (goods_name or "").lower() for kw, stype in goods_kw.items(): if kw in goods_lower: return stype except Exception: pass return "find_image" def load_skill_md(skills_dir: str = "skills") -> str: """加载 skills 目录下的所有 SKILL.md 文件内容""" skill_contents = [] skill_files = glob.glob(os.path.join(skills_dir, "**/SKILL.md"), recursive=True) for skill_file in skill_files: try: with open(skill_file, 'r', encoding='utf-8') as f: content = f.read() skill_contents.append(content) except Exception as e: print(f"警告: 读取 {skill_file} 失败: {e}") return "\n\n".join(skill_contents) class CustomerServiceAgent: """客服 Agent - 支持 SKILL.md + 工作流""" C_RESET = "\033[0m" C_PROMPT = "\033[96m" # cyan C_THINK = "\033[95m" # magenta C_TOOL = "\033[93m" # yellow C_REPLY = "\033[92m" # green C_MUTED = "\033[90m" # gray def __init__(self, skills_dir: str = "skills"): self.api_key = os.getenv("OPENAI_API_KEY") self.base_url = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1") self.model_name = os.getenv("OPENAI_MODEL", "gpt-4o-mini") if not self.api_key: raise ValueError("请设置 OPENAI_API_KEY 环境变量") # 对话状态管理 self.conversations: Dict[str, ConversationState] = {} # 多轮对话历史(PydanticAI ModelMessage 列表,按客户ID存储) self.message_histories: Dict[str, list] = {} # 加载 skills 内容 self.skills_content = load_skill_md(skills_dir) # 创建 OpenAI 模型 model = OpenAIChatModel( model_name=self.model_name, provider=OpenAIProvider( api_key=self.api_key, base_url=self.base_url ) ) self.agent = Agent( model=model, deps_type=AgentDeps, system_prompt=self._get_system_prompt() ) self.agent_after_sale = Agent( model=model, deps_type=AgentDeps, system_prompt=self._get_after_sale_prompt() ) self.agent_pricing = Agent( model=model, deps_type=AgentDeps, system_prompt=self._get_pricing_prompt() ) self.agent_processing = Agent( model=model, deps_type=AgentDeps, system_prompt=self._get_processing_prompt() ) self.agent_similar = Agent( model=model, deps_type=AgentDeps, system_prompt=self._get_similar_prompt() ) # 工作流程路由器 self.workflow_router = get_workflow_router() self.agent_order = Agent( model=model, deps_type=AgentDeps, system_prompt=self._get_order_prompt() ) self.agent_risk = Agent( model=model, deps_type=AgentDeps, system_prompt=self._get_risk_prompt() ) # 注册工具 self._register_tools() @staticmethod def _log_block(title: str, content: str): """统一的控制台分层日志输出。""" print(f"{CustomerServiceAgent.C_PROMPT}[{title}]{CustomerServiceAgent.C_RESET}") print(content) print(f"{CustomerServiceAgent.C_MUTED}────────────────────{CustomerServiceAgent.C_RESET}") @staticmethod def _normalize_reply_text(text: Optional[str]) -> str: """清洗模型输出,避免把占位词直接发给客户。""" if text is None: return "" cleaned = str(text).strip() if cleaned.lower() in {"无", "none", "null", "n/a"}: return "" return cleaned def _register_tools(self): """注册所有 Tool,让 Agent 可以主动调用""" @self.agent.tool async def analyze_image(ctx: RunContext[AgentDeps], image_url: str) -> str: """ 分析客户发来的图片复杂度,用于报价。 收到图片URL时调用此工具,返回复杂度和建议报价。 """ try: from image.image_analyzer import image_analyzer result = await image_analyzer.analyze(image_url) complexity_label = { "simple": "简单(画面干净)", "normal": "一般复杂度", "complex": "细节偏多", "hard": "非常复杂", }.get(result["complexity"], result["complexity"]) # 持久化图片URL和复杂度,重启后仍能记住这张图 try: from db.customer_db import db db.update_last_image( ctx.deps.from_id, image_url, complexity=result["complexity"], gemini_prompt=result.get("gemini_prompt", ""), aspect_ratio=result.get("aspect_ratio", "1:1"), perspective=result.get("perspective", "no"), ) except Exception: pass # 存图片类型到客户画像 try: from db.customer_db import db as _db if result.get("subject"): _db.add_image_type(ctx.deps.from_id, result["subject"]) except Exception: pass # 在 workflow 里创建待处理任务(付款后自动触发 Gemini) try: from core.workflow import workflow await workflow.image_analysis_result( customer_id=ctx.deps.from_id, image_url=image_url, complexity=result["complexity"], acc_id=ctx.deps.acc_id, acc_type=ctx.deps.platform, gemini_prompt=result.get("gemini_prompt", ""), aspect_ratio=result.get("aspect_ratio", "1:1"), perspective=result.get("perspective", "no"), proc_type=result.get("proc_type", ""), subject=result.get("subject", ""), quality=result.get("quality", ""), ) print(f"[Agent] Workflow 任务已创建 | 客户: {ctx.deps.from_id} | 比例: {result.get('aspect_ratio')} | 透视: {result.get('perspective')} | 图片: {image_url[:60]}...") except Exception as e: print(f"[Agent] Workflow 任务创建失败: {e}") # 组装给 AI 的分析报告 risk = result.get("risk", "none") has_face = result.get("has_face", "no") feasibility = result.get("feasibility", "yes") note = result.get("note", "") lines = [ f"图片主体:{result['subject'] or '未识别'}", f"处理类型:{result['proc_type'] or '高清修复'}", f"原图质量:{result['quality'] or '未知'}", f"图片类型:{result.get('category', '') or '通用'}", f"图片尺寸:{(result.get('width') or 0)}x{(result.get('height') or 0)}({result.get('megapixels', 0.0)}MP)", f"含人脸:{'是' if has_face == 'yes' else '否'}", f"复杂度:{complexity_label}", f"原因:{result['reason']}", ] if result.get("size_surcharge"): lines.append(f"尺寸加价:+{result['size_surcharge']}元") if result.get("size_note"): lines.append(f"尺寸提示:{result['size_note']}") try: st = self._get_conversation_state(ctx.deps.from_id) if isinstance(result.get("price_min"), (int, float)): st.last_min_price = int(result.get("price_min") or 0) try: from db.customer_db import db as _db _db.update_last_min_price(ctx.deps.from_id, st.last_min_price) except Exception: pass except Exception: pass # 根据可做性和风险等级给 AI 不同的行动指引 if feasibility == "no": if "敏感" in (note or ""): lines.append("【拒绝】图片含敏感/黄色/擦边内容,不接单。") lines.append("→ 直接拒绝,不说「发图来看看」,自然回复如:这类不做/不接。") else: lines.append("【无法处理】此图无法处理(纯黑/纯白/完全损坏/要找原始RAW文件)。") lines.append("→ 告知客户无法处理,建议换图或说明原因,不要报价。") elif risk == "high": lines.append(f"【高风险】此图处理风险高:{note or 'AI修复后效果不能保证与原图一致'}") lines.append(f"建议报价:{result['price_suggest']}元") lines.append("→ 先自然说明风险(人脸/效果可能不完美),再报价,满意再拍。话术自然。") elif risk == "low": lines.append(f"【低风险-含人脸】修复后人脸相似度约70-90%,效果不稳定。") lines.append(f"建议报价:{result['price_suggest']}元") lines.append(f"→ 报价时自然加一句风险提示(人脸可能有轻微变化、满意再付等)") else: # 无风险,正常报价 base_price = result.get('price_suggest', 20) text_surcharge = result.get('text_surcharge', 0) layer_surcharge = result.get('layer_surcharge', 0) total_price = base_price + text_surcharge + layer_surcharge # 构建报价说明 price_explanation = f"建议报价:{total_price}元" if text_surcharge > 0: price_explanation += f"(含文字处理 +{text_surcharge}元)" if layer_surcharge > 0: price_explanation += f"(含分层 +{layer_surcharge}元)" lines.append(price_explanation) # 添加文字数量说明 text_amount = result.get('text_amount', 'none') if text_amount != 'none': lines.append(f"文字数量:{text_amount},需要精细处理") if feasibility == "partial": lines.append(f"⚠️ 此图有一定难度:{note or '效果可能不完美'},回复时可加「效果不满意退款」") if note and note not in ("无", ""): lines.append(f"提示:{note}") lines.append(f"【立刻回复客户报价 {total_price} 元,话术自然多变】") return "\n".join(lines) except Exception as e: return f"图片分析失败: {e},请根据经验判断报价" @self.agent.tool async def get_customer_info(ctx: RunContext[AgentDeps], customer_id: str) -> str: """ 查询客户历史信息:消费记录、性格标签、报价历史等。 对话开始时或需要了解客户背景时调用。 """ try: from db.customer_db import db return db.get_profile_text(customer_id) except Exception as e: return f"查询失败: {e}" @self.agent.tool async def transfer_to_human(ctx: RunContext[AgentDeps]) -> str: """ 转接人工客服。 遇到退款/投诉/情绪激动/复杂售后时调用。 """ return "TRANSFER_REQUESTED" @self.agent.tool async def save_customer_note( ctx: RunContext[AgentDeps], customer_id: str, note: str ) -> str: """ 记录客户关键信息到画像(邮箱/微信/特殊需求等)。 客户提供联系方式或重要信息时调用。 """ try: from db.customer_db import db db.add_note(customer_id, note) return "已记录" except Exception as e: return f"记录失败: {e}" @self.agent.tool async def update_contact_info( ctx: RunContext[AgentDeps], customer_id: str, contact_type: str, value: str ) -> str: """ 更新客户联系方式。 当客户说出邮箱/手机/微信时调用,比正则提取更准确。 contact_type 枚举值: email - 邮箱 phone - 手机号 wechat - 微信号 """ try: from db.customer_db import db if contact_type == "email": db.update_email(customer_id, value) elif contact_type == "phone": db.update_phone(customer_id, value) elif contact_type == "wechat": db.update_wechat(customer_id, value) else: return f"未知联系方式类型: {contact_type}" return f"已保存 {contact_type}: {value}" except Exception as e: return f"保存失败: {e}" @self.agent.tool async def record_quote( ctx: RunContext[AgentDeps], customer_id: str, price: int, description: str = "" ) -> str: """ 记录本次报价到客户画像,用于后续对话保持价格一致。 每次给客户报价后调用。 Args: customer_id: 客户ID price: 报价金额(元) description: 报价描述,如"单图处理"/"三图打包" """ try: from db.customer_db import db db.update_last_price(customer_id, price) if description: db.add_note(customer_id, f"报价 {price}元({description})") # 同步到内存状态 state = self.conversations.get(customer_id) if state: state.last_price = price return f"已记录报价 {price}元" except Exception as e: return f"记录失败: {e}" @self.agent.tool async def process_image_gemini(ctx: RunContext[AgentDeps], customer_id: str = "") -> str: """ 触发 Gemini 作图处理。客户付款后或说「安排一下」「处理一下」时调用。 会从客户档案读取上次发图的 URL 和处理参数(提示词、比例、透视),启动 Gemini 流程。 处理完成后会自动发图给客户。 """ try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不自动作图" except Exception: return "现在处理模块暂时暂停,先不自动作图" cid = customer_id or ctx.deps.from_id try: from core.workflow import workflow ok = await workflow.trigger_processing_on_payment( customer_id=cid, acc_id=ctx.deps.acc_id, acc_type=ctx.deps.platform, ) if ok: return "已安排,稍后发你" return "该客户暂无待处理图片,请先发图" except Exception as e: return f"触发作图失败: {e},请稍后重试或转人工" @self.agent_pricing.tool async def analyze_image_pricing(ctx: RunContext[AgentDeps], image_url: str) -> str: try: from image.image_analyzer import image_analyzer result = await image_analyzer.analyze(image_url) p = result.get("price_suggest", 20) try: st = self._get_conversation_state(ctx.deps.from_id) if isinstance(result.get("price_min"), (int, float)): st.last_min_price = int(result.get("price_min") or 0) try: from db.customer_db import db as _db _db.update_last_min_price(ctx.deps.from_id, st.last_min_price) except Exception: pass except Exception: pass return f"建议报价:{p}元" except Exception as e: return f"图片分析失败: {e}" @self.agent_pricing.tool async def record_quote_pricing( ctx: RunContext[AgentDeps], customer_id: str, price: int, description: str = "" ) -> str: try: from db.customer_db import db db.update_last_price(customer_id, price) return "ok" except Exception as e: return f"记录失败: {e}" @self.agent_processing.tool async def process_image_gemini_run(ctx: RunContext[AgentDeps], customer_id: str = "") -> str: """触发 Gemini 作图处理(processing agent 专用入口)。""" return await process_image_gemini(ctx, customer_id) @self.agent_similar.tool async def recommend_similar(ctx: RunContext[AgentDeps], hint: str = "") -> str: try: return "有类似款,拍下我发你参考图。" except Exception as e: return f"推荐失败: {e}" @self.agent_order.tool async def handle_order(ctx: RunContext[AgentDeps], raw_msg: str = "") -> str: try: info = self._parse_order_info(raw_msg or "") paid_kw = ["等待发货", "已付款", "付款成功", "买家已付款"] if any(k in (info.get("pay_status", "") or "") for k in paid_kw) or any(k in (info.get("order_status", "") or "") for k in paid_kw): return "已安排,稍后发你" return "" except Exception: return "" @self.agent_risk.tool async def risk_filter(ctx: RunContext[AgentDeps], text: str = "") -> str: return "这类不接哦,抱歉哈。" @self.agent.tool async def remove_background(ctx: RunContext[AgentDeps], image_url: str) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """【独立工具】去背景,输出白底图。客户只要去背景时调用。""" try: from image.image_tools import remove_background as _rb r = await _rb(image_url) if r["success"]: return f"去背景完成,已保存。自然回复客户好了发你" return f"去背景失败:{r['message']}" except Exception as e: return f"去背景失败:{e}" @self.agent.tool async def perspective_correct(ctx: RunContext[AgentDeps], image_url: str) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """【独立工具】透视矫正。输入需白底图,输出展平图。""" try: from image.image_tools import perspective_correct as _pc r = await _pc(image_url) if r["success"]: return f"透视矫正完成。自然回复客户好了" return f"透视矫正失败:{r['message']}" except Exception as e: return f"透视矫正失败:{e}" @self.agent.tool async def extract_pattern_tool( ctx: RunContext[AgentDeps], image_url: str, prompt: str = "", aspect_ratio: str = "1:1" ) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """【独立工具】印花提取/主处理。按提示词和比例处理。""" try: from image.image_tools import extract_pattern r = await extract_pattern(image_url, prompt=prompt, aspect_ratio=aspect_ratio) if r["success"]: return f"提取完成。自然回复客户好了发你" return f"提取失败:{r['message']}" except Exception as e: return f"提取失败:{e}" @self.agent.tool async def enhance_image_tool(ctx: RunContext[AgentDeps], image_url: str) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """【独立工具】高清增强。客户只要清晰化时调用。""" try: from image.image_tools import enhance_image r = await enhance_image(image_url) if r["success"]: return f"高清增强完成。自然回复客户好了" return f"增强失败:{r['message']}" except Exception as e: return f"增强失败:{e}" @self.agent.tool async def color_match_tool( ctx: RunContext[AgentDeps], orig_url: str, result_url: str, strength: float = 0.75 ) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """【独立工具】颜色匹配。将 result 色调匹配到 orig。""" try: from image.image_tools import color_match_images r = await color_match_images(orig_url, result_url, strength=strength) if r["success"]: return f"颜色匹配完成" return f"颜色匹配失败:{r['message']}" except Exception as e: return f"颜色匹配失败:{e}" @self.agent.tool async def trim_border_tool(ctx: RunContext[AgentDeps], image_url: str) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """【独立工具】裁切四周背景边(白/黄/米等)。""" try: from image.image_tools import trim_border r = await trim_border(image_url) if r["success"]: return f"裁边完成" return f"裁边失败:{r['message']}" except Exception as e: return f"裁边失败:{e}" @self.agent.tool async def vectorize_to_eps_tool(ctx: RunContext[AgentDeps], image_url: str) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """【独立工具】矢量化 - 将图片转为 EPS 矢量文件。客户要做矢量图、转 EPS、转 AI 格式时调用。""" try: from image.image_tools import vectorize_to_eps r = await vectorize_to_eps(image_url) if r["success"]: return f"矢量化完成,已生成 EPS 文件。自然回复客户好了发你" return f"矢量化失败:{r['message']}" except Exception as e: return f"矢量化失败:{e}" @self.agent.tool async def meitu_enhance_tool( ctx: RunContext[AgentDeps], image_url: str, mode: str = "standard" ) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """ 【独立工具】美图画质增强。客户要画质增强、清晰化、美图处理时调用。 Args: image_url: 图片 URL 或本地路径 mode: 处理模式。crystal(极速重绘) standard(标准) enhance(增强) hdr(HDR) portrait(人像优化) """ try: from image.image_tools import meitu_enhance r = await meitu_enhance(image_url, mode=mode) if r["success"]: return f"画质增强完成。自然回复客户好了发你" return f"画质增强失败:{r['message']}" except Exception as e: return f"画质增强失败:{e}" @self.agent.tool async def resize_image( ctx: RunContext[AgentDeps], image_url: str, width: int, height: int = 0 ) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """ 改图片尺寸。客户说「改成1920x1080」「弄成横图」「改下尺寸」时调用。 Args: image_url: 图片URL(客户刚发的图,或从对话中获取) width: 目标宽度(像素),如 1920 height: 目标高度(0=按宽度等比缩放),如 1080 常用尺寸:1920x1080(横屏) 1080x1920(竖屏) 2000x2000(方图) """ try: from image.image_processor import image_processor result = await image_processor.resize(image_url, width, height) if result["success"]: return f"改尺寸完成:{width}x{height},已保存。自然回复客户改好了" else: return f"改尺寸失败:{result['message']},告知客户稍后重试" except Exception as e: return f"改尺寸失败:{e}" @self.agent.tool async def calculate_bulk_price( ctx: RunContext[AgentDeps], image_count: int, complexities: str = "" ) -> str: """ 计算多图打包价格。 客户要做多张图时调用,返回建议总价。 Args: image_count: 图片数量 complexities: 各图复杂度,逗号分隔,如 "normal,complex,simple" 没有识别结果时留空,按平均价格估算 """ if image_count <= 0: return "图片数量无效" # 各复杂度单价(必须为5的整数倍) unit_price = {"simple": 15, "normal": 20, "complex": 25, "hard": 30} default_unit = 20 # 没有识别结果时的默认单价 if complexities: levels = [c.strip() for c in complexities.split(",")] total = sum(unit_price.get(lv, default_unit) for lv in levels) else: total = image_count * default_unit # 打包优惠:3张以上9折,5张以上8折,价格必须为5的整数倍 if image_count >= 5: discounted = round(total * 0.8 / 5) * 5 tip = f"({image_count}张8折优惠)" elif image_count >= 3: discounted = round(total * 0.9 / 5) * 5 tip = f"({image_count}张9折优惠)" else: discounted = round(total / 5) * 5 tip = "" return f"建议打包报价:{discounted}元{tip}(原价{total}元)" # 对话状态超过多少小时后重置(避免昨天的售后状态影响今天) CONVERSATION_TIMEOUT_HOURS = 12 def _get_conversation_state(self, customer_id: str) -> ConversationState: """获取或创建对话状态,超时自动重置""" now = datetime.now() if customer_id in self.conversations: state = self.conversations[customer_id] # 超过 12 小时没有消息,重置阶段和压价次数 if state.last_update: try: last = datetime.fromisoformat(state.last_update) hours = (now - last).total_seconds() / 3600 if hours > self.CONVERSATION_TIMEOUT_HOURS: state.stage = "售前" state.discount_count = 0 # 同时清理对话历史,避免发送过期上下文 self.message_histories.pop(customer_id, None) except Exception: pass # 进程内状态为空时,尝试从持久化恢复 if not state.pending_image_urls and not state.pending_requirements: self._restore_pending_quote_state(customer_id, state) else: self.conversations[customer_id] = ConversationState( customer_id=customer_id, last_update=now.isoformat() ) self._restore_pending_quote_state(customer_id, self.conversations[customer_id]) # 定期清理长期不活跃客户(超过 7 天) self._cleanup_inactive(now) return self.conversations[customer_id] def _cleanup_inactive(self, now: datetime): """清理超过 7 天没有消息的对话状态,释放内存""" # 每 100 次调用清理一次,避免每次都遍历 if len(self.conversations) % 100 != 0: return expired = [ cid for cid, state in self.conversations.items() if state.last_update and (now - datetime.fromisoformat(state.last_update)).days > 7 ] for cid in expired: self.conversations.pop(cid, None) self.message_histories.pop(cid, None) def _sync_pending_quote_state(self, customer_id: str, state: ConversationState): """把待报价队列同步到客户库,避免重启丢失。""" try: from db.customer_db import db db.update_pending_quote_state( customer_id, state.pending_image_urls, state.pending_requirements, ) except Exception: pass def _restore_pending_quote_state(self, customer_id: str, state: ConversationState): """从客户库恢复待报价队列。""" try: from db.customer_db import db profile = db.get_customer(customer_id) state.pending_image_urls = list(getattr(profile, "pending_quote_images", []) or []) state.pending_requirements = list(getattr(profile, "pending_quote_requirements", []) or []) state.image_count = len(state.pending_image_urls) except Exception: pass def _build_reject_message(self, reason: str = "") -> str: templates = [ "这类图文字内容太密了,我们这边不接这单哈,建议精简后再发我看看。", "这种密集文字/宣传栏类图片暂时做不了,抱歉啦,换一版简化内容我可以继续帮你看。", "这张文字信息太多,处理风险高,我们先不接,您可以先筛重点文字再发我。", ] msg = random.choice(templates) if reason: msg += f"({reason})" return msg def _is_batch_quote_enabled(self, customer_id: str, acc_id: str) -> bool: """灰度开关:按店铺白名单 + 客户哈希百分比控制新策略是否生效。""" try: from config.config import ( FEATURE_BATCH_QUOTE_ENABLED, FEATURE_BATCH_QUOTE_PERCENT, FEATURE_BATCH_QUOTE_SHOPS, ) if not FEATURE_BATCH_QUOTE_ENABLED: return False pct = max(0, min(100, int(FEATURE_BATCH_QUOTE_PERCENT))) if pct == 0: return False shops = [s.strip() for s in (FEATURE_BATCH_QUOTE_SHOPS or "").split(",") if s.strip()] if shops and (acc_id or "") not in shops: return False if pct >= 100: return True h = int(hashlib.md5((customer_id or "").encode("utf-8")).hexdigest()[:8], 16) % 100 return h < pct except Exception: return True def _detect_stage(self, message: str) -> str: """检测售前/售后""" # 系统订单通知不属于售后,单独处理 if "系统订单信息" in message: return "订单通知" after_sale_keywords = ["已下单", "已付款", "催一下", "发文件", "要修改", "不满意", "退款", "退货"] for keyword in after_sale_keywords: if keyword in message: return "售后" return "售前" def _get_system_prompt(self) -> str: base_prompt = """你是淘宝找原图店的客服,对外说帮客户找原图/高清图,实际是帮客户把模糊图处理成高清版,擅长快速成交。 核心原则:快、准、狠。**回复要像真人聊天,自然多变,禁止套模板、背台词。** 【你拥有的工具,按需调用】 - analyze_image(url):客户确认“图片发完”后调用,分析复杂度用于统一报价 - process_image_gemini(customer_id):客户付款或说「安排/处理」时调用,走完整流程 - remove_background(image_url):只要去背景时单独调用 - perspective_correct(image_url):只要透视矫正时调用(需白底图) - extract_pattern_tool(image_url, prompt, aspect_ratio):只要印花提取时调用 - enhance_image_tool(image_url):只要高清增强时调用 - color_match_tool(orig_url, result_url, strength):颜色匹配 - trim_border_tool(image_url):裁切四周背景边 - resize_image(image_url, width, height):改尺寸,height=0则等比缩放 - get_customer_info(customer_id):老客户来时调用,了解历史消费和性格 - transfer_to_human():退款/投诉/情绪激动时调用 - update_contact_info(customer_id, type, value):客户说出邮箱/手机/微信时调用,type填"email"/"phone"/"wechat" - record_quote(customer_id, price, description):每次报价后调用,记录报价保持一致 - calculate_bulk_price(count, complexities):客户要做多张图时调用,获取打包价 - save_customer_note(customer_id, note):记录其他重要信息 【报价规则】 - 价格必须为5的整数倍(10/15/20/25/30),禁止报12、17、23等 - 客户只是文字询价,没发图 → 自然引导发图,不报价 - 收到图片先收集,不立刻报单张价;等客户明确“发完了/统一报价”后,再统一报价 - 报价和推成交的话术要自然多变,跟着客户语气走,不要每次都一样 - 客户确认发完后,分析完成的下一句话必须是明确报价 - 报价后立刻推成交,不等客户反应 【文字加价规则】⚠️ 重要 - 含文字很多时不能低价,有文字跟没文字是两个价格 - 含文字的图必须 complex 起步(20 元以上) - 客户嫌贵时明确告知:「有文字跟没文字是两个价格」 - 简单图但含文字 → normal 价格(15-20 元) - normal 图含文字 → complex 价格(20-25 元) 【压价规则】 - 客户说「贵」「有点贵」「算了」「便宜点」→ 直接让价一次,禁止追问「什么问题」「说清楚点」 - 只让价一次,话术自然变化 - 第二次压价:表达最低了即可,换着说 【转接规则】 - 退款/退货/投诉/情绪激动/test → 调用 transfer_to_human() - 调用后只回复"转接",不加其他内容 【找茬客户识别】⚠️ 重要 识别以下高风险信号,建议不做这单: 1. 下单后立即申请退款 2. 从高价砍到低价(30→10 元) 3. 反复问"不满意可以退吗"(2 次以上) 4. 质疑服务内容("源文件还是什么") 5. 质疑价值("就一张图片") 6. 问"小一点就快一点的嘛"(想占便宜) 7. 重复问同一个问题(想找麻烦) 识别到以上 3 个以上信号 → 建议转人工或直接拒绝接单 话术:「不好意思,这单做不了」「去别家做吧」 【售后规则】 - 催进度:自然回复在做了/快了/马上好之类 - 要修改:自然问哪里要改 【禁忌】 - 没看到图不报价 - 不说"不行/不可以" - 不解释技术细节 - 不给价格区间 - 回复不超过2句话 - 绝对禁止输出任何内部独白或状态说明,包括但不限于:"无需回复""已完成""已经完成""不需要回复""流程结束""操作完成""任务完成""记录完成""报价已记录"等 - 每次必须输出真实的、发给客户看的回复文字,哪怕只有一句话""" if self.skills_content: base_prompt += f"\n\n=== 技能文档 ===\n{self.skills_content}" return base_prompt def _get_after_sale_prompt(self) -> str: return """你是淘宝客服的售后助手,负责售后阶段的自然沟通与处理进度反馈。 核心:简洁、自然、不解释技术细节、尽量不调用报价相关工具。 规则: - 已付款客户优先:确认安排、说明进度、承诺时间点 - 修改需求:礼貌询问具体改哪里,尽量一句话 - 催进度:自然回复在做了/快了/马上好,给预计时间 - 投诉/情绪激动/退款:转人工 - 输出不超过2句话,不说内部状态""" def _get_pricing_prompt(self) -> str: try: from config.config import MIN_PRICE_FLOOR floor = MIN_PRICE_FLOOR except Exception: floor = 15 return f"""你是淘宝客服的报价助手,负责在客户明确提到价格/询价时快速给出自然报价并推动成交。 规则: - 收到图片或历史有图片依据时尽量结合复杂度给出单价,价格为5的整数倍 - 没有图片时引导发图,不给价格区间 - 报价后紧跟一句推动成交,话术自然不重复 - 最低价不低于{floor}元,客户出价低于底线时礼貌拒绝(不好意思) - 输出不超过2句话""" def _get_processing_prompt(self) -> str: return """你是淘宝客服的处理助手,负责在客户说安排/处理/开始做或已付款的场景下进行处理安排与进度反馈。 规则: - 已付款或明确要求开始时,确认安排并给预计时间点 - 可调用处理流程工具 - 投诉/退款时转人工 - 输出不超过2句话""" def _get_similar_prompt(self) -> str: return """你是淘宝客服的相似图助手,客户问“有一样的吗/类似的吗/同款吗”时,给出自然回复与参考建议。 规则: - 先确认可以找类似款,建议拍后我发参考图 - 如已知图案/类型,简要说明“同类型都有”,推动成交 - 输出不超过2句话""" def _get_order_prompt(self) -> str: return """你是淘宝客服的订单助手,负责系统订单通知的处理。 规则: - 已付款时自然确认安排;其他状态静默(输出空字符串) - 输出不超过1句话""" def _get_risk_prompt(self) -> str: return """你是淘宝客服的风控助手,负责敏感/违规内容的前置拦截与替代话术。 规则: - 黄色/擦边/涉政等不接单,礼貌拒绝 - 输出不超过1句话""" def _get_customer_profile_context(self, customer_id: str) -> str: """从数据库读取客户画像,注入给 AI。含个性化语气、报价策略、主动预测、近期对话。""" try: from db.customer_db import db profile = db.get_customer(customer_id) if profile.blacklist: return f"【⚠️黑名单客户】原因:{profile.blacklist_reason or '已标记'},请转接人工处理,不要自动回复" lines = [] lines.append("=== 客户档案 ===") # 基础信息 basic_info = [] basic_info.append(f"客户ID: {customer_id}") basic_info.append(f"姓名: {profile.name or '未知'}") if profile.email: basic_info.append(f"邮箱: {profile.email}") if profile.phone: basic_info.append(f"电话: {profile.phone}") if profile.wechat: basic_info.append(f"微信: {profile.wechat}") lines.append(" | ".join(basic_info)) # 消费分析 consume_info = [] consume_info.append(f"客户等级: {profile.customer_level}级") if profile.vip: consume_info.append("VIP客户") consume_info.append(f"总订单: {profile.total_orders}单") consume_info.append(f"总消费: {profile.total_spent}元") if profile.total_orders > 0: consume_info.append(f"客单价: {profile.total_spent // profile.total_orders}元") lines.append("--- 消费分析 ---") lines.append(" | ".join(consume_info)) # 报价历史 price_info = [] if profile.vip_custom_price: price_info.append(f"VIP专属价: {profile.vip_custom_price}元(直接报这个价)") if profile.last_price: price_info.append(f"上次报价: {profile.last_price}元") if profile.lowest_price_accepted: price_info.append(f"历史最低成交: {profile.lowest_price_accepted}元") if profile.discount_given_count: price_info.append(f"历史让价: {profile.discount_given_count}次") if profile.price_sensitivity: price_info.append(f"价格敏感度: {profile.price_sensitivity}") if getattr(profile, "last_quote_no_convert", False): price_info.append("【策略】上次报价未成交,本次可降5-10元") if price_info: lines.append("--- 报价历史 ---") lines.append(" | ".join(price_info)) # 性格与决策 personality_info = [] if profile.personality: personality_info.append(f"性格: {'/'.join(profile.personality)}") if profile.decision_speed: personality_info.append(f"决策速度: {profile.decision_speed}") if profile.communication_prefer: personality_info.append(f"沟通偏好: {profile.communication_prefer}") if personality_info: lines.append("--- 性格特征 ---") lines.append(" | ".join(personality_info)) # 图片习惯 image_info = [] image_info.append(f"累计发图: {profile.total_images_sent}张") if profile.complexity_history: avg_complexity = self._calc_avg_complexity(profile.complexity_history) image_info.append(f"平均复杂度: {avg_complexity}") if profile.image_type_history: from collections import Counter top_types = Counter(profile.image_type_history).most_common(3) types_str = "、".join(f"{t}({c}次)" for t, c in top_types) image_info.append(f"常见类型: {types_str}") if profile.preferred_format: image_info.append(f"格式偏好: {profile.preferred_format}") if profile.preferred_size: image_info.append(f"尺寸要求: {profile.preferred_size}") if profile.last_image_url: image_info.append(f"最近发图: {profile.last_image_url[:60]}...") lines.append("--- 图片习惯 ---") lines.append(" | ".join(image_info)) # 当前任务状态 if profile.processing_status: task_info = [] task_info.append(f"状态: {profile.processing_status}") if profile.processing_image_url: task_info.append(f"处理中: {profile.processing_image_url[:40]}...") if profile.expected_done_at: task_info.append(f"预计完成: {profile.expected_done_at}") lines.append("--- 当前任务 ---") lines.append(" | ".join(task_info)) # 上次对话摘要 if profile.last_conversation_summary: time_str = "" if profile.last_conversation_time: try: t = datetime.fromisoformat(profile.last_conversation_time) diff = datetime.now() - t if diff.days > 0: time_str = f"({diff.days}天前)" else: h = diff.seconds // 3600 time_str = f"({h}小时前)" if h > 0 else "(刚刚)" except Exception: pass lines.append(f"--- 上次对话 {time_str} ---") lines.append(profile.last_conversation_summary) # 个性化回复策略 hints = [] if profile.personality: if "爽快" in profile.personality: hints.append("回复简洁直接,不废话,快速报价") if "砍价" in profile.personality or "砍价狂" in profile.personality: hints.append("报价时强调性价比,只让价一次,第二次引导去 xinhui.cloud") if "纠结" in profile.personality or "墨迹" in profile.personality: hints.append("多给一点说明,耐心回答") if profile.price_sensitivity == "高": hints.append("报价时顺带提「满意再拍」降低顾虑") if profile.decision_speed == "快": hints.append("直接报价推成交,少铺垫") if profile.total_orders > 0 and profile.decision_speed == "快": hints.append("老客爽快,直接报价成交") if hints: lines.append("--- 回复策略 ---") lines.append(";".join(hints)) # 主动推荐 proactive = [] if profile.bulk_potential == "有" or (profile.total_images_sent or 0) >= 2: proactive.append("可问「要做多张吗,多张有优惠」") if profile.upsell_opportunity: proactive.append(f"加购机会: {'、'.join(profile.upsell_opportunity)}") if proactive: lines.append("--- 主动推荐 ---") lines.append(";".join(proactive)) return "\n".join(lines) except Exception as e: print(f"[Agent] 获取客户画像失败: {e}") return "" def _calc_avg_complexity(self, complexity_history: list) -> str: """计算平均复杂度""" if not complexity_history: return "未知" level_map = {"simple": 1, "normal": 2, "complex": 3, "hard": 4} label_map = {1: "简单", 2: "一般", 3: "复杂", 4: "很复杂"} try: avg = sum(level_map.get(c, 2) for c in complexity_history) / len(complexity_history) return label_map.get(round(avg), "一般") except Exception: return "一般" def _get_refusal_context_hint(self, customer_id: str, current_msg: str, profile_context: str) -> str: """ 检测「刚拒绝某张图 + 客户问能找到吗」场景,注入显式提示,避免前后矛盾。 原因:last_conversation_summary 异步更新可能滞后,message_histories 模型可能忽略。 """ ask_keywords = ["能找到吗", "可以吗", "有吗", "能做吗", "可以找吗", "可以弄吗"] if not any(kw in current_msg for kw in ask_keywords): return "" refusal_keywords = ["不做", "不接", "拒绝", "不做这类", "这类不做"] # 检查 profile 摘要(可能因异步更新而滞后) if any(kw in profile_context for kw in refusal_keywords): return "【重要】上一句客服刚拒绝了某张图,客户问能找到吗时须明确:能做的是哪张(如第一张),不能做的是哪张。不可只说「放心拍」「可以」,会前后矛盾。" # 检查内存历史中最近几条消息(ModelResponse 含客服回复) history = self.message_histories.get(customer_id, []) for msg in reversed(history[-6:]): msg_str = str(msg) if any(kw in msg_str for kw in refusal_keywords): return "【重要】上一句客服刚拒绝了某张图,客户问能找到吗时须明确:能做的是哪张(如第一张),不能做的是哪张。不可只说「放心拍」「可以」,会前后矛盾。" return "" def _get_conversation_context(self, customer_id: str, acc_id: str = "", limit: int = 12, max_len: int = 80) -> str: """ 每一次对话都从数据库加载近期对话,压缩后注入 prompt。 确保模型看到上下文,同时控制 token 消耗。 """ try: try: from config.config import CHAT_CONTEXT_LIMIT, CHAT_CONTEXT_TRUNCATE_LEN limit = CHAT_CONTEXT_LIMIT max_len = CHAT_CONTEXT_TRUNCATE_LEN except Exception: pass from db.chat_log_db import get_recent_conversation msgs = get_recent_conversation(customer_id, acc_id=acc_id, limit=limit) if not msgs: return "" lines = [] for m in msgs: role = "客" if m.get("direction") == "in" else "服" msg_text = (m.get("message") or "").strip().replace("\n", " ")[:max_len] if not msg_text: continue lines.append(f"{role}:{msg_text}") if not lines: return "" return "【近期】\n" + "\n".join(lines) + "\n\n" except Exception: return "" def _get_intent_emotion_hint(self, msg: str) -> str: """语义匹配:意图/情绪识别,注入提示。EMBEDDING_MODEL 未配置时用关键词。""" try: from utils.intent_analyzer import detect_intent_embedding, detect_intent_keywords, detect_emotion_embedding intent = detect_intent_embedding(msg) if not intent: intent = detect_intent_keywords(msg) emotion = detect_emotion_embedding(msg) if os.getenv("EMBEDDING_MODEL") else None parts = [] if intent: parts.append(f"意图:{intent}") if emotion: parts.append(f"情绪:{emotion}") if parts: return f"【当前消息】{', '.join(parts)}" except Exception: pass return "" # 简单打招呼类消息(在近期已回复后无需再回) _COOLDOWN_PATTERNS = [ "你好", "您好", "在吗", "在么", "在不在", "有人吗", "嗯", "嗯嗯", "好", "好的", "好哒", "ok", "OK", "okay", "谢谢", "谢谢你", "感谢", "收到", "知道了", "明白了", ] _COOLDOWN_SECONDS = 5 * 60 # 5 分钟内不重复回复纯打招呼 def _in_cooldown(self, state: ConversationState, msg: str) -> bool: """最近刚回复过 + 消息是纯打招呼 → True 静默""" if not state.last_reply_at: return False elapsed = (datetime.now() - state.last_reply_at).total_seconds() if elapsed > self._COOLDOWN_SECONDS: return False clean = msg.strip().rstrip("!!??。.~~") return clean in self._COOLDOWN_PATTERNS async def process_message(self, message: CustomerMessage) -> AgentResponse: """处理客户消息并生成回复""" metrics_emit("inbound_msg", customer_id=message.from_id, acc_id=message.acc_id) # 获取或创建对话状态 state = self._get_conversation_state(message.from_id) # 冷却期检测:近期已回复 + 纯打招呼 → 静默 if self._in_cooldown(state, message.msg): elapsed = int((datetime.now() - state.last_reply_at).total_seconds()) print(f"[Agent] 冷却期静默(距上次回复 {elapsed}s):{message.msg!r}") return AgentResponse(reply="", should_reply=False, need_transfer=False) # 检测售前/售后 new_stage = self._detect_stage(message.msg) if new_stage != state.stage: state.stage = new_stage state.last_update = datetime.now().isoformat() # 订单通知前置处理 if "系统订单信息" in message.msg or "订单状态" in message.msg: _, order_block = self._split_customer_text(message.msg) customer_text, _ = self._split_customer_text(message.msg) order = self._parse_order_info(order_block or message.msg) pay_status = order.get("pay_status", "") order_status = order.get("order_status", "") paid_keywords = ["等待发货", "已付款", "付款成功", "买家已付款"] is_paid = any(kw in pay_status or kw in order_status for kw in paid_keywords) if is_paid: # 订单金额核查:对比报价和实付金额 asyncio.create_task(self._check_order_amount( message.from_id, order, message.acc_id )) # 成交记录:写入数据库供日报分析 asyncio.create_task(self._record_deal_success( message.from_id, message.from_name, message.acc_id, message.acc_type, order, state )) # 已付款:触发 Gemini 作图 try: from core.workflow import workflow asyncio.create_task(workflow.trigger_processing_on_payment( customer_id=message.from_id, acc_id=message.acc_id, acc_type=message.acc_type, )) except Exception as e: print(f"[Agent] 触发作图失败: {e}") elif not customer_text: # 非付款 + 没有客户文字 → 直接静默,不调用 AI print(f"[Agent] 订单通知静默({pay_status or order_status}),跳过回复") return AgentResponse(reply="", should_reply=False, need_transfer=False) # 找图店:先收集图片和需求,等客户确认“发完”后统一报价 customer_text, _ = self._split_customer_text(message.msg) shop_type = _get_shop_type(message.acc_id or "", message.goods_name or "") if shop_type == "find_image" and self._is_batch_quote_enabled(message.from_id, message.acc_id): incoming_urls = self._extract_image_urls(customer_text) text_without_urls = self._strip_urls_from_text(customer_text) if incoming_urls: for u in incoming_urls: if u not in state.pending_image_urls: state.pending_image_urls.append(u) if text_without_urls: self._append_requirement(state, text_without_urls) state.image_count = len(state.pending_image_urls) self._sync_pending_quote_state(message.from_id, state) if self._is_batch_finish_signal(customer_text): quote_res = await self._quote_pending_images(state, message) reply_text = quote_res.get("reply", "") need_transfer = bool(quote_res.get("need_transfer")) state.last_reply_at = datetime.now() print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {reply_text}") return AgentResponse( reply=reply_text, should_reply=not need_transfer, need_transfer=need_transfer, transfer_msg=TRANSFER_MESSAGE if need_transfer else "", ) ack = self._build_collect_ack(len(state.pending_image_urls)) state.last_reply_at = datetime.now() print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {ack}") return AgentResponse(reply=ack, should_reply=True, need_transfer=False) if state.pending_image_urls: if text_without_urls: self._append_requirement(state, text_without_urls) self._sync_pending_quote_state(message.from_id, state) if self._is_batch_finish_signal(customer_text): quote_res = await self._quote_pending_images(state, message) reply_text = quote_res.get("reply", "") need_transfer = bool(quote_res.get("need_transfer")) state.last_reply_at = datetime.now() print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {reply_text}") return AgentResponse( reply=reply_text, should_reply=not need_transfer, need_transfer=need_transfer, transfer_msg=TRANSFER_MESSAGE if need_transfer else "", ) remind = self._build_collect_remind(len(state.pending_image_urls)) state.last_reply_at = datetime.now() print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {remind}") return AgentResponse(reply=remind, should_reply=True, need_transfer=False) # 构建提示词(包含对话状态 + 客户画像) user_prompt = self._build_prompt(message, state) # 注入客户历史画像(个性化语气、报价策略、主动预测) profile_context = self._get_customer_profile_context(message.from_id) if profile_context: user_prompt = profile_context + "\n\n" + user_prompt # 前后一致提示:刚拒绝某张图后,客户问「能找到吗」等时,必须区分能做/不能做 refusal_hint = self._get_refusal_context_hint(message.from_id, message.msg, profile_context or "") if refusal_hint: user_prompt = refusal_hint + "\n\n" + user_prompt # 每一次对话都注入近期上下文,确保模型看到完整对话 conv_context = self._get_conversation_context(message.from_id, acc_id=message.acc_id or "") if conv_context: user_prompt = conv_context + user_prompt # 语义匹配:意图/情绪识别(配置 EMBEDDING_MODEL 时用 embedding,否则关键词) intent_hint = self._get_intent_emotion_hint(message.msg) if intent_hint: user_prompt = intent_hint + "\n\n" + user_prompt deps = AgentDeps( msg_id=message.msg_id, acc_id=message.acc_id, from_id=message.from_id, platform=message.acc_type ) # 取出该客户的历史对话,传给 AI 保持上下文 history = self.message_histories.get(message.from_id, []) self._log_block("PROMPT->AI 前置提示词", user_prompt) try: msg_lower = message.msg.lower() pricing_kw = ["多少钱", "多少一张", "报价", "给个价", "几块", "价位", "能便宜点吗"] processing_kw = ["安排", "处理一下", "开始做", "做一下", "尽快", "加急", "付款了", "已付款"] similar_kw = ["有一样的", "有一样吗", "一样的吗", "类似的", "类似的吗", "同款", "相似", "类似吗"] order_markers = ["[系统订单信息]", "订单状态", "买家已付款"] risk_kw = ["黄色", "擦边", "色情", "涉黄", "涉政", "政治", "裸", "不雅"] target_agent = self.agent_after_sale if state.stage == "售后" else self.agent if any(k in msg_lower for k in risk_kw): target_agent = self.agent_risk elif any(k in message.msg for k in order_markers): target_agent = self.agent_order if any(k in msg_lower for k in processing_kw): target_agent = self.agent_processing elif any(k in msg_lower for k in pricing_kw): target_agent = self.agent_pricing elif any(k in msg_lower for k in similar_kw): target_agent = self.agent_similar result = await target_agent.run(user_prompt, deps=deps, message_history=history) # 更新历史,最多保留最近 30 条消息防止 token 超限 self.message_histories[message.from_id] = result.all_messages()[-30:] reply_text = self._normalize_reply_text(result.output) # 拦截超低杀价:客户报价低于底线时,统一礼貌拒绝 try: from config.config import MIN_PRICE_FLOOR import re offer = None m = re.search(r'(\d{1,4})\s*(?:元|块|块钱|元钱)\b', message.msg) if m: offer = int(m.group(1)) else: m2 = re.search(r'(?:能|可以|可否|能否)\s*(\d{1,4})\b', message.msg) offer = int(m2.group(1)) if m2 else None st = self._get_conversation_state(message.from_id) floor = st.last_min_price if isinstance(st.last_min_price, int) and st.last_min_price > 0 else MIN_PRICE_FLOOR if offer is not None and offer < floor: reply_text = "不好意思" except Exception: pass # 降限:若AI在回复中给出小于底线的报价,提升到>=底线且为5的倍数 try: from config.config import MIN_PRICE_FLOOR st = self._get_conversation_state(message.from_id) floor = st.last_min_price if isinstance(st.last_min_price, int) and st.last_min_price > 0 else MIN_PRICE_FLOOR def _adjust(text: str) -> str: import re def _repl(m): num = int(m.group(1)) adj = max(floor, round(num / 5) * 5) return m.group(0).replace(str(num), str(adj)) patterns = [ r'按(\d{1,4})元', r'报价[::]\s*(\d{1,4})\s*元', r'(\d{1,4})\s*元一张', r'打包(\d{1,4})\s*元', ] t = text for p in patterns: t = re.sub(p, _repl, t) return t reply_text = _adjust(reply_text or "") except Exception: pass # 打印工具调用记录 for msg in result.new_messages(): for part in getattr(msg, 'parts', []): part_type = type(part).__name__ if 'ToolCall' in part_type: print(f"{self.C_TOOL}[THINK/TOOL_CALL]{self.C_RESET} {getattr(part, 'tool_name', '')}({getattr(part, 'args', '')})") elif 'ToolReturn' in part_type: ret = str(getattr(part, 'content', ''))[:120] print(f"{self.C_TOOL}[THINK/TOOL_RETURN]{self.C_RESET} {ret}") print(f"{self.C_THINK}[THINK/RAW_OUTPUT]{self.C_RESET} {repr(reply_text)}") except Exception as e: err_str = str(e) print(f"[Agent] AI 调用失败: {e},使用兜底回复") metrics_emit("ai_call_failed", customer_id=message.from_id, acc_id=message.acc_id) if "AccountOverdueError" in err_str or "overdue" in err_str.lower(): asyncio.create_task(_notify_wechat_overdue()) else: asyncio.create_task(_notify_wechat( f"⚠️ **AI调用异常**\n" f"客户:{message.from_id}\n" f"店铺:{message.acc_id}\n" f"错误:{err_str[:200]}", tag="AI异常" )) reply_text = None else: metrics_emit("ai_call_success", customer_id=message.from_id, acc_id=message.acc_id) # AI 失败兜底:给一个不出错的万能回复 if not reply_text: return AgentResponse( reply="好的稍等,我看一下", should_reply=True, need_transfer=False ) # 敏感词过滤:党政/暴力/血腥/黄色 try: from utils.content_filter import should_block_reply blocked, fallback = should_block_reply(reply_text) if blocked: print(f"[Agent] 敏感词拦截,使用兜底回复") reply_text = fallback or "好的,您稍等,我帮您确认一下" except Exception: pass # 成本统计(可选) try: from utils.api_cost_tracker import record record("openai_chat", count=1) except Exception: pass # 检测是否报价 self._detect_price(reply_text, state) # 检测压价 self._detect_discount(message.msg, state) # 自动打标签(异步,不阻塞) asyncio.create_task(self._auto_tag(message, reply_text, state)) # 检测是否需要转接(文字触发 或 AI 调用了 transfer_to_human 工具) need_transfer = False transfer_msg = "" transfer_keywords = ["TRANSFER_REQUESTED", "[转移会话]", "转移会话", "转人工", "转接"] if reply_text and any(kw in reply_text for kw in transfer_keywords): need_transfer = True transfer_msg = TRANSFER_MESSAGE metrics_emit("transfer_to_human", customer_id=message.from_id, acc_id=message.acc_id) # 未成交记录:客户表达放弃且已报价过(转人工不记录) customer_text, _ = self._split_customer_text(message.msg) no_convert_keywords = ["算了", "不要了", "不做了", "下次再说", "先不弄了"] if customer_text and state.last_price and state.last_price > 0: if any(kw in customer_text for kw in no_convert_keywords): reason = "嫌贵放弃" if any(k in customer_text for k in ["贵", "贵了", "便宜"]) else "放弃" asyncio.create_task(self._record_deal_fail( message.from_id, message.from_name, message.acc_id, message.acc_type, reason )) # 需要转接时不把原始回复发给客户 should_reply = bool(reply_text and reply_text.strip()) and not need_transfer # 记录本次回复时间,供冷却期判断 if should_reply: state.last_reply_at = datetime.now() print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {reply_text}") else: print(f"{self.C_MUTED}[REPLY->CUSTOMER]{self.C_RESET} <静默/不发送>") return AgentResponse(reply=reply_text, should_reply=should_reply, need_transfer=need_transfer, transfer_msg=transfer_msg) def _detect_price(self, reply: str, state: ConversationState): """从回复中提取价格,同步写入客户数据库(价格必须为5的整数倍)""" import re numbers = re.findall(r'(\d+)[元]', reply) if numbers: price = round(int(numbers[0]) / 5) * 5 # 强制为5的整数倍 state.last_price = price metrics_emit("quote_generated", customer_id=state.customer_id, price=price) # 持久化到客户数据库,重启后仍可读取 try: from db.customer_db import db db.update_last_price(state.customer_id, price) except Exception: pass async def _check_order_amount(self, customer_id: str, order: dict, acc_id: str): """核查订单实付金额是否与报价一致,异常时企业微信预警""" try: import re from db.customer_db import db profile = db.get_customer(customer_id) quoted = profile.last_price # 上次报价(元) if not quoted: return # 从订单解析实付金额 raw_amount = order.get("amount", "") m = re.search(r'[\d.]+', str(raw_amount)) if not m: return paid = float(m.group()) print(f"[Agent] 订单金额核查:报价 {quoted}元 vs 实付 {paid}元(客户 {customer_id})") # 实付金额明显低于报价(低于报价的 60%)才预警 if paid < quoted * 0.6: msg = ( f"⚠️ **订单金额异常**\n" f"店铺:{acc_id}\n" f"客户:{customer_id}({profile.name or ''})\n" f"报价:{quoted}元\n" f"实付:{paid}元\n" f"差额:{quoted - paid:.1f}元 — 请人工核查" ) print(f"[Agent] {msg}") await _notify_wechat(msg) except Exception as e: print(f"[Agent] 订单金额核查失败: {e}") async def _record_deal_success( self, customer_id: str, customer_name: str, acc_id: str, platform: str, order: dict, state: "ConversationState", ): """成交时写入数据库,供日报与数据分析""" try: import re from db.deal_outcome_db import record_deal order_id = order.get("order_id", "") raw_amount = order.get("amount", "") m = re.search(r"[\d.]+", str(raw_amount)) amount = float(m.group()) if m else 0 reason = "让价后成交" if (state.discount_count or 0) > 0 else "直接成交" record_deal( customer_id=customer_id, outcome="成交", reason=reason, customer_name=customer_name or "", acc_id=acc_id or "", platform=platform or "", order_id=order_id, amount=amount, discount_given=(state.discount_count or 0) > 0, ) # 同步到客户库 try: from db.customer_db import db if order_id: db.add_order(customer_id, order_id, amount) db.clear_quote_no_convert(customer_id) except Exception: pass print(f"[Agent] 成交记录: {customer_id} {reason} {amount}元") except Exception as e: print(f"[Agent] 成交记录失败: {e}") async def _record_deal_fail( self, customer_id: str, customer_name: str, acc_id: str, platform: str, reason: str, ): """未成交时写入数据库,供日报与数据分析;标记报价未成交,下次可适当降低""" try: from db.deal_outcome_db import record_deal from db.customer_db import db record_deal( customer_id=customer_id, outcome="未成交", reason=reason, customer_name=customer_name or "", acc_id=acc_id or "", platform=platform or "", ) db.mark_quote_no_convert(customer_id) print(f"[Agent] 未成交记录: {customer_id} {reason}") except Exception as e: print(f"[Agent] 未成交记录失败: {e}") async def _auto_tag(self, message: CustomerMessage, reply: str, state: ConversationState): """自动识别并写入各类标签""" try: from db.customer_db import db cid = message.from_id msg = message.msg.lower() # 批量潜力 if any(kw in msg for kw in ["还有", "多张", "好几张", "一批", "下次还"]): db.set_bulk_potential(cid, "有") db.add_upsell_opportunity(cid, "批量打包") # 加购机会:问过PSD/分层 if any(kw in msg for kw in ["psd", "分层", "源文件"]): db.add_upsell_opportunity(cid, "分层PSD") db.update_preferred_format(cid, "psd") # 格式偏好 if "jpg" in msg or "jpeg" in msg: db.update_preferred_format(cid, "jpg") if "png" in msg: db.update_preferred_format(cid, "png") # 尺寸/分辨率偏好 if any(kw in msg for kw in ["分辨率", "dpi", "尺寸", "大图", "印刷"]): db.update_preferred_size(cid, message.msg[:30]) # 决策速度:收到报价后立刻说拍了 → 快 if any(kw in msg for kw in ["拍了", "下单了", "好的", "行"]) and state.last_price: db.update_decision_speed(cid, "快") # 图片类型识别(简单关键词匹配) type_keywords = { "印花": ["印花", "花纹", "图案", "面料", "布料", "纺织"], "logo": ["logo", "标志", "品牌", "商标"], "人物": ["人物", "人像", "照片", "脸", "头像"], "产品": ["产品", "商品", "包装", "实物"], "老照片": ["老照片", "旧照片", "发黄", "修复"], } for img_type, keywords in type_keywords.items(): if any(kw in message.msg for kw in keywords): db.add_image_type(cid, img_type) break # 定期自动计算衍生标签 db.auto_compute_tags(cid) except Exception: pass def _detect_discount(self, message: str, state: ConversationState): """检测压价,并持久化让价记录""" if any(kw in message for kw in ["贵", "便宜", "太贵", "有点贵"]): state.discount_count += 1 if state.last_price: try: from db.customer_db import db db.record_discount(state.customer_id, state.last_price) except Exception: pass # 客户明确给价(如“10元/10块/能10吗”) import re m = re.search(r'(\d+)\s*元|\b(\d+)\s*块', message) offer = None if m: offer = int(m.group(1) or m.group(2)) if offer: try: from config.config import MIN_PRICE_FLOOR if offer < MIN_PRICE_FLOOR: # 标记本次为超低出价(用于后续拒绝话术提示) state.last_price = state.last_price or 0 except Exception: pass def _parse_order_info(self, msg: str) -> dict: """从系统订单消息中提取所有字段""" import re info = {} m = re.search(r'订单号[::]\s*(\d+)', msg) if m: info['order_id'] = m.group(1) # 订单大状态(新订单/交易成功/交易关闭等) m = re.search(r'订单状态[::]\s*([^\s\[]+)', msg) if m: info['order_status'] = m.group(1).strip() # 支付细状态(等待买家付款/等待发货/交易完成等) m = re.search(r'\[状态[::]\s*([^\]]+)\]', msg) if m: info['pay_status'] = m.group(1).strip() # 金额 m = re.search(r'金额[::]\s*([\d.]+)元', msg) if m: info['amount'] = m.group(1) # 数量 m = re.search(r'数量[::]\s*(\d+)', msg) if m: info['quantity'] = m.group(1) # 时间(格式:2026-2-24 19:52:52) m = re.search(r'(\d{4}-\d{1,2}-\d{1,2}\s+\d{1,2}:\d{2}:\d{2})', msg) if m: info['order_time'] = m.group(1).strip() # 买家备注 m = re.search(r'买家备注[::]\s*([^\n]+)', msg) if m and m.group(1).strip(): info['buyer_note'] = m.group(1).strip() return info def _get_order_instruction(self, pay_status: str, order_status: str) -> str: """ 根据订单状态生成 AI 指令。 只有「已付款」才需要回复客户,其他状态一律静默。 """ paid_keywords = ["等待发货", "已付款", "付款成功", "买家已付款"] if any(kw in pay_status or kw in order_status for kw in paid_keywords): return "【已付款-必须回复】客户已付款,立刻自然回复确认收款并告知马上安排。" else: # 所有其他状态(待付款、交易完成、关闭等)静默处理 return "【仅系统通知-无需回复客户】这是系统订单通知,不需要回复客户任何内容,直接跳过。" def _extract_image_url(self, msg: str) -> str: """从消息中提取图片URL,兼容纯URL和 text#*#url 两种格式""" urls = self._extract_image_urls(msg) return urls[0] if urls else "" def _extract_image_urls(self, msg: str) -> List[str]: """提取消息中的所有图片URL(去重保序)。""" import re if not msg: return [] image_exts = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp") image_hosts = ("alicdn.com", "imgextra", "taobao.com", "jd.com", "pinduoduo.com", "suning.com") candidates = re.findall(r'https?://[^\s#]+', msg) urls: List[str] = [] for u in candidates: low = u.lower() if any(ext in low for ext in image_exts) or any(h in low for h in image_hosts): if u not in urls: urls.append(u) return urls def _strip_urls_from_text(self, msg: str) -> str: """去掉 URL 后的纯文本,用于提取额外需求。""" import re if not msg: return "" text = re.sub(r'https?://\S+', ' ', msg) text = text.replace("#*#", " ").strip() text = re.sub(r'\s+', ' ', text) return text.strip(",,。.!!??;;:: ") def _is_batch_finish_signal(self, text: str) -> bool: """客户是否表达“图发完了,可以统一报价”。""" if not text: return False finish_keywords = [ "发完了", "都发完了", "发齐了", "齐了", "先这些", "就这些", "全部", "一起报", "统一报价", "总共多少钱", "一共多少钱", "打包价", "总价", "报价吧", "报个总价", "给个总价", ] return any(k in text for k in finish_keywords) def _build_collect_ack(self, count: int) -> str: templates = [ "收到,这边先记下了(已收{n}张)。你继续发,等你发完我再一起给你打包报价。", "好的,当前这批先收到了(第{n}张)。还有图就继续发,发齐我一次性给你总价。", "没问题,已记录到第{n}张。你把需求和图片都发完,我统一给你报更合适的价格。", ] return random.choice(templates).format(n=count) def _build_collect_remind(self, count: int) -> str: templates = [ "需求我记下了(当前共{n}张图)。你继续发齐,发完回我“发完了”,我一次性给你总价。", "好的,这条需求也加上了(现在{n}张)。等你说发完,我立刻统一报价。", "收到,这个要求我也记住了(共{n}张)。你发完我就给你打包价。", ] return random.choice(templates).format(n=count) def _append_requirement(self, state: ConversationState, text: str): """追加需求并做去重/截断,减少上下文噪音。""" t = (text or "").strip() if not t: return t = t[:120] if state.pending_requirements and state.pending_requirements[-1] == t: return if t in state.pending_requirements[-5:]: return state.pending_requirements.append(t) if len(state.pending_requirements) > 20: state.pending_requirements = state.pending_requirements[-20:] def _calc_requirement_surcharge(self, requirements: List[str]) -> Dict[str, Any]: """ 把客户补充需求做成结构化加价,避免纯靠模型自由发挥导致价格波动。 返回: {"extra": int, "hits": List[str]} """ text = " ".join(requirements or []) rules = [ (["分层", "psd", "源文件"], 30, "分层/源文件"), (["去背景", "抠图", "透明底", "白底"], 5, "去背景"), (["换背景", "换场景", "合成"], 10, "合成/换背景"), (["改字", "改文字", "替换文字", "排版"], 10, "改文字/排版"), (["调色", "改色", "换色", "配色"], 5, "调色"), (["多版本", "多个版本", "两版", "三版"], 10, "多版本"), (["加急", "今天要", "马上要", "尽快"], 10, "加急"), ] total = 0 hits: List[str] = [] for keywords, fee, label in rules: if any(k in text for k in keywords): total += fee hits.append(f"{label}+{fee}") # 防止需求加价过高,做个上限保护 total = min(total, 60) # 金额统一 5 的倍数 total = round(total / 5) * 5 return {"extra": total, "hits": hits} def _build_batch_quote_reply( self, results: List[Tuple[str, Dict[str, Any]]], total_suggest: int, bundle_price: int, req_fee: Dict[str, Any], ) -> str: """构建分图明细 + 单条总报价可选项回复。""" complexity_map = { "simple": "简单", "normal": "常规", "complex": "复杂", "hard": "高难", } detail_lines: List[str] = [] for i, (_, r) in enumerate(results, 1): p = int(r.get("price_suggest", 20) or 20) cx = complexity_map.get(str(r.get("complexity", "normal")), "常规") reason = str(r.get("reason", "常规处理")).replace("\n", " ").strip() if len(reason) > 18: reason = reason[:18] + "..." detail_lines.append(f"图{i}:{p}元({cx},{reason})") extra = int(req_fee.get("extra", 0) or 0) single_total = round((total_suggest + extra) / 5) * 5 req_hit = "、".join(req_fee.get("hits", [])) if req_fee.get("hits") else "" lines = ["先给你分图报下:"] lines.extend(detail_lines) if req_hit: lines.append(f"需求加价:+{extra}元({req_hit})") option_line = f"可选:A按单张做共{single_total}元;B打包一起做{bundle_price}元(更划算)。" lines.append(option_line) lines.append("你定一个方案,我这边马上安排。") return "\n".join(lines) async def _quote_pending_images(self, state: ConversationState, message: CustomerMessage) -> Dict[str, Any]: """ 批量识别待处理图片并统一处理: - find_image 意图且可自动处理:直接 Gemini 处理 + 上传图绘 + 回链接 - 高风险/不可做:转人工 - 其他:统一报价 """ from image.image_analyzer import image_analyzer urls = list(state.pending_image_urls) if not urls: return {"reply": "你先把图片发我,我看完再给你统一报价。", "need_transfer": False} try: from config.config import BATCH_MAX_IMAGES, BATCH_ANALYZE_CONCURRENCY max_images = max(1, int(BATCH_MAX_IMAGES)) analyze_concurrency = max(1, int(BATCH_ANALYZE_CONCURRENCY)) except Exception: max_images = 12 analyze_concurrency = 3 if len(urls) > max_images: return { "reply": f"这次图片有点多({len(urls)}张),我先按前{max_images}张处理报价,剩下的下一批继续发我。", "need_transfer": False, } urls = urls[:max_images] sem = asyncio.Semaphore(analyze_concurrency) async def _analyze_one(url: str): async with sem: try: r = await image_analyzer.analyze(url) except Exception: r = { "complexity": "normal", "reason": "识别异常,按常规估价", "price_min": 15, "price_max": 25, "price_suggest": 20, "success": False, } return url, r results = list(await asyncio.gather(*[_analyze_one(u) for u in urls])) for url, r in results: # 与单图流程一致:识别后写入 workflow 任务 try: from core.workflow import workflow await workflow.image_analysis_result( customer_id=message.from_id, image_url=url, complexity=r.get("complexity", "normal"), acc_id=message.acc_id, acc_type=message.acc_type, gemini_prompt=r.get("gemini_prompt", ""), aspect_ratio=r.get("aspect_ratio", "1:1"), perspective=r.get("perspective", "no"), proc_type=r.get("proc_type", ""), subject=r.get("subject", ""), quality=r.get("quality", ""), ) except Exception as e: print(f"[Agent] Workflow 批量任务创建失败: {e}") total_min = sum(int(r.get("price_min", 15) or 15) for _, r in results) total_max = sum(int(r.get("price_max", 25) or 25) for _, r in results) total_suggest = sum(int(r.get("price_suggest", 20) or 20) for _, r in results) req_fee = self._calc_requirement_surcharge(state.pending_requirements) # 打包优惠:2 张减 5,3 张及以上按 9 折(四舍五入到 5 元) if len(results) == 2: bundle_price = max(10, total_suggest - 5) elif len(results) >= 3: bundle_price = max(10, round(total_suggest * 0.9 / 5) * 5) else: bundle_price = total_suggest bundle_price += int(req_fee.get("extra", 0) or 0) bundle_price = round(bundle_price / 5) * 5 # 先分流:高风险/不可做 -> 转人工 unsafe = [] dense_text_reject = [] for i, (_, r) in enumerate(results, 1): if r.get("feasibility") == "no" or r.get("risk") == "high": unsafe.append(f"图{i}") note = str(r.get("note", "") or "") if "文字内容过于密集" in note or "密集文字" in note: dense_text_reject.append(f"图{i}") if unsafe: state.pending_image_urls.clear() state.pending_requirements.clear() self._sync_pending_quote_state(message.from_id, state) if dense_text_reject and len(dense_text_reject) == len(unsafe): return { "reply": self._build_reject_message("文字密集类图片暂不接单"), "need_transfer": False, } return { "reply": f"这批里{'、'.join(unsafe)}处理风险较高,我先帮你转人工设计师跟进会更稳妥。", "need_transfer": True, } # 查找图片意图:直接自动处理并返回图绘链接 intent_text = (message.msg or "") + " " + " ".join(state.pending_requirements[-5:]) workflow_type, _ = self.workflow_router.detect_workflow(intent_text) if workflow_type == "find_image": links = [] try: from image.image_processor import image_processor from utils.image_queue import run_with_queue for idx, (url, r) in enumerate(results, 1): req_parts = [f"complexity:{r.get('complexity', 'normal')}"] if r.get("gemini_prompt"): req_parts.append(f"prompt:{r.get('gemini_prompt')}") if r.get("aspect_ratio"): req_parts.append(f"ratio:{r.get('aspect_ratio')}") if r.get("perspective") and r.get("perspective") != "no": req_parts.append(f"perspective:{r.get('perspective')}") if r.get("proc_type"): req_parts.append(f"proc_type:{r.get('proc_type')}") if r.get("subject"): req_parts.append(f"subject:{r.get('subject')}") if r.get("quality"): req_parts.append(f"quality:{r.get('quality')}") process_res = await run_with_queue(image_processor.process_image( url, "enhance", requirements="|".join(req_parts), gemini_prompt=r.get("gemini_prompt", ""), aspect_ratio=r.get("aspect_ratio", "1:1"), perspective=r.get("perspective", "no"), proc_type=r.get("proc_type", ""), subject=r.get("subject", ""), quality=r.get("quality", ""), )) if not process_res.get("success"): raise RuntimeError(process_res.get("message", "图片处理失败")) ok, link, _ = await upload_to_tuhui( process_res["result_path"], title=f"客户{message.from_id[-4:]}-图片{idx}", description="AI自动处理结果", price=max(10, int(r.get("price_suggest", 20) or 20) + int(req_fee.get("extra", 0) or 0) // max(1, len(results))), ) if not ok: raise RuntimeError(str(link)) links.append(link) except Exception as e: print(f"[Agent] 自动处理并上传失败,回退统一报价: {e}") else: lines = [f"这批我先给你处理好了,按打包 {bundle_price} 元。"] for i, link in enumerate(links, 1): lines.append(f"链接{i}:{link}") lines.append("你先看下效果,没问题我就按这个标准继续给你做。") state.last_price = bundle_price try: from db.customer_db import db db.update_last_price(message.from_id, bundle_price) except Exception: pass state.pending_image_urls.clear() state.pending_requirements.clear() self._sync_pending_quote_state(message.from_id, state) return {"reply": "\n".join(lines), "need_transfer": False} reply_text = self._build_batch_quote_reply( results=results, total_suggest=total_suggest, bundle_price=bundle_price, req_fee=req_fee, ) state.last_price = bundle_price try: from db.customer_db import db db.update_last_price(message.from_id, bundle_price) except Exception: pass # 清空待报价队列(本轮已统一报价) state.pending_image_urls.clear() state.pending_requirements.clear() self._sync_pending_quote_state(message.from_id, state) return {"reply": reply_text, "need_transfer": False} def _split_customer_text(self, msg: str) -> tuple: """ 把混合消息拆分为(客户真实文字, 系统订单块)。 平台有时把客户文字和系统订单通知拼在同一条消息里。 """ import re # 找到系统订单块的起始位置 order_marker = re.search(r'\[系统订单信息\]|\[系统通知\]', msg) if order_marker: customer_text = msg[:order_marker.start()].strip() order_block = msg[order_marker.start():].strip() else: customer_text = msg.strip() order_block = "" return customer_text, order_block def _build_prompt(self, message: CustomerMessage, state: ConversationState) -> str: """构建提示词""" msg_content = message.msg stage_info = f"【当前阶段】{state.stage}" # 拆分:客户文字 vs 系统订单块 customer_text, order_block = self._split_customer_text(msg_content) has_order = bool(order_block) if has_order: order = self._parse_order_info(order_block) if order.get('order_id'): state.last_order_id = order['order_id'] stage_info += f"\n【订单号】{order['order_id']}" if order.get('order_status'): state.order_status = order['order_status'] stage_info += f"\n【订单状态】{order['order_status']}" if order.get('pay_status'): stage_info += f"\n【支付状态】{order['pay_status']}" if order.get('amount'): stage_info += f"\n【订单金额】{order['amount']}元" if order.get('quantity'): stage_info += f"\n【数量】{order['quantity']}件" if order.get('order_time'): stage_info += f"\n【下单时间】{order['order_time']}" if order.get('buyer_note'): stage_info += f"\n【买家备注】{order['buyer_note']}" if state.discount_count > 0: stage_info += f"\n【客户压价次数】{state.discount_count}" # 店铺类型:不同店铺不同回复策略 shop_type = _get_shop_type(message.acc_id or "", message.goods_name or "") shop_hint = "" try: from config.config import CONFIG_DIR import json cfg_path = CONFIG_DIR / "shop_prompts.json" if cfg_path.exists(): with open(cfg_path, "r", encoding="utf-8") as f: cfg = json.load(f) hints = cfg.get("type_hints", {}) shop_hint = hints.get(shop_type, "") if not shop_hint and message.acc_id: sh = cfg.get("shops", {}).get(message.acc_id, {}) shop_hint = sh.get("hint", "") except Exception: pass prompt = f"""收到新消息: {stage_info} 发送者: {message.from_name} ({message.from_id}) """ if message.goods_name: prompt += f"商品名称: {message.goods_name}\n" if shop_hint: prompt += f"\n{shop_hint}\n" # ── 优先处理客户真实问题 ── # ── 判断订单付款状态(供后续逻辑使用)── order_paid = False order_unpaid = False if has_order: order = self._parse_order_info(order_block) paid_kws = ["等待发货", "已付款", "付款成功", "买家已付款"] unpaid_kws = ["等待买家付款", "待付款", "未付款"] ps = order.get('pay_status', '') os_ = order.get('order_status', '') if any(kw in ps or kw in os_ for kw in paid_kws): order_paid = True elif any(kw in ps or kw in os_ for kw in unpaid_kws): order_unpaid = True # ── 催单/进度询问关键词 ── progress_keywords = [ "安排了吗", "安排好了吗", "好了吗", "做了吗", "做好了吗", "弄好了吗", "好了没", "做了没", "什么时候好", "多久好", "进度", "催一下", "快点", "什么时候能好", "做完了吗", ] if customer_text: prompt += f"\n客户说:{customer_text}\n" image_url = self._extract_image_url(customer_text) price_keywords = ["多少钱", "多少", "价格", "几块", "怎么收费", "报个价"] # gemini_api 店铺:不触发找图流程,按 API 客服回复 if shop_type == "gemini_api": prompt += "\n【Gemini API 店铺】客户问账号/pro/续费/套餐等,按 API 客服自然回复,不要求发图。" elif image_url: prompt += "\n客户在继续发图阶段:先确认“已收图”,并引导客户把图和要求一次发完;等客户明确“发完了/统一报价”后再统一报价。" elif any(kw in customer_text for kw in price_keywords): last_url = self._extract_image_url(msg_content) if last_url: prompt += "\n客户在询问价格:若客户已确认发完,则给总报价;若还在发图,先引导发完后统一报价。" else: prompt += "\n客户在询问价格但未发图,回复「发图来我看看」。" elif any(kw in customer_text for kw in progress_keywords): # 客户问进度/催单,必须先核查付款状态 if order_unpaid: prompt += "\n⚠️【订单未付款】客户问安排进度,但订单还未付款。自然告知拍下付款后马上安排即可。" elif order_paid: prompt += "\n客户催单,订单已付款,自然回复在做了/快了之类。" else: prompt += "\n客户催单,查询当前处理状态后自然回复。" elif any(kw in customer_text for kw in ["贵", "有点贵", "太贵", "算了", "便宜点", "少点", "打折", "贵哦"]): # 客户嫌贵/要放弃 → 直接让价一次,不问「什么问题」 prompt += "\n⚠️【客户嫌贵】客户已表达价格顾虑或要放弃。禁止追问「什么问题」「说清楚点」。\n→ 第一步:直接让价一次(如原价 20→15),话术自然。\n→ 第二步:如果让价后客户还是嫌贵,引导客户自己去注册星绘平台:\n - 每个话术都必须带网址 xinhui.cloud\n - 强调「自己去弄」「自己操作」「自己下单」\n - 示例:「嫌贵就去 xinhui.cloud 注册个号,自己有余额可以弄」「xinhui.cloud 去注册下,新用户送体验金,自己弄便宜」\n - 语气直接,让客户自己去操作」" elif any(kw in customer_text for kw in ["擦边", "黄色", "色情", "大尺度", "性感图", "露点", "半裸"]): # 客户问擦边/黄色内容 → 直接拒绝,不说「发图来看看」 prompt += "\n⚠️【拒绝】客户询问擦边/黄色/敏感内容。直接拒绝,不接单,不说「发图来看看」。自然回复如:这类不做/不接/做不了。" else: prompt += "\n根据客户说的内容自然回应,像真人聊天,不要套模板。" # ── 附加订单信息(不覆盖客户问题的优先级)── if has_order: order = self._parse_order_info(order_block) order_instruction = self._get_order_instruction( order.get('pay_status', ''), order.get('order_status', '') ) if customer_text: if not order_unpaid: # 未付款情况已在上面明确处理,不重复添加背景 prompt += f"\n\n【背景参考-订单通知】{order_instruction}" else: # 纯系统通知,没有客户文字 prompt += f"\n\n{order_instruction}" if not customer_text and not has_order: prompt += f"\n消息内容: {msg_content}\n请按工作流规则回复。" return prompt async def _handle_image_workflow(self, message: str, data: dict, image_urls: list) -> bool: """处理图片工作流(根据客户说的话判断执行哪种工作流)""" if not image_urls: return False workflow_type, confidence = self.workflow_router.detect_workflow(message) customer_id = data.get('from_id') acc_id = data.get('acc_id', '') acc_type = data.get('acc_type', 'AliWorkbench') image_url = image_urls[0] print(f"[Agent] 检测到工作流类型:{workflow_type} (置信度:{confidence})") if workflow_type == "find_image": print(f"[Agent] 执行查找图片工作流 | 客户:{customer_id}") from core.workflow import workflow return await workflow.find_image_workflow( customer_id=customer_id, image_url=image_url, acc_id=acc_id, acc_type=acc_type ) elif workflow_type == "process_image": print(f"[Agent] 执行处理图片工作流 | 客户:{customer_id}") from core.workflow import workflow return await workflow.process_image_workflow( customer_id=customer_id, image_url=image_url, acc_id=acc_id, acc_type=acc_type ) elif workflow_type == "transfer_human": print(f"[Agent] 执行转人工派单工作流 | 客户:{customer_id}") from core.workflow import workflow return await workflow.transfer_to_designer_workflow( customer_id=customer_id, image_url=image_url, acc_id=acc_id, acc_type=acc_type, reason="客户主动要求转人工" ) return False async def test_agent(): """测试 Agent""" agent = CustomerServiceAgent(skills_dir="skills") test_msg = CustomerMessage( msg_id="123", acc_id="test_account", msg="这张图可以做吗?", from_id="customer001", from_name="张三", cy_id="customer001", acc_type="AliWorkbench", msg_type=0, cy_name="张三", goods_name="专业找图代找高清图片", goods_order="" ) response = await agent.process_message(test_msg) print(f"回复内容: {response.reply}") if __name__ == "__main__": import asyncio asyncio.run(test_agent())