347 lines
13 KiB
Python
347 lines
13 KiB
Python
import asyncio
|
||
import hashlib
|
||
import json
|
||
import logging
|
||
import mimetypes
|
||
import os
|
||
import re
|
||
from pathlib import Path
|
||
from typing import Dict, List, Optional
|
||
from urllib.parse import urlparse
|
||
|
||
import httpx
|
||
from dotenv import load_dotenv
|
||
from PIL import Image
|
||
|
||
from db.customer_db import CustomerDatabase
|
||
from db.image_tasks_db import TaskStatus, db as task_db
|
||
from services.service_gemini import GeminiExtractV2Service
|
||
from services.service_tuhui_upload import upload_to_tuhui
|
||
from services.service_wecom_bot import wecom_bot_service
|
||
|
||
# Load variables from a .env file first: the os.getenv() lookups that define
# the AUTO_PROCESS_* settings below run at import time.
load_dotenv()
# Shared logger name used by this module's pipeline messages.
logger = logging.getLogger("cs_agent")
|
||
|
||
# Default price passed to the upload call and quoted in the WeCom notices.
AUTO_PROCESS_PRICE = int(os.getenv("AUTO_PROCESS_DEFAULT_PRICE", "12"))
# Category passed to the upload call.
AUTO_PROCESS_CATEGORY = os.getenv("AUTO_PROCESS_CATEGORY", "设计素材")
# Root directory for downloaded sources and generated outputs; defaults to
# <parent of this file's directory>/runtime/auto_processed.
AUTO_PROCESS_ROOT = Path(
    os.getenv("AUTO_PROCESS_ROOT", str(Path(__file__).resolve().parents[1] / "runtime" / "auto_processed"))
)
# Referer values tried in order when downloading a source image.
_DOWNLOAD_REFERERS = (
    "https://www.taobao.com/",
    "https://item.taobao.com/",
    "https://detail.tmall.com/",
)
# Browser-like request headers sent with every download attempt.
_DOWNLOAD_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/133.0.0.0 Safari/537.36"
    ),
    "Accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    "Cache-Control": "no-cache",
    "Pragma": "no-cache",
}
# Maps a response Content-Type (lower-case, parameters stripped) to the file
# suffix used when saving the downloaded image.
_CONTENT_TYPE_SUFFIX = {
    "image/jpeg": ".jpg",
    "image/jpg": ".jpg",
    "image/png": ".png",
    "image/webp": ".webp",
    "image/avif": ".avif",
    "image/gif": ".gif",
}
|
||
|
||
|
||
def _safe_name(text: str, fallback: str = "image") -> str:
    """Return *text* reduced to a short identifier-like string.

    Every run of characters other than ASCII letters, digits, ``_``, ``-`` and
    the CJK range U+4E00-U+9FA5 is collapsed into a single underscore, then
    leading/trailing underscores are removed and the result is cut to 40
    characters.

    Args:
        text: Raw text; falsy values (``None``, ``""``) are treated as empty.
        fallback: Value returned when nothing usable is left after cleaning.

    Returns:
        The cleaned name (at most 40 characters), or *fallback* if it is empty.
    """
    raw = str(text or "").strip()
    cleaned = re.sub(r"[^0-9A-Za-z\u4e00-\u9fa5_-]+", "_", raw).strip("_")
    return cleaned[:40] or fallback
|
||
|
||
|
||
def _suffix_from_url(url: str) -> str:
    """Return the lower-cased image suffix of *url*'s path, or ``".png"``.

    Only the path component is inspected, so query strings and fragments do
    not affect the result. The recognised suffixes are ``.jpeg`` plus every
    suffix that ``_CONTENT_TYPE_SUFFIX`` can produce; ``.gif`` and ``.avif``
    are included so such URLs are no longer mislabelled as PNG.

    Args:
        url: Image URL; falsy values are treated as an empty string.

    Returns:
        One of the recognised suffixes (with leading dot), otherwise ``".png"``.
    """
    known_suffixes = {".png", ".jpg", ".jpeg", ".webp", ".gif", ".avif"}
    suffix = Path(urlparse(str(url or "")).path).suffix.lower()
    return suffix if suffix in known_suffixes else ".png"
