Files
gw/backend/tests/security/test_authn.py
T

329 lines
13 KiB
Python
Raw Normal View History

2026-04-18 07:23:55 +12:00
"""
Authentication security tests.
Control coverage
────────────────
OWASP ASVS v4.0 V2 Authentication Verification
V3 Session Management Verification
OWASP API Top 10 API2:2023 Broken Authentication
"""
import base64
import json
from datetime import timedelta
import pytest
from httpx import AsyncClient
from jose import jwt as jose_jwt
from sqlalchemy import update as sa_update
from app.auth.jwt import create_access_token
from app.models.user import User
# Module-level mark: pytest applies it to every test collected from this file,
# so the async test methods below do not need an individual asyncio marker.
pytestmark = pytest.mark.asyncio
# ── V2.1 Password security ───────────────────────────────────────────────────
class TestCredentialValidation:
    """ASVS V2.1 — Which credentials the login endpoint accepts and which it refuses."""

    # Every test in this class posts to the same endpoint; most of them do so
    # on behalf of the account that the ``admin_user`` fixture is expected to
    # provide.
    _LOGIN_URL = "/api/v1/auth/login"
    _ADMIN_EMAIL = "admin@example.com"

    async def test_wrong_password_returns_401(self, client: AsyncClient, admin_user):
        """ASVS 2.1.1 | API2 — A wrong password for a known account yields 401."""
        credentials = {"email": self._ADMIN_EMAIL, "password": "wrongpassword"}
        response = await client.post(self._LOGIN_URL, json=credentials)
        assert response.status_code == 401

    async def test_unknown_email_returns_401(self, client: AsyncClient):
        """ASVS 2.1.1 | API2 — An unregistered email yields 401 rather than 404.

        A 404 here would tell an attacker which addresses are registered.
        """
        credentials = {"email": "ghost@example.com", "password": "anything"}
        response = await client.post(self._LOGIN_URL, json=credentials)
        assert response.status_code == 401

    async def test_empty_password_rejected(self, client: AsyncClient, admin_user):
        """ASVS 2.1.1 — An empty password string is refused (401 or 422)."""
        credentials = {"email": self._ADMIN_EMAIL, "password": ""}
        response = await client.post(self._LOGIN_URL, json=credentials)
        assert response.status_code in {401, 422}

    async def test_null_password_rejected(self, client: AsyncClient, admin_user):
        """ASVS 2.1.1 — A JSON ``null`` password is refused with 422."""
        credentials = {"email": self._ADMIN_EMAIL, "password": None}
        response = await client.post(self._LOGIN_URL, json=credentials)
        assert response.status_code == 422

    async def test_missing_fields_rejected(self, client: AsyncClient):
        """ASVS 2.1.1 — A login body with no fields at all is refused with 422."""
        response = await client.post(self._LOGIN_URL, json={})
        assert response.status_code == 422

    async def test_very_long_password_handled_safely(self, client: AsyncClient, admin_user):
        """ASVS 2.1.7 — A 1 000-character password yields 401, never a 500.

        NOTE(review): the original note attributed this to bcrypt >= 4.0
        raising ValueError for passwords over 72 bytes; confirm the exact
        bcrypt version and message against the pinned dependency. Whatever the
        hashing backend does, the test only requires that the response is 401.
        """
        credentials = {"email": self._ADMIN_EMAIL, "password": "x" * 1000}
        response = await client.post(self._LOGIN_URL, json=credentials)
        assert response.status_code == 401

    async def test_unicode_password_handled_safely(self, client: AsyncClient, admin_user):
        """ASVS 2.1.4 — A multi-byte / emoji password yields 401, never a 500."""
        credentials = {"email": self._ADMIN_EMAIL, "password": "пароль🔑emoji"}
        response = await client.post(self._LOGIN_URL, json=credentials)
        assert response.status_code == 401

    async def test_inactive_account_rejected(
        self, client: AsyncClient, admin_user, db_session
    ):
        """ASVS 2.1.10 — A deactivated account cannot log in.

        The test sets ``is_active`` to False directly in the database, commits,
        and then logs in with the password used for successful logins elsewhere
        in this module; the response must be 401.
        """
        deactivate = (
            sa_update(User).where(User.id == admin_user.id).values(is_active=False)
        )
        await db_session.execute(deactivate)
        await db_session.commit()

        credentials = {"email": self._ADMIN_EMAIL, "password": "testpassword"}
        response = await client.post(self._LOGIN_URL, json=credentials)
        assert response.status_code == 401

    async def test_error_response_does_not_enumerate_users(
        self, client: AsyncClient, admin_user
    ):
        """ASVS 2.2.2 | API2 — Bad password and unknown email both return 401.

        If the two cases were distinguishable, the endpoint would reveal which
        addresses are registered. Only the status code is compared here.
        """
        bad_password = await client.post(
            self._LOGIN_URL,
            json={"email": self._ADMIN_EMAIL, "password": "wrong"},
        )
        unknown_email = await client.post(
            self._LOGIN_URL,
            json={"email": "nobody@example.com", "password": "wrong"},
        )
        assert (bad_password.status_code, unknown_email.status_code) == (401, 401)
