Major changes to server startup. Admin page tuning.

This commit is contained in:
Leo Vasanko
2025-08-29 20:41:38 -06:00
parent 6e80011eed
commit 7380f09458
12 changed files with 1077 additions and 143 deletions

View File

@@ -1,52 +1,249 @@
import argparse
import asyncio
import atexit
import contextlib
import ipaddress
import logging
import os
import signal
import subprocess
from pathlib import Path
from urllib.parse import urlparse
import uvicorn
DEFAULT_HOST = "localhost"
DEFAULT_SERVE_PORT = 4401
DEFAULT_DEV_PORT = 4402
def parse_endpoint(
value: str | None, default_port: int
) -> tuple[str | None, int | None, str | None, bool]:
"""Parse an endpoint using stdlib (urllib.parse, ipaddress).
Returns (host, port, uds_path). If uds_path is not None, host/port are None.
Supported forms:
- host[:port]
- :port (uses default host)
- [ipv6][:port] (bracketed for port usage)
- ipv6 (unbracketed, no port allowed -> default port)
- unix:/path/to/socket.sock
- None -> defaults (localhost:4401)
Notes:
- For IPv6 with an explicit port you MUST use brackets (e.g. [::1]:8080)
- Unbracketed IPv6 like ::1 implies the default port.
"""
if not value:
return DEFAULT_HOST, default_port, None, False
# Port only (numeric) -> localhost:port
if value.isdigit():
try:
port_only = int(value)
except ValueError: # pragma: no cover (isdigit guards)
raise SystemExit(f"Invalid port '{value}'")
return DEFAULT_HOST, port_only, None, False
# Leading colon :port -> bind all interfaces (0.0.0.0 + ::)
if value.startswith(":") and value != ":":
port_part = value[1:]
if not port_part.isdigit():
raise SystemExit(f"Invalid port in '{value}'")
return None, int(port_part), None, True
# UNIX domain socket
if value.startswith("unix:"):
uds_path = value[5:] or None
if uds_path is None:
raise SystemExit("unix: path must not be empty")
return None, None, uds_path, False
# Unbracketed IPv6 (cannot safely contain a port) -> detect by multiple colons
if value.count(":") > 1 and not value.startswith("["):
try:
ipaddress.IPv6Address(value)
except ValueError as e: # pragma: no cover
raise SystemExit(f"Invalid IPv6 address '{value}': {e}")
return value, default_port, None, False
# Use urllib.parse for everything else (host[:port], :port, [ipv6][:port])
parsed = urlparse(f"//{value}") # // prefix lets urlparse treat it as netloc
host = parsed.hostname
port = parsed.port
# Host may be None if empty (e.g. ':5500')
if not host:
host = DEFAULT_HOST
if port is None:
port = default_port
# Validate IP literals (optional; hostname passes through)
try:
# Strip brackets if somehow present (urlparse removes them already)
ipaddress.ip_address(host)
except ValueError:
# Not an IP address -> treat as hostname; no action
pass
return host, port, None, False
def add_common_options(p: argparse.ArgumentParser) -> None:
p.add_argument(
"--rp-id", default="localhost", help="Relying Party ID (default: localhost)"
)
p.add_argument("--rp-name", help="Relying Party name (default: same as rp-id)")
p.add_argument("--origin", help="Origin URL (default: https://<rp-id>)")
def main():
# Configure logging to remove the "ERROR:root:" prefix
logging.basicConfig(level=logging.INFO, format="%(message)s", force=True)
parser = argparse.ArgumentParser(
description="Run the passkey authentication server"
prog="passkey-auth", description="Passkey authentication server"
)
parser.add_argument(
"--host", default="localhost", help="Host to bind to (default: localhost)"
sub = parser.add_subparsers(dest="command", required=True)
# serve subcommand
serve = sub.add_parser(
"serve", help="Run the server (production style, no auto-reload)"
)
parser.add_argument(
"--port", type=int, default=4401, help="Port to bind to (default: 4401)"
serve.add_argument(
"hostport",
nargs="?",
help=(
"Endpoint (default: localhost:4401). Forms: host[:port] | :port | "
"[ipv6][:port] | ipv6 | unix:/path.sock"
),
)
parser.add_argument(
"--dev", action="store_true", help="Enable development mode with auto-reload"
add_common_options(serve)
# dev subcommand
dev = sub.add_parser("dev", help="Run the server in development (auto-reload)")
dev.add_argument(
"hostport",
nargs="?",
help=(
"Endpoint (default: localhost:4402). Forms: host[:port] | :port | "
"[ipv6][:port] | ipv6 | unix:/path.sock"
),
)
parser.add_argument(
"--rp-id", default="localhost", help="Relying Party ID (default: localhost)"
)
parser.add_argument("--rp-name", help="Relying Party name (default: same as rp-id)")
parser.add_argument("--origin", help="Origin URL (default: https://<rp-id>)")
add_common_options(dev)
args = parser.parse_args()
# Initialize the application
try:
from .. import globals
default_port = DEFAULT_DEV_PORT if args.command == "dev" else DEFAULT_SERVE_PORT
host, port, uds, all_ifaces = parse_endpoint(args.hostport, default_port)
reload_enabled = args.command == "dev"
asyncio.run(
globals.init(rp_id=args.rp_id, rp_name=args.rp_name, origin=args.origin)
# Determine origin (dev mode default override)
effective_origin = args.origin
if reload_enabled and not effective_origin:
# Use a distinct port (4403) for RP origin in dev if not explicitly provided
effective_origin = "http://localhost:4403"
# Export configuration via environment for lifespan initialization in each process
os.environ.setdefault("PASSKEY_RP_ID", args.rp_id)
if args.rp_name:
os.environ["PASSKEY_RP_NAME"] = args.rp_name
if effective_origin:
os.environ["PASSKEY_ORIGIN"] = effective_origin
# One-time initialization + bootstrap before starting any server processes.
# Lifespan in worker processes will call globals.init with bootstrap disabled.
from passkey import globals as _globals # local import
asyncio.run(
_globals.init(
rp_id=args.rp_id,
rp_name=args.rp_name,
origin=effective_origin,
default_admin=os.getenv("PASSKEY_DEFAULT_ADMIN") or None,
default_org=os.getenv("PASSKEY_DEFAULT_ORG") or None,
bootstrap=True,
)
except ValueError as e:
logging.error(f"⚠️ {e}")
return
uvicorn.run(
"passkey.fastapi:app",
host=args.host,
port=args.port,
reload=args.dev,
log_level="info",
)
run_kwargs: dict = {
"reload": reload_enabled,
"log_level": "info",
}
if uds:
run_kwargs["uds"] = uds
else:
# For :port form (all interfaces) we will handle separately
if not all_ifaces:
run_kwargs["host"] = host
run_kwargs["port"] = port
bun_process: subprocess.Popen | None = None
if reload_enabled:
# Spawn frontend dev server (bun) only in the original parent (avoid duplicates on reload)
if os.environ.get("PASSKEY_BUN_PARENT") != "1":
os.environ["PASSKEY_BUN_PARENT"] = "1"
frontend_dir = Path(__file__).parent.parent.parent / "frontend"
if (frontend_dir / "package.json").exists():
try:
bun_process = subprocess.Popen(
["bun", "run", "dev"], cwd=str(frontend_dir)
)
logging.info("Started bun dev server")
except FileNotFoundError:
logging.warning(
"bun not found: skipping frontend dev server (install bun)"
)
def _terminate_bun(): # pragma: no cover
if bun_process and bun_process.poll() is None:
with contextlib.suppress(Exception):
bun_process.terminate()
atexit.register(_terminate_bun)
def _signal_handler(signum, frame): # pragma: no cover
_terminate_bun()
raise SystemExit(0)
signal.signal(signal.SIGINT, _signal_handler)
signal.signal(signal.SIGTERM, _signal_handler)
if all_ifaces and not uds:
# If reload enabled, fallback to single dual-stack attempt (::) to keep reload simple
if reload_enabled:
run_kwargs["host"] = "::"
run_kwargs["port"] = port
uvicorn.run("passkey.fastapi:app", **run_kwargs)
else:
# Start two servers concurrently: IPv4 and IPv6
from uvicorn import Config, Server # noqa: E402 local import
from passkey.fastapi import app as fastapi_app # noqa: E402 local import
async def serve_both():
servers = []
assert port is not None
for h in ("0.0.0.0", "::"):
try:
cfg = Config(
app=fastapi_app,
host=h,
port=port,
log_level="info",
)
servers.append(Server(cfg))
except Exception as e: # pragma: no cover
logging.warning(f"Failed to configure server for {h}: {e}")
tasks = [asyncio.create_task(s.serve()) for s in servers]
await asyncio.gather(*tasks)
asyncio.run(serve_both())
else:
uvicorn.run("passkey.fastapi:app", **run_kwargs)
if __name__ == "__main__":
main()

View File

@@ -8,9 +8,9 @@ This module contains all the HTTP API endpoints for:
- Login/logout functionality
"""
from uuid import UUID
from uuid import UUID, uuid4
from fastapi import Cookie, Depends, FastAPI, Response
from fastapi import Body, Cookie, Depends, FastAPI, HTTPException, Response
from fastapi.security import HTTPBearer
from passkey.util import passphrase
@@ -27,6 +27,19 @@ bearer_auth = HTTPBearer(auto_error=True)
def register_api_routes(app: FastAPI):
"""Register all API routes on the FastAPI app."""
async def _get_ctx_and_admin_flags(auth_cookie: str):
"""Helper to get session context and admin flags from cookie."""
if not auth_cookie:
raise ValueError("Not authenticated")
ctx = await db.instance.get_session_context(session_key(auth_cookie))
if not ctx:
raise ValueError("Not authenticated")
role_perm_ids = set(ctx.role.permissions or [])
org_uuid_str = str(ctx.org.uuid)
is_global_admin = "auth/admin" in role_perm_ids
is_org_admin = f"auth/org:{org_uuid_str}" in role_perm_ids
return ctx, is_global_admin, is_org_admin
@app.post("/auth/validate")
async def validate_token(response: Response, auth=Cookie(None)):
"""Lightweight token validation endpoint."""
@@ -38,29 +51,50 @@ def register_api_routes(app: FastAPI):
@app.post("/auth/user-info")
async def api_user_info(response: Response, auth=Cookie(None)):
"""Get full user information for the authenticated user."""
reset = passphrase.is_well_formed(auth)
s = await (get_reset if reset else get_session)(auth)
# Session context (org, role, permissions)
"""Get user information.
- For authenticated sessions: return full context (org/role/permissions/credentials)
- For reset tokens: return only basic user information to drive reset flow
"""
try:
reset = auth and passphrase.is_well_formed(auth)
s = await (get_reset if reset else get_session)(auth)
except ValueError:
raise HTTPException(
status_code=401,
detail="Authentication Required",
headers={"WWW-Authenticate": "Bearer"},
)
# Minimal response for reset tokens
if reset:
u = await db.instance.get_user_by_uuid(s.user_uuid)
return {
"authenticated": False,
"session_type": s.info.get("type"),
"user": {
"user_uuid": str(u.uuid),
"user_name": u.display_name,
"created_at": u.created_at.isoformat() if u.created_at else None,
"last_seen": u.last_seen.isoformat() if u.last_seen else None,
"visits": u.visits,
},
}
# Full context for authenticated sessions
ctx = await db.instance.get_session_context(session_key(auth))
# Fallback if context not available (e.g., reset session)
u = await db.instance.get_user_by_uuid(s.user_uuid)
# Get all credentials for the user
credential_ids = await db.instance.get_credentials_by_user_uuid(s.user_uuid)
credentials = []
user_aaguids = set()
credentials: list[dict] = []
user_aaguids: set[str] = set()
for cred_id in credential_ids:
c = await db.instance.get_credential_by_id(cred_id)
# Convert AAGUID to string format
try:
c = await db.instance.get_credential_by_id(cred_id)
except ValueError:
continue # Skip dangling IDs
aaguid_str = str(c.aaguid)
user_aaguids.add(aaguid_str)
# Check if this is the current session credential
is_current_session = s.credential_uuid == c.uuid
credentials.append(
{
"credential_uuid": str(c.uuid),
@@ -71,37 +105,31 @@ def register_api_routes(app: FastAPI):
if c.last_verified
else None,
"sign_count": c.sign_count,
"is_current_session": is_current_session,
"is_current_session": s.credential_uuid == c.uuid,
}
)
# Get AAGUID information for only the AAGUIDs that the user has
credentials.sort(key=lambda cred: cred["created_at"]) # chronological
aaguid_info = aaguid.filter(user_aaguids)
# Sort credentials by creation date (earliest first, most recently created last)
credentials.sort(key=lambda cred: cred["created_at"])
# Permissions and roles
role_info = None
org_info = None
effective_permissions = []
effective_permissions: list[str] = []
is_global_admin = False
is_org_admin = False
if ctx:
role_info = {
"uuid": str(ctx.role.uuid),
"display_name": ctx.role.display_name,
"permissions": ctx.role.permissions, # IDs
"permissions": ctx.role.permissions,
}
org_info = {
"uuid": str(ctx.org.uuid),
"display_name": ctx.org.display_name,
"permissions": ctx.org.permissions, # IDs the org can grant
"permissions": ctx.org.permissions,
}
# Effective permissions are role permissions; API also returns full objects for convenience
effective_permissions = [p.id for p in (ctx.permissions or [])]
is_global_admin = "auth/admin" in role_info["permissions"]
# org admin permission is auth/org:<org_uuid>
is_org_admin = (
f"auth/org:{org_info['uuid']}" in role_info["permissions"]
if org_info
@@ -109,8 +137,8 @@ def register_api_routes(app: FastAPI):
)
return {
"authenticated": not reset,
"session_type": s.info["type"],
"authenticated": True,
"session_type": s.info.get("type"),
"user": {
"user_uuid": str(u.uuid),
"user_name": u.display_name,
@@ -127,6 +155,232 @@ def register_api_routes(app: FastAPI):
"aaguid_info": aaguid_info,
}
# -------------------- Admin API: Organizations --------------------
@app.get("/auth/admin/orgs")
async def admin_list_orgs(auth=Cookie(None)):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or is_org_admin):
raise ValueError("Insufficient permissions")
orgs = await db.instance.list_organizations()
# If only org admin, filter to their org
if not is_global_admin:
orgs = [o for o in orgs if o.uuid == ctx.org.uuid]
def role_to_dict(r):
return {
"uuid": str(r.uuid),
"org_uuid": str(r.org_uuid),
"display_name": r.display_name,
"permissions": r.permissions,
}
async def org_to_dict(o):
# Fetch users for each org
users = await db.instance.get_organization_users(str(o.uuid))
return {
"uuid": str(o.uuid),
"display_name": o.display_name,
"permissions": o.permissions,
"roles": [role_to_dict(r) for r in o.roles],
"users": [
{
"uuid": str(u.uuid),
"display_name": u.display_name,
"role": role_name,
"visits": u.visits,
"last_seen": u.last_seen.isoformat() if u.last_seen else None,
}
for (u, role_name) in users
],
}
return [await org_to_dict(o) for o in orgs]
@app.post("/auth/admin/orgs")
async def admin_create_org(payload: dict = Body(...), auth=Cookie(None)):
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth)
if not is_global_admin:
raise ValueError("Global admin required")
from ..db import Org as OrgDC # local import to avoid cycles in typing
org_uuid = uuid4()
display_name = payload.get("display_name") or "New Organization"
permissions = payload.get("permissions") or []
org = OrgDC(uuid=org_uuid, display_name=display_name, permissions=permissions)
await db.instance.create_organization(org)
return {"uuid": str(org_uuid)}
@app.put("/auth/admin/orgs/{org_uuid}")
async def admin_update_org(
org_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or (is_org_admin and ctx.org.uuid == org_uuid)):
raise ValueError("Insufficient permissions")
from ..db import Org as OrgDC
current = await db.instance.get_organization(str(org_uuid))
display_name = payload.get("display_name") or current.display_name
permissions = (
payload.get("permissions")
if "permissions" in payload
else current.permissions
) or []
org = OrgDC(uuid=org_uuid, display_name=display_name, permissions=permissions)
await db.instance.update_organization(org)
return {"status": "ok"}
@app.delete("/auth/admin/orgs/{org_uuid}")
async def admin_delete_org(org_uuid: UUID, auth=Cookie(None)):
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth)
if not is_global_admin:
raise ValueError("Global admin required")
await db.instance.delete_organization(org_uuid)
return {"status": "ok"}
# Manage an org's grantable permissions
@app.post("/auth/admin/orgs/{org_uuid}/permissions/{permission_id}")
async def admin_add_org_permission(
org_uuid: UUID, permission_id: str, auth=Cookie(None)
):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or (is_org_admin and ctx.org.uuid == org_uuid)):
raise ValueError("Insufficient permissions")
await db.instance.add_permission_to_organization(str(org_uuid), permission_id)
return {"status": "ok"}
@app.delete("/auth/admin/orgs/{org_uuid}/permissions/{permission_id}")
async def admin_remove_org_permission(
org_uuid: UUID, permission_id: str, auth=Cookie(None)
):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or (is_org_admin and ctx.org.uuid == org_uuid)):
raise ValueError("Insufficient permissions")
await db.instance.remove_permission_from_organization(
str(org_uuid), permission_id
)
return {"status": "ok"}
# -------------------- Admin API: Roles --------------------
@app.post("/auth/admin/orgs/{org_uuid}/roles")
async def admin_create_role(
org_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or (is_org_admin and ctx.org.uuid == org_uuid)):
raise ValueError("Insufficient permissions")
from ..db import Role as RoleDC
role_uuid = uuid4()
display_name = payload.get("display_name") or "New Role"
permissions = payload.get("permissions") or []
# Validate that permissions exist and are allowed by org
org = await db.instance.get_organization(str(org_uuid))
grantable = set(org.permissions or [])
for pid in permissions:
await db.instance.get_permission(pid) # raises if not found
if pid not in grantable:
raise ValueError(f"Permission not grantable by org: {pid}")
role = RoleDC(
uuid=role_uuid,
org_uuid=org_uuid,
display_name=display_name,
permissions=permissions,
)
await db.instance.create_role(role)
return {"uuid": str(role_uuid)}
@app.put("/auth/admin/roles/{role_uuid}")
async def admin_update_role(
role_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
role = await db.instance.get_role(role_uuid)
# Only org admins for that org or global admin can update
if not (is_global_admin or (is_org_admin and role.org_uuid == ctx.org.uuid)):
raise ValueError("Insufficient permissions")
from ..db import Role as RoleDC
display_name = payload.get("display_name") or role.display_name
permissions = payload.get("permissions") or role.permissions
# Validate against org grantable permissions
org = await db.instance.get_organization(str(role.org_uuid))
grantable = set(org.permissions or [])
for pid in permissions:
await db.instance.get_permission(pid) # raises if not found
if pid not in grantable:
raise ValueError(f"Permission not grantable by org: {pid}")
updated = RoleDC(
uuid=role_uuid,
org_uuid=role.org_uuid,
display_name=display_name,
permissions=permissions,
)
await db.instance.update_role(updated)
return {"status": "ok"}
@app.delete("/auth/admin/roles/{role_uuid}")
async def admin_delete_role(role_uuid: UUID, auth=Cookie(None)):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
role = await db.instance.get_role(role_uuid)
if not (is_global_admin or (is_org_admin and role.org_uuid == ctx.org.uuid)):
raise ValueError("Insufficient permissions")
await db.instance.delete_role(role_uuid)
return {"status": "ok"}
# -------------------- Admin API: Permissions (global) --------------------
@app.get("/auth/admin/permissions")
async def admin_list_permissions(auth=Cookie(None)):
_, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or is_org_admin):
raise ValueError("Insufficient permissions")
perms = await db.instance.list_permissions()
return [{"id": p.id, "display_name": p.display_name} for p in perms]
@app.post("/auth/admin/permissions")
async def admin_create_permission(payload: dict = Body(...), auth=Cookie(None)):
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth)
if not is_global_admin:
raise ValueError("Global admin required")
from ..db import Permission as PermDC
perm_id = payload.get("id")
display_name = payload.get("display_name")
if not perm_id or not display_name:
raise ValueError("id and display_name are required")
await db.instance.create_permission(
PermDC(id=perm_id, display_name=display_name)
)
return {"status": "ok"}
@app.put("/auth/admin/permissions/{permission_id}")
async def admin_update_permission(
permission_id: str, payload: dict = Body(...), auth=Cookie(None)
):
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth)
if not is_global_admin:
raise ValueError("Global admin required")
from ..db import Permission as PermDC
display_name = payload.get("display_name")
if not display_name:
raise ValueError("display_name is required")
await db.instance.update_permission(
PermDC(id=permission_id, display_name=display_name)
)
return {"status": "ok"}
@app.delete("/auth/admin/permissions/{permission_id}")
async def admin_delete_permission(permission_id: str, auth=Cookie(None)):
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth)
if not is_global_admin:
raise ValueError("Global admin required")
await db.instance.delete_permission(permission_id)
return {"status": "ok"}
@app.post("/auth/logout")
async def api_logout(response: Response, auth=Cookie(None)):
"""Log out the current user by clearing the session cookie and deleting from database."""

View File

@@ -1,5 +1,7 @@
import contextlib
import logging
import os
from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import Cookie, FastAPI, Request, Response
@@ -14,7 +16,41 @@ from .reset import register_reset_routes
STATIC_DIR = Path(__file__).parent.parent / "frontend-build"
app = FastAPI()
@asynccontextmanager
async def lifespan(app: FastAPI): # pragma: no cover - startup path
"""Application lifespan to ensure globals (DB, passkey) are initialized in each process.
We populate configuration from environment variables (set by the CLI entrypoint)
so that uvicorn reload / multiprocess workers inherit the settings.
"""
from .. import globals
rp_id = os.getenv("PASSKEY_RP_ID", "localhost")
rp_name = os.getenv("PASSKEY_RP_NAME") or None
origin = os.getenv("PASSKEY_ORIGIN") or None
default_admin = (
os.getenv("PASSKEY_DEFAULT_ADMIN") or None
) # still passed for context
default_org = os.getenv("PASSKEY_DEFAULT_ORG") or None
try:
# CLI (__main__) performs bootstrap once; here we skip to avoid duplicate work
await globals.init(
rp_id=rp_id,
rp_name=rp_name,
origin=origin,
default_admin=default_admin,
default_org=default_org,
bootstrap=False,
)
except ValueError as e:
logging.error(f"⚠️ {e}")
# Re-raise to fail fast
raise
yield
# (Optional) add shutdown cleanup here later
app = FastAPI(lifespan=lifespan)
# Global exception handlers
@@ -71,16 +107,24 @@ async def redirect_to_index():
@app.get("/auth/admin")
async def serve_admin():
"""Serve the admin app entry point."""
# Vite MPA builds admin as admin.html in the same outDir
admin_html = STATIC_DIR / "admin.html"
# If configured to emit admin/index.html, support that too
if not admin_html.exists():
alt = STATIC_DIR / "admin" / "index.html"
if alt.exists():
return FileResponse(alt)
return FileResponse(admin_html)
async def serve_admin(auth=Cookie(None)):
"""Serve the admin app entry point if an authenticated session exists.
If no valid authenticated session cookie is present, return a 401 with the
main app's index.html so the frontend can initiate login/registration flow.
"""
if auth:
with contextlib.suppress(ValueError):
s = await get_session(auth)
if s.info and s.info.get("type") == "authenticated":
return FileResponse(STATIC_DIR / "admin" / "index.html")
# Not authenticated: serve main index with 401
return FileResponse(
STATIC_DIR / "index.html",
status_code=401,
headers={"WWW-Authenticate": "Bearer"},
)
# Register API routes

View File

@@ -5,6 +5,7 @@ from fastapi.responses import RedirectResponse
from ..authsession import expires, get_session
from ..globals import db
from ..globals import passkey as global_passkey
from ..util import passphrase, tokens
from . import session
@@ -43,10 +44,9 @@ def register_reset_routes(app):
reset_token: str,
):
"""Verifies the token and redirects to auth app for credential registration."""
# This route should only match to exact passphrases
print(f"Reset handler called with url: {request.url.path}")
if not passphrase.is_well_formed(reset_token):
raise HTTPException(status_code=404)
origin = global_passkey.instance.origin
try:
# Get session token to validate it exists and get user_id
key = tokens.reset_key(reset_token)
@@ -54,7 +54,7 @@ def register_reset_routes(app):
if not sess:
raise ValueError("Invalid or expired registration token")
response = RedirectResponse(url="/auth/", status_code=303)
response = RedirectResponse(url=f"{origin}/auth/", status_code=303)
session.set_session_cookie(response, reset_token)
return response
@@ -65,4 +65,4 @@ def register_reset_routes(app):
else:
logging.exception("Internal Server Error in reset_authentication")
msg = "Internal Server Error"
return RedirectResponse(url=f"/auth/#{msg}", status_code=303)
return RedirectResponse(url=f"{origin}/auth/#{msg}", status_code=303)