Compare commits

...

31 Commits

Author SHA1 Message Date
ab173d2d0f fix: transfer on delivery handoff requests 2026-03-15 16:05:45 +08:00
311124bc9b fix: transfer on file handoff messages 2026-03-15 15:54:33 +08:00
d191ad8eac fix: alert wecom on brain fallback 2026-03-14 15:33:49 +08:00
87f9e8724d fix: add designer work schedule guidance 2026-03-14 08:37:24 +08:00
1b136d17ad fix: handle designer schedule questions 2026-03-14 08:35:06 +08:00
5b36693c2e feat: alert wecom when no designer is available 2026-03-13 10:42:14 +08:00
5a38fa9e6c docs: add recent update log 2026-03-12 15:52:50 +08:00
71d3f713c9 fix: ignore malformed image urls from card payloads 2026-03-12 15:32:08 +08:00
823f5eac76 fix: transfer when customer asks for payment link 2026-03-12 15:28:05 +08:00
f3e8ea16c6 fix: only greet on first message in a session 2026-03-11 21:22:10 +08:00
3d1d955256 fix: send immediate greeting for each inbound message 2026-03-11 18:49:16 +08:00
8a67c25887 feat: improve first-turn and delayed-image replies 2026-03-11 18:42:18 +08:00
ebca1eaff6 fix: block leaked history summaries in replies 2026-03-11 18:33:17 +08:00
2c003e9a7d fix: clean generated tuhui titles 2026-03-10 15:48:27 +08:00
3f45a4badd fix: randomize tuhui designer alias 2026-03-10 14:35:31 +08:00
c399b8cfc1 fix: anonymize tuhui designer and clean titles 2026-03-10 14:22:23 +08:00
a082364e34 fix: simplify auto process titles and notices 2026-03-10 14:20:26 +08:00
7aa2dff569 fix: normalize tuhui asset urls 2026-03-10 13:40:54 +08:00
64571f4544 chore: switch tuhui defaults to new domain 2026-03-10 13:05:36 +08:00
e0c9f46162 feat: derive tuhui title from image analysis 2026-03-09 16:07:06 +08:00
ba5644371f feat: include processed image url in wecom notice 2026-03-09 15:50:27 +08:00
5fcce98583 fix: normalize animated images before gemini 2026-03-09 14:57:41 +08:00
a2119f3b6d fix: harden outbound leak guard and title naming 2026-03-09 14:34:04 +08:00
d3b55798e5 fix: normalize image formats before gemini 2026-03-09 11:27:14 +08:00
23c2f37a67 fix: use resolved download path for gemini input 2026-03-09 11:04:17 +08:00
bcd162ef22 fix: harden alicdn image downloads 2026-03-09 10:51:12 +08:00
2ab27eb914 fix: streamline gemini flow and add e2e test 2026-03-08 23:58:17 +08:00
82284ce3fb feat: automate image pipeline and simplify gemini flow 2026-03-08 23:42:18 +08:00
3a78eb304a feat: improve routing logs and tuhui integration 2026-03-08 17:34:56 +08:00
39de916b89 fix: retry stalled transfers on follow-up messages 2026-03-08 17:33:51 +08:00
fddd879ba0 fix: harden image handling and update docs 2026-03-08 13:20:18 +08:00
21 changed files with 2279 additions and 628 deletions

View File

@@ -16,7 +16,7 @@ Orchestrator (防抖/去重/冷却/路由)
CustomerServiceBrain (PydanticAI Agent)
├── lookup_chat_history_tool → 查询历史记录
├── transfer_to_human_tool → 转接设计师
└── check_order_status_tool → 订单查询
└── lookup_customer_orders_tool → 订单查询
QianniuAdapter → WebSocket → 回复客户
```
@@ -30,10 +30,13 @@ QianniuAdapter → WebSocket → 回复客户
| 智能接待 | 自动引导发图、问需求、转接设计师 |
| 历史记忆 | AI 可调用工具查询完整聊天历史,避免重复提问 |
| 自动转接 | 收到图片+需求后自动派单给在线设计师 |
| 待转接池 | 设计师不在线时先入队,上线后自动补转接 |
| 转接冷却 | 转接后 120 秒内不再调用 AI直接安抚 |
| 情绪识别 | 客户愤怒/投诉时自动转人工 |
| 消息防抖 | 合并短时间内的多条消息,避免重复回复 |
| 订单静默 | 订单通知/SKU 信息自动入库,不触发 AI |
| 图片兜底 | 识别 `msg_type=1` 图片包;即使无文本也不会被当心跳丢弃 |
| 出站防泄露 | 拦截 `<think>`、工具原文、历史摘要、订单摘要等内部内容 |
| 时段感知 | 根据时间区分"没上班"/"下班了"/"暂时不在" |
| 图片分析 | 后台调用 Gemini 分析图片复杂度 |
| 日报统计 | 每日自动生成客服数据报告 |
@@ -95,6 +98,7 @@ curl http://localhost:6060/api/health
│ ├── chat_log_db.py # 聊天记录SQLite/MySQL
│ ├── customer_db.py # 客户档案
│ ├── image_tasks_db.py # 图片任务
│ ├── pending_transfer_db.py # 待转接队列(本地 SQLite
│ └── task_db/ # 任务模型
├── services/
│ ├── dispatch_service.py # 设计师派单
@@ -125,6 +129,8 @@ curl http://localhost:6060/api/health
| `OPENAI_MODEL` | 对话模型 |
| `DB_TYPE` | 数据库类型(`sqlite` / `mysql` |
| `MYSQL_HOST/PORT/USER/PASSWORD/DATABASE` | MySQL 连接信息 |
| `MYSQL_POOL_SIZE` | MySQL 连接池大小,默认 `10` |
| `MYSQL_POOL_WAIT_TIMEOUT` | 连接池等待超时(秒),默认 `10` |
| `WECHAT_WEBHOOK` | 企业微信通知 Webhook |
| `MESSAGE_DEBOUNCE_SECONDS` | 消息防抖时间(秒) |
| `DISPATCH_BASE_URL` | 派单服务地址 |
@@ -136,11 +142,23 @@ curl http://localhost:6060/api/health
## 消息处理流程
1. **WebSocket 接收** → 千牛原始消息
2. **适配器转换**`StandardMessage`(统一格式)
3. **Orchestrator 过滤** → 订单/SKU 静默入库、心跳过滤、商家回复入库
4. **防抖合并** → 2 秒窗口内多条消息合并为一条
5. **冷却检查**转接后 120 秒内直接安抚,不调 AI
6. **AI 思考**PydanticAI Agent 调用工具、生成回复
7. **转接截获**工具返回转接指令时直接发送,不经 AI 二次加工
8. **乱码清理**过滤 `<think>`、内部标记等泄露内容
9. **发送回复**通过 WebSocket 回复客户,同时入库
2. **适配器转换**`StandardMessage`(统一格式,识别 `msg_type`、递归提取图片 URL
3. **图片兜底** → 纯图片包即使无文本/无直出 URL也会标记为“已收到图片消息”
4. **Orchestrator 过滤** → 订单/SKU 静默入库、心跳过滤、商家回复入库
5. **防抖合并**2 秒窗口内多条消息合并为一条
6. **冷却检查**转接后 120 秒内直接安抚,不调 AI
7. **AI 思考**PydanticAI Agent 调用工具、生成回复
8. **转接截获**工具返回转接指令时直接发送,不经 AI 二次加工
9. **待转接入池**设计师不在线时记录原因,进入待转接队列,稍后自动补转
10. **出站安全过滤** → 过滤 `<think>`、历史摘要、订单摘要、工具原文、时间戳转述历史
11. **发送回复** → 通过 WebSocket 回复客户,同时入库
---
## 运行注意
- `db/pending_transfer.db` 是待转接池的本地 SQLite 数据文件,用于暂存“设计师不在线”的转接请求。
- `db/chat_log_db/chats.db` 是 SQLite 模式下的本地聊天库;当 `DB_TYPE=mysql` 时,线上主库仍然是 MySQL。
- 上面两个 `.db` 文件都属于运行时数据,不建议提交到 Git。
- 当前待转接池是单机本地队列;如果未来部署成多台应用机,需要改成共享存储,否则无法跨机器补转接。
- 出站消息在 Brain、Orchestrator、QianniuAdapter 三层都会做防泄露清洗;如果线上仍出现历史摘要外发,优先确认服务是否已经重启到最新代码。

View File

@@ -0,0 +1,81 @@
# 最近更新记录
统计范围2026-03-09 到 2026-03-12
来源:`git log` 实际提交记录
说明:以下为 `tw` 仓库这几天已经提交的代码更新,不依赖 MySQL 数据判断。
## 2026-03-09
### 图像下载与 Gemini 处理链路
- `bcd162ef` `fix: harden alicdn image downloads`
- 加强阿里 CDN 图片下载稳定性,减少下载失败。
- `23c2f37a` `fix: use resolved download path for gemini input`
- Gemini 使用实际解析后的下载路径,避免路径不一致。
- `d3b55798` `fix: normalize image formats before gemini`
- 在送给 Gemini 前统一图片格式,减少格式兼容问题。
- `5fcce985` `fix: normalize animated images before gemini`
- 动图类素材先规范化,再进入 Gemini 处理。
### 图绘标题与通知
- `a2119f3b` `fix: harden outbound leak guard and title naming`
- 强化外发泄露拦截,并优化标题命名。
- `ba564437` `feat: include processed image url in wecom notice`
- 企微通知里补充处理后图片地址。
- `e0c9f461` `feat: derive tuhui title from image analysis`
- 根据图片分析结果自动生成图绘标题。
## 2026-03-10
### 图绘新域名切换
- `64571f45` `chore: switch tuhui defaults to new domain`
- 默认图绘域名切到 `https://aidg168.uk`
- `7aa2dff5` `fix: normalize tuhui asset urls`
- 图绘返回的素材地址统一改写为新域名。
### 标题与设计师信息清理
- `a082364e` `fix: simplify auto process titles and notices`
- 精简自动处理标题和通知文案。
- `c399b8cf` `fix: anonymize tuhui designer and clean titles`
- 图绘设计师名称匿名化,并继续清理标题。
- `3f45a4ba` `fix: randomize tuhui designer alias`
- 图绘设计师改为随机匿名别名,不再暴露真实店铺设计师名。
- `2c003e9a` `fix: clean generated tuhui titles`
- 清理 URL 式、机器式、脏标题,生成更正常的标题。
## 2026-03-11
### AI 历史记录与回复安全
- `ebca1eaf` `fix: block leaked history summaries in replies`
- 修复历史摘要、详细记录被误发给客户的问题。
### 首轮对话与延迟消息承接
- `8a67c258` `feat: improve first-turn and delayed-image replies`
- 优化首轮回复和“客户说已经发过图”的承接方式。
- `3d1d9552` `fix: send immediate greeting for each inbound message`
- 临时改成每条入站消息先回 `在的 亲`
- `f3e8ea16` `fix: only greet on first message in a session`
- 修正上一版过度触发的问题,改为同一轮会话只在第一句先回一次 `在的 亲`
## 2026-03-12
### 成交信号与转接
- `823f5eac` `fix: transfer when customer asks for payment link`
- 客户明确说“发付款链接 / 支付链接 / 拍单链接 / 下单链接”时,作为强成交信号优先触发转接。
### 脏图片链接过滤
- `71d3f713` `fix: ignore malformed image urls from card payloads`
- 过滤卡片消息、退款消息、JSON 残片里的伪图片链接,避免误送 `ImageAnalyzer` 导致 `400 Bad Request`
## 备注
- 当前文档只记录“已经提交进仓库”的更新。
- 未提交的本地修改不在本记录中。
- 如果后面还要继续追加,可以直接在这个文件后面按日期补充。

View File

@@ -2,7 +2,7 @@ import re
import logging
import json
from pathlib import Path
from typing import List, Tuple
from typing import List, Tuple, Any
from core.adapters.base import BaseAdapter
from core.schema import StandardMessage, StandardResponse
@@ -78,6 +78,10 @@ class QianniuAdapter(BaseAdapter):
acc_id = str(raw.get("acc_id") or raw.get("shop_id") or "")
from_id = str(raw.get("from_id") or raw.get("cy_id") or "")
msg_text = str(raw.get("msg") or raw.get("content") or "")
raw_msg_type = self._safe_int(raw.get("msg_type"), 0)
image_urls = self._extract_inbound_image_urls(raw, msg_text)
if raw_msg_type == 1 and not msg_text.strip():
msg_text = "【系统:已收到图片消息】"
# 判断方向:如果 from_id 包含了店铺名或 acc_id通常说明是商家自己在说话
# 或者逆向接口通常有一个特定的标识,这里我们做一个通用的逻辑判断
@@ -96,7 +100,8 @@ class QianniuAdapter(BaseAdapter):
user_id=user_id,
user_name=str(raw.get("from_name", "")),
content=msg_text,
image_urls=self._extract_urls(msg_text),
msg_type=raw_msg_type,
image_urls=image_urls,
acc_id=acc_id,
acc_type=str(raw.get("acc_type") or "AliWorkbench"),
raw_data=raw
@@ -131,7 +136,73 @@ class QianniuAdapter(BaseAdapter):
logger.error(f"[QianniuAdapter] 发送失败: {e}")
def _extract_urls(self, text: str) -> List[str]:
if not text: return []
if not text:
return []
image_exts = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp")
candidates = re.findall(r'https?://[^\s#]+', text)
return [u for u in candidates if any(ext in u.lower() for ext in image_exts)]
candidates = re.findall(r'https?://[^\s#,"\'}\]]+', text)
urls: List[str] = []
seen = set()
for candidate in candidates:
url = str(candidate or "").strip().rstrip('\'".,;:!?)')
lower = url.lower()
if not any(ext in lower for ext in image_exts):
continue
# 过滤被卡片/JSON 串污染的伪图片链接
if any(marker in lower for marker in ("%22title%22", "%22topic%22", '"title":', '"topic":', "%7d")):
continue
if url in seen:
continue
seen.add(url)
urls.append(url)
return urls
@staticmethod
def _safe_int(value: Any, default: int = 0) -> int:
try:
return int(value)
except Exception:
return default
def _extract_inbound_image_urls(self, raw: dict, msg_text: str) -> List[str]:
urls = []
seen = set()
def add_url(url: str):
if not url:
return
s = str(url).strip()
if not s or s in seen:
return
if self._extract_urls(s):
seen.add(s)
urls.append(s)
for url in self._extract_urls(msg_text):
add_url(url)
for url in self._find_image_urls_in_obj(raw):
add_url(url)
return urls
def _find_image_urls_in_obj(self, obj: Any) -> List[str]:
found: List[str] = []
def walk(val: Any):
if val is None:
return
if isinstance(val, str):
found.extend(self._extract_urls(val))
return
if isinstance(val, dict):
for item in val.values():
walk(item)
return
if isinstance(val, (list, tuple, set)):
for item in val:
walk(item)
walk(obj)
return found

View File

