import asyncio
import base64
from collections import deque
from contextlib import asynccontextmanager
import json
import os
import random
import re
import secrets
import time
import uuid
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
import resend
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.responses import JSONResponse, Response
from starlette.types import ASGIApp, Receive, Scope, Send
import db as admin_db
from mail_api.config import (
ALLOWED_EMAILS_FILE as _ALLOWED_EMAILS_FILE,
APP_VERSION,
AUTH_CODE_MAX_ATTEMPTS,
AUTH_CODE_REQUESTS_PER_HOUR,
AUTH_CODE_TTL_SECONDS,
AUTH_IP_BLOCK_DURATION,
AUTH_IP_FAILURE_WINDOW,
AUTH_IP_MAX_FAILURES,
AUTH_SESSION_TTL_SECONDS,
BIRTHDAY_CHECK_INTERVAL_SECONDS,
CLIENT_BCC,
CLIENT_PROFILES_FILE as _CLIENT_PROFILES_FILE,
CORS_ALLOWED_ORIGINS,
CP_ADMIN_EMAILS,
DEPLOY_SMOKE_SECRET,
DEV_MODE,
DRAFTS_FILE as _DRAFTS_FILE,
EMAIL_SEND_TIMEOUT_SECONDS,
ENABLE_GENERAL_ENQUIRIES,
FORM_MAX_SECONDS,
FORM_MIN_SECONDS,
FROM_EMAIL,
LEGACY_SEED_FILE as _LEGACY_SEED_FILE,
LOGO_URL,
MAX_REQUEST_BODY_BYTES,
MAX_SEND_ATTEMPTS,
OWNER_BCC,
OWNER_EMAIL,
RATE_LIMIT_MAX_PER_EMAIL,
RATE_LIMIT_MAX_PER_IP,
RATE_LIMIT_MIN_INTERVAL_SECONDS,
RATE_LIMIT_WINDOW_SECONDS,
REPLY_TO,
STARTUP_TEST_RECIPIENT,
TRUSTED_HOSTS,
logger,
)
from mail_api.models import (
BaseSubmission,
BirthdayAutoSendRequest,
BirthdayEmailRequest,
BookingSubmission,
ClientStatusUpdate,
ContractSubmission,
OnboardingSubmission,
RenderMessageRequest,
SendMessageRequest,
WelcomePackEmailRequest,
)
@asynccontextmanager
async def _lifespan(app: FastAPI):
    """Application lifespan hook passed to ``FastAPI(lifespan=...)``.

    Awaits ``_startup_mail_check()`` before the application starts serving,
    and awaits ``_shutdown_background_tasks()`` when the serving phase ends.
    """
    await _startup_mail_check()
    try:
        yield
    finally:
        # ``finally`` so background tasks are stopped even if serving raised.
        await _shutdown_background_tasks()
# ASGI application instance; startup/shutdown work is delegated to _lifespan.
app = FastAPI(title="GoodWalk Mail API", lifespan=_lifespan)
# ── Auth state ───────────────────────────────────────────────────────────────
def _write_pii_json(path: Path, payload: object) -> None:
"""Atomically write a JSON file and chmod it owner-only (0600).
The chmod is best-effort: it is a no-op on Windows, but on the Linux
Docker host it ensures the file with PII is unreadable by other users.
"""
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(path.suffix + ".tmp")
tmp.write_text(json.dumps(payload, indent=2), encoding="utf-8")
try:
os.chmod(tmp, 0o600)
except OSError:
pass
os.replace(tmp, path)
def _load_allowed_emails_from_file() -> set[str]:
    """Return the allow-list built from ``ALLOWED_EMAILS`` and the JSON file.

    Environment entries are comma-separated, stripped and lower-cased. String
    entries under the file's ``"emails"`` key are lower-cased and added on top.
    Any failure while reading the file is logged and the entries collected so
    far are returned.
    """
    allowed: set[str] = set()
    for entry in os.environ.get("ALLOWED_EMAILS", "").split(","):
        if entry.strip():
            allowed.add(entry.strip().lower())
    try:
        if _ALLOWED_EMAILS_FILE.exists():
            raw = _ALLOWED_EMAILS_FILE.read_text(encoding="utf-8")
            for entry in json.loads(raw).get("emails", []):
                if isinstance(entry, str):
                    allowed.add(entry.lower())
    except Exception as exc:
        logger.warning("Could not load allowed_emails file: %s", exc)
    return allowed
def _save_allowed_emails_file(emails: set[str]) -> None:
    """Write the allow-list to its JSON file as a sorted ``"emails"`` array.

    Best-effort: any failure is logged as a warning and not re-raised.
    """
    try:
        document = {"emails": sorted(emails)}
        _write_pii_json(_ALLOWED_EMAILS_FILE, document)
    except Exception as write_error:
        logger.warning("Could not save allowed_emails file: %s", write_error)
def _load_client_profiles_from_file() -> dict[str, dict]:
    """Load the client profile map from its JSON file.

    Returns:
        The decoded JSON object, or ``{}`` when the file is missing, cannot be
        read or parsed, or does not hold a JSON object at the top level.
    """
    try:
        if _CLIENT_PROFILES_FILE.exists():
            data = json.loads(_CLIENT_PROFILES_FILE.read_text(encoding="utf-8"))
            # json.loads can return a list/str/number; callers treat the result
            # as a dict, so anything else is reported and discarded.
            if isinstance(data, dict):
                return data
            logger.warning(
                "Could not load client_profiles file: %s",
                f"expected a JSON object, got {type(data).__name__}",
            )
    except Exception as exc:
        logger.warning("Could not load client_profiles file: %s", exc)
    return {}
