Compare commits

..

No commits in common. "4db7f2e9a6c5134879923cc6fdaddf5d50339502" and "e0717f005a92db97303dc01116ff8fd5a37ba15a" have entirely different histories.

17 changed files with 159 additions and 1737 deletions

View File

@ -19,55 +19,52 @@ A minimal FastAPI WebAuthn server with WebSocket support for passkey registratio
## Quick Start
### Install (editable dev mode)
### Using uv (recommended)
```fish
uv pip install -e .[dev]
# Install uv if you haven't already
curl -LsSf https://astral.sh/uv/install.sh | sh
# Clone/navigate to the project directory
cd passkeyauth
# Install dependencies and run
uv run passkeyauth.main:main
```
### Run (new CLI)
`passkey-auth` now provides subcommands:
```text
passkey-auth serve [host:port] [--options]
passkey-auth dev [--options]
```
Examples (fish shell shown):
### Using pip
```fish
# Production style (no reload)
passkey-auth serve
passkey-auth serve 0.0.0.0:8080 --rp-id example.com --origin https://example.com
# Create and activate virtual environment
python -m venv venv
source venv/bin/activate.fish # or venv/bin/activate for bash
# Development (auto-reload)
passkey-auth dev # localhost:4401
passkey-auth dev :5500 # localhost on port 5500
passkey-auth dev 127.0.0.1 # host only, default port 4401
# Install the package in development mode
pip install -e ".[dev]"
# Run the server
python -m passkeyauth.main
```
Available options (both subcommands):
### Using hatch
```text
--rp-id <id> Relying Party ID (default: localhost)
--rp-name <name> Relying Party name (default: same as rp-id)
--origin <url> Explicit origin (default: https://<rp-id>)
```fish
# Install hatch if you haven't already
pip install hatch
# Run the development server
hatch run python -m passkeyauth.main
```
### Legacy Invocation
## Usage
If you previously used `python -m passkey.fastapi --dev --host ...`, switch to the new form above. The old flags `--host`, `--port`, and `--dev` are replaced by the `[host:port]` positional and the `dev` subcommand.
## Usage (Web)
1. Start the server with one of the commands above
2. Open your browser to `http://localhost:4401/auth/` (or your chosen host/port)
1. Start the server using one of the methods above
2. Open your browser to `http://localhost:8000`
3. Enter a username (or use the default)
4. Click "Register Passkey"
5. Follow your authenticator's prompts
5. Follow your authenticator's prompts to create a passkey
Real-time status updates stream over WebSocket.
The WebSocket connection will show real-time status updates as you progress through the registration flow.
## Development

View File

@ -4,7 +4,7 @@
"private": true,
"type": "module",
"scripts": {
"dev": "vite --clearScreen false",
"dev": "vite",
"build": "vite build",
"preview": "vite preview"
},

View File

