From 25d19b89b88a41f713c34f5e64bb57e9a8cd87dd Mon Sep 17 00:00:00 2001 From: Leo Vasanko Date: Fri, 4 Jul 2025 17:08:56 -0600 Subject: [PATCH] General cleanup and minor improvements. Registration and auth currently working. --- passkeyauth/TODO.txt | 2 ++ passkeyauth/db.py | 63 ++++++++++++++++++++++++++++---------- passkeyauth/main.py | 24 +++++++++------ passkeyauth/passkey.py | 68 ++++++++++++++++++++++++++++++++++++++---- 4 files changed, 127 insertions(+), 30 deletions(-) diff --git a/passkeyauth/TODO.txt b/passkeyauth/TODO.txt index 95fa21c..6cc721e 100644 --- a/passkeyauth/TODO.txt +++ b/passkeyauth/TODO.txt @@ -4,3 +4,5 @@ Practical checks: Details: - Extract authenticator type - Extract user verified flag (present always required) +- Update user last seen on login +- Debug why aaguid is stored as zeroes only diff --git a/passkeyauth/db.py b/passkeyauth/db.py index 263dbef..eb8dd42 100644 --- a/passkeyauth/db.py +++ b/passkeyauth/db.py @@ -8,6 +8,7 @@ for managing users and credentials in a WebAuthn authentication system. from dataclasses import dataclass from datetime import datetime from typing import Optional +from uuid import UUID import aiosqlite @@ -18,7 +19,8 @@ SQL_CREATE_USERS = """ CREATE TABLE IF NOT EXISTS users ( user_id BINARY(16) PRIMARY KEY NOT NULL, user_name TEXT NOT NULL, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + last_seen TIMESTAMP NULL ) """ @@ -28,7 +30,7 @@ SQL_CREATE_CREDENTIALS = """ user_id BINARY(16) NOT NULL, aaguid BINARY(16) NOT NULL, public_key BLOB NOT NULL, - sign_count INTEGER DEFAULT 0, + sign_count INTEGER NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, last_used TIMESTAMP NULL, FOREIGN KEY (user_id) REFERENCES users (user_id) ON DELETE CASCADE @@ -40,7 +42,7 @@ SQL_GET_USER_BY_USER_ID = """ """ SQL_CREATE_USER = """ - INSERT INTO users (user_id, user_name) VALUES (?, ?) + INSERT INTO users (user_id, user_name, created_at, last_seen) VALUES (?, ?, ?, ?) """ SQL_STORE_CREDENTIAL = """ @@ -75,19 +77,20 @@ class User: user_id: bytes = b"" user_name: str = "" created_at: Optional[datetime] = None + last_seen: Optional[datetime] = None @dataclass class Credential: """Credential data model.""" - credential_id: bytes = b"" - user_id: bytes = b"" - aaguid: bytes = b"" - public_key: bytes = b"" - sign_count: int = 0 - created_at: Optional[datetime] = None - last_used: Optional[datetime] = None + credential_id: bytes + user_id: bytes + aaguid: UUID + public_key: bytes + sign_count: int + created_at: datetime + last_used: datetime | None = None class Database: @@ -109,15 +112,23 @@ class Database: async with conn.execute(SQL_GET_USER_BY_USER_ID, (user_id,)) as cursor: row = await cursor.fetchone() if row: - return User(user_id=row[0], user_name=row[1], created_at=row[2]) + return User( + user_id=row[0], + user_name=row[1], + created_at=row[2], + last_seen=row[3], + ) raise ValueError("User not found") - async def create_user(self, user_id: bytes, user_name: str) -> User: + async def create_user(self, user: User) -> User: """Create a new user and return the User dataclass.""" async with aiosqlite.connect(self.db_path) as conn: - await conn.execute(SQL_CREATE_USER, (user_id, user_name)) + await conn.execute( + SQL_CREATE_USER, + (user.user_id, user.user_name, user.created_at, user.last_seen), + ) await conn.commit() - return User(user_id=user_id, user_name=user_name) + return user async def store_credential(self, credential: Credential) -> None: """Store a credential for a user.""" @@ -127,7 +138,7 @@ class Database: ( credential.credential_id, credential.user_id, - credential.aaguid, + credential.aaguid.bytes, credential.public_key, credential.sign_count, ), @@ -145,7 +156,7 @@ class Database: return Credential( credential_id=row[0], user_id=row[1], - aaguid=row[2], + aaguid=UUID(bytes=row[2]), # Convert bytes to UUID public_key=row[3], sign_count=row[4], created_at=row[5], @@ -153,6 +164,13 @@ class Database: ) raise ValueError("Credential not found") + async def get_credentials_by_user_id(self, user_id: bytes) -> list[bytes]: + """Get all credential IDs for a user.""" + async with aiosqlite.connect(self.db_path) as conn: + async with conn.execute(SQL_GET_USER_CREDENTIALS, (user_id,)) as cursor: + rows = await cursor.fetchall() + return [row[0] for row in rows] + async def update_credential(self, credential: Credential) -> None: """Update the sign count for a credential.""" async with aiosqlite.connect(self.db_path) as conn: @@ -162,6 +180,19 @@ class Database: ) await conn.commit() + async def update_user_last_seen( + self, user_id: bytes, last_seen: datetime | None = None + ) -> None: + """Update the last_seen timestamp for a user.""" + if last_seen is None: + last_seen = datetime.now() + async with aiosqlite.connect(self.db_path) as conn: + await conn.execute( + "UPDATE users SET last_seen = ? WHERE user_id = ?", + (last_seen, user_id), + ) + await conn.commit() + # Global database instance db = Database() diff --git a/passkeyauth/main.py b/passkeyauth/main.py index a3e7b63..d0fc77b 100644 --- a/passkeyauth/main.py +++ b/passkeyauth/main.py @@ -9,14 +9,17 @@ This module provides a simple WebAuthn implementation that: - Enables true passwordless authentication where users don't need to enter a user_name """ +from contextlib import asynccontextmanager +from datetime import datetime from pathlib import Path +from uuid import UUID import uuid7 from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.responses import FileResponse from fastapi.staticfiles import StaticFiles -from .db import Credential, db +from .db import Credential, User, db from .passkey import Passkey STATIC_DIR = Path(__file__).parent.parent / "static" @@ -27,13 +30,14 @@ passkey = Passkey( origin="http://localhost:8000", ) -app = FastAPI(title="Passkey Auth") - -@app.on_event("startup") -async def startup_event(): - """Initialize database on startup.""" +@asynccontextmanager +async def lifespan(app: FastAPI): await db.init_database() + yield + + +app = FastAPI(title="Passkey Auth", lifespan=lifespan) @app.websocket("/ws/new_user_registration") @@ -42,21 +46,23 @@ async def websocket_register_new(ws: WebSocket): await ws.accept() try: form = await ws.receive_json() - user_id = uuid7.create().bytes + now = datetime.now() + user_id = uuid7.create(now).bytes user_name = form["user_name"] # Generate registration options and handle registration credential, verified = await register_chat(ws, user_id, user_name) # Store the user in the database - await db.create_user(user_id, user_name) + await db.create_user(User(user_id, user_name, now)) await db.store_credential( Credential( credential_id=credential.raw_id, user_id=user_id, - aaguid=b"", # verified.aaguid, + aaguid=UUID(verified.aaguid), public_key=verified.credential_public_key, sign_count=verified.sign_count, + created_at=now, ) ) await ws.send_json({"status": "success", "user_id": user_id.hex()}) diff --git a/passkeyauth/passkey.py b/passkeyauth/passkey.py index 97b6598..2aca32d 100644 --- a/passkeyauth/passkey.py +++ b/passkeyauth/passkey.py @@ -8,6 +8,8 @@ This module provides a unified interface for WebAuthn operations including: """ import json +from typing import Protocol +from uuid import UUID from webauthn import ( generate_authentication_options, @@ -24,6 +26,7 @@ from webauthn.helpers.cose import COSEAlgorithmIdentifier from webauthn.helpers.structs import ( AuthenticationCredential, AuthenticatorSelectionCriteria, + PublicKeyCredentialDescriptor, RegistrationCredential, ResidentKeyRequirement, UserVerificationRequirement, @@ -31,6 +34,23 @@ from webauthn.helpers.structs import ( from webauthn.registration.verify_registration_response import VerifiedRegistration +class StoredCredentialRecord(Protocol): + """ + Protocol for a stored credential record that must have settable attributes: + - id: The credential ID as bytes + - aaguid: The Authenticator Attestation GUID (AAGUID) + - public_key: The public key of the credential + - sign_count: The current sign count for the credential + + Note: Can be a dataclass, ORM or any other object that implements these attributes, but not dict. + """ + + id: bytes + aaguid: UUID + public_key: bytes + sign_count: int + + class Passkey: """WebAuthn handler for registration and authentication operations.""" @@ -62,7 +82,7 @@ class Passkey: ### Registration Methods ### def reg_generate_options( - self, user_id: bytes, user_name: str, display_name="", **regopts + self, user_id: bytes, user_name: str, **regopts ) -> tuple[dict, bytes]: """ Generate registration options for WebAuthn registration. @@ -71,6 +91,7 @@ class Passkey: user_id: The user ID as bytes user_name: The username display_name: The display name (defaults to user_name if empty) + regopts: Additional arguments to generate_registration_options. Returns: JSON dict containing options to be sent to client, challenge bytes to store @@ -80,7 +101,6 @@ class Passkey: rp_name=self.rp_name, user_id=user_id, user_name=user_name, - user_display_name=display_name or user_name, authenticator_selection=AuthenticatorSelectionCriteria( resident_key=ResidentKeyRequirement.REQUIRED, user_verification=UserVerificationRequirement.PREFERRED, @@ -117,16 +137,44 @@ class Passkey: ) return registration + def reg_store_credential( + self, + stored_credential: StoredCredentialRecord, + credential: RegistrationCredential, + verified: VerifiedRegistration, + ): + """ + Write the verified credential data to the stored credential record. + + Args: + stored_credential: A database record being created (dataclass, ORM, etc.) + credential: The registration credential response from the client + verified: The verified registration data + + This function sets attributes on stored_credential (id, aaguid, public_key, sign_count). + """ + stored_credential.id = credential.raw_id + stored_credential.aaguid = UUID(verified.aaguid) + stored_credential.public_key = verified.credential_public_key + stored_credential.sign_count = verified.sign_count + ### Authentication Methods ### async def auth_generate_options( - self, user_verification_required=False, **kwopts + self, + *, + user_verification_required=False, + allow_credential_ids: list[bytes] | None = None, + **authopts, ) -> tuple[dict, bytes]: """ Generate authentication options for WebAuthn authentication. Args: user_verification_required: The user will have to re-enter PIN or use biometrics for this operation. Useful when accessing security settings etc. + allow_credentials: For an already known user, a list of credential IDs associated with the account (less prompts during authentication). + authopts: Additional arguments to generate_authentication_options. + Returns: Tuple of (JSON to be sent to client, challenge bytes to store) """ @@ -137,7 +185,12 @@ class Passkey: if user_verification_required else UserVerificationRequirement.PREFERRED ), - **kwopts, + allow_credentials=( + None + if allow_credential_ids is None + else [PublicKeyCredentialDescriptor(id) for id in allow_credential_ids] + ), + **authopts, ) return json.loads(options_to_json(options)), options.challenge @@ -150,10 +203,15 @@ class Passkey: self, credential: AuthenticationCredential, expected_challenge: bytes, - stored_cred, + stored_cred: StoredCredentialRecord, ): """ Verify authentication response against locally stored credential data. + + Args: + credential: The authentication credential response from the client + expected_challenge: The earlier generated challenge bytes + stored_cred: The server stored credential record. Must have accessors .public_key and .sign_count, the latter of which is updated by this function! """ # Verify the authentication response verification = verify_authentication_response(