Files
DP/Server/app/api/v1/logs.py

389 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
日志和历史记录API
"""
from typing import List, Optional
from datetime import datetime, timedelta
from urllib.parse import urlparse
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import desc
from app.db import get_db
from app.core.config import settings
from app.core.security import get_current_user
from app.models.logs import PltProcessRecord, UserActionLog, PltPiece
from app.models.user import User
router = APIRouter()

# Base URL of the currently bound Qiniu domain, without a trailing slash.
# ``or ""`` keeps the module importable when the setting is unset (None);
# normalize_qiniu_url treats an empty value as "do not rewrite".
CURRENT_QINIU_DOMAIN = (settings.QINIU_DOMAIN or "").rstrip("/")
# Lower-cased network location of the current domain, "" when none is configured.
CURRENT_QINIU_HOST = urlparse(CURRENT_QINIU_DOMAIN).netloc.lower() if CURRENT_QINIU_DOMAIN else ""
# Domain suffixes that identify a host as a Qiniu-served domain.
KNOWN_QINIU_HOST_MARKERS = ("qiniuio.com", "clouddn.com", "qnssl.com")
# Exact hosts of previously used Qiniu domains.
LEGACY_QINIU_HOSTS = {
    "iovip-z2.qiniuio.com",
    "t9zfnhhrn.hn-bkt.clouddn.com",
}
def normalize_qiniu_url(url: Optional[str]) -> Optional[str]:
    """Rewrite a URL on an old Qiniu domain to the currently bound domain.

    The URL is returned unchanged when it is empty, when no current domain
    is configured, when it cannot be parsed, when it lacks a scheme, host or
    path, when it already points at the current host, or when its host is
    not recognised as a Qiniu host.

    Args:
        url: Absolute URL to normalise, or ``None``.

    Returns:
        ``CURRENT_QINIU_DOMAIN`` followed by the original path and query
        string for recognised Qiniu hosts (any fragment is dropped);
        otherwise ``url`` as given.
    """
    if not url or not CURRENT_QINIU_DOMAIN:
        return url

    try:
        parsed = urlparse(url)
    except ValueError:
        # urlparse rejects malformed network locations (e.g. an unbalanced
        # "["); such values are passed through instead of failing the request.
        return url

    host = parsed.netloc.lower()
    # Relative or otherwise incomplete URLs are left untouched.
    if not parsed.scheme or not host or not parsed.path:
        return url
    if host == CURRENT_QINIU_HOST:
        return url

    # Match markers on a DNS label boundary so that a host such as
    # "evilclouddn.com" is not mistaken for a subdomain of "clouddn.com".
    is_qiniu_host = host in LEGACY_QINIU_HOSTS or any(
        host == marker or host.endswith(f".{marker}")
        for marker in KNOWN_QINIU_HOST_MARKERS
    )
    if not is_qiniu_host:
        return url

    suffix = parsed.path
    if parsed.query:
        suffix = f"{suffix}?{parsed.query}"
    return f"{CURRENT_QINIU_DOMAIN}{suffix}"
def normalize_record_piece_urls(record: Optional[PltProcessRecord]) -> Optional[PltProcessRecord]:
    """Rewrite the image URL of every piece on ``record`` before it is returned.

    Each piece's ``image_url`` is passed through ``normalize_qiniu_url`` and
    assigned back in place. A missing record, or a record without pieces,
    is returned as-is.

    Args:
        record: Record whose pieces should be normalised, or ``None``.

    Returns:
        The same ``record`` object that was passed in.
    """
    pieces = record.pieces if record else None
    if pieces:
        for item in pieces:
            item.image_url = normalize_qiniu_url(item.image_url)
    return record
# ==================== 数据模型 ====================
class ActionLogCreate(BaseModel):
    """Request payload for creating a user action log entry."""

    # Only ``action_type`` is required; every other field has a default.
    session_id: Optional[str] = None
    action_type: str
    action_params: Optional[dict] = None
    page: Optional[str] = None
    # Outcome label; entries with the value "error" are counted as errors
    # by the session-context endpoint in this module.
    result: str = "success"
    error_message: Optional[str] = None
    duration_ms: Optional[int] = None
class ActionLogResponse(BaseModel):
    """Response schema for a stored user action log entry."""

    id: int
    action_type: str
    action_params: Optional[dict]
    page: Optional[str]
    result: str
    error_message: Optional[str]
    duration_ms: Optional[int]
    created_at: datetime

    class Config:
        # Allow the schema to be populated from object attributes
        # (the endpoints return ORM instances directly).
        from_attributes = True
class PltPieceCreate(BaseModel):
    """Request payload describing one cut piece of a PLT record."""

    group_id: int
    size: str
    image_url: Optional[str] = None

    # Dimensions in pixels and centimetres, as the field suffixes indicate;
    # all default to zero.
    width_px: int = 0
    height_px: int = 0
    width_cm: float = 0
    height_cm: float = 0

    # Position fields (centimetres), also defaulting to zero.
    left_cm: float = 0
    top_cm: float = 0
    center_x_cm: float = 0
    center_y_cm: float = 0
class PltPieceResponse(BaseModel):
    """Response schema for one stored cut piece."""

    id: int
    group_id: int
    size: str
    image_url: Optional[str]

    # Dimensions in pixels and centimetres, as the field suffixes indicate.
    width_px: int
    height_px: int
    width_cm: float
    height_cm: float

    # Position fields (centimetres).
    left_cm: float
    top_cm: float
    center_x_cm: float
    center_y_cm: float

    class Config:
        # Allow the schema to be populated from object attributes.
        from_attributes = True
class PltRecordCreate(BaseModel):
    """Request payload for creating a PLT processing record."""

    # Every field is optional or has a default.
    filename: Optional[str] = None
    file_size: Optional[int] = None
    rotated_filename: Optional[str] = None
    size_labels: Optional[List[str]] = None
    dpi: int = 150
    rotation: int = 0
    total_groups: int = 0
    total_pieces: int = 0
    process_time_ms: Optional[int] = None
    status: str = "success"
    error_message: Optional[str] = None
    pieces: Optional[List[PltPieceCreate]] = None  # per-piece details
class PltRecordResponse(BaseModel):
    """Response schema for a PLT processing record (without pieces)."""

    id: int
    filename: Optional[str]
    file_size: Optional[int]
    rotated_filename: Optional[str]
    size_labels: Optional[List[str]]
    dpi: int
    rotation: int
    total_groups: int
    total_pieces: int
    process_time_ms: Optional[int]
    status: str
    created_at: datetime

    class Config:
        # Allow the schema to be populated from object attributes
        # (the endpoints return ORM instances directly).
        from_attributes = True
class PltRecordDetailResponse(PltRecordResponse):
    """Detailed PLT record response: the base record fields plus its pieces."""

    # Cut pieces belonging to the record; empty when there are none.
    pieces: List[PltPieceResponse] = []
class SessionLogsResponse(BaseModel):
    """Session log summary response, intended for the AI assistant."""

    user_id: int
    username: str
    # Page taken from the latest "page.enter" action, if any.
    current_page: Optional[str]
    recent_actions: List[ActionLogResponse]
    recent_plt_records: List[PltRecordResponse]
    # Number of actions whose result is "error" within the queried window.
    error_count: int
# ==================== API端点 ====================
@router.post("/logs/action", response_model=ActionLogResponse)
async def create_action_log(
    log_data: ActionLogCreate,
    current_username: str = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Store one action log entry for the authenticated user.

    Args:
        log_data: Fields of the log entry to create.
        current_username: Username supplied by ``get_current_user``.
        db: Database session supplied by ``get_db``.

    Returns:
        The newly created ``UserActionLog`` row, refreshed after commit.

    Raises:
        HTTPException: 404 when no user matches ``current_username``.
    """
    # NOTE(review): the handler is declared ``async`` but uses the synchronous
    # SQLAlchemy Session; confirm that blocking the event loop is acceptable.

    # Resolve the username to a user row; its id is stored on the entry.
    owner = db.query(User).filter(User.username == current_username).first()
    if not owner:
        raise HTTPException(status_code=404, detail="用户不存在")

    # Build and persist the entry.
    entry = UserActionLog(
        user_id=owner.id,
        session_id=log_data.session_id,
        action_type=log_data.action_type,
        action_params=log_data.action_params,
        page=log_data.page,
        result=log_data.result,
        error_message=log_data.error_message,
        duration_ms=log_data.duration_ms,
    )
    db.add(entry)
    db.commit()
    db.refresh(entry)
    return entry
