新增功能: - 天网协作系统 (HTTP API 端口 6060) - 三种工作流 (查找图片/处理图片/转人工派单) - 图片任务数据库 (支持客户后续增加需求) - 图绘派单系统集成 (API: 8005) - 文字检测与加价 (60-80 元高价值订单) - 风险评估与接单判断 - 作图失败自动转人工 新增文档: - 项目功能汇总.md - 三种工作流功能说明.md - 文字加价功能说明.md - 风险评估功能说明.md - 图片任务数据库功能说明.md - 图绘派单系统集成说明.md - 作图失败转接人工说明.md - DEPLOYMENT.md - TIANWANG_INTEGRATION.md 核心修改: - core/pydantic_ai_agent.py - core/workflow.py - core/websocket_client.py - image/image_analyzer.py - services/service_tuhui_dispatch.py - db/image_tasks_db.py 版本:v1.0 日期:2026-02-28
294 lines
12 KiB
Python
Executable File
294 lines
12 KiB
Python
Executable File
"""
|
||
图片处理独立工具 - 可单独调用,也可被主流程复用。
|
||
|
||
主流程(付款触发)不变,这些工具供 AI 按需组合使用。
|
||
"""
|
||
import os
|
||
import uuid
|
||
import tempfile
|
||
from typing import Dict, Any, Optional
|
||
|
||
# Directory that the tools in this module write their result files into.
# Taken from the RESULT_IMAGE_DIR environment variable, falling back to
# "results" when the variable is not set.
_OUTPUT_DIR = os.environ.get("RESULT_IMAGE_DIR", "results")

# Created at import time so the tools can write into it without checking first.
os.makedirs(_OUTPUT_DIR, exist_ok=True)
|
||
|
||
|
||
async def _download(url: str) -> str:
    """Download an image over HTTP(S) into a fresh temporary file.

    Args:
        url: Address of the image to fetch.

    Returns:
        Path of the temporary file that holds the response body. The file
        name always ends in ``.jpg`` regardless of the real content type.
        The caller is responsible for deleting the file.

    Raises:
        RuntimeError: If the server answers with a status other than 200.
        OSError: If the temporary file cannot be written.

    Errors raised by aiohttp (connection failures, the 30 second total
    timeout) propagate unchanged.
    """
    import aiohttp

    tmp = os.path.join(tempfile.gettempdir(), f"img_{uuid.uuid4().hex}.jpg")
    # Browser-like headers; presumably needed to get past hotlink protection
    # on the image hosts -- confirm against the actual sources used.
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "Referer": "https://www.taobao.com/",
    }
    async with aiohttp.ClientSession(headers=headers) as session:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as resp:
            if resp.status != 200:
                raise RuntimeError(f"下载失败: HTTP {resp.status}")
            # Read the body BEFORE creating the file: if the read fails or
            # times out, no empty temp file is left behind (callers never
            # learn the path when this function raises, so they cannot
            # clean it up themselves).
            payload = await resp.read()

    try:
        with open(tmp, "wb") as f:
            f.write(payload)
    except OSError:
        # Do not leave a truncated file around after a failed write.
        try:
            os.remove(tmp)
        except OSError:
            pass
        raise
    return tmp
|
||
|
||
|
||
async def remove_background(image_url: str, save_path: str = "") -> Dict[str, Any]:
    """[Standalone tool] Remove the background, leaving a white / solid-colour one.

    Accepts an http(s) URL or a local file path and hands the image to
    ``_gemini_call`` together with ``PROMPT_WHITE_BG``.

    Args:
        image_url: http(s) URL or local path of the input image.
        save_path: Where to write the result. When empty, a random
            ``bg_<hex>.jpg`` name inside ``_OUTPUT_DIR`` is used.

    Returns:
        A dict with the keys ``success`` (bool), ``result_path`` (the output
        path, or ``""`` on failure) and ``message``. Exceptions raised inside
        the processing steps are not propagated; their text becomes
        ``message``.
    """
    from image.perspective_fix import PROMPT_WHITE_BG, _gemini_call

    downloaded = None  # temp file to delete afterwards; only set for URL inputs
    try:
        if not image_url.startswith(("http://", "https://")):
            source = image_url
            if not os.path.exists(source):
                return {"success": False, "result_path": "", "message": f"文件不存在: {source}"}
        else:
            downloaded = await _download(image_url)
            source = downloaded

        target = save_path if save_path else os.path.join(_OUTPUT_DIR, f"bg_{uuid.uuid4().hex}.jpg")
        succeeded = await _gemini_call(source, target, PROMPT_WHITE_BG, aspect_ratio="auto", label="去背景")
        if not succeeded:
            return {"success": False, "result_path": "", "message": "去背景失败"}
        return {"success": True, "result_path": target, "message": "去背景完成"}
    except Exception as exc:
        # Tool boundary: report the failure in the result dict instead of raising.
        return {"success": False, "result_path": "", "message": str(exc)}
    finally:
        # The downloaded copy is only an intermediate; always discard it.
        if downloaded and os.path.exists(downloaded):
            os.remove(downloaded)
