passkey-auth/passkey/db/__init__.py
2025-08-06 14:39:44 -06:00

252 lines
7.1 KiB
Python

"""
Database module for WebAuthn passkey authentication.
This module provides dataclasses and database abstractions for managing
users, credentials, and sessions in a WebAuthn authentication system.
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from uuid import UUID
@dataclass
class User:
uuid: UUID
display_name: str
org_uuid: UUID | None = None
role: str | None = None
created_at: datetime | None = None
last_seen: datetime | None = None
visits: int = 0
@dataclass
class Credential:
uuid: UUID
credential_id: bytes # Long binary ID passed from the authenticator
user_uuid: UUID
aaguid: UUID
public_key: bytes
sign_count: int
created_at: datetime
last_used: datetime | None = None
last_verified: datetime | None = None
@dataclass
class Org:
"""Organization data structure."""
id: str # ASCII primary key
options: dict
@dataclass
class Permission:
"""Permission data structure."""
id: str # String primary key (max 32 chars)
display_name: str
@dataclass
class Session:
"""Session data structure."""
key: bytes
user_uuid: UUID
expires: datetime
info: dict
credential_uuid: UUID | None = None
class DatabaseInterface(ABC):
"""Abstract base class defining the database interface.
This class defines the public API that database implementations should provide.
Implementations may use decorators like @with_session that modify method signatures
at runtime, so this interface focuses on the logical operations rather than
exact parameter matching.
"""
@abstractmethod
async def init_db(self) -> None:
"""Initialize database tables."""
pass
# User operations
@abstractmethod
async def get_user_by_uuid(self, user_uuid: UUID) -> User:
"""Get user record by WebAuthn user UUID."""
@abstractmethod
async def create_user(self, user: User) -> None:
"""Create a new user."""
# Credential operations
@abstractmethod
async def create_credential(self, credential: Credential) -> None:
"""Store a credential for a user."""
@abstractmethod
async def get_credential_by_id(self, credential_id: bytes) -> Credential:
"""Get credential by credential ID."""
@abstractmethod
async def get_credentials_by_user_uuid(self, user_uuid: UUID) -> list[bytes]:
"""Get all credential IDs for a user."""
@abstractmethod
async def update_credential(self, credential: Credential) -> None:
"""Update the sign count, created_at, last_used, and last_verified for a credential."""
@abstractmethod
async def delete_credential(self, uuid: UUID, user_uuid: UUID) -> None:
"""Delete a specific credential for a user."""
# Session operations
@abstractmethod
async def create_session(
self,
user_uuid: UUID,
key: bytes,
expires: datetime,
info: dict,
credential_uuid: UUID | None = None,
) -> None:
"""Create a new session."""
@abstractmethod
async def get_session(self, key: bytes) -> Session | None:
"""Get session by key."""
@abstractmethod
async def delete_session(self, key: bytes) -> None:
"""Delete session by key."""
@abstractmethod
async def update_session(
self, key: bytes, expires: datetime, info: dict
) -> Session | None:
"""Update session expiry and info."""
@abstractmethod
async def cleanup(self) -> None:
"""Called periodically to clean up expired records."""
# Organization operations
@abstractmethod
async def create_organization(self, organization: Org) -> None:
"""Create a new organization."""
@abstractmethod
async def get_organization(self, org_id: str) -> Org:
"""Get organization by ID."""
@abstractmethod
async def update_organization(self, organization: Org) -> None:
"""Update organization options."""
@abstractmethod
async def delete_organization(self, org_id: str) -> None:
"""Delete organization by ID."""
@abstractmethod
async def add_user_to_organization(
self, user_uuid: UUID, org_id: str, role: str
) -> None:
"""Set a user's organization and role."""
@abstractmethod
async def remove_user_from_organization(self, user_uuid: UUID) -> None:
"""Remove a user from their organization."""
@abstractmethod
async def get_user_organization(self, user_uuid: UUID) -> tuple[Org, str] | None:
"""Get the organization and role for a user."""
@abstractmethod
async def get_organization_users(self, org_id: str) -> list[tuple[User, str]]:
"""Get all users in an organization with their roles."""
@abstractmethod
async def get_user_role_in_organization(
self, user_uuid: UUID, org_id: str
) -> str | None:
"""Get a user's role in a specific organization."""
@abstractmethod
async def update_user_role_in_organization(
self, user_uuid: UUID, new_role: str
) -> None:
"""Update a user's role in their organization."""
# Permission operations
@abstractmethod
async def create_permission(self, permission: Permission) -> None:
"""Create a new permission."""
@abstractmethod
async def get_permission(self, permission_id: str) -> Permission:
"""Get permission by ID."""
@abstractmethod
async def update_permission(self, permission: Permission) -> None:
"""Update permission details."""
@abstractmethod
async def delete_permission(self, permission_id: str) -> None:
"""Delete permission by ID."""
@abstractmethod
async def add_permission_to_organization(
self, org_id: str, permission_id: str
) -> None:
"""Add a permission to an organization."""
@abstractmethod
async def remove_permission_from_organization(
self, org_id: str, permission_id: str
) -> None:
"""Remove a permission from an organization."""
@abstractmethod
async def get_organization_permissions(self, org_id: str) -> list[Permission]:
"""Get all permissions assigned to an organization."""
@abstractmethod
async def get_permission_organizations(self, permission_id: str) -> list[Org]:
"""Get all organizations that have a specific permission."""
# Combined operations
@abstractmethod
async def login(self, user_uuid: UUID, credential: Credential) -> None:
"""Update user and credential timestamps after successful login."""
@abstractmethod
async def create_user_and_credential(
self, user: User, credential: Credential
) -> None:
"""Create a new user and their first credential in a transaction."""
# Bootstrap helpers
@abstractmethod
async def has_any_users(self) -> bool:
"""Check if any users exist in the system."""
@abstractmethod
async def find_users_by_role(self, role: str) -> list[User]:
"""Find all users with a specific role."""
__all__ = [
"User",
"Credential",
"Session",
"Org",
"Permission",
"DatabaseInterface",
]