@router.post("/plt/record", response_model=PltRecordResponse)
async def create_plt_record(
    record_data: PltRecordCreate,
    current_username: str = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Store a PLT processing record, including its piece details if given.

    Args:
        record_data: Record fields and, optionally, the list of pieces.
        current_username: Username supplied by ``get_current_user``.
        db: Database session supplied by ``get_db``.

    Returns:
        The newly created ``PltProcessRecord`` row, refreshed after commit.

    Raises:
        HTTPException: 404 when no user matches ``current_username``.
    """
    owner = db.query(User).filter(User.username == current_username).first()
    if not owner:
        raise HTTPException(status_code=404, detail="用户不存在")

    new_record = PltProcessRecord(
        user_id=owner.id,
        filename=record_data.filename,
        file_size=record_data.file_size,
        rotated_filename=record_data.rotated_filename,
        size_labels=record_data.size_labels,
        dpi=record_data.dpi,
        rotation=record_data.rotation,
        total_groups=record_data.total_groups,
        total_pieces=record_data.total_pieces,
        process_time_ms=record_data.process_time_ms,
        status=record_data.status,
        error_message=record_data.error_message,
    )
    db.add(new_record)
    # Flush so that new_record.id is available for the pieces below.
    db.flush()

    # Save the piece details; image URLs are normalised before storage.
    if record_data.pieces:
        db.add_all(
            PltPiece(
                record_id=new_record.id,
                group_id=item.group_id,
                size=item.size,
                image_url=normalize_qiniu_url(item.image_url),
                width_px=item.width_px,
                height_px=item.height_px,
                width_cm=item.width_cm,
                height_cm=item.height_cm,
                left_cm=item.left_cm,
                top_cm=item.top_cm,
                center_x_cm=item.center_x_cm,
                center_y_cm=item.center_y_cm,
            )
            for item in record_data.pieces
        )

    db.commit()
    db.refresh(new_record)
    return new_record
@router.get("/plt/history/{record_id}", response_model=PltRecordDetailResponse)
async def get_plt_record_detail(
    record_id: int,
    current_username: str = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Return one PLT processing record of the current user with its pieces.

    Args:
        record_id: Primary key of the record to fetch.
        current_username: Username supplied by ``get_current_user``.
        db: Database session supplied by ``get_db``.

    Returns:
        The matching record, with piece image URLs passed through
        ``normalize_record_piece_urls``.

    Raises:
        HTTPException: 404 when the user does not exist, or when no record
            with this id belongs to the user.
    """
    owner = db.query(User).filter(User.username == current_username).first()
    if not owner:
        raise HTTPException(status_code=404, detail="用户不存在")

    # Filtering on user_id keeps users from reading each other's records.
    found = (
        db.query(PltProcessRecord)
        .options(joinedload(PltProcessRecord.pieces))
        .filter(PltProcessRecord.id == record_id)
        .filter(PltProcessRecord.user_id == owner.id)
        .first()
    )
    if not found:
        raise HTTPException(status_code=404, detail="记录不存在")

    return normalize_record_piece_urls(found)
@router.get("/plt/history", response_model=List[PltRecordResponse])
async def get_plt_history(
    limit: int = Query(20, ge=1, le=100),
    days: int = Query(0, ge=0, le=3650),  # 0 means no time restriction
    current_username: str = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Return the current user's PLT processing history, newest first.

    Args:
        limit: Maximum number of records to return (1-100).
        days: Only include records created within this many days;
            ``0`` returns records regardless of age.
        current_username: Username supplied by ``get_current_user``.
        db: Database session supplied by ``get_db``.

    Returns:
        Up to ``limit`` records ordered by ``created_at`` descending.

    Raises:
        HTTPException: 404 when no user matches ``current_username``.
    """
    owner = db.query(User).filter(User.username == current_username).first()
    if not owner:
        raise HTTPException(status_code=404, detail="用户不存在")

    criteria = [PltProcessRecord.user_id == owner.id]
    if days > 0:
        cutoff = datetime.now() - timedelta(days=days)
        criteria.append(PltProcessRecord.created_at >= cutoff)

    return (
        db.query(PltProcessRecord)
        .filter(*criteria)
        .order_by(desc(PltProcessRecord.created_at))
        .limit(limit)
        .all()
    )
@router.get("/logs/recent", response_model=List[ActionLogResponse])
async def get_recent_logs(
    limit: int = Query(50, ge=1, le=200),
    action_type: Optional[str] = None,
    current_username: str = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Return the current user's most recent action logs, newest first.

    Args:
        limit: Maximum number of log entries to return (1-200).
        action_type: Optional prefix; when given, only entries whose
            ``action_type`` starts with this literal text are returned.
        current_username: Username supplied by ``get_current_user``.
        db: Database session supplied by ``get_db``.

    Returns:
        Up to ``limit`` entries ordered by ``created_at`` descending.

    Raises:
        HTTPException: 404 when no user matches ``current_username``.
    """
    user = db.query(User).filter(User.username == current_username).first()
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")

    query = db.query(UserActionLog).filter(UserActionLog.user_id == user.id)
    if action_type:
        # autoescape=True escapes "%" and "_" in the caller-supplied prefix so
        # they match literally instead of acting as LIKE wildcards.
        query = query.filter(
            UserActionLog.action_type.startswith(action_type, autoescape=True)
        )

    return query.order_by(desc(UserActionLog.created_at)).limit(limit).all()
@router.get("/logs/session", response_model=SessionLogsResponse)
async def get_session_context(
    hours: int = Query(24, ge=1, le=168),
    current_username: str = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Collect the current user's recent activity for the AI assistant.

    Args:
        hours: Size of the look-back window in hours (1-168).
        current_username: Username supplied by ``get_current_user``.
        db: Database session supplied by ``get_db``.

    Returns:
        A ``SessionLogsResponse`` holding up to 50 recent actions and up to
        10 recent PLT records from the window, the number of actions in the
        window whose result is ``"error"``, and the page taken from the
        latest ``"page.enter"`` action (looked up without a time limit).

    Raises:
        HTTPException: 404 when no user matches ``current_username``.
    """
    # NOTE(review): the handler is declared ``async`` but runs several
    # synchronous SQLAlchemy queries; confirm that this is acceptable.
    owner = db.query(User).filter(User.username == current_username).first()
    if not owner:
        raise HTTPException(status_code=404, detail="用户不存在")

    window_start = datetime.now() - timedelta(hours=hours)

    # Criteria shared by the action-log queries limited to the window.
    actions_in_window = (
        UserActionLog.user_id == owner.id,
        UserActionLog.created_at >= window_start,
    )

    # Most recent action logs.
    actions = (
        db.query(UserActionLog)
        .filter(*actions_in_window)
        .order_by(desc(UserActionLog.created_at))
        .limit(50)
        .all()
    )

    # Most recent PLT processing records.
    plt_records = (
        db.query(PltProcessRecord)
        .filter(
            PltProcessRecord.user_id == owner.id,
            PltProcessRecord.created_at >= window_start,
        )
        .order_by(desc(PltProcessRecord.created_at))
        .limit(10)
        .all()
    )

    # Number of failed actions.
    errors = (
        db.query(UserActionLog)
        .filter(*actions_in_window)
        .filter(UserActionLog.result == "error")
        .count()
    )

    # Current page: taken from the latest page-enter action, if one exists.
    latest_enter = (
        db.query(UserActionLog)
        .filter(
            UserActionLog.user_id == owner.id,
            UserActionLog.action_type == "page.enter",
        )
        .order_by(desc(UserActionLog.created_at))
        .first()
    )
    page_name = None
    if latest_enter and latest_enter.action_params:
        page_name = latest_enter.action_params.get("page")

    return SessionLogsResponse(
        user_id=owner.id,
        username=current_username,
        current_page=page_name,
        recent_actions=actions,
        recent_plt_records=plt_records,
        error_count=errors,
    )