Files
tw/db/customer_db.py
2026-03-06 12:44:57 +08:00

866 lines
33 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""客户画像数据库"""
import json
import logging
import os
from collections import defaultdict
from dataclasses import asdict, dataclass, field, fields
from datetime import datetime
from typing import Any, Dict, List, Optional
# Backend selector read once at import time. _is_mysql() compares it against
# "mysql"/"mariadb"; every other value (including the default "sqlite") makes
# the code in this module use its JSON-file storage path.
_DB_TYPE = os.getenv("DB_TYPE", "sqlite").lower()

# MySQL connection settings, only consumed when the MySQL backend is selected.
_MYSQL_HOST = os.getenv("MYSQL_HOST", "127.0.0.1")
# `or` instead of a getenv default: a variable that is exported but empty
# (e.g. `MYSQL_PORT=` in an env file) must fall back to 3306 rather than make
# int("") raise at import time, which would break JSON-backend deployments too.
# A non-empty, non-numeric value still raises ValueError on purpose.
_MYSQL_PORT = int(os.getenv("MYSQL_PORT") or "3306")
_MYSQL_USER = os.getenv("MYSQL_USER", "root")
_MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD", "")
_MYSQL_DATABASE = os.getenv("MYSQL_DATABASE", "ai_cs")
def _is_mysql() -> bool:
    """Return True when the configured backend is MySQL or MariaDB."""
    return _DB_TYPE == "mysql" or _DB_TYPE == "mariadb"
@dataclass
class CustomerProfile:
    """Customer profile record.

    A flat set of attributes describing one customer. Only ``customer_id`` is
    required; every other field has a default, so
    ``CustomerProfile(customer_id=...)`` is an empty profile.

    List-valued fields default to a fresh empty list per instance. Passing
    ``None`` explicitly for one of them (for instance a JSON ``null`` coming
    from a stored record) is still accepted and normalised to ``[]`` in
    ``__post_init__``.
    """

    # --- Basic information ---
    customer_id: str
    name: str = ""
    nickname: str = ""
    email: str = ""
    phone: str = ""
    wechat: str = ""  # WeChat ID
    address: str = ""  # Shipping address

    # --- Account information ---
    platform: str = ""  # Platform (Taobao / Tmall / JD, ...)
    platform_id: str = ""  # Account ID on that platform

    # --- Requirements ---
    budget: str = ""  # Budget as free text, e.g. "50-100元"
    budget_range_min: float = 0  # Lower bound of the budget
    budget_range_max: float = 0  # Upper bound of the budget
    requirements: List[str] = field(default_factory=list)  # Past requirements
    preference_services: List[str] = field(default_factory=list)  # Preferred service types (retouching / image search / design, ...)

    # --- Spending analysis ---
    total_orders: int = 0
    total_spent: float = 0.0
    avg_order_value: float = 0.0  # Average order value
    purchase_frequency: str = ""  # Purchase frequency label (高/中/低 = high/medium/low)
    last_order_date: str = ""  # Time of the most recent order
    first_order_date: str = ""  # Time of the first order

    # --- Orders ---
    order_ids: List[str] = field(default_factory=list)
    pending_orders: int = 0  # Orders waiting to be handled
    completed_orders: int = 0  # Finished orders
    refund_count: int = 0  # Number of refunds

    # --- Personality / behaviour ---
    personality: List[str] = field(default_factory=list)  # Personality tags (爽快 / 纠结 / 砍价狂 / 墨迹, ...)
    communication_prefer: str = ""  # Preferred channel (text / voice / image)
    response_speed: str = ""  # Preferred response speed (fast / slow)
    patience_level: str = ""  # Patience level (高/中/低)

    # --- Customer value ---
    customer_level: str = "C"  # Customer tier: A/B/C/D
    vip: bool = False
    vip_level: int = 0  # VIP level

    # --- Quote history ---
    last_price: int = 0  # Last quoted price
    last_price_time: str = ""  # When the last quote was given
    last_quote_no_convert: bool = False  # Last quote did not convert; the next one may be lowered a little
    last_min_price: int = 0  # Floor price from the most recent image analysis
    last_image_url: str = ""  # URL of the most recently received image
    last_image_time: str = ""  # When that image was sent
    last_gemini_prompt: str = ""  # Gemini processing prompt for the most recent image
    last_aspect_ratio: str = "1:1"  # Suggested output aspect ratio for the most recent image
    last_perspective: str = "no"  # Perspective state of the most recent image
    last_image_analysis: str = ""  # Latest image analysis result (JSON string, kept for data labelling)
    image_analysis_history: List[str] = field(default_factory=list)  # Image analysis history (JSON strings, kept for data labelling)
    pending_quote_images: List[str] = field(default_factory=list)  # Images queued for a combined quote (persisted)
    pending_quote_requirements: List[str] = field(default_factory=list)  # Requirements queued for a combined quote (persisted)

    # --- Current task state ---
    processing_status: str = ""  # 待处理 / 处理中 / 等待确认 / 已完成 (pending / in progress / awaiting confirmation / done)
    processing_image_url: str = ""  # URL of the image currently being processed
    expected_done_at: str = ""  # Expected completion time

    # --- Discount history ---
    discount_given_count: int = 0  # Cumulative number of discounts given
    lowest_price_accepted: int = 0  # Lowest price the customer has accepted

    # --- Format preferences ---
    preferred_format: str = ""  # jpg / psd / png
    preferred_size: str = ""  # Resolution / size requirement the customer asked about

    # --- Conversation summary ---
    last_conversation_summary: str = ""  # One-sentence summary of the last conversation
    last_conversation_time: str = ""  # Time of the last conversation

    # --- Image habits ---
    total_images_sent: int = 0  # Total number of images ever sent
    complexity_history: List[str] = field(default_factory=list)  # Past complexity labels, e.g. ["hard", "complex", "normal"]
    image_type_history: List[str] = field(default_factory=list)  # Past image types, e.g. ["印花", "logo", "人物"]

    # --- AI decision support ---
    price_sensitivity: str = ""  # Price sensitivity: 高/中/低 (computed automatically)
    decision_speed: str = ""  # Decision speed: 快/慢 (fast/slow)
    revision_count: int = 0  # Total number of revisions
    revision_orders: int = 0  # Number of orders that had revisions
    total_completed_orders: int = 0  # Orders whose output has been delivered

    # --- Business analysis ---
    bulk_potential: str = ""  # Bulk potential: 有/无/未知 (yes / no / unknown)
    churn_risk: str = ""  # Churn risk: 高/中/低 (computed automatically)
    upsell_opportunity: List[str] = field(default_factory=list)  # Upsell opportunities, e.g. ["分层PSD", "批量打包"]

    # --- Operations ---
    blacklist: bool = False  # Blacklisted
    blacklist_reason: str = ""  # Why the customer was blacklisted
    vip_custom_price: int = 0  # VIP-specific price (0 = none)
    last_email_status: str = ""  # Status of the last email: sent / failed

    # --- Reviews ---
    good_reviews: int = 0  # Positive reviews
    bad_reviews: int = 0  # Negative reviews
    dispute_count: int = 0  # Disputes

    # --- Follow-up ---
    follow_up_by: str = ""  # Salesperson following up
    follow_up_date: str = ""
    next_follow_date: str = ""  # Next follow-up date

    # --- Source ---
    source: str = ""  # Acquisition channel (organic / promotion / referral)
    coupon_used: str = ""  # Coupon that has been used

    # --- Notes ---
    notes: List[str] = field(default_factory=list)  # Notes
    tags: List[str] = field(default_factory=list)  # Custom tags

    # --- Timestamps ---
    created_at: str = ""
    last_contact: str = ""
    last_update: str = ""

    def __post_init__(self):
        """Replace an explicit ``None`` in any list field with an empty list.

        The list fields are discovered from the dataclass metadata, so a newly
        added ``field(default_factory=list)`` is covered without touching this
        method.
        """
        for spec in fields(self):
            if spec.default_factory is list and getattr(self, spec.name) is None:
                setattr(self, spec.name, [])
class CustomerDatabase:
    """Customer profile store backed by MySQL/MariaDB or a local JSON file.

    The backend is chosen by ``_is_mysql()`` on every call:

    * MySQL mode: one row per customer in ``customer_profiles``; the whole
      profile is stored as JSON text in ``profile_json``.
    * File mode: all profiles live in ``<db_path>/customers.json`` as a single
      ``{customer_id: profile_dict}`` object.

    Nearly every mutator is a read-modify-write cycle (``get_customer`` ->
    change -> ``save_customer``). No locking is performed here.
    """

    _log = logging.getLogger(__name__)

    # Shared by the single-profile and the bulk save paths.
    _UPSERT_SQL = (
        "REPLACE INTO customer_profiles (customer_id, profile_json, last_update) "
        "VALUES (%s, %s, %s)"
    )

    # Maximum number of entries kept in the rolling history lists.
    _HISTORY_LIMIT = 20

    def __init__(self, db_path: str = "customer_db"):
        """Remember the storage location and make sure the backend exists.

        Args:
            db_path: Directory holding ``customers.json`` in file mode.
        """
        self.db_path = db_path
        self.customers_file = os.path.join(db_path, "customers.json")
        self._ensure_db()

    # ========== Storage layer ==========

    def _get_mysql_conn(self):
        """Open a new MySQL connection using the module-level settings."""
        # Imported lazily so file-mode deployments do not need pymysql.
        import pymysql

        return pymysql.connect(
            host=_MYSQL_HOST,
            port=_MYSQL_PORT,
            user=_MYSQL_USER,
            password=_MYSQL_PASSWORD,
            database=_MYSQL_DATABASE,
            charset="utf8mb4",
            cursorclass=pymysql.cursors.DictCursor,
            autocommit=False,
        )

    def _ensure_db(self):
        """Create the table and index (MySQL) or directory and file (file mode)."""
        if _is_mysql():
            with self._get_mysql_conn() as conn:
                with conn.cursor() as cur:
                    cur.execute(
                        """
                        CREATE TABLE IF NOT EXISTS customer_profiles (
                            customer_id VARCHAR(128) PRIMARY KEY,
                            profile_json LONGTEXT NOT NULL,
                            last_update DATETIME NOT NULL
                        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                        """
                    )
                    # Look the index up first so repeated start-ups do not
                    # attempt to create it again.
                    cur.execute("SHOW INDEX FROM customer_profiles")
                    index_names = {str(row.get("Key_name", "")) for row in cur.fetchall()}
                    if "idx_last_update" not in index_names:
                        cur.execute("CREATE INDEX idx_last_update ON customer_profiles(last_update)")
                conn.commit()
            return

        # exist_ok avoids the check-then-create race between two processes.
        os.makedirs(self.db_path, exist_ok=True)
        if not os.path.exists(self.customers_file):
            self._save_customers({})

    def _mysql_upsert(self, rows) -> None:
        """Write ``(customer_id, profile_json, last_update)`` rows in one transaction."""
        with self._get_mysql_conn() as conn:
            with conn.cursor() as cur:
                for row in rows:
                    cur.execute(self._UPSERT_SQL, row)
            conn.commit()

    def _load_customers(self) -> Dict[str, dict]:
        """Return every stored profile as ``{customer_id: raw_dict}``.

        Reading is best-effort: a backend failure yields ``{}`` (and a logged
        warning) instead of raising. A MySQL row whose JSON cannot be parsed
        is returned as an empty dict for that customer.
        """
        if _is_mysql():
            try:
                with self._get_mysql_conn() as conn:
                    with conn.cursor() as cur:
                        cur.execute("SELECT customer_id, profile_json FROM customer_profiles")
                        rows = cur.fetchall()
            except Exception:
                self._log.warning("Loading customer profiles from MySQL failed", exc_info=True)
                return {}

            loaded: Dict[str, dict] = {}
            for row in rows:
                cid = str(row.get("customer_id") or "")
                if not cid:
                    continue
                try:
                    loaded[cid] = json.loads(row.get("profile_json") or "{}")
                except Exception:
                    self._log.warning("Unparseable profile_json for customer %s", cid, exc_info=True)
                    loaded[cid] = {}
            return loaded

        try:
            with open(self.customers_file, "r", encoding="utf-8") as fh:
                return json.load(fh)
        except FileNotFoundError:
            return {}
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            # Unlike a bare `except:`, KeyboardInterrupt/SystemExit propagate.
            self._log.warning("Could not read %s", self.customers_file, exc_info=True)
            return {}

    def _save_customers(self, customers: Dict[str, dict]):
        """Persist a ``{customer_id: raw_dict}`` mapping.

        MySQL mode upserts one row per entry and does not delete rows that are
        missing from ``customers``. File mode replaces the whole JSON file.
        """
        if _is_mysql():
            stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            self._mysql_upsert(
                [
                    (cid, json.dumps(data, ensure_ascii=False), stamp)
                    for cid, data in (customers or {}).items()
                ]
            )
            return

        # Write to a sibling temp file and swap it in, so a crash or a
        # serialisation error mid-write cannot leave a truncated
        # customers.json behind (which would later load as an empty database).
        tmp_path = self.customers_file + ".tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as fh:
                json.dump(customers, fh, ensure_ascii=False, indent=2)
            os.replace(tmp_path, self.customers_file)
        except BaseException:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
            raise

    def _make_profile(self, cid: str, data: dict) -> CustomerProfile:
        """Build a CustomerProfile from a raw stored dict.

        ``customer_id`` inside ``data`` is ignored in favour of ``cid``. Keys
        that are not CustomerProfile fields are dropped rather than passed to
        the constructor, so records written by a different schema version
        still load instead of raising TypeError.
        """
        known = {spec.name for spec in fields(CustomerProfile)} - {"customer_id"}
        kwargs = {key: value for key, value in dict(data or {}).items() if key in known}
        return CustomerProfile(customer_id=cid, **kwargs)

    def get_customer(self, customer_id: str) -> CustomerProfile:
        """Return the stored profile, or an empty profile for an unknown id.

        In MySQL mode any failure while reading is logged and answered with an
        empty profile (best-effort read).
        """
        if not _is_mysql():
            stored = self._load_customers().get(customer_id, {})
            return self._make_profile(customer_id, stored)

        try:
            with self._get_mysql_conn() as conn:
                with conn.cursor() as cur:
                    cur.execute(
                        "SELECT profile_json FROM customer_profiles WHERE customer_id=%s LIMIT 1",
                        (customer_id,),
                    )
                    row = cur.fetchone()
            raw = row.get("profile_json") if row else None
            stored = json.loads(raw) if raw else {}
            return self._make_profile(customer_id, stored)
        except Exception:
            self._log.warning("Reading customer %s from MySQL failed", customer_id, exc_info=True)
            return CustomerProfile(customer_id=customer_id)

    def save_customer(self, profile: CustomerProfile):
        """Stamp ``last_update`` on the profile and persist it."""
        profile.last_update = datetime.now().isoformat()
        record = asdict(profile)
        if _is_mysql():
            stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            self._mysql_upsert(
                [(profile.customer_id, json.dumps(record, ensure_ascii=False), stamp)]
            )
            return
        customers = self._load_customers()
        customers[profile.customer_id] = record
        self._save_customers(customers)

    # ========== Read-modify-write helpers ==========

    def _set_fields(self, customer_id: str, **values) -> None:
        """Load a profile, assign the given attributes, and save it."""
        profile = self.get_customer(customer_id)
        for attr, value in values.items():
            setattr(profile, attr, value)
        self.save_customer(profile)

    def _increment(self, customer_id: str, attr: str, amount: int = 1) -> None:
        """Load a profile, add ``amount`` to a numeric attribute, and save it."""
        profile = self.get_customer(customer_id)
        setattr(profile, attr, getattr(profile, attr) + amount)
        self.save_customer(profile)

    def _append_unique(self, customer_id: str, attr: str, item: str) -> None:
        """Load a profile, append ``item`` to a list attribute if absent, and save it."""
        profile = self.get_customer(customer_id)
        items = getattr(profile, attr)
        if item not in items:
            items.append(item)
        self.save_customer(profile)

    # ========== Basic information ==========

    def update_info(self, customer_id: str, **kwargs):
        """Update several profile attributes at once.

        Keyword names that are not attributes of the profile are ignored.
        """
        profile = self.get_customer(customer_id)
        for key, value in kwargs.items():
            if hasattr(profile, key):
                setattr(profile, key, value)
        self.save_customer(profile)

    def update_email(self, customer_id: str, email: str):
        """Set the customer's email address."""
        self._set_fields(customer_id, email=email)

    def update_phone(self, customer_id: str, phone: str):
        """Set the customer's phone number."""
        self._set_fields(customer_id, phone=phone)

    def update_wechat(self, customer_id: str, wechat: str):
        """Set the customer's WeChat ID."""
        self._set_fields(customer_id, wechat=wechat)

    def update_address(self, customer_id: str, address: str):
        """Set the customer's shipping address."""
        self._set_fields(customer_id, address=address)

    # ========== Requirements ==========

    def add_requirement(self, customer_id: str, requirement: str):
        """Record a requirement (once) and refresh ``last_contact``."""
        profile = self.get_customer(customer_id)
        if requirement not in profile.requirements:
            profile.requirements.append(requirement)
        profile.last_contact = datetime.now().isoformat()
        self.save_customer(profile)

    def set_budget(self, customer_id: str, budget: str, min_val: float = 0, max_val: float = 0):
        """Set the free-text budget and its numeric range."""
        self._set_fields(
            customer_id,
            budget=budget,
            budget_range_min=min_val,
            budget_range_max=max_val,
        )

    def add_preference_service(self, customer_id: str, service: str):
        """Record a preferred service type (once)."""
        self._append_unique(customer_id, "preference_services", service)

    # ========== Spending analysis ==========

    @staticmethod
    def _level_for(avg_order_value: float) -> str:
        """Map an average order value to a customer tier (A/B/C/D)."""
        if avg_order_value >= 100:
            return "A"
        if avg_order_value >= 50:
            return "B"
        if avg_order_value >= 20:
            return "C"
        return "D"

    def add_order(self, customer_id: str, order_id: str, amount: float = 0):
        """Register a new order and refresh the spending figures.

        An ``order_id`` that is already recorded is ignored entirely, so
        retrying the same order does not inflate ``total_orders`` or
        ``total_spent``.
        """
        profile = self.get_customer(customer_id)
        if order_id in profile.order_ids:
            return

        profile.order_ids.append(order_id)
        profile.total_orders += 1
        profile.total_spent += amount

        # Guard against a non-positive stored counter before dividing.
        if profile.total_orders > 0:
            profile.avg_order_value = profile.total_spent / profile.total_orders
        profile.customer_level = self._level_for(profile.avg_order_value)

        now = datetime.now().isoformat()
        profile.last_order_date = now
        if not profile.first_order_date:
            profile.first_order_date = now
        self.save_customer(profile)

    def update_order_status(
        self,
        customer_id: str,
        pending: Optional[int] = None,
        completed: Optional[int] = None,
    ):
        """Overwrite the pending / completed order counters; ``None`` leaves one as is."""
        changes = {}
        if pending is not None:
            changes["pending_orders"] = pending
        if completed is not None:
            changes["completed_orders"] = completed
        self._set_fields(customer_id, **changes)

    def add_refund(self, customer_id: str):
        """Increase the refund counter by one."""
        self._increment(customer_id, "refund_count")

    # ========== Personality analysis ==========

    def add_personality_tag(self, customer_id: str, tag: str):
        """Record a personality tag (once)."""
        self._append_unique(customer_id, "personality", tag)

    def set_communication_prefer(self, customer_id: str, prefer: str):
        """Set the preferred communication channel."""
        self._set_fields(customer_id, communication_prefer=prefer)

    def analyze_purchase_frequency(self, customer_id: str):
        """Derive ``purchase_frequency`` from the first-to-last order span.

        Uses the 高/中/低 labels documented on the profile field: under 30
        days -> 高, under 90 days -> 中, otherwise 低. A span of zero days
        gives no interval to judge and leaves the label empty. Nothing is
        saved when either date is missing or cannot be parsed.
        """
        profile = self.get_customer(customer_id)
        if not (profile.first_order_date and profile.last_order_date):
            return
        try:
            first = datetime.fromisoformat(profile.first_order_date)
            last = datetime.fromisoformat(profile.last_order_date)
            span_days = (last - first).days
        except (TypeError, ValueError):
            # Malformed or mixed naive/aware timestamps: skip the analysis.
            return

        if span_days == 0:
            profile.purchase_frequency = ""
        elif span_days < 30:
            profile.purchase_frequency = "高"
        elif span_days < 90:
            profile.purchase_frequency = "中"
        else:
            profile.purchase_frequency = "低"
        self.save_customer(profile)

    # ========== Reviews ==========

    def add_good_review(self, customer_id: str):
        """Increase the positive review counter by one."""
        self._increment(customer_id, "good_reviews")

    def add_bad_review(self, customer_id: str):
        """Increase the negative review counter by one."""
        self._increment(customer_id, "bad_reviews")

    def add_dispute(self, customer_id: str):
        """Increase the dispute counter by one."""
        self._increment(customer_id, "dispute_count")

    # ========== Tags / notes ==========

    def add_tag(self, customer_id: str, tag: str):
        """Attach a custom tag (once)."""
        self._append_unique(customer_id, "tags", tag)

    def remove_tag(self, customer_id: str, tag: str):
        """Remove a custom tag if present."""
        profile = self.get_customer(customer_id)
        if tag in profile.tags:
            profile.tags.remove(tag)
        self.save_customer(profile)

    def add_note(self, customer_id: str, note: str):
        """Append a note prefixed with the current ``[YYYY-MM-DD HH:MM]`` time."""
        profile = self.get_customer(customer_id)
        stamp = datetime.now().strftime("%Y-%m-%d %H:%M")
        profile.notes.append(f"[{stamp}] {note}")
        self.save_customer(profile)

    # ========== Follow-up ==========

    def set_follow_up(self, customer_id: str, by: str, next_date: str = ""):
        """Record who followed up (now) and when the next follow-up is due."""
        self._set_fields(
            customer_id,
            follow_up_by=by,
            follow_up_date=datetime.now().isoformat(),
            next_follow_date=next_date,
        )

    def set_source(self, customer_id: str, source: str):
        """Set the acquisition channel."""
        self._set_fields(customer_id, source=source)

    # ========== Quote history ==========

    def update_last_price(self, customer_id: str, price: int):
        """Store the most recent quote together with the current time."""
        self._set_fields(
            customer_id,
            last_price=price,
            last_price_time=datetime.now().isoformat(),
        )

    def update_last_min_price(self, customer_id: str, min_price: int):
        """Store the floor price of the latest image (used by the bargaining logic).

        Falsy input is stored as 0.
        """
        self._set_fields(customer_id, last_min_price=int(min_price or 0))

    def mark_quote_no_convert(self, customer_id: str):
        """Flag that the last quote did not convert, so the next may be lowered."""
        self._set_fields(customer_id, last_quote_no_convert=True)

    def clear_quote_no_convert(self, customer_id: str):
        """Clear the no-conversion flag after a deal is closed."""
        self._set_fields(customer_id, last_quote_no_convert=False)

    def update_last_image(
        self,
        customer_id: str,
        image_url: str,
        complexity: str = "",
        gemini_prompt: str = "",
        aspect_ratio: str = "",
        perspective: str = "",
    ):
        """Record the latest image and its processing parameters.

        Always updates the URL, timestamp and image counter. ``complexity`` is
        appended to a history capped at the 20 newest entries; the remaining
        optional values only overwrite the stored ones when non-empty.
        """
        profile = self.get_customer(customer_id)
        profile.last_image_url = image_url
        profile.last_image_time = datetime.now().isoformat()
        profile.total_images_sent += 1
        if complexity:
            history = profile.complexity_history + [complexity]
            profile.complexity_history = history[-self._HISTORY_LIMIT:]
        if gemini_prompt:
            profile.last_gemini_prompt = gemini_prompt
        if aspect_ratio:
            profile.last_aspect_ratio = aspect_ratio
        if perspective:
            profile.last_perspective = perspective
        self.save_customer(profile)

    def update_pending_quote_state(
        self,
        customer_id: str,
        images: List[str],
        requirements: List[str],
    ):
        """Persist the image-collection state so a service restart does not lose it."""
        self._set_fields(
            customer_id,
            pending_quote_images=list(images or []),
            pending_quote_requirements=list(requirements or []),
        )

    def clear_pending_quote_state(self, customer_id: str):
        """Empty both pending-quote queues."""
        self._set_fields(
            customer_id,
            pending_quote_images=[],
            pending_quote_requirements=[],
        )

    def update_processing_status(
        self,
        customer_id: str,
        status: str,
        image_url: str = "",
        expected_done_at: str = "",
    ):
        """Set the current task status; the optional values apply only when non-empty."""
        changes = {"processing_status": status}
        if image_url:
            changes["processing_image_url"] = image_url
        if expected_done_at:
            changes["expected_done_at"] = expected_done_at
        self._set_fields(customer_id, **changes)

    def record_discount(self, customer_id: str, final_price: int):
        """Count one discount and track the lowest positive price accepted so far."""
        profile = self.get_customer(customer_id)
        profile.discount_given_count += 1
        current_lowest = profile.lowest_price_accepted
        # 0 means "nothing recorded yet", so any positive price replaces it.
        if final_price > 0 and (current_lowest == 0 or final_price < current_lowest):
            profile.lowest_price_accepted = final_price
        self.save_customer(profile)

    def update_preferred_format(self, customer_id: str, fmt: str):
        """Set the preferred output format (jpg / psd / png)."""
        self._set_fields(customer_id, preferred_format=fmt)

    def update_preferred_size(self, customer_id: str, size_desc: str):
        """Set the preferred size / resolution description."""
        self._set_fields(customer_id, preferred_size=size_desc)

    def save_conversation_summary(self, customer_id: str, summary: str):
        """Store a one-sentence summary of the conversation with the current time."""
        self._set_fields(
            customer_id,
            last_conversation_summary=summary,
            last_conversation_time=datetime.now().isoformat(),
        )

    def add_image_type(self, customer_id: str, image_type: str):
        """Append an image type to a history capped at the 20 newest entries."""
        profile = self.get_customer(customer_id)
        history = profile.image_type_history + [image_type]
        profile.image_type_history = history[-self._HISTORY_LIMIT:]
        self.save_customer(profile)

    def update_decision_speed(self, customer_id: str, speed: str):
        """Set the decision speed label (快/慢)."""
        self._set_fields(customer_id, decision_speed=speed)

    def record_revision(self, customer_id: str):
        """Increase the total revision counter by one."""
        self._increment(customer_id, "revision_count")

    def complete_order(self, customer_id: str, had_revision: bool = False):
        """Count one delivered order, and one revised order when ``had_revision``."""
        profile = self.get_customer(customer_id)
        profile.total_completed_orders += 1
        if had_revision:
            profile.revision_orders += 1
        self.save_customer(profile)

    def set_bulk_potential(self, customer_id: str, potential: str):
        """Set the bulk potential label (有/无/未知)."""
        self._set_fields(customer_id, bulk_potential=potential)

    def add_upsell_opportunity(self, customer_id: str, opportunity: str):
        """Record an upsell opportunity (once), e.g. '分层PSD' or '批量打包'."""
        self._append_unique(customer_id, "upsell_opportunity", opportunity)

    def set_blacklist(self, customer_id: str, reason: str = ""):
        """Blacklist the customer, storing the reason."""
        self._set_fields(customer_id, blacklist=True, blacklist_reason=reason)

    def unset_blacklist(self, customer_id: str):
        """Remove the customer from the blacklist and clear the reason."""
        self._set_fields(customer_id, blacklist=False, blacklist_reason="")

    def set_vip_custom_price(self, customer_id: str, price: int):
        """Set the VIP-specific price (0 removes it)."""
        self._set_fields(customer_id, vip_custom_price=price)

    def update_email_status(self, customer_id: str, status: str):
        """Store the status of the last email (sent / failed)."""
        self._set_fields(customer_id, last_email_status=status)

    def auto_compute_tags(self, customer_id: str):
        """Recompute the derived labels ``price_sensitivity`` and ``churn_risk``.

        Both use the 高/中/低 labels documented on the profile fields.

        * Price sensitivity (only when the customer has orders): discounts per
          order >= 0.6 or at least 3 discounts -> 高; >= 0.2 or at least one
          discount -> 中; otherwise 低.
        * Churn risk (only when ``last_contact`` parses): more than 60 days
          since the last contact -> 高; more than 30 -> 中; otherwise 低.

        The profile is saved even when neither label could be computed.
        """
        profile = self.get_customer(customer_id)

        if profile.total_orders > 0:
            given = profile.discount_given_count
            ratio = given / profile.total_orders
            if ratio >= 0.6 or given >= 3:
                profile.price_sensitivity = "高"
            elif ratio >= 0.2 or given >= 1:
                profile.price_sensitivity = "中"
            else:
                profile.price_sensitivity = "低"

        if profile.last_contact:
            try:
                last = datetime.fromisoformat(profile.last_contact)
                idle_days = (datetime.now() - last).days
            except (TypeError, ValueError):
                # Unparseable timestamp: keep the previous churn label.
                idle_days = None
            if idle_days is not None:
                if idle_days > 60:
                    profile.churn_risk = "高"
                elif idle_days > 30:
                    profile.churn_risk = "中"
                else:
                    profile.churn_risk = "低"

        self.save_customer(profile)

    # ========== VIP ==========

    def set_vip(self, customer_id: str, vip_level: int = 1):
        """Mark the customer as VIP at the given level."""
        self._set_fields(customer_id, vip=True, vip_level=vip_level)

    def cancel_vip(self, customer_id: str):
        """Remove VIP status and reset the level to 0."""
        self._set_fields(customer_id, vip=False, vip_level=0)

    # ========== Search ==========

    def _find(self, predicate) -> List[CustomerProfile]:
        """Return profiles whose raw stored dict satisfies ``predicate``."""
        return [
            self._make_profile(cid, data)
            for cid, data in self._load_customers().items()
            if predicate(data)
        ]

    def search_by_requirement(self, keyword: str) -> List[CustomerProfile]:
        """Find customers with a requirement containing ``keyword``."""
        return self._find(
            lambda data: any(keyword in req for req in data.get("requirements") or [])
        )

    def search_by_tag(self, tag: str) -> List[CustomerProfile]:
        """Find customers carrying exactly this tag."""
        return self._find(lambda data: tag in (data.get("tags") or []))

    def search_by_level(self, level: str) -> List[CustomerProfile]:
        """Find customers whose stored tier equals ``level``."""
        return self._find(lambda data: data.get("customer_level") == level)

    def get_vip_customers(self) -> List[CustomerProfile]:
        """Return all customers flagged as VIP."""
        return self._find(lambda data: bool(data.get("vip")))

    def get_high_value_customers(self) -> List[CustomerProfile]:
        """Return all tier-A customers."""
        return self.search_by_level("A")

    # ========== Statistics ==========

    def get_stats(self) -> dict:
        """Aggregate counts, revenue and tier distribution over all customers.

        Records without a stored ``customer_level`` are counted under "C".
        """
        customers = self._load_customers()
        levels = defaultdict(int)
        vip_count = 0
        total_revenue = 0
        total_orders = 0
        for data in customers.values():
            levels[data.get("customer_level", "C")] += 1
            if data.get("vip"):
                vip_count += 1
            total_revenue += data.get("total_spent", 0)
            total_orders += data.get("total_orders", 0)

        return {
            "total_customers": len(customers),
            "level_distribution": dict(levels),
            "vip_count": vip_count,
            "total_revenue": total_revenue,
            "total_orders": total_orders,
            "avg_order_value": total_revenue / total_orders if total_orders > 0 else 0,
        }

    # ========== Output ==========

    def get_profile_text(self, customer_id: str) -> str:
        """Render the profile as a multi-line text block (consumed by the AI).

        The text starts and ends with a newline. Missing values are shown with
        placeholder words such as 未知 / 暂无 / 未分析.
        """
        from collections import Counter

        p = self.get_customer(customer_id)

        # Average complexity: map labels to 1..4, average, map back to a label.
        avg_complexity = ""
        if p.complexity_history:
            level_map = {"simple": 1, "normal": 2, "complex": 3, "hard": 4}
            label_map = {1: "简单", 2: "一般", 3: "复杂", 4: "很复杂"}
            scores = [level_map.get(c, 2) for c in p.complexity_history]
            avg_complexity = label_map.get(round(sum(scores) / len(scores)), "一般")

        # Share of delivered orders that needed at least one revision.
        revision_rate = ""
        if p.total_completed_orders > 0:
            rate = p.revision_orders / p.total_completed_orders
            revision_rate = f"{rate:.0%}({p.revision_orders}/{p.total_completed_orders}单有改稿)"

        # Three most frequent image types.
        top_types = ""
        if p.image_type_history:
            top = Counter(p.image_type_history).most_common(3)
            top_types = ", ".join(f"{kind}({count}次)" for kind, count in top)

        blacklist_line = f"\n⚠️ 黑名单:{p.blacklist_reason or '已拉黑'}" if p.blacklist else ""
        vip_mark = " VIP" if p.vip else ""
        last_price = f"{p.last_price}" if p.last_price else "暂无"
        vip_price = f" | VIP专属价: {p.vip_custom_price}" if p.vip_custom_price else ""
        lowest_price = f"{p.lowest_price_accepted}" if p.lowest_price_accepted else "暂无"
        upsell = ", ".join(p.upsell_opportunity) if p.upsell_opportunity else "暂无"
        personality = ", ".join(p.personality) if p.personality else "暂无"
        notes = "; ".join(p.notes[-5:]) if p.notes else "暂无"

        lines = [
            "",
            f"=== 客户档案 ==={blacklist_line}",
            f"客户ID: {p.customer_id}",
            f"姓名: {p.name or '未知'}",
            f"邮箱: {p.email or '未记录'}",
            f"电话: {p.phone or '未记录'}",
            f"微信: {p.wechat or '未记录'}",
            "--- 消费分析 ---",
            f"客户等级: {p.customer_level}{vip_mark}",
            f"总订单数: {p.total_orders} | 总消费: {p.total_spent}",
            f"购买频次: {p.purchase_frequency or '未分析'}",
            "--- 报价与让价 ---",
            f"上次报价: {last_price}{vip_price}",
            f"历史让价: {p.discount_given_count}次 | 接受过的最低价: {lowest_price}",
            f"价格敏感度: {p.price_sensitivity or '未分析'} | 决策速度: {p.decision_speed or '未知'}",
            "--- 图片习惯 ---",
            f"累计发图: {p.total_images_sent}张 | 平均复杂度: {avg_complexity or '暂无'}",
            f"常见类型: {top_types or '暂无'}",
            f"格式偏好: {p.preferred_format or '未知'} | 尺寸要求: {p.preferred_size or '未知'}",
            f"上次发图: {p.last_image_url or '暂无'}",
            "--- 当前任务 ---",
            f"处理状态: {p.processing_status or ''} | 预计完成: {p.expected_done_at or '未知'}",
            "--- 服务质量 ---",
            f"改稿率: {revision_rate or '暂无'}",
            f"批量潜力: {p.bulk_potential or '未知'} | 流失风险: {p.churn_risk or '未分析'}",
            f"加购机会: {upsell}",
            "--- 上次对话 ---",
            f"摘要: {p.last_conversation_summary or '暂无'}",
            f"时间: {p.last_conversation_time or '暂无'}",
            "--- 邮件 ---",
            f"最后发送状态: {p.last_email_status or '未知'}",
            "--- 性格与备注 ---",
            f"性格标签: {personality}",
            f"备注: {notes}",
            "",
        ]
        return "\n".join(lines)
# Module-level shared instance. It is constructed with the default db_path
# ("customer_db") as soon as this module is imported.
db = CustomerDatabase()