Compare commits

...

2 Commits

Author SHA1 Message Date
Leo Vasanko
2e3ce32779 Bootstrapping cleanup, avoid double operations. 2025-08-07 00:45:12 -06:00
Leo Vasanko
07f5cf84fc Bootstrap cleanup 2025-08-06 23:56:13 -06:00
5 changed files with 70 additions and 87 deletions

View File

@ -1,4 +1,3 @@
from .bootstrap import bootstrap_if_needed, force_bootstrap
from .sansio import Passkey from .sansio import Passkey
__all__ = ["Passkey", "bootstrap_if_needed", "force_bootstrap"] __all__ = ["Passkey"]

View File

@ -6,6 +6,7 @@ including creating default admin user, organization, permissions, and
generating a reset link for initial admin setup. generating a reset link for initial admin setup.
""" """
import asyncio
import logging import logging
from datetime import datetime from datetime import datetime
@ -39,34 +40,39 @@ async def _create_and_log_admin_reset_link(user_uuid, message, session_type) ->
return reset_link return reset_link
async def bootstrap_system() -> dict: async def bootstrap_system(
user_name: str | None = None, org_name: str | None = None
) -> dict:
""" """
Bootstrap the entire system with default data. Bootstrap the entire system with default data.
Args:
user_name: Display name for the admin user (default: "Admin")
org_name: Display name for the organization (default: "Organization")
Returns: Returns:
dict: Contains information about created entities and reset link dict: Contains information about created entities and reset link
""" """
admin_uuid = uuid7.create() # Create permission first - will fail if already exists
org_uuid = uuid7.create() permission = Permission(id="auth/admin", display_name="Admin")
await globals.db.instance.create_permission(permission)
# Create organization # Create organization
org_uuid = uuid7.create()
org = Org( org = Org(
id=str(org_uuid), id=str(org_uuid),
options={ options={
"display_name": "Organization", "display_name": org_name or "Organization",
"created_at": datetime.now().isoformat(), "created_at": datetime.now().isoformat(),
}, },
) )
await globals.db.instance.create_organization(org) await globals.db.instance.create_organization(org)
# Create permission
permission = Permission(id="auth/admin", display_name="Admin")
await globals.db.instance.create_permission(permission)
# Create admin user # Create admin user
admin_uuid = uuid7.create()
user = User( user = User(
uuid=admin_uuid, uuid=admin_uuid,
display_name="Admin", display_name=user_name or "Admin",
org_uuid=org_uuid, org_uuid=org_uuid,
role="Admin", role="Admin",
created_at=datetime.now(), created_at=datetime.now(),
@ -113,8 +119,19 @@ async def check_admin_credentials() -> bool:
bool: True if a reset link was created, False if admin already has credentials bool: True if a reset link was created, False if admin already has credentials
""" """
try: try:
# Find admin users # Get permission organizations to find admin users
admin_users = await globals.db.instance.find_users_by_role("Admin") permission_orgs = await globals.db.instance.get_permission_organizations(
"auth/admin"
)
if not permission_orgs:
return False
# Get users from the first organization with admin permission
org_users = await globals.db.instance.get_organization_users(
permission_orgs[0].id
)
admin_users = [user for user, role in org_users if role == "Admin"]
if not admin_users: if not admin_users:
return False return False
@ -140,64 +157,69 @@ async def check_admin_credentials() -> bool:
return False return False
async def bootstrap_if_needed() -> bool: async def bootstrap_if_needed(
default_admin: str | None = None, default_org: str | None = None
) -> bool:
""" """
Check if system needs bootstrapping and perform it if necessary. Check if system needs bootstrapping and perform it if necessary.
Args:
default_admin: Display name for the admin user
default_org: Display name for the organization
Returns: Returns:
bool: True if bootstrapping was performed, False if system was already set up bool: True if bootstrapping was performed, False if system was already set up
""" """
try: try:
# Check if any users exist # Check if the admin permission exists - if it does, system is already bootstrapped
if await globals.db.instance.has_any_users(): await globals.db.instance.get_permission("auth/admin")
await check_admin_credentials() # Permission exists, system is already bootstrapped
# Check if admin needs credentials (only for already-bootstrapped systems)
admin_needs_reset = await check_admin_credentials()
if not admin_needs_reset:
# Use the same format as the reset link messages
logger.info(
ADMIN_RESET_MESSAGE,
" System already bootstrapped - no action needed",
"Admin user already has credentials",
)
return False return False
except Exception: except Exception:
# Permission doesn't exist, need to bootstrap
pass pass
# No users found, need to bootstrap # No admin permission found, need to bootstrap
await bootstrap_system() # Bootstrap creates the admin user AND the reset link, so no need to check credentials after
await bootstrap_system(default_admin, default_org)
return True return True
async def force_bootstrap() -> dict:
"""
Force bootstrap the system (useful for testing or resetting).
Returns:
dict: Bootstrap result information
"""
return await bootstrap_system()
# CLI interface # CLI interface
async def main(): async def main():
"""Main CLI entry point for bootstrapping.""" """Main CLI entry point for bootstrapping."""
import argparse import argparse
from .db.sql import init # Configure logging for CLI usage
logging.basicConfig(level=logging.INFO, format="%(message)s", force=True)
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="Bootstrap passkey authentication system" description="Bootstrap passkey authentication system"
) )
parser.add_argument( parser.add_argument(
"--force", "--user-name",
action="store_true", default=None,
help="Force bootstrap even if system is already set up", help="Name for the admin user (default: Admin)",
)
parser.add_argument(
"--org-name",
default=None,
help="Name for the organization (default: Organization)",
) )
args = parser.parse_args() args = parser.parse_args()
# Initialize database await globals.init(default_admin=args.user_name, default_org=args.org_name)
await init()
if args.force:
await force_bootstrap()
else:
await bootstrap_if_needed()
if __name__ == "__main__": if __name__ == "__main__":
import asyncio
asyncio.run(main()) asyncio.run(main())