def _save_client_profiles_file(profiles: dict) -> None:
    """Write the client profile map to its JSON file.

    Best-effort: any failure is logged as a warning and not re-raised.
    """
    destination = _CLIENT_PROFILES_FILE
    try:
        _write_pii_json(destination, profiles)
    except Exception as write_error:
        logger.warning("Could not save client_profiles file: %s", write_error)
def _load_drafts_from_file() -> dict:
    """Load saved form drafts from their JSON file.

    Returns:
        The decoded JSON object, or ``{}`` when the file is missing, cannot be
        read or parsed, or does not hold a JSON object at the top level.
    """
    try:
        if _DRAFTS_FILE.exists():
            data = json.loads(_DRAFTS_FILE.read_text(encoding="utf-8"))
            # json.loads can return a list/str/number; callers treat the result
            # as a dict, so anything else is reported and discarded.
            if isinstance(data, dict):
                return data
            logger.warning(
                "Could not load drafts file: %s",
                f"expected a JSON object, got {type(data).__name__}",
            )
    except Exception as exc:
        logger.warning("Could not load drafts file: %s", exc)
    return {}
def _save_drafts_file(drafts: dict) -> None:
    """Write the drafts map to its JSON file.

    Best-effort: any failure is logged as a warning and not re-raised.
    """
    destination = _DRAFTS_FILE
    try:
        _write_pii_json(destination, drafts)
    except Exception as write_error:
        logger.warning("Could not save drafts file: %s", write_error)
async def _save_active_sessions_async() -> None:
    """Store the non-expired sessions under the ``active_sessions`` admin_kv key.

    Only entries whose ``expires_at`` lies in the future are written. A failed
    write is logged and swallowed; the in-memory session map is not modified.
    """
    cutoff = time.time()
    live: dict = {}
    for token, session in _active_sessions.items():
        if session.get("expires_at", 0) > cutoff:
            live[token] = session
    try:
        await admin_db.set_kv("active_sessions", live)
    except Exception as exc:
        logger.warning("Could not persist active_sessions: %s", exc)
async def _load_active_sessions_async() -> dict[str, dict]:
    """Read persisted sessions from admin_kv, dropping malformed or expired ones.

    Returns ``{}`` when the database is disabled, the stored value is not a
    dict, or reading fails (the failure is logged). An entry is kept only if
    it is a dict with a numeric ``expires_at`` that is still in the future.
    """
    if not admin_db.is_enabled():
        return {}
    try:
        stored = await admin_db.get_kv("active_sessions")
        if not isinstance(stored, dict):
            return {}
        cutoff = time.time()
        live: dict[str, dict] = {}
        for token, session in stored.items():
            if not isinstance(session, dict):
                continue
            if not isinstance(session.get("expires_at"), (int, float)):
                continue
            if session["expires_at"] > cutoff:
                live[token] = session
        return live
    except Exception as exc:
        logger.warning("Could not load active_sessions from admin_kv: %s", exc)
        return {}
async def _persist_admin_state(key: str, value: Any) -> None:
    """Write one admin_kv entry, logging instead of raising on failure.

    Args:
        key: admin_kv key to write.
        value: blob handed to ``admin_db.set_kv`` unchanged.
    """
    try:
        await admin_db.set_kv(key, value)
    except Exception as persist_error:
        logger.warning(
            "Postgres persist (%s) failed; JSON copy is still authoritative: %s",
            key,
            persist_error,
        )
async def _seed_admin_state_from_json_if_needed() -> None:
    """Copy the on-disk JSON state into admin_kv, subject to a seed mode.

    The mode comes from ``ADMIN_DATA_SEED_FROM_JSON`` (default ``"auto"``):
      - "never": do nothing
      - "auto": seed only when admin_kv has no rows yet
      - anything else (the documented value is "force"): overwrite postgres
        with whatever the JSON files currently hold
    The deployer exposes -SeedAdminData which sets this to "force" for one boot.

    Nothing is written when the database is disabled or when all three JSON
    sources are empty. Any failure is logged and swallowed.
    """
    raw_mode = os.environ.get("ADMIN_DATA_SEED_FROM_JSON", "auto") or "auto"
    mode = raw_mode.strip().lower()
    if mode == "never" or not admin_db.is_enabled():
        return
    try:
        if mode == "auto" and await admin_db.has_any_value():
            return
        clients = _load_client_profiles_from_file()
        emails = sorted(_load_allowed_emails_from_file())
        drafts = _load_drafts_from_file()
        if not (clients or emails or drafts):
            return
        # (admin_kv key, source collection, blob to store); empty sources are skipped.
        writes = (
            ("client_profiles", clients, clients),
            ("allowed_emails", emails, {"emails": emails}),
            ("drafts", drafts, drafts),
        )
        for kv_key, source, blob in writes:
            if source:
                await admin_db.set_kv(kv_key, blob)
        logger.info(
            "Seeded admin_kv from JSON (mode=%s): clients=%d emails=%d drafts=%d",
            mode, len(clients), len(emails), len(drafts),
        )
    except Exception as exc:
        logger.warning("Admin seed from JSON failed: %s", exc)
