refactor: extract post-ops helpers from pydantic agent

This commit is contained in:
2026-03-01 15:14:57 +08:00
parent 6458e7dcca
commit b323a64b0b
5 changed files with 190 additions and 211 deletions

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from core.post_ops import negotiation_strategy_reply
if TYPE_CHECKING:
from core.pydantic_ai_agent import AgentDeps, ConversationState, CustomerMessage, CustomerServiceAgent
@@ -69,7 +70,7 @@ async def execute_ai_turn(
agent.message_histories[message.from_id] = result.all_messages()[-30:]
reply_text = agent._colloquialize_reply(agent._normalize_reply_text(result.output))
strategy_reply = agent._negotiation_strategy_reply(message.msg, state)
strategy_reply = negotiation_strategy_reply(message.msg, state)
if strategy_reply:
reply_text = strategy_reply

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING, Optional
from core.post_ops import record_deal_success
if TYPE_CHECKING:
from core.pydantic_ai_agent import AgentResponse, ConversationState, CustomerMessage, CustomerServiceAgent
@@ -31,13 +32,13 @@ async def handle_order_notification(
if is_paid:
asyncio.create_task(agent._check_order_amount(message.from_id, order, message.acc_id))
asyncio.create_task(
agent._record_deal_success(
message.from_id,
message.from_name,
message.acc_id,
message.acc_type,
order,
state,
record_deal_success(
customer_id=message.from_id,
customer_name=message.from_name,
acc_id=message.acc_id,
platform=message.acc_type,
order=order,
state=state,
)
)
try:

169
core/post_ops.py Normal file
View File

@@ -0,0 +1,169 @@
from __future__ import annotations
import re
from typing import Any
from utils.metrics_tracker import emit as metrics_emit
CASE_LIBRARY_LINK = "https://www.yuque.com/zuowei-dfvpq/kge0in/mynala0g35b8cec5"
def detect_price(reply: str, state: Any) -> None:
numbers = re.findall(r"(\d+)[元]", reply or "")
if not numbers:
return
price = round(int(numbers[0]) / 5) * 5
state.last_price = price
metrics_emit("quote_generated", customer_id=state.customer_id, price=price)
try:
from db.customer_db import db
db.update_last_price(state.customer_id, price)
except Exception:
pass
def detect_discount(message: str, state: Any) -> None:
text = message or ""
if any(kw in text for kw in ["", "便宜", "太贵", "有点贵"]):
state.discount_count += 1
if state.last_price:
try:
from db.customer_db import db
db.record_discount(state.customer_id, state.last_price)
except Exception:
pass
m = re.search(r"(\d+)\s*元|\b(\d+)\s*块", text)
offer = None
if m:
offer = int(m.group(1) or m.group(2))
if offer:
try:
from config.config import MIN_PRICE_FLOOR
if offer < MIN_PRICE_FLOOR:
state.last_price = state.last_price or 0
except Exception:
pass
def negotiation_strategy_reply(customer_text: str, state: Any) -> str:
text = (customer_text or "").strip()
if not text:
return ""
if any(k in text for k in ["先发效果图", "先看效果", "不放心", "没法确认"]):
return (
f"小妹整理了一些案例图,亲点这个链接就能看到啦({CASE_LIBRARY_LINK})。"
"有什么想要的效果随时告诉我哈,不满意我们这边包退。"
)
if "有点贵" in text or "就是贵" in text:
base = state.last_price if isinstance(state.last_price, int) and state.last_price > 0 else 25
two_pack = max(10, round(((base * 2) - 5) / 5) * 5)
return f"理解你这边的预算,我给你个实在点的:两张一起按 {two_pack} 元做,行不行?"
if any(k in text for k in ["优惠点", "便宜点", "少点", "打折"]):
return "可以的你这边数量上来我就好给价3张以上我给你打包价。"
return ""
async def record_deal_success(
*,
customer_id: str,
customer_name: str,
acc_id: str,
platform: str,
order: dict,
state: Any,
) -> None:
try:
from db.deal_outcome_db import record_deal
order_id = order.get("order_id", "")
raw_amount = order.get("amount", "")
m = re.search(r"[\d.]+", str(raw_amount))
amount = float(m.group()) if m else 0
reason = "让价后成交" if (state.discount_count or 0) > 0 else "直接成交"
record_deal(
customer_id=customer_id,
outcome="成交",
reason=reason,
customer_name=customer_name or "",
acc_id=acc_id or "",
platform=platform or "",
order_id=order_id,
amount=amount,
discount_given=(state.discount_count or 0) > 0,
)
try:
from db.customer_db import db
if order_id:
db.add_order(customer_id, order_id, amount)
db.clear_quote_no_convert(customer_id)
except Exception:
pass
print(f"[Agent] 成交记录: {customer_id} {reason} {amount}")
except Exception as e:
print(f"[Agent] 成交记录失败: {e}")
async def record_deal_fail(
*,
customer_id: str,
customer_name: str,
acc_id: str,
platform: str,
reason: str,
) -> None:
try:
from db.deal_outcome_db import record_deal
from db.customer_db import db
record_deal(
customer_id=customer_id,
outcome="未成交",
reason=reason,
customer_name=customer_name or "",
acc_id=acc_id or "",
platform=platform or "",
)
db.mark_quote_no_convert(customer_id)
print(f"[Agent] 未成交记录: {customer_id} {reason}")
except Exception as e:
print(f"[Agent] 未成交记录失败: {e}")
async def auto_tag(message: Any, state: Any) -> None:
try:
from db.customer_db import db
cid = message.from_id
msg = (message.msg or "").lower()
if any(kw in msg for kw in ["还有", "多张", "好几张", "一批", "下次还"]):
db.set_bulk_potential(cid, "")
db.add_upsell_opportunity(cid, "批量打包")
if any(kw in msg for kw in ["psd", "分层", "源文件"]):
db.add_upsell_opportunity(cid, "分层PSD")
db.update_preferred_format(cid, "psd")
if "jpg" in msg or "jpeg" in msg:
db.update_preferred_format(cid, "jpg")
if "png" in msg:
db.update_preferred_format(cid, "png")
if any(kw in msg for kw in ["分辨率", "dpi", "尺寸", "大图", "印刷"]):
db.update_preferred_size(cid, message.msg[:30])
if any(kw in msg for kw in ["拍了", "下单了", "好的", ""]) and state.last_price:
db.update_decision_speed(cid, "")
type_keywords = {
"印花": ["印花", "花纹", "图案", "面料", "布料", "纺织"],
"logo": ["logo", "标志", "品牌", "商标"],
"人物": ["人物", "人像", "照片", "", "头像"],
"产品": ["产品", "商品", "包装", "实物"],
"老照片": ["老照片", "旧照片", "发黄", "修复"],
}
for img_type, keywords in type_keywords.items():
if any(kw in message.msg for kw in keywords):
db.add_image_type(cid, img_type)
break
db.auto_compute_tags(cid)
except Exception:
pass

View File

@@ -1893,21 +1893,6 @@ class CustomerServiceAgent:
reply_text=reply_text,
)
def _detect_price(self, reply: str, state: ConversationState):
"""从回复中提取价格同步写入客户数据库价格必须为5的整数倍"""
import re
numbers = re.findall(r'(\d+)[元]', reply)
if numbers:
price = round(int(numbers[0]) / 5) * 5 # 强制为5的整数倍
state.last_price = price
metrics_emit("quote_generated", customer_id=state.customer_id, price=price)
# 持久化到客户数据库,重启后仍可读取
try:
from db.customer_db import db
db.update_last_price(state.customer_id, price)
except Exception:
pass
async def _check_order_amount(self, customer_id: str, order: dict, acc_id: str):
"""核查订单实付金额是否与报价一致,异常时企业微信预警"""
try:
@@ -1942,190 +1927,6 @@ class CustomerServiceAgent:
except Exception as e:
print(f"[Agent] 订单金额核查失败: {e}")
async def _record_deal_success(
self,
customer_id: str,
customer_name: str,
acc_id: str,
platform: str,
order: dict,
state: "ConversationState",
):
"""成交时写入数据库,供日报与数据分析"""
try:
import re
from db.deal_outcome_db import record_deal
order_id = order.get("order_id", "")
raw_amount = order.get("amount", "")
m = re.search(r"[\d.]+", str(raw_amount))
amount = float(m.group()) if m else 0
reason = "让价后成交" if (state.discount_count or 0) > 0 else "直接成交"
record_deal(
customer_id=customer_id,
outcome="成交",
reason=reason,
customer_name=customer_name or "",
acc_id=acc_id or "",
platform=platform or "",
order_id=order_id,
amount=amount,
discount_given=(state.discount_count or 0) > 0,
)
# 同步到客户库
try:
from db.customer_db import db
if order_id:
db.add_order(customer_id, order_id, amount)
db.clear_quote_no_convert(customer_id)
except Exception:
pass
print(f"[Agent] 成交记录: {customer_id} {reason} {amount}")
except Exception as e:
print(f"[Agent] 成交记录失败: {e}")
async def _record_deal_fail(
self,
customer_id: str,
customer_name: str,
acc_id: str,
platform: str,
reason: str,
):
"""未成交时写入数据库,供日报与数据分析;标记报价未成交,下次可适当降低"""
try:
from db.deal_outcome_db import record_deal
from db.customer_db import db
record_deal(
customer_id=customer_id,
outcome="未成交",
reason=reason,
customer_name=customer_name or "",
acc_id=acc_id or "",
platform=platform or "",
)
db.mark_quote_no_convert(customer_id)
print(f"[Agent] 未成交记录: {customer_id} {reason}")
except Exception as e:
print(f"[Agent] 未成交记录失败: {e}")
async def _auto_tag(self, message: CustomerMessage, reply: str, state: ConversationState):
"""自动识别并写入各类标签"""
try:
from db.customer_db import db
cid = message.from_id
msg = message.msg.lower()
# 批量潜力
if any(kw in msg for kw in ["还有", "多张", "好几张", "一批", "下次还"]):
db.set_bulk_potential(cid, "")
db.add_upsell_opportunity(cid, "批量打包")
# 加购机会问过PSD/分层
if any(kw in msg for kw in ["psd", "分层", "源文件"]):
db.add_upsell_opportunity(cid, "分层PSD")
db.update_preferred_format(cid, "psd")
# 格式偏好
if "jpg" in msg or "jpeg" in msg:
db.update_preferred_format(cid, "jpg")
if "png" in msg:
db.update_preferred_format(cid, "png")
# 尺寸/分辨率偏好
if any(kw in msg for kw in ["分辨率", "dpi", "尺寸", "大图", "印刷"]):
db.update_preferred_size(cid, message.msg[:30])
# 决策速度:收到报价后立刻说拍了 → 快
if any(kw in msg for kw in ["拍了", "下单了", "好的", ""]) and state.last_price:
db.update_decision_speed(cid, "")
# 图片类型识别(简单关键词匹配)
type_keywords = {
"印花": ["印花", "花纹", "图案", "面料", "布料", "纺织"],
"logo": ["logo", "标志", "品牌", "商标"],
"人物": ["人物", "人像", "照片", "", "头像"],
"产品": ["产品", "商品", "包装", "实物"],
"老照片": ["老照片", "旧照片", "发黄", "修复"],
}
for img_type, keywords in type_keywords.items():
if any(kw in message.msg for kw in keywords):
db.add_image_type(cid, img_type)
break
# 定期自动计算衍生标签
db.auto_compute_tags(cid)
except Exception:
pass
def _detect_discount(self, message: str, state: ConversationState):
"""检测压价,并持久化让价记录"""
if any(kw in message for kw in ["", "便宜", "太贵", "有点贵"]):
state.discount_count += 1
if state.last_price:
try:
from db.customer_db import db
db.record_discount(state.customer_id, state.last_price)
except Exception:
pass
# 客户明确给价如“10元/10块/能10吗”
import re
m = re.search(r'(\d+)\s*元|\b(\d+)\s*块', message)
offer = None
if m:
offer = int(m.group(1) or m.group(2))
if offer:
try:
from config.config import MIN_PRICE_FLOOR
if offer < MIN_PRICE_FLOOR:
# 标记本次为超低出价(用于后续拒绝话术提示)
state.last_price = state.last_price or 0
except Exception:
pass
def _negotiation_strategy_reply(self, customer_text: str, state: ConversationState) -> str:
"""
价格谈判固定策略(优先级高于通用 AI 自由回复):
- 有点贵:给两张打包建议价
- 优惠点引导3张以上打包价
- 先发效果图:给固定案例链接并说明不满意包退
"""
text = (customer_text or "").strip()
if not text:
return ""
if any(k in text for k in ["先发效果图", "先看效果", "不放心", "没法确认"]):
return random.choice([
f"小妹整理了一些案例图,亲点这个链接就能看到啦({CASE_LIBRARY_LINK})。有什么想要的效果随时告诉我哈,不满意我们这边包退。",
f"先给你看案例哈,链接在这({CASE_LIBRARY_LINK})。你想要什么效果直接说,我们按你要求做,不满意可退。",
f"我把类似案例给你准备好了,点这个看就行({CASE_LIBRARY_LINK})。你放心说需求,效果不满意我们包退。",
f"怕效果不稳的话你先看案例,链接在这里({CASE_LIBRARY_LINK})。你确认风格后我按你要的做,不满意可以退。",
f"可以先看下我们做过的案例({CASE_LIBRARY_LINK}。你觉得方向OK再拍不满意我们这边支持退。",
])
if "有点贵" in text or "就是贵" in text:
# 约定示例两张优惠价默认45若已有单张价格则动态估算两张打包价
base = state.last_price if isinstance(state.last_price, int) and state.last_price > 0 else 25
two_pack = max(10, round(((base * 2) - 5) / 5) * 5)
return random.choice([
f"理解你这边的预算,我给你个实在点的:两张一起按 {two_pack} 元做,行不行?",
f"我懂你意思,这样吧,两张一起我给你算 {two_pack} 元。",
f"那我给你压一口价,两张打包 {two_pack} 元,你看可以我就开做。",
f"没问题,我给你优惠点,两张一起按 {two_pack} 元走。",
f"你这边要省点的话,两张一起我给你做到 {two_pack} 元。",
])
if any(k in text for k in ["优惠点", "便宜点", "少点", "打折"]):
return random.choice([
"可以的你这边数量上来我就好给价3张以上我给你打包价。",
"能优惠做得多会更划算3张以上我这边可以给你打包算。",
"没问题3张起我可以给你一口打包价会比单张省一些。",
"可以便宜点按量走更好谈3张以上我给你打包价。",
"你这边如果是多张做3张以上我能给你更划算的打包价。",
])
return ""
def _parse_order_info(self, msg: str) -> dict:
"""从系统订单消息中提取所有字段"""
import re

View File

@@ -5,6 +5,7 @@ from datetime import datetime
from typing import TYPE_CHECKING
from utils.metrics_tracker import emit as metrics_emit
from core.post_ops import auto_tag, detect_discount, detect_price, record_deal_fail
if TYPE_CHECKING:
from core.pydantic_ai_agent import AgentResponse, ConversationState, CustomerMessage, CustomerServiceAgent
@@ -36,9 +37,9 @@ async def finalize_ai_reply(
except Exception:
pass
agent._detect_price(reply_text, state)
agent._detect_discount(message.msg, state)
asyncio.create_task(agent._auto_tag(message, reply_text, state))
detect_price(reply_text, state)
detect_discount(message.msg, state)
asyncio.create_task(auto_tag(message, state))
need_transfer = False
transfer_msg = ""
@@ -64,7 +65,13 @@ async def finalize_ai_reply(
if any(kw in customer_text for kw in no_convert_keywords):
reason = "嫌贵放弃" if any(k in customer_text for k in ["", "贵了", "便宜"]) else "放弃"
asyncio.create_task(
agent._record_deal_fail(message.from_id, message.from_name, message.acc_id, message.acc_type, reason)
record_deal_fail(
customer_id=message.from_id,
customer_name=message.from_name,
acc_id=message.acc_id,
platform=message.acc_type,
reason=reason,
)
)
should_reply = bool(reply_text and reply_text.strip()) and not need_transfer