Compare commits

..

No commits in common. "dc0b0f461322494677fd16bae2afba256d37ef4f" and "3567b7802b3024a1c7489d0aeb1af7090388281a" have entirely different histories.

14 changed files with 485 additions and 1124 deletions

364
API.md
View File

@ -1,364 +0,0 @@
# PassKey Auth API Documentation
This document describes all API endpoints available in the PassKey Auth FastAPI application.
## Base URL
- **Development**: `http://localhost:4401`
- All API endpoints are prefixed with `/auth`
## Authentication
The API uses JWT tokens stored in HTTP-only cookies for session management. Some endpoints require authentication via session cookies.
---
## HTTP API Endpoints
### User Management
#### `POST /auth/user-info`
Get detailed information about the current authenticated user and their credentials.
**Authentication**: Required (session cookie)
**Response**:
```json
{
"status": "success",
"user": {
"user_id": "string (UUID)",
"user_name": "string",
"created_at": "string (ISO 8601)",
"last_seen": "string (ISO 8601)",
"visits": "number"
},
"credentials": [
{
"credential_id": "string (hex)",
"aaguid": "string (UUID)",
"created_at": "string (ISO 8601)",
"last_used": "string (ISO 8601) | null",
"last_verified": "string (ISO 8601) | null",
"sign_count": "number",
"is_current_session": "boolean"
}
],
"aaguid_info": "object (AAGUID information)"
}
```
**Error Response**:
```json
{
"error": "Not authenticated" | "Failed to get user info: <error_message>"
}
```
---
### Session Management
#### `POST /auth/logout`
Log out the current user by clearing the session cookie.
**Authentication**: Not required
**Response**:
```json
{
"status": "success",
"message": "Logged out successfully"
}
```
#### `POST /auth/set-session`
Set session cookie using JWT token from request body or Authorization header.
**Authentication**: Not required
**Request Body** (alternative to Authorization header):
```json
{
"token": "string (JWT token)"
}
```
**Headers** (alternative to request body):
```
Authorization: Bearer <JWT_token>
```
**Response**:
```json
{
"status": "success",
"message": "Session cookie set successfully",
"user_id": "string (UUID)"
}
```
**Error Response**:
```json
{
"error": "No session token provided" | "Invalid or expired session token" | "Failed to set session: <error_message>"
}
```
#### `GET /auth/forward-auth`
Verification endpoint for use with Caddy forward_auth or Nginx auth_request.
**Authentication**: Required (session cookie)
**Success Response**:
- Status: `204 No Content`
- Headers: `x-auth-user-id: <user_id>`
**Error Response**:
- Status: `401 Unauthorized`
- Returns authentication app HTML page
- Headers: `www-authenticate: PrivateToken`
---
### Credential Management
#### `POST /auth/delete-credential`
Delete a specific passkey credential for the current user.
**Authentication**: Required (session cookie)
**Request Body**:
```json
{
"credential_id": "string (hex-encoded credential ID)"
}
```
**Response**:
```json
{
"status": "success",
"message": "Credential deleted successfully"
}
```
**Error Response**:
```json
{
"error": "Not authenticated" | "credential_id is required" | "Invalid credential_id format" | "Credential not found or access denied" | "Cannot delete current session credential" | "Cannot delete last remaining credential" | "Failed to delete credential: <error_message>"
}
```
---
### Device Addition
#### `POST /auth/create-device-link`
Generate a device addition link for authenticated users to add new passkeys to their account.
**Authentication**: Required (session cookie)
**Response**:
```json
{
"status": "success",
"message": "Device addition link generated successfully",
"addition_link": "string (URL)",
"expires_in_hours": 24
}
```
**Error Response**:
```json
{
"error": "Authentication required" | "Failed to create device addition link: <error_message>"
}
```
#### `POST /auth/validate-device-token`
Validate a device addition token and return associated user information.
**Authentication**: Not required
**Request Body**:
```json
{
"token": "string (device addition token)"
}
```
**Response**:
```json
{
"status": "success",
"valid": true,
"user_id": "string (UUID)",
"user_name": "string",
"token": "string (device addition token)"
}
```
**Error Response**:
```json
{
"error": "Device addition token is required" | "Invalid or expired device addition token" | "Device addition token has expired" | "Failed to validate device addition token: <error_message>"
}
```
---
### Static File Serving
#### `GET /auth/{passphrase}`
Handle passphrase-based authentication redirect with cookie setting.
**Parameters**:
- `passphrase`: String matching pattern `^\w+(\.\w+){2,}$` (e.g., "word1.word2.word3")
**Response**:
- Status: `303 See Other`
- Redirect to: `/`
- Sets temporary cookie: `auth-token` (expires in 2 seconds)
#### `GET /auth`
Serve the main authentication app.
**Response**: Returns the main `index.html` file for the authentication SPA.
#### `GET /auth/assets/{path}`
Serve static assets (CSS, JS, images) for the authentication app.
#### `GET /{path:path}`
Catch-all route for SPA routing. Serves `index.html` for all non-API routes when requesting HTML content.
**Response**:
- For HTML requests: Returns `index.html`
- For non-HTML requests: Returns `404 Not Found` JSON response
---
## WebSocket API Endpoints
All WebSocket endpoints are mounted under `/auth/ws/`.
### Registration
#### `WS /auth/ws/register_new`
Register a new user with a new passkey credential.
**Flow**:
1. Client connects to WebSocket
2. Server sends registration options
3. Client performs WebAuthn ceremony and sends response
4. Server validates and creates new user + credential
5. Server sends JWT token for session establishment
**Server Messages**:
```json
// Registration options
{
"rp": { "id": "localhost", "name": "Passkey Auth" },
"user": { "id": "base64", "name": "string", "displayName": "string" },
"challenge": "base64",
"pubKeyCredParams": [...],
"timeout": 60000,
"attestation": "none",
"authenticatorSelection": {...}
}
// Success response
{
"status": "success",
"message": "User registered successfully",
"token": "string (JWT)"
}
```
#### `WS /auth/ws/add_credential`
Add a new passkey credential to an existing authenticated user.
**Authentication**: Required (session cookie)
**Flow**:
1. Client connects with valid session
2. Server sends registration options for existing user
3. Client performs WebAuthn ceremony and sends response
4. Server validates and adds new credential
5. Server sends success confirmation
#### `WS /auth/ws/add_device_credential`
Add a new passkey credential using a device addition token.
**Flow**:
1. Client connects and sends device addition token
2. Server validates token and sends registration options
3. Client performs WebAuthn ceremony and sends response
4. Server validates, adds credential, and cleans up token
5. Server sends JWT token for session establishment
**Initial Client Message**:
```json
{
"token": "string (device addition token)"
}
```
### Authentication
#### `WS /auth/ws/authenticate`
Authenticate using existing passkey credentials.
**Flow**:
1. Client connects to WebSocket
2. Server sends authentication options
3. Client performs WebAuthn ceremony and sends response
4. Server validates credential and updates usage stats
5. Server sends JWT token for session establishment
**Server Messages**:
```json
// Authentication options
{
"challenge": "base64",
"timeout": 60000,
"rpId": "localhost",
"allowCredentials": [...] // Optional, for non-discoverable credentials
}
// Success response
{
"status": "success",
"message": "Authentication successful",
"token": "string (JWT)"
}
// Error response
{
"status": "error",
"message": "error description"
}
```
---
## Error Handling
All endpoints return consistent error responses:
```json
{
"error": "string (error description)"
}
```
## Security Features
- **HTTP-only Cookies**: Session tokens are stored in secure, HTTP-only cookies
- **CSRF Protection**: SameSite cookie attributes prevent CSRF attacks
- **Token Validation**: All JWT tokens are validated and automatically refreshed
- **Credential Isolation**: Users can only access and modify their own credentials
- **Time-based Expiration**: Device addition tokens expire after 24 hours
- **Rate Limiting**: WebSocket connections are limited and validated
## CORS and Headers
The application includes appropriate CORS headers and security headers for production use with reverse proxies like Caddy or Nginx.

