# -*- coding: utf-8 -*-
"""Qiniu Cloud object storage utilities."""

import base64
import hashlib
from datetime import datetime
from typing import List, Optional, Tuple

from qiniu import Auth, put_data, BucketManager

from app.core.config import settings

# ==================== Configuration ====================
# Read from settings (loaded from .env via pydantic-settings).
QINIU_ACCESS_KEY = settings.QINIU_ACCESS_KEY
QINIU_SECRET_KEY = settings.QINIU_SECRET_KEY
QINIU_BUCKET = settings.QINIU_BUCKET
QINIU_DOMAIN = settings.QINIU_DOMAIN  # e.g. http://cdn.example.com

# Qiniu storage is enabled only when all four settings are non-empty.
QINIU_ENABLED = bool(
    QINIU_ACCESS_KEY and QINIU_SECRET_KEY and QINIU_BUCKET and QINIU_DOMAIN
)


class QiniuStorage:
    """Helper for uploading, deleting and addressing files on Qiniu Cloud.

    When the Qiniu settings are incomplete the instance is created in a
    disabled state: uploads report failure, ``delete`` returns ``False`` and
    ``get_url`` returns an empty string.
    """

    def __init__(self):
        self.enabled = QINIU_ENABLED
        if self.enabled:
            self.auth = Auth(QINIU_ACCESS_KEY, QINIU_SECRET_KEY)
            self.bucket = QINIU_BUCKET
            # Strip trailing slashes so URLs can be built as "<domain>/<key>".
            self.domain = QINIU_DOMAIN.rstrip('/')
            print(f"✅ 七牛云存储已启用,Bucket: {self.bucket}")
        else:
            self.auth = None
            self.bucket = None
            self.domain = None
            print("⚠️ 七牛云存储未配置,图片将使用 base64 返回")

    def upload_base64(
        self,
        base64_data: str,
        key_prefix: str = "plt",
        user_id: Optional[int] = None
    ) -> Tuple[bool, str]:
        """Upload a base64-encoded image to Qiniu.

        Args:
            base64_data: Base64 image data, with or without a
                ``data:image/png;base64,`` style prefix.
            key_prefix: Leading path component of the object key.
            user_id: Optional user id; when given (including ``0``) the
                object is stored under a ``u<user_id>`` sub-directory.

        Returns:
            ``(success, url_or_error)``: the file URL on success, otherwise
            an error message. Exceptions are caught and reported through the
            return value rather than raised.
        """
        if not self.enabled:
            return False, "七牛云存储未启用"

        try:
            # Drop the data-URI header; split once so only the header goes.
            if ',' in base64_data:
                base64_data = base64_data.split(",", 1)[1]

            image_data = base64.b64decode(base64_data)
            if not image_data:
                # Nothing to store: refuse instead of creating a 0-byte file.
                return False, "上传失败: 图片数据为空"

            # Key layout: <prefix>/YYYY/MM/DD[/u<user_id>]/<HHMMSS>_<hash8>.png
            now = datetime.now()
            date_path = now.strftime("%Y/%m/%d")
            timestamp = now.strftime("%H%M%S")
            content_hash = hashlib.md5(image_data).hexdigest()[:8]
            file_name = f"{timestamp}_{content_hash}.png"

            # Compare against None so that user_id == 0 keeps its directory.
            if user_id is not None:
                key = f"{key_prefix}/{date_path}/u{user_id}/{file_name}"
            else:
                key = f"{key_prefix}/{date_path}/{file_name}"

            # Upload token scoped to this bucket/key; 3600 is passed as the
            # token lifetime argument.
            token = self.auth.upload_token(self.bucket, key, 3600)

            _ret, info = put_data(token, key, image_data)

            if info.status_code == 200:
                return True, f"{self.domain}/{key}"

            print(f"[七牛云] 上传失败: status={info.status_code}, error={info.error}, text={info.text_body}")
            return False, f"上传失败: {info.error}"

        except Exception as e:
            # Boundary handler: callers rely on a (False, message) result.
            print(f"[七牛云] 上传异常: {str(e)}")
            return False, f"上传异常: {str(e)}"

    def upload_batch(
        self,
        images: List[Tuple[str, str]],  # [(base64_data, name), ...]
        key_prefix: str = "plt",
        user_id: Optional[int] = None
    ) -> List[Tuple[bool, str, str]]:
        """Upload several images, falling back to base64 per item.

        Args:
            images: Iterable of ``(base64_data, name)`` pairs.
            key_prefix: Leading path component of the object keys.
            user_id: Optional user id forwarded to ``upload_base64``.

        Returns:
            A list of ``(success, url_or_base64, name)`` tuples in input
            order. On failure, or when storage is disabled, the second
            element is the original base64 data.
        """
        results = []
        success_count = 0
        fail_count = 0

        for base64_data, name in images:
            if not self.enabled:
                # Storage disabled: hand back the original base64 untouched.
                results.append((False, base64_data, name))
                continue

            success, result = self.upload_base64(base64_data, key_prefix, user_id)
            if success:
                results.append((True, result, name))
                success_count += 1
            else:
                # Upload failed: fall back to the original base64 data.
                print(f"[七牛云] 上传失败 {name}: {result}")
                results.append((False, base64_data, name))
                fail_count += 1

        # A summary is only printed when at least one upload failed.
        if fail_count > 0:
            print(f"[七牛云] 批量上传结果: 成功 {success_count}, 失败 {fail_count}")

        return results

    def delete(self, key: str) -> bool:
        """Delete the object stored under ``key``.

        Returns:
            ``True`` when Qiniu answers with status 200; ``False`` when
            storage is disabled, the request fails, or an exception occurs
            (the exception is printed, not raised).
        """
        if not self.enabled:
            return False

        try:
            bucket_manager = BucketManager(self.auth)
            _ret, info = bucket_manager.delete(self.bucket, key)
            return info.status_code == 200
        except Exception as e:
            # Best-effort delete: report the problem instead of hiding it.
            print(f"[七牛云] 删除异常: {str(e)}")
            return False

    def get_url(self, key: str) -> str:
        """Return the URL for ``key``, or "" if disabled or ``key`` is empty."""
        if not self.enabled or not key:
            return ""
        return f"{self.domain}/{key}"


# Module-level shared instance.
qiniu_storage = QiniuStorage()