Add honeypot, spam protection to contact form

This commit is contained in:
2026-05-02 11:24:11 +12:00
parent cd8d581f7a
commit 3587ba7f26
25 changed files with 553 additions and 41 deletions
Binary file not shown.
+2
View File
@@ -2,3 +2,5 @@
2026-05-02 09:07:05 CRITICAL mail-api: Required environment variable RESEND_API_KEY is not set
2026-05-02 09:07:45 INFO mail-api: Logging initialised → console=INFO, file=logs\mail-api.log (DEBUG, rotating)
2026-05-02 09:07:45 CRITICAL mail-api: Startup aborted: missing env vars: ['RESEND_API_KEY', 'OWNER_EMAIL']
2026-05-02 11:16:43 INFO mail-api: Logging initialised → console=INFO, file=logs\mail-api.log (DEBUG, rotating)
2026-05-02 11:16:43 CRITICAL mail-api: Startup aborted: missing env vars: ['RESEND_API_KEY', 'OWNER_EMAIL']
+156 -2
View File
@@ -1,4 +1,5 @@
import asyncio
from collections import deque
import logging
import logging.handlers
import os
@@ -89,6 +90,12 @@ def _load_config() -> dict:
"from_email": os.environ.get("FROM_EMAIL", "GoodWalk <bookings@goodwalk.co.nz>"),
"reply_to": os.environ.get("REPLY_TO", "aless@goodwalk.co.nz"),
"max_attempts": max(1, int(os.environ.get("MAIL_MAX_ATTEMPTS", "3"))),
"form_min_seconds": max(1, int(os.environ.get("FORM_MIN_SECONDS", "4"))),
"form_max_seconds": max(60, int(os.environ.get("FORM_MAX_SECONDS", "7200"))),
"rate_limit_window_seconds": max(60, int(os.environ.get("RATE_LIMIT_WINDOW_SECONDS", "900"))),
"rate_limit_max_per_ip": max(1, int(os.environ.get("RATE_LIMIT_MAX_PER_IP", "5"))),
"rate_limit_max_per_email": max(1, int(os.environ.get("RATE_LIMIT_MAX_PER_EMAIL", "3"))),
"rate_limit_min_interval_seconds": max(1, int(os.environ.get("RATE_LIMIT_MIN_INTERVAL_SECONDS", "20"))),
}
@@ -98,12 +105,27 @@ OWNER_EMAIL = _config["owner_email"]
FROM_EMAIL = _config["from_email"]
REPLY_TO = _config["reply_to"]
MAX_SEND_ATTEMPTS = _config["max_attempts"]
FORM_MIN_SECONDS = _config["form_min_seconds"]
FORM_MAX_SECONDS = _config["form_max_seconds"]
RATE_LIMIT_WINDOW_SECONDS = _config["rate_limit_window_seconds"]
RATE_LIMIT_MAX_PER_IP = _config["rate_limit_max_per_ip"]
RATE_LIMIT_MAX_PER_EMAIL = _config["rate_limit_max_per_email"]
RATE_LIMIT_MIN_INTERVAL_SECONDS = _config["rate_limit_min_interval_seconds"]
LOGO_URL = "https://www.goodwalk.co.nz/static/images/goodwalk-auckland-dog-walking-logo.png"
logger.info(
"Mail API config: from=%r reply_to=%r owner=%r max_attempts=%d",
FROM_EMAIL, REPLY_TO, OWNER_EMAIL, MAX_SEND_ATTEMPTS,
"Mail API config: from=%r reply_to=%r owner=%r max_attempts=%d form_min=%ss form_max=%ss rate_window=%ss per_ip=%d per_email=%d min_interval=%ss",
FROM_EMAIL,
REPLY_TO,
OWNER_EMAIL,
MAX_SEND_ATTEMPTS,
FORM_MIN_SECONDS,
FORM_MAX_SECONDS,
RATE_LIMIT_WINDOW_SECONDS,
RATE_LIMIT_MAX_PER_IP,
RATE_LIMIT_MAX_PER_EMAIL,
RATE_LIMIT_MIN_INTERVAL_SECONDS,
)
app = FastAPI(title="GoodWalk Mail API")
@@ -147,6 +169,8 @@ class BookingSubmission(BaseModel):
location: str
message: str = ""
services: list[str] = []
website: str = ""
formStartedAt: int | None = None
referrer: str = ""
page: str = ""
@@ -160,6 +184,119 @@ def _get_ip(request: Request) -> str:
return request.client.host if request.client else "unknown"
_submit_attempts_by_ip: dict[str, deque[float]] = {}
_submit_attempts_by_email: dict[str, deque[float]] = {}
_submit_rate_limit_lock = asyncio.Lock()
def _trimmed(value: str) -> str:
return value.strip()
def _prune_attempts(attempts: deque[float], now: float, window_seconds: int) -> None:
while attempts and now - attempts[0] > window_seconds:
attempts.popleft()
def _seconds_until_allowed(last_attempt_at: float, now: float, min_interval_seconds: int) -> int:
retry_after = max(1, int(min_interval_seconds - (now - last_attempt_at)))
return retry_after
async def _enforce_submit_rate_limits(request_id: str, ip: str, email: str) -> None:
now = time.monotonic()
normalized_email = email.strip().lower()
async with _submit_rate_limit_lock:
ip_attempts = _submit_attempts_by_ip.setdefault(ip, deque())
email_attempts = _submit_attempts_by_email.setdefault(normalized_email, deque())
_prune_attempts(ip_attempts, now, RATE_LIMIT_WINDOW_SECONDS)
_prune_attempts(email_attempts, now, RATE_LIMIT_WINDOW_SECONDS)
if ip_attempts and now - ip_attempts[-1] < RATE_LIMIT_MIN_INTERVAL_SECONDS:
retry_after = _seconds_until_allowed(ip_attempts[-1], now, RATE_LIMIT_MIN_INTERVAL_SECONDS)
logger.warning(
"[%s] rate limited: ip=%s submitted again after %.1fs (minimum %ss)",
request_id,
ip,
now - ip_attempts[-1],
RATE_LIMIT_MIN_INTERVAL_SECONDS,
)
raise HTTPException(
status_code=429,
detail=f"Please wait about {retry_after} seconds before trying again.",
)
if len(ip_attempts) >= RATE_LIMIT_MAX_PER_IP:
logger.warning(
"[%s] rate limited: ip=%s exceeded %d submissions in %ss",
request_id,
ip,
RATE_LIMIT_MAX_PER_IP,
RATE_LIMIT_WINDOW_SECONDS,
)
raise HTTPException(
status_code=429,
detail="Too many enquiries from this connection. Please try again a little later.",
)
if len(email_attempts) >= RATE_LIMIT_MAX_PER_EMAIL:
logger.warning(
"[%s] rate limited: email=%s exceeded %d submissions in %ss",
request_id,
normalized_email,
RATE_LIMIT_MAX_PER_EMAIL,
RATE_LIMIT_WINDOW_SECONDS,
)
raise HTTPException(
status_code=429,
detail="That email address has reached the enquiry limit for now. Please try again later.",
)
ip_attempts.append(now)
email_attempts.append(now)
def _enforce_form_timing(request_id: str, data: BookingSubmission) -> None:
if data.formStartedAt is None or data.formStartedAt <= 0:
logger.warning("[%s] rejected: missing or invalid formStartedAt", request_id)
raise HTTPException(
status_code=400,
detail="Please refresh the page and try again.",
)
elapsed_seconds = (time.time() * 1000 - data.formStartedAt) / 1000
if elapsed_seconds < FORM_MIN_SECONDS:
logger.warning(
"[%s] rejected: form submitted too quickly (%.2fs < %ss)",
request_id,
elapsed_seconds,
FORM_MIN_SECONDS,
)
raise HTTPException(
status_code=400,
detail="Please take a moment to fill in the form before sending it.",
)
if elapsed_seconds > FORM_MAX_SECONDS:
logger.warning(
"[%s] rejected: stale form submission (%.0fs > %ss)",
request_id,
elapsed_seconds,
FORM_MAX_SECONDS,
)
raise HTTPException(
status_code=400,
detail="This form has been open for too long. Please refresh the page and try again.",
)
def _is_honeypot_triggered(data: BookingSubmission) -> bool:
return bool(_trimmed(data.website))
def _parse_ua(ua: str) -> str:
if not ua:
return "Unknown"
@@ -596,6 +733,23 @@ async def submit_booking(data: BookingSubmission, request: Request):
)
logger.debug("[%s] full payload: %s", request_id, data.model_dump())
await _enforce_submit_rate_limits(request_id, ip, str(data.email))
_enforce_form_timing(request_id, data)
if _is_honeypot_triggered(data):
logger.warning(
"[%s] honeypot triggered for ip=%s email=%s page=%r",
request_id,
ip,
data.email,
data.page,
)
return {
"ok": True,
"request_id": request_id,
"ignored": True,
}
failures: list[dict] = []
try: