"""Upload endpoints: store an image, derive thumbnail/watermarked copies, and list uploads."""

import json
import logging
import os
import shutil
import uuid
from datetime import datetime
from typing import Optional

from fastapi import (
    APIRouter,
    Depends,
    File,
    Form,
    Header,
    HTTPException,
    Query,
    UploadFile,
    status,
)
from PIL import Image, ImageDraw, ImageFont, UnidentifiedImageError
from sqlalchemy import desc
from sqlalchemy.orm import Session

from app.core.database import get_db
from app.core.security import decode_access_token
from app.models.user import User
from app.models.work import Work

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/upload", tags=["上传"])

# Upload configuration.
UPLOAD_BASE_DIR = "/app/uploads"
ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".webp"}
MAX_FILE_SIZE = 50 * 1024 * 1024  # 50MB

# Public domain used to build absolute image URLs.
TUHUI_DOMAIN = "https://tuhui.cloud"

_BEARER_PREFIX = "Bearer "
_WATERMARK_FONT_PATH = "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"
_WATERMARK_MARGIN = 20


def get_current_user(
    authorization: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """Resolve the logged-in user from the ``Authorization: Bearer <token>`` header.

    Args:
        authorization: Raw value of the ``Authorization`` request header.
        db: Database session used to look the user up.

    Returns:
        The ``User`` row whose id matches the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the header is missing or not a Bearer header,
            when the token cannot be decoded, when ``sub`` is missing or not
            an integer, or when no such user exists.
    """
    if not authorization or not authorization.startswith(_BEARER_PREFIX):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="未登录,请先登录",
        )

    # Strip only the leading scheme; ``str.replace`` would also remove any
    # later "Bearer " occurrence inside the header value.
    token = authorization[len(_BEARER_PREFIX):].strip()
    payload = decode_access_token(token)
    if not payload:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token 无效或已过期",
        )

    # A missing or non-numeric ``sub`` is an invalid token, not a server error.
    try:
        user_id = int(payload.get("sub"))
    except (TypeError, ValueError):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token 无效或已过期",
        ) from None

    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户不存在",
        )
    return user


def _flatten_to_rgb(img):
    """Return ``img`` as an RGB image, compositing any alpha onto white.

    JPEG cannot store an alpha channel, so palette/alpha images are pasted
    onto a white background using their alpha band as the mask. An image
    that is already RGB is returned unchanged (same object).
    """
    if img.mode == "P":
        img = img.convert("RGBA")
    if img.mode in ("RGBA", "LA"):
        background = Image.new("RGB", img.size, (255, 255, 255))
        background.paste(img, mask=img.split()[-1])
        return background
    if img.mode != "RGB":
        return img.convert("RGB")
    return img


def generate_thumbnail(image_path: str, thumb_path: str, size=(400, 400)):
    """Write a thumbnail of ``image_path`` to ``thumb_path``.

    Args:
        image_path: Path of the source image.
        thumb_path: Destination path; the output format is inferred from its
            extension by Pillow.
        size: Maximum ``(width, height)`` bounding box; aspect ratio is kept.
    """
    with Image.open(image_path) as img:
        rgb = _flatten_to_rgb(img)
        rgb.thumbnail(size, Image.Resampling.LANCZOS)
        rgb.save(thumb_path, quality=85)


def add_watermark(image_path: str, watermarked_path: str, watermark_text: str = "图绘"):
    """Write a copy of ``image_path`` with a text watermark in the bottom-right corner.

    The text is drawn on a transparent RGBA overlay and alpha-composited onto
    the image so that the alpha component of the fill colour takes effect;
    drawing an RGBA fill directly onto an RGB image yields opaque text.

    Args:
        image_path: Path of the source image.
        watermarked_path: Destination path; the output format is inferred
            from its extension by Pillow.
        watermark_text: Text to render.
    """
    with Image.open(image_path) as img:
        base = _flatten_to_rgb(img).convert("RGBA")

        overlay = Image.new("RGBA", base.size, (255, 255, 255, 0))
        draw = ImageDraw.Draw(overlay)

        # NOTE(review): DejaVu Sans may not contain CJK glyphs, in which case
        # the default text would not render as intended - confirm the font
        # available in the deployment image.
        try:
            font = ImageFont.truetype(_WATERMARK_FONT_PATH, 36)
        except (OSError, ImportError):
            # Font file missing/unreadable or FreeType support unavailable.
            font = ImageFont.load_default()

        left, top, right, bottom = draw.textbbox((0, 0), watermark_text, font=font)
        text_width = right - left
        text_height = bottom - top

        # Clamp so the text origin never goes negative on very small images.
        x = max(base.width - text_width - _WATERMARK_MARGIN, 0)
        y = max(base.height - text_height - _WATERMARK_MARGIN, 0)
        draw.text((x, y), watermark_text, fill=(255, 255, 255, 128), font=font)

        result = Image.alpha_composite(base, overlay).convert("RGB")
        result.save(watermarked_path, quality=90)


def _normalize_tags(tags: Optional[str]) -> Optional[str]:
    """Trim each comma-separated tag, drop empty ones, and re-join with commas.

    Returns ``None`` when ``tags`` is empty or contains no non-blank tag.
    """
    if not tags:
        return None
    cleaned = [part.strip() for part in tags.split(",")]
    return ",".join(part for part in cleaned if part) or None


def _remove_files(paths):
    """Best-effort removal of ``paths``; failures are logged, never raised."""
    for path in paths:
        try:
            os.remove(path)
        except FileNotFoundError:
            pass
        except OSError:
            logger.warning("Could not remove file %s during cleanup", path, exc_info=True)


