Refactor auth-host redirection middleware to its own module.
Implement redirection to remove /auth/ from UI URLs when on auth-host.
This commit is contained in:
89
passkey/fastapi/auth_host.py
Normal file
89
passkey/fastapi/auth_host.py
Normal file
@@ -0,0 +1,89 @@
|
||||
"""Middleware for handling auth host redirects."""
|
||||
|
||||
from fastapi import Request, Response
|
||||
from fastapi.responses import RedirectResponse
|
||||
|
||||
from passkey.util import hostutil, passphrase
|
||||
|
||||
|
||||
def is_ui_path(path: str) -> bool:
|
||||
"""Check if the path is a UI endpoint."""
|
||||
ui_paths = {"/", "/admin", "/admin/", "/auth/", "/auth/admin", "/auth/admin/"}
|
||||
if path in ui_paths:
|
||||
return True
|
||||
# Treat reset token pages as UI (dynamic). Accept single-segment tokens.
|
||||
if path.startswith("/auth/"):
|
||||
token = path[6:]
|
||||
if token and "/" not in token and passphrase.is_well_formed(token):
|
||||
return True
|
||||
else:
|
||||
token = path[1:]
|
||||
if token and "/" not in token and passphrase.is_well_formed(token):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def is_restricted_path(path: str) -> bool:
|
||||
"""Check if the path is restricted (API/admin endpoints)."""
|
||||
return path.startswith(
|
||||
("/auth/api/admin", "/auth/api/user", "/auth/api/ws", "/auth/ws/")
|
||||
)
|
||||
|
||||
|
||||
def should_redirect_to_auth_host(path: str) -> bool:
|
||||
"""Determine if the request should be redirected to the auth host."""
|
||||
return is_ui_path(path) or is_restricted_path(path)
|
||||
|
||||
|
||||
def redirect_to_auth_host(request: Request, cfg: str, path: str) -> Response:
|
||||
"""Create a redirect response to the auth host."""
|
||||
if is_restricted_path(path):
|
||||
return Response(status_code=404)
|
||||
new_path = (
|
||||
path[5:] or "/" if is_ui_path(path) and path.startswith("/auth") else path
|
||||
)
|
||||
return RedirectResponse(f"{request.url.scheme}://{cfg}{new_path}", 307)
|
||||
|
||||
|
||||
def should_redirect_auth_path_to_root(path: str) -> bool:
|
||||
"""Check if /auth/ UI path should be redirected to root on auth host."""
|
||||
if not path.startswith("/auth/"):
|
||||
return False
|
||||
ui_paths = {"/auth/", "/auth/admin", "/auth/admin/"}
|
||||
if path in ui_paths:
|
||||
return True
|
||||
# Check for reset token
|
||||
token = path[6:]
|
||||
return bool(token and "/" not in token and passphrase.is_well_formed(token))
|
||||
|
||||
|
||||
def redirect_to_root_on_auth_host(request: Request, cur: str, path: str) -> Response:
|
||||
"""Create a redirect response to root path on the same host."""
|
||||
new_path = path[5:] or "/"
|
||||
return RedirectResponse(f"{request.url.scheme}://{cur}{new_path}", 307)
|
||||
|
||||
|
||||
async def redirect_middleware(request: Request, call_next):
|
||||
"""Middleware to handle auth host redirects."""
|
||||
cfg = hostutil.configured_auth_host()
|
||||
if not cfg:
|
||||
return await call_next(request)
|
||||
|
||||
cur = hostutil.normalize_host(request.headers.get("host"))
|
||||
if not cur:
|
||||
return await call_next(request)
|
||||
|
||||
cfg_normalized = hostutil.normalize_host(cfg)
|
||||
on_auth_host = cur == cfg_normalized
|
||||
|
||||
path = request.url.path or "/"
|
||||
|
||||
if not on_auth_host:
|
||||
if not should_redirect_to_auth_host(path):
|
||||
return await call_next(request)
|
||||
return redirect_to_auth_host(request, cfg, path)
|
||||
else:
|
||||
# On auth host: force UI endpoints at root
|
||||
if should_redirect_auth_path_to_root(path):
|
||||
return redirect_to_root_on_auth_host(request, cur, path)
|
||||
return await call_next(request)
|
||||
@@ -8,7 +8,7 @@ from fastapi.staticfiles import StaticFiles
|
||||
|
||||
from passkey.util import frontend, hostutil, passphrase
|
||||
|
||||
from . import admin, api, ws
|
||||
from . import admin, api, auth_host, ws
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
@@ -47,38 +47,8 @@ async def lifespan(app: FastAPI): # pragma: no cover - startup path
|
||||
|
||||
app = FastAPI(lifespan=lifespan)
|
||||
|
||||
|
||||
@app.middleware("http")
|
||||
async def auth_host_redirect(request, call_next): # pragma: no cover
|
||||
cfg = hostutil.configured_auth_host()
|
||||
if not cfg:
|
||||
return await call_next(request)
|
||||
cur = hostutil.normalize_host(request.headers.get("host"))
|
||||
if not cur or cur == hostutil.normalize_host(cfg):
|
||||
return await call_next(request)
|
||||
p = request.url.path or "/"
|
||||
ui = {"/", "/admin", "/admin/", "/auth/", "/auth/admin", "/auth/admin/"}
|
||||
restricted = p.startswith(
|
||||
("/auth/api/admin", "/auth/api/user", "/auth/api/ws", "/auth/ws/")
|
||||
)
|
||||
ui_match = p in ui
|
||||
if not ui_match:
|
||||
# Treat reset token pages as UI (dynamic). Accept single-segment tokens.
|
||||
if p.startswith("/auth/"):
|
||||
t = p[6:]
|
||||
if t and "/" not in t and passphrase.is_well_formed(t):
|
||||
ui_match = True
|
||||
else:
|
||||
t = p[1:]
|
||||
if t and "/" not in t and passphrase.is_well_formed(t):
|
||||
ui_match = True
|
||||
if not (ui_match or restricted):
|
||||
return await call_next(request)
|
||||
if restricted:
|
||||
return Response(status_code=404)
|
||||
newp = p[5:] or "/" if ui_match and p.startswith("/auth") else p
|
||||
return RedirectResponse(f"{request.url.scheme}://{cfg}{newp}", 307)
|
||||
|
||||
# Apply redirections to auth-host if configured (deny access to restricted endpoints, remove /auth/)
|
||||
app.middleware("http")(auth_host.redirect_middleware)
|
||||
|
||||
app.mount("/auth/admin/", admin.app)
|
||||
app.mount("/auth/api/", api.app)
|
||||
|
||||
Reference in New Issue
Block a user