Files
tuhui.cloud/backend/app/api/upload.py.backup
2026-03-08 19:28:32 +08:00

257 lines
8.5 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import shutil
import uuid
from datetime import datetime
from typing import Optional

from fastapi import (
    APIRouter,
    Depends,
    File,
    Form,
    Header,
    HTTPException,
    Query,
    UploadFile,
    status,
)
from PIL import Image
from sqlalchemy.orm import Session

from app.core.database import get_db
from app.core.security import decode_access_token
from app.models.user import User
from app.models.work import Work
# Router that groups every endpoint in this module under the /upload prefix.
router = APIRouter(prefix="/upload", tags=["上传"])

# --- Upload configuration ---------------------------------------------------
# Base directory on disk; the upload handler joins sub-directories onto it.
UPLOAD_BASE_DIR = "/var/www/tuhui_uploads"

# File extensions (lower-case, with the leading dot) accepted for upload.
ALLOWED_EXTENSIONS = {
    ".jpg",
    ".jpeg",
    ".png",
    ".gif",
    ".webp",
}

# Largest accepted upload, in bytes (50 MB).
MAX_FILE_SIZE = 50 * 1024 ** 2

# Site origin that is prepended to stored image paths to build full URLs.
TUHUI_DOMAIN = "https://tuhui.cloud"
def get_current_user(authorization: str = Header(None), db: Session = Depends(get_db)):
    """Resolve the logged-in user from the ``Authorization`` request header.

    Args:
        authorization: Raw ``Authorization`` header value. It must start with
            the ``"Bearer "`` scheme prefix.
        db: Database session injected through ``get_db``.

    Returns:
        The ``User`` row whose ``id`` equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the header is missing or is not a Bearer
            header, when ``decode_access_token`` returns a falsy payload,
            when the ``sub`` claim is missing or not an integer, or when no
            matching user exists.
    """
    bearer_prefix = "Bearer "
    if not authorization or not authorization.startswith(bearer_prefix):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="未登录,请先登录"
        )

    # Strip only the leading scheme; str.replace would also remove any later
    # occurrence of the prefix inside the token itself.
    token = authorization[len(bearer_prefix):]
    payload = decode_access_token(token)
    if not payload:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token 无效或已过期"
        )

    # A token without a usable ``sub`` claim is an invalid token, not a server
    # error: int(None) / int("abc") must map to 401 rather than escape as 500.
    try:
        user_id = int(payload.get("sub"))
    except (TypeError, ValueError):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token 无效或已过期"
        ) from None

    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户不存在"
        )
    return user
def generate_thumbnail(image_path: str, thumb_path: str, size=(400, 400)):
    """Write a downscaled copy of an image to ``thumb_path``.

    The image is shrunk in place with ``Image.thumbnail`` so that it fits
    inside ``size`` and then saved with ``quality=85``.

    Args:
        image_path: Path of the source image.
        thumb_path: Destination path; its extension selects the output format.
        size: Maximum ``(width, height)`` of the thumbnail.

    Raises:
        OSError: If the source cannot be opened/decoded or the destination
            cannot be written.
    """
    # Modes that are handed to the JPEG writer unchanged; every other mode is
    # converted to RGB first.
    jpeg_passthrough_modes = ("L", "RGB", "CMYK")
    target_is_jpeg = os.path.splitext(thumb_path)[1].lower() in (".jpg", ".jpeg")

    with Image.open(image_path) as img:
        img.thumbnail(size, Image.Resampling.LANCZOS)
        thumb = img
        # JPEG has neither an alpha channel nor a palette, so saving e.g. an
        # RGBA PNG or a palette GIF to a .jpg path fails unless it is
        # converted to RGB beforehand.
        if target_is_jpeg and img.mode not in jpeg_passthrough_modes:
            thumb = img.convert("RGB")
        thumb.save(thumb_path, quality=85)
