Files
tw/services/service_auto_image_pipeline.py

350 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import hashlib
import json
import logging
import mimetypes
import os
import random
import re
from pathlib import Path
from typing import Dict, List, Optional
from urllib.parse import urlparse
import httpx
from dotenv import load_dotenv
from PIL import Image
from db.customer_db import CustomerDatabase
from db.image_tasks_db import TaskStatus, db as task_db
from services.service_gemini import GeminiExtractV2Service
from services.service_tuhui_upload import upload_to_tuhui
from services.service_wecom_bot import wecom_bot_service
# Load variables from a .env file first so the os.getenv() reads below can see them.
load_dotenv()
logger = logging.getLogger("cs_agent")
# Price passed to the upload call and shown in the WeCom notices.
# NOTE: int() raises ValueError at import time if the env value is not an integer.
AUTO_PROCESS_PRICE = int(os.getenv("AUTO_PROCESS_DEFAULT_PRICE", "12"))
# Category passed to the upload call.
AUTO_PROCESS_CATEGORY = os.getenv("AUTO_PROCESS_CATEGORY", "设计素材")
# Root directory for downloaded/processed images; defaults to
# <parent of this file's directory>/runtime/auto_processed.
AUTO_PROCESS_ROOT = Path(
os.getenv("AUTO_PROCESS_ROOT", str(Path(__file__).resolve().parents[1] / "runtime" / "auto_processed"))
)
# Referer header values tried, in this order, when downloading a source image.
_DOWNLOAD_REFERERS = (
"https://www.taobao.com/",
"https://item.taobao.com/",
"https://detail.tmall.com/",
)
# Base request headers for image downloads; a Referer is added per attempt.
_DOWNLOAD_HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/133.0.0.0 Safari/537.36"
),
"Accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Cache-Control": "no-cache",
"Pragma": "no-cache",
}
# Maps a (lower-cased, parameter-free) content type to the file suffix used on disk.
_CONTENT_TYPE_SUFFIX = {
"image/jpeg": ".jpg",
"image/jpg": ".jpg",
"image/png": ".png",
"image/webp": ".webp",
"image/avif": ".avif",
"image/gif": ".gif",
}
# Building blocks for the random designer alias: one prefix + one suffix.
_DESIGNER_ALIAS_PREFIXES = ("青木", "星野", "白川", "南栀", "言川", "木也", "安可", "拾光", "云岸", "知禾")
_DESIGNER_ALIAS_SUFFIXES = ("设计", "studio", "视觉", "创意", "图像", "工坊", "素材", "像素")
def _safe_name(text: str, fallback: str = "image") -> str:
cleaned = re.sub(r"[^0-9A-Za-z\u4e00-\u9fa5_-]+", "_", str(text or "").strip())
cleaned = cleaned.strip("_")
return cleaned[:40] or fallback
def _suffix_from_url(url: str) -> str:
path = urlparse(str(url or "")).path
suffix = Path(path).suffix.lower()
if suffix in {".png", ".jpg", ".jpeg", ".webp"}:
return suffix
return ".png"
def _build_processing_prompt(intent: str, requirement_text: str, analysis: Dict) -> str:
base_prompt = str((analysis or {}).get("gemini_prompt") or "").strip()
req = str(requirement_text or "").strip()
if base_prompt:
return base_prompt
if intent == "repair":
return f"根据客户需求“{req or '高清修复'}”,保留主体和构图,做高清修复并补足细节。"
return f"根据客户需求“{req or '找原图'}”,严格参考原图元素与构图,生成完整干净的高质量素材图。"
def _build_upload_title(intent: str, analysis: Dict, requirement_text: str, idx: int) -> str:
    """Derive the title used for the uploaded work.

    Preference order:
      1. the sanitised ``title_suggest`` from ``analysis``;
      2. the sanitised ``subject`` and ``proc_type`` joined by ``_``
         (whichever of the two are non-empty);
      3. the fixed default title.

    ``intent``, ``requirement_text`` and ``idx`` are accepted for interface
    compatibility but are not consulted.
    """
    info = analysis or {}

    preferred = _safe_name(str(info.get("title_suggest") or ""), "")
    if preferred:
        return preferred

    pieces = []
    for key in ("subject", "proc_type"):
        piece = _safe_name(str(info.get(key) or ""), "")
        if piece:
            pieces.append(piece)

    return "_".join(pieces[:2]) if pieces else "图片素材"
