Files
DP/Server/app/core/security.py

81 lines
3.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from datetime import datetime, timedelta
from typing import Optional
import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext
from sqlalchemy.orm import Session
from app.core.config import settings
from app.db import get_db
from app.models.session import UserSession
# 密码哈希配置(使用 bcrypt
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login")
def verify_password(plain_password: str, hashed_password: str) -> bool:
# 验证明文密码与哈希是否匹配
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
# 对密码进行哈希
return pwd_context.hash(password)
def create_access_token(subject: str, device_id: str, expires_delta: Optional[timedelta] = None) -> str:
# 创建 JWT 访问令牌,默认过期时间从配置读取
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=getattr(settings, "ACCESS_TOKEN_EXPIRE_MINUTES", 60)))
# 将 device_id 写入 Token Payload
to_encode = {"sub": subject, "device_id": device_id, "exp": expire}
return jwt.encode(to_encode, settings.SECRET_KEY, algorithm="HS256")
def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)) -> str:
"""
验证 JWT Token 并返回当前用户名 (sub)
增加 Session 强校验:检查数据库中 Session 是否活跃
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的认证凭据",
headers={"WWW-Authenticate": "Bearer"},
)
# 1. 解析 Token
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
username: str = payload.get("sub")
device_id: str = payload.get("device_id")
if username is None:
raise credentials_exception
except jwt.ExpiredSignatureError:
raise credentials_exception
except jwt.InvalidTokenError:
raise credentials_exception
except jwt.PyJWTError:
raise credentials_exception
# 2. Session 强校验 (如果有 device_id)
if device_id:
from app.models.user import User
user = db.query(User).filter(User.username == username).first()
if not user:
raise credentials_exception
# 查询活跃 Session
session = db.query(UserSession).filter(
UserSession.user_id == user.id,
UserSession.device_id == device_id,
UserSession.active == True
).first()
if not session:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="会话已失效或在其他设备登录",
headers={"WWW-Authenticate": "Bearer"},
)
return username