|
||
|
||
|
||
async def perspective_correct(image_url: str, save_path: str = "") -> Dict[str, Any]:
    """[Standalone tool] Perspective correction.

    The input is expected to be a white-background image (call
    ``remove_background`` first if needed). The image is loaded with OpenCV,
    a quadrilateral is looked up with ``find_quad`` and the image is warped
    with ``four_point_transform``; the result is saved as JPEG (quality 95).

    Args:
        image_url: http(s) URL or local path of the input image.
        save_path: Where to write the result. When empty, a random
            ``persp_<hex>.jpg`` name inside ``_OUTPUT_DIR`` is used.

    Returns:
        A dict with the keys ``success`` (bool), ``result_path`` (the output
        path, or ``""`` on failure) and ``message``. Exceptions raised inside
        the processing steps are not propagated; their text becomes
        ``message``.
    """
    import cv2
    from image.perspective_fix import find_quad, four_point_transform

    tmp = None  # temp file to delete afterwards; only set for URL inputs
    try:
        if image_url.startswith(("http://", "https://")):
            tmp = await _download(image_url)
            src = tmp
        else:
            src = image_url
            if not os.path.exists(src):
                return {"success": False, "result_path": "", "message": f"文件不存在: {src}"}

        img = cv2.imread(src)
        if img is None:
            return {"success": False, "result_path": "", "message": "无法读取图片"}
        pts = find_quad(img)
        if pts is None:
            return {"success": False, "result_path": "", "message": "未检测到四边形,无法透视矫正"}
        warped = four_point_transform(img, pts)
        out = save_path or os.path.join(_OUTPUT_DIR, f"persp_{uuid.uuid4().hex}.jpg")
        # cv2.imwrite signals a failed save (e.g. missing target directory)
        # through its boolean return value rather than an exception, so it
        # must be checked before reporting success.
        if not cv2.imwrite(out, warped, [cv2.IMWRITE_JPEG_QUALITY, 95]):
            return {"success": False, "result_path": "", "message": f"结果保存失败: {out}"}
        return {"success": True, "result_path": out, "message": "透视矫正完成"}
    except Exception as e:
        # Tool boundary: report the failure in the result dict instead of raising.
        return {"success": False, "result_path": "", "message": str(e)}
    finally:
        # The downloaded copy is only an intermediate; always discard it.
        if tmp and os.path.exists(tmp):
            os.remove(tmp)
|
||
|
||
|
||
async def extract_pattern(image_url: str, prompt: str = "", aspect_ratio: str = "1:1",
                          save_path: str = "") -> Dict[str, Any]:
    """[Standalone tool] Print/pattern extraction (the main processing step).

    Runs ``GeminiExtractV2Service.extract_pattern`` on the image with the
    given prompt and aspect ratio.

    Args:
        image_url: http(s) URL or local path of the input image.
        prompt: Custom prompt; an empty string is passed on as ``None``.
        aspect_ratio: Aspect ratio forwarded to the service unchanged.
        save_path: Where to write the result. When empty, a random
            ``extract_<hex>.jpg`` name inside ``_OUTPUT_DIR`` is used.

    Returns:
        A dict with the keys ``success`` (bool), ``result_path`` (the output
        path, or ``""`` on failure) and ``message``. Success requires both a
        truthy status from the service and an existing output file.
        Exceptions are not propagated; their text becomes ``message``.
    """
    from services.service_gemini import GeminiExtractV2Service

    downloaded = None  # temp file to delete afterwards; only set for URL inputs
    try:
        if not image_url.startswith(("http://", "https://")):
            source = image_url
            if not os.path.exists(source):
                return {"success": False, "result_path": "", "message": f"文件不存在: {source}"}
        else:
            downloaded = await _download(image_url)
            source = downloaded

        target = save_path if save_path else os.path.join(_OUTPUT_DIR, f"extract_{uuid.uuid4().hex}.jpg")
        extractor = GeminiExtractV2Service()
        try:
            succeeded, detail, _ = await extractor.extract_pattern(
                input_path=source,
                output_path=target,
                custom_prompt=prompt if prompt else None,
                aspect_ratio=aspect_ratio,
            )
            if not (succeeded and os.path.exists(target)):
                return {"success": False, "result_path": "", "message": detail or "提取失败"}
            return {"success": True, "result_path": target, "message": "提取完成"}
        finally:
            # Release the service whether or not the extraction worked.
            await extractor.cleanup()
    except Exception as exc:
        # Tool boundary: report the failure in the result dict instead of raising.
        return {"success": False, "result_path": "", "message": str(exc)}
    finally:
        # The downloaded copy is only an intermediate; always discard it.
        if downloaded and os.path.exists(downloaded):
            os.remove(downloaded)
