"""客户画像数据库""" import os import json from datetime import datetime from typing import Optional, Dict, Any, List from dataclasses import dataclass, asdict from collections import defaultdict _DB_TYPE = os.getenv("DB_TYPE", "sqlite").lower() _MYSQL_HOST = os.getenv("MYSQL_HOST", "127.0.0.1") _MYSQL_PORT = int(os.getenv("MYSQL_PORT", "3306")) _MYSQL_USER = os.getenv("MYSQL_USER", "root") _MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD", "") _MYSQL_DATABASE = os.getenv("MYSQL_DATABASE", "ai_cs") # 复用 chat_log_db 的连接池 from db.chat_log_db import _get_pooled_conn, _return_conn def _is_mysql() -> bool: return _DB_TYPE in ("mysql", "mariadb") @dataclass class CustomerProfile: """客户画像""" # 基础信息 customer_id: str name: str = "" nickname: str = "" email: str = "" phone: str = "" wechat: str = "" # 微信号 address: str = "" # 收货地址 # 账号信息 platform: str = "" # 平台(淘宝/天猫/京东等) platform_id: str = "" # 平台账号ID # 需求相关 budget: str = "" # 预算(如:50-100元) budget_range_min: float = 0 # 预算下限 budget_range_max: float = 0 # 预算上限 requirements: List[str] = None # 历史需求列表 preference_services: List[str] = None # 偏好服务类型(修图/找图/设计等) # 消费分析 total_orders: int = 0 total_spent: float = 0.0 avg_order_value: float = 0.0 # 平均客单价 purchase_frequency: str = "" # 购买频次(高/中/低) last_order_date: str = "" # 最后下单时间 first_order_date: str = "" # 首次下单时间 # 订单相关 order_ids: List[str] = None pending_orders: int = 0 # 待处理订单 completed_orders: int = 0 # 已完成订单 refund_count: int = 0 # 退款次数 # 性格/行为 personality: List[str] = None # 性格标签(爽快/纠结/砍价狂/爽快/墨迹) communication_prefer: str = "" # 沟通偏好(文字/语音/图片) response_speed: str = "" # 响应速度偏好(快/慢) patience_level: str = "" # 耐心程度(高/中/低) # 客户价值 customer_level: str = "C" # 客户等级(A/B/C/D) vip: bool = False vip_level: int = 0 # VIP等级 # 报价记录 last_price: int = 0 # 上次报价 last_price_time: str = "" # 上次报价时间 last_quote_no_convert: bool = False # 上次报价后未成交,下次可适当降低 last_min_price: int = 0 # 最近图片分析的最低价 last_image_url: str = "" # 最近一次发来的图片URL last_image_time: str = "" # 图片发送时间 last_gemini_prompt: str = "" # 最近一次图片的 Gemini 处理提示词 last_aspect_ratio: str = "1:1" # 最近一次图片的建议输出比例 last_perspective: str = "no" # 最近一次图片的透视状态 last_image_analysis: str = "" # 最近一次图片分析结果(JSON字符串,用于数据标定) image_analysis_history: List[str] = None # 图片分析历史记录(JSON列表,用于数据标定) pending_quote_images: List[str] = None # 待统一报价图片队列(持久化) pending_quote_requirements: List[str] = None # 待统一报价需求队列(持久化) # 当前任务状态 processing_status: str = "" # 待处理/处理中/等待确认/已完成 processing_image_url: str = "" # 当前正在处理的图片URL expected_done_at: str = "" # 预计完成时间 # 让价记录 discount_given_count: int = 0 # 历史累计让价次数 lowest_price_accepted: int = 0 # 客户接受过的最低价 # 格式偏好 preferred_format: str = "" # jpg / psd / png preferred_size: str = "" # 有没有问过分辨率/尺寸要求 # 对话摘要 last_conversation_summary: str = "" # 上次对话摘要(一句话) last_conversation_time: str = "" # 上次对话时间 # 来图习惯 total_images_sent: int = 0 # 历史发图总数 complexity_history: List[str] = None # 历史复杂度列表,如 ["hard","complex","normal"] image_type_history: List[str] = None # 图片类型历史,如 ["印花","logo","人物"] # AI 决策辅助 price_sensitivity: str = "" # 价格敏感度:高/中/低(自动计算) decision_speed: str = "" # 决策速度:快/慢(自动标记) revision_count: int = 0 # 历史总改稿次数 revision_orders: int = 0 # 有改稿的订单数 total_completed_orders: int = 0 # 已完成出图的订单数 # 业务分析 bulk_potential: str = "" # 批量潜力:有/无/未知 churn_risk: str = "" # 流失风险:高/中/低(自动计算) upsell_opportunity: List[str] = None # 加购机会,如 ["分层PSD","批量打包"] # 运营 blacklist: bool = False # 黑名单 blacklist_reason: str = "" # 拉黑原因 vip_custom_price: int = 0 # VIP专属报价(0=无) last_email_status: str = "" # 最后一次邮件发送状态:sent/failed # 评价 good_reviews: int = 0 # 好评数 bad_reviews: int = 0 # 差评数 dispute_count: int = 0 # 纠纷次数 # 跟进信息 follow_up_by: str = "" # 跟进销售 follow_up_date: str = "" next_follow_date: str = "" # 下次跟进日期 # 来源 source: str = "" # 来源渠道(自然流量/推广/老客户转介绍) coupon_used: str = "" # 使用过的优惠券 # 备注 notes: List[str] = None # 备注列表 tags: List[str] = None # 自定义标签 # 时间 created_at: str = "" last_contact: str = "" last_update: str = "" def __post_init__(self): if self.requirements is None: self.requirements = [] if self.preference_services is None: self.preference_services = [] if self.order_ids is None: self.order_ids = [] if self.personality is None: self.personality = [] if self.notes is None: self.notes = [] if self.tags is None: self.tags = [] if self.complexity_history is None: self.complexity_history = [] if self.image_type_history is None: self.image_type_history = [] if self.upsell_opportunity is None: self.upsell_opportunity = [] if self.pending_quote_images is None: self.pending_quote_images = [] if self.pending_quote_requirements is None: self.pending_quote_requirements = [] if self.image_analysis_history is None: self.image_analysis_history = [] class _PooledMySQLConn: """包装 pymysql 连接,支持连接池归还""" def __init__(self, conn): self._conn = conn def __enter__(self): return self._conn def __exit__(self, exc_type, exc, tb): if exc_type: try: self._conn.rollback() except Exception: pass _return_conn(self._conn) class CustomerDatabase: """客户数据库""" def __init__(self, db_path: str = "customer_db"): self.db_path = db_path self.customers_file = os.path.join(db_path, "customers.json") self._ensure_db() def _get_mysql_conn(self): """从连接池获取 MySQL 连接""" return _PooledMySQLConn(_get_pooled_conn(timeout=5.0)) def _ensure_db(self): if _is_mysql(): with self._get_mysql_conn() as conn: with conn.cursor() as cur: cur.execute( """ CREATE TABLE IF NOT EXISTS customer_profiles ( customer_id VARCHAR(128) PRIMARY KEY, profile_json LONGTEXT NOT NULL, last_update DATETIME NOT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """ ) cur.execute("SHOW INDEX FROM customer_profiles") exists = {str(r.get("Key_name", "")) for r in cur.fetchall()} if "idx_last_update" not in exists: cur.execute("CREATE INDEX idx_last_update ON customer_profiles(last_update)") conn.commit() return if not os.path.exists(self.db_path): os.makedirs(self.db_path) if not os.path.exists(self.customers_file): self._save_customers({}) def _load_customers(self) -> Dict[str, dict]: if _is_mysql(): out: Dict[str, dict] = {} try: with self._get_mysql_conn() as conn: with conn.cursor() as cur: cur.execute("SELECT customer_id, profile_json FROM customer_profiles") rows = cur.fetchall() for r in rows: cid = str(r.get("customer_id") or "") if not cid: continue try: out[cid] = json.loads(r.get("profile_json") or "{}") except Exception: out[cid] = {} except Exception: return {} return out try: with open(self.customers_file, 'r', encoding='utf-8') as f: return json.load(f) except: return {} def _save_customers(self, customers: Dict[str, dict]): if _is_mysql(): now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") with self._get_mysql_conn() as conn: with conn.cursor() as cur: for cid, data in (customers or {}).items(): cur.execute( """ REPLACE INTO customer_profiles (customer_id, profile_json, last_update) VALUES (%s, %s, %s) """, (cid, json.dumps(data, ensure_ascii=False), now), ) conn.commit() return with open(self.customers_file, 'w', encoding='utf-8') as f: json.dump(customers, f, ensure_ascii=False, indent=2) def get_customer(self, customer_id: str) -> CustomerProfile: if _is_mysql(): try: with self._get_mysql_conn() as conn: with conn.cursor() as cur: cur.execute( "SELECT profile_json FROM customer_profiles WHERE customer_id=%s LIMIT 1", (customer_id,), ) row = cur.fetchone() if row and row.get("profile_json"): data = json.loads(row.get("profile_json") or "{}") else: data = {} data.pop('customer_id', None) return CustomerProfile(customer_id=customer_id, **data) except Exception: return CustomerProfile(customer_id=customer_id) customers = self._load_customers() data = customers.get(customer_id, {}) # 确保不重复传递 customer_id data.pop('customer_id', None) return CustomerProfile(customer_id=customer_id, **data) def save_customer(self, profile: CustomerProfile, max_retries: int = 3): """保存客户画像(带重试机制)""" import time profile.last_update = datetime.now().isoformat() if _is_mysql(): last_error = None for attempt in range(max_retries): try: with self._get_mysql_conn() as conn: with conn.cursor() as cur: cur.execute( """ REPLACE INTO customer_profiles (customer_id, profile_json, last_update) VALUES (%s, %s, %s) """, ( profile.customer_id, json.dumps(asdict(profile), ensure_ascii=False), datetime.now().strftime("%Y-%m-%d %H:%M:%S"), ), ) conn.commit() return except Exception as e: last_error = e err_str = str(e).lower() is_conn_error = any(k in err_str for k in [ "lost connection", "gone away", "connection reset", "can't connect", "connection refused", "2013", "2006" ]) if is_conn_error and attempt < max_retries - 1: time.sleep(0.5 * (attempt + 1)) continue raise raise last_error customers = self._load_customers() customers[profile.customer_id] = asdict(profile) self._save_customers(customers) # ========== 基础信息 ========== def update_info(self, customer_id: str, **kwargs): """批量更新客户信息""" profile = self.get_customer(customer_id) for key, value in kwargs.items(): if hasattr(profile, key): setattr(profile, key, value) self.save_customer(profile) def update_email(self, customer_id: str, email: str): profile = self.get_customer(customer_id) profile.email = email self.save_customer(profile) def update_phone(self, customer_id: str, phone: str): profile = self.get_customer(customer_id) profile.phone = phone self.save_customer(profile) def update_wechat(self, customer_id: str, wechat: str): profile = self.get_customer(customer_id) profile.wechat = wechat self.save_customer(profile) def update_address(self, customer_id: str, address: str): profile = self.get_customer(customer_id) profile.address = address self.save_customer(profile) # ========== 需求相关 ========== def add_requirement(self, customer_id: str, requirement: str): """添加需求""" profile = self.get_customer(customer_id) if requirement not in profile.requirements: profile.requirements.append(requirement) profile.last_contact = datetime.now().isoformat() self.save_customer(profile) def set_budget(self, customer_id: str, budget: str, min_val: float = 0, max_val: float = 0): """设置预算""" profile = self.get_customer(customer_id) profile.budget = budget profile.budget_range_min = min_val profile.budget_range_max = max_val self.save_customer(profile) def add_preference_service(self, customer_id: str, service: str): """添加偏好服务""" profile = self.get_customer(customer_id) if service not in profile.preference_services: profile.preference_services.append(service) self.save_customer(profile) # ========== 消费分析 ========== def add_order(self, customer_id: str, order_id: str, amount: float = 0): """添加订单""" profile = self.get_customer(customer_id) if order_id not in profile.order_ids: profile.order_ids.append(order_id) profile.total_orders += 1 profile.total_spent += amount # 计算平均客单价 if profile.total_orders > 0: profile.avg_order_value = profile.total_spent / profile.total_orders # 更新客单价等级 if profile.avg_order_value >= 100: profile.customer_level = "A" elif profile.avg_order_value >= 50: profile.customer_level = "B" elif profile.avg_order_value >= 20: profile.customer_level = "C" else: profile.customer_level = "D" profile.last_order_date = datetime.now().isoformat() if not profile.first_order_date: profile.first_order_date = datetime.now().isoformat() self.save_customer(profile) def update_order_status(self, customer_id: str, pending: int = None, completed: int = None): """更新订单状态""" profile = self.get_customer(customer_id) if pending is not None: profile.pending_orders = pending if completed is not None: profile.completed_orders = completed self.save_customer(profile) def add_refund(self, customer_id: str): """增加退款次数""" profile = self.get_customer(customer_id) profile.refund_count += 1 self.save_customer(profile) # ========== 性格分析 ========== def add_personality_tag(self, customer_id: str, tag: str): """添加性格标签""" profile = self.get_customer(customer_id) if tag not in profile.personality: profile.personality.append(tag) self.save_customer(profile) def set_communication_prefer(self, customer_id: str, prefer: str): """设置沟通偏好""" profile = self.get_customer(customer_id) profile.communication_prefer = prefer self.save_customer(profile) def analyze_purchase_frequency(self, customer_id: str): """分析购买频次""" profile = self.get_customer(customer_id) if profile.first_order_date and profile.last_order_date: try: first = datetime.fromisoformat(profile.first_order_date) last = datetime.fromisoformat(profile.last_order_date) days = (last - first).days if days == 0: profile.purchase_frequency = "高" elif days < 30: profile.purchase_frequency = "高" elif days < 90: profile.purchase_frequency = "中" else: profile.purchase_frequency = "低" self.save_customer(profile) except: pass # ========== 评价相关 ========== def add_good_review(self, customer_id: str): profile = self.get_customer(customer_id) profile.good_reviews += 1 self.save_customer(profile) def add_bad_review(self, customer_id: str): profile = self.get_customer(customer_id) profile.bad_reviews += 1 self.save_customer(profile) def add_dispute(self, customer_id: str): profile = self.get_customer(customer_id) profile.dispute_count += 1 self.save_customer(profile) # ========== 标签/备注 ========== def add_tag(self, customer_id: str, tag: str): profile = self.get_customer(customer_id) if tag not in profile.tags: profile.tags.append(tag) self.save_customer(profile) def remove_tag(self, customer_id: str, tag: str): profile = self.get_customer(customer_id) if tag in profile.tags: profile.tags.remove(tag) self.save_customer(profile) def add_note(self, customer_id: str, note: str): profile = self.get_customer(customer_id) note_with_time = f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] {note}" profile.notes.append(note_with_time) self.save_customer(profile) # ========== 跟进 ========== def set_follow_up(self, customer_id: str, by: str, next_date: str = ""): profile = self.get_customer(customer_id) profile.follow_up_by = by profile.follow_up_date = datetime.now().isoformat() profile.next_follow_date = next_date self.save_customer(profile) def set_source(self, customer_id: str, source: str): profile = self.get_customer(customer_id) profile.source = source self.save_customer(profile) # ========== 报价记录 ========== def update_last_price(self, customer_id: str, price: int): """更新最近一次报价""" profile = self.get_customer(customer_id) profile.last_price = price profile.last_price_time = datetime.now().isoformat() self.save_customer(profile) def update_last_min_price(self, customer_id: str, min_price: int): """更新最近图片的最低价(用于杀价拦截与策略)""" profile = self.get_customer(customer_id) profile.last_min_price = int(min_price or 0) self.save_customer(profile) def mark_quote_no_convert(self, customer_id: str): """标记:上次报价后未成交,下次可适当降低""" profile = self.get_customer(customer_id) profile.last_quote_no_convert = True self.save_customer(profile) def clear_quote_no_convert(self, customer_id: str): """成交后清除未成交标记""" profile = self.get_customer(customer_id) profile.last_quote_no_convert = False self.save_customer(profile) def update_last_image( self, customer_id: str, image_url: str, complexity: str = "", gemini_prompt: str = "", aspect_ratio: str = "", perspective: str = "", ): """更新客户最近发来的图片URL,并记录复杂度历史及处理参数""" profile = self.get_customer(customer_id) profile.last_image_url = image_url profile.last_image_time = datetime.now().isoformat() profile.total_images_sent += 1 if complexity: profile.complexity_history.append(complexity) profile.complexity_history = profile.complexity_history[-20:] if gemini_prompt: profile.last_gemini_prompt = gemini_prompt if aspect_ratio: profile.last_aspect_ratio = aspect_ratio if perspective: profile.last_perspective = perspective self.save_customer(profile) def update_pending_quote_state( self, customer_id: str, images: List[str], requirements: List[str] ): """持久化收图阶段状态,防止服务重启丢失。""" profile = self.get_customer(customer_id) profile.pending_quote_images = list(images or []) profile.pending_quote_requirements = list(requirements or []) self.save_customer(profile) def clear_pending_quote_state(self, customer_id: str): profile = self.get_customer(customer_id) profile.pending_quote_images = [] profile.pending_quote_requirements = [] self.save_customer(profile) def update_processing_status(self, customer_id: str, status: str, image_url: str = "", expected_done_at: str = ""): """更新当前任务处理状态""" profile = self.get_customer(customer_id) profile.processing_status = status if image_url: profile.processing_image_url = image_url if expected_done_at: profile.expected_done_at = expected_done_at self.save_customer(profile) def record_discount(self, customer_id: str, final_price: int): """记录一次让价,并更新客户接受的最低价""" profile = self.get_customer(customer_id) profile.discount_given_count += 1 if final_price > 0 and (profile.lowest_price_accepted == 0 or final_price < profile.lowest_price_accepted): profile.lowest_price_accepted = final_price self.save_customer(profile) def update_preferred_format(self, customer_id: str, fmt: str): """更新客户格式偏好(jpg/psd/png)""" profile = self.get_customer(customer_id) profile.preferred_format = fmt self.save_customer(profile) def update_preferred_size(self, customer_id: str, size_desc: str): """更新客户尺寸/分辨率偏好""" profile = self.get_customer(customer_id) profile.preferred_size = size_desc self.save_customer(profile) def save_conversation_summary(self, customer_id: str, summary: str): """保存本次对话摘要(一句话)""" profile = self.get_customer(customer_id) profile.last_conversation_summary = summary profile.last_conversation_time = datetime.now().isoformat() self.save_customer(profile) def add_image_type(self, customer_id: str, image_type: str): """记录客户发图的类型(印花/logo/人物/产品等)""" profile = self.get_customer(customer_id) profile.image_type_history.append(image_type) profile.image_type_history = profile.image_type_history[-20:] self.save_customer(profile) def update_decision_speed(self, customer_id: str, speed: str): """更新决策速度:快/慢""" profile = self.get_customer(customer_id) profile.decision_speed = speed self.save_customer(profile) def record_revision(self, customer_id: str): """记录一次改稿""" profile = self.get_customer(customer_id) profile.revision_count += 1 self.save_customer(profile) def complete_order(self, customer_id: str, had_revision: bool = False): """标记一笔订单完成出图""" profile = self.get_customer(customer_id) profile.total_completed_orders += 1 if had_revision: profile.revision_orders += 1 self.save_customer(profile) def set_bulk_potential(self, customer_id: str, potential: str): """设置批量潜力:有/无/未知""" profile = self.get_customer(customer_id) profile.bulk_potential = potential self.save_customer(profile) def add_upsell_opportunity(self, customer_id: str, opportunity: str): """添加加购机会标记,如 '分层PSD' / '批量打包'""" profile = self.get_customer(customer_id) if opportunity not in profile.upsell_opportunity: profile.upsell_opportunity.append(opportunity) self.save_customer(profile) def set_blacklist(self, customer_id: str, reason: str = ""): """将客户加入黑名单""" profile = self.get_customer(customer_id) profile.blacklist = True profile.blacklist_reason = reason self.save_customer(profile) def unset_blacklist(self, customer_id: str): """解除黑名单""" profile = self.get_customer(customer_id) profile.blacklist = False profile.blacklist_reason = "" self.save_customer(profile) def set_vip_custom_price(self, customer_id: str, price: int): """设置 VIP 专属报价(0=取消专属价)""" profile = self.get_customer(customer_id) profile.vip_custom_price = price self.save_customer(profile) def update_email_status(self, customer_id: str, status: str): """更新邮件发送状态:sent/failed""" profile = self.get_customer(customer_id) profile.last_email_status = status self.save_customer(profile) def auto_compute_tags(self, customer_id: str): """自动计算并更新衍生标签(价格敏感度、流失风险、决策速度)""" profile = self.get_customer(customer_id) # 价格敏感度:根据让价次数和订单数 if profile.total_orders > 0: ratio = profile.discount_given_count / max(profile.total_orders, 1) if ratio >= 0.6 or profile.discount_given_count >= 3: profile.price_sensitivity = "高" elif ratio >= 0.2 or profile.discount_given_count >= 1: profile.price_sensitivity = "中" else: profile.price_sensitivity = "低" # 流失风险:根据最后联系时间 if profile.last_contact: try: last = datetime.fromisoformat(profile.last_contact) days = (datetime.now() - last).days if days > 60: profile.churn_risk = "高" elif days > 30: profile.churn_risk = "中" else: profile.churn_risk = "低" except Exception: pass self.save_customer(profile) # ========== VIP ========== def set_vip(self, customer_id: str, vip_level: int = 1): profile = self.get_customer(customer_id) profile.vip = True profile.vip_level = vip_level self.save_customer(profile) def cancel_vip(self, customer_id: str): profile = self.get_customer(customer_id) profile.vip = False profile.vip_level = 0 self.save_customer(profile) # ========== 搜索 ========== def _make_profile(self, cid: str, data: dict) -> CustomerProfile: """从原始数据构建 CustomerProfile,避免 customer_id 重复""" d = dict(data) d.pop('customer_id', None) return CustomerProfile(customer_id=cid, **d) def search_by_requirement(self, keyword: str) -> List[CustomerProfile]: """按需求搜索""" customers = self._load_customers() results = [] for cid, data in customers.items(): requirements = data.get('requirements', []) if any(keyword in req for req in requirements): results.append(self._make_profile(cid, data)) return results def search_by_tag(self, tag: str) -> List[CustomerProfile]: """按标签搜索""" customers = self._load_customers() results = [] for cid, data in customers.items(): tags = data.get('tags', []) if tag in tags: results.append(self._make_profile(cid, data)) return results def search_by_level(self, level: str) -> List[CustomerProfile]: """按客户等级搜索""" customers = self._load_customers() results = [] for cid, data in customers.items(): if data.get('customer_level') == level: results.append(self._make_profile(cid, data)) return results def get_vip_customers(self) -> List[CustomerProfile]: """获取所有VIP客户""" customers = self._load_customers() results = [] for cid, data in customers.items(): if data.get('vip'): results.append(self._make_profile(cid, data)) return results def get_high_value_customers(self) -> List[CustomerProfile]: """获取高价值客户(A级)""" return self.search_by_level("A") # ========== 统计 ========== def get_stats(self) -> dict: """获取统计信息""" customers = self._load_customers() total = len(customers) levels = defaultdict(int) vip_count = 0 total_revenue = 0 total_orders = 0 for cid, data in customers.items(): levels[data.get('customer_level', 'C')] += 1 if data.get('vip'): vip_count += 1 total_revenue += data.get('total_spent', 0) total_orders += data.get('total_orders', 0) return { "total_customers": total, "level_distribution": dict(levels), "vip_count": vip_count, "total_revenue": total_revenue, "total_orders": total_orders, "avg_order_value": total_revenue / total_orders if total_orders > 0 else 0 } # ========== 输出 ========== def get_profile_text(self, customer_id: str) -> str: """获取客户画像文本(用于AI)""" p = self.get_customer(customer_id) # 平均复杂度 avg_complexity = "" if p.complexity_history: level_map = {"simple": 1, "normal": 2, "complex": 3, "hard": 4} label_map = {1: "简单", 2: "一般", 3: "复杂", 4: "很复杂"} avg = sum(level_map.get(c, 2) for c in p.complexity_history) / len(p.complexity_history) avg_complexity = label_map.get(round(avg), "一般") # 改稿率 revision_rate = "" if p.total_completed_orders > 0: rate = p.revision_orders / p.total_completed_orders revision_rate = f"{rate:.0%}({p.revision_orders}/{p.total_completed_orders}单有改稿)" # 常见图片类型 top_types = "" if p.image_type_history: from collections import Counter top = Counter(p.image_type_history).most_common(3) top_types = "、".join(f"{t}({c}次)" for t, c in top) blacklist_line = f"\n⚠️ 黑名单:{p.blacklist_reason or '已拉黑'}" if p.blacklist else "" text = f""" === 客户档案 ==={blacklist_line} 客户ID: {p.customer_id} 姓名: {p.name or '未知'} 邮箱: {p.email or '未记录'} 电话: {p.phone or '未记录'} 微信: {p.wechat or '未记录'} --- 消费分析 --- 客户等级: {p.customer_level}级{'(VIP)' if p.vip else ''} 总订单数: {p.total_orders} | 总消费: {p.total_spent}元 购买频次: {p.purchase_frequency or '未分析'} --- 报价与让价 --- 上次报价: {f"{p.last_price}元" if p.last_price else "暂无"}{f' | VIP专属价: {p.vip_custom_price}元' if p.vip_custom_price else ''} 历史让价: {p.discount_given_count}次 | 接受过的最低价: {f"{p.lowest_price_accepted}元" if p.lowest_price_accepted else "暂无"} 价格敏感度: {p.price_sensitivity or '未分析'} | 决策速度: {p.decision_speed or '未知'} --- 图片习惯 --- 累计发图: {p.total_images_sent}张 | 平均复杂度: {avg_complexity or '暂无'} 常见类型: {top_types or '暂无'} 格式偏好: {p.preferred_format or '未知'} | 尺寸要求: {p.preferred_size or '未知'} 上次发图: {p.last_image_url or '暂无'} --- 当前任务 --- 处理状态: {p.processing_status or '无'} | 预计完成: {p.expected_done_at or '未知'} --- 服务质量 --- 改稿率: {revision_rate or '暂无'} 批量潜力: {p.bulk_potential or '未知'} | 流失风险: {p.churn_risk or '未分析'} 加购机会: {', '.join(p.upsell_opportunity) if p.upsell_opportunity else '暂无'} --- 上次对话 --- 摘要: {p.last_conversation_summary or '暂无'} 时间: {p.last_conversation_time or '暂无'} --- 邮件 --- 最后发送状态: {p.last_email_status or '未知'} --- 性格与备注 --- 性格标签: {', '.join(p.personality) if p.personality else '暂无'} 备注: {'; '.join(p.notes[-5:]) if p.notes else '暂无'} """ return text # 全局实例 db = CustomerDatabase()