diff --git a/passkey/fastapi/__main__.py b/passkey/fastapi/__main__.py index e25ced2..4262f93 100644 --- a/passkey/fastapi/__main__.py +++ b/passkey/fastapi/__main__.py @@ -131,11 +131,30 @@ def main(): ) add_common_options(dev) + # reset subcommand + reset = sub.add_parser( + "reset", + help=( + "Create a credential reset link for a user. Provide part of the display name or UUID. " + "If omitted, targets the master admin (first Administration role user in an auth:admin org)." + ), + ) + reset.add_argument( + "query", + nargs="?", + help="User UUID (full) or case-insensitive substring of display name. If omitted, master admin is used.", + ) + add_common_options(reset) + args = parser.parse_args() - default_port = DEFAULT_DEV_PORT if args.command == "dev" else DEFAULT_SERVE_PORT - host, port, uds, all_ifaces = parse_endpoint(args.hostport, default_port) - devmode = args.command == "dev" + if args.command in {"serve", "dev"}: + default_port = DEFAULT_DEV_PORT if args.command == "dev" else DEFAULT_SERVE_PORT + host, port, uds, all_ifaces = parse_endpoint(args.hostport, default_port) + devmode = args.command == "dev" + else: + host = port = uds = all_ifaces = None # type: ignore + devmode = False # Determine origin (dev mode default override) origin = args.origin @@ -165,56 +184,62 @@ def main(): ) ) - run_kwargs: dict = { - "reload": devmode, - "log_level": "info", - } - if uds: - run_kwargs["uds"] = uds - else: - # For :port form (all interfaces) we will handle separately - if not all_ifaces: - run_kwargs["host"] = host - run_kwargs["port"] = port + # Handle recover-admin command (no server start) + if args.command == "reset": + from passkey.fastapi import reset as reset_cmd # local import - if devmode: - # Spawn frontend dev server (bun or npm) only once in parent process - if os.environ.get("PASSKEY_BUN_PARENT") != "1": - os.environ["PASSKEY_BUN_PARENT"] = "1" - frontend.run_dev() + exit_code = reset_cmd.run(getattr(args, "query", None)) + raise SystemExit(exit_code) - if all_ifaces and not uds: - # If reload enabled, fallback to single dual-stack attempt (::) to keep reload simple - if devmode: - run_kwargs["host"] = "::" - run_kwargs["port"] = port - uvicorn.run("passkey.fastapi:app", **run_kwargs) + if args.command in {"serve", "dev"}: + run_kwargs: dict = { + "reload": devmode, + "log_level": "info", + } + if uds: + run_kwargs["uds"] = uds else: - # Start two servers concurrently: IPv4 and IPv6 - from uvicorn import Config, Server # noqa: E402 local import + if not all_ifaces: + run_kwargs["host"] = host + run_kwargs["port"] = port - from passkey.fastapi import app as fastapi_app # noqa: E402 local import + if devmode: + if os.environ.get("PASSKEY_BUN_PARENT") != "1": + os.environ["PASSKEY_BUN_PARENT"] = "1" + frontend.run_dev() - async def serve_both(): - servers = [] - assert port is not None - for h in ("0.0.0.0", "::"): - try: - cfg = Config( - app=fastapi_app, - host=h, - port=port, - log_level="info", - ) - servers.append(Server(cfg)) - except Exception as e: # pragma: no cover - logging.warning(f"Failed to configure server for {h}: {e}") - tasks = [asyncio.create_task(s.serve()) for s in servers] - await asyncio.gather(*tasks) + if all_ifaces and not uds: + if devmode: + run_kwargs["host"] = "::" + run_kwargs["port"] = port + uvicorn.run("passkey.fastapi:app", **run_kwargs) + else: + from uvicorn import Config, Server # noqa: E402 local import - asyncio.run(serve_both()) - else: - uvicorn.run("passkey.fastapi:app", **run_kwargs) + from passkey.fastapi import ( + app as fastapi_app, # noqa: E402 local import + ) + + async def serve_both(): + servers = [] + assert port is not None + for h in ("0.0.0.0", "::"): + try: + cfg = Config( + app=fastapi_app, + host=h, + port=port, + log_level="info", + ) + servers.append(Server(cfg)) + except Exception as e: # pragma: no cover + logging.warning(f"Failed to configure server for {h}: {e}") + tasks = [asyncio.create_task(s.serve()) for s in servers] + await asyncio.gather(*tasks) + + asyncio.run(serve_both()) + else: + uvicorn.run("passkey.fastapi:app", **run_kwargs) if __name__ == "__main__": diff --git a/passkey/fastapi/reset.py b/passkey/fastapi/reset.py new file mode 100644 index 0000000..9f2103c --- /dev/null +++ b/passkey/fastapi/reset.py @@ -0,0 +1,100 @@ +"""CLI support for creating user credential reset links. + +Usage (via main CLI): + passkey-auth reset [query] + +If query is omitted, the master admin (first Administration role user in +an organization granting auth:admin) is targeted. Otherwise query is +matched as either an exact UUID or a case-insensitive substring of the +display name. If multiple users match, they are listed and the command +aborts. A new one-time reset link is always created. +""" + +from __future__ import annotations + +import asyncio +from uuid import UUID + +from passkey import authsession as _authsession +from passkey import globals as _g +from passkey.util import passphrase +from passkey.util import tokens as _tokens + + +async def _resolve_targets(query: str | None): + if query: + # Try UUID + targets: list[tuple] = [] + try: + q_uuid = UUID(query) + perm_orgs = await _g.db.instance.get_permission_organizations("auth:admin") + for o in perm_orgs: + users = await _g.db.instance.get_organization_users(str(o.uuid)) + for u, role_name in users: + if u.uuid == q_uuid: + return [(u, role_name)] + # UUID not found among admin orgs -> fall back to substring search (rare case) + except ValueError: + pass + # Substring search + needle = query.lower() + perm_orgs = await _g.db.instance.get_permission_organizations("auth:admin") + for o in perm_orgs: + users = await _g.db.instance.get_organization_users(str(o.uuid)) + for u, role_name in users: + if needle in (u.display_name or "").lower(): + targets.append((u, role_name)) + # De-duplicate + seen = set() + deduped = [] + for u, role_name in targets: + if u.uuid not in seen: + seen.add(u.uuid) + deduped.append((u, role_name)) + return deduped + # No query -> master admin + perm_orgs = await _g.db.instance.get_permission_organizations("auth:admin") + if not perm_orgs: + return [] + users = await _g.db.instance.get_organization_users(str(perm_orgs[0].uuid)) + admin_users = [pair for pair in users if pair[1] == "Administration"] + return admin_users[:1] + + +async def _create_reset(user, role_name: str): + token = passphrase.generate() + await _g.db.instance.create_session( + user_uuid=user.uuid, + key=_tokens.reset_key(token), + expires=_authsession.expires(), + info={"type": "manual reset", "role": role_name}, + ) + return f"{_g.passkey.instance.origin}/auth/{token}", token + + +async def _main(query: str | None) -> int: + try: + candidates = await _resolve_targets(query) + if not candidates: + print("No matching users found") + return 1 + if len(candidates) > 1: + print("Multiple matches. Refine your query:") + for u, role_name in candidates: + print(f" - {u.display_name} ({u.uuid}) role={role_name}") + return 2 + user, role_name = candidates[0] + link, token = await _create_reset(user, role_name) + print(f"Reset link for {user.display_name} ({user.uuid}):\n{link}\n") + return 0 + except Exception as e: # pragma: no cover + print("Failed to create reset link:", e) + return 1 + + +def run(query: str | None) -> int: + """Synchronous wrapper for CLI entrypoint.""" + return asyncio.run(_main(query)) + + +__all__ = ["run"] diff --git a/pyproject.toml b/pyproject.toml index 33c899c..b43884f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ line-length = 88 select = ["E", "F", "I", "N", "W", "UP"] ignore = ["E501"] # Line too long isort.known-first-party = ["passkey"] + [project.scripts] passkey-auth = "passkey.fastapi.__main__:main"