Files
ponzischeme89 6d44e05de4 v1
2026-04-18 07:23:55 +12:00

141 lines
4.8 KiB
Python

"""
Audit router.
Admin:
GET /admin/audit — paginated, filtered audit log (admin-authenticated)
Member:
POST /members/audit/page-visit — record a page navigation (member-authenticated)
"""
import math
import uuid
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response
from sqlalchemy import func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.auth.deps import get_current_user
from app.auth.member_deps import get_authenticated_member
from app.database import get_db
from app.middleware.rate_limit import limiter
from app.models.audit import AuditLog
from app.models.member import Member
from app.models.user import User
from app.schemas.audit import AuditLogPage, AuditLogResponse, PageVisitSchema
from app.services.audit import log_audit
from app.services.settings import get_feature_settings_snapshot
# Router that collects the audit endpoints declared in this module; every
# route registered on it carries the "Audit" tag.
router = APIRouter(tags=["Audit"])
async def _require_audit_history_enabled(db: AsyncSession) -> None:
    """Abort the request with a 404 unless audit history is switched on.

    Loads the current feature-settings snapshot through ``db`` and checks its
    ``audit_history_enabled`` flag.

    Raises:
        HTTPException: status 404 when the flag is falsy.
    """
    snapshot = await get_feature_settings_snapshot(db)
    if snapshot.audit_history_enabled:
        return
    raise HTTPException(
        status_code=404,
        detail="Audit history is currently disabled.",
    )
# ── Admin: query audit log ─────────────────────────────────────────────────────
@router.get("/admin/audit", response_model=AuditLogPage)
async def admin_list_audit(
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=200),
    member_id: Optional[uuid.UUID] = Query(None),
    action_type: Optional[str] = Query(None),
    status: Optional[str] = Query(None),
    area: Optional[str] = Query(None),
    date_from: Optional[datetime] = Query(None),
    date_to: Optional[datetime] = Query(None),
    search: Optional[str] = Query(None),
    sort_by: str = Query("timestamp"),
    sort_dir: str = Query("desc"),
    _admin: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return one page of audit log entries matching the supplied filters.

    All filters are optional and are combined with AND.

    Args:
        page: 1-based page number.
        page_size: Rows per page (1-200).
        member_id: Exact match on ``AuditLog.member_id``.
        action_type: Exact match on ``AuditLog.action_type``.
        status: Exact match on ``AuditLog.status``.
        area: Case-insensitive substring match on ``AuditLog.area``.
        date_from: Inclusive lower bound on ``AuditLog.timestamp``.
        date_to: Inclusive upper bound on ``AuditLog.timestamp``.
        search: Case-insensitive substring matched against member email,
            description, area, action type and error message (any of them).
        sort_by: Column to sort on. Values outside the allow-list fall back
            to ``"timestamp"``.
        sort_dir: ``"desc"`` (case-insensitive) sorts descending; any other
            value sorts ascending.
        _admin: Resolved by ``get_current_user``; not used in the body, it
            only gates access to the endpoint.
        db: Database session used for the settings check and both queries.

    Returns:
        AuditLogPage with the page's items, the total number of matching
        rows, the echoed ``page``/``page_size`` and ``total_pages`` (never
        less than 1, even when there are no rows).

    Raises:
        HTTPException: 404 when audit history is disabled (raised by
            ``_require_audit_history_enabled``).
    """
    await _require_audit_history_enabled(db)

    # Only allow-listed names reach getattr(), so a caller can never sort on
    # (or probe for) an arbitrary model attribute.
    allowed_sort = {"timestamp", "action_type", "status", "area", "member_email"}
    if sort_by not in allowed_sort:
        sort_by = "timestamp"
    col = getattr(AuditLog, sort_by)

    # Accept "DESC"/"Desc" as well; previously these silently sorted ascending.
    descending = sort_dir.lower() == "desc"
    ordering = [col.desc() if descending else col.asc()]
    if sort_by != "timestamp":
        # OFFSET/LIMIT paging over a low-cardinality column (e.g. status) has
        # no defined order among equal values, so rows could repeat or vanish
        # between pages. Break ties on timestamp, newest first.
        # NOTE(review): timestamp may not be unique either; consider adding
        # the primary key as a final sort key.
        ordering.append(AuditLog.timestamp.desc())

    conditions = []
    if member_id is not None:
        conditions.append(AuditLog.member_id == member_id)
    if action_type:
        conditions.append(AuditLog.action_type == action_type)
    if status:
        conditions.append(AuditLog.status == status)
    # NOTE(review): "%" and "_" inside `area` / `search` are passed to ILIKE
    # unescaped and therefore behave as wildcards.
    if area:
        conditions.append(AuditLog.area.ilike(f"%{area}%"))
    if date_from:
        conditions.append(AuditLog.timestamp >= date_from)
    if date_to:
        conditions.append(AuditLog.timestamp <= date_to)
    if search:
        term = f"%{search}%"
        conditions.append(
            or_(
                AuditLog.member_email.ilike(term),
                AuditLog.description.ilike(term),
                AuditLog.area.ilike(term),
                AuditLog.action_type.ilike(term),
                AuditLog.error_message.ilike(term),
            )
        )

    base_q = select(AuditLog)
    if conditions:
        # Multiple criteria passed to where() are joined with AND.
        base_q = base_q.where(*conditions)

    # Count over the filtered (unordered, unpaginated) query.
    count_result = await db.execute(
        select(func.count()).select_from(base_q.subquery())
    )
    total = count_result.scalar_one()

    offset = (page - 1) * page_size
    items_result = await db.execute(
        base_q.order_by(*ordering).offset(offset).limit(page_size)
    )
    items = items_result.scalars().all()

    return AuditLogPage(
        items=[AuditLogResponse.model_validate(i) for i in items],
        total=total,
        page=page,
        page_size=page_size,
        # Report at least one page so clients never see "page 1 of 0".
        total_pages=max(1, math.ceil(total / page_size)),
    )
# ── Member: page visit ─────────────────────────────────────────────────────────
@router.post("/members/audit/page-visit", status_code=204)
@limiter.limit("120/minute")
async def member_log_page_visit(
    request: Request,
    response: Response,
    data: PageVisitSchema,
    member: Member = Depends(get_authenticated_member),
    db: AsyncSession = Depends(get_db),
):
    """Record a page navigation by the authenticated member in the audit log.

    When audit history is disabled in the feature settings, nothing is
    written and the handler simply returns, so the client still receives the
    route's 204 response.

    Args:
        request: Incoming request; supplies the client address and the
            ``User-Agent`` header stored with the entry.
        response: Not used in the body.
            NOTE(review): presumably required by the rate limiter - confirm.
        data: Payload carrying the visited ``path`` and an optional ``title``.
        member: Member resolved by ``get_authenticated_member``.
        db: Database session used for the settings lookup and the audit write.
    """
    snapshot = await get_feature_settings_snapshot(db)
    if not snapshot.audit_history_enabled:
        return None

    # Cap the stored path at 255 characters; a missing/empty path is recorded
    # as "unknown".
    if data.path:
        visited_path = data.path[:255]
    else:
        visited_path = "unknown"
    page_title = data.title or visited_path

    client = request.client
    client_ip = client.host if client else None
    agent = request.headers.get("User-Agent")

    await log_audit(
        db,
        member_id=member.id,
        member_email=member.email,
        action_type="page_visit",
        area=visited_path,
        description=f"Visited: {page_title}",
        status="success",
        ip_address=client_ip,
        user_agent=agent,
    )