def _build_designer_alias() -> str:
    """Return a random alias made of one alias prefix followed by one alias suffix."""
    # Draw the prefix first, then the suffix, matching the original evaluation order.
    prefix = random.choice(_DESIGNER_ALIAS_PREFIXES)
    suffix = random.choice(_DESIGNER_ALIAS_SUFFIXES)
    return prefix + suffix
class AutoImagePipelineService:
    """Download a customer's image, process it with Gemini, upload it and notify WeCom.

    The public entry point is :meth:`process_and_notify`. The remaining
    methods are private helpers for downloading, format normalisation and
    notification formatting.
    """

    def __init__(self):
        self.customer_db = CustomerDatabase()

    @staticmethod
    def _resolve_download_path(dest_path: Path, content_type: str, image_url: str) -> Path:
        """Return ``dest_path`` with a suffix that matches the downloaded content.

        The suffix is chosen from, in order: the response content type (any
        ``;`` parameters are ignored), the type guessed from ``image_url``,
        the suffix already on ``dest_path``, and finally ``.bin``.
        """
        normalized_type = str(content_type or "").split(";", 1)[0].strip().lower()
        suffix = _CONTENT_TYPE_SUFFIX.get(normalized_type, "")
        if not suffix:
            guessed, _ = mimetypes.guess_type(str(image_url or ""))
            suffix = _CONTENT_TYPE_SUFFIX.get(str(guessed or "").lower(), "")
        suffix = suffix or dest_path.suffix or ".bin"
        return dest_path.with_suffix(suffix)

    @staticmethod
    def _normalize_image_for_gemini(image_path: Path) -> Path:
        """Convert the image to a single-frame JPEG when needed.

        Conversion happens for ``.avif``/``.webp``/``.gif`` files, animated
        images, and any image whose mode is not ``RGB`` or ``L``. The first
        frame is used for animated images. When no conversion is needed the
        original path is returned unchanged; otherwise the path of the
        ``.jpg`` file written next to the source is returned.
        """
        suffix = image_path.suffix.lower()
        with Image.open(image_path) as img:
            frame_count = int(getattr(img, "n_frames", 1) or 1)
            is_animated = bool(getattr(img, "is_animated", False)) or frame_count > 1
            needs_convert = (
                suffix in {".avif", ".webp", ".gif"}
                or is_animated
                or img.mode not in ("RGB", "L")
            )
            if not needs_convert:
                return image_path

            normalized_path = image_path.with_suffix(".jpg")
            if is_animated:
                try:
                    img.seek(0)
                except Exception as exc:
                    # Best effort: fall back to whichever frame is current,
                    # but leave a trace instead of failing silently.
                    logger.debug(
                        "[AutoImagePipeline] seek(0) failed for %s: %s", image_path, exc
                    )

            frame = img if img.mode in ("RGB", "L") else img.convert("RGB")
            frame.save(normalized_path, format="JPEG", quality=95)

        logger.info(
            f"[AutoImagePipeline] 已转换图片格式供Gemini使用: src={image_path} normalized={normalized_path}"
        )
        return normalized_path

    async def _download_image(self, image_url: str, dest_path: Path) -> Path:
        """Download ``image_url`` and return the path it was saved to.

        Each referer in ``_DOWNLOAD_REFERERS`` is tried up to three times,
        sleeping 1s then 2s between attempts for the same referer. The saved
        path may carry a different suffix than ``dest_path`` (see
        :meth:`_resolve_download_path`).

        Raises:
            RuntimeError: when every attempt failed; the last underlying
                error is attached as ``__cause__``.
        """
        dest_path.parent.mkdir(parents=True, exist_ok=True)
        timeout = httpx.Timeout(60.0, connect=20.0)
        last_error: Optional[Exception] = None

        async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
            for referer in _DOWNLOAD_REFERERS:
                headers = dict(_DOWNLOAD_HEADERS)
                headers["Referer"] = referer
                for attempt in range(1, 4):
                    try:
                        response = await client.get(image_url, headers=headers)
                        if response.status_code in (403, 420, 429):
                            # Use an explicit message for blocked/throttled responses.
                            raise httpx.HTTPStatusError(
                                f"download blocked status={response.status_code}",
                                request=response.request,
                                response=response,
                            )
                        response.raise_for_status()
                        resolved_path = self._resolve_download_path(
                            dest_path,
                            response.headers.get("content-type", ""),
                            image_url,
                        )
                        resolved_path.write_bytes(response.content)
                        logger.info(
                            f"[AutoImagePipeline] 图片下载成功 status={response.status_code} "
                            f"referer={referer} path={resolved_path}"
                        )
                        return resolved_path
                    except Exception as e:
                        last_error = e
                        logger.warning(
                            f"[AutoImagePipeline] 图片下载失败 attempt={attempt}/3 "
                            f"referer={referer} url={image_url} err={e}"
                        )
                        if attempt < 3:
                            await asyncio.sleep(attempt)

        # Chain the last failure so the root cause survives in tracebacks.
        raise RuntimeError(f"下载原图失败: {last_error}") from last_error

    @staticmethod
    def _format_transfer_notice(
        customer_id: str,
        acc_id: str,
        designer_name: str,
        requirement_text: str,
        intent: str,
        image_urls: List[str],
    ) -> str:
        """Build the text announcing that a request was handed to the pipeline.

        At most five source URLs are listed. ``designer_name`` is accepted
        for interface compatibility but does not appear in the message.
        """
        lines = [
            "【AI自动转设计师】",
            f"店铺:{acc_id or '-'}",
            f"客户:{customer_id or '-'}",
            f"需求:{requirement_text or '-'}",
            f"类型:{'高清修复' if intent == 'repair' else '找原图'}",
            f"默认价格:{AUTO_PROCESS_PRICE}",
        ]
        if image_urls:
            # Heading gets the same separator as the other labels.
            lines.append("原图URL:")
            lines.extend(image_urls[:5])
        return "\n".join(lines)

    @staticmethod
    def _format_finish_notice(
        customer_id: str,
        acc_id: str,
        designer_name: str,
        links: List[Dict[str, str]],
        failures: List[str],
    ) -> str:
        """Build the text summarising uploaded results and failures.

        Every uploaded item contributes three lines (download link, processed
        image, source URL). At most five failure entries are listed.
        ``designer_name`` is accepted for interface compatibility but does
        not appear in the message.
        """
        lines = [
            "【AI处理完成】",
            f"店铺:{acc_id or '-'}",
            f"客户:{customer_id or '-'}",
            f"默认价格:{AUTO_PROCESS_PRICE}",
        ]
        if links:
            lines.append("处理结果:")
            for idx, item in enumerate(links, 1):
                lines.append(f"{idx}. 图绘链接:{item.get('download_url') or '-'}")
                lines.append(f" 处理后图片:{item.get('image_url') or '-'}")
                # The separator keeps the label from fusing with the URL.
                lines.append(f" 原图URL:{item.get('source_url') or '-'}")
        if failures:
            lines.append("失败项:")
            lines.extend(failures[:5])
        return "\n".join(lines)

    async def process_and_notify(
        self,
        *,
        session_key: str,
        customer_id: str,
        acc_id: str,
        designer_name: str,
        requirement_text: str,
        image_urls: List[str],
        intent: str = "",
    ) -> Dict:
        """Run the download -> Gemini -> upload pipeline and send WeCom notices.

        Only the first non-blank URL in ``image_urls`` is processed. A
        transfer notice is sent before processing and a finish notice
        afterwards. Failures while processing an image are collected rather
        than raised.

        Returns:
            ``{"success": False, "message": "no_images"}`` when no usable URL
            was given; otherwise a dict with ``success`` (True when at least
            one upload succeeded), ``uploaded`` and ``failures``.
        """
        # Skip None/blank entries so a None does not become the URL "None".
        image_urls = [
            str(url).strip()
            for url in (image_urls or [])
            if url is not None and str(url).strip()
        ]
        if not image_urls:
            return {"success": False, "message": "no_images"}
        image_urls = image_urls[:1]

        profile = self.customer_db.get_customer(session_key)
        analysis: Dict = {}
        raw_analysis = getattr(profile, "last_image_analysis", "")
        if raw_analysis:
            try:
                analysis = json.loads(raw_analysis)
            except (TypeError, ValueError):
                analysis = {}
        if not isinstance(analysis, dict):
            # Valid JSON that is not an object would break the later .get() calls.
            analysis = {}

        if not intent:
            # Tolerate a missing requirement text when inferring the intent.
            intent = "repair" if "修复" in str(requirement_text or "") else "find_original"

        await wecom_bot_service.send_text(
            self._format_transfer_notice(
                customer_id=customer_id,
                acc_id=acc_id,
                designer_name=designer_name,
                requirement_text=requirement_text,
                intent=intent,
                image_urls=image_urls,
            )
        )

        pipeline_root = AUTO_PROCESS_ROOT / _safe_name(customer_id, "customer")
        pipeline_root.mkdir(parents=True, exist_ok=True)
        gemini_service = GeminiExtractV2Service()
        uploaded_links: List[Dict[str, str]] = []
        failures: List[str] = []

        for idx, image_url in enumerate(image_urls, 1):
            # Short digest gives stable per-customer/per-image file names.
            digest = hashlib.md5(
                f"{customer_id}|{acc_id}|{image_url}".encode("utf-8")
            ).hexdigest()[:10]
            input_path = pipeline_root / f"{digest}_src{_suffix_from_url(image_url)}"
            output_path = pipeline_root / f"{digest}_out.png"
            title = _build_upload_title(intent, analysis, requirement_text, idx)
            prompt = _build_processing_prompt(intent, requirement_text, analysis)
            task_id = task_db.add_task(
                customer_id=customer_id,
                platform="qianniu",
                original_image=image_url,
                operation=intent or "auto_process",
                requirements=requirement_text,
                status=TaskStatus.PROCESSING.value,
            )
            try:
                input_path = await self._download_image(image_url, input_path)
                input_path = self._normalize_image_for_gemini(input_path)
                success, message, _data = await gemini_service.extract_pattern(
                    str(input_path),
                    str(output_path),
                    custom_prompt=prompt,
                    aspect_ratio=str(analysis.get("aspect_ratio") or "1:1"),
                )
                if not success or not output_path.exists():
                    if task_id:
                        task_db.update_status(task_id, TaskStatus.FAILED.value)
                    failures.append(f"{idx}. Gemini失败:{message}")
                    continue

                upload_result = await upload_to_tuhui(
                    image_path=str(output_path),
                    title=title,
                    description=requirement_text or prompt[:120],
                    price=AUTO_PROCESS_PRICE,
                    category=AUTO_PROCESS_CATEGORY,
                    tags="AI处理,自动转接",
                    designer_name=_build_designer_alias(),
                )
                if not upload_result.success:
                    if task_id:
                        task_db.update_status(task_id, TaskStatus.FAILED.value)
                    failures.append(f"{idx}. 图绘上传失败:{upload_result.message}")
                    continue

                if task_id:
                    task_db.update_status(
                        task_id, TaskStatus.COMPLETED.value, upload_result.download_url
                    )
                uploaded_links.append(
                    {
                        "download_url": upload_result.download_url,
                        "image_url": upload_result.image_url,
                        "source_url": image_url,
                        "work_id": str(upload_result.work_id),
                    }
                )
            except Exception as e:
                # Keep the traceback in the log; the notice carries only the message.
                logger.exception(
                    "[AutoImagePipeline] image processing failed idx=%s url=%s", idx, image_url
                )
                if task_id:
                    task_db.update_status(task_id, TaskStatus.FAILED.value)
                failures.append(f"{idx}. 处理异常:{e}")

        await wecom_bot_service.send_text(
            self._format_finish_notice(
                customer_id=customer_id,
                acc_id=acc_id,
                designer_name=designer_name,
                links=uploaded_links,
                failures=failures,
            )
        )
        return {
            "success": bool(uploaded_links),
            "uploaded": uploaded_links,
            "failures": failures,
        }
# Module-level instance created at import time; constructing it also
# instantiates CustomerDatabase (see AutoImagePipelineService.__init__).
auto_image_pipeline_service = AutoImagePipelineService()