"""Upload endpoints: store an artwork image plus thumbnail/watermarked copies."""

import os
import shutil
import uuid
from datetime import datetime
from typing import Optional

from fastapi import (
    APIRouter,
    Depends,
    File,
    Form,
    Header,
    HTTPException,
    Query,
    UploadFile,
    status,
)
from PIL import Image, ImageDraw, ImageFont, UnidentifiedImageError
from sqlalchemy import desc
from sqlalchemy.orm import Session

from app.core.database import get_db
from app.core.security import decode_access_token
from app.models.user import User
from app.models.work import Work

router = APIRouter(prefix="/upload", tags=["上传"])

# Upload configuration
UPLOAD_BASE_DIR = "/var/www/tuhui_uploads"
ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".webp"}
MAX_FILE_SIZE = 50 * 1024 * 1024  # 50MB

# Public domain used to build absolute image URLs
TUHUI_DOMAIN = "https://tuhui.cloud"

# File extensions that Pillow encodes as JPEG.
_JPEG_EXTENSIONS = (".jpg", ".jpeg")
# Modes passed through to the JPEG encoder unchanged; every other mode is
# converted first (see _flatten_for_jpeg).
_JPEG_SAFE_MODES = ("L", "RGB", "CMYK")


def _is_jpeg_path(path: str) -> bool:
    """Return True when *path* has a JPEG file extension."""
    return os.path.splitext(path)[1].lower() in _JPEG_EXTENSIONS


def _flatten_for_jpeg(img):
    """Return *img* in a mode the JPEG encoder accepts.

    Images with transparency are composited onto a white background so the
    transparent areas do not turn black; other unsupported modes are simply
    converted to RGB.
    """
    if img.mode in _JPEG_SAFE_MODES:
        return img
    has_alpha = img.mode in ("RGBA", "LA", "PA") or "transparency" in img.info
    if not has_alpha:
        return img.convert("RGB")
    rgba = img.convert("RGBA")
    canvas = Image.new("RGB", rgba.size, (255, 255, 255))
    canvas.paste(rgba, mask=rgba.getchannel("A"))
    return canvas


def _remove_files(paths) -> None:
    """Best-effort removal of *paths*; missing or undeletable files are skipped."""
    for path in paths:
        try:
            os.remove(path)
        except OSError:
            # Deliberately ignored: cleanup must never mask the original error.
            continue


def get_current_user(
    authorization: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """Resolve the logged-in user from the ``Authorization: Bearer`` header.

    Raises:
        HTTPException: 401 when the header is missing or malformed, the token
            cannot be decoded, its ``sub`` claim is not an integer, or no user
            with that id exists.
    """
    bearer_prefix = "Bearer "
    if not authorization or not authorization.startswith(bearer_prefix):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="未登录,请先登录",
        )

    # Strip only the leading scheme; str.replace would also remove any later
    # occurrence of the prefix inside the token.
    token = authorization[len(bearer_prefix):]
    payload = decode_access_token(token)
    if not payload:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token 无效或已过期",
        )

    try:
        user_id = int(payload.get("sub"))
    except (TypeError, ValueError):
        # A missing or non-numeric subject is an invalid token, not a server error.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token 无效或已过期",
        ) from None

    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户不存在",
        )
    return user


def generate_thumbnail(image_path: str, thumb_path: str, size=(400, 400)):
    """Write a thumbnail of *image_path* to *thumb_path*.

    The image is shrunk in place to fit within *size* (aspect ratio kept).
    When *thumb_path* is a JPEG file, modes JPEG cannot store (RGBA, P, LA, ...)
    are converted first; otherwise saving e.g. a transparent PNG would fail.
    """
    with Image.open(image_path) as img:
        img.thumbnail(size, Image.Resampling.LANCZOS)
        output = _flatten_for_jpeg(img) if _is_jpeg_path(thumb_path) else img
        output.save(thumb_path, quality=85)


def add_watermark(image_path: str, watermarked_path: str, watermark_text: str = "图绘"):
    """Write a copy of *image_path* with a text watermark to *watermarked_path*.

    The text is drawn in semi-transparent white near the bottom-right corner.
    It is rendered on a separate RGBA overlay and alpha-composited, because
    the alpha component of ``fill`` is ignored when drawing directly onto an
    RGB image.
    """
    with Image.open(image_path) as img:
        base = img.convert("RGBA")
    width, height = base.size

    try:
        font = ImageFont.truetype(
            "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 36
        )
    except OSError:
        # Font file missing or unreadable: fall back to Pillow's built-in font.
        font = ImageFont.load_default()
    # NOTE(review): DejaVu Sans may not contain CJK glyphs, so the default
    # watermark text could render as placeholder boxes — confirm on the server.

    overlay = Image.new("RGBA", base.size, (255, 255, 255, 0))
    draw = ImageDraw.Draw(overlay)
    bbox = draw.textbbox((0, 0), watermark_text, font=font)
    text_width = bbox[2] - bbox[0]
    text_height = bbox[3] - bbox[1]

    # Clamp so the text origin stays inside very small images.
    x = max(width - text_width - 20, 0)
    y = max(height - text_height - 20, 0)
    draw.text((x, y), watermark_text, fill=(255, 255, 255, 128), font=font)

    result = Image.alpha_composite(base, overlay)
    if _is_jpeg_path(watermarked_path):
        result = _flatten_for_jpeg(result)
    result.save(watermarked_path, quality=90)


