diff --git a/core/ai_reply_flow.py b/core/ai_reply_flow.py index b3bbe28..d23e57f 100644 --- a/core/ai_reply_flow.py +++ b/core/ai_reply_flow.py @@ -1,6 +1,7 @@ from __future__ import annotations from typing import TYPE_CHECKING, Any +from core.post_ops import negotiation_strategy_reply if TYPE_CHECKING: from core.pydantic_ai_agent import AgentDeps, ConversationState, CustomerMessage, CustomerServiceAgent @@ -69,7 +70,7 @@ async def execute_ai_turn( agent.message_histories[message.from_id] = result.all_messages()[-30:] reply_text = agent._colloquialize_reply(agent._normalize_reply_text(result.output)) - strategy_reply = agent._negotiation_strategy_reply(message.msg, state) + strategy_reply = negotiation_strategy_reply(message.msg, state) if strategy_reply: reply_text = strategy_reply diff --git a/core/order_flow.py b/core/order_flow.py index f5df9ad..05cb93f 100644 --- a/core/order_flow.py +++ b/core/order_flow.py @@ -2,6 +2,7 @@ from __future__ import annotations import asyncio from typing import TYPE_CHECKING, Optional +from core.post_ops import record_deal_success if TYPE_CHECKING: from core.pydantic_ai_agent import AgentResponse, ConversationState, CustomerMessage, CustomerServiceAgent @@ -31,13 +32,13 @@ async def handle_order_notification( if is_paid: asyncio.create_task(agent._check_order_amount(message.from_id, order, message.acc_id)) asyncio.create_task( - agent._record_deal_success( - message.from_id, - message.from_name, - message.acc_id, - message.acc_type, - order, - state, + record_deal_success( + customer_id=message.from_id, + customer_name=message.from_name, + acc_id=message.acc_id, + platform=message.acc_type, + order=order, + state=state, ) ) try: diff --git a/core/post_ops.py b/core/post_ops.py new file mode 100644 index 0000000..751946f --- /dev/null +++ b/core/post_ops.py @@ -0,0 +1,169 @@ +from __future__ import annotations + +import re +from typing import Any + +from utils.metrics_tracker import emit as metrics_emit + +CASE_LIBRARY_LINK = "https://www.yuque.com/zuowei-dfvpq/kge0in/mynala0g35b8cec5" + + +def detect_price(reply: str, state: Any) -> None: + numbers = re.findall(r"(\d+)[元]", reply or "") + if not numbers: + return + price = round(int(numbers[0]) / 5) * 5 + state.last_price = price + metrics_emit("quote_generated", customer_id=state.customer_id, price=price) + try: + from db.customer_db import db + + db.update_last_price(state.customer_id, price) + except Exception: + pass + + +def detect_discount(message: str, state: Any) -> None: + text = message or "" + if any(kw in text for kw in ["贵", "便宜", "太贵", "有点贵"]): + state.discount_count += 1 + if state.last_price: + try: + from db.customer_db import db + + db.record_discount(state.customer_id, state.last_price) + except Exception: + pass + m = re.search(r"(\d+)\s*元|\b(\d+)\s*块", text) + offer = None + if m: + offer = int(m.group(1) or m.group(2)) + if offer: + try: + from config.config import MIN_PRICE_FLOOR + + if offer < MIN_PRICE_FLOOR: + state.last_price = state.last_price or 0 + except Exception: + pass + + +def negotiation_strategy_reply(customer_text: str, state: Any) -> str: + text = (customer_text or "").strip() + if not text: + return "" + if any(k in text for k in ["先发效果图", "先看效果", "不放心", "没法确认"]): + return ( + f"小妹整理了一些案例图,亲点这个链接就能看到啦({CASE_LIBRARY_LINK})。" + "有什么想要的效果随时告诉我哈,不满意我们这边包退。" + ) + if "有点贵" in text or "就是贵" in text: + base = state.last_price if isinstance(state.last_price, int) and state.last_price > 0 else 25 + two_pack = max(10, round(((base * 2) - 5) / 5) * 5) + return f"理解你这边的预算,我给你个实在点的:两张一起按 {two_pack} 元做,行不行?" + if any(k in text for k in ["优惠点", "便宜点", "少点", "打折"]): + return "可以的,你这边数量上来我就好给价,3张以上我给你打包价。" + return "" + + +async def record_deal_success( + *, + customer_id: str, + customer_name: str, + acc_id: str, + platform: str, + order: dict, + state: Any, +) -> None: + try: + from db.deal_outcome_db import record_deal + + order_id = order.get("order_id", "") + raw_amount = order.get("amount", "") + m = re.search(r"[\d.]+", str(raw_amount)) + amount = float(m.group()) if m else 0 + reason = "让价后成交" if (state.discount_count or 0) > 0 else "直接成交" + record_deal( + customer_id=customer_id, + outcome="成交", + reason=reason, + customer_name=customer_name or "", + acc_id=acc_id or "", + platform=platform or "", + order_id=order_id, + amount=amount, + discount_given=(state.discount_count or 0) > 0, + ) + try: + from db.customer_db import db + + if order_id: + db.add_order(customer_id, order_id, amount) + db.clear_quote_no_convert(customer_id) + except Exception: + pass + print(f"[Agent] 成交记录: {customer_id} {reason} {amount}元") + except Exception as e: + print(f"[Agent] 成交记录失败: {e}") + + +async def record_deal_fail( + *, + customer_id: str, + customer_name: str, + acc_id: str, + platform: str, + reason: str, +) -> None: + try: + from db.deal_outcome_db import record_deal + from db.customer_db import db + + record_deal( + customer_id=customer_id, + outcome="未成交", + reason=reason, + customer_name=customer_name or "", + acc_id=acc_id or "", + platform=platform or "", + ) + db.mark_quote_no_convert(customer_id) + print(f"[Agent] 未成交记录: {customer_id} {reason}") + except Exception as e: + print(f"[Agent] 未成交记录失败: {e}") + + +async def auto_tag(message: Any, state: Any) -> None: + try: + from db.customer_db import db + + cid = message.from_id + msg = (message.msg or "").lower() + if any(kw in msg for kw in ["还有", "多张", "好几张", "一批", "下次还"]): + db.set_bulk_potential(cid, "有") + db.add_upsell_opportunity(cid, "批量打包") + if any(kw in msg for kw in ["psd", "分层", "源文件"]): + db.add_upsell_opportunity(cid, "分层PSD") + db.update_preferred_format(cid, "psd") + if "jpg" in msg or "jpeg" in msg: + db.update_preferred_format(cid, "jpg") + if "png" in msg: + db.update_preferred_format(cid, "png") + if any(kw in msg for kw in ["分辨率", "dpi", "尺寸", "大图", "印刷"]): + db.update_preferred_size(cid, message.msg[:30]) + if any(kw in msg for kw in ["拍了", "下单了", "好的", "行"]) and state.last_price: + db.update_decision_speed(cid, "快") + type_keywords = { + "印花": ["印花", "花纹", "图案", "面料", "布料", "纺织"], + "logo": ["logo", "标志", "品牌", "商标"], + "人物": ["人物", "人像", "照片", "脸", "头像"], + "产品": ["产品", "商品", "包装", "实物"], + "老照片": ["老照片", "旧照片", "发黄", "修复"], + } + for img_type, keywords in type_keywords.items(): + if any(kw in message.msg for kw in keywords): + db.add_image_type(cid, img_type) + break + db.auto_compute_tags(cid) + except Exception: + pass diff --git a/core/pydantic_ai_agent.py b/core/pydantic_ai_agent.py index a716e50..207e929 100755 --- a/core/pydantic_ai_agent.py +++ b/core/pydantic_ai_agent.py @@ -1893,21 +1893,6 @@ class CustomerServiceAgent: reply_text=reply_text, ) - def _detect_price(self, reply: str, state: ConversationState): - """从回复中提取价格,同步写入客户数据库(价格必须为5的整数倍)""" - import re - numbers = re.findall(r'(\d+)[元]', reply) - if numbers: - price = round(int(numbers[0]) / 5) * 5 # 强制为5的整数倍 - state.last_price = price - metrics_emit("quote_generated", customer_id=state.customer_id, price=price) - # 持久化到客户数据库,重启后仍可读取 - try: - from db.customer_db import db - db.update_last_price(state.customer_id, price) - except Exception: - pass - async def _check_order_amount(self, customer_id: str, order: dict, acc_id: str): """核查订单实付金额是否与报价一致,异常时企业微信预警""" try: @@ -1942,190 +1927,6 @@ class CustomerServiceAgent: except Exception as e: print(f"[Agent] 订单金额核查失败: {e}") - async def _record_deal_success( - self, - customer_id: str, - customer_name: str, - acc_id: str, - platform: str, - order: dict, - state: "ConversationState", - ): - """成交时写入数据库,供日报与数据分析""" - try: - import re - from db.deal_outcome_db import record_deal - order_id = order.get("order_id", "") - raw_amount = order.get("amount", "") - m = re.search(r"[\d.]+", str(raw_amount)) - amount = float(m.group()) if m else 0 - reason = "让价后成交" if (state.discount_count or 0) > 0 else "直接成交" - record_deal( - customer_id=customer_id, - outcome="成交", - reason=reason, - customer_name=customer_name or "", - acc_id=acc_id or "", - platform=platform or "", - order_id=order_id, - amount=amount, - discount_given=(state.discount_count or 0) > 0, - ) - # 同步到客户库 - try: - from db.customer_db import db - if order_id: - db.add_order(customer_id, order_id, amount) - db.clear_quote_no_convert(customer_id) - except Exception: - pass - print(f"[Agent] 成交记录: {customer_id} {reason} {amount}元") - except Exception as e: - print(f"[Agent] 成交记录失败: {e}") - - async def _record_deal_fail( - self, - customer_id: str, - customer_name: str, - acc_id: str, - platform: str, - reason: str, - ): - """未成交时写入数据库,供日报与数据分析;标记报价未成交,下次可适当降低""" - try: - from db.deal_outcome_db import record_deal - from db.customer_db import db - record_deal( - customer_id=customer_id, - outcome="未成交", - reason=reason, - customer_name=customer_name or "", - acc_id=acc_id or "", - platform=platform or "", - ) - db.mark_quote_no_convert(customer_id) - print(f"[Agent] 未成交记录: {customer_id} {reason}") - except Exception as e: - print(f"[Agent] 未成交记录失败: {e}") - - async def _auto_tag(self, message: CustomerMessage, reply: str, state: ConversationState): - """自动识别并写入各类标签""" - try: - from db.customer_db import db - cid = message.from_id - msg = message.msg.lower() - - # 批量潜力 - if any(kw in msg for kw in ["还有", "多张", "好几张", "一批", "下次还"]): - db.set_bulk_potential(cid, "有") - db.add_upsell_opportunity(cid, "批量打包") - - # 加购机会:问过PSD/分层 - if any(kw in msg for kw in ["psd", "分层", "源文件"]): - db.add_upsell_opportunity(cid, "分层PSD") - db.update_preferred_format(cid, "psd") - - # 格式偏好 - if "jpg" in msg or "jpeg" in msg: - db.update_preferred_format(cid, "jpg") - if "png" in msg: - db.update_preferred_format(cid, "png") - - # 尺寸/分辨率偏好 - if any(kw in msg for kw in ["分辨率", "dpi", "尺寸", "大图", "印刷"]): - db.update_preferred_size(cid, message.msg[:30]) - - # 决策速度:收到报价后立刻说拍了 → 快 - if any(kw in msg for kw in ["拍了", "下单了", "好的", "行"]) and state.last_price: - db.update_decision_speed(cid, "快") - - # 图片类型识别(简单关键词匹配) - type_keywords = { - "印花": ["印花", "花纹", "图案", "面料", "布料", "纺织"], - "logo": ["logo", "标志", "品牌", "商标"], - "人物": ["人物", "人像", "照片", "脸", "头像"], - "产品": ["产品", "商品", "包装", "实物"], - "老照片": ["老照片", "旧照片", "发黄", "修复"], - } - for img_type, keywords in type_keywords.items(): - if any(kw in message.msg for kw in keywords): - db.add_image_type(cid, img_type) - break - - # 定期自动计算衍生标签 - db.auto_compute_tags(cid) - - except Exception: - pass - - def _detect_discount(self, message: str, state: ConversationState): - """检测压价,并持久化让价记录""" - if any(kw in message for kw in ["贵", "便宜", "太贵", "有点贵"]): - state.discount_count += 1 - if state.last_price: - try: - from db.customer_db import db - db.record_discount(state.customer_id, state.last_price) - except Exception: - pass - # 客户明确给价(如“10元/10块/能10吗”) - import re - m = re.search(r'(\d+)\s*元|\b(\d+)\s*块', message) - offer = None - if m: - offer = int(m.group(1) or m.group(2)) - if offer: - try: - from config.config import MIN_PRICE_FLOOR - if offer < MIN_PRICE_FLOOR: - # 标记本次为超低出价(用于后续拒绝话术提示) - state.last_price = state.last_price or 0 - except Exception: - pass - - def _negotiation_strategy_reply(self, customer_text: str, state: ConversationState) -> str: - """ - 价格谈判固定策略(优先级高于通用 AI 自由回复): - - 有点贵:给两张打包建议价 - - 优惠点:引导3张以上打包价 - - 先发效果图:给固定案例链接并说明不满意包退 - """ - text = (customer_text or "").strip() - if not text: - return "" - - if any(k in text for k in ["先发效果图", "先看效果", "不放心", "没法确认"]): - return random.choice([ - f"小妹整理了一些案例图,亲点这个链接就能看到啦({CASE_LIBRARY_LINK})。有什么想要的效果随时告诉我哈,不满意我们这边包退。", - f"先给你看案例哈,链接在这({CASE_LIBRARY_LINK})。你想要什么效果直接说,我们按你要求做,不满意可退。", - f"我把类似案例给你准备好了,点这个看就行({CASE_LIBRARY_LINK})。你放心说需求,效果不满意我们包退。", - f"怕效果不稳的话你先看案例,链接在这里({CASE_LIBRARY_LINK})。你确认风格后我按你要的做,不满意可以退。", - f"可以先看下我们做过的案例({CASE_LIBRARY_LINK})。你觉得方向OK再拍,不满意我们这边支持退。", - ]) - - if "有点贵" in text or "就是贵" in text: - # 约定示例:两张优惠价,默认45;若已有单张价格则动态估算两张打包价 - base = state.last_price if isinstance(state.last_price, int) and state.last_price > 0 else 25 - two_pack = max(10, round(((base * 2) - 5) / 5) * 5) - return random.choice([ - f"理解你这边的预算,我给你个实在点的:两张一起按 {two_pack} 元做,行不行?", - f"我懂你意思,这样吧,两张一起我给你算 {two_pack} 元。", - f"那我给你压一口价,两张打包 {two_pack} 元,你看可以我就开做。", - f"没问题,我给你优惠点,两张一起按 {two_pack} 元走。", - f"你这边要省点的话,两张一起我给你做到 {two_pack} 元。", - ]) - - if any(k in text for k in ["优惠点", "便宜点", "少点", "打折"]): - return random.choice([ - "可以的,你这边数量上来我就好给价,3张以上我给你打包价。", - "能优惠,做得多会更划算,3张以上我这边可以给你打包算。", - "没问题,3张起我可以给你一口打包价,会比单张省一些。", - "可以便宜点,按量走更好谈,3张以上我给你打包价。", - "你这边如果是多张做,3张以上我能给你更划算的打包价。", - ]) - - return "" - def _parse_order_info(self, msg: str) -> dict: """从系统订单消息中提取所有字段""" import re diff --git a/core/reply_finalize_flow.py b/core/reply_finalize_flow.py index 583f8bb..bca5a00 100644 --- a/core/reply_finalize_flow.py +++ b/core/reply_finalize_flow.py @@ -5,6 +5,7 @@ from datetime import datetime from typing import TYPE_CHECKING from utils.metrics_tracker import emit as metrics_emit +from core.post_ops import auto_tag, detect_discount, detect_price, record_deal_fail if TYPE_CHECKING: from core.pydantic_ai_agent import AgentResponse, ConversationState, CustomerMessage, CustomerServiceAgent @@ -36,9 +37,9 @@ async def finalize_ai_reply( except Exception: pass - agent._detect_price(reply_text, state) - agent._detect_discount(message.msg, state) - asyncio.create_task(agent._auto_tag(message, reply_text, state)) + detect_price(reply_text, state) + detect_discount(message.msg, state) + asyncio.create_task(auto_tag(message, state)) need_transfer = False transfer_msg = "" @@ -64,7 +65,13 @@ async def finalize_ai_reply( if any(kw in customer_text for kw in no_convert_keywords): reason = "嫌贵放弃" if any(k in customer_text for k in ["贵", "贵了", "便宜"]) else "放弃" asyncio.create_task( - agent._record_deal_fail(message.from_id, message.from_name, message.acc_id, message.acc_type, reason) + record_deal_fail( + customer_id=message.from_id, + customer_name=message.from_name, + acc_id=message.acc_id, + platform=message.acc_type, + reason=reason, + ) ) should_reply = bool(reply_text and reply_text.strip()) and not need_transfer