async def _merge_legacy_seed_if_present() -> None:
    """Fold the shipped legacy seed file into the in-memory client profiles.

    Add-only: an email that already has a profile is counted and left as-is,
    so re-running once everything is merged only logs a summary. Keys are
    stripped and lower-cased; non-string keys, non-dict profiles, blank keys
    and the owner's own address are ignored. When at least one profile was
    added, a snapshot is written to the JSON file and to admin_kv.
    """
    global _client_profiles
    if not _LEGACY_SEED_FILE.exists():
        return
    try:
        legacy = json.loads(_LEGACY_SEED_FILE.read_text(encoding="utf-8"))
    except Exception as exc:
        logger.warning("Legacy seed file unreadable (%s): %s", _LEGACY_SEED_FILE, exc)
        return
    if not (isinstance(legacy, dict) and legacy):
        return

    newly_added: list[str] = []
    already_present = 0
    for key, entry in legacy.items():
        if not (isinstance(key, str) and isinstance(entry, dict)):
            continue
        address = key.strip().lower()
        if not address or address == OWNER_EMAIL.strip().lower():
            continue
        if address in _client_profiles:
            already_present += 1
        else:
            _client_profiles[address] = entry
            newly_added.append(address)

    if not newly_added:
        logger.info(
            "Legacy seed already merged (existing=%d, candidates=%d).",
            already_present, len(legacy),
        )
        return

    # Persist a copy so later mutations of the live dict cannot race the writes.
    frozen = dict(_client_profiles)
    try:
        await asyncio.to_thread(_save_client_profiles_file, frozen)
    except Exception as exc:
        logger.warning("Could not save client_profiles after legacy merge: %s", exc)
    try:
        await _persist_admin_state("client_profiles", frozen)
    except Exception as exc:
        logger.warning("Could not persist client_profiles to postgres after legacy merge: %s", exc)
    logger.info(
        "Legacy seed merged: added=%d skipped_existing=%d total_after=%d",
        len(newly_added), already_present, len(_client_profiles),
    )
async def _load_allowed_emails_async() -> set[str]:
    """Return the allow-list, preferring admin_kv over the JSON file.

    When the database is enabled and holds a dict with a list under
    ``"emails"``, the result is the ``ALLOWED_EMAILS`` env entries (stripped,
    lower-cased) plus the stored string entries (lower-cased). Otherwise the
    file-based loader is used.
    """
    if admin_db.is_enabled():
        stored = await admin_db.get_kv("allowed_emails")
        listed = stored.get("emails", []) if isinstance(stored, dict) else None
        if isinstance(listed, list):
            allowed: set[str] = set()
            for entry in os.environ.get("ALLOWED_EMAILS", "").split(","):
                if entry.strip():
                    allowed.add(entry.strip().lower())
            allowed.update(item.lower() for item in listed if isinstance(item, str))
            return allowed
    return _load_allowed_emails_from_file()
async def _load_client_profiles_async() -> dict[str, dict]:
    """Return client profiles from admin_kv, falling back to the JSON file.

    The database value is used only when the database is enabled and the
    stored value is a dict.
    """
    stored = await admin_db.get_kv("client_profiles") if admin_db.is_enabled() else None
    if isinstance(stored, dict):
        return stored
    return _load_client_profiles_from_file()
async def _load_drafts_async() -> dict:
    """Return drafts from admin_kv, falling back to the JSON file.

    The database value is used only when the database is enabled and the
    stored value is a dict.
    """
    stored = await admin_db.get_kv("drafts") if admin_db.is_enabled() else None
    if isinstance(stored, dict):
        return stored
    return _load_drafts_from_file()
# In-memory auth state, initialised at import time from the JSON files.
_allowed_emails: set[str] = _load_allowed_emails_from_file()
# The owner address, when configured, is always allowed to sign in.
if OWNER_EMAIL:
    _allowed_emails.add(OWNER_EMAIL.strip().lower())
_pending_codes: dict[str, dict] = {} # email -> {code, expires_at, attempts}
_active_sessions: dict[str, dict] = {} # token -> {email, expires_at}
_code_requests: dict[str, deque] = {} # email -> deque of monotonic timestamps
_client_profiles: dict[str, dict] = _load_client_profiles_from_file()
_drafts: dict[str, dict] = _load_drafts_from_file() # email -> {onboarding: {...}, contract: {...}}
_auth_failures_by_ip: dict[str, deque] = {} # ip -> deque of failure timestamps
_blocked_ips: dict[str, float] = {} # ip -> unblock_at (monotonic)
# Held by the helpers below while they read or mutate the allow-list,
# session map and client profiles.
_auth_lock = asyncio.Lock()
_birthday_auto_task: asyncio.Task | None = None
logger.info("Auth: loaded %d allowed email(s)", len(_allowed_emails))
async def _require_session_email(request: Request) -> str:
    """Resolve the request's bearer token to the email of a live session.

    The token is the ``Authorization`` header value with a leading
    ``"Bearer "`` removed and surrounding whitespace stripped.

    Raises:
        HTTPException: 401 when no token is present, the token is unknown, or
            the session has expired (an expired session is also dropped from
            the in-memory map).
    """
    header_value = request.headers.get("Authorization", "")
    token = header_value.removeprefix("Bearer ").strip()
    if not token:
        raise HTTPException(status_code=401, detail="No token provided.")
    async with _auth_lock:
        session = _active_sessions.get(token)
        if not session:
            raise HTTPException(status_code=401, detail="Invalid session.")
        expired = time.time() > session["expires_at"]
        if expired:
            _active_sessions.pop(token, None)
            raise HTTPException(status_code=401, detail="Session expired. Please sign in again.")
        return session["email"]