@@ -1,5 +1,6 @@
import logging
import asyncio
import re
from datetime import datetime
from typing import List, Optional, Dict, Any
from pydantic import BaseModel, Field
@@ -10,6 +11,41 @@ from db.chat_log_db import get_conversation, get_customer_orders
logger = logging.getLogger("cs_agent")
_TRANSFER_COMMAND_RE = re.compile(r"^\s*正在为您转接\|\[转移会话\],[^,\r\n]+,[^\r\n]*\s*$")
_HISTORY_NOISE_PREFIXES = (
"[系统订单信息]",
"[进店卡片]",
"【系统:已收到",
"金额:",
"定制:",
)
def _is_plain_transfer_command(text: str) -> bool:
return bool(_TRANSFER_COMMAND_RE.fullmatch(str(text or "").strip()))
def _normalize_history_message(message: str, role: str) -> str:
text = str(message or "").strip()
if not text:
return ""
if _is_plain_transfer_command(text):
return "已转接设计师"
if role == "客服" and "[转移会话]" in text:
return "已尝试转接设计师"
return text
def _extract_need_snippet(message: str) -> str:
text = str(message or "").strip()
if not text:
return ""
if any(text.startswith(prefix) for prefix in _HISTORY_NOISE_PREFIXES):
return ""
if "http://" in text or "https://" in text:
return ""
return text[:60]
class TransferSuccessException(Exception):
"""转接成功后抛出此异常,用于提前终止 AI 处理流程"""
@@ -117,14 +153,18 @@ async def lookup_chat_history_tool(
for r in rows:
role = "客户" if r["direction"] == "in" else "客服"
ts = str(r.get("timestamp", ""))
msg = r.get("message", "")
msg = _normalize_history_message(r.get("message", ""), role)
line = f"[{ts}] {role}{msg}"
lines.append(line)
if r["direction"] == "in":
if "已收到" in msg and "" in msg:
msg_type = int(r.get("msg_type") or 0)
raw_message = str(r.get("message", "") or "")
image_urls = str(r.get("image_urls", "") or "").strip()
if msg_type == 1 or image_urls or ("已收到" in msg and "" in msg):
has_images = True
if any(k in msg for k in ["找原图", "修复", "高清", "去背景", "抠图", "做衣服", "打印"]):
customer_needs.append(msg[:60])
need_text = _extract_need_snippet(raw_message)
if need_text and any(k in need_text for k in ["找原图", "修复", "高清", "去背景", "抠图", "做衣服", "打印", "大图", "素材"]):
customer_needs.append(need_text)
summary_parts = [f"{len(rows)}条历史消息。"]
if has_images:

View File

@@ -3,6 +3,7 @@ import asyncio
import re
import time
import json
from datetime import datetime
from typing import Optional, List, Any, Dict
from collections import deque
from core.schema import StandardMessage, StandardResponse
@@ -17,6 +18,7 @@ from db.pending_transfer_db import (
retry_pending_transfer,
)
from services.dispatch_service import dispatch_service
from services.service_auto_image_pipeline import auto_image_pipeline_service
logger = logging.getLogger("cs_agent")
@@ -24,8 +26,11 @@ logger = logging.getLogger("cs_agent")
MSG_DEDUP_CAPACITY = 200 # 消息 ID 去重缓存容量
TRANSFER_COOLDOWN_SEC = 120 # 转接冷却时间(秒)—— 转接后2分钟内不再调用AI
DEBOUNCE_SECONDS = 2.0 # 消息防抖延迟(秒)
FIRST_GREETING_IDLE_SEC = 180 # 超过该空闲时间,视为新一轮对话
PENDING_TRANSFER_POLL_SECONDS = 30
PENDING_TRANSFER_RETRY_SECONDS = 60
TRANSFER_RETRY_WINDOW_SEC = 300
TRANSFER_RETRY_GAP_SEC = 45
# 转接后安抚话术池(轮换使用,避免复读)
_TRANSFER_CALM_REPLIES = [
@@ -46,6 +51,9 @@ _OUTBOUND_BLOCK_MARKERS = (
'[{"name":',
)
_TRANSFER_COMMAND_MARKER = "[转移会话]"
_TRANSFER_COMMAND_RE = re.compile(r"^\s*正在为您转接\|\[转移会话\],[^,\r\n]+,[^\r\n]*\s*$")
# 历史记录格式检测模式AI 转述历史时容易泄露)
_HISTORY_LEAK_PATTERNS = [
r'\[\d{4}-\d{2}-\d{2}[^\]]*\]\s*(客户|客服)[:]', # [2026-03-07 12:00:00] 客户:
@@ -79,7 +87,10 @@ class SystemOrchestrator:
self._debounce_tasks: Dict[str, asyncio.Task] = {}
self._pending_messages: Dict[str, List[StandardMessage]] = {}
self._user_locks: Dict[str, asyncio.Lock] = {}
self._last_inbound_seen_time: Dict[str, float] = {}
self._pending_transfer_task: Optional[asyncio.Task] = None
self._last_retry_transfer_time: Dict[str, float] = {}
self._auto_pipeline_jobs: Dict[str, float] = {}
bus.subscribe("MESSAGE_OUTBOUND", self.handle_outbound_event)
@@ -107,13 +118,109 @@ class SystemOrchestrator:
self._pending_transfer_task = asyncio.create_task(self._process_pending_transfers_loop())
logger.info("[Orchestrator] 待转接轮询任务已启动")
@staticmethod
def _parse_history_ts(ts: Any) -> Optional[datetime]:
text = str(ts or "").strip()
if not text:
return None
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M:%S.%f"):
try:
return datetime.strptime(text, fmt)
except ValueError:
continue
return None
def _find_stalled_transfer(self, history: List[dict]) -> Optional[dict]:
if not history:
return None
last_transfer_idx = -1
for idx in range(len(history) - 1, -1, -1):
item = history[idx]
if item.get("role") == "assistant" and _TRANSFER_COMMAND_MARKER in str(item.get("content") or ""):
last_transfer_idx = idx
break
if last_transfer_idx < 0:
return None
transfer_item = history[last_transfer_idx]
transfer_at = self._parse_history_ts(transfer_item.get("timestamp"))
if not transfer_at:
return None
elapsed = time.time() - transfer_at.timestamp()
if elapsed < 0 or elapsed > TRANSFER_RETRY_WINDOW_SEC:
return None
after_transfer = history[last_transfer_idx + 1:]
if not any(item.get("role") == "user" for item in after_transfer):
return None
for item in after_transfer:
if item.get("role") != "assistant":
continue
content = str(item.get("content") or "")
if _TRANSFER_COMMAND_MARKER not in content:
return None
return {
"timestamp": transfer_at,
"elapsed": elapsed,
"content": str(transfer_item.get("content") or ""),
}
async def _retry_stalled_transfer_if_needed(
self,
session_key: str,
user_id: str,
platform: str,
acc_id: str,
acc_type: str,
history: List[dict],
) -> Optional[StandardResponse]:
stalled = self._find_stalled_transfer(history)
if not stalled:
return None
last_retry_at = self._last_retry_transfer_time.get(session_key, 0.0)
if time.time() - last_retry_at < TRANSFER_RETRY_GAP_SEC:
logger.info(
f"[Orchestrator] 转接补发冷却中,先不重复补转: user={user_id} acc={acc_id}"
)
return None
logger.info(
f"[Orchestrator] 检测到疑似转接未接上,准备补发转接: "
f"user={user_id} acc={acc_id} elapsed={stalled['elapsed']:.0f}s"
)
designer_name = await dispatch_service.assign_designer(user_id=user_id)
if not designer_name:
logger.info(f"[Orchestrator] 补发转接失败,当前仍无可用设计师: user={user_id} acc={acc_id}")
return None
self._last_retry_transfer_time[session_key] = time.time()
return StandardResponse(
reply_content=f"正在为您转接|[转移会话],{designer_name},无原因",
need_transfer=True,
metadata={
"acc_id": acc_id,
"acc_type": acc_type,
"transfer_prelude": "我再帮您转一下哈",
"retry_transfer": True,
},
)
@staticmethod
def _sanitize_outbound_text(text: str) -> str:
if not text:
return ""
cleaned = str(text).strip()
if "[转移会话]" in cleaned:
if _TRANSFER_COMMAND_RE.fullmatch(cleaned):
return cleaned
if _TRANSFER_COMMAND_MARKER in cleaned:
logger.warning("[Orchestrator] 检测到混入正文的转接指令,替换为安全兜底回复")
return "我在帮你看记录,稍等哈"
if any(marker in cleaned for marker in _OUTBOUND_BLOCK_MARKERS):
logger.warning("[Orchestrator] 拦截到内部内容外发,替换为安全兜底回复")
return "我在帮你看记录,稍等哈"
@@ -124,6 +231,129 @@ class SystemOrchestrator:
return "我在帮你看记录,稍等哈"
return cleaned
@staticmethod
def _sanitize_history_content_for_ai(text: str) -> str:
cleaned = str(text or "").strip()
if not cleaned:
return ""
if _TRANSFER_COMMAND_RE.fullmatch(cleaned):
return "系统:之前已转接设计师"
if "【历史记录摘要】" in cleaned or "【详细记录】" in cleaned:
return "系统:刚刚查过历史记录"
if "【订单摘要】" in cleaned or "【订单详情】" in cleaned:
return "系统:刚刚查过订单记录"
if _TRANSFER_COMMAND_MARKER in cleaned:
cleaned = re.sub(
r"正在为您转接\|\[转移会话\],[^,\r\n]+,[^\r\n]*",
"系统:之前已转接设计师",
cleaned,
)
return cleaned
def _sanitize_history_for_ai(self, history: List[dict]) -> List[dict]:
sanitized = []
for item in history or []:
normalized = dict(item)
normalized["content"] = self._sanitize_history_content_for_ai(item.get("content", ""))
sanitized.append(normalized)
return sanitized
@staticmethod
def _extract_designer_name(transfer_cmd: str) -> str:
text = str(transfer_cmd or "").strip()
match = re.search(r"\[转移会话\],([^,]+),", text)
return str(match.group(1)).strip() if match else ""
@staticmethod
def _infer_processing_intent(requirement_text: str, history: Optional[List[dict]] = None) -> str:
combined_parts = [str(requirement_text or "").lower()]
for item in history or []:
if item.get("role") == "user":
combined_parts.append(str(item.get("content") or "").lower())
combined = "\n".join(combined_parts)
repair_keywords = ("修复", "高清", "清晰", "放大", "老照片")
if any(k in combined for k in repair_keywords):
return "repair"
return "find_original"
@staticmethod
def _collect_recent_image_urls(history: List[dict], fallback_urls: Optional[List[str]] = None) -> List[str]:
urls: List[str] = []
seen = set()
def add_url(url: str):
value = str(url or "").strip()
if not value or value in seen:
return
seen.add(value)
urls.append(value)
for url in fallback_urls or []:
add_url(url)
for item in reversed(history or []):
if item.get("role") != "user":
continue
raw_urls = item.get("image_urls") or []
if isinstance(raw_urls, str):
for part in re.split(r"[\n#]+", raw_urls):
add_url(part)
elif isinstance(raw_urls, list):
for part in raw_urls:
add_url(part)
content = str(item.get("content") or "")
for match in re.findall(r"https?://[^\s#]+", content):
add_url(match)
if len(urls) >= 5:
break
return urls
def _schedule_auto_pipeline(
self,
*,
session_key: str,
customer_id: str,
acc_id: str,
designer_name: str,
requirement_text: str,
history: List[dict],
image_urls: Optional[List[str]] = None,
):
resolved_urls = self._collect_recent_image_urls(history, image_urls)
if not resolved_urls:
logger.info(f"[Orchestrator] 自动处理跳过:未找到客户图片 user={customer_id} acc={acc_id}")
return
intent = self._infer_processing_intent(requirement_text, history)
signature_src = f"{session_key}|{designer_name}|{intent}|{'|'.join(resolved_urls)}"
signature = str(abs(hash(signature_src)))
now = time.time()
last_run = self._auto_pipeline_jobs.get(signature, 0.0)
if now - last_run < 600:
logger.info(f"[Orchestrator] 自动处理已在近期触发,跳过重复任务 user={customer_id} acc={acc_id}")
return
self._auto_pipeline_jobs[signature] = now
async def _runner():
try:
result = await auto_image_pipeline_service.process_and_notify(
session_key=session_key,
customer_id=customer_id,
acc_id=acc_id,
designer_name=designer_name,
requirement_text=requirement_text,
image_urls=resolved_urls,
intent=intent,
)
logger.info(
f"[Orchestrator] 自动处理完成 user={customer_id} acc={acc_id} "
f"ok={result.get('success')} uploaded={len(result.get('uploaded') or [])}"
)
except Exception as e:
logger.warning(f"[Orchestrator] 自动处理失败 user={customer_id} acc={acc_id}: {e}")
asyncio.create_task(_runner())
async def on_raw_message_received(self, platform: str, raw_data: dict):
"""链路入口"""
try:
@@ -148,7 +378,14 @@ class SystemOrchestrator:
if is_order or is_price_only or is_sku_only or is_sku_amount:
await self._handle_order_packet(platform, std_msg)
logger.info(f"[订单消息] user={user_id} acc={std_msg.acc_id} 已入库更新状态")
await repo.save_chat(platform, user_id, msg_text, "in", acc_id=std_msg.acc_id)
await repo.save_chat(
platform,
user_id,
msg_text,
"in",
acc_id=std_msg.acc_id,
msg_type=std_msg.msg_type,
)
return
preview = (std_msg.content or "").replace("\n", "\\n")
@@ -159,12 +396,20 @@ class SystemOrchestrator:
f"type={std_msg.msg_type} images={len(std_msg.image_urls)} content={preview}"
)
# 过滤心跳
if not (std_msg.content or "").strip() and not std_msg.image_urls: return
# 过滤心跳;图片消息即使暂时没拿到 URL也不能直接丢掉
if std_msg.msg_type != 1 and not (std_msg.content or "").strip() and not std_msg.image_urls:
return
# 如果是商家人工回复,静默入库
if direction == "out":
await repo.save_chat(platform, user_id, std_msg.content, "out", acc_id=std_msg.acc_id)
await repo.save_chat(
platform,
user_id,
std_msg.content,
"out",
acc_id=std_msg.acc_id,
msg_type=std_msg.msg_type,
)
return
# ID 去重
@@ -172,7 +417,26 @@ class SystemOrchestrator:
if std_msg.msg_id in self._processed_msg_ids: return
self._processed_msg_ids.append(std_msg.msg_id)
# 进入防抖(使用 session_key 隔离不同店铺)
# 同一轮对话只在第一句先发固定承接,不经过 AI
now_ts = time.time()
last_inbound_ts = self._last_inbound_seen_time.get(session_key, 0.0)
self._last_inbound_seen_time[session_key] = now_ts
if now_ts - last_inbound_ts >= FIRST_GREETING_IDLE_SEC:
first_greet = StandardResponse(
reply_content="在的 亲",
metadata={"acc_id": std_msg.acc_id, "acc_type": std_msg.acc_type},
)
await self.qianniu_adapter.translate_outbound(first_greet, user_id)
await repo.save_chat(
platform,
user_id,
first_greet.reply_content,
"out",
acc_id=std_msg.acc_id,
msg_type=first_greet.msg_type,
)
# 进入防抖(使用 session_key 隔离不同店铺)
if session_key in self._debounce_tasks: self._debounce_tasks[session_key].cancel()
if session_key not in self._pending_messages: self._pending_messages[session_key] = []
self._pending_messages[session_key].append(std_msg)
@@ -225,7 +489,7 @@ class SystemOrchestrator:
except Exception as e:
logger.warning(f"[Orchestrator] 订单消息处理异常: {e}")
async def _analyze_images_background(self, session_key: str, image_urls: List[str]):
async def _analyze_images_background(self, session_key: str, image_urls: List[str], requirement_text: str = ""):
"""后台静默分析图片,存入用户数据库用于数据标定"""
try:
from services.service_image_analyzer import image_analyzer_service
@@ -234,9 +498,9 @@ class SystemOrchestrator:
db = CustomerDatabase()
profile = db.get_customer(session_key)
for url in image_urls:
for url in (image_urls or [])[:1]:
try:
result = await image_analyzer_service.analyze(url)
result = await image_analyzer_service.analyze(url, customer_requirement=requirement_text)
result_json = json.dumps(result, ensure_ascii=False)
# 更新最近一次分析
@@ -326,20 +590,45 @@ class SystemOrchestrator:
db_start = time.time()
db_content = combined_content
if all_image_urls: db_content = f"【系统:已收到{len(all_image_urls)}张图】\n{combined_content}"
await repo.save_chat(platform, user_id, db_content, "in", acc_id=acc_id, image_urls=all_image_urls)
await repo.save_chat(
platform,
user_id,
db_content,
"in",
acc_id=acc_id,
image_urls=all_image_urls,
msg_type=final_msg.msg_type,
)
db_elapsed = time.time() - db_start
logger.info(f"[计时] user={user_id} 消息入库: {db_elapsed:.2f}s")
# B2. 后台图片分析(不阻塞主流程,用于数据标定)
if all_image_urls:
asyncio.create_task(self._analyze_images_background(session_key, all_image_urls))
asyncio.create_task(self._analyze_images_background(session_key, all_image_urls, combined_content))
# C. 冷却检查转接成功后冷却期内直接回安抚话术不调AI
history_start = time.time()
history = await repo.get_chat_history(user_id, limit=12, acc_id=acc_id)
history_elapsed = time.time() - history_start
logger.info(f"[计时] user={user_id} 查询历史: {history_elapsed:.2f}s (共{len(history)}条)")
ai_history = history[:-1] if history and history[-1].get("content") == db_content else history
ai_history = self._sanitize_history_for_ai(ai_history)
# C. 短时间追问且疑似没真正接上人工:优先补发一次转接
std_res = await self._retry_stalled_transfer_if_needed(
session_key=session_key,
user_id=user_id,
platform=platform,
acc_id=acc_id,
acc_type=acc_type,
history=history,
)
# D. 冷却检查转接成功后冷却期内直接回安抚话术不调AI
last_transfer = self._last_transfer_time.get(session_key, 0)
cooldown_elapsed = time.time() - last_transfer
is_in_cooldown = cooldown_elapsed < TRANSFER_COOLDOWN_SEC
if is_in_cooldown:
if std_res is None and is_in_cooldown:
idx = self._transfer_calm_idx.get(session_key, 0)
calm_reply = _TRANSFER_CALM_REPLIES[idx % len(_TRANSFER_CALM_REPLIES)]
self._transfer_calm_idx[session_key] = idx + 1
@@ -348,21 +637,16 @@ class SystemOrchestrator:
reply_content=calm_reply,
metadata={"acc_id": acc_id, "acc_type": acc_type}
)
else:
# D. 正常流程调用AI思考
history_start = time.time()
history = await repo.get_chat_history(user_id, limit=10, acc_id=acc_id)
if history and history[-1].get('content') == db_content: history = history[:-1]
history_elapsed = time.time() - history_start
logger.info(f"[计时] user={user_id} 查询历史: {history_elapsed:.2f}s (共{len(history)}条)")
if std_res is None:
# E. 正常流程调用AI思考
ai_start = time.time()
std_res = await self.brain.think_and_reply(final_msg, history=history)
std_res = await self.brain.think_and_reply(final_msg, history=ai_history)
ai_elapsed = time.time() - ai_start
total_elapsed = time.time() - process_start
logger.info(f"[计时] user={user_id} AI思考: {ai_elapsed:.1f}s | 总耗时: {total_elapsed:.1f}s")
# E. 发送并记录时间
# F. 发送并记录时间
if std_res.should_reply:
std_res.reply_content = self._sanitize_outbound_text(std_res.reply_content)
meta = dict(std_res.metadata or {})
@@ -371,16 +655,32 @@ class SystemOrchestrator:
# 转接场景:先发一句安抚话,再发转接指令
if "[转移会话]" in std_res.reply_content:
designer_name = self._extract_designer_name(std_res.reply_content)
transfer_prelude = str(std_res.metadata.get("transfer_prelude") or "").strip()
greet = StandardResponse(
reply_content="收到,我叫设计师来看下哈",
reply_content=transfer_prelude or "收到,我叫设计师来看下哈",
metadata={"acc_id": acc_id, "acc_type": acc_type}
)
await self.qianniu_adapter.translate_outbound(greet, user_id)
await repo.save_chat(platform, user_id, greet.reply_content, "out", acc_id=acc_id)
await repo.save_chat(
platform,
user_id,
greet.reply_content,
"out",
acc_id=acc_id,
msg_type=greet.msg_type,
)
await asyncio.sleep(0.5)
await self.qianniu_adapter.translate_outbound(std_res, user_id)
await repo.save_chat(platform, user_id, std_res.reply_content, "out", acc_id=acc_id)
await repo.save_chat(
platform,
user_id,
std_res.reply_content,
"out",
acc_id=acc_id,
msg_type=std_res.msg_type,
)
if std_res.metadata.get("pending_transfer"):
reason = str(std_res.metadata.get("pending_transfer_reason") or "").strip()
@@ -399,6 +699,15 @@ class SystemOrchestrator:
if "[转移会话]" in std_res.reply_content:
self._last_transfer_time[session_key] = time.time()
self._schedule_auto_pipeline(
session_key=session_key,
customer_id=user_id,
acc_id=acc_id,
designer_name=self._extract_designer_name(std_res.reply_content),
requirement_text=combined_content,
history=history,
image_urls=all_image_urls,
)
except asyncio.CancelledError: pass
except Exception as e: logger.exception(f"[Orchestrator] 处理失败: {e}")
@@ -450,12 +759,35 @@ class SystemOrchestrator:
)
await self.qianniu_adapter.translate_outbound(notify, customer_id)
await repo.save_chat("qianniu", customer_id, notify.reply_content, "out", acc_id=acc_id)
await repo.save_chat(
"qianniu",
customer_id,
notify.reply_content,
"out",
acc_id=acc_id,
msg_type=notify.msg_type,
)
await asyncio.sleep(0.5)
await self.qianniu_adapter.translate_outbound(transfer, customer_id)
await repo.save_chat("qianniu", customer_id, transfer.reply_content, "out", acc_id=acc_id)
await repo.save_chat(
"qianniu",
customer_id,
transfer.reply_content,
"out",
acc_id=acc_id,
msg_type=transfer.msg_type,
)
self._last_transfer_time[f"{customer_id}@{acc_id}"] = time.time()
history = await repo.get_chat_history(customer_id, limit=12, acc_id=acc_id)
self._schedule_auto_pipeline(
session_key=f"{customer_id}@{acc_id}",
customer_id=customer_id,
acc_id=acc_id,
designer_name=designer_name,
requirement_text=reason,
history=history,
)
await asyncio.to_thread(complete_pending_transfer, row_id)
logger.info(
f"[Orchestrator] 待转接自动完成: pending_id={row_id} user={customer_id} designer={designer_name} reason={reason}"

View File

@@ -10,6 +10,7 @@ from pydantic_ai.models.openai import OpenAIChatModel
from pydantic_ai.providers.openai import OpenAIProvider
from core.schema import StandardMessage, StandardResponse
from core.agent_tools import register_agent_tools, TransferSuccessException
from services.service_wecom_bot import wecom_bot_service
logger = logging.getLogger("cs_agent")
@@ -26,6 +27,7 @@ _INTERNAL_TOOL_MARKERS = (
"【订单摘要】",
"【订单详情】",
)
_TRANSFER_COMMAND_RE = re.compile(r"^\s*正在为您转接\|\[转移会话\],[^,\r\n]+,[^\r\n]*\s*$")
# 历史记录格式检测模式AI 转述历史时容易泄露)
_HISTORY_LEAK_PATTERNS = [
@@ -39,6 +41,109 @@ _HISTORY_LEAK_PATTERNS = [
r'(状态|金额|数量)[:].*(状态|金额|数量)[:]', # 状态:xxx 金额:xxx 连续出现
]
_FIND_ORIGINAL_INTENT_KEYWORDS = (
"找图",
"找原图",
"原图",
"素材",
"大图",
"源图",
)
_FIND_ORIGINAL_QUESTION_KEYWORDS = (
"有吗",
"有没",
"有没有",
"能找吗",
"找得到吗",
"能不能找到",
"能找到吗",
)
_REPAIR_INTENT_KEYWORDS = (
"修复",
"高清修复",
"高清",
"清晰",
"清楚",
"变清晰",
"修清楚",
"放大清晰",
)
_IMAGE_ALREADY_SENT_HINT_KEYWORDS = (
"上面不是发了吗",
"上面不是有吗",
"我不是发了吗",
"前面不是发了吗",
"前面发了",
"上面发了",
"我发过了",
"不是发了吗",
"都发了",
"你没看到吗",
"聊天记录里有",
"上面有图",
)
_PAYMENT_LINK_REQUEST_KEYWORDS = (
"付款链接",
"支付链接",
"拍单链接",
"下单链接",
"付款吧",
"发我链接",
"发个链接",
"发下链接",
"发一下链接",
"给我链接",
)
_FILE_HANDOFF_TRANSFER_KEYWORDS = (
"发送文件了看到了吗",
"发文件了看到了吗",
"文件发了看到了吗",
"文件收到了吗",
"文件收到没",
"文件看到了吗",
"我把文件发过去了",
"文件发过去了",
"给你发文件了",
"源文件发过去了",
"文件发你了",
)
_DELIVERY_HANDOFF_HINT_KEYWORDS = (
"发给我吧原图",
"原图发给我",
"把原图发给我",
"发我吧原图",
"把文件发给我",
"把成品发给我",
"做好了发给我",
"做好了直接发我",
"做完了发给我",
"发过来吧",
)
_DESIGNER_SCHEDULE_QUESTION_KEYWORDS = (
"几点上班",
"什么时候上班",
"大概什么时候上班",
"一般几点上班",
"明天几点上班",
"设计师几点上班",
"设计师什么时候上班",
"几点在线",
"什么时候在线",
"设计师在吗",
"上班了没",
"设计师上班了吗",
)
DESIGNER_WORK_START_HOUR = 9
DESIGNER_WORK_END_HOUR = 12
def _clip(text: str, limit: int = 1200) -> str:
if text is None:
@@ -49,6 +154,46 @@ def _clip(text: str, limit: int = 1200) -> str:
return f"{text[:limit]}...(截断, 共{len(text)}字)"
async def _notify_brain_fallback(
msg: StandardMessage,
error: Exception,
history_messages: Optional[List[Dict[str, Any]]] = None,
) -> None:
history_messages = history_messages or []
recent_lines: List[str] = []
for item in history_messages[-3:]:
role = str(item.get("role") or "").strip() or "unknown"
content = _clip(str(item.get("content") or "").replace("\r", " ").replace("\n", " "), 80)
if content:
recent_lines.append(f"{role}: {content}")
current_input = _clip(str(msg.content or "").replace("\r", " ").replace("\n", " "), 200)
image_count = len(getattr(msg, "image_urls", None) or [])
lines = [
"【AI兜底告警】",
f"店铺:{msg.acc_id or '-'}",
f"客户:{msg.user_id or '-'}",
f"消息类型:{getattr(msg, 'msg_type', '-')}",
f"图片数:{image_count}",
f"当前消息:{current_input or '-'}",
f"错误:{_clip(str(error), 300)}",
]
if recent_lines:
lines.append("最近上下文:")
lines.extend(recent_lines)
try:
ok = await wecom_bot_service.send_text("\n".join(lines))
if ok:
logger.info(f"[Brain Fallback Alert] 已发送企业微信告警 user={msg.user_id} acc={msg.acc_id}")
else:
logger.warning(f"[Brain Fallback Alert] 企业微信告警发送失败 user={msg.user_id} acc={msg.acc_id}")
except Exception as notify_err:
logger.warning(
f"[Brain Fallback Alert] 企业微信告警异常 user={msg.user_id} acc={msg.acc_id}: {notify_err}"
)
def _fmt_time(ts: Any) -> str:
s = str(ts or "").strip()
if not s:
@@ -79,6 +224,13 @@ def _sanitize_reply_text(reply_text: str) -> str:
text = re.sub(r'[\[\]]{2,}', '', text)
text = text.strip()
if _TRANSFER_COMMAND_RE.fullmatch(text):
return text
if "[转移会话]" in text:
logger.warning("[Brain] 拦截到混入正文的转接指令,降级为安全兜底回复")
return "我在帮你看记录,稍等哈"
# 检查固定标记
if any(marker in text for marker in _INTERNAL_TOOL_MARKERS):
logger.warning("[Brain] 拦截到工具原文泄露,降级为安全兜底回复")
@@ -93,6 +245,109 @@ def _sanitize_reply_text(reply_text: str) -> str:
return text.strip()
def _normalize_text(text: Any) -> str:
return str(text or "").strip().lower()
def _infer_image_intent(current_text: str, history: Optional[List[dict]] = None) -> str:
text = _normalize_text(current_text)
recent_user_text = "\n".join(
_normalize_text(h.get("content", ""))
for h in (history or [])[-6:]
if h.get("role") == "user"
)
combined = f"{recent_user_text}\n{text}"
if any(k in combined for k in _REPAIR_INTENT_KEYWORDS):
return "repair"
if any(k in combined for k in _FIND_ORIGINAL_INTENT_KEYWORDS):
return "find_original"
if any(k in text for k in _FIND_ORIGINAL_QUESTION_KEYWORDS):
return "find_original"
return ""
def _history_has_customer_image(history: Optional[List[dict]] = None) -> bool:
for item in history or []:
if item.get("role") != "user":
continue
msg_type = int(item.get("msg_type") or 0)
image_urls = item.get("image_urls") or []
if isinstance(image_urls, str):
image_urls = [part for part in image_urls.splitlines() if part.strip()]
content = str(item.get("content") or "")
if msg_type == 1 or image_urls or ("已收到" in content and "" in content):
return True
return False
def _customer_claims_image_already_sent(current_text: str, history: Optional[List[dict]] = None) -> bool:
text = _normalize_text(current_text)
if not text or not _history_has_customer_image(history):
return False
return any(keyword in text for keyword in _IMAGE_ALREADY_SENT_HINT_KEYWORDS)
def _requests_payment_link(current_text: str, history: Optional[List[dict]] = None) -> bool:
text = _normalize_text(current_text)
if not text:
return False
if any(keyword in text for keyword in _PAYMENT_LINK_REQUEST_KEYWORDS):
return True
return ("付款" in text or "支付" in text or "拍单" in text or "下单" in text) and "链接" in text
def _history_has_transfer_or_order(history: Optional[List[dict]] = None) -> bool:
for item in history or []:
content = str(item.get("content") or "")
if not content:
continue
if "[转移会话]" in content or "设计师上线了" in content:
return True
if "[系统订单信息]" in content or "订单状态:" in content or "订单号:" in content:
return True
return False
def _requests_file_handoff_transfer(current_text: str, history: Optional[List[dict]] = None) -> bool:
text = _normalize_text(current_text)
if not text:
return False
if any(keyword in text for keyword in _FILE_HANDOFF_TRANSFER_KEYWORDS):
return True
if any(keyword in text for keyword in _DELIVERY_HANDOFF_HINT_KEYWORDS):
return True
has_file_signal = "文件" in text or "源文件" in text
has_delivery_signal = any(token in text for token in ("发了", "发送了", "发过去", "发你", "给你发"))
has_receipt_signal = any(token in text for token in ("看到了吗", "收到了吗", "收到没", "看见了吗", "收到吗"))
if has_file_signal and (has_delivery_signal or has_receipt_signal):
return True
if _history_has_transfer_or_order(history):
has_asset_signal = any(token in text for token in ("原图", "源文件", "文件", "成品", "", "图片"))
has_send_request = any(
token in text
for token in ("发给我", "发我", "发过来", "给我吧", "给我发", "直接发", "传给我", "传过来")
)
if has_asset_signal and has_send_request:
return True
return False
def _asks_designer_schedule(current_text: str, history: Optional[List[dict]] = None) -> bool:
text = _normalize_text(current_text)
if not text:
return False
if any(keyword in text for keyword in _DESIGNER_SCHEDULE_QUESTION_KEYWORDS):
return True
return ("上班" in text or "在线" in text) and ("几点" in text or "什么时候" in text or "在吗" in text)
class CustomerServiceBrain:
"""
重构后的单一 Agent 大脑:
@@ -144,18 +399,25 @@ class CustomerServiceBrain:
"【核心逻辑】\n"
"1. 业务:只聊高清修复和找原图。核心链路:引导发图 -> 问需求 -> 找设计师。\n"
"2. **主动引导**:只有当客户【从未发过图】且没有历史图片记录时,才引导发图。\n"
"2.1 **消息延迟安抚**:如果客户说'上面不是发了吗''我发过了''你没看到吗',说明他在提醒你图早就发过了。\n"
" 这时先道歉,类似'不好意思哈,刚刚消息慢了点',再承接后续;严禁让客户重发图片。\n"
"3. **非业务问题**:如果客户问招聘、合作、闲聊等与做图无关的话题,礼貌拒绝。\n"
"4. **客户说没有参考图**:直接转人工:'好的,我这就叫设计师帮您找哈'\n"
"5. **客户问尺寸/能否打印/退款**:直接转人工:'这个设计师帮您看下哈'\n"
"6. **转接时机(严格两步)**:必须同时满足【有图】+【客户明确说了要找原图/修复/具体要求】才能转接\n"
" 客户只发了图但没说需求 → 必须先问'亲亲这张是找原图还是修复哈?'\n"
" 客户说了'有吗''能找吗' → 这不算明确需求,要追问'是要找原图还是高清修复呢?'\n"
"7. **下线安抚**只有工具返回ERROR时才能提设计师不在。根据错误码区分\n"
"6. **付款链接特判**:客户明确说'发付款链接''支付链接''拍单链接''下单链接'时,视为强成交信号,必须立即调用转人工工具;严禁只回复'直接下单'\n"
"7. **设计师上班时间特判**:客户问'几点上班''什么时候上班''设计师在吗''什么时候在线'时,默认是在问设计师。不要按闲聊处理,也不要回复'我只处理业务'\n"
" 设计师固定是早上9点上班12点下班。应结合当前时间自然回答不要机械复读。\n"
"8. **转接时机(严格两步)**:除付款链接特判、文件交接特判外,必须同时满足【有图】+【客户明确或可直接判断的需求】才能转接。\n"
" 客户只发了图但没说需求 → 先问'亲亲这张是找原图还是修复哈?'\n"
" 客户说了'有吗''能找吗''找图''找原图''有大图吗' → 直接按【找原图】意图处理,不要重复追问。\n"
" 客户说了'修复''高清''清晰点''放大清晰' → 直接按【高清修复】意图处理,不要重复追问。\n"
" 客户说'文件发过去了''发送文件了看到了吗''源文件发你了''发给我吧原图''做好了直接发我'这类交付话术 → 视为设计师成品/文件交接场景,必须立即调用转人工工具,不要再问客户发图或问需求。\n"
"9. **下线安抚**只有工具返回ERROR时才能提设计师不在。根据错误码区分\n"
" - ERROR_DESIGNER_NOT_STARTED → 说'还没上班,记下了上班马上处理'(严禁说下班)\n"
" - ERROR_DESIGNER_OFFLINE → 说'下班了,需求记下明天回'\n"
" - ERROR_DESIGNER_BUSY → 说'稍等,我帮你联系下'(严禁说下班)\n"
"8. 正在转接中:如果系统提示已在转接,回:'已经在帮你催了哈,稍等下!'\n"
"9. **每次转接必须调用工具**:不要猜测,每次都重新调用。\n\n"
"10. 正在转接中:如果系统提示已在转接,回:'已经在帮你催了哈,稍等下!'\n"
"11. **每次转接必须调用工具**:不要猜测,每次都重新调用。\n\n"
"【情绪识别与应急转人工】\n"
"当客户出现以下信号时,立即调用转人工工具,不要继续机械回复:\n"
@@ -194,13 +456,75 @@ class CustomerServiceBrain:
try:
user_content = msg.content or ""
# 客户已发图:告知 AI 图已收到,引导问需求,但不要直接转接
if msg.image_urls:
if _requests_payment_link(user_content, history):
user_content = (
f"【系统通知:客户已发送 {len(msg.image_urls)} 张图片,图已收到不要再让客户发图"
f"你现在必须先问客户:这张是找原图还是高清修复?有什么具体要求?"
f"等客户明确回答后才能转接,严禁跳过问需求直接转接!】\n{user_content}"
"【系统通知:客户正在明确索要付款/支付链接,这是强成交信号"
"不要只回复'直接下单''平台拍单',必须立即调用转人工工具转接设计师跟进付款。】\n"
f"{user_content}"
)
elif _requests_file_handoff_transfer(user_content, history):
logger.info(f"[Brain] 已识别为文件交接转接意图: user={msg.user_id}")
user_content = (
"【系统通知:客户现在是在说文件/原图/成品的交接,或者让你把做好的内容直接发过去。"
"这通常代表设计师已经做完,客户现在是在催交付或确认文件收发。"
"不要让客户重发图,不要继续问需求,必须立即调用转人工工具转接设计师跟进交付。】\n"
f"{user_content}"
)
elif _asks_designer_schedule(user_content, history):
now_dt = datetime.now()
user_content = (
"【系统通知:客户现在是在问设计师几点上班、什么时候在线、有没有在。"
"这是有效业务上下文,不要按闲聊或无关业务拒绝。"
f"设计师固定工作时间是每天{DESIGNER_WORK_START_HOUR}点上班,{DESIGNER_WORK_END_HOUR}点下班。"
f"当前时间是{now_dt.strftime('%Y-%m-%d %H:%M:%S')}"
"请结合当前时间自然回答:"
f"如果现在还没到{DESIGNER_WORK_START_HOUR}点,就表达还没上班,上班后马上处理;"
f"如果现在已经在{DESIGNER_WORK_START_HOUR}点到{DESIGNER_WORK_END_HOUR}点之间,就表达设计师已经在了或陆续在处理;"
f"如果现在已经过了{DESIGNER_WORK_END_HOUR}点,就表达已经下班,明天{DESIGNER_WORK_START_HOUR}点后处理。"
"不要机械照抄这段说明,要自然一点。】\n"
f"{user_content}"
)
# 客户已发图:告知 AI 图已收到,引导问需求,但不要直接转接
has_image_message = bool(msg.image_urls) or msg.msg_type == 1
if not has_image_message and _customer_claims_image_already_sent(user_content, history):
inferred_intent = _infer_image_intent(user_content, history)
if inferred_intent == "find_original":
next_step = "客户当前更像是在问找原图,别再问他有没有发图。"
elif inferred_intent == "repair":
next_step = "客户当前更像是在问高清修复,别再问他有没有发图。"
else:
next_step = "如果客户需求还不明确,只问这是找原图还是修复,不要让客户重发。"
user_content = (
"【系统通知:客户是在提醒你他上面已经发过图片了,可能刚刚网络或消息同步有点慢。"
"回复时先简短道歉,表示现在已经看到图了,再继续正常承接。"
f"{next_step}\n{user_content}"
)
if has_image_message:
image_count = max(len(msg.image_urls), 1)
if user_content.startswith("【系统:已收到图片消息"):
user_content = ""
inferred_intent = _infer_image_intent(user_content, history)
if inferred_intent == "find_original":
logger.info(f"[Brain] 已根据客户表述推断为找原图意图: user={msg.user_id}")
user_content = (
f"【系统通知:客户已发送 {image_count} 张图片,图已收到不要再让客户发图。"
f"系统判断客户当前意图是【找原图】;像'有吗''能找吗''找图'都算找原图意图。"
f"不要再追问'找原图还是高清修复',直接按找原图流程继续;如果信息足够就直接转接。】\n{user_content}"
)
elif inferred_intent == "repair":
logger.info(f"[Brain] 已根据客户表述推断为高清修复意图: user={msg.user_id}")
user_content = (
f"【系统通知:客户已发送 {image_count} 张图片,图已收到不要再让客户发图。"
f"系统判断客户当前意图是【高清修复】;像'修复''高清''清晰点'都算修复意图。"
f"不要再追问'找原图还是高清修复',直接按高清修复流程继续;如果信息足够就直接转接。】\n{user_content}"
)
else:
user_content = (
f"【系统通知:客户已发送 {image_count} 张图片,图已收到不要再让客户发图。"
f"你现在必须先问客户:这张是找原图还是高清修复?有什么具体要求?"
f"等客户明确回答后才能转接,严禁跳过问需求直接转接!】\n{user_content}"
)
recent_context = ""
if history:
@@ -323,4 +647,5 @@ class CustomerServiceBrain:
except Exception as e:
logger.error(f"[Brain Error]: {e}")
await _notify_brain_fallback(msg, e, history)
return StandardResponse(reply_content="好哒,我在看图,稍等回你哈。", metadata={"acc_id": msg.acc_id})

View File

@@ -1,5 +1,6 @@
import logging
import asyncio
import re
from typing import Optional, List, Any
from datetime import datetime
from db.customer_db import db as customer_db
@@ -8,6 +9,47 @@ from db.chat_log_db import log_message, get_conversation
logger = logging.getLogger("cs_agent")
_OUTBOUND_BLOCK_MARKERS = (
"【历史记录摘要】",
"【详细记录】",
"【订单摘要】",
"【订单详情】",
"<think",
"think_never_used",
'[{"name":',
)
_TRANSFER_COMMAND_RE = re.compile(r"^\s*正在为您转接\|\[转移会话\],[^,\r\n]+,[^\r\n]*\s*$")
_HISTORY_LEAK_PATTERNS = [
r'\[\d{4}-\d{2}-\d{2}[^\]]*\]\s*(客户|客服)[:]',
r'\[\d{2}:\d{2}:\d{2}\]\s*(客户|客服|我)[:]',
r'(根据|查看|查询|翻看)(历史|聊天|对话)(记录|内容)',
r'历史(记录|对话|消息)(显示|表明|中)',
r'之前的(聊天|对话|记录)(中|里|显示)',
r'\d+条(历史|对话)?消息',
r'订单号[:]\s*\d{10,}',
r'(状态|金额|数量)[:].*(状态|金额|数量)[:]',
]
def _sanitize_outbound_archive_text(content: str) -> str:
if not content:
return ""
cleaned = str(content).strip()
if _TRANSFER_COMMAND_RE.fullmatch(cleaned):
return cleaned
if "[转移会话]" in cleaned:
logger.warning("[Repository] 检测到混入正文的转接指令,拦截出站入库")
return "我在帮你看记录,稍等哈"
if any(marker in cleaned for marker in _OUTBOUND_BLOCK_MARKERS):
logger.warning("[Repository] 拦截到内部内容写入外发记录,替换为安全兜底回复")
return "我在帮你看记录,稍等哈"
for pattern in _HISTORY_LEAK_PATTERNS:
if re.search(pattern, cleaned):
logger.warning(f"[Repository] 检测到历史记录泄露模式,拦截出站入库: {pattern[:30]}...")
return "我在帮你看记录,稍等哈"
return cleaned
class DataRepository:
"""
异步数据仓库:使用 asyncio.to_thread 屏蔽底层同步 IO 阻塞。
@@ -18,8 +60,19 @@ class DataRepository:
# --- 聊天记录 (异步化) ---
async def save_chat(self, platform: str, user_id: str, content: str, direction: str, acc_id: str = "", image_urls: list = None):
async def save_chat(
self,
platform: str,
user_id: str,
content: str,
direction: str,
acc_id: str = "",
image_urls: list = None,
msg_type: int = 0,
):
"""异步持久化存储聊天记录"""
if direction == "out" and int(msg_type or 0) == 0:
content = _sanitize_outbound_archive_text(content)
# 将图片URL列表转为\n分隔的字符串
urls_str = "\n".join(image_urls) if image_urls else ""
return await asyncio.to_thread(
@@ -29,6 +82,7 @@ class DataRepository:
direction=direction,
platform=platform,
acc_id=acc_id,
msg_type=msg_type,
image_urls=urls_str
)
@@ -42,6 +96,8 @@ class DataRepository:
{
"role": role,
"content": r["message"],
"msg_type": r.get("msg_type", 0),
"image_urls": r.get("image_urls", ""),
"timestamp": r.get("timestamp", ""),
}
)

View File

@@ -1,6 +1,8 @@
import logging
import os
import subprocess
from datetime import datetime
from pathlib import Path
class _AnsiColorFormatter(logging.Formatter):
@@ -56,20 +58,67 @@ class _AnsiColorFormatter(logging.Formatter):
return f"{color}{msg}{self.RESET}"
_APP_VERSION = None
_LOG_RECORD_FACTORY_INSTALLED = False
def get_app_log_version() -> str:
global _APP_VERSION
if _APP_VERSION:
return _APP_VERSION
env_version = str(os.getenv("APP_VERSION", "")).strip()
if env_version:
_APP_VERSION = env_version
return _APP_VERSION
try:
repo_root = Path(__file__).resolve().parent.parent
git_version = subprocess.check_output(
["git", "-C", str(repo_root), "rev-parse", "--short", "HEAD"],
stderr=subprocess.DEVNULL,
text=True,
).strip()
except Exception:
git_version = ""
_APP_VERSION = git_version or "dev"
os.environ.setdefault("APP_VERSION", _APP_VERSION)
return _APP_VERSION
def install_log_record_factory():
global _LOG_RECORD_FACTORY_INSTALLED
if _LOG_RECORD_FACTORY_INSTALLED:
return
version = get_app_log_version()
old_factory = logging.getLogRecordFactory()
def record_factory(*args, **kwargs):
record = old_factory(*args, **kwargs)
record.app_version = getattr(record, "app_version", version)
return record
logging.setLogRecordFactory(record_factory)
_LOG_RECORD_FACTORY_INSTALLED = True
def setup_logger():
from logging.handlers import RotatingFileHandler
from config.config import LOG_DIR, LOG_MAX_BYTES, LOG_BACKUP_COUNT
install_log_record_factory()
logger = logging.getLogger("cs_agent")
if getattr(logger, "_cs_logger_configured", False):
return logger
logger.setLevel(logging.INFO)
logger.propagate = False
fmt = logging.Formatter("[%(asctime)s] %(message)s", datefmt="%H:%M:%S")
fmt = logging.Formatter("[v%(app_version)s][%(asctime)s] %(message)s", datefmt="%H:%M:%S")
use_color = (os.getenv("LOG_COLOR", "1").lower() in ("1", "true", "yes")) and not bool(os.getenv("NO_COLOR"))
ch = logging.StreamHandler()
ch.setFormatter(_AnsiColorFormatter("[%(asctime)s] %(message)s", datefmt="%H:%M:%S", use_color=use_color))
ch.setFormatter(_AnsiColorFormatter("[v%(app_version)s][%(asctime)s] %(message)s", datefmt="%H:%M:%S", use_color=use_color))
logger.addHandler(ch)
LOG_DIR.mkdir(exist_ok=True)

View File

@@ -1,13 +1,54 @@
import json
import logging
import re
import websockets
logger = logging.getLogger("cs_agent")
_OUTBOUND_BLOCK_MARKERS = (
"【历史记录摘要】",
"【详细记录】",
"【订单摘要】",
"【订单详情】",
"<think",
"think_never_used",
'[{"name":',
)
_HISTORY_LEAK_PATTERNS = [
r'\[\d{4}-\d{2}-\d{2}[^\]]*\]\s*(客户|客服)[:]',
r'\[\d{2}:\d{2}:\d{2}\]\s*(客户|客服|我)[:]',
r'(根据|查看|查询|翻看)(历史|聊天|对话)(记录|内容)',
r'历史(记录|对话|消息)(显示|表明|中)',
r'之前的(聊天|对话|记录)(中|里|显示)',
r'\d+条(历史|对话)?消息',
r'订单号[:]\s*\d{10,}',
r'(状态|金额|数量)[:].*(状态|金额|数量)[:]',
]
def _sanitize_outbound_text(content: str) -> str:
if not content:
return ""
cleaned = str(content).strip()
if "[转移会话]" in cleaned:
return cleaned
if any(marker in cleaned for marker in _OUTBOUND_BLOCK_MARKERS):
logger.warning("[WebSocketSend] 拦截到内部内容外发,替换为安全兜底回复")
return "我在帮你看记录,稍等哈"
for pattern in _HISTORY_LEAK_PATTERNS:
if re.search(pattern, cleaned):
logger.warning(f"[WebSocketSend] 检测到历史记录泄露模式: {pattern[:30]}...")
return "我在帮你看记录,稍等哈"
return cleaned
async def send_text_flow(client, cy_id, acc_type, content):
"""主动发送文本消息。"""
message = {
"msg_id": "",
"acc_id": "",
"msg": content,
"msg": _sanitize_outbound_text(content),
"from_id": client.reply_id,
"from_name": client.reply_id,
"cy_id": cy_id,
@@ -38,34 +79,38 @@ async def send_message_flow(client, message):
"""发送消息到服务器。"""
if client.websocket and client.websocket.state == websockets.protocol.State.OPEN:
try:
msg_json = json.dumps(message, ensure_ascii=False)
payload = dict(message) if isinstance(message, dict) else {}
if int(payload.get("msg_type", 0) or 0) == 0:
payload["msg"] = _sanitize_outbound_text(payload.get("msg", ""))
msg_json = json.dumps(payload, ensure_ascii=False)
await client.websocket.send(msg_json)
pretty = json.dumps(message, ensure_ascii=False, indent=2)
pretty = json.dumps(payload, ensure_ascii=False, indent=2)
client.logger.info(f"[{client.get_time()}] 发送成功:\n{pretty}")
data = message.get("data", {}) if isinstance(message, dict) else {}
client._activity_log(
"send_message_success",
trace_id=message.get("_trace_id", "") if isinstance(message, dict) else "",
acc_id=data.get("acc_id", ""),
customer_id=data.get("cy_id", ""),
msg_type=data.get("msg_type", 0),
msg=data.get("msg", ""),
trace_id=payload.get("_trace_id", ""),
acc_id=payload.get("acc_id", ""),
customer_id=payload.get("cy_id") or payload.get("from_id", ""),
msg_type=payload.get("msg_type", 0),
msg=payload.get("msg", ""),
)
except Exception as e:
client.logger.info(f"[{client.get_time()}] 发送失败: {e}")
payload = message if isinstance(message, dict) else {}
client._activity_log(
"send_message_error",
trace_id=message.get("_trace_id", ""),
acc_id=message.get("acc_id", ""),
customer_id=message.get("from_id", ""),
trace_id=payload.get("_trace_id", ""),
acc_id=payload.get("acc_id", ""),
customer_id=payload.get("cy_id") or payload.get("from_id", ""),
error=str(e),
)
else:
client.logger.info(f"[{client.get_time()}] 错误: 连接未打开")
payload = message if isinstance(message, dict) else {}
client._activity_log(
"send_message_skipped",
trace_id=message.get("_trace_id", ""),
trace_id=payload.get("_trace_id", ""),
reason="socket_not_open",
acc_id=message.get("acc_id", ""),
customer_id=message.get("from_id", ""),
acc_id=payload.get("acc_id", ""),
customer_id=payload.get("cy_id") or payload.get("from_id", ""),
)

View File

@@ -8,7 +8,7 @@ import sqlite3
import os
import threading
from queue import Queue, Empty
from datetime import datetime
from datetime import datetime, timedelta
from typing import List, Dict, Optional
_DB_PATH = os.path.join(os.path.dirname(__file__), "chat_log_db", "chats.db")
@@ -388,7 +388,7 @@ def get_conversation(customer_id: str, limit: int = 200, acc_id: str = "") -> Li
with _get_conn() as conn:
rows = conn.execute(_sql("""
SELECT * FROM (
SELECT id, direction, message, msg_type, timestamp, acc_id
SELECT id, direction, message, msg_type, timestamp, acc_id, image_urls
FROM chat_logs
WHERE customer_id = ?
ORDER BY timestamp DESC, id DESC
@@ -527,6 +527,49 @@ def get_latest_messages(limit: int = 20) -> List[Dict]:
return [dict(r) for r in rows]
def get_waiting_customer_pool(window_minutes: int = 30) -> Dict:
"""统计最近窗口内、最后一条消息仍来自客户的待接待客户池。"""
cutoff = (datetime.now() - timedelta(minutes=max(window_minutes, 1))).strftime("%Y-%m-%d %H:%M:%S")
with _get_conn() as conn:
rows = conn.execute(_sql("""
SELECT id, customer_id, acc_id, direction, timestamp
FROM chat_logs
WHERE timestamp >= ?
AND customer_id <> ''
AND customer_id <> 'unknown'
AND acc_id <> ''
ORDER BY id DESC
"""), (cutoff,)).fetchall()
latest_by_session = {}
for row in rows:
item = dict(row)
key = (str(item.get("customer_id") or ""), str(item.get("acc_id") or ""))
if key not in latest_by_session:
latest_by_session[key] = item
per_shop: Dict[str, int] = {}
waiting_sessions = 0
for item in latest_by_session.values():
if str(item.get("direction") or "") != "in":
continue
acc_id = str(item.get("acc_id") or "")
if not acc_id:
continue
per_shop[acc_id] = per_shop.get(acc_id, 0) + 1
waiting_sessions += 1
shops = [
{"acc_id": acc_id, "waiting_customers": count}
for acc_id, count in sorted(per_shop.items(), key=lambda kv: (-kv[1], kv[0]))
]
return {
"total_waiting_customers": waiting_sessions,
"shops": shops,
"window_minutes": window_minutes,
}
# ========== 订单相关 ==========
@_retry_db_operation

View File

@@ -160,3 +160,15 @@ def retry_pending_transfer(row_id: int, delay_seconds: int = 60, error: str = ""
(next_s, now_s, error[:500], row_id),
)
conn.commit()
def count_open_pending_transfers() -> int:
with _get_conn() as conn:
row = conn.execute(
"""
SELECT COUNT(*) AS cnt
FROM pending_transfers
WHERE status IN ('pending', 'processing')
"""
).fetchone()
return int(row["cnt"] or 0) if row else 0

View File

@@ -14,9 +14,13 @@ import hashlib
# 添加项目路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from core.websocket_logger_setup import install_log_record_factory
install_log_record_factory()
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s] %(levelname)s: %(message)s'
format='[v%(app_version)s][%(asctime)s] %(levelname)s: %(message)s'
)
logger = logging.getLogger(__name__)

View File

@@ -0,0 +1,74 @@
import asyncio
from pathlib import Path
import httpx
TEST_URL = "https://img.alicdn.com/imgextra/i1/O1CN01959PmC2MK7jvMhqXF_!!4611686018427385312-0-amp.jpg"
OUTPUT_DIR = Path(__file__).resolve().parents[1] / "tmp_alicdn_download"
CONTENT_TYPE_TO_SUFFIX = {
"image/jpeg": ".jpg",
"image/jpg": ".jpg",
"image/png": ".png",
"image/webp": ".webp",
"image/avif": ".avif",
}
DEFAULT_HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/133.0.0.0 Safari/537.36"
),
"Accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Cache-Control": "no-cache",
"Pragma": "no-cache",
"Referer": "https://www.taobao.com/",
}
async def download_once(client: httpx.AsyncClient, url: str):
response = await client.get(url, headers=DEFAULT_HEADERS)
print(f"HTTP {response.status_code}")
content_type = response.headers.get("content-type", "").split(";", 1)[0].strip().lower()
print(f"Content-Type: {content_type}")
if response.status_code != 200:
print(response.text[:300])
response.raise_for_status()
suffix = CONTENT_TYPE_TO_SUFFIX.get(content_type, ".bin")
output_path = OUTPUT_DIR / f"alicdn_test{suffix}"
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_bytes(response.content)
print(f"Saved to: {output_path}")
print(f"Size: {output_path.stat().st_size} bytes")
async def main():
timeout = httpx.Timeout(60.0, connect=20.0)
async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
last_error = None
referers = [
"https://www.taobao.com/",
"https://item.taobao.com/",
"https://detail.tmall.com/",
]
for idx, referer in enumerate(referers, 1):
try:
DEFAULT_HEADERS["Referer"] = referer
print(f"Attempt {idx} with Referer={referer}")
await download_once(client, TEST_URL)
print("Download success")
return
except Exception as e:
last_error = e
print(f"Attempt {idx} failed: {type(e).__name__}: {e}")
await asyncio.sleep(1)
raise RuntimeError(f"All attempts failed: {last_error}")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,144 @@
import asyncio
import os
from datetime import datetime
from pathlib import Path
import httpx
from dotenv import load_dotenv
from PIL import Image, ImageDraw
from services.service_gemini import GeminiExtractV2Service
from services.service_tuhui_upload import upload_to_tuhui
load_dotenv()
BASE_DIR = Path(__file__).resolve().parents[1] / "tmp_e2e_pipeline"
BASE_DIR.mkdir(parents=True, exist_ok=True)
TUHUI_WEB_BASE = os.getenv("TUHUI_WEB_BASE_URL", "https://aidg168.uk").rstrip("/")
TUHUI_DIRECT_BASE = os.getenv("TUHUI_DIRECT_BASE_URL", "http://156.226.181.204:8002").rstrip("/")
TUHUI_API_BASES = [f"{TUHUI_DIRECT_BASE}/api", f"{TUHUI_WEB_BASE}/api"]
TUHUI_PHONE = os.getenv("TUHUI_PHONE", "17520145271")
TUHUI_PASSWORD = os.getenv("TUHUI_PASSWORD", "zuowei1216")
def build_test_input() -> Path:
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
path = BASE_DIR / f"input_{ts}.png"
img = Image.new("RGBA", (768, 768), (228, 244, 249, 255))
draw = ImageDraw.Draw(img)
draw.rounded_rectangle((40, 40, 728, 728), radius=48, fill=(33, 114, 147, 255))
draw.ellipse((120, 120, 340, 340), fill=(96, 214, 255, 255))
draw.rectangle((390, 160, 640, 460), fill=(11, 54, 72, 255))
draw.text((110, 540), "TW Gemini E2E Test", fill=(255, 255, 255, 255))
img.save(path)
return path
async def login_tuhui(client: httpx.AsyncClient) -> tuple[str, str]:
last_error = None
for api_base in TUHUI_API_BASES:
try:
resp = await client.post(
f"{api_base}/auth/login",
json={"phone": TUHUI_PHONE, "password": TUHUI_PASSWORD},
timeout=30.0,
)
resp.raise_for_status()
data = resp.json()
return data["access_token"], api_base
except Exception as e:
last_error = e
raise last_error or RuntimeError("图绘登录失败")
async def create_order_and_pay(client: httpx.AsyncClient, api_base: str, token: str, work_id: int) -> dict:
headers = {"Authorization": f"Bearer {token}"}
order_resp = await client.post(
f"{api_base}/orders/create",
headers=headers,
json={"work_id": work_id, "payment_method": "balance"},
timeout=30.0,
)
order_resp.raise_for_status()
order = order_resp.json()
pay_resp = await client.post(
f"{api_base}/orders/pay/{order['id']}",
headers=headers,
timeout=30.0,
)
pay_resp.raise_for_status()
payment = pay_resp.json()
return {"order": order, "payment": payment}
async def download_work(client: httpx.AsyncClient, api_base: str, token: str, work_id: int) -> Path:
headers = {"Authorization": f"Bearer {token}"}
resp = await client.get(
f"{api_base}/works/{work_id}/download",
headers=headers,
timeout=60.0,
)
resp.raise_for_status()
filename = resp.headers.get("content-disposition", "").split("filename=")[-1].strip('"') or f"work_{work_id}.bin"
dest = BASE_DIR / filename
dest.write_bytes(resp.content)
return dest
async def main():
print("== 构建测试图 ==")
input_path = build_test_input()
output_path = BASE_DIR / f"gemini_{input_path.stem}.png"
print(f"输入图: {input_path}")
print("== Gemini 处理 ==")
gemini = GeminiExtractV2Service()
success, message, data = await gemini.extract_pattern(
str(input_path),
str(output_path),
custom_prompt="根据原图生成一张更完整、更干净的科技风背景素材图,保持主体布局清晰。",
aspect_ratio="1:1",
)
print({"success": success, "message": message, "data": data})
if not success:
raise RuntimeError(f"Gemini 处理失败: {message}")
print("== 上传图绘 ==")
title = f"E2E测试图_{datetime.now().strftime('%m%d_%H%M%S')}"
upload_result = await upload_to_tuhui(
image_path=str(output_path),
title=title,
description="自动化端到端测试图0元下载校验。",
price=0,
category="设计素材",
tags="E2E测试,自动化",
designer_name="自动化测试",
)
print(upload_result.as_dict())
if not upload_result.success:
raise RuntimeError(f"图绘上传失败: {upload_result.message}")
print("== 登录图绘并创建0元订单 ==")
async with httpx.AsyncClient(follow_redirects=True) as client:
token, api_base = await login_tuhui(client)
pay_info = await create_order_and_pay(client, api_base, token, upload_result.work_id)
print(pay_info)
print("== 下载作品 ==")
downloaded = await download_work(client, api_base, token, upload_result.work_id)
print({"downloaded_file": str(downloaded), "size": downloaded.stat().st_size})
print("== 测试完成 ==")
print(
{
"work_id": upload_result.work_id,
"detail_url": upload_result.download_url,
"processed_output": str(output_path),
}
)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -3,6 +3,7 @@ import logging
import httpx
import asyncio
from typing import Optional
from services.service_designer_alert import designer_alert_service
logger = logging.getLogger("cs_agent")
@@ -53,6 +54,10 @@ class DispatchService:
return designer
logger.warning(f"[Dispatch]{u_tag} 派单被拒: {data.get('reason')} body={body}")
await designer_alert_service.notify_if_needed(
trigger=f"dispatch_rejected:{data.get('reason') or 'unknown'}",
customer_id=user_id,
)
return None
if response.status_code == 401:

View File

@@ -0,0 +1,378 @@
import asyncio
import hashlib
import json
import logging
import mimetypes
import os
import random
import re
from pathlib import Path
from typing import Dict, List, Optional
from urllib.parse import urlparse
import httpx
from dotenv import load_dotenv
from PIL import Image
from db.customer_db import CustomerDatabase
from db.image_tasks_db import TaskStatus, db as task_db
from services.service_gemini import GeminiExtractV2Service
from services.service_tuhui_upload import upload_to_tuhui
from services.service_wecom_bot import wecom_bot_service
load_dotenv()
logger = logging.getLogger("cs_agent")
AUTO_PROCESS_PRICE = int(os.getenv("AUTO_PROCESS_DEFAULT_PRICE", "12"))
AUTO_PROCESS_CATEGORY = os.getenv("AUTO_PROCESS_CATEGORY", "设计素材")
AUTO_PROCESS_ROOT = Path(
os.getenv("AUTO_PROCESS_ROOT", str(Path(__file__).resolve().parents[1] / "runtime" / "auto_processed"))
)
_DOWNLOAD_REFERERS = (
"https://www.taobao.com/",
"https://item.taobao.com/",
"https://detail.tmall.com/",
)
_DOWNLOAD_HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/133.0.0.0 Safari/537.36"
),
"Accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Cache-Control": "no-cache",
"Pragma": "no-cache",
}
_CONTENT_TYPE_SUFFIX = {
"image/jpeg": ".jpg",
"image/jpg": ".jpg",
"image/png": ".png",
"image/webp": ".webp",
"image/avif": ".avif",
"image/gif": ".gif",
}
_DESIGNER_ALIAS_PREFIXES = ("青木", "星野", "白川", "南栀", "言川", "木也", "安可", "拾光", "云岸", "知禾")
_DESIGNER_ALIAS_SUFFIXES = ("设计", "studio", "视觉", "创意", "图像", "工坊", "素材", "像素")
def _safe_name(text: str, fallback: str = "image") -> str:
cleaned = re.sub(r"[^0-9A-Za-z\u4e00-\u9fa5_-]+", "_", str(text or "").strip())
cleaned = cleaned.strip("_")
return cleaned[:40] or fallback
def _looks_like_bad_title(text: str) -> bool:
value = str(text or "").strip().lower()
if not value:
return True
if "http" in value or "www" in value or "alicdn" in value or "imgextra" in value:
return True
if re.search(r"\b(o1cn|jpg|jpeg|png|webp|gif)\b", value):
return True
if value.count("_") >= 3 and not re.search(r"[\u4e00-\u9fa5]{2,}", value):
return True
alnum = re.sub(r"[^0-9a-z_]+", "", value)
if alnum and len(alnum) >= 16 and not re.search(r"[\u4e00-\u9fa5]", value):
return True
return False
def _pick_clean_title_part(raw: str) -> str:
cleaned = _safe_name(raw, "")
if not cleaned or _looks_like_bad_title(cleaned):
return ""
parts = [part for part in cleaned.split("_") if part]
meaningful = [part for part in parts if not _looks_like_bad_title(part) and len(part) >= 2]
if meaningful:
cleaned = "_".join(meaningful[:3])
if _looks_like_bad_title(cleaned):
return ""
return cleaned[:30]
def _suffix_from_url(url: str) -> str:
path = urlparse(str(url or "")).path
suffix = Path(path).suffix.lower()
if suffix in {".png", ".jpg", ".jpeg", ".webp"}:
return suffix
return ".png"
def _build_processing_prompt(intent: str, requirement_text: str, analysis: Dict) -> str:
base_prompt = str((analysis or {}).get("gemini_prompt") or "").strip()
req = str(requirement_text or "").strip()
if base_prompt:
return base_prompt
if intent == "repair":
return f"根据客户需求“{req or '高清修复'}”,保留主体和构图,做高清修复并补足细节。"
return f"根据客户需求“{req or '找原图'}”,严格参考原图元素与构图,生成完整干净的高质量素材图。"
def _build_upload_title(intent: str, analysis: Dict, requirement_text: str, idx: int) -> str:
analysis = analysis or {}
suggested = _pick_clean_title_part(str(analysis.get("title_suggest") or ""))
if suggested:
return suggested
subject = _pick_clean_title_part(str(analysis.get("subject") or ""))
proc_type = _pick_clean_title_part(str(analysis.get("proc_type") or ""))
parts = [part for part in (subject, proc_type) if part]
if parts:
base = "_".join(parts[:2])
else:
base = "图片素材"
return base
def _build_designer_alias() -> str:
return f"{random.choice(_DESIGNER_ALIAS_PREFIXES)}{random.choice(_DESIGNER_ALIAS_SUFFIXES)}"
class AutoImagePipelineService:
def __init__(self):
self.customer_db = CustomerDatabase()
@staticmethod
def _resolve_download_path(dest_path: Path, content_type: str, image_url: str) -> Path:
normalized_type = str(content_type or "").split(";", 1)[0].strip().lower()
suffix = _CONTENT_TYPE_SUFFIX.get(normalized_type, "")
if not suffix:
guessed, _ = mimetypes.guess_type(str(image_url or ""))
suffix = _CONTENT_TYPE_SUFFIX.get(str(guessed or "").lower(), "")
suffix = suffix or dest_path.suffix or ".bin"
return dest_path.with_suffix(suffix)
@staticmethod
def _normalize_image_for_gemini(image_path: Path) -> Path:
suffix = image_path.suffix.lower()
with Image.open(image_path) as img:
is_animated = bool(getattr(img, "is_animated", False)) or int(getattr(img, "n_frames", 1) or 1) > 1
needs_convert = suffix in {".avif", ".webp", ".gif"} or is_animated or img.mode not in ("RGB", "L")
if not needs_convert:
return image_path
normalized_path = image_path.with_suffix(".jpg")
if is_animated:
try:
img.seek(0)
except Exception:
pass
if img.mode not in ("RGB", "L"):
img = img.convert("RGB")
img.save(normalized_path, format="JPEG", quality=95)
logger.info(
f"[AutoImagePipeline] 已转换图片格式供Gemini使用: src={image_path} normalized={normalized_path}"
)
return normalized_path
async def _download_image(self, image_url: str, dest_path: Path) -> Path:
dest_path.parent.mkdir(parents=True, exist_ok=True)
timeout = httpx.Timeout(60.0, connect=20.0)
last_error: Optional[Exception] = None
async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
for referer in _DOWNLOAD_REFERERS:
for attempt in range(1, 4):
headers = dict(_DOWNLOAD_HEADERS)
headers["Referer"] = referer
try:
response = await client.get(image_url, headers=headers)
if response.status_code in (403, 420, 429):
raise httpx.HTTPStatusError(
f"download blocked status={response.status_code}",
request=response.request,
response=response,
)
response.raise_for_status()
resolved_path = self._resolve_download_path(
dest_path,
response.headers.get("content-type", ""),
image_url,
)
resolved_path.write_bytes(response.content)
logger.info(
f"[AutoImagePipeline] 图片下载成功 status={response.status_code} "
f"referer={referer} path={resolved_path}"
)
return resolved_path
except Exception as e:
last_error = e
logger.warning(
f"[AutoImagePipeline] 图片下载失败 attempt={attempt}/3 "
f"referer={referer} url={image_url} err={e}"
)
if attempt < 3:
await asyncio.sleep(attempt)
raise RuntimeError(f"下载原图失败: {last_error}")
@staticmethod
def _format_transfer_notice(
customer_id: str,
acc_id: str,
designer_name: str,
requirement_text: str,
intent: str,
image_urls: List[str],
) -> str:
lines = [
"【AI自动转设计师】",
f"店铺:{acc_id or '-'}",
f"客户:{customer_id or '-'}",
f"需求:{requirement_text or '-'}",
f"类型:{'高清修复' if intent == 'repair' else '找原图'}",
f"默认价格:{AUTO_PROCESS_PRICE}",
]
if image_urls:
lines.append("原图URL")
lines.extend(image_urls[:5])
return "\n".join(lines)
@staticmethod
def _format_finish_notice(
customer_id: str,
acc_id: str,
designer_name: str,
links: List[Dict[str, str]],
failures: List[str],
) -> str:
lines = [
"【AI处理完成】",
f"店铺:{acc_id or '-'}",
f"客户:{customer_id or '-'}",
f"默认价格:{AUTO_PROCESS_PRICE}",
]
if links:
lines.append("处理结果:")
for idx, item in enumerate(links, 1):
lines.append(f"{idx}. 图绘链接:{item.get('download_url') or '-'}")
lines.append(f" 处理后图片:{item.get('image_url') or '-'}")
lines.append(f" 原图URL{item.get('source_url') or '-'}")
if failures:
lines.append("失败项:")
lines.extend(failures[:5])
return "\n".join(lines)
async def process_and_notify(
self,
*,
session_key: str,
customer_id: str,
acc_id: str,
designer_name: str,
requirement_text: str,
image_urls: List[str],
intent: str = "",
) -> Dict:
image_urls = [str(url).strip() for url in (image_urls or []) if str(url).strip()]
if not image_urls:
return {"success": False, "message": "no_images"}
image_urls = image_urls[:1]
profile = self.customer_db.get_customer(session_key)
analysis = {}
if getattr(profile, "last_image_analysis", ""):
try:
analysis = json.loads(profile.last_image_analysis)
except Exception:
analysis = {}
if not intent:
intent = "repair" if "修复" in requirement_text else "find_original"
await wecom_bot_service.send_text(
self._format_transfer_notice(
customer_id=customer_id,
acc_id=acc_id,
designer_name=designer_name,
requirement_text=requirement_text,
intent=intent,
image_urls=image_urls,
)
)
pipeline_root = AUTO_PROCESS_ROOT / _safe_name(customer_id, "customer")
pipeline_root.mkdir(parents=True, exist_ok=True)
gemini_service = GeminiExtractV2Service()
uploaded_links: List[Dict[str, str]] = []
failures: List[str] = []
for idx, image_url in enumerate(image_urls, 1):
digest = hashlib.md5(f"{customer_id}|{acc_id}|{image_url}".encode("utf-8")).hexdigest()[:10]
input_path = pipeline_root / f"{digest}_src{_suffix_from_url(image_url)}"
output_path = pipeline_root / f"{digest}_out.png"
title = _build_upload_title(intent, analysis, requirement_text, idx)
prompt = _build_processing_prompt(intent, requirement_text, analysis)
task_id = task_db.add_task(
customer_id=customer_id,
platform="qianniu",
original_image=image_url,
operation=intent or "auto_process",
requirements=requirement_text,
status=TaskStatus.PROCESSING.value,
)
try:
input_path = await self._download_image(image_url, input_path)
input_path = self._normalize_image_for_gemini(input_path)
success, message, data = await gemini_service.extract_pattern(
str(input_path),
str(output_path),
custom_prompt=prompt,
aspect_ratio=str((analysis or {}).get("aspect_ratio") or "1:1"),
)
if not success or not output_path.exists():
if task_id:
task_db.update_status(task_id, TaskStatus.FAILED.value)
failures.append(f"{idx}. Gemini失败{message}")
continue
upload_result = await upload_to_tuhui(
image_path=str(output_path),
title=title,
description=requirement_text or prompt[:120],
price=AUTO_PROCESS_PRICE,
category=AUTO_PROCESS_CATEGORY,
tags="AI处理,自动转接",
designer_name=_build_designer_alias(),
)
if not upload_result.success:
if task_id:
task_db.update_status(task_id, TaskStatus.FAILED.value)
failures.append(f"{idx}. 图绘上传失败:{upload_result.message}")
continue
if task_id:
task_db.update_status(task_id, TaskStatus.COMPLETED.value, upload_result.download_url)
uploaded_links.append(
{
"download_url": upload_result.download_url,
"image_url": upload_result.image_url,
"source_url": image_url,
"work_id": str(upload_result.work_id),
}
)
except Exception as e:
if task_id:
task_db.update_status(task_id, TaskStatus.FAILED.value)
failures.append(f"{idx}. 处理异常:{e}")
await wecom_bot_service.send_text(
self._format_finish_notice(
customer_id=customer_id,
acc_id=acc_id,
designer_name=designer_name,
links=uploaded_links,
failures=failures,
)
)
return {
"success": bool(uploaded_links),
"uploaded": uploaded_links,
"failures": failures,
}
auto_image_pipeline_service = AutoImagePipelineService()

View File

@@ -0,0 +1,76 @@
import os
import time
import logging
from datetime import datetime
from typing import Dict
from db.chat_log_db import get_waiting_customer_pool
from db.pending_transfer_db import count_open_pending_transfers
from services.service_wecom_bot import wecom_bot_service
logger = logging.getLogger("cs_agent")
DESIGNER_ALERT_START_HOUR = int(os.getenv("DESIGNER_ALERT_START_HOUR", "8"))
DESIGNER_ALERT_END_HOUR = int(os.getenv("DESIGNER_ALERT_END_HOUR", "24"))
DESIGNER_ALERT_COOLDOWN_SECONDS = int(os.getenv("DESIGNER_ALERT_COOLDOWN_SECONDS", "300"))
DESIGNER_ALERT_POOL_WINDOW_MINUTES = int(os.getenv("DESIGNER_ALERT_POOL_WINDOW_MINUTES", "30"))
class DesignerAlertService:
def __init__(self):
self._last_alert_at = 0.0
@staticmethod
def _in_active_window(now: datetime) -> bool:
hour = now.hour
return DESIGNER_ALERT_START_HOUR <= hour < DESIGNER_ALERT_END_HOUR
@staticmethod
def _render_shop_lines(pool: Dict) -> str:
shops = pool.get("shops") or []
if not shops:
return "- 暂无店铺明细"
lines = []
for item in shops[:10]:
acc_id = str(item.get("acc_id") or "")
waiting = int(item.get("waiting_customers") or 0)
lines.append(f"- {acc_id}{waiting}")
return "\n".join(lines)
async def notify_if_needed(self, *, trigger: str = "", customer_id: str = "", acc_id: str = "") -> bool:
now = datetime.now()
if not self._in_active_window(now):
return False
now_ts = time.time()
if now_ts - self._last_alert_at < max(DESIGNER_ALERT_COOLDOWN_SECONDS, 30):
return False
pending_count = count_open_pending_transfers()
pool = get_waiting_customer_pool(DESIGNER_ALERT_POOL_WINDOW_MINUTES)
waiting_total = int(pool.get("total_waiting_customers") or 0)
if pending_count <= 0 and waiting_total <= 0:
return False
content = (
"【设计师在线提醒】\n"
f"当前时间:{now.strftime('%Y-%m-%d %H:%M:%S')}\n"
"8点到24点内检测到暂无设计师接单有客户待转接。\n"
f"触发来源:{trigger or '-'}\n"
f"当前会话:{customer_id or '-'} / {acc_id or '-'}\n"
f"待转接池:{pending_count}\n"
f"当前客户池:{waiting_total}人(近{pool.get('window_minutes') or DESIGNER_ALERT_POOL_WINDOW_MINUTES}分钟最后一条仍是客户消息)\n"
"店铺分布:\n"
f"{self._render_shop_lines(pool)}\n"
"群里如果有设计师在线,麻烦看一下。"
)
ok = await wecom_bot_service.send_text(content)
if ok:
self._last_alert_at = now_ts
logger.info(
f"[DesignerAlert] 已发送企微提醒 trigger={trigger} pending={pending_count} waiting={waiting_total}"
)
return ok
designer_alert_service = DesignerAlertService()

View File

@@ -1,106 +1,115 @@
#!/usr/bin/env python3
"""
Gemini印花提取V2服务 - 使用服务
更经济的选择1.4毛/张
"""
"""Gemini 出图服务。固定走老张 Gemini 原生出图接口。"""
import asyncio
import aiohttp
import base64
import json
import re
import os
import time
from datetime import datetime
from pathlib import Path
import logging
import mimetypes
import os
from typing import Dict
import aiohttp
from dotenv import load_dotenv
from utils.metrics_tracker import emit as metrics_emit
from utils.service_base import BaseService
logger = logging.getLogger(__name__)
load_dotenv()
GEMINI_IMAGE_MODEL = os.getenv("GEMINI_IMAGE_MODEL", "gemini-3.1-flash-image-preview")
GEMINI_IMAGE_FALLBACK_MODEL = os.getenv("GEMINI_IMAGE_FALLBACK_MODEL", "gemini-2.5-flash-image")
GEMINI_IMAGE_SIZE = os.getenv("GEMINI_IMAGE_SIZE", "1K")
GEMINI_THINKING_LEVEL = os.getenv("GEMINI_THINKING_LEVEL", "MINIMAL")
GEMINI_API_KEY = os.getenv(
"GEMINI_API_KEY",
"sk-8i7uYE0RtnQwDImV8a5f7014DcAb46F6BcEb72Df92218aC8",
)
GEMINI_IMAGE_MODEL = os.getenv("GEMINI_IMAGE_MODEL", "gemini-3-pro-image-preview")
GEMINI_IMAGE_SIZE = os.getenv("GEMINI_IMAGE_SIZE", "2K")
GEMINI_PERSON_GENERATION = os.getenv("GEMINI_PERSON_GENERATION", "")
class GeminiExtractV2Service(BaseService):
"""Gemini印花提取V2服务类 - 使用服务,更经济"""
"""固定单接口的 Gemini 出图服务。"""
SERVICE_NAME = "gemini_extract_v2"
# 多API配置按优先级排序便宜的优先使用
API_CONFIGS = [
API_BASE_URL = "https://api.laozhang.ai/v1beta/models"
DEFAULT_PROMPT = (
"提取印花图案,把褶皱移除。补齐缺失的部分,要生成完整,细节丰富,"
"严格按照原图的元素位置生成平面的印花图不要相似的相似度要100%,生成高质量的印刷图"
)
# {
# "name": "西风接口$0.003逆向",
# "api_key": "sk-UT9aupbfHI4rc3RUn8x5D8gN5Kk31yvLZQu8M3BCY5Nja1Fc",
# "api_url": "https://api.apiqik.com/v1/chat/completions" ,
# "api_model": "gemini-2.5-flash-image",
# "max_retries": 3, # 贵接口少重试
# "cost": "低"
# },
{
"name": "西风接口$0.014",
"api_key": "sk-uRuvzLfIHsc3BiHZ2cyebk0cYsZ8NR9rLL326QqXCKIy9EpK",
"api_url": "https://api.apiqik.online/v1beta/models",
"api_model": GEMINI_IMAGE_MODEL,
"max_retries": 2,
"cost": "",
"use_gemini_format": True # 使用Gemini原生API格式
},
{
"name": "西风接口Fallback",
"api_key": "sk-uRuvzLfIHsc3BiHZ2cyebk0cYsZ8NR9rLL326QqXCKIy9EpK",
"api_url": "https://api.apiqik.online/v1beta/models",
"api_model": GEMINI_IMAGE_FALLBACK_MODEL,
"max_retries": 1,
"cost": "",
"use_gemini_format": True
},
{
"name": "最贵的",
"api_key": "sk-8i7uYE0RtnQwDImV8a5f7014DcAb46F6BcEb72Df92218aC8",
"api_url": "https://api.laozhang.ai/v1/chat/completions",
"api_model": GEMINI_IMAGE_MODEL,
"max_retries": 1,
"cost": ""
}
]
# 默认提示词
DEFAULT_PROMPT = "提取印花图案把褶皱移除。补齐缺失的部分要生成完整细节丰富严格按照原图的元素位置生成平面的印花图不要相似的相似度要100%,生成高质量的印刷图"
# DEFAULT_PROMPT = "生成图片,把衣服的图案展开起来做成数码印花印刷平面图。去掉皱褶,生成图案增强细节。排除衣服图案以外内容"
def __init__(self):
super().__init__(name="gemini_extract_v2")
self.session = None
def image_to_base64(self, image_path: str) -> str:
"""将图片文件转换为base64编码字符串"""
super().__init__(name=self.SERVICE_NAME)
@staticmethod
def _image_to_base64(image_path: str) -> str:
if not os.path.exists(image_path):
logger.error(f"文件不存在: {image_path}")
return ""
try:
if not os.path.exists(image_path):
logger.error(f"文件不存在: {image_path}")
return None
with open(image_path, "rb") as image_file:
encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
return encoded_string
return base64.b64encode(image_file.read()).decode("utf-8")
except Exception as e:
logger.error(f"Base64转换失败: {e}")
return None
return ""
@staticmethod
def _guess_mime_type(image_path: str) -> str:
mime_type, _ = mimetypes.guess_type(str(image_path))
return mime_type or "image/png"
@staticmethod
def _build_generation_config(
aspect_ratio: str,
image_size: str,
person_generation: str,
thinking_level: str,
) -> Dict:
valid_ratios = {"1:1", "9:16", "16:9", "3:4", "4:3", "3:2", "2:3", "5:4", "4:5"}
valid_sizes = {"1K", "2K", "4K"}
image_config = {}
if aspect_ratio in valid_ratios:
image_config["aspectRatio"] = aspect_ratio
size_val = (image_size or GEMINI_IMAGE_SIZE or "").upper().strip()
if size_val in valid_sizes:
image_config["imageSize"] = size_val
person_val = (person_generation or GEMINI_PERSON_GENERATION or "").strip()
if person_val:
image_config["personGeneration"] = person_val
generation_config = {"responseModalities": ["IMAGE"]}
if image_config:
generation_config["imageConfig"] = image_config
return generation_config
@staticmethod
def _extract_image_bytes(result: Dict) -> bytes:
candidates = result.get("candidates") or []
if not candidates:
raise ValueError("响应缺少 candidates")
parts = ((candidates[0] or {}).get("content") or {}).get("parts") or []
for part in parts:
inline_data = part.get("inlineData") or {}
encoded = inline_data.get("data")
if encoded:
return base64.b64decode(encoded)
finish_reason = candidates[0].get("finishReason") or ""
if finish_reason == "NO_IMAGE":
raise ValueError("模型未返回图片(NO_IMAGE)")
raise ValueError("响应中未找到 inlineData 图片")
@staticmethod
def _save_image(image_data: bytes, output_path: str) -> Dict:
os.makedirs(os.path.dirname(output_path), exist_ok=True)
with open(output_path, "wb") as f:
f.write(image_data)
file_size = os.path.getsize(output_path)
return {
"output_path": output_path,
"file_size": file_size,
"api_used": "laozhang_gemini_native",
}
async def extract_pattern(
self,
input_path: str,
@@ -111,415 +120,85 @@ class GeminiExtractV2Service(BaseService):
person_generation: str = "",
thinking_level: str = "",
) -> tuple[bool, str, dict]:
"""
使用多API配置进行印花图案提取
Args:
input_path: 输入图片路径
output_path: 输出图片路径
custom_prompt: 自定义提示词
Returns:
tuple: (success, message, data)
"""
# 转换图片为Base64
img64 = self.image_to_base64(input_path)
img64 = self._image_to_base64(input_path)
if not img64:
return False, "图片编码失败", {}
# 使用自定义提示词或默认提示词
prompt = custom_prompt or self.DEFAULT_PROMPT
# 按优先级逐个尝试API配置
for config_index, config in enumerate(self.API_CONFIGS):
logger.info(f"尝试使用API: {config['name']} (成本: {config['cost']})")
metrics_emit("gemini_request", model=config.get("api_model", ""), provider=config.get("name", ""))
# 对每个API配置进行重试
for attempt in range(config['max_retries']):
try:
logger.info(f"开始Gemini V2印花提取 - {config['name']} (第{attempt + 1}/{config['max_retries']}次尝试): {input_path}")
# 准备请求数据和URL
if config.get('use_gemini_format', False):
# Gemini原生API格式
api_url = f"{config['api_url']}/{config['api_model']}:generateContent?key={config['api_key']}"
headers = {
"Content-Type": "application/json"
}
# 有效比例列表Auto 不传 aspectRatio
valid_ratios = {"1:1", "9:16", "16:9", "3:4", "4:3", "3:2", "2:3", "5:4", "4:5"}
valid_sizes = {"1K", "2K", "4K"}
valid_thinking = {"MINIMAL", "LOW", "MEDIUM", "HIGH"}
image_config = {}
if aspect_ratio in valid_ratios:
image_config["aspectRatio"] = aspect_ratio
size_val = (image_size or GEMINI_IMAGE_SIZE or "").upper().strip()
if size_val in valid_sizes:
image_config["imageSize"] = size_val
person_val = (person_generation or GEMINI_PERSON_GENERATION or "").strip()
if person_val:
# 中转接口若支持该字段会生效;不设置时不发送,保证兼容
image_config["personGeneration"] = person_val
thinking_val = (thinking_level or GEMINI_THINKING_LEVEL or "").upper().strip()
thinking_config = {}
if thinking_val in valid_thinking:
thinking_config["thinkingLevel"] = thinking_val
data = {
"contents": [
{
"role": "user",
"parts": [
{
"inlineData": {
"mimeType": "image/jpeg",
"data": img64
}
},
{
"text": prompt
}
]
}
],
"generationConfig": {
"responseModalities": ["IMAGE"],
**({"imageConfig": image_config} if image_config else {}),
**({"thinkingConfig": thinking_config} if thinking_config else {}),
}
}
logger.info(
f"Gemini 生成配置: 比例={aspect_ratio} 尺寸={image_config.get('imageSize', '默认')} "
f"person={image_config.get('personGeneration', '默认')} thinking={thinking_config.get('thinkingLevel', '默认')}"
)
else:
# OpenAI兼容格式
api_url = config['api_url']
headers = {
"Authorization": f"Bearer {config['api_key']}",
"Content-Type": "application/json"
}
data = {
"model": config['api_model'],
"stream": False,
"messages": [
{
"role": "user",
"content": [
{
"type": "text",
"text": prompt
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/png;base64,{img64}"
}
}
]
}
]
}
logger.info(f"正在请求{config['name']}服务 (第{attempt + 1}次)...")
# 发送异步请求
timeout = aiohttp.ClientTimeout(total=300, connect=30)
connector = aiohttp.TCPConnector(limit=10, limit_per_host=5)
try:
async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
async with session.post(api_url, headers=headers, json=data) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"{config['name']} API请求失败 (第{attempt + 1}次): {response.status} - {error_text}")
# 如果是当前API配置的最后一次重试
if attempt == config['max_retries'] - 1:
logger.warning(f"{config['name']} 所有重试已用完切换到下一个API配置")
break
# 当前API配置内部重试
base_wait_time = 2
wait_time = base_wait_time * (attempt + 1)
logger.info(f"等待{wait_time}秒后重试{config['name']}...")
await asyncio.sleep(wait_time)
continue
result = await response.json()
# Gemini 偶发只返回文本不返回图片NO_IMAGE 时快速重试/降级
if config.get('use_gemini_format', False):
finish_reason = ""
try:
finish_reason = (
(result.get("candidates") or [{}])[0].get("finishReason", "")
)
except Exception:
finish_reason = ""
if finish_reason == "NO_IMAGE":
logger.warning(
f"{config['name']} 返回 NO_IMAGE (模型={config.get('api_model')}),第{attempt + 1}"
)
metrics_emit("gemini_no_image", model=config.get("api_model", ""), provider=config.get("name", ""))
if attempt == config['max_retries'] - 1:
logger.warning(f"{config['name']} NO_IMAGE 重试已用完,切换下一个配置")
break
await asyncio.sleep(1 + attempt)
continue
except (aiohttp.ClientError, asyncio.TimeoutError, AssertionError) as e:
logger.error(f"{config['name']} 网络连接错误 (第{attempt + 1}次): {str(e)}")
# 如果是当前API配置的最后一次重试
if attempt == config['max_retries'] - 1:
logger.warning(f"{config['name']} 网络重试已用完切换到下一个API配置")
break
# 当前API配置内部重试
base_wait_time = 2
wait_time = base_wait_time * (attempt + 1)
logger.info(f"等待{wait_time}秒后重试{config['name']}...")
await asyncio.sleep(wait_time)
continue
logger.info(f"{config['name']} 服务请求成功 (第{attempt + 1}次),正在处理响应...")
# 处理API响应并提取图片
success, message, data = await self._process_api_response(result, output_path, config['name'], config)
if success:
logger.info(f"使用 {config['name']} 成功完成印花提取")
metrics_emit("gemini_success", model=config.get("api_model", ""), provider=config.get("name", ""))
try:
from utils.api_cost_tracker import record
record("gemini_extract", count=1)
except Exception:
pass
return True, f"Gemini V2印花提取完成 - 使用{config['name']}", data
else:
logger.warning(f"{config['name']} 响应处理失败: {message}")
# 如果是当前API配置的最后一次重试
if attempt == config['max_retries'] - 1:
logger.warning(f"{config['name']} 所有重试已用完切换到下一个API配置")
break
# 当前API配置内部重试
base_wait_time = 2
wait_time = base_wait_time * (attempt + 1)
logger.info(f"等待{wait_time}秒后重试{config['name']}...")
await asyncio.sleep(wait_time)
continue
except Exception as e:
logger.error(f"{config['name']} API调用异常 (第{attempt + 1}次): {str(e)}")
# 如果是当前API配置的最后一次重试
if attempt == config['max_retries'] - 1:
logger.warning(f"{config['name']} 异常重试已用完切换到下一个API配置")
break
# 当前API配置内部重试
base_wait_time = 2
wait_time = base_wait_time * (attempt + 1)
logger.info(f"等待{wait_time}秒后重试{config['name']}...")
await asyncio.sleep(wait_time)
continue
# 所有API配置都尝试过了返回失败
return False, "所有API配置都已尝试失败", {}
async def _process_api_response(self, result: dict, output_path: str, api_name: str, config: dict) -> tuple[bool, str, dict]:
"""处理API响应并提取图片"""
try:
# 根据API格式提取内容
if config.get('use_gemini_format', False):
# Gemini原生API格式: candidates[0].content.parts[0]
content_parts = result['candidates'][0]['content']['parts']
# 查找包含图片数据的part
image_data = None
for part in content_parts:
# 注意:响应中使用驼峰命名 inlineData
if 'inlineData' in part:
# 提取Base64图片数据
base64_data = part['inlineData']['data']
logger.info(f"{api_name} 找到Gemini格式的inlineData图片")
try:
image_data = base64.b64decode(base64_data)
break
except Exception as e:
logger.error(f"{api_name} Base64解码失败: {e}")
return False, f"Base64解码失败: {e}", {}
if not image_data:
logger.error(f"{api_name} 在Gemini响应中未找到图片数据")
return False, "未找到图片数据", {}
# 直接保存图片
return await self._save_image(image_data, output_path, api_name)
else:
# OpenAI兼容格式: choices[0].message.content
content = result['choices'][0]['message']['content']
logger.info(f"{api_name} 收到内容: {content[:200]}...")
# 使用原有的URL/Base64提取逻辑
return await self._extract_and_save_image(content, output_path, api_name)
except KeyError as e:
logger.error(f"{api_name} 响应格式不正确,缺少字段: {e}")
logger.error(f"响应内容: {json.dumps(result, ensure_ascii=False)[:500]}")
return False, f"响应格式错误: {e}", {}
except Exception as e:
logger.error(f"{api_name} 处理响应时发生异常: {e}")
return False, f"处理异常: {e}", {}
async def _save_image(self, image_data: bytes, output_path: str, api_name: str) -> tuple[bool, str, dict]:
"""保存图片文件"""
try:
os.makedirs(os.path.dirname(output_path), exist_ok=True)
with open(output_path, 'wb') as f:
f.write(image_data)
logger.info(f"{api_name} 图片已保存到: {output_path}")
# 验证保存的图片
if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
file_size = os.path.getsize(output_path)
logger.info(f"{api_name} 图片保存成功,文件大小: {file_size} bytes")
return True, f"{api_name} 印花提取完成", {
'output_path': output_path,
'file_size': file_size,
'api_used': api_name
prompt = str(custom_prompt or self.DEFAULT_PROMPT).strip()
api_url = f"{self.API_BASE_URL}/{GEMINI_IMAGE_MODEL}:generateContent"
headers = {
"Authorization": f"Bearer {GEMINI_API_KEY}",
"Content-Type": "application/json",
}
payload = {
"contents": [
{
"parts": [
{"inlineData": {"mimeType": self._guess_mime_type(input_path), "data": img64}},
{"text": prompt},
]
}
else:
logger.error(f"{api_name} 保存的图片文件无效")
return False, "保存的图片文件无效", {}
except Exception as e:
logger.error(f"{api_name} 保存图片时发生错误: {e}")
return False, f"保存图片失败: {e}", {}
async def _extract_and_save_image(self, content: str, output_path: str, api_name: str) -> tuple[bool, str, dict]:
"""从响应内容中提取并保存图片URL或Base64格式"""
# 查找和处理图片数据
image_data = None
# 方法1: 查找URL链接 (优先检查URL格式)
url_match = re.search(r'https?://[^\s\)]+\.(?:png|jpg|jpeg|gif|webp)', content)
if url_match:
image_url = url_match.group(0)
logger.info(f"{api_name} 找到图片URL: {image_url}")
# 图片下载重试机制
download_retries = 3
for download_attempt in range(download_retries):
],
"generationConfig": self._build_generation_config(
aspect_ratio=aspect_ratio,
image_size=image_size,
person_generation=person_generation,
thinking_level=thinking_level,
),
}
metrics_emit("gemini_request", model=GEMINI_IMAGE_MODEL, provider="laozhang_gemini_native")
timeout = aiohttp.ClientTimeout(total=300, connect=30)
for attempt in range(1, 3):
try:
logger.info(f"Gemini 出图开始 attempt={attempt}/2 model={GEMINI_IMAGE_MODEL} input={input_path}")
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.post(api_url, headers=headers, json=payload) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Gemini API请求失败 attempt={attempt}: {response.status} - {error_text}")
if attempt < 2:
await asyncio.sleep(attempt)
continue
return False, f"Gemini API请求失败: {response.status}", {}
result = await response.json()
image_bytes = self._extract_image_bytes(result)
data = self._save_image(image_bytes, output_path)
metrics_emit("gemini_success", model=GEMINI_IMAGE_MODEL, provider="laozhang_gemini_native")
try:
logger.info(f"{api_name} 开始下载图片 (第{download_attempt + 1}/{download_retries}次尝试): {image_url}")
# 异步下载图片,增加超时时间
timeout = aiohttp.ClientTimeout(total=300, connect=60)
connector = aiohttp.TCPConnector(limit=5, limit_per_host=2)
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
async with aiohttp.ClientSession(
timeout=timeout,
connector=connector,
headers=headers
) as download_session:
logger.info(f"{api_name} 正在发送HTTP请求...")
async with download_session.get(image_url) as img_response:
logger.info(f"{api_name} 收到HTTP响应: {img_response.status}")
if img_response.status == 200:
image_data = await img_response.read()
logger.info(f"{api_name} 图片下载成功,大小: {len(image_data)} bytes")
break # 成功则跳出重试循环
else:
logger.error(f"{api_name} 图片下载失败HTTP状态码: {img_response.status}")
if download_attempt == download_retries - 1:
return False, "图片下载失败", {}
else:
await asyncio.sleep(2)
continue
except Exception as e:
logger.error(f"{api_name} 下载图片时发生异常 (第{download_attempt + 1}次): {type(e).__name__}: {str(e)}")
if download_attempt == download_retries - 1:
return False, f"图片下载异常: {str(e)}", {}
else:
await asyncio.sleep(2)
continue
else:
# 方法2: 查找标准格式 data:image/type;base64,data
base64_match = re.search(r'data:image/[^;]+;base64,([A-Za-z0-9+/=]+)', content)
if base64_match:
base64_data = base64_match.group(1)
logger.info(f"{api_name} 找到标准格式的Base64数据")
try:
image_data = base64.b64decode(base64_data)
except Exception as e:
logger.error(f"{api_name} Base64解码失败: {e}")
return False, f"Base64解码失败: {e}", {}
else:
# 方法3: 查找纯Base64数据长字符串
base64_match = re.search(r'([A-Za-z0-9+/=]{100,})', content)
if base64_match:
base64_data = base64_match.group(1)
logger.info(f"{api_name} 找到纯Base64数据")
try:
image_data = base64.b64decode(base64_data)
except Exception as e:
logger.error(f"{api_name} Base64解码失败: {e}")
return False, f"Base64解码失败: {e}", {}
else:
logger.error(f"{api_name} 在响应中未找到图片数据")
return False, "未找到图片数据", {}
# 检查图片数据
if not image_data:
logger.error(f"{api_name} 图片数据为空")
return False, "图片数据为空", {}
# 保存图片
return await self._save_image(image_data, output_path, api_name)
from utils.api_cost_tracker import record
record("gemini_extract", count=1)
except Exception:
pass
return True, "Gemini 出图完成", data
except Exception as e:
logger.error(f"Gemini 出图异常 attempt={attempt}: {e}")
if attempt < 2:
await asyncio.sleep(attempt)
continue
return False, f"Gemini 出图失败: {e}", {}
return False, "Gemini 出图失败", {}
async def correct_perspective(
self,
input_path: str,
output_path: str,
level: str = "mild",
) -> tuple[bool, str, dict]:
"""
透视矫正:先把有透视畸变的图还原为正面平铺视图,再做后续处理。
Args:
input_path: 本地图片路径
output_path: 矫正后输出路径
level: "mild""strong"
"""
if level == "strong":
prompt = (
"这张图存在明显透视畸变(俯拍/斜拍/贴墙)。"
"请对图片进行透视矫正:将主体变换为正面平铺视图,"
"使所有边缘变成水平或垂直,去除梯形形变,"
"保持图案颜色和细节完全不变,只矫正几何形状,输出矫正后的完整图片。"
"这张图存在明显透视畸变。请把主体矫正为正面平铺视图,"
"所有边缘尽量水平或垂直,保持图案颜色和细节不变,只做几何矫正。"
)
else:
prompt = (
"这张图存在轻微透视畸变(衣物悬挂/桌面斜拍)。"
"请做轻度透视矫正:将主体调整为尽量正视角,"
"消除轻微的梯形拉伸感,保持图案颜色和细节不变,输出矫正后的图片。"
"这张图存在轻微透视畸变。请做轻度透视矫正,"
"消除斜拍拉伸感,保持图案颜色和细节不变。"
)
# 透视矫正使用 1:1 比例避免比例失真
return await self.extract_pattern(
input_path=input_path,
output_path=output_path,
@@ -528,40 +207,17 @@ class GeminiExtractV2Service(BaseService):
)
async def cleanup(self):
"""清理资源"""
if self.session and not self.session.closed:
await self.session.close()
return None
# 便捷函数
async def extract_pattern_v2(
input_path: str,
output_path: str,
custom_prompt: str = None,
aspect_ratio: str = "1:1",
) -> tuple[bool, str, dict]:
"""Gemini V2印花提取便捷函数"""
service = GeminiExtractV2Service()
try:
return await service.extract_pattern(input_path, output_path, custom_prompt, aspect_ratio)
finally:
await service.cleanup()
if __name__ == "__main__":
# 测试代码
import asyncio
async def test():
service = GeminiExtractV2Service()
input_path = "F:/api/134.png"
output_path = "test_output_v2.png"
success, message, data = await service.extract_pattern(input_path, output_path)
print(f"结果: {success}")
print(f"消息: {message}")
print(f"数据: {data}")
await service.cleanup()
asyncio.run(test())

View File

@@ -20,6 +20,13 @@ logger = logging.getLogger("cs_agent")
ANALYSIS_PROMPT = """你是一个电商图片处理评估专家。
客户需求如下:
{customer_requirement}
请结合客户需求和图片内容一起判断,不要只看图片本身。
如果客户明确说了“找原图/找图/素材/大图”,类型优先判断为“找原图/素材提取”类;
如果客户明确说了“修复/高清/清晰/放大”,类型优先判断为“高清修复”类。
请仔细分析这张图片,输出以下字段,每行一个,不要多余内容:
敏感内容: <yes|no>
@@ -61,6 +68,26 @@ ANALYSIS_PROMPT = """你是一个电商图片处理评估专家。
"""
def _sanitize_title_part(text: str) -> str:
value = str(text or "").strip()
value = value.replace("/", "_").replace("\\", "_")
value = " ".join(value.split())
return value[:20]
def _build_title_suggest(subject: str, proc_type: str, customer_requirement: str) -> str:
subject_part = _sanitize_title_part(subject)
proc_part = _sanitize_title_part(proc_type)
req_part = _sanitize_title_part(customer_requirement)
parts = [part for part in (subject_part, proc_part) if part]
if parts:
return "_".join(parts[:2])
if req_part:
return req_part
return "图片识别结果"
class ImageAnalyzerService:
"""图片分析服务 - 后台静默运行,不影响主流程"""
@@ -101,7 +128,7 @@ class ImageAnalyzerService:
logger.debug(f"[ImageAnalyzer] 获取尺寸失败: {e}")
return (0, 0)
async def analyze(self, image_url: str) -> Dict[str, Any]:
async def analyze(self, image_url: str, customer_requirement: str = "") -> Dict[str, Any]:
"""
异步分析图片,返回结构化结果
@@ -121,6 +148,7 @@ class ImageAnalyzerService:
"perspective": no|mild|strong,
"aspect_ratio": 比例,
"gemini_prompt": 处理提示词,
"title_suggest": 推荐标题,
"note": 备注,
"price_suggest": 建议价格,
"width": 宽度,
@@ -133,7 +161,8 @@ class ImageAnalyzerService:
return self._fallback(image_url, "未配置 API Key")
# 缓存检查
cache_key = image_url
customer_requirement = str(customer_requirement or "").strip()
cache_key = f"{image_url}|{customer_requirement}"
now = time.monotonic()
cached = self._analysis_cache.get(cache_key)
if cached:
@@ -149,6 +178,9 @@ class ImageAnalyzerService:
try:
client = AsyncOpenAI(base_url=self.base_url, api_key=self.api_key)
prompt_text = ANALYSIS_PROMPT.format(
customer_requirement=customer_requirement or "未提供明确补充需求"
)
response = await asyncio.wait_for(
client.chat.completions.create(
model=self.vision_model,
@@ -156,7 +188,7 @@ class ImageAnalyzerService:
"role": "user",
"content": [
{"type": "image_url", "image_url": {"url": image_url}},
{"type": "text", "text": ANALYSIS_PROMPT}
{"type": "text", "text": prompt_text}
]
}],
max_tokens=500
@@ -170,6 +202,12 @@ class ImageAnalyzerService:
elapsed = time.monotonic() - start
result = self._parse_result(image_url, content)
result["customer_requirement"] = customer_requirement
result["title_suggest"] = _build_title_suggest(
result.get("subject", ""),
result.get("proc_type", ""),
customer_requirement,
)
result["elapsed"] = round(elapsed, 2)
# 获取尺寸
@@ -241,6 +279,7 @@ class ImageAnalyzerService:
return {
"url": url,
"customer_requirement": "",
"complexity": complexity,
"reason": extract("原因"),
"subject": extract("主体"),
@@ -267,6 +306,11 @@ class ImageAnalyzerService:
"difficulty": extract("难点", ""),
"suggest_method": extract("建议方案", "AI处理"),
"gemini_prompt": extract("提示词"),
"title_suggest": _build_title_suggest(
extract("主体"),
extract("类型"),
"",
),
"note": extract("备注"),
"price_min": price_min,
"price_max": price_max,
@@ -280,6 +324,7 @@ class ImageAnalyzerService:
from datetime import datetime
return {
"url": url,
"customer_requirement": "",
"complexity": "normal",
"reason": reason,
"subject": "",
@@ -306,6 +351,7 @@ class ImageAnalyzerService:
"difficulty": "",
"suggest_method": "",
"gemini_prompt": "",
"title_suggest": "图片识别结果",
"note": "",
"price_min": 15,
"price_max": 20,

View File

@@ -6,55 +6,159 @@
import os
import httpx
import logging
import mimetypes
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Tuple
from typing import Iterator, Optional
from urllib.parse import urlparse
from dotenv import load_dotenv
logger = logging.getLogger(__name__)
load_dotenv()
# 图绘平台配置
TUHUI_BASE_URL = os.getenv("TUHUI_BASE_URL", "http://127.0.0.1:8002")
TUHUI_BASE_URL = os.getenv("TUHUI_BASE_URL", "https://aidg168.uk")
TUHUI_FALLBACK_BASE_URL = "https://aidg168.uk"
TUHUI_DIRECT_BASE_URL = os.getenv("TUHUI_DIRECT_BASE_URL", "http://156.226.181.204:8002")
TUHUI_WEB_BASE_URL = os.getenv("TUHUI_WEB_BASE_URL", "https://aidg168.uk").rstrip("/")
TUHUI_PHONE = os.getenv("TUHUI_PHONE", "17520145271") # 图绘账号手机号
TUHUI_PASSWORD = os.getenv("TUHUI_PASSWORD", "zuowei1216") # 图绘账号密码
TUHUI_DEFAULT_PRICE = int(os.getenv("TUHUI_DEFAULT_PRICE", "20")) # 默认定价(元)
TUHUI_DEFAULT_CATEGORY = os.getenv("TUHUI_DEFAULT_CATEGORY", "设计素材")
@dataclass
class TuhuiUploadResult:
"""图绘上传结果。主返回 URL 为站内作品页,保留三元组解包兼容。"""
success: bool
download_url: str
work_id: int
image_url: str = ""
thumbnail_url: str = ""
watermarked_url: str = ""
message: str = ""
def __iter__(self) -> Iterator[object]:
# 兼容历史调用ok, download_url, work_id = result
yield self.success
yield self.download_url
yield self.work_id
def as_dict(self) -> dict:
return {
"success": self.success,
"download_url": self.download_url,
"work_id": self.work_id,
"image_url": self.image_url,
"thumbnail_url": self.thumbnail_url,
"watermarked_url": self.watermarked_url,
"message": self.message,
}
class TuhuiUploadService:
"""图绘平台上传服务"""
def __init__(self):
self.base_url = TUHUI_BASE_URL
self.base_url = TUHUI_BASE_URL.rstrip("/")
self.base_urls = []
for candidate in (
TUHUI_FALLBACK_BASE_URL.rstrip("/"),
TUHUI_DIRECT_BASE_URL.rstrip("/"),
self.base_url,
):
if candidate and candidate not in self.base_urls:
self.base_urls.append(candidate)
if self.base_urls:
self.base_url = self.base_urls[0]
self.phone = TUHUI_PHONE
self.password = TUHUI_PASSWORD
self.default_price = TUHUI_DEFAULT_PRICE
self.access_token = None
self.user_id = None
@staticmethod
def _build_api_url(base_url: str, path: str) -> str:
normalized = path if path.startswith("/") else f"/{path}"
if base_url.endswith("/api"):
return f"{base_url}{normalized}"
return f"{base_url}/api{normalized}"
def _api_url(self, path: str) -> str:
return self._build_api_url(self.base_url, path)
@staticmethod
def _build_work_url(work_id: int) -> str:
return f"{TUHUI_WEB_BASE_URL}/detail/{int(work_id)}"
@staticmethod
def _normalize_asset_url(raw_url: str) -> str:
url = str(raw_url or "").strip()
if not url:
return ""
if url.startswith("/"):
return f"{TUHUI_WEB_BASE_URL}{url}"
parsed = urlparse(url)
if not parsed.scheme or not parsed.netloc:
return url
host = (parsed.netloc or "").lower()
if host in {
"tuhui.cloud",
"www.tuhui.cloud",
"aidg168.uk",
"www.aidg168.uk",
"156.226.181.204:8002",
"1.12.50.92:8002",
"127.0.0.1:8002",
}:
path = parsed.path or ""
if parsed.query:
path = f"{path}?{parsed.query}"
if parsed.fragment:
path = f"{path}#{parsed.fragment}"
return f"{TUHUI_WEB_BASE_URL}{path}"
return url
@staticmethod
def _guess_file_meta(image_path: str) -> tuple[str, str]:
path = Path(image_path)
filename = path.name or "image.jpg"
mime_type, _ = mimetypes.guess_type(filename)
return filename, mime_type or "application/octet-stream"
async def login(self) -> bool:
"""登录图绘平台获取 token"""
try:
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.base_url}/api/auth/login",
json={
"phone": self.phone,
"password": self.password
},
timeout=10.0
)
if response.status_code == 200:
data = response.json()
self.access_token = data.get("access_token")
user = data.get("user", {})
self.user_id = user.get("id")
logger.info(f"图绘平台登录成功,用户 ID: {self.user_id}")
return True
else:
logger.error(f"图绘平台登录失败:{response.status_code} {response.text}")
return False
except Exception as e:
logger.error(f"图绘平台登录异常:{e}")
return False
last_error = ""
for base_url in self.base_urls:
try:
async with httpx.AsyncClient() as client:
response = await client.post(
self._build_api_url(base_url, "/auth/login"),
json={
"phone": self.phone,
"password": self.password
},
timeout=10.0
)
if response.status_code == 200:
data = response.json()
self.access_token = data.get("access_token")
user = data.get("user", {})
self.user_id = user.get("id")
self.base_url = base_url
logger.info(f"图绘平台登录成功,用户 ID: {self.user_id}base={self.base_url}")
return True
last_error = f"{response.status_code} {response.text}"
logger.warning(f"图绘平台登录失败base={base_url}{last_error}")
except Exception as e:
last_error = str(e)
logger.warning(f"图绘平台登录异常base={base_url}{type(e).__name__}: {e!r}")
logger.error(f"图绘平台登录失败:{last_error}")
return False
async def upload_image(
self,
@@ -62,8 +166,10 @@ class TuhuiUploadService:
title: str,
description: str = "",
price: Optional[int] = None,
category: str = "高清修复"
) -> Tuple[bool, str, int]:
category: str = TUHUI_DEFAULT_CATEGORY,
tags: str = "",
designer_name: str = "",
) -> TuhuiUploadResult:
"""
上传图片到图绘平台
@@ -75,36 +181,44 @@ class TuhuiUploadService:
category: 分类
Returns:
(success, image_url, work_id)
TuhuiUploadResult
- success: 是否上传成功
- image_url: 图片 URL
- download_url: 站内作品页地址
- image_url: 原图 URL保留便于需要时取用
- thumbnail_url: 缩略图 URL
- watermarked_url: 水印图 URL
- work_id: 作品 ID
"""
try:
# 如果 token 过期,重新登录
if not self.access_token:
if not await self.login():
return False, "登录失败", 0
return TuhuiUploadResult(False, "", 0, message="登录失败")
# 准备上传数据
price = price or self.default_price
price = self.default_price if price is None else price
# 读取图片文件
if not os.path.exists(image_path):
logger.error(f"图片文件不存在:{image_path}")
return False, "文件不存在", 0
return TuhuiUploadResult(False, "", 0, message="文件不存在")
filename, mime_type = self._guess_file_meta(image_path)
with open(image_path, "rb") as f:
files = {
"original_image": ("image.jpg", f, "image/jpeg")
"file": (filename, f, mime_type)
}
data = {
"title": title,
"description": description,
"price": str(price),
"category": category
"category": category,
}
if tags:
data["tags"] = tags
if designer_name:
data["designer_name"] = str(designer_name).strip()
headers = {
"Authorization": f"Bearer {self.access_token}"
@@ -112,7 +226,7 @@ class TuhuiUploadService:
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.base_url}/api/works",
self._api_url("/upload"),
files=files,
data=data,
headers=headers,
@@ -120,11 +234,39 @@ class TuhuiUploadService:
)
if response.status_code in [200, 201]:
work_data = response.json()
work_id = work_data.get("id")
image_url = work_data.get("original_image", "")
logger.info(f"图绘平台上传成功,作品 ID: {work_id}, URL: {image_url}")
return True, image_url, work_id
payload = response.json()
if not payload.get("success", False):
logger.error(f"图绘平台上传返回失败:{payload}")
return TuhuiUploadResult(
False,
"",
0,
message=str(payload.get("message", "上传失败")),
)
work_id = int(payload.get("work_id") or payload.get("work", {}).get("id") or 0)
image_url = self._normalize_asset_url(
payload.get("image_url") or payload.get("work", {}).get("original_image") or ""
)
thumbnail_url = self._normalize_asset_url(
payload.get("thumbnail_url") or payload.get("work", {}).get("thumbnail_image") or ""
)
watermarked_url = self._normalize_asset_url(
payload.get("watermarked_url") or payload.get("work", {}).get("watermarked_image") or ""
)
download_url = self._build_work_url(work_id) if work_id else ""
logger.info(
f"图绘平台上传成功,作品 ID: {work_id}, 站内地址: {download_url}, 原图: {image_url}"
)
return TuhuiUploadResult(
True,
download_url,
work_id,
image_url=image_url,
thumbnail_url=thumbnail_url,
watermarked_url=watermarked_url,
message=str(payload.get("message", "上传成功")),
)
else:
logger.error(f"图绘平台上传失败:{response.status_code} {response.text}")
@@ -135,13 +277,13 @@ class TuhuiUploadService:
if await self.login():
# 重新上传
return await self.upload_image(
image_path, title, description, price, category
image_path, title, description, price, category, tags, designer_name
)
return False, f"上传失败:{response.text}", 0
return TuhuiUploadResult(False, "", 0, message=f"上传失败:{response.text}")
except Exception as e:
logger.error(f"图绘平台上传异常:{e}")
return False, f"上传异常:{e}", 0
return TuhuiUploadResult(False, "", 0, message=f"上传异常:{e}")
# 单例
@@ -160,13 +302,16 @@ async def upload_to_tuhui(
image_path: str,
title: str,
description: str = "",
price: int = 20
) -> Tuple[bool, str, int]:
price: int = 20,
category: str = TUHUI_DEFAULT_CATEGORY,
tags: str = "",
designer_name: str = "",
) -> TuhuiUploadResult:
"""
便捷函数:上传图片到图绘平台
Returns:
(success, image_url, work_id)
TuhuiUploadResult
"""
service = get_tuhui_service()
return await service.upload_image(image_path, title, description, price)
return await service.upload_image(image_path, title, description, price, category, tags, designer_name)

View File

@@ -0,0 +1,51 @@
import logging
import os
import httpx
from dotenv import load_dotenv
load_dotenv()
logger = logging.getLogger("cs_agent")
WECOM_BOT_WEBHOOK = os.getenv(
"WECOM_BOT_WEBHOOK",
"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cc88bdef-a13f-4d7e-bdb6-ee51b68b8205",
).strip()
class WecomBotService:
def __init__(self, webhook_url: str = WECOM_BOT_WEBHOOK):
self.webhook_url = str(webhook_url or "").strip()
async def send_text(self, content: str) -> bool:
text = str(content or "").strip()
if not text:
return False
if not self.webhook_url:
logger.warning("[WeComBot] 未配置 webhook跳过发送")
return False
payload = {
"msgtype": "text",
"text": {
"content": text[:3500],
},
}
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(self.webhook_url, json=payload)
if response.status_code != 200:
logger.warning(f"[WeComBot] 发送失败 HTTP {response.status_code}: {response.text}")
return False
data = response.json()
ok = int(data.get("errcode", -1)) == 0
if not ok:
logger.warning(f"[WeComBot] 发送失败: {data}")
return ok
except Exception as e:
logger.warning(f"[WeComBot] 发送异常: {e}")
return False
wecom_bot_service = WecomBotService()