@router.post("", summary="上传作品")
async def upload_work(
    file: UploadFile = File(..., description="作品图片文件"),
    title: str = Form(..., description="作品标题"),
    description: Optional[str] = Form(None, description="作品描述"),
    category: str = Form(..., description="作品分类"),
    tags: Optional[str] = Form(None, description="标签,逗号分隔"),
    price: float = Form(..., ge=0, description="作品价格"),
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """
    📤 Upload work API

    **Login required**: yes (Bearer Token)

    **Supported formats**: jpg, jpeg, png, gif, webp

    **File size**: up to 50MB

    **Processing steps**:
    1. Validate file type and size
    2. Save the original image
    3. Generate a thumbnail (400x400)
    4. Generate a watermarked copy
    5. Create the database record
    6. Return work_id and image_url
    """
    # NOTE(review): file copying and image processing below are blocking calls
    # inside an async endpoint; consider moving them to a worker thread.

    # Validate the extension; filename may be absent on some multipart parts.
    file_ext = os.path.splitext(file.filename or "")[1].lower()
    if file_ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"不支持的文件格式,仅支持:{', '.join(sorted(ALLOWED_EXTENSIONS))}",
        )

    # Validate the size by seeking to the end of the spooled upload.
    file.file.seek(0, 2)
    file_size = file.file.tell()
    file.file.seek(0)
    if file_size > MAX_FILE_SIZE:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"文件过大,最大支持 {MAX_FILE_SIZE // 1024 // 1024}MB",
        )

    # Unique name plus a per-day directory bucket.
    unique_id = str(uuid.uuid4())
    timestamp = datetime.now().strftime("%Y%m%d")

    upload_dir = os.path.join(UPLOAD_BASE_DIR, "original", timestamp)
    thumb_dir = os.path.join(UPLOAD_BASE_DIR, "thumbnail", timestamp)
    watermarked_dir = os.path.join(UPLOAD_BASE_DIR, "watermarked", timestamp)
    for directory in (upload_dir, thumb_dir, watermarked_dir):
        os.makedirs(directory, exist_ok=True)

    original_filename = f"{unique_id}{file_ext}"
    original_path = os.path.join(upload_dir, original_filename)
    thumb_filename = f"{unique_id}_thumb.jpg"
    thumb_path = os.path.join(thumb_dir, thumb_filename)
    watermarked_filename = f"{unique_id}_watermarked.jpg"
    watermarked_path = os.path.join(watermarked_dir, watermarked_filename)
    written_paths = [original_path, thumb_path, watermarked_path]

    try:
        # Save the original upload.
        with open(original_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # Read the dimensions first: this also rejects non-image payloads
        # before any derived files are produced.
        with Image.open(original_path) as img:
            img_width, img_height = img.size

        generate_thumbnail(original_path, thumb_path)
        add_watermark(original_path, watermarked_path)

        # Parse comma-separated tags, dropping blanks such as in "a,,b".
        tags_list = (
            [tag.strip() for tag in tags.split(",") if tag.strip()] if tags else []
        )

        work = Work(
            title=title,
            description=description or "",
            category=category,
            tags=tags_list,
            price=price,
            designer_id=current_user.id,
            original_image=f"/uploads/original/{timestamp}/{original_filename}",
            thumbnail_image=f"/uploads/thumbnail/{timestamp}/{thumb_filename}",
            watermarked_image=f"/uploads/watermarked/{timestamp}/{watermarked_filename}",
            width=img_width,
            height=img_height,
            file_size=file_size,
            status="pending",  # pending, approved, rejected
        )
        db.add(work)
        db.commit()
    except UnidentifiedImageError as e:
        # Allowed extension but the content is not a decodable image.
        _remove_files(written_paths)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="文件内容不是有效的图片",
        ) from e
    except Exception as e:
        # Undo any pending DB work and remove files written so far.
        db.rollback()
        _remove_files(written_paths)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"上传失败:{str(e)}",
        ) from e

    # From here on the record is committed, so the files must be kept even if
    # building the response fails.
    db.refresh(work)

    image_url = f"{TUHUI_DOMAIN}{work.original_image}"
    thumbnail_url = f"{TUHUI_DOMAIN}{work.thumbnail_image}"
    watermarked_url = f"{TUHUI_DOMAIN}{work.watermarked_image}"

    return {
        "success": True,
        "message": "上传成功,等待审核",
        "work_id": work.id,
        "image_url": image_url,
        "thumbnail_url": thumbnail_url,
        "watermarked_url": watermarked_url,
        "work": {
            "id": work.id,
            "title": work.title,
            "description": work.description,
            "category": work.category,
            "tags": work.tags,
            "price": work.price,
            "width": work.width,
            "height": work.height,
            "file_size": work.file_size,
            "status": work.status,
            "original_image": work.original_image,
            "thumbnail_image": work.thumbnail_image,
            "watermarked_image": work.watermarked_image,
            "created_at": work.created_at.isoformat() if work.created_at else None,
        },
    }


@router.get("/my", summary="我的上传")
def get_my_uploads(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Return one page of the current user's uploads, newest first.

    ``page`` and ``page_size`` are query-string parameters; a GET request
    normally has no form body, so they must not be declared with ``Form``.
    """
    offset = (page - 1) * page_size
    owned = db.query(Work).filter(Work.designer_id == current_user.id)

    works = (
        owned.order_by(desc(Work.created_at)).offset(offset).limit(page_size).all()
    )
    total = owned.count()

    return {
        "total": total,
        "page": page,
        "page_size": page_size,
        "items": works,
    }