"""Postgres-backed key/value persistence for mail-api admin state. The mail-api historically stored client_profiles / allowed_emails / drafts as JSON files on a Docker volume. This module lets the same data live in the shared Goodwalk postgres database so the admin dashboard at admin.goodwalk.co.nz reads from a real database instead of a per-container JSON file. JSON files remain as a development/local fallback and as the seed source for the initial postgres migration. """ from __future__ import annotations import asyncio import json import logging import os from typing import Any try: import asyncpg except Exception: # pragma: no cover - asyncpg is optional in dev asyncpg = None # type: ignore[assignment] logger = logging.getLogger("mail-api.db") _pool: Any = None _pool_lock = asyncio.Lock() _schema_lock = asyncio.Lock() _schema_ensured = False def database_url() -> str: return (os.environ.get("DATABASE_URL", "") or "").strip() def is_enabled() -> bool: return bool(database_url()) and asyncpg is not None async def get_pool() -> Any: """Return a lazily-initialised asyncpg pool, or None when DB is disabled.""" global _pool if not is_enabled(): return None if _pool is not None: return _pool async with _pool_lock: if _pool is None: try: _pool = await asyncpg.create_pool( dsn=database_url(), min_size=1, max_size=4, command_timeout=10, ) logger.info("Postgres pool ready for admin_kv persistence") except Exception as exc: logger.warning("Postgres pool init failed (%s); falling back to JSON only", exc) return None return _pool async def _ensure_schema() -> None: global _schema_ensured if _schema_ensured: return pool = await get_pool() if pool is None: return async with _schema_lock: if _schema_ensured: return async with pool.acquire() as conn: await conn.execute( """ create table if not exists admin_kv ( key text primary key, value jsonb not null, updated_at timestamptz not null default now() ); create table if not exists events ( id bigserial primary key, created_at timestamptz not null default now(), request_id text, event_type text not null, actor_email text, ip text, status text, detail jsonb ); create index if not exists events_created_at_idx on events (created_at desc); create index if not exists events_event_type_idx on events (event_type); create index if not exists events_actor_email_idx on events (actor_email); create table if not exists submissions ( id bigserial primary key, created_at timestamptz not null default now(), request_id text, kind text not null, email text not null, full_name text, phone text, ip text, payload jsonb not null ); create index if not exists submissions_created_at_idx on submissions (created_at desc); create index if not exists submissions_email_idx on submissions (email); create index if not exists submissions_kind_idx on submissions (kind); """ ) _schema_ensured = True async def record_event( *, event_type: str, request_id: str | None = None, actor_email: str | None = None, ip: str | None = None, status: str | None = None, detail: dict | None = None, ) -> None: """Append a single business event to the events table. Best-effort: failures are logged and swallowed so they never block request handling.""" try: pool = await get_pool() if pool is None: return await _ensure_schema() payload = json.dumps(detail or {}) async with pool.acquire() as conn: await conn.execute( """ insert into events (request_id, event_type, actor_email, ip, status, detail) values ($1, $2, $3, $4, $5, $6::jsonb) """, request_id, event_type, actor_email, ip, status, payload, ) except Exception as exc: logger.warning("record_event(%s) failed: %s", event_type, exc) async def record_submission( *, kind: str, email: str, full_name: str | None, phone: str | None, ip: str | None, request_id: str | None, payload: dict, ) -> None: """Persist a contact-form submission (booking / onboarding / contract).""" try: pool = await get_pool() if pool is None: return await _ensure_schema() async with pool.acquire() as conn: await conn.execute( """ insert into submissions (request_id, kind, email, full_name, phone, ip, payload) values ($1, $2, $3, $4, $5, $6, $7::jsonb) """, request_id, kind, email, full_name, phone, ip, json.dumps(payload), ) except Exception as exc: logger.warning("record_submission(%s) failed: %s", kind, exc) async def list_events( *, limit: int = 100, before_id: int | None = None, event_type: str | None = None, actor_email: str | None = None, ) -> list[dict]: pool = await get_pool() if pool is None: return [] await _ensure_schema() clauses: list[str] = [] params: list[Any] = [] if before_id is not None: params.append(before_id) clauses.append(f"id < ${len(params)}") if event_type: params.append(event_type) clauses.append(f"event_type = ${len(params)}") if actor_email: params.append(actor_email.strip().lower()) clauses.append(f"actor_email = ${len(params)}") where = ("where " + " and ".join(clauses)) if clauses else "" params.append(max(1, min(500, limit))) sql = ( f"select id, created_at, request_id, event_type, actor_email, ip, status, detail " f"from events {where} order by id desc limit ${len(params)}" ) async with pool.acquire() as conn: rows = await conn.fetch(sql, *params) return [ { "id": r["id"], "createdAt": r["created_at"].isoformat() if r["created_at"] else None, "requestId": r["request_id"], "eventType": r["event_type"], "actorEmail": r["actor_email"], "ip": r["ip"], "status": r["status"], "detail": (json.loads(r["detail"]) if isinstance(r["detail"], (str, bytes, bytearray)) else r["detail"]) or {}, } for r in rows ] async def list_submissions( *, limit: int = 100, before_id: int | None = None, kind: str | None = None, email: str | None = None, ) -> list[dict]: pool = await get_pool() if pool is None: return [] await _ensure_schema() clauses: list[str] = [] params: list[Any] = [] if before_id is not None: params.append(before_id) clauses.append(f"id < ${len(params)}") if kind: params.append(kind) clauses.append(f"kind = ${len(params)}") if email: params.append(email.strip().lower()) clauses.append(f"email = ${len(params)}") where = ("where " + " and ".join(clauses)) if clauses else "" params.append(max(1, min(500, limit))) sql = ( f"select id, created_at, request_id, kind, email, full_name, phone, ip, payload " f"from submissions {where} order by id desc limit ${len(params)}" ) async with pool.acquire() as conn: rows = await conn.fetch(sql, *params) return [ { "id": r["id"], "createdAt": r["created_at"].isoformat() if r["created_at"] else None, "requestId": r["request_id"], "kind": r["kind"], "email": r["email"], "fullName": r["full_name"], "phone": r["phone"], "ip": r["ip"], "payload": (json.loads(r["payload"]) if isinstance(r["payload"], (str, bytes, bytearray)) else r["payload"]) or {}, } for r in rows ] async def get_kv(key: str) -> Any | None: pool = await get_pool() if pool is None: return None await _ensure_schema() async with pool.acquire() as conn: row = await conn.fetchrow("select value from admin_kv where key = $1", key) if not row: return None raw = row["value"] # asyncpg returns jsonb as a Python str; parse to native value. if isinstance(raw, (dict, list)): return raw if isinstance(raw, (bytes, bytearray)): raw = raw.decode("utf-8") try: return json.loads(raw) except Exception: return None async def set_kv(key: str, value: Any) -> bool: pool = await get_pool() if pool is None: return False await _ensure_schema() payload = json.dumps(value) async with pool.acquire() as conn: await conn.execute( """ insert into admin_kv (key, value, updated_at) values ($1, $2::jsonb, now()) on conflict (key) do update set value = excluded.value, updated_at = excluded.updated_at """, key, payload, ) return True async def has_any_value() -> bool: """Return True if admin_kv already has any rows. Used to decide whether to seed from JSON files on first boot.""" pool = await get_pool() if pool is None: return False await _ensure_schema() async with pool.acquire() as conn: row = await conn.fetchrow("select 1 from admin_kv limit 1") return row is not None