async def _require_owner_email(request: Request) -> str:
    """Return the session email, requiring it to be listed in ``CP_ADMIN_EMAILS``.

    Raises:
        HTTPException: 401 from the session check, or 403 when the signed-in
            email is not an admin address.
    """
    session_email = await _require_session_email(request)
    if session_email in CP_ADMIN_EMAILS:
        return session_email
    raise HTTPException(status_code=403, detail="Owner access required.")
async def _register_email(email: str) -> None:
    """Add an email to the allow-list and persist it, if it is not already there.

    The address is stripped and lower-cased; a blank address is ignored. The
    whole check-add-persist sequence runs while holding ``_auth_lock``.
    """
    normalized = email.strip().lower()
    if not normalized:
        return
    async with _auth_lock:
        if normalized in _allowed_emails:
            return
        _allowed_emails.add(normalized)
        ordered = sorted(_allowed_emails)
        # The file write is blocking, so it is pushed to a worker thread with
        # its own copy of the set.
        await asyncio.to_thread(_save_allowed_emails_file, set(_allowed_emails))
        await _persist_admin_state("allowed_emails", {"emails": ordered})
        logger.info("Auth: registered new allowed email: %s", normalized)
def _client_is_reachable(profile: dict) -> bool:
"""True if outreach (welcome pack, birthday email, etc.) should still target
this client. Excludes lifecycle states that mean the relationship has ended.
"""
lifecycle = profile.get("lifecycle")
if not isinstance(lifecycle, dict):
return True
return lifecycle.get("status") not in {"cancelled", "archived"}
async def _store_client_profile(email: str, profile: dict) -> None:
    """Merge ``profile`` into the stored profile for ``email`` and persist it.

    New values override existing ones key by key; keys whose resulting value
    is ``None`` or an empty string are dropped. Nothing is written when the
    merged profile equals the stored one. A blank email is ignored.
    """
    normalized = email.strip().lower()
    if not normalized:
        return
    async with _auth_lock:
        existing = _client_profiles.get(normalized, {})
        combined = dict(existing)
        combined.update(profile)
        merged = {}
        for key, value in combined.items():
            if value is None:
                continue
            if isinstance(value, str) and value == "":
                continue
            merged[key] = value
        if merged == existing:
            return
        _client_profiles[normalized] = merged
        frozen = dict(_client_profiles)
        await asyncio.to_thread(_save_client_profiles_file, frozen)
        await _persist_admin_state("client_profiles", frozen)
def _check_ip_blocked(ip: str, request_id: str) -> None:
    """Reject the request if ``ip`` is currently blocked for failed auth attempts.

    A block whose deadline has passed is removed from the table.

    Raises:
        HTTPException: 429 with a ``Retry-After`` header while the block is
            still in force.
    """
    now = time.monotonic()
    unblock_at = _blocked_ips.get(ip)
    if unblock_at is None:
        return
    if now >= unblock_at:
        # Block has lapsed; forget it.
        del _blocked_ips[ip]
        return
    remaining = int(unblock_at - now)
    logger.warning("[%s] auth: blocked ip=%s (%ds remaining)", request_id, ip, remaining)
    raise HTTPException(
        status_code=429,
        detail=f"Too many failed attempts. Try again in {remaining // 60 + 1} minute(s).",
        headers={"Retry-After": str(remaining)},
    )
def _record_auth_failure(ip: str, request_id: str, reason: str) -> None:
    """Record a failed auth attempt for ``ip`` and block it once the limit is hit.

    Failures older than ``AUTH_IP_FAILURE_WINDOW`` are discarded first. When
    the number of failures in the window reaches ``AUTH_IP_MAX_FAILURES`` the
    IP is blocked for ``AUTH_IP_BLOCK_DURATION`` seconds.
    """
    now = time.monotonic()
    history = _auth_failures_by_ip.setdefault(ip, deque())
    while history:
        if now - history[0] <= AUTH_IP_FAILURE_WINDOW:
            break
        history.popleft()
    history.append(now)
    failure_count = len(history)
    logger.warning(
        "[%s] auth: failure ip=%s reason=%r total_in_window=%d",
        request_id, ip, reason, failure_count,
    )
    if failure_count < AUTH_IP_MAX_FAILURES:
        return
    _blocked_ips[ip] = now + AUTH_IP_BLOCK_DURATION
    logger.warning(
        "[%s] auth: ip=%s BLOCKED for %ds after %d failures",
        request_id, ip, AUTH_IP_BLOCK_DURATION, failure_count,
    )