def add_watermark(image_path: str, watermarked_path: str, watermark_text: str = "图绘"):
    """Save a copy of an image with a semi-transparent text watermark.

    The text is drawn in white at 50% opacity, 20 px in from the bottom-right
    corner, and the result is saved with ``quality=90``.

    Args:
        image_path: Path of the source image.
        watermarked_path: Destination path; its extension selects the output
            format.
        watermark_text: Text to stamp onto the image.

    Raises:
        OSError: If the source cannot be opened/decoded or the destination
            cannot be written.
    """
    from PIL import ImageDraw, ImageFont

    margin = 20

    # Fall back to Pillow's built-in font when the TrueType file is missing
    # (OSError) or FreeType support is unavailable (ImportError).
    # NOTE(review): DejaVu Sans presumably has no CJK glyphs, so the default
    # text may render as placeholder boxes - confirm and pick a CJK font.
    try:
        font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 36)
    except (OSError, ImportError):
        font = ImageFont.load_default()

    # convert() returns an independent, fully loaded image, so the source file
    # handle can be released as soon as the with-block ends.
    with Image.open(image_path) as img:
        base = img.convert("RGBA")

    # Draw on a transparent overlay and alpha-composite it onto the image.
    # Drawing straight onto the image writes the fill colour without blending,
    # which would make the alpha value of 128 have no visible effect.
    overlay = Image.new("RGBA", base.size, (255, 255, 255, 0))
    draw = ImageDraw.Draw(overlay)

    bbox = draw.textbbox((0, 0), watermark_text, font=font)
    text_width = bbox[2] - bbox[0]
    text_height = bbox[3] - bbox[1]
    # Clamp to 0 so the text stays on-canvas for images smaller than the text.
    x = max(base.width - text_width - margin, 0)
    y = max(base.height - text_height - margin, 0)
    draw.text((x, y), watermark_text, fill=(255, 255, 255, 128), font=font)

    result = Image.alpha_composite(base, overlay)
    # JPEG cannot store an alpha channel; drop it before saving to .jpg/.jpeg.
    if os.path.splitext(watermarked_path)[1].lower() in (".jpg", ".jpeg"):
        result = result.convert("RGB")
    result.save(watermarked_path, quality=90)
@router.post("", summary="上传作品")
async def upload_work(
    file: UploadFile = File(..., description="作品图片文件"),
    title: str = Form(..., description="作品标题"),
    description: Optional[str] = Form(None, description="作品描述"),
    category: str = Form(..., description="作品分类"),
    tags: Optional[str] = Form(None, description="标签,逗号分隔"),
    price: float = Form(..., ge=0, description="作品价格"),
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """
    📤 Upload work API

    **Login required**: Bearer Token

    **Supported formats**: jpg, jpeg, png, gif, webp

    **File size**: at most 50MB

    **Processing steps**

    1. Validate the file type and size
    2. Save the original image
    3. Generate a thumbnail (400x400)
    4. Generate a watermarked image
    5. Create the database record
    6. Return work_id and image_url

    Raises:
        HTTPException: 400 for an unsupported extension or an oversized file;
            500 when storing/processing the image or committing the record
            fails (files written so far are removed).
    """
    # NOTE(review): the file and image operations below are blocking calls
    # inside an ``async def`` handler - consider a thread pool if this matters.

    # Validate the extension. ``filename`` can be None for some multipart
    # requests, which must be rejected as a bad request rather than crash.
    file_ext = os.path.splitext(file.filename or "")[1].lower()
    if file_ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            # sorted() keeps the message stable; set order is arbitrary.
            detail=f"不支持的文件格式,仅支持:{', '.join(sorted(ALLOWED_EXTENSIONS))}"
        )

    # Validate the size by seeking to the end of the underlying file object,
    # then rewind so the content can be copied afterwards.
    file.file.seek(0, 2)
    file_size = file.file.tell()
    file.file.seek(0)
    if file_size > MAX_FILE_SIZE:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"文件过大,最大支持 {MAX_FILE_SIZE // 1024 // 1024}MB"
        )

    # Unique base name plus a per-day directory component.
    unique_id = str(uuid.uuid4())
    timestamp = datetime.now().strftime("%Y%m%d")

    # One dated directory per image variant.
    upload_dir = os.path.join(UPLOAD_BASE_DIR, "original", timestamp)
    thumb_dir = os.path.join(UPLOAD_BASE_DIR, "thumbnail", timestamp)
    watermarked_dir = os.path.join(UPLOAD_BASE_DIR, "watermarked", timestamp)
    for directory in (upload_dir, thumb_dir, watermarked_dir):
        os.makedirs(directory, exist_ok=True)

    # Destination paths; thumbnail and watermark are always stored as .jpg.
    original_filename = f"{unique_id}{file_ext}"
    original_path = os.path.join(upload_dir, original_filename)
    thumb_filename = f"{unique_id}_thumb.jpg"
    thumb_path = os.path.join(thumb_dir, thumb_filename)
    watermarked_filename = f"{unique_id}_watermarked.jpg"
    watermarked_path = os.path.join(watermarked_dir, watermarked_filename)

    # Parse the comma-separated tags, dropping blanks such as in "a,,b" or "a,".
    tags_list = [tag.strip() for tag in tags.split(",") if tag.strip()] if tags else []

    try:
        # Save the original upload.
        with open(original_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # Derive the thumbnail and the watermarked copy.
        generate_thumbnail(original_path, thumb_path)
        add_watermark(original_path, watermarked_path)

        # Read the pixel dimensions of the original.
        with Image.open(original_path) as img:
            img_width, img_height = img.size

        # Create the database record.
        work = Work(
            title=title,
            description=description or "",
            category=category,
            tags=tags_list,
            price=price,
            designer_id=current_user.id,
            original_image=f"/uploads/original/{timestamp}/{original_filename}",
            thumbnail_image=f"/uploads/thumbnail/{timestamp}/{thumb_filename}",
            watermarked_image=f"/uploads/watermarked/{timestamp}/{watermarked_filename}",
            width=img_width,
            height=img_height,
            file_size=file_size,
            status="pending"  # pending, approved, rejected
        )
        db.add(work)
        db.commit()
    except Exception as e:
        # Remove whatever was written. Cleanup is best-effort: a path that was
        # never created or cannot be removed must not mask the original error.
        for path in (original_path, thumb_path, watermarked_path):
            try:
                os.remove(path)
            except OSError:
                pass
        # Leave the session usable after a failed flush/commit.
        db.rollback()
        # NOTE(review): ``detail`` exposes the internal error text to clients.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"上传失败:{str(e)}"
        ) from e

    # From here on the record is committed, so the files must be kept; that is
    # why the refresh and response building sit outside the cleanup handler.
    db.refresh(work)

    # Build absolute image URLs.
    image_url = f"{TUHUI_DOMAIN}{work.original_image}"
    thumbnail_url = f"{TUHUI_DOMAIN}{work.thumbnail_image}"
    watermarked_url = f"{TUHUI_DOMAIN}{work.watermarked_image}"

    # Response (includes work_id and image_url).
    return {
        "success": True,
        "message": "上传成功,等待审核",
        "work_id": work.id,
        "image_url": image_url,
        "thumbnail_url": thumbnail_url,
        "watermarked_url": watermarked_url,
        "work": {
            "id": work.id,
            "title": work.title,
            "description": work.description,
            "category": work.category,
            "tags": work.tags,
            "price": work.price,
            "width": work.width,
            "height": work.height,
            "file_size": work.file_size,
            "status": work.status,
            "original_image": work.original_image,
            "thumbnail_image": work.thumbnail_image,
            "watermarked_image": work.watermarked_image,
            "created_at": work.created_at.isoformat() if work.created_at else None
        }
    }
@router.get("/my", summary="我的上传")
def get_my_uploads(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Return one page of the current user's uploads, newest first.

    Args:
        page: 1-based page number, read from the query string.
        page_size: Number of items per page (1-100), read from the query string.
        current_user: Authenticated user resolved by ``get_current_user``.
        db: Database session injected through ``get_db``.

    Returns:
        A dict with ``total`` (count of all of the user's works), the
        requested ``page`` and ``page_size``, and ``items`` (the ``Work`` rows
        on that page, ordered by ``created_at`` descending).
    """
    from sqlalchemy import desc

    # Pagination parameters are declared with Query, not Form: a GET request
    # carries them in the URL, and Form would look for them in a request body.

    # One filtered query feeds both the count and the page of rows.
    own_works = db.query(Work).filter(Work.designer_id == current_user.id)
    total = own_works.count()

    offset = (page - 1) * page_size
    works = (
        own_works.order_by(desc(Work.created_at))
        .offset(offset)
        .limit(page_size)
        .all()
    )

    return {
        "total": total,
        "page": page,
        "page_size": page_size,
        "items": works
    }