Files
gw-svelte/mail-api/db.py
T
2026-05-26 23:30:22 +12:00

367 lines
12 KiB
Python

"""Postgres-backed key/value persistence for mail-api admin state.
The mail-api historically stored client_profiles / allowed_emails / drafts
as JSON files on a Docker volume. This module lets the same data live in
the shared Goodwalk postgres database so the admin dashboard at
admin.goodwalk.co.nz reads from a real database instead of a per-container
JSON file. JSON files remain as a development/local fallback and as the
seed source for the initial postgres migration.
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
from typing import Any
try:
import asyncpg
except Exception: # pragma: no cover - asyncpg is optional in dev
asyncpg = None # type: ignore[assignment]
logger = logging.getLogger("mail-api.db")
_pool: Any = None
_pool_lock = asyncio.Lock()
_schema_lock = asyncio.Lock()
_schema_ensured = False
def database_url() -> str:
return (os.environ.get("DATABASE_URL", "") or "").strip()
def is_enabled() -> bool:
return bool(database_url()) and asyncpg is not None
async def get_pool() -> Any:
"""Return a lazily-initialised asyncpg pool, or None when DB is disabled."""
global _pool
if not is_enabled():
return None
if _pool is not None:
return _pool
async with _pool_lock:
if _pool is None:
try:
_pool = await asyncpg.create_pool(
dsn=database_url(),
min_size=1,
max_size=4,
command_timeout=10,
)
logger.info("Postgres pool ready for admin_kv persistence")
except Exception as exc:
logger.warning("Postgres pool init failed (%s); falling back to JSON only", exc)
return None
return _pool
async def _ensure_schema() -> None:
global _schema_ensured
if _schema_ensured:
return
pool = await get_pool()
if pool is None:
return
async with _schema_lock:
if _schema_ensured:
return
async with pool.acquire() as conn:
await conn.execute(
"""
create table if not exists admin_kv (
key text primary key,
value jsonb not null,
updated_at timestamptz not null default now()
);
create table if not exists events (
id bigserial primary key,
created_at timestamptz not null default now(),
request_id text,
event_type text not null,
actor_email text,
ip text,
status text,
detail jsonb
);
create index if not exists events_created_at_idx on events (created_at desc);
create index if not exists events_event_type_idx on events (event_type);
create index if not exists events_actor_email_idx on events (actor_email);
create table if not exists submissions (
id bigserial primary key,
created_at timestamptz not null default now(),
request_id text,
kind text not null,
email text not null,
full_name text,
phone text,
ip text,
payload jsonb not null
);
create index if not exists submissions_created_at_idx on submissions (created_at desc);
create index if not exists submissions_email_idx on submissions (email);
create index if not exists submissions_kind_idx on submissions (kind);
"""
)
_schema_ensured = True
async def record_event(
*,
event_type: str,
request_id: str | None = None,
actor_email: str | None = None,
ip: str | None = None,
status: str | None = None,
detail: dict | None = None,
) -> None:
"""Append a single business event to the events table. Best-effort:
failures are logged and swallowed so they never block request handling."""
try:
pool = await get_pool()
if pool is None:
return
await _ensure_schema()
payload = json.dumps(detail or {})
async with pool.acquire() as conn:
await conn.execute(
"""
insert into events (request_id, event_type, actor_email, ip, status, detail)
values ($1, $2, $3, $4, $5, $6::jsonb)
""",
request_id, event_type, actor_email, ip, status, payload,
)
except Exception as exc:
logger.warning("record_event(%s) failed: %s", event_type, exc)
async def record_submission(
*,
kind: str,
email: str,
full_name: str | None,
phone: str | None,
ip: str | None,
request_id: str | None,
payload: dict,
) -> None:
"""Persist a contact-form submission (booking / onboarding / contract)."""
try:
pool = await get_pool()
if pool is None:
return
await _ensure_schema()
async with pool.acquire() as conn:
await conn.execute(
"""
insert into submissions (request_id, kind, email, full_name, phone, ip, payload)
values ($1, $2, $3, $4, $5, $6, $7::jsonb)
""",
request_id, kind, email, full_name, phone, ip, json.dumps(payload),
)
except Exception as exc:
logger.warning("record_submission(%s) failed: %s", kind, exc)
async def list_events(
*,
limit: int = 100,
before_id: int | None = None,
event_type: str | None = None,
actor_email: str | None = None,
) -> list[dict]:
pool = await get_pool()
if pool is None:
return []
await _ensure_schema()
clauses: list[str] = []
params: list[Any] = []
if before_id is not None:
params.append(before_id)
clauses.append(f"id < ${len(params)}")
if event_type:
params.append(event_type)
clauses.append(f"event_type = ${len(params)}")
if actor_email:
params.append(actor_email.strip().lower())
clauses.append(f"actor_email = ${len(params)}")
where = ("where " + " and ".join(clauses)) if clauses else ""
params.append(max(1, min(500, limit)))
sql = (
f"select id, created_at, request_id, event_type, actor_email, ip, status, detail "
f"from events {where} order by id desc limit ${len(params)}"
)
async with pool.acquire() as conn:
rows = await conn.fetch(sql, *params)
return [
{
"id": r["id"],
"createdAt": r["created_at"].isoformat() if r["created_at"] else None,
"requestId": r["request_id"],
"eventType": r["event_type"],
"actorEmail": r["actor_email"],
"ip": r["ip"],
"status": r["status"],
"detail": (json.loads(r["detail"]) if isinstance(r["detail"], (str, bytes, bytearray)) else r["detail"]) or {},
}
for r in rows
]
async def list_submissions(
*,
limit: int = 100,
before_id: int | None = None,
kind: str | None = None,
email: str | None = None,
) -> list[dict]:
pool = await get_pool()
if pool is None:
return []
await _ensure_schema()
clauses: list[str] = []
params: list[Any] = []
if before_id is not None:
params.append(before_id)
clauses.append(f"id < ${len(params)}")
if kind:
params.append(kind)
clauses.append(f"kind = ${len(params)}")
if email:
params.append(email.strip().lower())
clauses.append(f"email = ${len(params)}")
where = ("where " + " and ".join(clauses)) if clauses else ""
params.append(max(1, min(500, limit)))
sql = (
f"select id, created_at, request_id, kind, email, full_name, phone, ip, payload "
f"from submissions {where} order by id desc limit ${len(params)}"
)
async with pool.acquire() as conn:
rows = await conn.fetch(sql, *params)
return [
{
"id": r["id"],
"createdAt": r["created_at"].isoformat() if r["created_at"] else None,
"requestId": r["request_id"],
"kind": r["kind"],
"email": r["email"],
"fullName": r["full_name"],
"phone": r["phone"],
"ip": r["ip"],
"payload": (json.loads(r["payload"]) if isinstance(r["payload"], (str, bytes, bytearray)) else r["payload"]) or {},
}
for r in rows
]
async def get_kv(key: str) -> Any | None:
pool = await get_pool()
if pool is None:
return None
await _ensure_schema()
async with pool.acquire() as conn:
row = await conn.fetchrow("select value from admin_kv where key = $1", key)
if not row:
return None
raw = row["value"]
# asyncpg returns jsonb as a Python str; parse to native value.
if isinstance(raw, (dict, list)):
return raw
if isinstance(raw, (bytes, bytearray)):
raw = raw.decode("utf-8")
try:
return json.loads(raw)
except Exception:
return None
async def set_kv(key: str, value: Any) -> bool:
pool = await get_pool()
if pool is None:
return False
await _ensure_schema()
payload = json.dumps(value)
async with pool.acquire() as conn:
await conn.execute(
"""
insert into admin_kv (key, value, updated_at)
values ($1, $2::jsonb, now())
on conflict (key) do update
set value = excluded.value,
updated_at = excluded.updated_at
""",
key,
payload,
)
return True
async def get_submission_journey(email: str) -> dict | None:
"""Return the most recent submission_journeys row for the given email, or
None when no journey was promoted (or DB is off). The table is owned by
the SvelteKit app — see src/routes/api/track/promote — but the mail-api
reads it here so the owner dashboard's enquiry view can render it
alongside the booking record."""
pool = await get_pool()
if pool is None:
return None
normalized = (email or "").strip().lower()
if not normalized:
return None
try:
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
select id, anon_id, events, client_events, created_at
from submission_journeys
where email = $1
order by created_at desc
limit 1
""",
normalized,
)
except Exception as exc:
# Table may not exist yet in environments where the new init script
# hasn't run. Don't break the enquiry view over it.
logger.warning("submission_journeys read failed: %s", exc)
return None
if not row:
return None
def _parse(value: Any) -> Any:
if isinstance(value, (list, dict)):
return value
if isinstance(value, (bytes, bytearray)):
value = value.decode("utf-8")
try:
return json.loads(value) if value else []
except Exception:
return []
return {
"id": row["id"],
"anonId": row["anon_id"],
"events": _parse(row["events"]),
"clientEvents": _parse(row["client_events"]),
"createdAt": row["created_at"].isoformat() if row["created_at"] else None,
}
async def has_any_value() -> bool:
"""Return True if admin_kv already has any rows. Used to decide whether
to seed from JSON files on first boot."""
pool = await get_pool()
if pool is None:
return False
await _ensure_schema()
async with pool.acquire() as conn:
row = await conn.fetchrow("select 1 from admin_kv limit 1")
return row is not None