Files
tw/image/perspective_fix.py
ZuoWei a6c42d505a feat: 完整功能部署 v1.0
新增功能:
- 天网协作系统 (HTTP API 端口 6060)
- 三种工作流 (查找图片/处理图片/转人工派单)
- 图片任务数据库 (支持客户后续增加需求)
- 图绘派单系统集成 (API: 8005)
- 文字检测与加价 (60-80 元高价值订单)
- 风险评估与接单判断
- 作图失败自动转人工

新增文档:
- 项目功能汇总.md
- 三种工作流功能说明.md
- 文字加价功能说明.md
- 风险评估功能说明.md
- 图片任务数据库功能说明.md
- 图绘派单系统集成说明.md
- 作图失败转接人工说明.md
- DEPLOYMENT.md
- TIANWANG_INTEGRATION.md

核心修改:
- core/pydantic_ai_agent.py
- core/workflow.py
- core/websocket_client.py
- image/image_analyzer.py
- services/service_tuhui_dispatch.py
- db/image_tasks_db.py

版本:v1.0
日期:2026-02-28
2026-02-28 11:20:40 +08:00

652 lines
28 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
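Perspective-correction pipeline.

  Step 1: Gemini removes the background and replaces it with pure white.
  Step 2: OpenCV detects the four corners on the white-background image and
          flattens the subject with warpPerspective.
  Step 3: The flattened result is enhanced (Qwen first, Gemini as fallback).
  Step 4: A vision model decides whether colour matching and border trimming
          are applied to the final image.

Usage:
    python perspective_fix.py <image path or URL> [--debug] [--skip-step1] [--skip-step3]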
"""
import sys, io


def _force_utf8(stream):
    """Return *stream* configured to emit UTF-8, replacing unencodable characters.

    The log messages in this module are Chinese, so a console using a legacy
    code page would otherwise raise UnicodeEncodeError on print().

    A text stream that offers ``reconfigure`` is switched in place.  A stream
    that only exposes ``buffer`` is wrapped in a new TextIOWrapper.  Anything
    else (including ``None``) is returned untouched, so importing this module
    no longer fails when stdout/stderr has been replaced by an object without
    a ``buffer`` attribute.
    """
    reconfigure = getattr(stream, "reconfigure", None)
    if callable(reconfigure):
        reconfigure(encoding="utf-8", errors="replace")
        return stream
    raw = getattr(stream, "buffer", None)
    if raw is not None:
        return io.TextIOWrapper(raw, encoding="utf-8", errors="replace")
    return stream


sys.stdout = _force_utf8(sys.stdout)
sys.stderr = _force_utf8(sys.stderr)
import os, asyncio, uuid, tempfile
import numpy as np
import cv2
from dotenv import load_dotenv
# Load .env before any os.getenv() lookup below.
load_dotenv()
# Directory that receives final results and debug images; created on import.
_OUTPUT_DIR = os.getenv("RESULT_IMAGE_DIR", "results")
os.makedirs(_OUTPUT_DIR, exist_ok=True)
# ═══════════════════════════════════════════════════════════════
# Gemini helper functions
# ═══════════════════════════════════════════════════════════════
async def _gemini_call(input_path: str, output_path: str, prompt: str,
                       aspect_ratio: str = "1:1", label: str = "") -> bool:
    """Run one Gemini image edit and report whether an output file was produced.

    Args:
        input_path: image handed to the service.
        output_path: path the service is asked to write its result to.
        prompt: forwarded as ``custom_prompt``.
        aspect_ratio: forwarded unchanged to the service.
        label: tag shown in the log lines.

    Returns:
        A truthy value only when the service reports success and
        ``output_path`` exists afterwards.  Any exception is logged and
        turned into ``False``.
    """
    # Imported lazily so the module can be loaded without the service package.
    from services.service_gemini import GeminiExtractV2Service

    gemini = GeminiExtractV2Service()
    try:
        succeeded, message, _ = await gemini.extract_pattern(
            input_path=input_path,
            output_path=output_path,
            custom_prompt=prompt,
            aspect_ratio=aspect_ratio,
        )
        if succeeded:
            status = "成功"
        else:
            status = "失败"
        print(f" [{label}] Gemini {status}: {message[:80]}")
        return succeeded and os.path.exists(output_path)
    except Exception as exc:
        print(f" [{label}] Gemini 异常: {exc}")
        return False
    finally:
        # Always release the service, on success and on failure.
        await gemini.cleanup()
# Step 1 prompt: keep only the product (rug / mat / printed fabric), remove
# objects lying on it and replace every background area with pure white.
PROMPT_WHITE_BG = (
    "请处理这张图片:\n"
    "1. 识别图中的地毯/地垫/印花布料/产品本体作为主体\n"
    "2. 去掉主体上面放置的所有物品(杯子、碗、餐具、装饰品等),只保留地垫本身\n"
    "3. 把所有背景(桌面、地板、墙壁、阴影)全部替换为纯白色(#FFFFFF)\n"
    "4. 保持地垫/产品的颜色、图案、边缘完全不变\n"
    "输出:只有主体产品、纯白背景、无杂物的干净产品图。"
)
# Stronger wording used for the retry when the first background removal
# leaves too little white coverage.
PROMPT_WHITE_BG_STRONG = (
    "严格执行:将这张图的背景彻底替换为纯白色 RGB(255,255,255)。\n"
    "只保留图片中央的产品/地毯/布料主体,其他所有区域(桌面/地板/墙/阴影/物品)"
    "一律改为纯白色。产品边缘要干净锐利,不留任何半透明或灰色区域。\n"
    "重要:不论主体上摆放了什么东西,统统去掉,只输出产品本身+白色背景。"
)
# Enhancement prompt for an already flattened pattern.
# NOTE(review): not referenced in the visible part of this file - confirm it is still needed.
PROMPT_ENHANCE = (
    "请对这张已展平的图案进行高清增强:提升整体清晰度和色彩饱和度,"
    "修复边缘锯齿,补全缺失细节,输出印刷级高质量平面图,背景保持纯白。"
)
# Fallback prompt for Step 3 when enhancement fails (shorter; the original
# author notes a higher success rate).
PROMPT_ENHANCE_SIMPLE = (
    "请提升这张图片的清晰度和画质,输出高清版本,背景保持纯白。"
)
def _measure_white_coverage(image: np.ndarray) -> float:
    """Return the fraction (0.0-1.0) of pixels whose grey level exceeds 245.

    Used to judge how much of the image the background removal turned white.
    """
    grey = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    # Same rule as a binary threshold at 245: strictly brighter counts as white.
    near_white = np.count_nonzero(grey > 245)
    return float(near_white) / grey.size
def _color_match(source: np.ndarray, target: np.ndarray,
                 strength: float = 0.75, exclude_white: bool = True) -> np.ndarray:
    """Shift the colours of ``target`` towards those of ``source``.

    Works per channel in Lab space by transferring mean and standard
    deviation (Reinhard-style), then blends with the untouched target.

    Args:
        source: BGR image used as colour reference.
        target: BGR image to adjust.
        strength: blend weight of the transferred channel; 1 applies the full
            transfer, 0 leaves the target unchanged.
        exclude_white: when True, statistics ignore pixels with L >= 95 so a
            bright background does not skew them.

    Returns:
        The adjusted image as uint8 BGR.
    """
    src_lab = cv2.cvtColor(source.astype(np.float32) / 255.0, cv2.COLOR_BGR2Lab)
    tgt_lab = cv2.cvtColor(target.astype(np.float32) / 255.0, cv2.COLOR_BGR2Lab)
    matched = tgt_lab.copy()

    # The brightness masks do not depend on the channel, so build them once.
    if exclude_white:
        src_keep = src_lab[:, :, 0] < 95
        tgt_keep = tgt_lab[:, :, 0] < 95

    for ch in range(3):
        src_plane = src_lab[:, :, ch]
        tgt_plane = tgt_lab[:, :, ch]
        if exclude_white:
            src_samples = src_plane[src_keep]
            tgt_samples = tgt_plane[tgt_keep]
        else:
            src_samples = src_plane.ravel()
            tgt_samples = tgt_plane.ravel()

        # Nothing to measure on one side: leave this channel as it is.
        if src_samples.size == 0 or tgt_samples.size == 0:
            continue

        src_mu, src_sigma = float(src_samples.mean()), float(src_samples.std())
        tgt_mu, tgt_sigma = float(tgt_samples.mean()), float(tgt_samples.std())
        # A flat target channel cannot be rescaled (division by ~0).
        if tgt_sigma < 1e-6:
            continue

        # The remap is applied to every pixel of the channel, including the
        # ones that were excluded from the statistics.
        remapped = (tgt_plane - tgt_mu) / tgt_sigma * src_sigma + src_mu
        matched[:, :, ch] = remapped * strength + tgt_plane * (1.0 - strength)

    matched_bgr = cv2.cvtColor(matched, cv2.COLOR_Lab2BGR)
    matched_bgr = np.clip(matched_bgr * 255, 0, 255).astype(np.uint8)
    print(f" [颜色匹配] 强度={strength:.0%} | "
          f"源均值L={src_lab[:,:,0].mean():.1f} → 目标均值L={tgt_lab[:,:,0].mean():.1f}")
    return matched_bgr
# ═══════════════════════════════════════════════════════════════
# OpenCV perspective correction
# ═══════════════════════════════════════════════════════════════
def order_points(pts: np.ndarray) -> np.ndarray:
    """Arrange four corner points as [top-left, top-right, bottom-right, bottom-left].

    The points are sorted by their angle around the centroid, then the
    sequence is rotated so that the point with the smallest x + y comes first.

    Args:
        pts: array of four (x, y) points in any order.

    Returns:
        A float32 array of the same four points in the order above.
    """
    centroid_x = pts[:, 0].mean()
    centroid_y = pts[:, 1].mean()
    # Ascending arctan2 angle walks the points in one rotational direction;
    # with image coordinates (y pointing down) that direction is clockwise.
    bearing = np.arctan2(pts[:, 1] - centroid_y, pts[:, 0] - centroid_x)
    ring = pts[np.argsort(bearing)]
    # The corner closest to the origin (smallest x + y) is taken as top-left.
    first = np.argmin(ring.sum(axis=1))
    picks = (first + np.arange(4)) % 4
    return ring[picks].astype("float32")
def four_point_transform(image: np.ndarray, pts: np.ndarray) -> np.ndarray:
    """Warp the quadrilateral ``pts`` of ``image`` onto an upright rectangle.

    The output width is the longer of the top/bottom edges and the output
    height the longer of the left/right edges (both truncated to int).
    Pixels falling outside the source are filled with white.
    """
    rect = order_points(pts)
    tl, tr, br, bl = rect

    out_w = int(max(np.linalg.norm(br - bl), np.linalg.norm(tr - tl)))
    out_h = int(max(np.linalg.norm(tr - br), np.linalg.norm(tl - bl)))
    print(f" [CV] 角点: TL={tl.astype(int)} TR={tr.astype(int)} BR={br.astype(int)} BL={bl.astype(int)}")
    print(f" [CV] 矫正后目标尺寸: {out_w}x{out_h}")

    # Destination corners in the same TL, TR, BR, BL order as ``rect``.
    target = np.array(
        [
            [0, 0],
            [out_w - 1, 0],
            [out_w - 1, out_h - 1],
            [0, out_h - 1],
        ],
        dtype="float32",
    )
    homography = cv2.getPerspectiveTransform(rect, target)
    return cv2.warpPerspective(
        image, homography, (out_w, out_h),
        flags=cv2.INTER_LANCZOS4,
        borderMode=cv2.BORDER_CONSTANT,
        borderValue=(255, 255, 255),
    )
def _detect_bg_color(image: np.ndarray, corner_size: int = 24) -> np.ndarray:
    """Estimate the background colour from the four corners of ``image``.

    A square patch is taken from each corner and the per-channel median of
    all sampled pixels is returned, so the estimate works for any roughly
    uniform background (white, beige, yellow, grey, ...).

    Args:
        image: 3-channel image; the result is in the same channel order.
        corner_size: maximum side length of each corner patch.  The patch is
            also limited to one fifth of the image height and width.

    Returns:
        A uint8 array with three values, one per channel.
    """
    H, W = image.shape[:2]
    # Never let the patch shrink to zero: for images smaller than 5 px on a
    # side the H // 5 / W // 5 limit is 0, which used to yield empty patches
    # and a NaN median.
    cs = max(1, min(corner_size, H // 5, W // 5))
    corners = [
        image[:cs, :cs],          # top-left
        image[:cs, W - cs:],      # top-right
        image[H - cs:, :cs],      # bottom-left
        image[H - cs:, W - cs:],  # bottom-right
    ]
    pixels = np.concatenate([c.reshape(-1, 3) for c in corners], axis=0)
    return np.median(pixels, axis=0).astype(np.uint8)
def tool_trim_white_border(image: np.ndarray,
                           tolerance: int = 18,
                           bg_ratio: float = 0.90,
                           padding: int = 4) -> tuple[np.ndarray, bool, dict]:
    """[Tool] Crop away a uniform border of any colour around the content.

    Algorithm:
        1. Estimate the background colour from the four corners.
        2. A pixel is background when its largest per-channel difference to
           that colour is <= ``tolerance``.  A row/column is background when
           at least ``bg_ratio`` of its pixels are.
        3. Crop to the first/last non-background row and column, keeping
           ``padding`` pixels around them.

    Args:
        image: 3-channel image.
        tolerance: maximum per-channel difference still counted as background.
        bg_ratio: share of background pixels that makes a row/column border.
        padding: pixels kept around the detected content.

    Returns:
        ``(image, trimmed, info)`` where ``trimmed`` tells whether a crop was
        applied and ``info`` holds the border widths (``top``, ``bottom``,
        ``left``, ``right``) and the background colour as ``bg_color`` hex.
        The input image is returned unchanged when every border is narrower
        than 5 px or when no content region can be found.
    """
    H, W = image.shape[:2]
    bg_color = _detect_bg_color(image)
    # Largest per-channel distance of every pixel to the background colour.
    diff = np.abs(image.astype(np.int32) - bg_color.astype(np.int32)).max(axis=2)
    is_bg = diff <= tolerance

    content_rows = np.flatnonzero(is_bg.mean(axis=1) < bg_ratio)
    content_cols = np.flatnonzero(is_bg.mean(axis=0) < bg_ratio)
    # With no content on an axis the bounds stay inverted (start > end).
    top, bottom = (int(content_rows[0]), int(content_rows[-1]) + 1) if content_rows.size else (H, 0)
    left, right = (int(content_cols[0]), int(content_cols[-1]) + 1) if content_cols.size else (W, 0)

    border_top = top
    border_bottom = H - bottom
    border_left = left
    border_right = W - right
    bg_hex = "#{:02X}{:02X}{:02X}".format(int(bg_color[2]), int(bg_color[1]), int(bg_color[0]))
    info = {"top": border_top, "bottom": border_bottom,
            "left": border_left, "right": border_right, "bg_color": bg_hex}
    borders_txt = f"上{border_top} 下{border_bottom} 左{border_left} 右{border_right}px"

    # The whole image looks like background: cropping would produce an empty
    # array, so hand the image back untouched.
    if top >= bottom or left >= right:
        print(f" [裁边] 背景色{bg_hex} | 未检测到内容区域 → 保持原图")
        return image, False, info

    if max(border_top, border_bottom, border_left, border_right) < 5:
        print(f" [裁边] 背景色{bg_hex} | {borders_txt} → 无需裁切")
        return image, False, info

    y1 = max(0, top - padding)
    y2 = min(H, bottom + padding)
    x1 = max(0, left - padding)
    x2 = min(W, right + padding)
    cropped = image[y1:y2, x1:x2]
    ch, cw = cropped.shape[:2]
    print(f" [裁边] 背景色{bg_hex} | {borders_txt} → 裁切 {W}x{H} → {cw}x{ch}")
    return cropped, True, info
async def tool_color_match(orig_img: np.ndarray, result_img: np.ndarray,
                           strength: float = 0.75) -> np.ndarray:
    """[Tool] Colour-match ``result_img`` to ``orig_img``.

    Awaitable wrapper around ``_color_match`` for the AI decision layer;
    ``strength`` is passed through unchanged.
    """
    matched = _color_match(orig_img, result_img, strength=strength)
    return matched
# Colour-transfer strength for each answer the prompt allows for the
# colour-difference field ("obvious", "slight", "none").
_COLOR_STRENGTH_BY_LEVEL = {"明显": 0.80, "轻微": 0.55, "无": 0.0}
# Strength used when the answer is missing or not one of the allowed values.
_DEFAULT_COLOR_STRENGTH = 0.75


def _answer_field(text: str, key: str) -> str:
    """Return the value after ``key`` on the first line starting with it, else ""."""
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if line.startswith(key):
            # Accept both ASCII and fullwidth colons and drop echoed "<...>".
            return line[len(key):].strip(" \t::<>")
    return ""


def _parse_postprocess_answer(text: str) -> dict:
    """Turn the model's three-line answer into the post-processing decision dict."""
    color_level = _answer_field(text, "颜色差异")
    border_answer = _answer_field(text, "多余边框")
    border_pos = _answer_field(text, "边框位置")

    # Only an explicit "有" (present) requests trimming; "没有" is a negation.
    has_border = "有" in border_answer and "没有" not in border_answer
    color_strength = _COLOR_STRENGTH_BY_LEVEL.get(color_level, _DEFAULT_COLOR_STRENGTH)
    need_color = color_strength > 0

    reason = f"颜色差异={color_level or '?'}, 边框={'有('+border_pos+')' if has_border else '无'}"
    print(f" [AI决策] {reason} → 颜色匹配={'是' if need_color else '否'}(强度{color_strength:.0%}), 裁边={'是' if has_border else '否'}")
    return {
        "need_color_match": need_color,
        "color_strength": color_strength,
        "need_trim": has_border,
        "reason": reason,
    }


async def ai_decide_postprocess(orig_img: np.ndarray, result_img: np.ndarray) -> dict:
    """[AI decision layer] Ask a vision model which post-processing steps to run.

    Both images are resized to 512x512, JPEG-encoded and sent together with a
    prompt asking about colour difference and surplus borders.

    Configuration is read from the environment: ``OPENAI_API_KEY``,
    ``OPENAI_BASE_URL`` and ``VISION_MODEL`` (default ``glm-4v-flash``).
    The environment is already populated by the module-level load_dotenv().

    Returns:
        {
            "need_color_match": bool,
            "color_strength": float,   # 0.0, 0.55, 0.80 or the 0.75 default
            "need_trim": bool,
            "reason": str,
        }
        Without an API key, or when the request fails, both steps are
        requested with strength 0.75.
    """
    import base64

    api_key = os.getenv("OPENAI_API_KEY")
    base_url = os.getenv("OPENAI_BASE_URL")
    model = os.getenv("VISION_MODEL", "glm-4v-flash")
    # No API key: default to running both steps.
    if not api_key:
        return {"need_color_match": True, "color_strength": 0.75,
                "need_trim": True, "reason": "无API Key默认执行"}

    def _encode(img: np.ndarray) -> str:
        # Fixed 512x512 thumbnail keeps the request small (aspect ratio is not preserved).
        resized = cv2.resize(img, (512, 512))
        _, buf = cv2.imencode(".jpg", resized, [cv2.IMWRITE_JPEG_QUALITY, 80])
        return base64.b64encode(buf).decode()

    orig_b64 = _encode(orig_img)
    result_b64 = _encode(result_img)
    prompt = (
        "你是图片后处理决策助手。图一是原图图二是AI处理后的结果图。请判断\n\n"
        "【问题1】颜色差异处理后图片的整体色调与原图相比差异是否明显\n"
        "(明显=色调/饱和度/冷暖差异很大;轻微=有轻微偏差;无=颜色基本一致)\n\n"
        "【问题2】多余边框处理后图片四周是否有不属于图案内容的多余空白边框\n"
        "注意:边框颜色不一定是白色,也可能是黄色、米色、灰色等任何纯色。\n"
        "判断标准:图案内容的外围是否有一圈明显的纯色空白带。\n\n"
        "严格按格式回答(每行一个字段,不要多余内容):\n"
        "颜色差异: <明显|轻微|无>\n"
        "多余边框: <有|无>\n"
        "边框位置: <有边框的方向如「上下」,没有则填无>"
    )
    try:
        from openai import AsyncOpenAI
        client = AsyncOpenAI(base_url=base_url, api_key=api_key)
        response = await client.chat.completions.create(
            model=model,
            messages=[{
                "role": "user",
                "content": [
                    {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{orig_b64}"}},
                    {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{result_b64}"}},
                    {"type": "text", "text": prompt},
                ],
            }],
        )
        text = response.choices[0].message.content or ""
        print(f" [AI决策] 原始回答: {text.strip()[:120]}")
        return _parse_postprocess_answer(text)
    except Exception as e:
        # Best effort: a failed decision must not abort the pipeline.
        print(f" [AI决策] 调用失败({e}),默认执行颜色匹配+裁边")
        return {"need_color_match": True, "color_strength": 0.75,
                "need_trim": True, "reason": f"AI决策失败: {e}"}
def _points_are_unique(pts: np.ndarray, min_dist: float = 20.0) -> bool:
    """Tell whether every pair of points is at least ``min_dist`` apart.

    Guards against near-duplicate corners, which would make the perspective
    transform degenerate.
    """
    return not any(
        np.linalg.norm(first - second) < min_dist
        for idx, first in enumerate(pts)
        for second in pts[idx + 1:]
    )
def find_quad(image: np.ndarray):
    """Detect the four corner points of the main subject on a white background.

    Strategies, tried in order:
        1. Threshold + approxPolyDP with a growing epsilon until exactly four
           well-separated vertices are found.
        2. The four extreme points of the convex hull (leftmost, topmost,
           rightmost, bottommost).
        3. The corners of the minimum-area bounding rectangle.

    Returns:
        A float32 array of four (x, y) points, or ``None`` when no contour is
        found or the largest contour covers less than 5 % of the image.
    """
    img_h, img_w = image.shape[:2]
    img_area = img_h * img_w
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # -- Subject contour: everything that is not near-white ----------------
    _, non_white = cv2.threshold(gray, 245, 255, cv2.THRESH_BINARY_INV)
    close_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (20, 20))
    filled = cv2.morphologyEx(non_white, cv2.MORPH_CLOSE, close_kernel)
    cnts, _ = cv2.findContours(filled, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not cnts:
        # Fall back to edge detection when thresholding finds nothing.
        edges = cv2.Canny(gray, 30, 100)
        edge_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (10, 10))
        filled = cv2.morphologyEx(edges, cv2.MORPH_CLOSE, edge_kernel)
        cnts, _ = cv2.findContours(filled, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        if not cnts:
            print(" [CV] 无法检测轮廓")
            return None

    c = max(cnts, key=cv2.contourArea)
    area = cv2.contourArea(c)
    print(f" [CV] 主体轮廓面积: {area:.0f} / {img_area} ({area/img_area*100:.1f}%)")
    if area < img_area * 0.05:
        print(" [CV] 面积太小,背景可能去除不完全")
        return None
    peri = cv2.arcLength(c, True)

    # -- Strategy 1: polygon approximation with increasing tolerance -------
    for eps_ratio in (0.02, 0.03, 0.04, 0.05, 0.06):
        approx = cv2.approxPolyDP(c, eps_ratio * peri, True)
        pts = approx.reshape(-1, 2).astype("float32")
        is_quad = len(pts) == 4
        if is_quad and _points_are_unique(pts):
            print(f" [CV] approxPolyDP 成功 (eps={eps_ratio}), 4个唯一角点")
            return pts
        # Reaching this line with four vertices means they were not unique.
        unique_txt = False if is_quad else 'N/A'
        print(f" [CV] approxPolyDP eps={eps_ratio}: {len(pts)} 顶点,唯一={unique_txt}")

    # -- Strategy 2: extreme points of the convex hull ---------------------
    hull = cv2.convexHull(c).reshape(-1, 2).astype("float32")
    if len(hull) >= 4:
        xs, ys = hull[:, 0], hull[:, 1]
        left = hull[np.argmin(xs)]
        right = hull[np.argmax(xs)]
        top = hull[np.argmin(ys)]
        bottom = hull[np.argmax(ys)]
        pts = np.array([left, top, right, bottom], dtype="float32")
        if _points_are_unique(pts):
            print(f" [CV] 使用凸包极值四点: L={left.astype(int)} T={top.astype(int)} R={right.astype(int)} B={bottom.astype(int)}")
            return pts

    # -- Strategy 3: minimum-area rectangle as the last resort -------------
    print(f" [CV] 兜底:使用 minAreaRect")
    return cv2.boxPoints(cv2.minAreaRect(c)).astype("float32")
def save_debug_img(image: np.ndarray, pts, path: str):
    """Write a copy of ``image`` with the detected corners drawn on it.

    When ``pts`` is ``None`` the image is saved without annotations.
    """
    canvas = image.copy()
    if pts is not None:
        corners = order_points(pts)
        tags = ("TL", "TR", "BR", "BL")
        palette = ((0, 0, 255), (0, 255, 0), (255, 0, 0), (0, 165, 255))
        for tag, bgr, (px, py) in zip(tags, palette, corners):
            cx, cy = int(px), int(py)
            cv2.circle(canvas, (cx, cy), 12, bgr, -1)
            cv2.putText(canvas, tag, (cx + 15, cy),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.2, bgr, 3)
        outline = corners.reshape((-1, 1, 2)).astype(np.int32)
        cv2.polylines(canvas, [outline], True, (0, 0, 255), 3)
    cv2.imwrite(path, canvas, [cv2.IMWRITE_JPEG_QUALITY, 90])
    print(f" [Debug] 调试图: {path}")
# ═══════════════════════════════════════════════════════════════
# Main pipeline
# ═══════════════════════════════════════════════════════════════
# Horizontal rule printed above and below each step title.
_STEP_RULE = "-" * 50


def _print_step(title: str) -> None:
    """Print a step title framed by two rules."""
    print("\n" + _STEP_RULE)
    print(title)
    print(_STEP_RULE)


async def _download_source(url: str, dest: str) -> bool:
    """Download ``url`` into ``dest``; return False on a non-200 response."""
    import aiohttp
    print("[下载] 原图中...")
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
        "Referer": "https://www.taobao.com/",
    }
    async with aiohttp.ClientSession(headers=headers) as sess:
        async with sess.get(url, timeout=aiohttp.ClientTimeout(total=30)) as r:
            if r.status != 200:
                print(f"[下载] 失败: HTTP {r.status}")
                return False
            payload = await r.read()
    # Write only after the body was read completely.
    with open(dest, "wb") as f:
        f.write(payload)
    return True


async def _remove_background(src_path: str, uid: str, tmp: list) -> str | None:
    """Step 1: produce a white-background image; return its path, or None on failure."""
    s1_out = os.path.join(tempfile.gettempdir(), f"pfix_s1_{uid}.jpg")
    tmp.append(s1_out)
    ok = await _gemini_call(src_path, s1_out, PROMPT_WHITE_BG,
                            aspect_ratio="auto", label="去背景")
    if not ok:
        print(" Step1 失败,用原图继续")
        return None

    # White coverage tells whether the background was really removed.
    s1_img = cv2.imread(s1_out)
    white_pct = _measure_white_coverage(s1_img) if s1_img is not None else 0.0
    print(f" [去背景] 白色覆盖率: {white_pct:.1%}", end="")
    if not white_pct < 0.20:
        print(" → 合格")
        return s1_out

    # Too little white: retry once with the stronger prompt.
    print(" → 太低,强化提示词重试...")
    s1_retry = os.path.join(tempfile.gettempdir(), f"pfix_s1r_{uid}.jpg")
    tmp.append(s1_retry)
    ok2 = await _gemini_call(src_path, s1_retry, PROMPT_WHITE_BG_STRONG,
                             aspect_ratio="auto", label="去背景(强化)")
    if not ok2:
        print(" [去背景] 重试失败,保留首次结果")
        return s1_out
    r_img = cv2.imread(s1_retry)
    retry_pct = _measure_white_coverage(r_img) if r_img is not None else 0.0
    print(f" [去背景] 重试白色覆盖率: {retry_pct:.1%}", end="")
    if retry_pct >= white_pct:
        print(" → 效果更好,采用重试结果")
        return s1_retry
    print(" → 效果未提升,保留首次结果")
    return s1_out


def _fix_orientation(warped: np.ndarray, orig_ratio: float) -> np.ndarray:
    """Rotate the flattened image by 90 degrees when its orientation contradicts the original's."""
    wh, ww = warped.shape[:2]
    warped_ratio = ww / wh  # width / height
    # Only rotate when landscape/portrait disagree by more than a factor of 1.5.
    if orig_ratio > 1.0 and warped_ratio < 1.0 / 1.5:
        print(f" [方向校正] 原图横({orig_ratio:.2f}) vs 矫正竖({warped_ratio:.2f}) → 旋转90°")
        return cv2.rotate(warped, cv2.ROTATE_90_CLOCKWISE)
    if orig_ratio < 1.0 and warped_ratio > 1.5:
        print(f" [方向校正] 原图竖({orig_ratio:.2f}) vs 矫正横({warped_ratio:.2f}) → 旋转-90°")
        return cv2.rotate(warped, cv2.ROTATE_90_COUNTERCLOCKWISE)
    print(f" [方向校正] 方向一致,无需旋转 (原图比例={orig_ratio:.2f}, 矫正比例={warped_ratio:.2f})")
    return warped


async def _enhance(current: str, final_out: str) -> None:
    """Step 3: enhance ``current`` into ``final_out`` (Qwen, then Gemini, then plain copy)."""
    import shutil
    from services.service_qwen import 清晰化_api
    ok = await 清晰化_api(img_path=current, save_path=final_out)
    if ok:
        print(f" [高清增强] Qwen 成功")
        return
    # Qwen failed: fall back to Gemini with the simplified prompt.
    print(" Qwen 失败Gemini 兜底重试...")
    ok = await _gemini_call(current, final_out, PROMPT_ENHANCE_SIMPLE,
                            aspect_ratio="auto", label="高清增强(Gemini兜底)")
    if not ok:
        print(" Step3 全部失败,直接保存矫正结果")
        shutil.copy2(current, final_out)


async def _postprocess(orig_img, final_out: str) -> None:
    """Step 4: apply the colour matching / border trimming the AI decision asks for."""
    final_img = cv2.imread(final_out)
    if final_img is None or orig_img is None:
        print(" [Step4] 图片读取失败,跳过后处理")
        return
    decision = await ai_decide_postprocess(orig_img, final_img)
    # Tool 1: colour matching
    if decision["need_color_match"]:
        final_img = await tool_color_match(orig_img, final_img,
                                           strength=decision["color_strength"])
        cv2.imwrite(final_out, final_img, [cv2.IMWRITE_JPEG_QUALITY, 95])
    else:
        print(" [颜色匹配] AI 判断无需调色,跳过")
    # Tool 2: border trimming
    if decision["need_trim"]:
        trimmed, did_trim, _ = tool_trim_white_border(final_img)
        if did_trim:
            cv2.imwrite(final_out, trimmed, [cv2.IMWRITE_JPEG_QUALITY, 95])
    else:
        print(" [裁边] AI 判断无白边,跳过")


async def process(src: str, debug: bool = False,
                  skip_step1: bool = False, skip_step3: bool = False) -> str | None:
    """Run the whole perspective-correction pipeline on one image.

    Args:
        src: local file path, or an ``http://`` / ``https://`` URL that is
            downloaded to a temporary file first.
        debug: also save an annotated corner-detection image to the output dir.
        skip_step1: skip background removal and work on the source image.
        skip_step3: skip enhancement and copy the flattened image as result.

    Returns:
        Path of the final image inside the output directory, or ``None`` when
        the download fails or the working image cannot be read.

    Temporary files are removed on every exit path.  Network and file-system
    exceptions other than the handled cases propagate to the caller.
    """
    import shutil

    uid = uuid.uuid4().hex
    tmp = []  # temporary files, removed in the finally block
    try:
        # -- Download (URL input) ------------------------------------------
        if src.startswith(("http://", "https://")):
            dl = os.path.join(tempfile.gettempdir(), f"pfix_dl_{uid}.jpg")
            tmp.append(dl)
            if not await _download_source(src, dl):
                return None
            local_src = dl
        else:
            local_src = src

        current = local_src                # file currently being processed
        orig_img = cv2.imread(local_src)   # kept as colour reference
        # Aspect ratio of the source, used to detect a 90 degree rotation later.
        orig_ratio = (orig_img.shape[1] / orig_img.shape[0]) if orig_img is not None else 1.0

        # -- Step 1: background removal ------------------------------------
        if not skip_step1:
            _print_step("Step 1 / 4 | Gemini 去背景 → 白色背景")
            white_bg = await _remove_background(current, uid, tmp)
            if white_bg is not None:
                current = white_bg
        else:
            print("\n[跳过 Step1] 直接用原图")

        # -- Step 2: corner detection + perspective correction -------------
        _print_step("Step 2 / 4 | OpenCV 轮廓检测 + 透视矫正")
        img = cv2.imread(current)
        if img is None:
            print(f" 无法读取: {current}")
            return None
        h, w = img.shape[:2]
        print(f" 输入尺寸: {w}x{h}")
        pts = find_quad(img)
        if debug:
            dbg_path = os.path.join(_OUTPUT_DIR, f"debug_{uid}.jpg")
            save_debug_img(img, pts, dbg_path)
        if pts is not None:
            warped = _fix_orientation(four_point_transform(img, pts), orig_ratio)
            s2_out = os.path.join(tempfile.gettempdir(), f"pfix_s2_{uid}.jpg")
            tmp.append(s2_out)
            cv2.imwrite(s2_out, warped, [cv2.IMWRITE_JPEG_QUALITY, 95])
            current = s2_out
            wh2, ww2 = warped.shape[:2]
            print(f" 透视矫正完成 → {ww2}x{wh2}")
        else:
            print(" 角点检测失败,跳过透视矫正,继续用白背景图")

        # -- Step 3: enhancement -------------------------------------------
        final_out = os.path.join(_OUTPUT_DIR, f"pfix_final_{uid}.jpg")
        if not skip_step3:
            _print_step("Step 3 / 4 | Qwen 高清增强RunningHub")
            await _enhance(current, final_out)
        else:
            shutil.copy2(current, final_out)
            print("\n[跳过 Step3] 直接保存矫正结果")

        # -- Step 4: AI decision + post-processing -------------------------
        _print_step("Step 4 / 4 | AI 决策后处理(颜色匹配 / 白边裁切)")
        await _postprocess(orig_img, final_out)

        size_kb = os.path.getsize(final_out) / 1024
        print(f"\n{'='*50}")
        print(f" 完成!输出文件: {final_out}")
        print(f" 文件大小: {size_kb:.0f} KB")
        print(f"{'='*50}")
        return final_out
    finally:
        for f in tmp:
            # Best-effort cleanup: a file that cannot be removed must not
            # replace the pipeline's own result or exception.
            try:
                if os.path.exists(f):
                    os.remove(f)
            except OSError as exc:
                print(f" [清理] 无法删除临时文件 {f}: {exc}")
if __name__ == "__main__":
    # Command-line entry point: one image path or URL plus optional flags.
    _known_flags = ("--debug", "--skip-step1", "--skip-step3")
    # The source is the first argument that is not one of the flags, so the
    # flags may be given before or after it.
    _positional = [arg for arg in sys.argv[1:] if arg not in _known_flags]
    if not _positional:
        print("用法: python perspective_fix.py <图片路径或URL> [--debug] [--skip-step1] [--skip-step3]")
        sys.exit(1)
    src_arg = _positional[0]
    debug_arg = "--debug" in sys.argv
    skip1_arg = "--skip-step1" in sys.argv
    skip3_arg = "--skip-step3" in sys.argv
    result_path = asyncio.run(
        process(src_arg, debug=debug_arg, skip_step1=skip1_arg, skip_step3=skip3_arg)
    )
    # process() returns None on failure; report that through the exit status.
    sys.exit(0 if result_path else 1)