Files
tw/core/pydantic_ai_agent.py

3550 lines
166 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""PydanticAI Agent 模块
架构:单 Agent + 多 Tool 模式
- Agent 负责对话逻辑和决策
- Tool 负责具体能力:看图/查客户/转接
- AI 自主决定何时调用哪个工具,时序自然,不需要外部协调
"""
import os
import glob
import asyncio
import random
import hashlib
import re
import json
import logging
from pathlib import Path
from typing import Optional, Dict, List, Any, Tuple
from datetime import datetime
from pydantic import BaseModel, Field, model_validator
from pydantic_ai import Agent, RunContext
from pydantic_ai.models.openai import OpenAIChatModel
from pydantic_ai.providers.openai import OpenAIProvider
from dotenv import load_dotenv
from utils.metrics_tracker import emit as metrics_emit
load_dotenv()
from services.service_tuhui_upload import upload_to_tuhui
from core.workflow_router import get_workflow_router
from core.workflow_router import get_workflow_router
from db.customer_risk_db import risk_db
# ========== 企业微信通知 ==========
_WECHAT_WEBHOOK = os.getenv("WECHAT_WEBHOOK", "")
logger = logging.getLogger("cs_agent")
async def _notify_wechat(content: str, tag: str = "通知"):
"""发送企业微信 markdown 通知,任何异常都发"""
if not _WECHAT_WEBHOOK:
print(f"[{tag}] 未配置 WECHAT_WEBHOOK跳过推送")
return
try:
import httpx
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(_WECHAT_WEBHOOK, json={
"msgtype": "markdown",
"markdown": {"content": content}
})
data = resp.json()
if data.get("errcode") == 0:
print(f"[{tag}] 企业微信推送成功 ✓")
else:
print(f"[{tag}] 企业微信推送失败: {data}")
except Exception as e:
print(f"[{tag}] 企业微信发送异常: {e}")
async def _notify_wechat_overdue():
"""API 欠费时发企业微信通知"""
await _notify_wechat(
"⚠️ **火山引擎 API 欠费**客服AI已停止响应请立即充值\n"
"地址https://console.volcengine.com/ark"
)
# ========== 转接常量 ==========
TRANSFER_MESSAGE = "话术|[转移会话],分组20252916034,无原因"
CASE_LIBRARY_LINK = "https://www.yuque.com/zuowei-dfvpq/kge0in/mynala0g35b8cec5"
TAOBAO_REPLY_TAILS = ("", "", "好的", "嗯咯", "嗯啦")
def _ensure_taobao_reply_tail(text: str) -> str:
"""淘宝口吻收尾:最终回复结尾带简短口语尾词。"""
t = (text or "").strip()
if not t:
return ""
transfer_keywords = ("TRANSFER_REQUESTED", "[转移会话]", "转移会话")
if any(k in t for k in transfer_keywords):
return t
trimmed = t.rstrip(" \t\r\n。.!~")
if any(trimmed.endswith(tail) for tail in TAOBAO_REPLY_TAILS):
return trimmed
return f"{trimmed} 好的"
# ========== 数据模型 ==========
class CustomerMessage(BaseModel):
"""客户消息模型"""
msg_id: str
acc_id: str
msg: str
from_id: str
from_name: str
cy_id: str
acc_type: str
msg_type: int
cy_name: str
goods_name: Optional[str] = None
goods_order: Optional[str] = None
class ConversationState(BaseModel):
"""对话状态"""
customer_id: str
stage: str = "售前" # 售前/售后
last_price: Optional[int] = None # 最后报价
last_min_price: Optional[int] = None # 最近图片的最低价
last_order_id: Optional[str] = None # 订单号
order_status: Optional[str] = None # 订单状态
discount_count: int = 0 # 让价次数
image_count: int = 0 # 图片数量
pending_image_urls: List[str] = Field(default_factory=list) # 待统一报价图片
pending_requirements: List[str] = Field(default_factory=list) # 待统一报价需求
quote_phase: str = "idle" # idle/collecting/ready_to_quote/waiting_result
quote_ready_turns: int = 0 # ready_to_quote 阶段还需等待的消息轮次
last_update: str = ""
last_reply_at: Optional[datetime] = None # 最后一次回复客户的时间
class AgentDeps(BaseModel):
"""Agent 依赖项 - 用于传递上下文"""
msg_id: str
acc_id: str
from_id: str
platform: str
class AgentResponse(BaseModel):
"""Agent 回复模型"""
reply: str
should_reply: bool = True
need_transfer: bool = False # 是否需要转人工
transfer_msg: str = "" # 转接消息
@model_validator(mode="after")
def _ensure_reply_tail(self):
if self.should_reply and self.reply:
self.reply = _ensure_taobao_reply_tail(self.reply)
return self
def _get_shop_type(acc_id: str = "", goods_name: str = "") -> str:
"""根据 acc_id 或 goods_name 判断店铺类型,返回 gemini_api / find_image / default"""
try:
from config.config import CONFIG_DIR
import json
cfg_path = CONFIG_DIR / "shop_prompts.json"
if not cfg_path.exists():
return "find_image"
with open(cfg_path, "r", encoding="utf-8") as f:
cfg = json.load(f)
shops = cfg.get("shops", {})
goods_kw = cfg.get("goods_keywords", {})
type_hints = cfg.get("type_hints", {})
# 优先按 acc_id
if acc_id and acc_id in shops:
return shops[acc_id].get("type", "find_image")
# 按商品名关键词
goods_lower = (goods_name or "").lower()
for kw, stype in goods_kw.items():
if kw in goods_lower:
return stype
except Exception:
pass
return "find_image"
def load_skill_map(skills_dir: str = "skills") -> Dict[str, str]:
"""按技能目录名加载 SKILL.md返回 {skill_name: content}。"""
skill_map: Dict[str, str] = {}
skill_files = glob.glob(os.path.join(skills_dir, "**/SKILL.md"), recursive=True)
for skill_file in skill_files:
try:
content = Path(skill_file).read_text(encoding="utf-8")
skill_name = Path(skill_file).parent.name.strip().lower()
if not skill_name:
continue
if skill_name in skill_map:
skill_map[skill_name] += "\n\n" + content
else:
skill_map[skill_name] = content
except Exception as e:
print(f"警告: 读取 {skill_file} 失败: {e}")
return skill_map
class CustomerServiceAgent:
"""客服 Agent - 支持 SKILL.md + 工作流"""
C_RESET = "\033[0m"
C_PROMPT = "\033[96m" # cyan
C_THINK = "\033[95m" # magenta
C_TOOL = "\033[93m" # yellow
C_REPLY = "\033[92m" # green
C_MUTED = "\033[90m" # gray
_DEFAULT_EVOLUTION_CANDIDATE = Path("config") / "evolution_candidate.json"
@staticmethod
def _activity_log(event: str, **kwargs):
safe = {}
for k, v in kwargs.items():
if isinstance(v, str):
safe[k] = v[:240]
else:
safe[k] = v
try:
logger.info(f"[ACTIVITY] event={event} data={json.dumps(safe, ensure_ascii=False)}")
except Exception:
logger.info(f"[ACTIVITY] event={event} data={safe}")
def __init__(self, skills_dir: str = "skills"):
self.api_key = os.getenv("OPENAI_API_KEY")
self.base_url = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
self.model_name = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
self.reply_persona = os.getenv("AI_REPLY_PERSONA", "淘宝老店主,直爽利落,口语自然")
self.dynamic_collection_replies = os.getenv("AI_DYNAMIC_COLLECTION_REPLIES", "true").strip().lower() in {"1", "true", "yes", "on"}
self.rewrite_all_replies = os.getenv("AI_REWRITE_ALL_REPLIES", "true").strip().lower() in {"1", "true", "yes", "on"}
try:
self.batch_quote_delay_turns = max(0, int(os.getenv("BATCH_QUOTE_DELAY_TURNS", "1")))
except Exception:
self.batch_quote_delay_turns = 1
if not self.api_key:
raise ValueError("请设置 OPENAI_API_KEY 环境变量")
# 对话状态管理
self.conversations: Dict[str, ConversationState] = {}
# 多轮对话历史PydanticAI ModelMessage 列表按客户ID存储
self.message_histories: Dict[str, list] = {}
self.evolution_candidate = self._load_evolution_candidate()
# 加载技能并按角色拆分,避免所有 Agent 吃同一份大杂烩提示词
self.skill_map = load_skill_map(skills_dir)
self.skill_style = self._compose_skill_content(["style-skill", "owner-style"])
self.skill_pre_sales = self._compose_skill_content(["pre-sales-skill"])
self.skill_pricing = self._compose_skill_content(["pricing-skill"])
self.skill_after_sale = self._compose_skill_content(["after-sales-skill"])
self.skill_risk = self._compose_skill_content(["risk-skill"])
# 创建 OpenAI 模型
model = OpenAIChatModel(
model_name=self.model_name,
provider=OpenAIProvider(
api_key=self.api_key,
base_url=self.base_url
)
)
self.agent = Agent(
model=model,
deps_type=AgentDeps,
system_prompt=self._get_system_prompt()
)
self.agent_after_sale = Agent(
model=model,
deps_type=AgentDeps,
system_prompt=self._get_after_sale_prompt()
)
self.agent_pricing = Agent(
model=model,
deps_type=AgentDeps,
system_prompt=self._get_pricing_prompt()
)
self.agent_processing = Agent(
model=model,
deps_type=AgentDeps,
system_prompt=self._get_processing_prompt()
)
self.agent_similar = Agent(
model=model,
deps_type=AgentDeps,
system_prompt=self._get_similar_prompt()
)
self.agent_natural_reply = Agent(
model=model,
deps_type=AgentDeps,
system_prompt=self._get_natural_reply_prompt()
)
# 工作流程路由器
self.workflow_router = get_workflow_router()
self.agent_order = Agent(
model=model,
deps_type=AgentDeps,
system_prompt=self._get_order_prompt()
)
self.agent_risk = Agent(
model=model,
deps_type=AgentDeps,
system_prompt=self._get_risk_prompt()
)
# 注册工具
self._register_tools()
def _compose_skill_content(self, names: List[str]) -> str:
"""按技能名拼接技能文本,找不到则跳过。"""
parts: List[str] = []
for name in names:
key = (name or "").strip().lower()
if key and key in self.skill_map:
parts.append(self.skill_map[key])
return "\n\n".join(parts)
@staticmethod
def _attach_skill_docs(prompt: str, *skill_docs: str) -> str:
docs = [d for d in skill_docs if d]
if not docs:
return prompt
return prompt + "\n\n=== 角色技能 ===\n" + "\n\n".join(docs)
def _load_evolution_candidate(self) -> Dict[str, Any]:
"""读取自我进化候选配置(灰度策略),读取失败时返回空。"""
try:
path = Path(os.getenv("EVOLUTION_CANDIDATE_PATH", str(self._DEFAULT_EVOLUTION_CANDIDATE)))
if not path.exists():
return {}
data = json.loads(path.read_text(encoding="utf-8"))
if not isinstance(data, dict):
return {}
return data
except Exception:
return {}
def _evolution_gray_percent(self) -> int:
"""灰度比例,默认 5%"""
try:
env_pct = os.getenv("EVOLUTION_GRAY_PERCENT", "").strip()
if env_pct:
pct = int(float(env_pct))
else:
pct = int(((self.evolution_candidate or {}).get("gray_percent", 5)))
return max(0, min(100, pct))
except Exception:
return 5
def _evolution_enabled_for_customer(self, customer_id: str) -> bool:
"""按客户哈希稳定灰度命中,命中后启用候选策略。"""
cand = self.evolution_candidate or {}
if str(cand.get("status", "")).strip() != "ready_for_gray_5_percent":
return False
if not customer_id:
return False
pct = self._evolution_gray_percent()
if pct <= 0:
return False
digest = hashlib.md5(customer_id.encode("utf-8")).hexdigest()
bucket = int(digest[:8], 16) % 100
hit = bucket < pct
if hit:
metrics_emit("evolution_gray_hit", customer_id=customer_id, percent=pct, version=str(cand.get("version", "")))
return hit
def _evolution_has_proposal(self, proposal_id: str) -> bool:
cand = self.evolution_candidate or {}
for p in cand.get("proposals", []) or []:
if str((p or {}).get("id", "")).strip() == proposal_id:
return True
return False
@staticmethod
def _is_service_risk_inquiry(text: str) -> bool:
"""识别退款/投诉等服务风险场景。"""
s = (text or "").strip().lower()
if not s:
return False
kw = ("退款", "退货", "投诉", "差评", "举报", "欺骗", "骗人", "起诉", "法院", "生气", "不满意")
return any(k in s for k in kw)
@staticmethod
def _log_block(title: str, content: str):
"""统一的控制台分层日志输出。"""
print(f"{CustomerServiceAgent.C_PROMPT}[{title}]{CustomerServiceAgent.C_RESET}")
print(content)
print(f"{CustomerServiceAgent.C_MUTED}────────────────────{CustomerServiceAgent.C_RESET}")
@staticmethod
def _normalize_reply_text(text: Optional[str]) -> str:
"""清洗模型输出,避免把占位词直接发给客户。"""
if text is None:
return ""
cleaned = str(text).strip()
if cleaned.lower() in {"", "none", "null", "n/a"}:
return ""
return cleaned
@staticmethod
def _colloquialize_reply(text: str) -> str:
"""把常见机械表达柔化为更口语的客服话术。"""
t = (text or "").strip()
if not t:
return t
repl = {
"确认我就安排": "你点头我就开做",
"可以的话我马上安排": "可以我就马上给你做",
"我这边马上安排": "我马上安排",
"立刻统一报价": "马上给你报价",
"统一报价": "一起给你报价",
"": "",
"请您": "",
"可选A": "可选:",
"流程完成": "已经安排好了",
}
for k, v in repl.items():
t = t.replace(k, v)
return t
async def _render_collection_reply_with_ai(
self,
*,
message: CustomerMessage,
state: ConversationState,
scene: str,
intent_hint: str,
fallback: str,
) -> str:
"""
收图阶段回复默认走 AI 改写,失败时回退到固定模板。
"""
# 首张收图先承接“我看一下”,避免机械地立刻催“发完统一报价”。
if scene == "collect_ack" and len(state.pending_image_urls) <= 1:
first_ack = [
"收到了,我先看一下哈,稍等哈",
"这张我收到了,我先看下,等我一下哈",
"收到这张了,我先过一眼,稍等哈",
"我先看这张哈,稍等我一下",
"图我收到了,我先看一眼,马上回你哈",
"这张先记上了,我先看下细节,稍等哈",
"收到哈,我先过一遍这张,等我会儿",
"我先看这张效果,稍等一下哈",
"图到了,我先看下清晰度,稍等哈",
"这张我先看着,稍等我一下就回你",
"收到这张了,我先核一下细节,稍等哈",
"我先把这张看完,稍等我一会儿哈",
]
return random.choice(first_ack)
if not self.dynamic_collection_replies:
return fallback
try:
deps = AgentDeps(
msg_id=message.msg_id,
acc_id=message.acc_id,
from_id=message.from_id,
platform=message.acc_type,
)
history = self.message_histories.get(message.from_id, [])
pending_req = "".join((state.pending_requirements or [])[-4:]) or ""
user_prompt = (
"请按下面意图生成给客户的自然回复。\n"
f"场景: {scene}\n"
f"回复意图: {intent_hint}\n"
f"客户原话: {message.msg}\n"
f"当前已收图片数: {len(state.pending_image_urls)}\n"
f"当前需求摘要: {pending_req}\n"
"输出要求: 不超过2句话像真人店主聊天。"
)
result = await self.agent_natural_reply.run(user_prompt, deps=deps, message_history=history)
self.message_histories[message.from_id] = result.all_messages()[-30:]
text = self._colloquialize_reply(self._normalize_reply_text(result.output))
if not text:
return fallback
transfer_keywords = ("TRANSFER_REQUESTED", "[转移会话]", "转移会话")
if any(k in text for k in transfer_keywords):
return fallback
return text
except Exception:
return fallback
async def _rewrite_reply_with_ai(
self,
*,
message: CustomerMessage,
state: ConversationState,
reply: str,
scene: str = "final_reply",
) -> str:
"""
对最终回复做 AI 润色,统一口吻。失败时返回原文。
"""
text = (reply or "").strip()
if not text or not self.rewrite_all_replies:
return text
transfer_keywords = ("TRANSFER_REQUESTED", "[转移会话]", "转移会话")
if any(k in text for k in transfer_keywords):
return text
try:
deps = AgentDeps(
msg_id=message.msg_id,
acc_id=message.acc_id,
from_id=message.from_id,
platform=message.acc_type,
)
history = self.message_histories.get(message.from_id, [])
pending_req = "".join((state.pending_requirements or [])[-4:]) or ""
prompt = (
"请把下面这句客服回复润色成更自然的微信聊天口吻,语义必须保持一致。\n"
f"场景: {scene}\n"
f"客户原话: {message.msg}\n"
f"当前已收图: {len(state.pending_image_urls)}\n"
f"当前需求摘要: {pending_req}\n"
f"原回复: {text}\n"
"要求: 不要新增承诺/价格/流程不超过2句话只输出润色后的最终回复。"
)
result = await self.agent_natural_reply.run(prompt, deps=deps, message_history=history)
self.message_histories[message.from_id] = result.all_messages()[-30:]
polished = self._colloquialize_reply(self._normalize_reply_text(result.output))
if not polished:
return text
if any(k in polished for k in transfer_keywords):
return text
return polished
except Exception:
return text
def _register_tools(self):
"""注册所有 Tool让 Agent 可以主动调用"""
@self.agent.tool
async def analyze_image(ctx: RunContext[AgentDeps], image_url: str) -> str:
"""
分析客户发来的图片复杂度,用于报价。
收到图片URL时调用此工具返回复杂度和建议报价。
"""
try:
from image.image_analyzer import image_analyzer
result = await image_analyzer.analyze(image_url)
complexity_label = {
"simple": "简单(画面干净)",
"normal": "一般复杂度",
"complex": "细节偏多",
"hard": "非常复杂",
}.get(result["complexity"], result["complexity"])
# 持久化图片URL和复杂度重启后仍能记住这张图
try:
from db.customer_db import db
db.update_last_image(
ctx.deps.from_id,
image_url,
complexity=result["complexity"],
gemini_prompt=result.get("gemini_prompt", ""),
aspect_ratio=result.get("aspect_ratio", "1:1"),
perspective=result.get("perspective", "no"),
)
except Exception:
pass
# 存图片类型到客户画像
try:
from db.customer_db import db as _db
if result.get("subject"):
_db.add_image_type(ctx.deps.from_id, result["subject"])
except Exception:
pass
# 在 workflow 里创建待处理任务(付款后自动触发 Gemini
try:
from core.workflow import workflow
await workflow.image_analysis_result(
customer_id=ctx.deps.from_id,
image_url=image_url,
complexity=result["complexity"],
acc_id=ctx.deps.acc_id,
acc_type=ctx.deps.platform,
gemini_prompt=result.get("gemini_prompt", ""),
aspect_ratio=result.get("aspect_ratio", "1:1"),
perspective=result.get("perspective", "no"),
proc_type=result.get("proc_type", ""),
subject=result.get("subject", ""),
quality=result.get("quality", ""),
)
print(f"[Agent] Workflow 任务已创建 | 客户: {ctx.deps.from_id} | 比例: {result.get('aspect_ratio')} | 透视: {result.get('perspective')} | 图片: {image_url[:60]}...")
except Exception as e:
print(f"[Agent] Workflow 任务创建失败: {e}")
# 组装给 AI 的分析报告
risk = result.get("risk", "none")
has_face = result.get("has_face", "no")
feasibility = result.get("feasibility", "yes")
note = result.get("note", "")
lines = [
f"图片主体:{result['subject'] or '未识别'}",
f"处理类型:{result['proc_type'] or '高清修复'}",
f"原图质量:{result['quality'] or '未知'}",
f"图片类型:{result.get('category', '') or '通用'}",
f"图片尺寸:{(result.get('width') or 0)}x{(result.get('height') or 0)}{result.get('megapixels', 0.0)}MP",
f"含人脸:{'' if has_face == 'yes' else ''}",
f"复杂度:{complexity_label}",
f"原因:{result['reason']}",
]
if result.get("size_surcharge"):
lines.append(f"尺寸加价:+{result['size_surcharge']}")
if result.get("size_note"):
lines.append(f"尺寸提示:{result['size_note']}")
try:
st = self._get_conversation_state(ctx.deps.from_id)
if isinstance(result.get("price_min"), (int, float)):
st.last_min_price = int(result.get("price_min") or 0)
try:
from db.customer_db import db as _db
_db.update_last_min_price(ctx.deps.from_id, st.last_min_price)
except Exception:
pass
except Exception:
pass
# 根据可做性和风险等级给 AI 不同的行动指引
if feasibility == "no":
if "敏感" in (note or ""):
lines.append("【拒绝】图片含敏感/黄色/擦边内容,不接单。")
lines.append("→ 直接拒绝,不说「发图来看看」,自然回复如:这类不做/不接。")
else:
lines.append("【无法处理】此图无法处理(纯黑/纯白/完全损坏/要找原始RAW文件")
lines.append("→ 告知客户无法处理,建议换图或说明原因,不要报价。")
elif risk == "high":
lines.append(f"【高风险】此图处理风险高:{note or 'AI修复后效果不能保证与原图一致'}")
lines.append(f"建议报价:{result['price_suggest']}")
lines.append("→ 先自然说明风险(人脸/效果可能不完美),再报价,满意再拍。话术自然。")
elif risk == "low":
lines.append(f"【低风险-含人脸】修复后人脸相似度约70-90%,效果不稳定。")
lines.append(f"建议报价:{result['price_suggest']}")
lines.append(f"→ 报价时自然加一句风险提示(人脸可能有轻微变化、满意再付等)")
else:
# 无风险,正常报价
base_price = result.get('price_suggest', 20)
text_surcharge = result.get('text_surcharge', 0)
layer_surcharge = result.get('layer_surcharge', 0)
total_price = base_price + text_surcharge + layer_surcharge
# 构建报价说明
price_explanation = f"建议报价:{total_price}"
if text_surcharge > 0:
price_explanation += f"(含文字处理 +{text_surcharge}元)"
if layer_surcharge > 0:
price_explanation += f"(含分层 +{layer_surcharge}元)"
lines.append(price_explanation)
# 添加文字数量说明
text_amount = result.get('text_amount', 'none')
if text_amount != 'none':
lines.append(f"文字数量:{text_amount},需要精细处理")
if feasibility == "partial":
lines.append(f"⚠️ 此图有一定难度:{note or '效果可能不完美'},回复时可加「效果不满意退款」")
if note and note not in ("", ""):
lines.append(f"提示:{note}")
lines.append(f"【立刻回复客户报价 {total_price} 元,话术自然多变】")
return "\n".join(lines)
except Exception as e:
return f"图片分析失败: {e},请根据经验判断报价"
@self.agent.tool
async def get_customer_info(ctx: RunContext[AgentDeps], customer_id: str) -> str:
"""
查询客户历史信息:消费记录、性格标签、报价历史等。
对话开始时或需要了解客户背景时调用。
"""
try:
from db.customer_db import db
return db.get_profile_text(customer_id)
except Exception as e:
return f"查询失败: {e}"
@self.agent.tool
async def transfer_to_human(ctx: RunContext[AgentDeps]) -> str:
"""
转接人工客服。
遇到退款/投诉/情绪激动/复杂售后时调用。
"""
return "TRANSFER_REQUESTED"
@self.agent.tool
async def get_customer_risk_profile(ctx: RunContext[AgentDeps], customer_id: str = "") -> str:
"""查询客户风控画像:退款/不付款/差评/人工黑名单等。"""
cid = customer_id or ctx.deps.from_id
try:
info = risk_db.evaluate_customer(cid)
return (
f"客户:{cid}\n"
f"不接单:{'' if info.get('do_not_serve') else ''}\n"
f"风险等级:{info.get('computed_level','low')} 分数:{info.get('computed_score',0)}\n"
f"近30天退款:{info.get('refund_30d',0)}\n"
f"近7天未付款下单:{info.get('unpaid_7d',0)}\n"
f"近90天差评:{info.get('bad_review_90d',0)}\n"
f"备注:{info.get('note','') or ''}"
)
except Exception as e:
return f"查询风控画像失败: {e}"
@self.agent.tool
async def mark_customer_risk(
ctx: RunContext[AgentDeps],
customer_id: str,
do_not_serve: bool = False,
risk_level: str = "low",
risk_score: int = 0,
note: str = "",
tag: str = "",
) -> str:
"""人工标记客户风控画像(不接单/高风险/备注标签)。"""
try:
tags = [tag] if tag else []
risk_db.set_profile(
customer_id=customer_id,
do_not_serve=do_not_serve,
risk_level=risk_level,
risk_score=risk_score,
note=note,
tags=tags,
)
return "风控画像已更新"
except Exception as e:
return f"更新风控画像失败: {e}"
@self.agent.tool
async def record_customer_risk_event(
ctx: RunContext[AgentDeps],
customer_id: str,
event_type: str,
event_count: int = 1,
note: str = "",
) -> str:
"""记录风控事件refund/unpaid_order/bad_review/blacklist_hit 等。"""
try:
risk_db.record_event(
customer_id=customer_id,
event_type=event_type,
event_count=event_count,
note=note,
)
return "风控事件已记录"
except Exception as e:
return f"记录风控事件失败: {e}"
@self.agent.tool
async def save_customer_note(
ctx: RunContext[AgentDeps],
customer_id: str,
note: str
) -> str:
"""
记录客户关键信息到画像(邮箱/微信/特殊需求等)。
客户提供联系方式或重要信息时调用。
"""
try:
from db.customer_db import db
db.add_note(customer_id, note)
return "已记录"
except Exception as e:
return f"记录失败: {e}"
@self.agent.tool
async def update_contact_info(
ctx: RunContext[AgentDeps],
customer_id: str,
contact_type: str,
value: str
) -> str:
"""
更新客户联系方式。
当客户说出邮箱/手机/微信时调用,比正则提取更准确。
contact_type 枚举值:
email - 邮箱
phone - 手机号
wechat - 微信号
"""
try:
from db.customer_db import db
if contact_type == "email":
db.update_email(customer_id, value)
elif contact_type == "phone":
db.update_phone(customer_id, value)
elif contact_type == "wechat":
db.update_wechat(customer_id, value)
else:
return f"未知联系方式类型: {contact_type}"
return f"已保存 {contact_type}: {value}"
except Exception as e:
return f"保存失败: {e}"
@self.agent.tool
async def record_quote(
ctx: RunContext[AgentDeps],
customer_id: str,
price: int,
description: str = ""
) -> str:
"""
记录本次报价到客户画像,用于后续对话保持价格一致。
每次给客户报价后调用。
Args:
customer_id: 客户ID
price: 报价金额(元)
description: 报价描述,如"单图处理"/"三图打包"
"""
try:
from db.customer_db import db
db.update_last_price(customer_id, price)
if description:
db.add_note(customer_id, f"报价 {price}元({description}")
# 同步到内存状态
state = self.conversations.get(customer_id)
if state:
state.last_price = price
return f"已记录报价 {price}"
except Exception as e:
return f"记录失败: {e}"
@self.agent.tool
async def process_image_gemini(ctx: RunContext[AgentDeps], customer_id: str = "") -> str:
"""
触发 Gemini 作图处理。客户付款后或说「安排一下」「处理一下」时调用。
会从客户档案读取上次发图的 URL 和处理参数(提示词、比例、透视),启动 Gemini 流程。
处理完成后会自动发图给客户。
"""
try:
from config.config import IMAGE_MODULE_ENABLED
if not IMAGE_MODULE_ENABLED:
return "现在处理模块暂时暂停,先不自动作图"
except Exception:
return "现在处理模块暂时暂停,先不自动作图"
cid = customer_id or ctx.deps.from_id
try:
from core.workflow import workflow
ok = await workflow.trigger_processing_on_payment(
customer_id=cid,
acc_id=ctx.deps.acc_id,
acc_type=ctx.deps.platform,
)
if ok:
return "已安排,稍后发你"
return "该客户暂无待处理图片,请先发图"
except Exception as e:
return f"触发作图失败: {e},请稍后重试或转人工"
@self.agent_pricing.tool
async def analyze_image_pricing(ctx: RunContext[AgentDeps], image_url: str) -> str:
try:
from image.image_analyzer import image_analyzer
result = await image_analyzer.analyze(image_url)
if result.get("feasibility") == "no" or result.get("risk") == "high":
return "该图风险高或不可做:不报价,建议换图或转人工评估。"
if not result.get("success", False):
return "图片识别异常:先不报价,建议客户重发更清晰图片。"
p = result.get("price_suggest", 20)
try:
st = self._get_conversation_state(ctx.deps.from_id)
if isinstance(result.get("price_min"), (int, float)):
st.last_min_price = int(result.get("price_min") or 0)
try:
from db.customer_db import db as _db
_db.update_last_min_price(ctx.deps.from_id, st.last_min_price)
except Exception:
pass
except Exception:
pass
return f"建议报价:{p}"
except Exception as e:
return f"图片分析失败: {e}"
@self.agent_pricing.tool
async def record_quote_pricing(
ctx: RunContext[AgentDeps],
customer_id: str,
price: int,
description: str = ""
) -> str:
try:
from db.customer_db import db
db.update_last_price(customer_id, price)
return "ok"
except Exception as e:
return f"记录失败: {e}"
@self.agent_processing.tool
async def process_image_gemini_run(ctx: RunContext[AgentDeps], customer_id: str = "") -> str:
"""触发 Gemini 作图处理processing agent 专用入口)。"""
return await process_image_gemini(ctx, customer_id)
@self.agent_similar.tool
async def recommend_similar(ctx: RunContext[AgentDeps], hint: str = "") -> str:
try:
return "有类似款,拍下我发你参考图。"
except Exception as e:
return f"推荐失败: {e}"
@self.agent_order.tool
async def handle_order(ctx: RunContext[AgentDeps], raw_msg: str = "") -> str:
try:
info = self._parse_order_info(raw_msg or "")
paid_kw = ["等待发货", "已付款", "付款成功", "买家已付款"]
if any(k in (info.get("pay_status", "") or "") for k in paid_kw) or any(k in (info.get("order_status", "") or "") for k in paid_kw):
return "已安排,稍后发你"
return ""
except Exception:
return ""
@self.agent_risk.tool
async def risk_filter(ctx: RunContext[AgentDeps], text: str = "") -> str:
return "这类不做哈,政治/敏感内容都不接。"
@self.agent_risk.tool
async def get_customer_risk_profile_risk(ctx: RunContext[AgentDeps], customer_id: str = "") -> str:
return await get_customer_risk_profile(ctx, customer_id)
@self.agent_risk.tool
async def mark_customer_risk_risk(
ctx: RunContext[AgentDeps],
customer_id: str,
do_not_serve: bool = False,
risk_level: str = "low",
risk_score: int = 0,
note: str = "",
tag: str = "",
) -> str:
return await mark_customer_risk(
ctx=ctx,
customer_id=customer_id,
do_not_serve=do_not_serve,
risk_level=risk_level,
risk_score=risk_score,
note=note,
tag=tag,
)
@self.agent_risk.tool
async def record_customer_risk_event_risk(
ctx: RunContext[AgentDeps],
customer_id: str,
event_type: str,
event_count: int = 1,
note: str = "",
) -> str:
return await record_customer_risk_event(
ctx=ctx,
customer_id=customer_id,
event_type=event_type,
event_count=event_count,
note=note,
)
@self.agent.tool
async def remove_background(ctx: RunContext[AgentDeps], image_url: str) -> str:
try:
from config.config import IMAGE_MODULE_ENABLED
if not IMAGE_MODULE_ENABLED:
return "现在处理模块暂时暂停,先不处理图片"
except Exception:
return "现在处理模块暂时暂停,先不处理图片"
"""【独立工具】去背景,输出白底图。客户只要去背景时调用。"""
try:
from image.image_tools import remove_background as _rb
r = await _rb(image_url)
if r["success"]:
return f"去背景完成,已保存。自然回复客户好了发你"
return f"去背景失败:{r['message']}"
except Exception as e:
return f"去背景失败:{e}"
@self.agent.tool
async def perspective_correct(ctx: RunContext[AgentDeps], image_url: str) -> str:
try:
from config.config import IMAGE_MODULE_ENABLED
if not IMAGE_MODULE_ENABLED:
return "现在处理模块暂时暂停,先不处理图片"
except Exception:
return "现在处理模块暂时暂停,先不处理图片"
"""【独立工具】透视矫正。输入需白底图,输出展平图。"""
try:
from image.image_tools import perspective_correct as _pc
r = await _pc(image_url)
if r["success"]:
return f"透视矫正完成。自然回复客户好了"
return f"透视矫正失败:{r['message']}"
except Exception as e:
return f"透视矫正失败:{e}"
@self.agent.tool
async def extract_pattern_tool(
ctx: RunContext[AgentDeps],
image_url: str,
prompt: str = "",
aspect_ratio: str = "1:1"
) -> str:
try:
from config.config import IMAGE_MODULE_ENABLED
if not IMAGE_MODULE_ENABLED:
return "现在处理模块暂时暂停,先不处理图片"
except Exception:
return "现在处理模块暂时暂停,先不处理图片"
"""【独立工具】印花提取/主处理。按提示词和比例处理。"""
try:
from image.image_tools import extract_pattern
r = await extract_pattern(image_url, prompt=prompt, aspect_ratio=aspect_ratio)
if r["success"]:
return f"提取完成。自然回复客户好了发你"
return f"提取失败:{r['message']}"
except Exception as e:
return f"提取失败:{e}"
@self.agent.tool
async def enhance_image_tool(ctx: RunContext[AgentDeps], image_url: str) -> str:
try:
from config.config import IMAGE_MODULE_ENABLED
if not IMAGE_MODULE_ENABLED:
return "现在处理模块暂时暂停,先不处理图片"
except Exception:
return "现在处理模块暂时暂停,先不处理图片"
"""【独立工具】高清增强。客户只要清晰化时调用。"""
try:
from image.image_tools import enhance_image
r = await enhance_image(image_url)
if r["success"]:
return f"高清增强完成。自然回复客户好了"
return f"增强失败:{r['message']}"
except Exception as e:
return f"增强失败:{e}"
@self.agent.tool
async def color_match_tool(
ctx: RunContext[AgentDeps],
orig_url: str,
result_url: str,
strength: float = 0.75
) -> str:
try:
from config.config import IMAGE_MODULE_ENABLED
if not IMAGE_MODULE_ENABLED:
return "现在处理模块暂时暂停,先不处理图片"
except Exception:
return "现在处理模块暂时暂停,先不处理图片"
"""【独立工具】颜色匹配。将 result 色调匹配到 orig。"""
try:
from image.image_tools import color_match_images
r = await color_match_images(orig_url, result_url, strength=strength)
if r["success"]:
return f"颜色匹配完成"
return f"颜色匹配失败:{r['message']}"
except Exception as e:
return f"颜色匹配失败:{e}"
@self.agent.tool
async def trim_border_tool(ctx: RunContext[AgentDeps], image_url: str) -> str:
try:
from config.config import IMAGE_MODULE_ENABLED
if not IMAGE_MODULE_ENABLED:
return "现在处理模块暂时暂停,先不处理图片"
except Exception:
return "现在处理模块暂时暂停,先不处理图片"
"""【独立工具】裁切四周背景边(白/黄/米等)。"""
try:
from image.image_tools import trim_border
r = await trim_border(image_url)
if r["success"]:
return f"裁边完成"
return f"裁边失败:{r['message']}"
except Exception as e:
return f"裁边失败:{e}"
@self.agent.tool
async def vectorize_to_eps_tool(ctx: RunContext[AgentDeps], image_url: str) -> str:
try:
from config.config import IMAGE_MODULE_ENABLED
if not IMAGE_MODULE_ENABLED:
return "现在处理模块暂时暂停,先不处理图片"
except Exception:
return "现在处理模块暂时暂停,先不处理图片"
"""【独立工具】矢量化 - 将图片转为 EPS 矢量文件。客户要做矢量图、转 EPS、转 AI 格式时调用。"""
try:
from image.image_tools import vectorize_to_eps
r = await vectorize_to_eps(image_url)
if r["success"]:
return f"矢量化完成,已生成 EPS 文件。自然回复客户好了发你"
return f"矢量化失败:{r['message']}"
except Exception as e:
return f"矢量化失败:{e}"
@self.agent.tool
async def meitu_enhance_tool(
ctx: RunContext[AgentDeps],
image_url: str,
mode: str = "standard"
) -> str:
try:
from config.config import IMAGE_MODULE_ENABLED
if not IMAGE_MODULE_ENABLED:
return "现在处理模块暂时暂停,先不处理图片"
except Exception:
return "现在处理模块暂时暂停,先不处理图片"
"""
【独立工具】美图画质增强。客户要画质增强、清晰化、美图处理时调用。
Args:
image_url: 图片 URL 或本地路径
mode: 处理模式。crystal(极速重绘) standard(标准) enhance(增强) hdr(HDR) portrait(人像优化)
"""
try:
from image.image_tools import meitu_enhance
r = await meitu_enhance(image_url, mode=mode)
if r["success"]:
return f"画质增强完成。自然回复客户好了发你"
return f"画质增强失败:{r['message']}"
except Exception as e:
return f"画质增强失败:{e}"
@self.agent.tool
async def resize_image(
ctx: RunContext[AgentDeps],
image_url: str,
width: int,
height: int = 0
) -> str:
try:
from config.config import IMAGE_MODULE_ENABLED
if not IMAGE_MODULE_ENABLED:
return "现在处理模块暂时暂停,先不处理图片"
except Exception:
return "现在处理模块暂时暂停,先不处理图片"
"""
改图片尺寸。客户说「改成1920x1080」「弄成横图」「改下尺寸」时调用。
Args:
image_url: 图片URL客户刚发的图或从对话中获取
width: 目标宽度(像素),如 1920
height: 目标高度0=按宽度等比缩放),如 1080
常用尺寸1920x1080(横屏) 1080x1920(竖屏) 2000x2000(方图)
"""
try:
from image.image_processor import image_processor
result = await image_processor.resize(image_url, width, height)
if result["success"]:
return f"改尺寸完成:{width}x{height},已保存。自然回复客户改好了"
else:
return f"改尺寸失败:{result['message']},告知客户稍后重试"
except Exception as e:
return f"改尺寸失败:{e}"
@self.agent.tool
async def calculate_bulk_price(
ctx: RunContext[AgentDeps],
image_count: int,
complexities: str = ""
) -> str:
"""
计算多图打包价格。
客户要做多张图时调用,返回建议总价。
Args:
image_count: 图片数量
complexities: 各图复杂度,逗号分隔,如 "normal,complex,simple"
没有识别结果时留空,按平均价格估算
"""
if image_count <= 0:
return "图片数量无效"
# 各复杂度单价必须为5的整数倍
unit_price = {"simple": 15, "normal": 20, "complex": 25, "hard": 30}
default_unit = 20 # 没有识别结果时的默认单价
if complexities:
levels = [c.strip() for c in complexities.split(",")]
total = sum(unit_price.get(lv, default_unit) for lv in levels)
else:
total = image_count * default_unit
# 打包优惠3张以上9折5张以上8折价格必须为5的整数倍
if image_count >= 5:
discounted = round(total * 0.8 / 5) * 5
tip = f"{image_count}张8折优惠"
elif image_count >= 3:
discounted = round(total * 0.9 / 5) * 5
tip = f"{image_count}张9折优惠"
else:
discounted = round(total / 5) * 5
tip = ""
return f"建议打包报价:{discounted}{tip}(原价{total}元)"
# 对话状态超过多少小时后重置(避免昨天的售后状态影响今天)
CONVERSATION_TIMEOUT_HOURS = 12
def _get_conversation_state(self, customer_id: str) -> ConversationState:
"""获取或创建对话状态,超时自动重置"""
now = datetime.now()
if customer_id in self.conversations:
state = self.conversations[customer_id]
# 超过 12 小时没有消息,重置阶段和压价次数
if state.last_update:
try:
last = datetime.fromisoformat(state.last_update)
hours = (now - last).total_seconds() / 3600
if hours > self.CONVERSATION_TIMEOUT_HOURS:
state.stage = "售前"
state.discount_count = 0
# 同时清理对话历史,避免发送过期上下文
self.message_histories.pop(customer_id, None)
except Exception:
pass
# 进程内状态为空时,尝试从持久化恢复
if not state.pending_image_urls and not state.pending_requirements:
self._restore_pending_quote_state(customer_id, state)
else:
self.conversations[customer_id] = ConversationState(
customer_id=customer_id,
last_update=now.isoformat()
)
self._restore_pending_quote_state(customer_id, self.conversations[customer_id])
# 定期清理长期不活跃客户(超过 7 天)
self._cleanup_inactive(now)
return self.conversations[customer_id]
def _cleanup_inactive(self, now: datetime):
"""清理超过 7 天没有消息的对话状态,释放内存"""
# 每 100 次调用清理一次,避免每次都遍历
if len(self.conversations) % 100 != 0:
return
expired = [
cid for cid, state in self.conversations.items()
if state.last_update and
(now - datetime.fromisoformat(state.last_update)).days > 7
]
for cid in expired:
self.conversations.pop(cid, None)
self.message_histories.pop(cid, None)
def _sync_pending_quote_state(self, customer_id: str, state: ConversationState):
"""把待报价队列同步到客户库,避免重启丢失。"""
try:
self._refresh_quote_phase(state)
from db.customer_db import db
db.update_pending_quote_state(
customer_id,
state.pending_image_urls,
state.pending_requirements,
)
except Exception:
pass
def _restore_pending_quote_state(self, customer_id: str, state: ConversationState):
"""从客户库恢复待报价队列。"""
try:
from db.customer_db import db
profile = db.get_customer(customer_id)
state.pending_image_urls = list(getattr(profile, "pending_quote_images", []) or [])
state.pending_requirements = list(getattr(profile, "pending_quote_requirements", []) or [])
state.image_count = len(state.pending_image_urls)
self._refresh_quote_phase(state)
except Exception:
pass
@staticmethod
def _refresh_quote_phase(state: ConversationState, phase_hint: str = ""):
"""统一维护收图报价状态机。"""
if phase_hint in {"idle", "collecting", "ready_to_quote", "waiting_result"}:
state.quote_phase = phase_hint
if phase_hint == "idle":
state.quote_ready_turns = 0
return
if not state.pending_image_urls:
state.quote_phase = "idle"
state.quote_ready_turns = 0
return
if state.quote_phase in {"ready_to_quote", "waiting_result"}:
return
if state.pending_image_urls and state.pending_requirements:
state.quote_phase = "collecting"
return
state.quote_phase = "collecting"
def _should_defer_batch_quote(self, state: ConversationState, mark_ready: bool = False) -> bool:
"""
批量报价延后控制:
- 首次进入 ready_to_quote 时按配置等待 N 轮
- 等待轮次归零后,本轮即可报价
"""
if mark_ready and state.quote_phase != "ready_to_quote":
state.quote_phase = "ready_to_quote"
state.quote_ready_turns = max(0, int(self.batch_quote_delay_turns))
if state.quote_phase == "ready_to_quote" and state.quote_ready_turns > 0:
state.quote_ready_turns -= 1
return True
return False
def _mark_quote_ready(self, state: ConversationState):
"""仅标记 ready 状态,不消费等待轮次。"""
if state.quote_phase != "ready_to_quote":
state.quote_phase = "ready_to_quote"
state.quote_ready_turns = max(0, int(self.batch_quote_delay_turns))
def _build_reject_message(self, reason: str = "") -> str:
templates = [
"这类图文字内容太密了,我们这边不接这单哈,建议精简后再发我看看。",
"这种密集文字/宣传栏类图片暂时做不了,抱歉啦,换一版简化内容我可以继续帮你看。",
"这张文字信息太多,处理风险高,我们先不接,您可以先筛重点文字再发我。",
]
msg = random.choice(templates)
if reason:
msg += f"{reason}"
return msg
def _is_batch_quote_enabled(self, customer_id: str, acc_id: str) -> bool:
"""灰度开关:按店铺白名单 + 客户哈希百分比控制新策略是否生效。"""
try:
from config.config import (
FEATURE_BATCH_QUOTE_ENABLED,
FEATURE_BATCH_QUOTE_PERCENT,
FEATURE_BATCH_QUOTE_SHOPS,
)
if not FEATURE_BATCH_QUOTE_ENABLED:
return False
pct = max(0, min(100, int(FEATURE_BATCH_QUOTE_PERCENT)))
if pct == 0:
return False
shops = [s.strip() for s in (FEATURE_BATCH_QUOTE_SHOPS or "").split(",") if s.strip()]
if shops and (acc_id or "") not in shops:
return False
if pct >= 100:
return True
h = int(hashlib.md5((customer_id or "").encode("utf-8")).hexdigest()[:8], 16) % 100
return h < pct
except Exception:
return True
def _detect_stage(self, message: str) -> str:
"""检测售前/售后"""
# 系统订单通知不属于售后,单独处理
if "系统订单信息" in message:
return "订单通知"
after_sale_keywords = ["已下单", "已付款", "催一下", "发文件", "要修改", "不满意", "退款", "退货"]
for keyword in after_sale_keywords:
if keyword in message:
return "售后"
return "售前"
def _get_system_prompt(self) -> str:
base_prompt = """你是淘宝找原图店的客服,对外说帮客户找原图/高清图,实际是帮客户把模糊图处理成高清版,擅长快速成交。
核心原则:快、准、狠。**回复要像真人聊天,自然多变,禁止套模板、背台词。**
【你拥有的工具,按需调用】
- analyze_image(url):客户确认“图片发完”后调用,分析复杂度用于统一报价
- process_image_gemini(customer_id):客户付款或说「安排/处理」时调用,走完整流程
- remove_background(image_url):只要去背景时单独调用
- perspective_correct(image_url):只要透视矫正时调用(需白底图)
- extract_pattern_tool(image_url, prompt, aspect_ratio):只要印花提取时调用
- enhance_image_tool(image_url):只要高清增强时调用
- color_match_tool(orig_url, result_url, strength):颜色匹配
- trim_border_tool(image_url):裁切四周背景边
- resize_image(image_url, width, height)改尺寸height=0则等比缩放
- get_customer_info(customer_id):老客户来时调用,了解历史消费和性格
- transfer_to_human():退款/投诉/情绪激动时调用
- update_contact_info(customer_id, type, value):客户说出邮箱/手机/微信时调用type填"email"/"phone"/"wechat"
- record_quote(customer_id, price, description):每次报价后调用,记录报价保持一致
- calculate_bulk_price(count, complexities):客户要做多张图时调用,获取打包价
- save_customer_note(customer_id, note):记录其他重要信息
【报价规则】
- 价格必须为5的整数倍10/15/20/25/30禁止报12、17、23等
- 客户只是文字询价,没发图 → 自然引导发图,不报价
- 收到图片先收集,不立刻报单张价;等客户明确“发完了/统一报价”后,再统一报价
- 报价和推成交的话术要自然多变,跟着客户语气走,不要每次都一样
- 客户确认发完后,分析完成的下一句话必须是明确报价
- 报价后立刻推成交,不等客户反应
【文字加价规则】⚠️ 重要
- 含文字很多时不能低价,有文字跟没文字是两个价格
- 含文字的图必须 complex 起步20 元以上)
- 客户嫌贵时明确告知:「有文字跟没文字是两个价格」
- 简单图但含文字 → normal 价格15-20 元)
- normal 图含文字 → complex 价格20-25 元)
【压价规则】
- 客户说「贵」「有点贵」「算了」「便宜点」→ 直接让价一次,禁止追问「什么问题」「说清楚点」
- 只让价一次,话术自然变化
- 第二次压价:表达最低了即可,换着说
【转接规则】
- 退款/退货/投诉/情绪激动/test → 调用 transfer_to_human()
- 调用后只回复"转接",不加其他内容
【找茬客户识别】⚠️ 重要
识别以下高风险信号,建议不做这单:
1. 下单后立即申请退款
2. 从高价砍到低价30→10 元)
3. 反复问"不满意可以退吗"2 次以上)
4. 质疑服务内容("源文件还是什么"
5. 质疑价值("就一张图片"
6. 问"小一点就快一点的嘛"(想占便宜)
7. 重复问同一个问题(想找麻烦)
识别到以上 3 个以上信号 → 建议转人工或直接拒绝接单
话术:「不好意思,这单做不了」「去别家做吧」
【售后规则】
- 催进度:自然回复在做了/快了/马上好之类
- 要修改:自然问哪里要改
【禁忌】
- 没看到图不报价
- 不说"不行/不可以"
- 不解释技术细节
- 不给价格区间
- 回复不超过2句话
- 绝对禁止输出任何内部独白或状态说明,包括但不限于:"无需回复""已完成""已经完成""不需要回复""流程结束""操作完成""任务完成""记录完成""报价已记录"
- 每次必须输出真实的、发给客户看的回复文字,哪怕只有一句话"""
base_prompt += f"\n\n【人设语气】\n- 人设:{self.reply_persona}\n- 语气像真人店主,不官腔,不机械,不背模板。"
return self._attach_skill_docs(base_prompt, self.skill_pre_sales, self.skill_style)
def _get_natural_reply_prompt(self) -> str:
base = f"""你是淘宝店主客服,专门把系统给你的“回复意图”改写成自然的一句话或两句话。
人设:{self.reply_persona}
规则:
- 只输出发给客户的话,不要解释你的思考。
- 口语化、简短、有温度,避免“这个需求我收到了”这类机械表达。
- 不要编造价格、订单、进度;只按输入意图表达。
- 默认不超过2句话。"""
return self._attach_skill_docs(base, self.skill_style)
def _get_after_sale_prompt(self) -> str:
base = """你是淘宝客服的售后助手,负责售后阶段的自然沟通与处理进度反馈。
核心:简洁、自然、不解释技术细节、尽量不调用报价相关工具。
规则:
- 已付款客户优先:确认安排、说明进度、承诺时间点
- 修改需求:礼貌询问具体改哪里,尽量一句话
- 催进度:自然回复在做了/快了/马上好,给预计时间
- 投诉/情绪激动/退款:转人工
- 输出不超过2句话不说内部状态"""
return self._attach_skill_docs(base, self.skill_after_sale, self.skill_style)
def _get_pricing_prompt(self) -> str:
try:
from config.config import MIN_PRICE_FLOOR
floor = MIN_PRICE_FLOOR
except Exception:
floor = 15
base = f"""你是淘宝客服的报价助手,负责在客户明确提到价格/询价时快速给出自然报价并推动成交。
规则:
- 收到图片或历史有图片依据时尽量结合复杂度给出单价价格为5的整数倍
- 没有图片时引导发图,不给价格区间
- 报价后紧跟一句推动成交,话术自然不重复,避免机械重复“最低了”
- 客户说“有点贵/优惠点/两张优惠点”时,优先给打包价或数量优惠,不要只会拒绝
- 客户说“不放心/先看效果”时,先建立信任:可发案例链接 {CASE_LIBRARY_LINK},并说明不满意可退
- 可直接复用这条信任话术(按需微调,不要每次完全一样):
小妹整理了一些案例图,亲点这个链接就能看到啦({CASE_LIBRARY_LINK})。
有什么想要的效果随时告诉我哈,我这边都可以按您的要求来做哦~/:065 效果不好不满意,我们这边包退的哦。
- 最低价不低于{floor}元,客户出价低于底线时礼貌拒绝(不好意思)
- 输出不超过2句话"""
return self._attach_skill_docs(base, self.skill_pricing, self.skill_style)
def _get_processing_prompt(self) -> str:
base = """你是淘宝客服的处理助手,负责在客户说安排/处理/开始做或已付款的场景下进行处理安排与进度反馈。
规则:
- 已付款或明确要求开始时,确认安排并给预计时间点
- 可调用处理流程工具
- 投诉/退款时转人工
- 输出不超过2句话"""
return self._attach_skill_docs(base, self.skill_after_sale, self.skill_style)
def _get_similar_prompt(self) -> str:
base = """你是淘宝客服的相似图助手,客户问“有一样的吗/类似的吗/同款吗”时,给出自然回复与参考建议。
规则:
- 先确认可以找类似款,建议拍后我发参考图
- 如已知图案/类型,简要说明“同类型都有”,推动成交
- 输出不超过2句话"""
return self._attach_skill_docs(base, self.skill_pre_sales, self.skill_style)
def _get_order_prompt(self) -> str:
base = """你是淘宝客服的订单助手,负责系统订单通知的处理。
规则:
- 已付款时自然确认安排;其他状态静默(输出空字符串)
- 输出不超过1句话"""
return self._attach_skill_docs(base, self.skill_after_sale, self.skill_style)
def _get_risk_prompt(self) -> str:
base = """你是淘宝客服的风控助手,负责敏感/违规内容的前置拦截与替代话术。
规则:
- 黄色/擦边/涉政/政治人物/政治事件/政治图片/地图类内容等不接单,礼貌拒绝
- 输出不超过1句话"""
return self._attach_skill_docs(base, self.skill_risk, self.skill_style)
@staticmethod
def _is_political_inquiry(text: str) -> bool:
"""文本前置风控:政治人物/政治事件/政治图片相关询问一律拒绝。"""
s = (text or "").strip().lower()
if not s:
return False
kw = (
"政治", "涉政", "党政", "政治人物", "政治事件", "政治图片", "政治海报", "政治宣传",
"领导人", "伟人", "元帅", "将军", "红色人物", "党史",
"天安门", "人民大会堂", "中南海",
"习近平", "毛泽东", "邓小平", "江泽民", "胡锦涛", "李克强", "周恩来",
"特朗普", "拜登", "普京", "泽连斯基",
"trump", "biden", "putin", "zelensky", "xi jinping",
)
if any(k in s for k in kw):
return True
# 兜底:类似“有没有十大元帅的照片/图片”
return bool(re.search(r"(元帅|将军|领导人|政治人物|政治事件).*(照片|图片|头像|原图)?", s))
@staticmethod
def _is_map_inquiry(text: str) -> bool:
"""地图类需求一律拒绝(按业务规则)。"""
s = (text or "").strip().lower()
if not s:
return False
kw = (
"地图", "地形图", "行政区划图", "世界地图", "中国地图",
"卫星地图", "导航图", "航海图", "作战地图", "军事地图",
"map", "topographic map", "satellite map",
)
return any(k in s for k in kw)
def _get_customer_profile_context(self, customer_id: str) -> str:
"""从数据库读取客户画像,注入给 AI。含个性化语气、报价策略、主动预测、近期对话。"""
try:
from db.customer_db import db
profile = db.get_customer(customer_id)
if profile.blacklist:
return f"【⚠️黑名单客户】原因:{profile.blacklist_reason or '已标记'},请转接人工处理,不要自动回复"
lines = []
lines.append("=== 客户档案 ===")
# 基础信息
basic_info = []
basic_info.append(f"客户ID: {customer_id}")
basic_info.append(f"姓名: {profile.name or '未知'}")
if profile.email:
basic_info.append(f"邮箱: {profile.email}")
if profile.phone:
basic_info.append(f"电话: {profile.phone}")
if profile.wechat:
basic_info.append(f"微信: {profile.wechat}")
lines.append(" | ".join(basic_info))
# 消费分析
consume_info = []
consume_info.append(f"客户等级: {profile.customer_level}")
if profile.vip:
consume_info.append("VIP客户")
consume_info.append(f"总订单: {profile.total_orders}")
consume_info.append(f"总消费: {profile.total_spent}")
if profile.total_orders > 0:
consume_info.append(f"客单价: {profile.total_spent // profile.total_orders}")
lines.append("--- 消费分析 ---")
lines.append(" | ".join(consume_info))
# 报价历史
price_info = []
if profile.vip_custom_price:
price_info.append(f"VIP专属价: {profile.vip_custom_price}元(直接报这个价)")
if profile.last_price:
price_info.append(f"上次报价: {profile.last_price}")
if profile.lowest_price_accepted:
price_info.append(f"历史最低成交: {profile.lowest_price_accepted}")
if profile.discount_given_count:
price_info.append(f"历史让价: {profile.discount_given_count}")
if profile.price_sensitivity:
price_info.append(f"价格敏感度: {profile.price_sensitivity}")
if getattr(profile, "last_quote_no_convert", False):
price_info.append("【策略】上次报价未成交本次可降5-10元")
if price_info:
lines.append("--- 报价历史 ---")
lines.append(" | ".join(price_info))
# 性格与决策
personality_info = []
if profile.personality:
personality_info.append(f"性格: {'/'.join(profile.personality)}")
if profile.decision_speed:
personality_info.append(f"决策速度: {profile.decision_speed}")
if profile.communication_prefer:
personality_info.append(f"沟通偏好: {profile.communication_prefer}")
if personality_info:
lines.append("--- 性格特征 ---")
lines.append(" | ".join(personality_info))
# 图片习惯
image_info = []
image_info.append(f"累计发图: {profile.total_images_sent}")
if profile.complexity_history:
avg_complexity = self._calc_avg_complexity(profile.complexity_history)
image_info.append(f"平均复杂度: {avg_complexity}")
if profile.image_type_history:
from collections import Counter
top_types = Counter(profile.image_type_history).most_common(3)
types_str = "".join(f"{t}({c}次)" for t, c in top_types)
image_info.append(f"常见类型: {types_str}")
if profile.preferred_format:
image_info.append(f"格式偏好: {profile.preferred_format}")
if profile.preferred_size:
image_info.append(f"尺寸要求: {profile.preferred_size}")
if profile.last_image_url:
image_info.append(f"最近发图: {profile.last_image_url[:60]}...")
lines.append("--- 图片习惯 ---")
lines.append(" | ".join(image_info))
# 当前任务状态
if profile.processing_status:
task_info = []
task_info.append(f"状态: {profile.processing_status}")
if profile.processing_image_url:
task_info.append(f"处理中: {profile.processing_image_url[:40]}...")
if profile.expected_done_at:
task_info.append(f"预计完成: {profile.expected_done_at}")
lines.append("--- 当前任务 ---")
lines.append(" | ".join(task_info))
# 上次对话摘要
if profile.last_conversation_summary:
time_str = ""
if profile.last_conversation_time:
try:
t = datetime.fromisoformat(profile.last_conversation_time)
diff = datetime.now() - t
if diff.days > 0:
time_str = f"{diff.days}天前)"
else:
h = diff.seconds // 3600
time_str = f"{h}小时前)" if h > 0 else "(刚刚)"
except Exception:
pass
lines.append(f"--- 上次对话 {time_str} ---")
lines.append(profile.last_conversation_summary)
# 个性化回复策略
hints = []
if profile.personality:
if "爽快" in profile.personality:
hints.append("回复简洁直接,不废话,快速报价")
if "砍价" in profile.personality or "砍价狂" in profile.personality:
hints.append("报价时强调性价比,只让价一次,第二次引导去 xinhui.cloud")
if "纠结" in profile.personality or "墨迹" in profile.personality:
hints.append("多给一点说明,耐心回答")
if profile.price_sensitivity == "":
hints.append("报价时顺带提「满意再拍」降低顾虑")
if profile.decision_speed == "":
hints.append("直接报价推成交,少铺垫")
if profile.total_orders > 0 and profile.decision_speed == "":
hints.append("老客爽快,直接报价成交")
if hints:
lines.append("--- 回复策略 ---")
lines.append("".join(hints))
# 主动推荐
proactive = []
if profile.bulk_potential == "" or (profile.total_images_sent or 0) >= 2:
proactive.append("可问「要做多张吗,多张有优惠」")
if profile.upsell_opportunity:
proactive.append(f"加购机会: {''.join(profile.upsell_opportunity)}")
if proactive:
lines.append("--- 主动推荐 ---")
lines.append("".join(proactive))
return "\n".join(lines)
except Exception as e:
print(f"[Agent] 获取客户画像失败: {e}")
return ""
def _calc_avg_complexity(self, complexity_history: list) -> str:
"""计算平均复杂度"""
if not complexity_history:
return "未知"
level_map = {"simple": 1, "normal": 2, "complex": 3, "hard": 4}
label_map = {1: "简单", 2: "一般", 3: "复杂", 4: "很复杂"}
try:
avg = sum(level_map.get(c, 2) for c in complexity_history) / len(complexity_history)
return label_map.get(round(avg), "一般")
except Exception:
return "一般"
def _get_refusal_context_hint(self, customer_id: str, current_msg: str, profile_context: str) -> str:
"""
检测「刚拒绝某张图 + 客户问能找到吗」场景,注入显式提示,避免前后矛盾。
原因last_conversation_summary 异步更新可能滞后message_histories 模型可能忽略。
"""
ask_keywords = ["能找到吗", "可以吗", "有吗", "能做吗", "可以找吗", "可以弄吗"]
if not any(kw in current_msg for kw in ask_keywords):
return ""
refusal_keywords = ["不做", "不接", "拒绝", "不做这类", "这类不做"]
# 检查 profile 摘要(可能因异步更新而滞后)
if any(kw in profile_context for kw in refusal_keywords):
return "【重要】上一句客服刚拒绝了某张图,客户问能找到吗时须明确:能做的是哪张(如第一张),不能做的是哪张。不可只说「放心拍」「可以」,会前后矛盾。"
# 检查内存历史中最近几条消息ModelResponse 含客服回复)
history = self.message_histories.get(customer_id, [])
for msg in reversed(history[-6:]):
msg_str = str(msg)
if any(kw in msg_str for kw in refusal_keywords):
return "【重要】上一句客服刚拒绝了某张图,客户问能找到吗时须明确:能做的是哪张(如第一张),不能做的是哪张。不可只说「放心拍」「可以」,会前后矛盾。"
return ""
def _get_conversation_context(self, customer_id: str, acc_id: str = "", limit: int = 12, max_len: int = 80) -> str:
"""
每一次对话都从数据库加载近期对话,压缩后注入 prompt。
确保模型看到上下文,同时控制 token 消耗。
"""
try:
try:
from config.config import CHAT_CONTEXT_LIMIT, CHAT_CONTEXT_TRUNCATE_LEN
limit = CHAT_CONTEXT_LIMIT
max_len = CHAT_CONTEXT_TRUNCATE_LEN
except Exception:
pass
from db.chat_log_db import get_recent_conversation
msgs = get_recent_conversation(customer_id, acc_id=acc_id, limit=limit)
if not msgs:
return ""
lines = []
for m in msgs:
role = "" if m.get("direction") == "in" else ""
msg_text = (m.get("message") or "").strip().replace("\n", " ")[:max_len]
if not msg_text:
continue
lines.append(f"{role}:{msg_text}")
if not lines:
return ""
return "【近期】\n" + "\n".join(lines) + "\n\n"
except Exception:
return ""
def _get_intent_emotion_hint(self, msg: str) -> str:
"""语义匹配:意图/情绪识别注入提示。EMBEDDING_MODEL 未配置时用关键词。"""
try:
from utils.intent_analyzer import detect_intent_embedding, detect_intent_keywords, detect_emotion_embedding
intent = detect_intent_embedding(msg)
if not intent:
intent = detect_intent_keywords(msg)
emotion = detect_emotion_embedding(msg) if os.getenv("EMBEDDING_MODEL") else None
parts = []
if intent:
parts.append(f"意图:{intent}")
if emotion:
parts.append(f"情绪:{emotion}")
if parts:
return f"【当前消息】{', '.join(parts)}"
except Exception:
pass
return ""
# 简单打招呼类消息(在近期已回复后无需再回)
_COOLDOWN_PATTERNS = [
"你好", "您好", "在吗", "在么", "在不在", "有人吗",
"", "嗯嗯", "", "好的", "好哒", "ok", "OK", "okay",
"谢谢", "谢谢你", "感谢", "收到", "知道了", "明白了",
]
_COOLDOWN_SECONDS = 5 * 60 # 5 分钟内不重复回复纯打招呼
def _in_cooldown(self, state: ConversationState, msg: str) -> bool:
"""最近刚回复过 + 消息是纯打招呼 → True 静默"""
if not state.last_reply_at:
return False
elapsed = (datetime.now() - state.last_reply_at).total_seconds()
if elapsed > self._COOLDOWN_SECONDS:
return False
clean = msg.strip().rstrip("!?。.~")
return clean in self._COOLDOWN_PATTERNS
async def process_message(self, message: CustomerMessage) -> AgentResponse:
"""处理客户消息并生成回复"""
self._activity_log(
"agent_inbound",
acc_id=message.acc_id,
customer_id=message.from_id,
msg=message.msg,
msg_type=message.msg_type,
)
metrics_emit("inbound_msg", customer_id=message.from_id, acc_id=message.acc_id)
# 获取或创建对话状态
state = self._get_conversation_state(message.from_id)
# 冷却期检测:近期已回复 + 纯打招呼 → 静默
if self._in_cooldown(state, message.msg):
elapsed = int((datetime.now() - state.last_reply_at).total_seconds())
print(f"[Agent] 冷却期静默(距上次回复 {elapsed}s{message.msg!r}")
self._activity_log("agent_cooldown_silent", customer_id=message.from_id, elapsed_s=elapsed)
return AgentResponse(reply="", should_reply=False, need_transfer=False)
# 前置风控:客户文本一旦命中政治/敏感询问,直接拒绝,避免“发图我看看”类答非所问
try:
# 人工风控:标记为不接单的客户直接转人工
manual_risk = risk_db.evaluate_customer(message.from_id)
if bool(manual_risk.get("do_not_serve")):
self._activity_log(
"agent_manual_risk_reject",
customer_id=message.from_id,
risk=manual_risk,
)
return AgentResponse(
reply="这边无法继续为你处理该类需求,给你转人工专员对接。",
should_reply=True,
need_transfer=True,
transfer_msg=TRANSFER_MESSAGE,
)
from utils.content_filter import should_block_customer_smart
risk_hit, risk_category, _risk_reason = await should_block_customer_smart(message.msg)
map_hit = self._is_map_inquiry(message.msg) or (risk_category == "map")
political_hit = self._is_political_inquiry(message.msg) or (risk_category == "political")
if risk_hit or political_hit or map_hit:
# 命中敏感询问时清空待报价队列,避免旧图残留污染后续会话
state.pending_image_urls.clear()
state.pending_requirements.clear()
self._sync_pending_quote_state(message.from_id, state)
reject_text = "地图这类不做哈,这边不接地图相关需求。"
if risk_category == "sexual":
reject_text = "这类不做哈,涉黄擦边内容都不接。"
elif risk_category == "violent":
reject_text = "这类不做哈,暴力血腥相关都不接。"
elif political_hit and not map_hit:
reject_text = "这类不做哈,政治相关图片和人物都不接。"
reply = await self._rewrite_reply_with_ai(
message=message,
state=state,
reply=reject_text,
scene="risk_reject",
)
state.last_reply_at = datetime.now()
print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {reply}")
self._activity_log(
"agent_risk_reject",
customer_id=message.from_id,
map_hit=map_hit,
political_hit=political_hit,
risk_category=risk_category,
reply=reply,
)
return AgentResponse(reply=reply, should_reply=True, need_transfer=False)
except Exception:
map_hit = self._is_map_inquiry(message.msg)
political_hit = self._is_political_inquiry(message.msg)
if political_hit or map_hit:
state.pending_image_urls.clear()
state.pending_requirements.clear()
self._sync_pending_quote_state(message.from_id, state)
reject_text = "地图这类不做哈,这边不接地图相关需求。"
if political_hit and not map_hit:
reject_text = "这类不做哈,政治相关图片和人物都不接。"
reply = await self._rewrite_reply_with_ai(
message=message,
state=state,
reply=reject_text,
scene="risk_reject",
)
state.last_reply_at = datetime.now()
print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {reply}")
self._activity_log(
"agent_risk_reject",
customer_id=message.from_id,
map_hit=map_hit,
political_hit=political_hit,
reply=reply,
)
return AgentResponse(reply=reply, should_reply=True, need_transfer=False)
# 检测售前/售后
new_stage = self._detect_stage(message.msg)
if new_stage != state.stage:
state.stage = new_stage
state.last_update = datetime.now().isoformat()
# 订单通知前置处理
if "系统订单信息" in message.msg or "订单状态" in message.msg:
_, order_block = self._split_customer_text(message.msg)
customer_text, _ = self._split_customer_text(message.msg)
order = self._parse_order_info(order_block or message.msg)
pay_status = order.get("pay_status", "")
order_status = order.get("order_status", "")
paid_keywords = ["等待发货", "已付款", "付款成功", "买家已付款"]
is_paid = any(kw in pay_status or kw in order_status for kw in paid_keywords)
if is_paid:
# 订单金额核查:对比报价和实付金额
asyncio.create_task(self._check_order_amount(
message.from_id, order, message.acc_id
))
# 成交记录:写入数据库供日报分析
asyncio.create_task(self._record_deal_success(
message.from_id, message.from_name, message.acc_id, message.acc_type,
order, state
))
# 已付款:触发 Gemini 作图
try:
from core.workflow import workflow
asyncio.create_task(workflow.trigger_processing_on_payment(
customer_id=message.from_id,
acc_id=message.acc_id,
acc_type=message.acc_type,
))
except Exception as e:
print(f"[Agent] 触发作图失败: {e}")
elif not customer_text:
# 非付款 + 没有客户文字 → 直接静默,不调用 AI
print(f"[Agent] 订单通知静默({pay_status or order_status}),跳过回复")
return AgentResponse(reply="", should_reply=False, need_transfer=False)
# 找图店:先收集图片和需求,等客户确认“发完”后统一报价
customer_text, _ = self._split_customer_text(message.msg)
shop_type = _get_shop_type(message.acc_id or "", message.goods_name or "")
if shop_type == "find_image" and self._is_batch_quote_enabled(message.from_id, message.acc_id):
incoming_urls = self._extract_image_urls(customer_text)
text_without_urls = self._strip_urls_from_text(customer_text)
short_intent = self._classify_short_customer_text(text_without_urls)
if incoming_urls:
is_related_followup = bool(text_without_urls and self._is_related_image_followup_intent(text_without_urls))
for u in incoming_urls:
if u not in state.pending_image_urls:
state.pending_image_urls.append(u)
if text_without_urls:
self._append_requirement(state, text_without_urls)
if is_related_followup:
self._append_requirement(state, "与上一张相关(截图/局部细节)")
state.image_count = len(state.pending_image_urls)
self._refresh_quote_phase(state, "collecting")
self._sync_pending_quote_state(message.from_id, state)
if self._is_batch_finish_intent(
text=customer_text,
state=state,
has_incoming_urls=bool(incoming_urls),
):
should_defer = self._should_defer_batch_quote(state, mark_ready=True)
self._sync_pending_quote_state(message.from_id, state)
if should_defer:
defer_fallback = "图片和需求我都收齐了,我先整理下,马上给你报总价。"
defer_reply = await self._render_collection_reply_with_ai(
message=message,
state=state,
scene="quote_defer_notice",
intent_hint="确认已收齐图片与需求,先承接,告知稍后马上报价。",
fallback=defer_fallback,
)
state.last_reply_at = datetime.now()
print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {defer_reply}")
return AgentResponse(reply=defer_reply, should_reply=True, need_transfer=False)
quote_res = await self._quote_pending_images(state, message)
reply_text = self._colloquialize_reply(quote_res.get("reply", ""))
reply_text = await self._rewrite_reply_with_ai(
message=message,
state=state,
reply=reply_text,
scene="batch_quote_reply",
)
need_transfer = bool(quote_res.get("need_transfer"))
state.last_reply_at = datetime.now()
print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {reply_text}")
return AgentResponse(
reply=reply_text,
should_reply=not need_transfer,
need_transfer=need_transfer,
transfer_msg=TRANSFER_MESSAGE if need_transfer else "",
)
ack_fallback = "图片收到了,你有补充就继续发,我这边一起看。"
ack_intent = (
"告知图片已收到;如果客户继续发图就继续收,发完可统一报价。"
if not is_related_followup
else "告知这是和上一张相关的截图/局部图,已按同一需求一起处理。"
)
ack = await self._render_collection_reply_with_ai(
message=message,
state=state,
scene="collect_ack",
intent_hint=ack_intent,
fallback=ack_fallback,
)
state.last_reply_at = datetime.now()
print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {ack}")
return AgentResponse(reply=ack, should_reply=True, need_transfer=False)
if state.pending_image_urls:
if text_without_urls:
# 短句先分类再路由,避免误追加为需求导致上下文漂移
if short_intent == "finish_signal":
self._mark_quote_ready(state)
elif short_intent == "progress_query":
if state.quote_phase != "ready_to_quote":
self._refresh_quote_phase(state, "waiting_result")
elif short_intent == "ack":
if state.quote_phase != "ready_to_quote":
self._refresh_quote_phase(state, "collecting")
else:
self._append_requirement(state, text_without_urls)
self._refresh_quote_phase(state, "collecting")
self._sync_pending_quote_state(message.from_id, state)
# 客户明确“找图,不是做图”时,先澄清意图,不继续报价链路
if self._is_find_image_not_edit_conflict(text_without_urls):
clarify_fallback = "明白你是要找图,不是做图。你说下要找原图、同款还是高清版,我按这个给你找。"
clarify = await self._render_collection_reply_with_ai(
message=message,
state=state,
scene="find_not_edit_clarify",
intent_hint="确认客户要找图不是做图,并追问是找原图/同款/高清版。",
fallback=clarify_fallback,
)
state.last_reply_at = datetime.now()
print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {clarify}")
return AgentResponse(reply=clarify, should_reply=True, need_transfer=False)
# 已到报价就绪阶段且等待轮次结束:对“有吗/进度”等追问直接报价
if state.quote_phase == "ready_to_quote" and state.quote_ready_turns <= 0 and short_intent in {"progress_query", "ack", "finish_signal"}:
quote_res = await self._quote_pending_images(state, message)
reply_text = self._colloquialize_reply(quote_res.get("reply", ""))
reply_text = await self._rewrite_reply_with_ai(
message=message,
state=state,
reply=reply_text,
scene="batch_quote_reply",
)
need_transfer = bool(quote_res.get("need_transfer"))
state.last_reply_at = datetime.now()
print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {reply_text}")
return AgentResponse(
reply=reply_text,
should_reply=not need_transfer,
need_transfer=need_transfer,
transfer_msg=TRANSFER_MESSAGE if need_transfer else "",
)
# 客户在追问“找到了吗/没找到吗/多久好”时,优先给进度承接,不走“没听懂”
if short_intent == "progress_query" or self._is_result_followup_query(text_without_urls):
progress_fallback = "我这边在跟进了,一有结果马上发你。"
progress = await self._render_collection_reply_with_ai(
message=message,
state=state,
scene="collect_progress",
intent_hint="承接客户的进度/结果追问,简短说明正在跟进,有结果会第一时间回复。",
fallback=progress_fallback,
)
state.last_reply_at = datetime.now()
print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {progress}")
return AgentResponse(reply=progress, should_reply=True, need_transfer=False)
# 信息不足时先追问,避免误判为“直接报价”
if self._needs_clarification_in_collecting(text_without_urls):
ask_fallback = "你再补一句具体要什么效果,我马上按你的要求来。"
ask = await self._render_collection_reply_with_ai(
message=message,
state=state,
scene="collect_clarify",
intent_hint="客户表达不清,礼貌请对方补充一句关键需求,不要机械,不要生硬。",
fallback=ask_fallback,
)
state.last_reply_at = datetime.now()
print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {ask}")
return AgentResponse(reply=ask, should_reply=True, need_transfer=False)
if self._is_batch_finish_intent(
text=customer_text,
state=state,
has_incoming_urls=False,
):
should_defer = self._should_defer_batch_quote(state, mark_ready=True)
self._sync_pending_quote_state(message.from_id, state)
if should_defer:
defer_fallback = "收到,我先把这批图过一遍,马上给你总价。"
defer_reply = await self._render_collection_reply_with_ai(
message=message,
state=state,
scene="quote_defer_notice",
intent_hint="确认已收齐,先承接并告知稍后马上报价。",
fallback=defer_fallback,
)
state.last_reply_at = datetime.now()
print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {defer_reply}")
return AgentResponse(reply=defer_reply, should_reply=True, need_transfer=False)
quote_res = await self._quote_pending_images(state, message)
reply_text = self._colloquialize_reply(quote_res.get("reply", ""))
reply_text = await self._rewrite_reply_with_ai(
message=message,
state=state,
reply=reply_text,
scene="batch_quote_reply",
)
need_transfer = bool(quote_res.get("need_transfer"))
state.last_reply_at = datetime.now()
print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {reply_text}")
return AgentResponse(
reply=reply_text,
should_reply=not need_transfer,
need_transfer=need_transfer,
transfer_msg=TRANSFER_MESSAGE if need_transfer else "",
)
remind_fallback = "需求我记上了,你继续发图,或者让我直接给你报价都行。"
remind = await self._render_collection_reply_with_ai(
message=message,
state=state,
scene="collect_remind",
intent_hint="确认需求已记录,引导客户继续补图或直接让你报价。",
fallback=remind_fallback,
)
state.last_reply_at = datetime.now()
print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {remind}")
return AgentResponse(reply=remind, should_reply=True, need_transfer=False)
# 构建提示词(包含对话状态 + 客户画像)
user_prompt = self._build_prompt(message, state)
# 注入客户历史画像(个性化语气、报价策略、主动预测)
profile_context = self._get_customer_profile_context(message.from_id)
if profile_context:
user_prompt = profile_context + "\n\n" + user_prompt
# 前后一致提示:刚拒绝某张图后,客户问「能找到吗」等时,必须区分能做/不能做
refusal_hint = self._get_refusal_context_hint(message.from_id, message.msg, profile_context or "")
if refusal_hint:
user_prompt = refusal_hint + "\n\n" + user_prompt
# 每一次对话都注入近期上下文,确保模型看到完整对话
conv_context = self._get_conversation_context(message.from_id, acc_id=message.acc_id or "")
if conv_context:
user_prompt = conv_context + user_prompt
# 语义匹配:意图/情绪识别(配置 EMBEDDING_MODEL 时用 embedding否则关键词
intent_hint = self._get_intent_emotion_hint(message.msg)
if intent_hint:
user_prompt = intent_hint + "\n\n" + user_prompt
deps = AgentDeps(
msg_id=message.msg_id,
acc_id=message.acc_id,
from_id=message.from_id,
platform=message.acc_type
)
# 取出该客户的历史对话,传给 AI 保持上下文
history = self.message_histories.get(message.from_id, [])
self._log_block("PROMPT->AI 前置提示词", user_prompt)
try:
msg_lower = message.msg.lower()
pricing_kw = ["多少钱", "多少一张", "报价", "给个价", "几块", "价位", "能便宜点吗"]
processing_kw = ["安排", "处理一下", "开始做", "做一下", "尽快", "加急", "付款了", "已付款"]
similar_kw = ["有一样的", "有一样吗", "一样的吗", "类似的", "类似的吗", "同款", "相似", "类似吗"]
order_markers = ["[系统订单信息]", "订单状态", "买家已付款"]
risk_kw = [
"黄色", "擦边", "色情", "涉黄", "涉政", "政治", "", "不雅",
"天安门", "政治人物", "政治事件", "领导人", "党政",
"习近平", "毛泽东", "邓小平", "江泽民", "胡锦涛",
"特朗普", "拜登", "普京", "泽连斯基",
"地图", "地形图", "行政区划图", "卫星地图",
]
target_agent = self.agent_after_sale if state.stage == "售后" else self.agent
risk_hit = any(k in msg_lower for k in risk_kw) or self._is_political_inquiry(message.msg) or self._is_map_inquiry(message.msg)
if risk_hit:
target_agent = self.agent_risk
elif any(k in message.msg for k in order_markers):
target_agent = self.agent_order
elif any(k in msg_lower for k in processing_kw):
target_agent = self.agent_processing
elif any(k in msg_lower for k in pricing_kw):
target_agent = self.agent_pricing
elif any(k in msg_lower for k in similar_kw):
target_agent = self.agent_similar
result = await target_agent.run(user_prompt, deps=deps, message_history=history)
# 更新历史,最多保留最近 30 条消息防止 token 超限
self.message_histories[message.from_id] = result.all_messages()[-30:]
reply_text = self._colloquialize_reply(self._normalize_reply_text(result.output))
# 价格谈判与信任建立固定策略(避免只回“最低了/先拍下”)
strategy_reply = self._negotiation_strategy_reply(message.msg, state)
if strategy_reply:
reply_text = strategy_reply
# 拦截超低杀价:客户报价低于底线时,统一礼貌拒绝
try:
from config.config import MIN_PRICE_FLOOR
import re
offer = None
m = re.search(r'(\d{1,4})\s*(?:元|块|块钱|元钱)\b', message.msg)
if m:
offer = int(m.group(1))
else:
m2 = re.search(r'(?:能|可以|可否|能否)\s*(\d{1,4})\b', message.msg)
offer = int(m2.group(1)) if m2 else None
st = self._get_conversation_state(message.from_id)
floor = st.last_min_price if isinstance(st.last_min_price, int) and st.last_min_price > 0 else MIN_PRICE_FLOOR
if offer is not None and offer < floor:
reply_text = "不好意思"
except Exception:
pass
# 降限若AI在回复中给出小于底线的报价提升到>=底线且为5的倍数
try:
from config.config import MIN_PRICE_FLOOR
st = self._get_conversation_state(message.from_id)
floor = st.last_min_price if isinstance(st.last_min_price, int) and st.last_min_price > 0 else MIN_PRICE_FLOOR
def _adjust(text: str) -> str:
import re
def _repl(m):
num = int(m.group(1))
adj = max(floor, round(num / 5) * 5)
return m.group(0).replace(str(num), str(adj))
patterns = [
r'按(\d{1,4})元',
r'报价[:]\s*(\d{1,4})\s*元',
r'(\d{1,4})\s*元一张',
r'打包(\d{1,4})\s*元',
]
t = text
for p in patterns:
t = re.sub(p, _repl, t)
return t
reply_text = _adjust(reply_text or "")
except Exception:
pass
# 打印工具调用记录
for msg in result.new_messages():
for part in getattr(msg, 'parts', []):
part_type = type(part).__name__
if 'ToolCall' in part_type:
print(f"{self.C_TOOL}[THINK/TOOL_CALL]{self.C_RESET} {getattr(part, 'tool_name', '')}({getattr(part, 'args', '')})")
elif 'ToolReturn' in part_type:
ret = str(getattr(part, 'content', ''))[:120]
print(f"{self.C_TOOL}[THINK/TOOL_RETURN]{self.C_RESET} {ret}")
print(f"{self.C_THINK}[THINK/RAW_OUTPUT]{self.C_RESET} {repr(reply_text)}")
except Exception as e:
err_str = str(e)
print(f"[Agent] AI 调用失败: {e},使用兜底回复")
self._activity_log("agent_ai_error", customer_id=message.from_id, acc_id=message.acc_id, error=err_str)
metrics_emit("ai_call_failed", customer_id=message.from_id, acc_id=message.acc_id)
if "AccountOverdueError" in err_str or "overdue" in err_str.lower():
asyncio.create_task(_notify_wechat_overdue())
else:
asyncio.create_task(_notify_wechat(
f"⚠️ **AI调用异常**\n"
f"客户:{message.from_id}\n"
f"店铺:{message.acc_id}\n"
f"错误:{err_str[:200]}",
tag="AI异常"
))
reply_text = None
else:
metrics_emit("ai_call_success", customer_id=message.from_id, acc_id=message.acc_id)
# AI 失败兜底:给一个不出错的万能回复
if not reply_text:
fallback_text = await self._rewrite_reply_with_ai(
message=message,
state=state,
reply="好嘞,你稍等下,我这边看一下",
scene="fallback_reply",
)
return AgentResponse(
reply=fallback_text,
should_reply=True,
need_transfer=False
)
# 敏感词过滤:党政/暴力/血腥/黄色
try:
from utils.content_filter import should_block_reply
blocked, fallback = should_block_reply(reply_text)
if blocked:
print(f"[Agent] 敏感词拦截,使用兜底回复")
reply_text = fallback or "好的,您稍等,我帮您确认一下"
except Exception:
pass
# 成本统计(可选)
try:
from utils.api_cost_tracker import record
record("openai_chat", count=1)
except Exception:
pass
# 检测是否报价
self._detect_price(reply_text, state)
# 检测压价
self._detect_discount(message.msg, state)
# 自动打标签(异步,不阻塞)
asyncio.create_task(self._auto_tag(message, reply_text, state))
# 检测是否需要转接(文字触发 或 AI 调用了 transfer_to_human 工具)
need_transfer = False
transfer_msg = ""
transfer_keywords = ["TRANSFER_REQUESTED", "[转移会话]", "转移会话", "转人工", "转接"]
if reply_text and any(kw in reply_text for kw in transfer_keywords):
need_transfer = True
transfer_msg = TRANSFER_MESSAGE
metrics_emit("transfer_to_human", customer_id=message.from_id, acc_id=message.acc_id)
# 自我进化候选策略灰度(默认 5%):风险投诉场景强制转人工,并补安抚话术
evo_hit = self._evolution_enabled_for_customer(message.from_id)
if evo_hit and self._is_service_risk_inquiry(message.msg):
if self._evolution_has_proposal("policy-risk-transfer"):
need_transfer = True
transfer_msg = TRANSFER_MESSAGE
metrics_emit("evolution_force_transfer", customer_id=message.from_id, acc_id=message.acc_id)
if self._evolution_has_proposal("tone-empathy-pack"):
reply_text = "抱歉让您不舒服了,这边先为您转接人工专员马上处理。"
metrics_emit("evolution_empathy_reply", customer_id=message.from_id, acc_id=message.acc_id)
# 未成交记录:客户表达放弃且已报价过(转人工不记录)
customer_text, _ = self._split_customer_text(message.msg)
no_convert_keywords = ["算了", "不要了", "不做了", "下次再说", "先不弄了"]
if customer_text and state.last_price and state.last_price > 0:
if any(kw in customer_text for kw in no_convert_keywords):
reason = "嫌贵放弃" if any(k in customer_text for k in ["", "贵了", "便宜"]) else "放弃"
asyncio.create_task(self._record_deal_fail(
message.from_id, message.from_name, message.acc_id, message.acc_type, reason
))
# 需要转接时不把原始回复发给客户
should_reply = bool(reply_text and reply_text.strip()) and not need_transfer
if evo_hit and need_transfer and self._evolution_has_proposal("tone-empathy-pack"):
should_reply = True
if should_reply:
reply_text = await self._rewrite_reply_with_ai(
message=message,
state=state,
reply=reply_text,
scene="final_reply",
)
# 记录本次回复时间,供冷却期判断
if should_reply:
state.last_reply_at = datetime.now()
print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {reply_text}")
else:
print(f"{self.C_MUTED}[REPLY->CUSTOMER]{self.C_RESET} <静默/不发送>")
self._activity_log(
"agent_outbound_decision",
customer_id=message.from_id,
should_reply=should_reply,
need_transfer=need_transfer,
reply=reply_text or "",
transfer_msg=transfer_msg,
)
return AgentResponse(reply=reply_text or "", should_reply=should_reply, need_transfer=need_transfer, transfer_msg=transfer_msg)
def _detect_price(self, reply: str, state: ConversationState):
"""从回复中提取价格同步写入客户数据库价格必须为5的整数倍"""
import re
numbers = re.findall(r'(\d+)[元]', reply)
if numbers:
price = round(int(numbers[0]) / 5) * 5 # 强制为5的整数倍
state.last_price = price
metrics_emit("quote_generated", customer_id=state.customer_id, price=price)
# 持久化到客户数据库,重启后仍可读取
try:
from db.customer_db import db
db.update_last_price(state.customer_id, price)
except Exception:
pass
async def _check_order_amount(self, customer_id: str, order: dict, acc_id: str):
"""核查订单实付金额是否与报价一致,异常时企业微信预警"""
try:
import re
from db.customer_db import db
profile = db.get_customer(customer_id)
quoted = profile.last_price # 上次报价(元)
if not quoted:
return
# 从订单解析实付金额
raw_amount = order.get("amount", "")
m = re.search(r'[\d.]+', str(raw_amount))
if not m:
return
paid = float(m.group())
print(f"[Agent] 订单金额核查:报价 {quoted}元 vs 实付 {paid}元(客户 {customer_id}")
# 实付金额明显低于报价(低于报价的 60%)才预警
if paid < quoted * 0.6:
msg = (
f"⚠️ **订单金额异常**\n"
f"店铺:{acc_id}\n"
f"客户:{customer_id}{profile.name or ''}\n"
f"报价:{quoted}\n"
f"实付:{paid}\n"
f"差额:{quoted - paid:.1f}元 — 请人工核查"
)
print(f"[Agent] {msg}")
await _notify_wechat(msg)
except Exception as e:
print(f"[Agent] 订单金额核查失败: {e}")
async def _record_deal_success(
self,
customer_id: str,
customer_name: str,
acc_id: str,
platform: str,
order: dict,
state: "ConversationState",
):
"""成交时写入数据库,供日报与数据分析"""
try:
import re
from db.deal_outcome_db import record_deal
order_id = order.get("order_id", "")
raw_amount = order.get("amount", "")
m = re.search(r"[\d.]+", str(raw_amount))
amount = float(m.group()) if m else 0
reason = "让价后成交" if (state.discount_count or 0) > 0 else "直接成交"
record_deal(
customer_id=customer_id,
outcome="成交",
reason=reason,
customer_name=customer_name or "",
acc_id=acc_id or "",
platform=platform or "",
order_id=order_id,
amount=amount,
discount_given=(state.discount_count or 0) > 0,
)
# 同步到客户库
try:
from db.customer_db import db
if order_id:
db.add_order(customer_id, order_id, amount)
db.clear_quote_no_convert(customer_id)
except Exception:
pass
print(f"[Agent] 成交记录: {customer_id} {reason} {amount}")
except Exception as e:
print(f"[Agent] 成交记录失败: {e}")
async def _record_deal_fail(
self,
customer_id: str,
customer_name: str,
acc_id: str,
platform: str,
reason: str,
):
"""未成交时写入数据库,供日报与数据分析;标记报价未成交,下次可适当降低"""
try:
from db.deal_outcome_db import record_deal
from db.customer_db import db
record_deal(
customer_id=customer_id,
outcome="未成交",
reason=reason,
customer_name=customer_name or "",
acc_id=acc_id or "",
platform=platform or "",
)
db.mark_quote_no_convert(customer_id)
print(f"[Agent] 未成交记录: {customer_id} {reason}")
except Exception as e:
print(f"[Agent] 未成交记录失败: {e}")
async def _auto_tag(self, message: CustomerMessage, reply: str, state: ConversationState):
"""自动识别并写入各类标签"""
try:
from db.customer_db import db
cid = message.from_id
msg = message.msg.lower()
# 批量潜力
if any(kw in msg for kw in ["还有", "多张", "好几张", "一批", "下次还"]):
db.set_bulk_potential(cid, "")
db.add_upsell_opportunity(cid, "批量打包")
# 加购机会问过PSD/分层
if any(kw in msg for kw in ["psd", "分层", "源文件"]):
db.add_upsell_opportunity(cid, "分层PSD")
db.update_preferred_format(cid, "psd")
# 格式偏好
if "jpg" in msg or "jpeg" in msg:
db.update_preferred_format(cid, "jpg")
if "png" in msg:
db.update_preferred_format(cid, "png")
# 尺寸/分辨率偏好
if any(kw in msg for kw in ["分辨率", "dpi", "尺寸", "大图", "印刷"]):
db.update_preferred_size(cid, message.msg[:30])
# 决策速度:收到报价后立刻说拍了 → 快
if any(kw in msg for kw in ["拍了", "下单了", "好的", ""]) and state.last_price:
db.update_decision_speed(cid, "")
# 图片类型识别(简单关键词匹配)
type_keywords = {
"印花": ["印花", "花纹", "图案", "面料", "布料", "纺织"],
"logo": ["logo", "标志", "品牌", "商标"],
"人物": ["人物", "人像", "照片", "", "头像"],
"产品": ["产品", "商品", "包装", "实物"],
"老照片": ["老照片", "旧照片", "发黄", "修复"],
}
for img_type, keywords in type_keywords.items():
if any(kw in message.msg for kw in keywords):
db.add_image_type(cid, img_type)
break
# 定期自动计算衍生标签
db.auto_compute_tags(cid)
except Exception:
pass
def _detect_discount(self, message: str, state: ConversationState):
"""检测压价,并持久化让价记录"""
if any(kw in message for kw in ["", "便宜", "太贵", "有点贵"]):
state.discount_count += 1
if state.last_price:
try:
from db.customer_db import db
db.record_discount(state.customer_id, state.last_price)
except Exception:
pass
# 客户明确给价如“10元/10块/能10吗”
import re
m = re.search(r'(\d+)\s*元|\b(\d+)\s*块', message)
offer = None
if m:
offer = int(m.group(1) or m.group(2))
if offer:
try:
from config.config import MIN_PRICE_FLOOR
if offer < MIN_PRICE_FLOOR:
# 标记本次为超低出价(用于后续拒绝话术提示)
state.last_price = state.last_price or 0
except Exception:
pass
def _negotiation_strategy_reply(self, customer_text: str, state: ConversationState) -> str:
"""
价格谈判固定策略(优先级高于通用 AI 自由回复):
- 有点贵:给两张打包建议价
- 优惠点引导3张以上打包价
- 先发效果图:给固定案例链接并说明不满意包退
"""
text = (customer_text or "").strip()
if not text:
return ""
if any(k in text for k in ["先发效果图", "先看效果", "不放心", "没法确认"]):
return random.choice([
f"小妹整理了一些案例图,亲点这个链接就能看到啦({CASE_LIBRARY_LINK})。有什么想要的效果随时告诉我哈,不满意我们这边包退。",
f"先给你看案例哈,链接在这({CASE_LIBRARY_LINK})。你想要什么效果直接说,我们按你要求做,不满意可退。",
f"我把类似案例给你准备好了,点这个看就行({CASE_LIBRARY_LINK})。你放心说需求,效果不满意我们包退。",
f"怕效果不稳的话你先看案例,链接在这里({CASE_LIBRARY_LINK})。你确认风格后我按你要的做,不满意可以退。",
f"可以先看下我们做过的案例({CASE_LIBRARY_LINK}。你觉得方向OK再拍不满意我们这边支持退。",
])
if "有点贵" in text or "就是贵" in text:
# 约定示例两张优惠价默认45若已有单张价格则动态估算两张打包价
base = state.last_price if isinstance(state.last_price, int) and state.last_price > 0 else 25
two_pack = max(10, round(((base * 2) - 5) / 5) * 5)
return random.choice([
f"理解你这边的预算,我给你个实在点的:两张一起按 {two_pack} 元做,行不行?",
f"我懂你意思,这样吧,两张一起我给你算 {two_pack} 元。",
f"那我给你压一口价,两张打包 {two_pack} 元,你看可以我就开做。",
f"没问题,我给你优惠点,两张一起按 {two_pack} 元走。",
f"你这边要省点的话,两张一起我给你做到 {two_pack} 元。",
])
if any(k in text for k in ["优惠点", "便宜点", "少点", "打折"]):
return random.choice([
"可以的你这边数量上来我就好给价3张以上我给你打包价。",
"能优惠做得多会更划算3张以上我这边可以给你打包算。",
"没问题3张起我可以给你一口打包价会比单张省一些。",
"可以便宜点按量走更好谈3张以上我给你打包价。",
"你这边如果是多张做3张以上我能给你更划算的打包价。",
])
return ""
def _parse_order_info(self, msg: str) -> dict:
"""从系统订单消息中提取所有字段"""
import re
info = {}
m = re.search(r'订单号[:]\s*(\d+)', msg)
if m:
info['order_id'] = m.group(1)
# 订单大状态(新订单/交易成功/交易关闭等)
m = re.search(r'订单状态[:]\s*([^\s\[]+)', msg)
if m:
info['order_status'] = m.group(1).strip()
# 支付细状态(等待买家付款/等待发货/交易完成等)
m = re.search(r'\[状态[:]\s*([^\]]+)\]', msg)
if m:
info['pay_status'] = m.group(1).strip()
# 金额
m = re.search(r'金额[:]\s*([\d.]+)元', msg)
if m:
info['amount'] = m.group(1)
# 数量
m = re.search(r'数量[:]\s*(\d+)', msg)
if m:
info['quantity'] = m.group(1)
# 时间格式2026-2-24 19:52:52
m = re.search(r'(\d{4}-\d{1,2}-\d{1,2}\s+\d{1,2}:\d{2}:\d{2})', msg)
if m:
info['order_time'] = m.group(1).strip()
# 买家备注
m = re.search(r'买家备注[:]\s*([^\n]+)', msg)
if m and m.group(1).strip():
info['buyer_note'] = m.group(1).strip()
return info
def _get_order_instruction(self, pay_status: str, order_status: str) -> str:
"""
根据订单状态生成 AI 指令。
只有「已付款」才需要回复客户,其他状态一律静默。
"""
paid_keywords = ["等待发货", "已付款", "付款成功", "买家已付款"]
if any(kw in pay_status or kw in order_status for kw in paid_keywords):
return "【已付款-必须回复】客户已付款,立刻自然回复确认收款并告知马上安排。"
else:
# 所有其他状态(待付款、交易完成、关闭等)静默处理
return "【仅系统通知-无需回复客户】这是系统订单通知,不需要回复客户任何内容,直接跳过。"
def _extract_image_url(self, msg: str) -> str:
"""从消息中提取图片URL兼容纯URL和 text#*#url 两种格式"""
urls = self._extract_image_urls(msg)
return urls[0] if urls else ""
def _extract_image_urls(self, msg: str) -> List[str]:
"""提取消息中的所有图片URL去重保序"""
import re
if not msg:
return []
image_exts = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp")
image_hosts = ("alicdn.com", "imgextra", "taobao.com", "jd.com", "pinduoduo.com", "suning.com")
candidates = re.findall(r'https?://[^\s#]+', msg)
urls: List[str] = []
for u in candidates:
low = u.lower()
if any(ext in low for ext in image_exts) or any(h in low for h in image_hosts):
if u not in urls:
urls.append(u)
return urls
def _strip_urls_from_text(self, msg: str) -> str:
"""去掉 URL 后的纯文本,用于提取额外需求。"""
import re
if not msg:
return ""
text = re.sub(r'https?://\S+', ' ', msg)
text = text.replace("#*#", " ").strip()
text = re.sub(r'\s+', ' ', text)
return text.strip(",。.!?;: ")
def _is_batch_finish_signal(self, text: str) -> bool:
"""客户是否表达“图发完了,可以统一报价”。"""
if not text:
return False
if self._classify_short_customer_text(text) == "finish_signal":
return True
finish_keywords = [
"发完了", "都发完了", "发齐了", "齐了", "先这些", "就这些", "全部", "一起报", "统一报价",
"总共多少钱", "一共多少钱", "打包价", "总价", "报价吧", "报个总价", "给个总价",
"没了", "没有了", "没图了", "就这", "就这张", "就这一张", "就这一个", "就一个",
"先报吧", "报下价", "报个价", "可以报价了", "能报吗",
]
return any(k in text for k in finish_keywords)
def _is_batch_finish_intent(
self,
text: str,
state: ConversationState,
has_incoming_urls: bool,
) -> bool:
"""
语义结束识别:
- 显式口令:发完了/统一报价
- 隐式意图:询价/砍价
- 单图需求明确:如“这个门头上面的字做一下”可直接进入报价
"""
if not text:
return False
if self._is_batch_finish_signal(text):
return True
if has_incoming_urls:
return False
if not state.pending_image_urls:
return False
# 意图识别:询价/砍价通常意味着“可以报价了”
try:
from utils.intent_analyzer import detect_intent_embedding, detect_intent_keywords
intent = detect_intent_embedding(text) or detect_intent_keywords(text)
except Exception:
intent = ""
if intent in ("询价", "砍价"):
return True
msg = (text or "").strip()
if not msg:
return False
# 单图场景:客户给出明确加工指令,可直接报价
single_image_action_kw = (
"做一下", "改一下", "处理一下", "就这张", "按这个做", "照这个做",
"这个门头", "上面的字", "这个字", "这个图做", "能做吗",
)
multi_image_finish_kw = (
"就这些", "就这几张", "按这几张", "这几张一起做", "一起做一下",
"先按这些", "先按这几张", "直接报价", "现在报价", "看下报价",
"先报个总价", "总价多少", "一起多少钱", "先做这几张",
)
hold_kw = ("还有", "再发", "先等", "稍后", "等会", "回头")
if len(state.pending_image_urls) == 1:
if any(k in msg for k in single_image_action_kw) and not any(k in msg for k in hold_kw):
return True
elif len(state.pending_image_urls) >= 2:
if any(k in msg for k in multi_image_finish_kw) and not any(k in msg for k in hold_kw):
return True
if self._is_cross_image_composite_intent(msg) and not any(k in msg for k in hold_kw):
return True
return False
@staticmethod
def _is_cross_image_composite_intent(text: str) -> bool:
"""
识别多图跨图修改意图A图元素放到B图
A图的图案转到B图、这个图案放到另一张上。
"""
s = (text or "").strip()
if not s:
return False
pair_marks = ("a图", "b图", "第一张", "第二张", "这张", "那张", "上一张", "另一张")
op_kw = (
"转到", "换到", "放到", "贴到", "移到", "套到", "合成", "融合", "替换到",
"图案上去", "字放到", "元素放到", "logo放到",
)
return any(k in s.lower() for k in pair_marks) and any(k in s for k in op_kw)
@staticmethod
def _is_related_image_followup_intent(text: str) -> bool:
"""
识别“新发的是上一张的截图/局部细节”的关联意图。
这类输入应与前图关联处理,避免当成完全独立需求。
"""
s = (text or "").strip().lower()
if not s:
return False
relation_kw = (
"截图", "截屏", "局部", "细节", "放大", "裁剪", "同一张", "同一幅",
"上一张", "上张", "前一张", "前面那张", "刚才那张", "这个是上面",
"这个是那张", "补一张细节", "补个截图",
)
return any(k in s for k in relation_kw)
@staticmethod
def _is_result_followup_query(text: str) -> bool:
"""识别客户在找图流程中的结果/进度追问。"""
short_type = CustomerServiceAgent._classify_short_customer_text(text)
if short_type == "progress_query":
return True
s = (text or "").strip()
if not s:
return False
followup_kw = (
"找到了吗", "没找到吗", "找到没", "找到没有", "找到了没", "有吗", "有没", "有没有",
"有结果吗", "结果呢",
"进度", "多久好", "什么时候好", "好了没", "弄好了吗", "做了没",
"你重新发", "重新发给我", "高清", "发我",
)
if any(k in s for k in followup_kw):
return True
return s in {"?", "", "在吗", "人呢"}
@staticmethod
def _classify_short_customer_text(text: str) -> str:
"""
短句分类器(状态机前置):
- finish_signal: 发图完成,可报价
- progress_query: 追问进度/结果
- ack: 简短确认
- unknown: 未识别
"""
s = (text or "").strip()
if not s:
return "unknown"
if len(s) > 8:
return "unknown"
finish_kw = (
"没了", "没有了", "就这", "就这张", "就这一张", "就这一个", "就一个",
"先这些", "就这些", "发完了", "都发完了",
)
if any(k in s for k in finish_kw):
return "finish_signal"
progress_kw = (
"有吗", "有没", "有没有", "找到了吗", "找到了没", "没找到吗", "找到没", "找到没有",
"进度", "结果", "多久好", "什么时候好", "好了没", "弄好了吗", "做了没",
"高清", "发我", "重新发", "你重新发给我",
)
if any(k in s for k in progress_kw) or s in {"?", "", "在吗", "人呢"}:
return "progress_query"
ack_kw = ("", "嗯嗯", "", "好的", "", "可以", "ok", "OK", "收到", "明白")
if s in ack_kw:
return "ack"
return "unknown"
def _build_collect_ack(self, count: int, related_followup: bool = False) -> str:
if related_followup and count >= 2:
related_templates = [
"这张我收到了,看起来是上一张的截图/细节图,我按同一单一起处理。还有补充就继续发。",
"收到,这张是关联补图我记上了(按同一需求处理)。你还有图就继续发。",
"明白,这张是前图的局部截图,我会和前面那张一起算,不会分开漏掉。",
]
return random.choice(related_templates)
if count <= 1:
one_templates = [
"这张收到啦,还有图就继续发,我一起给你看。",
"图我看到了,后面还有就接着发,最后我一口价给你。",
"收到这张了,你有其他图也发来,我统一帮你算。",
"这张我先记上了,你那边还有的话接着发,我一起给你报。",
"第1张收到你继续发就行发完我这边一次给你算清楚。",
"这张没问题,我先收着。要是还有图,你直接连着发我就行。",
"我先看到了这张,你后面还有就一起发来,我统一给你报价。",
"这张图我已经记下了,后面有补充就继续甩过来哈。",
]
return random.choice(one_templates)
templates = [
"这几张我都收到了(现在{n}张)。还有的话继续发,我一起给你报。",
"好嘞,先看到{n}张了。你可以继续发,或者直接说“就这些”我现在就报价。",
"收到哈(共{n}张)。你还要补图就继续发,不补的话我现在也可以直接给价。",
"我这边先收到了{n}张。你继续补图,或者直接说“按这些算”我就开始报。",
"这波我已经记了{n}张,你要是还有就接着发,不补的话我立刻给总价。",
"先看到{n}张图了,后面你看是继续发,还是直接让我现在报价都可以。",
"好的,目前{n}张到位。你一句“就这些”,我马上给你打包价。",
"图我都看到了({n}张)。你还发我就继续收,不发我现在就给你报。",
]
return random.choice(templates).format(n=count)
def _build_collect_progress_reply(self, count: int) -> str:
if count <= 1:
templates = [
"我这边在处理了,这张有结果我第一时间回你。",
"在跟进中,这张一有进展我马上发你。",
"这张我正在看,稍等我一会儿,结果出来就回你。",
]
return random.choice(templates)
templates = [
"我这边在按你这{n}张一起处理,有结果我立刻同步你。",
"正在跟进这{n}张,出结果我第一时间发你,不会漏。",
"进度在跑了(共{n}张),你稍等一下,我这边有结果马上回。",
]
return random.choice(templates).format(n=count)
def _build_collect_remind(self, count: int) -> str:
if count <= 1:
one_templates = [
"这个要求我记住了。你还有图就继续发,不补图我就按这张给你报价。",
"明白,这个需求我加上了。你继续发图也行,想直接报价也可以。",
"我先记下这张。你如果是要我找图,不是做图,直接说一声,我按找图思路给你走。",
"收到,这张我先按你的要求记好了。就做这一张的话,我现在直接给你报实价。",
"你这要求我记下了,后面还有图就发,没有的话我现在直接算价。",
"行,我按你这个要求来。继续补图也行,不补我就先报这张。",
"这个点我懂了,你还要补图就接着发,不补我立刻给你报价。",
"要求我已经加上了。你看是继续发,还是我现在直接报这张。",
]
return random.choice(one_templates)
templates = [
"需求我记下了(当前{n}张)。你继续补图,或者直接说“就这些”我现在报价。",
"好,这个要求也加上了(现在{n}张)。不再补图的话我立刻给你打包价。",
"收到(共{n}张)。你还发就继续,不发的话我现在就给总价。",
"这个需求我加进去了(现在{n}张)。你继续发也行,直接报价也行。",
"我这边都记好了({n}张+需求)。你一句“先按这些算”,我马上报价。",
"要求同步好了,目前{n}张。要补图继续发,不补图我现在就给你打包价。",
"行,需求和图片我都收着了({n}张)。你直接让我报价也可以。",
"好的,这条需求也算进去了(共{n}张)。你看要不要我现在直接报。",
]
return random.choice(templates).format(n=count)
@staticmethod
def _is_find_image_not_edit_conflict(text: str) -> bool:
"""识别客户明确声明“要找图,不是做图”的冲突语义。"""
s = (text or "").strip()
if not s:
return False
find_kw = ("找图", "找原图", "找素材", "找同款")
deny_edit_kw = ("不是让你做图", "不是做图", "不用做图", "不需要做图", "不是修图", "不用修图")
return any(k in s for k in find_kw) and any(k in s for k in deny_edit_kw)
@staticmethod
def _needs_clarification_in_collecting(text: str) -> bool:
"""
信息不足时先追问,不急着报价。
例:这个也是大图 / 一共几个图 / 啥意思 / 没明白
"""
s = (text or "").strip()
if not s:
return False
short_non_vague_kw = (
"", "?", "没了", "没有了", "就这", "", "好的", "ok", "报价",
"找到了吗", "没找到吗", "找到没", "找到了没", "有吗", "有没", "有没有",
"多久好", "什么时候好", "高清",
)
if len(s) <= 4:
if any(k in s for k in short_non_vague_kw):
return False
return True
vague_kw = (
"这个也是", "一共几个图", "几个图", "啥意思", "没明白", "什么意思",
"这个呢", "这个可以吗", "然后呢", "咋办", "怎么搞",
)
return any(k in s for k in vague_kw)
def _build_find_image_clarify_reply(self, state: ConversationState) -> str:
count = len(state.pending_image_urls or [])
return (
f"明白,你是要我帮你找图,不是做图。现在我这边先记了{count}张,"
"你告诉我具体要找哪种:原图/同款/高清版,我按这个方向给你找。"
)
@staticmethod
def _build_not_understood_reply() -> str:
"""信息不足时的澄清话术(随机)。"""
templates = [
"不好意思,不太懂你的意思,你再具体说下哈。",
"抱歉我这边没完全理解,你可以换个说法再说一次吗?",
"我有点没听明白,你是要找图还是要做图呀?",
"不好意思我没抓到重点,你再补一句我就能接着处理。",
"这句我理解得不太准,你再说具体一点我马上给你办。",
"抱歉,这里我没太看懂。你是想让我找原图,还是按图处理?",
"我这边还没完全明白你的意思,麻烦你再具体描述一下。",
"不好意思,这条我没读懂,你再详细说一点我马上跟上。",
]
return random.choice(templates)
def _append_requirement(self, state: ConversationState, text: str):
"""追加需求并做去重/截断,减少上下文噪音。"""
t = (text or "").strip()
if not t:
return
t = t[:120]
if state.pending_requirements and state.pending_requirements[-1] == t:
return
if t in state.pending_requirements[-5:]:
return
state.pending_requirements.append(t)
if len(state.pending_requirements) > 20:
state.pending_requirements = state.pending_requirements[-20:]
def _calc_requirement_surcharge(self, requirements: List[str]) -> Dict[str, Any]:
"""
把客户补充需求做成结构化加价,避免纯靠模型自由发挥导致价格波动。
返回:
{"extra": int, "hits": List[str]}
"""
text = " ".join(requirements or [])
rules = [
(["分层", "psd", "源文件"], 30, "分层/源文件"),
(["去背景", "抠图", "透明底", "白底"], 5, "去背景"),
(["换背景", "换场景", "合成", "转到", "换到", "放到", "贴到", "移到", "套到", "图案上去", "元素放到"], 10, "跨图合成/换背景"),
(["改字", "改文字", "替换文字", "排版"], 10, "改文字/排版"),
(["调色", "改色", "换色", "配色"], 5, "调色"),
(["多版本", "多个版本", "两版", "三版"], 10, "多版本"),
(["加急", "今天要", "马上要", "尽快"], 10, "加急"),
]
total = 0
hits: List[str] = []
for keywords, fee, label in rules:
if any(k in text for k in keywords):
total += fee
hits.append(f"{label}+{fee}")
# 防止需求加价过高,做个上限保护
total = min(total, 60)
# 金额统一 5 的倍数
total = round(total / 5) * 5
return {"extra": total, "hits": hits}
def _build_batch_quote_reply(
self,
results: List[Tuple[str, Dict[str, Any]]],
total_suggest: int,
bundle_price: int,
req_fee: Dict[str, Any],
) -> str:
"""构建分图明细 + 单条总报价可选项回复。"""
complexity_map = {
"simple": "简单",
"normal": "常规",
"complex": "复杂",
"hard": "高难",
}
detail_lines: List[str] = []
for i, (_, r) in enumerate(results, 1):
p = int(r.get("price_suggest", 20) or 20)
cx = complexity_map.get(str(r.get("complexity", "normal")), "常规")
reason = str(r.get("reason", "常规处理")).replace("\n", " ").strip()
if len(reason) > 18:
reason = reason[:18] + "..."
detail_lines.append(f"{i}{p}元({cx}{reason}")
extra = int(req_fee.get("extra", 0) or 0)
single_total = round((total_suggest + extra) / 5) * 5
req_hit = "".join(req_fee.get("hits", [])) if req_fee.get("hits") else ""
# 单图时不要使用“分图/这批/A-B方案”措辞避免客户误解为多图。
if len(results) == 1:
line = detail_lines[0].replace("图1", "这张:")
heads = [
"这张我看过了,先给你报下:",
"这张可以做,价格给你报下:",
"看了这张图,报价如下:",
"我先按这张给你算下:",
"这张处理没问题,我给你报个实在价:",
"我看完这张了,价格给你说下:",
"按这张图的难度,报价是:",
"这张我已经评估完了,先给你个价格:",
]
lines = [f"{random.choice(heads)}{line.split('', 1)[1]}"]
if req_hit:
lines.append(f"按你的需求另加{extra}元({req_hit})。")
tails = [
f"这张做下来共{single_total}元,定了我马上开工。",
f"合下来是{single_total}元,你点头我这边立刻安排。",
f"总价{single_total}元,可以的话我现在就给你做。",
f"这一张算下来{single_total}元,你说开做我就马上弄。",
f"给你按{single_total}元做,确定的话我现在就排上。",
f"这张我按{single_total}元给你做,没问题就直接开始。",
f"这张最终{single_total}元,你点头我立刻开干。",
f"这张就按{single_total}元走,你确认我就马上安排。",
]
lines.append(random.choice(tails))
return "\n".join(lines)
heads = [
"我先按这几张给你报一下:",
"这几张我都看过了,价格给你列一下:",
"我把每张价格先给你说清楚:",
"我先把这几张的价格拆开给你看:",
"这几张我都评估过了,报价给你写明白:",
"先别急,我把每张大概价给你列出来:",
"我按这批图先报个明细给你:",
"我先把每张费用和总价给你算出来:",
]
lines = [random.choice(heads)]
lines.extend(detail_lines)
if req_hit:
lines.append(f"需求加价:+{extra}元({req_hit}")
option_line = random.choice([
f"可选:按单张做(共{single_total}元),或打包做({bundle_price}元,会更省一点)。",
f"可选:单张算下来一共{single_total}元;打包给你{bundle_price}元,更划算。",
f"可选:你按单张做共{single_total}元,按打包做我给你{bundle_price}元。",
f"可选:分开做总共{single_total}元,打包做{bundle_price}元(省一点)。",
f"可选:按张算共{single_total}元;直接打包{bundle_price}元。",
])
lines.append(option_line)
lines.append(random.choice([
"你定一个,我这边马上开工。",
"你选个方案,我立刻给你安排上。",
"你拍板就行,我这边马上开做。",
"你看选哪个合适,我这边马上给你做。",
"你一句话定下来,我现在就给你安排。",
]))
return "\n".join(lines)
def _prepare_batch_intake(self, state: ConversationState) -> Dict[str, Any]:
"""Stage 1: 收集阶段,标准化输入并做上限约束。"""
urls = list(state.pending_image_urls)
if not urls:
return {"ok": False, "reply": "你先把图片发我,我看完再给你统一报价。", "need_transfer": False}
try:
from config.config import BATCH_MAX_IMAGES, BATCH_ANALYZE_CONCURRENCY
max_images = max(1, int(BATCH_MAX_IMAGES))
analyze_concurrency = max(1, int(BATCH_ANALYZE_CONCURRENCY))
except Exception:
max_images = 12
analyze_concurrency = 3
if len(urls) > max_images:
return {
"ok": False,
"reply": f"这次图片有点多({len(urls)}张),我先按前{max_images}张处理报价,剩下的下一批继续发我。",
"need_transfer": False,
}
return {
"ok": True,
"urls": urls[:max_images],
"requirements": list(state.pending_requirements or []),
"analyze_concurrency": analyze_concurrency,
}
async def _run_batch_feasibility(self, urls: List[str], concurrency: int) -> List[Tuple[str, Dict[str, Any]]]:
"""Stage 2: 可做性分析(逐图)。"""
from image.image_analyzer import image_analyzer
sem = asyncio.Semaphore(max(1, concurrency))
async def _analyze_one(url: str):
async with sem:
try:
r = await image_analyzer.analyze(url)
except Exception:
r = {
"complexity": "normal",
"reason": "识别异常,按常规估价",
"price_min": 15,
"price_max": 25,
"price_suggest": 20,
"success": False,
}
return url, r
return list(await asyncio.gather(*[_analyze_one(u) for u in urls]))
async def _sync_batch_analysis_to_workflow(self, results: List[Tuple[str, Dict[str, Any]]], message: CustomerMessage) -> None:
for url, r in results:
try:
from core.workflow import workflow
await workflow.image_analysis_result(
customer_id=message.from_id,
image_url=url,
complexity=r.get("complexity", "normal"),
acc_id=message.acc_id,
acc_type=message.acc_type,
gemini_prompt=r.get("gemini_prompt", ""),
aspect_ratio=r.get("aspect_ratio", "1:1"),
perspective=r.get("perspective", "no"),
proc_type=r.get("proc_type", ""),
subject=r.get("subject", ""),
quality=r.get("quality", ""),
)
except Exception as e:
print(f"[Agent] Workflow 批量任务创建失败: {e}")
def _assess_batch_risk(self, results: List[Tuple[str, Dict[str, Any]]]) -> Dict[str, List[str]]:
"""Stage 2.5: 分离可做和风险图。"""
unsafe: List[str] = []
dense_text_reject: List[str] = []
for i, (_, r) in enumerate(results, 1):
if r.get("feasibility") == "no" or r.get("risk") == "high":
unsafe.append(f"{i}")
note = str(r.get("note", "") or "")
if "文字内容过于密集" in note or "密集文字" in note:
dense_text_reject.append(f"{i}")
return {"unsafe": unsafe, "dense_text_reject": dense_text_reject}
def _build_batch_pricing_plan(
self,
results: List[Tuple[str, Dict[str, Any]]],
requirements: List[str],
) -> Dict[str, Any]:
"""Stage 3: 报价计算(图片成本 + 需求加价 + 打包价)。"""
total_suggest = sum(int(r.get("price_suggest", 20) or 20) for _, r in results)
req_fee = self._calc_requirement_surcharge(requirements)
if len(results) == 2:
bundle_price = max(10, total_suggest - 5)
elif len(results) >= 3:
bundle_price = max(10, round(total_suggest * 0.9 / 5) * 5)
else:
bundle_price = total_suggest
bundle_price += int(req_fee.get("extra", 0) or 0)
bundle_price = round(bundle_price / 5) * 5
return {
"total_suggest": total_suggest,
"req_fee": req_fee,
"bundle_price": bundle_price,
}
async def _try_batch_auto_process(
self,
results: List[Tuple[str, Dict[str, Any]]],
message: CustomerMessage,
req_fee: Dict[str, Any],
) -> Dict[str, Any]:
"""Stage 4-A: 自动处理+图绘链接。失败时回退到需求澄清。"""
links = []
try:
from image.image_processor import image_processor
from utils.image_queue import run_with_queue
for idx, (url, r) in enumerate(results, 1):
req_parts = [f"complexity:{r.get('complexity', 'normal')}"]
if r.get("gemini_prompt"):
req_parts.append(f"prompt:{r.get('gemini_prompt')}")
if r.get("aspect_ratio"):
req_parts.append(f"ratio:{r.get('aspect_ratio')}")
if r.get("perspective") and r.get("perspective") != "no":
req_parts.append(f"perspective:{r.get('perspective')}")
if r.get("proc_type"):
req_parts.append(f"proc_type:{r.get('proc_type')}")
if r.get("subject"):
req_parts.append(f"subject:{r.get('subject')}")
if r.get("quality"):
req_parts.append(f"quality:{r.get('quality')}")
process_res = await run_with_queue(image_processor.process_image(
url,
"enhance",
requirements="|".join(req_parts),
gemini_prompt=r.get("gemini_prompt", ""),
aspect_ratio=r.get("aspect_ratio", "1:1"),
perspective=r.get("perspective", "no"),
proc_type=r.get("proc_type", ""),
subject=r.get("subject", ""),
quality=r.get("quality", ""),
))
if not process_res.get("success"):
raise RuntimeError(process_res.get("message", "图片处理失败"))
ok, link, _ = await upload_to_tuhui(
process_res["result_path"],
title=f"客户{message.from_id[-4:]}-图片{idx}",
description="AI自动处理结果",
price=max(10, int(r.get("price_suggest", 20) or 20) + int(req_fee.get("extra", 0) or 0) // max(1, len(results))),
)
if not ok:
raise RuntimeError(str(link))
links.append(link)
except Exception as e:
print(f"[Agent] 找图自动处理失败,回退需求澄清: {e}")
return {
"reply": "这种可以做类似款。你先说下具体需求:要几张、是否改字、尺寸比例、交付格式(单图/打包链接),我按需求给你直接做。",
"need_transfer": False,
}
lines = ["找到了,链接如下:"]
for i, link in enumerate(links, 1):
lines.append(f"链接{i}{link}")
return {"reply": "\n".join(lines), "need_transfer": False}
def _finalize_batch_state(self, state: ConversationState, customer_id: str, final_price: int = 0):
if final_price > 0:
state.last_price = final_price
try:
from db.customer_db import db
db.update_last_price(customer_id, final_price)
except Exception:
pass
state.pending_image_urls.clear()
state.pending_requirements.clear()
self._refresh_quote_phase(state, "idle")
self._sync_pending_quote_state(customer_id, state)
async def _quote_pending_images(self, state: ConversationState, message: CustomerMessage) -> Dict[str, Any]:
"""
统一报价主流程(分层):
1) Intake 收集
2) Feasibility 可做性
3) Pricing 报价
4) Router 自动处理/报价/转人工
"""
intake = self._prepare_batch_intake(state)
if not intake.get("ok", False):
return {"reply": intake.get("reply", ""), "need_transfer": bool(intake.get("need_transfer", False))}
urls = intake["urls"]
requirements = intake["requirements"]
analyze_concurrency = int(intake["analyze_concurrency"])
results = await self._run_batch_feasibility(urls=urls, concurrency=analyze_concurrency)
await self._sync_batch_analysis_to_workflow(results=results, message=message)
risk = self._assess_batch_risk(results)
unsafe = risk["unsafe"]
dense_text_reject = risk["dense_text_reject"]
if unsafe:
self._finalize_batch_state(state, message.from_id, final_price=0)
if dense_text_reject and len(dense_text_reject) == len(unsafe):
return {"reply": self._build_reject_message("文字密集类图片暂不接单"), "need_transfer": False}
return {
"reply": f"这批里{''.join(unsafe)}处理风险较高,我先帮你转人工设计师跟进会更稳妥。",
"need_transfer": True,
}
pricing = self._build_batch_pricing_plan(results=results, requirements=requirements)
total_suggest = int(pricing["total_suggest"])
bundle_price = int(pricing["bundle_price"])
req_fee = pricing["req_fee"]
intent_text = (message.msg or "") + " " + " ".join(requirements[-5:])
workflow_type, _ = self.workflow_router.detect_workflow(intent_text)
if workflow_type == "find_image":
route_res = await self._try_batch_auto_process(
results=results,
message=message,
req_fee=req_fee,
)
self._finalize_batch_state(state, message.from_id, final_price=bundle_price)
return route_res
reply_text = self._build_batch_quote_reply(
results=results,
total_suggest=total_suggest,
bundle_price=bundle_price,
req_fee=req_fee,
)
self._finalize_batch_state(state, message.from_id, final_price=bundle_price)
return {"reply": reply_text, "need_transfer": False}
def _split_customer_text(self, msg: str) -> tuple:
"""
把混合消息拆分为(客户真实文字, 系统订单块)。
平台有时把客户文字和系统订单通知拼在同一条消息里。
"""
import re
# 找到系统订单块的起始位置
order_marker = re.search(r'\[系统订单信息\]|\[系统通知\]', msg)
if order_marker:
customer_text = msg[:order_marker.start()].strip()
order_block = msg[order_marker.start():].strip()
else:
customer_text = msg.strip()
order_block = ""
return customer_text, order_block
def _build_prompt(self, message: CustomerMessage, state: ConversationState) -> str:
"""构建提示词"""
msg_content = message.msg
stage_info = f"【当前阶段】{state.stage}"
# 拆分:客户文字 vs 系统订单块
customer_text, order_block = self._split_customer_text(msg_content)
has_order = bool(order_block)
if has_order:
order = self._parse_order_info(order_block)
if order.get('order_id'):
state.last_order_id = order['order_id']
stage_info += f"\n【订单号】{order['order_id']}"
if order.get('order_status'):
state.order_status = order['order_status']
stage_info += f"\n【订单状态】{order['order_status']}"
if order.get('pay_status'):
stage_info += f"\n【支付状态】{order['pay_status']}"
if order.get('amount'):
stage_info += f"\n【订单金额】{order['amount']}"
if order.get('quantity'):
stage_info += f"\n【数量】{order['quantity']}"
if order.get('order_time'):
stage_info += f"\n【下单时间】{order['order_time']}"
if order.get('buyer_note'):
stage_info += f"\n【买家备注】{order['buyer_note']}"
if state.discount_count > 0:
stage_info += f"\n【客户压价次数】{state.discount_count}"
# 店铺类型:不同店铺不同回复策略
shop_type = _get_shop_type(message.acc_id or "", message.goods_name or "")
shop_hint = ""
try:
from config.config import CONFIG_DIR
import json
cfg_path = CONFIG_DIR / "shop_prompts.json"
if cfg_path.exists():
with open(cfg_path, "r", encoding="utf-8") as f:
cfg = json.load(f)
hints = cfg.get("type_hints", {})
shop_hint = hints.get(shop_type, "")
if not shop_hint and message.acc_id:
sh = cfg.get("shops", {}).get(message.acc_id, {})
shop_hint = sh.get("hint", "")
except Exception:
pass
prompt = f"""收到新消息:
{stage_info}
发送者: {message.from_name} ({message.from_id})
"""
if message.goods_name:
prompt += f"商品名称: {message.goods_name}\n"
if shop_hint:
prompt += f"\n{shop_hint}\n"
# ── 优先处理客户真实问题 ──
# ── 判断订单付款状态(供后续逻辑使用)──
order_paid = False
order_unpaid = False
if has_order:
order = self._parse_order_info(order_block)
paid_kws = ["等待发货", "已付款", "付款成功", "买家已付款"]
unpaid_kws = ["等待买家付款", "待付款", "未付款"]
ps = order.get('pay_status', '')
os_ = order.get('order_status', '')
if any(kw in ps or kw in os_ for kw in paid_kws):
order_paid = True
elif any(kw in ps or kw in os_ for kw in unpaid_kws):
order_unpaid = True
# ── 催单/进度询问关键词 ──
progress_keywords = [
"安排了吗", "安排好了吗", "好了吗", "做了吗", "做好了吗",
"弄好了吗", "好了没", "做了没", "什么时候好", "多久好",
"进度", "催一下", "快点", "什么时候能好", "做完了吗",
]
if customer_text:
prompt += f"\n客户说:{customer_text}\n"
image_url = self._extract_image_url(customer_text)
price_keywords = ["多少钱", "多少", "价格", "几块", "怎么收费", "报个价"]
size_keywords = [
"尺寸", "比例", "", "", "", "厘米", "mm", "cm",
"横版", "竖版", "2米", "3米", "改成", "做成",
]
has_size_change = any(kw in customer_text.lower() for kw in [k.lower() for k in size_keywords])
# gemini_api 店铺:不触发找图流程,按 API 客服回复
if shop_type == "gemini_api":
prompt += "\n【Gemini API 店铺】客户问账号/pro/续费/套餐等,按 API 客服自然回复,不要求发图。"
elif image_url:
prompt += "\n客户在继续发图阶段:先确认“已收图”,并引导客户把图和要求一次发完;等客户明确“发完了/统一报价”后再统一报价。"
elif any(kw in customer_text for kw in price_keywords):
last_url = self._extract_image_url(msg_content)
if last_url:
prompt += "\n客户在询问价格:若客户已确认发完,则给总报价;若还在发图,先引导发完后统一报价。"
else:
prompt += "\n客户在询问价格但未发图:先简短承接(如“在看呢/收到”),不要机械连发;再自然引导对方发图。"
if has_size_change:
prompt += (
"\n⚠️ 尺寸改动场景:优先判断图片主体是否会被拉伸变形,"
"不是只看整张图宽高比。若会变形,要先提示“需要补图/扩边”,再给报价。"
)
elif has_size_change:
prompt += (
"\n客户在改尺寸/改比例:先按主体比例判断是否会变形,"
"不是只看整图比例。若目标尺寸会拉伸主体,先明确说明要补图(如上下补图/扩边)再报价。"
)
elif any(kw in customer_text for kw in progress_keywords):
# 客户问进度/催单,必须先核查付款状态
if order_unpaid:
prompt += "\n⚠️【订单未付款】客户问安排进度,但订单还未付款。自然告知拍下付款后马上安排即可。"
elif order_paid:
prompt += "\n客户催单,订单已付款,自然回复在做了/快了之类。"
else:
prompt += "\n客户催单,查询当前处理状态后自然回复。"
elif any(kw in customer_text for kw in ["", "有点贵", "太贵", "算了", "便宜点", "少点", "打折", "贵哦"]):
# 客户嫌贵/要放弃 → 直接让价一次,不问「什么问题」
prompt += "\n⚠️【客户嫌贵】客户已表达价格顾虑或要放弃。禁止追问「什么问题」「说清楚点」。\n→ 第一步:直接让价一次(如原价 20→15话术自然。\n→ 第二步:如果让价后客户还是嫌贵,引导客户自己去注册星绘平台:\n - 每个话术都必须带网址 xinhui.cloud\n - 强调「自己去弄」「自己操作」「自己下单」\n - 示例:「嫌贵就去 xinhui.cloud 注册个号自己有余额可以弄」「xinhui.cloud 去注册下,新用户送体验金,自己弄便宜」\n - 语气直接,让客户自己去操作」"
elif any(kw in customer_text for kw in ["擦边", "黄色", "色情", "大尺度", "性感图", "露点", "半裸"]):
# 客户问擦边/黄色内容 → 直接拒绝,不说「发图来看看」
prompt += "\n⚠️【拒绝】客户询问擦边/黄色/敏感内容。直接拒绝,不接单,不说「发图来看看」。自然回复如:这类不做/不接/做不了。"
else:
prompt += "\n根据客户说的内容自然回应,像真人聊天,不要套模板。"
# ── 附加订单信息(不覆盖客户问题的优先级)──
if has_order:
order = self._parse_order_info(order_block)
order_instruction = self._get_order_instruction(
order.get('pay_status', ''),
order.get('order_status', '')
)
if customer_text:
if not order_unpaid:
# 未付款情况已在上面明确处理,不重复添加背景
prompt += f"\n\n【背景参考-订单通知】{order_instruction}"
else:
# 纯系统通知,没有客户文字
prompt += f"\n\n{order_instruction}"
if not customer_text and not has_order:
prompt += f"\n消息内容: {msg_content}\n请按工作流规则回复。"
return prompt
async def _handle_image_workflow(self, message: str, data: dict, image_urls: list) -> bool:
"""处理图片工作流(根据客户说的话判断执行哪种工作流)"""
if not image_urls:
return False
workflow_type, confidence = self.workflow_router.detect_workflow(message)
customer_id = data.get('from_id')
acc_id = data.get('acc_id', '')
acc_type = data.get('acc_type', 'AliWorkbench')
image_url = image_urls[0]
print(f"[Agent] 检测到工作流类型:{workflow_type} (置信度:{confidence})")
if workflow_type == "find_image":
print(f"[Agent] 执行查找图片工作流 | 客户:{customer_id}")
from core.workflow import workflow
return await workflow.find_image_workflow(
customer_id=customer_id,
image_url=image_url,
acc_id=acc_id,
acc_type=acc_type
)
elif workflow_type == "process_image":
print(f"[Agent] 执行处理图片工作流 | 客户:{customer_id}")
from core.workflow import workflow
return await workflow.process_image_workflow(
customer_id=customer_id,
image_url=image_url,
acc_id=acc_id,
acc_type=acc_type
)
elif workflow_type == "transfer_human":
print(f"[Agent] 执行转人工派单工作流 | 客户:{customer_id}")
from core.workflow import workflow
return await workflow.transfer_to_designer_workflow(
customer_id=customer_id,
image_url=image_url,
acc_id=acc_id,
acc_type=acc_type,
reason="客户主动要求转人工"
)
return False
async def test_agent():
"""测试 Agent"""
agent = CustomerServiceAgent(skills_dir="skills")
test_msg = CustomerMessage(
msg_id="123",
acc_id="test_account",
msg="这张图可以做吗?",
from_id="customer001",
from_name="张三",
cy_id="customer001",
acc_type="AliWorkbench",
msg_type=0,
cy_name="张三",
goods_name="专业找图代找高清图片",
goods_order=""
)
response = await agent.process_message(test_msg)
print(f"回复内容: {response.reply}")
if __name__ == "__main__":
import asyncio
asyncio.run(test_agent())