139 lines
4.0 KiB
Python
139 lines
4.0 KiB
Python
"""Postgres-backed key/value persistence for mail-api admin state.

The mail-api historically stored client_profiles / allowed_emails / drafts
as JSON files on a Docker volume. This module lets the same data live in
the shared Goodwalk postgres database so the admin dashboard at
admin.goodwalk.co.nz reads from a real database instead of a per-container
JSON file. JSON files remain as a development/local fallback and as the
seed source for the initial postgres migration.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
from typing import Any
|
|
|
|
try:
|
|
import asyncpg
|
|
except Exception: # pragma: no cover - asyncpg is optional in dev
|
|
asyncpg = None # type: ignore[assignment]
|
|
|
|
|
|
logger = logging.getLogger("mail-api.db")

# Lazily created asyncpg pool shared by every helper in this module; it stays
# None until get_pool() succeeds for the first time.
_pool: Any = None

# Set to True once the admin_kv table has been created (or found to exist),
# so the DDL is only issued once per process.
_schema_ensured = False

# Serialise pool creation and schema creation respectively, so concurrent
# first callers do not both run the one-time setup.
_pool_lock = asyncio.Lock()
_schema_lock = asyncio.Lock()
|
|
|
|
|
|
def database_url() -> str:
    """Return the Postgres DSN from ``DATABASE_URL``, or ``""`` when unset.

    The environment variable is read on every call and surrounding
    whitespace is stripped; an unset or empty variable yields ``""``.
    """
    configured = os.environ.get("DATABASE_URL", "")
    if not configured:
        return ""
    return configured.strip()
|
|
|
|
|
|
def is_enabled() -> bool:
    """Report whether Postgres persistence can be used.

    True only when ``database_url()`` is non-empty and the optional
    ``asyncpg`` import at the top of the module succeeded.
    """
    has_dsn = bool(database_url())
    has_driver = asyncpg is not None
    return has_dsn and has_driver
|
|
|
|
|
|
async def get_pool() -> Any:
    """Return the shared asyncpg pool, creating it on first use.

    Returns:
        The pool, or None when persistence is disabled (``is_enabled()`` is
        false) or when creating the pool failed. A failure is logged as a
        warning and leaves ``_pool`` unset, so a later call tries again.
    """
    global _pool
    if not is_enabled():
        return None
    if _pool is None:
        async with _pool_lock:
            # Re-check under the lock: another task may have finished
            # creating the pool while this one was waiting.
            if _pool is None:
                try:
                    _pool = await asyncpg.create_pool(
                        dsn=database_url(),
                        min_size=1,
                        max_size=4,
                        command_timeout=10,
                    )
                    logger.info("Postgres pool ready for admin_kv persistence")
                except Exception as exc:
                    logger.warning(
                        "Postgres pool init failed (%s); falling back to JSON only",
                        exc,
                    )
                    return None
    return _pool
|
|
|
|
|
|
async def _ensure_schema() -> None:
    """Create the ``admin_kv`` table if needed, at most once per process.

    Does nothing when the schema was already ensured or when no pool is
    available. Errors raised by the DDL statement propagate to the caller
    and leave the flag unset.
    """
    global _schema_ensured
    if _schema_ensured:
        return
    pool = await get_pool()
    if pool is None:
        return
    create_table_sql = """
        create table if not exists admin_kv (
            key text primary key,
            value jsonb not null,
            updated_at timestamptz not null default now()
        );
        """
    async with _schema_lock:
        # Re-check under the lock so concurrent first callers issue the DDL
        # only once.
        if not _schema_ensured:
            async with pool.acquire() as conn:
                await conn.execute(create_table_sql)
            _schema_ensured = True
|
|
|
|
|
|
async def get_kv(key: str) -> Any | None:
    """Fetch and decode the JSON value stored under *key* in ``admin_kv``.

    Args:
        key: Primary-key value of the row to read.

    Returns:
        The decoded Python value, or None when no pool is available, when
        no row exists for *key*, or when the stored payload cannot be
        decoded. The last case is logged as a warning so corrupt data is
        not silently mistaken for a missing key.
    """
    pool = await get_pool()
    if pool is None:
        return None
    await _ensure_schema()
    async with pool.acquire() as conn:
        row = await conn.fetchrow("select value from admin_kv where key = $1", key)
    if not row:
        return None
    raw = row["value"]
    # asyncpg returns jsonb as a Python str; parse to native value.
    # Containers that arrive already decoded are passed through unchanged.
    if isinstance(raw, (dict, list)):
        return raw
    if isinstance(raw, (bytes, bytearray)):
        raw = raw.decode("utf-8")
    try:
        return json.loads(raw)
    except Exception as exc:
        # Stay best-effort (callers still get None), but leave a trace.
        logger.warning(
            "admin_kv value for key %r could not be decoded as JSON (%s)",
            key,
            exc,
        )
        return None
|
|
|
|
|
|
async def set_kv(key: str, value: Any) -> bool:
    """Insert or overwrite the JSON value stored under *key*.

    Args:
        key: Primary-key value of the row to write.
        value: Any object ``json.dumps`` can serialise.

    Returns:
        False when no pool is available; True once the upsert has run.
        Serialisation and database errors propagate to the caller.
    """
    pool = await get_pool()
    if pool is None:
        return False
    await _ensure_schema()
    encoded = json.dumps(value)
    upsert_sql = """
        insert into admin_kv (key, value, updated_at)
        values ($1, $2::jsonb, now())
        on conflict (key) do update
        set value = excluded.value,
        updated_at = excluded.updated_at
        """
    async with pool.acquire() as conn:
        await conn.execute(upsert_sql, key, encoded)
    return True
|
|
|
|
|
|
async def has_any_value() -> bool:
    """Tell whether ``admin_kv`` already holds at least one row.

    Used to decide whether to seed from JSON files on first boot. Returns
    False when no pool is available.
    """
    pool = await get_pool()
    if pool is not None:
        await _ensure_schema()
        async with pool.acquire() as conn:
            first_row = await conn.fetchrow("select 1 from admin_kv limit 1")
        return first_row is not None
    return False
|