373 lines
11 KiB
Python
373 lines
11 KiB
Python
from __future__ import annotations
|
||
|
||
from dataclasses import dataclass
|
||
from datetime import datetime
|
||
import logging
|
||
import os
|
||
import sys
|
||
import time
|
||
from typing import Iterable
|
||
|
||
try:
|
||
from rich.console import Console
|
||
from rich.logging import RichHandler
|
||
from rich.table import Table
|
||
from rich.text import Text
|
||
except ImportError: # pragma: no cover - exercised only before dependency install
|
||
Console = None
|
||
RichHandler = None
|
||
Table = None
|
||
Text = None
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class LoggingSettings:
|
||
app_name: str
|
||
app_env: str
|
||
host: str
|
||
port: int
|
||
log_level: str
|
||
log_verbose: bool
|
||
database_url: str
|
||
version: str
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class StartupStatus:
|
||
app_name: str
|
||
version: str
|
||
environment: str
|
||
host: str
|
||
port: int
|
||
database: str
|
||
mode: str
|
||
started_at: str
|
||
local_url: str
|
||
network_url: str
|
||
|
||
|
||
class PlainFormatter(logging.Formatter):
|
||
default_time_format = "%Y-%m-%d %H:%M:%S"
|
||
|
||
def format(self, record: logging.LogRecord) -> str:
|
||
if not hasattr(record, "component"):
|
||
record.component = record.name.rsplit(".", 1)[-1]
|
||
return super().format(record)
|
||
|
||
|
||
def _allow_color() -> bool:
|
||
if RichHandler is None or Console is None:
|
||
return False
|
||
if os.getenv("NO_COLOR"):
|
||
return False
|
||
if os.getenv("TERM") == "dumb":
|
||
return False
|
||
return hasattr(sys.stderr, "isatty") and sys.stderr.isatty()
|
||
|
||
|
||
def _allow_unicode() -> bool:
|
||
encoding = (getattr(sys.stdout, "encoding", None) or "").lower()
|
||
if not encoding:
|
||
return False
|
||
return "utf" in encoding
|
||
|
||
|
||
def _console() -> Console:
|
||
return Console(
|
||
stderr=True,
|
||
soft_wrap=False,
|
||
highlight=False,
|
||
force_terminal=_allow_color(),
|
||
no_color=not _allow_color(),
|
||
emoji=False,
|
||
)
|
||
|
||
|
||
def _rich_handler(level: str) -> RichHandler:
|
||
return RichHandler(
|
||
level=level,
|
||
console=_console(),
|
||
show_time=True,
|
||
show_level=True,
|
||
show_path=False,
|
||
omit_repeated_times=False,
|
||
markup=True,
|
||
rich_tracebacks=True,
|
||
tracebacks_show_locals=False,
|
||
log_time_format="%H:%M:%S",
|
||
)
|
||
|
||
|
||
def _plain_handler(level: str) -> logging.StreamHandler:
|
||
handler = logging.StreamHandler()
|
||
handler.setLevel(level)
|
||
handler.setFormatter(
|
||
PlainFormatter("%(asctime)s | %(levelname)-7s | %(component)-10s | %(message)s")
|
||
)
|
||
return handler
|
||
|
||
|
||
def _handler(level: str) -> logging.Handler:
|
||
return _rich_handler(level) if _allow_color() else _plain_handler(level)
|
||
|
||
|
||
def configure_logging(settings: LoggingSettings) -> None:
|
||
level = settings.log_level.upper()
|
||
root = logging.getLogger()
|
||
root.handlers.clear()
|
||
root.setLevel(level)
|
||
root.addHandler(_handler(level))
|
||
|
||
for name in ("uvicorn", "uvicorn.error", "fastapi"):
|
||
logger = logging.getLogger(name)
|
||
logger.handlers.clear()
|
||
logger.setLevel(level)
|
||
logger.propagate = True
|
||
|
||
access_logger = logging.getLogger("uvicorn.access")
|
||
access_logger.handlers.clear()
|
||
access_logger.propagate = False
|
||
access_logger.disabled = True
|
||
|
||
|
||
def get_logger(name: str) -> logging.LoggerAdapter[logging.Logger]:
|
||
component = name.rsplit(".", 1)[-1]
|
||
return logging.LoggerAdapter(logging.getLogger(name), {"component": component})
|
||
|
||
|
||
def _icon(name: str) -> str:
|
||
ascii_icons = {
|
||
"app": "#",
|
||
"info": "i",
|
||
"success": "+",
|
||
"warning": "!",
|
||
"error": "x",
|
||
"debug": ".",
|
||
"section": "=",
|
||
"url": ">",
|
||
"shutdown": "-",
|
||
}
|
||
unicode_icons = {
|
||
"app": "◆",
|
||
"info": "ℹ",
|
||
"success": "✓",
|
||
"warning": "▲",
|
||
"error": "✖",
|
||
"debug": "•",
|
||
"section": "─",
|
||
"url": "↳",
|
||
"shutdown": "◦",
|
||
}
|
||
icons = unicode_icons if _allow_unicode() else ascii_icons
|
||
return icons[name]
|
||
|
||
|
||
def _style(name: str) -> str:
|
||
return {
|
||
"info": "bold cyan",
|
||
"success": "bold green",
|
||
"warning": "bold yellow",
|
||
"error": "bold red",
|
||
"debug": "dim",
|
||
"section": "bold bright_blue",
|
||
"muted": "grey62",
|
||
}[name]
|
||
|
||
|
||
def section_heading(title: str) -> None:
|
||
logger = get_logger("data_entry_app.section")
|
||
if _allow_color():
|
||
_console().rule(Text(f" {title.upper()} ", style=_style("section")))
|
||
return
|
||
logger.info("%s %s %s", _icon("section") * 10, title.upper(), _icon("section") * 10)
|
||
|
||
|
||
def startup_banner(status: StartupStatus) -> None:
|
||
logger = get_logger("data_entry_app.startup")
|
||
if _allow_color():
|
||
console = _console()
|
||
table = Table.grid(expand=False)
|
||
table.add_column(style="bold white", justify="left")
|
||
table.add_column(style="white", justify="left")
|
||
table.add_row("Environment", status.environment)
|
||
table.add_row("Version", status.version)
|
||
table.add_row("Host", status.host)
|
||
table.add_row("Port", str(status.port))
|
||
table.add_row("Database", status.database)
|
||
table.add_row("Mode", status.mode)
|
||
table.add_row("Started", status.started_at)
|
||
|
||
console.rule(Text(f" {status.app_name} ", style="bold white"))
|
||
console.print(Text("Clean startup. Clear status. Ready.", style="italic cyan"))
|
||
console.print(table)
|
||
console.print()
|
||
console.print(Text("App is running at:", style="bold white"))
|
||
console.print(Text(f" Local: {status.local_url}", style="cyan"))
|
||
console.print(Text(f" Network: {status.network_url}", style="cyan"))
|
||
console.print()
|
||
return
|
||
|
||
logger.info("%s %s", _icon("app"), "Startup banner")
|
||
logger.info("App : %s", status.app_name)
|
||
logger.info("Environment : %s", status.environment)
|
||
logger.info("Version : %s", status.version)
|
||
logger.info("Host : %s", status.host)
|
||
logger.info("Port : %s", status.port)
|
||
logger.info("Database : %s", status.database)
|
||
logger.info("Mode : %s", status.mode)
|
||
logger.info("Started : %s", status.started_at)
|
||
logger.info("Local : %s", status.local_url)
|
||
logger.info("Network : %s", status.network_url)
|
||
|
||
|
||
def status_message(level: str, message: str, *args: object, logger_name: str = "data_entry_app.status") -> None:
|
||
palette = {
|
||
"debug": logging.DEBUG,
|
||
"info": logging.INFO,
|
||
"success": logging.INFO,
|
||
"warning": logging.WARNING,
|
||
"error": logging.ERROR,
|
||
}
|
||
labels = {
|
||
"debug": f"[{_icon('debug')}]",
|
||
"info": f"[{_icon('info')}]",
|
||
"success": f"[{_icon('success')}]",
|
||
"warning": f"[{_icon('warning')}]",
|
||
"error": f"[{_icon('error')}]",
|
||
}
|
||
styles = {
|
||
"debug": _style("debug"),
|
||
"info": _style("info"),
|
||
"success": _style("success"),
|
||
"warning": _style("warning"),
|
||
"error": _style("error"),
|
||
}
|
||
logger = get_logger(logger_name)
|
||
rendered = message % args if args else message
|
||
if _allow_color():
|
||
logger.log(palette[level], f"[{styles[level]}]{labels[level]}[/] {rendered}")
|
||
else:
|
||
logger.log(palette[level], "%s %s", labels[level], rendered)
|
||
|
||
|
||
def success(message: str, *args: object, logger_name: str = "data_entry_app.status") -> None:
|
||
status_message("success", message, *args, logger_name=logger_name)
|
||
|
||
|
||
def warning(message: str, *args: object, logger_name: str = "data_entry_app.status") -> None:
|
||
status_message("warning", message, *args, logger_name=logger_name)
|
||
|
||
|
||
def info(message: str, *args: object, logger_name: str = "data_entry_app.status") -> None:
|
||
status_message("info", message, *args, logger_name=logger_name)
|
||
|
||
|
||
def debug(message: str, *args: object, logger_name: str = "data_entry_app.status") -> None:
|
||
status_message("debug", message, *args, logger_name=logger_name)
|
||
|
||
|
||
def fatal(message: str, *args: object, exc_info: bool = False, logger_name: str = "data_entry_app.status") -> None:
|
||
logger = get_logger(logger_name)
|
||
rendered = message % args if args else message
|
||
if _allow_color():
|
||
logger.error(f"[{_style('error')}][{_icon('error')}][/] {rendered}", exc_info=exc_info)
|
||
else:
|
||
logger.error("[%s] %s", _icon("error"), rendered, exc_info=exc_info)
|
||
|
||
|
||
def shutdown_summary(*, uptime_seconds: float, requests_served: int, host: str, port: int) -> None:
|
||
section_heading("Shutdown")
|
||
logger = get_logger("data_entry_app.shutdown")
|
||
summary = f"Uptime {uptime_seconds:.1f}s | Requests {requests_served} | Endpoint http://{host}:{port}"
|
||
if _allow_color():
|
||
logger.info(f"[{_style('debug')}]{_icon('shutdown')}[/] {summary}")
|
||
else:
|
||
logger.info("%s %s", _icon("shutdown"), summary)
|
||
|
||
|
||
def describe_database(url: str) -> str:
|
||
if url.startswith("sqlite"):
|
||
return "sqlite"
|
||
if "postgresql" in url:
|
||
return "postgresql"
|
||
if "mysql" in url:
|
||
return "mysql"
|
||
return url.split(":", 1)[0]
|
||
|
||
|
||
def sanitize_database_target(url: str) -> str:
|
||
if url.startswith("sqlite:///"):
|
||
return url.removeprefix("sqlite:///")
|
||
if "@" in url:
|
||
return url.split("@", 1)[1]
|
||
return url
|
||
|
||
|
||
def startup_status(settings: LoggingSettings) -> StartupStatus:
|
||
host = settings.host
|
||
local_host = "localhost" if host in {"0.0.0.0", "::"} else host
|
||
timestamp = datetime.now().astimezone().strftime("%Y-%m-%d %H:%M:%S %Z")
|
||
return StartupStatus(
|
||
app_name=settings.app_name,
|
||
version=settings.version,
|
||
environment=settings.app_env,
|
||
host=settings.host,
|
||
port=settings.port,
|
||
database=f"{describe_database(settings.database_url)} ({sanitize_database_target(settings.database_url)})",
|
||
mode="verbose" if settings.log_verbose else "normal",
|
||
started_at=timestamp,
|
||
local_url=f"http://{local_host}:{settings.port}",
|
||
network_url=f"http://{host}:{settings.port}",
|
||
)
|
||
|
||
|
||
def route_summary(routes: Iterable[object]) -> tuple[int, list[str]]:
|
||
lines: list[str] = []
|
||
count = 0
|
||
for route in routes:
|
||
path = getattr(route, "path", None)
|
||
methods = getattr(route, "methods", None)
|
||
if not path or not methods:
|
||
continue
|
||
filtered_methods = sorted(method for method in methods if method not in {"HEAD", "OPTIONS"})
|
||
if not filtered_methods:
|
||
continue
|
||
count += 1
|
||
lines.append(f"{','.join(filtered_methods):<7} {path}")
|
||
return count, lines
|
||
|
||
|
||
def log_request(
|
||
*,
|
||
method: str,
|
||
path: str,
|
||
status_code: int,
|
||
duration_ms: float,
|
||
client: str,
|
||
content_length: str | None,
|
||
) -> None:
|
||
level = "info"
|
||
if status_code >= 500:
|
||
level = "error"
|
||
elif status_code >= 400:
|
||
level = "warning"
|
||
elif path == "/health":
|
||
level = "debug"
|
||
|
||
message = (
|
||
f"{method:<6} {status_code:>3} {duration_ms:>7.1f}ms "
|
||
f"{path:<36} client={client}"
|
||
)
|
||
if content_length:
|
||
message += f" bytes={content_length}"
|
||
status_message(level, message, logger_name="data_entry_app.http")
|
||
|
||
|
||
class RequestTimer:
|
||
def __init__(self) -> None:
|
||
self.started = time.perf_counter()
|
||
|
||
@property
|
||
def elapsed_ms(self) -> float:
|
||
return (time.perf_counter() - self.started) * 1000
|