"""Standalone image-processing tools.

Each tool can be called on its own and is also reused by the main pipeline.
The main (payment-triggered) pipeline is unchanged; these tools exist so the
AI can combine them on demand.

Every public tool is a coroutine that accepts an ``http(s)://`` URL or a
local file path and returns a dict of the form
``{"success": bool, "result_path": str, "message": str}``.
"""
import logging
import os
import shutil
import tempfile
import uuid
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

_OUTPUT_DIR = os.getenv("RESULT_IMAGE_DIR", "results")
os.makedirs(_OUTPUT_DIR, exist_ok=True)

_logger = logging.getLogger(__name__)

# Inputs starting with one of these prefixes are downloaded first.
_REMOTE_PREFIXES = ("http://", "https://")


def _ok(result_path: str, message: str) -> Dict[str, Any]:
    """Build the result dict for a successful tool call."""
    return {"success": True, "result_path": result_path, "message": message}


def _fail(message: str) -> Dict[str, Any]:
    """Build the result dict for a failed tool call."""
    return {"success": False, "result_path": "", "message": message}


def _default_output(prefix: str, save_path: str, ext: str = ".jpg") -> str:
    """Return ``save_path`` if given, else a unique file name in the output dir."""
    return save_path or os.path.join(_OUTPUT_DIR, f"{prefix}_{uuid.uuid4().hex}{ext}")


def _cleanup(*paths: Optional[str]) -> None:
    """Best-effort removal of temporary files.

    Called from ``finally`` blocks, so it must never raise: an exception here
    would replace the result dict the tool is about to return.
    """
    for path in paths:
        if not path:
            continue
        try:
            os.remove(path)
        except FileNotFoundError:
            pass
        except OSError as exc:
            _logger.warning("failed to remove temp file %s: %s", path, exc)


def _write_jpeg(path: str, image: Any) -> bool:
    """Write ``image`` with cv2 at JPEG quality 95; return whether it was written."""
    import cv2

    # cv2.imwrite signals an unwritable destination through its return value
    # rather than by raising, so the result has to be checked by the caller.
    return bool(cv2.imwrite(path, image, [cv2.IMWRITE_JPEG_QUALITY, 95]))


async def _download(url: str) -> str:
    """Download ``url`` into a new temp file and return that file's path.

    Raises:
        RuntimeError: if the server does not answer with HTTP 200.
        OSError: if the temp file cannot be written (the partial file is
            removed before re-raising).
    """
    import aiohttp

    tmp = os.path.join(tempfile.gettempdir(), f"img_{uuid.uuid4().hex}.jpg")
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "Referer": "https://www.taobao.com/",
    }
    async with aiohttp.ClientSession(headers=headers) as session:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as resp:
            if resp.status != 200:
                raise RuntimeError(f"下载失败: HTTP {resp.status}")
            data = await resp.read()
    try:
        with open(tmp, "wb") as f:
            f.write(data)
    except OSError:
        # The caller never learns the path when we raise, so a partially
        # written file would otherwise be leaked.
        _cleanup(tmp)
        raise
    return tmp


async def _resolve_source(image_url: str) -> Tuple[str, Optional[str]]:
    """Turn a URL or local path into a local path.

    Returns:
        ``(src, tmp)`` where ``src`` is the path to read from and ``tmp`` is
        the downloaded temp file the caller must delete, or ``None`` when
        ``image_url`` was already a local path.
    """
    if image_url.startswith(_REMOTE_PREFIXES):
        tmp = await _download(image_url)
        return tmp, tmp
    return image_url, None


async def remove_background(image_url: str, save_path: str = "") -> Dict[str, Any]:
    """[Standalone tool] Remove the background, leaving a plain white/solid one.

    Args:
        image_url: URL or local path of the input image.
        save_path: Output path; a unique file in the output dir when empty.

    Returns:
        Result dict; ``result_path`` is the white-background product image.
    """
    from image.perspective_fix import _gemini_call, PROMPT_WHITE_BG

    tmp = None
    try:
        src, tmp = await _resolve_source(image_url)
        if not os.path.exists(src):
            return _fail(f"文件不存在: {src}")
        out = _default_output("bg", save_path)
        ok = await _gemini_call(
            src, out, PROMPT_WHITE_BG, aspect_ratio="auto", label="去背景"
        )
        if ok:
            return _ok(out, "去背景完成")
        return _fail("去背景失败")
    except Exception as e:
        return _fail(str(e))
    finally:
        _cleanup(tmp)