View File

@ -231,15 +231,6 @@ class DatabaseInterface(ABC):
) -> None: ) -> None:
"""Create a new user and their first credential in a transaction.""" """Create a new user and their first credential in a transaction."""
# Bootstrap helpers
@abstractmethod
async def has_any_users(self) -> bool:
"""Check if any users exist in the system."""
@abstractmethod
async def find_users_by_role(self, role: str) -> list[User]:
"""Find all users with a specific role."""
__all__ = [ __all__ = [
"User", "User",

View File

@ -643,36 +643,3 @@ class DB(DatabaseInterface):
current_time = datetime.now() current_time = datetime.now()
stmt = delete(SessionModel).where(SessionModel.expires < current_time) stmt = delete(SessionModel).where(SessionModel.expires < current_time)
await session.execute(stmt) await session.execute(stmt)
# Bootstrap helpers
async def has_any_users(self) -> bool:
"""Check if any users exist in the system."""
async with self.session() as session:
stmt = select(UserModel).limit(1)
result = await session.execute(stmt)
user = result.scalar_one_or_none()
return user is not None
async def find_users_by_role(self, role: str) -> list[User]:
"""Find all users with a specific role."""
async with self.session() as session:
stmt = select(UserModel).where(UserModel.role == role)
result = await session.execute(stmt)
user_models = result.scalars().all()
users = []
for user_model in user_models:
user = User(
uuid=UUID(bytes=user_model.uuid),
display_name=user_model.display_name,
org_uuid=UUID(bytes=user_model.org_uuid)
if user_model.org_uuid
else None,
role=user_model.role,
created_at=user_model.created_at,
last_seen=user_model.last_seen,
visits=user_model.visits,
)
users.append(user)
return users

View File

@ -27,7 +27,11 @@ class Manager(Generic[T]):
async def init( async def init(
rp_id: str = "localhost", rp_name: str | None = None, origin: str | None = None rp_id: str = "localhost",
rp_name: str | None = None,
origin: str | None = None,
default_admin: str | None = None,
default_org: str | None = None,
) -> None: ) -> None:
"""Initialize the global database, passkey instance, and bootstrap the system if needed.""" """Initialize the global database, passkey instance, and bootstrap the system if needed."""
# Initialize passkey instance with provided parameters # Initialize passkey instance with provided parameters
@ -48,7 +52,7 @@ async def init(
# Bootstrap system if needed # Bootstrap system if needed
from .bootstrap import bootstrap_if_needed from .bootstrap import bootstrap_if_needed
await bootstrap_if_needed() await bootstrap_if_needed(default_admin, default_org)
# Global instances # Global instances