Files
ponzischeme89 6d44e05de4 v1
2026-04-18 07:23:55 +12:00

85 lines
2.3 KiB
Python

"""
Explicit JWT creation and verification.
No ORM magic — all logic is auditable here.
"""
import hashlib
import secrets
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional
from jose import JWTError, jwt
from app.config import settings
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""
Create a signed JWT access token.
Args:
data: Payload data to encode (must include 'sub' key).
expires_delta: Token lifetime. Defaults to ACCESS_TOKEN_EXPIRE_MINUTES.
Returns:
Encoded JWT string.
"""
to_encode = data.copy()
if expires_delta is not None:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode.update({"exp": expire, "iat": datetime.now(timezone.utc)})
encoded_jwt = jwt.encode(
to_encode,
settings.SECRET_KEY,
algorithm=settings.ALGORITHM,
)
return encoded_jwt
def verify_access_token(token: str) -> dict:
"""
Verify and decode a JWT access token.
Args:
token: Encoded JWT string.
Returns:
Decoded payload dict.
Raises:
JWTError: If the token is invalid or expired.
"""
payload = jwt.decode(
token,
settings.SECRET_KEY,
algorithms=[settings.ALGORITHM],
)
return payload
def hash_refresh_token(plaintext: str) -> str:
"""SHA-256 hash a refresh token for storage. Fast is fine — it's already a random secret."""
return hashlib.sha256(plaintext.encode("utf-8")).hexdigest()
def create_refresh_token() -> tuple[str, str]:
"""
Generate a cryptographically secure refresh token.
Returns:
Tuple of (plaintext_token, hashed_token).
Store only the hash; send the plaintext to the client.
"""
plaintext = secrets.token_urlsafe(64)
return plaintext, hash_refresh_token(plaintext)
def get_token_expiry(days: Optional[int] = None) -> datetime:
"""Return a UTC datetime for token expiry."""
expire_days = days if days is not None else settings.REFRESH_TOKEN_EXPIRE_DAYS
return datetime.now(timezone.utc) + timedelta(days=expire_days)