class _BodySizeLimitMiddleware:
"""Reject requests whose Content-Length exceeds MAX_REQUEST_BODY_BYTES.
Defence-in-depth alongside nginx ``client_max_body_size``. Streaming
requests without a Content-Length header are tracked byte-by-byte and
short-circuited if they overflow the cap.
"""
def __init__(self, app: ASGIApp, max_bytes: int) -> None:
self.app = app
self.max_bytes = max_bytes
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
if scope["type"] != "http":
await self.app(scope, receive, send)
return
headers = {k.decode("latin-1").lower(): v.decode("latin-1") for k, v in scope.get("headers", [])}
declared = headers.get("content-length")
if declared is not None:
try:
if int(declared) > self.max_bytes:
await _send_413(send)
return
except ValueError:
pass
received = 0
overflowed = False
async def _wrapped_receive():
nonlocal received, overflowed
message = await receive()
if message["type"] == "http.request":
received += len(message.get("body", b""))
if received > self.max_bytes:
overflowed = True
return {"type": "http.disconnect"}
return message
if overflowed:
await _send_413(send)
return
await self.app(scope, _wrapped_receive, send)
async def _send_413(send: Send) -> None:
await send({
"type": "http.response.start",
"status": 413,
"headers": [(b"content-type", b"application/json")],
})
await send({
"type": "http.response.body",
"body": b'{"detail":"Request body too large."}',
})
# Middleware registration: body-size cap, Host header allow-list, then CORS.
app.add_middleware(_BodySizeLimitMiddleware, max_bytes=MAX_REQUEST_BODY_BYTES)
app.add_middleware(TrustedHostMiddleware, allowed_hosts=list(TRUSTED_HOSTS))
# CORS: configured origins only, POST/GET only, no credentials; max_age is 600.
app.add_middleware(
    CORSMiddleware,
    allow_origins=list(CORS_ALLOWED_ORIGINS),
    allow_methods=["POST", "GET"],
    allow_headers=["Authorization", "Content-Type", "X-Requested-With"],
    allow_credentials=False,
    max_age=600,
)
@app.middleware("http")
async def _request_logging_middleware(request: Request, call_next):
    """Tag each request with a short id and log its outcome and duration.

    The id is stored on ``request.state.request_id`` and returned to the
    client in the ``X-Request-ID`` response header. An exception from
    downstream is logged with its traceback and re-raised.
    """
    request_id = uuid.uuid4().hex[:8]
    request.state.request_id = request_id
    started = time.monotonic()

    def _elapsed_ms() -> float:
        return (time.monotonic() - started) * 1000

    try:
        response = await call_next(request)
    except Exception:
        logger.exception(
            "[%s] %s %s crashed after %.0fms",
            request_id, request.method, request.url.path, _elapsed_ms(),
        )
        raise
    logger.info(
        "[%s] %s %s → %d (%.0fms)",
        request_id, request.method, request.url.path, response.status_code, _elapsed_ms(),
    )
    response.headers["X-Request-ID"] = request_id
    return response
# ── Helpers ──────────────────────────────────────────────────────────────────
def _get_ip(request: Request) -> str:
forwarded = request.headers.get("x-forwarded-for")
if forwarded:
return forwarded.split(",")[0].strip()
return request.client.host if request.client else "unknown"
def _is_deploy_smoke(request: Request) -> bool:
    """True when the request carries a matching X-Deploy-Smoke header.

    Used by the deploy script to verify the form endpoints are reachable and
    parse a valid payload, without producing a real submission. Disabled
    entirely when DEPLOY_SMOKE_SECRET is unset.

    Both values are compared as UTF-8 bytes in constant time.
    ``secrets.compare_digest`` raises ``TypeError`` for ``str`` arguments
    containing non-ASCII characters, so comparing the raw strings would let a
    crafted header turn into an unhandled error instead of a plain mismatch.
    """
    if not DEPLOY_SMOKE_SECRET:
        return False
    presented = request.headers.get("x-deploy-smoke") or ""
    if not presented:
        return False
    # Assumes DEPLOY_SMOKE_SECRET is a str, as the original comparison did.
    return secrets.compare_digest(
        presented.encode("utf-8"),
        DEPLOY_SMOKE_SECRET.encode("utf-8"),
    )
# Submission rate-limit state: per-key deques of time.monotonic() stamps of
# accepted attempts, guarded by _submit_rate_limit_lock.
_submit_attempts_by_ip: dict[str, deque[float]] = {}
_submit_attempts_by_email: dict[str, deque[float]] = {}
_submit_rate_limit_lock = asyncio.Lock()
def _trimmed(value: str) -> str:
return value.strip()
def _prune_attempts(attempts: deque[float], now: float, window_seconds: int) -> None:
while attempts and now - attempts[0] > window_seconds:
attempts.popleft()
def _seconds_until_allowed(last_attempt_at: float, now: float, min_interval_seconds: int) -> int:
retry_after = max(1, int(min_interval_seconds - (now - last_attempt_at)))
return retry_after
async def _enforce_submit_rate_limits(request_id: str, ip: str, email: str) -> None:
    """Apply the per-IP and per-email submission limits, recording the attempt.

    Checks run in this order while holding the rate-limit lock: minimum
    interval since the IP's last accepted attempt, attempts per IP in the
    window, attempts per (stripped, lower-cased) email in the window. Only an
    attempt that passes all three is recorded.

    Raises:
        HTTPException: 429 for whichever limit is hit first.
    """
    now = time.monotonic()
    email_key = email.strip().lower()
    async with _submit_rate_limit_lock:
        ip_history = _submit_attempts_by_ip.setdefault(ip, deque())
        email_history = _submit_attempts_by_email.setdefault(email_key, deque())
        for history in (ip_history, email_history):
            _prune_attempts(history, now, RATE_LIMIT_WINDOW_SECONDS)

        if ip_history:
            since_last = now - ip_history[-1]
            if since_last < RATE_LIMIT_MIN_INTERVAL_SECONDS:
                wait_seconds = _seconds_until_allowed(
                    ip_history[-1], now, RATE_LIMIT_MIN_INTERVAL_SECONDS
                )
                logger.warning(
                    "[%s] rate limited: ip=%s submitted again after %.1fs (minimum %ss)",
                    request_id,
                    ip,
                    since_last,
                    RATE_LIMIT_MIN_INTERVAL_SECONDS,
                )
                raise HTTPException(
                    status_code=429,
                    detail=f"Please wait about {wait_seconds} seconds before trying again.",
                )

        if len(ip_history) >= RATE_LIMIT_MAX_PER_IP:
            logger.warning(
                "[%s] rate limited: ip=%s exceeded %d submissions in %ss",
                request_id,
                ip,
                RATE_LIMIT_MAX_PER_IP,
                RATE_LIMIT_WINDOW_SECONDS,
            )
            raise HTTPException(
                status_code=429,
                detail="Too many enquiries from this connection. Please try again a little later.",
            )

        if len(email_history) >= RATE_LIMIT_MAX_PER_EMAIL:
            logger.warning(
                "[%s] rate limited: email=%s exceeded %d submissions in %ss",
                request_id,
                email_key,
                RATE_LIMIT_MAX_PER_EMAIL,
                RATE_LIMIT_WINDOW_SECONDS,
            )
            raise HTTPException(
                status_code=429,
                detail="That email address has reached the enquiry limit for now. Please try again later.",
            )

        # Accepted: count this attempt against both keys.
        ip_history.append(now)
        email_history.append(now)