@router.post("", summary="上传作品")
async def upload_work(
    file: UploadFile = File(..., description="作品图片文件"),
    title: str = Form(..., description="作品标题"),
    description: Optional[str] = Form(None, description="作品描述"),
    category: str = Form(..., description="作品分类"),
    tags: Optional[str] = Form(None, description="标签,逗号分隔"),
    price: float = Form(..., ge=0, description="作品价格"),
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """
    Upload a work.

    **Login required**: yes (Bearer token)

    **Supported formats**: jpg, jpeg, png, gif, webp

    **File size**: up to 50MB

    Stores the original file, generates a thumbnail and a watermarked copy,
    and inserts a ``Work`` row. On failure the written files are removed and
    the database session is rolled back.

    Raises:
        HTTPException: 400 for an unsupported extension, an oversized file, or
            a file Pillow cannot identify as an image; 500 for any other
            failure while storing or recording the upload.
    """
    # NOTE(review): the file copy and Pillow processing below are blocking
    # calls inside an ``async def`` handler; consider moving them to a thread
    # pool if uploads are large or frequent.

    # Validate the extension. ``filename`` may be absent on the upload.
    file_ext = os.path.splitext(file.filename or "")[1].lower()
    if file_ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            # Sorted so the message is stable (set iteration order is not).
            detail=f"不支持的文件格式,仅支持:{', '.join(sorted(ALLOWED_EXTENSIONS))}",
        )

    # Validate the size by seeking to the end of the underlying file object.
    file.file.seek(0, os.SEEK_END)
    file_size = file.file.tell()
    file.file.seek(0)
    if file_size > MAX_FILE_SIZE:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"文件过大,最大支持 {MAX_FILE_SIZE // 1024 // 1024}MB",
        )

    # Unique file name, grouped into per-day directories.
    unique_id = str(uuid.uuid4())
    timestamp = datetime.now().strftime("%Y%m%d")

    upload_dir = os.path.join(UPLOAD_BASE_DIR, "original", timestamp)
    thumb_dir = os.path.join(UPLOAD_BASE_DIR, "thumbnail", timestamp)
    watermarked_dir = os.path.join(UPLOAD_BASE_DIR, "watermarked", timestamp)
    for directory in (upload_dir, thumb_dir, watermarked_dir):
        os.makedirs(directory, exist_ok=True)

    original_filename = f"{unique_id}{file_ext}"
    original_path = os.path.join(upload_dir, original_filename)

    thumb_filename = f"{unique_id}_thumb.jpg"
    thumb_path = os.path.join(thumb_dir, thumb_filename)

    watermarked_filename = f"{unique_id}_watermarked.jpg"
    watermarked_path = os.path.join(watermarked_dir, watermarked_filename)

    written_paths = (original_path, thumb_path, watermarked_path)

    try:
        # Store the original, then derive the thumbnail and watermarked copy.
        with open(original_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        generate_thumbnail(original_path, thumb_path)
        add_watermark(original_path, watermarked_path)

        # Tags are stored as one comma-separated string, not a list.
        work = Work(
            title=title,
            description=description or "",
            category=category,
            tags=_normalize_tags(tags),
            price=price,
            designer=current_user.nickname or current_user.phone,
            original_image=f"/uploads/original/{timestamp}/{original_filename}",
            thumbnail_image=f"/uploads/thumbnail/{timestamp}/{thumb_filename}",
            watermarked_image=f"/uploads/watermarked/{timestamp}/{watermarked_filename}",
        )
        db.add(work)
        db.commit()
        db.refresh(work)
    except UnidentifiedImageError as exc:
        # The extension was allowed but the content is not a decodable image:
        # that is a client error, not a server fault.
        _remove_files(written_paths)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="文件不是有效的图片",
        ) from exc
    except Exception as exc:
        logger.exception("Upload failed")
        db.rollback()
        _remove_files(written_paths)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"上传失败:{str(exc)}",
        ) from exc

    # Built outside the ``try`` so a failure here cannot delete the files of
    # an already-committed record.
    return {
        "success": True,
        "message": "上传成功,等待审核",
        "work_id": work.id,
        "image_url": f"{TUHUI_DOMAIN}{work.original_image}",
        "thumbnail_url": f"{TUHUI_DOMAIN}{work.thumbnail_image}",
        "watermarked_url": f"{TUHUI_DOMAIN}{work.watermarked_image}",
    }


@router.get("/my", summary="我的上传")
def get_my_uploads(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Return one page of the current user's uploads, newest first.

    Args:
        page: 1-based page number (query parameter).
        page_size: Number of items per page, 1-100 (query parameter).
        current_user: Authenticated user.
        db: Database session.

    Returns:
        A dict with ``total``, ``page``, ``page_size`` and ``items``.
    """
    # ``upload_work`` stores ``nickname or phone`` in ``Work.designer``, so
    # match on both values; filtering on phone alone hides every upload made
    # by a user who has a nickname.
    # NOTE(review): matching on a display name is ambiguous if nicknames are
    # not unique; an owner id column on Work would be more robust - the Work
    # schema is not visible here.
    designer_names = [
        name for name in (current_user.nickname, current_user.phone) if name
    ]
    owned = db.query(Work).filter(Work.designer.in_(designer_names))

    total = owned.count()
    offset = (page - 1) * page_size
    works = (
        owned.order_by(desc(Work.created_at))
        .offset(offset)
        .limit(page_size)
        .all()
    )

    return {
        "total": total,
        "page": page,
        "page_size": page_size,
        "items": works,
    }