async def perspective_correct(image_url: str, save_path: str = "") -> Dict[str, Any]:
    """[Standalone tool] Perspective correction.

    The input should be a white-background image (call ``remove_background``
    first if needed); the output is the flattened image.

    Args:
        image_url: URL or local path of the input image.
        save_path: Output path; a unique file in the output dir when empty.
    """
    import cv2
    from image.perspective_fix import find_quad, four_point_transform

    tmp = None
    try:
        src, tmp = await _resolve_source(image_url)
        if not os.path.exists(src):
            return _fail(f"文件不存在: {src}")
        img = cv2.imread(src)
        if img is None:
            return _fail("无法读取图片")
        pts = find_quad(img)
        if pts is None:
            return _fail("未检测到四边形,无法透视矫正")
        warped = four_point_transform(img, pts)
        out = _default_output("persp", save_path)
        if not _write_jpeg(out, warped):
            return _fail("图片保存失败")
        return _ok(out, "透视矫正完成")
    except Exception as e:
        return _fail(str(e))
    finally:
        _cleanup(tmp)


async def extract_pattern(
    image_url: str,
    prompt: str = "",
    aspect_ratio: str = "1:1",
    save_path: str = "",
) -> Dict[str, Any]:
    """[Standalone tool] Pattern extraction / main processing step.

    Args:
        image_url: URL or local path of the input image.
        prompt: Custom prompt; passed to the service as ``None`` when empty.
        aspect_ratio: Aspect ratio forwarded to the service.
        save_path: Output path; a unique file in the output dir when empty.
    """
    from services.service_gemini import GeminiExtractV2Service

    tmp = None
    try:
        src, tmp = await _resolve_source(image_url)
        if not os.path.exists(src):
            return _fail(f"文件不存在: {src}")
        out = _default_output("extract", save_path)
        service = GeminiExtractV2Service()
        try:
            ok, msg, _ = await service.extract_pattern(
                input_path=src,
                output_path=out,
                custom_prompt=prompt or None,
                aspect_ratio=aspect_ratio,
            )
            if ok and os.path.exists(out):
                return _ok(out, "提取完成")
            return _fail(msg or "提取失败")
        finally:
            await service.cleanup()
    except Exception as e:
        return _fail(str(e))
    finally:
        _cleanup(tmp)


async def enhance_image(image_url: str, save_path: str = "") -> Dict[str, Any]:
    """[Standalone tool] High-definition enhancement.

    Uses Qwen RunningHub first and falls back to Gemini when that fails.

    Args:
        image_url: URL or local path of the input image.
        save_path: Output path; a unique file in the output dir when empty.
    """
    from services.service_qwen import 清晰化_api
    from image.perspective_fix import _gemini_call, PROMPT_ENHANCE_SIMPLE

    tmp = None
    try:
        src, tmp = await _resolve_source(image_url)
        if not os.path.exists(src):
            return _fail(f"文件不存在: {src}")
        out = _default_output("enh", save_path)
        try:
            ok = await 清晰化_api(img_path=src, save_path=out)
        except Exception as exc:
            # An exception from the primary backend is a failure too, so it
            # must lead to the fallback instead of aborting the whole tool.
            _logger.warning("Qwen enhance raised, falling back to Gemini: %s", exc)
            ok = False
        if not ok:
            ok = await _gemini_call(
                src, out, PROMPT_ENHANCE_SIMPLE, aspect_ratio="auto", label="增强"
            )
        if ok:
            return _ok(out, "高清增强完成")
        return _fail("高清增强失败")
    except Exception as e:
        return _fail(str(e))
    finally:
        _cleanup(tmp)


