Compare commits

...

2 Commits

View File

@@ -63,6 +63,7 @@ async def _notify_wechat_overdue():
# ========== 转接常量 ==========
TRANSFER_MESSAGE = "话术|[转移会话],分组20252916034,无原因"
CASE_LIBRARY_LINK = "https://www.yuque.com/zuowei-dfvpq/kge0in/mynala0g35b8cec5"
# ========== 数据模型 ==========
@@ -1092,7 +1093,12 @@ class CustomerServiceAgent:
规则:
- 收到图片或历史有图片依据时尽量结合复杂度给出单价价格为5的整数倍
- 没有图片时引导发图,不给价格区间
- 报价后紧跟一句推动成交,话术自然不重复
- 报价后紧跟一句推动成交,话术自然不重复,避免机械重复“最低了”
- 客户说“有点贵/优惠点/两张优惠点”时,优先给打包价或数量优惠,不要只会拒绝
- 客户说“不放心/先看效果”时,先建立信任:可发案例链接 {CASE_LIBRARY_LINK},并说明不满意可退
- 可直接复用这条信任话术(按需微调,不要每次完全一样):
小妹整理了一些案例图,亲点这个链接就能看到啦({CASE_LIBRARY_LINK})。
有什么想要的效果随时告诉我哈,我这边都可以按您的要求来做哦~/:065 效果不好不满意,我们这边包退的哦。
- 最低价不低于{floor}元,客户出价低于底线时礼貌拒绝(不好意思)
- 输出不超过2句话"""
@@ -1584,6 +1590,11 @@ class CustomerServiceAgent:
# 更新历史,最多保留最近 30 条消息防止 token 超限
self.message_histories[message.from_id] = result.all_messages()[-30:]
reply_text = self._normalize_reply_text(result.output)
# 价格谈判与信任建立固定策略(避免只回“最低了/先拍下”)
strategy_reply = self._negotiation_strategy_reply(message.msg, state)
if strategy_reply:
reply_text = strategy_reply
# 拦截超低杀价:客户报价低于底线时,统一礼貌拒绝
try:
from config.config import MIN_PRICE_FLOOR
@@ -1924,6 +1935,34 @@ class CustomerServiceAgent:
except Exception:
pass
def _negotiation_strategy_reply(self, customer_text: str, state: ConversationState) -> str:
"""
价格谈判固定策略(优先级高于通用 AI 自由回复):
- 有点贵:给两张打包建议价
- 优惠点引导3张以上打包价
- 先发效果图:给固定案例链接并说明不满意包退
"""
text = (customer_text or "").strip()
if not text:
return ""
if any(k in text for k in ["先发效果图", "先看效果", "不放心", "没法确认"]):
return (
f"小妹整理了一些案例图,亲点这个链接就能看到啦({CASE_LIBRARY_LINK})。"
"有什么想要的效果随时可以告诉我哈,我这边都可以按您的要求来做哦~/:065 效果不好不满意,我们这边包退的哦。"
)
if "有点贵" in text or "就是贵" in text:
# 约定示例两张优惠价默认45若已有单张价格则动态估算两张打包价
base = state.last_price if isinstance(state.last_price, int) and state.last_price > 0 else 25
two_pack = max(10, round(((base * 2) - 5) / 5) * 5)
return f"理解你的顾虑,这样吧,两张一起的话给你算 {two_pack} 元?"
if any(k in text for k in ["优惠点", "便宜点", "少点", "打折"]):
return "看你要做的数量3张以上可以给你打包价"
return ""
def _parse_order_info(self, msg: str) -> dict:
"""从系统订单消息中提取所有字段"""
import re
@@ -2125,18 +2164,11 @@ class CustomerServiceAgent:
lines.append("你定一个方案,我这边马上安排。")
return "\n".join(lines)
async def _quote_pending_images(self, state: ConversationState, message: CustomerMessage) -> Dict[str, Any]:
"""
批量识别待处理图片并统一处理:
- find_image 意图且可自动处理:直接 Gemini 处理 + 上传图绘 + 回链接
- 高风险/不可做:转人工
- 其他:统一报价
"""
from image.image_analyzer import image_analyzer
def _prepare_batch_intake(self, state: ConversationState) -> Dict[str, Any]:
"""Stage 1: 收集阶段,标准化输入并做上限约束。"""
urls = list(state.pending_image_urls)
if not urls:
return {"reply": "你先把图片发我,我看完再给你统一报价。", "need_transfer": False}
return {"ok": False, "reply": "你先把图片发我,我看完再给你统一报价。", "need_transfer": False}
try:
from config.config import BATCH_MAX_IMAGES, BATCH_ANALYZE_CONCURRENCY
max_images = max(1, int(BATCH_MAX_IMAGES))
@@ -2146,12 +2178,23 @@ class CustomerServiceAgent:
analyze_concurrency = 3
if len(urls) > max_images:
return {
"ok": False,
"reply": f"这次图片有点多({len(urls)}张),我先按前{max_images}张处理报价,剩下的下一批继续发我。",
"need_transfer": False,
}
urls = urls[:max_images]
return {
"ok": True,
"urls": urls[:max_images],
"requirements": list(state.pending_requirements or []),
"analyze_concurrency": analyze_concurrency,
}
async def _run_batch_feasibility(self, urls: List[str], concurrency: int) -> List[Tuple[str, Dict[str, Any]]]:
"""Stage 2: 可做性分析(逐图)。"""
from image.image_analyzer import image_analyzer
sem = asyncio.Semaphore(max(1, concurrency))
sem = asyncio.Semaphore(analyze_concurrency)
async def _analyze_one(url: str):
async with sem:
try:
@@ -2167,9 +2210,10 @@ class CustomerServiceAgent:
}
return url, r
results = list(await asyncio.gather(*[_analyze_one(u) for u in urls]))
return list(await asyncio.gather(*[_analyze_one(u) for u in urls]))
async def _sync_batch_analysis_to_workflow(self, results: List[Tuple[str, Dict[str, Any]]], message: CustomerMessage) -> None:
for url, r in results:
# 与单图流程一致:识别后写入 workflow 任务
try:
from core.workflow import workflow
await workflow.image_analysis_result(
@@ -2188,12 +2232,26 @@ class CustomerServiceAgent:
except Exception as e:
print(f"[Agent] Workflow 批量任务创建失败: {e}")
total_min = sum(int(r.get("price_min", 15) or 15) for _, r in results)
total_max = sum(int(r.get("price_max", 25) or 25) for _, r in results)
total_suggest = sum(int(r.get("price_suggest", 20) or 20) for _, r in results)
req_fee = self._calc_requirement_surcharge(state.pending_requirements)
def _assess_batch_risk(self, results: List[Tuple[str, Dict[str, Any]]]) -> Dict[str, List[str]]:
"""Stage 2.5: 分离可做和风险图。"""
unsafe: List[str] = []
dense_text_reject: List[str] = []
for i, (_, r) in enumerate(results, 1):
if r.get("feasibility") == "no" or r.get("risk") == "high":
unsafe.append(f"{i}")
note = str(r.get("note", "") or "")
if "文字内容过于密集" in note or "密集文字" in note:
dense_text_reject.append(f"{i}")
return {"unsafe": unsafe, "dense_text_reject": dense_text_reject}
# 打包优惠2 张减 53 张及以上按 9 折(四舍五入到 5 元)
def _build_batch_pricing_plan(
self,
results: List[Tuple[str, Dict[str, Any]]],
requirements: List[str],
) -> Dict[str, Any]:
"""Stage 3: 报价计算(图片成本 + 需求加价 + 打包价)。"""
total_suggest = sum(int(r.get("price_suggest", 20) or 20) for _, r in results)
req_fee = self._calc_requirement_surcharge(requirements)
if len(results) == 2:
bundle_price = max(10, total_suggest - 5)
elif len(results) >= 3:
@@ -2202,34 +2260,19 @@ class CustomerServiceAgent:
bundle_price = total_suggest
bundle_price += int(req_fee.get("extra", 0) or 0)
bundle_price = round(bundle_price / 5) * 5
# 先分流:高风险/不可做 -> 转人工
unsafe = []
dense_text_reject = []
for i, (_, r) in enumerate(results, 1):
if r.get("feasibility") == "no" or r.get("risk") == "high":
unsafe.append(f"{i}")
note = str(r.get("note", "") or "")
if "文字内容过于密集" in note or "密集文字" in note:
dense_text_reject.append(f"{i}")
if unsafe:
state.pending_image_urls.clear()
state.pending_requirements.clear()
self._sync_pending_quote_state(message.from_id, state)
if dense_text_reject and len(dense_text_reject) == len(unsafe):
return {
"reply": self._build_reject_message("文字密集类图片暂不接单"),
"need_transfer": False,
}
return {
"reply": f"这批里{''.join(unsafe)}处理风险较高,我先帮你转人工设计师跟进会更稳妥。",
"need_transfer": True,
"total_suggest": total_suggest,
"req_fee": req_fee,
"bundle_price": bundle_price,
}
# 查找图片意图:优先直接自动处理并回图绘链接
intent_text = (message.msg or "") + " " + " ".join(state.pending_requirements[-5:])
workflow_type, _ = self.workflow_router.detect_workflow(intent_text)
if workflow_type == "find_image":
async def _try_batch_auto_process(
self,
results: List[Tuple[str, Dict[str, Any]]],
message: CustomerMessage,
req_fee: Dict[str, Any],
) -> Dict[str, Any]:
"""Stage 4-A: 自动处理+图绘链接。失败时回退到需求澄清。"""
links = []
try:
from image.image_processor import image_processor
@@ -2273,26 +2316,74 @@ class CustomerServiceAgent:
raise RuntimeError(str(link))
links.append(link)
except Exception as e:
# 找图分支失败时,不直接报价,先回到“收集需求再做”
print(f"[Agent] 找图自动处理失败,回退需求澄清: {e}")
return {
"reply": "这种可以做类似款。你先说下具体需求:要几张、是否改字、尺寸比例、交付格式(单图/打包链接),我按需求给你直接做。",
"need_transfer": False,
}
else:
lines = [f"找到了,链接如下:"]
lines = ["找到了,链接如下:"]
for i, link in enumerate(links, 1):
lines.append(f"链接{i}{link}")
state.last_price = bundle_price
return {"reply": "\n".join(lines), "need_transfer": False}
def _finalize_batch_state(self, state: ConversationState, customer_id: str, final_price: int = 0):
if final_price > 0:
state.last_price = final_price
try:
from db.customer_db import db
db.update_last_price(message.from_id, bundle_price)
db.update_last_price(customer_id, final_price)
except Exception:
pass
state.pending_image_urls.clear()
state.pending_requirements.clear()
self._sync_pending_quote_state(message.from_id, state)
return {"reply": "\n".join(lines), "need_transfer": False}
self._sync_pending_quote_state(customer_id, state)
async def _quote_pending_images(self, state: ConversationState, message: CustomerMessage) -> Dict[str, Any]:
"""
统一报价主流程(分层):
1) Intake 收集
2) Feasibility 可做性
3) Pricing 报价
4) Router 自动处理/报价/转人工
"""
intake = self._prepare_batch_intake(state)
if not intake.get("ok", False):
return {"reply": intake.get("reply", ""), "need_transfer": bool(intake.get("need_transfer", False))}
urls = intake["urls"]
requirements = intake["requirements"]
analyze_concurrency = int(intake["analyze_concurrency"])
results = await self._run_batch_feasibility(urls=urls, concurrency=analyze_concurrency)
await self._sync_batch_analysis_to_workflow(results=results, message=message)
risk = self._assess_batch_risk(results)
unsafe = risk["unsafe"]
dense_text_reject = risk["dense_text_reject"]
if unsafe:
self._finalize_batch_state(state, message.from_id, final_price=0)
if dense_text_reject and len(dense_text_reject) == len(unsafe):
return {"reply": self._build_reject_message("文字密集类图片暂不接单"), "need_transfer": False}
return {
"reply": f"这批里{''.join(unsafe)}处理风险较高,我先帮你转人工设计师跟进会更稳妥。",
"need_transfer": True,
}
pricing = self._build_batch_pricing_plan(results=results, requirements=requirements)
total_suggest = int(pricing["total_suggest"])
bundle_price = int(pricing["bundle_price"])
req_fee = pricing["req_fee"]
intent_text = (message.msg or "") + " " + " ".join(requirements[-5:])
workflow_type, _ = self.workflow_router.detect_workflow(intent_text)
if workflow_type == "find_image":
route_res = await self._try_batch_auto_process(
results=results,
message=message,
req_fee=req_fee,
)
self._finalize_batch_state(state, message.from_id, final_price=bundle_price)
return route_res
reply_text = self._build_batch_quote_reply(
results=results,
@@ -2300,18 +2391,7 @@ class CustomerServiceAgent:
bundle_price=bundle_price,
req_fee=req_fee,
)
state.last_price = bundle_price
try:
from db.customer_db import db
db.update_last_price(message.from_id, bundle_price)
except Exception:
pass
# 清空待报价队列(本轮已统一报价)
state.pending_image_urls.clear()
state.pending_requirements.clear()
self._sync_pending_quote_state(message.from_id, state)
self._finalize_batch_state(state, message.from_id, final_price=bundle_price)
return {"reply": reply_text, "need_transfer": False}
def _split_customer_text(self, msg: str) -> tuple: