diff --git a/dev.py b/dev.py deleted file mode 100644 index 409f1a0..0000000 --- a/dev.py +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/env python3 -""" -Development server runner -""" - -if __name__ == "__main__": - from passkeyauth.main import main - - main() diff --git a/main.py b/main.py deleted file mode 100644 index 79b106f..0000000 --- a/main.py +++ /dev/null @@ -1,326 +0,0 @@ -import json -import uuid -from typing import Dict - -from fastapi import FastAPI, WebSocket, WebSocketDisconnect -from fastapi.responses import HTMLResponse -from fastapi.staticfiles import StaticFiles -from webauthn import generate_registration_options, verify_registration_response -from webauthn.helpers.cose import COSEAlgorithmIdentifier -from webauthn.helpers.structs import ( - AuthenticatorSelectionCriteria, - ResidentKeyRequirement, - UserVerificationRequirement, -) - -app = FastAPI(title="WebAuthn Registration Server") - -# In-memory storage for challenges (in production, use Redis or similar) -active_challenges: Dict[str, str] = {} - -# WebAuthn configuration -RP_ID = "localhost" -RP_NAME = "WebAuthn Demo" -ORIGIN = "http://localhost:8000" - - -class ConnectionManager: - def __init__(self): - self.active_connections: Dict[str, WebSocket] = {} - - async def connect(self, websocket: WebSocket, client_id: str): - await websocket.accept() - self.active_connections[client_id] = websocket - - def disconnect(self, client_id: str): - if client_id in self.active_connections: - del self.active_connections[client_id] - - async def send_message(self, message: dict, client_id: str): - if client_id in self.active_connections: - await self.active_connections[client_id].send_text(json.dumps(message)) - - -manager = ConnectionManager() - - -@app.websocket("/ws/{client_id}") -async def websocket_endpoint(websocket: WebSocket, client_id: str): - await manager.connect(websocket, client_id) - try: - while True: - data = await websocket.receive_text() - message = json.loads(data) - - if message["type"] == "registration_challenge": - await handle_registration_challenge(message, client_id) - elif message["type"] == "registration_response": - await handle_registration_response(message, client_id) - else: - await manager.send_message( - { - "type": "error", - "message": f"Unknown message type: {message['type']}", - }, - client_id, - ) - - except WebSocketDisconnect: - manager.disconnect(client_id) - - -async def handle_registration_challenge(message: dict, client_id: str): - """Handle registration challenge request""" - try: - username = message.get("username", "user@example.com") - user_id = str(uuid.uuid4()).encode() - - # Generate registration options with Resident Key support - options = generate_registration_options( - rp_id=RP_ID, - rp_name=RP_NAME, - user_id=user_id, - user_name=username, - user_display_name=username, - # Enable Resident Keys (discoverable credentials) - authenticator_selection=AuthenticatorSelectionCriteria( - resident_key=ResidentKeyRequirement.REQUIRED, - user_verification=UserVerificationRequirement.PREFERRED, - ), - # Support common algorithms - supported_pub_key_algs=[ - COSEAlgorithmIdentifier.ECDSA_SHA_256, - COSEAlgorithmIdentifier.RSASSA_PKCS1_v1_5_SHA_256, - ], - ) - - # Store challenge for this client - active_challenges[client_id] = options.challenge - - # Convert options to dict for JSON serialization - options_dict = { - "challenge": options.challenge, - "rp": { - "name": options.rp.name, - "id": options.rp.id, - }, - "user": { - "id": options.user.id, - "name": options.user.name, - "displayName": options.user.display_name, - }, - "pubKeyCredParams": [ - {"alg": param.alg, "type": param.type} - for param in options.pub_key_cred_params - ], - "timeout": options.timeout, - "attestation": options.attestation, - "authenticatorSelection": { - "residentKey": options.authenticator_selection.resident_key.value, - "userVerification": options.authenticator_selection.user_verification.value, - }, - } - - await manager.send_message( - {"type": "registration_challenge_response", "options": options_dict}, - client_id, - ) - - except Exception as e: - await manager.send_message( - {"type": "error", "message": f"Failed to generate challenge: {str(e)}"}, - client_id, - ) - - -async def handle_registration_response(message: dict, client_id: str): - """Handle registration response verification""" - try: - # Get the stored challenge - if client_id not in active_challenges: - await manager.send_message( - {"type": "error", "message": "No active challenge found"}, client_id - ) - return - - expected_challenge = active_challenges[client_id] - credential = message["credential"] - - # Verify the registration response - verification = verify_registration_response( - credential=credential, - expected_challenge=expected_challenge, - expected_origin=ORIGIN, - expected_rp_id=RP_ID, - ) - - if verification.verified: - # Clean up the challenge - del active_challenges[client_id] - - await manager.send_message( - { - "type": "registration_success", - "message": "Registration successful!", - "credentialId": verification.credential_id, - "credentialPublicKey": verification.credential_public_key, - }, - client_id, - ) - else: - await manager.send_message( - {"type": "error", "message": "Registration verification failed"}, - client_id, - ) - - except Exception as e: - await manager.send_message( - {"type": "error", "message": f"Registration failed: {str(e)}"}, client_id - ) - - -# Serve static files -app.mount("/static", StaticFiles(directory="static"), name="static") - - -@app.get("/") -async def get_index(): - return HTMLResponse( - content=""" - - - - WebAuthn Registration Demo - - - - -
-

WebAuthn Registration Demo

-

Test WebAuthn registration with Resident Keys support

- -
- - -
- - - -
-
-
- - - - - """ - ) - - -if __name__ == "__main__": - import uvicorn - - uvicorn.run(app, host="0.0.0.0", port=8000)