# ── V3.5 Token-based session management ─────────────────────────────────────
class TestJWTSecurity:
    """ASVS V3.5 | API2 — JWT access token validation controls.

    Every test sends a page-creation POST carrying missing, malformed, or
    otherwise invalid credentials and expects the response to be 401 or 403.
    """

    # Protected write endpoint and a minimal request body, shared by all tests.
    _PAGES_URL = "/api/v1/pages"
    _PAGE_PAYLOAD = {"title": "x", "slug": "x", "body": "x"}

    @staticmethod
    def _b64url(raw: bytes) -> str:
        """Return *raw* base64url-encoded with the trailing '=' padding removed."""
        return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()

    async def _assert_write_denied(self, client: AsyncClient, *, headers=None, url=None):
        """POST the shared page payload and assert that the server answers 401 or 403.

        Args:
            client: HTTP client fixture used to reach the application.
            headers: Optional request headers (e.g. an Authorization header).
            url: Optional override for the target URL; defaults to the pages
                endpoint.
        """
        resp = await client.post(
            url or self._PAGES_URL,
            json=self._PAGE_PAYLOAD,
            headers=headers,
        )
        assert resp.status_code in (401, 403)

    async def test_no_auth_header_rejected(self, client: AsyncClient):
        """ASVS 3.5.1 | API2 — Write endpoint with no Authorization header is denied."""
        await self._assert_write_denied(client)

    async def test_garbage_bearer_token_rejected(self, client: AsyncClient):
        """ASVS 3.5.1 | API2 — Arbitrary string in Bearer position is rejected."""
        await self._assert_write_denied(
            client, headers={"Authorization": "Bearer not-a-real-jwt"}
        )

    async def test_wrong_signing_key_rejected(self, client: AsyncClient, admin_user):
        """ASVS 3.5.1 | API2 — JWT signed with a different secret is rejected.

        The token is structurally valid and names a real user, but it is signed
        with a key chosen by the test rather than the server's key.
        """
        fake_token = jose_jwt.encode(
            {"sub": str(admin_user.id)}, "wrong-secret", algorithm="HS256"
        )
        await self._assert_write_denied(
            client, headers={"Authorization": f"Bearer {fake_token}"}
        )

    async def test_expired_access_token_rejected(self, client: AsyncClient, admin_user):
        """ASVS 3.5.2 | API2 — Expired JWT is rejected even if the signature is valid.

        The token is created with ``expires_delta`` of -1 second, so it is
        presumably already past its expiry when the request is made.
        """
        expired = create_access_token(
            data={"sub": str(admin_user.id)},
            expires_delta=timedelta(seconds=-1),
        )
        await self._assert_write_denied(
            client, headers={"Authorization": f"Bearer {expired}"}
        )

    async def test_alg_none_attack_rejected(self, client: AsyncClient, admin_user):
        """ASVS 3.5.3 | API2 — JWT with algorithm 'none' (unsigned) is rejected.

        The 'alg: none' attack tricks vulnerable verifiers into accepting
        unsigned tokens. The token is assembled by hand (header, payload, and
        an empty signature segment) so that no JWT library is needed to
        produce it.
        """
        header_b64 = self._b64url(b'{"alg":"none","typ":"JWT"}')
        payload_b64 = self._b64url(json.dumps({"sub": str(admin_user.id)}).encode())
        none_token = f"{header_b64}.{payload_b64}."
        await self._assert_write_denied(
            client, headers={"Authorization": f"Bearer {none_token}"}
        )

    async def test_tampered_payload_rejected(self, client: AsyncClient, admin_user):
        """ASVS 3.5.1 | API2 — JWT with a modified payload but original signature is rejected.

        The signature covers the original header and payload, so any change to
        the payload must invalidate the token. The forged payload keeps the
        genuine ``sub`` and only adds an extra claim: had the subject been
        swapped for a non-existent user, the request would presumably be denied
        for that reason alone and the test would pass even without signature
        verification.
        """
        valid_token = create_access_token(data={"sub": str(admin_user.id)})
        header, payload, signature = valid_token.split(".")
        # JWT segments are unpadded; restore the padding before decoding.
        padded_payload = payload + "=" * (-len(payload) % 4)
        claims = json.loads(base64.urlsafe_b64decode(padded_payload))
        claims["tampered"] = True
        forged_payload = self._b64url(json.dumps(claims).encode())
        tampered = f"{header}.{forged_payload}.{signature}"
        await self._assert_write_denied(
            client, headers={"Authorization": f"Bearer {tampered}"}
        )

    async def test_token_in_query_string_rejected(self, client: AsyncClient, admin_user):
        """ASVS 3.5.1 | API2 — Bearer token passed as a query parameter is rejected.

        Tokens in URLs appear in server logs, browser history, and Referer
        headers, making them easy to leak. A valid token sent only as a
        ``token`` query parameter must therefore not authenticate the request.
        """
        valid_token = create_access_token(data={"sub": str(admin_user.id)})
        await self._assert_write_denied(
            client, url=f"/api/v1/pages?token={valid_token}"
        )

    async def test_basic_auth_scheme_rejected(self, client: AsyncClient, admin_user):
        """ASVS 3.5.1 | API2 — HTTP Basic Auth scheme is not accepted; Bearer is required."""
        credentials = base64.b64encode(b"admin@example.com:testpassword").decode()
        await self._assert_write_denied(
            client, headers={"Authorization": f"Basic {credentials}"}
        )
