82 lines
2.6 KiB
Python
82 lines
2.6 KiB
Python
|
|
"""
|
||
|
|
Tests for the /api/v1/auth/* endpoints.
|
||
|
|
"""
|
||
|
|
import pytest
|
||
|
|
from httpx import AsyncClient
|
||
|
|
|
||
|
|
# Module-level mark: pytest applies `pytestmark` to every test collected from
# this file, so each `async def` test below carries the asyncio marker.
pytestmark = pytest.mark.asyncio
|
||
|
|
|
||
|
|
|
||
|
|
async def test_login_valid_credentials(client: AsyncClient, admin_user):
    """A successful login answers 200 with an access/refresh token pair.

    The ``admin_user`` fixture is requested but not used directly; it is
    presumably what creates the account being logged into (defined outside
    this file -- confirm in conftest).
    """
    credentials = {"email": "admin@example.com", "password": "testpassword"}
    resp = await client.post("/api/v1/auth/login", json=credentials)
    assert resp.status_code == 200

    body = resp.json()
    token_keys = ("access_token", "refresh_token")

    # Both tokens must be present in the payload...
    for key in token_keys:
        assert key in body
    assert body["token_type"] == "bearer"
    # ...and each must be longer than a trivial placeholder value.
    for key in token_keys:
        assert len(body[key]) > 10
|
||
|
|
|
||
|
|
|
||
|
|
async def test_login_invalid_password(client: AsyncClient, admin_user):
    """A known email with the wrong password is rejected with 401.

    The error payload's ``detail`` field must contain the word "Invalid".
    """
    bad_credentials = {"email": "admin@example.com", "password": "wrongpassword"}
    resp = await client.post("/api/v1/auth/login", json=bad_credentials)
    assert resp.status_code == 401

    detail = resp.json()["detail"]
    assert "Invalid" in detail
|
||
|
|
|
||
|
|
|
||
|
|
async def test_login_unknown_email(client: AsyncClient):
    """Logging in with an email that was never set up by a fixture yields 401."""
    unknown_credentials = {"email": "nobody@example.com", "password": "whatever"}
    resp = await client.post("/api/v1/auth/login", json=unknown_credentials)
    assert resp.status_code == 401
|
||
|
|
|
||
|
|
|
||
|
|
async def test_refresh_token_flow(client: AsyncClient, admin_user):
    """Refreshing issues a new token pair and the spent refresh token stops working.

    Steps: log in, exchange the refresh token once (expects 200 and a
    different refresh token), then replay the first refresh token
    (expects 401).
    """
    refresh_url = "/api/v1/auth/refresh"

    # Step 1: authenticate to obtain the first refresh token.
    first_login = await client.post(
        "/api/v1/auth/login",
        json={"email": "admin@example.com", "password": "testpassword"},
    )
    assert first_login.status_code == 200
    old_refresh = first_login.json()["refresh_token"]

    # Step 2: exchange it for a fresh pair.
    rotation = await client.post(refresh_url, json={"refresh_token": old_refresh})
    assert rotation.status_code == 200
    rotated = rotation.json()
    assert "access_token" in rotated
    assert "refresh_token" in rotated
    # The endpoint must hand back a different refresh token, not echo the old one.
    assert rotated["refresh_token"] != old_refresh

    # Step 3: replaying the already-used token must be refused.
    replay = await client.post(refresh_url, json={"refresh_token": old_refresh})
    assert replay.status_code == 401
|
||
|
|
|
||
|
|
|
||
|
|
async def test_refresh_invalid_token(client: AsyncClient):
    """A fabricated refresh token is rejected with 401."""
    bogus_payload = {"refresh_token": "not-a-real-token"}
    resp = await client.post("/api/v1/auth/refresh", json=bogus_payload)
    assert resp.status_code == 401
|