|
||
|
||
|
||
async def enhance_image(image_url: str, save_path: str = "") -> Dict[str, Any]:
    """[Standalone tool] High-definition enhancement.

    Tries the Qwen service (``清晰化_api``) first; when that returns a falsy
    result, falls back to ``_gemini_call`` with ``PROMPT_ENHANCE_SIMPLE``.

    NOTE(review): the fallback only triggers on a falsy return value. If
    ``清晰化_api`` raises instead, the exception text is returned and the
    Gemini fallback is not attempted -- confirm this is intended.

    Args:
        image_url: http(s) URL or local path of the input image.
        save_path: Where to write the result. When empty, a random
            ``enh_<hex>.jpg`` name inside ``_OUTPUT_DIR`` is used.

    Returns:
        A dict with the keys ``success`` (bool), ``result_path`` (the output
        path, or ``""`` on failure) and ``message``. Exceptions are not
        propagated; their text becomes ``message``.
    """
    from services.service_qwen import 清晰化_api
    from image.perspective_fix import PROMPT_ENHANCE_SIMPLE, _gemini_call

    downloaded = None  # temp file to delete afterwards; only set for URL inputs
    try:
        if not image_url.startswith(("http://", "https://")):
            source = image_url
            if not os.path.exists(source):
                return {"success": False, "result_path": "", "message": f"文件不存在: {source}"}
        else:
            downloaded = await _download(image_url)
            source = downloaded

        target = save_path if save_path else os.path.join(_OUTPUT_DIR, f"enh_{uuid.uuid4().hex}.jpg")
        if not await 清晰化_api(img_path=source, save_path=target):
            # Primary service reported failure: try the Gemini fallback.
            if not await _gemini_call(source, target, PROMPT_ENHANCE_SIMPLE, aspect_ratio="auto", label="增强"):
                return {"success": False, "result_path": "", "message": "高清增强失败"}
        return {"success": True, "result_path": target, "message": "高清增强完成"}
    except Exception as exc:
        # Tool boundary: report the failure in the result dict instead of raising.
        return {"success": False, "result_path": "", "message": str(exc)}
    finally:
        # The downloaded copy is only an intermediate; always discard it.
        if downloaded and os.path.exists(downloaded):
            os.remove(downloaded)
