Compare commits

..

14 Commits

Author SHA1 Message Date
Leo Vasanko
b0a1bb72dc Cleaner logout. 2025-09-02 19:11:25 -06:00
Leo Vasanko
b324276173 Cleaned up login/logout flows. 2025-09-02 19:08:16 -06:00
Leo Vasanko
10e55f63b5 Fix url_for query arg on reset link redirect. 2025-09-02 18:32:56 -06:00
Leo Vasanko
074daebd14 Fix matching bug 2025-09-02 18:22:21 -06:00
Leo Vasanko
c9f9b28bf4 Major refactoring of admin API (permissions, paths) 2025-09-02 18:08:06 -06:00
Leo Vasanko
bfc777fb56 Refactoring permissions checks. 2025-09-02 17:28:26 -06:00
Leo Vasanko
3cd6a59b26 Utility module for accessing frontend in backend code. 2025-09-02 16:06:10 -06:00
Leo Vasanko
dd20e7e7f8 Move forward auth under /admin/api/forward 2025-09-02 15:03:39 -06:00
Leo Vasanko
cbf6223d4b New lint option path in pyproject 2025-09-02 15:03:02 -06:00
Leo Vasanko
9feac6e9a8 Moved exception handlers to sub apps. 2025-09-02 14:57:06 -06:00
Leo Vasanko
8c07945661 Rename variable to silence linter 2025-09-02 14:45:23 -06:00
Leo Vasanko
312d23b79a Refactor API under /auth/api 2025-09-02 14:32:19 -06:00
Leo Vasanko
859cc9ed41 Restructure admin app separate of user api. 2025-09-02 14:04:52 -06:00
Leo Vasanko
cead912ddc Remove icon, prefer automatic use of /favicon.ico of the host site. 2025-09-02 10:17:40 -06:00
22 changed files with 811 additions and 843 deletions

2
API.md
View File

@ -8,7 +8,7 @@ This document describes all API endpoints available in the PassKey Auth FastAPI
### HTTP Endpoints ### HTTP Endpoints
GET /auth/ - Main authentication app GET /auth/ - Main authentication app
GET /auth/forward-auth - Authentication validation for Caddy/Nginx GET /auth/api/forward - Authentication validation for Caddy/Nginx (was /auth/forward-auth)
POST /auth/validate - Token validation endpoint POST /auth/validate - Token validation endpoint
POST /auth/user-info - Get authenticated user information POST /auth/user-info - Get authenticated user information
POST /auth/logout - Logout current user POST /auth/logout - Logout current user

View File

@ -1,7 +1,7 @@
(auth) { (auth) {
# Permission check (named arg: perm=...) # Permission check (named arg: perm=...)
forward_auth localhost:4401 { forward_auth localhost:4401 {
uri /auth/forward-auth?{args.0} uri /auth/api/forward?{args.0}
copy_headers x-auth-* copy_headers x-auth-*
} }
} }

View File

@ -2,7 +2,6 @@
<html lang=""> <html lang="">
<head> <head>
<meta charset="UTF-8" /> <meta charset="UTF-8" />
<link rel="icon" href="/src/assets/icon.webp" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" /> <meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Admin</title> <title>Admin</title>
</head> </head>

View File

@ -2,7 +2,6 @@
<html lang=""> <html lang="">
<head> <head>
<meta charset="UTF-8"> <meta charset="UTF-8">
<link rel="icon" href="/src/assets/icon.webp">
<meta name="viewport" content="width=device-width, initial-scale=1.0"> <meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Authentication</title> <title>Authentication</title>
</head> </head>

View File

