Files
ponzischeme89 6d44e05de4 v1
2026-04-18 07:23:55 +12:00

196 lines
8.0 KiB
Python

"""
Authorization and access-control security tests.
Control coverage
────────────────
OWASP ASVS v4.0 V4 Access Control Verification
OWASP API Top 10 API1:2023 Broken Object Level Authorization (BOLA)
API3:2023 Broken Object Property Level Authorization
API5:2023 Broken Function Level Authorization
"""
import pytest
from httpx import AsyncClient
pytestmark = pytest.mark.asyncio
# ── V4.1 General access control ─────────────────────────────────────────────
class TestUnauthenticatedWriteAccess:
"""ASVS V4.1 | API5 — Every write/mutate endpoint denies unauthenticated callers."""
@pytest.mark.parametrize("method,path,body", [
("POST", "/api/v1/pages", {"title": "T", "slug": "s", "body": "b"}),
("PUT", "/api/v1/pages/any-slug", {"title": "T"}),
("DELETE", "/api/v1/pages/any-slug", None),
("POST", "/api/v1/posts", {"title": "T", "slug": "s", "body": "b"}),
("PUT", "/api/v1/posts/any-slug", {"title": "T"}),
("DELETE", "/api/v1/posts/any-slug", None),
("PUT", "/api/v1/settings", {"site_name": "X"}),
("GET", "/api/v1/analytics/summary", None),
])
async def test_endpoint_requires_auth(
self, client: AsyncClient, method: str, path: str, body: dict | None
):
"""ASVS 4.1.1 | API5 — {method} {path} returns 401/403 without credentials."""
fn = getattr(client, method.lower())
kwargs = {"json": body} if body else {}
resp = await fn(path, **kwargs)
assert resp.status_code in (401, 403), (
f"{method} {path}: expected 401/403 without auth, got {resp.status_code}"
)
async def test_malformed_auth_scheme_rejected(self, client: AsyncClient):
"""ASVS 4.1.1 | API2 — Non-Bearer Authorization schemes are denied."""
bad_headers = [
"Basic dXNlcjpwYXNz", # HTTP Basic
"Token some-api-key", # Token scheme
"Bearer", # Missing credential
]
for auth in bad_headers:
resp = await client.post(
"/api/v1/pages",
json={"title": "x", "slug": "x", "body": "x"},
headers={"Authorization": auth},
)
assert resp.status_code in (401, 403), (
f"Expected 401/403 for Authorization: {auth!r}"
)
# ── V4.2 Object-level authorization (BOLA) ──────────────────────────────────
class TestObjectLevelAuthorization:
"""ASVS V4.2 | API1 BOLA — Objects cannot be accessed or mutated without authorization."""
async def test_nonexistent_page_returns_404_not_403(self, client: AsyncClient):
"""ASVS 4.2.1 | API1 — Missing resource returns 404, not 403 or 500.
Returning 403 for non-existent resources would reveal that the resource
exists but is protected; 404 is the correct public response.
"""
resp = await client.get("/api/v1/pages/this-slug-does-not-exist-999")
assert resp.status_code == 404
async def test_path_traversal_in_slug_is_rejected(self, client: AsyncClient):
"""ASVS 4.2.1 | API1 — Path-traversal sequences in slug parameters are rejected.
URL-encoded and plain traversal strings must not resolve to real resources
or cause server errors.
"""
traversal_slugs = [
"../admin",
"..%2fadmin",
"%2e%2e/secret",
"../../etc/passwd",
]
for slug in traversal_slugs:
resp = await client.get(f"/api/v1/pages/{slug}")
assert resp.status_code in (404, 422), (
f"Unexpected {resp.status_code} for slug {slug!r}"
)
async def test_delete_nonexistent_resource_returns_404(
self, client: AsyncClient, admin_token: str
):
"""ASVS 4.2.1 | API1 — DELETE on an absent resource returns 404, not 204.
Returning 204 for missing resources would silently confirm that the
operation succeeded, masking business-logic gaps.
"""
resp = await client.delete(
"/api/v1/pages/genuinely-does-not-exist",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert resp.status_code == 404
async def test_update_nonexistent_resource_returns_404(
self, client: AsyncClient, admin_token: str
):
"""ASVS 4.2.1 | API1 — PUT on a missing slug returns 404."""
resp = await client.put(
"/api/v1/pages/genuinely-does-not-exist",
json={"title": "Updated"},
headers={"Authorization": f"Bearer {admin_token}"},
)
assert resp.status_code == 404
# ── V4.3 Mass assignment (object property authorization) ─────────────────────
class TestMassAssignment:
"""ASVS V4.3 | API3 — Server-side models reject undeclared or privileged fields."""
async def test_extra_fields_in_create_are_silently_dropped(
self, client: AsyncClient, admin_token: str
):
"""ASVS 4.3.1 | API3 — Injected undeclared fields are not stored or echoed.
Pydantic's schema strips fields not declared in PageCreate.
The response must not contain 'is_admin', 'hashed_password', or any
caller-supplied 'id'.
"""
resp = await client.post(
"/api/v1/pages",
json={
"title": "Mass-assign test",
"slug": "mass-assign-create",
"body": "<p>body</p>",
"published": True,
# Injected fields that must be dropped
"is_admin": True,
"hashed_password": "injected",
"id": "00000000-0000-0000-0000-000000000001",
"internal_field": "evil",
},
headers={"Authorization": f"Bearer {admin_token}"},
)
assert resp.status_code == 201
data = resp.json()
assert "is_admin" not in data
assert "hashed_password" not in data
assert "internal_field" not in data
assert data.get("id") != "00000000-0000-0000-0000-000000000001"
async def test_extra_fields_in_update_are_silently_dropped(
self, client: AsyncClient, admin_token: str
):
"""ASVS 4.3.1 | API3 — Injected fields in PUT body are stripped by the schema."""
await client.post(
"/api/v1/pages",
json={"title": "Base", "slug": "mass-assign-update", "body": "<p>b</p>"},
headers={"Authorization": f"Bearer {admin_token}"},
)
resp = await client.put(
"/api/v1/pages/mass-assign-update",
json={"title": "Updated", "hacked_field": "injected", "is_admin": True},
headers={"Authorization": f"Bearer {admin_token}"},
)
assert resp.status_code == 200
data = resp.json()
assert "hacked_field" not in data
assert "is_admin" not in data
async def test_published_flag_is_controlled_by_caller(
self, client: AsyncClient, admin_token: str
):
"""API3 — The 'published' field is an intentional caller-controlled property.
This test documents that any authenticated user can publish content.
There is no role separation between 'editor' and 'publisher' roles.
If RBAC is added in future, this test should be updated to reflect
the intended access model.
"""
resp = await client.post(
"/api/v1/pages",
json={
"title": "Published Page",
"slug": "published-by-caller",
"body": "<p>visible</p>",
"published": True,
},
headers={"Authorization": f"Bearer {admin_token}"},
)
assert resp.status_code == 201
assert resp.json()["published"] is True