"""Postgres-backed key/value persistence for mail-api admin state. The mail-api historically stored client_profiles / allowed_emails / drafts as JSON files on a Docker volume. This module lets the same data live in the shared Goodwalk postgres database so the admin dashboard at admin.goodwalk.co.nz reads from a real database instead of a per-container JSON file. JSON files remain as a development/local fallback and as the seed source for the initial postgres migration. """ from __future__ import annotations import asyncio import json import logging import os from typing import Any try: import asyncpg except Exception: # pragma: no cover - asyncpg is optional in dev asyncpg = None # type: ignore[assignment] logger = logging.getLogger("mail-api.db") _pool: Any = None _pool_lock = asyncio.Lock() _schema_lock = asyncio.Lock() _schema_ensured = False def database_url() -> str: return (os.environ.get("DATABASE_URL", "") or "").strip() def is_enabled() -> bool: return bool(database_url()) and asyncpg is not None async def get_pool() -> Any: """Return a lazily-initialised asyncpg pool, or None when DB is disabled.""" global _pool if not is_enabled(): return None if _pool is not None: return _pool async with _pool_lock: if _pool is None: try: _pool = await asyncpg.create_pool( dsn=database_url(), min_size=1, max_size=4, command_timeout=10, ) logger.info("Postgres pool ready for admin_kv persistence") except Exception as exc: logger.warning("Postgres pool init failed (%s); falling back to JSON only", exc) return None return _pool async def _ensure_schema() -> None: global _schema_ensured if _schema_ensured: return pool = await get_pool() if pool is None: return async with _schema_lock: if _schema_ensured: return async with pool.acquire() as conn: await conn.execute( """ create table if not exists admin_kv ( key text primary key, value jsonb not null, updated_at timestamptz not null default now() ); """ ) _schema_ensured = True async def get_kv(key: str) -> Any | None: pool = await get_pool() if pool is None: return None await _ensure_schema() async with pool.acquire() as conn: row = await conn.fetchrow("select value from admin_kv where key = $1", key) if not row: return None raw = row["value"] # asyncpg returns jsonb as a Python str; parse to native value. if isinstance(raw, (dict, list)): return raw if isinstance(raw, (bytes, bytearray)): raw = raw.decode("utf-8") try: return json.loads(raw) except Exception: return None async def set_kv(key: str, value: Any) -> bool: pool = await get_pool() if pool is None: return False await _ensure_schema() payload = json.dumps(value) async with pool.acquire() as conn: await conn.execute( """ insert into admin_kv (key, value, updated_at) values ($1, $2::jsonb, now()) on conflict (key) do update set value = excluded.value, updated_at = excluded.updated_at """, key, payload, ) return True async def has_any_value() -> bool: """Return True if admin_kv already has any rows. Used to decide whether to seed from JSON files on first boot.""" pool = await get_pool() if pool is None: return False await _ensure_schema() async with pool.acquire() as conn: row = await conn.fetchrow("select 1 from admin_kv limit 1") return row is not None