v1.3 - client and admin scaffolding
This commit is contained in:
@@ -6,18 +6,28 @@ from dataclasses import dataclass
|
||||
class Settings:
|
||||
app_name: str
|
||||
database_url: str
|
||||
operator_name: str
|
||||
operator_email: str
|
||||
operator_password: str
|
||||
client_name: str
|
||||
client_email: str
|
||||
client_password: str
|
||||
client_tenant_id: str
|
||||
admin_name: str
|
||||
admin_email: str
|
||||
admin_password: str
|
||||
auth_secret: str
|
||||
|
||||
@classmethod
|
||||
def from_env(cls) -> "Settings":
|
||||
return cls(
|
||||
app_name=os.getenv("APP_NAME", "Data Entry App API"),
|
||||
database_url=os.getenv("DATABASE_URL", "sqlite:///./data_entry_app.db"),
|
||||
operator_name=os.getenv("OPERATOR_NAME", "Operations Manager"),
|
||||
operator_email=os.getenv("OPERATOR_EMAIL", "operator@example.com"),
|
||||
operator_password=os.getenv("OPERATOR_PASSWORD", "changeme"),
|
||||
client_name=os.getenv("CLIENT_NAME", "Hunter Premium Produce"),
|
||||
client_email=os.getenv("CLIENT_EMAIL", "operator@example.com"),
|
||||
client_password=os.getenv("CLIENT_PASSWORD", "changeme"),
|
||||
client_tenant_id=os.getenv("CLIENT_TENANT_ID", "hunter-premium-produce"),
|
||||
admin_name=os.getenv("ADMIN_NAME", "Lean 101"),
|
||||
admin_email=os.getenv("ADMIN_EMAIL", "admin@lean101.local"),
|
||||
admin_password=os.getenv("ADMIN_PASSWORD", "lean101-admin"),
|
||||
auth_secret=os.getenv("AUTH_SECRET", "lean-101-local-dev-secret"),
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,50 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
from fastapi import HTTPException, status
|
||||
|
||||
from app.core.config import settings
|
||||
|
||||
|
||||
def _encode(data: dict[str, Any]) -> str:
|
||||
raw = json.dumps(data, separators=(",", ":"), sort_keys=True).encode("utf-8")
|
||||
return base64.urlsafe_b64encode(raw).decode("utf-8").rstrip("=")
|
||||
|
||||
|
||||
def _decode(value: str) -> dict[str, Any]:
|
||||
padding = "=" * (-len(value) % 4)
|
||||
raw = base64.urlsafe_b64decode(f"{value}{padding}".encode("utf-8"))
|
||||
return json.loads(raw.decode("utf-8"))
|
||||
|
||||
|
||||
def _sign(value: str) -> str:
|
||||
signature = hmac.new(settings.auth_secret.encode("utf-8"), value.encode("utf-8"), hashlib.sha256).digest()
|
||||
return base64.urlsafe_b64encode(signature).decode("utf-8").rstrip("=")
|
||||
|
||||
|
||||
def issue_token(payload: dict[str, Any], ttl_seconds: int = 60 * 60 * 12) -> str:
|
||||
body = {**payload, "exp": int(time.time()) + ttl_seconds}
|
||||
encoded = _encode(body)
|
||||
return f"{encoded}.{_sign(encoded)}"
|
||||
|
||||
|
||||
def verify_token(token: str) -> dict[str, Any]:
|
||||
try:
|
||||
body, signature = token.split(".", 1)
|
||||
except ValueError as exc:
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication token") from exc
|
||||
|
||||
expected_signature = _sign(body)
|
||||
if not hmac.compare_digest(signature, expected_signature):
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication token")
|
||||
|
||||
payload = _decode(body)
|
||||
if int(payload.get("exp", 0)) < int(time.time()):
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication token has expired")
|
||||
return payload
|
||||
Reference in New Issue
Block a user