def _enforce_form_timing(request_id: str, data: BaseSubmission) -> None:
    """Reject submissions sent implausibly fast or from a stale form.

    ``data.formStartedAt`` is compared with the current wall clock as epoch
    milliseconds.

    Raises:
        HTTPException: 400 when the timestamp is missing or non-positive,
            when fewer than ``FORM_MIN_SECONDS`` have elapsed, or when more
            than ``FORM_MAX_SECONDS`` have elapsed.
    """
    started_at = data.formStartedAt
    if started_at is None or started_at <= 0:
        logger.warning("[%s] rejected: missing or invalid formStartedAt", request_id)
        raise HTTPException(
            status_code=400,
            detail="Please refresh the page and try again.",
        )

    elapsed_seconds = (time.time() * 1000 - started_at) / 1000

    if elapsed_seconds < FORM_MIN_SECONDS:
        logger.warning(
            "[%s] rejected: form submitted too quickly (%.2fs < %ss)",
            request_id,
            elapsed_seconds,
            FORM_MIN_SECONDS,
        )
        raise HTTPException(
            status_code=400,
            detail="Please take a moment to fill in the form before sending it.",
        )

    if elapsed_seconds > FORM_MAX_SECONDS:
        logger.warning(
            "[%s] rejected: stale form submission (%.0fs > %ss)",
            request_id,
            elapsed_seconds,
            FORM_MAX_SECONDS,
        )
        raise HTTPException(
            status_code=400,
            detail="This form has been open for too long. Please refresh the page and try again.",
        )
def _is_honeypot_triggered(data: BaseSubmission) -> bool:
    """True when the ``website`` field holds anything but whitespace."""
    honeypot_value = data.website.strip()
    return honeypot_value != ""
def _is_general_enquiry(data: BookingSubmission) -> bool:
    """True when ``enquiryType`` is "general", ignoring case and outer whitespace."""
    enquiry_type = data.enquiryType.strip().lower()
    return enquiry_type == "general"
def _enquiry_type_label(data: BookingSubmission) -> str:
    """Return the display label for the submission's enquiry type."""
    if _is_general_enquiry(data):
        return "General enquiry"
    return "Booking enquiry"
def _validate_submission(request_id: str, data: BookingSubmission) -> None:
    """Validate a booking/general enquiry, raising on the first problem found.

    Checks, in order: enquiry type, full name, phone; then for general
    enquiries that they are enabled and have a message, or for booking
    enquiries a pet name and a location.

    Raises:
        HTTPException: 400 for a missing or invalid field, 403 when general
            enquiries are disabled.
    """

    def _reject(log_format: str, detail: str, *log_args, status_code: int = 400):
        # Log the reason with the request id, then abort the request.
        logger.warning(log_format, request_id, *log_args)
        raise HTTPException(status_code=status_code, detail=detail)

    if _trimmed(data.enquiryType).lower() not in {"booking", "general"}:
        _reject(
            "[%s] rejected: invalid enquiryType=%r",
            "Please choose a valid enquiry type and try again.",
            data.enquiryType,
        )
    if not _trimmed(data.fullName):
        _reject("[%s] rejected: missing full name", "Please enter your full name.")
    if not _trimmed(data.phone):
        _reject("[%s] rejected: missing phone number", "Please enter your contact number.")

    if _is_general_enquiry(data):
        if not ENABLE_GENERAL_ENQUIRIES:
            _reject(
                "[%s] rejected: general enquiries are disabled",
                "General enquiries are currently unavailable through this form.",
                status_code=403,
            )
        if not _trimmed(data.message):
            _reject(
                "[%s] rejected: missing general enquiry message",
                "Please tell us how we can help.",
            )
        return

    if not _trimmed(data.petName):
        _reject("[%s] rejected: missing pet name", "Please enter your dog's name.")
    if not _trimmed(data.location):
        _reject("[%s] rejected: missing location", "Please enter your location.")
