From a60c1bd5f5c5c7b594affcbe41af7c6c03f88590 Mon Sep 17 00:00:00 2001 From: Leo Vasanko Date: Sat, 4 Oct 2025 16:49:23 -0600 Subject: [PATCH] Refactor auth-host redirection middleware to its own module. Implement redirection to remove /auth/ from UI URLs when on auth-host. --- passkey/fastapi/auth_host.py | 89 ++++++++++++++++++++++++++++++++++++ passkey/fastapi/mainapp.py | 36 ++------------- 2 files changed, 92 insertions(+), 33 deletions(-) create mode 100644 passkey/fastapi/auth_host.py diff --git a/passkey/fastapi/auth_host.py b/passkey/fastapi/auth_host.py new file mode 100644 index 0000000..e6f1d7d --- /dev/null +++ b/passkey/fastapi/auth_host.py @@ -0,0 +1,89 @@ +"""Middleware for handling auth host redirects.""" + +from fastapi import Request, Response +from fastapi.responses import RedirectResponse + +from passkey.util import hostutil, passphrase + + +def is_ui_path(path: str) -> bool: + """Check if the path is a UI endpoint.""" + ui_paths = {"/", "/admin", "/admin/", "/auth/", "/auth/admin", "/auth/admin/"} + if path in ui_paths: + return True + # Treat reset token pages as UI (dynamic). Accept single-segment tokens. + if path.startswith("/auth/"): + token = path[6:] + if token and "/" not in token and passphrase.is_well_formed(token): + return True + else: + token = path[1:] + if token and "/" not in token and passphrase.is_well_formed(token): + return True + return False + + +def is_restricted_path(path: str) -> bool: + """Check if the path is restricted (API/admin endpoints).""" + return path.startswith( + ("/auth/api/admin", "/auth/api/user", "/auth/api/ws", "/auth/ws/") + ) + + +def should_redirect_to_auth_host(path: str) -> bool: + """Determine if the request should be redirected to the auth host.""" + return is_ui_path(path) or is_restricted_path(path) + + +def redirect_to_auth_host(request: Request, cfg: str, path: str) -> Response: + """Create a redirect response to the auth host.""" + if is_restricted_path(path): + return Response(status_code=404) + new_path = ( + path[5:] or "/" if is_ui_path(path) and path.startswith("/auth") else path + ) + return RedirectResponse(f"{request.url.scheme}://{cfg}{new_path}", 307) + + +def should_redirect_auth_path_to_root(path: str) -> bool: + """Check if /auth/ UI path should be redirected to root on auth host.""" + if not path.startswith("/auth/"): + return False + ui_paths = {"/auth/", "/auth/admin", "/auth/admin/"} + if path in ui_paths: + return True + # Check for reset token + token = path[6:] + return bool(token and "/" not in token and passphrase.is_well_formed(token)) + + +def redirect_to_root_on_auth_host(request: Request, cur: str, path: str) -> Response: + """Create a redirect response to root path on the same host.""" + new_path = path[5:] or "/" + return RedirectResponse(f"{request.url.scheme}://{cur}{new_path}", 307) + + +async def redirect_middleware(request: Request, call_next): + """Middleware to handle auth host redirects.""" + cfg = hostutil.configured_auth_host() + if not cfg: + return await call_next(request) + + cur = hostutil.normalize_host(request.headers.get("host")) + if not cur: + return await call_next(request) + + cfg_normalized = hostutil.normalize_host(cfg) + on_auth_host = cur == cfg_normalized + + path = request.url.path or "/" + + if not on_auth_host: + if not should_redirect_to_auth_host(path): + return await call_next(request) + return redirect_to_auth_host(request, cfg, path) + else: + # On auth host: force UI endpoints at root + if should_redirect_auth_path_to_root(path): + return redirect_to_root_on_auth_host(request, cur, path) + return await call_next(request) diff --git a/passkey/fastapi/mainapp.py b/passkey/fastapi/mainapp.py index 7ed9f22..3878f8f 100644 --- a/passkey/fastapi/mainapp.py +++ b/passkey/fastapi/mainapp.py @@ -8,7 +8,7 @@ from fastapi.staticfiles import StaticFiles from passkey.util import frontend, hostutil, passphrase -from . import admin, api, ws +from . import admin, api, auth_host, ws @asynccontextmanager @@ -47,38 +47,8 @@ async def lifespan(app: FastAPI): # pragma: no cover - startup path app = FastAPI(lifespan=lifespan) - -@app.middleware("http") -async def auth_host_redirect(request, call_next): # pragma: no cover - cfg = hostutil.configured_auth_host() - if not cfg: - return await call_next(request) - cur = hostutil.normalize_host(request.headers.get("host")) - if not cur or cur == hostutil.normalize_host(cfg): - return await call_next(request) - p = request.url.path or "/" - ui = {"/", "/admin", "/admin/", "/auth/", "/auth/admin", "/auth/admin/"} - restricted = p.startswith( - ("/auth/api/admin", "/auth/api/user", "/auth/api/ws", "/auth/ws/") - ) - ui_match = p in ui - if not ui_match: - # Treat reset token pages as UI (dynamic). Accept single-segment tokens. - if p.startswith("/auth/"): - t = p[6:] - if t and "/" not in t and passphrase.is_well_formed(t): - ui_match = True - else: - t = p[1:] - if t and "/" not in t and passphrase.is_well_formed(t): - ui_match = True - if not (ui_match or restricted): - return await call_next(request) - if restricted: - return Response(status_code=404) - newp = p[5:] or "/" if ui_match and p.startswith("/auth") else p - return RedirectResponse(f"{request.url.scheme}://{cfg}{newp}", 307) - +# Apply redirections to auth-host if configured (deny access to restricted endpoints, remove /auth/) +app.middleware("http")(auth_host.redirect_middleware) app.mount("/auth/admin/", admin.app) app.mount("/auth/api/", api.app)