async def color_match_images(
    orig_url: str,
    result_url: str,
    save_path: str = "",
    strength: float = 0.75,
) -> Dict[str, Any]:
    """[Standalone tool] Colour matching: match the tone of result to orig.

    Args:
        orig_url: URL or local path of the reference image.
        result_url: URL or local path of the image to be adjusted.
        save_path: Output path; a unique file in the output dir when empty.
        strength: Strength value forwarded to ``_color_match``.
    """
    import cv2
    from image.perspective_fix import _color_match

    tmp_orig = tmp_result = None
    try:
        # Resolved one after the other so that the first temp file is already
        # recorded (and cleaned up) if the second download fails.
        orig_path, tmp_orig = await _resolve_source(orig_url)
        result_path, tmp_result = await _resolve_source(result_url)
        orig_img = cv2.imread(orig_path)
        result_img = cv2.imread(result_path)
        if orig_img is None or result_img is None:
            return _fail("图片读取失败")
        matched = _color_match(orig_img, result_img, strength=strength)
        out = _default_output("color", save_path)
        if not _write_jpeg(out, matched):
            return _fail("图片保存失败")
        return _ok(out, f"颜色匹配完成(强度{strength:.0%})")
    except Exception as e:
        return _fail(str(e))
    finally:
        _cleanup(tmp_orig, tmp_result)


async def trim_border(image_url: str, save_path: str = "") -> Dict[str, Any]:
    """[Standalone tool] Trim the background border on all four sides.

    Supports any border colour (white, yellow, beige, ...).

    Args:
        image_url: URL or local path of the input image.
        save_path: Output path; a unique file in the output dir when empty.
    """
    import cv2
    from image.perspective_fix import tool_trim_white_border

    tmp = None
    try:
        src, tmp = await _resolve_source(image_url)
        if not os.path.exists(src):
            return _fail(f"文件不存在: {src}")
        img = cv2.imread(src)
        if img is None:
            return _fail("无法读取图片")
        trimmed, did_trim, _ = tool_trim_white_border(img)
        out = _default_output("trim", save_path)
        if not _write_jpeg(out, trimmed):
            return _fail("图片保存失败")
        return _ok(out, "裁边完成" if did_trim else "无需裁边")
    except Exception as e:
        return _fail(str(e))
    finally:
        _cleanup(tmp)


async def vectorize_to_eps(image_url: str, save_path: str = "") -> Dict[str, Any]:
    """[Standalone tool] Vectorize: convert an image into an EPS vector file.

    Call this when the customer asks for a vector image, EPS or AI format.

    Args:
        image_url: URL or local path of the input image.
        save_path: Output path; a unique ``.eps`` file in the output dir when
            empty.
    """
    tmp = None
    try:
        src, tmp = await _resolve_source(image_url)
        if not os.path.exists(src):
            return _fail(f"文件不存在: {src}")
        # Imported late so a missing service is reported as a result dict.
        from services.service_vectorizer import VectorizerService

        svc = VectorizerService()
        out = _default_output("vec", save_path, ext=".eps")
        result_path = await svc.image_to_eps(src, save_eps_path=out)
        if result_path and os.path.exists(result_path):
            return _ok(result_path, "矢量化完成,已生成 EPS 文件")
        return _fail("矢量化失败")
    except ImportError as e:
        return _fail(f"矢量化服务不可用: {e}")
    except Exception as e:
        return _fail(str(e))
    finally:
        _cleanup(tmp)


async def meitu_enhance(
    image_url: str,
    mode: str = "standard",
    save_path: str = "",
) -> Dict[str, Any]:
    """[Standalone tool] Meitu image-quality enhancement.

    Modes: crystal (fast redraw), standard, enhance, hdr, portrait.
    Call this when the customer asks for quality enhancement, sharpening or
    Meitu processing.

    Args:
        image_url: URL or local path of the input image.
        mode: Processing mode forwarded to the Meitu service.
        save_path: When given, the processed file is copied there and that
            path is returned instead of the service's own output path.
    """
    tmp = None
    try:
        src, tmp = await _resolve_source(image_url)
        if not os.path.exists(src):
            return _fail(f"文件不存在: {src}")
        # Imported late so a missing service is reported as a result dict.
        from services.service_meitu import MeituAPIService

        svc = MeituAPIService()
        result = await svc.process_image(src, mode=mode, output_dir=Path(_OUTPUT_DIR))
        out = result.get("processed_path")
        if out and os.path.exists(str(out)):
            if save_path:
                shutil.copy(str(out), save_path)
                out = save_path
            return _ok(str(out), f"画质增强完成({result.get('mode_name', mode)})")
        return _fail("美图处理失败")
    except ImportError as e:
        return _fail(f"美图服务不可用: {e}")
    except Exception as e:
        return _fail(str(e))
    finally:
        _cleanup(tmp)