@ -22,8 +22,12 @@ import PermissionDeniedView from '@/components/PermissionDeniedView.vue'
const store = useAuthStore() const store = useAuthStore()
onMounted(async () => { onMounted(async () => {
// Detect restricted mode: any path not starting with /auth/ // Detect restricted mode:
if (!location.pathname.startsWith('/auth/')) { // We only allow full functionality on the exact /auth/ (or /auth) path.
// Any other path (including /, /foo, /auth/admin, etc.) is treated as restricted
// so the app will only show login or permission denied views.
const path = location.pathname
if (!(path === '/auth/' || path === '/auth')) {
store.setRestrictedMode(true) store.setRestrictedMode(true)
} }
// Load branding / settings first (non-blocking for auth flow) // Load branding / settings first (non-blocking for auth flow)

View File

@ -147,7 +147,7 @@ async function loadOrgs() {
if (data.detail) throw new Error(data.detail) if (data.detail) throw new Error(data.detail)
// Restructure to attach users to roles instead of flat user list at org level // Restructure to attach users to roles instead of flat user list at org level
orgs.value = data.map(o => { orgs.value = data.map(o => {
const roles = o.roles.map(r => ({ ...r, users: [] })) const roles = o.roles.map(r => ({ ...r, org_uuid: o.uuid, users: [] }))
const roleMap = Object.fromEntries(roles.map(r => [r.display_name, r])) const roleMap = Object.fromEntries(roles.map(r => [r.display_name, r]))
for (const u of o.users || []) { for (const u of o.users || []) {
if (roleMap[u.role]) roleMap[u.role].users.push(u) if (roleMap[u.role]) roleMap[u.role].users.push(u)
@ -167,7 +167,7 @@ async function load() {
loading.value = true loading.value = true
error.value = null error.value = null
try { try {
const res = await fetch('/auth/user-info', { method: 'POST' }) const res = await fetch('/auth/api/user-info', { method: 'POST' })
const data = await res.json() const data = await res.json()
if (data.detail) throw new Error(data.detail) if (data.detail) throw new Error(data.detail)
info.value = data info.value = data
@ -250,7 +250,7 @@ function updateRole(role) { openDialog('role-update', { role }) }
function deleteRole(role) { function deleteRole(role) {
openDialog('confirm', { message: `Delete role ${role.display_name}?`, action: async () => { openDialog('confirm', { message: `Delete role ${role.display_name}?`, action: async () => {
const res = await fetch(`/auth/admin/roles/${role.uuid}`, { method: 'DELETE' }) const res = await fetch(`/auth/admin/orgs/${role.org_uuid}/roles/${role.uuid}`, { method: 'DELETE' })
const data = await res.json(); if (data.detail) throw new Error(data.detail) const data = await res.json(); if (data.detail) throw new Error(data.detail)
await loadOrgs() await loadOrgs()
} }) } })
@ -321,7 +321,7 @@ const pageHeading = computed(() => {
watch(selectedUser, async (u) => { watch(selectedUser, async (u) => {
if (!u) { userDetail.value = null; return } if (!u) { userDetail.value = null; return }
try { try {
const res = await fetch(`/auth/admin/users/${u.uuid}`) const res = await fetch(`/auth/admin/orgs/${u.org_uuid}/users/${u.uuid}`)
const data = await res.json() const data = await res.json()
if (data.detail) throw new Error(data.detail) if (data.detail) throw new Error(data.detail)
userDetail.value = data userDetail.value = data
@ -359,7 +359,7 @@ async function toggleRolePermission(role, permId, checked) {
const prev = [...role.permissions] const prev = [...role.permissions]
role.permissions = next role.permissions = next
try { try {
const res = await fetch(`/auth/admin/roles/${role.uuid}`, { const res = await fetch(`/auth/admin/orgs/${role.org_uuid}/roles/${role.uuid}`, {
method: 'PUT', method: 'PUT',
headers: { 'content-type': 'application/json' }, headers: { 'content-type': 'application/json' },
body: JSON.stringify({ display_name: role.display_name, permissions: next }) body: JSON.stringify({ display_name: role.display_name, permissions: next })
@ -379,7 +379,7 @@ async function onUserNameSaved() {
await loadOrgs() await loadOrgs()
if (selectedUser.value) { if (selectedUser.value) {
try { try {
const r = await fetch(`/auth/admin/users/${selectedUser.value.uuid}`) const r = await fetch(`/auth/admin/orgs/${selectedUser.value.org_uuid}/users/${selectedUser.value.uuid}`)
const jd = await r.json() const jd = await r.json()
if (!r.ok || jd.detail) throw new Error(jd.detail || 'Reload failed') if (!r.ok || jd.detail) throw new Error(jd.detail || 'Reload failed')
userDetail.value = jd userDetail.value = jd
@ -409,7 +409,7 @@ async function submitDialog() {
const { role } = dialog.value.data; const name = dialog.value.data.name?.trim(); if (!name) throw new Error('Name required') const { role } = dialog.value.data; const name = dialog.value.data.name?.trim(); if (!name) throw new Error('Name required')
const permsCsv = dialog.value.data.perms || '' const permsCsv = dialog.value.data.perms || ''
const perms = permsCsv.split(',').map(s=>s.trim()).filter(Boolean) const perms = permsCsv.split(',').map(s=>s.trim()).filter(Boolean)
const res = await fetch(`/auth/admin/roles/${role.uuid}`, { method: 'PUT', headers: { 'content-type': 'application/json' }, body: JSON.stringify({ display_name: name, permissions: perms }) }) const res = await fetch(`/auth/admin/orgs/${role.org_uuid}/roles/${role.uuid}`, { method: 'PUT', headers: { 'content-type': 'application/json' }, body: JSON.stringify({ display_name: name, permissions: perms }) })
const d = await res.json(); if (d.detail) throw new Error(d.detail); await loadOrgs() const d = await res.json(); if (d.detail) throw new Error(d.detail); await loadOrgs()
} else if (t === 'user-create') { } else if (t === 'user-create') {
const { org, role } = dialog.value.data; const name = dialog.value.data.name?.trim(); if (!name) throw new Error('Name required') const { org, role } = dialog.value.data; const name = dialog.value.data.name?.trim(); if (!name) throw new Error('Name required')
@ -496,7 +496,7 @@ async function submitDialog() {
:loading="loading" :loading="loading"
:org-display-name="userDetail.org.display_name" :org-display-name="userDetail.org.display_name"
:role-name="userDetail.role" :role-name="userDetail.role"
:update-endpoint="`/auth/admin/users/${selectedUser.uuid}/display-name`" :update-endpoint="`/auth/admin/orgs/${selectedUser.org_uuid}/users/${selectedUser.uuid}/display-name`"
@saved="onUserNameSaved" @saved="onUserNameSaved"
/> />
<div v-else-if="userDetail?.error" class="error small">{{ userDetail.error }}</div> <div v-else-if="userDetail?.error" class="error small">{{ userDetail.error }}</div>
@ -512,7 +512,7 @@ async function submitDialog() {
<p class="matrix-hint muted">Use the token dialog to register a new credential for the member.</p> <p class="matrix-hint muted">Use the token dialog to register a new credential for the member.</p>
<RegistrationLinkModal <RegistrationLinkModal
v-if="showRegModal" v-if="showRegModal"
:endpoint="`/auth/admin/users/${selectedUser.uuid}/create-link`" :endpoint="`/auth/admin/orgs/${selectedUser.org_uuid}/users/${selectedUser.uuid}/create-link`"
:auto-copy="false" :auto-copy="false"
@close="showRegModal = false" @close="showRegModal = false"
@copied="onLinkCopied" @copied="onLinkCopied"

Binary file not shown.

Before

Width:  |  Height:  |  Size: 86 KiB

View File

@ -46,7 +46,7 @@ const copyLink = async (event) => {
onMounted(async () => { onMounted(async () => {
try { try {
const response = await fetch('/auth/create-link', { method: 'POST' }) const response = await fetch('/auth/api/create-link', { method: 'POST' })
const result = await response.json() const result = await response.json()
if (result.detail) throw new Error(result.detail) if (result.detail) throw new Error(result.detail)

View File

@ -27,9 +27,9 @@ const handleLogin = async () => {
await authStore.authenticate() await authStore.authenticate()
authStore.showMessage('Authentication successful!', 'success', 2000) authStore.showMessage('Authentication successful!', 'success', 2000)
if (authStore.restrictedMode) { if (authStore.restrictedMode) {
// In restricted mode after successful auth show permission denied (no profile outside /auth/) // Restricted mode: reload so the app re-mounts and selectView() applies (will become permission denied)
authStore.currentView = 'permission-denied' location.reload()
} else if (location.pathname.startsWith('/auth/')) { } else if (location.pathname === '/auth/') {
authStore.currentView = 'profile' authStore.currentView = 'profile'
} else { } else {
location.reload() location.reload()

View File

@ -32,7 +32,6 @@ function back() {
} }
async function logout() { async function logout() {
await authStore.logout() await authStore.logout()
authStore.currentView = 'login'
} }
</script> </script>
<style scoped> <style scoped>

View File

@ -9,7 +9,7 @@
:created-at="authStore.userInfo.user.created_at" :created-at="authStore.userInfo.user.created_at"
:last-seen="authStore.userInfo.user.last_seen" :last-seen="authStore.userInfo.user.last_seen"
:loading="authStore.isLoading" :loading="authStore.isLoading"
update-endpoint="/auth/user/display-name" update-endpoint="/auth/api/user/display-name"
@saved="authStore.loadUserInfo()" @saved="authStore.loadUserInfo()"
/> />
@ -144,7 +144,6 @@ const deleteCredential = async (credentialId) => {
const logout = async () => { const logout = async () => {
await authStore.logout() await authStore.logout()
authStore.currentView = 'login'
} }
const isAdmin = computed(() => !!(authStore.userInfo?.is_global_admin || authStore.userInfo?.is_org_admin)) const isAdmin = computed(() => !!(authStore.userInfo?.is_global_admin || authStore.userInfo?.is_org_admin))

View File

@ -8,7 +8,7 @@ export const useAuthStore = defineStore('auth', {
settings: null, // Server provided settings (/auth/settings) settings: null, // Server provided settings (/auth/settings)
isLoading: false, isLoading: false,
resetToken: null, // transient reset token resetToken: null, // transient reset token
restrictedMode: false, // If true, app loaded outside /auth/ and should restrict to login or permission denied restrictedMode: false, // Anywhere other than /auth/: restrict to login or permission denied
// UI State // UI State
currentView: 'login', currentView: 'login',
@ -32,7 +32,7 @@ export const useAuthStore = defineStore('auth', {
} }
}, },
async setSessionCookie(sessionToken) { async setSessionCookie(sessionToken) {
const response = await fetch('/auth/set-session', { const response = await fetch('/auth/api/set-session', {
method: 'POST', method: 'POST',
headers: {'Authorization': `Bearer ${sessionToken}`}, headers: {'Authorization': `Bearer ${sessionToken}`},
}) })
@ -87,7 +87,7 @@ export const useAuthStore = defineStore('auth', {
async loadUserInfo() { async loadUserInfo() {
const headers = {} const headers = {}
// Reset tokens are only passed via query param now, not Authorization header // Reset tokens are only passed via query param now, not Authorization header
const url = this.resetToken ? `/auth/user-info?reset=${encodeURIComponent(this.resetToken)}` : '/auth/user-info' const url = this.resetToken ? `/auth/api/user-info?reset=${encodeURIComponent(this.resetToken)}` : '/auth/api/user-info'
const response = await fetch(url, { method: 'POST', headers }) const response = await fetch(url, { method: 'POST', headers })
let result = null let result = null
try { try {
@ -109,7 +109,7 @@ export const useAuthStore = defineStore('auth', {
}, },
async loadSettings() { async loadSettings() {
try { try {
const res = await fetch('/auth/settings') const res = await fetch('/auth/api/settings')
if (!res.ok) return if (!res.ok) return
const data = await res.json() const data = await res.json()
this.settings = data this.settings = data
@ -121,7 +121,7 @@ export const useAuthStore = defineStore('auth', {
} }
}, },
async deleteCredential(uuid) { async deleteCredential(uuid) {
const response = await fetch(`/auth/credential/${uuid}`, {method: 'Delete'}) const response = await fetch(`/auth/api/credential/${uuid}`, {method: 'Delete'})
const result = await response.json() const result = await response.json()
if (result.detail) throw new Error(`Server: ${result.detail}`) if (result.detail) throw new Error(`Server: ${result.detail}`)
@ -129,12 +129,12 @@ export const useAuthStore = defineStore('auth', {
}, },
async logout() { async logout() {
try { try {
await fetch('/auth/logout', {method: 'POST'}) await fetch('/auth/api/logout', {method: 'POST'})
location.reload()
} catch (error) { } catch (error) {
console.error('Logout error:', error) console.error('Logout error:', error)
this.showMessage(error.message, 'error')
} }
this.userInfo = null
}, },
} }
}) })

View File

@ -18,13 +18,13 @@ from .util import passphrase, tokens
def _init_logger() -> logging.Logger: def _init_logger() -> logging.Logger:
l = logging.getLogger(__name__) logger = logging.getLogger(__name__)
if not l.handlers and not logging.getLogger().handlers: if not logger.handlers and not logging.getLogger().handlers:
h = logging.StreamHandler() h = logging.StreamHandler()
h.setFormatter(logging.Formatter("%(message)s")) h.setFormatter(logging.Formatter("%(message)s"))
l.addHandler(h) logger.addHandler(h)
l.setLevel(logging.INFO) logger.setLevel(logging.INFO)
return l return logger
logger = _init_logger() logger = _init_logger()

View File

@ -1,17 +1,14 @@
import argparse import argparse
import asyncio import asyncio
import atexit
import contextlib
import ipaddress import ipaddress
import logging import logging
import os import os
import signal
import subprocess
from pathlib import Path
from urllib.parse import urlparse from urllib.parse import urlparse
import uvicorn import uvicorn
from passkey.util import frontend
DEFAULT_HOST = "localhost" DEFAULT_HOST = "localhost"
DEFAULT_SERVE_PORT = 4401 DEFAULT_SERVE_PORT = 4401
DEFAULT_DEV_PORT = 4402 DEFAULT_DEV_PORT = 4402
@ -180,36 +177,11 @@ def main():
run_kwargs["host"] = host run_kwargs["host"] = host
run_kwargs["port"] = port run_kwargs["port"] = port
bun_process: subprocess.Popen | None = None
if devmode: if devmode:
# Spawn frontend dev server (bun) only in the original parent (avoid duplicates on reload) # Spawn frontend dev server (bun or npm) only once in parent process
if os.environ.get("PASSKEY_BUN_PARENT") != "1": if os.environ.get("PASSKEY_BUN_PARENT") != "1":
os.environ["PASSKEY_BUN_PARENT"] = "1" os.environ["PASSKEY_BUN_PARENT"] = "1"
frontend_dir = Path(__file__).parent.parent.parent / "frontend" frontend.run_dev()
if (frontend_dir / "package.json").exists():
try:
bun_process = subprocess.Popen(
["bun", "--bun", "run", "dev"], cwd=str(frontend_dir)
)
logging.info("Started bun dev server")
except FileNotFoundError:
logging.warning(
"bun not found: skipping frontend dev server (install bun)"
)
def _terminate_bun(): # pragma: no cover
if bun_process and bun_process.poll() is None:
with contextlib.suppress(Exception):
bun_process.terminate()
atexit.register(_terminate_bun)
def _signal_handler(signum, frame): # pragma: no cover
_terminate_bun()
raise SystemExit(0)
signal.signal(signal.SIGINT, _signal_handler)
signal.signal(signal.SIGTERM, _signal_handler)
if all_ifaces and not uds: if all_ifaces and not uds:
# If reload enabled, fallback to single dual-stack attempt (::) to keep reload simple # If reload enabled, fallback to single dual-stack attempt (::) to keep reload simple

444
passkey/fastapi/admin.py Normal file
View File

@ -0,0 +1,444 @@
import logging
from uuid import UUID, uuid4
from fastapi import Body, Cookie, FastAPI, HTTPException, Request
from fastapi.responses import FileResponse, JSONResponse, RedirectResponse
from ..authsession import expires
from ..globals import db
from ..globals import passkey as global_passkey
from ..util import frontend, passphrase, permutil, querysafe, tokens
from . import authz
app = FastAPI()
@app.exception_handler(ValueError)
async def value_error_handler(_request, exc: ValueError): # pragma: no cover - simple
return JSONResponse(status_code=400, content={"detail": str(exc)})
@app.exception_handler(Exception)
async def general_exception_handler(_request, exc: Exception):
logging.exception("Unhandled exception in admin app")
return JSONResponse(status_code=500, content={"detail": "Internal server error"})
@app.get("")
def adminapp_slashmissing(request: Request):
print("HERE")
return RedirectResponse(url=request.url_for("adminapp"))
@app.get("/")
async def adminapp(auth=Cookie(None)):
try:
await authz.verify(auth, ["auth:admin", "auth:org:*"], match=permutil.has_any)
return FileResponse(frontend.file("admin/index.html"))
except HTTPException as e:
return FileResponse(frontend.file("index.html"), status_code=e.status_code)
# -------------------- Organizations --------------------
@app.get("/orgs")
async def admin_list_orgs(auth=Cookie(None)):
ctx = await authz.verify(auth, ["auth:admin", "auth:org:*"], match=permutil.has_any)
orgs = await db.instance.list_organizations()
if "auth:admin" not in ctx.role.permissions:
orgs = [o for o in orgs if f"auth:org:{o.uuid}" in ctx.role.permissions]
def role_to_dict(r):
return {
"uuid": str(r.uuid),
"org_uuid": str(r.org_uuid),
"display_name": r.display_name,
"permissions": r.permissions,
}
async def org_to_dict(o):
users = await db.instance.get_organization_users(str(o.uuid))
return {
"uuid": str(o.uuid),
"display_name": o.display_name,
"permissions": o.permissions,
"roles": [role_to_dict(r) for r in o.roles],
"users": [
{
"uuid": str(u.uuid),
"display_name": u.display_name,
"role": role_name,
"visits": u.visits,
"last_seen": u.last_seen.isoformat() if u.last_seen else None,
}
for (u, role_name) in users
],
}
return [await org_to_dict(o) for o in orgs]
@app.post("/orgs")
async def admin_create_org(payload: dict = Body(...), auth=Cookie(None)):
await authz.verify(auth, ["auth:admin"])
from ..db import Org as OrgDC # local import to avoid cycles
org_uuid = uuid4()
display_name = payload.get("display_name") or "New Organization"
permissions = payload.get("permissions") or []
org = OrgDC(uuid=org_uuid, display_name=display_name, permissions=permissions)
await db.instance.create_organization(org)
return {"uuid": str(org_uuid)}
@app.put("/orgs/{org_uuid}")
async def admin_update_org(
org_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
await authz.verify(
auth, ["auth:admin", f"auth:org:{org_uuid}"], match=permutil.has_any
)
from ..db import Org as OrgDC # local import to avoid cycles
current = await db.instance.get_organization(str(org_uuid))
display_name = payload.get("display_name") or current.display_name
permissions = payload.get("permissions") or current.permissions or []
org = OrgDC(uuid=org_uuid, display_name=display_name, permissions=permissions)
await db.instance.update_organization(org)
return {"status": "ok"}
@app.delete("/orgs/{org_uuid}")
async def admin_delete_org(org_uuid: UUID, auth=Cookie(None)):
ctx = await authz.verify(
auth, ["auth:admin", f"auth:org:{org_uuid}"], match=permutil.has_any
)
if ctx.org.uuid == org_uuid:
raise ValueError("Cannot delete the organization you belong to")
await db.instance.delete_organization(org_uuid)
return {"status": "ok"}
@app.post("/orgs/{org_uuid}/permission")
async def admin_add_org_permission(
org_uuid: UUID, permission_id: str, auth=Cookie(None)
):
await authz.verify(auth, ["auth:admin"])
await db.instance.add_permission_to_organization(str(org_uuid), permission_id)
return {"status": "ok"}
@app.delete("/orgs/{org_uuid}/permission")
async def admin_remove_org_permission(
org_uuid: UUID, permission_id: str, auth=Cookie(None)
):
await authz.verify(auth, ["auth:admin"])
await db.instance.remove_permission_from_organization(str(org_uuid), permission_id)
return {"status": "ok"}
# -------------------- Roles --------------------
@app.post("/orgs/{org_uuid}/roles")
async def admin_create_role(
org_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
await authz.verify(auth, ["auth:admin", f"auth:org:{org_uuid}"])
from ..db import Role as RoleDC
role_uuid = uuid4()
display_name = payload.get("display_name") or "New Role"
perms = payload.get("permissions") or []
org = await db.instance.get_organization(str(org_uuid))
grantable = set(org.permissions or [])
for pid in perms:
await db.instance.get_permission(pid)
if pid not in grantable:
raise ValueError(f"Permission not grantable by org: {pid}")
role = RoleDC(
uuid=role_uuid,
org_uuid=org_uuid,
display_name=display_name,
permissions=perms,
)
await db.instance.create_role(role)
return {"uuid": str(role_uuid)}
@app.put("/orgs/{org_uuid}/roles/{role_uuid}")
async def admin_update_role(
org_uuid: UUID, role_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
# Verify caller is global admin or admin of provided org
await authz.verify(
auth, ["auth:admin", f"auth:org:{org_uuid}"], match=permutil.has_any
)
role = await db.instance.get_role(role_uuid)
if role.org_uuid != org_uuid:
raise HTTPException(status_code=404, detail="Role not found in organization")
from ..db import Role as RoleDC
display_name = payload.get("display_name") or role.display_name
permissions = payload.get("permissions") or role.permissions
org = await db.instance.get_organization(str(org_uuid))
grantable = set(org.permissions or [])
for pid in permissions:
await db.instance.get_permission(pid)
if pid not in grantable:
raise ValueError(f"Permission not grantable by org: {pid}")
updated = RoleDC(
uuid=role_uuid,
org_uuid=org_uuid,
display_name=display_name,
permissions=permissions,
)
await db.instance.update_role(updated)
return {"status": "ok"}
@app.delete("/orgs/{org_uuid}/roles/{role_uuid}")
async def admin_delete_role(org_uuid: UUID, role_uuid: UUID, auth=Cookie(None)):
await authz.verify(
auth, ["auth:admin", f"auth:org:{org_uuid}"], match=permutil.has_any
)
role = await db.instance.get_role(role_uuid)
if role.org_uuid != org_uuid:
raise HTTPException(status_code=404, detail="Role not found in organization")
await db.instance.delete_role(role_uuid)
return {"status": "ok"}
# -------------------- Users --------------------
@app.post("/orgs/{org_uuid}/users")
async def admin_create_user(
org_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
await authz.verify(
auth, ["auth:admin", f"auth:org:{org_uuid}"], match=permutil.has_any
)
display_name = payload.get("display_name")
role_name = payload.get("role")
if not display_name or not role_name:
raise ValueError("display_name and role are required")
from ..db import User as UserDC
roles = await db.instance.get_roles_by_organization(str(org_uuid))
role_obj = next((r for r in roles if r.display_name == role_name), None)
if not role_obj:
raise ValueError("Role not found in organization")
user_uuid = uuid4()
user = UserDC(
uuid=user_uuid,
display_name=display_name,
role_uuid=role_obj.uuid,
visits=0,
created_at=None,
)
await db.instance.create_user(user)
return {"uuid": str(user_uuid)}
@app.put("/orgs/{org_uuid}/users/{user_uuid}/role")
async def admin_update_user_role(
org_uuid: UUID, user_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
await authz.verify(
auth, ["auth:admin", f"auth:org:{org_uuid}"], match=permutil.has_any
)
new_role = payload.get("role")
if not new_role:
raise ValueError("role is required")
try:
user_org, _current_role = await db.instance.get_user_organization(user_uuid)
except ValueError:
raise ValueError("User not found")
if user_org.uuid != org_uuid:
raise ValueError("User does not belong to this organization")
roles = await db.instance.get_roles_by_organization(str(org_uuid))
if not any(r.display_name == new_role for r in roles):
raise ValueError("Role not found in organization")
await db.instance.update_user_role_in_organization(user_uuid, new_role)
return {"status": "ok"}
@app.post("/orgs/{org_uuid}/users/{user_uuid}/create-link")
async def admin_create_user_registration_link(
org_uuid: UUID, user_uuid: UUID, auth=Cookie(None)
):
try:
user_org, _role_name = await db.instance.get_user_organization(user_uuid)
except ValueError:
raise HTTPException(status_code=404, detail="User not found")
if user_org.uuid != org_uuid:
raise HTTPException(status_code=404, detail="User not found in organization")
ctx = await authz.verify(
auth, ["auth:admin", f"auth:org:{org_uuid}"], match=permutil.has_any
)
if (
"auth:admin" not in ctx.role.permissions
and f"auth:org:{org_uuid}" not in ctx.role.permissions
):
raise HTTPException(status_code=403, detail="Insufficient permissions")
token = passphrase.generate()
await db.instance.create_session(
user_uuid=user_uuid,
key=tokens.reset_key(token),
expires=expires(),
info={"type": "device addition", "created_by_admin": True},
)
origin = global_passkey.instance.origin
url = f"{origin}/auth/{token}"
return {"url": url, "expires": expires().isoformat()}
@app.get("/orgs/{org_uuid}/users/{user_uuid}")
async def admin_get_user_detail(org_uuid: UUID, user_uuid: UUID, auth=Cookie(None)):
try:
user_org, role_name = await db.instance.get_user_organization(user_uuid)
except ValueError:
raise HTTPException(status_code=404, detail="User not found")
if user_org.uuid != org_uuid:
raise HTTPException(status_code=404, detail="User not found in organization")
ctx = await authz.verify(
auth, ["auth:admin", f"auth:org:{org_uuid}"], match=permutil.has_any
)
if (
"auth:admin" not in ctx.role.permissions
and f"auth:org:{org_uuid}" not in ctx.role.permissions
):
raise HTTPException(status_code=403, detail="Insufficient permissions")
user = await db.instance.get_user_by_uuid(user_uuid)
cred_ids = await db.instance.get_credentials_by_user_uuid(user_uuid)
creds: list[dict] = []
aaguids: set[str] = set()
for cid in cred_ids:
try:
c = await db.instance.get_credential_by_id(cid)
except ValueError:
continue
aaguid_str = str(c.aaguid)
aaguids.add(aaguid_str)
creds.append(
{
"credential_uuid": str(c.uuid),
"aaguid": aaguid_str,
"created_at": c.created_at.isoformat(),
"last_used": c.last_used.isoformat() if c.last_used else None,
"last_verified": c.last_verified.isoformat()
if c.last_verified
else None,
"sign_count": c.sign_count,
}
)
from .. import aaguid as aaguid_mod
aaguid_info = aaguid_mod.filter(aaguids)
return {
"display_name": user.display_name,
"org": {"display_name": user_org.display_name},
"role": role_name,
"visits": user.visits,
"created_at": user.created_at.isoformat() if user.created_at else None,
"last_seen": user.last_seen.isoformat() if user.last_seen else None,
"credentials": creds,
"aaguid_info": aaguid_info,
}
@app.put("/orgs/{org_uuid}/users/{user_uuid}/display-name")
async def admin_update_user_display_name(
org_uuid: UUID, user_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
try:
user_org, _role_name = await db.instance.get_user_organization(user_uuid)
except ValueError:
raise HTTPException(status_code=404, detail="User not found")
if user_org.uuid != org_uuid:
raise HTTPException(status_code=404, detail="User not found in organization")
ctx = await authz.verify(
auth, ["auth:admin", f"auth:org:{org_uuid}"], match=permutil.has_any
)
if (
"auth:admin" not in ctx.role.permissions
and f"auth:org:{org_uuid}" not in ctx.role.permissions
):
raise HTTPException(status_code=403, detail="Insufficient permissions")
new_name = (payload.get("display_name") or "").strip()
if not new_name:
raise HTTPException(status_code=400, detail="display_name required")
if len(new_name) > 64:
raise HTTPException(status_code=400, detail="display_name too long")
await db.instance.update_user_display_name(user_uuid, new_name)
return {"status": "ok"}
# -------------------- Permissions (global) --------------------
@app.get("/permissions")
async def admin_list_permissions(auth=Cookie(None)):
await authz.verify(auth, ["auth:admin"], match=permutil.has_any)
perms = await db.instance.list_permissions()
return [{"id": p.id, "display_name": p.display_name} for p in perms]
@app.post("/permissions")
async def admin_create_permission(payload: dict = Body(...), auth=Cookie(None)):
await authz.verify(auth, ["auth:admin"])
from ..db import Permission as PermDC
perm_id = payload.get("id")
display_name = payload.get("display_name")
if not perm_id or not display_name:
raise ValueError("id and display_name are required")
querysafe.assert_safe(perm_id, field="id")
await db.instance.create_permission(PermDC(id=perm_id, display_name=display_name))
return {"status": "ok"}
@app.put("/permission")
async def admin_update_permission(
permission_id: str, display_name: str, auth=Cookie(None)
):
await authz.verify(auth, ["auth:admin"])
from ..db import Permission as PermDC
if not display_name:
raise ValueError("display_name is required")
querysafe.assert_safe(permission_id, field="permission_id")
await db.instance.update_permission(
PermDC(id=permission_id, display_name=display_name)
)
return {"status": "ok"}
@app.post("/permission/rename")
async def admin_rename_permission(payload: dict = Body(...), auth=Cookie(None)):
await authz.verify(auth, ["auth:admin"])
old_id = payload.get("old_id")
new_id = payload.get("new_id")
display_name = payload.get("display_name")
if not old_id or not new_id:
raise ValueError("old_id and new_id required")
querysafe.assert_safe(old_id, field="old_id")
querysafe.assert_safe(new_id, field="new_id")
if display_name is None:
perm = await db.instance.get_permission(old_id)
display_name = perm.display_name
rename_fn = getattr(db.instance, "rename_permission", None)
if not rename_fn:
raise ValueError("Permission renaming not supported by this backend")
await rename_fn(old_id, new_id, display_name)
return {"status": "ok"}
@app.delete("/permission")
async def admin_delete_permission(permission_id: str, auth=Cookie(None)):
await authz.verify(auth, ["auth:admin"])
querysafe.assert_safe(permission_id, field="permission_id")
await db.instance.delete_permission(permission_id)
return {"status": "ok"}

View File

@ -1,77 +1,79 @@
""" import logging
API endpoints for user management and session handling. from contextlib import suppress
from uuid import UUID
This module contains all the HTTP API endpoints for: from fastapi import (
- User information retrieval Body,
- User credentials management Cookie,
- Session token validation and refresh Depends,
- Login/logout functionality FastAPI,
""" HTTPException,
Query,
from uuid import UUID, uuid4 Request,
Response,
from fastapi import Body, Cookie, Depends, FastAPI, HTTPException, Query, Response )
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBearer from fastapi.security import HTTPBearer
from passkey.util import passphrase
from .. import aaguid from .. import aaguid
from ..authsession import delete_credential, expires, get_reset, get_session from ..authsession import delete_credential, expires, get_reset, get_session
from ..globals import db from ..globals import db
from ..globals import passkey as global_passkey from ..globals import passkey as global_passkey
from ..util import querysafe, tokens from ..util import passphrase, permutil, tokens
from ..util.tokens import session_key from ..util.tokens import session_key
from . import authz, session from . import authz, session
bearer_auth = HTTPBearer(auto_error=True) bearer_auth = HTTPBearer(auto_error=True)
app = FastAPI()
def register_api_routes(app: FastAPI):
"""Register all API routes on the FastAPI app."""
async def _get_ctx_and_admin_flags(auth_cookie: str): @app.exception_handler(ValueError)
"""Helper to get session context and admin flags from cookie.""" async def value_error_handler(_request: Request, exc: ValueError): # pragma: no cover
if not auth_cookie: return JSONResponse(status_code=400, content={"detail": str(exc)})
raise ValueError("Not authenticated")
ctx = await db.instance.get_session_context(session_key(auth_cookie))
if not ctx:
raise ValueError("Not authenticated")
role_perm_ids = set(ctx.role.permissions or [])
org_uuid_str = str(ctx.org.uuid)
is_global_admin = "auth:admin" in role_perm_ids
is_org_admin = f"auth:org:{org_uuid_str}" in role_perm_ids
return ctx, is_global_admin, is_org_admin
@app.post("/auth/validate")
async def validate_token(perm=Query(None), auth=Cookie(None)): @app.exception_handler(Exception)
"""Lightweight token validation endpoint. async def general_exception_handler(
_request: Request, exc: Exception
): # pragma: no cover
logging.exception("Unhandled exception in API app")
return JSONResponse(status_code=500, content={"detail": "Internal server error"})
@app.post("/validate")
async def validate_token(perm: list[str] = Query([]), auth=Cookie(None)):
ctx = await authz.verify(auth, perm)
return {"valid": True, "user_uuid": str(ctx.session.user_uuid)}
@app.get("/forward")
async def forward_authentication(perm: list[str] = Query([]), auth=Cookie(None)):
"""Forward auth validation for Caddy/Nginx (moved from /auth/forward-auth).
Query Params: Query Params:
- perm: repeated permission IDs the caller must possess (ALL required) - perm: repeated permission IDs the authenticated user must possess (ALL required).
Success: 204 No Content with x-auth-user-uuid header.
Failure (unauthenticated / unauthorized): 4xx JSON body with detail.
""" """
try:
ctx = await authz.verify(auth, perm)
return Response(
status_code=204, headers={"x-auth-user-uuid": str(ctx.session.user_uuid)}
)
except HTTPException as e: # pass through explicitly
raise e
s = await authz.verify(auth, perm)
return {"valid": True, "user_uuid": str(s.user_uuid)}
@app.get("/auth/settings") @app.get("/settings")
async def get_settings(): async def get_settings():
"""Return server runtime settings safe for public consumption.
Provides relying party metadata used by the frontend to brand UI.
"""
pk = global_passkey.instance pk = global_passkey.instance
return { return {"rp_id": pk.rp_id, "rp_name": pk.rp_name}
"rp_id": pk.rp_id,
"rp_name": pk.rp_name,
}
@app.post("/auth/user-info")
async def api_user_info(reset: str | None = None, auth=Cookie(None)):
"""Get user information.
- For authenticated sessions: return full context (org/role/permissions/credentials) @app.post("/user-info")
- For reset tokens: return only basic user information to drive reset flow async def api_user_info(reset: str | None = None, auth=Cookie(None)):
"""
authenticated = False authenticated = False
try: try:
if reset: if reset:
@ -88,29 +90,24 @@ def register_api_routes(app: FastAPI):
u = await db.instance.get_user_by_uuid(s.user_uuid) u = await db.instance.get_user_by_uuid(s.user_uuid)
# Minimal response for reset tokens if not authenticated: # minimal response for reset tokens
if not authenticated:
return { return {
"authenticated": False, "authenticated": False,
"session_type": s.info.get("type"), "session_type": s.info.get("type"),
"user": { "user": {"user_uuid": str(u.uuid), "user_name": u.display_name},
"user_uuid": str(u.uuid),
"user_name": u.display_name,
},
} }
# Full context for authenticated sessions
assert authenticated and auth is not None assert authenticated and auth is not None
ctx = await db.instance.get_session_context(session_key(auth))
credential_ids = await db.instance.get_credentials_by_user_uuid(s.user_uuid)
ctx = await permutil.session_context(auth)
credential_ids = await db.instance.get_credentials_by_user_uuid(s.user_uuid)
credentials: list[dict] = [] credentials: list[dict] = []
user_aaguids: set[str] = set() user_aaguids: set[str] = set()
for cred_id in credential_ids: for cred_id in credential_ids:
try: try:
c = await db.instance.get_credential_by_id(cred_id) c = await db.instance.get_credential_by_id(cred_id)
except ValueError: except ValueError:
continue # Skip dangling IDs continue
aaguid_str = str(c.aaguid) aaguid_str = str(c.aaguid)
user_aaguids.add(aaguid_str) user_aaguids.add(aaguid_str)
credentials.append( credentials.append(
@ -126,8 +123,7 @@ def register_api_routes(app: FastAPI):
"is_current_session": s.credential_uuid == c.uuid, "is_current_session": s.credential_uuid == c.uuid,
} }
) )
credentials.sort(key=lambda cred: cred["created_at"])
credentials.sort(key=lambda cred: cred["created_at"]) # chronological
aaguid_info = aaguid.filter(user_aaguids) aaguid_info = aaguid.filter(user_aaguids)
role_info = None role_info = None
@ -147,11 +143,10 @@ def register_api_routes(app: FastAPI):
"permissions": ctx.org.permissions, "permissions": ctx.org.permissions,
} }
effective_permissions = [p.id for p in (ctx.permissions or [])] effective_permissions = [p.id for p in (ctx.permissions or [])]
is_global_admin = "auth:admin" in role_info["permissions"] is_global_admin = "auth:admin" in (role_info["permissions"] or [])
is_org_admin = ( if org_info:
f"auth:org:{org_info['uuid']}" in role_info["permissions"] is_org_admin = f"auth:org:{org_info['uuid']}" in (
if org_info role_info["permissions"] or []
else False
) )
return { return {
@ -173,335 +168,9 @@ def register_api_routes(app: FastAPI):
"aaguid_info": aaguid_info, "aaguid_info": aaguid_info,
} }
# -------------------- Admin API: Organizations --------------------
@app.get("/auth/admin/orgs") @app.put("/user/display-name")
async def admin_list_orgs(auth=Cookie(None)): async def user_update_display_name(payload: dict = Body(...), auth=Cookie(None)):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or is_org_admin):
raise ValueError("Insufficient permissions")
orgs = await db.instance.list_organizations()
# If only org admin, filter to their org
if not is_global_admin:
orgs = [o for o in orgs if o.uuid == ctx.org.uuid]
def role_to_dict(r):
return {
"uuid": str(r.uuid),
"org_uuid": str(r.org_uuid),
"display_name": r.display_name,
"permissions": r.permissions,
}
async def org_to_dict(o):
# Fetch users for each org
users = await db.instance.get_organization_users(str(o.uuid))
return {
"uuid": str(o.uuid),
"display_name": o.display_name,
"permissions": o.permissions,
"roles": [role_to_dict(r) for r in o.roles],
"users": [
{
"uuid": str(u.uuid),
"display_name": u.display_name,
"role": role_name,
"visits": u.visits,
"last_seen": u.last_seen.isoformat() if u.last_seen else None,
}
for (u, role_name) in users
],
}
return [await org_to_dict(o) for o in orgs]
@app.post("/auth/admin/orgs")
async def admin_create_org(payload: dict = Body(...), auth=Cookie(None)):
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth)
if not is_global_admin:
raise ValueError("Global admin required")
from ..db import Org as OrgDC # local import to avoid cycles in typing
org_uuid = uuid4()
display_name = payload.get("display_name") or "New Organization"
permissions = payload.get("permissions") or []
org = OrgDC(uuid=org_uuid, display_name=display_name, permissions=permissions)
await db.instance.create_organization(org)
return {"uuid": str(org_uuid)}
@app.put("/auth/admin/orgs/{org_uuid}")
async def admin_update_org(
org_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
# Only global admins can modify org definitions (simpler rule)
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth)
if not is_global_admin:
raise ValueError("Global admin required")
from ..db import Org as OrgDC # local import to avoid cycles
current = await db.instance.get_organization(str(org_uuid))
display_name = payload.get("display_name") or current.display_name
permissions = payload.get("permissions") or current.permissions or []
org = OrgDC(uuid=org_uuid, display_name=display_name, permissions=permissions)
await db.instance.update_organization(org)
return {"status": "ok"}
@app.delete("/auth/admin/orgs/{org_uuid}")
async def admin_delete_org(org_uuid: UUID, auth=Cookie(None)):
ctx, is_global_admin, _ = await _get_ctx_and_admin_flags(auth)
if not is_global_admin:
# Org admins cannot delete at all (avoid self-lockout)
raise ValueError("Global admin required")
# Prevent deleting the organization that the acting global admin currently belongs to
# if that deletion would remove their effective access (e.g., last org granting auth/admin)
try:
acting_org_uuid = ctx.org.uuid if ctx.org else None
except Exception:
acting_org_uuid = None
if acting_org_uuid and acting_org_uuid == org_uuid:
# Never allow deletion of the caller's own organization to avoid immediate account deletion.
raise ValueError("Cannot delete the organization you belong to")
await db.instance.delete_organization(org_uuid)
return {"status": "ok"}
# Manage an org's grantable permissions (query param for permission_id)
@app.post("/auth/admin/orgs/{org_uuid}/permission")
async def admin_add_org_permission(
org_uuid: UUID, permission_id: str, auth=Cookie(None)
):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or (is_org_admin and ctx.org.uuid == org_uuid)):
raise ValueError("Insufficient permissions")
querysafe.assert_safe(permission_id, field="permission_id")
await db.instance.add_permission_to_organization(str(org_uuid), permission_id)
return {"status": "ok"}
@app.delete("/auth/admin/orgs/{org_uuid}/permission")
async def admin_remove_org_permission(
org_uuid: UUID, permission_id: str, auth=Cookie(None)
):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or (is_org_admin and ctx.org.uuid == org_uuid)):
raise ValueError("Insufficient permissions")
querysafe.assert_safe(permission_id, field="permission_id")
await db.instance.remove_permission_from_organization(
str(org_uuid), permission_id
)
return {"status": "ok"}
# -------------------- Admin API: Roles --------------------
@app.post("/auth/admin/orgs/{org_uuid}/roles")
async def admin_create_role(
org_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or (is_org_admin and ctx.org.uuid == org_uuid)):
raise ValueError("Insufficient permissions")
from ..db import Role as RoleDC
role_uuid = uuid4()
display_name = payload.get("display_name") or "New Role"
permissions = payload.get("permissions") or []
# Validate that permissions exist and are allowed by org
org = await db.instance.get_organization(str(org_uuid))
grantable = set(org.permissions or [])
for pid in permissions:
await db.instance.get_permission(pid) # raises if not found
if pid not in grantable:
raise ValueError(f"Permission not grantable by org: {pid}")
role = RoleDC(
uuid=role_uuid,
org_uuid=org_uuid,
display_name=display_name,
permissions=permissions,
)
await db.instance.create_role(role)
return {"uuid": str(role_uuid)}
@app.put("/auth/admin/roles/{role_uuid}")
async def admin_update_role(
role_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
role = await db.instance.get_role(role_uuid)
# Only org admins for that org or global admin can update
if not (is_global_admin or (is_org_admin and role.org_uuid == ctx.org.uuid)):
raise ValueError("Insufficient permissions")
from ..db import Role as RoleDC
display_name = payload.get("display_name") or role.display_name
permissions = payload.get("permissions") or role.permissions
# Validate against org grantable permissions
org = await db.instance.get_organization(str(role.org_uuid))
grantable = set(org.permissions or [])
for pid in permissions:
await db.instance.get_permission(pid) # raises if not found
if pid not in grantable:
raise ValueError(f"Permission not grantable by org: {pid}")
updated = RoleDC(
uuid=role_uuid,
org_uuid=role.org_uuid,
display_name=display_name,
permissions=permissions,
)
await db.instance.update_role(updated)
return {"status": "ok"}
@app.post("/auth/admin/orgs/{org_uuid}/users")
async def admin_create_user(
org_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
"""Create a new user within an organization.
Body parameters:
- display_name: str (required)
- role: str (required) display name of existing role in that org
"""
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or (is_org_admin and ctx.org.uuid == org_uuid)):
raise ValueError("Insufficient permissions")
display_name = payload.get("display_name")
role_name = payload.get("role")
if not display_name or not role_name:
raise ValueError("display_name and role are required")
# Validate role exists in org
from ..db import User as UserDC # local import to avoid cycles
roles = await db.instance.get_roles_by_organization(str(org_uuid))
role_obj = next((r for r in roles if r.display_name == role_name), None)
if not role_obj:
raise ValueError("Role not found in organization")
# Create user
user_uuid = uuid4()
user = UserDC(
uuid=user_uuid,
display_name=display_name,
role_uuid=role_obj.uuid,
visits=0,
created_at=None,
)
await db.instance.create_user(user)
return {"uuid": str(user_uuid)}
@app.delete("/auth/admin/roles/{role_uuid}")
async def admin_delete_role(role_uuid: UUID, auth=Cookie(None)):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
role = await db.instance.get_role(role_uuid)
if not (is_global_admin or (is_org_admin and role.org_uuid == ctx.org.uuid)):
raise ValueError("Insufficient permissions")
await db.instance.delete_role(role_uuid)
return {"status": "ok"}
# -------------------- Admin API: Users (role management) --------------------
@app.put("/auth/admin/orgs/{org_uuid}/users/{user_uuid}/role")
async def admin_update_user_role(
org_uuid: UUID, user_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
"""Change a user's role within their organization.
Body: {"role": "New Role Display Name"}
Only global admins or admins of the organization can perform this.
"""
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or (is_org_admin and ctx.org.uuid == org_uuid)):
raise ValueError("Insufficient permissions")
new_role = payload.get("role")
if not new_role:
raise ValueError("role is required")
# Verify user belongs to this org
try:
user_org, _current_role = await db.instance.get_user_organization(user_uuid)
except ValueError:
raise ValueError("User not found")
if user_org.uuid != org_uuid:
raise ValueError("User does not belong to this organization")
# Ensure role exists in org and update
roles = await db.instance.get_roles_by_organization(str(org_uuid))
if not any(r.display_name == new_role for r in roles):
raise ValueError("Role not found in organization")
await db.instance.update_user_role_in_organization(user_uuid, new_role)
return {"status": "ok"}
@app.post("/auth/admin/users/{user_uuid}/create-link")
async def admin_create_user_registration_link(user_uuid: UUID, auth=Cookie(None)):
"""Create a device registration/reset link for a specific user (admin only).
Returns JSON: {"url": str, "expires": iso8601}
"""
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
# Ensure user exists and fetch their org
try:
user_org, _role_name = await db.instance.get_user_organization(user_uuid)
except ValueError:
raise HTTPException(status_code=404, detail="User not found")
if not (is_global_admin or (is_org_admin and user_org.uuid == ctx.org.uuid)):
raise HTTPException(status_code=403, detail="Insufficient permissions")
# Generate human-readable reset token and store as session with reset key
token = passphrase.generate()
await db.instance.create_session(
user_uuid=user_uuid,
key=tokens.reset_key(token),
expires=expires(),
info={"type": "device addition", "created_by_admin": True},
)
origin = global_passkey.instance.origin
url = f"{origin}/auth/{token}"
return {"url": url, "expires": expires().isoformat()}
@app.get("/auth/admin/users/{user_uuid}")
async def admin_get_user_detail(user_uuid: UUID, auth=Cookie(None)):
"""Get detailed information about a user (admin only)."""
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
try:
user_org, role_name = await db.instance.get_user_organization(user_uuid)
except ValueError:
raise HTTPException(status_code=404, detail="User not found")
if not (is_global_admin or (is_org_admin and user_org.uuid == ctx.org.uuid)):
raise HTTPException(status_code=403, detail="Insufficient permissions")
user = await db.instance.get_user_by_uuid(user_uuid)
# Gather credentials
cred_ids = await db.instance.get_credentials_by_user_uuid(user_uuid)
creds: list[dict] = []
aaguids: set[str] = set()
for cid in cred_ids:
try:
c = await db.instance.get_credential_by_id(cid)
except ValueError:
continue
aaguid_str = str(c.aaguid)
aaguids.add(aaguid_str)
creds.append(
{
"credential_uuid": str(c.uuid),
"aaguid": aaguid_str,
"created_at": c.created_at.isoformat(),
"last_used": c.last_used.isoformat() if c.last_used else None,
"last_verified": c.last_verified.isoformat()
if c.last_verified
else None,
"sign_count": c.sign_count,
}
)
from .. import aaguid as aaguid_mod
aaguid_info = aaguid_mod.filter(aaguids)
return {
"display_name": user.display_name,
"org": {"display_name": user_org.display_name},
"role": role_name,
"visits": user.visits,
"created_at": user.created_at.isoformat() if user.created_at else None,
"last_seen": user.last_seen.isoformat() if user.last_seen else None,
"credentials": creds,
"aaguid_info": aaguid_info,
}
@app.put("/auth/user/display-name")
async def user_update_display_name(payload: dict = Body(...), auth=Cookie(None)):
"""Authenticated user updates their own display name."""
if not auth: if not auth:
raise HTTPException(status_code=401, detail="Authentication Required") raise HTTPException(status_code=401, detail="Authentication Required")
s = await get_session(auth) s = await get_session(auth)
@ -513,134 +182,47 @@ def register_api_routes(app: FastAPI):
await db.instance.update_user_display_name(s.user_uuid, new_name) await db.instance.update_user_display_name(s.user_uuid, new_name)
return {"status": "ok"} return {"status": "ok"}
@app.put("/auth/admin/users/{user_uuid}/display-name")
async def admin_update_user_display_name(
user_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
"""Admin updates a user's display name."""
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
try:
user_org, _role_name = await db.instance.get_user_organization(user_uuid)
except ValueError:
raise HTTPException(status_code=404, detail="User not found")
if not (is_global_admin or (is_org_admin and user_org.uuid == ctx.org.uuid)):
raise HTTPException(status_code=403, detail="Insufficient permissions")
new_name = (payload.get("display_name") or "").strip()
if not new_name:
raise HTTPException(status_code=400, detail="display_name required")
if len(new_name) > 64:
raise HTTPException(status_code=400, detail="display_name too long")
await db.instance.update_user_display_name(user_uuid, new_name)
return {"status": "ok"}
# Admin API: Permissions (global) @app.post("/logout")
async def api_logout(response: Response, auth=Cookie(None)):
@app.get("/auth/admin/permissions")
async def admin_list_permissions(auth=Cookie(None)):
_, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or is_org_admin):
raise ValueError("Insufficient permissions")
perms = await db.instance.list_permissions()
return [{"id": p.id, "display_name": p.display_name} for p in perms]
@app.post("/auth/admin/permissions")
async def admin_create_permission(payload: dict = Body(...), auth=Cookie(None)):
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth)
if not is_global_admin:
raise ValueError("Global admin required")
from ..db import Permission as PermDC
perm_id = payload.get("id")
display_name = payload.get("display_name")
if not perm_id or not display_name:
raise ValueError("id and display_name are required")
querysafe.assert_safe(perm_id, field="id")
await db.instance.create_permission(
PermDC(id=perm_id, display_name=display_name)
)
return {"status": "ok"}
@app.put("/auth/admin/permission")
async def admin_update_permission(
permission_id: str, display_name: str, auth=Cookie(None)
):
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth)
if not is_global_admin:
raise ValueError("Global admin required")
from ..db import Permission as PermDC
if not display_name:
raise ValueError("display_name is required")
querysafe.assert_safe(permission_id, field="permission_id")
await db.instance.update_permission(
PermDC(id=permission_id, display_name=display_name)
)
return {"status": "ok"}
@app.post("/auth/admin/permission/rename")
async def admin_rename_permission(payload: dict = Body(...), auth=Cookie(None)):
"""Rename a permission's id (and optionally display name) updating all references.
Body: { "old_id": str, "new_id": str, "display_name": str|null }
"""
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth)
if not is_global_admin:
raise ValueError("Global admin required")
old_id = payload.get("old_id")
new_id = payload.get("new_id")
display_name = payload.get("display_name")
if not old_id or not new_id:
raise ValueError("old_id and new_id required")
querysafe.assert_safe(old_id, field="old_id")
querysafe.assert_safe(new_id, field="new_id")
if display_name is None:
# Fetch old to retain display name
perm = await db.instance.get_permission(old_id)
display_name = perm.display_name
# rename_permission added to interface; use getattr for forward compatibility
rename_fn = getattr(db.instance, "rename_permission", None)
if not rename_fn:
raise ValueError("Permission renaming not supported by this backend")
await rename_fn(old_id, new_id, display_name)
return {"status": "ok"}
@app.delete("/auth/admin/permission")
async def admin_delete_permission(permission_id: str, auth=Cookie(None)):
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth)
if not is_global_admin:
raise ValueError("Global admin required")
querysafe.assert_safe(permission_id, field="permission_id")
await db.instance.delete_permission(permission_id)
return {"status": "ok"}
@app.post("/auth/logout")
async def api_logout(response: Response, auth=Cookie(None)):
"""Log out the current user by clearing the session cookie and deleting from database."""
if not auth: if not auth:
return {"message": "Already logged out"} return {"message": "Already logged out"}
# Remove from database if possible with suppress(Exception):
try:
await db.instance.delete_session(session_key(auth)) await db.instance.delete_session(session_key(auth))
except Exception:
...
response.delete_cookie("auth") response.delete_cookie("auth")
return {"message": "Logged out successfully"} return {"message": "Logged out successfully"}
@app.post("/auth/set-session")
async def api_set_session(response: Response, auth=Depends(bearer_auth)): @app.post("/set-session")
"""Set session cookie from Authorization Bearer session token (never via query).""" async def api_set_session(response: Response, auth=Depends(bearer_auth)):
user = await get_session(auth.credentials) user = await get_session(auth.credentials)
session.set_session_cookie(response, auth.credentials) session.set_session_cookie(response, auth.credentials)
return { return {
"message": "Session cookie set successfully", "message": "Session cookie set successfully",
"user_uuid": str(user.user_uuid), "user_uuid": str(user.user_uuid),
} }
@app.delete("/auth/credential/{uuid}")
async def api_delete_credential( @app.delete("/credential/{uuid}")
response: Response, uuid: UUID, auth: str = Cookie(None) async def api_delete_credential(uuid: UUID, auth: str = Cookie(None)):
):
"""Delete a specific credential for the current user."""
await delete_credential(uuid, auth) await delete_credential(uuid, auth)
return {"message": "Credential deleted successfully"} return {"message": "Credential deleted successfully"}
@app.post("/create-link")
async def api_create_link(request: Request, auth=Cookie(None)):
s = await get_session(auth)
token = passphrase.generate()
await db.instance.create_session(
user_uuid=s.user_uuid,
key=tokens.reset_key(token),
expires=expires(),
info=session.infodict(request, "device addition"),
)
origin = global_passkey.instance.origin.rstrip("/")
url = f"{origin}/auth/{token}"
return {
"message": "Registration link generated successfully",
"url": url,
"expires": expires().isoformat(),
}

View File

@ -1,39 +1,39 @@
"""Authorization utilities shared across FastAPI endpoints. import logging
Provides helper(s) to validate a session token (from cookie) and optionally
enforce that the user possesses a given permission (either via their role or
their organization level permissions).
"""
from fastapi import HTTPException from fastapi import HTTPException
from ..authsession import get_session from ..util import permutil
from ..globals import db
from ..util.tokens import session_key logger = logging.getLogger(__name__)
async def verify(auth: str | None, perm: list[str] | str | None): async def verify(auth: str | None, perm: list[str], match=permutil.has_all):
"""Validate session token and optional list of required permissions. """Validate session token and optional list of required permissions.
Returns the Session object on success. Raises HTTPException on failure. Returns the session context.
Raises HTTPException on failure:
401: unauthenticated / invalid session 401: unauthenticated / invalid session
403: one or more required permissions missing 403: required permissions missing
""" """
if not auth: if not auth:
raise HTTPException(status_code=401, detail="Authentication required") raise HTTPException(status_code=401, detail="Authentication required")
session = await get_session(auth)
if perm is not None: ctx = await permutil.session_context(auth)
if isinstance(perm, str):
perm = [perm]
ctx = await db.instance.get_session_context(session_key(auth))
if not ctx: if not ctx:
raise HTTPException(status_code=401, detail="Session not found") raise HTTPException(status_code=401, detail="Session not found")
available = set(ctx.role.permissions or []) | (
set(ctx.org.permissions or []) if ctx.org else set() if not match(ctx, perm):
# Determine which permissions are missing for clearer diagnostics
missing = sorted(set(perm) - set(ctx.role.permissions))
logger.warning(
"Permission denied: user=%s role=%s missing=%s required=%s granted=%s", # noqa: E501
getattr(ctx.user, "uuid", "?"),
getattr(ctx.role, "display_name", "?"),
missing,
perm,
ctx.role.permissions,
) )
if any(p not in available for p in perm):
raise HTTPException(status_code=403, detail="Permission required") raise HTTPException(status_code=403, detail="Permission required")
return session
return ctx
__all__ = ["verify"]

View File

@ -1,19 +1,14 @@
import contextlib
import logging import logging
import os import os
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import Cookie, FastAPI, HTTPException, Query, Request, Response from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import FileResponse, JSONResponse from fastapi.responses import FileResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from ..authsession import get_session from passkey.util import frontend, passphrase
from . import authz, ws
from .api import register_api_routes
from .reset import register_reset_routes
STATIC_DIR = Path(__file__).parent.parent / "frontend-build" from . import admin, api, ws
@asynccontextmanager @asynccontextmanager
@ -51,80 +46,27 @@ async def lifespan(app: FastAPI): # pragma: no cover - startup path
app = FastAPI(lifespan=lifespan) app = FastAPI(lifespan=lifespan)
app.mount("/auth/admin/", admin.app)
app.mount("/auth/api/", api.app)
# Global exception handlers app.mount("/auth/ws/", ws.app)
@app.exception_handler(ValueError)
async def value_error_handler(request: Request, exc: ValueError):
"""Handle ValueError exceptions globally with 400 status code."""
return JSONResponse(status_code=400, content={"detail": str(exc)})
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
"""Handle all other exceptions globally with 500 status code."""
logging.exception("Internal Server Error")
return JSONResponse(status_code=500, content={"detail": "Internal server error"})
# Mount the WebSocket subapp
app.mount("/auth/ws", ws.app)
@app.get("/auth/forward-auth")
async def forward_authentication(request: Request, perm=Query(None), auth=Cookie(None)):
"""A validation endpoint to use with Caddy forward_auth or Nginx auth_request.
Query Params:
- perm: repeated permission IDs the authenticated user must possess (ALL required).
Success: 204 No Content with x-auth-user-uuid header.
Failure (unauthenticated / unauthorized): 4xx with index.html body so the
client (reverse proxy or browser) can initiate auth flow.
"""
try:
s = await authz.verify(auth, perm)
return Response(
status_code=204,
headers={"x-auth-user-uuid": str(s.user_uuid)},
)
except HTTPException as e:
return FileResponse(STATIC_DIR / "index.html", e.status_code)
# Serve static files
app.mount( app.mount(
"/auth/assets", StaticFiles(directory=STATIC_DIR / "assets"), name="static assets" "/auth/assets/", StaticFiles(directory=frontend.file("assets")), name="assets"
) )
@app.get("/auth/") @app.get("/auth/")
async def redirect_to_index(): async def frontapp():
"""Serve the main authentication app.""" """Serve the main authentication app."""
return FileResponse(STATIC_DIR / "index.html") return FileResponse(frontend.file("index.html"))
@app.get("/auth/admin") @app.get("/auth/{reset}")
async def serve_admin(auth=Cookie(None)): async def reset_link(request: Request, reset: str):
"""Serve the admin app entry point if an authenticated session exists. """Pretty URL for reset links."""
if reset == "admin":
If no valid authenticated session cookie is present, return a 401 with the # Admin app missing trailing slash lands here, be friendly to user
main app's index.html so the frontend can initiate login/registration flow. return RedirectResponse(request.url_for("adminapp"), status_code=303)
""" if not passphrase.is_well_formed(reset):
if auth: raise HTTPException(status_code=404)
with contextlib.suppress(ValueError): url = request.url_for("frontapp").include_query_params(reset=reset)
s = await get_session(auth) return RedirectResponse(url, status_code=303)
if s.info and s.info.get("type") == "authenticated":
return FileResponse(STATIC_DIR / "admin" / "index.html")
# Not authenticated: serve main index with 401
return FileResponse(
STATIC_DIR / "index.html",
status_code=401,
headers={"WWW-Authenticate": "Bearer"},
)
# Register API routes
register_api_routes(app)
register_reset_routes(app)

View File

@ -1,59 +0,0 @@
from pathlib import Path
from fastapi import Cookie, HTTPException, Request, Response
from fastapi.responses import RedirectResponse
from ..authsession import expires, get_session
from ..globals import db
from ..globals import passkey as global_passkey
from ..util import passphrase, tokens
from . import session
# Local copy to avoid circular import with mainapp
STATIC_DIR = Path(__file__).parent.parent / "frontend-build"
def register_reset_routes(app):
"""Register all device addition/reset routes on the FastAPI app."""
@app.post("/auth/create-link")
async def api_create_link(request: Request, response: Response, auth=Cookie(None)):
"""Create a device addition link for the authenticated user."""
# Require authentication
s = await get_session(auth)
# Generate a human-readable token
token = passphrase.generate() # e.g., "cross.rotate.yin.note.evoke"
await db.instance.create_session(
user_uuid=s.user_uuid,
key=tokens.reset_key(token),
expires=expires(),
info=session.infodict(request, "device addition"),
)
# Generate the device addition link with pretty URL using configured origin
origin = global_passkey.instance.origin.rstrip("/")
path = request.url.path.removesuffix("create-link") + token # /auth/<token>
url = f"{origin}{path}"
return {
"message": "Registration link generated successfully",
"url": url,
"expires": expires().isoformat(),
}
@app.get("/auth/{reset_token}")
async def reset_authentication(request: Request, reset_token: str):
"""Validate reset token and redirect with it as query parameter (no cookies).
After validation we 303 redirect to /auth/?reset=<token>. The frontend will:
- Read the token from location.search
- Use it via Authorization header or websocket query param
- history.replaceState to remove it from the address bar/history
"""
if not passphrase.is_well_formed(reset_token):
raise HTTPException(status_code=404)
origin = global_passkey.instance.origin
# Do not verify existence/expiry here; frontend + user-info endpoint will handle invalid tokens.
redirect_url = f"{origin}/auth/?reset={reset_token}"
return RedirectResponse(url=redirect_url, status_code=303)

60
passkey/util/frontend.py Normal file
View File

@ -0,0 +1,60 @@
from importlib import resources
from pathlib import Path
__all__ = ["path", "file", "run_dev"]
def _resolve_static_dir() -> Path:
# Try packaged path via importlib.resources (works for wheel/installed).
try: # pragma: no cover - trivial path resolution
pkg_dir = resources.files("passkey") / "frontend-build"
fs_path = Path(str(pkg_dir))
if fs_path.is_dir():
return fs_path
except Exception: # pragma: no cover - defensive
pass
# Fallback for editable/development before build.
return Path(__file__).parent.parent / "frontend-build"
path: Path = _resolve_static_dir()
def file(*parts: str) -> Path:
"""Return a child path under the static root."""
return path.joinpath(*parts)
def run_dev():
"""Spawn the frontend dev server (bun or npm) as a background process."""
import atexit
import shutil
import signal
import subprocess
devpath = Path(__file__).parent.parent.parent / "frontend"
if not (devpath / "package.json").exists():
raise RuntimeError(
"Dev frontend is only available when running from git."
if "site-packages" in devpath.parts
else f"Frontend source code not found at {devpath}"
)
bun = shutil.which("bun")
npm = shutil.which("npm") if bun is None else None
if not bun and not npm:
raise RuntimeError("Neither bun nor npm found on PATH for dev server")
cmd: list[str] = [bun, "--bun", "run", "dev"] if bun else [npm, "run", "dev"] # type: ignore[list-item]
proc = subprocess.Popen(cmd, cwd=str(devpath))
def _terminate():
if proc.poll() is None:
proc.terminate()
atexit.register(_terminate)
def _signal_handler(signum, frame):
_terminate()
raise SystemExit(0)
for sig in (signal.SIGINT, signal.SIGTERM):
signal.signal(sig, _signal_handler)

28
passkey/util/permutil.py Normal file
View File

@ -0,0 +1,28 @@
"""Minimal permission helpers with '*' wildcard support (no DB expansion)."""
from collections.abc import Sequence
from fnmatch import fnmatchcase
from ..globals import db
from .tokens import session_key
__all__ = ["has_any", "has_all", "session_context"]
def _match(perms: set[str], patterns: Sequence[str]):
return (
any(fnmatchcase(p, pat) for p in perms) if "*" in pat else pat in perms
for pat in patterns
)
def has_any(ctx, patterns: Sequence[str]) -> bool:
return any(_match(ctx.role.permissions, patterns)) if ctx else False
def has_all(ctx, patterns: Sequence[str]) -> bool:
return all(_match(ctx.role.permissions, patterns)) if ctx else False
async def session_context(auth: str | None):
return await db.instance.get_session_context(session_key(auth)) if auth else None

View File

@ -29,15 +29,14 @@ dev = [
[tool.ruff] [tool.ruff]
target-version = "py39" target-version = "py39"
line-length = 88 line-length = 88
[tool.ruff.lint]
select = ["E", "F", "I", "N", "W", "UP"] select = ["E", "F", "I", "N", "W", "UP"]
ignore = ["E501"] # Line too long ignore = ["E501"] # Line too long
isort.known-first-party = ["passkey"]
[tool.ruff.isort]
known-first-party = ["passkey"]
[project.scripts] [project.scripts]
passkey-auth = "passkey.fastapi.__main__:main" passkey-auth = "passkey.fastapi.__main__:main"
[tool.hatch.build] [tool.hatch.build]
artifacts = ["passkeyauth/frontend-static"] artifacts = ["passkey/frontend-build"]
targets.sdist.hooks.custom.path = "scripts/build-frontend.py" targets.sdist.hooks.custom.path = "scripts/build-frontend.py"