Compare commits

..

14 Commits

Author SHA1 Message Date
Leo Vasanko
b0a1bb72dc Cleaner logout. 2025-09-02 19:11:25 -06:00
Leo Vasanko
b324276173 Cleaned up login/logout flows. 2025-09-02 19:08:16 -06:00
Leo Vasanko
10e55f63b5 Fix url_for query arg on reset link redirect. 2025-09-02 18:32:56 -06:00
Leo Vasanko
074daebd14 Fix matching bug 2025-09-02 18:22:21 -06:00
Leo Vasanko
c9f9b28bf4 Major refactoring of admin API (permissions, paths) 2025-09-02 18:08:06 -06:00
Leo Vasanko
bfc777fb56 Refactoring permissions checks. 2025-09-02 17:28:26 -06:00
Leo Vasanko
3cd6a59b26 Utility module for accessing frontend in backend code. 2025-09-02 16:06:10 -06:00
Leo Vasanko
dd20e7e7f8 Move forward auth under /admin/api/forward 2025-09-02 15:03:39 -06:00
Leo Vasanko
cbf6223d4b New lint option path in pyproject 2025-09-02 15:03:02 -06:00
Leo Vasanko
9feac6e9a8 Moved exception handlers to sub apps. 2025-09-02 14:57:06 -06:00
Leo Vasanko
8c07945661 Rename variable to silence linter 2025-09-02 14:45:23 -06:00
Leo Vasanko
312d23b79a Refactor API under /auth/api 2025-09-02 14:32:19 -06:00
Leo Vasanko
859cc9ed41 Restructure admin app separate of user api. 2025-09-02 14:04:52 -06:00
Leo Vasanko
cead912ddc Remove icon, prefer automatic use of /favicon.ico of the host site. 2025-09-02 10:17:40 -06:00
22 changed files with 811 additions and 843 deletions

2
API.md
View File

@ -8,7 +8,7 @@ This document describes all API endpoints available in the PassKey Auth FastAPI
### HTTP Endpoints
GET /auth/ - Main authentication app
GET /auth/forward-auth - Authentication validation for Caddy/Nginx
GET /auth/api/forward - Authentication validation for Caddy/Nginx (was /auth/forward-auth)
POST /auth/validate - Token validation endpoint
POST /auth/user-info - Get authenticated user information
POST /auth/logout - Logout current user

View File

@ -1,7 +1,7 @@
(auth) {
# Permission check (named arg: perm=...)
forward_auth localhost:4401 {
uri /auth/forward-auth?{args.0}
uri /auth/api/forward?{args.0}
copy_headers x-auth-*
}
}

View File

@ -2,7 +2,6 @@
<html lang="">
<head>
<meta charset="UTF-8" />
<link rel="icon" href="/src/assets/icon.webp" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Admin</title>
</head>

View File

@ -2,7 +2,6 @@
<html lang="">
<head>
<meta charset="UTF-8">
<link rel="icon" href="/src/assets/icon.webp">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Authentication</title>
</head>

View File

@ -22,8 +22,12 @@ import PermissionDeniedView from '@/components/PermissionDeniedView.vue'
const store = useAuthStore()
onMounted(async () => {
// Detect restricted mode: any path not starting with /auth/
if (!location.pathname.startsWith('/auth/')) {
// Detect restricted mode:
// We only allow full functionality on the exact /auth/ (or /auth) path.
// Any other path (including /, /foo, /auth/admin, etc.) is treated as restricted
// so the app will only show login or permission denied views.
const path = location.pathname
if (!(path === '/auth/' || path === '/auth')) {
store.setRestrictedMode(true)
}
// Load branding / settings first (non-blocking for auth flow)

View File

@ -147,7 +147,7 @@ async function loadOrgs() {
if (data.detail) throw new Error(data.detail)
// Restructure to attach users to roles instead of flat user list at org level
orgs.value = data.map(o => {
const roles = o.roles.map(r => ({ ...r, users: [] }))
const roles = o.roles.map(r => ({ ...r, org_uuid: o.uuid, users: [] }))
const roleMap = Object.fromEntries(roles.map(r => [r.display_name, r]))
for (const u of o.users || []) {
if (roleMap[u.role]) roleMap[u.role].users.push(u)
@ -167,7 +167,7 @@ async function load() {
loading.value = true
error.value = null
try {
const res = await fetch('/auth/user-info', { method: 'POST' })
const res = await fetch('/auth/api/user-info', { method: 'POST' })
const data = await res.json()
if (data.detail) throw new Error(data.detail)
info.value = data
@ -250,7 +250,7 @@ function updateRole(role) { openDialog('role-update', { role }) }
function deleteRole(role) {
openDialog('confirm', { message: `Delete role ${role.display_name}?`, action: async () => {
const res = await fetch(`/auth/admin/roles/${role.uuid}`, { method: 'DELETE' })
const res = await fetch(`/auth/admin/orgs/${role.org_uuid}/roles/${role.uuid}`, { method: 'DELETE' })
const data = await res.json(); if (data.detail) throw new Error(data.detail)
await loadOrgs()
} })
@ -321,7 +321,7 @@ const pageHeading = computed(() => {
watch(selectedUser, async (u) => {
if (!u) { userDetail.value = null; return }
try {
const res = await fetch(`/auth/admin/users/${u.uuid}`)
const res = await fetch(`/auth/admin/orgs/${u.org_uuid}/users/${u.uuid}`)
const data = await res.json()
if (data.detail) throw new Error(data.detail)
userDetail.value = data
@ -359,7 +359,7 @@ async function toggleRolePermission(role, permId, checked) {
const prev = [...role.permissions]
role.permissions = next
try {
const res = await fetch(`/auth/admin/roles/${role.uuid}`, {
const res = await fetch(`/auth/admin/orgs/${role.org_uuid}/roles/${role.uuid}`, {
method: 'PUT',
headers: { 'content-type': 'application/json' },
body: JSON.stringify({ display_name: role.display_name, permissions: next })
@ -379,7 +379,7 @@ async function onUserNameSaved() {
await loadOrgs()
if (selectedUser.value) {
try {
const r = await fetch(`/auth/admin/users/${selectedUser.value.uuid}`)
const r = await fetch(`/auth/admin/orgs/${selectedUser.value.org_uuid}/users/${selectedUser.value.uuid}`)
const jd = await r.json()
if (!r.ok || jd.detail) throw new Error(jd.detail || 'Reload failed')
userDetail.value = jd
@ -409,7 +409,7 @@ async function submitDialog() {
const { role } = dialog.value.data; const name = dialog.value.data.name?.trim(); if (!name) throw new Error('Name required')
const permsCsv = dialog.value.data.perms || ''
const perms = permsCsv.split(',').map(s=>s.trim()).filter(Boolean)
const res = await fetch(`/auth/admin/roles/${role.uuid}`, { method: 'PUT', headers: { 'content-type': 'application/json' }, body: JSON.stringify({ display_name: name, permissions: perms }) })
const res = await fetch(`/auth/admin/orgs/${role.org_uuid}/roles/${role.uuid}`, { method: 'PUT', headers: { 'content-type': 'application/json' }, body: JSON.stringify({ display_name: name, permissions: perms }) })
const d = await res.json(); if (d.detail) throw new Error(d.detail); await loadOrgs()
} else if (t === 'user-create') {
const { org, role } = dialog.value.data; const name = dialog.value.data.name?.trim(); if (!name) throw new Error('Name required')
@ -496,7 +496,7 @@ async function submitDialog() {
:loading="loading"
:org-display-name="userDetail.org.display_name"
:role-name="userDetail.role"
:update-endpoint="`/auth/admin/users/${selectedUser.uuid}/display-name`"
:update-endpoint="`/auth/admin/orgs/${selectedUser.org_uuid}/users/${selectedUser.uuid}/display-name`"
@saved="onUserNameSaved"
/>
<div v-else-if="userDetail?.error" class="error small">{{ userDetail.error }}</div>
@ -512,7 +512,7 @@ async function submitDialog() {
<p class="matrix-hint muted">Use the token dialog to register a new credential for the member.</p>
<RegistrationLinkModal
v-if="showRegModal"
:endpoint="`/auth/admin/users/${selectedUser.uuid}/create-link`"
:endpoint="`/auth/admin/orgs/${selectedUser.org_uuid}/users/${selectedUser.uuid}/create-link`"
:auto-copy="false"
@close="showRegModal = false"
@copied="onLinkCopied"

Binary file not shown.

Before

Width:  |  Height:  |  Size: 86 KiB

View File

@ -46,7 +46,7 @@ const copyLink = async (event) => {
onMounted(async () => {
try {
const response = await fetch('/auth/create-link', { method: 'POST' })
const response = await fetch('/auth/api/create-link', { method: 'POST' })
const result = await response.json()
if (result.detail) throw new Error(result.detail)

View File

@ -27,9 +27,9 @@ const handleLogin = async () => {
await authStore.authenticate()
authStore.showMessage('Authentication successful!', 'success', 2000)
if (authStore.restrictedMode) {
// In restricted mode after successful auth show permission denied (no profile outside /auth/)
authStore.currentView = 'permission-denied'
} else if (location.pathname.startsWith('/auth/')) {
// Restricted mode: reload so the app re-mounts and selectView() applies (will become permission denied)
location.reload()
} else if (location.pathname === '/auth/') {
authStore.currentView = 'profile'
} else {
location.reload()

View File

@ -32,7 +32,6 @@ function back() {
}
async function logout() {
await authStore.logout()
authStore.currentView = 'login'
}
</script>
<style scoped>

View File

@ -9,7 +9,7 @@
:created-at="authStore.userInfo.user.created_at"
:last-seen="authStore.userInfo.user.last_seen"
:loading="authStore.isLoading"
update-endpoint="/auth/user/display-name"
update-endpoint="/auth/api/user/display-name"
@saved="authStore.loadUserInfo()"
/>
@ -144,7 +144,6 @@ const deleteCredential = async (credentialId) => {
const logout = async () => {
await authStore.logout()
authStore.currentView = 'login'
}
const isAdmin = computed(() => !!(authStore.userInfo?.is_global_admin || authStore.userInfo?.is_org_admin))

View File

@ -8,7 +8,7 @@ export const useAuthStore = defineStore('auth', {
settings: null, // Server provided settings (/auth/settings)
isLoading: false,
resetToken: null, // transient reset token
restrictedMode: false, // If true, app loaded outside /auth/ and should restrict to login or permission denied
restrictedMode: false, // Anywhere other than /auth/: restrict to login or permission denied
// UI State
currentView: 'login',
@ -32,7 +32,7 @@ export const useAuthStore = defineStore('auth', {
}
},
async setSessionCookie(sessionToken) {
const response = await fetch('/auth/set-session', {
const response = await fetch('/auth/api/set-session', {
method: 'POST',
headers: {'Authorization': `Bearer ${sessionToken}`},
})
@ -87,7 +87,7 @@ export const useAuthStore = defineStore('auth', {
async loadUserInfo() {
const headers = {}
// Reset tokens are only passed via query param now, not Authorization header
const url = this.resetToken ? `/auth/user-info?reset=${encodeURIComponent(this.resetToken)}` : '/auth/user-info'
const url = this.resetToken ? `/auth/api/user-info?reset=${encodeURIComponent(this.resetToken)}` : '/auth/api/user-info'
const response = await fetch(url, { method: 'POST', headers })
let result = null
try {
@ -109,7 +109,7 @@ export const useAuthStore = defineStore('auth', {
},
async loadSettings() {
try {
const res = await fetch('/auth/settings')
const res = await fetch('/auth/api/settings')
if (!res.ok) return
const data = await res.json()
this.settings = data
@ -121,7 +121,7 @@ export const useAuthStore = defineStore('auth', {
}
},
async deleteCredential(uuid) {
const response = await fetch(`/auth/credential/${uuid}`, {method: 'Delete'})
const response = await fetch(`/auth/api/credential/${uuid}`, {method: 'Delete'})
const result = await response.json()
if (result.detail) throw new Error(`Server: ${result.detail}`)
@ -129,12 +129,12 @@ export const useAuthStore = defineStore('auth', {
},
async logout() {
try {
await fetch('/auth/logout', {method: 'POST'})
await fetch('/auth/api/logout', {method: 'POST'})
location.reload()
} catch (error) {
console.error('Logout error:', error)
this.showMessage(error.message, 'error')
}
this.userInfo = null
},
}
})

View File

@ -18,13 +18,13 @@ from .util import passphrase, tokens
def _init_logger() -> logging.Logger:
l = logging.getLogger(__name__)
if not l.handlers and not logging.getLogger().handlers:
logger = logging.getLogger(__name__)
if not logger.handlers and not logging.getLogger().handlers:
h = logging.StreamHandler()
h.setFormatter(logging.Formatter("%(message)s"))
l.addHandler(h)
l.setLevel(logging.INFO)
return l
logger.addHandler(h)
logger.setLevel(logging.INFO)
return logger
logger = _init_logger()

View File

@ -1,17 +1,14 @@
import argparse
import asyncio
import atexit
import contextlib
import ipaddress
import logging
import os
import signal
import subprocess
from pathlib import Path
from urllib.parse import urlparse
import uvicorn
from passkey.util import frontend
DEFAULT_HOST = "localhost"
DEFAULT_SERVE_PORT = 4401
DEFAULT_DEV_PORT = 4402
@ -180,36 +177,11 @@ def main():
run_kwargs["host"] = host
run_kwargs["port"] = port
bun_process: subprocess.Popen | None = None
if devmode:
# Spawn frontend dev server (bun) only in the original parent (avoid duplicates on reload)
# Spawn frontend dev server (bun or npm) only once in parent process
if os.environ.get("PASSKEY_BUN_PARENT") != "1":
os.environ["PASSKEY_BUN_PARENT"] = "1"
frontend_dir = Path(__file__).parent.parent.parent / "frontend"
if (frontend_dir / "package.json").exists():
try:
bun_process = subprocess.Popen(
["bun", "--bun", "run", "dev"], cwd=str(frontend_dir)
)
logging.info("Started bun dev server")
except FileNotFoundError:
logging.warning(
"bun not found: skipping frontend dev server (install bun)"
)
def _terminate_bun(): # pragma: no cover
if bun_process and bun_process.poll() is None:
with contextlib.suppress(Exception):
bun_process.terminate()
atexit.register(_terminate_bun)
def _signal_handler(signum, frame): # pragma: no cover
_terminate_bun()
raise SystemExit(0)
signal.signal(signal.SIGINT, _signal_handler)
signal.signal(signal.SIGTERM, _signal_handler)
frontend.run_dev()
if all_ifaces and not uds:
# If reload enabled, fallback to single dual-stack attempt (::) to keep reload simple

444
passkey/fastapi/admin.py Normal file
View File

@ -0,0 +1,444 @@
import logging
from uuid import UUID, uuid4
from fastapi import Body, Cookie, FastAPI, HTTPException, Request
from fastapi.responses import FileResponse, JSONResponse, RedirectResponse
from ..authsession import expires
from ..globals import db
from ..globals import passkey as global_passkey
from ..util import frontend, passphrase, permutil, querysafe, tokens
from . import authz
app = FastAPI()
@app.exception_handler(ValueError)
async def value_error_handler(_request, exc: ValueError): # pragma: no cover - simple
return JSONResponse(status_code=400, content={"detail": str(exc)})
@app.exception_handler(Exception)
async def general_exception_handler(_request, exc: Exception):
logging.exception("Unhandled exception in admin app")
return JSONResponse(status_code=500, content={"detail": "Internal server error"})
@app.get("")
def adminapp_slashmissing(request: Request):
print("HERE")
return RedirectResponse(url=request.url_for("adminapp"))
@app.get("/")
async def adminapp(auth=Cookie(None)):
try:
await authz.verify(auth, ["auth:admin", "auth:org:*"], match=permutil.has_any)
return FileResponse(frontend.file("admin/index.html"))
except HTTPException as e:
return FileResponse(frontend.file("index.html"), status_code=e.status_code)
# -------------------- Organizations --------------------
@app.get("/orgs")
async def admin_list_orgs(auth=Cookie(None)):
ctx = await authz.verify(auth, ["auth:admin", "auth:org:*"], match=permutil.has_any)
orgs = await db.instance.list_organizations()
if "auth:admin" not in ctx.role.permissions:
orgs = [o for o in orgs if f"auth:org:{o.uuid}" in ctx.role.permissions]
def role_to_dict(r):
return {
"uuid": str(r.uuid),
"org_uuid": str(r.org_uuid),
"display_name": r.display_name,
"permissions": r.permissions,
}
async def org_to_dict(o):
users = await db.instance.get_organization_users(str(o.uuid))
return {
"uuid": str(o.uuid),
"display_name": o.display_name,
"permissions": o.permissions,
"roles": [role_to_dict(r) for r in o.roles],
"users": [
{
"uuid": str(u.uuid),
"display_name": u.display_name,
"role": role_name,
"visits": u.visits,
"last_seen": u.last_seen.isoformat() if u.last_seen else None,
}
for (u, role_name) in users
],
}
return [await org_to_dict(o) for o in orgs]
@app.post("/orgs")
async def admin_create_org(payload: dict = Body(...), auth=Cookie(None)):
await authz.verify(auth, ["auth:admin"])
from ..db import Org as OrgDC # local import to avoid cycles
org_uuid = uuid4()
display_name = payload.get("display_name") or "New Organization"
permissions = payload.get("permissions") or []
org = OrgDC(uuid=org_uuid, display_name=display_name, permissions=permissions)
await db.instance.create_organization(org)
return {"uuid": str(org_uuid)}
@app.put("/orgs/{org_uuid}")
async def admin_update_org(
org_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
await authz.verify(
auth, ["auth:admin", f"auth:org:{org_uuid}"], match=permutil.has_any
)
from ..db import Org as OrgDC # local import to avoid cycles
current = await db.instance.get_organization(str(org_uuid))
display_name = payload.get("display_name") or current.display_name
permissions = payload.get("permissions") or current.permissions or []
org = OrgDC(uuid=org_uuid, display_name=display_name, permissions=permissions)
await db.instance.update_organization(org)
return {"status": "ok"}
@app.delete("/orgs/{org_uuid}")
async def admin_delete_org(org_uuid: UUID, auth=Cookie(None)):
ctx = await authz.verify(
auth, ["auth:admin", f"auth:org:{org_uuid}"], match=permutil.has_any
)
if ctx.org.uuid == org_uuid:
raise ValueError("Cannot delete the organization you belong to")
await db.instance.delete_organization(org_uuid)
return {"status": "ok"}
@app.post("/orgs/{org_uuid}/permission")
async def admin_add_org_permission(
org_uuid: UUID, permission_id: str, auth=Cookie(None)
):
await authz.verify(auth, ["auth:admin"])
await db.instance.add_permission_to_organization(str(org_uuid), permission_id)
return {"status": "ok"}
@app.delete("/orgs/{org_uuid}/permission")
async def admin_remove_org_permission(
org_uuid: UUID, permission_id: str, auth=Cookie(None)
):
await authz.verify(auth, ["auth:admin"])
await db.instance.remove_permission_from_organization(str(org_uuid), permission_id)
return {"status": "ok"}
# -------------------- Roles --------------------
@app.post("/orgs/{org_uuid}/roles")
async def admin_create_role(
org_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
await authz.verify(auth, ["auth:admin", f"auth:org:{org_uuid}"])
from ..db import Role as RoleDC
role_uuid = uuid4()
display_name = payload.get("display_name") or "New Role"
perms = payload.get("permissions") or []
org = await db.instance.get_organization(str(org_uuid))
grantable = set(org.permissions or [])
for pid in perms:
await db.instance.get_permission(pid)
if pid not in grantable:
raise ValueError(f"Permission not grantable by org: {pid}")
role = RoleDC(
uuid=role_uuid,
org_uuid=org_uuid,
display_name=display_name,
permissions=perms,
)
await db.instance.create_role(role)
return {"uuid": str(role_uuid)}
@app.put("/orgs/{org_uuid}/roles/{role_uuid}")
async def admin_update_role(
org_uuid: UUID, role_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
# Verify caller is global admin or admin of provided org
await authz.verify(
auth, ["auth:admin", f"auth:org:{org_uuid}"], match=permutil.has_any
)
role = await db.instance.get_role(role_uuid)
if role.org_uuid != org_uuid:
raise HTTPException(status_code=404, detail="Role not found in organization")
from ..db import Role as RoleDC
display_name = payload.get("display_name") or role.display_name
permissions = payload.get("permissions") or role.permissions
org = await db.instance.get_organization(str(org_uuid))
grantable = set(org.permissions or [])
for pid in permissions:
await db.instance.get_permission(pid)
if pid not in grantable:
raise ValueError(f"Permission not grantable by org: {pid}")
updated = RoleDC(
uuid=role_uuid,
org_uuid=org_uuid,
display_name=display_name,
permissions=permissions,
)
await db.instance.update_role(updated)
return {"status": "ok"}
@app.delete("/orgs/{org_uuid}/roles/{role_uuid}")
async def admin_delete_role(org_uuid: UUID, role_uuid: UUID, auth=Cookie(None)):
await authz.verify(
auth, ["auth:admin", f"auth:org:{org_uuid}"], match=permutil.has_any
)
role = await db.instance.get_role(role_uuid)
if role.org_uuid != org_uuid:
raise HTTPException(status_code=404, detail="Role not found in organization")
await db.instance.delete_role(role_uuid)
return {"status": "ok"}
# -------------------- Users --------------------
@app.post("/orgs/{org_uuid}/users")
async def admin_create_user(
org_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
await authz.verify(
auth, ["auth:admin", f"auth:org:{org_uuid}"], match=permutil.has_any
)
display_name = payload.get("display_name")
role_name = payload.get("role")
if not display_name or not role_name:
raise ValueError("display_name and role are required")
from ..db import User as UserDC
roles = await db.instance.get_roles_by_organization(str(org_uuid))
role_obj = next((r for r in roles if r.display_name == role_name), None)
if not role_obj:
raise ValueError("Role not found in organization")
user_uuid = uuid4()
user = UserDC(
uuid=user_uuid,
display_name=display_name,
role_uuid=role_obj.uuid,
visits=0,
created_at=None,
)
await db.instance.create_user(user)
return {"uuid": str(user_uuid)}
@app.put("/orgs/{org_uuid}/users/{user_uuid}/role")
async def admin_update_user_role(
org_uuid: UUID, user_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
await authz.verify(
auth, ["auth:admin", f"auth:org:{org_uuid}"], match=permutil.has_any
)
new_role = payload.get("role")
if not new_role:
raise ValueError("role is required")
try:
user_org, _current_role = await db.instance.get_user_organization(user_uuid)
except ValueError:
raise ValueError("User not found")
if user_org.uuid != org_uuid:
raise ValueError("User does not belong to this organization")
roles = await db.instance.get_roles_by_organization(str(org_uuid))
if not any(r.display_name == new_role for r in roles):
raise ValueError("Role not found in organization")
await db.instance.update_user_role_in_organization(user_uuid, new_role)
return {"status": "ok"}
@app.post("/orgs/{org_uuid}/users/{user_uuid}/create-link")
async def admin_create_user_registration_link(
org_uuid: UUID, user_uuid: UUID, auth=Cookie(None)
):
try:
user_org, _role_name = await db.instance.get_user_organization(user_uuid)
except ValueError:
raise HTTPException(status_code=404, detail="User not found")
if user_org.uuid != org_uuid:
raise HTTPException(status_code=404, detail="User not found in organization")
ctx = await authz.verify(
auth, ["auth:admin", f"auth:org:{org_uuid}"], match=permutil.has_any
)
if (
"auth:admin" not in ctx.role.permissions
and f"auth:org:{org_uuid}" not in ctx.role.permissions
):
raise HTTPException(status_code=403, detail="Insufficient permissions")
token = passphrase.generate()
await db.instance.create_session(
user_uuid=user_uuid,
key=tokens.reset_key(token),
expires=expires(),
info={"type": "device addition", "created_by_admin": True},
)
origin = global_passkey.instance.origin
url = f"{origin}/auth/{token}"
return {"url": url, "expires": expires().isoformat()}
@app.get("/orgs/{org_uuid}/users/{user_uuid}")
async def admin_get_user_detail(org_uuid: UUID, user_uuid: UUID, auth=Cookie(None)):
try:
user_org, role_name = await db.instance.get_user_organization(user_uuid)
except ValueError:
raise HTTPException(status_code=404, detail="User not found")
if user_org.uuid != org_uuid:
raise HTTPException(status_code=404, detail="User not found in organization")
ctx = await authz.verify(
auth, ["auth:admin", f"auth:org:{org_uuid}"], match=permutil.has_any
)
if (
"auth:admin" not in ctx.role.permissions
and f"auth:org:{org_uuid}" not in ctx.role.permissions
):
raise HTTPException(status_code=403, detail="Insufficient permissions")
user = await db.instance.get_user_by_uuid(user_uuid)
cred_ids = await db.instance.get_credentials_by_user_uuid(user_uuid)
creds: list[dict] = []
aaguids: set[str] = set()
for cid in cred_ids:
try:
c = await db.instance.get_credential_by_id(cid)
except ValueError:
continue
aaguid_str = str(c.aaguid)
aaguids.add(aaguid_str)
creds.append(
{
"credential_uuid": str(c.uuid),
"aaguid": aaguid_str,
"created_at": c.created_at.isoformat(),
"last_used": c.last_used.isoformat() if c.last_used else None,
"last_verified": c.last_verified.isoformat()
if c.last_verified
else None,
"sign_count": c.sign_count,
}
)
from .. import aaguid as aaguid_mod
aaguid_info = aaguid_mod.filter(aaguids)
return {
"display_name": user.display_name,
"org": {"display_name": user_org.display_name},
"role": role_name,
"visits": user.visits,
"created_at": user.created_at.isoformat() if user.created_at else None,
"last_seen": user.last_seen.isoformat() if user.last_seen else None,
"credentials": creds,
"aaguid_info": aaguid_info,
}
@app.put("/orgs/{org_uuid}/users/{user_uuid}/display-name")
async def admin_update_user_display_name(
org_uuid: UUID, user_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
try:
user_org, _role_name = await db.instance.get_user_organization(user_uuid)
except ValueError:
raise HTTPException(status_code=404, detail="User not found")
if user_org.uuid != org_uuid:
raise HTTPException(status_code=404, detail="User not found in organization")
ctx = await authz.verify(
auth, ["auth:admin", f"auth:org:{org_uuid}"], match=permutil.has_any
)
if (
"auth:admin" not in ctx.role.permissions
and f"auth:org:{org_uuid}" not in ctx.role.permissions
):
raise HTTPException(status_code=403, detail="Insufficient permissions")
new_name = (payload.get("display_name") or "").strip()
if not new_name:
raise HTTPException(status_code=400, detail="display_name required")
if len(new_name) > 64:
raise HTTPException(status_code=400, detail="display_name too long")
await db.instance.update_user_display_name(user_uuid, new_name)
return {"status": "ok"}
# -------------------- Permissions (global) --------------------
@app.get("/permissions")
async def admin_list_permissions(auth=Cookie(None)):
await authz.verify(auth, ["auth:admin"], match=permutil.has_any)
perms = await db.instance.list_permissions()
return [{"id": p.id, "display_name": p.display_name} for p in perms]
@app.post("/permissions")
async def admin_create_permission(payload: dict = Body(...), auth=Cookie(None)):
await authz.verify(auth, ["auth:admin"])
from ..db import Permission as PermDC
perm_id = payload.get("id")
display_name = payload.get("display_name")
if not perm_id or not display_name:
raise ValueError("id and display_name are required")
querysafe.assert_safe(perm_id, field="id")
await db.instance.create_permission(PermDC(id=perm_id, display_name=display_name))
return {"status": "ok"}
@app.put("/permission")
async def admin_update_permission(
permission_id: str, display_name: str, auth=Cookie(None)
):
await authz.verify(auth, ["auth:admin"])
from ..db import Permission as PermDC
if not display_name:
raise ValueError("display_name is required")
querysafe.assert_safe(permission_id, field="permission_id")
await db.instance.update_permission(
PermDC(id=permission_id, display_name=display_name)
)
return {"status": "ok"}
@app.post("/permission/rename")
async def admin_rename_permission(payload: dict = Body(...), auth=Cookie(None)):
await authz.verify(auth, ["auth:admin"])
old_id = payload.get("old_id")
new_id = payload.get("new_id")
display_name = payload.get("display_name")
if not old_id or not new_id:
raise ValueError("old_id and new_id required")
querysafe.assert_safe(old_id, field="old_id")
querysafe.assert_safe(new_id, field="new_id")
if display_name is None:
perm = await db.instance.get_permission(old_id)
display_name = perm.display_name
rename_fn = getattr(db.instance, "rename_permission", None)
if not rename_fn:
raise ValueError("Permission renaming not supported by this backend")
await rename_fn(old_id, new_id, display_name)
return {"status": "ok"}
@app.delete("/permission")
async def admin_delete_permission(permission_id: str, auth=Cookie(None)):
await authz.verify(auth, ["auth:admin"])
querysafe.assert_safe(permission_id, field="permission_id")
await db.instance.delete_permission(permission_id)
return {"status": "ok"}

View File

@ -1,646 +1,228 @@
"""
API endpoints for user management and session handling.
import logging
from contextlib import suppress
from uuid import UUID
This module contains all the HTTP API endpoints for:
- User information retrieval
- User credentials management
- Session token validation and refresh
- Login/logout functionality
"""
from uuid import UUID, uuid4
from fastapi import Body, Cookie, Depends, FastAPI, HTTPException, Query, Response
from fastapi import (
Body,
Cookie,
Depends,
FastAPI,
HTTPException,
Query,
Request,
Response,
)
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBearer
from passkey.util import passphrase
from .. import aaguid
from ..authsession import delete_credential, expires, get_reset, get_session
from ..globals import db
from ..globals import passkey as global_passkey
from ..util import querysafe, tokens
from ..util import passphrase, permutil, tokens
from ..util.tokens import session_key
from . import authz, session
bearer_auth = HTTPBearer(auto_error=True)
app = FastAPI()
def register_api_routes(app: FastAPI):
"""Register all API routes on the FastAPI app."""
async def _get_ctx_and_admin_flags(auth_cookie: str):
"""Helper to get session context and admin flags from cookie."""
if not auth_cookie:
raise ValueError("Not authenticated")
ctx = await db.instance.get_session_context(session_key(auth_cookie))
if not ctx:
raise ValueError("Not authenticated")
role_perm_ids = set(ctx.role.permissions or [])
org_uuid_str = str(ctx.org.uuid)
is_global_admin = "auth:admin" in role_perm_ids
is_org_admin = f"auth:org:{org_uuid_str}" in role_perm_ids
return ctx, is_global_admin, is_org_admin
@app.exception_handler(ValueError)
async def value_error_handler(_request: Request, exc: ValueError): # pragma: no cover
return JSONResponse(status_code=400, content={"detail": str(exc)})
@app.post("/auth/validate")
async def validate_token(perm=Query(None), auth=Cookie(None)):
"""Lightweight token validation endpoint.
Query Params:
- perm: repeated permission IDs the caller must possess (ALL required)
"""
@app.exception_handler(Exception)
async def general_exception_handler(
_request: Request, exc: Exception
): # pragma: no cover
logging.exception("Unhandled exception in API app")
return JSONResponse(status_code=500, content={"detail": "Internal server error"})
s = await authz.verify(auth, perm)
return {"valid": True, "user_uuid": str(s.user_uuid)}
@app.get("/auth/settings")
async def get_settings():
"""Return server runtime settings safe for public consumption.
@app.post("/validate")
async def validate_token(perm: list[str] = Query([]), auth=Cookie(None)):
ctx = await authz.verify(auth, perm)
return {"valid": True, "user_uuid": str(ctx.session.user_uuid)}
Provides relying party metadata used by the frontend to brand UI.
"""
pk = global_passkey.instance
@app.get("/forward")
async def forward_authentication(perm: list[str] = Query([]), auth=Cookie(None)):
"""Forward auth validation for Caddy/Nginx (moved from /auth/forward-auth).
Query Params:
- perm: repeated permission IDs the authenticated user must possess (ALL required).
Success: 204 No Content with x-auth-user-uuid header.
Failure (unauthenticated / unauthorized): 4xx JSON body with detail.
"""
try:
ctx = await authz.verify(auth, perm)
return Response(
status_code=204, headers={"x-auth-user-uuid": str(ctx.session.user_uuid)}
)
except HTTPException as e: # pass through explicitly
raise e
@app.get("/settings")
async def get_settings():
pk = global_passkey.instance
return {"rp_id": pk.rp_id, "rp_name": pk.rp_name}
@app.post("/user-info")
async def api_user_info(reset: str | None = None, auth=Cookie(None)):
authenticated = False
try:
if reset:
if not passphrase.is_well_formed(reset):
raise ValueError("Invalid reset token")
s = await get_reset(reset)
else:
if auth is None:
raise ValueError("Authentication Required")
s = await get_session(auth)
authenticated = True
except ValueError as e:
raise HTTPException(401, str(e))
u = await db.instance.get_user_by_uuid(s.user_uuid)
if not authenticated: # minimal response for reset tokens
return {
"rp_id": pk.rp_id,
"rp_name": pk.rp_name,
}
@app.post("/auth/user-info")
async def api_user_info(reset: str | None = None, auth=Cookie(None)):
"""Get user information.
- For authenticated sessions: return full context (org/role/permissions/credentials)
- For reset tokens: return only basic user information to drive reset flow
"""
authenticated = False
try:
if reset:
if not passphrase.is_well_formed(reset):
raise ValueError("Invalid reset token")
s = await get_reset(reset)
else:
if auth is None:
raise ValueError("Authentication Required")
s = await get_session(auth)
authenticated = True
except ValueError as e:
raise HTTPException(401, str(e))
u = await db.instance.get_user_by_uuid(s.user_uuid)
# Minimal response for reset tokens
if not authenticated:
return {
"authenticated": False,
"session_type": s.info.get("type"),
"user": {
"user_uuid": str(u.uuid),
"user_name": u.display_name,
},
}
# Full context for authenticated sessions
assert authenticated and auth is not None
ctx = await db.instance.get_session_context(session_key(auth))
credential_ids = await db.instance.get_credentials_by_user_uuid(s.user_uuid)
credentials: list[dict] = []
user_aaguids: set[str] = set()
for cred_id in credential_ids:
try:
c = await db.instance.get_credential_by_id(cred_id)
except ValueError:
continue # Skip dangling IDs
aaguid_str = str(c.aaguid)
user_aaguids.add(aaguid_str)
credentials.append(
{
"credential_uuid": str(c.uuid),
"aaguid": aaguid_str,
"created_at": c.created_at.isoformat(),
"last_used": c.last_used.isoformat() if c.last_used else None,
"last_verified": c.last_verified.isoformat()
if c.last_verified
else None,
"sign_count": c.sign_count,
"is_current_session": s.credential_uuid == c.uuid,
}
)
credentials.sort(key=lambda cred: cred["created_at"]) # chronological
aaguid_info = aaguid.filter(user_aaguids)
role_info = None
org_info = None
effective_permissions: list[str] = []
is_global_admin = False
is_org_admin = False
if ctx:
role_info = {
"uuid": str(ctx.role.uuid),
"display_name": ctx.role.display_name,
"permissions": ctx.role.permissions,
}
org_info = {
"uuid": str(ctx.org.uuid),
"display_name": ctx.org.display_name,
"permissions": ctx.org.permissions,
}
effective_permissions = [p.id for p in (ctx.permissions or [])]
is_global_admin = "auth:admin" in role_info["permissions"]
is_org_admin = (
f"auth:org:{org_info['uuid']}" in role_info["permissions"]
if org_info
else False
)
return {
"authenticated": True,
"authenticated": False,
"session_type": s.info.get("type"),
"user": {
"user_uuid": str(u.uuid),
"user_name": u.display_name,
"created_at": u.created_at.isoformat() if u.created_at else None,
"last_seen": u.last_seen.isoformat() if u.last_seen else None,
"visits": u.visits,
},
"org": org_info,
"role": role_info,
"permissions": effective_permissions,
"is_global_admin": is_global_admin,
"is_org_admin": is_org_admin,
"credentials": credentials,
"aaguid_info": aaguid_info,
"user": {"user_uuid": str(u.uuid), "user_name": u.display_name},
}
# -------------------- Admin API: Organizations --------------------
assert authenticated and auth is not None
@app.get("/auth/admin/orgs")
async def admin_list_orgs(auth=Cookie(None)):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or is_org_admin):
raise ValueError("Insufficient permissions")
orgs = await db.instance.list_organizations()
# If only org admin, filter to their org
if not is_global_admin:
orgs = [o for o in orgs if o.uuid == ctx.org.uuid]
def role_to_dict(r):
return {
"uuid": str(r.uuid),
"org_uuid": str(r.org_uuid),
"display_name": r.display_name,
"permissions": r.permissions,
ctx = await permutil.session_context(auth)
credential_ids = await db.instance.get_credentials_by_user_uuid(s.user_uuid)
credentials: list[dict] = []
user_aaguids: set[str] = set()
for cred_id in credential_ids:
try:
c = await db.instance.get_credential_by_id(cred_id)
except ValueError:
continue
aaguid_str = str(c.aaguid)
user_aaguids.add(aaguid_str)
credentials.append(
{
"credential_uuid": str(c.uuid),
"aaguid": aaguid_str,
"created_at": c.created_at.isoformat(),
"last_used": c.last_used.isoformat() if c.last_used else None,
"last_verified": c.last_verified.isoformat()
if c.last_verified
else None,
"sign_count": c.sign_count,
"is_current_session": s.credential_uuid == c.uuid,
}
async def org_to_dict(o):
# Fetch users for each org
users = await db.instance.get_organization_users(str(o.uuid))
return {
"uuid": str(o.uuid),
"display_name": o.display_name,
"permissions": o.permissions,
"roles": [role_to_dict(r) for r in o.roles],
"users": [
{
"uuid": str(u.uuid),
"display_name": u.display_name,
"role": role_name,
"visits": u.visits,
"last_seen": u.last_seen.isoformat() if u.last_seen else None,
}
for (u, role_name) in users
],
}
return [await org_to_dict(o) for o in orgs]
@app.post("/auth/admin/orgs")
async def admin_create_org(payload: dict = Body(...), auth=Cookie(None)):
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth)
if not is_global_admin:
raise ValueError("Global admin required")
from ..db import Org as OrgDC # local import to avoid cycles in typing
org_uuid = uuid4()
display_name = payload.get("display_name") or "New Organization"
permissions = payload.get("permissions") or []
org = OrgDC(uuid=org_uuid, display_name=display_name, permissions=permissions)
await db.instance.create_organization(org)
return {"uuid": str(org_uuid)}
@app.put("/auth/admin/orgs/{org_uuid}")
async def admin_update_org(
org_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
# Only global admins can modify org definitions (simpler rule)
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth)
if not is_global_admin:
raise ValueError("Global admin required")
from ..db import Org as OrgDC # local import to avoid cycles
current = await db.instance.get_organization(str(org_uuid))
display_name = payload.get("display_name") or current.display_name
permissions = payload.get("permissions") or current.permissions or []
org = OrgDC(uuid=org_uuid, display_name=display_name, permissions=permissions)
await db.instance.update_organization(org)
return {"status": "ok"}
@app.delete("/auth/admin/orgs/{org_uuid}")
async def admin_delete_org(org_uuid: UUID, auth=Cookie(None)):
ctx, is_global_admin, _ = await _get_ctx_and_admin_flags(auth)
if not is_global_admin:
# Org admins cannot delete at all (avoid self-lockout)
raise ValueError("Global admin required")
# Prevent deleting the organization that the acting global admin currently belongs to
# if that deletion would remove their effective access (e.g., last org granting auth/admin)
try:
acting_org_uuid = ctx.org.uuid if ctx.org else None
except Exception:
acting_org_uuid = None
if acting_org_uuid and acting_org_uuid == org_uuid:
# Never allow deletion of the caller's own organization to avoid immediate account deletion.
raise ValueError("Cannot delete the organization you belong to")
await db.instance.delete_organization(org_uuid)
return {"status": "ok"}
# Manage an org's grantable permissions (query param for permission_id)
@app.post("/auth/admin/orgs/{org_uuid}/permission")
async def admin_add_org_permission(
org_uuid: UUID, permission_id: str, auth=Cookie(None)
):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or (is_org_admin and ctx.org.uuid == org_uuid)):
raise ValueError("Insufficient permissions")
querysafe.assert_safe(permission_id, field="permission_id")
await db.instance.add_permission_to_organization(str(org_uuid), permission_id)
return {"status": "ok"}
@app.delete("/auth/admin/orgs/{org_uuid}/permission")
async def admin_remove_org_permission(
org_uuid: UUID, permission_id: str, auth=Cookie(None)
):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or (is_org_admin and ctx.org.uuid == org_uuid)):
raise ValueError("Insufficient permissions")
querysafe.assert_safe(permission_id, field="permission_id")
await db.instance.remove_permission_from_organization(
str(org_uuid), permission_id
)
return {"status": "ok"}
credentials.sort(key=lambda cred: cred["created_at"])
aaguid_info = aaguid.filter(user_aaguids)
# -------------------- Admin API: Roles --------------------
@app.post("/auth/admin/orgs/{org_uuid}/roles")
async def admin_create_role(
org_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or (is_org_admin and ctx.org.uuid == org_uuid)):
raise ValueError("Insufficient permissions")
from ..db import Role as RoleDC
role_uuid = uuid4()
display_name = payload.get("display_name") or "New Role"
permissions = payload.get("permissions") or []
# Validate that permissions exist and are allowed by org
org = await db.instance.get_organization(str(org_uuid))
grantable = set(org.permissions or [])
for pid in permissions:
await db.instance.get_permission(pid) # raises if not found
if pid not in grantable:
raise ValueError(f"Permission not grantable by org: {pid}")
role = RoleDC(
uuid=role_uuid,
org_uuid=org_uuid,
display_name=display_name,
permissions=permissions,
)
await db.instance.create_role(role)
return {"uuid": str(role_uuid)}
@app.put("/auth/admin/roles/{role_uuid}")
async def admin_update_role(
role_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
role = await db.instance.get_role(role_uuid)
# Only org admins for that org or global admin can update
if not (is_global_admin or (is_org_admin and role.org_uuid == ctx.org.uuid)):
raise ValueError("Insufficient permissions")
from ..db import Role as RoleDC
display_name = payload.get("display_name") or role.display_name
permissions = payload.get("permissions") or role.permissions
# Validate against org grantable permissions
org = await db.instance.get_organization(str(role.org_uuid))
grantable = set(org.permissions or [])
for pid in permissions:
await db.instance.get_permission(pid) # raises if not found
if pid not in grantable:
raise ValueError(f"Permission not grantable by org: {pid}")
updated = RoleDC(
uuid=role_uuid,
org_uuid=role.org_uuid,
display_name=display_name,
permissions=permissions,
)
await db.instance.update_role(updated)
return {"status": "ok"}
@app.post("/auth/admin/orgs/{org_uuid}/users")
async def admin_create_user(
org_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
"""Create a new user within an organization.
Body parameters:
- display_name: str (required)
- role: str (required) display name of existing role in that org
"""
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or (is_org_admin and ctx.org.uuid == org_uuid)):
raise ValueError("Insufficient permissions")
display_name = payload.get("display_name")
role_name = payload.get("role")
if not display_name or not role_name:
raise ValueError("display_name and role are required")
# Validate role exists in org
from ..db import User as UserDC # local import to avoid cycles
roles = await db.instance.get_roles_by_organization(str(org_uuid))
role_obj = next((r for r in roles if r.display_name == role_name), None)
if not role_obj:
raise ValueError("Role not found in organization")
# Create user
user_uuid = uuid4()
user = UserDC(
uuid=user_uuid,
display_name=display_name,
role_uuid=role_obj.uuid,
visits=0,
created_at=None,
)
await db.instance.create_user(user)
return {"uuid": str(user_uuid)}
@app.delete("/auth/admin/roles/{role_uuid}")
async def admin_delete_role(role_uuid: UUID, auth=Cookie(None)):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
role = await db.instance.get_role(role_uuid)
if not (is_global_admin or (is_org_admin and role.org_uuid == ctx.org.uuid)):
raise ValueError("Insufficient permissions")
await db.instance.delete_role(role_uuid)
return {"status": "ok"}
# -------------------- Admin API: Users (role management) --------------------
@app.put("/auth/admin/orgs/{org_uuid}/users/{user_uuid}/role")
async def admin_update_user_role(
org_uuid: UUID, user_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
"""Change a user's role within their organization.
Body: {"role": "New Role Display Name"}
Only global admins or admins of the organization can perform this.
"""
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or (is_org_admin and ctx.org.uuid == org_uuid)):
raise ValueError("Insufficient permissions")
new_role = payload.get("role")
if not new_role:
raise ValueError("role is required")
# Verify user belongs to this org
try:
user_org, _current_role = await db.instance.get_user_organization(user_uuid)
except ValueError:
raise ValueError("User not found")
if user_org.uuid != org_uuid:
raise ValueError("User does not belong to this organization")
# Ensure role exists in org and update
roles = await db.instance.get_roles_by_organization(str(org_uuid))
if not any(r.display_name == new_role for r in roles):
raise ValueError("Role not found in organization")
await db.instance.update_user_role_in_organization(user_uuid, new_role)
return {"status": "ok"}
@app.post("/auth/admin/users/{user_uuid}/create-link")
async def admin_create_user_registration_link(user_uuid: UUID, auth=Cookie(None)):
"""Create a device registration/reset link for a specific user (admin only).
Returns JSON: {"url": str, "expires": iso8601}
"""
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
# Ensure user exists and fetch their org
try:
user_org, _role_name = await db.instance.get_user_organization(user_uuid)
except ValueError:
raise HTTPException(status_code=404, detail="User not found")
if not (is_global_admin or (is_org_admin and user_org.uuid == ctx.org.uuid)):
raise HTTPException(status_code=403, detail="Insufficient permissions")
# Generate human-readable reset token and store as session with reset key
token = passphrase.generate()
await db.instance.create_session(
user_uuid=user_uuid,
key=tokens.reset_key(token),
expires=expires(),
info={"type": "device addition", "created_by_admin": True},
)
origin = global_passkey.instance.origin
url = f"{origin}/auth/{token}"
return {"url": url, "expires": expires().isoformat()}
@app.get("/auth/admin/users/{user_uuid}")
async def admin_get_user_detail(user_uuid: UUID, auth=Cookie(None)):
"""Get detailed information about a user (admin only)."""
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
try:
user_org, role_name = await db.instance.get_user_organization(user_uuid)
except ValueError:
raise HTTPException(status_code=404, detail="User not found")
if not (is_global_admin or (is_org_admin and user_org.uuid == ctx.org.uuid)):
raise HTTPException(status_code=403, detail="Insufficient permissions")
user = await db.instance.get_user_by_uuid(user_uuid)
# Gather credentials
cred_ids = await db.instance.get_credentials_by_user_uuid(user_uuid)
creds: list[dict] = []
aaguids: set[str] = set()
for cid in cred_ids:
try:
c = await db.instance.get_credential_by_id(cid)
except ValueError:
continue
aaguid_str = str(c.aaguid)
aaguids.add(aaguid_str)
creds.append(
{
"credential_uuid": str(c.uuid),
"aaguid": aaguid_str,
"created_at": c.created_at.isoformat(),
"last_used": c.last_used.isoformat() if c.last_used else None,
"last_verified": c.last_verified.isoformat()
if c.last_verified
else None,
"sign_count": c.sign_count,
}
role_info = None
org_info = None
effective_permissions: list[str] = []
is_global_admin = False
is_org_admin = False
if ctx:
role_info = {
"uuid": str(ctx.role.uuid),
"display_name": ctx.role.display_name,
"permissions": ctx.role.permissions,
}
org_info = {
"uuid": str(ctx.org.uuid),
"display_name": ctx.org.display_name,
"permissions": ctx.org.permissions,
}
effective_permissions = [p.id for p in (ctx.permissions or [])]
is_global_admin = "auth:admin" in (role_info["permissions"] or [])
if org_info:
is_org_admin = f"auth:org:{org_info['uuid']}" in (
role_info["permissions"] or []
)
from .. import aaguid as aaguid_mod
aaguid_info = aaguid_mod.filter(aaguids)
return {
"display_name": user.display_name,
"org": {"display_name": user_org.display_name},
"role": role_name,
"visits": user.visits,
"created_at": user.created_at.isoformat() if user.created_at else None,
"last_seen": user.last_seen.isoformat() if user.last_seen else None,
"credentials": creds,
"aaguid_info": aaguid_info,
}
return {
"authenticated": True,
"session_type": s.info.get("type"),
"user": {
"user_uuid": str(u.uuid),
"user_name": u.display_name,
"created_at": u.created_at.isoformat() if u.created_at else None,
"last_seen": u.last_seen.isoformat() if u.last_seen else None,
"visits": u.visits,
},
"org": org_info,
"role": role_info,
"permissions": effective_permissions,
"is_global_admin": is_global_admin,
"is_org_admin": is_org_admin,
"credentials": credentials,
"aaguid_info": aaguid_info,
}
@app.put("/auth/user/display-name")
async def user_update_display_name(payload: dict = Body(...), auth=Cookie(None)):
"""Authenticated user updates their own display name."""
if not auth:
raise HTTPException(status_code=401, detail="Authentication Required")
s = await get_session(auth)
new_name = (payload.get("display_name") or "").strip()
if not new_name:
raise HTTPException(status_code=400, detail="display_name required")
if len(new_name) > 64:
raise HTTPException(status_code=400, detail="display_name too long")
await db.instance.update_user_display_name(s.user_uuid, new_name)
return {"status": "ok"}
@app.put("/auth/admin/users/{user_uuid}/display-name")
async def admin_update_user_display_name(
user_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
"""Admin updates a user's display name."""
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
try:
user_org, _role_name = await db.instance.get_user_organization(user_uuid)
except ValueError:
raise HTTPException(status_code=404, detail="User not found")
if not (is_global_admin or (is_org_admin and user_org.uuid == ctx.org.uuid)):
raise HTTPException(status_code=403, detail="Insufficient permissions")
new_name = (payload.get("display_name") or "").strip()
if not new_name:
raise HTTPException(status_code=400, detail="display_name required")
if len(new_name) > 64:
raise HTTPException(status_code=400, detail="display_name too long")
await db.instance.update_user_display_name(user_uuid, new_name)
return {"status": "ok"}
@app.put("/user/display-name")
async def user_update_display_name(payload: dict = Body(...), auth=Cookie(None)):
if not auth:
raise HTTPException(status_code=401, detail="Authentication Required")
s = await get_session(auth)
new_name = (payload.get("display_name") or "").strip()
if not new_name:
raise HTTPException(status_code=400, detail="display_name required")
if len(new_name) > 64:
raise HTTPException(status_code=400, detail="display_name too long")
await db.instance.update_user_display_name(s.user_uuid, new_name)
return {"status": "ok"}
# Admin API: Permissions (global)
@app.get("/auth/admin/permissions")
async def admin_list_permissions(auth=Cookie(None)):
_, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or is_org_admin):
raise ValueError("Insufficient permissions")
perms = await db.instance.list_permissions()
return [{"id": p.id, "display_name": p.display_name} for p in perms]
@app.post("/logout")
async def api_logout(response: Response, auth=Cookie(None)):
if not auth:
return {"message": "Already logged out"}
with suppress(Exception):
await db.instance.delete_session(session_key(auth))
response.delete_cookie("auth")
return {"message": "Logged out successfully"}
@app.post("/auth/admin/permissions")
async def admin_create_permission(payload: dict = Body(...), auth=Cookie(None)):
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth)
if not is_global_admin:
raise ValueError("Global admin required")
from ..db import Permission as PermDC
perm_id = payload.get("id")
display_name = payload.get("display_name")
if not perm_id or not display_name:
raise ValueError("id and display_name are required")
querysafe.assert_safe(perm_id, field="id")
await db.instance.create_permission(
PermDC(id=perm_id, display_name=display_name)
)
return {"status": "ok"}
@app.post("/set-session")
async def api_set_session(response: Response, auth=Depends(bearer_auth)):
user = await get_session(auth.credentials)
session.set_session_cookie(response, auth.credentials)
return {
"message": "Session cookie set successfully",
"user_uuid": str(user.user_uuid),
}
@app.put("/auth/admin/permission")
async def admin_update_permission(
permission_id: str, display_name: str, auth=Cookie(None)
):
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth)
if not is_global_admin:
raise ValueError("Global admin required")
from ..db import Permission as PermDC
if not display_name:
raise ValueError("display_name is required")
querysafe.assert_safe(permission_id, field="permission_id")
await db.instance.update_permission(
PermDC(id=permission_id, display_name=display_name)
)
return {"status": "ok"}
@app.delete("/credential/{uuid}")
async def api_delete_credential(uuid: UUID, auth: str = Cookie(None)):
await delete_credential(uuid, auth)
return {"message": "Credential deleted successfully"}
@app.post("/auth/admin/permission/rename")
async def admin_rename_permission(payload: dict = Body(...), auth=Cookie(None)):
"""Rename a permission's id (and optionally display name) updating all references.
Body: { "old_id": str, "new_id": str, "display_name": str|null }
"""
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth)
if not is_global_admin:
raise ValueError("Global admin required")
old_id = payload.get("old_id")
new_id = payload.get("new_id")
display_name = payload.get("display_name")
if not old_id or not new_id:
raise ValueError("old_id and new_id required")
querysafe.assert_safe(old_id, field="old_id")
querysafe.assert_safe(new_id, field="new_id")
if display_name is None:
# Fetch old to retain display name
perm = await db.instance.get_permission(old_id)
display_name = perm.display_name
# rename_permission added to interface; use getattr for forward compatibility
rename_fn = getattr(db.instance, "rename_permission", None)
if not rename_fn:
raise ValueError("Permission renaming not supported by this backend")
await rename_fn(old_id, new_id, display_name)
return {"status": "ok"}
@app.delete("/auth/admin/permission")
async def admin_delete_permission(permission_id: str, auth=Cookie(None)):
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth)
if not is_global_admin:
raise ValueError("Global admin required")
querysafe.assert_safe(permission_id, field="permission_id")
await db.instance.delete_permission(permission_id)
return {"status": "ok"}
@app.post("/auth/logout")
async def api_logout(response: Response, auth=Cookie(None)):
"""Log out the current user by clearing the session cookie and deleting from database."""
if not auth:
return {"message": "Already logged out"}
# Remove from database if possible
try:
await db.instance.delete_session(session_key(auth))
except Exception:
...
response.delete_cookie("auth")
return {"message": "Logged out successfully"}
@app.post("/auth/set-session")
async def api_set_session(response: Response, auth=Depends(bearer_auth)):
"""Set session cookie from Authorization Bearer session token (never via query)."""
user = await get_session(auth.credentials)
session.set_session_cookie(response, auth.credentials)
return {
"message": "Session cookie set successfully",
"user_uuid": str(user.user_uuid),
}
@app.delete("/auth/credential/{uuid}")
async def api_delete_credential(
response: Response, uuid: UUID, auth: str = Cookie(None)
):
"""Delete a specific credential for the current user."""
await delete_credential(uuid, auth)
return {"message": "Credential deleted successfully"}
@app.post("/create-link")
async def api_create_link(request: Request, auth=Cookie(None)):
s = await get_session(auth)
token = passphrase.generate()
await db.instance.create_session(
user_uuid=s.user_uuid,
key=tokens.reset_key(token),
expires=expires(),
info=session.infodict(request, "device addition"),
)
origin = global_passkey.instance.origin.rstrip("/")
url = f"{origin}/auth/{token}"
return {
"message": "Registration link generated successfully",
"url": url,
"expires": expires().isoformat(),
}

View File

@ -1,39 +1,39 @@
"""Authorization utilities shared across FastAPI endpoints.
Provides helper(s) to validate a session token (from cookie) and optionally
enforce that the user possesses a given permission (either via their role or
their organization level permissions).
"""
import logging
from fastapi import HTTPException
from ..authsession import get_session
from ..globals import db
from ..util.tokens import session_key
from ..util import permutil
logger = logging.getLogger(__name__)
async def verify(auth: str | None, perm: list[str] | str | None):
async def verify(auth: str | None, perm: list[str], match=permutil.has_all):
"""Validate session token and optional list of required permissions.
Returns the Session object on success. Raises HTTPException on failure.
401: unauthenticated / invalid session
403: one or more required permissions missing
Returns the session context.
Raises HTTPException on failure:
401: unauthenticated / invalid session
403: required permissions missing
"""
if not auth:
raise HTTPException(status_code=401, detail="Authentication required")
session = await get_session(auth)
if perm is not None:
if isinstance(perm, str):
perm = [perm]
ctx = await db.instance.get_session_context(session_key(auth))
if not ctx:
raise HTTPException(status_code=401, detail="Session not found")
available = set(ctx.role.permissions or []) | (
set(ctx.org.permissions or []) if ctx.org else set()
ctx = await permutil.session_context(auth)
if not ctx:
raise HTTPException(status_code=401, detail="Session not found")
if not match(ctx, perm):
# Determine which permissions are missing for clearer diagnostics
missing = sorted(set(perm) - set(ctx.role.permissions))
logger.warning(
"Permission denied: user=%s role=%s missing=%s required=%s granted=%s", # noqa: E501
getattr(ctx.user, "uuid", "?"),
getattr(ctx.role, "display_name", "?"),
missing,
perm,
ctx.role.permissions,
)
if any(p not in available for p in perm):
raise HTTPException(status_code=403, detail="Permission required")
return session
raise HTTPException(status_code=403, detail="Permission required")
__all__ = ["verify"]
return ctx

View File

@ -1,19 +1,14 @@
import contextlib
import logging
import os
from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import Cookie, FastAPI, HTTPException, Query, Request, Response
from fastapi.responses import FileResponse, JSONResponse
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import FileResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from ..authsession import get_session
from . import authz, ws
from .api import register_api_routes
from .reset import register_reset_routes
from passkey.util import frontend, passphrase
STATIC_DIR = Path(__file__).parent.parent / "frontend-build"
from . import admin, api, ws
@asynccontextmanager
@ -51,80 +46,27 @@ async def lifespan(app: FastAPI): # pragma: no cover - startup path
app = FastAPI(lifespan=lifespan)
# Global exception handlers
@app.exception_handler(ValueError)
async def value_error_handler(request: Request, exc: ValueError):
"""Handle ValueError exceptions globally with 400 status code."""
return JSONResponse(status_code=400, content={"detail": str(exc)})
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
"""Handle all other exceptions globally with 500 status code."""
logging.exception("Internal Server Error")
return JSONResponse(status_code=500, content={"detail": "Internal server error"})
# Mount the WebSocket subapp
app.mount("/auth/ws", ws.app)
@app.get("/auth/forward-auth")
async def forward_authentication(request: Request, perm=Query(None), auth=Cookie(None)):
"""A validation endpoint to use with Caddy forward_auth or Nginx auth_request.
Query Params:
- perm: repeated permission IDs the authenticated user must possess (ALL required).
Success: 204 No Content with x-auth-user-uuid header.
Failure (unauthenticated / unauthorized): 4xx with index.html body so the
client (reverse proxy or browser) can initiate auth flow.
"""
try:
s = await authz.verify(auth, perm)
return Response(
status_code=204,
headers={"x-auth-user-uuid": str(s.user_uuid)},
)
except HTTPException as e:
return FileResponse(STATIC_DIR / "index.html", e.status_code)
# Serve static files
app.mount("/auth/admin/", admin.app)
app.mount("/auth/api/", api.app)
app.mount("/auth/ws/", ws.app)
app.mount(
"/auth/assets", StaticFiles(directory=STATIC_DIR / "assets"), name="static assets"
"/auth/assets/", StaticFiles(directory=frontend.file("assets")), name="assets"
)
@app.get("/auth/")
async def redirect_to_index():
async def frontapp():
"""Serve the main authentication app."""
return FileResponse(STATIC_DIR / "index.html")
return FileResponse(frontend.file("index.html"))
@app.get("/auth/admin")
async def serve_admin(auth=Cookie(None)):
"""Serve the admin app entry point if an authenticated session exists.
If no valid authenticated session cookie is present, return a 401 with the
main app's index.html so the frontend can initiate login/registration flow.
"""
if auth:
with contextlib.suppress(ValueError):
s = await get_session(auth)
if s.info and s.info.get("type") == "authenticated":
return FileResponse(STATIC_DIR / "admin" / "index.html")
# Not authenticated: serve main index with 401
return FileResponse(
STATIC_DIR / "index.html",
status_code=401,
headers={"WWW-Authenticate": "Bearer"},
)
# Register API routes
register_api_routes(app)
register_reset_routes(app)
@app.get("/auth/{reset}")
async def reset_link(request: Request, reset: str):
"""Pretty URL for reset links."""
if reset == "admin":
# Admin app missing trailing slash lands here, be friendly to user
return RedirectResponse(request.url_for("adminapp"), status_code=303)
if not passphrase.is_well_formed(reset):
raise HTTPException(status_code=404)
url = request.url_for("frontapp").include_query_params(reset=reset)
return RedirectResponse(url, status_code=303)

View File

@ -1,59 +0,0 @@
from pathlib import Path
from fastapi import Cookie, HTTPException, Request, Response
from fastapi.responses import RedirectResponse
from ..authsession import expires, get_session
from ..globals import db
from ..globals import passkey as global_passkey
from ..util import passphrase, tokens
from . import session
# Local copy to avoid circular import with mainapp
STATIC_DIR = Path(__file__).parent.parent / "frontend-build"
def register_reset_routes(app):
"""Register all device addition/reset routes on the FastAPI app."""
@app.post("/auth/create-link")
async def api_create_link(request: Request, response: Response, auth=Cookie(None)):
"""Create a device addition link for the authenticated user."""
# Require authentication
s = await get_session(auth)
# Generate a human-readable token
token = passphrase.generate() # e.g., "cross.rotate.yin.note.evoke"
await db.instance.create_session(
user_uuid=s.user_uuid,
key=tokens.reset_key(token),
expires=expires(),
info=session.infodict(request, "device addition"),
)
# Generate the device addition link with pretty URL using configured origin
origin = global_passkey.instance.origin.rstrip("/")
path = request.url.path.removesuffix("create-link") + token # /auth/<token>
url = f"{origin}{path}"
return {
"message": "Registration link generated successfully",
"url": url,
"expires": expires().isoformat(),
}
@app.get("/auth/{reset_token}")
async def reset_authentication(request: Request, reset_token: str):
"""Validate reset token and redirect with it as query parameter (no cookies).
After validation we 303 redirect to /auth/?reset=<token>. The frontend will:
- Read the token from location.search
- Use it via Authorization header or websocket query param
- history.replaceState to remove it from the address bar/history
"""
if not passphrase.is_well_formed(reset_token):
raise HTTPException(status_code=404)
origin = global_passkey.instance.origin
# Do not verify existence/expiry here; frontend + user-info endpoint will handle invalid tokens.
redirect_url = f"{origin}/auth/?reset={reset_token}"
return RedirectResponse(url=redirect_url, status_code=303)

60
passkey/util/frontend.py Normal file
View File

@ -0,0 +1,60 @@
from importlib import resources
from pathlib import Path
__all__ = ["path", "file", "run_dev"]
def _resolve_static_dir() -> Path:
# Try packaged path via importlib.resources (works for wheel/installed).
try: # pragma: no cover - trivial path resolution
pkg_dir = resources.files("passkey") / "frontend-build"
fs_path = Path(str(pkg_dir))
if fs_path.is_dir():
return fs_path
except Exception: # pragma: no cover - defensive
pass
# Fallback for editable/development before build.
return Path(__file__).parent.parent / "frontend-build"
path: Path = _resolve_static_dir()
def file(*parts: str) -> Path:
"""Return a child path under the static root."""
return path.joinpath(*parts)
def run_dev():
"""Spawn the frontend dev server (bun or npm) as a background process."""
import atexit
import shutil
import signal
import subprocess
devpath = Path(__file__).parent.parent.parent / "frontend"
if not (devpath / "package.json").exists():
raise RuntimeError(
"Dev frontend is only available when running from git."
if "site-packages" in devpath.parts
else f"Frontend source code not found at {devpath}"
)
bun = shutil.which("bun")
npm = shutil.which("npm") if bun is None else None
if not bun and not npm:
raise RuntimeError("Neither bun nor npm found on PATH for dev server")
cmd: list[str] = [bun, "--bun", "run", "dev"] if bun else [npm, "run", "dev"] # type: ignore[list-item]
proc = subprocess.Popen(cmd, cwd=str(devpath))
def _terminate():
if proc.poll() is None:
proc.terminate()
atexit.register(_terminate)
def _signal_handler(signum, frame):
_terminate()
raise SystemExit(0)
for sig in (signal.SIGINT, signal.SIGTERM):
signal.signal(sig, _signal_handler)

28
passkey/util/permutil.py Normal file
View File

@ -0,0 +1,28 @@
"""Minimal permission helpers with '*' wildcard support (no DB expansion)."""
from collections.abc import Sequence
from fnmatch import fnmatchcase
from ..globals import db
from .tokens import session_key
__all__ = ["has_any", "has_all", "session_context"]
def _match(perms: set[str], patterns: Sequence[str]):
return (
any(fnmatchcase(p, pat) for p in perms) if "*" in pat else pat in perms
for pat in patterns
)
def has_any(ctx, patterns: Sequence[str]) -> bool:
return any(_match(ctx.role.permissions, patterns)) if ctx else False
def has_all(ctx, patterns: Sequence[str]) -> bool:
return all(_match(ctx.role.permissions, patterns)) if ctx else False
async def session_context(auth: str | None):
return await db.instance.get_session_context(session_key(auth)) if auth else None

View File

@ -29,15 +29,14 @@ dev = [
[tool.ruff]
target-version = "py39"
line-length = 88
[tool.ruff.lint]
select = ["E", "F", "I", "N", "W", "UP"]
ignore = ["E501"] # Line too long
[tool.ruff.isort]
known-first-party = ["passkey"]
isort.known-first-party = ["passkey"]
[project.scripts]
passkey-auth = "passkey.fastapi.__main__:main"
[tool.hatch.build]
artifacts = ["passkeyauth/frontend-static"]
artifacts = ["passkey/frontend-build"]
targets.sdist.hooks.custom.path = "scripts/build-frontend.py"