from datetime import datetime, timedelta, timezone
from typing import Optional

import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext
from sqlalchemy.orm import Session

from app.core.config import settings
from app.db import get_db
from app.models.session import UserSession

# Password hashing configuration (bcrypt).
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login")

# Single source of truth for the signing algorithm so that token creation and
# token verification cannot drift apart.
_JWT_ALGORITHM = "HS256"


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return True if *plain_password* matches *hashed_password*."""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """Return the hash of *password* produced by the module's CryptContext."""
    return pwd_context.hash(password)


def create_access_token(
    subject: str,
    device_id: str,
    expires_delta: Optional[timedelta] = None,
) -> str:
    """Create a signed JWT access token.

    Args:
        subject: Value stored in the ``sub`` claim.
        device_id: Value stored in the ``device_id`` claim.
        expires_delta: Token lifetime. When omitted (or falsy, which includes
            a zero ``timedelta``), the lifetime falls back to
            ``settings.ACCESS_TOKEN_EXPIRE_MINUTES``, or 60 minutes if that
            setting is not defined.

    Returns:
        The encoded token, signed with ``settings.SECRET_KEY``.
    """
    default_lifetime = timedelta(
        minutes=getattr(settings, "ACCESS_TOKEN_EXPIRE_MINUTES", 60)
    )
    # Timezone-aware "now": datetime.utcnow() is naive and deprecated.
    expire = datetime.now(timezone.utc) + (expires_delta or default_lifetime)

    # device_id travels in the payload so it can be checked on each request.
    to_encode = {"sub": subject, "device_id": device_id, "exp": expire}
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=_JWT_ALGORITHM)


def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db),
) -> str:
    """Validate a JWT and return the username stored in its ``sub`` claim.

    When the token carries a ``device_id`` claim, the database must also hold
    an active ``UserSession`` for that user and device. Tokens without a
    ``device_id`` claim skip the session check.

    Args:
        token: Bearer token supplied by ``oauth2_scheme``.
        db: Database session supplied by ``get_db``.

    Returns:
        The username from the token's ``sub`` claim.

    Raises:
        HTTPException: 401 if the token cannot be decoded, has no ``sub``
            claim, names an unknown user, or has no matching active session.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="无效的认证凭据",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # 1. Decode the token.
    try:
        payload = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[_JWT_ALGORITHM]
        )
    except jwt.PyJWTError as exc:
        # PyJWTError is the base class of ExpiredSignatureError and
        # InvalidTokenError, so one handler covers every decode failure.
        raise credentials_exception from exc

    username: Optional[str] = payload.get("sub")
    device_id: Optional[str] = payload.get("device_id")
    if username is None:
        raise credentials_exception

    # 2. Strict session check, only for tokens that carry a device_id.
    if not device_id:
        return username

    # Imported locally -- presumably to avoid a circular import; confirm
    # before moving it to module level.
    from app.models.user import User

    user = db.query(User).filter(User.username == username).first()
    if not user:
        raise credentials_exception

    # Look for an active session bound to this user and device.
    # `== True` is intentional: it builds a SQL expression, which the Python
    # `is` operator cannot do.
    session = (
        db.query(UserSession)
        .filter(
            UserSession.user_id == user.id,
            UserSession.device_id == device_id,
            UserSession.active == True,  # noqa: E712
        )
        .first()
    )
    if not session:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="会话已失效或在其他设备登录",
            headers={"WWW-Authenticate": "Bearer"},
        )

    return username