This commit is contained in:
2026-05-19 23:36:58 +12:00
parent 5172588488
commit a7f8a619b1
68 changed files with 4486 additions and 1430 deletions
+177
View File
@@ -79,11 +79,188 @@ async def _ensure_schema() -> None:
value jsonb not null,
updated_at timestamptz not null default now()
);
create table if not exists events (
id bigserial primary key,
created_at timestamptz not null default now(),
request_id text,
event_type text not null,
actor_email text,
ip text,
status text,
detail jsonb
);
create index if not exists events_created_at_idx on events (created_at desc);
create index if not exists events_event_type_idx on events (event_type);
create index if not exists events_actor_email_idx on events (actor_email);
create table if not exists submissions (
id bigserial primary key,
created_at timestamptz not null default now(),
request_id text,
kind text not null,
email text not null,
full_name text,
phone text,
ip text,
payload jsonb not null
);
create index if not exists submissions_created_at_idx on submissions (created_at desc);
create index if not exists submissions_email_idx on submissions (email);
create index if not exists submissions_kind_idx on submissions (kind);
"""
)
_schema_ensured = True
async def record_event(
*,
event_type: str,
request_id: str | None = None,
actor_email: str | None = None,
ip: str | None = None,
status: str | None = None,
detail: dict | None = None,
) -> None:
"""Append a single business event to the events table. Best-effort:
failures are logged and swallowed so they never block request handling."""
try:
pool = await get_pool()
if pool is None:
return
await _ensure_schema()
payload = json.dumps(detail or {})
async with pool.acquire() as conn:
await conn.execute(
"""
insert into events (request_id, event_type, actor_email, ip, status, detail)
values ($1, $2, $3, $4, $5, $6::jsonb)
""",
request_id, event_type, actor_email, ip, status, payload,
)
except Exception as exc:
logger.warning("record_event(%s) failed: %s", event_type, exc)
async def record_submission(
*,
kind: str,
email: str,
full_name: str | None,
phone: str | None,
ip: str | None,
request_id: str | None,
payload: dict,
) -> None:
"""Persist a contact-form submission (booking / onboarding / contract)."""
try:
pool = await get_pool()
if pool is None:
return
await _ensure_schema()
async with pool.acquire() as conn:
await conn.execute(
"""
insert into submissions (request_id, kind, email, full_name, phone, ip, payload)
values ($1, $2, $3, $4, $5, $6, $7::jsonb)
""",
request_id, kind, email, full_name, phone, ip, json.dumps(payload),
)
except Exception as exc:
logger.warning("record_submission(%s) failed: %s", kind, exc)
async def list_events(
*,
limit: int = 100,
before_id: int | None = None,
event_type: str | None = None,
actor_email: str | None = None,
) -> list[dict]:
pool = await get_pool()
if pool is None:
return []
await _ensure_schema()
clauses: list[str] = []
params: list[Any] = []
if before_id is not None:
params.append(before_id)
clauses.append(f"id < ${len(params)}")
if event_type:
params.append(event_type)
clauses.append(f"event_type = ${len(params)}")
if actor_email:
params.append(actor_email.strip().lower())
clauses.append(f"actor_email = ${len(params)}")
where = ("where " + " and ".join(clauses)) if clauses else ""
params.append(max(1, min(500, limit)))
sql = (
f"select id, created_at, request_id, event_type, actor_email, ip, status, detail "
f"from events {where} order by id desc limit ${len(params)}"
)
async with pool.acquire() as conn:
rows = await conn.fetch(sql, *params)
return [
{
"id": r["id"],
"createdAt": r["created_at"].isoformat() if r["created_at"] else None,
"requestId": r["request_id"],
"eventType": r["event_type"],
"actorEmail": r["actor_email"],
"ip": r["ip"],
"status": r["status"],
"detail": (json.loads(r["detail"]) if isinstance(r["detail"], (str, bytes, bytearray)) else r["detail"]) or {},
}
for r in rows
]
async def list_submissions(
*,
limit: int = 100,
before_id: int | None = None,
kind: str | None = None,
email: str | None = None,
) -> list[dict]:
pool = await get_pool()
if pool is None:
return []
await _ensure_schema()
clauses: list[str] = []
params: list[Any] = []
if before_id is not None:
params.append(before_id)
clauses.append(f"id < ${len(params)}")
if kind:
params.append(kind)
clauses.append(f"kind = ${len(params)}")
if email:
params.append(email.strip().lower())
clauses.append(f"email = ${len(params)}")
where = ("where " + " and ".join(clauses)) if clauses else ""
params.append(max(1, min(500, limit)))
sql = (
f"select id, created_at, request_id, kind, email, full_name, phone, ip, payload "
f"from submissions {where} order by id desc limit ${len(params)}"
)
async with pool.acquire() as conn:
rows = await conn.fetch(sql, *params)
return [
{
"id": r["id"],
"createdAt": r["created_at"].isoformat() if r["created_at"] else None,
"requestId": r["request_id"],
"kind": r["kind"],
"email": r["email"],
"fullName": r["full_name"],
"phone": r["phone"],
"ip": r["ip"],
"payload": (json.loads(r["payload"]) if isinstance(r["payload"], (str, bytes, bytearray)) else r["payload"]) or {},
}
for r in rows
]
async def get_kv(key: str) -> Any | None:
pool = await get_pool()
if pool is None: