from __future__ import annotations import base64 import hashlib import hmac import json import time from typing import Any from fastapi import HTTPException, status from app.core.config import settings def _encode(data: dict[str, Any]) -> str: raw = json.dumps(data, separators=(",", ":"), sort_keys=True).encode("utf-8") return base64.urlsafe_b64encode(raw).decode("utf-8").rstrip("=") def _decode(value: str) -> dict[str, Any]: padding = "=" * (-len(value) % 4) raw = base64.urlsafe_b64decode(f"{value}{padding}".encode("utf-8")) return json.loads(raw.decode("utf-8")) def _sign(value: str) -> str: signature = hmac.new(settings.auth_secret.encode("utf-8"), value.encode("utf-8"), hashlib.sha256).digest() return base64.urlsafe_b64encode(signature).decode("utf-8").rstrip("=") def issue_token(payload: dict[str, Any], ttl_seconds: int = 60 * 60 * 12) -> str: body = {**payload, "exp": int(time.time()) + ttl_seconds} encoded = _encode(body) return f"{encoded}.{_sign(encoded)}" def verify_token(token: str) -> dict[str, Any]: try: body, signature = token.split(".", 1) except ValueError as exc: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication token") from exc expected_signature = _sign(body) if not hmac.compare_digest(signature, expected_signature): raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication token") payload = _decode(body) if int(payload.get("exp", 0)) < int(time.time()): raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication token has expired") return payload # --- Password hashing ------------------------------------------------------ # PBKDF2-SHA256 with a per-password 16-byte salt and 200k iterations. Stored # as `pbkdf2_sha256$iterations$salt_hex$hash_hex`. No external dep needed. import os import secrets _PBKDF2_ITERATIONS = 200_000 _PBKDF2_ALGO = "pbkdf2_sha256" def hash_password(password: str) -> str: if not password: raise ValueError("Password cannot be empty") salt = secrets.token_bytes(16) digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, _PBKDF2_ITERATIONS) return f"{_PBKDF2_ALGO}${_PBKDF2_ITERATIONS}${salt.hex()}${digest.hex()}" def verify_password(password: str, encoded: str | None) -> bool: if not encoded or not password: return False try: algo, iterations_str, salt_hex, digest_hex = encoded.split("$", 3) except ValueError: return False if algo != _PBKDF2_ALGO: return False try: iterations = int(iterations_str) salt = bytes.fromhex(salt_hex) expected = bytes.fromhex(digest_hex) except ValueError: return False candidate = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations) return hmac.compare_digest(candidate, expected)