Compare commits
2 Commits
f96668b135
...
dcca3e3fbd
Author | SHA1 | Date | |
---|---|---|---|
![]() |
dcca3e3fbd | ||
![]() |
5a129220aa |
@ -1,3 +1,4 @@
|
|||||||
|
from .bootstrap import bootstrap_if_needed, force_bootstrap
|
||||||
from .sansio import Passkey
|
from .sansio import Passkey
|
||||||
|
|
||||||
__all__ = ["Passkey"]
|
__all__ = ["Passkey", "bootstrap_if_needed", "force_bootstrap"]
|
||||||
|
@ -11,7 +11,8 @@ independent of any web framework:
|
|||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
|
|
||||||
from .db import Session, db
|
from .db import Session
|
||||||
|
from .globals import db
|
||||||
from .util.tokens import create_token, reset_key, session_key
|
from .util.tokens import create_token, reset_key, session_key
|
||||||
|
|
||||||
EXPIRES = timedelta(hours=24)
|
EXPIRES = timedelta(hours=24)
|
||||||
|
219
passkey/bootstrap.py
Normal file
219
passkey/bootstrap.py
Normal file
@ -0,0 +1,219 @@
|
|||||||
|
"""
|
||||||
|
Bootstrap module for passkey authentication system.
|
||||||
|
|
||||||
|
This module handles initial system setup when a new database is created,
|
||||||
|
including creating default admin user, organization, permissions, and
|
||||||
|
generating a reset link for initial admin setup.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
import uuid7
|
||||||
|
|
||||||
|
from .authsession import expires
|
||||||
|
from .db import Org, Permission, User
|
||||||
|
from .globals import db
|
||||||
|
from .util import passphrase, tokens
|
||||||
|
|
||||||
|
|
||||||
|
class BootstrapManager:
|
||||||
|
"""Manages system bootstrapping operations."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.admin_uuid = uuid7.create()
|
||||||
|
self.org_uuid = uuid7.create()
|
||||||
|
|
||||||
|
async def create_default_organization(self) -> Org:
|
||||||
|
"""Create the default organization."""
|
||||||
|
org = Org(
|
||||||
|
id=str(self.org_uuid), # Use UUID string as required by database
|
||||||
|
options={
|
||||||
|
"display_name": "Organization",
|
||||||
|
"description": "Default organization for passkey authentication system",
|
||||||
|
"created_at": datetime.now().isoformat(),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
await db.instance.create_organization(org)
|
||||||
|
return org
|
||||||
|
|
||||||
|
async def create_admin_permission(self) -> Permission:
|
||||||
|
"""Create the admin permission."""
|
||||||
|
permission = Permission(
|
||||||
|
id="auth/admin", display_name="Authentication Administration"
|
||||||
|
)
|
||||||
|
await db.instance.create_permission(permission)
|
||||||
|
return permission
|
||||||
|
|
||||||
|
async def create_default_admin_user(self) -> User:
|
||||||
|
"""Create the default admin user."""
|
||||||
|
user = User(
|
||||||
|
uuid=self.admin_uuid,
|
||||||
|
display_name="Admin",
|
||||||
|
org_uuid=self.org_uuid,
|
||||||
|
role="Admin",
|
||||||
|
created_at=datetime.now(),
|
||||||
|
visits=0,
|
||||||
|
)
|
||||||
|
await db.instance.create_user(user)
|
||||||
|
return user
|
||||||
|
|
||||||
|
async def assign_permissions_to_organization(
|
||||||
|
self, org_id: str, permission_id: str
|
||||||
|
) -> None:
|
||||||
|
"""Assign permission to organization."""
|
||||||
|
await db.instance.add_permission_to_organization(org_id, permission_id)
|
||||||
|
|
||||||
|
async def generate_admin_reset_link(self) -> str:
|
||||||
|
"""Generate a reset link for the admin user to register their first passkey."""
|
||||||
|
# Generate a human-readable passphrase token
|
||||||
|
token = passphrase.generate() # e.g., "cross.rotate.yin.note.evoke"
|
||||||
|
|
||||||
|
# Create a reset session for the admin user
|
||||||
|
await db.instance.create_session(
|
||||||
|
user_uuid=self.admin_uuid,
|
||||||
|
key=tokens.reset_key(token),
|
||||||
|
expires=expires(),
|
||||||
|
info={
|
||||||
|
"type": "bootstrap_reset",
|
||||||
|
"description": "Initial admin setup",
|
||||||
|
"created_at": datetime.now().isoformat(),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
return token
|
||||||
|
|
||||||
|
async def bootstrap_system(self) -> dict:
|
||||||
|
"""
|
||||||
|
Bootstrap the entire system with default data.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict: Contains information about created entities and reset link
|
||||||
|
"""
|
||||||
|
print("🚀 Bootstrapping passkey authentication system...")
|
||||||
|
|
||||||
|
# Create default organization
|
||||||
|
print("📂 Creating default organization...")
|
||||||
|
org = await self.create_default_organization()
|
||||||
|
|
||||||
|
# Create admin permission
|
||||||
|
print("🔐 Creating admin permission...")
|
||||||
|
permission = await self.create_admin_permission()
|
||||||
|
|
||||||
|
# Create admin user
|
||||||
|
print("👤 Creating admin user...")
|
||||||
|
user = await self.create_default_admin_user()
|
||||||
|
|
||||||
|
# Assign admin to organization
|
||||||
|
print("🏢 Assigning admin to organization...")
|
||||||
|
await db.instance.add_user_to_organization(
|
||||||
|
user_uuid=self.admin_uuid, org_id=org.id, role="Admin"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Assign permission to organization
|
||||||
|
print("⚡ Assigning permissions to organization...")
|
||||||
|
await self.assign_permissions_to_organization(org.id, permission.id)
|
||||||
|
|
||||||
|
# Generate reset link for admin
|
||||||
|
print("🔗 Generating admin setup link...")
|
||||||
|
reset_token = await self.generate_admin_reset_link()
|
||||||
|
|
||||||
|
result = {
|
||||||
|
"admin_user": {
|
||||||
|
"uuid": str(user.uuid),
|
||||||
|
"display_name": user.display_name,
|
||||||
|
"role": user.role,
|
||||||
|
},
|
||||||
|
"organization": {
|
||||||
|
"id": org.id,
|
||||||
|
"display_name": org.options.get("display_name"),
|
||||||
|
},
|
||||||
|
"permission": {
|
||||||
|
"id": permission.id,
|
||||||
|
"display_name": permission.display_name,
|
||||||
|
},
|
||||||
|
"reset_token": reset_token,
|
||||||
|
}
|
||||||
|
|
||||||
|
print("\n✅ Bootstrap completed successfully!")
|
||||||
|
print(f"\n🔑 Admin Reset Token: {reset_token}")
|
||||||
|
print("\n📋 Use this token to set up the admin user's first passkey.")
|
||||||
|
print(" The token will be valid for 24 hours.")
|
||||||
|
print(f"\n👤 Admin User UUID: {user.uuid}")
|
||||||
|
print(f"🏢 Organization: {org.options.get('display_name')} (ID: {org.id})")
|
||||||
|
print(f"🔐 Permission: {permission.display_name} (ID: {permission.id})")
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
async def bootstrap_if_needed() -> bool:
|
||||||
|
"""
|
||||||
|
Check if system needs bootstrapping and perform it if necessary.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if bootstrapping was performed, False if system was already set up
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# Try to get any organization to see if system is already bootstrapped
|
||||||
|
# We'll use a more robust check by looking for existing users
|
||||||
|
from sqlalchemy import select
|
||||||
|
|
||||||
|
from .db.sql import DB, UserModel
|
||||||
|
|
||||||
|
async with DB().session() as session:
|
||||||
|
stmt = select(UserModel).limit(1)
|
||||||
|
result = await session.execute(stmt)
|
||||||
|
user = result.scalar_one_or_none()
|
||||||
|
if user:
|
||||||
|
print("ℹ️ System already bootstrapped (found existing users).")
|
||||||
|
return False
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# No users found, need to bootstrap
|
||||||
|
manager = BootstrapManager()
|
||||||
|
await manager.bootstrap_system()
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
async def force_bootstrap() -> dict:
|
||||||
|
"""
|
||||||
|
Force bootstrap the system (useful for testing or resetting).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict: Bootstrap result information
|
||||||
|
"""
|
||||||
|
manager = BootstrapManager()
|
||||||
|
return await manager.bootstrap_system()
|
||||||
|
|
||||||
|
|
||||||
|
# CLI interface
|
||||||
|
async def main():
|
||||||
|
"""Main CLI entry point for bootstrapping."""
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
from .db.sql import init
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
description="Bootstrap passkey authentication system"
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--force",
|
||||||
|
action="store_true",
|
||||||
|
help="Force bootstrap even if system is already set up",
|
||||||
|
)
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
# Initialize database
|
||||||
|
await init()
|
||||||
|
|
||||||
|
if args.force:
|
||||||
|
await force_bootstrap()
|
||||||
|
else:
|
||||||
|
await bootstrap_if_needed()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
asyncio.run(main())
|
@ -232,28 +232,6 @@ class DatabaseInterface(ABC):
|
|||||||
"""Create a new user and their first credential in a transaction."""
|
"""Create a new user and their first credential in a transaction."""
|
||||||
|
|
||||||
|
|
||||||
class DatabaseManager:
|
|
||||||
"""Manager for the global database instance."""
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
self._instance: DatabaseInterface | None = None
|
|
||||||
|
|
||||||
@property
|
|
||||||
def instance(self) -> DatabaseInterface:
|
|
||||||
if self._instance is None:
|
|
||||||
raise RuntimeError(
|
|
||||||
"Database not initialized. Call e.g. db.sql.init() first."
|
|
||||||
)
|
|
||||||
return self._instance
|
|
||||||
|
|
||||||
@instance.setter
|
|
||||||
def instance(self, instance: DatabaseInterface) -> None:
|
|
||||||
self._instance = instance
|
|
||||||
|
|
||||||
|
|
||||||
db = DatabaseManager()
|
|
||||||
|
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
"User",
|
"User",
|
||||||
"Credential",
|
"Credential",
|
||||||
@ -261,5 +239,4 @@ __all__ = [
|
|||||||
"Org",
|
"Org",
|
||||||
"Permission",
|
"Permission",
|
||||||
"DatabaseInterface",
|
"DatabaseInterface",
|
||||||
"db",
|
|
||||||
]
|
]
|
||||||
|
@ -23,7 +23,8 @@ from sqlalchemy.dialects.sqlite import BLOB, JSON
|
|||||||
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
|
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
|
||||||
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
|
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
|
||||||
|
|
||||||
from . import Credential, DatabaseInterface, Org, Permission, Session, User, db
|
from ..globals import db
|
||||||
|
from . import Credential, DatabaseInterface, Org, Permission, Session, User
|
||||||
|
|
||||||
DB_PATH = "sqlite+aiosqlite:///passkey-auth.sqlite"
|
DB_PATH = "sqlite+aiosqlite:///passkey-auth.sqlite"
|
||||||
|
|
||||||
|
@ -1,9 +1,13 @@
|
|||||||
import argparse
|
import argparse
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
|
||||||
import uvicorn
|
import uvicorn
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
|
# Configure logging to remove the "ERROR:root:" prefix
|
||||||
|
logging.basicConfig(level=logging.INFO, format="%(message)s", force=True)
|
||||||
parser = argparse.ArgumentParser(
|
parser = argparse.ArgumentParser(
|
||||||
description="Run the passkey authentication server"
|
description="Run the passkey authentication server"
|
||||||
)
|
)
|
||||||
@ -16,9 +20,25 @@ def main():
|
|||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--dev", action="store_true", help="Enable development mode with auto-reload"
|
"--dev", action="store_true", help="Enable development mode with auto-reload"
|
||||||
)
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--rp-id", default="localhost", help="Relying Party ID (default: localhost)"
|
||||||
|
)
|
||||||
|
parser.add_argument("--rp-name", help="Relying Party name (default: same as rp-id)")
|
||||||
|
parser.add_argument("--origin", help="Origin URL (default: https://<rp-id>)")
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
# Initialize the application
|
||||||
|
try:
|
||||||
|
from .. import globals
|
||||||
|
|
||||||
|
asyncio.run(
|
||||||
|
globals.init(rp_id=args.rp_id, rp_name=args.rp_name, origin=args.origin)
|
||||||
|
)
|
||||||
|
except ValueError as e:
|
||||||
|
logging.error(f"⚠️ {e}")
|
||||||
|
return
|
||||||
|
|
||||||
uvicorn.run(
|
uvicorn.run(
|
||||||
"passkey.fastapi:app",
|
"passkey.fastapi:app",
|
||||||
host=args.host,
|
host=args.host,
|
||||||
|
@ -17,7 +17,7 @@ from passkey.util import passphrase
|
|||||||
|
|
||||||
from .. import aaguid
|
from .. import aaguid
|
||||||
from ..authsession import delete_credential, get_reset, get_session
|
from ..authsession import delete_credential, get_reset, get_session
|
||||||
from ..db import db
|
from ..globals import db
|
||||||
from ..util.tokens import session_key
|
from ..util.tokens import session_key
|
||||||
from . import session
|
from . import session
|
||||||
|
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
import contextlib
|
import contextlib
|
||||||
import logging
|
import logging
|
||||||
from contextlib import asynccontextmanager
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from fastapi import Cookie, FastAPI, Request, Response
|
from fastapi import Cookie, FastAPI, Request, Response
|
||||||
@ -8,7 +7,6 @@ from fastapi.responses import FileResponse, JSONResponse
|
|||||||
from fastapi.staticfiles import StaticFiles
|
from fastapi.staticfiles import StaticFiles
|
||||||
|
|
||||||
from ..authsession import get_session
|
from ..authsession import get_session
|
||||||
from ..db import db
|
|
||||||
from . import ws
|
from . import ws
|
||||||
from .api import register_api_routes
|
from .api import register_api_routes
|
||||||
from .reset import register_reset_routes
|
from .reset import register_reset_routes
|
||||||
@ -16,20 +14,7 @@ from .reset import register_reset_routes
|
|||||||
STATIC_DIR = Path(__file__).parent.parent / "frontend-build"
|
STATIC_DIR = Path(__file__).parent.parent / "frontend-build"
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
app = FastAPI()
|
||||||
async def lifespan(app: FastAPI):
|
|
||||||
# Test if we have a database already initialized, otherwise use SQL
|
|
||||||
try:
|
|
||||||
db.instance
|
|
||||||
except RuntimeError:
|
|
||||||
from ..db import sql
|
|
||||||
|
|
||||||
await sql.init()
|
|
||||||
|
|
||||||
yield
|
|
||||||
|
|
||||||
|
|
||||||
app = FastAPI(lifespan=lifespan)
|
|
||||||
|
|
||||||
|
|
||||||
# Global exception handlers
|
# Global exception handlers
|
||||||
|
@ -4,7 +4,7 @@ from fastapi import Cookie, HTTPException, Request, Response
|
|||||||
from fastapi.responses import RedirectResponse
|
from fastapi.responses import RedirectResponse
|
||||||
|
|
||||||
from ..authsession import expires, get_session
|
from ..authsession import expires, get_session
|
||||||
from ..db import db
|
from ..globals import db
|
||||||
from ..util import passphrase, tokens
|
from ..util import passphrase, tokens
|
||||||
from . import session
|
from . import session
|
||||||
|
|
||||||
|
@ -8,8 +8,8 @@ from fastapi import Cookie, FastAPI, Query, WebSocket, WebSocketDisconnect
|
|||||||
from webauthn.helpers.exceptions import InvalidAuthenticationResponse
|
from webauthn.helpers.exceptions import InvalidAuthenticationResponse
|
||||||
|
|
||||||
from ..authsession import EXPIRES, create_session, get_reset, get_session
|
from ..authsession import EXPIRES, create_session, get_reset, get_session
|
||||||
from ..db import User, db
|
from ..db import User
|
||||||
from ..sansio import Passkey
|
from ..globals import db, passkey
|
||||||
from ..util import passphrase
|
from ..util import passphrase
|
||||||
from ..util.tokens import create_token, session_key
|
from ..util.tokens import create_token, session_key
|
||||||
from .session import infodict
|
from .session import infodict
|
||||||
@ -36,12 +36,6 @@ def websocket_error_handler(func):
|
|||||||
# Create a FastAPI subapp for WebSocket endpoints
|
# Create a FastAPI subapp for WebSocket endpoints
|
||||||
app = FastAPI()
|
app = FastAPI()
|
||||||
|
|
||||||
# Initialize the passkey instance
|
|
||||||
passkey = Passkey(
|
|
||||||
rp_id="localhost",
|
|
||||||
rp_name="Passkey Auth",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def register_chat(
|
async def register_chat(
|
||||||
ws: WebSocket,
|
ws: WebSocket,
|
||||||
@ -51,7 +45,7 @@ async def register_chat(
|
|||||||
origin: str | None = None,
|
origin: str | None = None,
|
||||||
):
|
):
|
||||||
"""Generate registration options and send them to the client."""
|
"""Generate registration options and send them to the client."""
|
||||||
options, challenge = passkey.reg_generate_options(
|
options, challenge = passkey.instance.reg_generate_options(
|
||||||
user_id=user_uuid,
|
user_id=user_uuid,
|
||||||
user_name=user_name,
|
user_name=user_name,
|
||||||
credential_ids=credential_ids,
|
credential_ids=credential_ids,
|
||||||
@ -59,7 +53,7 @@ async def register_chat(
|
|||||||
)
|
)
|
||||||
await ws.send_json(options)
|
await ws.send_json(options)
|
||||||
response = await ws.receive_json()
|
response = await ws.receive_json()
|
||||||
return passkey.reg_verify(response, challenge, user_uuid, origin=origin)
|
return passkey.instance.reg_verify(response, challenge, user_uuid, origin=origin)
|
||||||
|
|
||||||
|
|
||||||
@app.websocket("/register")
|
@app.websocket("/register")
|
||||||
@ -139,14 +133,14 @@ async def websocket_register_add(ws: WebSocket, auth=Cookie(None)):
|
|||||||
@websocket_error_handler
|
@websocket_error_handler
|
||||||
async def websocket_authenticate(ws: WebSocket):
|
async def websocket_authenticate(ws: WebSocket):
|
||||||
origin = ws.headers["origin"]
|
origin = ws.headers["origin"]
|
||||||
options, challenge = passkey.auth_generate_options()
|
options, challenge = passkey.instance.auth_generate_options()
|
||||||
await ws.send_json(options)
|
await ws.send_json(options)
|
||||||
# Wait for the client to use his authenticator to authenticate
|
# Wait for the client to use his authenticator to authenticate
|
||||||
credential = passkey.auth_parse(await ws.receive_json())
|
credential = passkey.instance.auth_parse(await ws.receive_json())
|
||||||
# Fetch from the database by credential ID
|
# Fetch from the database by credential ID
|
||||||
stored_cred = await db.instance.get_credential_by_id(credential.raw_id)
|
stored_cred = await db.instance.get_credential_by_id(credential.raw_id)
|
||||||
# Verify the credential matches the stored data
|
# Verify the credential matches the stored data
|
||||||
passkey.auth_verify(credential, challenge, stored_cred, origin=origin)
|
passkey.instance.auth_verify(credential, challenge, stored_cred, origin=origin)
|
||||||
# Update both credential and user's last_seen timestamp
|
# Update both credential and user's last_seen timestamp
|
||||||
await db.instance.login(stored_cred.user_uuid, stored_cred)
|
await db.instance.login(stored_cred.user_uuid, stored_cred)
|
||||||
|
|
||||||
|
56
passkey/globals.py
Normal file
56
passkey/globals.py
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
from typing import Generic, TypeVar
|
||||||
|
|
||||||
|
from .db import DatabaseInterface
|
||||||
|
from .sansio import Passkey
|
||||||
|
|
||||||
|
T = TypeVar("T")
|
||||||
|
|
||||||
|
|
||||||
|
class Manager(Generic[T]):
|
||||||
|
"""Generic manager for global instances."""
|
||||||
|
|
||||||
|
def __init__(self, name: str):
|
||||||
|
self._instance: T | None = None
|
||||||
|
self._name = name
|
||||||
|
|
||||||
|
@property
|
||||||
|
def instance(self) -> T:
|
||||||
|
if self._instance is None:
|
||||||
|
raise RuntimeError(
|
||||||
|
f"{self._name} not initialized. Call globals.init() first."
|
||||||
|
)
|
||||||
|
return self._instance
|
||||||
|
|
||||||
|
@instance.setter
|
||||||
|
def instance(self, instance: T) -> None:
|
||||||
|
self._instance = instance
|
||||||
|
|
||||||
|
|
||||||
|
async def init(
|
||||||
|
rp_id: str = "localhost", rp_name: str | None = None, origin: str | None = None
|
||||||
|
) -> None:
|
||||||
|
"""Initialize the global database, passkey instance, and bootstrap the system if needed."""
|
||||||
|
# Initialize passkey instance with provided parameters
|
||||||
|
passkey.instance = Passkey(
|
||||||
|
rp_id=rp_id,
|
||||||
|
rp_name=rp_name or rp_id,
|
||||||
|
origin=origin,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Test if we have a database already initialized, otherwise use SQL
|
||||||
|
try:
|
||||||
|
db.instance
|
||||||
|
except RuntimeError:
|
||||||
|
from .db import sql
|
||||||
|
|
||||||
|
await sql.init()
|
||||||
|
|
||||||
|
# Bootstrap system if needed
|
||||||
|
from .bootstrap import bootstrap_if_needed
|
||||||
|
|
||||||
|
await bootstrap_if_needed()
|
||||||
|
|
||||||
|
|
||||||
|
# Global instances
|
||||||
|
passkey = Manager[Passkey]("Passkey")
|
||||||
|
db = Manager[DatabaseInterface]("Database")
|
@ -9,6 +9,7 @@ This module provides a unified interface for WebAuthn operations including:
|
|||||||
|
|
||||||
import json
|
import json
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
from urllib.parse import urlparse
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
|
|
||||||
import uuid7
|
import uuid7
|
||||||
@ -45,7 +46,7 @@ class Passkey:
|
|||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
rp_id: str,
|
rp_id: str,
|
||||||
rp_name: str,
|
rp_name: str | None = None,
|
||||||
origin: str | None = None,
|
origin: str | None = None,
|
||||||
supported_pub_key_algs: list[COSEAlgorithmIdentifier] | None = None,
|
supported_pub_key_algs: list[COSEAlgorithmIdentifier] | None = None,
|
||||||
):
|
):
|
||||||
@ -54,19 +55,41 @@ class Passkey:
|
|||||||
|
|
||||||
Args:
|
Args:
|
||||||
rp_id: Your security domain (e.g. "example.com")
|
rp_id: Your security domain (e.g. "example.com")
|
||||||
rp_name: The relying party name (e.g., "My Application" - visible to users)
|
rp_name: The relying party display name (e.g. "Example App"). May be shown in authenticators.
|
||||||
origin: The origin URL of the application (e.g. "https://app.example.com"). Must be a subdomain or same as rp_id, with port and scheme but no path included.
|
origin: The origin URL of the application (e.g. "https://app.example.com").
|
||||||
|
If no scheme is provided, "https://" will be prepended.
|
||||||
|
Must be a subdomain or same as rp_id, with port and scheme but no path included.
|
||||||
supported_pub_key_algs: List of supported COSE algorithms (default is EDDSA, ECDSA_SHA_256, RSASSA_PKCS1_v1_5_SHA_256).
|
supported_pub_key_algs: List of supported COSE algorithms (default is EDDSA, ECDSA_SHA_256, RSASSA_PKCS1_v1_5_SHA_256).
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: If the origin domain doesn't match or isn't a subdomain of rp_id.
|
||||||
"""
|
"""
|
||||||
self.rp_id = rp_id
|
self.rp_id = rp_id
|
||||||
self.rp_name = rp_name
|
self.rp_name = rp_name or rp_id
|
||||||
self.origin = origin or f"https://{rp_id}"
|
self.origin = self._normalize_and_validate_origin(origin, rp_id)
|
||||||
self.supported_pub_key_algs = supported_pub_key_algs or [
|
self.supported_pub_key_algs = supported_pub_key_algs or [
|
||||||
COSEAlgorithmIdentifier.EDDSA,
|
COSEAlgorithmIdentifier.EDDSA,
|
||||||
COSEAlgorithmIdentifier.ECDSA_SHA_256,
|
COSEAlgorithmIdentifier.ECDSA_SHA_256,
|
||||||
COSEAlgorithmIdentifier.RSASSA_PKCS1_v1_5_SHA_256,
|
COSEAlgorithmIdentifier.RSASSA_PKCS1_v1_5_SHA_256,
|
||||||
]
|
]
|
||||||
|
|
||||||
|
def _normalize_and_validate_origin(self, origin: str | None, rp_id: str) -> str:
|
||||||
|
if origin is None:
|
||||||
|
origin = f"https://{rp_id}"
|
||||||
|
elif "://" not in origin:
|
||||||
|
origin = f"https://{origin}"
|
||||||
|
|
||||||
|
hostname = urlparse(origin).hostname
|
||||||
|
if not hostname:
|
||||||
|
raise ValueError(f"Invalid origin URL: no hostname found in '{origin}'")
|
||||||
|
|
||||||
|
if hostname == rp_id or hostname.endswith(f".{rp_id}"):
|
||||||
|
return origin
|
||||||
|
|
||||||
|
raise ValueError(
|
||||||
|
f"Origin domain '{hostname}' must be the same as or a subdomain of rp_id '{rp_id}'"
|
||||||
|
)
|
||||||
|
|
||||||
### Registration Methods ###
|
### Registration Methods ###
|
||||||
|
|
||||||
def reg_generate_options(
|
def reg_generate_options(
|
||||||
|
36
scripts/bootstrap.py
Normal file
36
scripts/bootstrap.py
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Bootstrap CLI script for passkey authentication system.
|
||||||
|
|
||||||
|
This script initializes a new passkey authentication system with:
|
||||||
|
- Default admin user
|
||||||
|
- Default organization
|
||||||
|
- Admin permissions
|
||||||
|
- Reset token for initial setup
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import sys
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
"""Main CLI entry point."""
|
||||||
|
from passkey.bootstrap import main as bootstrap_main
|
||||||
|
from passkey.db.sql import init
|
||||||
|
|
||||||
|
print("Initializing passkey authentication database...")
|
||||||
|
await init()
|
||||||
|
|
||||||
|
print("\nRunning bootstrap process...")
|
||||||
|
await bootstrap_main()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
try:
|
||||||
|
asyncio.run(main())
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print("\n❌ Bootstrap interrupted by user")
|
||||||
|
sys.exit(1)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"\n❌ Bootstrap failed: {e}")
|
||||||
|
sys.exit(1)
|
Loading…
x
Reference in New Issue
Block a user