# ── V3.5 Refresh token rotation and revocation ───────────────────────────────
class TestRefreshTokenSecurity:
    """ASVS V3.5 | API2 — Refresh token single-use rotation and revocation."""

    async def test_refresh_token_is_rotated_and_old_token_revoked(
        self, client: AsyncClient, admin_user
    ):
        """ASVS 3.5.2 — After rotation the original refresh token cannot be reused.

        The test logs in, exchanges the refresh token once (expecting a
        different refresh token back), then presents the original token a
        second time and expects 401.
        """
        login = await client.post(
            "/api/v1/auth/login",
            json={"email": "admin@example.com", "password": "testpassword"},
        )
        assert login.status_code == 200
        original_refresh = login.json()["refresh_token"]

        rotate = await client.post(
            "/api/v1/auth/refresh",
            json={"refresh_token": original_refresh},
        )
        assert rotate.status_code == 200
        assert rotate.json()["refresh_token"] != original_refresh

        reuse = await client.post(
            "/api/v1/auth/refresh",
            json={"refresh_token": original_refresh},
        )
        assert reuse.status_code == 401

    async def test_forged_refresh_token_rejected(self, client: AsyncClient):
        """ASVS 3.5.2 | API2 — A made-up string is not a valid refresh token."""
        resp = await client.post(
            "/api/v1/auth/refresh",
            json={"refresh_token": "totally-made-up-random-value"},
        )
        assert resp.status_code == 401

    async def test_empty_refresh_token_rejected(self, client: AsyncClient):
        """ASVS 3.5.2 — Empty refresh token string is rejected (401 or 422)."""
        resp = await client.post(
            "/api/v1/auth/refresh",
            json={"refresh_token": ""},
        )
        assert resp.status_code in (401, 422)

    async def test_new_access_token_is_functional(
        self, client: AsyncClient, admin_user
    ):
        """ASVS 3.5.2 — Access token issued after a refresh is accepted by protected endpoints.

        Login and refresh are asserted to succeed before their bodies are read,
        so a broken setup step fails with a clear status-code mismatch instead
        of a KeyError on the missing token field.
        """
        login = await client.post(
            "/api/v1/auth/login",
            json={"email": "admin@example.com", "password": "testpassword"},
        )
        assert login.status_code == 200
        original_refresh = login.json()["refresh_token"]

        rotate = await client.post(
            "/api/v1/auth/refresh",
            json={"refresh_token": original_refresh},
        )
        assert rotate.status_code == 200
        new_access = rotate.json()["access_token"]

        resp = await client.post(
            "/api/v1/pages",
            json={"title": "Post-refresh", "slug": "post-refresh", "body": "<p>ok</p>"},
            headers={"Authorization": f"Bearer {new_access}"},
        )
        assert resp.status_code == 201