diff --git a/README.md b/README.md index 20c9a28..89ef4fd 100755 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ Orchestrator (防抖/去重/冷却/路由) CustomerServiceBrain (PydanticAI Agent) ├── lookup_chat_history_tool → 查询历史记录 ├── transfer_to_human_tool → 转接设计师 - └── check_order_status_tool → 订单查询 + └── lookup_customer_orders_tool → 订单查询 ↓ QianniuAdapter → WebSocket → 回复客户 ``` @@ -30,10 +30,13 @@ QianniuAdapter → WebSocket → 回复客户 | 智能接待 | 自动引导发图、问需求、转接设计师 | | 历史记忆 | AI 可调用工具查询完整聊天历史,避免重复提问 | | 自动转接 | 收到图片+需求后自动派单给在线设计师 | +| 待转接池 | 设计师不在线时先入队,上线后自动补转接 | | 转接冷却 | 转接后 120 秒内不再调用 AI,直接安抚 | | 情绪识别 | 客户愤怒/投诉时自动转人工 | | 消息防抖 | 合并短时间内的多条消息,避免重复回复 | | 订单静默 | 订单通知/SKU 信息自动入库,不触发 AI | +| 图片兜底 | 识别 `msg_type=1` 图片包;即使无文本也不会被当心跳丢弃 | +| 出站防泄露 | 拦截 ``、工具原文、历史摘要、订单摘要等内部内容 | | 时段感知 | 根据时间区分"没上班"/"下班了"/"暂时不在" | | 图片分析 | 后台调用 Gemini 分析图片复杂度 | | 日报统计 | 每日自动生成客服数据报告 | @@ -95,6 +98,7 @@ curl http://localhost:6060/api/health │ ├── chat_log_db.py # 聊天记录(SQLite/MySQL) │ ├── customer_db.py # 客户档案 │ ├── image_tasks_db.py # 图片任务 +│ ├── pending_transfer_db.py # 待转接队列(本地 SQLite) │ └── task_db/ # 任务模型 ├── services/ │ ├── dispatch_service.py # 设计师派单 @@ -125,6 +129,8 @@ curl http://localhost:6060/api/health | `OPENAI_MODEL` | 对话模型 | | `DB_TYPE` | 数据库类型(`sqlite` / `mysql`) | | `MYSQL_HOST/PORT/USER/PASSWORD/DATABASE` | MySQL 连接信息 | +| `MYSQL_POOL_SIZE` | MySQL 连接池大小,默认 `10` | +| `MYSQL_POOL_WAIT_TIMEOUT` | 连接池等待超时(秒),默认 `10` | | `WECHAT_WEBHOOK` | 企业微信通知 Webhook | | `MESSAGE_DEBOUNCE_SECONDS` | 消息防抖时间(秒) | | `DISPATCH_BASE_URL` | 派单服务地址 | @@ -136,11 +142,23 @@ curl http://localhost:6060/api/health ## 消息处理流程 1. **WebSocket 接收** → 千牛原始消息 -2. **适配器转换** → `StandardMessage`(统一格式) -3. **Orchestrator 过滤** → 订单/SKU 静默入库、心跳过滤、商家回复入库 -4. **防抖合并** → 2 秒窗口内多条消息合并为一条 -5. **冷却检查** → 转接后 120 秒内直接安抚,不调 AI -6. **AI 思考** → PydanticAI Agent 调用工具、生成回复 -7. **转接截获** → 工具返回转接指令时直接发送,不经 AI 二次加工 -8. **乱码清理** → 过滤 ``、内部标记等泄露内容 -9. **发送回复** → 通过 WebSocket 回复客户,同时入库 +2. **适配器转换** → `StandardMessage`(统一格式,识别 `msg_type`、递归提取图片 URL) +3. **图片兜底** → 纯图片包即使无文本/无直出 URL,也会标记为“已收到图片消息” +4. **Orchestrator 过滤** → 订单/SKU 静默入库、心跳过滤、商家回复入库 +5. **防抖合并** → 2 秒窗口内多条消息合并为一条 +6. **冷却检查** → 转接后 120 秒内直接安抚,不调 AI +7. **AI 思考** → PydanticAI Agent 调用工具、生成回复 +8. **转接截获** → 工具返回转接指令时直接发送,不经 AI 二次加工 +9. **待转接入池** → 设计师不在线时记录原因,进入待转接队列,稍后自动补转 +10. **出站安全过滤** → 过滤 ``、历史摘要、订单摘要、工具原文、时间戳转述历史 +11. **发送回复** → 通过 WebSocket 回复客户,同时入库 + +--- + +## 运行注意 + +- `db/pending_transfer.db` 是待转接池的本地 SQLite 数据文件,用于暂存“设计师不在线”的转接请求。 +- `db/chat_log_db/chats.db` 是 SQLite 模式下的本地聊天库;当 `DB_TYPE=mysql` 时,线上主库仍然是 MySQL。 +- 上面两个 `.db` 文件都属于运行时数据,不建议提交到 Git。 +- 当前待转接池是单机本地队列;如果未来部署成多台应用机,需要改成共享存储,否则无法跨机器补转接。 +- 出站消息在 Brain、Orchestrator、QianniuAdapter 三层都会做防泄露清洗;如果线上仍出现历史摘要外发,优先确认服务是否已经重启到最新代码。 diff --git a/core/adapters/qianniu_adapter.py b/core/adapters/qianniu_adapter.py index 2cf5bc2..452f167 100644 --- a/core/adapters/qianniu_adapter.py +++ b/core/adapters/qianniu_adapter.py @@ -2,7 +2,7 @@ import re import logging import json from pathlib import Path -from typing import List, Tuple +from typing import List, Tuple, Any from core.adapters.base import BaseAdapter from core.schema import StandardMessage, StandardResponse @@ -78,6 +78,10 @@ class QianniuAdapter(BaseAdapter): acc_id = str(raw.get("acc_id") or raw.get("shop_id") or "") from_id = str(raw.get("from_id") or raw.get("cy_id") or "") msg_text = str(raw.get("msg") or raw.get("content") or "") + raw_msg_type = self._safe_int(raw.get("msg_type"), 0) + image_urls = self._extract_inbound_image_urls(raw, msg_text) + if raw_msg_type == 1 and not msg_text.strip(): + msg_text = "【系统:已收到图片消息】" # 判断方向:如果 from_id 包含了店铺名或 acc_id,通常说明是商家自己在说话 # 或者逆向接口通常有一个特定的标识,这里我们做一个通用的逻辑判断 @@ -96,7 +100,8 @@ class QianniuAdapter(BaseAdapter): user_id=user_id, user_name=str(raw.get("from_name", "")), content=msg_text, - image_urls=self._extract_urls(msg_text), + msg_type=raw_msg_type, + image_urls=image_urls, acc_id=acc_id, acc_type=str(raw.get("acc_type") or "AliWorkbench"), raw_data=raw @@ -135,3 +140,52 @@ class QianniuAdapter(BaseAdapter): image_exts = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp") candidates = re.findall(r'https?://[^\s#]+', text) return [u for u in candidates if any(ext in u.lower() for ext in image_exts)] + + @staticmethod + def _safe_int(value: Any, default: int = 0) -> int: + try: + return int(value) + except Exception: + return default + + def _extract_inbound_image_urls(self, raw: dict, msg_text: str) -> List[str]: + urls = [] + seen = set() + + def add_url(url: str): + if not url: + return + s = str(url).strip() + if not s or s in seen: + return + if self._extract_urls(s): + seen.add(s) + urls.append(s) + + for url in self._extract_urls(msg_text): + add_url(url) + + for url in self._find_image_urls_in_obj(raw): + add_url(url) + + return urls + + def _find_image_urls_in_obj(self, obj: Any) -> List[str]: + found: List[str] = [] + + def walk(val: Any): + if val is None: + return + if isinstance(val, str): + found.extend(self._extract_urls(val)) + return + if isinstance(val, dict): + for item in val.values(): + walk(item) + return + if isinstance(val, (list, tuple, set)): + for item in val: + walk(item) + + walk(obj) + return found diff --git a/core/agent_tools.py b/core/agent_tools.py index 9a04daa..aebfde5 100644 --- a/core/agent_tools.py +++ b/core/agent_tools.py @@ -121,7 +121,9 @@ async def lookup_chat_history_tool( line = f"[{ts}] {role}:{msg}" lines.append(line) if r["direction"] == "in": - if "已收到" in msg and "图" in msg: + msg_type = int(r.get("msg_type") or 0) + image_urls = str(r.get("image_urls", "") or "").strip() + if msg_type == 1 or image_urls or ("已收到" in msg and "图" in msg): has_images = True if any(k in msg for k in ["找原图", "修复", "高清", "去背景", "抠图", "做衣服", "打印"]): customer_needs.append(msg[:60]) diff --git a/core/orchestrator.py b/core/orchestrator.py index aef54c6..35856c8 100644 --- a/core/orchestrator.py +++ b/core/orchestrator.py @@ -148,7 +148,14 @@ class SystemOrchestrator: if is_order or is_price_only or is_sku_only or is_sku_amount: await self._handle_order_packet(platform, std_msg) logger.info(f"[订单消息] user={user_id} acc={std_msg.acc_id} 已入库更新状态") - await repo.save_chat(platform, user_id, msg_text, "in", acc_id=std_msg.acc_id) + await repo.save_chat( + platform, + user_id, + msg_text, + "in", + acc_id=std_msg.acc_id, + msg_type=std_msg.msg_type, + ) return preview = (std_msg.content or "").replace("\n", "\\n") @@ -159,12 +166,20 @@ class SystemOrchestrator: f"type={std_msg.msg_type} images={len(std_msg.image_urls)} content={preview}" ) - # 过滤心跳 - if not (std_msg.content or "").strip() and not std_msg.image_urls: return + # 过滤心跳;图片消息即使暂时没拿到 URL,也不能直接丢掉 + if std_msg.msg_type != 1 and not (std_msg.content or "").strip() and not std_msg.image_urls: + return # 如果是商家人工回复,静默入库 if direction == "out": - await repo.save_chat(platform, user_id, std_msg.content, "out", acc_id=std_msg.acc_id) + await repo.save_chat( + platform, + user_id, + std_msg.content, + "out", + acc_id=std_msg.acc_id, + msg_type=std_msg.msg_type, + ) return # ID 去重 @@ -326,7 +341,15 @@ class SystemOrchestrator: db_start = time.time() db_content = combined_content if all_image_urls: db_content = f"【系统:已收到{len(all_image_urls)}张图】\n{combined_content}" - await repo.save_chat(platform, user_id, db_content, "in", acc_id=acc_id, image_urls=all_image_urls) + await repo.save_chat( + platform, + user_id, + db_content, + "in", + acc_id=acc_id, + image_urls=all_image_urls, + msg_type=final_msg.msg_type, + ) db_elapsed = time.time() - db_start logger.info(f"[计时] user={user_id} 消息入库: {db_elapsed:.2f}s") @@ -376,11 +399,25 @@ class SystemOrchestrator: metadata={"acc_id": acc_id, "acc_type": acc_type} ) await self.qianniu_adapter.translate_outbound(greet, user_id) - await repo.save_chat(platform, user_id, greet.reply_content, "out", acc_id=acc_id) + await repo.save_chat( + platform, + user_id, + greet.reply_content, + "out", + acc_id=acc_id, + msg_type=greet.msg_type, + ) await asyncio.sleep(0.5) await self.qianniu_adapter.translate_outbound(std_res, user_id) - await repo.save_chat(platform, user_id, std_res.reply_content, "out", acc_id=acc_id) + await repo.save_chat( + platform, + user_id, + std_res.reply_content, + "out", + acc_id=acc_id, + msg_type=std_res.msg_type, + ) if std_res.metadata.get("pending_transfer"): reason = str(std_res.metadata.get("pending_transfer_reason") or "").strip() @@ -450,10 +487,24 @@ class SystemOrchestrator: ) await self.qianniu_adapter.translate_outbound(notify, customer_id) - await repo.save_chat("qianniu", customer_id, notify.reply_content, "out", acc_id=acc_id) + await repo.save_chat( + "qianniu", + customer_id, + notify.reply_content, + "out", + acc_id=acc_id, + msg_type=notify.msg_type, + ) await asyncio.sleep(0.5) await self.qianniu_adapter.translate_outbound(transfer, customer_id) - await repo.save_chat("qianniu", customer_id, transfer.reply_content, "out", acc_id=acc_id) + await repo.save_chat( + "qianniu", + customer_id, + transfer.reply_content, + "out", + acc_id=acc_id, + msg_type=transfer.msg_type, + ) self._last_transfer_time[f"{customer_id}@{acc_id}"] = time.time() await asyncio.to_thread(complete_pending_transfer, row_id) diff --git a/core/pydantic_ai_agent_v2.py b/core/pydantic_ai_agent_v2.py index 213915d..a08ee13 100644 --- a/core/pydantic_ai_agent_v2.py +++ b/core/pydantic_ai_agent_v2.py @@ -195,9 +195,13 @@ class CustomerServiceBrain: user_content = msg.content or "" # 客户已发图:告知 AI 图已收到,引导问需求,但不要直接转接 - if msg.image_urls: + has_image_message = bool(msg.image_urls) or msg.msg_type == 1 + if has_image_message: + image_count = max(len(msg.image_urls), 1) + if user_content.startswith("【系统:已收到图片消息"): + user_content = "" user_content = ( - f"【系统通知:客户已发送 {len(msg.image_urls)} 张图片,图已收到不要再让客户发图。" + f"【系统通知:客户已发送 {image_count} 张图片,图已收到不要再让客户发图。" f"你现在必须先问客户:这张是找原图还是高清修复?有什么具体要求?" f"等客户明确回答后才能转接,严禁跳过问需求直接转接!】\n{user_content}" ) diff --git a/core/repository.py b/core/repository.py index 57df58a..52f584e 100644 --- a/core/repository.py +++ b/core/repository.py @@ -18,7 +18,16 @@ class DataRepository: # --- 聊天记录 (异步化) --- - async def save_chat(self, platform: str, user_id: str, content: str, direction: str, acc_id: str = "", image_urls: list = None): + async def save_chat( + self, + platform: str, + user_id: str, + content: str, + direction: str, + acc_id: str = "", + image_urls: list = None, + msg_type: int = 0, + ): """异步持久化存储聊天记录""" # 将图片URL列表转为\n分隔的字符串 urls_str = "\n".join(image_urls) if image_urls else "" @@ -29,6 +38,7 @@ class DataRepository: direction=direction, platform=platform, acc_id=acc_id, + msg_type=msg_type, image_urls=urls_str ) @@ -42,6 +52,8 @@ class DataRepository: { "role": role, "content": r["message"], + "msg_type": r.get("msg_type", 0), + "image_urls": r.get("image_urls", ""), "timestamp": r.get("timestamp", ""), } ) diff --git a/core/websocket_send_flow.py b/core/websocket_send_flow.py index df3bcbb..f408376 100644 --- a/core/websocket_send_flow.py +++ b/core/websocket_send_flow.py @@ -38,34 +38,36 @@ async def send_message_flow(client, message): """发送消息到服务器。""" if client.websocket and client.websocket.state == websockets.protocol.State.OPEN: try: + payload = message if isinstance(message, dict) else {} msg_json = json.dumps(message, ensure_ascii=False) await client.websocket.send(msg_json) pretty = json.dumps(message, ensure_ascii=False, indent=2) client.logger.info(f"[{client.get_time()}] 发送成功:\n{pretty}") - data = message.get("data", {}) if isinstance(message, dict) else {} client._activity_log( "send_message_success", - trace_id=message.get("_trace_id", "") if isinstance(message, dict) else "", - acc_id=data.get("acc_id", ""), - customer_id=data.get("cy_id", ""), - msg_type=data.get("msg_type", 0), - msg=data.get("msg", ""), + trace_id=payload.get("_trace_id", ""), + acc_id=payload.get("acc_id", ""), + customer_id=payload.get("cy_id") or payload.get("from_id", ""), + msg_type=payload.get("msg_type", 0), + msg=payload.get("msg", ""), ) except Exception as e: client.logger.info(f"[{client.get_time()}] 发送失败: {e}") + payload = message if isinstance(message, dict) else {} client._activity_log( "send_message_error", - trace_id=message.get("_trace_id", ""), - acc_id=message.get("acc_id", ""), - customer_id=message.get("from_id", ""), + trace_id=payload.get("_trace_id", ""), + acc_id=payload.get("acc_id", ""), + customer_id=payload.get("cy_id") or payload.get("from_id", ""), error=str(e), ) else: client.logger.info(f"[{client.get_time()}] 错误: 连接未打开") + payload = message if isinstance(message, dict) else {} client._activity_log( "send_message_skipped", - trace_id=message.get("_trace_id", ""), + trace_id=payload.get("_trace_id", ""), reason="socket_not_open", - acc_id=message.get("acc_id", ""), - customer_id=message.get("from_id", ""), + acc_id=payload.get("acc_id", ""), + customer_id=payload.get("cy_id") or payload.get("from_id", ""), ) diff --git a/db/chat_log_db.py b/db/chat_log_db.py index 1d6d413..a7049f3 100755 --- a/db/chat_log_db.py +++ b/db/chat_log_db.py @@ -388,7 +388,7 @@ def get_conversation(customer_id: str, limit: int = 200, acc_id: str = "") -> Li with _get_conn() as conn: rows = conn.execute(_sql(""" SELECT * FROM ( - SELECT id, direction, message, msg_type, timestamp, acc_id + SELECT id, direction, message, msg_type, timestamp, acc_id, image_urls FROM chat_logs WHERE customer_id = ? ORDER BY timestamp DESC, id DESC