A lot of cleanup, restructuring project directory.
This commit is contained in:
		
							
								
								
									
										233
									
								
								passkey/sansio.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										233
									
								
								passkey/sansio.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,233 @@ | ||||
| """ | ||||
| WebAuthn handler class that combines registration and authentication functionality. | ||||
|  | ||||
| This module provides a unified interface for WebAuthn operations including: | ||||
| - Registration challenge generation and verification | ||||
| - Authentication challenge generation and verification | ||||
| - Credential validation | ||||
| """ | ||||
|  | ||||
| import json | ||||
| from dataclasses import dataclass | ||||
| from datetime import datetime | ||||
| from uuid import UUID | ||||
|  | ||||
| from webauthn import ( | ||||
|     generate_authentication_options, | ||||
|     generate_registration_options, | ||||
|     verify_authentication_response, | ||||
|     verify_registration_response, | ||||
| ) | ||||
| from webauthn.authentication.verify_authentication_response import ( | ||||
|     VerifiedAuthentication, | ||||
| ) | ||||
| from webauthn.helpers import ( | ||||
|     options_to_json, | ||||
|     parse_authentication_credential_json, | ||||
|     parse_registration_credential_json, | ||||
| ) | ||||
| from webauthn.helpers.cose import COSEAlgorithmIdentifier | ||||
| from webauthn.helpers.structs import ( | ||||
|     AttestationConveyancePreference, | ||||
|     AuthenticationCredential, | ||||
|     AuthenticatorSelectionCriteria, | ||||
|     PublicKeyCredentialDescriptor, | ||||
|     ResidentKeyRequirement, | ||||
|     UserVerificationRequirement, | ||||
| ) | ||||
|  | ||||
|  | ||||
| @dataclass | ||||
| class StoredCredential: | ||||
|     """Credential data stored in the database.""" | ||||
|  | ||||
|     # Fields set only at registration time | ||||
|     credential_id: bytes | ||||
|     user_id: UUID | ||||
|     aaguid: UUID | ||||
|     public_key: bytes | ||||
|     # Mutable fields that may be updated during authentication | ||||
|     sign_count: int | ||||
|     created_at: datetime | ||||
|     last_used: datetime | None = None | ||||
|     last_verified: datetime | None = None | ||||
|  | ||||
|  | ||||
| class Passkey: | ||||
|     """WebAuthn handler for registration and authentication operations.""" | ||||
|  | ||||
|     def __init__( | ||||
|         self, | ||||
|         rp_id: str, | ||||
|         rp_name: str, | ||||
|         origin: str | None = None, | ||||
|         supported_pub_key_algs: list[COSEAlgorithmIdentifier] | None = None, | ||||
|     ): | ||||
|         """ | ||||
|         Initialize the WebAuthn handler. | ||||
|  | ||||
|         Args: | ||||
|             rp_id: Your security domain (e.g. "example.com") | ||||
|             rp_name: The relying party name (e.g., "My Application" - visible to users) | ||||
|             origin: The origin URL of the application (e.g. "https://app.example.com"). Must be a subdomain or same as rp_id, with port and scheme but no path included. | ||||
|             supported_pub_key_algs: List of supported COSE algorithms (default is EDDSA, ECDSA_SHA_256, RSASSA_PKCS1_v1_5_SHA_256). | ||||
|         """ | ||||
|         self.rp_id = rp_id | ||||
|         self.rp_name = rp_name | ||||
|         self.origin = origin or f"https://{rp_id}" | ||||
|         self.supported_pub_key_algs = supported_pub_key_algs or [ | ||||
|             COSEAlgorithmIdentifier.EDDSA, | ||||
|             COSEAlgorithmIdentifier.ECDSA_SHA_256, | ||||
|             COSEAlgorithmIdentifier.RSASSA_PKCS1_v1_5_SHA_256, | ||||
|         ] | ||||
|  | ||||
|     ### Registration Methods ### | ||||
|  | ||||
|     def reg_generate_options( | ||||
|         self, | ||||
|         user_id: UUID, | ||||
|         user_name: str, | ||||
|         credential_ids: list[bytes] | None = None, | ||||
|         origin: str | None = None, | ||||
|         **regopts, | ||||
|     ) -> tuple[dict, bytes]: | ||||
|         """ | ||||
|         Generate registration options for WebAuthn registration. | ||||
|  | ||||
|         Args: | ||||
|             user_id: The user ID as bytes | ||||
|             user_name: The username | ||||
|             credential_ids: For an already authenticated user, a list of credential IDs | ||||
|                 associated with the account. This prevents accidentally adding another | ||||
|                 credential on an authenticator that already has one of the listed IDs. | ||||
|             origin: The origin URL of the application (e.g. "https://app.example.com"). Must be a subdomain or same as rp_id, with port and scheme but no path included. | ||||
|             regopts: Additional arguments to generate_registration_options. | ||||
|  | ||||
|         Returns: | ||||
|             JSON dict containing options to be sent to client, | ||||
|             challenge bytes to keep during the registration process. | ||||
|         """ | ||||
|         options = generate_registration_options( | ||||
|             rp_id=self.rp_id, | ||||
|             rp_name=self.rp_name, | ||||
|             user_id=user_id.bytes, | ||||
|             user_name=user_name, | ||||
|             attestation=AttestationConveyancePreference.DIRECT, | ||||
|             authenticator_selection=AuthenticatorSelectionCriteria( | ||||
|                 resident_key=ResidentKeyRequirement.REQUIRED, | ||||
|                 user_verification=UserVerificationRequirement.PREFERRED, | ||||
|             ), | ||||
|             exclude_credentials=_convert_credential_ids(credential_ids), | ||||
|             supported_pub_key_algs=self.supported_pub_key_algs, | ||||
|             **regopts, | ||||
|         ) | ||||
|         return json.loads(options_to_json(options)), options.challenge | ||||
|  | ||||
|     def reg_verify( | ||||
|         self, | ||||
|         response_json: dict | str, | ||||
|         expected_challenge: bytes, | ||||
|         user_id: UUID, | ||||
|         origin: str | None = None, | ||||
|     ) -> StoredCredential: | ||||
|         """ | ||||
|         Verify registration response. | ||||
|  | ||||
|         Args: | ||||
|             credential: The credential response from the client | ||||
|             expected_challenge: The expected challenge bytes | ||||
|  | ||||
|         Returns: | ||||
|             Registration verification result | ||||
|         """ | ||||
|         credential = parse_registration_credential_json(response_json) | ||||
|         registration = verify_registration_response( | ||||
|             credential=credential, | ||||
|             expected_challenge=expected_challenge, | ||||
|             expected_origin=origin or self.origin, | ||||
|             expected_rp_id=self.rp_id, | ||||
|         ) | ||||
|         return StoredCredential( | ||||
|             credential_id=credential.raw_id, | ||||
|             user_id=user_id, | ||||
|             aaguid=UUID(registration.aaguid), | ||||
|             public_key=registration.credential_public_key, | ||||
|             sign_count=registration.sign_count, | ||||
|             created_at=datetime.now(), | ||||
|         ) | ||||
|  | ||||
|     ### Authentication Methods ### | ||||
|  | ||||
|     def auth_generate_options( | ||||
|         self, | ||||
|         *, | ||||
|         user_verification_required=False, | ||||
|         credential_ids: list[bytes] | None = None, | ||||
|         **authopts, | ||||
|     ) -> tuple[dict, bytes]: | ||||
|         """ | ||||
|         Generate authentication options for WebAuthn authentication. | ||||
|  | ||||
|         Args: | ||||
|             user_verification_required: The user will have to re-enter PIN or use biometrics for this operation. Useful when accessing security settings etc. | ||||
|             credential_ids: For an already known user, a list of credential IDs associated with the account (less prompts during authentication). | ||||
|             authopts: Additional arguments to generate_authentication_options. | ||||
|  | ||||
|         Returns: | ||||
|             Tuple of (JSON to be sent to client, challenge bytes to store) | ||||
|         """ | ||||
|         options = generate_authentication_options( | ||||
|             rp_id=self.rp_id, | ||||
|             user_verification=( | ||||
|                 UserVerificationRequirement.REQUIRED | ||||
|                 if user_verification_required | ||||
|                 else UserVerificationRequirement.DISCOURAGED | ||||
|             ), | ||||
|             allow_credentials=_convert_credential_ids(credential_ids), | ||||
|             **authopts, | ||||
|         ) | ||||
|         return json.loads(options_to_json(options)), options.challenge | ||||
|  | ||||
|     def auth_parse(self, response: dict | str) -> AuthenticationCredential: | ||||
|         return parse_authentication_credential_json(response) | ||||
|  | ||||
|     def auth_verify( | ||||
|         self, | ||||
|         credential: AuthenticationCredential, | ||||
|         expected_challenge: bytes, | ||||
|         stored_cred: StoredCredential, | ||||
|         origin: str | None = None, | ||||
|     ) -> VerifiedAuthentication: | ||||
|         """ | ||||
|         Verify authentication response against locally stored credential data. | ||||
|  | ||||
|         Args: | ||||
|             credential: The authentication credential response from the client | ||||
|             expected_challenge: The earlier generated challenge bytes | ||||
|             stored_cred: The server stored credential record (modified by this function) | ||||
|         """ | ||||
|         expected_origin = origin or self.origin | ||||
|         # Verify the authentication response | ||||
|         verification = verify_authentication_response( | ||||
|             credential=credential, | ||||
|             expected_challenge=expected_challenge, | ||||
|             expected_origin=expected_origin, | ||||
|             expected_rp_id=self.rp_id, | ||||
|             credential_public_key=stored_cred.public_key, | ||||
|             credential_current_sign_count=stored_cred.sign_count, | ||||
|         ) | ||||
|         stored_cred.sign_count = verification.new_sign_count | ||||
|         now = datetime.now() | ||||
|         stored_cred.last_used = now | ||||
|         if verification.user_verified: | ||||
|             stored_cred.last_verified = now | ||||
|         return verification | ||||
|  | ||||
|  | ||||
| def _convert_credential_ids( | ||||
|     credential_ids: list[bytes] | None, | ||||
| ) -> list[PublicKeyCredentialDescriptor] | None: | ||||
|     """A helper to convert a list of credential IDs to PublicKeyCredentialDescriptor objects, or pass through None.""" | ||||
|     if credential_ids is None: | ||||
|         return None | ||||
|     return [PublicKeyCredentialDescriptor(id) for id in credential_ids] | ||||
		Reference in New Issue
	
	Block a user
	 Leo Vasanko
					Leo Vasanko