"""PydanticAI Agent 模块 架构:单 Agent + 多 Tool 模式 - Agent 负责对话逻辑和决策 - Tool 负责具体能力:看图/查客户/转接 - AI 自主决定何时调用哪个工具,时序自然,不需要外部协调 """ import os import glob import asyncio import random import hashlib import re import json import logging from pathlib import Path from typing import Optional, Dict, List, Any, Tuple from datetime import datetime from pydantic import BaseModel, Field, model_validator from pydantic_ai import Agent, RunContext from pydantic_ai.models.openai import OpenAIChatModel from pydantic_ai.providers.openai import OpenAIProvider from dotenv import load_dotenv from utils.metrics_tracker import emit as metrics_emit from utils.observability import emit_activity, build_trace_id from core.quote_state_machine import QuoteStateMachine from services.risk_service import RiskService from core.agent_pre_rules import AgentPreRuleService from core.find_image_flow import handle_find_image_batch_flow from core.order_flow import handle_order_notification from core.ai_reply_flow import execute_ai_turn from core.reply_finalize_flow import finalize_ai_reply load_dotenv() from services.service_tuhui_upload import upload_to_tuhui from core.workflow_router import get_workflow_router from core.workflow_router import get_workflow_router from db.customer_risk_db import risk_db # ========== 企业微信通知 ========== _WECHAT_WEBHOOK = os.getenv("WECHAT_WEBHOOK", "") logger = logging.getLogger("cs_agent") async def _notify_wechat(content: str, tag: str = "通知"): """发送企业微信 markdown 通知,任何异常都发""" if not _WECHAT_WEBHOOK: print(f"[{tag}] 未配置 WECHAT_WEBHOOK,跳过推送") return try: import httpx async with httpx.AsyncClient(timeout=10) as client: resp = await client.post(_WECHAT_WEBHOOK, json={ "msgtype": "markdown", "markdown": {"content": content} }) data = resp.json() if data.get("errcode") == 0: print(f"[{tag}] 企业微信推送成功 ✓") else: print(f"[{tag}] 企业微信推送失败: {data}") except Exception as e: print(f"[{tag}] 企业微信发送异常: {e}") async def _notify_wechat_overdue(): """API 欠费时发企业微信通知""" await _notify_wechat( "⚠️ **火山引擎 API 欠费**,客服AI已停止响应,请立即充值!\n" "地址:https://console.volcengine.com/ark" ) # ========== 转接常量 ========== TRANSFER_MESSAGE = "话术|[转移会话],分组20252916034,无原因" CASE_LIBRARY_LINK = "https://www.yuque.com/zuowei-dfvpq/kge0in/mynala0g35b8cec5" TAOBAO_REPLY_TAILS = ("嗯", "哦", "好的", "嗯咯", "嗯啦") def _is_ack_like_customer_text(text: str) -> bool: """客户是否为确认型短句(好的/嗯/收到/ok 等)。""" s = (text or "").strip().lower() if not s: return False s = s.rstrip("。.!!?~~") ack_set = { "好", "好的", "嗯", "嗯嗯", "收到", "知道了", "明白了", "ok", "okay", "行", "可以", "好嘞", "好的呢", } return s in ack_set def _is_meaningless_short_text(text: str) -> bool: """识别无意义短句:仅需简短承接,不进入复杂流程。""" s = (text or "").strip().lower().rstrip("。.!!?~~") if not s: return False meaningless = { "好", "好的", "嗯", "嗯嗯", "哦", "哦哦", "收到", "知道了", "明白了", "ok", "okay", "行", "可以", "好嘞", "好的呢", "在吗", "有人吗", "在不在", } return s in meaningless # ========== 数据模型 ========== class CustomerMessage(BaseModel): """客户消息模型""" msg_id: str acc_id: str msg: str from_id: str from_name: str cy_id: str acc_type: str msg_type: int cy_name: str goods_name: Optional[str] = None goods_order: Optional[str] = None class ConversationState(BaseModel): """对话状态""" customer_id: str stage: str = "售前" # 售前/售后 last_price: Optional[int] = None # 最后报价 last_min_price: Optional[int] = None # 最近图片的最低价 last_order_id: Optional[str] = None # 订单号 order_status: Optional[str] = None # 订单状态 discount_count: int = 0 # 让价次数 image_count: int = 0 # 图片数量 pending_image_urls: List[str] = Field(default_factory=list) # 待统一报价图片 pending_requirements: List[str] = Field(default_factory=list) # 待统一报价需求 quote_phase: str = "idle" # idle/collecting/ready_to_quote/waiting_result quote_ready_turns: int = 0 # ready_to_quote 阶段还需等待的消息轮次 last_update: str = "" last_reply_at: Optional[datetime] = None # 最后一次回复客户的时间 class AgentDeps(BaseModel): """Agent 依赖项 - 用于传递上下文""" msg_id: str acc_id: str from_id: str platform: str class AgentResponse(BaseModel): """Agent 回复模型""" reply: str should_reply: bool = True need_transfer: bool = False # 是否需要转人工 transfer_msg: str = "" # 转接消息 @model_validator(mode="after") def _ensure_reply_tail(self): # 统一在 process_message 中按客户输入决定是否补口语尾词 return self def _get_shop_type(acc_id: str = "", goods_name: str = "") -> str: """根据 acc_id 或 goods_name 判断店铺类型,返回 gemini_api / find_image / default""" try: from config.config import CONFIG_DIR import json cfg_path = CONFIG_DIR / "shop_prompts.json" if not cfg_path.exists(): return "find_image" with open(cfg_path, "r", encoding="utf-8") as f: cfg = json.load(f) shops = cfg.get("shops", {}) goods_kw = cfg.get("goods_keywords", {}) type_hints = cfg.get("type_hints", {}) # 优先按 acc_id if acc_id and acc_id in shops: return shops[acc_id].get("type", "find_image") # 按商品名关键词 goods_lower = (goods_name or "").lower() for kw, stype in goods_kw.items(): if kw in goods_lower: return stype except Exception: pass return "find_image" def load_skill_map(skills_dir: str = "skills") -> Dict[str, str]: """按技能目录名加载 SKILL.md,返回 {skill_name: content}。""" skill_map: Dict[str, str] = {} skill_files = glob.glob(os.path.join(skills_dir, "**/SKILL.md"), recursive=True) for skill_file in skill_files: try: content = Path(skill_file).read_text(encoding="utf-8") skill_name = Path(skill_file).parent.name.strip().lower() if not skill_name: continue if skill_name in skill_map: skill_map[skill_name] += "\n\n" + content else: skill_map[skill_name] = content except Exception as e: print(f"警告: 读取 {skill_file} 失败: {e}") return skill_map class CustomerServiceAgent: """客服 Agent - 支持 SKILL.md + 工作流""" C_RESET = "\033[0m" C_PROMPT = "\033[96m" # cyan C_THINK = "\033[95m" # magenta C_TOOL = "\033[93m" # yellow C_REPLY = "\033[92m" # green C_MUTED = "\033[90m" # gray _DEFAULT_EVOLUTION_CANDIDATE = Path("config") / "evolution_candidate.json" @staticmethod def _activity_log(event: str, **kwargs): emit_activity( logger, event=event, trace_id=str(kwargs.pop("trace_id", "")), customer_id=str(kwargs.pop("customer_id", "")), result=str(kwargs.pop("result", "ok")), **kwargs, ) def __init__(self, skills_dir: str = "skills"): self.api_key = os.getenv("OPENAI_API_KEY") self.base_url = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1") self.model_name = os.getenv("OPENAI_MODEL", "gpt-4o-mini") self.reply_persona = os.getenv("AI_REPLY_PERSONA", "淘宝老店主,直爽利落,口语自然") self.dynamic_collection_replies = os.getenv("AI_DYNAMIC_COLLECTION_REPLIES", "true").strip().lower() in {"1", "true", "yes", "on"} self.rewrite_all_replies = os.getenv("AI_REWRITE_ALL_REPLIES", "true").strip().lower() in {"1", "true", "yes", "on"} try: self.batch_quote_delay_turns = max(0, int(os.getenv("BATCH_QUOTE_DELAY_TURNS", "1"))) except Exception: self.batch_quote_delay_turns = 1 self.quote_state_machine = QuoteStateMachine(delay_turns=self.batch_quote_delay_turns) self.risk_service = RiskService() self.pre_rule_service = AgentPreRuleService(self, self.risk_service) if not self.api_key: raise ValueError("请设置 OPENAI_API_KEY 环境变量") # 对话状态管理 self.conversations: Dict[str, ConversationState] = {} # 多轮对话历史(PydanticAI ModelMessage 列表,按客户ID存储) self.message_histories: Dict[str, list] = {} self.evolution_candidate = self._load_evolution_candidate() # 加载技能并按角色拆分,避免所有 Agent 吃同一份大杂烩提示词 self.skill_map = load_skill_map(skills_dir) self.skill_style = self._compose_skill_content(["style-skill", "owner-style"]) self.skill_pre_sales = self._compose_skill_content(["pre-sales-skill"]) self.skill_pricing = self._compose_skill_content(["pricing-skill"]) self.skill_after_sale = self._compose_skill_content(["after-sales-skill"]) self.skill_risk = self._compose_skill_content(["risk-skill"]) # 创建 OpenAI 模型 model = OpenAIChatModel( model_name=self.model_name, provider=OpenAIProvider( api_key=self.api_key, base_url=self.base_url ) ) self.agent = Agent( model=model, deps_type=AgentDeps, system_prompt=self._get_system_prompt() ) self.agent_after_sale = Agent( model=model, deps_type=AgentDeps, system_prompt=self._get_after_sale_prompt() ) self.agent_pricing = Agent( model=model, deps_type=AgentDeps, system_prompt=self._get_pricing_prompt() ) self.agent_processing = Agent( model=model, deps_type=AgentDeps, system_prompt=self._get_processing_prompt() ) self.agent_similar = Agent( model=model, deps_type=AgentDeps, system_prompt=self._get_similar_prompt() ) self.agent_natural_reply = Agent( model=model, deps_type=AgentDeps, system_prompt=self._get_natural_reply_prompt() ) # 工作流程路由器 self.workflow_router = get_workflow_router() self.agent_order = Agent( model=model, deps_type=AgentDeps, system_prompt=self._get_order_prompt() ) self.agent_risk = Agent( model=model, deps_type=AgentDeps, system_prompt=self._get_risk_prompt() ) # 注册工具 self._register_tools() def _compose_skill_content(self, names: List[str]) -> str: """按技能名拼接技能文本,找不到则跳过。""" parts: List[str] = [] for name in names: key = (name or "").strip().lower() if key and key in self.skill_map: parts.append(self.skill_map[key]) return "\n\n".join(parts) @staticmethod def _attach_skill_docs(prompt: str, *skill_docs: str) -> str: docs = [d for d in skill_docs if d] if not docs: return prompt return prompt + "\n\n=== 角色技能 ===\n" + "\n\n".join(docs) def _load_evolution_candidate(self) -> Dict[str, Any]: """读取自我进化候选配置(灰度策略),读取失败时返回空。""" try: path = Path(os.getenv("EVOLUTION_CANDIDATE_PATH", str(self._DEFAULT_EVOLUTION_CANDIDATE))) if not path.exists(): return {} data = json.loads(path.read_text(encoding="utf-8")) if not isinstance(data, dict): return {} return data except Exception: return {} def _evolution_gray_percent(self) -> int: """灰度比例,默认 5%。""" try: env_pct = os.getenv("EVOLUTION_GRAY_PERCENT", "").strip() if env_pct: pct = int(float(env_pct)) else: pct = int(((self.evolution_candidate or {}).get("gray_percent", 5))) return max(0, min(100, pct)) except Exception: return 5 def _evolution_enabled_for_customer(self, customer_id: str) -> bool: """按客户哈希稳定灰度命中,命中后启用候选策略。""" cand = self.evolution_candidate or {} if str(cand.get("status", "")).strip() != "ready_for_gray_5_percent": return False if not customer_id: return False pct = self._evolution_gray_percent() if pct <= 0: return False digest = hashlib.md5(customer_id.encode("utf-8")).hexdigest() bucket = int(digest[:8], 16) % 100 hit = bucket < pct if hit: metrics_emit("evolution_gray_hit", customer_id=customer_id, percent=pct, version=str(cand.get("version", ""))) return hit def _evolution_has_proposal(self, proposal_id: str) -> bool: cand = self.evolution_candidate or {} for p in cand.get("proposals", []) or []: if str((p or {}).get("id", "")).strip() == proposal_id: return True return False @staticmethod def _is_service_risk_inquiry(text: str) -> bool: """识别退款/投诉等服务风险场景。""" s = (text or "").strip().lower() if not s: return False kw = ("退款", "退货", "投诉", "差评", "举报", "欺骗", "骗人", "起诉", "法院", "生气", "不满意") return any(k in s for k in kw) @staticmethod def _log_block(title: str, content: str): """统一的控制台分层日志输出。""" print(f"{CustomerServiceAgent.C_PROMPT}[{title}]{CustomerServiceAgent.C_RESET}") print(content) print(f"{CustomerServiceAgent.C_MUTED}────────────────────{CustomerServiceAgent.C_RESET}") @staticmethod def _normalize_reply_text(text: Optional[str]) -> str: """清洗模型输出,避免把占位词直接发给客户。""" if text is None: return "" cleaned = str(text).strip() if cleaned.lower() in {"无", "none", "null", "n/a"}: return "" return cleaned @staticmethod def _colloquialize_reply(text: str) -> str: """把常见机械表达柔化为更口语的客服话术。""" t = (text or "").strip() if not t: return t repl = { "确认我就安排": "你点头我就开做", "可以的话我马上安排": "可以我就马上给你做", "我这边马上安排": "我马上安排", "立刻统一报价": "马上给你报价", "统一报价": "一起给你报价", "您": "你", "请您": "你", "可选:A": "可选:", "流程完成": "已经安排好了", } for k, v in repl.items(): t = t.replace(k, v) return t async def _render_collection_reply_with_ai( self, *, message: CustomerMessage, state: ConversationState, scene: str, intent_hint: str, fallback: str, ) -> str: """ 收图阶段回复默认走 AI 改写,失败时回退到固定模板。 """ # 首张收图先承接“我看一下”,避免机械地立刻催“发完统一报价”。 if scene == "collect_ack" and len(state.pending_image_urls) == 1: first_ack = [ "收到了,我先看一下哈,稍等哈", "这张我收到了,我先看下,等我一下哈", "收到这张了,我先过一眼,稍等哈", "我先看这张哈,稍等我一下", "图我收到了,我先看一眼,马上回你哈", "这张先记上了,我先看下细节,稍等哈", "收到哈,我先过一遍这张,等我会儿", "我先看这张效果,稍等一下哈", "图到了,我先看下清晰度,稍等哈", "这张我先看着,稍等我一下就回你", "收到这张了,我先核一下细节,稍等哈", "我先把这张看完,稍等我一会儿哈", ] return random.choice(first_ack) if not self.dynamic_collection_replies: return fallback try: deps = AgentDeps( msg_id=message.msg_id, acc_id=message.acc_id, from_id=message.from_id, platform=message.acc_type, ) history = self.message_histories.get(message.from_id, []) pending_req = ";".join((state.pending_requirements or [])[-4:]) or "无" user_prompt = ( "请按下面意图生成给客户的自然回复。\n" f"场景: {scene}\n" f"回复意图: {intent_hint}\n" f"客户原话: {message.msg}\n" f"当前已收图片数: {len(state.pending_image_urls)}\n" f"当前需求摘要: {pending_req}\n" "输出要求: 不超过2句话,像真人店主聊天。" ) result = await self.agent_natural_reply.run(user_prompt, deps=deps, message_history=history) self.message_histories[message.from_id] = result.all_messages()[-30:] text = self._colloquialize_reply(self._normalize_reply_text(result.output)) if not text: return fallback transfer_keywords = ("TRANSFER_REQUESTED", "[转移会话]", "转移会话") if any(k in text for k in transfer_keywords): return fallback return text except Exception: return fallback async def _rewrite_reply_with_ai( self, *, message: CustomerMessage, state: ConversationState, reply: str, scene: str = "final_reply", ) -> str: """ 对最终回复做 AI 润色,统一口吻。失败时返回原文。 """ text = (reply or "").strip() if not text or not self.rewrite_all_replies: return text transfer_keywords = ("TRANSFER_REQUESTED", "[转移会话]", "转移会话") if any(k in text for k in transfer_keywords): return text try: deps = AgentDeps( msg_id=message.msg_id, acc_id=message.acc_id, from_id=message.from_id, platform=message.acc_type, ) history = self.message_histories.get(message.from_id, []) pending_req = ";".join((state.pending_requirements or [])[-4:]) or "无" prompt = ( "请把下面这句客服回复润色成更自然的微信聊天口吻,语义必须保持一致。\n" f"场景: {scene}\n" f"客户原话: {message.msg}\n" f"当前已收图: {len(state.pending_image_urls)}张\n" f"当前需求摘要: {pending_req}\n" f"原回复: {text}\n" "要求: 不要新增承诺/价格/流程;不超过2句话;只输出润色后的最终回复。" ) result = await self.agent_natural_reply.run(prompt, deps=deps, message_history=history) self.message_histories[message.from_id] = result.all_messages()[-30:] polished = self._colloquialize_reply(self._normalize_reply_text(result.output)) if not polished: return text if any(k in polished for k in transfer_keywords): return text return polished except Exception: return text def _register_tools(self): """注册所有 Tool,让 Agent 可以主动调用""" @self.agent.tool async def analyze_image(ctx: RunContext[AgentDeps], image_url: str) -> str: """ 分析客户发来的图片复杂度,用于报价。 收到图片URL时调用此工具,返回复杂度和建议报价。 """ try: from image.image_analyzer import image_analyzer result = await image_analyzer.analyze(image_url) complexity_label = { "simple": "简单(画面干净)", "normal": "一般复杂度", "complex": "细节偏多", "hard": "非常复杂", }.get(result["complexity"], result["complexity"]) # 持久化图片URL和复杂度,重启后仍能记住这张图 try: from db.customer_db import db db.update_last_image( ctx.deps.from_id, image_url, complexity=result["complexity"], gemini_prompt=result.get("gemini_prompt", ""), aspect_ratio=result.get("aspect_ratio", "1:1"), perspective=result.get("perspective", "no"), ) except Exception: pass # 存图片类型到客户画像 try: from db.customer_db import db as _db if result.get("subject"): _db.add_image_type(ctx.deps.from_id, result["subject"]) except Exception: pass # 在 workflow 里创建待处理任务(付款后自动触发 Gemini) try: from core.workflow import workflow await workflow.image_analysis_result( customer_id=ctx.deps.from_id, image_url=image_url, complexity=result["complexity"], acc_id=ctx.deps.acc_id, acc_type=ctx.deps.platform, gemini_prompt=result.get("gemini_prompt", ""), aspect_ratio=result.get("aspect_ratio", "1:1"), perspective=result.get("perspective", "no"), proc_type=result.get("proc_type", ""), subject=result.get("subject", ""), quality=result.get("quality", ""), ) print(f"[Agent] Workflow 任务已创建 | 客户: {ctx.deps.from_id} | 比例: {result.get('aspect_ratio')} | 透视: {result.get('perspective')} | 图片: {image_url[:60]}...") except Exception as e: print(f"[Agent] Workflow 任务创建失败: {e}") # 组装给 AI 的分析报告 risk = result.get("risk", "none") has_face = result.get("has_face", "no") feasibility = result.get("feasibility", "yes") note = result.get("note", "") lines = [ f"图片主体:{result['subject'] or '未识别'}", f"处理类型:{result['proc_type'] or '高清修复'}", f"原图质量:{result['quality'] or '未知'}", f"图片类型:{result.get('category', '') or '通用'}", f"图片尺寸:{(result.get('width') or 0)}x{(result.get('height') or 0)}({result.get('megapixels', 0.0)}MP)", f"含人脸:{'是' if has_face == 'yes' else '否'}", f"复杂度:{complexity_label}", f"原因:{result['reason']}", ] if result.get("size_surcharge"): lines.append(f"尺寸加价:+{result['size_surcharge']}元") if result.get("size_note"): lines.append(f"尺寸提示:{result['size_note']}") try: st = self._get_conversation_state(ctx.deps.from_id) if isinstance(result.get("price_min"), (int, float)): st.last_min_price = int(result.get("price_min") or 0) try: from db.customer_db import db as _db _db.update_last_min_price(ctx.deps.from_id, st.last_min_price) except Exception: pass except Exception: pass # 根据可做性和风险等级给 AI 不同的行动指引 if feasibility == "no": if "敏感" in (note or ""): lines.append("【拒绝】图片含敏感/黄色/擦边内容,不接单。") lines.append("→ 直接拒绝,不说「发图来看看」,自然回复如:这类不做/不接。") else: lines.append("【无法处理】此图无法处理(纯黑/纯白/完全损坏/要找原始RAW文件)。") lines.append("→ 告知客户无法处理,建议换图或说明原因,不要报价。") elif risk == "high": lines.append(f"【高风险】此图处理风险高:{note or 'AI修复后效果不能保证与原图一致'}") lines.append(f"建议报价:{result['price_suggest']}元") lines.append("→ 先自然说明风险(人脸/效果可能不完美),再报价,满意再拍。话术自然。") elif risk == "low": lines.append(f"【低风险-含人脸】修复后人脸相似度约70-90%,效果不稳定。") lines.append(f"建议报价:{result['price_suggest']}元") lines.append(f"→ 报价时自然加一句风险提示(人脸可能有轻微变化、满意再付等)") else: # 无风险,正常报价 base_price = result.get('price_suggest', 20) text_surcharge = result.get('text_surcharge', 0) layer_surcharge = result.get('layer_surcharge', 0) total_price = base_price + text_surcharge + layer_surcharge # 构建报价说明 price_explanation = f"建议报价:{total_price}元" if text_surcharge > 0: price_explanation += f"(含文字处理 +{text_surcharge}元)" if layer_surcharge > 0: price_explanation += f"(含分层 +{layer_surcharge}元)" lines.append(price_explanation) # 添加文字数量说明 text_amount = result.get('text_amount', 'none') if text_amount != 'none': lines.append(f"文字数量:{text_amount},需要精细处理") if feasibility == "partial": lines.append(f"⚠️ 此图有一定难度:{note or '效果可能不完美'},回复时可加「效果不满意退款」") if note and note not in ("无", ""): lines.append(f"提示:{note}") lines.append(f"【立刻回复客户报价 {total_price} 元,话术自然多变】") return "\n".join(lines) except Exception as e: return f"图片分析失败: {e},请根据经验判断报价" @self.agent.tool async def get_customer_info(ctx: RunContext[AgentDeps], customer_id: str) -> str: """ 查询客户历史信息:消费记录、性格标签、报价历史等。 对话开始时或需要了解客户背景时调用。 """ try: from db.customer_db import db return db.get_profile_text(customer_id) except Exception as e: return f"查询失败: {e}" @self.agent.tool async def transfer_to_human(ctx: RunContext[AgentDeps]) -> str: """ 转接人工客服。 遇到退款/投诉/情绪激动/复杂售后时调用。 """ return "TRANSFER_REQUESTED" @self.agent.tool async def get_customer_risk_profile(ctx: RunContext[AgentDeps], customer_id: str = "") -> str: """查询客户风控画像:退款/不付款/差评/人工黑名单等。""" cid = customer_id or ctx.deps.from_id try: info = risk_db.evaluate_customer(cid) return ( f"客户:{cid}\n" f"不接单:{'是' if info.get('do_not_serve') else '否'}\n" f"风险等级:{info.get('computed_level','low')} 分数:{info.get('computed_score',0)}\n" f"近30天退款:{info.get('refund_30d',0)}\n" f"近7天未付款下单:{info.get('unpaid_7d',0)}\n" f"近90天差评:{info.get('bad_review_90d',0)}\n" f"备注:{info.get('note','') or '无'}" ) except Exception as e: return f"查询风控画像失败: {e}" @self.agent.tool async def mark_customer_risk( ctx: RunContext[AgentDeps], customer_id: str, do_not_serve: bool = False, risk_level: str = "low", risk_score: int = 0, note: str = "", tag: str = "", ) -> str: """人工标记客户风控画像(不接单/高风险/备注标签)。""" try: tags = [tag] if tag else [] risk_db.set_profile( customer_id=customer_id, do_not_serve=do_not_serve, risk_level=risk_level, risk_score=risk_score, note=note, tags=tags, ) return "风控画像已更新" except Exception as e: return f"更新风控画像失败: {e}" @self.agent.tool async def record_customer_risk_event( ctx: RunContext[AgentDeps], customer_id: str, event_type: str, event_count: int = 1, note: str = "", ) -> str: """记录风控事件:refund/unpaid_order/bad_review/blacklist_hit 等。""" try: risk_db.record_event( customer_id=customer_id, event_type=event_type, event_count=event_count, note=note, ) return "风控事件已记录" except Exception as e: return f"记录风控事件失败: {e}" @self.agent.tool async def save_customer_note( ctx: RunContext[AgentDeps], customer_id: str, note: str ) -> str: """ 记录客户关键信息到画像(邮箱/微信/特殊需求等)。 客户提供联系方式或重要信息时调用。 """ try: from db.customer_db import db db.add_note(customer_id, note) return "已记录" except Exception as e: return f"记录失败: {e}" @self.agent.tool async def update_contact_info( ctx: RunContext[AgentDeps], customer_id: str, contact_type: str, value: str ) -> str: """ 更新客户联系方式。 当客户说出邮箱/手机/微信时调用,比正则提取更准确。 contact_type 枚举值: email - 邮箱 phone - 手机号 wechat - 微信号 """ try: from db.customer_db import db if contact_type == "email": db.update_email(customer_id, value) elif contact_type == "phone": db.update_phone(customer_id, value) elif contact_type == "wechat": db.update_wechat(customer_id, value) else: return f"未知联系方式类型: {contact_type}" return f"已保存 {contact_type}: {value}" except Exception as e: return f"保存失败: {e}" @self.agent.tool async def record_quote( ctx: RunContext[AgentDeps], customer_id: str, price: int, description: str = "" ) -> str: """ 记录本次报价到客户画像,用于后续对话保持价格一致。 每次给客户报价后调用。 Args: customer_id: 客户ID price: 报价金额(元) description: 报价描述,如"单图处理"/"三图打包" """ try: from db.customer_db import db db.update_last_price(customer_id, price) if description: db.add_note(customer_id, f"报价 {price}元({description})") # 同步到内存状态 state = self.conversations.get(customer_id) if state: state.last_price = price return f"已记录报价 {price}元" except Exception as e: return f"记录失败: {e}" @self.agent.tool async def process_image_gemini(ctx: RunContext[AgentDeps], customer_id: str = "") -> str: """ 触发 Gemini 作图处理。客户付款后或说「安排一下」「处理一下」时调用。 会从客户档案读取上次发图的 URL 和处理参数(提示词、比例、透视),启动 Gemini 流程。 处理完成后会自动发图给客户。 """ try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不自动作图" except Exception: return "现在处理模块暂时暂停,先不自动作图" cid = customer_id or ctx.deps.from_id try: from core.workflow import workflow ok = await workflow.trigger_processing_on_payment( customer_id=cid, acc_id=ctx.deps.acc_id, acc_type=ctx.deps.platform, ) if ok: return "已安排,稍后发你" return "该客户暂无待处理图片,请先发图" except Exception as e: return f"触发作图失败: {e},请稍后重试或转人工" @self.agent_pricing.tool async def analyze_image_pricing(ctx: RunContext[AgentDeps], image_url: str) -> str: try: from image.image_analyzer import image_analyzer result = await image_analyzer.analyze(image_url) if result.get("feasibility") == "no" or result.get("risk") == "high": return "该图风险高或不可做:不报价,建议换图或转人工评估。" if not result.get("success", False): return "图片识别异常:先不报价,建议客户重发更清晰图片。" p = result.get("price_suggest", 20) try: st = self._get_conversation_state(ctx.deps.from_id) if isinstance(result.get("price_min"), (int, float)): st.last_min_price = int(result.get("price_min") or 0) try: from db.customer_db import db as _db _db.update_last_min_price(ctx.deps.from_id, st.last_min_price) except Exception: pass except Exception: pass return f"建议报价:{p}元" except Exception as e: return f"图片分析失败: {e}" @self.agent_pricing.tool async def record_quote_pricing( ctx: RunContext[AgentDeps], customer_id: str, price: int, description: str = "" ) -> str: try: from db.customer_db import db db.update_last_price(customer_id, price) return "ok" except Exception as e: return f"记录失败: {e}" @self.agent_processing.tool async def process_image_gemini_run(ctx: RunContext[AgentDeps], customer_id: str = "") -> str: """触发 Gemini 作图处理(processing agent 专用入口)。""" return await process_image_gemini(ctx, customer_id) @self.agent_similar.tool async def recommend_similar(ctx: RunContext[AgentDeps], hint: str = "") -> str: try: return "有类似款,拍下我发你参考图。" except Exception as e: return f"推荐失败: {e}" @self.agent_order.tool async def handle_order(ctx: RunContext[AgentDeps], raw_msg: str = "") -> str: try: info = self._parse_order_info(raw_msg or "") paid_kw = ["等待发货", "已付款", "付款成功", "买家已付款"] if any(k in (info.get("pay_status", "") or "") for k in paid_kw) or any(k in (info.get("order_status", "") or "") for k in paid_kw): return "已安排,稍后发你" return "" except Exception: return "" @self.agent_risk.tool async def risk_filter(ctx: RunContext[AgentDeps], text: str = "") -> str: return "这类不做哈,政治/敏感内容都不接。" @self.agent_risk.tool async def get_customer_risk_profile_risk(ctx: RunContext[AgentDeps], customer_id: str = "") -> str: return await get_customer_risk_profile(ctx, customer_id) @self.agent_risk.tool async def mark_customer_risk_risk( ctx: RunContext[AgentDeps], customer_id: str, do_not_serve: bool = False, risk_level: str = "low", risk_score: int = 0, note: str = "", tag: str = "", ) -> str: return await mark_customer_risk( ctx=ctx, customer_id=customer_id, do_not_serve=do_not_serve, risk_level=risk_level, risk_score=risk_score, note=note, tag=tag, ) @self.agent_risk.tool async def record_customer_risk_event_risk( ctx: RunContext[AgentDeps], customer_id: str, event_type: str, event_count: int = 1, note: str = "", ) -> str: return await record_customer_risk_event( ctx=ctx, customer_id=customer_id, event_type=event_type, event_count=event_count, note=note, ) @self.agent.tool async def remove_background(ctx: RunContext[AgentDeps], image_url: str) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """【独立工具】去背景,输出白底图。客户只要去背景时调用。""" try: from image.image_tools import remove_background as _rb r = await _rb(image_url) if r["success"]: return f"去背景完成,已保存。自然回复客户好了发你" return f"去背景失败:{r['message']}" except Exception as e: return f"去背景失败:{e}" @self.agent.tool async def perspective_correct(ctx: RunContext[AgentDeps], image_url: str) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """【独立工具】透视矫正。输入需白底图,输出展平图。""" try: from image.image_tools import perspective_correct as _pc r = await _pc(image_url) if r["success"]: return f"透视矫正完成。自然回复客户好了" return f"透视矫正失败:{r['message']}" except Exception as e: return f"透视矫正失败:{e}" @self.agent.tool async def extract_pattern_tool( ctx: RunContext[AgentDeps], image_url: str, prompt: str = "", aspect_ratio: str = "1:1" ) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """【独立工具】印花提取/主处理。按提示词和比例处理。""" try: from image.image_tools import extract_pattern r = await extract_pattern(image_url, prompt=prompt, aspect_ratio=aspect_ratio) if r["success"]: return f"提取完成。自然回复客户好了发你" return f"提取失败:{r['message']}" except Exception as e: return f"提取失败:{e}" @self.agent.tool async def enhance_image_tool(ctx: RunContext[AgentDeps], image_url: str) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """【独立工具】高清增强。客户只要清晰化时调用。""" try: from image.image_tools import enhance_image r = await enhance_image(image_url) if r["success"]: return f"高清增强完成。自然回复客户好了" return f"增强失败:{r['message']}" except Exception as e: return f"增强失败:{e}" @self.agent.tool async def color_match_tool( ctx: RunContext[AgentDeps], orig_url: str, result_url: str, strength: float = 0.75 ) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """【独立工具】颜色匹配。将 result 色调匹配到 orig。""" try: from image.image_tools import color_match_images r = await color_match_images(orig_url, result_url, strength=strength) if r["success"]: return f"颜色匹配完成" return f"颜色匹配失败:{r['message']}" except Exception as e: return f"颜色匹配失败:{e}" @self.agent.tool async def trim_border_tool(ctx: RunContext[AgentDeps], image_url: str) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """【独立工具】裁切四周背景边(白/黄/米等)。""" try: from image.image_tools import trim_border r = await trim_border(image_url) if r["success"]: return f"裁边完成" return f"裁边失败:{r['message']}" except Exception as e: return f"裁边失败:{e}" @self.agent.tool async def vectorize_to_eps_tool(ctx: RunContext[AgentDeps], image_url: str) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """【独立工具】矢量化 - 将图片转为 EPS 矢量文件。客户要做矢量图、转 EPS、转 AI 格式时调用。""" try: from image.image_tools import vectorize_to_eps r = await vectorize_to_eps(image_url) if r["success"]: return f"矢量化完成,已生成 EPS 文件。自然回复客户好了发你" return f"矢量化失败:{r['message']}" except Exception as e: return f"矢量化失败:{e}" @self.agent.tool async def meitu_enhance_tool( ctx: RunContext[AgentDeps], image_url: str, mode: str = "standard" ) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """ 【独立工具】美图画质增强。客户要画质增强、清晰化、美图处理时调用。 Args: image_url: 图片 URL 或本地路径 mode: 处理模式。crystal(极速重绘) standard(标准) enhance(增强) hdr(HDR) portrait(人像优化) """ try: from image.image_tools import meitu_enhance r = await meitu_enhance(image_url, mode=mode) if r["success"]: return f"画质增强完成。自然回复客户好了发你" return f"画质增强失败:{r['message']}" except Exception as e: return f"画质增强失败:{e}" @self.agent.tool async def resize_image( ctx: RunContext[AgentDeps], image_url: str, width: int, height: int = 0 ) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """ 改图片尺寸。客户说「改成1920x1080」「弄成横图」「改下尺寸」时调用。 Args: image_url: 图片URL(客户刚发的图,或从对话中获取) width: 目标宽度(像素),如 1920 height: 目标高度(0=按宽度等比缩放),如 1080 常用尺寸:1920x1080(横屏) 1080x1920(竖屏) 2000x2000(方图) """ try: from image.image_processor import image_processor result = await image_processor.resize(image_url, width, height) if result["success"]: return f"改尺寸完成:{width}x{height},已保存。自然回复客户改好了" else: return f"改尺寸失败:{result['message']},告知客户稍后重试" except Exception as e: return f"改尺寸失败:{e}" @self.agent.tool async def calculate_bulk_price( ctx: RunContext[AgentDeps], image_count: int, complexities: str = "" ) -> str: """ 计算多图打包价格。 客户要做多张图时调用,返回建议总价。 Args: image_count: 图片数量 complexities: 各图复杂度,逗号分隔,如 "normal,complex,simple" 没有识别结果时留空,按平均价格估算 """ if image_count <= 0: return "图片数量无效" # 各复杂度单价(必须为5的整数倍) unit_price = {"simple": 15, "normal": 20, "complex": 25, "hard": 30} default_unit = 20 # 没有识别结果时的默认单价 if complexities: levels = [c.strip() for c in complexities.split(",")] total = sum(unit_price.get(lv, default_unit) for lv in levels) else: total = image_count * default_unit # 打包优惠:3张以上9折,5张以上8折,价格必须为5的整数倍 if image_count >= 5: discounted = round(total * 0.8 / 5) * 5 tip = f"({image_count}张8折优惠)" elif image_count >= 3: discounted = round(total * 0.9 / 5) * 5 tip = f"({image_count}张9折优惠)" else: discounted = round(total / 5) * 5 tip = "" return f"建议打包报价:{discounted}元{tip}(原价{total}元)" # 对话状态超过多少小时后重置(避免昨天的售后状态影响今天) CONVERSATION_TIMEOUT_HOURS = 12 def _get_conversation_state(self, customer_id: str) -> ConversationState: """获取或创建对话状态,超时自动重置""" now = datetime.now() if customer_id in self.conversations: state = self.conversations[customer_id] # 超过 12 小时没有消息,重置阶段和压价次数 if state.last_update: try: last = datetime.fromisoformat(state.last_update) hours = (now - last).total_seconds() / 3600 if hours > self.CONVERSATION_TIMEOUT_HOURS: state.stage = "售前" state.discount_count = 0 # 同时清理对话历史,避免发送过期上下文 self.message_histories.pop(customer_id, None) except Exception: pass # 进程内状态为空时,尝试从持久化恢复 if not state.pending_image_urls and not state.pending_requirements: self._restore_pending_quote_state(customer_id, state) else: self.conversations[customer_id] = ConversationState( customer_id=customer_id, last_update=now.isoformat() ) self._restore_pending_quote_state(customer_id, self.conversations[customer_id]) # 定期清理长期不活跃客户(超过 7 天) self._cleanup_inactive(now) return self.conversations[customer_id] def _cleanup_inactive(self, now: datetime): """清理超过 7 天没有消息的对话状态,释放内存""" # 每 100 次调用清理一次,避免每次都遍历 if len(self.conversations) % 100 != 0: return expired = [ cid for cid, state in self.conversations.items() if state.last_update and (now - datetime.fromisoformat(state.last_update)).days > 7 ] for cid in expired: self.conversations.pop(cid, None) self.message_histories.pop(cid, None) def _sync_pending_quote_state(self, customer_id: str, state: ConversationState): """把待报价队列同步到客户库,避免重启丢失。""" try: self._refresh_quote_phase(state) from db.customer_db import db db.update_pending_quote_state( customer_id, state.pending_image_urls, state.pending_requirements, ) except Exception: pass def _restore_pending_quote_state(self, customer_id: str, state: ConversationState): """从客户库恢复待报价队列。""" try: from db.customer_db import db profile = db.get_customer(customer_id) state.pending_image_urls = list(getattr(profile, "pending_quote_images", []) or []) state.pending_requirements = list(getattr(profile, "pending_quote_requirements", []) or []) state.image_count = len(state.pending_image_urls) self._refresh_quote_phase(state) except Exception: pass @staticmethod def _refresh_quote_phase(state: ConversationState, phase_hint: str = ""): """统一维护收图报价状态机。""" QuoteStateMachine().refresh(state, phase_hint=phase_hint) def _should_defer_batch_quote(self, state: ConversationState, mark_ready: bool = False) -> bool: """ 批量报价延后控制: - 首次进入 ready_to_quote 时按配置等待 N 轮 - 等待轮次归零后,本轮即可报价 """ self.quote_state_machine.delay_turns = max(0, int(self.batch_quote_delay_turns)) return self.quote_state_machine.should_defer_batch_quote(state, mark_ready=mark_ready) def _mark_quote_ready(self, state: ConversationState): """仅标记 ready 状态,不消费等待轮次。""" self.quote_state_machine.delay_turns = max(0, int(self.batch_quote_delay_turns)) self.quote_state_machine.mark_ready(state) def _build_reject_message(self, reason: str = "") -> str: templates = [ "这类图文字内容太密了,我们这边不接这单哈,建议精简后再发我看看。", "这种密集文字/宣传栏类图片暂时做不了,抱歉啦,换一版简化内容我可以继续帮你看。", "这张文字信息太多,处理风险高,我们先不接,您可以先筛重点文字再发我。", ] msg = random.choice(templates) if reason: msg += f"({reason})" return msg def _is_batch_quote_enabled(self, customer_id: str, acc_id: str) -> bool: """灰度开关:按店铺白名单 + 客户哈希百分比控制新策略是否生效。""" try: from config.config import ( FEATURE_BATCH_QUOTE_ENABLED, FEATURE_BATCH_QUOTE_PERCENT, FEATURE_BATCH_QUOTE_SHOPS, ) if not FEATURE_BATCH_QUOTE_ENABLED: return False pct = max(0, min(100, int(FEATURE_BATCH_QUOTE_PERCENT))) if pct == 0: return False shops = [s.strip() for s in (FEATURE_BATCH_QUOTE_SHOPS or "").split(",") if s.strip()] if shops and (acc_id or "") not in shops: return False if pct >= 100: return True h = int(hashlib.md5((customer_id or "").encode("utf-8")).hexdigest()[:8], 16) % 100 return h < pct except Exception: return True def _detect_stage(self, message: str) -> str: """检测售前/售后""" # 系统订单通知不属于售后,单独处理 if "系统订单信息" in message: return "订单通知" after_sale_keywords = ["已下单", "已付款", "催一下", "发文件", "要修改", "不满意", "退款", "退货"] for keyword in after_sale_keywords: if keyword in message: return "售后" return "售前" def _get_system_prompt(self) -> str: base_prompt = """你是淘宝找原图店的客服,对外说帮客户找原图/高清图,实际是帮客户把模糊图处理成高清版,擅长快速成交。 核心原则:快、准、狠。**回复要像真人聊天,自然多变,禁止套模板、背台词。** 【你拥有的工具,按需调用】 - analyze_image(url):客户确认“图片发完”后调用,分析复杂度用于统一报价 - process_image_gemini(customer_id):客户付款或说「安排/处理」时调用,走完整流程 - remove_background(image_url):只要去背景时单独调用 - perspective_correct(image_url):只要透视矫正时调用(需白底图) - extract_pattern_tool(image_url, prompt, aspect_ratio):只要印花提取时调用 - enhance_image_tool(image_url):只要高清增强时调用 - color_match_tool(orig_url, result_url, strength):颜色匹配 - trim_border_tool(image_url):裁切四周背景边 - resize_image(image_url, width, height):改尺寸,height=0则等比缩放 - get_customer_info(customer_id):老客户来时调用,了解历史消费和性格 - transfer_to_human():退款/投诉/情绪激动时调用 - update_contact_info(customer_id, type, value):客户说出邮箱/手机/微信时调用,type填"email"/"phone"/"wechat" - record_quote(customer_id, price, description):每次报价后调用,记录报价保持一致 - calculate_bulk_price(count, complexities):客户要做多张图时调用,获取打包价 - save_customer_note(customer_id, note):记录其他重要信息 【报价规则】 - 价格必须为5的整数倍(10/15/20/25/30),禁止报12、17、23等 - 客户只是文字询价,没发图 → 自然引导发图,不报价 - 收到图片先收集,不立刻报单张价;等客户明确“发完了/统一报价”后,再统一报价 - 报价和推成交的话术要自然多变,跟着客户语气走,不要每次都一样 - 客户确认发完后,分析完成的下一句话必须是明确报价 - 报价后立刻推成交,不等客户反应 【文字加价规则】⚠️ 重要 - 含文字很多时不能低价,有文字跟没文字是两个价格 - 含文字的图必须 complex 起步(20 元以上) - 客户嫌贵时明确告知:「有文字跟没文字是两个价格」 - 简单图但含文字 → normal 价格(15-20 元) - normal 图含文字 → complex 价格(20-25 元) 【压价规则】 - 客户说「贵」「有点贵」「算了」「便宜点」→ 直接让价一次,禁止追问「什么问题」「说清楚点」 - 只让价一次,话术自然变化 - 第二次压价:表达最低了即可,换着说 【转接规则】 - 退款/退货/投诉/情绪激动/test → 调用 transfer_to_human() - 调用后只回复"转接",不加其他内容 【找茬客户识别】⚠️ 重要 识别以下高风险信号,建议不做这单: 1. 下单后立即申请退款 2. 从高价砍到低价(30→10 元) 3. 反复问"不满意可以退吗"(2 次以上) 4. 质疑服务内容("源文件还是什么") 5. 质疑价值("就一张图片") 6. 问"小一点就快一点的嘛"(想占便宜) 7. 重复问同一个问题(想找麻烦) 识别到以上 3 个以上信号 → 建议转人工或直接拒绝接单 话术:「不好意思,这单做不了」「去别家做吧」 【售后规则】 - 催进度:自然回复在做了/快了/马上好之类 - 要修改:自然问哪里要改 【禁忌】 - 没看到图不报价 - 不说"不行/不可以" - 不解释技术细节 - 不给价格区间 - 回复不超过2句话 - 绝对禁止输出任何内部独白或状态说明,包括但不限于:"无需回复""已完成""已经完成""不需要回复""流程结束""操作完成""任务完成""记录完成""报价已记录"等 - 每次必须输出真实的、发给客户看的回复文字,哪怕只有一句话""" base_prompt += f"\n\n【人设语气】\n- 人设:{self.reply_persona}\n- 语气像真人店主,不官腔,不机械,不背模板。" return self._attach_skill_docs(base_prompt, self.skill_pre_sales, self.skill_style) def _get_natural_reply_prompt(self) -> str: base = f"""你是淘宝店主客服,专门把系统给你的“回复意图”改写成自然的一句话或两句话。 人设:{self.reply_persona} 规则: - 只输出发给客户的话,不要解释你的思考。 - 口语化、简短、有温度,避免“这个需求我收到了”这类机械表达。 - 不要编造价格、订单、进度;只按输入意图表达。 - 默认不超过2句话。""" return self._attach_skill_docs(base, self.skill_style) def _get_after_sale_prompt(self) -> str: base = """你是淘宝客服的售后助手,负责售后阶段的自然沟通与处理进度反馈。 核心:简洁、自然、不解释技术细节、尽量不调用报价相关工具。 规则: - 已付款客户优先:确认安排、说明进度、承诺时间点 - 修改需求:礼貌询问具体改哪里,尽量一句话 - 催进度:自然回复在做了/快了/马上好,给预计时间 - 投诉/情绪激动/退款:转人工 - 输出不超过2句话,不说内部状态""" return self._attach_skill_docs(base, self.skill_after_sale, self.skill_style) def _get_pricing_prompt(self) -> str: try: from config.config import MIN_PRICE_FLOOR floor = MIN_PRICE_FLOOR except Exception: floor = 15 base = f"""你是淘宝客服的报价助手,负责在客户明确提到价格/询价时快速给出自然报价并推动成交。 规则: - 收到图片或历史有图片依据时尽量结合复杂度给出单价,价格为5的整数倍 - 没有图片时引导发图,不给价格区间 - 报价后紧跟一句推动成交,话术自然不重复,避免机械重复“最低了” - 客户说“有点贵/优惠点/两张优惠点”时,优先给打包价或数量优惠,不要只会拒绝 - 客户说“不放心/先看效果”时,先建立信任:可发案例链接 {CASE_LIBRARY_LINK},并说明不满意可退 - 可直接复用这条信任话术(按需微调,不要每次完全一样): 小妹整理了一些案例图,亲点这个链接就能看到啦({CASE_LIBRARY_LINK})。 有什么想要的效果随时告诉我哈,我这边都可以按您的要求来做哦~/:065 效果不好不满意,我们这边包退的哦。 - 最低价不低于{floor}元,客户出价低于底线时礼貌拒绝(不好意思) - 输出不超过2句话""" return self._attach_skill_docs(base, self.skill_pricing, self.skill_style) def _get_processing_prompt(self) -> str: base = """你是淘宝客服的处理助手,负责在客户说安排/处理/开始做或已付款的场景下进行处理安排与进度反馈。 规则: - 已付款或明确要求开始时,确认安排并给预计时间点 - 可调用处理流程工具 - 投诉/退款时转人工 - 输出不超过2句话""" return self._attach_skill_docs(base, self.skill_after_sale, self.skill_style) def _get_similar_prompt(self) -> str: base = """你是淘宝客服的相似图助手,客户问“有一样的吗/类似的吗/同款吗”时,给出自然回复与参考建议。 规则: - 先确认可以找类似款,建议拍后我发参考图 - 如已知图案/类型,简要说明“同类型都有”,推动成交 - 输出不超过2句话""" return self._attach_skill_docs(base, self.skill_pre_sales, self.skill_style) def _get_order_prompt(self) -> str: base = """你是淘宝客服的订单助手,负责系统订单通知的处理。 规则: - 已付款时自然确认安排;其他状态静默(输出空字符串) - 输出不超过1句话""" return self._attach_skill_docs(base, self.skill_after_sale, self.skill_style) def _get_risk_prompt(self) -> str: base = """你是淘宝客服的风控助手,负责敏感/违规内容的前置拦截与替代话术。 规则: - 黄色/擦边/涉政/政治人物/政治事件/政治图片/地图类内容等不接单,礼貌拒绝 - 输出不超过1句话""" return self._attach_skill_docs(base, self.skill_risk, self.skill_style) @staticmethod def _is_political_inquiry(text: str) -> bool: """文本前置风控:政治人物/政治事件/政治图片相关询问一律拒绝。""" s = (text or "").strip().lower() if not s: return False kw = ( "政治", "涉政", "党政", "政治人物", "政治事件", "政治图片", "政治海报", "政治宣传", "领导人", "伟人", "元帅", "将军", "红色人物", "党史", "天安门", "人民大会堂", "中南海", "习近平", "毛泽东", "邓小平", "江泽民", "胡锦涛", "李克强", "周恩来", "特朗普", "拜登", "普京", "泽连斯基", "trump", "biden", "putin", "zelensky", "xi jinping", ) if any(k in s for k in kw): return True # 兜底:类似“有没有十大元帅的照片/图片” return bool(re.search(r"(元帅|将军|领导人|政治人物|政治事件).*(照片|图片|头像|原图)?", s)) @staticmethod def _is_map_inquiry(text: str) -> bool: """地图类需求一律拒绝(按业务规则)。""" s = (text or "").strip().lower() if not s: return False kw = ( "地图", "地形图", "行政区划图", "世界地图", "中国地图", "卫星地图", "导航图", "航海图", "作战地图", "军事地图", "map", "topographic map", "satellite map", ) return any(k in s for k in kw) def _get_customer_profile_context(self, customer_id: str) -> str: """从数据库读取客户画像,注入给 AI。含个性化语气、报价策略、主动预测、近期对话。""" try: from db.customer_db import db profile = db.get_customer(customer_id) if profile.blacklist: return f"【⚠️黑名单客户】原因:{profile.blacklist_reason or '已标记'},请转接人工处理,不要自动回复" lines = [] lines.append("=== 客户档案 ===") # 基础信息 basic_info = [] basic_info.append(f"客户ID: {customer_id}") basic_info.append(f"姓名: {profile.name or '未知'}") if profile.email: basic_info.append(f"邮箱: {profile.email}") if profile.phone: basic_info.append(f"电话: {profile.phone}") if profile.wechat: basic_info.append(f"微信: {profile.wechat}") lines.append(" | ".join(basic_info)) # 消费分析 consume_info = [] consume_info.append(f"客户等级: {profile.customer_level}级") if profile.vip: consume_info.append("VIP客户") consume_info.append(f"总订单: {profile.total_orders}单") consume_info.append(f"总消费: {profile.total_spent}元") if profile.total_orders > 0: consume_info.append(f"客单价: {profile.total_spent // profile.total_orders}元") lines.append("--- 消费分析 ---") lines.append(" | ".join(consume_info)) # 报价历史 price_info = [] if profile.vip_custom_price: price_info.append(f"VIP专属价: {profile.vip_custom_price}元(直接报这个价)") if profile.last_price: price_info.append(f"上次报价: {profile.last_price}元") if profile.lowest_price_accepted: price_info.append(f"历史最低成交: {profile.lowest_price_accepted}元") if profile.discount_given_count: price_info.append(f"历史让价: {profile.discount_given_count}次") if profile.price_sensitivity: price_info.append(f"价格敏感度: {profile.price_sensitivity}") if getattr(profile, "last_quote_no_convert", False): price_info.append("【策略】上次报价未成交,本次可降5-10元") if price_info: lines.append("--- 报价历史 ---") lines.append(" | ".join(price_info)) # 性格与决策 personality_info = [] if profile.personality: personality_info.append(f"性格: {'/'.join(profile.personality)}") if profile.decision_speed: personality_info.append(f"决策速度: {profile.decision_speed}") if profile.communication_prefer: personality_info.append(f"沟通偏好: {profile.communication_prefer}") if personality_info: lines.append("--- 性格特征 ---") lines.append(" | ".join(personality_info)) # 图片习惯 image_info = [] image_info.append(f"累计发图: {profile.total_images_sent}张") if profile.complexity_history: avg_complexity = self._calc_avg_complexity(profile.complexity_history) image_info.append(f"平均复杂度: {avg_complexity}") if profile.image_type_history: from collections import Counter top_types = Counter(profile.image_type_history).most_common(3) types_str = "、".join(f"{t}({c}次)" for t, c in top_types) image_info.append(f"常见类型: {types_str}") if profile.preferred_format: image_info.append(f"格式偏好: {profile.preferred_format}") if profile.preferred_size: image_info.append(f"尺寸要求: {profile.preferred_size}") if profile.last_image_url: image_info.append(f"最近发图: {profile.last_image_url[:60]}...") lines.append("--- 图片习惯 ---") lines.append(" | ".join(image_info)) # 当前任务状态 if profile.processing_status: task_info = [] task_info.append(f"状态: {profile.processing_status}") if profile.processing_image_url: task_info.append(f"处理中: {profile.processing_image_url[:40]}...") if profile.expected_done_at: task_info.append(f"预计完成: {profile.expected_done_at}") lines.append("--- 当前任务 ---") lines.append(" | ".join(task_info)) # 上次对话摘要 if profile.last_conversation_summary: time_str = "" if profile.last_conversation_time: try: t = datetime.fromisoformat(profile.last_conversation_time) diff = datetime.now() - t if diff.days > 0: time_str = f"({diff.days}天前)" else: h = diff.seconds // 3600 time_str = f"({h}小时前)" if h > 0 else "(刚刚)" except Exception: pass lines.append(f"--- 上次对话 {time_str} ---") lines.append(profile.last_conversation_summary) # 个性化回复策略 hints = [] if profile.personality: if "爽快" in profile.personality: hints.append("回复简洁直接,不废话,快速报价") if "砍价" in profile.personality or "砍价狂" in profile.personality: hints.append("报价时强调性价比,只让价一次,第二次引导去 xinhui.cloud") if "纠结" in profile.personality or "墨迹" in profile.personality: hints.append("多给一点说明,耐心回答") if profile.price_sensitivity == "高": hints.append("报价时顺带提「满意再拍」降低顾虑") if profile.decision_speed == "快": hints.append("直接报价推成交,少铺垫") if profile.total_orders > 0 and profile.decision_speed == "快": hints.append("老客爽快,直接报价成交") if hints: lines.append("--- 回复策略 ---") lines.append(";".join(hints)) # 主动推荐 proactive = [] if profile.bulk_potential == "有" or (profile.total_images_sent or 0) >= 2: proactive.append("可问「要做多张吗,多张有优惠」") if profile.upsell_opportunity: proactive.append(f"加购机会: {'、'.join(profile.upsell_opportunity)}") if proactive: lines.append("--- 主动推荐 ---") lines.append(";".join(proactive)) return "\n".join(lines) except Exception as e: print(f"[Agent] 获取客户画像失败: {e}") return "" def _calc_avg_complexity(self, complexity_history: list) -> str: """计算平均复杂度""" if not complexity_history: return "未知" level_map = {"simple": 1, "normal": 2, "complex": 3, "hard": 4} label_map = {1: "简单", 2: "一般", 3: "复杂", 4: "很复杂"} try: avg = sum(level_map.get(c, 2) for c in complexity_history) / len(complexity_history) return label_map.get(round(avg), "一般") except Exception: return "一般" def _get_refusal_context_hint(self, customer_id: str, current_msg: str, profile_context: str) -> str: """ 检测「刚拒绝某张图 + 客户问能找到吗」场景,注入显式提示,避免前后矛盾。 原因:last_conversation_summary 异步更新可能滞后,message_histories 模型可能忽略。 """ ask_keywords = ["能找到吗", "可以吗", "有吗", "能做吗", "可以找吗", "可以弄吗"] if not any(kw in current_msg for kw in ask_keywords): return "" refusal_keywords = ["不做", "不接", "拒绝", "不做这类", "这类不做"] # 检查 profile 摘要(可能因异步更新而滞后) if any(kw in profile_context for kw in refusal_keywords): return "【重要】上一句客服刚拒绝了某张图,客户问能找到吗时须明确:能做的是哪张(如第一张),不能做的是哪张。不可只说「放心拍」「可以」,会前后矛盾。" # 检查内存历史中最近几条消息(ModelResponse 含客服回复) history = self.message_histories.get(customer_id, []) for msg in reversed(history[-6:]): msg_str = str(msg) if any(kw in msg_str for kw in refusal_keywords): return "【重要】上一句客服刚拒绝了某张图,客户问能找到吗时须明确:能做的是哪张(如第一张),不能做的是哪张。不可只说「放心拍」「可以」,会前后矛盾。" return "" def _get_conversation_context(self, customer_id: str, acc_id: str = "", limit: int = 12, max_len: int = 80) -> str: """ 每一次对话都从数据库加载近期对话,压缩后注入 prompt。 确保模型看到上下文,同时控制 token 消耗。 """ try: try: from config.config import CHAT_CONTEXT_LIMIT, CHAT_CONTEXT_TRUNCATE_LEN limit = CHAT_CONTEXT_LIMIT max_len = CHAT_CONTEXT_TRUNCATE_LEN except Exception: pass from db.chat_log_db import get_recent_conversation msgs = get_recent_conversation(customer_id, acc_id=acc_id, limit=limit) if not msgs: return "" lines = [] for m in msgs: role = "客" if m.get("direction") == "in" else "服" msg_text = (m.get("message") or "").strip().replace("\n", " ")[:max_len] if not msg_text: continue lines.append(f"{role}:{msg_text}") if not lines: return "" return "【近期】\n" + "\n".join(lines) + "\n\n" except Exception: return "" def _get_intent_emotion_hint(self, msg: str) -> str: """语义匹配:意图/情绪识别,注入提示。EMBEDDING_MODEL 未配置时用关键词。""" try: from utils.intent_analyzer import detect_intent_embedding, detect_intent_keywords, detect_emotion_embedding intent = detect_intent_embedding(msg) if not intent: intent = detect_intent_keywords(msg) emotion = detect_emotion_embedding(msg) if os.getenv("EMBEDDING_MODEL") else None parts = [] if intent: parts.append(f"意图:{intent}") if emotion: parts.append(f"情绪:{emotion}") if parts: return f"【当前消息】{', '.join(parts)}" except Exception: pass return "" # 简单打招呼类消息(在近期已回复后无需再回) _COOLDOWN_PATTERNS = [ "你好", "您好", "在吗", "在么", "在不在", "有人吗", "嗯", "嗯嗯", "好", "好的", "好哒", "ok", "OK", "okay", "谢谢", "谢谢你", "感谢", "收到", "知道了", "明白了", ] _COOLDOWN_SECONDS = 5 * 60 # 5 分钟内不重复回复纯打招呼 def _in_cooldown(self, state: ConversationState, msg: str) -> bool: """最近刚回复过 + 消息是纯打招呼 → True 静默""" if not state.last_reply_at: return False elapsed = (datetime.now() - state.last_reply_at).total_seconds() if elapsed > self._COOLDOWN_SECONDS: return False clean = msg.strip().rstrip("!!??。.~~") return clean in self._COOLDOWN_PATTERNS async def process_message(self, message: CustomerMessage) -> AgentResponse: """处理客户消息并生成回复""" trace_id = build_trace_id(message.acc_id, message.from_id, message.msg_id, message.msg[:64]) self._activity_log( "agent_inbound", trace_id=trace_id, acc_id=message.acc_id, customer_id=message.from_id, msg=message.msg, msg_type=message.msg_type, ) metrics_emit("inbound_msg", customer_id=message.from_id, acc_id=message.acc_id) # 获取或创建对话状态 state = self._get_conversation_state(message.from_id) pre_response = await self.pre_rule_service.run(message=message, state=state, trace_id=trace_id) if isinstance(pre_response, AgentResponse): return pre_response # 检测售前/售后 new_stage = self._detect_stage(message.msg) if new_stage != state.stage: state.stage = new_stage state.last_update = datetime.now().isoformat() order_response = await handle_order_notification(self, message=message, state=state) if isinstance(order_response, AgentResponse): return order_response customer_text, _ = self._split_customer_text(message.msg) shop_type = _get_shop_type(message.acc_id or "", message.goods_name or "") flow_response = await handle_find_image_batch_flow( self, message=message, state=state, customer_text=customer_text, shop_type=shop_type, ) if isinstance(flow_response, AgentResponse): return flow_response # 构建提示词(包含对话状态 + 客户画像) user_prompt = self._build_prompt(message, state) # 注入客户历史画像(个性化语气、报价策略、主动预测) profile_context = self._get_customer_profile_context(message.from_id) if profile_context: user_prompt = profile_context + "\n\n" + user_prompt # 前后一致提示:刚拒绝某张图后,客户问「能找到吗」等时,必须区分能做/不能做 refusal_hint = self._get_refusal_context_hint(message.from_id, message.msg, profile_context or "") if refusal_hint: user_prompt = refusal_hint + "\n\n" + user_prompt # 每一次对话都注入近期上下文,确保模型看到完整对话 conv_context = self._get_conversation_context(message.from_id, acc_id=message.acc_id or "") if conv_context: user_prompt = conv_context + user_prompt # 语义匹配:意图/情绪识别(配置 EMBEDDING_MODEL 时用 embedding,否则关键词) intent_hint = self._get_intent_emotion_hint(message.msg) if intent_hint: user_prompt = intent_hint + "\n\n" + user_prompt deps = AgentDeps( msg_id=message.msg_id, acc_id=message.acc_id, from_id=message.from_id, platform=message.acc_type ) # 取出该客户的历史对话,传给 AI 保持上下文 history = self.message_histories.get(message.from_id, []) self._log_block("PROMPT->AI 前置提示词", user_prompt) try: reply_text = await execute_ai_turn( self, message=message, state=state, user_prompt=user_prompt, deps=deps, history=history, ) except Exception as e: err_str = str(e) print(f"[Agent] AI 调用失败: {e},使用兜底回复") self._activity_log("agent_ai_error", customer_id=message.from_id, acc_id=message.acc_id, error=err_str) metrics_emit("ai_call_failed", customer_id=message.from_id, acc_id=message.acc_id) if "AccountOverdueError" in err_str or "overdue" in err_str.lower(): asyncio.create_task(_notify_wechat_overdue()) else: asyncio.create_task(_notify_wechat( f"⚠️ **AI调用异常**\n" f"客户:{message.from_id}\n" f"店铺:{message.acc_id}\n" f"错误:{err_str[:200]}", tag="AI异常" )) reply_text = None else: metrics_emit("ai_call_success", customer_id=message.from_id, acc_id=message.acc_id) # AI 失败兜底:给一个不出错的万能回复 if not reply_text: fallback_text = await self._rewrite_reply_with_ai( message=message, state=state, reply="好嘞,你稍等下,我这边看一下", scene="fallback_reply", ) return AgentResponse( reply=fallback_text, should_reply=True, need_transfer=False ) return await finalize_ai_reply( self, message=message, state=state, reply_text=reply_text, ) def _detect_price(self, reply: str, state: ConversationState): """从回复中提取价格,同步写入客户数据库(价格必须为5的整数倍)""" import re numbers = re.findall(r'(\d+)[元]', reply) if numbers: price = round(int(numbers[0]) / 5) * 5 # 强制为5的整数倍 state.last_price = price metrics_emit("quote_generated", customer_id=state.customer_id, price=price) # 持久化到客户数据库,重启后仍可读取 try: from db.customer_db import db db.update_last_price(state.customer_id, price) except Exception: pass async def _check_order_amount(self, customer_id: str, order: dict, acc_id: str): """核查订单实付金额是否与报价一致,异常时企业微信预警""" try: import re from db.customer_db import db profile = db.get_customer(customer_id) quoted = profile.last_price # 上次报价(元) if not quoted: return # 从订单解析实付金额 raw_amount = order.get("amount", "") m = re.search(r'[\d.]+', str(raw_amount)) if not m: return paid = float(m.group()) print(f"[Agent] 订单金额核查:报价 {quoted}元 vs 实付 {paid}元(客户 {customer_id})") # 实付金额明显低于报价(低于报价的 60%)才预警 if paid < quoted * 0.6: msg = ( f"⚠️ **订单金额异常**\n" f"店铺:{acc_id}\n" f"客户:{customer_id}({profile.name or ''})\n" f"报价:{quoted}元\n" f"实付:{paid}元\n" f"差额:{quoted - paid:.1f}元 — 请人工核查" ) print(f"[Agent] {msg}") await _notify_wechat(msg) except Exception as e: print(f"[Agent] 订单金额核查失败: {e}") async def _record_deal_success( self, customer_id: str, customer_name: str, acc_id: str, platform: str, order: dict, state: "ConversationState", ): """成交时写入数据库,供日报与数据分析""" try: import re from db.deal_outcome_db import record_deal order_id = order.get("order_id", "") raw_amount = order.get("amount", "") m = re.search(r"[\d.]+", str(raw_amount)) amount = float(m.group()) if m else 0 reason = "让价后成交" if (state.discount_count or 0) > 0 else "直接成交" record_deal( customer_id=customer_id, outcome="成交", reason=reason, customer_name=customer_name or "", acc_id=acc_id or "", platform=platform or "", order_id=order_id, amount=amount, discount_given=(state.discount_count or 0) > 0, ) # 同步到客户库 try: from db.customer_db import db if order_id: db.add_order(customer_id, order_id, amount) db.clear_quote_no_convert(customer_id) except Exception: pass print(f"[Agent] 成交记录: {customer_id} {reason} {amount}元") except Exception as e: print(f"[Agent] 成交记录失败: {e}") async def _record_deal_fail( self, customer_id: str, customer_name: str, acc_id: str, platform: str, reason: str, ): """未成交时写入数据库,供日报与数据分析;标记报价未成交,下次可适当降低""" try: from db.deal_outcome_db import record_deal from db.customer_db import db record_deal( customer_id=customer_id, outcome="未成交", reason=reason, customer_name=customer_name or "", acc_id=acc_id or "", platform=platform or "", ) db.mark_quote_no_convert(customer_id) print(f"[Agent] 未成交记录: {customer_id} {reason}") except Exception as e: print(f"[Agent] 未成交记录失败: {e}") async def _auto_tag(self, message: CustomerMessage, reply: str, state: ConversationState): """自动识别并写入各类标签""" try: from db.customer_db import db cid = message.from_id msg = message.msg.lower() # 批量潜力 if any(kw in msg for kw in ["还有", "多张", "好几张", "一批", "下次还"]): db.set_bulk_potential(cid, "有") db.add_upsell_opportunity(cid, "批量打包") # 加购机会:问过PSD/分层 if any(kw in msg for kw in ["psd", "分层", "源文件"]): db.add_upsell_opportunity(cid, "分层PSD") db.update_preferred_format(cid, "psd") # 格式偏好 if "jpg" in msg or "jpeg" in msg: db.update_preferred_format(cid, "jpg") if "png" in msg: db.update_preferred_format(cid, "png") # 尺寸/分辨率偏好 if any(kw in msg for kw in ["分辨率", "dpi", "尺寸", "大图", "印刷"]): db.update_preferred_size(cid, message.msg[:30]) # 决策速度:收到报价后立刻说拍了 → 快 if any(kw in msg for kw in ["拍了", "下单了", "好的", "行"]) and state.last_price: db.update_decision_speed(cid, "快") # 图片类型识别(简单关键词匹配) type_keywords = { "印花": ["印花", "花纹", "图案", "面料", "布料", "纺织"], "logo": ["logo", "标志", "品牌", "商标"], "人物": ["人物", "人像", "照片", "脸", "头像"], "产品": ["产品", "商品", "包装", "实物"], "老照片": ["老照片", "旧照片", "发黄", "修复"], } for img_type, keywords in type_keywords.items(): if any(kw in message.msg for kw in keywords): db.add_image_type(cid, img_type) break # 定期自动计算衍生标签 db.auto_compute_tags(cid) except Exception: pass def _detect_discount(self, message: str, state: ConversationState): """检测压价,并持久化让价记录""" if any(kw in message for kw in ["贵", "便宜", "太贵", "有点贵"]): state.discount_count += 1 if state.last_price: try: from db.customer_db import db db.record_discount(state.customer_id, state.last_price) except Exception: pass # 客户明确给价(如“10元/10块/能10吗”) import re m = re.search(r'(\d+)\s*元|\b(\d+)\s*块', message) offer = None if m: offer = int(m.group(1) or m.group(2)) if offer: try: from config.config import MIN_PRICE_FLOOR if offer < MIN_PRICE_FLOOR: # 标记本次为超低出价(用于后续拒绝话术提示) state.last_price = state.last_price or 0 except Exception: pass def _negotiation_strategy_reply(self, customer_text: str, state: ConversationState) -> str: """ 价格谈判固定策略(优先级高于通用 AI 自由回复): - 有点贵:给两张打包建议价 - 优惠点:引导3张以上打包价 - 先发效果图:给固定案例链接并说明不满意包退 """ text = (customer_text or "").strip() if not text: return "" if any(k in text for k in ["先发效果图", "先看效果", "不放心", "没法确认"]): return random.choice([ f"小妹整理了一些案例图,亲点这个链接就能看到啦({CASE_LIBRARY_LINK})。有什么想要的效果随时告诉我哈,不满意我们这边包退。", f"先给你看案例哈,链接在这({CASE_LIBRARY_LINK})。你想要什么效果直接说,我们按你要求做,不满意可退。", f"我把类似案例给你准备好了,点这个看就行({CASE_LIBRARY_LINK})。你放心说需求,效果不满意我们包退。", f"怕效果不稳的话你先看案例,链接在这里({CASE_LIBRARY_LINK})。你确认风格后我按你要的做,不满意可以退。", f"可以先看下我们做过的案例({CASE_LIBRARY_LINK})。你觉得方向OK再拍,不满意我们这边支持退。", ]) if "有点贵" in text or "就是贵" in text: # 约定示例:两张优惠价,默认45;若已有单张价格则动态估算两张打包价 base = state.last_price if isinstance(state.last_price, int) and state.last_price > 0 else 25 two_pack = max(10, round(((base * 2) - 5) / 5) * 5) return random.choice([ f"理解你这边的预算,我给你个实在点的:两张一起按 {two_pack} 元做,行不行?", f"我懂你意思,这样吧,两张一起我给你算 {two_pack} 元。", f"那我给你压一口价,两张打包 {two_pack} 元,你看可以我就开做。", f"没问题,我给你优惠点,两张一起按 {two_pack} 元走。", f"你这边要省点的话,两张一起我给你做到 {two_pack} 元。", ]) if any(k in text for k in ["优惠点", "便宜点", "少点", "打折"]): return random.choice([ "可以的,你这边数量上来我就好给价,3张以上我给你打包价。", "能优惠,做得多会更划算,3张以上我这边可以给你打包算。", "没问题,3张起我可以给你一口打包价,会比单张省一些。", "可以便宜点,按量走更好谈,3张以上我给你打包价。", "你这边如果是多张做,3张以上我能给你更划算的打包价。", ]) return "" def _parse_order_info(self, msg: str) -> dict: """从系统订单消息中提取所有字段""" import re info = {} m = re.search(r'订单号[::]\s*(\d+)', msg) if m: info['order_id'] = m.group(1) # 订单大状态(新订单/交易成功/交易关闭等) m = re.search(r'订单状态[::]\s*([^\s\[]+)', msg) if m: info['order_status'] = m.group(1).strip() # 支付细状态(等待买家付款/等待发货/交易完成等) m = re.search(r'\[状态[::]\s*([^\]]+)\]', msg) if m: info['pay_status'] = m.group(1).strip() # 金额 m = re.search(r'金额[::]\s*([\d.]+)元', msg) if m: info['amount'] = m.group(1) # 数量 m = re.search(r'数量[::]\s*(\d+)', msg) if m: info['quantity'] = m.group(1) # 时间(格式:2026-2-24 19:52:52) m = re.search(r'(\d{4}-\d{1,2}-\d{1,2}\s+\d{1,2}:\d{2}:\d{2})', msg) if m: info['order_time'] = m.group(1).strip() # 买家备注 m = re.search(r'买家备注[::]\s*([^\n]+)', msg) if m and m.group(1).strip(): info['buyer_note'] = m.group(1).strip() return info def _get_order_instruction(self, pay_status: str, order_status: str) -> str: """ 根据订单状态生成 AI 指令。 只有「已付款」才需要回复客户,其他状态一律静默。 """ paid_keywords = ["等待发货", "已付款", "付款成功", "买家已付款"] if any(kw in pay_status or kw in order_status for kw in paid_keywords): return "【已付款-必须回复】客户已付款,立刻自然回复确认收款并告知马上安排。" else: # 所有其他状态(待付款、交易完成、关闭等)静默处理 return "【仅系统通知-无需回复客户】这是系统订单通知,不需要回复客户任何内容,直接跳过。" def _extract_image_url(self, msg: str) -> str: """从消息中提取图片URL,兼容纯URL和 text#*#url 两种格式""" urls = self._extract_image_urls(msg) return urls[0] if urls else "" def _extract_image_urls(self, msg: str) -> List[str]: """提取消息中的所有图片URL(去重保序)。""" import re if not msg: return [] image_exts = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp") image_hosts = ("alicdn.com", "imgextra", "taobao.com", "jd.com", "pinduoduo.com", "suning.com") candidates = re.findall(r'https?://[^\s#]+', msg) urls: List[str] = [] for u in candidates: low = u.lower() if any(ext in low for ext in image_exts) or any(h in low for h in image_hosts): if u not in urls: urls.append(u) return urls def _strip_urls_from_text(self, msg: str) -> str: """去掉 URL 后的纯文本,用于提取额外需求。""" import re if not msg: return "" text = re.sub(r'https?://\S+', ' ', msg) text = text.replace("#*#", " ").strip() text = re.sub(r'\s+', ' ', text) return text.strip(",,。.!!??;;:: ") def _is_batch_finish_signal(self, text: str) -> bool: """客户是否表达“图发完了,可以统一报价”。""" if not text: return False if self._classify_short_customer_text(text) == "finish_signal": return True finish_keywords = [ "发完了", "都发完了", "发齐了", "齐了", "先这些", "就这些", "全部", "一起报", "统一报价", "总共多少钱", "一共多少钱", "打包价", "总价", "报价吧", "报个总价", "给个总价", "没了", "没有了", "没图了", "就这", "就这张", "就这一张", "就这一个", "就一个", "先报吧", "报下价", "报个价", "可以报价了", "能报吗", ] return any(k in text for k in finish_keywords) def _is_batch_finish_intent( self, text: str, state: ConversationState, has_incoming_urls: bool, ) -> bool: """ 语义结束识别: - 显式口令:发完了/统一报价 - 隐式意图:询价/砍价 - 单图需求明确:如“这个门头上面的字做一下”可直接进入报价 """ if not text: return False if self._is_batch_finish_signal(text): return True if has_incoming_urls: return False if not state.pending_image_urls: return False # 意图识别:询价/砍价通常意味着“可以报价了” try: from utils.intent_analyzer import detect_intent_embedding, detect_intent_keywords intent = detect_intent_embedding(text) or detect_intent_keywords(text) except Exception: intent = "" if intent in ("询价", "砍价"): return True msg = (text or "").strip() if not msg: return False # 单图场景:客户给出明确加工指令,可直接报价 single_image_action_kw = ( "做一下", "改一下", "处理一下", "就这张", "按这个做", "照这个做", "这个门头", "上面的字", "这个字", "这个图做", "能做吗", ) multi_image_finish_kw = ( "就这些", "就这几张", "按这几张", "这几张一起做", "一起做一下", "先按这些", "先按这几张", "直接报价", "现在报价", "看下报价", "先报个总价", "总价多少", "一起多少钱", "先做这几张", ) hold_kw = ("还有", "再发", "先等", "稍后", "等会", "回头") if len(state.pending_image_urls) == 1: if any(k in msg for k in single_image_action_kw) and not any(k in msg for k in hold_kw): return True elif len(state.pending_image_urls) >= 2: if any(k in msg for k in multi_image_finish_kw) and not any(k in msg for k in hold_kw): return True if self._is_cross_image_composite_intent(msg) and not any(k in msg for k in hold_kw): return True return False @staticmethod def _is_cross_image_composite_intent(text: str) -> bool: """ 识别多图跨图修改意图(A图元素放到B图)。 例:A图的图案转到B图、这个图案放到另一张上。 """ s = (text or "").strip() if not s: return False pair_marks = ("a图", "b图", "第一张", "第二张", "这张", "那张", "上一张", "另一张") op_kw = ( "转到", "换到", "放到", "贴到", "移到", "套到", "合成", "融合", "替换到", "图案上去", "字放到", "元素放到", "logo放到", ) return any(k in s.lower() for k in pair_marks) and any(k in s for k in op_kw) @staticmethod def _is_related_image_followup_intent(text: str) -> bool: """ 识别“新发的是上一张的截图/局部细节”的关联意图。 这类输入应与前图关联处理,避免当成完全独立需求。 """ s = (text or "").strip().lower() if not s: return False relation_kw = ( "截图", "截屏", "局部", "细节", "放大", "裁剪", "同一张", "同一幅", "上一张", "上张", "前一张", "前面那张", "刚才那张", "这个是上面", "这个是那张", "补一张细节", "补个截图", ) return any(k in s for k in relation_kw) @staticmethod def _is_result_followup_query(text: str) -> bool: """识别客户在找图流程中的结果/进度追问。""" short_type = CustomerServiceAgent._classify_short_customer_text(text) if short_type == "progress_query": return True s = (text or "").strip() if not s: return False followup_kw = ( "找到了吗", "没找到吗", "找到没", "找到没有", "找到了没", "有吗", "有没", "有没有", "有结果吗", "结果呢", "进度", "多久好", "什么时候好", "好了没", "弄好了吗", "做了没", "你重新发", "重新发给我", "高清", "发我", ) if any(k in s for k in followup_kw): return True return s in {"?", "?", "在吗", "人呢"} @staticmethod def _classify_short_customer_text(text: str) -> str: """ 短句分类器(状态机前置): - finish_signal: 发图完成,可报价 - progress_query: 追问进度/结果 - ack: 简短确认 - unknown: 未识别 """ s = (text or "").strip() if not s: return "unknown" if len(s) > 8: return "unknown" finish_kw = ( "没了", "没有了", "就这", "就这张", "就这一张", "就这一个", "就一个", "先这些", "就这些", "发完了", "都发完了", ) if any(k in s for k in finish_kw): return "finish_signal" progress_kw = ( "有吗", "有没", "有没有", "找到了吗", "找到了没", "没找到吗", "找到没", "找到没有", "进度", "结果", "多久好", "什么时候好", "好了没", "弄好了吗", "做了没", "高清", "发我", "重新发", "你重新发给我", ) if any(k in s for k in progress_kw) or s in {"?", "?", "在吗", "人呢"}: return "progress_query" ack_kw = ("嗯", "嗯嗯", "好", "好的", "行", "可以", "ok", "OK", "收到", "明白") if s in ack_kw: return "ack" return "unknown" def _build_collect_ack(self, count: int, related_followup: bool = False) -> str: if related_followup and count >= 2: related_templates = [ "这张我收到了,看起来是上一张的截图/细节图,我按同一单一起处理。还有补充就继续发。", "收到,这张是关联补图我记上了(按同一需求处理)。你还有图就继续发。", "明白,这张是前图的局部截图,我会和前面那张一起算,不会分开漏掉。", ] return random.choice(related_templates) if count <= 1: one_templates = [ "这张收到啦,还有图就继续发,我一起给你看。", "图我看到了,后面还有就接着发,最后我一口价给你。", "收到这张了,你有其他图也发来,我统一帮你算。", "这张我先记上了,你那边还有的话接着发,我一起给你报。", "第1张收到,你继续发就行,发完我这边一次给你算清楚。", "这张没问题,我先收着。要是还有图,你直接连着发我就行。", "我先看到了这张,你后面还有就一起发来,我统一给你报价。", "这张图我已经记下了,后面有补充就继续甩过来哈。", ] return random.choice(one_templates) templates = [ "这几张我都收到了(现在{n}张)。还有的话继续发,我一起给你报。", "好嘞,先看到{n}张了。你可以继续发,或者直接说“就这些”我现在就报价。", "收到哈(共{n}张)。你还要补图就继续发,不补的话我现在也可以直接给价。", "我这边先收到了{n}张。你继续补图,或者直接说“按这些算”我就开始报。", "这波我已经记了{n}张,你要是还有就接着发,不补的话我立刻给总价。", "先看到{n}张图了,后面你看是继续发,还是直接让我现在报价都可以。", "好的,目前{n}张到位。你一句“就这些”,我马上给你打包价。", "图我都看到了({n}张)。你还发我就继续收,不发我现在就给你报。", ] return random.choice(templates).format(n=count) def _build_collect_progress_reply(self, count: int) -> str: if count <= 1: templates = [ "我这边在处理了,这张有结果我第一时间回你。", "在跟进中,这张一有进展我马上发你。", "这张我正在看,稍等我一会儿,结果出来就回你。", ] return random.choice(templates) templates = [ "我这边在按你这{n}张一起处理,有结果我立刻同步你。", "正在跟进这{n}张,出结果我第一时间发你,不会漏。", "进度在跑了(共{n}张),你稍等一下,我这边有结果马上回。", ] return random.choice(templates).format(n=count) def _build_collect_remind(self, count: int) -> str: if count <= 1: one_templates = [ "这个要求我记住了。你还有图就继续发,不补图我就按这张给你报价。", "明白,这个需求我加上了。你继续发图也行,想直接报价也可以。", "我先记下这张。你如果是要我找图,不是做图,直接说一声,我按找图思路给你走。", "收到,这张我先按你的要求记好了。就做这一张的话,我现在直接给你报实价。", "你这要求我记下了,后面还有图就发,没有的话我现在直接算价。", "行,我按你这个要求来。继续补图也行,不补我就先报这张。", "这个点我懂了,你还要补图就接着发,不补我立刻给你报价。", "要求我已经加上了。你看是继续发,还是我现在直接报这张。", ] return random.choice(one_templates) templates = [ "需求我记下了(当前{n}张)。你继续补图,或者直接说“就这些”我现在报价。", "好,这个要求也加上了(现在{n}张)。不再补图的话我立刻给你打包价。", "收到(共{n}张)。你还发就继续,不发的话我现在就给总价。", "这个需求我加进去了(现在{n}张)。你继续发也行,直接报价也行。", "我这边都记好了({n}张+需求)。你一句“先按这些算”,我马上报价。", "要求同步好了,目前{n}张。要补图继续发,不补图我现在就给你打包价。", "行,需求和图片我都收着了({n}张)。你直接让我报价也可以。", "好的,这条需求也算进去了(共{n}张)。你看要不要我现在直接报。", ] return random.choice(templates).format(n=count) @staticmethod def _is_find_image_not_edit_conflict(text: str) -> bool: """识别客户明确声明“要找图,不是做图”的冲突语义。""" s = (text or "").strip() if not s: return False find_kw = ("找图", "找原图", "找素材", "找同款") deny_edit_kw = ("不是让你做图", "不是做图", "不用做图", "不需要做图", "不是修图", "不用修图") return any(k in s for k in find_kw) and any(k in s for k in deny_edit_kw) @staticmethod def _needs_clarification_in_collecting(text: str) -> bool: """ 信息不足时先追问,不急着报价。 例:这个也是大图 / 一共几个图 / 啥意思 / 没明白 """ s = (text or "").strip() if not s: return False short_non_vague_kw = ( "?", "?", "没了", "没有了", "就这", "行", "好的", "ok", "报价", "找到了吗", "没找到吗", "找到没", "找到了没", "有吗", "有没", "有没有", "多久好", "什么时候好", "高清", ) if len(s) <= 4: if any(k in s for k in short_non_vague_kw): return False return True vague_kw = ( "这个也是", "一共几个图", "几个图", "啥意思", "没明白", "什么意思", "这个呢", "这个可以吗", "然后呢", "咋办", "怎么搞", ) return any(k in s for k in vague_kw) def _build_find_image_clarify_reply(self, state: ConversationState) -> str: count = len(state.pending_image_urls or []) return ( f"明白,你是要我帮你找图,不是做图。现在我这边先记了{count}张," "你告诉我具体要找哪种:原图/同款/高清版,我按这个方向给你找。" ) @staticmethod def _build_not_understood_reply() -> str: """信息不足时的澄清话术(随机)。""" templates = [ "不好意思,不太懂你的意思,你再具体说下哈。", "抱歉我这边没完全理解,你可以换个说法再说一次吗?", "我有点没听明白,你是要找图还是要做图呀?", "不好意思我没抓到重点,你再补一句我就能接着处理。", "这句我理解得不太准,你再说具体一点我马上给你办。", "抱歉,这里我没太看懂。你是想让我找原图,还是按图处理?", "我这边还没完全明白你的意思,麻烦你再具体描述一下。", "不好意思,这条我没读懂,你再详细说一点我马上跟上。", ] return random.choice(templates) def _append_requirement(self, state: ConversationState, text: str): """追加需求并做去重/截断,减少上下文噪音。""" t = (text or "").strip() if not t: return t = t[:120] if state.pending_requirements and state.pending_requirements[-1] == t: return if t in state.pending_requirements[-5:]: return state.pending_requirements.append(t) if len(state.pending_requirements) > 20: state.pending_requirements = state.pending_requirements[-20:] def _calc_requirement_surcharge(self, requirements: List[str]) -> Dict[str, Any]: """ 把客户补充需求做成结构化加价,避免纯靠模型自由发挥导致价格波动。 返回: {"extra": int, "hits": List[str]} """ text = " ".join(requirements or []) rules = [ (["分层", "psd", "源文件"], 30, "分层/源文件"), (["去背景", "抠图", "透明底", "白底"], 5, "去背景"), (["换背景", "换场景", "合成", "转到", "换到", "放到", "贴到", "移到", "套到", "图案上去", "元素放到"], 10, "跨图合成/换背景"), (["改字", "改文字", "替换文字", "排版"], 10, "改文字/排版"), (["调色", "改色", "换色", "配色"], 5, "调色"), (["多版本", "多个版本", "两版", "三版"], 10, "多版本"), (["加急", "今天要", "马上要", "尽快"], 10, "加急"), ] total = 0 hits: List[str] = [] for keywords, fee, label in rules: if any(k in text for k in keywords): total += fee hits.append(f"{label}+{fee}") # 防止需求加价过高,做个上限保护 total = min(total, 60) # 金额统一 5 的倍数 total = round(total / 5) * 5 return {"extra": total, "hits": hits} def _build_batch_quote_reply( self, results: List[Tuple[str, Dict[str, Any]]], total_suggest: int, bundle_price: int, req_fee: Dict[str, Any], ) -> str: """构建分图明细 + 单条总报价可选项回复。""" complexity_map = { "simple": "简单", "normal": "常规", "complex": "复杂", "hard": "高难", } detail_lines: List[str] = [] for i, (_, r) in enumerate(results, 1): p = int(r.get("price_suggest", 20) or 20) cx = complexity_map.get(str(r.get("complexity", "normal")), "常规") reason = str(r.get("reason", "常规处理")).replace("\n", " ").strip() if len(reason) > 18: reason = reason[:18] + "..." detail_lines.append(f"图{i}:{p}元({cx},{reason})") extra = int(req_fee.get("extra", 0) or 0) single_total = round((total_suggest + extra) / 5) * 5 req_hit = "、".join(req_fee.get("hits", [])) if req_fee.get("hits") else "" # 单图时不要使用“分图/这批/A-B方案”措辞,避免客户误解为多图。 if len(results) == 1: line = detail_lines[0].replace("图1:", "这张:") heads = [ "这张我看过了,先给你报下:", "这张可以做,价格给你报下:", "看了这张图,报价如下:", "我先按这张给你算下:", "这张处理没问题,我给你报个实在价:", "我看完这张了,价格给你说下:", "按这张图的难度,报价是:", "这张我已经评估完了,先给你个价格:", ] lines = [f"{random.choice(heads)}{line.split(':', 1)[1]}"] if req_hit: lines.append(f"按你的需求另加{extra}元({req_hit})。") tails = [ f"这张做下来共{single_total}元,定了我马上开工。", f"合下来是{single_total}元,你点头我这边立刻安排。", f"总价{single_total}元,可以的话我现在就给你做。", f"这一张算下来{single_total}元,你说开做我就马上弄。", f"给你按{single_total}元做,确定的话我现在就排上。", f"这张我按{single_total}元给你做,没问题就直接开始。", f"这张最终{single_total}元,你点头我立刻开干。", f"这张就按{single_total}元走,你确认我就马上安排。", ] lines.append(random.choice(tails)) return "\n".join(lines) heads = [ "我先按这几张给你报一下:", "这几张我都看过了,价格给你列一下:", "我把每张价格先给你说清楚:", "我先把这几张的价格拆开给你看:", "这几张我都评估过了,报价给你写明白:", "先别急,我把每张大概价给你列出来:", "我按这批图先报个明细给你:", "我先把每张费用和总价给你算出来:", ] lines = [random.choice(heads)] lines.extend(detail_lines) if req_hit: lines.append(f"需求加价:+{extra}元({req_hit})") option_line = random.choice([ f"可选:按单张做(共{single_total}元),或打包做({bundle_price}元,会更省一点)。", f"可选:单张算下来一共{single_total}元;打包给你{bundle_price}元,更划算。", f"可选:你按单张做共{single_total}元,按打包做我给你{bundle_price}元。", f"可选:分开做总共{single_total}元,打包做{bundle_price}元(省一点)。", f"可选:按张算共{single_total}元;直接打包{bundle_price}元。", ]) lines.append(option_line) lines.append(random.choice([ "你定一个,我这边马上开工。", "你选个方案,我立刻给你安排上。", "你拍板就行,我这边马上开做。", "你看选哪个合适,我这边马上给你做。", "你一句话定下来,我现在就给你安排。", ])) return "\n".join(lines) def _prepare_batch_intake(self, state: ConversationState) -> Dict[str, Any]: """Stage 1: 收集阶段,标准化输入并做上限约束。""" urls = list(state.pending_image_urls) if not urls: return {"ok": False, "reply": "你先把图片发我,我看完再给你统一报价。", "need_transfer": False} try: from config.config import BATCH_MAX_IMAGES, BATCH_ANALYZE_CONCURRENCY max_images = max(1, int(BATCH_MAX_IMAGES)) analyze_concurrency = max(1, int(BATCH_ANALYZE_CONCURRENCY)) except Exception: max_images = 12 analyze_concurrency = 3 if len(urls) > max_images: return { "ok": False, "reply": f"这次图片有点多({len(urls)}张),我先按前{max_images}张处理报价,剩下的下一批继续发我。", "need_transfer": False, } return { "ok": True, "urls": urls[:max_images], "requirements": list(state.pending_requirements or []), "analyze_concurrency": analyze_concurrency, } async def _run_batch_feasibility(self, urls: List[str], concurrency: int) -> List[Tuple[str, Dict[str, Any]]]: """Stage 2: 可做性分析(逐图)。""" from image.image_analyzer import image_analyzer sem = asyncio.Semaphore(max(1, concurrency)) async def _analyze_one(url: str): async with sem: try: r = await image_analyzer.analyze(url) except Exception: r = { "complexity": "normal", "reason": "识别异常,按常规估价", "price_min": 15, "price_max": 25, "price_suggest": 20, "success": False, } return url, r return list(await asyncio.gather(*[_analyze_one(u) for u in urls])) async def _sync_batch_analysis_to_workflow(self, results: List[Tuple[str, Dict[str, Any]]], message: CustomerMessage) -> None: for url, r in results: try: from core.workflow import workflow await workflow.image_analysis_result( customer_id=message.from_id, image_url=url, complexity=r.get("complexity", "normal"), acc_id=message.acc_id, acc_type=message.acc_type, gemini_prompt=r.get("gemini_prompt", ""), aspect_ratio=r.get("aspect_ratio", "1:1"), perspective=r.get("perspective", "no"), proc_type=r.get("proc_type", ""), subject=r.get("subject", ""), quality=r.get("quality", ""), ) except Exception as e: print(f"[Agent] Workflow 批量任务创建失败: {e}") def _assess_batch_risk(self, results: List[Tuple[str, Dict[str, Any]]]) -> Dict[str, List[str]]: """Stage 2.5: 分离可做和风险图。""" unsafe: List[str] = [] dense_text_reject: List[str] = [] for i, (_, r) in enumerate(results, 1): if r.get("feasibility") == "no" or r.get("risk") == "high": unsafe.append(f"图{i}") note = str(r.get("note", "") or "") if "文字内容过于密集" in note or "密集文字" in note: dense_text_reject.append(f"图{i}") return {"unsafe": unsafe, "dense_text_reject": dense_text_reject} def _build_batch_pricing_plan( self, results: List[Tuple[str, Dict[str, Any]]], requirements: List[str], ) -> Dict[str, Any]: """Stage 3: 报价计算(图片成本 + 需求加价 + 打包价)。""" total_suggest = sum(int(r.get("price_suggest", 20) or 20) for _, r in results) req_fee = self._calc_requirement_surcharge(requirements) if len(results) == 2: bundle_price = max(10, total_suggest - 5) elif len(results) >= 3: bundle_price = max(10, round(total_suggest * 0.9 / 5) * 5) else: bundle_price = total_suggest bundle_price += int(req_fee.get("extra", 0) or 0) bundle_price = round(bundle_price / 5) * 5 return { "total_suggest": total_suggest, "req_fee": req_fee, "bundle_price": bundle_price, } async def _try_batch_auto_process( self, results: List[Tuple[str, Dict[str, Any]]], message: CustomerMessage, req_fee: Dict[str, Any], ) -> Dict[str, Any]: """Stage 4-A: 自动处理+图绘链接。失败时回退到需求澄清。""" links = [] try: from image.image_processor import image_processor from utils.image_queue import run_with_queue for idx, (url, r) in enumerate(results, 1): req_parts = [f"complexity:{r.get('complexity', 'normal')}"] if r.get("gemini_prompt"): req_parts.append(f"prompt:{r.get('gemini_prompt')}") if r.get("aspect_ratio"): req_parts.append(f"ratio:{r.get('aspect_ratio')}") if r.get("perspective") and r.get("perspective") != "no": req_parts.append(f"perspective:{r.get('perspective')}") if r.get("proc_type"): req_parts.append(f"proc_type:{r.get('proc_type')}") if r.get("subject"): req_parts.append(f"subject:{r.get('subject')}") if r.get("quality"): req_parts.append(f"quality:{r.get('quality')}") process_res = await run_with_queue(image_processor.process_image( url, "enhance", requirements="|".join(req_parts), gemini_prompt=r.get("gemini_prompt", ""), aspect_ratio=r.get("aspect_ratio", "1:1"), perspective=r.get("perspective", "no"), proc_type=r.get("proc_type", ""), subject=r.get("subject", ""), quality=r.get("quality", ""), )) if not process_res.get("success"): raise RuntimeError(process_res.get("message", "图片处理失败")) ok, link, _ = await upload_to_tuhui( process_res["result_path"], title=f"客户{message.from_id[-4:]}-图片{idx}", description="AI自动处理结果", price=max(10, int(r.get("price_suggest", 20) or 20) + int(req_fee.get("extra", 0) or 0) // max(1, len(results))), ) if not ok: raise RuntimeError(str(link)) links.append(link) except Exception as e: print(f"[Agent] 找图自动处理失败,回退需求澄清: {e}") return { "reply": "这种可以做类似款。你先说下具体需求:要几张、是否改字、尺寸比例、交付格式(单图/打包链接),我按需求给你直接做。", "need_transfer": False, } lines = ["找到了,链接如下:"] for i, link in enumerate(links, 1): lines.append(f"链接{i}:{link}") return {"reply": "\n".join(lines), "need_transfer": False} def _finalize_batch_state(self, state: ConversationState, customer_id: str, final_price: int = 0): if final_price > 0: state.last_price = final_price try: from db.customer_db import db db.update_last_price(customer_id, final_price) except Exception: pass state.pending_image_urls.clear() state.pending_requirements.clear() self._refresh_quote_phase(state, "idle") self._sync_pending_quote_state(customer_id, state) async def _quote_pending_images(self, state: ConversationState, message: CustomerMessage) -> Dict[str, Any]: """ 统一报价主流程(分层): 1) Intake 收集 2) Feasibility 可做性 3) Pricing 报价 4) Router 自动处理/报价/转人工 """ intake = self._prepare_batch_intake(state) if not intake.get("ok", False): return {"reply": intake.get("reply", ""), "need_transfer": bool(intake.get("need_transfer", False))} urls = intake["urls"] requirements = intake["requirements"] analyze_concurrency = int(intake["analyze_concurrency"]) results = await self._run_batch_feasibility(urls=urls, concurrency=analyze_concurrency) await self._sync_batch_analysis_to_workflow(results=results, message=message) risk = self._assess_batch_risk(results) unsafe = risk["unsafe"] dense_text_reject = risk["dense_text_reject"] if unsafe: self._finalize_batch_state(state, message.from_id, final_price=0) if dense_text_reject and len(dense_text_reject) == len(unsafe): return {"reply": self._build_reject_message("文字密集类图片暂不接单"), "need_transfer": False} return { "reply": f"这批里{'、'.join(unsafe)}处理风险较高,我先帮你转人工设计师跟进会更稳妥。", "need_transfer": True, } pricing = self._build_batch_pricing_plan(results=results, requirements=requirements) total_suggest = int(pricing["total_suggest"]) bundle_price = int(pricing["bundle_price"]) req_fee = pricing["req_fee"] intent_text = (message.msg or "") + " " + " ".join(requirements[-5:]) workflow_type, _ = self.workflow_router.detect_workflow(intent_text) if workflow_type == "find_image": route_res = await self._try_batch_auto_process( results=results, message=message, req_fee=req_fee, ) self._finalize_batch_state(state, message.from_id, final_price=bundle_price) return route_res reply_text = self._build_batch_quote_reply( results=results, total_suggest=total_suggest, bundle_price=bundle_price, req_fee=req_fee, ) self._finalize_batch_state(state, message.from_id, final_price=bundle_price) return {"reply": reply_text, "need_transfer": False} def _split_customer_text(self, msg: str) -> tuple: """ 把混合消息拆分为(客户真实文字, 系统订单块)。 平台有时把客户文字和系统订单通知拼在同一条消息里。 """ import re # 找到系统订单块的起始位置 order_marker = re.search(r'\[系统订单信息\]|\[系统通知\]', msg) if order_marker: customer_text = msg[:order_marker.start()].strip() order_block = msg[order_marker.start():].strip() else: customer_text = msg.strip() order_block = "" return customer_text, order_block def _build_prompt(self, message: CustomerMessage, state: ConversationState) -> str: """构建提示词""" msg_content = message.msg stage_info = f"【当前阶段】{state.stage}" # 拆分:客户文字 vs 系统订单块 customer_text, order_block = self._split_customer_text(msg_content) has_order = bool(order_block) if has_order: order = self._parse_order_info(order_block) if order.get('order_id'): state.last_order_id = order['order_id'] stage_info += f"\n【订单号】{order['order_id']}" if order.get('order_status'): state.order_status = order['order_status'] stage_info += f"\n【订单状态】{order['order_status']}" if order.get('pay_status'): stage_info += f"\n【支付状态】{order['pay_status']}" if order.get('amount'): stage_info += f"\n【订单金额】{order['amount']}元" if order.get('quantity'): stage_info += f"\n【数量】{order['quantity']}件" if order.get('order_time'): stage_info += f"\n【下单时间】{order['order_time']}" if order.get('buyer_note'): stage_info += f"\n【买家备注】{order['buyer_note']}" if state.discount_count > 0: stage_info += f"\n【客户压价次数】{state.discount_count}" # 店铺类型:不同店铺不同回复策略 shop_type = _get_shop_type(message.acc_id or "", message.goods_name or "") shop_hint = "" try: from config.config import CONFIG_DIR import json cfg_path = CONFIG_DIR / "shop_prompts.json" if cfg_path.exists(): with open(cfg_path, "r", encoding="utf-8") as f: cfg = json.load(f) hints = cfg.get("type_hints", {}) shop_hint = hints.get(shop_type, "") if not shop_hint and message.acc_id: sh = cfg.get("shops", {}).get(message.acc_id, {}) shop_hint = sh.get("hint", "") except Exception: pass prompt = f"""收到新消息: {stage_info} 发送者: {message.from_name} ({message.from_id}) """ if message.goods_name: prompt += f"商品名称: {message.goods_name}\n" if shop_hint: prompt += f"\n{shop_hint}\n" # ── 优先处理客户真实问题 ── # ── 判断订单付款状态(供后续逻辑使用)── order_paid = False order_unpaid = False if has_order: order = self._parse_order_info(order_block) paid_kws = ["等待发货", "已付款", "付款成功", "买家已付款"] unpaid_kws = ["等待买家付款", "待付款", "未付款"] ps = order.get('pay_status', '') os_ = order.get('order_status', '') if any(kw in ps or kw in os_ for kw in paid_kws): order_paid = True elif any(kw in ps or kw in os_ for kw in unpaid_kws): order_unpaid = True # ── 催单/进度询问关键词 ── progress_keywords = [ "安排了吗", "安排好了吗", "好了吗", "做了吗", "做好了吗", "弄好了吗", "好了没", "做了没", "什么时候好", "多久好", "进度", "催一下", "快点", "什么时候能好", "做完了吗", ] if customer_text: prompt += f"\n客户说:{customer_text}\n" image_url = self._extract_image_url(customer_text) price_keywords = ["多少钱", "多少", "价格", "几块", "怎么收费", "报个价"] size_keywords = [ "尺寸", "比例", "宽", "高", "米", "厘米", "mm", "cm", "横版", "竖版", "2米", "3米", "改成", "做成", ] has_size_change = any(kw in customer_text.lower() for kw in [k.lower() for k in size_keywords]) # gemini_api 店铺:不触发找图流程,按 API 客服回复 if shop_type == "gemini_api": prompt += "\n【Gemini API 店铺】客户问账号/pro/续费/套餐等,按 API 客服自然回复,不要求发图。" elif image_url: prompt += "\n客户在继续发图阶段:先确认“已收图”,并引导客户把图和要求一次发完;等客户明确“发完了/统一报价”后再统一报价。" elif any(kw in customer_text for kw in price_keywords): last_url = self._extract_image_url(msg_content) if last_url: prompt += "\n客户在询问价格:若客户已确认发完,则给总报价;若还在发图,先引导发完后统一报价。" else: prompt += "\n客户在询问价格但未发图:先简短承接(如“在看呢/收到”),不要机械连发;再自然引导对方发图。" if has_size_change: prompt += ( "\n⚠️ 尺寸改动场景:优先判断图片主体是否会被拉伸变形," "不是只看整张图宽高比。若会变形,要先提示“需要补图/扩边”,再给报价。" ) elif has_size_change: prompt += ( "\n客户在改尺寸/改比例:先按主体比例判断是否会变形," "不是只看整图比例。若目标尺寸会拉伸主体,先明确说明要补图(如上下补图/扩边)再报价。" ) elif any(kw in customer_text for kw in progress_keywords): # 客户问进度/催单,必须先核查付款状态 if order_unpaid: prompt += "\n⚠️【订单未付款】客户问安排进度,但订单还未付款。自然告知拍下付款后马上安排即可。" elif order_paid: prompt += "\n客户催单,订单已付款,自然回复在做了/快了之类。" else: prompt += "\n客户催单,查询当前处理状态后自然回复。" elif any(kw in customer_text for kw in ["贵", "有点贵", "太贵", "算了", "便宜点", "少点", "打折", "贵哦"]): # 客户嫌贵/要放弃 → 直接让价一次,不问「什么问题」 prompt += "\n⚠️【客户嫌贵】客户已表达价格顾虑或要放弃。禁止追问「什么问题」「说清楚点」。\n→ 第一步:直接让价一次(如原价 20→15),话术自然。\n→ 第二步:如果让价后客户还是嫌贵,引导客户自己去注册星绘平台:\n - 每个话术都必须带网址 xinhui.cloud\n - 强调「自己去弄」「自己操作」「自己下单」\n - 示例:「嫌贵就去 xinhui.cloud 注册个号,自己有余额可以弄」「xinhui.cloud 去注册下,新用户送体验金,自己弄便宜」\n - 语气直接,让客户自己去操作」" elif any(kw in customer_text for kw in ["擦边", "黄色", "色情", "大尺度", "性感图", "露点", "半裸"]): # 客户问擦边/黄色内容 → 直接拒绝,不说「发图来看看」 prompt += "\n⚠️【拒绝】客户询问擦边/黄色/敏感内容。直接拒绝,不接单,不说「发图来看看」。自然回复如:这类不做/不接/做不了。" else: prompt += "\n根据客户说的内容自然回应,像真人聊天,不要套模板。" # ── 附加订单信息(不覆盖客户问题的优先级)── if has_order: order = self._parse_order_info(order_block) order_instruction = self._get_order_instruction( order.get('pay_status', ''), order.get('order_status', '') ) if customer_text: if not order_unpaid: # 未付款情况已在上面明确处理,不重复添加背景 prompt += f"\n\n【背景参考-订单通知】{order_instruction}" else: # 纯系统通知,没有客户文字 prompt += f"\n\n{order_instruction}" if not customer_text and not has_order: prompt += f"\n消息内容: {msg_content}\n请按工作流规则回复。" return prompt async def _handle_image_workflow(self, message: str, data: dict, image_urls: list) -> bool: """处理图片工作流(根据客户说的话判断执行哪种工作流)""" if not image_urls: return False workflow_type, confidence = self.workflow_router.detect_workflow(message) customer_id = data.get('from_id') acc_id = data.get('acc_id', '') acc_type = data.get('acc_type', 'AliWorkbench') image_url = image_urls[0] print(f"[Agent] 检测到工作流类型:{workflow_type} (置信度:{confidence})") if workflow_type == "find_image": print(f"[Agent] 执行查找图片工作流 | 客户:{customer_id}") from core.workflow import workflow return await workflow.find_image_workflow( customer_id=customer_id, image_url=image_url, acc_id=acc_id, acc_type=acc_type ) elif workflow_type == "process_image": print(f"[Agent] 执行处理图片工作流 | 客户:{customer_id}") from core.workflow import workflow return await workflow.process_image_workflow( customer_id=customer_id, image_url=image_url, acc_id=acc_id, acc_type=acc_type ) elif workflow_type == "transfer_human": print(f"[Agent] 执行转人工派单工作流 | 客户:{customer_id}") from core.workflow import workflow return await workflow.transfer_to_designer_workflow( customer_id=customer_id, image_url=image_url, acc_id=acc_id, acc_type=acc_type, reason="客户主动要求转人工" ) return False async def test_agent(): """测试 Agent""" agent = CustomerServiceAgent(skills_dir="skills") test_msg = CustomerMessage( msg_id="123", acc_id="test_account", msg="这张图可以做吗?", from_id="customer001", from_name="张三", cy_id="customer001", acc_type="AliWorkbench", msg_type=0, cy_name="张三", goods_name="专业找图代找高清图片", goods_order="" ) response = await agent.process_message(test_msg) print(f"回复内容: {response.reply}") if __name__ == "__main__": import asyncio asyncio.run(test_agent())