Files
ponzischeme89 6d44e05de4 v1
2026-04-18 07:23:55 +12:00

775 lines
32 KiB
Python

"""
Goodwalk Backend CLI
--------------------
A rich management interface for the Goodwalk CMS API.
Usage (from backend/ directory):
python cli.py — show help
python cli.py dev — start dev server
python cli.py migrate — run Alembic migrations
python cli.py seed — seed admin user + sample CMS content
python cli.py seed-content — seed site content from data/content.json
python cli.py status — show DB connection + table row counts
python cli.py create-admin — create a new admin user interactively
python cli.py routes — list all registered API routes
"""
import asyncio
import os
import sys
from pathlib import Path
import typer
from rich import box
from rich.console import Console
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
from rich.prompt import Prompt, Confirm
from rich.table import Table
from rich.text import Text
from rich import print as rprint
# ── Setup ─────────────────────────────────────────────────────────────────────
app = typer.Typer(
name="goodwalk",
help="[bold green]Goodwalk CMS API[/bold green] — management CLI",
add_completion=False,
rich_markup_mode="rich",
)
console = Console()
BANNER = """
[bold green]
██████╗ ██████╗ ██████╗ ██████╗ ██╗ ██╗ █████╗ ██╗ ██╗ ██╗
██╔════╝ ██╔═══██╗██╔═══██╗██╔══██╗██║ ██║██╔══██╗██║ ██║ ██╔╝
██║ ███╗██║ ██║██║ ██║██║ ██║██║ █╗ ██║███████║██║ █████╔╝
██║ ██║██║ ██║██║ ██║██║ ██║██║███╗██║██╔══██║██║ ██╔═██╗
╚██████╔╝╚██████╔╝╚██████╔╝██████╔╝╚███╔███╔╝██║ ██║███████╗██║ ██╗
╚═════╝ ╚═════╝ ╚═════╝ ╚═════╝ ╚══╝╚══╝ ╚═╝ ╚═╝╚══════╝╚═╝ ╚═╝
[/bold green][dim] CMS API · FastAPI · PostgreSQL · JWT Auth[/dim]
"""
def _print_banner():
console.print(BANNER)
def _load_settings():
"""Import app settings (triggers .env load)."""
try:
from app.config import settings
return settings
except Exception as e:
console.print(f"[bold red]✗ Failed to load settings:[/bold red] {e}")
raise typer.Exit(1)
def _make_engine(settings):
from sqlalchemy.ext.asyncio import create_async_engine
return create_async_engine(settings.DATABASE_URL, echo=False)
# ── dev ───────────────────────────────────────────────────────────────────────
@app.command()
def dev(
host: str = typer.Option("127.0.0.1", "--host", "-h", help="Bind host"),
port: int = typer.Option(8000, "--port", "-p", help="Bind port"),
reload: bool = typer.Option(True, "--reload/--no-reload", help="Enable auto-reload"),
):
"""
[bold]Start the development server.[/bold]
Launches Uvicorn with hot-reload enabled. Reads configuration from [cyan].env[/cyan].
"""
_print_banner()
settings = _load_settings()
info = Table.grid(padding=(0, 2))
info.add_column(style="dim")
info.add_column(style="bold cyan")
info.add_row("API URL", f"http://{host}:{port}")
info.add_row("Docs", f"http://{host}:{port}/docs")
info.add_row("Redoc", f"http://{host}:{port}/redoc")
info.add_row("Health", f"http://{host}:{port}/health")
info.add_row("Database", settings.DATABASE_URL.split("@")[-1] if "@" in settings.DATABASE_URL else settings.DATABASE_URL)
info.add_row("CORS Origins", settings.ALLOWED_ORIGINS)
info.add_row("Auto-reload", "[green]on[/green]" if reload else "[yellow]off[/yellow]")
console.print(Panel(info, title="[bold green]Starting Goodwalk CMS API[/bold green]", border_style="green"))
console.print()
import uvicorn
uvicorn.run(
"app.main:app",
host=host,
port=port,
reload=reload,
log_level="warning", # suppress uvicorn's own access log — we use RequestLogMiddleware
access_log=False,
)
# ── migrate ───────────────────────────────────────────────────────────────────
@app.command()
def migrate(
revision: str = typer.Argument("head", help="Alembic revision target (default: head)"),
autogenerate: bool = typer.Option(False, "--autogenerate", "-a", help="Generate a new migration"),
message: str = typer.Option("auto", "--message", "-m", help="Migration message (used with --autogenerate)"),
):
"""
[bold]Run database migrations via Alembic.[/bold]
By default runs [cyan]alembic upgrade head[/cyan].
Pass [cyan]--autogenerate[/cyan] to generate a new migration from model changes.
"""
_print_banner()
console.print(Panel("[bold]Running Alembic migrations[/bold]", border_style="cyan"))
import subprocess
if autogenerate:
cmd = [sys.executable, "-m", "alembic", "revision", "--autogenerate", "-m", message]
label = f"Generating migration: [cyan]{message}[/cyan]"
else:
cmd = [sys.executable, "-m", "alembic", "upgrade", revision]
label = f"Upgrading to [cyan]{revision}[/cyan]"
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
console=console,
) as progress:
task = progress.add_task(label, total=None)
result = subprocess.run(cmd, capture_output=True, text=True)
progress.update(task, completed=True)
if result.returncode == 0:
console.print(f"\n[bold green]✓ Migration complete[/bold green]")
if result.stdout.strip():
console.print(f"[dim]{result.stdout.strip()}[/dim]")
else:
console.print(f"\n[bold red]✗ Migration failed[/bold red]")
console.print(f"[red]{result.stderr.strip()}[/red]")
raise typer.Exit(1)
# ── seed ──────────────────────────────────────────────────────────────────────
@app.command()
def seed():
"""
[bold]Seed the database with a default admin user and sample content.[/bold]
Creates:
• Admin user [cyan]admin@example.com[/cyan] / [cyan]changeme123[/cyan]
• Sample Page, BlogPost, and SiteSettings rows
• All site content sections from [cyan]data/content.json[/cyan]
"""
_print_banner()
console.print(Panel("[bold]Seeding database[/bold]", border_style="cyan"))
asyncio.run(_seed_async())
async def _seed_async():
settings = _load_settings()
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy import select, text
from app.models import Base, User, Page, BlogPost, SiteSettings, ContentSection
from app.auth.password import hash_password
import json
engine = _make_engine(settings)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
Session = async_sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
results = []
async with Session() as session:
# ── Admin user ──────────────────────────────────────────────────────
existing = await session.execute(select(User).where(User.email == "admin@example.com"))
if existing.scalar_one_or_none() is None:
session.add(User(
email="admin@example.com",
hashed_password=hash_password("changeme123"),
is_active=True,
))
results.append(("[green]created[/green]", "User", "admin@example.com"))
else:
results.append(("[yellow]exists[/yellow] ", "User", "admin@example.com"))
# ── Sample Page ─────────────────────────────────────────────────────
existing = await session.execute(select(Page).where(Page.slug == "home"))
if existing.scalar_one_or_none() is None:
session.add(Page(
title="Home",
slug="home",
body="<h1>Welcome to Goodwalk</h1><p>Professional dog walking services across Auckland Central.</p>",
meta_title="Goodwalk Auckland Dog Walking Service",
meta_description="Trusted, professional dog walking services across Auckland Central.",
published=True,
))
results.append(("[green]created[/green]", "Page", "home"))
else:
results.append(("[yellow]exists[/yellow] ", "Page", "home"))
# ── Sample BlogPost ─────────────────────────────────────────────────
existing = await session.execute(select(BlogPost).where(BlogPost.slug == "hello-world"))
if existing.scalar_one_or_none() is None:
session.add(BlogPost(
title="Welcome to the Goodwalk Blog",
slug="hello-world",
excerpt="Our first blog post — introducing Goodwalk and the Tiny Gang.",
body="<p>Welcome to the Goodwalk blog! We'll be sharing updates, tips, and stories from the Tiny Gang.</p>",
author="Alessandra",
tags=["news", "welcome"],
published=True,
))
results.append(("[green]created[/green]", "BlogPost", "hello-world"))
else:
results.append(("[yellow]exists[/yellow] ", "BlogPost", "hello-world"))
# ── SiteSettings ────────────────────────────────────────────────────
existing = await session.execute(select(SiteSettings).limit(1))
if existing.scalar_one_or_none() is None:
session.add(SiteSettings(
site_name="Goodwalk",
tagline="Unleashing Fun in Your Dog's Day!",
logo_url="/images/logo-v6.png",
footer_text="© 2026 Goodwalk. All rights reserved.",
social_links={
"instagram": "https://www.instagram.com/goodwalk.nz",
"facebook": "https://www.facebook.com/goodwalk.nz",
"google": "https://g.page/goodwalk",
},
))
results.append(("[green]created[/green]", "SiteSettings", "singleton"))
else:
results.append(("[yellow]exists[/yellow] ", "SiteSettings", "singleton"))
# ── Content sections from content.json ──────────────────────────────
content_file = Path(__file__).parent / "data" / "content.json"
if content_file.exists():
with open(content_file, encoding="utf-8") as f:
content = json.load(f)
pages_data = content.get("pages", {})
sections = {
"siteSettings": content.get("siteSettings", {}),
"navigation": content.get("navigation", {}),
"footer": content.get("footer", {}),
"testimonials": content.get("testimonials", []),
"pages.home": pages_data.get("home", {}),
"pages.packWalks": pages_data.get("packWalks", {}),
"pages.oneOnOneWalks": pages_data.get("oneOnOneWalks", {}),
"pages.puppyVisits": pages_data.get("puppyVisits", {}),
"pages.pricing": pages_data.get("pricing", {}),
"pages.about": pages_data.get("about", {}),
"pages.contact": pages_data.get("contact", {}),
}
for key, data in sections.items():
existing = await session.execute(
select(ContentSection).where(ContentSection.key == key)
)
row = existing.scalar_one_or_none()
if row is None:
session.add(ContentSection(key=key, data=data))
results.append(("[green]created[/green]", "Section", key))
else:
row.data = data
results.append(("[cyan]updated[/cyan] ", "Section", key))
else:
console.print(f" [yellow]⚠ content.json not found at {content_file} — skipping sections[/yellow]")
await session.commit()
await engine.dispose()
# ── Results table ────────────────────────────────────────────────────────
table = Table(box=box.ROUNDED, show_header=True, border_style="green")
table.add_column("Status", style="bold", width=12)
table.add_column("Type", style="cyan", width=16)
table.add_column("Key / Identifier")
for status, type_, key in results:
table.add_row(status, type_, key)
console.print()
console.print(table)
console.print(f"\n[bold green]✓ Seed complete[/bold green] — {len(results)} records processed\n")
# ── seed-content ──────────────────────────────────────────────────────────────
@app.command(name="seed-content")
def seed_content():
"""
[bold]Re-seed only the content sections from data/content.json.[/bold]
Upserts all page sections, navigation, footer, testimonials, and settings.
Safe to run multiple times — existing rows are updated, not duplicated.
"""
_print_banner()
console.print(Panel("[bold]Seeding content sections from data/content.json[/bold]", border_style="cyan"))
asyncio.run(_seed_content_async())
async def _seed_content_async():
import json
settings = _load_settings()
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy import select
from app.models import Base, ContentSection
content_file = Path(__file__).parent / "data" / "content.json"
if not content_file.exists():
console.print(f"[bold red]✗ Not found:[/bold red] {content_file}")
raise typer.Exit(1)
with open(content_file, encoding="utf-8") as f:
content = json.load(f)
pages_data = content.get("pages", {})
sections = {
"siteSettings": content.get("siteSettings", {}),
"navigation": content.get("navigation", {}),
"footer": content.get("footer", {}),
"testimonials": content.get("testimonials", []),
"pages.home": pages_data.get("home", {}),
"pages.packWalks": pages_data.get("packWalks", {}),
"pages.oneOnOneWalks": pages_data.get("oneOnOneWalks", {}),
"pages.puppyVisits": pages_data.get("puppyVisits", {}),
"pages.pricing": pages_data.get("pricing", {}),
"pages.about": pages_data.get("about", {}),
"pages.contact": pages_data.get("contact", {}),
}
engine = _make_engine(settings)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
Session = async_sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
console=console,
) as progress:
task = progress.add_task("Upserting sections...", total=len(sections))
results = []
async with Session() as session:
for key, data in sections.items():
existing = await session.execute(
select(ContentSection).where(ContentSection.key == key)
)
row = existing.scalar_one_or_none()
if row is None:
session.add(ContentSection(key=key, data=data))
results.append(("[green]inserted[/green]", key))
else:
row.data = data
results.append(("[cyan]updated[/cyan] ", key))
progress.advance(task)
await session.commit()
await engine.dispose()
table = Table(box=box.SIMPLE, show_header=True, border_style="dim")
table.add_column("Status", width=10)
table.add_column("Section key")
for status, key in results:
table.add_row(status, key)
console.print()
console.print(table)
console.print(f"[bold green]✓ Done[/bold green] — {len(results)} sections seeded\n")
# ── status ────────────────────────────────────────────────────────────────────
@app.command()
def status():
"""
[bold]Show database connection status and table row counts.[/bold]
Verifies the connection and prints a summary of all CMS data currently stored.
"""
_print_banner()
asyncio.run(_status_async())
async def _status_async():
settings = _load_settings()
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy import select, func, text
from app.models import User, Page, BlogPost, SiteSettings, ContentSection
db_display = settings.DATABASE_URL.split("@")[-1] if "@" in settings.DATABASE_URL else settings.DATABASE_URL
engine = _make_engine(settings)
Session = async_sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
with Progress(SpinnerColumn(), TextColumn("Connecting to database..."), console=console) as p:
p.add_task("", total=None)
try:
async with Session() as session:
await session.execute(text("SELECT 1"))
connected = True
except Exception as e:
connected = False
err = str(e)
if not connected:
console.print(Panel(
f"[bold red]✗ Cannot connect[/bold red]\n[dim]{db_display}[/dim]\n\n[red]{err}[/red]",
title="Database Status",
border_style="red",
))
raise typer.Exit(1)
# Row counts
async with Session() as session:
counts = {}
for label, model, filter_col in [
("Pages (total)", Page, None),
("Pages (published)", Page, Page.published == True),
("Blog Posts (total)", BlogPost, None),
("Blog Posts (pub.)", BlogPost, BlogPost.published == True),
("Site Settings", SiteSettings, None),
("CMS Users", User, None),
("Content Sections", ContentSection, None),
]:
stmt = select(func.count()).select_from(model)
if filter_col is not None:
stmt = stmt.where(filter_col)
result = await session.execute(stmt)
counts[label] = result.scalar_one()
# Section keys
sections_result = await session.execute(
select(ContentSection.key, ContentSection.updated_at).order_by(ContentSection.key)
)
section_rows = sections_result.all()
await engine.dispose()
# ── Connection panel ─────────────────────────────────────────────────────
conn_info = Table.grid(padding=(0, 2))
conn_info.add_column(style="dim")
conn_info.add_column(style="bold")
conn_info.add_row("Status", "[bold green]● Connected[/bold green]")
conn_info.add_row("Database", db_display)
console.print(Panel(conn_info, title="[bold green]Database Connection[/bold green]", border_style="green"))
console.print()
# ── Row counts ───────────────────────────────────────────────────────────
counts_table = Table(box=box.ROUNDED, title="[bold]Table Row Counts[/bold]", border_style="cyan")
counts_table.add_column("Table / Filter", style="cyan")
counts_table.add_column("Rows", justify="right", style="bold white")
for label, count in counts.items():
style = "green" if count > 0 else "dim"
counts_table.add_row(label, f"[{style}]{count}[/{style}]")
console.print(counts_table)
console.print()
# ── Section keys ─────────────────────────────────────────────────────────
if section_rows:
sec_table = Table(box=box.SIMPLE, title="[bold]Content Sections[/bold]", border_style="dim")
sec_table.add_column("Key", style="cyan")
sec_table.add_column("Last Updated", style="dim")
for key, updated_at in section_rows:
sec_table.add_row(key, updated_at.strftime("%Y-%m-%d %H:%M") if updated_at else "")
console.print(sec_table)
console.print()
# ── create-admin ──────────────────────────────────────────────────────────────
@app.command(name="create-admin")
def create_admin():
"""
[bold]Interactively create a new CMS admin user.[/bold]
Prompts for email and password. Password is bcrypt-hashed before storage.
"""
_print_banner()
console.print(Panel("[bold]Create Admin User[/bold]", border_style="cyan"))
email = Prompt.ask("[cyan]Email address[/cyan]")
password = Prompt.ask("[cyan]Password[/cyan]", password=True)
confirm = Prompt.ask("[cyan]Confirm password[/cyan]", password=True)
if password != confirm:
console.print("[bold red]✗ Passwords do not match[/bold red]")
raise typer.Exit(1)
if len(password) < 8:
console.print("[bold red]✗ Password must be at least 8 characters[/bold red]")
raise typer.Exit(1)
asyncio.run(_create_admin_async(email, password))
async def _create_admin_async(email: str, password: str):
settings = _load_settings()
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy import select
from app.models import User
from app.auth.password import hash_password
engine = _make_engine(settings)
Session = async_sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
async with Session() as session:
existing = await session.execute(select(User).where(User.email == email))
if existing.scalar_one_or_none() is not None:
console.print(f"[bold yellow]⚠ User already exists:[/bold yellow] {email}")
await engine.dispose()
raise typer.Exit(1)
user = User(email=email, hashed_password=hash_password(password), is_active=True)
session.add(user)
await session.commit()
await session.refresh(user)
await engine.dispose()
console.print(f"\n[bold green]✓ Admin user created[/bold green]")
info = Table.grid(padding=(0, 2))
info.add_column(style="dim")
info.add_column(style="bold cyan")
info.add_row("Email", email)
info.add_row("ID", str(user.id))
info.add_row("Active", "[green]yes[/green]")
console.print(Panel(info, border_style="green"))
console.print()
# ── routes ────────────────────────────────────────────────────────────────────
@app.command()
def routes():
"""
[bold]List all registered API routes.[/bold]
Displays method, path, and handler name for every endpoint.
"""
_print_banner()
from app.main import app as fastapi_app
table = Table(box=box.ROUNDED, title="[bold]Registered API Routes[/bold]", border_style="cyan")
table.add_column("Methods", style="bold yellow", width=20)
table.add_column("Path", style="cyan")
table.add_column("Handler", style="dim")
table.add_column("Auth", justify="center", width=6)
METHOD_COLORS = {
"GET": "green",
"POST": "yellow",
"PUT": "blue",
"DELETE": "red",
"PATCH": "magenta",
}
route_rows = []
for route in fastapi_app.routes:
if not hasattr(route, "methods"):
continue
methods = sorted(route.methods or [])
path = route.path
handler = route.endpoint.__name__ if hasattr(route, "endpoint") else ""
# Detect auth by checking if get_current_user is in dependencies
deps = getattr(route, "dependencies", [])
dep_names = [str(d) for d in deps]
auth_required = any("get_current_user" in d for d in dep_names)
# Also check endpoint's own dependencies via __wrapped__ or direct inspection
endpoint_deps = getattr(route.endpoint, "__wrapped__", route.endpoint)
method_str = " ".join(
f"[{METHOD_COLORS.get(m, 'white')}]{m}[/{METHOD_COLORS.get(m, 'white')}]"
for m in methods
)
route_rows.append((path, method_str, handler, auth_required))
# Sort: /api/v1 first, then /api/, then rest
def sort_key(r):
p = r[0]
if p.startswith("/api/v1"): return (0, p)
if p.startswith("/api"): return (1, p)
return (2, p)
route_rows.sort(key=sort_key)
for path, method_str, handler, auth in route_rows:
auth_icon = "[yellow]🔒[/yellow]" if auth else ""
table.add_row(method_str, path, handler, auth_icon)
console.print(table)
console.print(f"\n[dim]Total routes: {len(route_rows)}[/dim]\n")
# ── Interactive shell ─────────────────────────────────────────────────────────
MENU_ITEMS = [
("1", "dev", "Start the FastAPI dev server [dim](background thread)[/dim]"),
("2", "migrate", "Run Alembic migrations [dim](upgrade head)[/dim]"),
("3", "seed", "Seed admin user + all content"),
("4", "seed-content", "Re-seed content.json sections only"),
("5", "status", "Database connection + row counts"),
("6", "create-admin", "Create a new CMS admin user"),
("7", "routes", "List all registered API routes"),
("q", "quit", "Exit the CLI"),
]
# Tracks whether the dev server background thread is running
_server_thread = None
def _print_menu():
table = Table(box=box.SIMPLE, show_header=False, padding=(0, 2), border_style="dim")
table.add_column(style="bold cyan", width=4)
table.add_column(style="bold white", width=18)
table.add_column()
for key, _, description in MENU_ITEMS:
table.add_row(f"[{key}]", _, description)
console.print(Panel(table, title="[bold green]Goodwalk CLI — What would you like to do?[/bold green]", border_style="green"))
def _run_dev_in_background(host: str, port: int):
"""Start Uvicorn in a daemon thread so the prompt stays live."""
import threading
import uvicorn
global _server_thread
if _server_thread is not None and _server_thread.is_alive():
console.print("[yellow]⚠ Dev server is already running[/yellow]")
return
config = uvicorn.Config("app.main:app", host=host, port=port, reload=False, log_level="warning", access_log=False)
server = uvicorn.Server(config)
def _target():
asyncio.run(server.serve())
_server_thread = threading.Thread(target=_target, daemon=True, name="uvicorn")
_server_thread.start()
console.print(f"\n[bold green]✓ Dev server started[/bold green] → [cyan]http://{host}:{port}[/cyan] | docs: [cyan]http://{host}:{port}/docs[/cyan]")
console.print("[dim]Running in background — use [q] to exit the CLI (server stops with it)[/dim]\n")
def _dispatch(choice: str):
"""Run the selected command without exiting the shell."""
choice = choice.strip().lower()
if choice in ("1", "dev"):
settings = _load_settings()
_run_dev_in_background("127.0.0.1", 8000)
elif choice in ("2", "migrate"):
import subprocess
autogen = Confirm.ask("Generate a new migration? (no = upgrade head)", default=False)
if autogen:
msg = Prompt.ask("Migration message", default="auto")
cmd = [sys.executable, "-m", "alembic", "revision", "--autogenerate", "-m", msg]
else:
cmd = [sys.executable, "-m", "alembic", "upgrade", "head"]
with Progress(SpinnerColumn(), TextColumn("[progress.description]{task.description}"), console=console) as p:
p.add_task("Running migration...", total=None)
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
console.print("[bold green]✓ Migration complete[/bold green]")
if result.stdout.strip():
console.print(f"[dim]{result.stdout.strip()}[/dim]")
else:
console.print(f"[bold red]✗ Migration failed[/bold red]\n[red]{result.stderr.strip()}[/red]")
elif choice in ("3", "seed"):
asyncio.run(_seed_async())
elif choice in ("4", "seed-content"):
asyncio.run(_seed_content_async())
elif choice in ("5", "status"):
asyncio.run(_status_async())
elif choice in ("6", "create-admin"):
console.print(Panel("[bold]Create Admin User[/bold]", border_style="cyan"))
email = Prompt.ask("[cyan]Email address[/cyan]")
password = Prompt.ask("[cyan]Password[/cyan]", password=True)
confirm = Prompt.ask("[cyan]Confirm password[/cyan]", password=True)
if password != confirm:
console.print("[bold red]✗ Passwords do not match[/bold red]")
elif len(password) < 8:
console.print("[bold red]✗ Password must be at least 8 characters[/bold red]")
else:
asyncio.run(_create_admin_async(email, password))
elif choice in ("7", "routes"):
routes()
elif choice in ("q", "quit", "exit"):
console.print("\n[dim]Goodbye 👋[/dim]\n")
raise SystemExit(0)
else:
console.print(f"[yellow]Unknown command:[/yellow] [bold]{choice}[/bold] — enter a number or letter from the menu")
@app.command()
def shell():
"""
[bold]Launch the interactive shell.[/bold]
Presents a menu after each command so you can keep working without restarting.
The dev server runs in a background thread — the prompt stays live.
"""
_print_banner()
console.print("[dim]Type a number, command name, or [bold]q[/bold] to quit. Ctrl+C also exits.[/dim]\n")
while True:
_print_menu()
try:
choice = Prompt.ask("[bold green]>[/bold green]", default="").strip()
except (KeyboardInterrupt, EOFError):
console.print("\n[dim]Goodbye 👋[/dim]\n")
break
if not choice:
continue
console.print()
try:
_dispatch(choice)
except SystemExit:
break
except Exception as e:
console.print(f"[bold red]✗ Error:[/bold red] {e}")
console.print()
# ── Entry point ───────────────────────────────────────────────────────────────
if __name__ == "__main__":
# Running with no arguments → drop straight into the interactive shell
if len(sys.argv) == 1:
sys.argv.append("shell")
app()