Files
tw/core/pydantic_ai_agent.py

2416 lines
107 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""PydanticAI Agent 模块
架构:单 Agent + 多 Tool 模式
- Agent 负责对话逻辑和决策
- Tool 负责具体能力:看图/查客户/转接
- AI 自主决定何时调用哪个工具,时序自然,不需要外部协调
"""
import os
import glob
import asyncio
import random
import hashlib
import re
import json
import logging
from pathlib import Path
from typing import Optional, Dict, List, Any, Tuple
from datetime import datetime
from pydantic import BaseModel, Field, model_validator
from pydantic_ai import Agent, RunContext
from pydantic_ai.models.openai import OpenAIChatModel
from pydantic_ai.providers.openai import OpenAIProvider
from dotenv import load_dotenv
from utils.metrics_tracker import emit as metrics_emit
from utils.observability import emit_activity, build_trace_id
from core.quote_state_machine import QuoteStateMachine
from services.risk_service import RiskService
from core.agent_pre_rules import AgentPreRuleService
from core.find_image_flow import handle_find_image_batch_flow
from core.order_flow import handle_order_notification
from core.ai_reply_flow import execute_ai_turn
from core.reply_finalize_flow import finalize_ai_reply
from core.prompt_flow import build_prompt_bundle
from core.order_helpers import parse_order_info, order_instruction as build_order_instruction
from core.collection_intent_helpers import (
append_requirement,
build_collect_ack,
build_collect_progress_reply,
build_collect_remind,
build_find_image_clarify_reply,
build_not_understood_reply,
classify_short_customer_text,
is_batch_finish_intent,
is_batch_finish_signal,
is_cross_image_composite_intent,
is_find_image_not_edit_conflict,
is_related_image_followup_intent,
is_result_followup_query,
needs_clarification_in_collecting,
)
from core.agent_prompts import (
build_after_sale_prompt,
build_natural_reply_prompt,
build_order_prompt,
build_pricing_prompt,
build_processing_prompt,
build_risk_prompt,
build_similar_prompt,
build_system_prompt,
)
load_dotenv()
from services.service_tuhui_upload import upload_to_tuhui
from core.workflow_router import get_workflow_router
from core.workflow_router import get_workflow_router
from db.customer_risk_db import risk_db
# ========== 企业微信通知 ==========
_WECHAT_WEBHOOK = os.getenv("WECHAT_WEBHOOK", "")
logger = logging.getLogger("cs_agent")
async def _notify_wechat(content: str, tag: str = "通知"):
"""发送企业微信 markdown 通知,任何异常都发"""
if not _WECHAT_WEBHOOK:
print(f"[{tag}] 未配置 WECHAT_WEBHOOK跳过推送")
return
try:
import httpx
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(_WECHAT_WEBHOOK, json={
"msgtype": "markdown",
"markdown": {"content": content}
})
data = resp.json()
if data.get("errcode") == 0:
print(f"[{tag}] 企业微信推送成功 ✓")
else:
print(f"[{tag}] 企业微信推送失败: {data}")
except Exception as e:
print(f"[{tag}] 企业微信发送异常: {e}")
async def _notify_wechat_overdue():
"""API 欠费时发企业微信通知"""
await _notify_wechat(
"⚠️ **火山引擎 API 欠费**客服AI已停止响应请立即充值\n"
"地址https://console.volcengine.com/ark"
)
# ========== 转接常量 ==========
TRANSFER_MESSAGE = "话术|[转移会话],分组20252916034,无原因"
CASE_LIBRARY_LINK = "https://www.yuque.com/zuowei-dfvpq/kge0in/mynala0g35b8cec5"
TAOBAO_REPLY_TAILS = ("", "", "好的", "嗯咯", "嗯啦")
def _is_ack_like_customer_text(text: str) -> bool:
"""客户是否为确认型短句(好的/嗯/收到/ok 等)。"""
s = (text or "").strip().lower()
if not s:
return False
s = s.rstrip("。.!~")
ack_set = {
"", "好的", "", "嗯嗯", "收到", "知道了", "明白了",
"ok", "okay", "", "可以", "好嘞", "好的呢",
}
return s in ack_set
def _is_meaningless_short_text(text: str) -> bool:
"""识别无意义短句:仅需简短承接,不进入复杂流程。"""
s = (text or "").strip().lower().rstrip("。.!~")
if not s:
return False
meaningless = {
"", "好的", "", "嗯嗯", "", "哦哦", "收到", "知道了", "明白了",
"ok", "okay", "", "可以", "好嘞", "好的呢", "在吗", "有人吗", "在不在",
}
return s in meaningless
# ========== 数据模型 ==========
class CustomerMessage(BaseModel):
"""客户消息模型"""
msg_id: str
acc_id: str
msg: str
from_id: str
from_name: str
cy_id: str
acc_type: str
msg_type: int
cy_name: str
goods_name: Optional[str] = None
goods_order: Optional[str] = None
class ConversationState(BaseModel):
"""对话状态"""
customer_id: str
stage: str = "售前" # 售前/售后
last_price: Optional[int] = None # 最后报价
last_min_price: Optional[int] = None # 最近图片的最低价
last_order_id: Optional[str] = None # 订单号
order_status: Optional[str] = None # 订单状态
discount_count: int = 0 # 让价次数
image_count: int = 0 # 图片数量
pending_image_urls: List[str] = Field(default_factory=list) # 待统一报价图片
pending_requirements: List[str] = Field(default_factory=list) # 待统一报价需求
quote_phase: str = "idle" # idle/collecting/ready_to_quote/waiting_result
quote_ready_turns: int = 0 # ready_to_quote 阶段还需等待的消息轮次
last_update: str = ""
last_reply_at: Optional[datetime] = None # 最后一次回复客户的时间
class AgentDeps(BaseModel):
"""Agent 依赖项 - 用于传递上下文"""
msg_id: str
acc_id: str
from_id: str
platform: str
class AgentResponse(BaseModel):
"""Agent 回复模型"""
reply: str
should_reply: bool = True
need_transfer: bool = False # 是否需要转人工
transfer_msg: str = "" # 转接消息
@model_validator(mode="after")
def _ensure_reply_tail(self):
# 统一在 process_message 中按客户输入决定是否补口语尾词
return self
def _get_shop_type(acc_id: str = "", goods_name: str = "") -> str:
"""根据 acc_id 或 goods_name 判断店铺类型,返回 gemini_api / find_image / default"""
try:
from config.config import CONFIG_DIR
import json
cfg_path = CONFIG_DIR / "shop_prompts.json"
if not cfg_path.exists():
return "find_image"
with open(cfg_path, "r", encoding="utf-8") as f:
cfg = json.load(f)
shops = cfg.get("shops", {})
goods_kw = cfg.get("goods_keywords", {})
type_hints = cfg.get("type_hints", {})
# 优先按 acc_id
if acc_id and acc_id in shops:
return shops[acc_id].get("type", "find_image")
# 按商品名关键词
goods_lower = (goods_name or "").lower()
for kw, stype in goods_kw.items():
if kw in goods_lower:
return stype
except Exception:
pass
return "find_image"
def load_skill_map(skills_dir: str = "skills") -> Dict[str, str]:
"""按技能目录名加载 SKILL.md返回 {skill_name: content}。"""
skill_map: Dict[str, str] = {}
skill_files = glob.glob(os.path.join(skills_dir, "**/SKILL.md"), recursive=True)
for skill_file in skill_files:
try:
content = Path(skill_file).read_text(encoding="utf-8")
skill_name = Path(skill_file).parent.name.strip().lower()
if not skill_name:
continue
if skill_name in skill_map:
skill_map[skill_name] += "\n\n" + content
else:
skill_map[skill_name] = content
except Exception as e:
print(f"警告: 读取 {skill_file} 失败: {e}")
return skill_map
class CustomerServiceAgent:
"""客服 Agent - 支持 SKILL.md + 工作流"""
C_RESET = "\033[0m"
C_PROMPT = "\033[96m" # cyan
C_THINK = "\033[95m" # magenta
C_TOOL = "\033[93m" # yellow
C_REPLY = "\033[92m" # green
C_MUTED = "\033[90m" # gray
_DEFAULT_EVOLUTION_CANDIDATE = Path("config") / "evolution_candidate.json"
@staticmethod
def _activity_log(event: str, **kwargs):
emit_activity(
logger,
event=event,
trace_id=str(kwargs.pop("trace_id", "")),
customer_id=str(kwargs.pop("customer_id", "")),
result=str(kwargs.pop("result", "ok")),
**kwargs,
)
def __init__(self, skills_dir: str = "skills"):
self.api_key = os.getenv("OPENAI_API_KEY")
self.base_url = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
self.model_name = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
self.reply_persona = os.getenv("AI_REPLY_PERSONA", "淘宝老店主,直爽利落,口语自然")
self.dynamic_collection_replies = os.getenv("AI_DYNAMIC_COLLECTION_REPLIES", "true").strip().lower() in {"1", "true", "yes", "on"}
self.rewrite_all_replies = os.getenv("AI_REWRITE_ALL_REPLIES", "true").strip().lower() in {"1", "true", "yes", "on"}
try:
self.batch_quote_delay_turns = max(0, int(os.getenv("BATCH_QUOTE_DELAY_TURNS", "1")))
except Exception:
self.batch_quote_delay_turns = 1
self.quote_state_machine = QuoteStateMachine(delay_turns=self.batch_quote_delay_turns)
self.risk_service = RiskService()
self.pre_rule_service = AgentPreRuleService(self, self.risk_service)
if not self.api_key:
raise ValueError("请设置 OPENAI_API_KEY 环境变量")
# 对话状态管理
self.conversations: Dict[str, ConversationState] = {}
# 多轮对话历史PydanticAI ModelMessage 列表按客户ID存储
self.message_histories: Dict[str, list] = {}
self.evolution_candidate = self._load_evolution_candidate()
# 加载技能并按角色拆分,避免所有 Agent 吃同一份大杂烩提示词
self.skill_map = load_skill_map(skills_dir)
self.skill_style = self._compose_skill_content(["style-skill", "owner-style"])
self.skill_pre_sales = self._compose_skill_content(["pre-sales-skill"])
self.skill_pricing = self._compose_skill_content(["pricing-skill"])
self.skill_after_sale = self._compose_skill_content(["after-sales-skill"])
self.skill_risk = self._compose_skill_content(["risk-skill"])
# 创建 OpenAI 模型
model = OpenAIChatModel(
model_name=self.model_name,
provider=OpenAIProvider(
api_key=self.api_key,
base_url=self.base_url
)
)
try:
from config.config import MIN_PRICE_FLOOR
min_price_floor = MIN_PRICE_FLOOR
except Exception:
min_price_floor = 15
self.agent = Agent(
model=model,
deps_type=AgentDeps,
system_prompt=build_system_prompt(self.reply_persona, self.skill_pre_sales, self.skill_style),
)
self.agent_after_sale = Agent(
model=model,
deps_type=AgentDeps,
system_prompt=build_after_sale_prompt(self.skill_after_sale, self.skill_style),
)
self.agent_pricing = Agent(
model=model,
deps_type=AgentDeps,
system_prompt=build_pricing_prompt(
min_price_floor=min_price_floor,
case_library_link=CASE_LIBRARY_LINK,
skill_pricing=self.skill_pricing,
skill_style=self.skill_style,
),
)
self.agent_processing = Agent(
model=model,
deps_type=AgentDeps,
system_prompt=build_processing_prompt(self.skill_after_sale, self.skill_style),
)
self.agent_similar = Agent(
model=model,
deps_type=AgentDeps,
system_prompt=build_similar_prompt(self.skill_pre_sales, self.skill_style),
)
self.agent_natural_reply = Agent(
model=model,
deps_type=AgentDeps,
system_prompt=build_natural_reply_prompt(self.reply_persona, self.skill_style),
)
# 工作流程路由器
self.workflow_router = get_workflow_router()
self.agent_order = Agent(
model=model,
deps_type=AgentDeps,
system_prompt=build_order_prompt(self.skill_after_sale, self.skill_style),
)
self.agent_risk = Agent(
model=model,
deps_type=AgentDeps,
system_prompt=build_risk_prompt(self.skill_risk, self.skill_style),
)
# 注册工具
self._register_tools()
def _compose_skill_content(self, names: List[str]) -> str:
"""按技能名拼接技能文本,找不到则跳过。"""
parts: List[str] = []
for name in names:
key = (name or "").strip().lower()
if key and key in self.skill_map:
parts.append(self.skill_map[key])
return "\n\n".join(parts)
def _load_evolution_candidate(self) -> Dict[str, Any]:
"""读取自我进化候选配置(灰度策略),读取失败时返回空。"""
try:
path = Path(os.getenv("EVOLUTION_CANDIDATE_PATH", str(self._DEFAULT_EVOLUTION_CANDIDATE)))
if not path.exists():
return {}
data = json.loads(path.read_text(encoding="utf-8"))
if not isinstance(data, dict):
return {}
return data
except Exception:
return {}
def _evolution_gray_percent(self) -> int:
"""灰度比例,默认 5%"""
try:
env_pct = os.getenv("EVOLUTION_GRAY_PERCENT", "").strip()
if env_pct:
pct = int(float(env_pct))
else:
pct = int(((self.evolution_candidate or {}).get("gray_percent", 5)))
return max(0, min(100, pct))
except Exception:
return 5
def _evolution_enabled_for_customer(self, customer_id: str) -> bool:
"""按客户哈希稳定灰度命中,命中后启用候选策略。"""
cand = self.evolution_candidate or {}
if str(cand.get("status", "")).strip() != "ready_for_gray_5_percent":
return False
if not customer_id:
return False
pct = self._evolution_gray_percent()
if pct <= 0:
return False
digest = hashlib.md5(customer_id.encode("utf-8")).hexdigest()
bucket = int(digest[:8], 16) % 100
hit = bucket < pct
if hit:
metrics_emit("evolution_gray_hit", customer_id=customer_id, percent=pct, version=str(cand.get("version", "")))
return hit
def _evolution_has_proposal(self, proposal_id: str) -> bool:
cand = self.evolution_candidate or {}
for p in cand.get("proposals", []) or []:
if str((p or {}).get("id", "")).strip() == proposal_id:
return True
return False
@staticmethod
def _is_service_risk_inquiry(text: str) -> bool:
"""识别退款/投诉等服务风险场景。"""
s = (text or "").strip().lower()
if not s:
return False
kw = ("退款", "退货", "投诉", "差评", "举报", "欺骗", "骗人", "起诉", "法院", "生气", "不满意")
return any(k in s for k in kw)
@staticmethod
def _log_block(title: str, content: str):
"""统一的控制台分层日志输出。"""
print(f"{CustomerServiceAgent.C_PROMPT}[{title}]{CustomerServiceAgent.C_RESET}")
print(content)
print(f"{CustomerServiceAgent.C_MUTED}────────────────────{CustomerServiceAgent.C_RESET}")
@staticmethod
def _normalize_reply_text(text: Optional[str]) -> str:
"""清洗模型输出,避免把占位词直接发给客户。"""
if text is None:
return ""
cleaned = str(text).strip()
if cleaned.lower() in {"", "none", "null", "n/a"}:
return ""
return cleaned
@staticmethod
def _colloquialize_reply(text: str) -> str:
"""把常见机械表达柔化为更口语的客服话术。"""
t = (text or "").strip()
if not t:
return t
repl = {
"确认我就安排": "你点头我就开做",
"可以的话我马上安排": "可以我就马上给你做",
"我这边马上安排": "我马上安排",
"立刻统一报价": "马上给你报价",
"统一报价": "一起给你报价",
"": "",
"请您": "",
"可选A": "可选:",
"流程完成": "已经安排好了",
}
for k, v in repl.items():
t = t.replace(k, v)
return t
async def _render_collection_reply_with_ai(
self,
*,
message: CustomerMessage,
state: ConversationState,
scene: str,
intent_hint: str,
fallback: str,
) -> str:
"""
收图阶段回复默认走 AI 改写,失败时回退到固定模板。
"""
# 首张收图先承接“我看一下”,避免机械地立刻催“发完统一报价”。
if scene == "collect_ack" and len(state.pending_image_urls) == 1:
first_ack = [
"收到了,我先看一下哈,稍等哈",
"这张我收到了,我先看下,稍等我一下哈",
"收到这张了,我先过一眼,稍等哈",
"我先看这张哈,稍等我一下",
"图我收到了,我先看一眼,稍等我回你哈",
"这张先记上了,我先看下细节,稍等哈",
"收到哈,我先过一遍这张,稍等我会儿",
"我先看这张效果,稍等一下哈",
"图到了,我先看下清晰度,稍等哈",
"这张我先看着,稍等我一下就回你",
"收到这张了,我先核一下细节,稍等哈",
"我先把这张看完,稍等我一会儿哈",
]
return random.choice(first_ack)
if not self.dynamic_collection_replies:
return fallback
try:
deps = AgentDeps(
msg_id=message.msg_id,
acc_id=message.acc_id,
from_id=message.from_id,
platform=message.acc_type,
)
history = self.message_histories.get(message.from_id, [])
pending_req = "".join((state.pending_requirements or [])[-4:]) or ""
user_prompt = (
"请按下面意图生成给客户的自然回复。\n"
f"场景: {scene}\n"
f"回复意图: {intent_hint}\n"
f"客户原话: {message.msg}\n"
f"当前已收图片数: {len(state.pending_image_urls)}\n"
f"当前需求摘要: {pending_req}\n"
"输出要求: 不超过2句话像真人店主聊天。"
)
result = await self.agent_natural_reply.run(user_prompt, deps=deps, message_history=history)
self.message_histories[message.from_id] = result.all_messages()[-30:]
text = self._colloquialize_reply(self._normalize_reply_text(result.output))
if not text:
return fallback
transfer_keywords = ("TRANSFER_REQUESTED", "[转移会话]", "转移会话")
if any(k in text for k in transfer_keywords):
return fallback
return text
except Exception:
return fallback
async def _rewrite_reply_with_ai(
self,
*,
message: CustomerMessage,
state: ConversationState,
reply: str,
scene: str = "final_reply",
) -> str:
"""
对最终回复做 AI 润色,统一口吻。失败时返回原文。
"""
text = (reply or "").strip()
if not text or not self.rewrite_all_replies:
return text
transfer_keywords = ("TRANSFER_REQUESTED", "[转移会话]", "转移会话")
if any(k in text for k in transfer_keywords):
return text
try:
deps = AgentDeps(
msg_id=message.msg_id,
acc_id=message.acc_id,
from_id=message.from_id,
platform=message.acc_type,
)
history = self.message_histories.get(message.from_id, [])
pending_req = "".join((state.pending_requirements or [])[-4:]) or ""
prompt = (
"请把下面这句客服回复润色成更自然的微信聊天口吻,语义必须保持一致。\n"
f"场景: {scene}\n"
f"客户原话: {message.msg}\n"
f"当前已收图: {len(state.pending_image_urls)}\n"
f"当前需求摘要: {pending_req}\n"
f"原回复: {text}\n"
"要求: 不要新增承诺/价格/流程不超过2句话只输出润色后的最终回复。"
)
result = await self.agent_natural_reply.run(prompt, deps=deps, message_history=history)
self.message_histories[message.from_id] = result.all_messages()[-30:]
polished = self._colloquialize_reply(self._normalize_reply_text(result.output))
if not polished:
return text
if any(k in polished for k in transfer_keywords):
return text
return polished
except Exception:
return text
def _register_tools(self):
"""注册所有 Tool让 Agent 可以主动调用"""
@self.agent.tool
async def analyze_image(ctx: RunContext[AgentDeps], image_url: str) -> str:
"""
分析客户发来的图片复杂度,用于报价。
收到图片URL时调用此工具返回复杂度和建议报价。
"""
try:
from image.image_analyzer import image_analyzer
result = await image_analyzer.analyze(image_url)
complexity_label = {
"simple": "简单(画面干净)",
"normal": "一般复杂度",
"complex": "细节偏多",
"hard": "非常复杂",
}.get(result["complexity"], result["complexity"])
# 持久化图片URL和复杂度重启后仍能记住这张图
try:
from db.customer_db import db
db.update_last_image(
ctx.deps.from_id,
image_url,
complexity=result["complexity"],
gemini_prompt=result.get("gemini_prompt", ""),
aspect_ratio=result.get("aspect_ratio", "1:1"),
perspective=result.get("perspective", "no"),
)
except Exception:
pass
# 存图片类型到客户画像
try:
from db.customer_db import db as _db
if result.get("subject"):
_db.add_image_type(ctx.deps.from_id, result["subject"])
except Exception:
pass
# 在 workflow 里创建待处理任务(付款后自动触发 Gemini
try:
from core.workflow import workflow
await workflow.image_analysis_result(
customer_id=ctx.deps.from_id,
image_url=image_url,
complexity=result["complexity"],
acc_id=ctx.deps.acc_id,
acc_type=ctx.deps.platform,
gemini_prompt=result.get("gemini_prompt", ""),
aspect_ratio=result.get("aspect_ratio", "1:1"),
perspective=result.get("perspective", "no"),
proc_type=result.get("proc_type", ""),
subject=result.get("subject", ""),
quality=result.get("quality", ""),
)
print(f"[Agent] Workflow 任务已创建 | 客户: {ctx.deps.from_id} | 比例: {result.get('aspect_ratio')} | 透视: {result.get('perspective')} | 图片: {image_url[:60]}...")
except Exception as e:
print(f"[Agent] Workflow 任务创建失败: {e}")
# 组装给 AI 的分析报告
risk = result.get("risk", "none")
has_face = result.get("has_face", "no")
feasibility = result.get("feasibility", "yes")
note = result.get("note", "")
lines = [
f"图片主体:{result['subject'] or '未识别'}",
f"处理类型:{result['proc_type'] or '高清修复'}",
f"原图质量:{result['quality'] or '未知'}",
f"图片类型:{result.get('category', '') or '通用'}",
f"图片尺寸:{(result.get('width') or 0)}x{(result.get('height') or 0)}{result.get('megapixels', 0.0)}MP",
f"含人脸:{'' if has_face == 'yes' else ''}",
f"复杂度:{complexity_label}",
f"原因:{result['reason']}",
]
if result.get("size_surcharge"):
lines.append(f"尺寸加价:+{result['size_surcharge']}")
if result.get("size_note"):
lines.append(f"尺寸提示:{result['size_note']}")
try:
st = self._get_conversation_state(ctx.deps.from_id)
if isinstance(result.get("price_min"), (int, float)):
st.last_min_price = int(result.get("price_min") or 0)
try:
from db.customer_db import db as _db
_db.update_last_min_price(ctx.deps.from_id, st.last_min_price)
except Exception:
pass
except Exception:
pass
# 根据可做性和风险等级给 AI 不同的行动指引
if feasibility == "no":
if "敏感" in (note or ""):
lines.append("【拒绝】图片含敏感/黄色/擦边内容,不接单。")
lines.append("→ 直接拒绝,不说「发图来看看」,自然回复如:这类不做/不接。")
else:
lines.append("【无法处理】此图无法处理(纯黑/纯白/完全损坏/要找原始RAW文件")
lines.append("→ 告知客户无法处理,建议换图或说明原因,不要报价。")
elif risk == "high":
lines.append(f"【高风险】此图处理风险高:{note or 'AI修复后效果不能保证与原图一致'}")
lines.append(f"建议报价:{result['price_suggest']}")
lines.append("→ 先自然说明风险(人脸/效果可能不完美),再报价,满意再拍。话术自然。")
elif risk == "low":
lines.append(f"【低风险-含人脸】修复后人脸相似度约70-90%,效果不稳定。")
lines.append(f"建议报价:{result['price_suggest']}")
lines.append(f"→ 报价时自然加一句风险提示(人脸可能有轻微变化、满意再付等)")
else:
# 无风险,正常报价
base_price = result.get('price_suggest', 20)
text_surcharge = result.get('text_surcharge', 0)
layer_surcharge = result.get('layer_surcharge', 0)
total_price = base_price + text_surcharge + layer_surcharge
# 构建报价说明
price_explanation = f"建议报价:{total_price}"
if text_surcharge > 0:
price_explanation += f"(含文字处理 +{text_surcharge}元)"
if layer_surcharge > 0:
price_explanation += f"(含分层 +{layer_surcharge}元)"
lines.append(price_explanation)
# 添加文字数量说明
text_amount = result.get('text_amount', 'none')
if text_amount != 'none':
lines.append(f"文字数量:{text_amount},需要精细处理")
if feasibility == "partial":
lines.append(f"⚠️ 此图有一定难度:{note or '效果可能不完美'},回复时可加「效果不满意退款」")
if note and note not in ("", ""):
lines.append(f"提示:{note}")
lines.append(f"【立刻回复客户报价 {total_price} 元,话术自然多变】")
return "\n".join(lines)
except Exception as e:
return f"图片分析失败: {e},请根据经验判断报价"
@self.agent.tool
async def get_customer_info(ctx: RunContext[AgentDeps], customer_id: str) -> str:
"""
查询客户历史信息:消费记录、性格标签、报价历史等。
对话开始时或需要了解客户背景时调用。
"""
try:
from db.customer_db import db
return db.get_profile_text(customer_id)
except Exception as e:
return f"查询失败: {e}"
@self.agent.tool
async def transfer_to_human(ctx: RunContext[AgentDeps]) -> str:
"""
转接人工客服。
遇到退款/投诉/情绪激动/复杂售后时调用。
"""
return "TRANSFER_REQUESTED"
@self.agent.tool
async def get_customer_risk_profile(ctx: RunContext[AgentDeps], customer_id: str = "") -> str:
"""查询客户风控画像:退款/不付款/差评/人工黑名单等。"""
cid = customer_id or ctx.deps.from_id
try:
info = risk_db.evaluate_customer(cid)
return (
f"客户:{cid}\n"
f"不接单:{'' if info.get('do_not_serve') else ''}\n"
f"风险等级:{info.get('computed_level','low')} 分数:{info.get('computed_score',0)}\n"
f"近30天退款:{info.get('refund_30d',0)}\n"
f"近7天未付款下单:{info.get('unpaid_7d',0)}\n"
f"近90天差评:{info.get('bad_review_90d',0)}\n"
f"备注:{info.get('note','') or ''}"
)
except Exception as e:
return f"查询风控画像失败: {e}"
@self.agent.tool
async def mark_customer_risk(
ctx: RunContext[AgentDeps],
customer_id: str,
do_not_serve: bool = False,
risk_level: str = "low",
risk_score: int = 0,
note: str = "",
tag: str = "",
) -> str:
"""人工标记客户风控画像(不接单/高风险/备注标签)。"""
try:
tags = [tag] if tag else []
risk_db.set_profile(
customer_id=customer_id,
do_not_serve=do_not_serve,
risk_level=risk_level,
risk_score=risk_score,
note=note,
tags=tags,
)
return "风控画像已更新"
except Exception as e:
return f"更新风控画像失败: {e}"
@self.agent.tool
async def record_customer_risk_event(
ctx: RunContext[AgentDeps],
customer_id: str,
event_type: str,
event_count: int = 1,
note: str = "",
) -> str:
"""记录风控事件refund/unpaid_order/bad_review/blacklist_hit 等。"""
try:
risk_db.record_event(
customer_id=customer_id,
event_type=event_type,
event_count=event_count,
note=note,
)
return "风控事件已记录"
except Exception as e:
return f"记录风控事件失败: {e}"
@self.agent.tool
async def save_customer_note(
ctx: RunContext[AgentDeps],
customer_id: str,
note: str
) -> str:
"""
记录客户关键信息到画像(邮箱/微信/特殊需求等)。
客户提供联系方式或重要信息时调用。
"""
try:
from db.customer_db import db
db.add_note(customer_id, note)
return "已记录"
except Exception as e:
return f"记录失败: {e}"
@self.agent.tool
async def update_contact_info(
ctx: RunContext[AgentDeps],
customer_id: str,
contact_type: str,
value: str
) -> str:
"""
更新客户联系方式。
当客户说出邮箱/手机/微信时调用,比正则提取更准确。
contact_type 枚举值:
email - 邮箱
phone - 手机号
wechat - 微信号
"""
try:
from db.customer_db import db
if contact_type == "email":
db.update_email(customer_id, value)
elif contact_type == "phone":
db.update_phone(customer_id, value)
elif contact_type == "wechat":
db.update_wechat(customer_id, value)
else:
return f"未知联系方式类型: {contact_type}"
return f"已保存 {contact_type}: {value}"
except Exception as e:
return f"保存失败: {e}"
@self.agent.tool
async def record_quote(
ctx: RunContext[AgentDeps],
customer_id: str,
price: int,
description: str = ""
) -> str:
"""
记录本次报价到客户画像,用于后续对话保持价格一致。
每次给客户报价后调用。
Args:
customer_id: 客户ID
price: 报价金额(元)
description: 报价描述,如"单图处理"/"三图打包"
"""
try:
from db.customer_db import db
db.update_last_price(customer_id, price)
if description:
db.add_note(customer_id, f"报价 {price}元({description}")
# 同步到内存状态
state = self.conversations.get(customer_id)
if state:
state.last_price = price
return f"已记录报价 {price}"
except Exception as e:
return f"记录失败: {e}"
@self.agent.tool
async def process_image_gemini(ctx: RunContext[AgentDeps], customer_id: str = "") -> str:
"""
触发 Gemini 作图处理。客户付款后或说「安排一下」「处理一下」时调用。
会从客户档案读取上次发图的 URL 和处理参数(提示词、比例、透视),启动 Gemini 流程。
处理完成后会自动发图给客户。
"""
try:
from config.config import IMAGE_MODULE_ENABLED
if not IMAGE_MODULE_ENABLED:
return "现在处理模块暂时暂停,先不自动作图"
except Exception:
return "现在处理模块暂时暂停,先不自动作图"
cid = customer_id or ctx.deps.from_id
try:
from core.workflow import workflow
ok = await workflow.trigger_processing_on_payment(
customer_id=cid,
acc_id=ctx.deps.acc_id,
acc_type=ctx.deps.platform,
)
if ok:
return "已安排,稍后发你"
return "该客户暂无待处理图片,请先发图"
except Exception as e:
return f"触发作图失败: {e},请稍后重试或转人工"
@self.agent_pricing.tool
async def analyze_image_pricing(ctx: RunContext[AgentDeps], image_url: str) -> str:
try:
from image.image_analyzer import image_analyzer
result = await image_analyzer.analyze(image_url)
if result.get("feasibility") == "no" or result.get("risk") == "high":
return "该图风险高或不可做:不报价,建议换图或转人工评估。"
if not result.get("success", False):
return "图片识别异常:先不报价,建议客户重发更清晰图片。"
p = result.get("price_suggest", 20)
try:
st = self._get_conversation_state(ctx.deps.from_id)
if isinstance(result.get("price_min"), (int, float)):
st.last_min_price = int(result.get("price_min") or 0)
try:
from db.customer_db import db as _db
_db.update_last_min_price(ctx.deps.from_id, st.last_min_price)
except Exception:
pass
except Exception:
pass
return f"建议报价:{p}"
except Exception as e:
return f"图片分析失败: {e}"
@self.agent_pricing.tool
async def record_quote_pricing(
ctx: RunContext[AgentDeps],
customer_id: str,
price: int,
description: str = ""
) -> str:
try:
from db.customer_db import db
db.update_last_price(customer_id, price)
return "ok"
except Exception as e:
return f"记录失败: {e}"
@self.agent_processing.tool
async def process_image_gemini_run(ctx: RunContext[AgentDeps], customer_id: str = "") -> str:
"""触发 Gemini 作图处理processing agent 专用入口)。"""
return await process_image_gemini(ctx, customer_id)
@self.agent_similar.tool
async def recommend_similar(ctx: RunContext[AgentDeps], hint: str = "") -> str:
try:
return "有类似款,拍下我发你参考图。"
except Exception as e:
return f"推荐失败: {e}"
@self.agent_order.tool
async def handle_order(ctx: RunContext[AgentDeps], raw_msg: str = "") -> str:
try:
info = parse_order_info(raw_msg or "")
paid_kw = ["等待发货", "已付款", "付款成功", "买家已付款"]
if any(k in (info.get("pay_status", "") or "") for k in paid_kw) or any(k in (info.get("order_status", "") or "") for k in paid_kw):
return "已安排,稍后发你"
return ""
except Exception:
return ""
@self.agent_risk.tool
async def risk_filter(ctx: RunContext[AgentDeps], text: str = "") -> str:
return "这类不做哈,政治/敏感内容都不接。"
@self.agent_risk.tool
async def get_customer_risk_profile_risk(ctx: RunContext[AgentDeps], customer_id: str = "") -> str:
return await get_customer_risk_profile(ctx, customer_id)
@self.agent_risk.tool
async def mark_customer_risk_risk(
ctx: RunContext[AgentDeps],
customer_id: str,
do_not_serve: bool = False,
risk_level: str = "low",
risk_score: int = 0,
note: str = "",
tag: str = "",
) -> str:
return await mark_customer_risk(
ctx=ctx,
customer_id=customer_id,
do_not_serve=do_not_serve,
risk_level=risk_level,
risk_score=risk_score,
note=note,
tag=tag,
)
@self.agent_risk.tool
async def record_customer_risk_event_risk(
ctx: RunContext[AgentDeps],
customer_id: str,
event_type: str,
event_count: int = 1,
note: str = "",
) -> str:
return await record_customer_risk_event(
ctx=ctx,
customer_id=customer_id,
event_type=event_type,
event_count=event_count,
note=note,
)
@self.agent.tool
async def remove_background(ctx: RunContext[AgentDeps], image_url: str) -> str:
try:
from config.config import IMAGE_MODULE_ENABLED
if not IMAGE_MODULE_ENABLED:
return "现在处理模块暂时暂停,先不处理图片"
except Exception:
return "现在处理模块暂时暂停,先不处理图片"
"""【独立工具】去背景,输出白底图。客户只要去背景时调用。"""
try:
from image.image_tools import remove_background as _rb
r = await _rb(image_url)
if r["success"]:
return f"去背景完成,已保存。自然回复客户好了发你"
return f"去背景失败:{r['message']}"
except Exception as e:
return f"去背景失败:{e}"
@self.agent.tool
async def perspective_correct(ctx: RunContext[AgentDeps], image_url: str) -> str:
try:
from config.config import IMAGE_MODULE_ENABLED
if not IMAGE_MODULE_ENABLED:
return "现在处理模块暂时暂停,先不处理图片"
except Exception:
return "现在处理模块暂时暂停,先不处理图片"
"""【独立工具】透视矫正。输入需白底图,输出展平图。"""
try:
from image.image_tools import perspective_correct as _pc
r = await _pc(image_url)
if r["success"]:
return f"透视矫正完成。自然回复客户好了"
return f"透视矫正失败:{r['message']}"
except Exception as e:
return f"透视矫正失败:{e}"
@self.agent.tool
async def extract_pattern_tool(
ctx: RunContext[AgentDeps],
image_url: str,
prompt: str = "",
aspect_ratio: str = "1:1"
) -> str:
try:
from config.config import IMAGE_MODULE_ENABLED
if not IMAGE_MODULE_ENABLED:
return "现在处理模块暂时暂停,先不处理图片"
except Exception:
return "现在处理模块暂时暂停,先不处理图片"
"""【独立工具】印花提取/主处理。按提示词和比例处理。"""
try:
from image.image_tools import extract_pattern
r = await extract_pattern(image_url, prompt=prompt, aspect_ratio=aspect_ratio)
if r["success"]:
return f"提取完成。自然回复客户好了发你"
return f"提取失败:{r['message']}"
except Exception as e:
return f"提取失败:{e}"
@self.agent.tool
async def enhance_image_tool(ctx: RunContext[AgentDeps], image_url: str) -> str:
try:
from config.config import IMAGE_MODULE_ENABLED
if not IMAGE_MODULE_ENABLED:
return "现在处理模块暂时暂停,先不处理图片"
except Exception:
return "现在处理模块暂时暂停,先不处理图片"
"""【独立工具】高清增强。客户只要清晰化时调用。"""
try:
from image.image_tools import enhance_image
r = await enhance_image(image_url)
if r["success"]:
return f"高清增强完成。自然回复客户好了"
return f"增强失败:{r['message']}"
except Exception as e:
return f"增强失败:{e}"
@self.agent.tool
async def color_match_tool(
ctx: RunContext[AgentDeps],
orig_url: str,
result_url: str,
strength: float = 0.75
) -> str:
try:
from config.config import IMAGE_MODULE_ENABLED
if not IMAGE_MODULE_ENABLED:
return "现在处理模块暂时暂停,先不处理图片"
except Exception:
return "现在处理模块暂时暂停,先不处理图片"
"""【独立工具】颜色匹配。将 result 色调匹配到 orig。"""
try:
from image.image_tools import color_match_images
r = await color_match_images(orig_url, result_url, strength=strength)
if r["success"]:
return f"颜色匹配完成"
return f"颜色匹配失败:{r['message']}"
except Exception as e:
return f"颜色匹配失败:{e}"
@self.agent.tool
async def trim_border_tool(ctx: RunContext[AgentDeps], image_url: str) -> str:
try:
from config.config import IMAGE_MODULE_ENABLED
if not IMAGE_MODULE_ENABLED:
return "现在处理模块暂时暂停,先不处理图片"
except Exception:
return "现在处理模块暂时暂停,先不处理图片"
"""【独立工具】裁切四周背景边(白/黄/米等)。"""
try:
from image.image_tools import trim_border
r = await trim_border(image_url)
if r["success"]:
return f"裁边完成"
return f"裁边失败:{r['message']}"
except Exception as e:
return f"裁边失败:{e}"
@self.agent.tool
async def vectorize_to_eps_tool(ctx: RunContext[AgentDeps], image_url: str) -> str:
try:
from config.config import IMAGE_MODULE_ENABLED
if not IMAGE_MODULE_ENABLED:
return "现在处理模块暂时暂停,先不处理图片"
except Exception:
return "现在处理模块暂时暂停,先不处理图片"
"""【独立工具】矢量化 - 将图片转为 EPS 矢量文件。客户要做矢量图、转 EPS、转 AI 格式时调用。"""
try:
from image.image_tools import vectorize_to_eps
r = await vectorize_to_eps(image_url)
if r["success"]:
return f"矢量化完成,已生成 EPS 文件。自然回复客户好了发你"
return f"矢量化失败:{r['message']}"
except Exception as e:
return f"矢量化失败:{e}"
@self.agent.tool
async def meitu_enhance_tool(
ctx: RunContext[AgentDeps],
image_url: str,
mode: str = "standard"
) -> str:
try:
from config.config import IMAGE_MODULE_ENABLED
if not IMAGE_MODULE_ENABLED:
return "现在处理模块暂时暂停,先不处理图片"
except Exception:
return "现在处理模块暂时暂停,先不处理图片"
"""
【独立工具】美图画质增强。客户要画质增强、清晰化、美图处理时调用。
Args:
image_url: 图片 URL 或本地路径
mode: 处理模式。crystal(极速重绘) standard(标准) enhance(增强) hdr(HDR) portrait(人像优化)
"""
try:
from image.image_tools import meitu_enhance
r = await meitu_enhance(image_url, mode=mode)
if r["success"]:
return f"画质增强完成。自然回复客户好了发你"
return f"画质增强失败:{r['message']}"
except Exception as e:
return f"画质增强失败:{e}"
@self.agent.tool
async def resize_image(
ctx: RunContext[AgentDeps],
image_url: str,
width: int,
height: int = 0
) -> str:
try:
from config.config import IMAGE_MODULE_ENABLED
if not IMAGE_MODULE_ENABLED:
return "现在处理模块暂时暂停,先不处理图片"
except Exception:
return "现在处理模块暂时暂停,先不处理图片"
"""
改图片尺寸。客户说「改成1920x1080」「弄成横图」「改下尺寸」时调用。
Args:
image_url: 图片URL客户刚发的图或从对话中获取
width: 目标宽度(像素),如 1920
height: 目标高度0=按宽度等比缩放),如 1080
常用尺寸1920x1080(横屏) 1080x1920(竖屏) 2000x2000(方图)
"""
try:
from image.image_processor import image_processor
result = await image_processor.resize(image_url, width, height)
if result["success"]:
return f"改尺寸完成:{width}x{height},已保存。自然回复客户改好了"
else:
return f"改尺寸失败:{result['message']},告知客户稍后重试"
except Exception as e:
return f"改尺寸失败:{e}"
@self.agent.tool
async def calculate_bulk_price(
ctx: RunContext[AgentDeps],
image_count: int,
complexities: str = ""
) -> str:
"""
计算多图打包价格。
客户要做多张图时调用,返回建议总价。
Args:
image_count: 图片数量
complexities: 各图复杂度,逗号分隔,如 "normal,complex,simple"
没有识别结果时留空,按平均价格估算
"""
if image_count <= 0:
return "图片数量无效"
# 各复杂度单价必须为5的整数倍
unit_price = {"simple": 15, "normal": 20, "complex": 25, "hard": 30}
default_unit = 20 # 没有识别结果时的默认单价
if complexities:
levels = [c.strip() for c in complexities.split(",")]
total = sum(unit_price.get(lv, default_unit) for lv in levels)
else:
total = image_count * default_unit
# 打包优惠3张以上9折5张以上8折价格必须为5的整数倍
if image_count >= 5:
discounted = round(total * 0.8 / 5) * 5
tip = f"{image_count}张8折优惠"
elif image_count >= 3:
discounted = round(total * 0.9 / 5) * 5
tip = f"{image_count}张9折优惠"
else:
discounted = round(total / 5) * 5
tip = ""
return f"建议打包报价:{discounted}{tip}(原价{total}元)"
# 对话状态超过多少小时后重置(避免昨天的售后状态影响今天)
CONVERSATION_TIMEOUT_HOURS = 12
def _get_conversation_state(self, customer_id: str) -> ConversationState:
"""获取或创建对话状态,超时自动重置"""
now = datetime.now()
if customer_id in self.conversations:
state = self.conversations[customer_id]
# 超过 12 小时没有消息,重置阶段和压价次数
if state.last_update:
try:
last = datetime.fromisoformat(state.last_update)
hours = (now - last).total_seconds() / 3600
if hours > self.CONVERSATION_TIMEOUT_HOURS:
state.stage = "售前"
state.discount_count = 0
# 同时清理对话历史,避免发送过期上下文
self.message_histories.pop(customer_id, None)
except Exception:
pass
# 进程内状态为空时,尝试从持久化恢复
if not state.pending_image_urls and not state.pending_requirements:
self._restore_pending_quote_state(customer_id, state)
else:
self.conversations[customer_id] = ConversationState(
customer_id=customer_id,
last_update=now.isoformat()
)
self._restore_pending_quote_state(customer_id, self.conversations[customer_id])
# 定期清理长期不活跃客户(超过 7 天)
self._cleanup_inactive(now)
return self.conversations[customer_id]
def _cleanup_inactive(self, now: datetime):
"""清理超过 7 天没有消息的对话状态,释放内存"""
# 每 100 次调用清理一次,避免每次都遍历
if len(self.conversations) % 100 != 0:
return
expired = [
cid for cid, state in self.conversations.items()
if state.last_update and
(now - datetime.fromisoformat(state.last_update)).days > 7
]
for cid in expired:
self.conversations.pop(cid, None)
self.message_histories.pop(cid, None)
def _sync_pending_quote_state(self, customer_id: str, state: ConversationState):
"""把待报价队列同步到客户库,避免重启丢失。"""
try:
self._refresh_quote_phase(state)
from db.customer_db import db
db.update_pending_quote_state(
customer_id,
state.pending_image_urls,
state.pending_requirements,
)
except Exception:
pass
def _restore_pending_quote_state(self, customer_id: str, state: ConversationState):
"""从客户库恢复待报价队列。"""
try:
from db.customer_db import db
profile = db.get_customer(customer_id)
state.pending_image_urls = list(getattr(profile, "pending_quote_images", []) or [])
state.pending_requirements = list(getattr(profile, "pending_quote_requirements", []) or [])
state.image_count = len(state.pending_image_urls)
self._refresh_quote_phase(state)
except Exception:
pass
@staticmethod
def _refresh_quote_phase(state: ConversationState, phase_hint: str = ""):
"""统一维护收图报价状态机。"""
QuoteStateMachine().refresh(state, phase_hint=phase_hint)
def _should_defer_batch_quote(self, state: ConversationState, mark_ready: bool = False) -> bool:
"""
批量报价延后控制:
- 首次进入 ready_to_quote 时按配置等待 N 轮
- 等待轮次归零后,本轮即可报价
"""
self.quote_state_machine.delay_turns = max(0, int(self.batch_quote_delay_turns))
return self.quote_state_machine.should_defer_batch_quote(state, mark_ready=mark_ready)
def _mark_quote_ready(self, state: ConversationState):
"""仅标记 ready 状态,不消费等待轮次。"""
self.quote_state_machine.delay_turns = max(0, int(self.batch_quote_delay_turns))
self.quote_state_machine.mark_ready(state)
def _build_reject_message(self, reason: str = "") -> str:
templates = [
"这类图文字内容太密了,我们这边不接这单哈,建议精简后再发我看看。",
"这种密集文字/宣传栏类图片暂时做不了,抱歉啦,换一版简化内容我可以继续帮你看。",
"这张文字信息太多,处理风险高,我们先不接,您可以先筛重点文字再发我。",
]
msg = random.choice(templates)
if reason:
msg += f"{reason}"
return msg
def _is_batch_quote_enabled(self, customer_id: str, acc_id: str) -> bool:
"""灰度开关:按店铺白名单 + 客户哈希百分比控制新策略是否生效。"""
try:
from config.config import (
FEATURE_BATCH_QUOTE_ENABLED,
FEATURE_BATCH_QUOTE_PERCENT,
FEATURE_BATCH_QUOTE_SHOPS,
)
if not FEATURE_BATCH_QUOTE_ENABLED:
return False
pct = max(0, min(100, int(FEATURE_BATCH_QUOTE_PERCENT)))
if pct == 0:
return False
shops = [s.strip() for s in (FEATURE_BATCH_QUOTE_SHOPS or "").split(",") if s.strip()]
if shops and (acc_id or "") not in shops:
return False
if pct >= 100:
return True
h = int(hashlib.md5((customer_id or "").encode("utf-8")).hexdigest()[:8], 16) % 100
return h < pct
except Exception:
return True
def _detect_stage(self, message: str) -> str:
"""检测售前/售后"""
# 系统订单通知不属于售后,单独处理
if "系统订单信息" in message:
return "订单通知"
after_sale_keywords = ["已下单", "已付款", "催一下", "发文件", "要修改", "不满意", "退款", "退货"]
for keyword in after_sale_keywords:
if keyword in message:
return "售后"
return "售前"
@staticmethod
def _is_political_inquiry(text: str) -> bool:
"""文本前置风控:政治人物/政治事件/政治图片相关询问一律拒绝。"""
s = (text or "").strip().lower()
if not s:
return False
kw = (
"政治", "涉政", "党政", "政治人物", "政治事件", "政治图片", "政治海报", "政治宣传",
"领导人", "伟人", "元帅", "将军", "红色人物", "党史",
"天安门", "人民大会堂", "中南海",
"习近平", "毛泽东", "邓小平", "江泽民", "胡锦涛", "李克强", "周恩来",
"特朗普", "拜登", "普京", "泽连斯基",
"trump", "biden", "putin", "zelensky", "xi jinping",
)
if any(k in s for k in kw):
return True
# 兜底:类似“有没有十大元帅的照片/图片”
return bool(re.search(r"(元帅|将军|领导人|政治人物|政治事件).*(照片|图片|头像|原图)?", s))
@staticmethod
def _is_map_inquiry(text: str) -> bool:
"""地图类需求一律拒绝(按业务规则)。"""
s = (text or "").strip().lower()
if not s:
return False
kw = (
"地图", "地形图", "行政区划图", "世界地图", "中国地图",
"卫星地图", "导航图", "航海图", "作战地图", "军事地图",
"map", "topographic map", "satellite map",
)
return any(k in s for k in kw)
def _get_customer_profile_context(self, customer_id: str) -> str:
"""从数据库读取客户画像,注入给 AI。含个性化语气、报价策略、主动预测、近期对话。"""
try:
from db.customer_db import db
profile = db.get_customer(customer_id)
if profile.blacklist:
return f"【⚠️黑名单客户】原因:{profile.blacklist_reason or '已标记'},请转接人工处理,不要自动回复"
lines = []
lines.append("=== 客户档案 ===")
# 基础信息
basic_info = []
basic_info.append(f"客户ID: {customer_id}")
basic_info.append(f"姓名: {profile.name or '未知'}")
if profile.email:
basic_info.append(f"邮箱: {profile.email}")
if profile.phone:
basic_info.append(f"电话: {profile.phone}")
if profile.wechat:
basic_info.append(f"微信: {profile.wechat}")
lines.append(" | ".join(basic_info))
# 消费分析
consume_info = []
consume_info.append(f"客户等级: {profile.customer_level}")
if profile.vip:
consume_info.append("VIP客户")
consume_info.append(f"总订单: {profile.total_orders}")
consume_info.append(f"总消费: {profile.total_spent}")
if profile.total_orders > 0:
consume_info.append(f"客单价: {profile.total_spent // profile.total_orders}")
lines.append("--- 消费分析 ---")
lines.append(" | ".join(consume_info))
# 报价历史
price_info = []
if profile.vip_custom_price:
price_info.append(f"VIP专属价: {profile.vip_custom_price}元(直接报这个价)")
if profile.last_price:
price_info.append(f"上次报价: {profile.last_price}")
if profile.lowest_price_accepted:
price_info.append(f"历史最低成交: {profile.lowest_price_accepted}")
if profile.discount_given_count:
price_info.append(f"历史让价: {profile.discount_given_count}")
if profile.price_sensitivity:
price_info.append(f"价格敏感度: {profile.price_sensitivity}")
if getattr(profile, "last_quote_no_convert", False):
price_info.append("【策略】上次报价未成交本次可降5-10元")
if price_info:
lines.append("--- 报价历史 ---")
lines.append(" | ".join(price_info))
# 性格与决策
personality_info = []
if profile.personality:
personality_info.append(f"性格: {'/'.join(profile.personality)}")
if profile.decision_speed:
personality_info.append(f"决策速度: {profile.decision_speed}")
if profile.communication_prefer:
personality_info.append(f"沟通偏好: {profile.communication_prefer}")
if personality_info:
lines.append("--- 性格特征 ---")
lines.append(" | ".join(personality_info))
# 图片习惯
image_info = []
image_info.append(f"累计发图: {profile.total_images_sent}")
if profile.complexity_history:
avg_complexity = self._calc_avg_complexity(profile.complexity_history)
image_info.append(f"平均复杂度: {avg_complexity}")
if profile.image_type_history:
from collections import Counter
top_types = Counter(profile.image_type_history).most_common(3)
types_str = "".join(f"{t}({c}次)" for t, c in top_types)
image_info.append(f"常见类型: {types_str}")
if profile.preferred_format:
image_info.append(f"格式偏好: {profile.preferred_format}")
if profile.preferred_size:
image_info.append(f"尺寸要求: {profile.preferred_size}")
if profile.last_image_url:
image_info.append(f"最近发图: {profile.last_image_url[:60]}...")
lines.append("--- 图片习惯 ---")
lines.append(" | ".join(image_info))
# 当前任务状态
if profile.processing_status:
task_info = []
task_info.append(f"状态: {profile.processing_status}")
if profile.processing_image_url:
task_info.append(f"处理中: {profile.processing_image_url[:40]}...")
if profile.expected_done_at:
task_info.append(f"预计完成: {profile.expected_done_at}")
lines.append("--- 当前任务 ---")
lines.append(" | ".join(task_info))
# 上次对话摘要
if profile.last_conversation_summary:
time_str = ""
if profile.last_conversation_time:
try:
t = datetime.fromisoformat(profile.last_conversation_time)
diff = datetime.now() - t
if diff.days > 0:
time_str = f"{diff.days}天前)"
else:
h = diff.seconds // 3600
time_str = f"{h}小时前)" if h > 0 else "(刚刚)"
except Exception:
pass
lines.append(f"--- 上次对话 {time_str} ---")
lines.append(profile.last_conversation_summary)
# 个性化回复策略
hints = []
if profile.personality:
if "爽快" in profile.personality:
hints.append("回复简洁直接,不废话,快速报价")
if "砍价" in profile.personality or "砍价狂" in profile.personality:
hints.append("报价时强调性价比,只让价一次,第二次引导去 xinhui.cloud")
if "纠结" in profile.personality or "墨迹" in profile.personality:
hints.append("多给一点说明,耐心回答")
if profile.price_sensitivity == "":
hints.append("报价时顺带提「满意再拍」降低顾虑")
if profile.decision_speed == "":
hints.append("直接报价推成交,少铺垫")
if profile.total_orders > 0 and profile.decision_speed == "":
hints.append("老客爽快,直接报价成交")
if hints:
lines.append("--- 回复策略 ---")
lines.append("".join(hints))
# 主动推荐
proactive = []
if profile.bulk_potential == "" or (profile.total_images_sent or 0) >= 2:
proactive.append("可问「要做多张吗,多张有优惠」")
if profile.upsell_opportunity:
proactive.append(f"加购机会: {''.join(profile.upsell_opportunity)}")
if proactive:
lines.append("--- 主动推荐 ---")
lines.append("".join(proactive))
return "\n".join(lines)
except Exception as e:
print(f"[Agent] 获取客户画像失败: {e}")
return ""
def _calc_avg_complexity(self, complexity_history: list) -> str:
"""计算平均复杂度"""
if not complexity_history:
return "未知"
level_map = {"simple": 1, "normal": 2, "complex": 3, "hard": 4}
label_map = {1: "简单", 2: "一般", 3: "复杂", 4: "很复杂"}
try:
avg = sum(level_map.get(c, 2) for c in complexity_history) / len(complexity_history)
return label_map.get(round(avg), "一般")
except Exception:
return "一般"
def _get_refusal_context_hint(self, customer_id: str, current_msg: str, profile_context: str) -> str:
"""
检测「刚拒绝某张图 + 客户问能找到吗」场景,注入显式提示,避免前后矛盾。
原因last_conversation_summary 异步更新可能滞后message_histories 模型可能忽略。
"""
ask_keywords = ["能找到吗", "可以吗", "有吗", "能做吗", "可以找吗", "可以弄吗"]
if not any(kw in current_msg for kw in ask_keywords):
return ""
refusal_keywords = ["不做", "不接", "拒绝", "不做这类", "这类不做"]
# 检查 profile 摘要(可能因异步更新而滞后)
if any(kw in profile_context for kw in refusal_keywords):
return "【重要】上一句客服刚拒绝了某张图,客户问能找到吗时须明确:能做的是哪张(如第一张),不能做的是哪张。不可只说「放心拍」「可以」,会前后矛盾。"
# 检查内存历史中最近几条消息ModelResponse 含客服回复)
history = self.message_histories.get(customer_id, [])
for msg in reversed(history[-6:]):
msg_str = str(msg)
if any(kw in msg_str for kw in refusal_keywords):
return "【重要】上一句客服刚拒绝了某张图,客户问能找到吗时须明确:能做的是哪张(如第一张),不能做的是哪张。不可只说「放心拍」「可以」,会前后矛盾。"
return ""
def _get_conversation_context(self, customer_id: str, acc_id: str = "", limit: int = 12, max_len: int = 80) -> str:
"""
每一次对话都从数据库加载近期对话,压缩后注入 prompt。
确保模型看到上下文,同时控制 token 消耗。
"""
try:
try:
from config.config import CHAT_CONTEXT_LIMIT, CHAT_CONTEXT_TRUNCATE_LEN
limit = CHAT_CONTEXT_LIMIT
max_len = CHAT_CONTEXT_TRUNCATE_LEN
except Exception:
pass
from db.chat_log_db import get_recent_conversation
msgs = get_recent_conversation(customer_id, acc_id=acc_id, limit=limit)
if not msgs:
return ""
lines = []
for m in msgs:
role = "" if m.get("direction") == "in" else ""
msg_text = (m.get("message") or "").strip().replace("\n", " ")[:max_len]
if not msg_text:
continue
lines.append(f"{role}:{msg_text}")
if not lines:
return ""
return "【近期】\n" + "\n".join(lines) + "\n\n"
except Exception:
return ""
def _get_intent_emotion_hint(self, msg: str) -> str:
"""语义匹配:意图/情绪识别注入提示。EMBEDDING_MODEL 未配置时用关键词。"""
try:
from utils.intent_analyzer import detect_intent_embedding, detect_intent_keywords, detect_emotion_embedding
intent = detect_intent_embedding(msg)
if not intent:
intent = detect_intent_keywords(msg)
emotion = detect_emotion_embedding(msg) if os.getenv("EMBEDDING_MODEL") else None
parts = []
if intent:
parts.append(f"意图:{intent}")
if emotion:
parts.append(f"情绪:{emotion}")
if parts:
return f"【当前消息】{', '.join(parts)}"
except Exception:
pass
return ""
# 简单打招呼类消息(在近期已回复后无需再回)
_COOLDOWN_PATTERNS = [
"你好", "您好", "在吗", "在么", "在不在", "有人吗",
"", "嗯嗯", "", "好的", "好哒", "ok", "OK", "okay",
"谢谢", "谢谢你", "感谢", "收到", "知道了", "明白了",
]
_COOLDOWN_SECONDS = 5 * 60 # 5 分钟内不重复回复纯打招呼
def _in_cooldown(self, state: ConversationState, msg: str) -> bool:
"""最近刚回复过 + 消息是纯打招呼 → True 静默"""
if not state.last_reply_at:
return False
elapsed = (datetime.now() - state.last_reply_at).total_seconds()
if elapsed > self._COOLDOWN_SECONDS:
return False
clean = msg.strip().rstrip("!?。.~")
return clean in self._COOLDOWN_PATTERNS
async def process_message(self, message: CustomerMessage) -> AgentResponse:
"""处理客户消息并生成回复"""
trace_id = build_trace_id(message.acc_id, message.from_id, message.msg_id, message.msg[:64])
self._activity_log(
"agent_inbound",
trace_id=trace_id,
acc_id=message.acc_id,
customer_id=message.from_id,
msg=message.msg,
msg_type=message.msg_type,
)
metrics_emit("inbound_msg", customer_id=message.from_id, acc_id=message.acc_id)
# 获取或创建对话状态
state = self._get_conversation_state(message.from_id)
pre_response = await self.pre_rule_service.run(message=message, state=state, trace_id=trace_id)
if isinstance(pre_response, AgentResponse):
return pre_response
# 检测售前/售后
new_stage = self._detect_stage(message.msg)
if new_stage != state.stage:
state.stage = new_stage
state.last_update = datetime.now().isoformat()
order_response = await handle_order_notification(self, message=message, state=state)
if isinstance(order_response, AgentResponse):
return order_response
customer_text, _ = self._split_customer_text(message.msg)
shop_type = _get_shop_type(message.acc_id or "", message.goods_name or "")
flow_response = await handle_find_image_batch_flow(
self,
message=message,
state=state,
customer_text=customer_text,
shop_type=shop_type,
)
if isinstance(flow_response, AgentResponse):
return flow_response
prompt_bundle = build_prompt_bundle(self, message=message, state=state)
user_prompt = prompt_bundle.user_prompt
deps = prompt_bundle.deps
history = prompt_bundle.history
self._log_block("PROMPT->AI 前置提示词", user_prompt)
try:
reply_text = await execute_ai_turn(
self,
message=message,
state=state,
user_prompt=user_prompt,
deps=deps,
history=history,
)
except Exception as e:
err_str = str(e)
print(f"[Agent] AI 调用失败: {e},使用兜底回复")
self._activity_log("agent_ai_error", customer_id=message.from_id, acc_id=message.acc_id, error=err_str)
metrics_emit("ai_call_failed", customer_id=message.from_id, acc_id=message.acc_id)
if "AccountOverdueError" in err_str or "overdue" in err_str.lower():
asyncio.create_task(_notify_wechat_overdue())
else:
asyncio.create_task(_notify_wechat(
f"⚠️ **AI调用异常**\n"
f"客户:{message.from_id}\n"
f"店铺:{message.acc_id}\n"
f"错误:{err_str[:200]}",
tag="AI异常"
))
reply_text = None
else:
metrics_emit("ai_call_success", customer_id=message.from_id, acc_id=message.acc_id)
# AI 失败兜底:给一个不出错的万能回复
if not reply_text:
fallback_text = await self._rewrite_reply_with_ai(
message=message,
state=state,
reply="好嘞,你稍等下,我这边看一下",
scene="fallback_reply",
)
return AgentResponse(
reply=fallback_text,
should_reply=True,
need_transfer=False
)
return await finalize_ai_reply(
self,
message=message,
state=state,
reply_text=reply_text,
)
async def _check_order_amount(self, customer_id: str, order: dict, acc_id: str):
"""核查订单实付金额是否与报价一致,异常时企业微信预警"""
try:
import re
from db.customer_db import db
profile = db.get_customer(customer_id)
quoted = profile.last_price # 上次报价(元)
if not quoted:
return
# 从订单解析实付金额
raw_amount = order.get("amount", "")
m = re.search(r'[\d.]+', str(raw_amount))
if not m:
return
paid = float(m.group())
print(f"[Agent] 订单金额核查:报价 {quoted}元 vs 实付 {paid}元(客户 {customer_id}")
# 实付金额明显低于报价(低于报价的 60%)才预警
if paid < quoted * 0.6:
msg = (
f"⚠️ **订单金额异常**\n"
f"店铺:{acc_id}\n"
f"客户:{customer_id}{profile.name or ''}\n"
f"报价:{quoted}\n"
f"实付:{paid}\n"
f"差额:{quoted - paid:.1f}元 — 请人工核查"
)
print(f"[Agent] {msg}")
await _notify_wechat(msg)
except Exception as e:
print(f"[Agent] 订单金额核查失败: {e}")
def _extract_image_url(self, msg: str) -> str:
"""从消息中提取图片URL兼容纯URL和 text#*#url 两种格式"""
urls = self._extract_image_urls(msg)
return urls[0] if urls else ""
def _extract_image_urls(self, msg: str) -> List[str]:
"""提取消息中的所有图片URL去重保序"""
import re
if not msg:
return []
image_exts = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp")
image_hosts = ("alicdn.com", "imgextra", "taobao.com", "jd.com", "pinduoduo.com", "suning.com")
candidates = re.findall(r'https?://[^\s#]+', msg)
urls: List[str] = []
for u in candidates:
low = u.lower()
if any(ext in low for ext in image_exts) or any(h in low for h in image_hosts):
if u not in urls:
urls.append(u)
return urls
def _strip_urls_from_text(self, msg: str) -> str:
"""去掉 URL 后的纯文本,用于提取额外需求。"""
import re
if not msg:
return ""
text = re.sub(r'https?://\S+', ' ', msg)
text = text.replace("#*#", " ").strip()
text = re.sub(r'\s+', ' ', text)
return text.strip(",。.!?;: ")
_is_batch_finish_signal = staticmethod(is_batch_finish_signal)
_is_batch_finish_intent = staticmethod(is_batch_finish_intent)
_is_cross_image_composite_intent = staticmethod(is_cross_image_composite_intent)
_is_related_image_followup_intent = staticmethod(is_related_image_followup_intent)
_is_result_followup_query = staticmethod(is_result_followup_query)
_classify_short_customer_text = staticmethod(classify_short_customer_text)
_build_collect_ack = staticmethod(build_collect_ack)
_build_collect_progress_reply = staticmethod(build_collect_progress_reply)
_build_collect_remind = staticmethod(build_collect_remind)
_is_find_image_not_edit_conflict = staticmethod(is_find_image_not_edit_conflict)
_needs_clarification_in_collecting = staticmethod(needs_clarification_in_collecting)
_build_find_image_clarify_reply = staticmethod(build_find_image_clarify_reply)
_build_not_understood_reply = staticmethod(build_not_understood_reply)
_append_requirement = staticmethod(append_requirement)
def _calc_requirement_surcharge(self, requirements: List[str]) -> Dict[str, Any]:
"""
把客户补充需求做成结构化加价,避免纯靠模型自由发挥导致价格波动。
返回:
{"extra": int, "hits": List[str]}
"""
text = " ".join(requirements or [])
rules = [
(["分层", "psd", "源文件"], 30, "分层/源文件"),
(["去背景", "抠图", "透明底", "白底"], 5, "去背景"),
(["换背景", "换场景", "合成", "转到", "换到", "放到", "贴到", "移到", "套到", "图案上去", "元素放到"], 10, "跨图合成/换背景"),
(["改字", "改文字", "替换文字", "排版"], 10, "改文字/排版"),
(["调色", "改色", "换色", "配色"], 5, "调色"),
(["多版本", "多个版本", "两版", "三版"], 10, "多版本"),
(["加急", "今天要", "马上要", "尽快"], 10, "加急"),
]
total = 0
hits: List[str] = []
for keywords, fee, label in rules:
if any(k in text for k in keywords):
total += fee
hits.append(f"{label}+{fee}")
# 防止需求加价过高,做个上限保护
total = min(total, 60)
# 金额统一 5 的倍数
total = round(total / 5) * 5
return {"extra": total, "hits": hits}
def _build_batch_quote_reply(
self,
results: List[Tuple[str, Dict[str, Any]]],
total_suggest: int,
bundle_price: int,
req_fee: Dict[str, Any],
) -> str:
"""构建分图明细 + 单条总报价可选项回复。"""
complexity_map = {
"simple": "简单",
"normal": "常规",
"complex": "复杂",
"hard": "高难",
}
detail_lines: List[str] = []
for i, (_, r) in enumerate(results, 1):
p = int(r.get("price_suggest", 20) or 20)
cx = complexity_map.get(str(r.get("complexity", "normal")), "常规")
reason = str(r.get("reason", "常规处理")).replace("\n", " ").strip()
if len(reason) > 18:
reason = reason[:18] + "..."
detail_lines.append(f"{i}{p}元({cx}{reason}")
extra = int(req_fee.get("extra", 0) or 0)
single_total = round((total_suggest + extra) / 5) * 5
req_hit = "".join(req_fee.get("hits", [])) if req_fee.get("hits") else ""
# 单图时不要使用“分图/这批/A-B方案”措辞避免客户误解为多图。
if len(results) == 1:
line = detail_lines[0].replace("图1", "这张:")
heads = [
"这张我看过了,先给你报下:",
"这张可以做,价格给你报下:",
"看了这张图,报价如下:",
"我先按这张给你算下:",
"这张处理没问题,我给你报个实在价:",
"我看完这张了,价格给你说下:",
"按这张图的难度,报价是:",
"这张我已经评估完了,先给你个价格:",
]
lines = [f"{random.choice(heads)}{line.split('', 1)[1]}"]
if req_hit:
lines.append(f"按你的需求另加{extra}元({req_hit})。")
tails = [
f"这张做下来共{single_total}元,定了我马上开工。",
f"合下来是{single_total}元,你点头我这边立刻安排。",
f"总价{single_total}元,可以的话我现在就给你做。",
f"这一张算下来{single_total}元,你说开做我就马上弄。",
f"给你按{single_total}元做,确定的话我现在就排上。",
f"这张我按{single_total}元给你做,没问题就直接开始。",
f"这张最终{single_total}元,你点头我立刻开干。",
f"这张就按{single_total}元走,你确认我就马上安排。",
]
lines.append(random.choice(tails))
return "\n".join(lines)
heads = [
"我先按这几张给你报一下:",
"这几张我都看过了,价格给你列一下:",
"我把每张价格先给你说清楚:",
"我先把这几张的价格拆开给你看:",
"这几张我都评估过了,报价给你写明白:",
"先别急,我把每张大概价给你列出来:",
"我按这批图先报个明细给你:",
"我先把每张费用和总价给你算出来:",
]
lines = [random.choice(heads)]
lines.extend(detail_lines)
if req_hit:
lines.append(f"需求加价:+{extra}元({req_hit}")
option_line = random.choice([
f"可选:按单张做(共{single_total}元),或打包做({bundle_price}元,会更省一点)。",
f"可选:单张算下来一共{single_total}元;打包给你{bundle_price}元,更划算。",
f"可选:你按单张做共{single_total}元,按打包做我给你{bundle_price}元。",
f"可选:分开做总共{single_total}元,打包做{bundle_price}元(省一点)。",
f"可选:按张算共{single_total}元;直接打包{bundle_price}元。",
])
lines.append(option_line)
lines.append(random.choice([
"你定一个,我这边马上开工。",
"你选个方案,我立刻给你安排上。",
"你拍板就行,我这边马上开做。",
"你看选哪个合适,我这边马上给你做。",
"你一句话定下来,我现在就给你安排。",
]))
return "\n".join(lines)
def _prepare_batch_intake(self, state: ConversationState) -> Dict[str, Any]:
"""Stage 1: 收集阶段,标准化输入并做上限约束。"""
urls = list(state.pending_image_urls)
if not urls:
return {"ok": False, "reply": "你先把图片发我,我看完再给你统一报价。", "need_transfer": False}
try:
from config.config import BATCH_MAX_IMAGES, BATCH_ANALYZE_CONCURRENCY
max_images = max(1, int(BATCH_MAX_IMAGES))
analyze_concurrency = max(1, int(BATCH_ANALYZE_CONCURRENCY))
except Exception:
max_images = 12
analyze_concurrency = 3
if len(urls) > max_images:
return {
"ok": False,
"reply": f"这次图片有点多({len(urls)}张),我先按前{max_images}张处理报价,剩下的下一批继续发我。",
"need_transfer": False,
}
return {
"ok": True,
"urls": urls[:max_images],
"requirements": list(state.pending_requirements or []),
"analyze_concurrency": analyze_concurrency,
}
async def _run_batch_feasibility(self, urls: List[str], concurrency: int) -> List[Tuple[str, Dict[str, Any]]]:
"""Stage 2: 可做性分析(逐图)。"""
from image.image_analyzer import image_analyzer
sem = asyncio.Semaphore(max(1, concurrency))
async def _analyze_one(url: str):
async with sem:
try:
r = await image_analyzer.analyze(url)
except Exception:
r = {
"complexity": "normal",
"reason": "识别异常,按常规估价",
"price_min": 15,
"price_max": 25,
"price_suggest": 20,
"success": False,
}
return url, r
return list(await asyncio.gather(*[_analyze_one(u) for u in urls]))
async def _sync_batch_analysis_to_workflow(self, results: List[Tuple[str, Dict[str, Any]]], message: CustomerMessage) -> None:
for url, r in results:
try:
from core.workflow import workflow
await workflow.image_analysis_result(
customer_id=message.from_id,
image_url=url,
complexity=r.get("complexity", "normal"),
acc_id=message.acc_id,
acc_type=message.acc_type,
gemini_prompt=r.get("gemini_prompt", ""),
aspect_ratio=r.get("aspect_ratio", "1:1"),
perspective=r.get("perspective", "no"),
proc_type=r.get("proc_type", ""),
subject=r.get("subject", ""),
quality=r.get("quality", ""),
)
except Exception as e:
print(f"[Agent] Workflow 批量任务创建失败: {e}")
def _assess_batch_risk(self, results: List[Tuple[str, Dict[str, Any]]]) -> Dict[str, List[str]]:
"""Stage 2.5: 分离可做和风险图。"""
unsafe: List[str] = []
dense_text_reject: List[str] = []
for i, (_, r) in enumerate(results, 1):
if r.get("feasibility") == "no" or r.get("risk") == "high":
unsafe.append(f"{i}")
note = str(r.get("note", "") or "")
if "文字内容过于密集" in note or "密集文字" in note:
dense_text_reject.append(f"{i}")
return {"unsafe": unsafe, "dense_text_reject": dense_text_reject}
def _build_batch_pricing_plan(
self,
results: List[Tuple[str, Dict[str, Any]]],
requirements: List[str],
) -> Dict[str, Any]:
"""Stage 3: 报价计算(图片成本 + 需求加价 + 打包价)。"""
total_suggest = sum(int(r.get("price_suggest", 20) or 20) for _, r in results)
req_fee = self._calc_requirement_surcharge(requirements)
if len(results) == 2:
bundle_price = max(10, total_suggest - 5)
elif len(results) >= 3:
bundle_price = max(10, round(total_suggest * 0.9 / 5) * 5)
else:
bundle_price = total_suggest
bundle_price += int(req_fee.get("extra", 0) or 0)
bundle_price = round(bundle_price / 5) * 5
return {
"total_suggest": total_suggest,
"req_fee": req_fee,
"bundle_price": bundle_price,
}
async def _try_batch_auto_process(
self,
results: List[Tuple[str, Dict[str, Any]]],
message: CustomerMessage,
req_fee: Dict[str, Any],
) -> Dict[str, Any]:
"""Stage 4-A: 自动处理+图绘链接。失败时回退到需求澄清。"""
links = []
try:
from image.image_processor import image_processor
from utils.image_queue import run_with_queue
for idx, (url, r) in enumerate(results, 1):
req_parts = [f"complexity:{r.get('complexity', 'normal')}"]
if r.get("gemini_prompt"):
req_parts.append(f"prompt:{r.get('gemini_prompt')}")
if r.get("aspect_ratio"):
req_parts.append(f"ratio:{r.get('aspect_ratio')}")
if r.get("perspective") and r.get("perspective") != "no":
req_parts.append(f"perspective:{r.get('perspective')}")
if r.get("proc_type"):
req_parts.append(f"proc_type:{r.get('proc_type')}")
if r.get("subject"):
req_parts.append(f"subject:{r.get('subject')}")
if r.get("quality"):
req_parts.append(f"quality:{r.get('quality')}")
process_res = await run_with_queue(image_processor.process_image(
url,
"enhance",
requirements="|".join(req_parts),
gemini_prompt=r.get("gemini_prompt", ""),
aspect_ratio=r.get("aspect_ratio", "1:1"),
perspective=r.get("perspective", "no"),
proc_type=r.get("proc_type", ""),
subject=r.get("subject", ""),
quality=r.get("quality", ""),
))
if not process_res.get("success"):
raise RuntimeError(process_res.get("message", "图片处理失败"))
ok, link, _ = await upload_to_tuhui(
process_res["result_path"],
title=f"客户{message.from_id[-4:]}-图片{idx}",
description="AI自动处理结果",
price=max(10, int(r.get("price_suggest", 20) or 20) + int(req_fee.get("extra", 0) or 0) // max(1, len(results))),
)
if not ok:
raise RuntimeError(str(link))
links.append(link)
except Exception as e:
print(f"[Agent] 找图自动处理失败,回退需求澄清: {e}")
return {
"reply": "这种可以做类似款。你先说下具体需求:要几张、是否改字、尺寸比例、交付格式(单图/打包链接),我按需求给你直接做。",
"need_transfer": False,
}
lines = ["找到了,链接如下:"]
for i, link in enumerate(links, 1):
lines.append(f"链接{i}{link}")
return {"reply": "\n".join(lines), "need_transfer": False}
def _finalize_batch_state(self, state: ConversationState, customer_id: str, final_price: int = 0):
if final_price > 0:
state.last_price = final_price
try:
from db.customer_db import db
db.update_last_price(customer_id, final_price)
except Exception:
pass
state.pending_image_urls.clear()
state.pending_requirements.clear()
self._refresh_quote_phase(state, "idle")
self._sync_pending_quote_state(customer_id, state)
async def _quote_pending_images(self, state: ConversationState, message: CustomerMessage) -> Dict[str, Any]:
"""
统一报价主流程(分层):
1) Intake 收集
2) Feasibility 可做性
3) Pricing 报价
4) Router 自动处理/报价/转人工
"""
intake = self._prepare_batch_intake(state)
if not intake.get("ok", False):
return {"reply": intake.get("reply", ""), "need_transfer": bool(intake.get("need_transfer", False))}
urls = intake["urls"]
requirements = intake["requirements"]
analyze_concurrency = int(intake["analyze_concurrency"])
results = await self._run_batch_feasibility(urls=urls, concurrency=analyze_concurrency)
await self._sync_batch_analysis_to_workflow(results=results, message=message)
risk = self._assess_batch_risk(results)
unsafe = risk["unsafe"]
dense_text_reject = risk["dense_text_reject"]
if unsafe:
self._finalize_batch_state(state, message.from_id, final_price=0)
if dense_text_reject and len(dense_text_reject) == len(unsafe):
return {"reply": self._build_reject_message("文字密集类图片暂不接单"), "need_transfer": False}
return {
"reply": f"这批里{''.join(unsafe)}处理风险较高,我先帮你转人工设计师跟进会更稳妥。",
"need_transfer": True,
}
pricing = self._build_batch_pricing_plan(results=results, requirements=requirements)
total_suggest = int(pricing["total_suggest"])
bundle_price = int(pricing["bundle_price"])
req_fee = pricing["req_fee"]
intent_text = (message.msg or "") + " " + " ".join(requirements[-5:])
workflow_type, _ = self.workflow_router.detect_workflow(intent_text)
if workflow_type == "find_image":
route_res = await self._try_batch_auto_process(
results=results,
message=message,
req_fee=req_fee,
)
self._finalize_batch_state(state, message.from_id, final_price=bundle_price)
return route_res
reply_text = self._build_batch_quote_reply(
results=results,
total_suggest=total_suggest,
bundle_price=bundle_price,
req_fee=req_fee,
)
self._finalize_batch_state(state, message.from_id, final_price=bundle_price)
return {"reply": reply_text, "need_transfer": False}
def _split_customer_text(self, msg: str) -> tuple:
"""
把混合消息拆分为(客户真实文字, 系统订单块)。
平台有时把客户文字和系统订单通知拼在同一条消息里。
"""
import re
# 找到系统订单块的起始位置
order_marker = re.search(r'\[系统订单信息\]|\[系统通知\]', msg)
if order_marker:
customer_text = msg[:order_marker.start()].strip()
order_block = msg[order_marker.start():].strip()
else:
customer_text = msg.strip()
order_block = ""
return customer_text, order_block
def _build_prompt(self, message: CustomerMessage, state: ConversationState) -> str:
"""构建提示词"""
msg_content = message.msg
stage_info = f"【当前阶段】{state.stage}"
# 拆分:客户文字 vs 系统订单块
customer_text, order_block = self._split_customer_text(msg_content)
has_order = bool(order_block)
if has_order:
order = parse_order_info(order_block)
if order.get('order_id'):
state.last_order_id = order['order_id']
stage_info += f"\n【订单号】{order['order_id']}"
if order.get('order_status'):
state.order_status = order['order_status']
stage_info += f"\n【订单状态】{order['order_status']}"
if order.get('pay_status'):
stage_info += f"\n【支付状态】{order['pay_status']}"
if order.get('amount'):
stage_info += f"\n【订单金额】{order['amount']}"
if order.get('quantity'):
stage_info += f"\n【数量】{order['quantity']}"
if order.get('order_time'):
stage_info += f"\n【下单时间】{order['order_time']}"
if order.get('buyer_note'):
stage_info += f"\n【买家备注】{order['buyer_note']}"
if state.discount_count > 0:
stage_info += f"\n【客户压价次数】{state.discount_count}"
# 店铺类型:不同店铺不同回复策略
shop_type = _get_shop_type(message.acc_id or "", message.goods_name or "")
shop_hint = ""
try:
from config.config import CONFIG_DIR
import json
cfg_path = CONFIG_DIR / "shop_prompts.json"
if cfg_path.exists():
with open(cfg_path, "r", encoding="utf-8") as f:
cfg = json.load(f)
hints = cfg.get("type_hints", {})
shop_hint = hints.get(shop_type, "")
if not shop_hint and message.acc_id:
sh = cfg.get("shops", {}).get(message.acc_id, {})
shop_hint = sh.get("hint", "")
except Exception:
pass
prompt = f"""收到新消息:
{stage_info}
发送者: {message.from_name} ({message.from_id})
"""
if message.goods_name:
prompt += f"商品名称: {message.goods_name}\n"
if shop_hint:
prompt += f"\n{shop_hint}\n"
# ── 优先处理客户真实问题 ──
# ── 判断订单付款状态(供后续逻辑使用)──
order_paid = False
order_unpaid = False
if has_order:
order = parse_order_info(order_block)
paid_kws = ["等待发货", "已付款", "付款成功", "买家已付款"]
unpaid_kws = ["等待买家付款", "待付款", "未付款"]
ps = order.get('pay_status', '')
os_ = order.get('order_status', '')
if any(kw in ps or kw in os_ for kw in paid_kws):
order_paid = True
elif any(kw in ps or kw in os_ for kw in unpaid_kws):
order_unpaid = True
# ── 催单/进度询问关键词 ──
progress_keywords = [
"安排了吗", "安排好了吗", "好了吗", "做了吗", "做好了吗",
"弄好了吗", "好了没", "做了没", "什么时候好", "多久好",
"进度", "催一下", "快点", "什么时候能好", "做完了吗",
]
if customer_text:
prompt += f"\n客户说:{customer_text}\n"
image_url = self._extract_image_url(customer_text)
price_keywords = ["多少钱", "多少", "价格", "几块", "怎么收费", "报个价"]
size_keywords = [
"尺寸", "比例", "", "", "", "厘米", "mm", "cm",
"横版", "竖版", "2米", "3米", "改成", "做成",
]
has_size_change = any(kw in customer_text.lower() for kw in [k.lower() for k in size_keywords])
# gemini_api 店铺:不触发找图流程,按 API 客服回复
if shop_type == "gemini_api":
prompt += "\n【Gemini API 店铺】客户问账号/pro/续费/套餐等,按 API 客服自然回复,不要求发图。"
elif image_url:
prompt += "\n客户在继续发图阶段:先确认“已收图”,并引导客户把图和要求一次发完;等客户明确“发完了/统一报价”后再统一报价。"
elif any(kw in customer_text for kw in price_keywords):
last_url = self._extract_image_url(msg_content)
if last_url:
prompt += "\n客户在询问价格:若客户已确认发完,则给总报价;若还在发图,先引导发完后统一报价。"
else:
prompt += "\n客户在询问价格但未发图:先简短承接(如“在看呢/收到”),不要机械连发;再自然引导对方发图。"
if has_size_change:
prompt += (
"\n⚠️ 尺寸改动场景:优先判断图片主体是否会被拉伸变形,"
"不是只看整张图宽高比。若会变形,要先提示“需要补图/扩边”,再给报价。"
)
elif has_size_change:
prompt += (
"\n客户在改尺寸/改比例:先按主体比例判断是否会变形,"
"不是只看整图比例。若目标尺寸会拉伸主体,先明确说明要补图(如上下补图/扩边)再报价。"
)
elif any(kw in customer_text for kw in progress_keywords):
# 客户问进度/催单,必须先核查付款状态
if order_unpaid:
prompt += "\n⚠️【订单未付款】客户问安排进度,但订单还未付款。自然告知拍下付款后马上安排即可。"
elif order_paid:
prompt += "\n客户催单,订单已付款,自然回复在做了/快了之类。"
else:
prompt += "\n客户催单,查询当前处理状态后自然回复。"
elif any(kw in customer_text for kw in ["", "有点贵", "太贵", "算了", "便宜点", "少点", "打折", "贵哦"]):
# 客户嫌贵/要放弃 → 直接让价一次,不问「什么问题」
prompt += "\n⚠️【客户嫌贵】客户已表达价格顾虑或要放弃。禁止追问「什么问题」「说清楚点」。\n→ 第一步:直接让价一次(如原价 20→15话术自然。\n→ 第二步:如果让价后客户还是嫌贵,引导客户自己去注册星绘平台:\n - 每个话术都必须带网址 xinhui.cloud\n - 强调「自己去弄」「自己操作」「自己下单」\n - 示例:「嫌贵就去 xinhui.cloud 注册个号自己有余额可以弄」「xinhui.cloud 去注册下,新用户送体验金,自己弄便宜」\n - 语气直接,让客户自己去操作」"
elif any(kw in customer_text for kw in ["擦边", "黄色", "色情", "大尺度", "性感图", "露点", "半裸"]):
# 客户问擦边/黄色内容 → 直接拒绝,不说「发图来看看」
prompt += "\n⚠️【拒绝】客户询问擦边/黄色/敏感内容。直接拒绝,不接单,不说「发图来看看」。自然回复如:这类不做/不接/做不了。"
else:
prompt += "\n根据客户说的内容自然回应,像真人聊天,不要套模板。"
# ── 附加订单信息(不覆盖客户问题的优先级)──
if has_order:
order = parse_order_info(order_block)
order_instruction = build_order_instruction(
order.get('pay_status', ''),
order.get('order_status', '')
)
if customer_text:
if not order_unpaid:
# 未付款情况已在上面明确处理,不重复添加背景
prompt += f"\n\n【背景参考-订单通知】{order_instruction}"
else:
# 纯系统通知,没有客户文字
prompt += f"\n\n{order_instruction}"
if not customer_text and not has_order:
prompt += f"\n消息内容: {msg_content}\n请按工作流规则回复。"
return prompt
async def _handle_image_workflow(self, message: str, data: dict, image_urls: list) -> bool:
"""处理图片工作流(根据客户说的话判断执行哪种工作流)"""
if not image_urls:
return False
workflow_type, confidence = self.workflow_router.detect_workflow(message)
customer_id = data.get('from_id')
acc_id = data.get('acc_id', '')
acc_type = data.get('acc_type', 'AliWorkbench')
image_url = image_urls[0]
print(f"[Agent] 检测到工作流类型:{workflow_type} (置信度:{confidence})")
if workflow_type == "find_image":
print(f"[Agent] 执行查找图片工作流 | 客户:{customer_id}")
from core.workflow import workflow
return await workflow.find_image_workflow(
customer_id=customer_id,
image_url=image_url,
acc_id=acc_id,
acc_type=acc_type
)
elif workflow_type == "process_image":
print(f"[Agent] 执行处理图片工作流 | 客户:{customer_id}")
from core.workflow import workflow
return await workflow.process_image_workflow(
customer_id=customer_id,
image_url=image_url,
acc_id=acc_id,
acc_type=acc_type
)
elif workflow_type == "transfer_human":
print(f"[Agent] 执行转人工派单工作流 | 客户:{customer_id}")
from core.workflow import workflow
return await workflow.transfer_to_designer_workflow(
customer_id=customer_id,
image_url=image_url,
acc_id=acc_id,
acc_type=acc_type,
reason="客户主动要求转人工"
)
return False
async def test_agent():
"""测试 Agent"""
agent = CustomerServiceAgent(skills_dir="skills")
test_msg = CustomerMessage(
msg_id="123",
acc_id="test_account",
msg="这张图可以做吗?",
from_id="customer001",
from_name="张三",
cy_id="customer001",
acc_type="AliWorkbench",
msg_type=0,
cy_name="张三",
goods_name="专业找图代找高清图片",
goods_order=""
)
response = await agent.process_message(test_msg)
print(f"回复内容: {response.reply}")
if __name__ == "__main__":
import asyncio
asyncio.run(test_agent())