diff --git a/core/context_helpers.py b/core/context_helpers.py new file mode 100644 index 0000000..596c5a3 --- /dev/null +++ b/core/context_helpers.py @@ -0,0 +1,225 @@ +from __future__ import annotations + +import os +from collections import Counter +from datetime import datetime + + +def calc_avg_complexity(complexity_history: list) -> str: + """计算平均复杂度。""" + if not complexity_history: + return "未知" + level_map = {"simple": 1, "normal": 2, "complex": 3, "hard": 4} + label_map = {1: "简单", 2: "一般", 3: "复杂", 4: "很复杂"} + try: + avg = sum(level_map.get(c, 2) for c in complexity_history) / len(complexity_history) + return label_map.get(round(avg), "一般") + except Exception: + return "一般" + + +def get_customer_profile_context(agent, customer_id: str) -> str: + """从数据库读取客户画像,注入给 AI。含个性化语气、报价策略、主动预测、近期对话。""" + try: + from db.customer_db import db + + profile = db.get_customer(customer_id) + + if profile.blacklist: + return f"【⚠️黑名单客户】原因:{profile.blacklist_reason or '已标记'},请转接人工处理,不要自动回复" + + lines = [] + lines.append("=== 客户档案 ===") + + basic_info = [] + basic_info.append(f"客户ID: {customer_id}") + basic_info.append(f"姓名: {profile.name or '未知'}") + if profile.email: + basic_info.append(f"邮箱: {profile.email}") + if profile.phone: + basic_info.append(f"电话: {profile.phone}") + if profile.wechat: + basic_info.append(f"微信: {profile.wechat}") + lines.append(" | ".join(basic_info)) + + consume_info = [] + consume_info.append(f"客户等级: {profile.customer_level}级") + if profile.vip: + consume_info.append("VIP客户") + consume_info.append(f"总订单: {profile.total_orders}单") + consume_info.append(f"总消费: {profile.total_spent}元") + if profile.total_orders > 0: + consume_info.append(f"客单价: {profile.total_spent // profile.total_orders}元") + lines.append("--- 消费分析 ---") + lines.append(" | ".join(consume_info)) + + price_info = [] + if profile.vip_custom_price: + price_info.append(f"VIP专属价: {profile.vip_custom_price}元(直接报这个价)") + if profile.last_price: + price_info.append(f"上次报价: {profile.last_price}元") + if profile.lowest_price_accepted: + price_info.append(f"历史最低成交: {profile.lowest_price_accepted}元") + if profile.discount_given_count: + price_info.append(f"历史让价: {profile.discount_given_count}次") + if profile.price_sensitivity: + price_info.append(f"价格敏感度: {profile.price_sensitivity}") + if getattr(profile, "last_quote_no_convert", False): + price_info.append("【策略】上次报价未成交,本次可降5-10元") + if price_info: + lines.append("--- 报价历史 ---") + lines.append(" | ".join(price_info)) + + personality_info = [] + if profile.personality: + personality_info.append(f"性格: {'/'.join(profile.personality)}") + if profile.decision_speed: + personality_info.append(f"决策速度: {profile.decision_speed}") + if profile.communication_prefer: + personality_info.append(f"沟通偏好: {profile.communication_prefer}") + if personality_info: + lines.append("--- 性格特征 ---") + lines.append(" | ".join(personality_info)) + + image_info = [] + image_info.append(f"累计发图: {profile.total_images_sent}张") + if profile.complexity_history: + image_info.append(f"平均复杂度: {calc_avg_complexity(profile.complexity_history)}") + if profile.image_type_history: + top_types = Counter(profile.image_type_history).most_common(3) + types_str = "、".join(f"{t}({c}次)" for t, c in top_types) + image_info.append(f"常见类型: {types_str}") + if profile.preferred_format: + image_info.append(f"格式偏好: {profile.preferred_format}") + if profile.preferred_size: + image_info.append(f"尺寸要求: {profile.preferred_size}") + if profile.last_image_url: + image_info.append(f"最近发图: {profile.last_image_url[:60]}...") + lines.append("--- 图片习惯 ---") + lines.append(" | ".join(image_info)) + + if profile.processing_status: + task_info = [] + task_info.append(f"状态: {profile.processing_status}") + if profile.processing_image_url: + task_info.append(f"处理中: {profile.processing_image_url[:40]}...") + if profile.expected_done_at: + task_info.append(f"预计完成: {profile.expected_done_at}") + lines.append("--- 当前任务 ---") + lines.append(" | ".join(task_info)) + + if profile.last_conversation_summary: + time_str = "" + if profile.last_conversation_time: + try: + t = datetime.fromisoformat(profile.last_conversation_time) + diff = datetime.now() - t + if diff.days > 0: + time_str = f"({diff.days}天前)" + else: + h = diff.seconds // 3600 + time_str = f"({h}小时前)" if h > 0 else "(刚刚)" + except Exception: + pass + lines.append(f"--- 上次对话 {time_str} ---") + lines.append(profile.last_conversation_summary) + + hints = [] + if profile.personality: + if "爽快" in profile.personality: + hints.append("回复简洁直接,不废话,快速报价") + if "砍价" in profile.personality or "砍价狂" in profile.personality: + hints.append("报价时强调性价比,只让价一次,第二次引导去 xinhui.cloud") + if "纠结" in profile.personality or "墨迹" in profile.personality: + hints.append("多给一点说明,耐心回答") + if profile.price_sensitivity == "高": + hints.append("报价时顺带提「满意再拍」降低顾虑") + if profile.decision_speed == "快": + hints.append("直接报价推成交,少铺垫") + if profile.total_orders > 0 and profile.decision_speed == "快": + hints.append("老客爽快,直接报价成交") + if hints: + lines.append("--- 回复策略 ---") + lines.append(";".join(hints)) + + proactive = [] + if profile.bulk_potential == "有" or (profile.total_images_sent or 0) >= 2: + proactive.append("可问「要做多张吗,多张有优惠」") + if profile.upsell_opportunity: + proactive.append(f"加购机会: {'、'.join(profile.upsell_opportunity)}") + if proactive: + lines.append("--- 主动推荐 ---") + lines.append(";".join(proactive)) + + return "\n".join(lines) + except Exception as e: + print(f"[Agent] 获取客户画像失败: {e}") + return "" + + +def get_refusal_context_hint(agent, customer_id: str, current_msg: str, profile_context: str) -> str: + """ + 检测「刚拒绝某张图 + 客户问能找到吗」场景,注入显式提示,避免前后矛盾。 + """ + ask_keywords = ["能找到吗", "可以吗", "有吗", "能做吗", "可以找吗", "可以弄吗"] + if not any(kw in current_msg for kw in ask_keywords): + return "" + refusal_keywords = ["不做", "不接", "拒绝", "不做这类", "这类不做"] + if any(kw in profile_context for kw in refusal_keywords): + return "【重要】上一句客服刚拒绝了某张图,客户问能找到吗时须明确:能做的是哪张(如第一张),不能做的是哪张。不可只说「放心拍」「可以」,会前后矛盾。" + history = getattr(agent, "message_histories", {}).get(customer_id, []) + for msg in reversed(history[-6:]): + msg_str = str(msg) + if any(kw in msg_str for kw in refusal_keywords): + return "【重要】上一句客服刚拒绝了某张图,客户问能找到吗时须明确:能做的是哪张(如第一张),不能做的是哪张。不可只说「放心拍」「可以」,会前后矛盾。" + return "" + + +def get_conversation_context(customer_id: str, acc_id: str = "", limit: int = 12, max_len: int = 80) -> str: + """每一次对话都从数据库加载近期对话,压缩后注入 prompt。""" + try: + try: + from config.config import CHAT_CONTEXT_LIMIT, CHAT_CONTEXT_TRUNCATE_LEN + + limit = CHAT_CONTEXT_LIMIT + max_len = CHAT_CONTEXT_TRUNCATE_LEN + except Exception: + pass + from db.chat_log_db import get_recent_conversation + + msgs = get_recent_conversation(customer_id, acc_id=acc_id, limit=limit) + if not msgs: + return "" + lines = [] + for m in msgs: + role = "客" if m.get("direction") == "in" else "服" + msg_text = (m.get("message") or "").strip().replace("\n", " ")[:max_len] + if not msg_text: + continue + lines.append(f"{role}:{msg_text}") + if not lines: + return "" + return "【近期】\n" + "\n".join(lines) + "\n\n" + except Exception: + return "" + + +def get_intent_emotion_hint(msg: str) -> str: + """语义匹配:意图/情绪识别,注入提示。EMBEDDING_MODEL 未配置时用关键词。""" + try: + from utils.intent_analyzer import detect_emotion_embedding, detect_intent_embedding, detect_intent_keywords + + intent = detect_intent_embedding(msg) + if not intent: + intent = detect_intent_keywords(msg) + emotion = detect_emotion_embedding(msg) if os.getenv("EMBEDDING_MODEL") else None + parts = [] + if intent: + parts.append(f"意图:{intent}") + if emotion: + parts.append(f"情绪:{emotion}") + if parts: + return f"【当前消息】{', '.join(parts)}" + except Exception: + pass + return "" diff --git a/core/pydantic_ai_agent.py b/core/pydantic_ai_agent.py index 9592459..4e430b1 100755 --- a/core/pydantic_ai_agent.py +++ b/core/pydantic_ai_agent.py @@ -59,6 +59,13 @@ from core.agent_prompts import ( build_system_prompt, ) from core.risk_text_helpers import is_map_inquiry, is_political_inquiry +from core.context_helpers import ( + calc_avg_complexity, + get_conversation_context, + get_customer_profile_context, + get_intent_emotion_hint, + get_refusal_context_hint, +) load_dotenv() @@ -1380,233 +1387,15 @@ class CustomerServiceAgent: _is_political_inquiry = staticmethod(is_political_inquiry) _is_map_inquiry = staticmethod(is_map_inquiry) + _calc_avg_complexity = staticmethod(calc_avg_complexity) + _get_conversation_context = staticmethod(get_conversation_context) + _get_intent_emotion_hint = staticmethod(get_intent_emotion_hint) + def _get_customer_profile_context(self, customer_id: str) -> str: - """从数据库读取客户画像,注入给 AI。含个性化语气、报价策略、主动预测、近期对话。""" - try: - from db.customer_db import db - profile = db.get_customer(customer_id) - - if profile.blacklist: - return f"【⚠️黑名单客户】原因:{profile.blacklist_reason or '已标记'},请转接人工处理,不要自动回复" - - lines = [] - lines.append("=== 客户档案 ===") - - # 基础信息 - basic_info = [] - basic_info.append(f"客户ID: {customer_id}") - basic_info.append(f"姓名: {profile.name or '未知'}") - if profile.email: - basic_info.append(f"邮箱: {profile.email}") - if profile.phone: - basic_info.append(f"电话: {profile.phone}") - if profile.wechat: - basic_info.append(f"微信: {profile.wechat}") - lines.append(" | ".join(basic_info)) - - # 消费分析 - consume_info = [] - consume_info.append(f"客户等级: {profile.customer_level}级") - if profile.vip: - consume_info.append("VIP客户") - consume_info.append(f"总订单: {profile.total_orders}单") - consume_info.append(f"总消费: {profile.total_spent}元") - if profile.total_orders > 0: - consume_info.append(f"客单价: {profile.total_spent // profile.total_orders}元") - lines.append("--- 消费分析 ---") - lines.append(" | ".join(consume_info)) - - # 报价历史 - price_info = [] - if profile.vip_custom_price: - price_info.append(f"VIP专属价: {profile.vip_custom_price}元(直接报这个价)") - if profile.last_price: - price_info.append(f"上次报价: {profile.last_price}元") - if profile.lowest_price_accepted: - price_info.append(f"历史最低成交: {profile.lowest_price_accepted}元") - if profile.discount_given_count: - price_info.append(f"历史让价: {profile.discount_given_count}次") - if profile.price_sensitivity: - price_info.append(f"价格敏感度: {profile.price_sensitivity}") - if getattr(profile, "last_quote_no_convert", False): - price_info.append("【策略】上次报价未成交,本次可降5-10元") - if price_info: - lines.append("--- 报价历史 ---") - lines.append(" | ".join(price_info)) - - # 性格与决策 - personality_info = [] - if profile.personality: - personality_info.append(f"性格: {'/'.join(profile.personality)}") - if profile.decision_speed: - personality_info.append(f"决策速度: {profile.decision_speed}") - if profile.communication_prefer: - personality_info.append(f"沟通偏好: {profile.communication_prefer}") - if personality_info: - lines.append("--- 性格特征 ---") - lines.append(" | ".join(personality_info)) - - # 图片习惯 - image_info = [] - image_info.append(f"累计发图: {profile.total_images_sent}张") - if profile.complexity_history: - avg_complexity = self._calc_avg_complexity(profile.complexity_history) - image_info.append(f"平均复杂度: {avg_complexity}") - if profile.image_type_history: - from collections import Counter - top_types = Counter(profile.image_type_history).most_common(3) - types_str = "、".join(f"{t}({c}次)" for t, c in top_types) - image_info.append(f"常见类型: {types_str}") - if profile.preferred_format: - image_info.append(f"格式偏好: {profile.preferred_format}") - if profile.preferred_size: - image_info.append(f"尺寸要求: {profile.preferred_size}") - if profile.last_image_url: - image_info.append(f"最近发图: {profile.last_image_url[:60]}...") - lines.append("--- 图片习惯 ---") - lines.append(" | ".join(image_info)) - - # 当前任务状态 - if profile.processing_status: - task_info = [] - task_info.append(f"状态: {profile.processing_status}") - if profile.processing_image_url: - task_info.append(f"处理中: {profile.processing_image_url[:40]}...") - if profile.expected_done_at: - task_info.append(f"预计完成: {profile.expected_done_at}") - lines.append("--- 当前任务 ---") - lines.append(" | ".join(task_info)) - - # 上次对话摘要 - if profile.last_conversation_summary: - time_str = "" - if profile.last_conversation_time: - try: - t = datetime.fromisoformat(profile.last_conversation_time) - diff = datetime.now() - t - if diff.days > 0: - time_str = f"({diff.days}天前)" - else: - h = diff.seconds // 3600 - time_str = f"({h}小时前)" if h > 0 else "(刚刚)" - except Exception: - pass - lines.append(f"--- 上次对话 {time_str} ---") - lines.append(profile.last_conversation_summary) - - # 个性化回复策略 - hints = [] - if profile.personality: - if "爽快" in profile.personality: - hints.append("回复简洁直接,不废话,快速报价") - if "砍价" in profile.personality or "砍价狂" in profile.personality: - hints.append("报价时强调性价比,只让价一次,第二次引导去 xinhui.cloud") - if "纠结" in profile.personality or "墨迹" in profile.personality: - hints.append("多给一点说明,耐心回答") - if profile.price_sensitivity == "高": - hints.append("报价时顺带提「满意再拍」降低顾虑") - if profile.decision_speed == "快": - hints.append("直接报价推成交,少铺垫") - if profile.total_orders > 0 and profile.decision_speed == "快": - hints.append("老客爽快,直接报价成交") - if hints: - lines.append("--- 回复策略 ---") - lines.append(";".join(hints)) - - # 主动推荐 - proactive = [] - if profile.bulk_potential == "有" or (profile.total_images_sent or 0) >= 2: - proactive.append("可问「要做多张吗,多张有优惠」") - if profile.upsell_opportunity: - proactive.append(f"加购机会: {'、'.join(profile.upsell_opportunity)}") - if proactive: - lines.append("--- 主动推荐 ---") - lines.append(";".join(proactive)) - - return "\n".join(lines) - except Exception as e: - print(f"[Agent] 获取客户画像失败: {e}") - return "" - - def _calc_avg_complexity(self, complexity_history: list) -> str: - """计算平均复杂度""" - if not complexity_history: - return "未知" - level_map = {"simple": 1, "normal": 2, "complex": 3, "hard": 4} - label_map = {1: "简单", 2: "一般", 3: "复杂", 4: "很复杂"} - try: - avg = sum(level_map.get(c, 2) for c in complexity_history) / len(complexity_history) - return label_map.get(round(avg), "一般") - except Exception: - return "一般" + return get_customer_profile_context(self, customer_id) def _get_refusal_context_hint(self, customer_id: str, current_msg: str, profile_context: str) -> str: - """ - 检测「刚拒绝某张图 + 客户问能找到吗」场景,注入显式提示,避免前后矛盾。 - 原因:last_conversation_summary 异步更新可能滞后,message_histories 模型可能忽略。 - """ - ask_keywords = ["能找到吗", "可以吗", "有吗", "能做吗", "可以找吗", "可以弄吗"] - if not any(kw in current_msg for kw in ask_keywords): - return "" - refusal_keywords = ["不做", "不接", "拒绝", "不做这类", "这类不做"] - # 检查 profile 摘要(可能因异步更新而滞后) - if any(kw in profile_context for kw in refusal_keywords): - return "【重要】上一句客服刚拒绝了某张图,客户问能找到吗时须明确:能做的是哪张(如第一张),不能做的是哪张。不可只说「放心拍」「可以」,会前后矛盾。" - # 检查内存历史中最近几条消息(ModelResponse 含客服回复) - history = self.message_histories.get(customer_id, []) - for msg in reversed(history[-6:]): - msg_str = str(msg) - if any(kw in msg_str for kw in refusal_keywords): - return "【重要】上一句客服刚拒绝了某张图,客户问能找到吗时须明确:能做的是哪张(如第一张),不能做的是哪张。不可只说「放心拍」「可以」,会前后矛盾。" - return "" - - def _get_conversation_context(self, customer_id: str, acc_id: str = "", limit: int = 12, max_len: int = 80) -> str: - """ - 每一次对话都从数据库加载近期对话,压缩后注入 prompt。 - 确保模型看到上下文,同时控制 token 消耗。 - """ - try: - try: - from config.config import CHAT_CONTEXT_LIMIT, CHAT_CONTEXT_TRUNCATE_LEN - limit = CHAT_CONTEXT_LIMIT - max_len = CHAT_CONTEXT_TRUNCATE_LEN - except Exception: - pass - from db.chat_log_db import get_recent_conversation - msgs = get_recent_conversation(customer_id, acc_id=acc_id, limit=limit) - if not msgs: - return "" - lines = [] - for m in msgs: - role = "客" if m.get("direction") == "in" else "服" - msg_text = (m.get("message") or "").strip().replace("\n", " ")[:max_len] - if not msg_text: - continue - lines.append(f"{role}:{msg_text}") - if not lines: - return "" - return "【近期】\n" + "\n".join(lines) + "\n\n" - except Exception: - return "" - - def _get_intent_emotion_hint(self, msg: str) -> str: - """语义匹配:意图/情绪识别,注入提示。EMBEDDING_MODEL 未配置时用关键词。""" - try: - from utils.intent_analyzer import detect_intent_embedding, detect_intent_keywords, detect_emotion_embedding - intent = detect_intent_embedding(msg) - if not intent: - intent = detect_intent_keywords(msg) - emotion = detect_emotion_embedding(msg) if os.getenv("EMBEDDING_MODEL") else None - parts = [] - if intent: - parts.append(f"意图:{intent}") - if emotion: - parts.append(f"情绪:{emotion}") - if parts: - return f"【当前消息】{', '.join(parts)}" - except Exception: - pass - return "" + return get_refusal_context_hint(self, customer_id, current_msg, profile_context) # 简单打招呼类消息(在近期已回复后无需再回) _COOLDOWN_PATTERNS = [