a082364e34e33e23dad15d0d57d428d3ca6bbb2f
AI 客服系统
基于 PydanticAI 的智能客服,对接千牛 WebSocket,自动接待、收图、转接设计师。
架构概览
客户消息 (千牛)
↓
WebSocket Client → QianniuAdapter (协议转换)
↓
Orchestrator (防抖/去重/冷却/路由)
↓
CustomerServiceBrain (PydanticAI Agent)
├── lookup_chat_history_tool → 查询历史记录
├── transfer_to_human_tool → 转接设计师
└── lookup_customer_orders_tool → 订单查询
↓
QianniuAdapter → WebSocket → 回复客户
核心功能
| 功能 | 说明 |
|---|---|
| 智能接待 | 自动引导发图、问需求、转接设计师 |
| 历史记忆 | AI 可调用工具查询完整聊天历史,避免重复提问 |
| 自动转接 | 收到图片+需求后自动派单给在线设计师 |
| 待转接池 | 设计师不在线时先入队,上线后自动补转接 |
| 转接冷却 | 转接后 120 秒内不再调用 AI,直接安抚 |
| 情绪识别 | 客户愤怒/投诉时自动转人工 |
| 消息防抖 | 合并短时间内的多条消息,避免重复回复 |
| 订单静默 | 订单通知/SKU 信息自动入库,不触发 AI |
| 图片兜底 | 识别 msg_type=1 图片包;即使无文本也不会被当心跳丢弃 |
| 出站防泄露 | 拦截 <think>、工具原文、历史摘要、订单摘要等内部内容 |
| 时段感知 | 根据时间区分"没上班"/"下班了"/"暂时不在" |
| 图片分析 | 后台调用 Gemini 分析图片复杂度 |
| 日报统计 | 每日自动生成客服数据报告 |
| 多进程 | 支持多 Worker 并行处理 |
快速开始
# 安装依赖
pip install -r requirements.txt
# 配置环境变量
cp .env.example .env
# 编辑 .env 填入 API Key、数据库等配置
# 启动(WebSocket 客服模式)
python run.py
# 完整版(HTTP API + WebSocket)
python run.py --tianwang
# 多进程模式
python run.py --multi -w 4
# 仅 HTTP API
python run.py --api-only
健康检查
curl http://localhost:6060/api/health
项目结构
├── run.py # 统一入口
├── api/
│ └── http_server.py # HTTP API 服务
├── core/
│ ├── orchestrator.py # 总编排(防抖/去重/冷却/路由)
│ ├── pydantic_ai_agent_v2.py # AI 大脑(PydanticAI Agent)
│ ├── agent_tools.py # AI 工具(转接/查历史/查订单)
│ ├── schema.py # 数据模型(StandardMessage/Response)
│ ├── repository.py # 异步数据仓库
│ ├── skill_manager.py # 技能加载器
│ ├── engine.py # 业务逻辑兜底
│ ├── adapters/
│ │ └── qianniu_adapter.py # 千牛协议适配
│ ├── events/
│ │ └── event_bus.py # 异步事件总线
│ └── websocket_*.py # WebSocket 连接/发送/日志
├── db/
│ ├── chat_log_db.py # 聊天记录(SQLite/MySQL)
│ ├── customer_db.py # 客户档案
│ ├── image_tasks_db.py # 图片任务
│ ├── pending_transfer_db.py # 待转接队列(本地 SQLite)
│ └── task_db/ # 任务模型
├── services/
│ ├── dispatch_service.py # 设计师派单
│ ├── service_gemini.py # Gemini 图片分析
│ ├── service_image_analyzer.py # 图片复杂度分析
│ └── ... # 其他服务
├── skills/ # AI 技能定义(SKILL.md)
│ ├── customer-service/ # 客服核心技能
│ ├── owner-style/ # 店主风格
│ ├── pre-sales-skill/ # 售前
│ ├── after-sales-skill/ # 售后
│ ├── pricing-skill/ # 报价(排除出 prompt)
│ ├── risk-skill/ # 风控
│ └── style-skill/ # 语气风格
├── config/ # 配置文件
├── utils/ # 工具(日报/健康检查/API计费等)
└── scripts/ # 运维脚本
环境变量
| 变量 | 说明 |
|---|---|
OPENAI_API_KEY |
火山引擎 Ark API Key |
OPENAI_BASE_URL |
API 地址 |
OPENAI_MODEL |
对话模型 |
DB_TYPE |
数据库类型(sqlite / mysql) |
MYSQL_HOST/PORT/USER/PASSWORD/DATABASE |
MySQL 连接信息 |
MYSQL_POOL_SIZE |
MySQL 连接池大小,默认 10 |
MYSQL_POOL_WAIT_TIMEOUT |
连接池等待超时(秒),默认 10 |
WECHAT_WEBHOOK |
企业微信通知 Webhook |
MESSAGE_DEBOUNCE_SECONDS |
消息防抖时间(秒) |
DISPATCH_BASE_URL |
派单服务地址 |
完整配置见 .env.example。
消息处理流程
- WebSocket 接收 → 千牛原始消息
- 适配器转换 →
StandardMessage(统一格式,识别msg_type、递归提取图片 URL) - 图片兜底 → 纯图片包即使无文本/无直出 URL,也会标记为“已收到图片消息”
- Orchestrator 过滤 → 订单/SKU 静默入库、心跳过滤、商家回复入库
- 防抖合并 → 2 秒窗口内多条消息合并为一条
- 冷却检查 → 转接后 120 秒内直接安抚,不调 AI
- AI 思考 → PydanticAI Agent 调用工具、生成回复
- 转接截获 → 工具返回转接指令时直接发送,不经 AI 二次加工
- 待转接入池 → 设计师不在线时记录原因,进入待转接队列,稍后自动补转
- 出站安全过滤 → 过滤
<think>、历史摘要、订单摘要、工具原文、时间戳转述历史 - 发送回复 → 通过 WebSocket 回复客户,同时入库
运行注意
db/pending_transfer.db是待转接池的本地 SQLite 数据文件,用于暂存“设计师不在线”的转接请求。db/chat_log_db/chats.db是 SQLite 模式下的本地聊天库;当DB_TYPE=mysql时,线上主库仍然是 MySQL。- 上面两个
.db文件都属于运行时数据,不建议提交到 Git。 - 当前待转接池是单机本地队列;如果未来部署成多台应用机,需要改成共享存储,否则无法跨机器补转接。
- 出站消息在 Brain、Orchestrator、QianniuAdapter 三层都会做防泄露清洗;如果线上仍出现历史摘要外发,优先确认服务是否已经重启到最新代码。
Description
Languages
Python
100%