import json import uuid from typing import Dict from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.responses import HTMLResponse from fastapi.staticfiles import StaticFiles from webauthn import generate_registration_options, verify_registration_response from webauthn.helpers.cose import COSEAlgorithmIdentifier from webauthn.helpers.structs import ( AuthenticatorSelectionCriteria, ResidentKeyRequirement, UserVerificationRequirement, ) app = FastAPI(title="WebAuthn Registration Server") # In-memory storage for challenges (in production, use Redis or similar) active_challenges: Dict[str, str] = {} # WebAuthn configuration RP_ID = "localhost" RP_NAME = "WebAuthn Demo" ORIGIN = "http://localhost:8000" class ConnectionManager: def __init__(self): self.active_connections: Dict[str, WebSocket] = {} async def connect(self, websocket: WebSocket, client_id: str): await websocket.accept() self.active_connections[client_id] = websocket def disconnect(self, client_id: str): if client_id in self.active_connections: del self.active_connections[client_id] async def send_message(self, message: dict, client_id: str): if client_id in self.active_connections: await self.active_connections[client_id].send_text(json.dumps(message)) manager = ConnectionManager() @app.websocket("/ws/{client_id}") async def websocket_endpoint(websocket: WebSocket, client_id: str): await manager.connect(websocket, client_id) try: while True: data = await websocket.receive_text() message = json.loads(data) if message["type"] == "registration_challenge": await handle_registration_challenge(message, client_id) elif message["type"] == "registration_response": await handle_registration_response(message, client_id) else: await manager.send_message( { "type": "error", "message": f"Unknown message type: {message['type']}", }, client_id, ) except WebSocketDisconnect: manager.disconnect(client_id) async def handle_registration_challenge(message: dict, client_id: str): """Handle registration challenge request""" try: username = message.get("username", "user@example.com") user_id = str(uuid.uuid4()).encode() # Generate registration options with Resident Key support options = generate_registration_options( rp_id=RP_ID, rp_name=RP_NAME, user_id=user_id, user_name=username, user_display_name=username, # Enable Resident Keys (discoverable credentials) authenticator_selection=AuthenticatorSelectionCriteria( resident_key=ResidentKeyRequirement.REQUIRED, user_verification=UserVerificationRequirement.PREFERRED, ), # Support common algorithms supported_pub_key_algs=[ COSEAlgorithmIdentifier.ECDSA_SHA_256, COSEAlgorithmIdentifier.RSASSA_PKCS1_v1_5_SHA_256, ], ) # Store challenge for this client active_challenges[client_id] = options.challenge # Convert options to dict for JSON serialization options_dict = { "challenge": options.challenge, "rp": { "name": options.rp.name, "id": options.rp.id, }, "user": { "id": options.user.id, "name": options.user.name, "displayName": options.user.display_name, }, "pubKeyCredParams": [ {"alg": param.alg, "type": param.type} for param in options.pub_key_cred_params ], "timeout": options.timeout, "attestation": options.attestation, "authenticatorSelection": { "residentKey": options.authenticator_selection.resident_key.value, "userVerification": options.authenticator_selection.user_verification.value, }, } await manager.send_message( {"type": "registration_challenge_response", "options": options_dict}, client_id, ) except Exception as e: await manager.send_message( {"type": "error", "message": f"Failed to generate challenge: {str(e)}"}, client_id, ) async def handle_registration_response(message: dict, client_id: str): """Handle registration response verification""" try: # Get the stored challenge if client_id not in active_challenges: await manager.send_message( {"type": "error", "message": "No active challenge found"}, client_id ) return expected_challenge = active_challenges[client_id] credential = message["credential"] # Verify the registration response verification = verify_registration_response( credential=credential, expected_challenge=expected_challenge, expected_origin=ORIGIN, expected_rp_id=RP_ID, ) if verification.verified: # Clean up the challenge del active_challenges[client_id] await manager.send_message( { "type": "registration_success", "message": "Registration successful!", "credentialId": verification.credential_id, "credentialPublicKey": verification.credential_public_key, }, client_id, ) else: await manager.send_message( {"type": "error", "message": "Registration verification failed"}, client_id, ) except Exception as e: await manager.send_message( {"type": "error", "message": f"Registration failed: {str(e)}"}, client_id ) # Serve static files app.mount("/static", StaticFiles(directory="static"), name="static") @app.get("/") async def get_index(): return HTMLResponse( content="""
Test WebAuthn registration with Resident Keys support