Refactor user-profile, restricted access and reset token registration as separate apps so the frontend does not need to guess which context it is running in.

Support user-navigable URLs at / as well as /auth/, allowing for a dedicated authentication site with pretty URLs.
This commit is contained in:
Leo Vasanko
2025-10-02 15:42:01 -06:00
parent fbfd0bbb47
commit 5d8304bbd9
23 changed files with 668 additions and 295 deletions

View File

@@ -14,7 +14,7 @@ import uuid7
from . import authsession, globals
from .db import Org, Permission, Role, User
from .util import passphrase, tokens
from .util import hostutil, passphrase, tokens
def _init_logger() -> logging.Logger:
@@ -47,7 +47,8 @@ async def _create_and_log_admin_reset_link(user_uuid, message, session_type) ->
expires=authsession.expires(),
info={"type": session_type},
)
reset_link = f"{globals.passkey.instance.origin}/auth/{token}"
base = hostutil.auth_site_base_url()
reset_link = f"{base}{token}"
logger.info(ADMIN_RESET_MESSAGE, message, reset_link)
return reset_link

View File

@@ -94,6 +94,13 @@ def add_common_options(p: argparse.ArgumentParser) -> None:
)
p.add_argument("--rp-name", help="Relying Party name (default: same as rp-id)")
p.add_argument("--origin", help="Origin URL (default: https://<rp-id>)")
p.add_argument(
"--auth-host",
help=(
"Dedicated host (optionally with scheme/port) to serve the auth UI at the root,"
" e.g. auth.example.com or https://auth.example.com"
),
)
def main():
@@ -168,6 +175,16 @@ def main():
os.environ["PASSKEY_RP_NAME"] = args.rp_name
if origin:
os.environ["PASSKEY_ORIGIN"] = origin
if getattr(args, "auth_host", None):
os.environ["PASSKEY_AUTH_HOST"] = args.auth_host
else:
# Preserve pre-set env variable if CLI option omitted
args.auth_host = os.environ.get("PASSKEY_AUTH_HOST")
if getattr(args, "auth_host", None):
from passkey.util import hostutil as _hostutil # local import
_hostutil.reload_config()
# One-time initialization + bootstrap before starting any server processes.
# Lifespan in worker processes will call globals.init with bootstrap disabled.

View File

@@ -6,7 +6,6 @@ from fastapi.responses import FileResponse, JSONResponse
from ..authsession import expires
from ..globals import db
from ..globals import passkey as global_passkey
from ..util import frontend, hostutil, passphrase, permutil, querysafe, tokens
from . import authz
@@ -358,10 +357,8 @@ async def admin_create_user_registration_link(
expires=expires(),
info={"type": "device addition", "created_by_admin": True},
)
origin = hostutil.effective_origin(
request.url.scheme, request.headers.get("host"), global_passkey.instance.rp_id
)
url = f"{origin}/auth/{token}"
base = hostutil.auth_site_base_url(request.url.scheme, request.headers.get("host"))
url = f"{base}{token}"
return {"url": url, "expires": expires().isoformat()}

View File

