2435 lines
110 KiB
Python
Executable File
2435 lines
110 KiB
Python
Executable File
"""PydanticAI Agent 模块
|
||
|
||
架构:单 Agent + 多 Tool 模式
|
||
- Agent 负责对话逻辑和决策
|
||
- Tool 负责具体能力:看图/查客户/转接
|
||
- AI 自主决定何时调用哪个工具,时序自然,不需要外部协调
|
||
"""
|
||
import os
|
||
import glob
|
||
import asyncio
|
||
import random
|
||
import hashlib
|
||
import re
|
||
from typing import Optional, Dict, List, Any, Tuple
|
||
from datetime import datetime
|
||
from pydantic import BaseModel, Field
|
||
from pydantic_ai import Agent, RunContext
|
||
from pydantic_ai.models.openai import OpenAIChatModel
|
||
from pydantic_ai.providers.openai import OpenAIProvider
|
||
from dotenv import load_dotenv
|
||
from utils.metrics_tracker import emit as metrics_emit
|
||
|
||
load_dotenv()
|
||
|
||
from services.service_tuhui_upload import upload_to_tuhui
|
||
from core.workflow_router import get_workflow_router
|
||
from core.workflow_router import get_workflow_router
|
||
|
||
# ========== 企业微信通知 ==========
|
||
_WECHAT_WEBHOOK = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cc88bdef-a13f-4d7e-bdb6-ee51b68b8205"
|
||
|
||
|
||
async def _notify_wechat(content: str, tag: str = "通知"):
|
||
"""发送企业微信 markdown 通知,任何异常都发"""
|
||
if not _WECHAT_WEBHOOK:
|
||
print(f"[{tag}] 未配置 WECHAT_WEBHOOK,跳过推送")
|
||
return
|
||
try:
|
||
import httpx
|
||
async with httpx.AsyncClient(timeout=10) as client:
|
||
resp = await client.post(_WECHAT_WEBHOOK, json={
|
||
"msgtype": "markdown",
|
||
"markdown": {"content": content}
|
||
})
|
||
data = resp.json()
|
||
if data.get("errcode") == 0:
|
||
print(f"[{tag}] 企业微信推送成功 ✓")
|
||
else:
|
||
print(f"[{tag}] 企业微信推送失败: {data}")
|
||
except Exception as e:
|
||
print(f"[{tag}] 企业微信发送异常: {e}")
|
||
|
||
|
||
async def _notify_wechat_overdue():
|
||
"""API 欠费时发企业微信通知"""
|
||
await _notify_wechat(
|
||
"⚠️ **火山引擎 API 欠费**,客服AI已停止响应,请立即充值!\n"
|
||
"地址:https://console.volcengine.com/ark"
|
||
)
|
||
|
||
|
||
# ========== 转接常量 ==========
|
||
TRANSFER_MESSAGE = "话术|[转移会话],分组20252916034,无原因"
|
||
|
||
|
||
# ========== 数据模型 ==========
|
||
|
||
|
||
class CustomerMessage(BaseModel):
|
||
"""客户消息模型"""
|
||
msg_id: str
|
||
acc_id: str
|
||
msg: str
|
||
from_id: str
|
||
from_name: str
|
||
cy_id: str
|
||
acc_type: str
|
||
msg_type: int
|
||
cy_name: str
|
||
goods_name: Optional[str] = None
|
||
goods_order: Optional[str] = None
|
||
|
||
|
||
class ConversationState(BaseModel):
|
||
"""对话状态"""
|
||
customer_id: str
|
||
stage: str = "售前" # 售前/售后
|
||
last_price: Optional[int] = None # 最后报价
|
||
last_min_price: Optional[int] = None # 最近图片的最低价
|
||
last_order_id: Optional[str] = None # 订单号
|
||
order_status: Optional[str] = None # 订单状态
|
||
discount_count: int = 0 # 让价次数
|
||
image_count: int = 0 # 图片数量
|
||
pending_image_urls: List[str] = Field(default_factory=list) # 待统一报价图片
|
||
pending_requirements: List[str] = Field(default_factory=list) # 待统一报价需求
|
||
last_update: str = ""
|
||
last_reply_at: Optional[datetime] = None # 最后一次回复客户的时间
|
||
|
||
|
||
class AgentDeps(BaseModel):
|
||
"""Agent 依赖项 - 用于传递上下文"""
|
||
msg_id: str
|
||
acc_id: str
|
||
from_id: str
|
||
platform: str
|
||
|
||
|
||
class AgentResponse(BaseModel):
|
||
"""Agent 回复模型"""
|
||
reply: str
|
||
should_reply: bool = True
|
||
need_transfer: bool = False # 是否需要转人工
|
||
transfer_msg: str = "" # 转接消息
|
||
|
||
|
||
def _get_shop_type(acc_id: str = "", goods_name: str = "") -> str:
|
||
"""根据 acc_id 或 goods_name 判断店铺类型,返回 gemini_api / find_image / default"""
|
||
try:
|
||
from config.config import CONFIG_DIR
|
||
import json
|
||
cfg_path = CONFIG_DIR / "shop_prompts.json"
|
||
if not cfg_path.exists():
|
||
return "find_image"
|
||
with open(cfg_path, "r", encoding="utf-8") as f:
|
||
cfg = json.load(f)
|
||
shops = cfg.get("shops", {})
|
||
goods_kw = cfg.get("goods_keywords", {})
|
||
type_hints = cfg.get("type_hints", {})
|
||
# 优先按 acc_id
|
||
if acc_id and acc_id in shops:
|
||
return shops[acc_id].get("type", "find_image")
|
||
# 按商品名关键词
|
||
goods_lower = (goods_name or "").lower()
|
||
for kw, stype in goods_kw.items():
|
||
if kw in goods_lower:
|
||
return stype
|
||
except Exception:
|
||
pass
|
||
return "find_image"
|
||
|
||
|
||
def load_skill_md(skills_dir: str = "skills") -> str:
|
||
"""加载 skills 目录下的所有 SKILL.md 文件内容"""
|
||
skill_contents = []
|
||
skill_files = glob.glob(os.path.join(skills_dir, "**/SKILL.md"), recursive=True)
|
||
for skill_file in skill_files:
|
||
try:
|
||
with open(skill_file, 'r', encoding='utf-8') as f:
|
||
content = f.read()
|
||
skill_contents.append(content)
|
||
except Exception as e:
|
||
print(f"警告: 读取 {skill_file} 失败: {e}")
|
||
|
||
return "\n\n".join(skill_contents)
|
||
|
||
|
||
class CustomerServiceAgent:
|
||
"""客服 Agent - 支持 SKILL.md + 工作流"""
|
||
C_RESET = "\033[0m"
|
||
C_PROMPT = "\033[96m" # cyan
|
||
C_THINK = "\033[95m" # magenta
|
||
C_TOOL = "\033[93m" # yellow
|
||
C_REPLY = "\033[92m" # green
|
||
C_MUTED = "\033[90m" # gray
|
||
|
||
def __init__(self, skills_dir: str = "skills"):
|
||
self.api_key = os.getenv("OPENAI_API_KEY")
|
||
self.base_url = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
|
||
self.model_name = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
|
||
|
||
if not self.api_key:
|
||
raise ValueError("请设置 OPENAI_API_KEY 环境变量")
|
||
|
||
# 对话状态管理
|
||
self.conversations: Dict[str, ConversationState] = {}
|
||
# 多轮对话历史(PydanticAI ModelMessage 列表,按客户ID存储)
|
||
self.message_histories: Dict[str, list] = {}
|
||
|
||
# 加载 skills 内容
|
||
self.skills_content = load_skill_md(skills_dir)
|
||
|
||
# 创建 OpenAI 模型
|
||
model = OpenAIChatModel(
|
||
model_name=self.model_name,
|
||
provider=OpenAIProvider(
|
||
api_key=self.api_key,
|
||
base_url=self.base_url
|
||
)
|
||
)
|
||
|
||
self.agent = Agent(
|
||
model=model,
|
||
deps_type=AgentDeps,
|
||
system_prompt=self._get_system_prompt()
|
||
)
|
||
self.agent_after_sale = Agent(
|
||
model=model,
|
||
deps_type=AgentDeps,
|
||
system_prompt=self._get_after_sale_prompt()
|
||
)
|
||
self.agent_pricing = Agent(
|
||
model=model,
|
||
deps_type=AgentDeps,
|
||
system_prompt=self._get_pricing_prompt()
|
||
)
|
||
self.agent_processing = Agent(
|
||
model=model,
|
||
deps_type=AgentDeps,
|
||
system_prompt=self._get_processing_prompt()
|
||
)
|
||
self.agent_similar = Agent(
|
||
model=model,
|
||
deps_type=AgentDeps,
|
||
system_prompt=self._get_similar_prompt()
|
||
)
|
||
# 工作流程路由器
|
||
self.workflow_router = get_workflow_router()
|
||
|
||
self.agent_order = Agent(
|
||
model=model,
|
||
deps_type=AgentDeps,
|
||
system_prompt=self._get_order_prompt()
|
||
)
|
||
self.agent_risk = Agent(
|
||
model=model,
|
||
deps_type=AgentDeps,
|
||
system_prompt=self._get_risk_prompt()
|
||
)
|
||
|
||
# 注册工具
|
||
self._register_tools()
|
||
|
||
@staticmethod
|
||
def _log_block(title: str, content: str):
|
||
"""统一的控制台分层日志输出。"""
|
||
print(f"{CustomerServiceAgent.C_PROMPT}[{title}]{CustomerServiceAgent.C_RESET}")
|
||
print(content)
|
||
print(f"{CustomerServiceAgent.C_MUTED}────────────────────{CustomerServiceAgent.C_RESET}")
|
||
|
||
@staticmethod
|
||
def _normalize_reply_text(text: Optional[str]) -> str:
|
||
"""清洗模型输出,避免把占位词直接发给客户。"""
|
||
if text is None:
|
||
return ""
|
||
cleaned = str(text).strip()
|
||
if cleaned.lower() in {"无", "none", "null", "n/a"}:
|
||
return ""
|
||
return cleaned
|
||
|
||
def _register_tools(self):
|
||
"""注册所有 Tool,让 Agent 可以主动调用"""
|
||
|
||
|
||
@self.agent.tool
|
||
async def analyze_image(ctx: RunContext[AgentDeps], image_url: str) -> str:
|
||
"""
|
||
分析客户发来的图片复杂度,用于报价。
|
||
收到图片URL时调用此工具,返回复杂度和建议报价。
|
||
"""
|
||
try:
|
||
from image.image_analyzer import image_analyzer
|
||
result = await image_analyzer.analyze(image_url)
|
||
complexity_label = {
|
||
"simple": "简单(画面干净)",
|
||
"normal": "一般复杂度",
|
||
"complex": "细节偏多",
|
||
"hard": "非常复杂",
|
||
}.get(result["complexity"], result["complexity"])
|
||
# 持久化图片URL和复杂度,重启后仍能记住这张图
|
||
try:
|
||
from db.customer_db import db
|
||
db.update_last_image(
|
||
ctx.deps.from_id,
|
||
image_url,
|
||
complexity=result["complexity"],
|
||
gemini_prompt=result.get("gemini_prompt", ""),
|
||
aspect_ratio=result.get("aspect_ratio", "1:1"),
|
||
perspective=result.get("perspective", "no"),
|
||
)
|
||
except Exception:
|
||
pass
|
||
|
||
# 存图片类型到客户画像
|
||
try:
|
||
from db.customer_db import db as _db
|
||
if result.get("subject"):
|
||
_db.add_image_type(ctx.deps.from_id, result["subject"])
|
||
except Exception:
|
||
pass
|
||
|
||
# 在 workflow 里创建待处理任务(付款后自动触发 Gemini)
|
||
try:
|
||
from core.workflow import workflow
|
||
await workflow.image_analysis_result(
|
||
customer_id=ctx.deps.from_id,
|
||
image_url=image_url,
|
||
complexity=result["complexity"],
|
||
acc_id=ctx.deps.acc_id,
|
||
acc_type=ctx.deps.platform,
|
||
gemini_prompt=result.get("gemini_prompt", ""),
|
||
aspect_ratio=result.get("aspect_ratio", "1:1"),
|
||
perspective=result.get("perspective", "no"),
|
||
proc_type=result.get("proc_type", ""),
|
||
subject=result.get("subject", ""),
|
||
quality=result.get("quality", ""),
|
||
)
|
||
print(f"[Agent] Workflow 任务已创建 | 客户: {ctx.deps.from_id} | 比例: {result.get('aspect_ratio')} | 透视: {result.get('perspective')} | 图片: {image_url[:60]}...")
|
||
except Exception as e:
|
||
print(f"[Agent] Workflow 任务创建失败: {e}")
|
||
|
||
# 组装给 AI 的分析报告
|
||
risk = result.get("risk", "none")
|
||
has_face = result.get("has_face", "no")
|
||
feasibility = result.get("feasibility", "yes")
|
||
note = result.get("note", "")
|
||
|
||
lines = [
|
||
f"图片主体:{result['subject'] or '未识别'}",
|
||
f"处理类型:{result['proc_type'] or '高清修复'}",
|
||
f"原图质量:{result['quality'] or '未知'}",
|
||
f"图片类型:{result.get('category', '') or '通用'}",
|
||
f"图片尺寸:{(result.get('width') or 0)}x{(result.get('height') or 0)}({result.get('megapixels', 0.0)}MP)",
|
||
f"含人脸:{'是' if has_face == 'yes' else '否'}",
|
||
f"复杂度:{complexity_label}",
|
||
f"原因:{result['reason']}",
|
||
]
|
||
if result.get("size_surcharge"):
|
||
lines.append(f"尺寸加价:+{result['size_surcharge']}元")
|
||
if result.get("size_note"):
|
||
lines.append(f"尺寸提示:{result['size_note']}")
|
||
try:
|
||
st = self._get_conversation_state(ctx.deps.from_id)
|
||
if isinstance(result.get("price_min"), (int, float)):
|
||
st.last_min_price = int(result.get("price_min") or 0)
|
||
try:
|
||
from db.customer_db import db as _db
|
||
_db.update_last_min_price(ctx.deps.from_id, st.last_min_price)
|
||
except Exception:
|
||
pass
|
||
except Exception:
|
||
pass
|
||
|
||
# 根据可做性和风险等级给 AI 不同的行动指引
|
||
if feasibility == "no":
|
||
if "敏感" in (note or ""):
|
||
lines.append("【拒绝】图片含敏感/黄色/擦边内容,不接单。")
|
||
lines.append("→ 直接拒绝,不说「发图来看看」,自然回复如:这类不做/不接。")
|
||
else:
|
||
lines.append("【无法处理】此图无法处理(纯黑/纯白/完全损坏/要找原始RAW文件)。")
|
||
lines.append("→ 告知客户无法处理,建议换图或说明原因,不要报价。")
|
||
elif risk == "high":
|
||
lines.append(f"【高风险】此图处理风险高:{note or 'AI修复后效果不能保证与原图一致'}")
|
||
lines.append(f"建议报价:{result['price_suggest']}元")
|
||
lines.append("→ 先自然说明风险(人脸/效果可能不完美),再报价,满意再拍。话术自然。")
|
||
elif risk == "low":
|
||
lines.append(f"【低风险-含人脸】修复后人脸相似度约70-90%,效果不稳定。")
|
||
lines.append(f"建议报价:{result['price_suggest']}元")
|
||
lines.append(f"→ 报价时自然加一句风险提示(人脸可能有轻微变化、满意再付等)")
|
||
else:
|
||
# 无风险,正常报价
|
||
base_price = result.get('price_suggest', 20)
|
||
text_surcharge = result.get('text_surcharge', 0)
|
||
layer_surcharge = result.get('layer_surcharge', 0)
|
||
total_price = base_price + text_surcharge + layer_surcharge
|
||
|
||
# 构建报价说明
|
||
price_explanation = f"建议报价:{total_price}元"
|
||
if text_surcharge > 0:
|
||
price_explanation += f"(含文字处理 +{text_surcharge}元)"
|
||
if layer_surcharge > 0:
|
||
price_explanation += f"(含分层 +{layer_surcharge}元)"
|
||
|
||
lines.append(price_explanation)
|
||
|
||
# 添加文字数量说明
|
||
text_amount = result.get('text_amount', 'none')
|
||
if text_amount != 'none':
|
||
lines.append(f"文字数量:{text_amount},需要精细处理")
|
||
|
||
if feasibility == "partial":
|
||
lines.append(f"⚠️ 此图有一定难度:{note or '效果可能不完美'},回复时可加「效果不满意退款」")
|
||
if note and note not in ("无", ""):
|
||
lines.append(f"提示:{note}")
|
||
lines.append(f"【立刻回复客户报价 {total_price} 元,话术自然多变】")
|
||
|
||
return "\n".join(lines)
|
||
except Exception as e:
|
||
return f"图片分析失败: {e},请根据经验判断报价"
|
||
|
||
@self.agent.tool
|
||
async def get_customer_info(ctx: RunContext[AgentDeps], customer_id: str) -> str:
|
||
"""
|
||
查询客户历史信息:消费记录、性格标签、报价历史等。
|
||
对话开始时或需要了解客户背景时调用。
|
||
"""
|
||
try:
|
||
from db.customer_db import db
|
||
return db.get_profile_text(customer_id)
|
||
except Exception as e:
|
||
return f"查询失败: {e}"
|
||
|
||
@self.agent.tool
|
||
async def transfer_to_human(ctx: RunContext[AgentDeps]) -> str:
|
||
"""
|
||
转接人工客服。
|
||
遇到退款/投诉/情绪激动/复杂售后时调用。
|
||
"""
|
||
return "TRANSFER_REQUESTED"
|
||
|
||
@self.agent.tool
|
||
async def save_customer_note(
|
||
ctx: RunContext[AgentDeps],
|
||
customer_id: str,
|
||
note: str
|
||
) -> str:
|
||
"""
|
||
记录客户关键信息到画像(邮箱/微信/特殊需求等)。
|
||
客户提供联系方式或重要信息时调用。
|
||
"""
|
||
try:
|
||
from db.customer_db import db
|
||
db.add_note(customer_id, note)
|
||
return "已记录"
|
||
except Exception as e:
|
||
return f"记录失败: {e}"
|
||
|
||
@self.agent.tool
|
||
async def update_contact_info(
|
||
ctx: RunContext[AgentDeps],
|
||
customer_id: str,
|
||
contact_type: str,
|
||
value: str
|
||
) -> str:
|
||
"""
|
||
更新客户联系方式。
|
||
当客户说出邮箱/手机/微信时调用,比正则提取更准确。
|
||
|
||
contact_type 枚举值:
|
||
email - 邮箱
|
||
phone - 手机号
|
||
wechat - 微信号
|
||
"""
|
||
try:
|
||
from db.customer_db import db
|
||
if contact_type == "email":
|
||
db.update_email(customer_id, value)
|
||
elif contact_type == "phone":
|
||
db.update_phone(customer_id, value)
|
||
elif contact_type == "wechat":
|
||
db.update_wechat(customer_id, value)
|
||
else:
|
||
return f"未知联系方式类型: {contact_type}"
|
||
return f"已保存 {contact_type}: {value}"
|
||
except Exception as e:
|
||
return f"保存失败: {e}"
|
||
|
||
@self.agent.tool
|
||
async def record_quote(
|
||
ctx: RunContext[AgentDeps],
|
||
customer_id: str,
|
||
price: int,
|
||
description: str = ""
|
||
) -> str:
|
||
"""
|
||
记录本次报价到客户画像,用于后续对话保持价格一致。
|
||
每次给客户报价后调用。
|
||
|
||
Args:
|
||
customer_id: 客户ID
|
||
price: 报价金额(元)
|
||
description: 报价描述,如"单图处理"/"三图打包"
|
||
"""
|
||
try:
|
||
from db.customer_db import db
|
||
db.update_last_price(customer_id, price)
|
||
if description:
|
||
db.add_note(customer_id, f"报价 {price}元({description})")
|
||
# 同步到内存状态
|
||
state = self.conversations.get(customer_id)
|
||
if state:
|
||
state.last_price = price
|
||
return f"已记录报价 {price}元"
|
||
except Exception as e:
|
||
return f"记录失败: {e}"
|
||
|
||
@self.agent.tool
|
||
async def process_image_gemini(ctx: RunContext[AgentDeps], customer_id: str = "") -> str:
|
||
"""
|
||
触发 Gemini 作图处理。客户付款后或说「安排一下」「处理一下」时调用。
|
||
会从客户档案读取上次发图的 URL 和处理参数(提示词、比例、透视),启动 Gemini 流程。
|
||
处理完成后会自动发图给客户。
|
||
"""
|
||
try:
|
||
from config.config import IMAGE_MODULE_ENABLED
|
||
if not IMAGE_MODULE_ENABLED:
|
||
return "现在处理模块暂时暂停,先不自动作图"
|
||
except Exception:
|
||
return "现在处理模块暂时暂停,先不自动作图"
|
||
cid = customer_id or ctx.deps.from_id
|
||
try:
|
||
from core.workflow import workflow
|
||
ok = await workflow.trigger_processing_on_payment(
|
||
customer_id=cid,
|
||
acc_id=ctx.deps.acc_id,
|
||
acc_type=ctx.deps.platform,
|
||
)
|
||
if ok:
|
||
return "已安排,稍后发你"
|
||
return "该客户暂无待处理图片,请先发图"
|
||
except Exception as e:
|
||
return f"触发作图失败: {e},请稍后重试或转人工"
|
||
|
||
@self.agent_pricing.tool
|
||
async def analyze_image_pricing(ctx: RunContext[AgentDeps], image_url: str) -> str:
|
||
try:
|
||
from image.image_analyzer import image_analyzer
|
||
result = await image_analyzer.analyze(image_url)
|
||
p = result.get("price_suggest", 20)
|
||
try:
|
||
st = self._get_conversation_state(ctx.deps.from_id)
|
||
if isinstance(result.get("price_min"), (int, float)):
|
||
st.last_min_price = int(result.get("price_min") or 0)
|
||
try:
|
||
from db.customer_db import db as _db
|
||
_db.update_last_min_price(ctx.deps.from_id, st.last_min_price)
|
||
except Exception:
|
||
pass
|
||
except Exception:
|
||
pass
|
||
return f"建议报价:{p}元"
|
||
except Exception as e:
|
||
return f"图片分析失败: {e}"
|
||
|
||
@self.agent_pricing.tool
|
||
async def record_quote_pricing(
|
||
ctx: RunContext[AgentDeps],
|
||
customer_id: str,
|
||
price: int,
|
||
description: str = ""
|
||
) -> str:
|
||
try:
|
||
from db.customer_db import db
|
||
db.update_last_price(customer_id, price)
|
||
return "ok"
|
||
except Exception as e:
|
||
return f"记录失败: {e}"
|
||
|
||
@self.agent_processing.tool
|
||
async def process_image_gemini_run(ctx: RunContext[AgentDeps], customer_id: str = "") -> str:
|
||
"""触发 Gemini 作图处理(processing agent 专用入口)。"""
|
||
return await process_image_gemini(ctx, customer_id)
|
||
|
||
@self.agent_similar.tool
|
||
async def recommend_similar(ctx: RunContext[AgentDeps], hint: str = "") -> str:
|
||
try:
|
||
return "有类似款,拍下我发你参考图。"
|
||
except Exception as e:
|
||
return f"推荐失败: {e}"
|
||
|
||
@self.agent_order.tool
|
||
async def handle_order(ctx: RunContext[AgentDeps], raw_msg: str = "") -> str:
|
||
try:
|
||
info = self._parse_order_info(raw_msg or "")
|
||
paid_kw = ["等待发货", "已付款", "付款成功", "买家已付款"]
|
||
if any(k in (info.get("pay_status", "") or "") for k in paid_kw) or any(k in (info.get("order_status", "") or "") for k in paid_kw):
|
||
return "已安排,稍后发你"
|
||
return ""
|
||
except Exception:
|
||
return ""
|
||
|
||
@self.agent_risk.tool
|
||
async def risk_filter(ctx: RunContext[AgentDeps], text: str = "") -> str:
|
||
return "这类不做哈,政治/敏感内容都不接。"
|
||
|
||
@self.agent.tool
|
||
async def remove_background(ctx: RunContext[AgentDeps], image_url: str) -> str:
|
||
try:
|
||
from config.config import IMAGE_MODULE_ENABLED
|
||
if not IMAGE_MODULE_ENABLED:
|
||
return "现在处理模块暂时暂停,先不处理图片"
|
||
except Exception:
|
||
return "现在处理模块暂时暂停,先不处理图片"
|
||
"""【独立工具】去背景,输出白底图。客户只要去背景时调用。"""
|
||
try:
|
||
from image.image_tools import remove_background as _rb
|
||
r = await _rb(image_url)
|
||
if r["success"]:
|
||
return f"去背景完成,已保存。自然回复客户好了发你"
|
||
return f"去背景失败:{r['message']}"
|
||
except Exception as e:
|
||
return f"去背景失败:{e}"
|
||
|
||
@self.agent.tool
|
||
async def perspective_correct(ctx: RunContext[AgentDeps], image_url: str) -> str:
|
||
try:
|
||
from config.config import IMAGE_MODULE_ENABLED
|
||
if not IMAGE_MODULE_ENABLED:
|
||
return "现在处理模块暂时暂停,先不处理图片"
|
||
except Exception:
|
||
return "现在处理模块暂时暂停,先不处理图片"
|
||
"""【独立工具】透视矫正。输入需白底图,输出展平图。"""
|
||
try:
|
||
from image.image_tools import perspective_correct as _pc
|
||
r = await _pc(image_url)
|
||
if r["success"]:
|
||
return f"透视矫正完成。自然回复客户好了"
|
||
return f"透视矫正失败:{r['message']}"
|
||
except Exception as e:
|
||
return f"透视矫正失败:{e}"
|
||
|
||
@self.agent.tool
|
||
async def extract_pattern_tool(
|
||
ctx: RunContext[AgentDeps],
|
||
image_url: str,
|
||
prompt: str = "",
|
||
aspect_ratio: str = "1:1"
|
||
) -> str:
|
||
try:
|
||
from config.config import IMAGE_MODULE_ENABLED
|
||
if not IMAGE_MODULE_ENABLED:
|
||
return "现在处理模块暂时暂停,先不处理图片"
|
||
except Exception:
|
||
return "现在处理模块暂时暂停,先不处理图片"
|
||
"""【独立工具】印花提取/主处理。按提示词和比例处理。"""
|
||
try:
|
||
from image.image_tools import extract_pattern
|
||
r = await extract_pattern(image_url, prompt=prompt, aspect_ratio=aspect_ratio)
|
||
if r["success"]:
|
||
return f"提取完成。自然回复客户好了发你"
|
||
return f"提取失败:{r['message']}"
|
||
except Exception as e:
|
||
return f"提取失败:{e}"
|
||
|
||
@self.agent.tool
|
||
async def enhance_image_tool(ctx: RunContext[AgentDeps], image_url: str) -> str:
|
||
try:
|
||
from config.config import IMAGE_MODULE_ENABLED
|
||
if not IMAGE_MODULE_ENABLED:
|
||
return "现在处理模块暂时暂停,先不处理图片"
|
||
except Exception:
|
||
return "现在处理模块暂时暂停,先不处理图片"
|
||
"""【独立工具】高清增强。客户只要清晰化时调用。"""
|
||
try:
|
||
from image.image_tools import enhance_image
|
||
r = await enhance_image(image_url)
|
||
if r["success"]:
|
||
return f"高清增强完成。自然回复客户好了"
|
||
return f"增强失败:{r['message']}"
|
||
except Exception as e:
|
||
return f"增强失败:{e}"
|
||
|
||
@self.agent.tool
|
||
async def color_match_tool(
|
||
ctx: RunContext[AgentDeps],
|
||
orig_url: str,
|
||
result_url: str,
|
||
strength: float = 0.75
|
||
) -> str:
|
||
try:
|
||
from config.config import IMAGE_MODULE_ENABLED
|
||
if not IMAGE_MODULE_ENABLED:
|
||
return "现在处理模块暂时暂停,先不处理图片"
|
||
except Exception:
|
||
return "现在处理模块暂时暂停,先不处理图片"
|
||
"""【独立工具】颜色匹配。将 result 色调匹配到 orig。"""
|
||
try:
|
||
from image.image_tools import color_match_images
|
||
r = await color_match_images(orig_url, result_url, strength=strength)
|
||
if r["success"]:
|
||
return f"颜色匹配完成"
|
||
return f"颜色匹配失败:{r['message']}"
|
||
except Exception as e:
|
||
return f"颜色匹配失败:{e}"
|
||
|
||
@self.agent.tool
|
||
async def trim_border_tool(ctx: RunContext[AgentDeps], image_url: str) -> str:
|
||
try:
|
||
from config.config import IMAGE_MODULE_ENABLED
|
||
if not IMAGE_MODULE_ENABLED:
|
||
return "现在处理模块暂时暂停,先不处理图片"
|
||
except Exception:
|
||
return "现在处理模块暂时暂停,先不处理图片"
|
||
"""【独立工具】裁切四周背景边(白/黄/米等)。"""
|
||
try:
|
||
from image.image_tools import trim_border
|
||
r = await trim_border(image_url)
|
||
if r["success"]:
|
||
return f"裁边完成"
|
||
return f"裁边失败:{r['message']}"
|
||
except Exception as e:
|
||
return f"裁边失败:{e}"
|
||
|
||
@self.agent.tool
|
||
async def vectorize_to_eps_tool(ctx: RunContext[AgentDeps], image_url: str) -> str:
|
||
try:
|
||
from config.config import IMAGE_MODULE_ENABLED
|
||
if not IMAGE_MODULE_ENABLED:
|
||
return "现在处理模块暂时暂停,先不处理图片"
|
||
except Exception:
|
||
return "现在处理模块暂时暂停,先不处理图片"
|
||
"""【独立工具】矢量化 - 将图片转为 EPS 矢量文件。客户要做矢量图、转 EPS、转 AI 格式时调用。"""
|
||
try:
|
||
from image.image_tools import vectorize_to_eps
|
||
r = await vectorize_to_eps(image_url)
|
||
if r["success"]:
|
||
return f"矢量化完成,已生成 EPS 文件。自然回复客户好了发你"
|
||
return f"矢量化失败:{r['message']}"
|
||
except Exception as e:
|
||
return f"矢量化失败:{e}"
|
||
|
||
@self.agent.tool
|
||
async def meitu_enhance_tool(
|
||
ctx: RunContext[AgentDeps],
|
||
image_url: str,
|
||
mode: str = "standard"
|
||
) -> str:
|
||
try:
|
||
from config.config import IMAGE_MODULE_ENABLED
|
||
if not IMAGE_MODULE_ENABLED:
|
||
return "现在处理模块暂时暂停,先不处理图片"
|
||
except Exception:
|
||
return "现在处理模块暂时暂停,先不处理图片"
|
||
"""
|
||
【独立工具】美图画质增强。客户要画质增强、清晰化、美图处理时调用。
|
||
|
||
Args:
|
||
image_url: 图片 URL 或本地路径
|
||
mode: 处理模式。crystal(极速重绘) standard(标准) enhance(增强) hdr(HDR) portrait(人像优化)
|
||
"""
|
||
try:
|
||
from image.image_tools import meitu_enhance
|
||
r = await meitu_enhance(image_url, mode=mode)
|
||
if r["success"]:
|
||
return f"画质增强完成。自然回复客户好了发你"
|
||
return f"画质增强失败:{r['message']}"
|
||
except Exception as e:
|
||
return f"画质增强失败:{e}"
|
||
|
||
@self.agent.tool
|
||
async def resize_image(
|
||
ctx: RunContext[AgentDeps],
|
||
image_url: str,
|
||
width: int,
|
||
height: int = 0
|
||
) -> str:
|
||
try:
|
||
from config.config import IMAGE_MODULE_ENABLED
|
||
if not IMAGE_MODULE_ENABLED:
|
||
return "现在处理模块暂时暂停,先不处理图片"
|
||
except Exception:
|
||
return "现在处理模块暂时暂停,先不处理图片"
|
||
"""
|
||
改图片尺寸。客户说「改成1920x1080」「弄成横图」「改下尺寸」时调用。
|
||
|
||
Args:
|
||
image_url: 图片URL(客户刚发的图,或从对话中获取)
|
||
width: 目标宽度(像素),如 1920
|
||
height: 目标高度(0=按宽度等比缩放),如 1080
|
||
|
||
常用尺寸:1920x1080(横屏) 1080x1920(竖屏) 2000x2000(方图)
|
||
"""
|
||
try:
|
||
from image.image_processor import image_processor
|
||
result = await image_processor.resize(image_url, width, height)
|
||
if result["success"]:
|
||
return f"改尺寸完成:{width}x{height},已保存。自然回复客户改好了"
|
||
else:
|
||
return f"改尺寸失败:{result['message']},告知客户稍后重试"
|
||
except Exception as e:
|
||
return f"改尺寸失败:{e}"
|
||
|
||
@self.agent.tool
|
||
async def calculate_bulk_price(
|
||
ctx: RunContext[AgentDeps],
|
||
image_count: int,
|
||
complexities: str = ""
|
||
) -> str:
|
||
"""
|
||
计算多图打包价格。
|
||
客户要做多张图时调用,返回建议总价。
|
||
|
||
Args:
|
||
image_count: 图片数量
|
||
complexities: 各图复杂度,逗号分隔,如 "normal,complex,simple"
|
||
没有识别结果时留空,按平均价格估算
|
||
"""
|
||
if image_count <= 0:
|
||
return "图片数量无效"
|
||
|
||
# 各复杂度单价(必须为5的整数倍)
|
||
unit_price = {"simple": 15, "normal": 20, "complex": 25, "hard": 30}
|
||
default_unit = 20 # 没有识别结果时的默认单价
|
||
|
||
if complexities:
|
||
levels = [c.strip() for c in complexities.split(",")]
|
||
total = sum(unit_price.get(lv, default_unit) for lv in levels)
|
||
else:
|
||
total = image_count * default_unit
|
||
|
||
# 打包优惠:3张以上9折,5张以上8折,价格必须为5的整数倍
|
||
if image_count >= 5:
|
||
discounted = round(total * 0.8 / 5) * 5
|
||
tip = f"({image_count}张8折优惠)"
|
||
elif image_count >= 3:
|
||
discounted = round(total * 0.9 / 5) * 5
|
||
tip = f"({image_count}张9折优惠)"
|
||
else:
|
||
discounted = round(total / 5) * 5
|
||
tip = ""
|
||
|
||
return f"建议打包报价:{discounted}元{tip}(原价{total}元)"
|
||
|
||
# 对话状态超过多少小时后重置(避免昨天的售后状态影响今天)
|
||
CONVERSATION_TIMEOUT_HOURS = 12
|
||
|
||
def _get_conversation_state(self, customer_id: str) -> ConversationState:
|
||
"""获取或创建对话状态,超时自动重置"""
|
||
now = datetime.now()
|
||
|
||
if customer_id in self.conversations:
|
||
state = self.conversations[customer_id]
|
||
# 超过 12 小时没有消息,重置阶段和压价次数
|
||
if state.last_update:
|
||
try:
|
||
last = datetime.fromisoformat(state.last_update)
|
||
hours = (now - last).total_seconds() / 3600
|
||
if hours > self.CONVERSATION_TIMEOUT_HOURS:
|
||
state.stage = "售前"
|
||
state.discount_count = 0
|
||
# 同时清理对话历史,避免发送过期上下文
|
||
self.message_histories.pop(customer_id, None)
|
||
except Exception:
|
||
pass
|
||
# 进程内状态为空时,尝试从持久化恢复
|
||
if not state.pending_image_urls and not state.pending_requirements:
|
||
self._restore_pending_quote_state(customer_id, state)
|
||
else:
|
||
self.conversations[customer_id] = ConversationState(
|
||
customer_id=customer_id,
|
||
last_update=now.isoformat()
|
||
)
|
||
self._restore_pending_quote_state(customer_id, self.conversations[customer_id])
|
||
|
||
# 定期清理长期不活跃客户(超过 7 天)
|
||
self._cleanup_inactive(now)
|
||
|
||
return self.conversations[customer_id]
|
||
|
||
def _cleanup_inactive(self, now: datetime):
|
||
"""清理超过 7 天没有消息的对话状态,释放内存"""
|
||
# 每 100 次调用清理一次,避免每次都遍历
|
||
if len(self.conversations) % 100 != 0:
|
||
return
|
||
expired = [
|
||
cid for cid, state in self.conversations.items()
|
||
if state.last_update and
|
||
(now - datetime.fromisoformat(state.last_update)).days > 7
|
||
]
|
||
for cid in expired:
|
||
self.conversations.pop(cid, None)
|
||
self.message_histories.pop(cid, None)
|
||
|
||
def _sync_pending_quote_state(self, customer_id: str, state: ConversationState):
|
||
"""把待报价队列同步到客户库,避免重启丢失。"""
|
||
try:
|
||
from db.customer_db import db
|
||
db.update_pending_quote_state(
|
||
customer_id,
|
||
state.pending_image_urls,
|
||
state.pending_requirements,
|
||
)
|
||
except Exception:
|
||
pass
|
||
|
||
def _restore_pending_quote_state(self, customer_id: str, state: ConversationState):
|
||
"""从客户库恢复待报价队列。"""
|
||
try:
|
||
from db.customer_db import db
|
||
profile = db.get_customer(customer_id)
|
||
state.pending_image_urls = list(getattr(profile, "pending_quote_images", []) or [])
|
||
state.pending_requirements = list(getattr(profile, "pending_quote_requirements", []) or [])
|
||
state.image_count = len(state.pending_image_urls)
|
||
except Exception:
|
||
pass
|
||
|
||
def _build_reject_message(self, reason: str = "") -> str:
|
||
templates = [
|
||
"这类图文字内容太密了,我们这边不接这单哈,建议精简后再发我看看。",
|
||
"这种密集文字/宣传栏类图片暂时做不了,抱歉啦,换一版简化内容我可以继续帮你看。",
|
||
"这张文字信息太多,处理风险高,我们先不接,您可以先筛重点文字再发我。",
|
||
]
|
||
msg = random.choice(templates)
|
||
if reason:
|
||
msg += f"({reason})"
|
||
return msg
|
||
|
||
def _is_batch_quote_enabled(self, customer_id: str, acc_id: str) -> bool:
|
||
"""灰度开关:按店铺白名单 + 客户哈希百分比控制新策略是否生效。"""
|
||
try:
|
||
from config.config import (
|
||
FEATURE_BATCH_QUOTE_ENABLED,
|
||
FEATURE_BATCH_QUOTE_PERCENT,
|
||
FEATURE_BATCH_QUOTE_SHOPS,
|
||
)
|
||
if not FEATURE_BATCH_QUOTE_ENABLED:
|
||
return False
|
||
pct = max(0, min(100, int(FEATURE_BATCH_QUOTE_PERCENT)))
|
||
if pct == 0:
|
||
return False
|
||
shops = [s.strip() for s in (FEATURE_BATCH_QUOTE_SHOPS or "").split(",") if s.strip()]
|
||
if shops and (acc_id or "") not in shops:
|
||
return False
|
||
if pct >= 100:
|
||
return True
|
||
h = int(hashlib.md5((customer_id or "").encode("utf-8")).hexdigest()[:8], 16) % 100
|
||
return h < pct
|
||
except Exception:
|
||
return True
|
||
|
||
def _detect_stage(self, message: str) -> str:
|
||
"""检测售前/售后"""
|
||
# 系统订单通知不属于售后,单独处理
|
||
if "系统订单信息" in message:
|
||
return "订单通知"
|
||
|
||
after_sale_keywords = ["已下单", "已付款", "催一下", "发文件", "要修改", "不满意", "退款", "退货"]
|
||
for keyword in after_sale_keywords:
|
||
if keyword in message:
|
||
return "售后"
|
||
return "售前"
|
||
|
||
def _get_system_prompt(self) -> str:
|
||
base_prompt = """你是淘宝找原图店的客服,对外说帮客户找原图/高清图,实际是帮客户把模糊图处理成高清版,擅长快速成交。
|
||
|
||
核心原则:快、准、狠。**回复要像真人聊天,自然多变,禁止套模板、背台词。**
|
||
|
||
【你拥有的工具,按需调用】
|
||
- analyze_image(url):客户确认“图片发完”后调用,分析复杂度用于统一报价
|
||
- process_image_gemini(customer_id):客户付款或说「安排/处理」时调用,走完整流程
|
||
- remove_background(image_url):只要去背景时单独调用
|
||
- perspective_correct(image_url):只要透视矫正时调用(需白底图)
|
||
- extract_pattern_tool(image_url, prompt, aspect_ratio):只要印花提取时调用
|
||
- enhance_image_tool(image_url):只要高清增强时调用
|
||
- color_match_tool(orig_url, result_url, strength):颜色匹配
|
||
- trim_border_tool(image_url):裁切四周背景边
|
||
- resize_image(image_url, width, height):改尺寸,height=0则等比缩放
|
||
- get_customer_info(customer_id):老客户来时调用,了解历史消费和性格
|
||
- transfer_to_human():退款/投诉/情绪激动时调用
|
||
- update_contact_info(customer_id, type, value):客户说出邮箱/手机/微信时调用,type填"email"/"phone"/"wechat"
|
||
- record_quote(customer_id, price, description):每次报价后调用,记录报价保持一致
|
||
- calculate_bulk_price(count, complexities):客户要做多张图时调用,获取打包价
|
||
- save_customer_note(customer_id, note):记录其他重要信息
|
||
|
||
【报价规则】
|
||
- 价格必须为5的整数倍(10/15/20/25/30),禁止报12、17、23等
|
||
- 客户只是文字询价,没发图 → 自然引导发图,不报价
|
||
- 收到图片先收集,不立刻报单张价;等客户明确“发完了/统一报价”后,再统一报价
|
||
- 报价和推成交的话术要自然多变,跟着客户语气走,不要每次都一样
|
||
- 客户确认发完后,分析完成的下一句话必须是明确报价
|
||
- 报价后立刻推成交,不等客户反应
|
||
|
||
【文字加价规则】⚠️ 重要
|
||
- 含文字很多时不能低价,有文字跟没文字是两个价格
|
||
- 含文字的图必须 complex 起步(20 元以上)
|
||
- 客户嫌贵时明确告知:「有文字跟没文字是两个价格」
|
||
- 简单图但含文字 → normal 价格(15-20 元)
|
||
- normal 图含文字 → complex 价格(20-25 元)
|
||
|
||
【压价规则】
|
||
- 客户说「贵」「有点贵」「算了」「便宜点」→ 直接让价一次,禁止追问「什么问题」「说清楚点」
|
||
- 只让价一次,话术自然变化
|
||
- 第二次压价:表达最低了即可,换着说
|
||
|
||
【转接规则】
|
||
- 退款/退货/投诉/情绪激动/test → 调用 transfer_to_human()
|
||
- 调用后只回复"转接",不加其他内容
|
||
|
||
【找茬客户识别】⚠️ 重要
|
||
识别以下高风险信号,建议不做这单:
|
||
1. 下单后立即申请退款
|
||
2. 从高价砍到低价(30→10 元)
|
||
3. 反复问"不满意可以退吗"(2 次以上)
|
||
4. 质疑服务内容("源文件还是什么")
|
||
5. 质疑价值("就一张图片")
|
||
6. 问"小一点就快一点的嘛"(想占便宜)
|
||
7. 重复问同一个问题(想找麻烦)
|
||
|
||
识别到以上 3 个以上信号 → 建议转人工或直接拒绝接单
|
||
话术:「不好意思,这单做不了」「去别家做吧」
|
||
|
||
【售后规则】
|
||
- 催进度:自然回复在做了/快了/马上好之类
|
||
- 要修改:自然问哪里要改
|
||
|
||
【禁忌】
|
||
- 没看到图不报价
|
||
- 不说"不行/不可以"
|
||
- 不解释技术细节
|
||
- 不给价格区间
|
||
- 回复不超过2句话
|
||
- 绝对禁止输出任何内部独白或状态说明,包括但不限于:"无需回复""已完成""已经完成""不需要回复""流程结束""操作完成""任务完成""记录完成""报价已记录"等
|
||
- 每次必须输出真实的、发给客户看的回复文字,哪怕只有一句话"""
|
||
|
||
if self.skills_content:
|
||
base_prompt += f"\n\n=== 技能文档 ===\n{self.skills_content}"
|
||
|
||
return base_prompt
|
||
|
||
def _get_after_sale_prompt(self) -> str:
|
||
return """你是淘宝客服的售后助手,负责售后阶段的自然沟通与处理进度反馈。
|
||
核心:简洁、自然、不解释技术细节、尽量不调用报价相关工具。
|
||
规则:
|
||
- 已付款客户优先:确认安排、说明进度、承诺时间点
|
||
- 修改需求:礼貌询问具体改哪里,尽量一句话
|
||
- 催进度:自然回复在做了/快了/马上好,给预计时间
|
||
- 投诉/情绪激动/退款:转人工
|
||
- 输出不超过2句话,不说内部状态"""
|
||
|
||
def _get_pricing_prompt(self) -> str:
|
||
try:
|
||
from config.config import MIN_PRICE_FLOOR
|
||
floor = MIN_PRICE_FLOOR
|
||
except Exception:
|
||
floor = 15
|
||
return f"""你是淘宝客服的报价助手,负责在客户明确提到价格/询价时快速给出自然报价并推动成交。
|
||
规则:
|
||
- 收到图片或历史有图片依据时尽量结合复杂度给出单价,价格为5的整数倍
|
||
- 没有图片时引导发图,不给价格区间
|
||
- 报价后紧跟一句推动成交,话术自然不重复
|
||
- 最低价不低于{floor}元,客户出价低于底线时礼貌拒绝(不好意思)
|
||
- 输出不超过2句话"""
|
||
|
||
def _get_processing_prompt(self) -> str:
|
||
return """你是淘宝客服的处理助手,负责在客户说安排/处理/开始做或已付款的场景下进行处理安排与进度反馈。
|
||
规则:
|
||
- 已付款或明确要求开始时,确认安排并给预计时间点
|
||
- 可调用处理流程工具
|
||
- 投诉/退款时转人工
|
||
- 输出不超过2句话"""
|
||
|
||
def _get_similar_prompt(self) -> str:
|
||
return """你是淘宝客服的相似图助手,客户问“有一样的吗/类似的吗/同款吗”时,给出自然回复与参考建议。
|
||
规则:
|
||
- 先确认可以找类似款,建议拍后我发参考图
|
||
- 如已知图案/类型,简要说明“同类型都有”,推动成交
|
||
- 输出不超过2句话"""
|
||
|
||
def _get_order_prompt(self) -> str:
|
||
return """你是淘宝客服的订单助手,负责系统订单通知的处理。
|
||
规则:
|
||
- 已付款时自然确认安排;其他状态静默(输出空字符串)
|
||
- 输出不超过1句话"""
|
||
|
||
def _get_risk_prompt(self) -> str:
|
||
return """你是淘宝客服的风控助手,负责敏感/违规内容的前置拦截与替代话术。
|
||
规则:
|
||
- 黄色/擦边/涉政/政治人物/政治事件/政治图片等不接单,礼貌拒绝
|
||
- 输出不超过1句话"""
|
||
|
||
@staticmethod
|
||
def _is_political_inquiry(text: str) -> bool:
|
||
"""文本前置风控:政治人物/政治事件/政治图片相关询问一律拒绝。"""
|
||
s = (text or "").strip().lower()
|
||
if not s:
|
||
return False
|
||
kw = (
|
||
"政治", "涉政", "党政", "政治人物", "政治事件", "政治图片", "政治海报", "政治宣传",
|
||
"领导人", "伟人", "元帅", "将军", "红色人物", "党史",
|
||
"天安门", "人民大会堂", "中南海",
|
||
"习近平", "毛泽东", "邓小平", "江泽民", "胡锦涛", "李克强", "周恩来",
|
||
"特朗普", "拜登", "普京", "泽连斯基",
|
||
"trump", "biden", "putin", "zelensky", "xi jinping",
|
||
)
|
||
if any(k in s for k in kw):
|
||
return True
|
||
# 兜底:类似“有没有十大元帅的照片/图片”
|
||
return bool(re.search(r"(元帅|将军|领导人|政治人物|政治事件).*(照片|图片|头像|原图)?", s))
|
||
|
||
def _get_customer_profile_context(self, customer_id: str) -> str:
|
||
"""从数据库读取客户画像,注入给 AI。含个性化语气、报价策略、主动预测、近期对话。"""
|
||
try:
|
||
from db.customer_db import db
|
||
profile = db.get_customer(customer_id)
|
||
|
||
if profile.blacklist:
|
||
return f"【⚠️黑名单客户】原因:{profile.blacklist_reason or '已标记'},请转接人工处理,不要自动回复"
|
||
|
||
lines = []
|
||
lines.append("=== 客户档案 ===")
|
||
|
||
# 基础信息
|
||
basic_info = []
|
||
basic_info.append(f"客户ID: {customer_id}")
|
||
basic_info.append(f"姓名: {profile.name or '未知'}")
|
||
if profile.email:
|
||
basic_info.append(f"邮箱: {profile.email}")
|
||
if profile.phone:
|
||
basic_info.append(f"电话: {profile.phone}")
|
||
if profile.wechat:
|
||
basic_info.append(f"微信: {profile.wechat}")
|
||
lines.append(" | ".join(basic_info))
|
||
|
||
# 消费分析
|
||
consume_info = []
|
||
consume_info.append(f"客户等级: {profile.customer_level}级")
|
||
if profile.vip:
|
||
consume_info.append("VIP客户")
|
||
consume_info.append(f"总订单: {profile.total_orders}单")
|
||
consume_info.append(f"总消费: {profile.total_spent}元")
|
||
if profile.total_orders > 0:
|
||
consume_info.append(f"客单价: {profile.total_spent // profile.total_orders}元")
|
||
lines.append("--- 消费分析 ---")
|
||
lines.append(" | ".join(consume_info))
|
||
|
||
# 报价历史
|
||
price_info = []
|
||
if profile.vip_custom_price:
|
||
price_info.append(f"VIP专属价: {profile.vip_custom_price}元(直接报这个价)")
|
||
if profile.last_price:
|
||
price_info.append(f"上次报价: {profile.last_price}元")
|
||
if profile.lowest_price_accepted:
|
||
price_info.append(f"历史最低成交: {profile.lowest_price_accepted}元")
|
||
if profile.discount_given_count:
|
||
price_info.append(f"历史让价: {profile.discount_given_count}次")
|
||
if profile.price_sensitivity:
|
||
price_info.append(f"价格敏感度: {profile.price_sensitivity}")
|
||
if getattr(profile, "last_quote_no_convert", False):
|
||
price_info.append("【策略】上次报价未成交,本次可降5-10元")
|
||
if price_info:
|
||
lines.append("--- 报价历史 ---")
|
||
lines.append(" | ".join(price_info))
|
||
|
||
# 性格与决策
|
||
personality_info = []
|
||
if profile.personality:
|
||
personality_info.append(f"性格: {'/'.join(profile.personality)}")
|
||
if profile.decision_speed:
|
||
personality_info.append(f"决策速度: {profile.decision_speed}")
|
||
if profile.communication_prefer:
|
||
personality_info.append(f"沟通偏好: {profile.communication_prefer}")
|
||
if personality_info:
|
||
lines.append("--- 性格特征 ---")
|
||
lines.append(" | ".join(personality_info))
|
||
|
||
# 图片习惯
|
||
image_info = []
|
||
image_info.append(f"累计发图: {profile.total_images_sent}张")
|
||
if profile.complexity_history:
|
||
avg_complexity = self._calc_avg_complexity(profile.complexity_history)
|
||
image_info.append(f"平均复杂度: {avg_complexity}")
|
||
if profile.image_type_history:
|
||
from collections import Counter
|
||
top_types = Counter(profile.image_type_history).most_common(3)
|
||
types_str = "、".join(f"{t}({c}次)" for t, c in top_types)
|
||
image_info.append(f"常见类型: {types_str}")
|
||
if profile.preferred_format:
|
||
image_info.append(f"格式偏好: {profile.preferred_format}")
|
||
if profile.preferred_size:
|
||
image_info.append(f"尺寸要求: {profile.preferred_size}")
|
||
if profile.last_image_url:
|
||
image_info.append(f"最近发图: {profile.last_image_url[:60]}...")
|
||
lines.append("--- 图片习惯 ---")
|
||
lines.append(" | ".join(image_info))
|
||
|
||
# 当前任务状态
|
||
if profile.processing_status:
|
||
task_info = []
|
||
task_info.append(f"状态: {profile.processing_status}")
|
||
if profile.processing_image_url:
|
||
task_info.append(f"处理中: {profile.processing_image_url[:40]}...")
|
||
if profile.expected_done_at:
|
||
task_info.append(f"预计完成: {profile.expected_done_at}")
|
||
lines.append("--- 当前任务 ---")
|
||
lines.append(" | ".join(task_info))
|
||
|
||
# 上次对话摘要
|
||
if profile.last_conversation_summary:
|
||
time_str = ""
|
||
if profile.last_conversation_time:
|
||
try:
|
||
t = datetime.fromisoformat(profile.last_conversation_time)
|
||
diff = datetime.now() - t
|
||
if diff.days > 0:
|
||
time_str = f"({diff.days}天前)"
|
||
else:
|
||
h = diff.seconds // 3600
|
||
time_str = f"({h}小时前)" if h > 0 else "(刚刚)"
|
||
except Exception:
|
||
pass
|
||
lines.append(f"--- 上次对话 {time_str} ---")
|
||
lines.append(profile.last_conversation_summary)
|
||
|
||
# 个性化回复策略
|
||
hints = []
|
||
if profile.personality:
|
||
if "爽快" in profile.personality:
|
||
hints.append("回复简洁直接,不废话,快速报价")
|
||
if "砍价" in profile.personality or "砍价狂" in profile.personality:
|
||
hints.append("报价时强调性价比,只让价一次,第二次引导去 xinhui.cloud")
|
||
if "纠结" in profile.personality or "墨迹" in profile.personality:
|
||
hints.append("多给一点说明,耐心回答")
|
||
if profile.price_sensitivity == "高":
|
||
hints.append("报价时顺带提「满意再拍」降低顾虑")
|
||
if profile.decision_speed == "快":
|
||
hints.append("直接报价推成交,少铺垫")
|
||
if profile.total_orders > 0 and profile.decision_speed == "快":
|
||
hints.append("老客爽快,直接报价成交")
|
||
if hints:
|
||
lines.append("--- 回复策略 ---")
|
||
lines.append(";".join(hints))
|
||
|
||
# 主动推荐
|
||
proactive = []
|
||
if profile.bulk_potential == "有" or (profile.total_images_sent or 0) >= 2:
|
||
proactive.append("可问「要做多张吗,多张有优惠」")
|
||
if profile.upsell_opportunity:
|
||
proactive.append(f"加购机会: {'、'.join(profile.upsell_opportunity)}")
|
||
if proactive:
|
||
lines.append("--- 主动推荐 ---")
|
||
lines.append(";".join(proactive))
|
||
|
||
return "\n".join(lines)
|
||
except Exception as e:
|
||
print(f"[Agent] 获取客户画像失败: {e}")
|
||
return ""
|
||
|
||
def _calc_avg_complexity(self, complexity_history: list) -> str:
|
||
"""计算平均复杂度"""
|
||
if not complexity_history:
|
||
return "未知"
|
||
level_map = {"simple": 1, "normal": 2, "complex": 3, "hard": 4}
|
||
label_map = {1: "简单", 2: "一般", 3: "复杂", 4: "很复杂"}
|
||
try:
|
||
avg = sum(level_map.get(c, 2) for c in complexity_history) / len(complexity_history)
|
||
return label_map.get(round(avg), "一般")
|
||
except Exception:
|
||
return "一般"
|
||
|
||
def _get_refusal_context_hint(self, customer_id: str, current_msg: str, profile_context: str) -> str:
|
||
"""
|
||
检测「刚拒绝某张图 + 客户问能找到吗」场景,注入显式提示,避免前后矛盾。
|
||
原因:last_conversation_summary 异步更新可能滞后,message_histories 模型可能忽略。
|
||
"""
|
||
ask_keywords = ["能找到吗", "可以吗", "有吗", "能做吗", "可以找吗", "可以弄吗"]
|
||
if not any(kw in current_msg for kw in ask_keywords):
|
||
return ""
|
||
refusal_keywords = ["不做", "不接", "拒绝", "不做这类", "这类不做"]
|
||
# 检查 profile 摘要(可能因异步更新而滞后)
|
||
if any(kw in profile_context for kw in refusal_keywords):
|
||
return "【重要】上一句客服刚拒绝了某张图,客户问能找到吗时须明确:能做的是哪张(如第一张),不能做的是哪张。不可只说「放心拍」「可以」,会前后矛盾。"
|
||
# 检查内存历史中最近几条消息(ModelResponse 含客服回复)
|
||
history = self.message_histories.get(customer_id, [])
|
||
for msg in reversed(history[-6:]):
|
||
msg_str = str(msg)
|
||
if any(kw in msg_str for kw in refusal_keywords):
|
||
return "【重要】上一句客服刚拒绝了某张图,客户问能找到吗时须明确:能做的是哪张(如第一张),不能做的是哪张。不可只说「放心拍」「可以」,会前后矛盾。"
|
||
return ""
|
||
|
||
def _get_conversation_context(self, customer_id: str, acc_id: str = "", limit: int = 12, max_len: int = 80) -> str:
|
||
"""
|
||
每一次对话都从数据库加载近期对话,压缩后注入 prompt。
|
||
确保模型看到上下文,同时控制 token 消耗。
|
||
"""
|
||
try:
|
||
try:
|
||
from config.config import CHAT_CONTEXT_LIMIT, CHAT_CONTEXT_TRUNCATE_LEN
|
||
limit = CHAT_CONTEXT_LIMIT
|
||
max_len = CHAT_CONTEXT_TRUNCATE_LEN
|
||
except Exception:
|
||
pass
|
||
from db.chat_log_db import get_recent_conversation
|
||
msgs = get_recent_conversation(customer_id, acc_id=acc_id, limit=limit)
|
||
if not msgs:
|
||
return ""
|
||
lines = []
|
||
for m in msgs:
|
||
role = "客" if m.get("direction") == "in" else "服"
|
||
msg_text = (m.get("message") or "").strip().replace("\n", " ")[:max_len]
|
||
if not msg_text:
|
||
continue
|
||
lines.append(f"{role}:{msg_text}")
|
||
if not lines:
|
||
return ""
|
||
return "【近期】\n" + "\n".join(lines) + "\n\n"
|
||
except Exception:
|
||
return ""
|
||
|
||
def _get_intent_emotion_hint(self, msg: str) -> str:
|
||
"""语义匹配:意图/情绪识别,注入提示。EMBEDDING_MODEL 未配置时用关键词。"""
|
||
try:
|
||
from utils.intent_analyzer import detect_intent_embedding, detect_intent_keywords, detect_emotion_embedding
|
||
intent = detect_intent_embedding(msg)
|
||
if not intent:
|
||
intent = detect_intent_keywords(msg)
|
||
emotion = detect_emotion_embedding(msg) if os.getenv("EMBEDDING_MODEL") else None
|
||
parts = []
|
||
if intent:
|
||
parts.append(f"意图:{intent}")
|
||
if emotion:
|
||
parts.append(f"情绪:{emotion}")
|
||
if parts:
|
||
return f"【当前消息】{', '.join(parts)}"
|
||
except Exception:
|
||
pass
|
||
return ""
|
||
|
||
# 简单打招呼类消息(在近期已回复后无需再回)
|
||
_COOLDOWN_PATTERNS = [
|
||
"你好", "您好", "在吗", "在么", "在不在", "有人吗",
|
||
"嗯", "嗯嗯", "好", "好的", "好哒", "ok", "OK", "okay",
|
||
"谢谢", "谢谢你", "感谢", "收到", "知道了", "明白了",
|
||
]
|
||
_COOLDOWN_SECONDS = 5 * 60 # 5 分钟内不重复回复纯打招呼
|
||
|
||
def _in_cooldown(self, state: ConversationState, msg: str) -> bool:
|
||
"""最近刚回复过 + 消息是纯打招呼 → True 静默"""
|
||
if not state.last_reply_at:
|
||
return False
|
||
elapsed = (datetime.now() - state.last_reply_at).total_seconds()
|
||
if elapsed > self._COOLDOWN_SECONDS:
|
||
return False
|
||
clean = msg.strip().rstrip("!!??。.~~")
|
||
return clean in self._COOLDOWN_PATTERNS
|
||
|
||
async def process_message(self, message: CustomerMessage) -> AgentResponse:
|
||
"""处理客户消息并生成回复"""
|
||
metrics_emit("inbound_msg", customer_id=message.from_id, acc_id=message.acc_id)
|
||
# 获取或创建对话状态
|
||
state = self._get_conversation_state(message.from_id)
|
||
|
||
# 冷却期检测:近期已回复 + 纯打招呼 → 静默
|
||
if self._in_cooldown(state, message.msg):
|
||
elapsed = int((datetime.now() - state.last_reply_at).total_seconds())
|
||
print(f"[Agent] 冷却期静默(距上次回复 {elapsed}s):{message.msg!r}")
|
||
return AgentResponse(reply="", should_reply=False, need_transfer=False)
|
||
|
||
# 前置风控:客户文本一旦命中政治/敏感询问,直接拒绝,避免“发图我看看”类答非所问
|
||
try:
|
||
from utils.content_filter import should_block_customer
|
||
if should_block_customer(message.msg) or self._is_political_inquiry(message.msg):
|
||
reply = "这类不做哈,政治相关图片和人物都不接。"
|
||
state.last_reply_at = datetime.now()
|
||
print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {reply}")
|
||
return AgentResponse(reply=reply, should_reply=True, need_transfer=False)
|
||
except Exception:
|
||
if self._is_political_inquiry(message.msg):
|
||
reply = "这类不做哈,政治相关图片和人物都不接。"
|
||
state.last_reply_at = datetime.now()
|
||
print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {reply}")
|
||
return AgentResponse(reply=reply, should_reply=True, need_transfer=False)
|
||
|
||
# 检测售前/售后
|
||
new_stage = self._detect_stage(message.msg)
|
||
if new_stage != state.stage:
|
||
state.stage = new_stage
|
||
|
||
state.last_update = datetime.now().isoformat()
|
||
|
||
# 订单通知前置处理
|
||
if "系统订单信息" in message.msg or "订单状态" in message.msg:
|
||
_, order_block = self._split_customer_text(message.msg)
|
||
customer_text, _ = self._split_customer_text(message.msg)
|
||
order = self._parse_order_info(order_block or message.msg)
|
||
pay_status = order.get("pay_status", "")
|
||
order_status = order.get("order_status", "")
|
||
|
||
paid_keywords = ["等待发货", "已付款", "付款成功", "买家已付款"]
|
||
is_paid = any(kw in pay_status or kw in order_status for kw in paid_keywords)
|
||
|
||
if is_paid:
|
||
# 订单金额核查:对比报价和实付金额
|
||
asyncio.create_task(self._check_order_amount(
|
||
message.from_id, order, message.acc_id
|
||
))
|
||
# 成交记录:写入数据库供日报分析
|
||
asyncio.create_task(self._record_deal_success(
|
||
message.from_id, message.from_name, message.acc_id, message.acc_type,
|
||
order, state
|
||
))
|
||
# 已付款:触发 Gemini 作图
|
||
try:
|
||
from core.workflow import workflow
|
||
asyncio.create_task(workflow.trigger_processing_on_payment(
|
||
customer_id=message.from_id,
|
||
acc_id=message.acc_id,
|
||
acc_type=message.acc_type,
|
||
))
|
||
except Exception as e:
|
||
print(f"[Agent] 触发作图失败: {e}")
|
||
elif not customer_text:
|
||
# 非付款 + 没有客户文字 → 直接静默,不调用 AI
|
||
print(f"[Agent] 订单通知静默({pay_status or order_status}),跳过回复")
|
||
return AgentResponse(reply="", should_reply=False, need_transfer=False)
|
||
|
||
# 找图店:先收集图片和需求,等客户确认“发完”后统一报价
|
||
customer_text, _ = self._split_customer_text(message.msg)
|
||
shop_type = _get_shop_type(message.acc_id or "", message.goods_name or "")
|
||
if shop_type == "find_image" and self._is_batch_quote_enabled(message.from_id, message.acc_id):
|
||
incoming_urls = self._extract_image_urls(customer_text)
|
||
text_without_urls = self._strip_urls_from_text(customer_text)
|
||
|
||
if incoming_urls:
|
||
for u in incoming_urls:
|
||
if u not in state.pending_image_urls:
|
||
state.pending_image_urls.append(u)
|
||
if text_without_urls:
|
||
self._append_requirement(state, text_without_urls)
|
||
state.image_count = len(state.pending_image_urls)
|
||
self._sync_pending_quote_state(message.from_id, state)
|
||
|
||
if self._is_batch_finish_signal(customer_text):
|
||
quote_res = await self._quote_pending_images(state, message)
|
||
reply_text = quote_res.get("reply", "")
|
||
need_transfer = bool(quote_res.get("need_transfer"))
|
||
state.last_reply_at = datetime.now()
|
||
print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {reply_text}")
|
||
return AgentResponse(
|
||
reply=reply_text,
|
||
should_reply=not need_transfer,
|
||
need_transfer=need_transfer,
|
||
transfer_msg=TRANSFER_MESSAGE if need_transfer else "",
|
||
)
|
||
|
||
ack = self._build_collect_ack(len(state.pending_image_urls))
|
||
state.last_reply_at = datetime.now()
|
||
print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {ack}")
|
||
return AgentResponse(reply=ack, should_reply=True, need_transfer=False)
|
||
|
||
if state.pending_image_urls:
|
||
if text_without_urls:
|
||
self._append_requirement(state, text_without_urls)
|
||
self._sync_pending_quote_state(message.from_id, state)
|
||
if self._is_batch_finish_signal(customer_text):
|
||
quote_res = await self._quote_pending_images(state, message)
|
||
reply_text = quote_res.get("reply", "")
|
||
need_transfer = bool(quote_res.get("need_transfer"))
|
||
state.last_reply_at = datetime.now()
|
||
print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {reply_text}")
|
||
return AgentResponse(
|
||
reply=reply_text,
|
||
should_reply=not need_transfer,
|
||
need_transfer=need_transfer,
|
||
transfer_msg=TRANSFER_MESSAGE if need_transfer else "",
|
||
)
|
||
|
||
remind = self._build_collect_remind(len(state.pending_image_urls))
|
||
state.last_reply_at = datetime.now()
|
||
print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {remind}")
|
||
return AgentResponse(reply=remind, should_reply=True, need_transfer=False)
|
||
|
||
# 构建提示词(包含对话状态 + 客户画像)
|
||
user_prompt = self._build_prompt(message, state)
|
||
|
||
# 注入客户历史画像(个性化语气、报价策略、主动预测)
|
||
profile_context = self._get_customer_profile_context(message.from_id)
|
||
if profile_context:
|
||
user_prompt = profile_context + "\n\n" + user_prompt
|
||
|
||
# 前后一致提示:刚拒绝某张图后,客户问「能找到吗」等时,必须区分能做/不能做
|
||
refusal_hint = self._get_refusal_context_hint(message.from_id, message.msg, profile_context or "")
|
||
if refusal_hint:
|
||
user_prompt = refusal_hint + "\n\n" + user_prompt
|
||
|
||
# 每一次对话都注入近期上下文,确保模型看到完整对话
|
||
conv_context = self._get_conversation_context(message.from_id, acc_id=message.acc_id or "")
|
||
if conv_context:
|
||
user_prompt = conv_context + user_prompt
|
||
|
||
# 语义匹配:意图/情绪识别(配置 EMBEDDING_MODEL 时用 embedding,否则关键词)
|
||
intent_hint = self._get_intent_emotion_hint(message.msg)
|
||
if intent_hint:
|
||
user_prompt = intent_hint + "\n\n" + user_prompt
|
||
|
||
deps = AgentDeps(
|
||
msg_id=message.msg_id,
|
||
acc_id=message.acc_id,
|
||
from_id=message.from_id,
|
||
platform=message.acc_type
|
||
)
|
||
|
||
# 取出该客户的历史对话,传给 AI 保持上下文
|
||
history = self.message_histories.get(message.from_id, [])
|
||
|
||
self._log_block("PROMPT->AI 前置提示词", user_prompt)
|
||
|
||
try:
|
||
msg_lower = message.msg.lower()
|
||
pricing_kw = ["多少钱", "多少一张", "报价", "给个价", "几块", "价位", "能便宜点吗"]
|
||
processing_kw = ["安排", "处理一下", "开始做", "做一下", "尽快", "加急", "付款了", "已付款"]
|
||
similar_kw = ["有一样的", "有一样吗", "一样的吗", "类似的", "类似的吗", "同款", "相似", "类似吗"]
|
||
order_markers = ["[系统订单信息]", "订单状态", "买家已付款"]
|
||
risk_kw = [
|
||
"黄色", "擦边", "色情", "涉黄", "涉政", "政治", "裸", "不雅",
|
||
"天安门", "政治人物", "政治事件", "领导人", "党政",
|
||
"习近平", "毛泽东", "邓小平", "江泽民", "胡锦涛",
|
||
"特朗普", "拜登", "普京", "泽连斯基",
|
||
]
|
||
target_agent = self.agent_after_sale if state.stage == "售后" else self.agent
|
||
risk_hit = any(k in msg_lower for k in risk_kw) or self._is_political_inquiry(message.msg)
|
||
if risk_hit:
|
||
target_agent = self.agent_risk
|
||
elif any(k in message.msg for k in order_markers):
|
||
target_agent = self.agent_order
|
||
elif any(k in msg_lower for k in processing_kw):
|
||
target_agent = self.agent_processing
|
||
elif any(k in msg_lower for k in pricing_kw):
|
||
target_agent = self.agent_pricing
|
||
elif any(k in msg_lower for k in similar_kw):
|
||
target_agent = self.agent_similar
|
||
result = await target_agent.run(user_prompt, deps=deps, message_history=history)
|
||
# 更新历史,最多保留最近 30 条消息防止 token 超限
|
||
self.message_histories[message.from_id] = result.all_messages()[-30:]
|
||
reply_text = self._normalize_reply_text(result.output)
|
||
# 拦截超低杀价:客户报价低于底线时,统一礼貌拒绝
|
||
try:
|
||
from config.config import MIN_PRICE_FLOOR
|
||
import re
|
||
offer = None
|
||
m = re.search(r'(\d{1,4})\s*(?:元|块|块钱|元钱)\b', message.msg)
|
||
if m:
|
||
offer = int(m.group(1))
|
||
else:
|
||
m2 = re.search(r'(?:能|可以|可否|能否)\s*(\d{1,4})\b', message.msg)
|
||
offer = int(m2.group(1)) if m2 else None
|
||
st = self._get_conversation_state(message.from_id)
|
||
floor = st.last_min_price if isinstance(st.last_min_price, int) and st.last_min_price > 0 else MIN_PRICE_FLOOR
|
||
if offer is not None and offer < floor:
|
||
reply_text = "不好意思"
|
||
except Exception:
|
||
pass
|
||
# 降限:若AI在回复中给出小于底线的报价,提升到>=底线且为5的倍数
|
||
try:
|
||
from config.config import MIN_PRICE_FLOOR
|
||
st = self._get_conversation_state(message.from_id)
|
||
floor = st.last_min_price if isinstance(st.last_min_price, int) and st.last_min_price > 0 else MIN_PRICE_FLOOR
|
||
def _adjust(text: str) -> str:
|
||
import re
|
||
def _repl(m):
|
||
num = int(m.group(1))
|
||
adj = max(floor, round(num / 5) * 5)
|
||
return m.group(0).replace(str(num), str(adj))
|
||
patterns = [
|
||
r'按(\d{1,4})元',
|
||
r'报价[::]\s*(\d{1,4})\s*元',
|
||
r'(\d{1,4})\s*元一张',
|
||
r'打包(\d{1,4})\s*元',
|
||
]
|
||
t = text
|
||
for p in patterns:
|
||
t = re.sub(p, _repl, t)
|
||
return t
|
||
reply_text = _adjust(reply_text or "")
|
||
except Exception:
|
||
pass
|
||
|
||
# 打印工具调用记录
|
||
for msg in result.new_messages():
|
||
for part in getattr(msg, 'parts', []):
|
||
part_type = type(part).__name__
|
||
if 'ToolCall' in part_type:
|
||
print(f"{self.C_TOOL}[THINK/TOOL_CALL]{self.C_RESET} {getattr(part, 'tool_name', '')}({getattr(part, 'args', '')})")
|
||
elif 'ToolReturn' in part_type:
|
||
ret = str(getattr(part, 'content', ''))[:120]
|
||
print(f"{self.C_TOOL}[THINK/TOOL_RETURN]{self.C_RESET} {ret}")
|
||
|
||
print(f"{self.C_THINK}[THINK/RAW_OUTPUT]{self.C_RESET} {repr(reply_text)}")
|
||
|
||
except Exception as e:
|
||
err_str = str(e)
|
||
print(f"[Agent] AI 调用失败: {e},使用兜底回复")
|
||
metrics_emit("ai_call_failed", customer_id=message.from_id, acc_id=message.acc_id)
|
||
if "AccountOverdueError" in err_str or "overdue" in err_str.lower():
|
||
asyncio.create_task(_notify_wechat_overdue())
|
||
else:
|
||
asyncio.create_task(_notify_wechat(
|
||
f"⚠️ **AI调用异常**\n"
|
||
f"客户:{message.from_id}\n"
|
||
f"店铺:{message.acc_id}\n"
|
||
f"错误:{err_str[:200]}",
|
||
tag="AI异常"
|
||
))
|
||
reply_text = None
|
||
else:
|
||
metrics_emit("ai_call_success", customer_id=message.from_id, acc_id=message.acc_id)
|
||
|
||
# AI 失败兜底:给一个不出错的万能回复
|
||
if not reply_text:
|
||
return AgentResponse(
|
||
reply="好的稍等,我看一下",
|
||
should_reply=True,
|
||
need_transfer=False
|
||
)
|
||
|
||
# 敏感词过滤:党政/暴力/血腥/黄色
|
||
try:
|
||
from utils.content_filter import should_block_reply
|
||
blocked, fallback = should_block_reply(reply_text)
|
||
if blocked:
|
||
print(f"[Agent] 敏感词拦截,使用兜底回复")
|
||
reply_text = fallback or "好的,您稍等,我帮您确认一下"
|
||
except Exception:
|
||
pass
|
||
|
||
# 成本统计(可选)
|
||
try:
|
||
from utils.api_cost_tracker import record
|
||
record("openai_chat", count=1)
|
||
except Exception:
|
||
pass
|
||
|
||
# 检测是否报价
|
||
self._detect_price(reply_text, state)
|
||
|
||
# 检测压价
|
||
self._detect_discount(message.msg, state)
|
||
|
||
# 自动打标签(异步,不阻塞)
|
||
asyncio.create_task(self._auto_tag(message, reply_text, state))
|
||
|
||
# 检测是否需要转接(文字触发 或 AI 调用了 transfer_to_human 工具)
|
||
need_transfer = False
|
||
transfer_msg = ""
|
||
transfer_keywords = ["TRANSFER_REQUESTED", "[转移会话]", "转移会话", "转人工", "转接"]
|
||
if reply_text and any(kw in reply_text for kw in transfer_keywords):
|
||
need_transfer = True
|
||
transfer_msg = TRANSFER_MESSAGE
|
||
metrics_emit("transfer_to_human", customer_id=message.from_id, acc_id=message.acc_id)
|
||
|
||
# 未成交记录:客户表达放弃且已报价过(转人工不记录)
|
||
customer_text, _ = self._split_customer_text(message.msg)
|
||
no_convert_keywords = ["算了", "不要了", "不做了", "下次再说", "先不弄了"]
|
||
if customer_text and state.last_price and state.last_price > 0:
|
||
if any(kw in customer_text for kw in no_convert_keywords):
|
||
reason = "嫌贵放弃" if any(k in customer_text for k in ["贵", "贵了", "便宜"]) else "放弃"
|
||
asyncio.create_task(self._record_deal_fail(
|
||
message.from_id, message.from_name, message.acc_id, message.acc_type, reason
|
||
))
|
||
|
||
# 需要转接时不把原始回复发给客户
|
||
should_reply = bool(reply_text and reply_text.strip()) and not need_transfer
|
||
|
||
# 记录本次回复时间,供冷却期判断
|
||
if should_reply:
|
||
state.last_reply_at = datetime.now()
|
||
print(f"{self.C_REPLY}[REPLY->CUSTOMER]{self.C_RESET} {reply_text}")
|
||
else:
|
||
print(f"{self.C_MUTED}[REPLY->CUSTOMER]{self.C_RESET} <静默/不发送>")
|
||
|
||
return AgentResponse(reply=reply_text, should_reply=should_reply, need_transfer=need_transfer, transfer_msg=transfer_msg)
|
||
|
||
def _detect_price(self, reply: str, state: ConversationState):
|
||
"""从回复中提取价格,同步写入客户数据库(价格必须为5的整数倍)"""
|
||
import re
|
||
numbers = re.findall(r'(\d+)[元]', reply)
|
||
if numbers:
|
||
price = round(int(numbers[0]) / 5) * 5 # 强制为5的整数倍
|
||
state.last_price = price
|
||
metrics_emit("quote_generated", customer_id=state.customer_id, price=price)
|
||
# 持久化到客户数据库,重启后仍可读取
|
||
try:
|
||
from db.customer_db import db
|
||
db.update_last_price(state.customer_id, price)
|
||
except Exception:
|
||
pass
|
||
|
||
async def _check_order_amount(self, customer_id: str, order: dict, acc_id: str):
|
||
"""核查订单实付金额是否与报价一致,异常时企业微信预警"""
|
||
try:
|
||
import re
|
||
from db.customer_db import db
|
||
profile = db.get_customer(customer_id)
|
||
quoted = profile.last_price # 上次报价(元)
|
||
if not quoted:
|
||
return
|
||
|
||
# 从订单解析实付金额
|
||
raw_amount = order.get("amount", "")
|
||
m = re.search(r'[\d.]+', str(raw_amount))
|
||
if not m:
|
||
return
|
||
paid = float(m.group())
|
||
|
||
print(f"[Agent] 订单金额核查:报价 {quoted}元 vs 实付 {paid}元(客户 {customer_id})")
|
||
|
||
# 实付金额明显低于报价(低于报价的 60%)才预警
|
||
if paid < quoted * 0.6:
|
||
msg = (
|
||
f"⚠️ **订单金额异常**\n"
|
||
f"店铺:{acc_id}\n"
|
||
f"客户:{customer_id}({profile.name or ''})\n"
|
||
f"报价:{quoted}元\n"
|
||
f"实付:{paid}元\n"
|
||
f"差额:{quoted - paid:.1f}元 — 请人工核查"
|
||
)
|
||
print(f"[Agent] {msg}")
|
||
await _notify_wechat(msg)
|
||
except Exception as e:
|
||
print(f"[Agent] 订单金额核查失败: {e}")
|
||
|
||
async def _record_deal_success(
|
||
self,
|
||
customer_id: str,
|
||
customer_name: str,
|
||
acc_id: str,
|
||
platform: str,
|
||
order: dict,
|
||
state: "ConversationState",
|
||
):
|
||
"""成交时写入数据库,供日报与数据分析"""
|
||
try:
|
||
import re
|
||
from db.deal_outcome_db import record_deal
|
||
order_id = order.get("order_id", "")
|
||
raw_amount = order.get("amount", "")
|
||
m = re.search(r"[\d.]+", str(raw_amount))
|
||
amount = float(m.group()) if m else 0
|
||
reason = "让价后成交" if (state.discount_count or 0) > 0 else "直接成交"
|
||
record_deal(
|
||
customer_id=customer_id,
|
||
outcome="成交",
|
||
reason=reason,
|
||
customer_name=customer_name or "",
|
||
acc_id=acc_id or "",
|
||
platform=platform or "",
|
||
order_id=order_id,
|
||
amount=amount,
|
||
discount_given=(state.discount_count or 0) > 0,
|
||
)
|
||
# 同步到客户库
|
||
try:
|
||
from db.customer_db import db
|
||
if order_id:
|
||
db.add_order(customer_id, order_id, amount)
|
||
db.clear_quote_no_convert(customer_id)
|
||
except Exception:
|
||
pass
|
||
print(f"[Agent] 成交记录: {customer_id} {reason} {amount}元")
|
||
except Exception as e:
|
||
print(f"[Agent] 成交记录失败: {e}")
|
||
|
||
async def _record_deal_fail(
|
||
self,
|
||
customer_id: str,
|
||
customer_name: str,
|
||
acc_id: str,
|
||
platform: str,
|
||
reason: str,
|
||
):
|
||
"""未成交时写入数据库,供日报与数据分析;标记报价未成交,下次可适当降低"""
|
||
try:
|
||
from db.deal_outcome_db import record_deal
|
||
from db.customer_db import db
|
||
record_deal(
|
||
customer_id=customer_id,
|
||
outcome="未成交",
|
||
reason=reason,
|
||
customer_name=customer_name or "",
|
||
acc_id=acc_id or "",
|
||
platform=platform or "",
|
||
)
|
||
db.mark_quote_no_convert(customer_id)
|
||
print(f"[Agent] 未成交记录: {customer_id} {reason}")
|
||
except Exception as e:
|
||
print(f"[Agent] 未成交记录失败: {e}")
|
||
|
||
async def _auto_tag(self, message: CustomerMessage, reply: str, state: ConversationState):
|
||
"""自动识别并写入各类标签"""
|
||
try:
|
||
from db.customer_db import db
|
||
cid = message.from_id
|
||
msg = message.msg.lower()
|
||
|
||
# 批量潜力
|
||
if any(kw in msg for kw in ["还有", "多张", "好几张", "一批", "下次还"]):
|
||
db.set_bulk_potential(cid, "有")
|
||
db.add_upsell_opportunity(cid, "批量打包")
|
||
|
||
# 加购机会:问过PSD/分层
|
||
if any(kw in msg for kw in ["psd", "分层", "源文件"]):
|
||
db.add_upsell_opportunity(cid, "分层PSD")
|
||
db.update_preferred_format(cid, "psd")
|
||
|
||
# 格式偏好
|
||
if "jpg" in msg or "jpeg" in msg:
|
||
db.update_preferred_format(cid, "jpg")
|
||
if "png" in msg:
|
||
db.update_preferred_format(cid, "png")
|
||
|
||
# 尺寸/分辨率偏好
|
||
if any(kw in msg for kw in ["分辨率", "dpi", "尺寸", "大图", "印刷"]):
|
||
db.update_preferred_size(cid, message.msg[:30])
|
||
|
||
# 决策速度:收到报价后立刻说拍了 → 快
|
||
if any(kw in msg for kw in ["拍了", "下单了", "好的", "行"]) and state.last_price:
|
||
db.update_decision_speed(cid, "快")
|
||
|
||
# 图片类型识别(简单关键词匹配)
|
||
type_keywords = {
|
||
"印花": ["印花", "花纹", "图案", "面料", "布料", "纺织"],
|
||
"logo": ["logo", "标志", "品牌", "商标"],
|
||
"人物": ["人物", "人像", "照片", "脸", "头像"],
|
||
"产品": ["产品", "商品", "包装", "实物"],
|
||
"老照片": ["老照片", "旧照片", "发黄", "修复"],
|
||
}
|
||
for img_type, keywords in type_keywords.items():
|
||
if any(kw in message.msg for kw in keywords):
|
||
db.add_image_type(cid, img_type)
|
||
break
|
||
|
||
# 定期自动计算衍生标签
|
||
db.auto_compute_tags(cid)
|
||
|
||
except Exception:
|
||
pass
|
||
|
||
def _detect_discount(self, message: str, state: ConversationState):
|
||
"""检测压价,并持久化让价记录"""
|
||
if any(kw in message for kw in ["贵", "便宜", "太贵", "有点贵"]):
|
||
state.discount_count += 1
|
||
if state.last_price:
|
||
try:
|
||
from db.customer_db import db
|
||
db.record_discount(state.customer_id, state.last_price)
|
||
except Exception:
|
||
pass
|
||
# 客户明确给价(如“10元/10块/能10吗”)
|
||
import re
|
||
m = re.search(r'(\d+)\s*元|\b(\d+)\s*块', message)
|
||
offer = None
|
||
if m:
|
||
offer = int(m.group(1) or m.group(2))
|
||
if offer:
|
||
try:
|
||
from config.config import MIN_PRICE_FLOOR
|
||
if offer < MIN_PRICE_FLOOR:
|
||
# 标记本次为超低出价(用于后续拒绝话术提示)
|
||
state.last_price = state.last_price or 0
|
||
except Exception:
|
||
pass
|
||
|
||
def _parse_order_info(self, msg: str) -> dict:
|
||
"""从系统订单消息中提取所有字段"""
|
||
import re
|
||
info = {}
|
||
|
||
m = re.search(r'订单号[::]\s*(\d+)', msg)
|
||
if m:
|
||
info['order_id'] = m.group(1)
|
||
|
||
# 订单大状态(新订单/交易成功/交易关闭等)
|
||
m = re.search(r'订单状态[::]\s*([^\s\[]+)', msg)
|
||
if m:
|
||
info['order_status'] = m.group(1).strip()
|
||
|
||
# 支付细状态(等待买家付款/等待发货/交易完成等)
|
||
m = re.search(r'\[状态[::]\s*([^\]]+)\]', msg)
|
||
if m:
|
||
info['pay_status'] = m.group(1).strip()
|
||
|
||
# 金额
|
||
m = re.search(r'金额[::]\s*([\d.]+)元', msg)
|
||
if m:
|
||
info['amount'] = m.group(1)
|
||
|
||
# 数量
|
||
m = re.search(r'数量[::]\s*(\d+)', msg)
|
||
if m:
|
||
info['quantity'] = m.group(1)
|
||
|
||
# 时间(格式:2026-2-24 19:52:52)
|
||
m = re.search(r'(\d{4}-\d{1,2}-\d{1,2}\s+\d{1,2}:\d{2}:\d{2})', msg)
|
||
if m:
|
||
info['order_time'] = m.group(1).strip()
|
||
|
||
# 买家备注
|
||
m = re.search(r'买家备注[::]\s*([^\n]+)', msg)
|
||
if m and m.group(1).strip():
|
||
info['buyer_note'] = m.group(1).strip()
|
||
|
||
return info
|
||
|
||
def _get_order_instruction(self, pay_status: str, order_status: str) -> str:
|
||
"""
|
||
根据订单状态生成 AI 指令。
|
||
只有「已付款」才需要回复客户,其他状态一律静默。
|
||
"""
|
||
paid_keywords = ["等待发货", "已付款", "付款成功", "买家已付款"]
|
||
if any(kw in pay_status or kw in order_status for kw in paid_keywords):
|
||
return "【已付款-必须回复】客户已付款,立刻自然回复确认收款并告知马上安排。"
|
||
else:
|
||
# 所有其他状态(待付款、交易完成、关闭等)静默处理
|
||
return "【仅系统通知-无需回复客户】这是系统订单通知,不需要回复客户任何内容,直接跳过。"
|
||
|
||
def _extract_image_url(self, msg: str) -> str:
|
||
"""从消息中提取图片URL,兼容纯URL和 text#*#url 两种格式"""
|
||
urls = self._extract_image_urls(msg)
|
||
return urls[0] if urls else ""
|
||
|
||
def _extract_image_urls(self, msg: str) -> List[str]:
|
||
"""提取消息中的所有图片URL(去重保序)。"""
|
||
import re
|
||
if not msg:
|
||
return []
|
||
image_exts = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp")
|
||
image_hosts = ("alicdn.com", "imgextra", "taobao.com", "jd.com", "pinduoduo.com", "suning.com")
|
||
candidates = re.findall(r'https?://[^\s#]+', msg)
|
||
urls: List[str] = []
|
||
for u in candidates:
|
||
low = u.lower()
|
||
if any(ext in low for ext in image_exts) or any(h in low for h in image_hosts):
|
||
if u not in urls:
|
||
urls.append(u)
|
||
return urls
|
||
|
||
def _strip_urls_from_text(self, msg: str) -> str:
|
||
"""去掉 URL 后的纯文本,用于提取额外需求。"""
|
||
import re
|
||
if not msg:
|
||
return ""
|
||
text = re.sub(r'https?://\S+', ' ', msg)
|
||
text = text.replace("#*#", " ").strip()
|
||
text = re.sub(r'\s+', ' ', text)
|
||
return text.strip(",,。.!!??;;:: ")
|
||
|
||
def _is_batch_finish_signal(self, text: str) -> bool:
|
||
"""客户是否表达“图发完了,可以统一报价”。"""
|
||
if not text:
|
||
return False
|
||
finish_keywords = [
|
||
"发完了", "都发完了", "发齐了", "齐了", "先这些", "就这些", "全部", "一起报", "统一报价",
|
||
"总共多少钱", "一共多少钱", "打包价", "总价", "报价吧", "报个总价", "给个总价",
|
||
]
|
||
return any(k in text for k in finish_keywords)
|
||
|
||
def _build_collect_ack(self, count: int) -> str:
|
||
templates = [
|
||
"收到,这边先记下了(已收{n}张)。你继续发,等你发完我再一起给你打包报价。",
|
||
"好的,当前这批先收到了(第{n}张)。还有图就继续发,发齐我一次性给你总价。",
|
||
"没问题,已记录到第{n}张。你把需求和图片都发完,我统一给你报更合适的价格。",
|
||
]
|
||
return random.choice(templates).format(n=count)
|
||
|
||
def _build_collect_remind(self, count: int) -> str:
|
||
templates = [
|
||
"需求我记下了(当前共{n}张图)。你继续发齐,发完回我“发完了”,我一次性给你总价。",
|
||
"好的,这条需求也加上了(现在{n}张)。等你说发完,我立刻统一报价。",
|
||
"收到,这个要求我也记住了(共{n}张)。你发完我就给你打包价。",
|
||
]
|
||
return random.choice(templates).format(n=count)
|
||
|
||
def _append_requirement(self, state: ConversationState, text: str):
|
||
"""追加需求并做去重/截断,减少上下文噪音。"""
|
||
t = (text or "").strip()
|
||
if not t:
|
||
return
|
||
t = t[:120]
|
||
if state.pending_requirements and state.pending_requirements[-1] == t:
|
||
return
|
||
if t in state.pending_requirements[-5:]:
|
||
return
|
||
state.pending_requirements.append(t)
|
||
if len(state.pending_requirements) > 20:
|
||
state.pending_requirements = state.pending_requirements[-20:]
|
||
|
||
def _calc_requirement_surcharge(self, requirements: List[str]) -> Dict[str, Any]:
|
||
"""
|
||
把客户补充需求做成结构化加价,避免纯靠模型自由发挥导致价格波动。
|
||
返回:
|
||
{"extra": int, "hits": List[str]}
|
||
"""
|
||
text = " ".join(requirements or [])
|
||
rules = [
|
||
(["分层", "psd", "源文件"], 30, "分层/源文件"),
|
||
(["去背景", "抠图", "透明底", "白底"], 5, "去背景"),
|
||
(["换背景", "换场景", "合成"], 10, "合成/换背景"),
|
||
(["改字", "改文字", "替换文字", "排版"], 10, "改文字/排版"),
|
||
(["调色", "改色", "换色", "配色"], 5, "调色"),
|
||
(["多版本", "多个版本", "两版", "三版"], 10, "多版本"),
|
||
(["加急", "今天要", "马上要", "尽快"], 10, "加急"),
|
||
]
|
||
total = 0
|
||
hits: List[str] = []
|
||
for keywords, fee, label in rules:
|
||
if any(k in text for k in keywords):
|
||
total += fee
|
||
hits.append(f"{label}+{fee}")
|
||
# 防止需求加价过高,做个上限保护
|
||
total = min(total, 60)
|
||
# 金额统一 5 的倍数
|
||
total = round(total / 5) * 5
|
||
return {"extra": total, "hits": hits}
|
||
|
||
def _build_batch_quote_reply(
|
||
self,
|
||
results: List[Tuple[str, Dict[str, Any]]],
|
||
total_suggest: int,
|
||
bundle_price: int,
|
||
req_fee: Dict[str, Any],
|
||
) -> str:
|
||
"""构建分图明细 + 单条总报价可选项回复。"""
|
||
complexity_map = {
|
||
"simple": "简单",
|
||
"normal": "常规",
|
||
"complex": "复杂",
|
||
"hard": "高难",
|
||
}
|
||
detail_lines: List[str] = []
|
||
for i, (_, r) in enumerate(results, 1):
|
||
p = int(r.get("price_suggest", 20) or 20)
|
||
cx = complexity_map.get(str(r.get("complexity", "normal")), "常规")
|
||
reason = str(r.get("reason", "常规处理")).replace("\n", " ").strip()
|
||
if len(reason) > 18:
|
||
reason = reason[:18] + "..."
|
||
detail_lines.append(f"图{i}:{p}元({cx},{reason})")
|
||
|
||
extra = int(req_fee.get("extra", 0) or 0)
|
||
single_total = round((total_suggest + extra) / 5) * 5
|
||
req_hit = "、".join(req_fee.get("hits", [])) if req_fee.get("hits") else ""
|
||
|
||
lines = ["先给你分图报下:"]
|
||
lines.extend(detail_lines)
|
||
if req_hit:
|
||
lines.append(f"需求加价:+{extra}元({req_hit})")
|
||
option_line = f"可选:A按单张做共{single_total}元;B打包一起做{bundle_price}元(更划算)。"
|
||
lines.append(option_line)
|
||
lines.append("你定一个方案,我这边马上安排。")
|
||
return "\n".join(lines)
|
||
|
||
async def _quote_pending_images(self, state: ConversationState, message: CustomerMessage) -> Dict[str, Any]:
|
||
"""
|
||
批量识别待处理图片并统一处理:
|
||
- find_image 意图且可自动处理:直接 Gemini 处理 + 上传图绘 + 回链接
|
||
- 高风险/不可做:转人工
|
||
- 其他:统一报价
|
||
"""
|
||
from image.image_analyzer import image_analyzer
|
||
|
||
urls = list(state.pending_image_urls)
|
||
if not urls:
|
||
return {"reply": "你先把图片发我,我看完再给你统一报价。", "need_transfer": False}
|
||
try:
|
||
from config.config import BATCH_MAX_IMAGES, BATCH_ANALYZE_CONCURRENCY
|
||
max_images = max(1, int(BATCH_MAX_IMAGES))
|
||
analyze_concurrency = max(1, int(BATCH_ANALYZE_CONCURRENCY))
|
||
except Exception:
|
||
max_images = 12
|
||
analyze_concurrency = 3
|
||
if len(urls) > max_images:
|
||
return {
|
||
"reply": f"这次图片有点多({len(urls)}张),我先按前{max_images}张处理报价,剩下的下一批继续发我。",
|
||
"need_transfer": False,
|
||
}
|
||
urls = urls[:max_images]
|
||
|
||
sem = asyncio.Semaphore(analyze_concurrency)
|
||
async def _analyze_one(url: str):
|
||
async with sem:
|
||
try:
|
||
r = await image_analyzer.analyze(url)
|
||
except Exception:
|
||
r = {
|
||
"complexity": "normal",
|
||
"reason": "识别异常,按常规估价",
|
||
"price_min": 15,
|
||
"price_max": 25,
|
||
"price_suggest": 20,
|
||
"success": False,
|
||
}
|
||
return url, r
|
||
|
||
results = list(await asyncio.gather(*[_analyze_one(u) for u in urls]))
|
||
for url, r in results:
|
||
# 与单图流程一致:识别后写入 workflow 任务
|
||
try:
|
||
from core.workflow import workflow
|
||
await workflow.image_analysis_result(
|
||
customer_id=message.from_id,
|
||
image_url=url,
|
||
complexity=r.get("complexity", "normal"),
|
||
acc_id=message.acc_id,
|
||
acc_type=message.acc_type,
|
||
gemini_prompt=r.get("gemini_prompt", ""),
|
||
aspect_ratio=r.get("aspect_ratio", "1:1"),
|
||
perspective=r.get("perspective", "no"),
|
||
proc_type=r.get("proc_type", ""),
|
||
subject=r.get("subject", ""),
|
||
quality=r.get("quality", ""),
|
||
)
|
||
except Exception as e:
|
||
print(f"[Agent] Workflow 批量任务创建失败: {e}")
|
||
|
||
total_min = sum(int(r.get("price_min", 15) or 15) for _, r in results)
|
||
total_max = sum(int(r.get("price_max", 25) or 25) for _, r in results)
|
||
total_suggest = sum(int(r.get("price_suggest", 20) or 20) for _, r in results)
|
||
req_fee = self._calc_requirement_surcharge(state.pending_requirements)
|
||
|
||
# 打包优惠:2 张减 5,3 张及以上按 9 折(四舍五入到 5 元)
|
||
if len(results) == 2:
|
||
bundle_price = max(10, total_suggest - 5)
|
||
elif len(results) >= 3:
|
||
bundle_price = max(10, round(total_suggest * 0.9 / 5) * 5)
|
||
else:
|
||
bundle_price = total_suggest
|
||
bundle_price += int(req_fee.get("extra", 0) or 0)
|
||
bundle_price = round(bundle_price / 5) * 5
|
||
|
||
# 先分流:高风险/不可做 -> 转人工
|
||
unsafe = []
|
||
dense_text_reject = []
|
||
for i, (_, r) in enumerate(results, 1):
|
||
if r.get("feasibility") == "no" or r.get("risk") == "high":
|
||
unsafe.append(f"图{i}")
|
||
note = str(r.get("note", "") or "")
|
||
if "文字内容过于密集" in note or "密集文字" in note:
|
||
dense_text_reject.append(f"图{i}")
|
||
if unsafe:
|
||
state.pending_image_urls.clear()
|
||
state.pending_requirements.clear()
|
||
self._sync_pending_quote_state(message.from_id, state)
|
||
if dense_text_reject and len(dense_text_reject) == len(unsafe):
|
||
return {
|
||
"reply": self._build_reject_message("文字密集类图片暂不接单"),
|
||
"need_transfer": False,
|
||
}
|
||
return {
|
||
"reply": f"这批里{'、'.join(unsafe)}处理风险较高,我先帮你转人工设计师跟进会更稳妥。",
|
||
"need_transfer": True,
|
||
}
|
||
|
||
# 查找图片意图:直接自动处理并返回图绘链接
|
||
intent_text = (message.msg or "") + " " + " ".join(state.pending_requirements[-5:])
|
||
workflow_type, _ = self.workflow_router.detect_workflow(intent_text)
|
||
if workflow_type == "find_image":
|
||
links = []
|
||
try:
|
||
from image.image_processor import image_processor
|
||
from utils.image_queue import run_with_queue
|
||
for idx, (url, r) in enumerate(results, 1):
|
||
req_parts = [f"complexity:{r.get('complexity', 'normal')}"]
|
||
if r.get("gemini_prompt"):
|
||
req_parts.append(f"prompt:{r.get('gemini_prompt')}")
|
||
if r.get("aspect_ratio"):
|
||
req_parts.append(f"ratio:{r.get('aspect_ratio')}")
|
||
if r.get("perspective") and r.get("perspective") != "no":
|
||
req_parts.append(f"perspective:{r.get('perspective')}")
|
||
if r.get("proc_type"):
|
||
req_parts.append(f"proc_type:{r.get('proc_type')}")
|
||
if r.get("subject"):
|
||
req_parts.append(f"subject:{r.get('subject')}")
|
||
if r.get("quality"):
|
||
req_parts.append(f"quality:{r.get('quality')}")
|
||
|
||
process_res = await run_with_queue(image_processor.process_image(
|
||
url,
|
||
"enhance",
|
||
requirements="|".join(req_parts),
|
||
gemini_prompt=r.get("gemini_prompt", ""),
|
||
aspect_ratio=r.get("aspect_ratio", "1:1"),
|
||
perspective=r.get("perspective", "no"),
|
||
proc_type=r.get("proc_type", ""),
|
||
subject=r.get("subject", ""),
|
||
quality=r.get("quality", ""),
|
||
))
|
||
if not process_res.get("success"):
|
||
raise RuntimeError(process_res.get("message", "图片处理失败"))
|
||
|
||
ok, link, _ = await upload_to_tuhui(
|
||
process_res["result_path"],
|
||
title=f"客户{message.from_id[-4:]}-图片{idx}",
|
||
description="AI自动处理结果",
|
||
price=max(10, int(r.get("price_suggest", 20) or 20) + int(req_fee.get("extra", 0) or 0) // max(1, len(results))),
|
||
)
|
||
if not ok:
|
||
raise RuntimeError(str(link))
|
||
links.append(link)
|
||
except Exception as e:
|
||
print(f"[Agent] 自动处理并上传失败,回退统一报价: {e}")
|
||
else:
|
||
lines = [f"这批我先给你处理好了,按打包 {bundle_price} 元。"]
|
||
for i, link in enumerate(links, 1):
|
||
lines.append(f"链接{i}:{link}")
|
||
lines.append("你先看下效果,没问题我就按这个标准继续给你做。")
|
||
state.last_price = bundle_price
|
||
try:
|
||
from db.customer_db import db
|
||
db.update_last_price(message.from_id, bundle_price)
|
||
except Exception:
|
||
pass
|
||
state.pending_image_urls.clear()
|
||
state.pending_requirements.clear()
|
||
self._sync_pending_quote_state(message.from_id, state)
|
||
return {"reply": "\n".join(lines), "need_transfer": False}
|
||
|
||
reply_text = self._build_batch_quote_reply(
|
||
results=results,
|
||
total_suggest=total_suggest,
|
||
bundle_price=bundle_price,
|
||
req_fee=req_fee,
|
||
)
|
||
|
||
state.last_price = bundle_price
|
||
try:
|
||
from db.customer_db import db
|
||
db.update_last_price(message.from_id, bundle_price)
|
||
except Exception:
|
||
pass
|
||
|
||
# 清空待报价队列(本轮已统一报价)
|
||
state.pending_image_urls.clear()
|
||
state.pending_requirements.clear()
|
||
self._sync_pending_quote_state(message.from_id, state)
|
||
return {"reply": reply_text, "need_transfer": False}
|
||
|
||
def _split_customer_text(self, msg: str) -> tuple:
|
||
"""
|
||
把混合消息拆分为(客户真实文字, 系统订单块)。
|
||
平台有时把客户文字和系统订单通知拼在同一条消息里。
|
||
"""
|
||
import re
|
||
# 找到系统订单块的起始位置
|
||
order_marker = re.search(r'\[系统订单信息\]|\[系统通知\]', msg)
|
||
if order_marker:
|
||
customer_text = msg[:order_marker.start()].strip()
|
||
order_block = msg[order_marker.start():].strip()
|
||
else:
|
||
customer_text = msg.strip()
|
||
order_block = ""
|
||
return customer_text, order_block
|
||
|
||
def _build_prompt(self, message: CustomerMessage, state: ConversationState) -> str:
|
||
"""构建提示词"""
|
||
msg_content = message.msg
|
||
stage_info = f"【当前阶段】{state.stage}"
|
||
|
||
# 拆分:客户文字 vs 系统订单块
|
||
customer_text, order_block = self._split_customer_text(msg_content)
|
||
has_order = bool(order_block)
|
||
|
||
if has_order:
|
||
order = self._parse_order_info(order_block)
|
||
if order.get('order_id'):
|
||
state.last_order_id = order['order_id']
|
||
stage_info += f"\n【订单号】{order['order_id']}"
|
||
if order.get('order_status'):
|
||
state.order_status = order['order_status']
|
||
stage_info += f"\n【订单状态】{order['order_status']}"
|
||
if order.get('pay_status'):
|
||
stage_info += f"\n【支付状态】{order['pay_status']}"
|
||
if order.get('amount'):
|
||
stage_info += f"\n【订单金额】{order['amount']}元"
|
||
if order.get('quantity'):
|
||
stage_info += f"\n【数量】{order['quantity']}件"
|
||
if order.get('order_time'):
|
||
stage_info += f"\n【下单时间】{order['order_time']}"
|
||
if order.get('buyer_note'):
|
||
stage_info += f"\n【买家备注】{order['buyer_note']}"
|
||
|
||
if state.discount_count > 0:
|
||
stage_info += f"\n【客户压价次数】{state.discount_count}"
|
||
|
||
# 店铺类型:不同店铺不同回复策略
|
||
shop_type = _get_shop_type(message.acc_id or "", message.goods_name or "")
|
||
shop_hint = ""
|
||
try:
|
||
from config.config import CONFIG_DIR
|
||
import json
|
||
cfg_path = CONFIG_DIR / "shop_prompts.json"
|
||
if cfg_path.exists():
|
||
with open(cfg_path, "r", encoding="utf-8") as f:
|
||
cfg = json.load(f)
|
||
hints = cfg.get("type_hints", {})
|
||
shop_hint = hints.get(shop_type, "")
|
||
if not shop_hint and message.acc_id:
|
||
sh = cfg.get("shops", {}).get(message.acc_id, {})
|
||
shop_hint = sh.get("hint", "")
|
||
except Exception:
|
||
pass
|
||
|
||
prompt = f"""收到新消息:
|
||
{stage_info}
|
||
|
||
发送者: {message.from_name} ({message.from_id})
|
||
"""
|
||
if message.goods_name:
|
||
prompt += f"商品名称: {message.goods_name}\n"
|
||
if shop_hint:
|
||
prompt += f"\n{shop_hint}\n"
|
||
|
||
# ── 优先处理客户真实问题 ──
|
||
# ── 判断订单付款状态(供后续逻辑使用)──
|
||
order_paid = False
|
||
order_unpaid = False
|
||
if has_order:
|
||
order = self._parse_order_info(order_block)
|
||
paid_kws = ["等待发货", "已付款", "付款成功", "买家已付款"]
|
||
unpaid_kws = ["等待买家付款", "待付款", "未付款"]
|
||
ps = order.get('pay_status', '')
|
||
os_ = order.get('order_status', '')
|
||
if any(kw in ps or kw in os_ for kw in paid_kws):
|
||
order_paid = True
|
||
elif any(kw in ps or kw in os_ for kw in unpaid_kws):
|
||
order_unpaid = True
|
||
|
||
# ── 催单/进度询问关键词 ──
|
||
progress_keywords = [
|
||
"安排了吗", "安排好了吗", "好了吗", "做了吗", "做好了吗",
|
||
"弄好了吗", "好了没", "做了没", "什么时候好", "多久好",
|
||
"进度", "催一下", "快点", "什么时候能好", "做完了吗",
|
||
]
|
||
|
||
if customer_text:
|
||
prompt += f"\n客户说:{customer_text}\n"
|
||
image_url = self._extract_image_url(customer_text)
|
||
price_keywords = ["多少钱", "多少", "价格", "几块", "怎么收费", "报个价"]
|
||
|
||
# gemini_api 店铺:不触发找图流程,按 API 客服回复
|
||
if shop_type == "gemini_api":
|
||
prompt += "\n【Gemini API 店铺】客户问账号/pro/续费/套餐等,按 API 客服自然回复,不要求发图。"
|
||
elif image_url:
|
||
prompt += "\n客户在继续发图阶段:先确认“已收图”,并引导客户把图和要求一次发完;等客户明确“发完了/统一报价”后再统一报价。"
|
||
elif any(kw in customer_text for kw in price_keywords):
|
||
last_url = self._extract_image_url(msg_content)
|
||
if last_url:
|
||
prompt += "\n客户在询问价格:若客户已确认发完,则给总报价;若还在发图,先引导发完后统一报价。"
|
||
else:
|
||
prompt += "\n客户在询问价格但未发图,回复「发图来我看看」。"
|
||
elif any(kw in customer_text for kw in progress_keywords):
|
||
# 客户问进度/催单,必须先核查付款状态
|
||
if order_unpaid:
|
||
prompt += "\n⚠️【订单未付款】客户问安排进度,但订单还未付款。自然告知拍下付款后马上安排即可。"
|
||
elif order_paid:
|
||
prompt += "\n客户催单,订单已付款,自然回复在做了/快了之类。"
|
||
else:
|
||
prompt += "\n客户催单,查询当前处理状态后自然回复。"
|
||
elif any(kw in customer_text for kw in ["贵", "有点贵", "太贵", "算了", "便宜点", "少点", "打折", "贵哦"]):
|
||
# 客户嫌贵/要放弃 → 直接让价一次,不问「什么问题」
|
||
prompt += "\n⚠️【客户嫌贵】客户已表达价格顾虑或要放弃。禁止追问「什么问题」「说清楚点」。\n→ 第一步:直接让价一次(如原价 20→15),话术自然。\n→ 第二步:如果让价后客户还是嫌贵,引导客户自己去注册星绘平台:\n - 每个话术都必须带网址 xinhui.cloud\n - 强调「自己去弄」「自己操作」「自己下单」\n - 示例:「嫌贵就去 xinhui.cloud 注册个号,自己有余额可以弄」「xinhui.cloud 去注册下,新用户送体验金,自己弄便宜」\n - 语气直接,让客户自己去操作」"
|
||
elif any(kw in customer_text for kw in ["擦边", "黄色", "色情", "大尺度", "性感图", "露点", "半裸"]):
|
||
# 客户问擦边/黄色内容 → 直接拒绝,不说「发图来看看」
|
||
prompt += "\n⚠️【拒绝】客户询问擦边/黄色/敏感内容。直接拒绝,不接单,不说「发图来看看」。自然回复如:这类不做/不接/做不了。"
|
||
else:
|
||
prompt += "\n根据客户说的内容自然回应,像真人聊天,不要套模板。"
|
||
|
||
# ── 附加订单信息(不覆盖客户问题的优先级)──
|
||
if has_order:
|
||
order = self._parse_order_info(order_block)
|
||
order_instruction = self._get_order_instruction(
|
||
order.get('pay_status', ''),
|
||
order.get('order_status', '')
|
||
)
|
||
if customer_text:
|
||
if not order_unpaid:
|
||
# 未付款情况已在上面明确处理,不重复添加背景
|
||
prompt += f"\n\n【背景参考-订单通知】{order_instruction}"
|
||
else:
|
||
# 纯系统通知,没有客户文字
|
||
prompt += f"\n\n{order_instruction}"
|
||
|
||
if not customer_text and not has_order:
|
||
prompt += f"\n消息内容: {msg_content}\n请按工作流规则回复。"
|
||
|
||
return prompt
|
||
|
||
async def _handle_image_workflow(self, message: str, data: dict, image_urls: list) -> bool:
|
||
"""处理图片工作流(根据客户说的话判断执行哪种工作流)"""
|
||
if not image_urls:
|
||
return False
|
||
|
||
workflow_type, confidence = self.workflow_router.detect_workflow(message)
|
||
|
||
customer_id = data.get('from_id')
|
||
acc_id = data.get('acc_id', '')
|
||
acc_type = data.get('acc_type', 'AliWorkbench')
|
||
image_url = image_urls[0]
|
||
|
||
print(f"[Agent] 检测到工作流类型:{workflow_type} (置信度:{confidence})")
|
||
|
||
if workflow_type == "find_image":
|
||
print(f"[Agent] 执行查找图片工作流 | 客户:{customer_id}")
|
||
from core.workflow import workflow
|
||
return await workflow.find_image_workflow(
|
||
customer_id=customer_id,
|
||
image_url=image_url,
|
||
acc_id=acc_id,
|
||
acc_type=acc_type
|
||
)
|
||
elif workflow_type == "process_image":
|
||
print(f"[Agent] 执行处理图片工作流 | 客户:{customer_id}")
|
||
from core.workflow import workflow
|
||
return await workflow.process_image_workflow(
|
||
customer_id=customer_id,
|
||
image_url=image_url,
|
||
acc_id=acc_id,
|
||
acc_type=acc_type
|
||
)
|
||
elif workflow_type == "transfer_human":
|
||
print(f"[Agent] 执行转人工派单工作流 | 客户:{customer_id}")
|
||
from core.workflow import workflow
|
||
return await workflow.transfer_to_designer_workflow(
|
||
customer_id=customer_id,
|
||
image_url=image_url,
|
||
acc_id=acc_id,
|
||
acc_type=acc_type,
|
||
reason="客户主动要求转人工"
|
||
)
|
||
|
||
return False
|
||
|
||
|
||
async def test_agent():
|
||
"""测试 Agent"""
|
||
agent = CustomerServiceAgent(skills_dir="skills")
|
||
|
||
test_msg = CustomerMessage(
|
||
msg_id="123",
|
||
acc_id="test_account",
|
||
msg="这张图可以做吗?",
|
||
from_id="customer001",
|
||
from_name="张三",
|
||
cy_id="customer001",
|
||
acc_type="AliWorkbench",
|
||
msg_type=0,
|
||
cy_name="张三",
|
||
goods_name="专业找图代找高清图片",
|
||
goods_order=""
|
||
)
|
||
|
||
response = await agent.process_message(test_msg)
|
||
print(f"回复内容: {response.reply}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
import asyncio
|
||
asyncio.run(test_agent())
|