""" Bootstrap module for passkey authentication system. This module handles initial system setup when a new database is created, including creating default admin user, organization, permissions, and generating a reset link for initial admin setup. """ import asyncio import logging from datetime import datetime import uuid7 from . import authsession, globals from .db import Org, Permission, Role, User from .util import passphrase, tokens logger = logging.getLogger(__name__) # Shared log message template for admin reset links ADMIN_RESET_MESSAGE = """\ %s 👤 Admin %s - Use this link to register a Passkey for the admin user! """ async def _create_and_log_admin_reset_link(user_uuid, message, session_type) -> str: """Create an admin reset link and log it with the provided message.""" token = passphrase.generate() await globals.db.instance.create_session( user_uuid=user_uuid, key=tokens.reset_key(token), expires=authsession.expires(), info={"type": session_type}, ) reset_link = f"{globals.passkey.instance.origin}/auth/{token}" logger.info(ADMIN_RESET_MESSAGE, message, reset_link) return reset_link async def bootstrap_system( user_name: str | None = None, org_name: str | None = None ) -> dict: """ Bootstrap the entire system with default data. Args: user_name: Display name for the admin user (default: "Admin") org_name: Display name for the organization (default: "Organization") Returns: dict: Contains information about created entities and reset link """ # Create permission first - will fail if already exists perm0 = Permission(id="auth/admin", display_name="Master Admin") await globals.db.instance.create_permission(perm0) org = Org(uuid7.create(), org_name or "Organization") await globals.db.instance.create_organization(org) perm1 = Permission( id=f"auth/org:{org.uuid}", display_name=f"{org.display_name} Admin" ) await globals.db.instance.create_permission(perm1) # Allow this org to grant admin permissions await globals.db.instance.add_permission_to_organization(str(org.uuid), perm0.id) await globals.db.instance.add_permission_to_organization(str(org.uuid), perm1.id) # Create an Administration role granting both org and global admin role = Role(uuid7.create(), org.uuid, "Administration", permissions=[perm0.id, perm1.id]) await globals.db.instance.create_role(role) user = User( uuid=uuid7.create(), display_name=user_name or "Admin", role_uuid=role.uuid, created_at=datetime.now(), visits=0, ) await globals.db.instance.create_user(user) # Generate reset link and log it reset_link = await _create_and_log_admin_reset_link( user.uuid, "✅ Bootstrap completed!", "admin bootstrap" ) return { "user": user, "org": org, "role": role, "permissions": [perm0, perm1], "reset_link": reset_link, } async def check_admin_credentials() -> bool: """ Check if the admin user needs credentials and create a reset link if needed. Returns: bool: True if a reset link was created, False if admin already has credentials """ try: # Get permission organizations to find admin users permission_orgs = await globals.db.instance.get_permission_organizations( "auth/admin" ) if not permission_orgs: return False # Get users from the first organization with admin permission org_users = await globals.db.instance.get_organization_users( str(permission_orgs[0].uuid) ) admin_users = [user for user, role in org_users if role == "Administration"] if not admin_users: return False # Check first admin user for credentials admin_user = admin_users[0] credentials = await globals.db.instance.get_credentials_by_user_uuid( admin_user.uuid ) if not credentials: # Admin exists but has no credentials, create reset link await _create_and_log_admin_reset_link( admin_user.uuid, "⚠️ Admin user has no credentials!", "admin registration", ) return True return False except Exception: return False async def bootstrap_if_needed( default_admin: str | None = None, default_org: str | None = None ) -> bool: """ Check if system needs bootstrapping and perform it if necessary. Args: default_admin: Display name for the admin user default_org: Display name for the organization Returns: bool: True if bootstrapping was performed, False if system was already set up """ try: # Check if the admin permission exists - if it does, system is already bootstrapped await globals.db.instance.get_permission("auth/admin") # Permission exists, system is already bootstrapped # Check if admin needs credentials (only for already-bootstrapped systems) await check_admin_credentials() return False except Exception: # Permission doesn't exist, need to bootstrap pass # No admin permission found, need to bootstrap # Bootstrap creates the admin user AND the reset link, so no need to check credentials after await bootstrap_system(default_admin, default_org) return True # CLI interface async def main(): """Main CLI entry point for bootstrapping.""" import argparse # Configure logging for CLI usage logging.basicConfig(level=logging.INFO, format="%(message)s", force=True) parser = argparse.ArgumentParser( description="Bootstrap passkey authentication system" ) parser.add_argument( "--user-name", default=None, help="Name for the admin user (default: Admin)", ) parser.add_argument( "--org-name", default=None, help="Name for the organization (default: Organization)", ) args = parser.parse_args() await globals.init(default_admin=args.user_name, default_org=args.org_name) if __name__ == "__main__": asyncio.run(main())