@@ -13,7 +13,7 @@ from fastapi import (
Request,
Response,
)
from fastapi.responses import FileResponse, JSONResponse
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBearer
from passkey.util import frontend
@@ -112,13 +112,20 @@ async def forward_authentication(perm: list[str] = Query([]), auth=Cookie(None))
}
return Response(status_code=204, headers=remote_headers)
except HTTPException as e:
return FileResponse(frontend.file("index.html"), status_code=e.status_code)
html = frontend.file("restricted", "index.html").read_bytes()
return Response(html, status_code=e.status_code, media_type="text/html")
@app.get("/settings")
async def get_settings():
pk = global_passkey.instance
return {"rp_id": pk.rp_id, "rp_name": pk.rp_name}
base_path = hostutil.ui_base_path()
return {
"rp_id": pk.rp_id,
"rp_name": pk.rp_name,
"ui_base_path": base_path,
"auth_host": hostutil.configured_auth_host(),
}
@app.post("/user-info")
@@ -267,10 +274,8 @@ async def api_create_link(request: Request, auth=Cookie(None)):
expires=expires(),
info=session.infodict(request, "device addition"),
)
origin = hostutil.effective_origin(
request.url.scheme, request.headers.get("host"), global_passkey.instance.rp_id
)
url = f"{origin}/auth/{token}"
base = hostutil.auth_site_base_url(request.url.scheme, request.headers.get("host"))
url = f"{base}{token}"
return {
"message": "Registration link generated successfully",
"url": url,

View File

@@ -2,11 +2,11 @@ import logging
import os
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Request
from fastapi import Cookie, FastAPI, HTTPException
from fastapi.responses import FileResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from passkey.util import frontend, passphrase
from passkey.util import frontend, hostutil, passphrase
from . import admin, api, ws
@@ -53,26 +53,37 @@ app.mount(
"/auth/assets/", StaticFiles(directory=frontend.file("assets")), name="assets"
)
# Navigable URLs are defined here. We support both / and /auth/ as the base path
# / is used on a dedicated auth site, /auth/ on app domains with auth
@app.get("/")
async def frontapp_redirect(request: Request):
"""Redirect root (in case accessed on backend) to the main authentication app."""
return RedirectResponse(request.url_for("frontapp"), status_code=303)
@app.get("/auth/")
async def frontapp():
"""Serve the main authentication app."""
return FileResponse(frontend.file("index.html"))
@app.get("/admin", include_in_schema=False)
@app.get("/auth/admin", include_in_schema=False)
async def admin_root_redirect():
return RedirectResponse(f"{hostutil.ui_base_path()}admin/", status_code=307)
@app.get("/admin/", include_in_schema=False)
async def admin_root(auth=Cookie(None)):
return await admin.adminapp(auth) # Delegate to handler of /auth/admin/
@app.get("/{reset}")
@app.get("/auth/{reset}")
async def reset_link(request: Request, reset: str):
"""Pretty URL for reset links."""
if reset == "admin":
# Admin app missing trailing slash lands here, be friendly to user
return RedirectResponse(request.url_for("adminapp"), status_code=303)
async def reset_link(reset: str):
"""Serve the SPA directly with an injected reset token."""
if not passphrase.is_well_formed(reset):
raise HTTPException(status_code=404)
url = request.url_for("frontapp").include_query_params(reset=reset)
return RedirectResponse(url, status_code=303)
return FileResponse(frontend.file("reset", "index.html"))
@app.get("/restricted", include_in_schema=False)
@app.get("/auth/restricted", include_in_schema=False)
async def restricted_view():
return FileResponse(frontend.file("restricted", "index.html"))

View File

@@ -17,7 +17,7 @@ from uuid import UUID
from passkey import authsession as _authsession
from passkey import globals as _g
from passkey.util import passphrase
from passkey.util import hostutil, passphrase
from passkey.util import tokens as _tokens
@@ -69,7 +69,8 @@ async def _create_reset(user, role_name: str):
expires=_authsession.expires(),
info={"type": "manual reset", "role": role_name},
)
return f"{_g.passkey.instance.origin}/auth/{token}", token
base = hostutil.auth_site_base_url()
return f"{base}{token}", token
async def _main(query: str | None) -> int:

View File

@@ -1,24 +1,67 @@
"""Utilities for host validation and origin determination."""
"""Utilities for determining the auth UI host and base URLs."""
import os
from functools import lru_cache
from urllib.parse import urlparse
from ..globals import passkey as global_passkey
_AUTH_HOST_ENV = "PASSKEY_AUTH_HOST"
def effective_origin(scheme: str, host: str | None, rp_id: str) -> str:
"""Determine the effective origin for a request.
Uses the provided host if it's compatible with the relying party ID,
otherwise falls back to the configured origin.
def _default_origin_scheme() -> str:
origin_url = urlparse(global_passkey.instance.origin)
return origin_url.scheme or "https"
Args:
scheme: The URL scheme (e.g. "https")
host: The host header value (e.g. "example.com" or "sub.example.com:8080")
rp_id: The relying party ID (e.g. "example.com")
Returns:
The effective origin URL to use
"""
@lru_cache(maxsize=1)
def _load_config() -> tuple[str | None, str] | None:
raw = os.getenv(_AUTH_HOST_ENV)
if not raw:
return None
candidate = raw.strip()
if not candidate:
return None
parsed = urlparse(candidate if "://" in candidate else f"//{candidate}")
netloc = parsed.netloc or parsed.path
if not netloc:
return None
return (parsed.scheme or None, netloc.strip("/"))
def configured_auth_host() -> str | None:
cfg = _load_config()
return cfg[1] if cfg else None
def is_root_mode() -> bool:
return _load_config() is not None
def ui_base_path() -> str:
return "/" if is_root_mode() else "/auth/"
def _format_base_url(scheme: str, netloc: str) -> str:
scheme_part = scheme or _default_origin_scheme()
base = f"{scheme_part}://{netloc}"
return base if base.endswith("/") else f"{base}/"
def auth_site_base_url(scheme: str | None = None, host: str | None = None) -> str:
cfg = _load_config()
if cfg:
cfg_scheme, cfg_host = cfg
scheme_to_use = cfg_scheme or scheme or _default_origin_scheme()
return _format_base_url(scheme_to_use, cfg_host)
if host:
hostname = host.split(":")[0] # Remove port if present
if hostname == rp_id or hostname.endswith(f".{rp_id}"):
return f"{scheme}://{host}"
return global_passkey.instance.origin
scheme_to_use = scheme or _default_origin_scheme()
return _format_base_url(scheme_to_use, host.strip("/"))
origin = global_passkey.instance.origin.rstrip("/")
return f"{origin}/auth/"
def reload_config() -> None:
_load_config.cache_clear()