|
||
|
||
|
||
def _build_processing_prompt(intent: str, requirement_text: str, analysis: Dict) -> str:
    """Return the prompt handed to the Gemini extraction call.

    Args:
        intent: ``"repair"`` selects the repair prompt; any other value
            selects the find-original prompt.
        requirement_text: Customer requirement; when blank, a default phrase
            for the chosen intent is inserted instead.
        analysis: Optional analysis dict; a non-blank ``"gemini_prompt"``
            entry overrides both built-in prompts.

    Returns:
        The stripped ``gemini_prompt`` if present, otherwise a prompt built
        from *intent* and *requirement_text*.
    """
    base_prompt = str((analysis or {}).get("gemini_prompt") or "").strip()
    if base_prompt:
        return base_prompt

    req = str(requirement_text or "").strip()
    if intent == "repair":
        return f"根据客户需求“{req or '高清修复'}”,保留主体和构图,做高清修复并补足细节。"
    return f"根据客户需求“{req or '找原图'}”,严格参考原图元素与构图,生成完整干净的高质量素材图。"
|
||
|
||
|
||
def _build_upload_title(intent: str, analysis: Dict, requirement_text: str, idx: int) -> str:
    """Return the title used when uploading a processed image.

    Args:
        intent: ``"repair"`` yields the action tag ``修复``; anything else
            yields ``原图``.
        analysis: Optional analysis dict; the keys ``title_suggest``,
            ``subject`` and ``proc_type`` are read from it.
        requirement_text: Customer requirement, used as a fallback title part.
        idx: Index of the image, appended as the final title component.

    Returns:
        ``"<title_suggest>_<idx>"`` when a usable suggestion exists; otherwise
        ``"<base>_<action>_<idx>"`` where ``<base>`` joins the first two
        non-empty values of subject / proc_type / requirement, or is
        ``图片识别结果`` when all three are empty.
    """
    analysis = analysis or {}
    suggested = _safe_name(str(analysis.get("title_suggest") or ""), "")
    if suggested:
        return f"{suggested}_{idx}"

    subject = _safe_name(str(analysis.get("subject") or ""), "")
    proc_type = _safe_name(str(analysis.get("proc_type") or ""), "")
    requirement = _safe_name(str(requirement_text or ""), "")
    action = "修复" if intent == "repair" else "原图"

    # Keep at most two parts so the title stays short.
    parts = [part for part in (subject, proc_type, requirement) if part]
    base = "_".join(parts[:2]) if parts else "图片识别结果"
    return f"{base}_{action}_{idx}"