@ -1,54 +1,9 @@
<script setup>
import { ref, onMounted, computed, watch } from 'vue'
import CredentialList from '@/components/CredentialList.vue'
import RegistrationLinkModal from '@/components/RegistrationLinkModal.vue'
import StatusMessage from '@/components/StatusMessage.vue'
import { useAuthStore } from '@/stores/auth'
import { ref, onMounted } from 'vue'
const info = ref(null)
const loading = ref(true)
const error = ref(null)
const orgs = ref([])
const permissions = ref([])
const currentOrgId = ref(null) // UUID of selected org for detail view
const currentUserId = ref(null) // UUID for user detail view
const userDetail = ref(null) // cached user detail object
const userLink = ref(null) // latest generated registration link
const userLinkExpires = ref(null)
const authStore = useAuthStore()
function parseHash() {
const h = window.location.hash || ''
currentOrgId.value = null
currentUserId.value = null
if (h.startsWith('#org/')) {
currentOrgId.value = h.slice(5)
} else if (h.startsWith('#user/')) {
currentUserId.value = h.slice(6)
}
}
async function loadOrgs() {
const res = await fetch('/auth/admin/orgs')
const data = await res.json()
if (data.detail) throw new Error(data.detail)
// Restructure to attach users to roles instead of flat user list at org level
orgs.value = data.map(o => {
const roles = o.roles.map(r => ({ ...r, users: [] }))
const roleMap = Object.fromEntries(roles.map(r => [r.display_name, r]))
for (const u of o.users || []) {
if (roleMap[u.role]) roleMap[u.role].users.push(u)
}
return { ...o, roles }
})
}
async function loadPermissions() {
const res = await fetch('/auth/admin/permissions')
const data = await res.json()
if (data.detail) throw new Error(data.detail)
permissions.value = data
}
async function load() {
loading.value = true
@ -58,18 +13,6 @@ async function load() {
const data = await res.json()
if (data.detail) throw new Error(data.detail)
info.value = data
if (data.authenticated && (data.is_global_admin || data.is_org_admin)) {
await Promise.all([loadOrgs(), loadPermissions()])
}
// After loading orgs decide view if not global admin
if (!data.is_global_admin && data.is_org_admin && orgs.value.length === 1) {
if (!window.location.hash || window.location.hash === '#overview') {
currentOrgId.value = orgs.value[0].uuid
window.location.hash = `#org/${currentOrgId.value}`
} else {
parseHash()
}
} else parseHash()
} catch (e) {
error.value = e.message
} finally {
@ -77,270 +20,13 @@ async function load() {
}
}
// Org actions
async function createOrg() {
const name = prompt('New organization display name:')
if (!name) return
const res = await fetch('/auth/admin/orgs', {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify({ display_name: name, permissions: [] })
})
const data = await res.json()
if (data.detail) return alert(data.detail)
await loadOrgs()
}
async function updateOrg(org) {
const name = prompt('Organization display name:', org.display_name)
if (!name) return
const res = await fetch(`/auth/admin/orgs/${org.uuid}`, {
method: 'PUT',
headers: { 'content-type': 'application/json' },
body: JSON.stringify({ display_name: name, permissions: org.permissions })
})
const data = await res.json()
if (data.detail) return alert(data.detail)
await loadOrgs()
}
async function deleteOrg(org) {
if (!confirm(`Delete organization ${org.display_name}?`)) return
const res = await fetch(`/auth/admin/orgs/${org.uuid}`, { method: 'DELETE' })
const data = await res.json()
if (data.detail) return alert(data.detail)
await loadOrgs()
}
async function createUserInRole(org, role) {
const displayName = prompt(`New member display name for role "${role.display_name}":`)
if (!displayName) return
const res = await fetch(`/auth/admin/orgs/${org.uuid}/users`, {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify({ display_name: displayName, role: role.display_name })
})
const data = await res.json()
if (data.detail) return alert(data.detail)
await loadOrgs()
}
async function moveUserToRole(org, user, targetRoleDisplayName) {
if (user.role === targetRoleDisplayName) return
const res = await fetch(`/auth/admin/orgs/${org.uuid}/users/${user.uuid}/role`, {
method: 'PUT',
headers: { 'content-type': 'application/json' },
body: JSON.stringify({ role: targetRoleDisplayName })
})
const data = await res.json()
if (data.detail) return alert(data.detail)
await loadOrgs()
}
function onUserDragStart(e, user, org_uuid) {
e.dataTransfer.effectAllowed = 'move'
e.dataTransfer.setData('text/plain', JSON.stringify({ user_uuid: user.uuid, org_uuid }))
}
function onRoleDragOver(e) {
e.preventDefault()
e.dataTransfer.dropEffect = 'move'
}
function onRoleDrop(e, org, role) {
e.preventDefault()
try {
const data = JSON.parse(e.dataTransfer.getData('text/plain'))
if (data.org_uuid !== org.uuid) return // only within same org
const user = org.roles.flatMap(r => r.users).find(u => u.uuid === data.user_uuid)
if (user) moveUserToRole(org, user, role.display_name)
} catch (_) { /* ignore */ }
}
async function addOrgPermission(org) {
const id = prompt('Permission ID to add:', permissions.value[0]?.id || '')
if (!id) return
const res = await fetch(`/auth/admin/orgs/${org.uuid}/permissions/${encodeURIComponent(id)}`, { method: 'POST' })
const data = await res.json()
if (data.detail) return alert(data.detail)
await loadOrgs()
}
async function removeOrgPermission(org, permId) {
const res = await fetch(`/auth/admin/orgs/${org.uuid}/permissions/${encodeURIComponent(permId)}`, { method: 'DELETE' })
const data = await res.json()
if (data.detail) return alert(data.detail)
await loadOrgs()
}
// Role actions
async function createRole(org) {
const name = prompt('New role display name:')
if (!name) return
const res = await fetch(`/auth/admin/orgs/${org.uuid}/roles`, {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify({ display_name: name, permissions: [] })
})
const data = await res.json()
if (data.detail) return alert(data.detail)
await loadOrgs()
}
async function updateRole(role) {
const name = prompt('Role display name:', role.display_name)
if (!name) return
const csv = prompt('Permission IDs (comma-separated):', role.permissions.join(', ')) || ''
const perms = csv.split(',').map(s => s.trim()).filter(Boolean)
const res = await fetch(`/auth/admin/roles/${role.uuid}`, {
method: 'PUT',
headers: { 'content-type': 'application/json' },
body: JSON.stringify({ display_name: name, permissions: perms })
})
const data = await res.json()
if (data.detail) return alert(data.detail)
await loadOrgs()
}
async function deleteRole(role) {
if (!confirm(`Delete role ${role.display_name}?`)) return
const res = await fetch(`/auth/admin/roles/${role.uuid}`, { method: 'DELETE' })
const data = await res.json()
if (data.detail) return alert(data.detail)
await loadOrgs()
}
// Permission actions
async function createPermission() {
const id = prompt('Permission ID (e.g., auth/example):')
if (!id) return
const name = prompt('Permission display name:')
if (!name) return
const res = await fetch('/auth/admin/permissions', {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify({ id, display_name: name })
})
const data = await res.json()
if (data.detail) return alert(data.detail)
await loadPermissions()
}
async function updatePermission(p) {
const name = prompt('Permission display name:', p.display_name)
if (!name) return
const res = await fetch(`/auth/admin/permissions/${encodeURIComponent(p.id)}`, {
method: 'PUT',
headers: { 'content-type': 'application/json' },
body: JSON.stringify({ display_name: name })
})
const data = await res.json()
if (data.detail) return alert(data.detail)
await loadPermissions()
}
async function deletePermission(p) {
if (!confirm(`Delete permission ${p.id}?`)) return
const res = await fetch(`/auth/admin/permissions/${encodeURIComponent(p.id)}`, { method: 'DELETE' })
const data = await res.json()
if (data.detail) return alert(data.detail)
await loadPermissions()
}
onMounted(() => {
window.addEventListener('hashchange', parseHash)
load()
})
const selectedOrg = computed(() => orgs.value.find(o => o.uuid === currentOrgId.value) || null)
function openOrg(o) {
window.location.hash = `#org/${o.uuid}`
}
function goOverview() {
window.location.hash = '#overview'
}
function openUser(u) {
window.location.hash = `#user/${u.uuid}`
}
const selectedUser = computed(() => {
if (!currentUserId.value) return null
for (const o of orgs.value) {
for (const r of o.roles) {
const u = r.users.find(x => x.uuid === currentUserId.value)
if (u) return { ...u, org_uuid: o.uuid, role_display_name: r.display_name }
}
}
return null
})
watch(selectedUser, async (u) => {
if (!u) { userDetail.value = null; return }
try {
const res = await fetch(`/auth/admin/users/${u.uuid}`)
const data = await res.json()
if (data.detail) throw new Error(data.detail)
userDetail.value = data
} catch (e) {
userDetail.value = { error: e.message }
}
})
const showRegModal = ref(false)
function generateUserRegistrationLink(u) {
showRegModal.value = true
}
function onLinkCopied() {
authStore.showMessage('Link copied to clipboard!')
}
function copy(text) {
if (!text) return
navigator.clipboard.writeText(text)
.catch(()=>{})
}
function permissionDisplayName(id) {
return permissions.value.find(p => p.id === id)?.display_name || id
}
async function toggleRolePermission(role, permId, checked) {
// Build next permission list
const has = role.permissions.includes(permId)
if (checked && has) return
if (!checked && !has) return
const next = checked ? [...role.permissions, permId] : role.permissions.filter(p => p !== permId)
// Optimistic update
const prev = [...role.permissions]
role.permissions = next
try {
const res = await fetch(`/auth/admin/roles/${role.uuid}`, {
method: 'PUT',
headers: { 'content-type': 'application/json' },
body: JSON.stringify({ display_name: role.display_name, permissions: next })
})
const data = await res.json()
if (data.detail) throw new Error(data.detail)
} catch (e) {
alert(e.message || 'Failed to update role permission')
role.permissions = prev // revert
}
}
onMounted(load)
</script>
<template>
<div class="container">
<h1 v-if="!selectedUser">
<template v-if="!selectedOrg">Passkey Admin</template>
<template v-else>Organization Admin</template>
<a href="/auth/" class="back-link" title="Back to User App">User</a>
<a v-if="selectedOrg && info?.is_global_admin" @click.prevent="goOverview" href="#overview" class="nav-link" title="Back to overview">Overview</a>
</h1>
<p class="subtitle" v-if="!selectedUser">Manage organizations, roles, and permissions</p>
<h1>Passkey Admin</h1>
<p class="subtitle">Manage organizations, roles, and permissions</p>
<div v-if="loading">Loading</div>
<div v-else-if="error" class="error">{{ error }}</div>
@ -352,175 +38,28 @@ async function toggleRolePermission(role, permId, checked) {
<p>Insufficient permissions.</p>
</div>
<div v-else>
<!-- Removed user-specific info (current org, effective permissions, admin flags) -->
<!-- Overview Page -->
<div v-if="!selectedUser && !selectedOrg && (info.is_global_admin || info.is_org_admin)" class="card">
<h2>Organizations</h2>
<div class="actions">
<button @click="createOrg" v-if="info.is_global_admin">+ Create Org</button>
</div>
<table class="org-table">
<thead>
<tr>
<th>Name</th>
<th>Roles</th>
<th>Members</th>
<th v-if="info.is_global_admin">Actions</th>
</tr>
</thead>
<tbody>
<tr v-for="o in orgs" :key="o.uuid">
<td><a href="#org/{{o.uuid}}" @click.prevent="openOrg(o)">{{ o.display_name }}</a></td>
<td>{{ o.roles.length }}</td>
<td>{{ o.roles.reduce((acc,r)=>acc + r.users.length,0) }}</td>
<td v-if="info.is_global_admin">
<button @click="updateOrg(o)" class="icon-btn" aria-label="Rename organization" title="Rename organization"></button>
<button @click="deleteOrg(o)" class="icon-btn delete-icon" aria-label="Delete organization" title="Delete organization"></button>
</td>
</tr>
</tbody>
</table>
<div class="card">
<h2>User</h2>
<div>{{ info.user.user_name }} ({{ info.user.user_uuid }})</div>
<div>Role: {{ info.role?.display_name }}</div>
</div>
<!-- User Detail Page -->
<div v-if="selectedUser" class="card user-detail">
<h2 class="user-title"><span>{{ userDetail?.display_name || selectedUser.display_name }}</span></h2>
<div v-if="userDetail && !userDetail.error" class="user-meta">
<p class="small">Organization: {{ userDetail.org.display_name }}</p>
<p class="small">Role: {{ userDetail.role }}</p>
<p class="small">Visits: {{ userDetail.visits }}</p>
<p class="small">Created: {{ userDetail.created_at ? new Date(userDetail.created_at).toLocaleString() : '—' }}</p>
<p class="small">Last Seen: {{ userDetail.last_seen ? new Date(userDetail.last_seen).toLocaleString() : '—' }}</p>
<h3 class="cred-title">Registered Passkeys</h3>
<CredentialList :credentials="userDetail.credentials" :aaguid-info="userDetail.aaguid_info" />
</div>
<div v-else-if="userDetail?.error" class="error small">{{ userDetail.error }}</div>
<div class="actions">
<button @click="generateUserRegistrationLink(selectedUser)">Generate Registration Token</button>
<button @click="goOverview" v-if="info.is_global_admin" class="icon-btn" title="Overview">🏠</button>
<button @click="openOrg(selectedOrg)" v-if="selectedOrg" class="icon-btn" title="Back to Org"></button>
</div>
<p class="matrix-hint muted">Use the token dialog to register a new credential for the member.</p>
<RegistrationLinkModal
v-if="showRegModal"
:endpoint="`/auth/admin/users/${selectedUser.uuid}/create-link`"
:auto-copy="false"
@close="showRegModal = false"
@copied="onLinkCopied"
/>
<div class="card">
<h2>Organization</h2>
<div>{{ info.org?.display_name }} ({{ info.org?.uuid }})</div>
<div>Role permissions: {{ info.role?.permissions?.join(', ') }}</div>
<div>Org grantable: {{ info.org?.permissions?.join(', ') }}</div>
</div>
<!-- Organization Detail Page -->
<div v-else-if="selectedOrg" class="card">
<h2 class="org-title" :title="selectedOrg.uuid">
<span class="org-name">{{ selectedOrg.display_name }}</span>
<button @click="updateOrg(selectedOrg)" class="icon-btn" aria-label="Rename organization" title="Rename organization"></button>
</h2>
<div class="org-actions">
<button @click="deleteOrg(selectedOrg)" v-if="info.is_global_admin" class="icon-btn delete-icon" aria-label="Delete organization" title="Delete organization"></button>
<button @click="createRole(selectedOrg)">+ Role</button>
<button @click="goOverview" v-if="info.is_global_admin">Back</button>
</div>
<div class="matrix-wrapper">
<h3>Permissions Matrix</h3>
<div class="matrix-scroll">
<div
class="perm-matrix-grid"
:style="{ gridTemplateColumns: 'minmax(180px, 1fr) ' + selectedOrg.roles.map(()=> '2.2rem').join(' ') }"
>
<!-- Headers -->
<div class="grid-head perm-head">Permission</div>
<div
v-for="r in selectedOrg.roles"
:key="'head-' + r.uuid"
class="grid-head role-head"
:title="r.display_name"
>
<span>{{ r.display_name }}</span>
</div>
<!-- Data Rows -->
<template v-for="pid in selectedOrg.permissions" :key="pid">
<div class="perm-name" :title="pid">{{ permissionDisplayName(pid) }}</div>
<div
v-for="r in selectedOrg.roles"
:key="r.uuid + '-' + pid"
class="matrix-cell"
>
<input
type="checkbox"
:checked="r.permissions.includes(pid)"
@change="e => toggleRolePermission(r, pid, e.target.checked)"
/>
</div>
</template>
</div>
</div>
<p class="matrix-hint muted">Toggle which permissions each role grants.</p>
</div>
<div class="roles-grid">
<div
v-for="r in selectedOrg.roles"
:key="r.uuid"
class="role-column"
@dragover="onRoleDragOver"
@drop="e => onRoleDrop(e, selectedOrg, r)"
>
<div class="role-header">
<strong class="role-name" :title="r.uuid">
<span>{{ r.display_name }}</span>
<button @click="updateRole(r)" class="icon-btn" aria-label="Rename role" title="Rename role"></button>
</strong>
<div class="role-actions">
<button @click="createUserInRole(selectedOrg, r)" class="plus-btn" aria-label="Add user" title="Add user"></button>
</div>
</div>
<template v-if="r.users.length > 0">
<ul class="user-list">
<li
v-for="u in r.users"
:key="u.uuid"
class="user-chip"
draggable="true"
@dragstart="e => onUserDragStart(e, u, selectedOrg.uuid)"
@click="openUser(u)"
:title="u.uuid"
>
<span class="name">{{ u.display_name }}</span>
<span class="meta">{{ u.last_seen ? new Date(u.last_seen).toLocaleDateString() : '—' }}</span>
</li>
</ul>
</template>
<div v-else class="empty-role">
<p class="empty-text muted">No members</p>
<button @click="deleteRole(r)" class="icon-btn delete-icon" aria-label="Delete empty role" title="Delete role"></button>
</div>
</div>
</div>
</div>
<div v-if="!selectedUser && !selectedOrg && (info.is_global_admin || info.is_org_admin)" class="card">
<h2>All Permissions</h2>
<div class="actions">
<button @click="createPermission">+ Create Permission</button>
</div>
<div v-for="p in permissions" :key="p.id" class="perm" :title="p.id">
<div class="perm-name-line">
<span>{{ p.display_name }}</span>
<button @click="updatePermission(p)" class="icon-btn" aria-label="Rename permission" title="Rename permission"></button>
</div>
<div class="perm-actions">
<button @click="deletePermission(p)" class="icon-btn delete-icon" aria-label="Delete permission" title="Delete permission"></button>
</div>
</div>
<div class="card">
<h2>Permissions</h2>
<div>Effective: {{ info.permissions?.join(', ') }}</div>
<div>Global admin: {{ info.is_global_admin ? 'yes' : 'no' }}</div>
<div>Org admin: {{ info.is_org_admin ? 'yes' : 'no' }}</div>
</div>
</div>
</div>
</div>
<StatusMessage />
</template>
<style scoped>
@ -528,77 +67,4 @@ async function toggleRolePermission(role, permId, checked) {
.subtitle { color: #888 }
.card { margin: 1rem 0; padding: 1rem; border: 1px solid #eee; border-radius: 8px; }
.error { color: #a00 }
.actions { margin-bottom: .5rem }
.org { border-top: 1px dashed #eee; padding: .5rem 0 }
.org-header { display: flex; gap: .5rem; align-items: baseline }
.user-item { display: flex; gap: .5rem; margin: .15rem 0 }
.users-table { width: 100%; border-collapse: collapse; margin-top: .25rem; }
.users-table th, .users-table td { padding: .25rem .4rem; text-align: left; border-bottom: 1px solid #eee; font-weight: normal; }
.users-table th { font-size: .75rem; text-transform: uppercase; letter-spacing: .05em; color: #555; }
.users-table tbody tr:hover { background: #fafafa; }
.org-actions, .role-actions, .perm-actions { display: flex; gap: .5rem; margin: .25rem 0 }
.muted { color: #666 }
.small { font-size: .9em }
.pill-list { display: flex; flex-wrap: wrap; gap: .25rem }
.pill { background: #f3f3f3; border: 1px solid #e2e2e2; border-radius: 999px; padding: .1rem .5rem; display: inline-flex; align-items: center; gap: .25rem }
.pill-x { background: transparent; border: none; color: #900; cursor: pointer }
button { padding: .25rem .5rem; border-radius: 6px; border: 1px solid #ddd; background: #fff; cursor: pointer }
button:hover { background: #f7f7f7 }
.roles-grid { display: flex; gap: 1rem; align-items: stretch; overflow-x: auto; padding: .5rem 0 }
.role-column { background: #fafafa; border: 1px solid #eee; border-radius: 8px; padding: .5rem; min-width: 200px; flex: 0 0 240px; display: flex; flex-direction: column; }
.role-header { display: flex; justify-content: space-between; align-items: center; margin-bottom: .25rem }
.user-list { list-style: none; padding: 0; margin: 0; display: flex; flex-direction: column; gap: .25rem; flex: 1 1 auto; }
.user-chip { background: #fff; border: 1px solid #ddd; border-radius: 6px; padding: .25rem .4rem; display: flex; justify-content: space-between; gap: .5rem; cursor: grab; }
.user-chip:active { cursor: grabbing }
.user-chip .name { font-weight: 500 }
.user-chip .meta { font-size: .65rem; color: #666 }
.role-column.drag-over { outline: 2px dashed #66a; }
.org-table { width: 100%; border-collapse: collapse; }
.org-table th, .org-table td { padding: .4rem .5rem; border-bottom: 1px solid #eee; text-align: left; }
.org-table th { font-size: .75rem; text-transform: uppercase; letter-spacing: .05em; color: #555; }
.org-table a { text-decoration: none; color: #0366d6; }
.org-table a:hover { text-decoration: underline; }
.nav-link { font-size: .6em; margin-left: .5rem; background: #eee; padding: .25em .6em; border-radius: 999px; border: 1px solid #ccc; text-decoration: none; }
.nav-link:hover { background: #ddd; }
.back-link { font-size: .5em; margin-left: .75rem; text-decoration: none; background: #eee; padding: .25em .6em; border-radius: 999px; border: 1px solid #ccc; vertical-align: middle; line-height: 1.2; }
.back-link:hover { background: #ddd; }
.matrix-wrapper { margin: 1rem 0; text-align: left; }
.matrix-scroll { overflow-x: auto; text-align: left; }
.perm-matrix-grid { display: inline-grid; gap: 0; align-items: stretch; margin-right: 4rem; }
.perm-matrix-grid > * { background: #fff; border: none; padding: .35rem .4rem; font-size: .75rem; }
.perm-matrix-grid .grid-head { background: transparent; border: none; font-size: .65rem; letter-spacing: .05em; font-weight: 600; text-transform: uppercase; display: flex; justify-content: center; align-items: flex-end; padding-bottom: .25rem; }
.perm-matrix-grid .perm-head { justify-content: flex-start; align-items: flex-end; }
.perm-matrix-grid .role-head span { writing-mode: vertical-rl; transform: rotate(180deg); font-size: .6rem; line-height: 1; }
.perm-matrix-grid .perm-name { font-weight: 500; white-space: nowrap; text-align: left; }
.perm-matrix-grid .matrix-cell { display: flex; justify-content: center; align-items: center; }
.perm-matrix-grid .matrix-cell input { cursor: pointer; }
.matrix-hint { font-size: .7rem; margin-top: .25rem; }
/* Inline organization title with icon */
.org-title { display: flex; align-items: center; gap: .4rem; }
.org-title .org-name { flex: 0 1 auto; }
/* Plus button for adding users */
.plus-btn { background: none; border: none; font-size: 1.15rem; line-height: 1; padding: 0 .1rem; cursor: pointer; opacity: .6; }
.plus-btn:hover, .plus-btn:focus { opacity: 1; outline: none; }
.plus-btn:focus-visible { outline: 2px solid #555; outline-offset: 2px; }
.empty-role { display: flex; flex-direction: column; gap: .4rem; align-items: flex-start; padding: .35rem .25rem; flex: 1 1 auto; width: 100%; }
.empty-role .empty-text { font-size: .7rem; margin: 0; }
.delete-icon { color: #c00; }
.delete-icon:hover, .delete-icon:focus { color: #ff0000; }
.user-detail .user-link-box { margin-top: .75rem; font-size: .7rem; background: #fff; border: 1px dashed #ccc; padding: .5rem; border-radius: 6px; cursor: pointer; word-break: break-all; }
.user-detail .user-link-box:hover { background: #f9f9f9; }
.user-detail .user-link-box .expires { font-size: .6rem; margin-top: .25rem; color: #555; }
/* Minimal icon button for rename/edit actions */
.icon-btn { background: none; border: none; padding: 0 .15rem; margin-left: .15rem; cursor: pointer; font-size: .8rem; line-height: 1; opacity: .55; vertical-align: middle; }
.icon-btn:hover, .icon-btn:focus { opacity: .95; outline: none; }
.icon-btn:focus-visible { outline: 2px solid #555; outline-offset: 2px; }
.icon-btn:active { transform: translateY(1px); }
.org-title { display: flex; align-items: baseline; gap: .25rem; }
.role-name { display: inline-flex; align-items: center; gap: .15rem; font-weight: 600; }
.perm-name-line { display: flex; align-items: center; gap: .15rem; }
.user-meta { margin-top: .25rem; }
.cred-title { margin-top: .75rem; font-size: .85rem; }
.cred-list { list-style: none; padding: 0; margin: .25rem 0 .5rem; display: flex; flex-direction: column; gap: .35rem; }
.cred-item { background: #fff; border: 1px solid #eee; border-radius: 6px; padding: .35rem .5rem; font-size: .65rem; }
.cred-line { display: flex; flex-direction: column; gap: .15rem; }
.cred-line .dates { color: #555; font-size: .6rem; }
</style>

View File

@ -1,9 +1,6 @@
import '../assets/style.css'
import { createApp } from 'vue'
import { createPinia } from 'pinia'
import AdminApp from './AdminApp.vue'
const app = createApp(AdminApp)
app.use(createPinia())
app.mount('#admin-app')
createApp(AdminApp).mount('#admin-app')

View File

@ -1,84 +0,0 @@
<template>
<div class="credential-list">
<div v-if="loading"><p>Loading credentials...</p></div>
<div v-else-if="!credentials?.length"><p>No passkeys found.</p></div>
<div v-else>
<div
v-for="credential in credentials"
:key="credential.credential_uuid"
:class="['credential-item', { 'current-session': credential.is_current_session }]"
>
<div class="credential-header">
<div class="credential-icon">
<img
v-if="getCredentialAuthIcon(credential)"
:src="getCredentialAuthIcon(credential)"
:alt="getCredentialAuthName(credential)"
class="auth-icon"
width="32"
height="32"
>
<span v-else class="auth-emoji">🔑</span>
</div>
<div class="credential-info">
<h4>{{ getCredentialAuthName(credential) }}</h4>
</div>
<div class="credential-dates">
<span class="date-label">Created:</span>
<span class="date-value">{{ formatDate(credential.created_at) }}</span>
<span class="date-label" v-if="credential.last_used">Last used:</span>
<span class="date-value" v-if="credential.last_used">{{ formatDate(credential.last_used) }}</span>
</div>
<div class="credential-actions" v-if="allowDelete">
<button
@click="$emit('delete', credential)"
class="btn-delete-credential"
:disabled="credential.is_current_session"
:title="credential.is_current_session ? 'Cannot delete current session credential' : 'Delete passkey'"
>🗑</button>
</div>
</div>
</div>
</div>
</div>
</template>
<script setup>
import { computed } from 'vue'
import { formatDate } from '@/utils/helpers'
const props = defineProps({
credentials: { type: Array, default: () => [] },
aaguidInfo: { type: Object, default: () => ({}) },
loading: { type: Boolean, default: false },
allowDelete: { type: Boolean, default: false },
})
const getCredentialAuthName = (credential) => {
const info = props.aaguidInfo?.[credential.aaguid]
return info ? info.name : 'Unknown Authenticator'
}
const getCredentialAuthIcon = (credential) => {
const info = props.aaguidInfo?.[credential.aaguid]
if (!info) return null
const isDarkMode = window.matchMedia && window.matchMedia('(prefers-color-scheme: dark)').matches
const iconKey = isDarkMode ? 'icon_dark' : 'icon_light'
return info[iconKey] || null
}
</script>
<style scoped>
.credential-list { display: flex; flex-direction: column; gap: .75rem; margin-top: .5rem; }
.credential-item { border: 1px solid #ddd; border-radius: 8px; padding: .5rem .75rem; background: #fff; }
.credential-header { display: flex; align-items: center; gap: 1rem; }
.credential-icon { width: 40px; height: 40px; display: flex; align-items: center; justify-content: center; }
.auth-icon { border-radius: 6px; }
.credential-info { flex: 1 1 auto; }
.credential-info h4 { margin: 0; font-size: .9rem; }
.credential-dates { display: grid; grid-auto-flow: column; gap: .4rem; font-size: .65rem; align-items: center; }
.date-label { font-weight: 600; }
.credential-actions { margin-left: auto; }
.btn-delete-credential { background: none; border: none; cursor: pointer; font-size: .9rem; }
.btn-delete-credential:disabled { opacity: .3; cursor: not-allowed; }
</style>

View File

@ -1,7 +1,7 @@
<template>
<div class="container">
<div class="view active">
<h1>👋 Welcome! <a v-if="isAdmin" href="/auth/admin/" class="admin-link" title="Admin Console">Admin</a></h1>
<h1>👋 Welcome!</h1>
<div v-if="authStore.userInfo?.user" class="user-info">
<h3>👤 {{ authStore.userInfo.user.user_name }}</h3>
<span><strong>Visits:</strong></span>
@ -78,7 +78,7 @@
</template>
<script setup>
import { ref, onMounted, onUnmounted, computed } from 'vue'
import { ref, onMounted, onUnmounted } from 'vue'
import { useAuthStore } from '@/stores/auth'
import { formatDate } from '@/utils/helpers'
import passkey from '@/utils/passkey'
@ -145,8 +145,6 @@ const logout = async () => {
await authStore.logout()
authStore.currentView = 'login'
}
const isAdmin = computed(() => !!(authStore.userInfo?.is_global_admin || authStore.userInfo?.is_org_admin))
</script>
<style scoped>
@ -162,20 +160,3 @@ const isAdmin = computed(() => !!(authStore.userInfo?.is_global_admin || authSto
text-align: left;
}
</style>
<style scoped>
.admin-link {
font-size: 0.6em;
margin-left: 0.75rem;
text-decoration: none;
background: var(--color-background-soft, #eee);
padding: 0.2em 0.6em;
border-radius: 999px;
border: 1px solid var(--color-border, #ccc);
vertical-align: middle;
line-height: 1.2;
}
.admin-link:hover {
background: var(--color-background-mute, #ddd);
}
</style>

View File

@ -1,87 +0,0 @@
<template>
<div class="dialog-overlay" @keydown.esc.prevent="$emit('close')">
<div class="device-dialog" role="dialog" aria-modal="true" aria-labelledby="regTitle">
<div style="display:flex; justify-content:space-between; align-items:center; margin-bottom:10px;">
<h2 id="regTitle" style="margin:0; font-size:1.25rem;">📱 Device Registration Link</h2>
<button class="icon-btn" @click="$emit('close')" aria-label="Close"></button>
</div>
<div class="device-link-section">
<div class="qr-container">
<a v-if="url" :href="url" @click.prevent="copy" class="qr-link">
<canvas ref="qrCanvas" class="qr-code"></canvas>
<p>{{ displayUrl }}</p>
</a>
<div v-else>
<em>Generating link...</em>
</div>
<p>
<strong>Scan and visit the URL on another device.</strong><br>
<small> Expires in 24 hours and one-time use.</small>
</p>
<div v-if="expires" style="font-size:12px; margin-top:6px;">Expires: {{ new Date(expires).toLocaleString() }}</div>
</div>
</div>
<div style="display:flex; justify-content:flex-end; gap:.5rem; margin-top:10px;">
<button class="btn-secondary" @click="$emit('close')">Close</button>
<button class="btn-primary" :disabled="!url" @click="copy">Copy Link</button>
</div>
</div>
</div>
</template>
<script setup>
import { ref, onMounted, watch, computed, nextTick } from 'vue'
import QRCode from 'qrcode/lib/browser'
const props = defineProps({
endpoint: { type: String, required: true }, // POST endpoint returning {url, expires}
autoCopy: { type: Boolean, default: true }
})
const emit = defineEmits(['close','generated','copied'])
const url = ref(null)
const expires = ref(null)
const qrCanvas = ref(null)
const displayUrl = computed(() => url.value ? url.value.replace(/^[^:]+:\/\//,'') : '')
async function fetchLink() {
try {
const res = await fetch(props.endpoint, { method: 'POST' })
const data = await res.json()
if (data.detail) throw new Error(data.detail)
url.value = data.url
expires.value = data.expires
emit('generated', { url: data.url, expires: data.expires })
await nextTick()
drawQR()
if (props.autoCopy) copy()
} catch (e) {
url.value = null
expires.value = null
console.error('Failed to create link', e)
}
}
async function drawQR() {
if (!url.value) return
await nextTick()
if (!qrCanvas.value) return
QRCode.toCanvas(qrCanvas.value, url.value, { scale: 8 }, err => { if (err) console.error(err) })
}
async function copy() {
if (!url.value) return
try { await navigator.clipboard.writeText(url.value); emit('copied', url.value); emit('close') } catch (_) { /* ignore */ }
}
onMounted(fetchLink)
watch(url, () => drawQR(), { flush: 'post' })
</script>
<style scoped>
.icon-btn { background:none; border:none; cursor:pointer; font-size:1rem; opacity:.6; }
.icon-btn:hover { opacity:1; }
/* Minimal extra styling; main look comes from global styles */
.qr-link { text-decoration:none; color:inherit; }
</style>

View File

@ -20,21 +20,9 @@ export default defineConfig(({ command, mode }) => ({
port: 4403,
proxy: {
'/auth/': {
target: 'http://localhost:4402',
target: 'http://localhost:4401',
ws: true,
changeOrigin: false,
// We proxy API + WS under /auth/, but want Vite to serve the SPA entrypoints
// and static assets so that HMR works. Bypass tells http-proxy to skip
// proxying when we return a (possibly rewritten) local path.
bypass(req) {
const url = req.url || ''
// Bypass only root SPA entrypoints + static assets so Vite serves them for HMR.
// Admin API endpoints (e.g., /auth/admin/orgs) must still hit backend.
if (url === '/auth/' || url === '/auth') return '/'
if (url === '/auth/admin' || url === '/auth/admin/') return '/admin/'
if (url.startsWith('/auth/assets/')) return url.replace(/^\/auth/, '')
// Everything else (including /auth/admin/* APIs) should proxy.
}
changeOrigin: false
}
}
},
@ -47,7 +35,9 @@ export default defineConfig(({ command, mode }) => ({
index: resolve(__dirname, 'index.html'),
admin: resolve(__dirname, 'admin/index.html')
},
output: {}
output: {
// Ensure HTML files land as /auth/index.html and /auth/admin.html -> we will serve /auth/admin mapping in backend
}
}
}
}))

View File

@ -16,18 +16,7 @@ from . import authsession, globals
from .db import Org, Permission, Role, User
from .util import passphrase, tokens
def _init_logger() -> logging.Logger:
l = logging.getLogger(__name__)
if not l.handlers and not logging.getLogger().handlers:
h = logging.StreamHandler()
h.setFormatter(logging.Formatter("%(message)s"))
l.addHandler(h)
l.setLevel(logging.INFO)
return l
logger = _init_logger()
logger = logging.getLogger(__name__)
# Shared log message template for admin reset links
ADMIN_RESET_MESSAGE = """\
@ -72,18 +61,17 @@ async def bootstrap_system(
org = Org(uuid7.create(), org_name or "Organization")
await globals.db.instance.create_organization(org)
# After creation, org.permissions now includes the auto-created org admin permission
# Allow this org to grant global admin explicitly
perm1 = Permission(
id=f"auth/org:{org.uuid}", display_name=f"{org.display_name} Admin"
)
await globals.db.instance.create_permission(perm1)
# Allow this org to grant admin permissions
await globals.db.instance.add_permission_to_organization(str(org.uuid), perm0.id)
await globals.db.instance.add_permission_to_organization(str(org.uuid), perm1.id)
# Create an Administration role granting both org and global admin
# Compose permissions for Administration role: global admin + org admin auto-perm
role = Role(
uuid7.create(),
org.uuid,
"Administration",
permissions=[perm0.id, *org.permissions],
)
role = Role(uuid7.create(), org.uuid, "Administration", permissions=[perm0.id, perm1.id])
await globals.db.instance.create_role(role)
user = User(
@ -104,10 +92,7 @@ async def bootstrap_system(
"user": user,
"org": org,
"role": role,
"permissions": [
perm0,
*[Permission(id=p, display_name="") for p in org.permissions],
],
"permissions": [perm0, perm1],
"reset_link": reset_link,
}

View File

@ -105,14 +105,6 @@ class DatabaseInterface(ABC):
async def create_role(self, role: Role) -> None:
"""Create new role."""
@abstractmethod
async def update_role(self, role: Role) -> None:
"""Update a role's display name and synchronize its permissions."""
@abstractmethod
async def delete_role(self, role_uuid: UUID) -> None:
"""Delete a role by UUID. Implementations may prevent deletion if users exist."""
# Credential operations
@abstractmethod
async def create_credential(self, credential: Credential) -> None:
@ -173,10 +165,6 @@ class DatabaseInterface(ABC):
async def get_organization(self, org_id: str) -> Org:
"""Get organization by ID, including its permission IDs and roles (with their permission IDs)."""
@abstractmethod
async def list_organizations(self) -> list[Org]:
"""List all organizations with their roles and permission IDs."""
@abstractmethod
async def update_organization(self, org: Org) -> None:
"""Update organization options."""
@ -187,7 +175,7 @@ class DatabaseInterface(ABC):
@abstractmethod
async def add_user_to_organization(
self, user_uuid: UUID, org_id: str, role: str
self, user_uuid: UUID, org_id: str, role: str
) -> None:
"""Set a user's organization and role."""
@ -205,10 +193,6 @@ class DatabaseInterface(ABC):
async def get_organization_users(self, org_id: str) -> list[tuple[User, str]]:
"""Get all users in an organization with their roles."""
@abstractmethod
async def get_roles_by_organization(self, org_id: str) -> list[Role]:
"""List roles belonging to an organization."""
@abstractmethod
async def get_user_role_in_organization(
self, user_uuid: UUID, org_id: str
@ -230,10 +214,6 @@ class DatabaseInterface(ABC):
async def get_permission(self, permission_id: str) -> Permission:
"""Get permission by ID."""
@abstractmethod
async def list_permissions(self) -> list[Permission]:
"""List all permissions."""
@abstractmethod
async def update_permission(self, permission: Permission) -> None:
"""Update permission details."""
@ -268,9 +248,7 @@ class DatabaseInterface(ABC):
"""Add a permission to a role."""
@abstractmethod
async def remove_permission_from_role(
self, role_uuid: UUID, permission_id: str
) -> None:
async def remove_permission_from_role(self, role_uuid: UUID, permission_id: str) -> None:
"""Remove a permission from a role."""
@abstractmethod
@ -281,10 +259,6 @@ class DatabaseInterface(ABC):
async def get_permission_roles(self, permission_id: str) -> list[Role]:
"""List all roles that grant a permission."""
@abstractmethod
async def get_role(self, role_uuid: UUID) -> Role:
"""Get a role by UUID, including its permission IDs."""
# Combined operations
@abstractmethod
async def login(self, user_uuid: UUID, credential: Credential) -> None:

View File

@ -441,42 +441,13 @@ class DB(DatabaseInterface):
display_name=org.display_name,
)
session.add(org_model)
# Persist any explicitly provided org grantable permissions
# Persist org permissions the org is allowed to grant
if org.permissions:
for perm_id in set(org.permissions):
for perm_id in org.permissions:
session.add(
OrgPermission(org_uuid=org.uuid.bytes, permission_id=perm_id)
)
# Automatically create an organization admin permission if not present.
# Pattern: auth/org:<org-uuid>
auto_perm_id = f"auth/org:{org.uuid}"
# Only create if it does not already exist (in case caller passed it)
existing_perm = await session.execute(
select(PermissionModel).where(PermissionModel.id == auto_perm_id)
)
if not existing_perm.scalar_one_or_none():
session.add(
PermissionModel(
id=auto_perm_id,
display_name=f"{org.display_name} Admin",
)
)
# Ensure org is allowed to grant its own admin permission (insert if missing)
existing_org_perm = await session.execute(
select(OrgPermission).where(
OrgPermission.org_uuid == org.uuid.bytes,
OrgPermission.permission_id == auto_perm_id,
)
)
if not existing_org_perm.scalar_one_or_none():
session.add(
OrgPermission(org_uuid=org.uuid.bytes, permission_id=auto_perm_id)
)
# Reflect the automatically added permission in the dataclass instance
if auto_perm_id not in org.permissions:
org.permissions.append(auto_perm_id)
async def get_organization(self, org_id: str) -> Org:
async with self.session() as session:
# Convert string ID to UUID bytes for lookup
@ -517,48 +488,6 @@ class DB(DatabaseInterface):
return org_dc
async def list_organizations(self) -> list[Org]:
async with self.session() as session:
# Load all orgs
orgs_result = await session.execute(select(OrgModel))
org_models = orgs_result.scalars().all()
if not org_models:
return []
# Preload org permissions mapping
org_perms_result = await session.execute(select(OrgPermission))
org_perms = org_perms_result.scalars().all()
perms_by_org: dict[bytes, list[str]] = {}
for op in org_perms:
perms_by_org.setdefault(op.org_uuid, []).append(op.permission_id)
# Preload roles
roles_result = await session.execute(select(RoleModel))
role_models = roles_result.scalars().all()
# Preload role permissions mapping
rp_result = await session.execute(select(RolePermission))
rps = rp_result.scalars().all()
perms_by_role: dict[bytes, list[str]] = {}
for rp in rps:
perms_by_role.setdefault(rp.role_uuid, []).append(rp.permission_id)
# Build org dataclasses with roles and permission IDs
roles_by_org: dict[bytes, list[Role]] = {}
for rm in role_models:
r_dc = rm.as_dataclass()
r_dc.permissions = perms_by_role.get(rm.uuid, [])
roles_by_org.setdefault(rm.org_uuid, []).append(r_dc)
orgs: list[Org] = []
for om in org_models:
o_dc = om.as_dataclass()
o_dc.permissions = perms_by_org.get(om.uuid, [])
o_dc.roles = roles_by_org.get(om.uuid, [])
orgs.append(o_dc)
return orgs
async def update_organization(self, org: Org) -> None:
async with self.session() as session:
stmt = (
@ -576,7 +505,9 @@ class DB(DatabaseInterface):
if org.permissions:
for perm_id in org.permissions:
await session.merge(
OrgPermission(org_uuid=org.uuid.bytes, permission_id=perm_id)
OrgPermission(
org_uuid=org.uuid.bytes, permission_id=perm_id
)
)
async def delete_organization(self, org_uuid: UUID) -> None:
@ -626,9 +557,9 @@ class DB(DatabaseInterface):
async def transfer_user_to_organization(
self, user_uuid: UUID, new_org_id: str, new_role: str | None = None
) -> None:
# Users are members of an org that never changes after creation.
# Disallow transfers across organizations to enforce invariant.
raise ValueError("Users cannot be transferred to a different organization")
# Users are members of an org that never changes after creation.
# Disallow transfers across organizations to enforce invariant.
raise ValueError("Users cannot be transferred to a different organization")
async def get_user_organization(self, user_uuid: UUID) -> tuple[Org, str]:
async with self.session() as session:
@ -755,11 +686,6 @@ class DB(DatabaseInterface):
stmt = delete(PermissionModel).where(PermissionModel.id == permission_id)
await session.execute(stmt)
async def list_permissions(self) -> list[Permission]:
async with self.session() as session:
result = await session.execute(select(PermissionModel))
return [p.as_dataclass() for p in result.scalars().all()]
async def add_permission_to_role(self, role_uuid: UUID, permission_id: str) -> None:
async with self.session() as session:
# Ensure role exists
@ -770,9 +696,7 @@ class DB(DatabaseInterface):
raise ValueError("Role not found")
# Ensure permission exists
perm_stmt = select(PermissionModel).where(
PermissionModel.id == permission_id
)
perm_stmt = select(PermissionModel).where(PermissionModel.id == permission_id)
perm_result = await session.execute(perm_stmt)
if not perm_result.scalar_one_or_none():
raise ValueError("Permission not found")
@ -781,9 +705,7 @@ class DB(DatabaseInterface):
RolePermission(role_uuid=role_uuid.bytes, permission_id=permission_id)
)
async def remove_permission_from_role(
self, role_uuid: UUID, permission_id: str
) -> None:
async def remove_permission_from_role(self, role_uuid: UUID, permission_id: str) -> None:
async with self.session() as session:
await session.execute(
delete(RolePermission)
@ -795,9 +717,7 @@ class DB(DatabaseInterface):
async with self.session() as session:
stmt = (
select(PermissionModel)
.join(
RolePermission, PermissionModel.id == RolePermission.permission_id
)
.join(RolePermission, PermissionModel.id == RolePermission.permission_id)
.where(RolePermission.role_uuid == role_uuid.bytes)
)
result = await session.execute(stmt)
@ -813,76 +733,6 @@ class DB(DatabaseInterface):
result = await session.execute(stmt)
return [r.as_dataclass() for r in result.scalars().all()]
async def update_role(self, role: Role) -> None:
async with self.session() as session:
# Update role display_name
await session.execute(
update(RoleModel)
.where(RoleModel.uuid == role.uuid.bytes)
.values(display_name=role.display_name)
)
# Sync role permissions: delete all then insert current set
await session.execute(
delete(RolePermission).where(
RolePermission.role_uuid == role.uuid.bytes
)
)
if role.permissions:
for perm_id in set(role.permissions):
session.add(
RolePermission(role_uuid=role.uuid.bytes, permission_id=perm_id)
)
async def delete_role(self, role_uuid: UUID) -> None:
async with self.session() as session:
# Prevent deleting a role that still has users
# Quick existence check for users assigned to the role
existing_user = await session.execute(
select(UserModel.uuid).where(UserModel.role_uuid == role_uuid.bytes)
)
if existing_user.first() is not None:
raise ValueError("Cannot delete role with assigned users")
await session.execute(
delete(RoleModel).where(RoleModel.uuid == role_uuid.bytes)
)
async def get_role(self, role_uuid: UUID) -> Role:
async with self.session() as session:
result = await session.execute(
select(RoleModel).where(RoleModel.uuid == role_uuid.bytes)
)
role_model = result.scalar_one_or_none()
if not role_model:
raise ValueError("Role not found")
r_dc = role_model.as_dataclass()
perms_result = await session.execute(
select(RolePermission.permission_id).where(
RolePermission.role_uuid == role_uuid.bytes
)
)
r_dc.permissions = [row[0] for row in perms_result.fetchall()]
return r_dc
async def get_roles_by_organization(self, org_id: str) -> list[Role]:
async with self.session() as session:
org_uuid = UUID(org_id)
result = await session.execute(
select(RoleModel).where(RoleModel.org_uuid == org_uuid.bytes)
)
role_models = result.scalars().all()
roles: list[Role] = []
for rm in role_models:
r_dc = rm.as_dataclass()
perms_result = await session.execute(
select(RolePermission.permission_id).where(
RolePermission.role_uuid == rm.uuid
)
)
r_dc.permissions = [row[0] for row in perms_result.fetchall()]
roles.append(r_dc)
return roles
async def add_permission_to_organization(
self, org_id: str, permission_id: str
) -> None:
@ -994,9 +844,7 @@ class DB(DatabaseInterface):
.join(RoleModel, UserModel.role_uuid == RoleModel.uuid)
.join(OrgModel, RoleModel.org_uuid == OrgModel.uuid)
.outerjoin(RolePermission, RoleModel.uuid == RolePermission.role_uuid)
.outerjoin(
PermissionModel, RolePermission.permission_id == PermissionModel.id
)
.outerjoin(PermissionModel, RolePermission.permission_id == PermissionModel.id)
.where(SessionModel.key == session_key)
)

View File

@ -1,249 +1,52 @@
import argparse
import asyncio
import atexit
import contextlib
import ipaddress
import logging
import os
import signal
import subprocess
from pathlib import Path
from urllib.parse import urlparse
import uvicorn
DEFAULT_HOST = "localhost"
DEFAULT_SERVE_PORT = 4401
DEFAULT_DEV_PORT = 4402
def parse_endpoint(
value: str | None, default_port: int
) -> tuple[str | None, int | None, str | None, bool]:
"""Parse an endpoint using stdlib (urllib.parse, ipaddress).
Returns (host, port, uds_path). If uds_path is not None, host/port are None.
Supported forms:
- host[:port]
- :port (uses default host)
- [ipv6][:port] (bracketed for port usage)
- ipv6 (unbracketed, no port allowed -> default port)
- unix:/path/to/socket.sock
- None -> defaults (localhost:4401)
Notes:
- For IPv6 with an explicit port you MUST use brackets (e.g. [::1]:8080)
- Unbracketed IPv6 like ::1 implies the default port.
"""
if not value:
return DEFAULT_HOST, default_port, None, False
# Port only (numeric) -> localhost:port
if value.isdigit():
try:
port_only = int(value)
except ValueError: # pragma: no cover (isdigit guards)
raise SystemExit(f"Invalid port '{value}'")
return DEFAULT_HOST, port_only, None, False
# Leading colon :port -> bind all interfaces (0.0.0.0 + ::)
if value.startswith(":") and value != ":":
port_part = value[1:]
if not port_part.isdigit():
raise SystemExit(f"Invalid port in '{value}'")
return None, int(port_part), None, True
# UNIX domain socket
if value.startswith("unix:"):
uds_path = value[5:] or None
if uds_path is None:
raise SystemExit("unix: path must not be empty")
return None, None, uds_path, False
# Unbracketed IPv6 (cannot safely contain a port) -> detect by multiple colons
if value.count(":") > 1 and not value.startswith("["):
try:
ipaddress.IPv6Address(value)
except ValueError as e: # pragma: no cover
raise SystemExit(f"Invalid IPv6 address '{value}': {e}")
return value, default_port, None, False
# Use urllib.parse for everything else (host[:port], :port, [ipv6][:port])
parsed = urlparse(f"//{value}") # // prefix lets urlparse treat it as netloc
host = parsed.hostname
port = parsed.port
# Host may be None if empty (e.g. ':5500')
if not host:
host = DEFAULT_HOST
if port is None:
port = default_port
# Validate IP literals (optional; hostname passes through)
try:
# Strip brackets if somehow present (urlparse removes them already)
ipaddress.ip_address(host)
except ValueError:
# Not an IP address -> treat as hostname; no action
pass
return host, port, None, False
def add_common_options(p: argparse.ArgumentParser) -> None:
p.add_argument(
"--rp-id", default="localhost", help="Relying Party ID (default: localhost)"
)
p.add_argument("--rp-name", help="Relying Party name (default: same as rp-id)")
p.add_argument("--origin", help="Origin URL (default: https://<rp-id>)")
def main():
# Configure logging to remove the "ERROR:root:" prefix
logging.basicConfig(level=logging.INFO, format="%(message)s", force=True)
parser = argparse.ArgumentParser(
prog="passkey-auth", description="Passkey authentication server"
description="Run the passkey authentication server"
)
sub = parser.add_subparsers(dest="command", required=True)
# serve subcommand
serve = sub.add_parser(
"serve", help="Run the server (production style, no auto-reload)"
parser.add_argument(
"--host", default="localhost", help="Host to bind to (default: localhost)"
)
serve.add_argument(
"hostport",
nargs="?",
help=(
"Endpoint (default: localhost:4401). Forms: host[:port] | :port | "
"[ipv6][:port] | ipv6 | unix:/path.sock"
),
parser.add_argument(
"--port", type=int, default=4401, help="Port to bind to (default: 4401)"
)
add_common_options(serve)
# dev subcommand
dev = sub.add_parser("dev", help="Run the server in development (auto-reload)")
dev.add_argument(
"hostport",
nargs="?",
help=(
"Endpoint (default: localhost:4402). Forms: host[:port] | :port | "
"[ipv6][:port] | ipv6 | unix:/path.sock"
),
parser.add_argument(
"--dev", action="store_true", help="Enable development mode with auto-reload"
)
add_common_options(dev)
parser.add_argument(
"--rp-id", default="localhost", help="Relying Party ID (default: localhost)"
)
parser.add_argument("--rp-name", help="Relying Party name (default: same as rp-id)")
parser.add_argument("--origin", help="Origin URL (default: https://<rp-id>)")
args = parser.parse_args()
default_port = DEFAULT_DEV_PORT if args.command == "dev" else DEFAULT_SERVE_PORT
host, port, uds, all_ifaces = parse_endpoint(args.hostport, default_port)
reload_enabled = args.command == "dev"
# Initialize the application
try:
from .. import globals
# Determine origin (dev mode default override)
effective_origin = args.origin
if reload_enabled and not effective_origin:
# Use a distinct port (4403) for RP origin in dev if not explicitly provided
effective_origin = "http://localhost:4403"
# Export configuration via environment for lifespan initialization in each process
os.environ.setdefault("PASSKEY_RP_ID", args.rp_id)
if args.rp_name:
os.environ["PASSKEY_RP_NAME"] = args.rp_name
if effective_origin:
os.environ["PASSKEY_ORIGIN"] = effective_origin
# One-time initialization + bootstrap before starting any server processes.
# Lifespan in worker processes will call globals.init with bootstrap disabled.
from passkey import globals as _globals # local import
asyncio.run(
_globals.init(
rp_id=args.rp_id,
rp_name=args.rp_name,
origin=effective_origin,
default_admin=os.getenv("PASSKEY_DEFAULT_ADMIN") or None,
default_org=os.getenv("PASSKEY_DEFAULT_ORG") or None,
bootstrap=True,
asyncio.run(
globals.init(rp_id=args.rp_id, rp_name=args.rp_name, origin=args.origin)
)
except ValueError as e:
logging.error(f"⚠️ {e}")
return
uvicorn.run(
"passkey.fastapi:app",
host=args.host,
port=args.port,
reload=args.dev,
log_level="info",
)
run_kwargs: dict = {
"reload": reload_enabled,
"log_level": "info",
}
if uds:
run_kwargs["uds"] = uds
else:
# For :port form (all interfaces) we will handle separately
if not all_ifaces:
run_kwargs["host"] = host
run_kwargs["port"] = port
bun_process: subprocess.Popen | None = None
if reload_enabled:
# Spawn frontend dev server (bun) only in the original parent (avoid duplicates on reload)
if os.environ.get("PASSKEY_BUN_PARENT") != "1":
os.environ["PASSKEY_BUN_PARENT"] = "1"
frontend_dir = Path(__file__).parent.parent.parent / "frontend"
if (frontend_dir / "package.json").exists():
try:
bun_process = subprocess.Popen(
["bun", "run", "dev"], cwd=str(frontend_dir)
)
logging.info("Started bun dev server")
except FileNotFoundError:
logging.warning(
"bun not found: skipping frontend dev server (install bun)"
)
def _terminate_bun(): # pragma: no cover
if bun_process and bun_process.poll() is None:
with contextlib.suppress(Exception):
bun_process.terminate()
atexit.register(_terminate_bun)
def _signal_handler(signum, frame): # pragma: no cover
_terminate_bun()
raise SystemExit(0)
signal.signal(signal.SIGINT, _signal_handler)
signal.signal(signal.SIGTERM, _signal_handler)
if all_ifaces and not uds:
# If reload enabled, fallback to single dual-stack attempt (::) to keep reload simple
if reload_enabled:
run_kwargs["host"] = "::"
run_kwargs["port"] = port
uvicorn.run("passkey.fastapi:app", **run_kwargs)
else:
# Start two servers concurrently: IPv4 and IPv6
from uvicorn import Config, Server # noqa: E402 local import
from passkey.fastapi import app as fastapi_app # noqa: E402 local import
async def serve_both():
servers = []
assert port is not None
for h in ("0.0.0.0", "::"):
try:
cfg = Config(
app=fastapi_app,
host=h,
port=port,
log_level="info",
)
servers.append(Server(cfg))
except Exception as e: # pragma: no cover
logging.warning(f"Failed to configure server for {h}: {e}")
tasks = [asyncio.create_task(s.serve()) for s in servers]
await asyncio.gather(*tasks)
asyncio.run(serve_both())
else:
uvicorn.run("passkey.fastapi:app", **run_kwargs)
if __name__ == "__main__":
main()

View File

@ -8,18 +8,16 @@ This module contains all the HTTP API endpoints for:
- Login/logout functionality
"""
from uuid import UUID, uuid4
from uuid import UUID
from fastapi import Body, Cookie, Depends, FastAPI, HTTPException, Response
from fastapi import Cookie, Depends, FastAPI, Response
from fastapi.security import HTTPBearer
from passkey.util import passphrase
from .. import aaguid
from ..authsession import delete_credential, expires, get_reset, get_session
from ..authsession import delete_credential, get_reset, get_session
from ..globals import db
from ..globals import passkey as global_passkey
from ..util import tokens
from ..util.tokens import session_key
from . import session
@ -29,19 +27,6 @@ bearer_auth = HTTPBearer(auto_error=True)
def register_api_routes(app: FastAPI):
"""Register all API routes on the FastAPI app."""
async def _get_ctx_and_admin_flags(auth_cookie: str):
"""Helper to get session context and admin flags from cookie."""
if not auth_cookie:
raise ValueError("Not authenticated")
ctx = await db.instance.get_session_context(session_key(auth_cookie))
if not ctx:
raise ValueError("Not authenticated")
role_perm_ids = set(ctx.role.permissions or [])
org_uuid_str = str(ctx.org.uuid)
is_global_admin = "auth/admin" in role_perm_ids
is_org_admin = f"auth/org:{org_uuid_str}" in role_perm_ids
return ctx, is_global_admin, is_org_admin
@app.post("/auth/validate")
async def validate_token(response: Response, auth=Cookie(None)):
"""Lightweight token validation endpoint."""
@ -53,50 +38,29 @@ def register_api_routes(app: FastAPI):
@app.post("/auth/user-info")
async def api_user_info(response: Response, auth=Cookie(None)):
"""Get user information.
- For authenticated sessions: return full context (org/role/permissions/credentials)
- For reset tokens: return only basic user information to drive reset flow
"""
try:
reset = auth and passphrase.is_well_formed(auth)
s = await (get_reset if reset else get_session)(auth)
except ValueError:
raise HTTPException(
status_code=401,
detail="Authentication Required",
headers={"WWW-Authenticate": "Bearer"},
)
# Minimal response for reset tokens
if reset:
u = await db.instance.get_user_by_uuid(s.user_uuid)
return {
"authenticated": False,
"session_type": s.info.get("type"),
"user": {
"user_uuid": str(u.uuid),
"user_name": u.display_name,
"created_at": u.created_at.isoformat() if u.created_at else None,
"last_seen": u.last_seen.isoformat() if u.last_seen else None,
"visits": u.visits,
},
}
# Full context for authenticated sessions
"""Get full user information for the authenticated user."""
reset = passphrase.is_well_formed(auth)
s = await (get_reset if reset else get_session)(auth)
# Session context (org, role, permissions)
ctx = await db.instance.get_session_context(session_key(auth))
# Fallback if context not available (e.g., reset session)
u = await db.instance.get_user_by_uuid(s.user_uuid)
# Get all credentials for the user
credential_ids = await db.instance.get_credentials_by_user_uuid(s.user_uuid)
credentials: list[dict] = []
user_aaguids: set[str] = set()
credentials = []
user_aaguids = set()
for cred_id in credential_ids:
try:
c = await db.instance.get_credential_by_id(cred_id)
except ValueError:
continue # Skip dangling IDs
c = await db.instance.get_credential_by_id(cred_id)
# Convert AAGUID to string format
aaguid_str = str(c.aaguid)
user_aaguids.add(aaguid_str)
# Check if this is the current session credential
is_current_session = s.credential_uuid == c.uuid
credentials.append(
{
"credential_uuid": str(c.uuid),
@ -107,31 +71,37 @@ def register_api_routes(app: FastAPI):
if c.last_verified
else None,
"sign_count": c.sign_count,
"is_current_session": s.credential_uuid == c.uuid,
"is_current_session": is_current_session,
}
)
credentials.sort(key=lambda cred: cred["created_at"]) # chronological
# Get AAGUID information for only the AAGUIDs that the user has
aaguid_info = aaguid.filter(user_aaguids)
# Sort credentials by creation date (earliest first, most recently created last)
credentials.sort(key=lambda cred: cred["created_at"])
# Permissions and roles
role_info = None
org_info = None
effective_permissions: list[str] = []
effective_permissions = []
is_global_admin = False
is_org_admin = False
if ctx:
role_info = {
"uuid": str(ctx.role.uuid),
"display_name": ctx.role.display_name,
"permissions": ctx.role.permissions,
"permissions": ctx.role.permissions, # IDs
}
org_info = {
"uuid": str(ctx.org.uuid),
"display_name": ctx.org.display_name,
"permissions": ctx.org.permissions,
"permissions": ctx.org.permissions, # IDs the org can grant
}
# Effective permissions are role permissions; API also returns full objects for convenience
effective_permissions = [p.id for p in (ctx.permissions or [])]
is_global_admin = "auth/admin" in role_info["permissions"]
# org admin permission is auth/org:<org_uuid>
is_org_admin = (
f"auth/org:{org_info['uuid']}" in role_info["permissions"]
if org_info
@ -139,8 +109,8 @@ def register_api_routes(app: FastAPI):
)
return {
"authenticated": True,
"session_type": s.info.get("type"),
"authenticated": not reset,
"session_type": s.info["type"],
"user": {
"user_uuid": str(u.uuid),
"user_name": u.display_name,
@ -157,371 +127,6 @@ def register_api_routes(app: FastAPI):
"aaguid_info": aaguid_info,
}
# -------------------- Admin API: Organizations --------------------
@app.get("/auth/admin/orgs")
async def admin_list_orgs(auth=Cookie(None)):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or is_org_admin):
raise ValueError("Insufficient permissions")
orgs = await db.instance.list_organizations()
# If only org admin, filter to their org
if not is_global_admin:
orgs = [o for o in orgs if o.uuid == ctx.org.uuid]
def role_to_dict(r):
return {
"uuid": str(r.uuid),
"org_uuid": str(r.org_uuid),
"display_name": r.display_name,
"permissions": r.permissions,
}
async def org_to_dict(o):
# Fetch users for each org
users = await db.instance.get_organization_users(str(o.uuid))
return {
"uuid": str(o.uuid),
"display_name": o.display_name,
"permissions": o.permissions,
"roles": [role_to_dict(r) for r in o.roles],
"users": [
{
"uuid": str(u.uuid),
"display_name": u.display_name,
"role": role_name,
"visits": u.visits,
"last_seen": u.last_seen.isoformat() if u.last_seen else None,
}
for (u, role_name) in users
],
}
return [await org_to_dict(o) for o in orgs]
@app.post("/auth/admin/orgs")
async def admin_create_org(payload: dict = Body(...), auth=Cookie(None)):
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth)
if not is_global_admin:
raise ValueError("Global admin required")
from ..db import Org as OrgDC # local import to avoid cycles in typing
org_uuid = uuid4()
display_name = payload.get("display_name") or "New Organization"
permissions = payload.get("permissions") or []
org = OrgDC(uuid=org_uuid, display_name=display_name, permissions=permissions)
await db.instance.create_organization(org)
return {"uuid": str(org_uuid)}
@app.put("/auth/admin/orgs/{org_uuid}")
async def admin_update_org(
org_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or (is_org_admin and ctx.org.uuid == org_uuid)):
raise ValueError("Insufficient permissions")
from ..db import Org as OrgDC
current = await db.instance.get_organization(str(org_uuid))
display_name = payload.get("display_name") or current.display_name
permissions = (
payload.get("permissions")
if "permissions" in payload
else current.permissions
) or []
org = OrgDC(uuid=org_uuid, display_name=display_name, permissions=permissions)
await db.instance.update_organization(org)
return {"status": "ok"}
@app.delete("/auth/admin/orgs/{org_uuid}")
async def admin_delete_org(org_uuid: UUID, auth=Cookie(None)):
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth)
if not is_global_admin:
raise ValueError("Global admin required")
await db.instance.delete_organization(org_uuid)
return {"status": "ok"}
# Manage an org's grantable permissions
@app.post("/auth/admin/orgs/{org_uuid}/permissions/{permission_id}")
async def admin_add_org_permission(
org_uuid: UUID, permission_id: str, auth=Cookie(None)
):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or (is_org_admin and ctx.org.uuid == org_uuid)):
raise ValueError("Insufficient permissions")
await db.instance.add_permission_to_organization(str(org_uuid), permission_id)
return {"status": "ok"}
@app.delete("/auth/admin/orgs/{org_uuid}/permissions/{permission_id}")
async def admin_remove_org_permission(
org_uuid: UUID, permission_id: str, auth=Cookie(None)
):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or (is_org_admin and ctx.org.uuid == org_uuid)):
raise ValueError("Insufficient permissions")
await db.instance.remove_permission_from_organization(
str(org_uuid), permission_id
)
return {"status": "ok"}
# -------------------- Admin API: Roles --------------------
@app.post("/auth/admin/orgs/{org_uuid}/roles")
async def admin_create_role(
org_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or (is_org_admin and ctx.org.uuid == org_uuid)):
raise ValueError("Insufficient permissions")
from ..db import Role as RoleDC
role_uuid = uuid4()
display_name = payload.get("display_name") or "New Role"
permissions = payload.get("permissions") or []
# Validate that permissions exist and are allowed by org
org = await db.instance.get_organization(str(org_uuid))
grantable = set(org.permissions or [])
for pid in permissions:
await db.instance.get_permission(pid) # raises if not found
if pid not in grantable:
raise ValueError(f"Permission not grantable by org: {pid}")
role = RoleDC(
uuid=role_uuid,
org_uuid=org_uuid,
display_name=display_name,
permissions=permissions,
)
await db.instance.create_role(role)
return {"uuid": str(role_uuid)}
@app.put("/auth/admin/roles/{role_uuid}")
async def admin_update_role(
role_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
role = await db.instance.get_role(role_uuid)
# Only org admins for that org or global admin can update
if not (is_global_admin or (is_org_admin and role.org_uuid == ctx.org.uuid)):
raise ValueError("Insufficient permissions")
from ..db import Role as RoleDC
display_name = payload.get("display_name") or role.display_name
permissions = payload.get("permissions") or role.permissions
# Validate against org grantable permissions
org = await db.instance.get_organization(str(role.org_uuid))
grantable = set(org.permissions or [])
for pid in permissions:
await db.instance.get_permission(pid) # raises if not found
if pid not in grantable:
raise ValueError(f"Permission not grantable by org: {pid}")
updated = RoleDC(
uuid=role_uuid,
org_uuid=role.org_uuid,
display_name=display_name,
permissions=permissions,
)
await db.instance.update_role(updated)
return {"status": "ok"}
@app.post("/auth/admin/orgs/{org_uuid}/users")
async def admin_create_user(
org_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
"""Create a new user within an organization.
Body parameters:
- display_name: str (required)
- role: str (required) display name of existing role in that org
"""
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or (is_org_admin and ctx.org.uuid == org_uuid)):
raise ValueError("Insufficient permissions")
display_name = payload.get("display_name")
role_name = payload.get("role")
if not display_name or not role_name:
raise ValueError("display_name and role are required")
# Validate role exists in org
from ..db import User as UserDC # local import to avoid cycles
roles = await db.instance.get_roles_by_organization(str(org_uuid))
role_obj = next((r for r in roles if r.display_name == role_name), None)
if not role_obj:
raise ValueError("Role not found in organization")
# Create user
user_uuid = uuid4()
user = UserDC(
uuid=user_uuid,
display_name=display_name,
role_uuid=role_obj.uuid,
visits=0,
created_at=None,
last_seen=None,
)
await db.instance.create_user(user)
return {"uuid": str(user_uuid)}
@app.delete("/auth/admin/roles/{role_uuid}")
async def admin_delete_role(role_uuid: UUID, auth=Cookie(None)):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
role = await db.instance.get_role(role_uuid)
if not (is_global_admin or (is_org_admin and role.org_uuid == ctx.org.uuid)):
raise ValueError("Insufficient permissions")
await db.instance.delete_role(role_uuid)
return {"status": "ok"}
# -------------------- Admin API: Users (role management) --------------------
@app.put("/auth/admin/orgs/{org_uuid}/users/{user_uuid}/role")
async def admin_update_user_role(
org_uuid: UUID, user_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
):
"""Change a user's role within their organization.
Body: {"role": "New Role Display Name"}
Only global admins or admins of the organization can perform this.
"""
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or (is_org_admin and ctx.org.uuid == org_uuid)):
raise ValueError("Insufficient permissions")
new_role = payload.get("role")
if not new_role:
raise ValueError("role is required")
# Verify user belongs to this org
try:
user_org, _current_role = await db.instance.get_user_organization(user_uuid)
except ValueError:
raise ValueError("User not found")
if user_org.uuid != org_uuid:
raise ValueError("User does not belong to this organization")
# Ensure role exists in org and update
roles = await db.instance.get_roles_by_organization(str(org_uuid))
if not any(r.display_name == new_role for r in roles):
raise ValueError("Role not found in organization")
await db.instance.update_user_role_in_organization(user_uuid, new_role)
return {"status": "ok"}
@app.post("/auth/admin/users/{user_uuid}/create-link")
async def admin_create_user_registration_link(user_uuid: UUID, auth=Cookie(None)):
"""Create a device registration/reset link for a specific user (admin only).
Returns JSON: {"url": str, "expires": iso8601}
"""
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
# Ensure user exists and fetch their org
try:
user_org, _role_name = await db.instance.get_user_organization(user_uuid)
except ValueError:
raise HTTPException(status_code=404, detail="User not found")
if not (is_global_admin or (is_org_admin and user_org.uuid == ctx.org.uuid)):
raise HTTPException(status_code=403, detail="Insufficient permissions")
# Generate human-readable reset token and store as session with reset key
token = passphrase.generate()
await db.instance.create_session(
user_uuid=user_uuid,
key=tokens.reset_key(token),
expires=expires(),
info={"type": "device addition", "created_by_admin": True},
)
origin = global_passkey.instance.origin
url = f"{origin}/auth/{token}"
return {"url": url, "expires": expires().isoformat()}
@app.get("/auth/admin/users/{user_uuid}")
async def admin_get_user_detail(user_uuid: UUID, auth=Cookie(None)):
"""Get detailed information about a user (admin only)."""
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
try:
user_org, role_name = await db.instance.get_user_organization(user_uuid)
except ValueError:
raise HTTPException(status_code=404, detail="User not found")
if not (is_global_admin or (is_org_admin and user_org.uuid == ctx.org.uuid)):
raise HTTPException(status_code=403, detail="Insufficient permissions")
user = await db.instance.get_user_by_uuid(user_uuid)
# Gather credentials
cred_ids = await db.instance.get_credentials_by_user_uuid(user_uuid)
creds: list[dict] = []
aaguids: set[str] = set()
for cid in cred_ids:
try:
c = await db.instance.get_credential_by_id(cid)
except ValueError:
continue
aaguid_str = str(c.aaguid)
aaguids.add(aaguid_str)
creds.append(
{
"credential_uuid": str(c.uuid),
"aaguid": aaguid_str,
"created_at": c.created_at.isoformat(),
"last_used": c.last_used.isoformat() if c.last_used else None,
"last_verified": c.last_verified.isoformat() if c.last_verified else None,
"sign_count": c.sign_count,
}
)
from .. import aaguid as aaguid_mod
aaguid_info = aaguid_mod.filter(aaguids)
return {
"display_name": user.display_name,
"org": {"display_name": user_org.display_name},
"role": role_name,
"visits": user.visits,
"created_at": user.created_at.isoformat() if user.created_at else None,
"last_seen": user.last_seen.isoformat() if user.last_seen else None,
"credentials": creds,
"aaguid_info": aaguid_info,
}
# -------------------- Admin API: Permissions (global) --------------------
@app.get("/auth/admin/permissions")
async def admin_list_permissions(auth=Cookie(None)):
_, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth)
if not (is_global_admin or is_org_admin):
raise ValueError("Insufficient permissions")
perms = await db.instance.list_permissions()
return [{"id": p.id, "display_name": p.display_name} for p in perms]
@app.post("/auth/admin/permissions")
async def admin_create_permission(payload: dict = Body(...), auth=Cookie(None)):
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth)
if not is_global_admin:
raise ValueError("Global admin required")
from ..db import Permission as PermDC
perm_id = payload.get("id")
display_name = payload.get("display_name")
if not perm_id or not display_name:
raise ValueError("id and display_name are required")
await db.instance.create_permission(
PermDC(id=perm_id, display_name=display_name)
)
return {"status": "ok"}
@app.put("/auth/admin/permissions/{permission_id}")
async def admin_update_permission(
permission_id: str, payload: dict = Body(...), auth=Cookie(None)
):
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth)
if not is_global_admin:
raise ValueError("Global admin required")
from ..db import Permission as PermDC
display_name = payload.get("display_name")
if not display_name:
raise ValueError("display_name is required")
await db.instance.update_permission(
PermDC(id=permission_id, display_name=display_name)
)
return {"status": "ok"}
@app.delete("/auth/admin/permissions/{permission_id}")
async def admin_delete_permission(permission_id: str, auth=Cookie(None)):
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth)
if not is_global_admin:
raise ValueError("Global admin required")
await db.instance.delete_permission(permission_id)
return {"status": "ok"}
@app.post("/auth/logout")
async def api_logout(response: Response, auth=Cookie(None)):
"""Log out the current user by clearing the session cookie and deleting from database."""

View File

@ -1,7 +1,5 @@
import contextlib
import logging
import os
from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import Cookie, FastAPI, Request, Response
@ -16,41 +14,7 @@ from .reset import register_reset_routes
STATIC_DIR = Path(__file__).parent.parent / "frontend-build"
@asynccontextmanager
async def lifespan(app: FastAPI): # pragma: no cover - startup path
"""Application lifespan to ensure globals (DB, passkey) are initialized in each process.
We populate configuration from environment variables (set by the CLI entrypoint)
so that uvicorn reload / multiprocess workers inherit the settings.
"""
from .. import globals
rp_id = os.getenv("PASSKEY_RP_ID", "localhost")
rp_name = os.getenv("PASSKEY_RP_NAME") or None
origin = os.getenv("PASSKEY_ORIGIN") or None
default_admin = (
os.getenv("PASSKEY_DEFAULT_ADMIN") or None
) # still passed for context
default_org = os.getenv("PASSKEY_DEFAULT_ORG") or None
try:
# CLI (__main__) performs bootstrap once; here we skip to avoid duplicate work
await globals.init(
rp_id=rp_id,
rp_name=rp_name,
origin=origin,
default_admin=default_admin,
default_org=default_org,
bootstrap=False,
)
except ValueError as e:
logging.error(f"⚠️ {e}")
# Re-raise to fail fast
raise
yield
# (Optional) add shutdown cleanup here later
app = FastAPI(lifespan=lifespan)
app = FastAPI()
# Global exception handlers
@ -107,24 +71,16 @@ async def redirect_to_index():
@app.get("/auth/admin")
async def serve_admin(auth=Cookie(None)):
"""Serve the admin app entry point if an authenticated session exists.
If no valid authenticated session cookie is present, return a 401 with the
main app's index.html so the frontend can initiate login/registration flow.
"""
if auth:
with contextlib.suppress(ValueError):
s = await get_session(auth)
if s.info and s.info.get("type") == "authenticated":
return FileResponse(STATIC_DIR / "admin" / "index.html")
# Not authenticated: serve main index with 401
return FileResponse(
STATIC_DIR / "index.html",
status_code=401,
headers={"WWW-Authenticate": "Bearer"},
)
async def serve_admin():
"""Serve the admin app entry point."""
# Vite MPA builds admin as admin.html in the same outDir
admin_html = STATIC_DIR / "admin.html"
# If configured to emit admin/index.html, support that too
if not admin_html.exists():
alt = STATIC_DIR / "admin" / "index.html"
if alt.exists():
return FileResponse(alt)
return FileResponse(admin_html)
# Register API routes

View File

@ -5,7 +5,6 @@ from fastapi.responses import RedirectResponse
from ..authsession import expires, get_session
from ..globals import db
from ..globals import passkey as global_passkey
from ..util import passphrase, tokens
from . import session
@ -44,9 +43,10 @@ def register_reset_routes(app):
reset_token: str,
):
"""Verifies the token and redirects to auth app for credential registration."""
# This route should only match to exact passphrases
print(f"Reset handler called with url: {request.url.path}")
if not passphrase.is_well_formed(reset_token):
raise HTTPException(status_code=404)
origin = global_passkey.instance.origin
try:
# Get session token to validate it exists and get user_id
key = tokens.reset_key(reset_token)
@ -54,7 +54,7 @@ def register_reset_routes(app):
if not sess:
raise ValueError("Invalid or expired registration token")
response = RedirectResponse(url=f"{origin}/auth/", status_code=303)
response = RedirectResponse(url="/auth/", status_code=303)
session.set_session_cookie(response, reset_token)
return response
@ -65,4 +65,4 @@ def register_reset_routes(app):
else:
logging.exception("Internal Server Error in reset_authentication")
msg = "Internal Server Error"
return RedirectResponse(url=f"{origin}/auth/#{msg}", status_code=303)
return RedirectResponse(url=f"/auth/#{msg}", status_code=303)

View File

@ -32,15 +32,8 @@ async def init(
origin: str | None = None,
default_admin: str | None = None,
default_org: str | None = None,
*,
bootstrap: bool = True,
) -> None:
"""Initialize global passkey + database.
If bootstrap=True (default) the system bootstrap_if_needed() will be invoked.
In FastAPI lifespan we call with bootstrap=False to avoid duplicate bootstrapping
since the CLI performs it once before servers start.
"""
"""Initialize the global database, passkey instance, and bootstrap the system if needed."""
# Initialize passkey instance with provided parameters
passkey.instance = Passkey(
rp_id=rp_id,
@ -56,11 +49,10 @@ async def init(
await sql.init()
if bootstrap:
# Bootstrap system if needed
from .bootstrap import bootstrap_if_needed
# Bootstrap system if needed
from .bootstrap import bootstrap_if_needed
await bootstrap_if_needed(default_admin, default_org)
await bootstrap_if_needed(default_admin, default_org)
# Global instances

View File

@ -28,7 +28,6 @@ class CustomBuildHook(BuildHookInterface):
stderr.write("\n### npm run build\n")
subprocess.run([npm, "run", "build"], check=True) # noqa: S603
else:
assert bun
stderr.write("### bun install\n")
subprocess.run([bun, "install"], check=True) # noqa: S603
stderr.write("\n### bun run build\n")