Bootstrap cleanup
This commit is contained in:
parent
f050dfb3f2
commit
07f5cf84fc
@ -1,4 +1,3 @@
|
|||||||
from .bootstrap import bootstrap_if_needed, force_bootstrap
|
|
||||||
from .sansio import Passkey
|
from .sansio import Passkey
|
||||||
|
|
||||||
__all__ = ["Passkey", "bootstrap_if_needed", "force_bootstrap"]
|
__all__ = ["Passkey"]
|
||||||
|
@ -46,10 +46,12 @@ async def bootstrap_system() -> dict:
|
|||||||
Returns:
|
Returns:
|
||||||
dict: Contains information about created entities and reset link
|
dict: Contains information about created entities and reset link
|
||||||
"""
|
"""
|
||||||
admin_uuid = uuid7.create()
|
# Create permission first - will fail if already exists
|
||||||
org_uuid = uuid7.create()
|
permission = Permission(id="auth/admin", display_name="Admin")
|
||||||
|
await globals.db.instance.create_permission(permission)
|
||||||
|
|
||||||
# Create organization
|
# Create organization
|
||||||
|
org_uuid = uuid7.create()
|
||||||
org = Org(
|
org = Org(
|
||||||
id=str(org_uuid),
|
id=str(org_uuid),
|
||||||
options={
|
options={
|
||||||
@ -59,11 +61,8 @@ async def bootstrap_system() -> dict:
|
|||||||
)
|
)
|
||||||
await globals.db.instance.create_organization(org)
|
await globals.db.instance.create_organization(org)
|
||||||
|
|
||||||
# Create permission
|
|
||||||
permission = Permission(id="auth/admin", display_name="Admin")
|
|
||||||
await globals.db.instance.create_permission(permission)
|
|
||||||
|
|
||||||
# Create admin user
|
# Create admin user
|
||||||
|
admin_uuid = uuid7.create()
|
||||||
user = User(
|
user = User(
|
||||||
uuid=admin_uuid,
|
uuid=admin_uuid,
|
||||||
display_name="Admin",
|
display_name="Admin",
|
||||||
@ -113,8 +112,19 @@ async def check_admin_credentials() -> bool:
|
|||||||
bool: True if a reset link was created, False if admin already has credentials
|
bool: True if a reset link was created, False if admin already has credentials
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
# Find admin users
|
# Get permission organizations to find admin users
|
||||||
admin_users = await globals.db.instance.find_users_by_role("Admin")
|
permission_orgs = await globals.db.instance.get_permission_organizations(
|
||||||
|
"auth/admin"
|
||||||
|
)
|
||||||
|
|
||||||
|
if not permission_orgs:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Get users from the first organization with admin permission
|
||||||
|
org_users = await globals.db.instance.get_organization_users(
|
||||||
|
permission_orgs[0].id
|
||||||
|
)
|
||||||
|
admin_users = [user for user, role in org_users if role == "Admin"]
|
||||||
|
|
||||||
if not admin_users:
|
if not admin_users:
|
||||||
return False
|
return False
|
||||||
@ -148,53 +158,25 @@ async def bootstrap_if_needed() -> bool:
|
|||||||
bool: True if bootstrapping was performed, False if system was already set up
|
bool: True if bootstrapping was performed, False if system was already set up
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
# Check if any users exist
|
# Check if the admin permission exists - if it does, system is already bootstrapped
|
||||||
if await globals.db.instance.has_any_users():
|
await globals.db.instance.get_permission("auth/admin")
|
||||||
await check_admin_credentials()
|
# Permission exists, check if admin needs credentials
|
||||||
return False
|
await check_admin_credentials()
|
||||||
|
return False
|
||||||
except Exception:
|
except Exception:
|
||||||
|
# Permission doesn't exist, need to bootstrap
|
||||||
pass
|
pass
|
||||||
|
|
||||||
# No users found, need to bootstrap
|
# No admin permission found, need to bootstrap
|
||||||
await bootstrap_system()
|
await bootstrap_system()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
async def force_bootstrap() -> dict:
|
|
||||||
"""
|
|
||||||
Force bootstrap the system (useful for testing or resetting).
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
dict: Bootstrap result information
|
|
||||||
"""
|
|
||||||
return await bootstrap_system()
|
|
||||||
|
|
||||||
|
|
||||||
# CLI interface
|
# CLI interface
|
||||||
async def main():
|
async def main():
|
||||||
"""Main CLI entry point for bootstrapping."""
|
"""Main CLI entry point for bootstrapping."""
|
||||||
import argparse
|
await globals.init()
|
||||||
|
await bootstrap_if_needed()
|
||||||
from .db.sql import init
|
|
||||||
|
|
||||||
parser = argparse.ArgumentParser(
|
|
||||||
description="Bootstrap passkey authentication system"
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"--force",
|
|
||||||
action="store_true",
|
|
||||||
help="Force bootstrap even if system is already set up",
|
|
||||||
)
|
|
||||||
|
|
||||||
args = parser.parse_args()
|
|
||||||
|
|
||||||
# Initialize database
|
|
||||||
await init()
|
|
||||||
|
|
||||||
if args.force:
|
|
||||||
await force_bootstrap()
|
|
||||||
else:
|
|
||||||
await bootstrap_if_needed()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
@ -231,15 +231,6 @@ class DatabaseInterface(ABC):
|
|||||||
) -> None:
|
) -> None:
|
||||||
"""Create a new user and their first credential in a transaction."""
|
"""Create a new user and their first credential in a transaction."""
|
||||||
|
|
||||||
# Bootstrap helpers
|
|
||||||
@abstractmethod
|
|
||||||
async def has_any_users(self) -> bool:
|
|
||||||
"""Check if any users exist in the system."""
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
async def find_users_by_role(self, role: str) -> list[User]:
|
|
||||||
"""Find all users with a specific role."""
|
|
||||||
|
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
"User",
|
"User",
|
||||||
|
@ -643,36 +643,3 @@ class DB(DatabaseInterface):
|
|||||||
current_time = datetime.now()
|
current_time = datetime.now()
|
||||||
stmt = delete(SessionModel).where(SessionModel.expires < current_time)
|
stmt = delete(SessionModel).where(SessionModel.expires < current_time)
|
||||||
await session.execute(stmt)
|
await session.execute(stmt)
|
||||||
|
|
||||||
# Bootstrap helpers
|
|
||||||
async def has_any_users(self) -> bool:
|
|
||||||
"""Check if any users exist in the system."""
|
|
||||||
async with self.session() as session:
|
|
||||||
stmt = select(UserModel).limit(1)
|
|
||||||
result = await session.execute(stmt)
|
|
||||||
user = result.scalar_one_or_none()
|
|
||||||
return user is not None
|
|
||||||
|
|
||||||
async def find_users_by_role(self, role: str) -> list[User]:
|
|
||||||
"""Find all users with a specific role."""
|
|
||||||
async with self.session() as session:
|
|
||||||
stmt = select(UserModel).where(UserModel.role == role)
|
|
||||||
result = await session.execute(stmt)
|
|
||||||
user_models = result.scalars().all()
|
|
||||||
|
|
||||||
users = []
|
|
||||||
for user_model in user_models:
|
|
||||||
user = User(
|
|
||||||
uuid=UUID(bytes=user_model.uuid),
|
|
||||||
display_name=user_model.display_name,
|
|
||||||
org_uuid=UUID(bytes=user_model.org_uuid)
|
|
||||||
if user_model.org_uuid
|
|
||||||
else None,
|
|
||||||
role=user_model.role,
|
|
||||||
created_at=user_model.created_at,
|
|
||||||
last_seen=user_model.last_seen,
|
|
||||||
visits=user_model.visits,
|
|
||||||
)
|
|
||||||
users.append(user)
|
|
||||||
|
|
||||||
return users
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user