def _normalize_submission(data: BookingSubmission) -> None:
    """Clean a booking/general enquiry in place.

    Canonicalises ``enquiryType``, trims the text fields, drops blank
    services and journey steps (journey capped at 12), clamps ``stepChanges``
    at zero, nulls non-positive timing stamps, and clears the dog-specific
    fields for general enquiries.
    """
    data.enquiryType = "general" if _is_general_enquiry(data) else "booking"

    text_fields = ("fullName", "phone", "petName", "location", "message", "referrer", "page")
    for name in text_fields:
        setattr(data, name, _trimmed(getattr(data, name)))

    data.services = [_trimmed(item) for item in data.services if _trimmed(item)]
    kept_steps = [_trimmed(item) for item in data.journey if _trimmed(item)]
    data.journey = kept_steps[:12]
    data.stepChanges = max(0, data.stepChanges)

    for name in ("visitStartedAt", "pageEnteredAt", "firstInteractionAt", "sendClickedAt"):
        stamp = getattr(data, name)
        if stamp is None or stamp <= 0:
            setattr(data, name, None)

    if _is_general_enquiry(data):
        # General enquiries carry no dog details.
        data.petName = ""
        data.location = ""
        data.services = []
def _validate_onboarding_submission(request_id: str, data: OnboardingSubmission) -> None:
    """Validate an onboarding form, raising on the first problem found.

    Checks, in order: name and phone, the required text fields, at least one
    service, the four confirmation flags, and finally that the signature is a
    PNG data URL of at least 128 characters.

    Raises:
        HTTPException: 400 with a user-facing message for the failed check.
    """
    if not _trimmed(data.fullName):
        logger.warning("[%s] onboarding rejected: missing full name", request_id)
        raise HTTPException(status_code=400, detail="Please enter your full name.")
    if not _trimmed(data.phone):
        logger.warning("[%s] onboarding rejected: missing phone", request_id)
        raise HTTPException(status_code=400, detail="Please enter your phone number.")

    required_text = (
        ("address", "Please enter your address."),
        ("dogName", "Please enter your dog's name."),
        ("dogBreed", "Please enter your dog's breed."),
        ("vetName", "Please enter your vet clinic name."),
        ("vetPhone", "Please enter your vet phone number."),
        ("emergencyContactName", "Please enter an emergency contact name."),
        ("emergencyContactPhone", "Please enter an emergency contact phone number."),
    )
    for attr, detail in required_text:
        if _trimmed(getattr(data, attr)):
            continue
        logger.warning("[%s] onboarding rejected: missing %s", request_id, attr)
        raise HTTPException(status_code=400, detail=detail)

    if not data.servicesNeeded:
        logger.warning("[%s] onboarding rejected: missing services", request_id)
        raise HTTPException(status_code=400, detail="Please choose at least one service.")

    # Confirmation flags are rejected without a log line.
    confirmations = (
        ("councilRegistrationConfirmed", "Please confirm council registration."),
        ("vaccinationsConfirmed", "Please confirm vaccinations are current."),
        ("emergencyVetConsent", "Please confirm emergency veterinary consent."),
        ("termsAccepted", "Please confirm the onboarding declaration."),
    )
    for attr, detail in confirmations:
        if not getattr(data, attr):
            raise HTTPException(status_code=400, detail=detail)

    signature = _trimmed(data.signatureDataUrl)
    looks_like_png = signature.startswith("data:image/png;base64,")
    if not looks_like_png or len(signature) < 128:
        logger.warning("[%s] onboarding rejected: invalid signature payload", request_id)
        raise HTTPException(status_code=400, detail="Please add your signature before sending.")
def _normalize_onboarding_submission(data: OnboardingSubmission) -> None:
    """Clean an onboarding form in place.

    Trims the text fields, drops blank services (capped at 8) and nulls
    non-positive timing stamps.
    """
    text_fields = (
        "fullName",
        "phone",
        "address",
        "dogName",
        "dogBreed",
        "dogAge",
        "temperament",
        "medicalNotes",
        "accessInstructions",
        "vetName",
        "vetPhone",
        "emergencyContactName",
        "emergencyContactPhone",
        "referrer",
        "page",
    )
    for name in text_fields:
        setattr(data, name, _trimmed(getattr(data, name)))

    kept_services = [_trimmed(item) for item in data.servicesNeeded if _trimmed(item)]
    data.servicesNeeded = kept_services[:8]

    for name in ("visitStartedAt", "pageEnteredAt", "firstInteractionAt", "sendClickedAt"):
        stamp = getattr(data, name)
        if stamp is None or stamp <= 0:
            setattr(data, name, None)