|
||
|
||
|
||
async def color_match_images(orig_url: str, result_url: str, save_path: str = "",
                             strength: float = 0.75) -> Dict[str, Any]:
    """[Standalone tool] Colour matching: match the tone of ``result`` to ``orig``.

    Both images are loaded with OpenCV and passed to ``_color_match``; the
    output is saved as JPEG (quality 95).

    Args:
        orig_url: http(s) URL or local path of the reference image.
        result_url: http(s) URL or local path of the image to adjust.
        save_path: Where to write the result. When empty, a random
            ``color_<hex>.jpg`` name inside ``_OUTPUT_DIR`` is used.
        strength: Forwarded to ``_color_match`` and shown as a percentage in
            the success message.

    Returns:
        A dict with the keys ``success`` (bool), ``result_path`` (the output
        path, or ``""`` on failure) and ``message``. Exceptions raised inside
        the processing steps are not propagated; their text becomes
        ``message``.
    """
    import cv2
    from image.perspective_fix import _color_match

    # Temp files to delete afterwards; each is only set for a URL input.
    tmp_orig = tmp_result = None
    try:
        if orig_url.startswith(("http://", "https://")):
            tmp_orig = await _download(orig_url)
            orig_path = tmp_orig
        else:
            orig_path = orig_url
        if result_url.startswith(("http://", "https://")):
            tmp_result = await _download(result_url)
            result_path = tmp_result
        else:
            result_path = result_url

        # A missing or unreadable local file shows up here as a None image.
        orig_img = cv2.imread(orig_path)
        result_img = cv2.imread(result_path)
        if orig_img is None or result_img is None:
            return {"success": False, "result_path": "", "message": "图片读取失败"}
        matched = _color_match(orig_img, result_img, strength=strength)
        out = save_path or os.path.join(_OUTPUT_DIR, f"color_{uuid.uuid4().hex}.jpg")
        # cv2.imwrite signals a failed save (e.g. missing target directory)
        # through its boolean return value rather than an exception, so it
        # must be checked before reporting success.
        if not cv2.imwrite(out, matched, [cv2.IMWRITE_JPEG_QUALITY, 95]):
            return {"success": False, "result_path": "", "message": f"结果保存失败: {out}"}
        return {"success": True, "result_path": out, "message": f"颜色匹配完成(强度{strength:.0%})"}
    except Exception as e:
        # Tool boundary: report the failure in the result dict instead of raising.
        return {"success": False, "result_path": "", "message": str(e)}
    finally:
        # The downloaded copies are only intermediates; always discard them.
        for t in (tmp_orig, tmp_result):
            if t and os.path.exists(t):
                os.remove(t)
|
||
|
||
|
||
async def trim_border(image_url: str, save_path: str = "") -> Dict[str, Any]:
    """[Standalone tool] Crop away the background border around the image.

    Per the original description this supports any border colour (white,
    yellow, beige, ...). The cropping itself is done by
    ``tool_trim_white_border``; the result is saved as JPEG (quality 95)
    even when nothing was trimmed.

    Args:
        image_url: http(s) URL or local path of the input image.
        save_path: Where to write the result. When empty, a random
            ``trim_<hex>.jpg`` name inside ``_OUTPUT_DIR`` is used.

    Returns:
        A dict with the keys ``success`` (bool), ``result_path`` (the output
        path, or ``""`` on failure) and ``message``. The success message
        tells whether a trim actually happened. Exceptions raised inside the
        processing steps are not propagated; their text becomes ``message``.
    """
    import cv2
    from image.perspective_fix import tool_trim_white_border

    tmp = None  # temp file to delete afterwards; only set for URL inputs
    try:
        if image_url.startswith(("http://", "https://")):
            tmp = await _download(image_url)
            src = tmp
        else:
            src = image_url
            if not os.path.exists(src):
                return {"success": False, "result_path": "", "message": f"文件不存在: {src}"}

        img = cv2.imread(src)
        if img is None:
            return {"success": False, "result_path": "", "message": "无法读取图片"}
        # The third element of the returned tuple is not used here.
        trimmed, did_trim, _ = tool_trim_white_border(img)
        out = save_path or os.path.join(_OUTPUT_DIR, f"trim_{uuid.uuid4().hex}.jpg")
        # cv2.imwrite signals a failed save (e.g. missing target directory)
        # through its boolean return value rather than an exception, so it
        # must be checked before reporting success.
        if not cv2.imwrite(out, trimmed, [cv2.IMWRITE_JPEG_QUALITY, 95]):
            return {"success": False, "result_path": "", "message": f"结果保存失败: {out}"}
        return {"success": True, "result_path": out, "message": "裁边完成" if did_trim else "无需裁边"}
    except Exception as e:
        # Tool boundary: report the failure in the result dict instead of raising.
        return {"success": False, "result_path": "", "message": str(e)}
    finally:
        # The downloaded copy is only an intermediate; always discard it.
        if tmp and os.path.exists(tmp):
            os.remove(tmp)
|
||
|
||
|
||
async def vectorize_to_eps(image_url: str, save_path: str = "") -> Dict[str, Any]:
    """[Standalone tool] Vectorisation: convert an image into an EPS file.

    Intended for requests such as "make a vector image", "convert to EPS" or
    "convert to AI format". The conversion is delegated to
    ``VectorizerService.image_to_eps``.

    Args:
        image_url: http(s) URL or local path of the input image.
        save_path: Where to write the EPS. When empty, a random
            ``vec_<hex>.eps`` name inside ``_OUTPUT_DIR`` is used.

    Returns:
        A dict with the keys ``success`` (bool), ``result_path`` (the path
        returned by the service, or ``""`` on failure) and ``message``.
        A missing vectoriser module is reported as "service unavailable";
        other exceptions are not propagated, their text becomes ``message``.
    """
    downloaded = None  # temp file to delete afterwards; only set for URL inputs
    try:
        if not image_url.startswith(("http://", "https://")):
            source = image_url
            if not os.path.exists(source):
                return {"success": False, "result_path": "", "message": f"文件不存在: {source}"}
        else:
            downloaded = await _download(image_url)
            source = downloaded

        # Imported inside the try block so a missing service module is
        # turned into the dedicated ImportError result below.
        from services.service_vectorizer import VectorizerService

        vectorizer = VectorizerService()
        target = save_path if save_path else os.path.join(_OUTPUT_DIR, f"vec_{uuid.uuid4().hex}.eps")
        produced = await vectorizer.image_to_eps(source, save_eps_path=target)
        if not produced or not os.path.exists(produced):
            return {"success": False, "result_path": "", "message": "矢量化失败"}
        return {"success": True, "result_path": produced, "message": "矢量化完成,已生成 EPS 文件"}
    except ImportError as exc:
        return {"success": False, "result_path": "", "message": f"矢量化服务不可用: {exc}"}
    except Exception as exc:
        # Tool boundary: report the failure in the result dict instead of raising.
        return {"success": False, "result_path": "", "message": str(exc)}
    finally:
        # The downloaded copy is only an intermediate; always discard it.
        if downloaded and os.path.exists(downloaded):
            os.remove(downloaded)
|
||
|
||
|
||
async def meitu_enhance(image_url: str, mode: str = "standard", save_path: str = "") -> Dict[str, Any]:
    """[Standalone tool] Meitu image-quality enhancement.

    Modes listed by the original description: ``crystal`` (fast redraw),
    ``standard``, ``enhance``, ``hdr`` and ``portrait`` (portrait
    optimisation). ``mode`` is forwarded to ``MeituAPIService.process_image``
    without validation here. Intended for requests such as quality
    enhancement, sharpening or Meitu processing.

    Args:
        image_url: http(s) URL or local path of the input image.
        mode: Processing mode passed to the service.
        save_path: Optional destination. The service writes into
            ``_OUTPUT_DIR``; when ``save_path`` is given the produced file
            is additionally copied there and that path is returned.

    Returns:
        A dict with the keys ``success`` (bool), ``result_path`` (the output
        path, or ``""`` on failure) and ``message``. A missing Meitu module
        is reported as "service unavailable"; other exceptions are not
        propagated, their text becomes ``message``.
    """
    downloaded = None  # temp file to delete afterwards; only set for URL inputs
    try:
        if not image_url.startswith(("http://", "https://")):
            source = image_url
            if not os.path.exists(source):
                return {"success": False, "result_path": "", "message": f"文件不存在: {source}"}
        else:
            downloaded = await _download(image_url)
            source = downloaded

        # Imported inside the try block so a missing service module is
        # turned into the dedicated ImportError result below.
        from pathlib import Path
        from services.service_meitu import MeituAPIService

        client = MeituAPIService()
        outcome = await client.process_image(source, mode=mode, output_dir=Path(_OUTPUT_DIR))
        produced = outcome.get("processed_path")
        if not produced or not os.path.exists(str(produced)):
            return {"success": False, "result_path": "", "message": "美图处理失败"}

        if save_path:
            import shutil

            # Copy (not move): the file produced by the service stays in place.
            shutil.copy(str(produced), save_path)
            produced = save_path
        return {"success": True, "result_path": str(produced), "message": f"画质增强完成({outcome.get('mode_name', mode)})"}
    except ImportError as exc:
        return {"success": False, "result_path": "", "message": f"美图服务不可用: {exc}"}
    except Exception as exc:
        # Tool boundary: report the failure in the result dict instead of raising.
        return {"success": False, "result_path": "", "message": str(exc)}
    finally:
        # The downloaded copy is only an intermediate; always discard it.
        if downloaded and os.path.exists(downloaded):
            os.remove(downloaded)
|