|
||
|
||
|
||
class AutoImagePipelineService:
    """Download a customer image, process it with Gemini, upload it and notify WeCom.

    The public entry point is :meth:`process_and_notify`; the remaining
    methods are private helpers for downloading, format normalisation and
    notification formatting.
    """

    # Number of download attempts made per Referer value.
    _DOWNLOAD_ATTEMPTS = 3

    def __init__(self):
        self.customer_db = CustomerDatabase()

    @staticmethod
    def _resolve_download_path(dest_path: Path, content_type: str, image_url: str) -> Path:
        """Return *dest_path* with a suffix matching the downloaded content.

        The suffix is chosen from, in order: the response Content-Type, the
        MIME type guessed from *image_url*, the suffix *dest_path* already
        has, and finally ``".bin"``.
        """
        normalized_type = str(content_type or "").split(";", 1)[0].strip().lower()
        suffix = _CONTENT_TYPE_SUFFIX.get(normalized_type, "")
        if not suffix:
            guessed, _ = mimetypes.guess_type(str(image_url or ""))
            suffix = _CONTENT_TYPE_SUFFIX.get(str(guessed or "").lower(), "")
        suffix = suffix or dest_path.suffix or ".bin"
        return dest_path.with_suffix(suffix)

    @staticmethod
    def _normalize_image_for_gemini(image_path: Path) -> Path:
        """Return a path to an image in a form suitable for the Gemini call.

        Files with an ``.avif``/``.webp``/``.gif`` suffix, animated images and
        images whose mode is neither ``RGB`` nor ``L`` are re-saved as a JPEG
        (quality 95) next to the source, using the ``.jpg`` suffix. For
        animated images the first frame is used when seeking succeeds. Any
        other image is returned unchanged.

        Args:
            image_path: Path of the downloaded image.

        Returns:
            *image_path* itself when no conversion is needed, otherwise the
            path of the written JPEG.
        """
        suffix = image_path.suffix.lower()
        with Image.open(image_path) as img:
            is_animated = bool(getattr(img, "is_animated", False)) or int(getattr(img, "n_frames", 1) or 1) > 1
            needs_convert = suffix in {".avif", ".webp", ".gif"} or is_animated or img.mode not in ("RGB", "L")
            if not needs_convert:
                return image_path

            normalized_path = image_path.with_suffix(".jpg")
            if is_animated:
                try:
                    img.seek(0)
                except Exception as e:
                    # Best effort: keep converting whatever frame is current,
                    # but leave a trace instead of swallowing the error.
                    logger.warning(
                        f"[AutoImagePipeline] 动图定位首帧失败,按当前帧转换: path={image_path} err={e}"
                    )
            frame = img if img.mode in ("RGB", "L") else img.convert("RGB")
            frame.save(normalized_path, format="JPEG", quality=95)

        logger.info(
            f"[AutoImagePipeline] 已转换图片格式供Gemini使用: src={image_path} normalized={normalized_path}"
        )
        return normalized_path

    async def _download_image(self, image_url: str, dest_path: Path) -> Path:
        """Download *image_url* and return the path the bytes were written to.

        Each Referer in ``_DOWNLOAD_REFERERS`` is tried up to three times,
        sleeping ``attempt`` seconds between attempts for the same Referer.
        The file suffix is adjusted via :meth:`_resolve_download_path`, so
        the returned path may differ from *dest_path*.

        Args:
            image_url: URL of the image to fetch.
            dest_path: Intended destination; its parent directory is created.

        Returns:
            The path the response body was written to.

        Raises:
            RuntimeError: If every attempt failed; the last underlying error
                is attached as ``__cause__``.
        """
        dest_path.parent.mkdir(parents=True, exist_ok=True)
        timeout = httpx.Timeout(60.0, connect=20.0)
        last_error: Optional[Exception] = None
        max_attempts = self._DOWNLOAD_ATTEMPTS

        async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
            for referer in _DOWNLOAD_REFERERS:
                headers = {**_DOWNLOAD_HEADERS, "Referer": referer}
                for attempt in range(1, max_attempts + 1):
                    try:
                        response = await client.get(image_url, headers=headers)
                        if response.status_code in (403, 420, 429):
                            raise httpx.HTTPStatusError(
                                f"download blocked status={response.status_code}",
                                request=response.request,
                                response=response,
                            )
                        response.raise_for_status()
                        resolved_path = self._resolve_download_path(
                            dest_path,
                            response.headers.get("content-type", ""),
                            image_url,
                        )
                        resolved_path.write_bytes(response.content)
                        logger.info(
                            f"[AutoImagePipeline] 图片下载成功 status={response.status_code} "
                            f"referer={referer} path={resolved_path}"
                        )
                        return resolved_path
                    except Exception as e:
                        last_error = e
                        logger.warning(
                            f"[AutoImagePipeline] 图片下载失败 attempt={attempt}/3 "
                            f"referer={referer} url={image_url} err={e}"
                        )
                        if attempt < max_attempts:
                            await asyncio.sleep(attempt)

        # Chain the cause so the original traceback is not lost.
        raise RuntimeError(f"下载原图失败: {last_error}") from last_error

    @staticmethod
    def _format_transfer_notice(
        customer_id: str,
        acc_id: str,
        designer_name: str,
        requirement_text: str,
        intent: str,
        image_urls: List[str],
    ) -> str:
        """Return the notice text sent before processing starts.

        Empty fields are rendered as ``-``; at most five source URLs are listed.
        """
        lines = [
            "【AI自动转设计师】",
            f"店铺:{acc_id or '-'}",
            f"客户:{customer_id or '-'}",
            f"设计师:{designer_name or '-'}",
            f"需求:{requirement_text or '-'}",
            f"类型:{'高清修复' if intent == 'repair' else '找原图'}",
            f"默认价格:{AUTO_PROCESS_PRICE}元",
        ]
        if image_urls:
            lines.append("原图URL:")
            lines.extend(image_urls[:5])
        return "\n".join(lines)

    @staticmethod
    def _format_finish_notice(
        customer_id: str,
        acc_id: str,
        designer_name: str,
        links: List[Dict[str, str]],
        failures: List[str],
    ) -> str:
        """Return the notice text sent after processing finished.

        Every entry of *links* is listed with its ``download_url``,
        ``image_url`` and ``source_url``; at most five failures are listed.
        """
        lines = [
            "【AI处理完成】",
            f"店铺:{acc_id or '-'}",
            f"客户:{customer_id or '-'}",
            f"设计师:{designer_name or '-'}",
            f"默认价格:{AUTO_PROCESS_PRICE}元",
        ]
        if links:
            lines.append("处理结果:")
            for idx, item in enumerate(links, 1):
                lines.append(f"{idx}. 图绘链接:{item.get('download_url') or '-'}")
                lines.append(f"   处理后图片:{item.get('image_url') or '-'}")
                lines.append(f"   原图URL:{item.get('source_url') or '-'}")
        if failures:
            lines.append("失败项:")
            lines.extend(failures[:5])
        return "\n".join(lines)

    def _load_last_analysis(self, session_key: str) -> Dict:
        """Return the customer's stored image analysis as a dict.

        Reads ``last_image_analysis`` from the profile returned by
        ``customer_db.get_customer`` and parses it as JSON. A missing profile
        or attribute, unparsable JSON, or JSON that is not an object all
        yield ``{}`` so callers can use ``.get`` unconditionally.
        """
        profile = self.customer_db.get_customer(session_key)
        raw = getattr(profile, "last_image_analysis", "")
        if not raw:
            return {}
        try:
            parsed = json.loads(raw)
        except Exception as e:
            logger.warning(f"[AutoImagePipeline] 解析 last_image_analysis 失败: session={session_key} err={e}")
            return {}
        return parsed if isinstance(parsed, dict) else {}

    @staticmethod
    def _mark_task_failed(task_id) -> None:
        """Set the task's status to FAILED; does nothing for a falsy *task_id*."""
        if task_id:
            task_db.update_status(task_id, TaskStatus.FAILED.value)

    async def process_and_notify(
        self,
        *,
        session_key: str,
        customer_id: str,
        acc_id: str,
        designer_name: str,
        requirement_text: str,
        image_urls: List[str],
        intent: str = "",
    ) -> Dict:
        """Process the customer's image and report the outcome through WeCom.

        Steps: send a transfer notice, then download the image, normalise its
        format, run the Gemini extraction, upload the result, record the task
        status, and finally send a completion notice.

        Args:
            session_key: Key used to look up the customer profile.
            customer_id: Customer identifier; also names the working directory.
            acc_id: Shop/account identifier shown in the notices.
            designer_name: Designer shown in the notices and passed to upload.
            requirement_text: Customer requirement text.
            image_urls: Candidate source URLs; blank entries are dropped and
                only the first remaining URL is processed.
            intent: ``"repair"`` or another intent label. When empty it is
                derived from *requirement_text*: ``"repair"`` if the text
                contains ``修复``, else ``"find_original"``.

        Returns:
            ``{"success": False, "message": "no_images"}`` when no usable URL
            was given; otherwise a dict with ``success`` (true if anything was
            uploaded), ``uploaded`` (list of link dicts) and ``failures``
            (list of failure messages).
        """
        image_urls = [str(url).strip() for url in (image_urls or []) if str(url).strip()]
        if not image_urls:
            return {"success": False, "message": "no_images"}
        # Only the first image is processed per call.
        image_urls = image_urls[:1]

        analysis = self._load_last_analysis(session_key)

        if not intent:
            # str(... or "") keeps a None requirement from raising TypeError.
            intent = "repair" if "修复" in str(requirement_text or "") else "find_original"

        await wecom_bot_service.send_text(
            self._format_transfer_notice(
                customer_id=customer_id,
                acc_id=acc_id,
                designer_name=designer_name,
                requirement_text=requirement_text,
                intent=intent,
                image_urls=image_urls,
            )
        )

        pipeline_root = AUTO_PROCESS_ROOT / _safe_name(customer_id, "customer")
        pipeline_root.mkdir(parents=True, exist_ok=True)
        gemini_service = GeminiExtractV2Service()
        uploaded_links: List[Dict[str, str]] = []
        failures: List[str] = []
        # Neither value depends on the individual image, so build them once.
        prompt = _build_processing_prompt(intent, requirement_text, analysis)
        aspect_ratio = str(analysis.get("aspect_ratio") or "1:1")

        for idx, image_url in enumerate(image_urls, 1):
            # The digest is only used to derive stable file names.
            digest = hashlib.md5(f"{customer_id}|{acc_id}|{image_url}".encode("utf-8")).hexdigest()[:10]
            input_path = pipeline_root / f"{digest}_src{_suffix_from_url(image_url)}"
            output_path = pipeline_root / f"{digest}_out.png"
            title = _build_upload_title(intent, analysis, requirement_text, idx)
            task_id = task_db.add_task(
                customer_id=customer_id,
                platform="qianniu",
                original_image=image_url,
                operation=intent or "auto_process",
                requirements=requirement_text,
                status=TaskStatus.PROCESSING.value,
            )
            try:
                input_path = await self._download_image(image_url, input_path)
                input_path = self._normalize_image_for_gemini(input_path)
                success, message, _data = await gemini_service.extract_pattern(
                    str(input_path),
                    str(output_path),
                    custom_prompt=prompt,
                    aspect_ratio=aspect_ratio,
                )
                if not success or not output_path.exists():
                    self._mark_task_failed(task_id)
                    failures.append(f"{idx}. Gemini失败:{message}")
                    continue

                upload_result = await upload_to_tuhui(
                    image_path=str(output_path),
                    title=title,
                    description=requirement_text or prompt[:120],
                    price=AUTO_PROCESS_PRICE,
                    category=AUTO_PROCESS_CATEGORY,
                    tags="AI处理,自动转接",
                    designer_name=designer_name,
                )
                if not upload_result.success:
                    self._mark_task_failed(task_id)
                    failures.append(f"{idx}. 图绘上传失败:{upload_result.message}")
                    continue

                if task_id:
                    task_db.update_status(task_id, TaskStatus.COMPLETED.value, upload_result.download_url)
                uploaded_links.append(
                    {
                        "download_url": upload_result.download_url,
                        "image_url": upload_result.image_url,
                        "source_url": image_url,
                        "work_id": str(upload_result.work_id),
                    }
                )
            except Exception as e:
                # Record the traceback; the failure is still reported to the
                # caller and in the completion notice rather than re-raised.
                logger.exception(f"[AutoImagePipeline] 处理异常 url={image_url}")
                self._mark_task_failed(task_id)
                failures.append(f"{idx}. 处理异常:{e}")

        await wecom_bot_service.send_text(
            self._format_finish_notice(
                customer_id=customer_id,
                acc_id=acc_id,
                designer_name=designer_name,
                links=uploaded_links,
                failures=failures,
            )
        )

        return {
            "success": bool(uploaded_links),
            "uploaded": uploaded_links,
            "failures": failures,
        }
|
||
|
||
|
||
# Module-level shared instance, created at import time; constructing it also
# constructs a CustomerDatabase (see AutoImagePipelineService.__init__).
auto_image_pipeline_service = AutoImagePipelineService()
|