View File

@ -5,36 +5,39 @@
<RegisterView v-if="store.currentView === 'register'" />
<ProfileView v-if="store.currentView === 'profile'" />
<DeviceLinkView v-if="store.currentView === 'device-link'" />
<AddCredentialView v-if="store.currentView === 'add-credential'" />
<AddDeviceCredentialView v-if="store.currentView === 'add-device-credential'" />
</div>
</template>
<script setup>
import { onMounted } from 'vue'
import { onMounted, ref } from 'vue'
import { useAuthStore } from '@/stores/auth'
import StatusMessage from '@/components/StatusMessage.vue'
import LoginView from '@/components/LoginView.vue'
import RegisterView from '@/components/RegisterView.vue'
import ProfileView from '@/components/ProfileView.vue'
import DeviceLinkView from '@/components/DeviceLinkView.vue'
import AddCredentialView from '@/components/AddCredentialView.vue'
import AddDeviceCredentialView from '@/components/AddDeviceCredentialView.vue'
import { getCookie } from './utils/helpers'
const store = useAuthStore()
let isLoggedIn
onMounted(async () => {
// Check for device addition session first
try {
await store.loadUserInfo()
} catch (error) {
console.log('Failed to load user info:', error)
store.currentView = 'login'
if (getCookie('auth-token')) {
store.currentView = 'add-device-credential'
return
}
if (store.currentCredentials.length) {
// User is logged in, go to profile
isLoggedIn = await store.validateStoredToken()
if (isLoggedIn) {
// User is logged in, load their data and go to profile
try {
await store.loadUserInfo()
store.currentView = 'profile'
} else if (store.currentUser) {
// User is logged in via reset link, allow adding a credential
store.currentView = 'add-credential'
} catch (error) {
console.error('Failed to load user info:', error)
store.currentView = 'login'
}
} else {
// User is not logged in, show login
store.currentView = 'login'

View File

@ -15,42 +15,30 @@
<script setup>
import { useAuthStore } from '@/stores/auth'
import { registerWithSession } from '@/utils/passkey'
import { registerWithToken } from '@/utils/passkey'
import { ref, onMounted } from 'vue'
import { getCookie } from '@/utils/helpers'
const authStore = useAuthStore()
const hasDeviceSession = ref(false)
const token = ref(null)
// Check existing session on app load
onMounted(async () => {
try {
// Check if we have a device addition session
const response = await fetch('/auth/device-session-check', {
credentials: 'include'
})
const data = await response.json()
if (data.device_addition_session) {
hasDeviceSession.value = true
} else {
authStore.showMessage('No device addition session found.', 'error')
authStore.currentView = 'login'
}
} catch (error) {
authStore.showMessage('Failed to check device addition session.', 'error')
onMounted(() => {
// Check for 'auth-token' cookie
token.value = getCookie('auth-token')
if (!token.value) {
authStore.showMessage('No registration token cookie found.', 'error')
authStore.currentView = 'login'
return
}
// Delete the cookie
document.cookie = 'auth-token=; Max-Age=0; path=/'
})
function register() {
if (!hasDeviceSession.value) {
authStore.showMessage('No valid device addition session', 'error')
return
}
authStore.isLoading = true
authStore.showMessage('Starting registration...', 'info')
registerWithSession().finally(() => {
registerWithToken(token.value).finally(() => {
authStore.isLoading = false
}).then(() => {
authStore.showMessage('Passkey registered successfully!', 'success', 2000)

View File

@ -17,12 +17,12 @@
<div v-if="authStore.isLoading">
<p>Loading credentials...</p>
</div>
<div v-else-if="authStore.currentCredentials.length === 0">
<div v-else-if="userCredentialsData.credentials.length === 0">
<p>No passkeys found.</p>
</div>
<div v-else>
<div
v-for="credential in authStore.currentCredentials"
v-for="credential in userCredentialsData.credentials"
:key="credential.credential_id"
:class="['credential-item', { 'current-session': credential.is_current_session }]"
>
@ -84,21 +84,36 @@ import { formatDate } from '@/utils/helpers'
import { registerCredential } from '@/utils/passkey'
const authStore = useAuthStore()
const currentCredentials = ref([])
const userCredentialsData = ref({ credentials: [], aaguid_info: {} })
const updateInterval = ref(null)
onMounted(async () => {
try {
await authStore.loadUserInfo()
currentCredentials.value = await authStore.loadCredentials()
} catch (error) {
authStore.showMessage(`Failed to load user info: ${error.message}`, 'error')
authStore.currentView = 'login'
return
}
// Fetch user credentials from the server
try {
const response = await fetch('/auth/user-credentials')
const result = await response.json()
console.log('Fetch Response:', result) // Log the entire response
if (result.error) throw new Error(result.error)
Object.assign(userCredentialsData.value, result) // Store the entire response
} catch (error) {
console.error('Failed to fetch user credentials:', error)
}
updateInterval.value = setInterval(() => {
// Trigger Vue reactivity to update formatDate fields
authStore.currentUser = { ...authStore.currentUser }
authStore.currentCredentials = [...authStore.currentCredentials]
userCredentialsData.value.credentials = [...userCredentialsData.value.credentials]
}, 60000) // Update every minute
})
@ -109,12 +124,12 @@ onUnmounted(() => {
})
const getCredentialAuthName = (credential) => {
const authInfo = authStore.aaguidInfo[credential.aaguid]
const authInfo = userCredentialsData.value.aaguid_info[credential.aaguid]
return authInfo ? authInfo.name : 'Unknown Authenticator'
}
const getCredentialAuthIcon = (credential) => {
const authInfo = authStore.aaguidInfo[credential.aaguid]
const authInfo = userCredentialsData.value.aaguid_info[credential.aaguid]
if (!authInfo) return null
const isDarkMode = window.matchMedia && window.matchMedia('(prefers-color-scheme: dark)').matches
@ -127,7 +142,7 @@ const addNewCredential = async () => {
authStore.isLoading = true
authStore.showMessage('Adding new passkey...', 'info')
const result = await registerCredential()
await authStore.loadUserInfo()
currentCredentials.value = await authStore.loadCredentials()
authStore.showMessage('New passkey added successfully!', 'success', 3000)
} catch (error) {
console.error('Failed to add new passkey:', error)
@ -142,6 +157,7 @@ const deleteCredential = async (credentialId) => {
try {
await authStore.deleteCredential(credentialId)
currentCredentials.value = await authStore.loadCredentials()
authStore.showMessage('Passkey deleted successfully!', 'success', 3000)
} catch (error) {
authStore.showMessage(`Failed to delete passkey: ${error.message}`, 'error')

View File

@ -5,8 +5,6 @@ export const useAuthStore = defineStore('auth', {
state: () => ({
// Auth State
currentUser: null,
currentCredentials: [],
aaguidInfo: {},
isLoading: false,
// UI State
@ -30,6 +28,15 @@ export const useAuthStore = defineStore('auth', {
}, duration)
}
},
async validateStoredToken() {
try {
const response = await fetch('/auth/validate-token')
const result = await response.json()
return result.status === 'success'
} catch (error) {
return false
}
},
async setSessionCookie(sessionToken) {
const response = await fetch('/auth/set-session', {
method: 'POST',
@ -75,13 +82,24 @@ export const useAuthStore = defineStore('auth', {
}
},
async loadUserInfo() {
const response = await fetch('/auth/user-info', {method: 'POST'})
const response = await fetch('/auth/user-info')
const result = await response.json()
if (result.error) throw new Error(`Server: ${result.error}`)
this.currentUser = result.user
this.currentCredentials = result.credentials || []
this.aaguidInfo = result.aaguid_info || {}
},
async loadCredentials() {
this.isLoading = true
try {
const response = await fetch('/auth/user-credentials')
const result = await response.json()
if (result.error) throw new Error(`Server: ${result.error}`)
this.currentCredentials = result.credentials
this.aaguidInfo = result.aaguid_info || {}
} finally {
this.isLoading = false
}
},
async deleteCredential(credentialId) {
const response = await fetch('/auth/delete-credential', {
@ -94,7 +112,7 @@ export const useAuthStore = defineStore('auth', {
const result = await response.json()
if (result.error) throw new Error(`Server: ${result.error}`)
await this.loadUserInfo()
await this.loadCredentials()
},
async logout() {
try {

View File

@ -24,9 +24,6 @@ export async function registerCredential() {
export async function registerWithToken(token) {
return register('/auth/ws/add_device_credential', { token })
}
export async function registerWithSession() {
return register('/auth/ws/add_device_credential_session')
}
export async function authenticateUser() {
const ws = await aWebSocket('/auth/ws/authenticate')

View File

@ -21,7 +21,7 @@ from sqlalchemy import (
select,
update,
)
from sqlalchemy.dialects.sqlite import BLOB, JSON
from sqlalchemy.dialects.sqlite import BLOB
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
@ -52,8 +52,8 @@ class UserModel(Base):
class CredentialModel(Base):
__tablename__ = "credentials"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
credential_id: Mapped[bytes] = mapped_column(LargeBinary(64), unique=True)
credential_id: Mapped[bytes] = mapped_column(LargeBinary(64), primary_key=True)
user_id: Mapped[bytes] = mapped_column(
LargeBinary(16), ForeignKey("users.user_id", ondelete="CASCADE")
)
@ -68,18 +68,14 @@ class CredentialModel(Base):
user: Mapped["UserModel"] = relationship("UserModel", back_populates="credentials")
class SessionModel(Base):
__tablename__ = "sessions"
class ResetTokenModel(Base):
__tablename__ = "reset_tokens"
token: Mapped[str] = mapped_column(String(32), primary_key=True)
token: Mapped[str] = mapped_column(String(64), primary_key=True)
user_id: Mapped[bytes] = mapped_column(
LargeBinary(16), ForeignKey("users.user_id", ondelete="CASCADE")
)
credential_id: Mapped[int | None] = mapped_column(
Integer, ForeignKey("credentials.id", ondelete="SET NULL")
)
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now)
info: Mapped[dict | None] = mapped_column(JSON, nullable=True)
# Relationship to user
user: Mapped["UserModel"] = relationship("UserModel")
@ -94,6 +90,13 @@ class User:
visits: int = 0
@dataclass
class ResetToken:
token: str
user_id: UUID
created_at: datetime
# Global engine and session factory
engine = create_async_engine(DB_PATH, echo=False)
async_session_factory = async_sessionmaker(engine, expire_on_commit=False)
@ -240,6 +243,45 @@ class DB:
await self.session.execute(stmt)
await self.session.commit()
async def create_reset_token(self, user_id: UUID, token: str | None = None) -> str:
"""Create a new reset token for a user."""
if token is None:
token = secrets.token_urlsafe(32)
reset_token_model = ResetTokenModel(
token=token,
user_id=user_id.bytes,
created_at=datetime.now(),
)
self.session.add(reset_token_model)
await self.session.flush()
return token
async def get_reset_token(self, token: str) -> ResetToken | None:
"""Get reset token by token string."""
stmt = select(ResetTokenModel).where(ResetTokenModel.token == token)
result = await self.session.execute(stmt)
token_model = result.scalar_one_or_none()
if token_model:
return ResetToken(
token=token_model.token,
user_id=UUID(bytes=token_model.user_id),
created_at=token_model.created_at,
)
return None
async def delete_reset_token(self, token: str) -> None:
"""Delete a reset token (used after successful credential addition)."""
stmt = delete(ResetTokenModel).where(ResetTokenModel.token == token)
await self.session.execute(stmt)
async def cleanup_expired_tokens(self) -> None:
"""Remove expired reset tokens (older than 24 hours)."""
expiry_time = datetime.now() - timedelta(hours=24)
stmt = delete(ResetTokenModel).where(ResetTokenModel.created_at < expiry_time)
await self.session.execute(stmt)
async def get_user_by_username(self, user_name: str) -> User | None:
"""Get user by username."""
stmt = select(UserModel).where(UserModel.user_name == user_name)
@ -256,98 +298,6 @@ class DB:
)
return None
async def create_session(
self,
user_id: UUID,
credential_id: int | None = None,
token: str | None = None,
info: dict | None = None,
) -> str:
"""Create a new authentication session for a user. If credential_id is None, creates a session without a specific credential."""
if token is None:
token = secrets.token_urlsafe(12)
session_model = SessionModel(
token=token,
user_id=user_id.bytes,
credential_id=credential_id,
created_at=datetime.now(),
info=info,
)
self.session.add(session_model)
await self.session.flush()
return token
async def create_session_by_credential_id(
self,
user_id: UUID,
credential_id: bytes | None,
token: str | None = None,
info: dict | None = None,
) -> str:
"""Create a new authentication session for a user using WebAuthn credential ID. If credential_id is None, creates a session without a specific credential."""
if credential_id is None:
return await self.create_session(user_id, None, token, info)
# Get the database ID from the credential
stmt = select(CredentialModel.id).where(
CredentialModel.credential_id == credential_id
)
result = await self.session.execute(stmt)
db_credential_id = result.scalar_one()
return await self.create_session(user_id, db_credential_id, token, info)
async def get_session(self, token: str) -> dict | None:
"""Get session by token string."""
stmt = select(SessionModel).where(SessionModel.token == token)
result = await self.session.execute(stmt)
session_model = result.scalar_one_or_none()
if session_model:
# Check if session is expired (24 hours)
expiry_time = session_model.created_at + timedelta(hours=24)
if datetime.now() > expiry_time:
# Clean up expired session
await self.delete_session(token)
return None
return {
"token": session_model.token,
"user_id": UUID(bytes=session_model.user_id),
"credential_id": session_model.credential_id,
"created_at": session_model.created_at,
"info": session_model.info or {},
}
return None
async def delete_session(self, token: str) -> None:
"""Delete a session by token."""
stmt = delete(SessionModel).where(SessionModel.token == token)
await self.session.execute(stmt)
async def cleanup_expired_sessions(self) -> None:
"""Remove expired sessions (older than 24 hours)."""
expiry_time = datetime.now() - timedelta(hours=24)
stmt = delete(SessionModel).where(SessionModel.created_at < expiry_time)
await self.session.execute(stmt)
async def refresh_session(self, token: str) -> str | None:
"""Refresh a session by updating its created_at timestamp."""
session_data = await self.get_session(token)
if not session_data:
return None
# Delete old session
await self.delete_session(token)
# Create new session with same user and credential
return await self.create_session(
session_data["user_id"],
session_data["credential_id"],
info=session_data["info"],
)
# Standalone functions that handle database connections internally
async def init_database() -> None:
@ -408,55 +358,31 @@ async def create_new_session(user_id: UUID, credential: StoredCredential) -> Non
await db.create_new_session(user_id, credential)
async def create_reset_token(user_id: UUID, token: str | None = None) -> str:
"""Create a reset token for a user."""
async with connect() as db:
return await db.create_reset_token(user_id, token)
async def get_reset_token(token: str) -> ResetToken | None:
"""Get reset token by token string."""
async with connect() as db:
return await db.get_reset_token(token)
async def delete_reset_token(token: str) -> None:
"""Delete a reset token (used after successful credential addition)."""
async with connect() as db:
await db.delete_reset_token(token)
async def cleanup_expired_tokens() -> None:
"""Remove expired reset tokens (older than 24 hours)."""
async with connect() as db:
await db.cleanup_expired_tokens()
async def get_user_by_username(user_name: str) -> User | None:
"""Get user by username."""
async with connect() as db:
return await db.get_user_by_username(user_name)
async def create_session(
user_id: UUID,
credential_id: int | None = None,
token: str | None = None,
info: dict | None = None,
) -> str:
"""Create a new authentication session for a user. If credential_id is None, creates a session without a specific credential."""
async with connect() as db:
return await db.create_session(user_id, credential_id, token, info)
async def create_session_by_credential_id(
user_id: UUID,
credential_id: bytes | None,
token: str | None = None,
info: dict | None = None,
) -> str:
"""Create a new authentication session for a user using WebAuthn credential ID. If credential_id is None, creates a session without a specific credential."""
async with connect() as db:
return await db.create_session_by_credential_id(
user_id, credential_id, token, info
)
async def get_session(token: str) -> dict | None:
"""Get session by token string."""
async with connect() as db:
return await db.get_session(token)
async def delete_session(token: str) -> None:
"""Delete a session by token."""
async with connect() as db:
await db.delete_session(token)
async def cleanup_expired_sessions() -> None:
"""Remove expired sessions (older than 24 hours)."""
async with connect() as db:
await db.cleanup_expired_sessions()
async def refresh_session(token: str) -> str | None:
"""Refresh a session by updating its created_at timestamp."""
async with connect() as db:
return await db.refresh_session(token)

View File

@ -8,12 +8,12 @@ This module contains all the HTTP API endpoints for:
- Login/logout functionality
"""
from fastapi import FastAPI, Request, Response
from fastapi import Request, Response
from .. import aaguid
from ..db import sql
from ..util.session import refresh_session_token, validate_session_token
from .session import (
from ..util.jwt import refresh_session_token, validate_session_token
from .session_manager import (
clear_session_cookie,
get_current_user,
get_session_token_from_bearer,
@ -22,271 +22,213 @@ from .session import (
)
def register_api_routes(app: FastAPI):
"""Register all API routes on the FastAPI app."""
async def get_user_info(request: Request) -> dict:
"""Get user information from session cookie."""
try:
user = await get_current_user(request)
if not user:
return {"error": "Not authenticated"}
@app.post("/auth/user-info")
async def api_user_info(request: Request, response: Response):
"""Get user information and credentials from session cookie."""
try:
user = await get_current_user(request)
if not user:
return {"error": "Not authenticated"}
return {
"status": "success",
"user": {
"user_id": str(user.user_id),
"user_name": user.user_name,
"created_at": user.created_at.isoformat() if user.created_at else None,
"last_seen": user.last_seen.isoformat() if user.last_seen else None,
"visits": user.visits,
},
}
except Exception as e:
return {"error": f"Failed to get user info: {str(e)}"}
# Get current session credential ID
current_credential_id = None
session_token = get_session_token_from_cookie(request)
if session_token:
token_data = await validate_session_token(session_token)
if token_data:
current_credential_id = token_data.get("credential_id")
# Get all credentials for the user
credential_ids = await sql.get_user_credentials(user.user_id)
async def get_user_credentials(request: Request) -> dict:
"""Get all credentials for a user using session cookie."""
try:
user = await get_current_user(request)
if not user:
return {"error": "Not authenticated"}
credentials = []
user_aaguids = set()
for cred_id in credential_ids:
stored_cred = await sql.get_credential_by_id(cred_id)
# Convert AAGUID to string format
aaguid_str = str(stored_cred.aaguid)
user_aaguids.add(aaguid_str)
# Check if this is the current session credential
is_current_session = current_credential_id == stored_cred.credential_id
credentials.append(
{
"credential_id": stored_cred.credential_id.hex(),
"aaguid": aaguid_str,
"created_at": stored_cred.created_at.isoformat(),
"last_used": stored_cred.last_used.isoformat()
if stored_cred.last_used
else None,
"last_verified": stored_cred.last_verified.isoformat()
if stored_cred.last_verified
else None,
"sign_count": stored_cred.sign_count,
"is_current_session": is_current_session,
}
)
# Get AAGUID information for only the AAGUIDs that the user has
aaguid_info = aaguid.filter(user_aaguids)
# Sort credentials by creation date (earliest first, most recently created last)
credentials.sort(key=lambda cred: cred["created_at"])
return {
"status": "success",
"user": {
"user_id": str(user.user_id),
"user_name": user.user_name,
"created_at": user.created_at.isoformat()
if user.created_at
else None,
"last_seen": user.last_seen.isoformat() if user.last_seen else None,
"visits": user.visits,
},
"credentials": credentials,
"aaguid_info": aaguid_info,
}
except Exception as e:
return {"error": f"Failed to get user info: {str(e)}"}
@app.post("/auth/logout")
async def api_logout(request: Request, response: Response):
"""Log out the current user by clearing the session cookie and deleting from database."""
# Get the session token before clearing the cookie
# Get current session credential ID
current_credential_id = None
session_token = get_session_token_from_cookie(request)
# Clear the cookie
clear_session_cookie(response)
# Delete the session from the database if it exists
if session_token:
from ..util.session import logout_session
token_data = validate_session_token(session_token)
if token_data:
current_credential_id = token_data.get("credential_id")
try:
await logout_session(session_token)
except Exception:
# Continue even if session deletion fails
pass
# Get all credentials for the user
credential_ids = await sql.get_user_credentials(user.user_id)
return {"status": "success", "message": "Logged out successfully"}
credentials = []
user_aaguids = set()
@app.post("/auth/set-session")
async def api_set_session(request: Request, response: Response):
"""Set session cookie using JWT token from request body or Authorization header."""
try:
session_token = await get_session_token_from_bearer(request)
for cred_id in credential_ids:
stored_cred = await sql.get_credential_by_id(cred_id)
if not session_token:
return {"error": "No session token provided"}
# Validate the session token
token_data = await validate_session_token(session_token)
if not token_data:
return {"error": "Invalid or expired session token"}
# Set the HTTP-only cookie
set_session_cookie(response, session_token)
return {
"status": "success",
"message": "Session cookie set successfully",
"user_id": str(token_data["user_id"]),
}
except Exception as e:
return {"error": f"Failed to set session: {str(e)}"}
@app.post("/auth/delete-credential")
async def api_delete_credential(request: Request):
"""Delete a specific credential for the current user."""
try:
user = await get_current_user(request)
if not user:
return {"error": "Not authenticated"}
# Get the credential ID from the request body
try:
body = await request.json()
credential_id = body.get("credential_id")
if not credential_id:
return {"error": "credential_id is required"}
except Exception:
return {"error": "Invalid request body"}
# Convert credential_id from hex string to bytes
try:
credential_id_bytes = bytes.fromhex(credential_id)
except ValueError:
return {"error": "Invalid credential_id format"}
# First, verify the credential belongs to the current user
try:
stored_cred = await sql.get_credential_by_id(credential_id_bytes)
if stored_cred.user_id != user.user_id:
return {"error": "Credential not found or access denied"}
except ValueError:
return {"error": "Credential not found"}
# Convert AAGUID to string format
aaguid_str = str(stored_cred.aaguid)
user_aaguids.add(aaguid_str)
# Check if this is the current session credential
session_token = get_session_token_from_cookie(request)
if session_token:
token_data = await validate_session_token(session_token)
if (
token_data
and token_data.get("credential_id") == credential_id_bytes
):
return {"error": "Cannot delete current session credential"}
is_current_session = current_credential_id == stored_cred.credential_id
# Get user's remaining credentials count
remaining_credentials = await sql.get_user_credentials(user.user_id)
if len(remaining_credentials) <= 1:
return {"error": "Cannot delete last remaining credential"}
credentials.append(
{
"credential_id": stored_cred.credential_id.hex(),
"aaguid": aaguid_str,
"created_at": stored_cred.created_at.isoformat(),
"last_used": stored_cred.last_used.isoformat()
if stored_cred.last_used
else None,
"last_verified": stored_cred.last_verified.isoformat()
if stored_cred.last_verified
else None,
"sign_count": stored_cred.sign_count,
"is_current_session": is_current_session,
}
)
# Delete the credential
await sql.delete_user_credential(credential_id_bytes)
# Get AAGUID information for only the AAGUIDs that the user has
aaguid_info = aaguid.filter(user_aaguids)
return {"status": "success", "message": "Credential deleted successfully"}
# Sort credentials by creation date (earliest first, most recently created last)
credentials.sort(key=lambda cred: cred["created_at"])
except Exception as e:
return {"error": f"Failed to delete credential: {str(e)}"}
@app.get("/auth/sessions")
async def api_get_sessions(request: Request):
"""Get all active sessions for the current user."""
try:
user = await get_current_user(request)
if not user:
return {"error": "Authentication required"}
# Get all sessions for this user
from sqlalchemy import select
from ..db.sql import SessionModel, connect
async with connect() as db:
stmt = select(SessionModel).where(
SessionModel.user_id == user.user_id.bytes
)
result = await db.session.execute(stmt)
session_models = result.scalars().all()
sessions = []
current_token = get_session_token_from_cookie(request)
for session in session_models:
# Check if session is expired
from datetime import datetime, timedelta
expiry_time = session.created_at + timedelta(hours=24)
is_expired = datetime.now() > expiry_time
sessions.append(
{
"token": session.token[:8]
+ "...", # Only show first 8 chars for security
"created_at": session.created_at.isoformat(),
"client_ip": session.info.get("client_ip")
if session.info
else None,
"user_agent": session.info.get("user_agent")
if session.info
else None,
"connection_type": session.info.get(
"connection_type", "http"
)
if session.info
else "http",
"is_current": session.token == current_token,
"is_reset_token": session.credential_id is None,
"is_expired": is_expired,
}
)
return {
"status": "success",
"sessions": sessions,
"total_sessions": len(sessions),
}
except Exception as e:
return {"error": f"Failed to get sessions: {str(e)}"}
return {
"status": "success",
"credentials": credentials,
"aaguid_info": aaguid_info,
}
except Exception as e:
return {"error": f"Failed to get credentials: {str(e)}"}
async def validate_token(request: Request, response: Response) -> dict:
"""Validate a session token and return user info. Also refreshes the token if valid."""
async def refresh_token(request: Request, response: Response) -> dict:
"""Refresh the session token."""
try:
session_token = get_session_token_from_cookie(request)
if not session_token:
return {"error": "No session token found"}
# Validate and refresh the token
new_token = refresh_session_token(session_token)
if new_token:
set_session_cookie(response, new_token)
return {"status": "success", "refreshed": True}
else:
clear_session_cookie(response)
return {"error": "Invalid or expired session token"}
except Exception as e:
return {"error": f"Failed to refresh token: {str(e)}"}
async def validate_token(request: Request) -> dict:
"""Validate a session token and return user info."""
try:
session_token = get_session_token_from_cookie(request)
if not session_token:
return {"error": "No session token found"}
# Validate the session token
token_data = await validate_session_token(session_token)
token_data = validate_session_token(session_token)
if not token_data:
clear_session_cookie(response)
return {"error": "Invalid or expired session token"}
# Refresh the token if valid
new_token = await refresh_session_token(session_token)
if new_token:
set_session_cookie(response, new_token)
return {
"status": "success",
"valid": True,
"refreshed": bool(new_token),
"user_id": str(token_data["user_id"]),
"credential_id": token_data["credential_id"].hex()
if token_data["credential_id"]
else None,
"created_at": token_data["created_at"].isoformat(),
"credential_id": token_data["credential_id"].hex(),
"issued_at": token_data["issued_at"],
"expires_at": token_data["expires_at"],
}
except Exception as e:
return {"error": f"Failed to validate token: {str(e)}"}
async def logout(response: Response) -> dict:
"""Log out the current user by clearing the session cookie."""
clear_session_cookie(response)
return {"status": "success", "message": "Logged out successfully"}
async def set_session(request: Request, response: Response) -> dict:
"""Set session cookie using JWT token from request body or Authorization header."""
try:
session_token = await get_session_token_from_bearer(request)
if not session_token:
return {"error": "No session token provided"}
# Validate the session token
token_data = validate_session_token(session_token)
if not token_data:
return {"error": "Invalid or expired session token"}
# Set the HTTP-only cookie
set_session_cookie(response, session_token)
return {
"status": "success",
"message": "Session cookie set successfully",
"user_id": str(token_data["user_id"]),
}
except Exception as e:
return {"error": f"Failed to set session: {str(e)}"}
async def delete_credential(request: Request) -> dict:
"""Delete a specific credential for the current user."""
try:
user = await get_current_user(request)
if not user:
return {"error": "Not authenticated"}
# Get the credential ID from the request body
try:
body = await request.json()
credential_id = body.get("credential_id")
if not credential_id:
return {"error": "credential_id is required"}
except Exception:
return {"error": "Invalid request body"}
# Convert credential_id from hex string to bytes
try:
credential_id_bytes = bytes.fromhex(credential_id)
except ValueError:
return {"error": "Invalid credential_id format"}
# First, verify the credential belongs to the current user
try:
stored_cred = await sql.get_credential_by_id(credential_id_bytes)
if stored_cred.user_id != user.user_id:
return {"error": "Credential not found or access denied"}
except ValueError:
return {"error": "Credential not found"}
# Check if this is the current session credential
session_token = get_session_token_from_cookie(request)
if session_token:
token_data = validate_session_token(session_token)
if token_data and token_data.get("credential_id") == credential_id_bytes:
return {"error": "Cannot delete current session credential"}
# Get user's remaining credentials count
remaining_credentials = await sql.get_user_credentials(user.user_id)
if len(remaining_credentials) <= 1:
return {"error": "Cannot delete last remaining credential"}
# Delete the credential
await sql.delete_user_credential(credential_id_bytes)
return {"status": "success", "message": "Credential deleted successfully"}
except Exception as e:
return {"error": f"Failed to delete credential: {str(e)}"}

View File

@ -18,18 +18,26 @@ from fastapi import (
Request,
Response,
)
from fastapi import (
Path as FastAPIPath,
)
from fastapi.responses import (
FileResponse,
JSONResponse,
RedirectResponse,
)
from fastapi.staticfiles import StaticFiles
from ..db import sql
from .api_handlers import (
register_api_routes,
delete_credential,
get_user_credentials,
get_user_info,
logout,
refresh_token,
set_session,
validate_token,
)
from .reset_handlers import register_reset_routes
from .reset_handlers import create_device_addition_link, validate_device_addition_token
from .ws_handlers import ws_app
STATIC_DIR = Path(__file__).parent.parent / "frontend-build"
@ -41,23 +49,43 @@ async def lifespan(app: FastAPI):
yield
app = FastAPI(lifespan=lifespan)
app = FastAPI(title="Passkey Auth", lifespan=lifespan)
# Mount the WebSocket subapp
app.mount("/auth/ws", ws_app)
# Register API routes
register_api_routes(app)
register_reset_routes(app)
@app.get("/auth/user-info")
async def api_get_user_info(request: Request):
"""Get user information from session cookie."""
return await get_user_info(request)
@app.get("/auth/user-credentials")
async def api_get_user_credentials(request: Request):
"""Get all credentials for a user using session cookie."""
return await get_user_credentials(request)
@app.post("/auth/refresh-token")
async def api_refresh_token(request: Request, response: Response):
"""Refresh the session token."""
return await refresh_token(request, response)
@app.get("/auth/validate-token")
async def api_validate_token(request: Request):
"""Validate a session token and return user info."""
return await validate_token(request)
@app.get("/auth/forward-auth")
async def forward_authentication(request: Request):
"""A verification endpoint to use with Caddy forward_auth or Nginx auth_request."""
# Create a dummy response object for internal validation (we won't use it for cookies)
response = Response()
result = await validate_token(request, response)
result = await validate_token(request)
if result.get("status") != "success":
# Serve the index.html of the authentication app if not authenticated
return FileResponse(
@ -73,6 +101,52 @@ async def forward_authentication(request: Request):
)
@app.post("/auth/logout")
async def api_logout(response: Response):
"""Log out the current user by clearing the session cookie."""
return await logout(response)
@app.post("/auth/set-session")
async def api_set_session(request: Request, response: Response):
"""Set session cookie using JWT token from request body or Authorization header."""
return await set_session(request, response)
@app.post("/auth/delete-credential")
async def api_delete_credential(request: Request):
"""Delete a specific credential for the current user."""
return await delete_credential(request)
@app.post("/auth/create-device-link")
async def api_create_device_link(request: Request):
"""Create a device addition link for the authenticated user."""
return await create_device_addition_link(request)
@app.post("/auth/validate-device-token")
async def api_validate_device_token(request: Request):
"""Validate a device addition token."""
return await validate_device_addition_token(request)
@app.get("/auth/{passphrase}")
async def reset_authentication(
passphrase: str = FastAPIPath(pattern=r"^\w+(\.\w+){2,}$"),
):
response = RedirectResponse(url="/", status_code=303)
response.set_cookie(
key="auth-token",
value=passphrase,
httponly=False,
secure=True,
samesite="strict",
max_age=2,
)
return response
# Serve static files
app.mount(
"/auth/assets", StaticFiles(directory=STATIC_DIR / "assets"), name="static assets"
@ -90,7 +164,7 @@ async def redirect_to_index():
async def spa_handler(request: Request, path: str):
"""Serve the Vue SPA for all routes (except API and static)"""
if "text/html" not in request.headers.get("accept", ""):
return JSONResponse({"error": "Not Found"}, status_code=404)
return Response(content="Not Found", status_code=404)
return FileResponse(STATIC_DIR / "index.html")

View File

@ -7,101 +7,92 @@ This module provides endpoints for authenticated users to:
- Add new passkeys to existing accounts via tokens
"""
from fastapi import FastAPI, Path, Request
from fastapi.responses import RedirectResponse
from datetime import datetime, timedelta
from fastapi import Request
from ..db import sql
from ..util.passphrase import generate
from ..util.session import get_client_info
from .session import get_current_user, is_device_addition_session, set_session_cookie
from .session_manager import get_current_user
def register_reset_routes(app: FastAPI):
"""Register all device addition/reset routes on the FastAPI app."""
async def create_device_addition_link(request: Request) -> dict:
"""Create a device addition link for the authenticated user."""
try:
# Require authentication
user = await get_current_user(request)
if not user:
return {"error": "Authentication required"}
@app.post("/auth/create-device-link")
async def api_create_device_link(request: Request):
"""Create a device addition link for the authenticated user."""
try:
# Require authentication
user = await get_current_user(request)
if not user:
return {"error": "Authentication required"}
# Generate a human-readable token
token = generate(n=4, sep=".") # e.g., "able-ocean-forest-dawn"
# Generate a human-readable token
token = generate(n=4, sep=".") # e.g., "able-ocean-forest-dawn"
# Create reset token in database
await sql.create_reset_token(user.user_id, token)
# Create session token in database with credential_id=None for device addition
client_info = get_client_info(request)
await sql.create_session(user.user_id, None, token, client_info)
# Generate the device addition link with pretty URL
addition_link = f"{request.headers.get('origin', '')}/auth/{token}"
# Generate the device addition link with pretty URL
addition_link = f"{request.headers.get('origin', '')}/auth/{token}"
return {
"status": "success",
"message": "Device addition link generated successfully",
"addition_link": addition_link,
"expires_in_hours": 24,
}
return {
"status": "success",
"message": "Device addition link generated successfully",
"addition_link": addition_link,
"expires_in_hours": 24,
}
except Exception as e:
return {"error": f"Failed to create device addition link: {str(e)}"}
except Exception as e:
return {"error": f"Failed to create device addition link: {str(e)}"}
@app.get("/auth/device-session-check")
async def check_device_session(request: Request):
"""Check if the current session is for device addition."""
is_device_session = await is_device_addition_session(request)
return {"device_addition_session": is_device_session}
async def validate_device_addition_token(request: Request) -> dict:
"""Validate a device addition token and return user info."""
try:
body = await request.json()
token = body.get("token")
@app.get("/auth/{passphrase}")
async def reset_authentication(
request: Request,
passphrase: str = Path(pattern=r"^\w+(\.\w+){2,}$"),
):
try:
# Get session token to validate it exists and get user_id
session_data = await sql.get_session(passphrase)
if not session_data:
# Token doesn't exist, redirect to home
return RedirectResponse(url="/", status_code=303)
if not token:
return {"error": "Device addition token is required"}
# Check if this is a device addition session (credential_id is None)
if session_data["credential_id"] is not None:
# Not a device addition session, redirect to home
return RedirectResponse(url="/", status_code=303)
# Get reset token
reset_token = await sql.get_reset_token(token)
if not reset_token:
return {"error": "Invalid or expired device addition token"}
# Create a device addition session token for the user
client_info = get_client_info(request)
session_token = await sql.create_session(
session_data["user_id"], None, None, client_info
)
# Check if token is expired (24 hours)
expiry_time = reset_token.created_at + timedelta(hours=24)
if datetime.now() > expiry_time:
return {"error": "Device addition token has expired"}
# Create response and set session cookie
response = RedirectResponse(url="/auth/", status_code=303)
set_session_cookie(response, session_token)
# Get user info
user = await sql.get_user_by_id(reset_token.user_id)
return response
return {
"status": "success",
"valid": True,
"user_id": str(user.user_id),
"user_name": user.user_name,
"token": token,
}
except Exception:
# On any error, redirect to home
return RedirectResponse(url="/", status_code=303)
except Exception as e:
return {"error": f"Failed to validate device addition token: {str(e)}"}
async def use_device_addition_token(token: str) -> dict:
"""Delete a device addition token after successful use."""
try:
# Get session token first to validate it exists and is not expired
session_data = await sql.get_session(token)
if not session_data:
# Get reset token first to validate it exists and is not expired
reset_token = await sql.get_reset_token(token)
if not reset_token:
return {"error": "Invalid or expired device addition token"}
# Check if this is a device addition session (credential_id is None)
if session_data["credential_id"] is not None:
return {"error": "Invalid device addition token"}
# Check if token is expired (24 hours)
expiry_time = reset_token.created_at + timedelta(hours=24)
if datetime.now() > expiry_time:
return {"error": "Device addition token has expired"}
# Delete the token (it's now used)
await sql.delete_session(token)
await sql.delete_reset_token(token)
return {
"status": "success",

View File

@ -12,7 +12,7 @@ from uuid import UUID
from fastapi import Request, Response
from ..db.sql import User, get_user_by_id
from ..util.session import validate_session_token
from ..util.jwt import validate_session_token
COOKIE_NAME = "auth"
COOKIE_MAX_AGE = 86400 # 24 hours
@ -24,7 +24,7 @@ async def get_current_user(request: Request) -> User | None:
if not session_token:
return None
token_data = await validate_session_token(session_token)
token_data = validate_session_token(session_token)
if not token_data:
return None
@ -63,7 +63,7 @@ async def validate_session_from_request(request: Request) -> dict | None:
if not session_token:
return None
return await validate_session_token(session_token)
return validate_session_token(session_token)
async def get_session_token_from_bearer(request: Request) -> str | None:
@ -91,58 +91,8 @@ async def get_user_from_cookie_string(cookie_header: str) -> UUID | None:
if not session_token:
return None
token_data = await validate_session_token(session_token)
token_data = validate_session_token(session_token)
if not token_data:
return None
return token_data["user_id"]
async def is_device_addition_session(request: Request) -> bool:
"""Check if the current session is for device addition."""
session_token = request.cookies.get(COOKIE_NAME)
if not session_token:
return False
token_data = await validate_session_token(session_token)
if not token_data:
return False
return token_data.get("device_addition", False)
async def get_device_addition_user_id(request: Request) -> UUID | None:
"""Get user ID from device addition session."""
session_token = request.cookies.get(COOKIE_NAME)
if not session_token:
return None
token_data = await validate_session_token(session_token)
if not token_data or not token_data.get("device_addition"):
return None
return token_data.get("user_id")
async def get_device_addition_user_id_from_cookie(cookie_header: str) -> UUID | None:
"""Parse cookie header and return user ID if valid device addition session exists."""
if not cookie_header:
return None
# Parse cookies from header (simple implementation)
cookies = {}
for cookie in cookie_header.split(";"):
cookie = cookie.strip()
if "=" in cookie:
name, value = cookie.split("=", 1)
cookies[name] = value
session_token = cookies.get(COOKIE_NAME)
if not session_token:
return None
token_data = await validate_session_token(session_token)
if not token_data or not token_data.get("device_addition"):
return None
return token_data["user_id"]

View File

@ -19,8 +19,8 @@ from webauthn.helpers.exceptions import InvalidAuthenticationResponse
from ..db import sql
from ..db.sql import User
from ..sansio import Passkey
from ..util.session import create_session_token, get_client_info_from_websocket
from .session import get_user_from_cookie_string
from ..util.jwt import create_session_token
from .session_manager import get_user_from_cookie_string
# Create a FastAPI subapp for WebSocket endpoints
ws_app = FastAPI()
@ -69,10 +69,7 @@ async def websocket_register_new(ws: WebSocket, user_name: str):
)
# Create a session token for the new user
client_info = get_client_info_from_websocket(ws)
session_token = await create_session_token(
user_id, credential.credential_id, client_info
)
session_token = create_session_token(user_id, credential.credential_id)
await ws.send_json(
{
@ -184,56 +181,6 @@ async def websocket_add_device_credential(ws: WebSocket, token: str):
await ws.send_json({"error": "Internal Server Error"})
@ws_app.websocket("/add_device_credential_session")
async def websocket_add_device_credential_session(ws: WebSocket):
"""Add a new credential for an existing user via device addition session."""
await ws.accept()
origin = ws.headers.get("origin")
try:
# Get device addition user ID from session cookie
cookie_header = ws.headers.get("cookie", "")
from .session import get_device_addition_user_id_from_cookie
user_id = await get_device_addition_user_id_from_cookie(cookie_header)
if not user_id:
await ws.send_json({"error": "No valid device addition session found"})
return
# Get user information
user = await sql.get_user_by_id(user_id)
if not user:
await ws.send_json({"error": "User not found"})
return
# WebAuthn registration
# Fetch challenge IDs for the user
challenge_ids = await sql.get_user_credentials(user_id)
credential = await register_chat(
ws, user_id, user.user_name, challenge_ids, origin=origin
)
# Store the new credential in the database
await sql.create_credential_for_user(credential)
await ws.send_json(
{
"status": "success",
"user_id": str(user_id),
"credential_id": credential.credential_id.hex(),
"message": "New credential added successfully via device addition session",
}
)
except ValueError as e:
await ws.send_json({"error": str(e)})
except WebSocketDisconnect:
pass
except Exception:
logging.exception("Internal Server Error")
await ws.send_json({"error": "Internal Server Error"})
@ws_app.websocket("/authenticate")
async def websocket_authenticate(ws: WebSocket):
await ws.accept()
@ -251,9 +198,8 @@ async def websocket_authenticate(ws: WebSocket):
await sql.login_user(stored_cred.user_id, stored_cred)
# Create a session token for the authenticated user
client_info = get_client_info_from_websocket(ws)
session_token = await create_session_token(
stored_cred.user_id, stored_cred.credential_id, client_info
session_token = create_session_token(
stored_cred.user_id, stored_cred.credential_id
)
await ws.send_json(

View File

@ -58,28 +58,6 @@ class JWTManager:
return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
def create_token_without_credential(self, user_id: UUID) -> str:
"""
Create a JWT token for device addition (without credential ID).
Args:
user_id: The user's UUID
Returns:
JWT token string for device addition
"""
now = datetime.now()
payload = {
"user_id": str(user_id),
"credential_id": None, # No credential for device addition
"device_addition": True, # Flag to indicate this is for device addition
"iat": now,
"exp": now + timedelta(hours=2), # Shorter expiry for device addition
"iss": "passkeyauth",
}
return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
def validate_token(self, token: str) -> Optional[dict]:
"""
Validate a JWT token and return the payload.
@ -98,23 +76,12 @@ class JWTManager:
issuer="passkeyauth",
)
result = {
return {
"user_id": UUID(payload["user_id"]),
"credential_id": bytes.fromhex(payload["credential_id"]),
"issued_at": payload["iat"],
"expires_at": payload["exp"],
}
# Handle credential_id for regular tokens vs device addition tokens
if payload.get("credential_id") is not None:
result["credential_id"] = bytes.fromhex(payload["credential_id"])
else:
result["credential_id"] = None
# Add device addition flag if present
if payload.get("device_addition"):
result["device_addition"] = True
return result
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
@ -155,11 +122,6 @@ def create_session_token(user_id: UUID, credential_id: bytes) -> str:
return get_jwt_manager().create_token(user_id, credential_id)
def create_device_addition_token(user_id: UUID) -> str:
"""Create a token for device addition."""
return get_jwt_manager().create_token_without_credential(user_id)
def validate_session_token(token: str) -> Optional[dict]:
"""Validate a session token."""
return get_jwt_manager().validate_token(token)

View File

@ -1,88 +0,0 @@
"""
Database session management for WebAuthn authentication.
This module provides session management using database tokens instead of JWT tokens.
Session tokens are stored in the database and validated on each request.
"""
from datetime import datetime
from typing import Optional
from uuid import UUID
from fastapi import Request
from ..db import sql
def get_client_info(request: Request) -> dict:
"""Extract client information from FastAPI request and return as dict."""
# Get client IP (handle X-Forwarded-For for proxies)
# Get user agent
return {
"client_ip": request.client.host if request.client else "",
"user_agent": request.headers.get("user-agent", "")[:500],
}
def get_client_info_from_websocket(ws) -> dict:
"""Extract client information from WebSocket connection and return as dict."""
# Get client IP from WebSocket
client_ip = None
if hasattr(ws, "client") and ws.client:
client_ip = ws.client.host
# Check for forwarded headers
if hasattr(ws, "headers"):
forwarded_for = ws.headers.get("x-forwarded-for")
if forwarded_for:
client_ip = forwarded_for.split(",")[0].strip()
# Get user agent from WebSocket headers
user_agent = None
if hasattr(ws, "headers"):
user_agent = ws.headers.get("user-agent")
# Truncate user agent if too long
if user_agent and len(user_agent) > 500: # Keep some margin
user_agent = user_agent[:500]
return {
"client_ip": client_ip,
"user_agent": user_agent,
"timestamp": datetime.now().isoformat(),
"connection_type": "websocket",
}
async def create_session_token(
user_id: UUID, credential_id: bytes, info: dict | None = None
) -> str:
"""Create a session token for a user."""
return await sql.create_session_by_credential_id(user_id, credential_id, None, info)
async def validate_session_token(token: str) -> Optional[dict]:
"""Validate a session token."""
session_data = await sql.get_session(token)
if not session_data:
return None
return {
"user_id": session_data["user_id"],
"credential_id": session_data["credential_id"],
"created_at": session_data["created_at"],
}
async def refresh_session_token(token: str) -> Optional[str]:
"""Refresh a session token."""
return await sql.refresh_session(token)
async def delete_session_token(token: str) -> None:
"""Delete a session token."""
await sql.delete_session(token)
async def logout_session(token: str) -> None:
"""Log out a user by deleting their session token."""
await sql.delete_session(token)