def _parse_ua(ua: str) -> str:
if not ua:
return "Unknown"
browsers = [("Edg/", "Edge"), ("OPR/", "Opera"), ("Chrome/", "Chrome"),
("Firefox/", "Firefox"), ("Safari/", "Safari")]
systems = [("Windows NT 10", "Windows 10/11"), ("Windows NT 6", "Windows 8"),
("Mac OS X", "macOS"), ("iPhone", "iPhone"), ("iPad", "iPad"),
("Android", "Android"), ("Linux", "Linux")]
browser = next((n for p, n in browsers if p in ua), "Unknown browser")
system = next((n for p, n in systems if p in ua), "Unknown OS")
return f"{browser} on {system}"
def _detail_row(label: str, value: str) -> str:
if not value:
return ""
return f"""
{label}
{value}
"""
def _meta_row(label: str, value: str) -> str:
if not value:
return ""
return f"""
{label}
{value}
"""
def _format_duration_ms(duration_ms: int | None) -> str:
if duration_ms is None or duration_ms < 0:
return ""
total_seconds = int(round(duration_ms / 1000))
minutes, seconds = divmod(total_seconds, 60)
hours, minutes = divmod(minutes, 60)
if hours > 0:
return f"{hours}h {minutes}m"
if minutes > 0:
return f"{minutes}m {seconds}s"
return f"{seconds}s"
def _duration_between(start_ms: int | None, end_ms: int | None) -> str:
    """Format the time from ``start_ms`` to ``end_ms``.

    Returns ``""`` when either stamp is missing or the end precedes the start.
    """
    both_present = start_ms is not None and end_ms is not None
    if both_present and end_ms >= start_ms:
        return _format_duration_ms(end_ms - start_ms)
    return ""
def _journey_text(journey: list[str]) -> str:
if not journey:
return ""
return " -> ".join(journey)
# ── Email templates ──────────────────────────────────────────────────────────
def _logo_header(badge_html: str = "", subtitle: str = "") -> str:
    """Build the header fragment placed at the top of the email templates.

    Args:
        badge_html: optional badge fragment; left out of the result when empty.
        subtitle: optional subtitle text; left out of the result when empty.

    Returns:
        The subtitle part followed by the badge part, each on its own line.
    """
    # NOTE(review): the single-quoted f-string below spans several lines as
    # shown here, which Python does not accept; the literal looks as if its
    # markup was lost. Confirm against the deployed file.
    badge = f'
{badge_html}
' if badge_html else ""
    sub = f"""
{subtitle}
""" if subtitle else ""
    return f"""
{sub}
{badge}
"""
def client_email(data: BookingSubmission) -> str:
    """Render the confirmation email body sent to the person who enquired.

    General enquiries get a short acknowledgement with the message echoed
    back. Booking enquiries additionally list the dog's name, location and
    services, and describe the Meet & Greet follow-up.

    The greeting uses the first word of ``data.fullName`` and falls back to
    "there" when the name is blank, so a blank name cannot raise
    ``IndexError`` here.

    NOTE(review): submitted values are interpolated as-is; confirm they are
    escaped or sanitised before this point if the output is sent as HTML.
    """
    is_general = _is_general_enquiry(data)
    services_text = ", ".join(data.services) if data.services else "Not specified"
    name_parts = data.fullName.split()
    first_name = name_parts[0] if name_parts else "there"
    enquiry_summary_rows = [
        _detail_row("Your name", data.fullName),
        _detail_row("Email", str(data.email)),
        _detail_row("Phone", data.phone),
        _detail_row("Type", _enquiry_type_label(data)),
    ]
    if is_general:
        if data.message:
            enquiry_summary_rows.append(_detail_row("Message", data.message))
        intro_html = (
            "We’ve received your message and we will be in touch shortly."
        )
        next_steps_html = (
            "We will review your message and reply within 1 business day."
        )
        logo_subtitle = "General enquiries and dog walking support"
    else:
        enquiry_summary_rows.extend(
            [
                _detail_row("Dog’s name", data.petName),
                _detail_row("Location", data.location),
                _detail_row("Services", services_text),
            ]
        )
        if data.message:
            enquiry_summary_rows.append(_detail_row("About the dog", data.message))
        intro_html = (
            "We’ve received your enquiry and we will be in touch shortly to arrange "
            "a Meet & Greet with you and "
            f"{data.petName}."
        )
        next_steps_html = (
            "We will review your details and reach out within 1 business day "
            "to schedule a free Meet & Greet. No commitment required — just a "
            f"chance for {data.petName} to make a new best friend."
        )
        logo_subtitle = "Professional dog walking services"
    return f"""
We received your enquiry
{_logo_header(subtitle=logo_subtitle)}
Thanks, {first_name}! 🐾
{intro_html}
Your enquiry summary
{"".join(enquiry_summary_rows)}
What happens next?
{next_steps_html}
Questions? Just reply to this email or reach us at 022 642 1011.
Use the same email address you originally used with Goodwalk. We’ll send you a one-time code when you sign in.
"""
def _onboarding_confirmation_email_html(data: OnboardingSubmission) -> str:
first_name = data.fullName.split()[0] if data.fullName.strip() else "there"
dog_name = _trimmed(data.dogName)
service_names = [service.strip() for service in data.servicesNeeded if isinstance(service, str) and service.strip()]
service_summary = ", ".join(service_names[:2]) if service_names else "your selected service"
if len(service_names) > 2:
service_summary += f" + {len(service_names) - 2} more"
onboarding_url = "https://clients.goodwalk.co.nz/"
badge_html = (
'
Submitted
"
)
return f"""
Your onboarding has been submitted
{_logo_header(badge_html=badge_html, subtitle="Your onboarding details are safely with us")}
Thanks, {first_name}. Your onboarding is complete.
We’ve received your details{f" for {dog_name}" if dog_name else ""} and they’re now on file with Goodwalk.
You can sign back in any time to review what you submitted.
Snapshot
Owner
{data.fullName}
Dog
{dog_name or 'Details submitted'}
Services
{service_summary}
What happens next?
We’ll review your submission and come back to you if we need anything clarified.
If you need to check your details again, use the button below to sign back in with a one-time code.