From f050dfb3f2db3d0d4409a1965c65dd4babe549a5 Mon Sep 17 00:00:00 2001 From: Leo Vasanko Date: Wed, 6 Aug 2025 14:39:44 -0600 Subject: [PATCH] Bootstrap code cleanup. --- passkey/bootstrap.py | 254 +++++++++++++++++++---------------------- passkey/db/__init__.py | 9 ++ passkey/db/sql.py | 33 ++++++ scripts/bootstrap.py | 36 ------ 4 files changed, 161 insertions(+), 171 deletions(-) delete mode 100644 scripts/bootstrap.py diff --git a/passkey/bootstrap.py b/passkey/bootstrap.py index 9bfa654..36becd4 100644 --- a/passkey/bootstrap.py +++ b/passkey/bootstrap.py @@ -6,143 +6,138 @@ including creating default admin user, organization, permissions, and generating a reset link for initial admin setup. """ +import logging from datetime import datetime import uuid7 -from .authsession import expires +from . import authsession, globals from .db import Org, Permission, User -from .globals import db from .util import passphrase, tokens +logger = logging.getLogger(__name__) -class BootstrapManager: - """Manages system bootstrapping operations.""" +# Shared log message template for admin reset links +ADMIN_RESET_MESSAGE = """%s - def __init__(self): - self.admin_uuid = uuid7.create() - self.org_uuid = uuid7.create() +šŸ‘¤ Admin %s + - Use this link to register a Passkey for the admin user! +""" - async def create_default_organization(self) -> Org: - """Create the default organization.""" - org = Org( - id=str(self.org_uuid), # Use UUID string as required by database - options={ - "display_name": "Organization", - "description": "Default organization for passkey authentication system", - "created_at": datetime.now().isoformat(), - }, - ) - await db.instance.create_organization(org) - return org - async def create_admin_permission(self) -> Permission: - """Create the admin permission.""" - permission = Permission( - id="auth/admin", display_name="Authentication Administration" - ) - await db.instance.create_permission(permission) - return permission +async def _create_and_log_admin_reset_link(user_uuid, message, session_type) -> str: + """Create an admin reset link and log it with the provided message.""" + token = passphrase.generate() + await globals.db.instance.create_session( + user_uuid=user_uuid, + key=tokens.reset_key(token), + expires=authsession.expires(), + info={"type": session_type}, + ) + reset_link = f"{globals.passkey.instance.origin}/auth/{token}" + logger.info(ADMIN_RESET_MESSAGE, message, reset_link) + return reset_link - async def create_default_admin_user(self) -> User: - """Create the default admin user.""" - user = User( - uuid=self.admin_uuid, - display_name="Admin", - org_uuid=self.org_uuid, - role="Admin", - created_at=datetime.now(), - visits=0, - ) - await db.instance.create_user(user) - return user - async def assign_permissions_to_organization( - self, org_id: str, permission_id: str - ) -> None: - """Assign permission to organization.""" - await db.instance.add_permission_to_organization(org_id, permission_id) +async def bootstrap_system() -> dict: + """ + Bootstrap the entire system with default data. - async def generate_admin_reset_link(self) -> str: - """Generate a reset link for the admin user to register their first passkey.""" - # Generate a human-readable passphrase token - token = passphrase.generate() # e.g., "cross.rotate.yin.note.evoke" + Returns: + dict: Contains information about created entities and reset link + """ + admin_uuid = uuid7.create() + org_uuid = uuid7.create() - # Create a reset session for the admin user - await db.instance.create_session( - user_uuid=self.admin_uuid, - key=tokens.reset_key(token), - expires=expires(), - info={ - "type": "bootstrap_reset", - "description": "Initial admin setup", - "created_at": datetime.now().isoformat(), - }, + # Create organization + org = Org( + id=str(org_uuid), + options={ + "display_name": "Organization", + "created_at": datetime.now().isoformat(), + }, + ) + await globals.db.instance.create_organization(org) + + # Create permission + permission = Permission(id="auth/admin", display_name="Admin") + await globals.db.instance.create_permission(permission) + + # Create admin user + user = User( + uuid=admin_uuid, + display_name="Admin", + org_uuid=org_uuid, + role="Admin", + created_at=datetime.now(), + visits=0, + ) + await globals.db.instance.create_user(user) + + # Link user to organization and assign permissions + await globals.db.instance.add_user_to_organization( + user_uuid=admin_uuid, org_id=org.id, role="Admin" + ) + await globals.db.instance.add_permission_to_organization(org.id, permission.id) + + # Generate reset link and log it + reset_link = await _create_and_log_admin_reset_link( + admin_uuid, "āœ… Bootstrap completed!", "admin bootstrap" + ) + + result = { + "admin_user": { + "uuid": str(user.uuid), + "display_name": user.display_name, + "role": user.role, + }, + "organization": { + "id": org.id, + "display_name": org.options.get("display_name"), + }, + "permission": { + "id": permission.id, + "display_name": permission.display_name, + }, + "reset_link": reset_link, + } + + return result + + +async def check_admin_credentials() -> bool: + """ + Check if the admin user needs credentials and create a reset link if needed. + + Returns: + bool: True if a reset link was created, False if admin already has credentials + """ + try: + # Find admin users + admin_users = await globals.db.instance.find_users_by_role("Admin") + + if not admin_users: + return False + + # Check first admin user for credentials + admin_user = admin_users[0] + credentials = await globals.db.instance.get_credentials_by_user_uuid( + admin_user.uuid ) - return token + if not credentials: + # Admin exists but has no credentials, create reset link + await _create_and_log_admin_reset_link( + admin_user.uuid, + "āš ļø Admin user has no credentials!", + "admin registration", + ) + return True - async def bootstrap_system(self) -> dict: - """ - Bootstrap the entire system with default data. + return False - Returns: - dict: Contains information about created entities and reset link - """ - print("šŸš€ Bootstrapping passkey authentication system...") - - # Create default organization - print("šŸ“‚ Creating default organization...") - org = await self.create_default_organization() - - # Create admin permission - print("šŸ” Creating admin permission...") - permission = await self.create_admin_permission() - - # Create admin user - print("šŸ‘¤ Creating admin user...") - user = await self.create_default_admin_user() - - # Assign admin to organization - print("šŸ¢ Assigning admin to organization...") - await db.instance.add_user_to_organization( - user_uuid=self.admin_uuid, org_id=org.id, role="Admin" - ) - - # Assign permission to organization - print("⚔ Assigning permissions to organization...") - await self.assign_permissions_to_organization(org.id, permission.id) - - # Generate reset link for admin - print("šŸ”— Generating admin setup link...") - reset_token = await self.generate_admin_reset_link() - - result = { - "admin_user": { - "uuid": str(user.uuid), - "display_name": user.display_name, - "role": user.role, - }, - "organization": { - "id": org.id, - "display_name": org.options.get("display_name"), - }, - "permission": { - "id": permission.id, - "display_name": permission.display_name, - }, - "reset_token": reset_token, - } - - print("\nāœ… Bootstrap completed successfully!") - print(f"\nšŸ”‘ Admin Reset Token: {reset_token}") - print("\nšŸ“‹ Use this token to set up the admin user's first passkey.") - print(" The token will be valid for 24 hours.") - print(f"\nšŸ‘¤ Admin User UUID: {user.uuid}") - print(f"šŸ¢ Organization: {org.options.get('display_name')} (ID: {org.id})") - print(f"šŸ” Permission: {permission.display_name} (ID: {permission.id})") - - return result + except Exception: + return False async def bootstrap_if_needed() -> bool: @@ -153,25 +148,15 @@ async def bootstrap_if_needed() -> bool: bool: True if bootstrapping was performed, False if system was already set up """ try: - # Try to get any organization to see if system is already bootstrapped - # We'll use a more robust check by looking for existing users - from sqlalchemy import select - - from .db.sql import DB, UserModel - - async with DB().session() as session: - stmt = select(UserModel).limit(1) - result = await session.execute(stmt) - user = result.scalar_one_or_none() - if user: - print("ā„¹ļø System already bootstrapped (found existing users).") - return False + # Check if any users exist + if await globals.db.instance.has_any_users(): + await check_admin_credentials() + return False except Exception: pass # No users found, need to bootstrap - manager = BootstrapManager() - await manager.bootstrap_system() + await bootstrap_system() return True @@ -182,8 +167,7 @@ async def force_bootstrap() -> dict: Returns: dict: Bootstrap result information """ - manager = BootstrapManager() - return await manager.bootstrap_system() + return await bootstrap_system() # CLI interface diff --git a/passkey/db/__init__.py b/passkey/db/__init__.py index 72a106a..a6a74be 100644 --- a/passkey/db/__init__.py +++ b/passkey/db/__init__.py @@ -231,6 +231,15 @@ class DatabaseInterface(ABC): ) -> None: """Create a new user and their first credential in a transaction.""" + # Bootstrap helpers + @abstractmethod + async def has_any_users(self) -> bool: + """Check if any users exist in the system.""" + + @abstractmethod + async def find_users_by_role(self, role: str) -> list[User]: + """Find all users with a specific role.""" + __all__ = [ "User", diff --git a/passkey/db/sql.py b/passkey/db/sql.py index 695efd2..0397d7f 100644 --- a/passkey/db/sql.py +++ b/passkey/db/sql.py @@ -643,3 +643,36 @@ class DB(DatabaseInterface): current_time = datetime.now() stmt = delete(SessionModel).where(SessionModel.expires < current_time) await session.execute(stmt) + + # Bootstrap helpers + async def has_any_users(self) -> bool: + """Check if any users exist in the system.""" + async with self.session() as session: + stmt = select(UserModel).limit(1) + result = await session.execute(stmt) + user = result.scalar_one_or_none() + return user is not None + + async def find_users_by_role(self, role: str) -> list[User]: + """Find all users with a specific role.""" + async with self.session() as session: + stmt = select(UserModel).where(UserModel.role == role) + result = await session.execute(stmt) + user_models = result.scalars().all() + + users = [] + for user_model in user_models: + user = User( + uuid=UUID(bytes=user_model.uuid), + display_name=user_model.display_name, + org_uuid=UUID(bytes=user_model.org_uuid) + if user_model.org_uuid + else None, + role=user_model.role, + created_at=user_model.created_at, + last_seen=user_model.last_seen, + visits=user_model.visits, + ) + users.append(user) + + return users diff --git a/scripts/bootstrap.py b/scripts/bootstrap.py deleted file mode 100644 index fddb975..0000000 --- a/scripts/bootstrap.py +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/bin/env python3 -""" -Bootstrap CLI script for passkey authentication system. - -This script initializes a new passkey authentication system with: -- Default admin user -- Default organization -- Admin permissions -- Reset token for initial setup -""" - -import asyncio -import sys - - -async def main(): - """Main CLI entry point.""" - from passkey.bootstrap import main as bootstrap_main - from passkey.db.sql import init - - print("Initializing passkey authentication database...") - await init() - - print("\nRunning bootstrap process...") - await bootstrap_main() - - -if __name__ == "__main__": - try: - asyncio.run(main()) - except KeyboardInterrupt: - print("\nāŒ Bootstrap interrupted by user") - sys.exit(1) - except Exception as e: - print(f"\nāŒ Bootstrap failed: {e}") - sys.exit(1)