244 lines
		
	
	
		
			8.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			244 lines
		
	
	
		
			8.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """
 | |
| WebAuthn handler class that combines registration and authentication functionality.
 | |
| 
 | |
| This module provides a unified interface for WebAuthn operations including:
 | |
| - Registration challenge generation and verification
 | |
| - Authentication challenge generation and verification
 | |
| - Credential validation
 | |
| """
 | |
| 
 | |
| import json
 | |
| from datetime import datetime
 | |
| from urllib.parse import urlparse
 | |
| from uuid import UUID
 | |
| 
 | |
| import uuid7
 | |
| from webauthn import (
 | |
|     generate_authentication_options,
 | |
|     generate_registration_options,
 | |
|     verify_authentication_response,
 | |
|     verify_registration_response,
 | |
| )
 | |
| from webauthn.authentication.verify_authentication_response import (
 | |
|     VerifiedAuthentication,
 | |
| )
 | |
| from webauthn.helpers import (
 | |
|     options_to_json,
 | |
|     parse_authentication_credential_json,
 | |
|     parse_registration_credential_json,
 | |
| )
 | |
| from webauthn.helpers.cose import COSEAlgorithmIdentifier
 | |
| from webauthn.helpers.structs import (
 | |
|     AttestationConveyancePreference,
 | |
|     AuthenticationCredential,
 | |
|     AuthenticatorSelectionCriteria,
 | |
|     PublicKeyCredentialDescriptor,
 | |
|     ResidentKeyRequirement,
 | |
|     UserVerificationRequirement,
 | |
| )
 | |
| 
 | |
| from .db import Credential
 | |
| 
 | |
| 
 | |
class Passkey:
    """WebAuthn handler for registration and authentication operations.

    Bundles a relying-party configuration (``rp_id``, ``rp_name``, a default
    ``origin`` and the accepted public-key algorithms) and forwards it to the
    ``webauthn`` package's option generators and response verifiers.
    """

    def __init__(
        self,
        rp_id: str,
        rp_name: str | None = None,
        origin: str | None = None,
        supported_pub_key_algs: list[COSEAlgorithmIdentifier] | None = None,
    ):
        """
        Initialize the WebAuthn handler.

        Args:
            rp_id: Your security domain (e.g. "example.com")
            rp_name: The relying party display name (e.g. "Example App"). May be shown in authenticators.
                   Defaults to rp_id.
            origin: The origin URL of the application (e.g. "https://app.example.com").
                   If None, "https://<rp_id>" is used. If no scheme is provided, "https://" will be prepended.
                   Must be a subdomain or same as rp_id, with port and scheme. Any path, query or
                   fragment is dropped, because an origin consists of scheme, host and port only.
            supported_pub_key_algs: List of supported COSE algorithms (default is EDDSA, ECDSA_SHA_256, RSASSA_PKCS1_v1_5_SHA_256).

        Raises:
            ValueError: If the origin has no hostname, or its domain doesn't match
                or isn't a subdomain of rp_id.
        """
        self.rp_id = rp_id
        self.rp_name = rp_name or rp_id
        self.origin = self._normalize_and_validate_origin(origin, rp_id)
        # A falsy value (None or an empty list) selects the default algorithm set.
        self.supported_pub_key_algs = supported_pub_key_algs or [
            COSEAlgorithmIdentifier.EDDSA,
            COSEAlgorithmIdentifier.ECDSA_SHA_256,
            COSEAlgorithmIdentifier.RSASSA_PKCS1_v1_5_SHA_256,
        ]

    def _normalize_and_validate_origin(self, origin: str | None, rp_id: str) -> str:
        """
        Return a usable origin string for ``rp_id``, or raise if it cannot be one.

        Args:
            origin: Origin as configured by the caller; None means "https://<rp_id>".
                A value without "://" gets "https://" prepended.
            rp_id: The relying party ID the origin's hostname must equal or be a
                subdomain of.

        Returns:
            The origin string. When the input carried a path, query or fragment
            (including a lone trailing "/"), only "scheme://netloc" is returned;
            otherwise the (scheme-completed) input is returned unchanged.

        Raises:
            ValueError: If no hostname can be parsed, or the hostname is neither
                rp_id nor a subdomain of it.
        """
        if origin is None:
            origin = f"https://{rp_id}"
        elif "://" not in origin:
            origin = f"https://{origin}"

        parsed = urlparse(origin)
        hostname = parsed.hostname
        if not hostname:
            raise ValueError(f"Invalid origin URL: no hostname found in '{origin}'")

        if hostname != rp_id and not hostname.endswith(f".{rp_id}"):
            raise ValueError(
                f"Origin domain '{hostname}' must be the same as or a subdomain of rp_id '{rp_id}'"
            )

        # The stored value is later passed as expected_origin to the verifiers.
        # Anything after the authority (even a bare trailing "/") would make it
        # differ from a path-less origin, so strip it. Inputs that are already a
        # bare origin are returned untouched.
        if parsed.path or parsed.params or parsed.query or parsed.fragment:
            origin = f"{parsed.scheme}://{parsed.netloc}"
        return origin

    ### Registration Methods ###

    def reg_generate_options(
        self,
        user_id: UUID,
        user_name: str,
        credential_ids: list[bytes] | None = None,
        origin: str | None = None,
        **regopts,
    ) -> tuple[dict, bytes]:
        """
        Generate registration options for WebAuthn registration.

        Args:
            user_id: The user ID as a UUID (its 16-byte form is sent as the user handle)
            user_name: The username
            credential_ids: For an already authenticated user, a list of credential IDs
                associated with the account. This prevents accidentally adding another
                credential on an authenticator that already has one of the listed IDs.
            origin: Currently not used by this method; kept in the signature so that
                existing callers passing it keep working.
            regopts: Additional arguments to generate_registration_options.

        Returns:
            JSON dict containing options to be sent to client,
            challenge bytes to keep during the registration process.
        """
        options = generate_registration_options(
            rp_id=self.rp_id,
            rp_name=self.rp_name,
            user_id=user_id.bytes,
            user_name=user_name,
            attestation=AttestationConveyancePreference.DIRECT,
            authenticator_selection=AuthenticatorSelectionCriteria(
                resident_key=ResidentKeyRequirement.REQUIRED,
                user_verification=UserVerificationRequirement.PREFERRED,
            ),
            exclude_credentials=_convert_credential_ids(credential_ids),
            supported_pub_key_algs=self.supported_pub_key_algs,
            **regopts,
        )
        # Round-trip through options_to_json so the caller gets a plain dict.
        return json.loads(options_to_json(options)), options.challenge

    def reg_verify(
        self,
        response_json: dict | str,
        expected_challenge: bytes,
        user_uuid: UUID,
        origin: str | None = None,
    ) -> Credential:
        """
        Verify registration response.

        Args:
            response_json: The registration credential response from the client
            expected_challenge: The expected challenge bytes
            user_uuid: UUID of the user the new credential record is created for
            origin: Expected origin for this request; falls back to the origin
                configured on this handler when omitted. Passed to the verifier
                as given (it is not normalized or checked against rp_id here).

        Returns:
            A new Credential record built from the verified registration.
            Errors raised by the webauthn parsing/verification helpers are not
            caught here and propagate to the caller.
        """
        credential = parse_registration_credential_json(response_json)
        registration = verify_registration_response(
            credential=credential,
            expected_challenge=expected_challenge,
            expected_origin=origin or self.origin,
            expected_rp_id=self.rp_id,
        )
        return Credential(
            uuid=uuid7.create(),
            credential_id=credential.raw_id,
            user_uuid=user_uuid,
            aaguid=UUID(registration.aaguid),
            public_key=registration.credential_public_key,
            sign_count=registration.sign_count,
            # NOTE(review): naive local time; confirm whether the storage layer
            # expects naive or timezone-aware timestamps before changing.
            created_at=datetime.now(),
        )

    ### Authentication Methods ###

    def auth_generate_options(
        self,
        *,
        user_verification_required: bool = False,
        credential_ids: list[bytes] | None = None,
        **authopts,
    ) -> tuple[dict, bytes]:
        """
        Generate authentication options for WebAuthn authentication.

        Args:
            user_verification_required: The user will have to re-enter PIN or use biometrics for this operation. Useful when accessing security settings etc.
                When False, user verification is requested as DISCOURAGED.
            credential_ids: For an already known user, a list of credential IDs associated with the account (less prompts during authentication).
            authopts: Additional arguments to generate_authentication_options.

        Returns:
            Tuple of (JSON to be sent to client, challenge bytes to store)
        """
        if user_verification_required:
            user_verification = UserVerificationRequirement.REQUIRED
        else:
            user_verification = UserVerificationRequirement.DISCOURAGED

        options = generate_authentication_options(
            rp_id=self.rp_id,
            user_verification=user_verification,
            allow_credentials=_convert_credential_ids(credential_ids),
            **authopts,
        )
        # Round-trip through options_to_json so the caller gets a plain dict.
        return json.loads(options_to_json(options)), options.challenge

    def auth_parse(self, response: dict | str) -> AuthenticationCredential:
        """
        Parse the client's authentication response into an AuthenticationCredential.

        Args:
            response: The authentication response from the client, as a dict or JSON string

        Returns:
            The parsed credential, suitable for passing to auth_verify.
        """
        return parse_authentication_credential_json(response)

    def auth_verify(
        self,
        credential: AuthenticationCredential,
        expected_challenge: bytes,
        stored_cred: Credential,
        origin: str | None = None,
    ) -> VerifiedAuthentication:
        """
        Verify authentication response against locally stored credential data.

        Args:
            credential: The authentication credential response from the client
            expected_challenge: The earlier generated challenge bytes
            stored_cred: The server stored credential record (modified by this function:
                sign_count and last_used are updated, and last_verified as well when
                the verification reports the user as verified)
            origin: Expected origin for this request; falls back to the origin
                configured on this handler when omitted.

        Returns:
            The verification result from verify_authentication_response. Errors
            raised by that call are not caught here; stored_cred is only modified
            after it returns.
        """
        expected_origin = origin or self.origin
        # Verify the authentication response
        verification = verify_authentication_response(
            credential=credential,
            expected_challenge=expected_challenge,
            expected_origin=expected_origin,
            expected_rp_id=self.rp_id,
            credential_public_key=stored_cred.public_key,
            credential_current_sign_count=stored_cred.sign_count,
        )
        stored_cred.sign_count = verification.new_sign_count
        # NOTE(review): naive local time; confirm whether the storage layer
        # expects naive or timezone-aware timestamps before changing.
        now = datetime.now()
        stored_cred.last_used = now
        if verification.user_verified:
            stored_cred.last_verified = now
        return verification
 | |
| 
 | |
| 
 | |
| def _convert_credential_ids(
 | |
|     credential_ids: list[bytes] | None,
 | |
| ) -> list[PublicKeyCredentialDescriptor] | None:
 | |
|     """A helper to convert a list of credential IDs to PublicKeyCredentialDescriptor objects, or pass through None."""
 | |
|     if credential_ids is None:
 | |
|         return None
 | |
|     return [PublicKeyCredentialDescriptor(id) for id in credential_ids]
 | 
