Implement credential reset via CLI.
This commit is contained in:
@@ -131,11 +131,30 @@ def main():
|
|||||||
)
|
)
|
||||||
add_common_options(dev)
|
add_common_options(dev)
|
||||||
|
|
||||||
|
# reset subcommand
|
||||||
|
reset = sub.add_parser(
|
||||||
|
"reset",
|
||||||
|
help=(
|
||||||
|
"Create a credential reset link for a user. Provide part of the display name or UUID. "
|
||||||
|
"If omitted, targets the master admin (first Administration role user in an auth:admin org)."
|
||||||
|
),
|
||||||
|
)
|
||||||
|
reset.add_argument(
|
||||||
|
"query",
|
||||||
|
nargs="?",
|
||||||
|
help="User UUID (full) or case-insensitive substring of display name. If omitted, master admin is used.",
|
||||||
|
)
|
||||||
|
add_common_options(reset)
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
if args.command in {"serve", "dev"}:
|
||||||
default_port = DEFAULT_DEV_PORT if args.command == "dev" else DEFAULT_SERVE_PORT
|
default_port = DEFAULT_DEV_PORT if args.command == "dev" else DEFAULT_SERVE_PORT
|
||||||
host, port, uds, all_ifaces = parse_endpoint(args.hostport, default_port)
|
host, port, uds, all_ifaces = parse_endpoint(args.hostport, default_port)
|
||||||
devmode = args.command == "dev"
|
devmode = args.command == "dev"
|
||||||
|
else:
|
||||||
|
host = port = uds = all_ifaces = None # type: ignore
|
||||||
|
devmode = False
|
||||||
|
|
||||||
# Determine origin (dev mode default override)
|
# Determine origin (dev mode default override)
|
||||||
origin = args.origin
|
origin = args.origin
|
||||||
@@ -165,6 +184,14 @@ def main():
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Handle recover-admin command (no server start)
|
||||||
|
if args.command == "reset":
|
||||||
|
from passkey.fastapi import reset as reset_cmd # local import
|
||||||
|
|
||||||
|
exit_code = reset_cmd.run(getattr(args, "query", None))
|
||||||
|
raise SystemExit(exit_code)
|
||||||
|
|
||||||
|
if args.command in {"serve", "dev"}:
|
||||||
run_kwargs: dict = {
|
run_kwargs: dict = {
|
||||||
"reload": devmode,
|
"reload": devmode,
|
||||||
"log_level": "info",
|
"log_level": "info",
|
||||||
@@ -172,28 +199,26 @@ def main():
|
|||||||
if uds:
|
if uds:
|
||||||
run_kwargs["uds"] = uds
|
run_kwargs["uds"] = uds
|
||||||
else:
|
else:
|
||||||
# For :port form (all interfaces) we will handle separately
|
|
||||||
if not all_ifaces:
|
if not all_ifaces:
|
||||||
run_kwargs["host"] = host
|
run_kwargs["host"] = host
|
||||||
run_kwargs["port"] = port
|
run_kwargs["port"] = port
|
||||||
|
|
||||||
if devmode:
|
if devmode:
|
||||||
# Spawn frontend dev server (bun or npm) only once in parent process
|
|
||||||
if os.environ.get("PASSKEY_BUN_PARENT") != "1":
|
if os.environ.get("PASSKEY_BUN_PARENT") != "1":
|
||||||
os.environ["PASSKEY_BUN_PARENT"] = "1"
|
os.environ["PASSKEY_BUN_PARENT"] = "1"
|
||||||
frontend.run_dev()
|
frontend.run_dev()
|
||||||
|
|
||||||
if all_ifaces and not uds:
|
if all_ifaces and not uds:
|
||||||
# If reload enabled, fallback to single dual-stack attempt (::) to keep reload simple
|
|
||||||
if devmode:
|
if devmode:
|
||||||
run_kwargs["host"] = "::"
|
run_kwargs["host"] = "::"
|
||||||
run_kwargs["port"] = port
|
run_kwargs["port"] = port
|
||||||
uvicorn.run("passkey.fastapi:app", **run_kwargs)
|
uvicorn.run("passkey.fastapi:app", **run_kwargs)
|
||||||
else:
|
else:
|
||||||
# Start two servers concurrently: IPv4 and IPv6
|
|
||||||
from uvicorn import Config, Server # noqa: E402 local import
|
from uvicorn import Config, Server # noqa: E402 local import
|
||||||
|
|
||||||
from passkey.fastapi import app as fastapi_app # noqa: E402 local import
|
from passkey.fastapi import (
|
||||||
|
app as fastapi_app, # noqa: E402 local import
|
||||||
|
)
|
||||||
|
|
||||||
async def serve_both():
|
async def serve_both():
|
||||||
servers = []
|
servers = []
|
||||||
|
|||||||
100
passkey/fastapi/reset.py
Normal file
100
passkey/fastapi/reset.py
Normal file
@@ -0,0 +1,100 @@
|
|||||||
|
"""CLI support for creating user credential reset links.
|
||||||
|
|
||||||
|
Usage (via main CLI):
|
||||||
|
passkey-auth reset [query]
|
||||||
|
|
||||||
|
If query is omitted, the master admin (first Administration role user in
|
||||||
|
an organization granting auth:admin) is targeted. Otherwise query is
|
||||||
|
matched as either an exact UUID or a case-insensitive substring of the
|
||||||
|
display name. If multiple users match, they are listed and the command
|
||||||
|
aborts. A new one-time reset link is always created.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from uuid import UUID
|
||||||
|
|
||||||
|
from passkey import authsession as _authsession
|
||||||
|
from passkey import globals as _g
|
||||||
|
from passkey.util import passphrase
|
||||||
|
from passkey.util import tokens as _tokens
|
||||||
|
|
||||||
|
|
||||||
|
async def _resolve_targets(query: str | None):
|
||||||
|
if query:
|
||||||
|
# Try UUID
|
||||||
|
targets: list[tuple] = []
|
||||||
|
try:
|
||||||
|
q_uuid = UUID(query)
|
||||||
|
perm_orgs = await _g.db.instance.get_permission_organizations("auth:admin")
|
||||||
|
for o in perm_orgs:
|
||||||
|
users = await _g.db.instance.get_organization_users(str(o.uuid))
|
||||||
|
for u, role_name in users:
|
||||||
|
if u.uuid == q_uuid:
|
||||||
|
return [(u, role_name)]
|
||||||
|
# UUID not found among admin orgs -> fall back to substring search (rare case)
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
# Substring search
|
||||||
|
needle = query.lower()
|
||||||
|
perm_orgs = await _g.db.instance.get_permission_organizations("auth:admin")
|
||||||
|
for o in perm_orgs:
|
||||||
|
users = await _g.db.instance.get_organization_users(str(o.uuid))
|
||||||
|
for u, role_name in users:
|
||||||
|
if needle in (u.display_name or "").lower():
|
||||||
|
targets.append((u, role_name))
|
||||||
|
# De-duplicate
|
||||||
|
seen = set()
|
||||||
|
deduped = []
|
||||||
|
for u, role_name in targets:
|
||||||
|
if u.uuid not in seen:
|
||||||
|
seen.add(u.uuid)
|
||||||
|
deduped.append((u, role_name))
|
||||||
|
return deduped
|
||||||
|
# No query -> master admin
|
||||||
|
perm_orgs = await _g.db.instance.get_permission_organizations("auth:admin")
|
||||||
|
if not perm_orgs:
|
||||||
|
return []
|
||||||
|
users = await _g.db.instance.get_organization_users(str(perm_orgs[0].uuid))
|
||||||
|
admin_users = [pair for pair in users if pair[1] == "Administration"]
|
||||||
|
return admin_users[:1]
|
||||||
|
|
||||||
|
|
||||||
|
async def _create_reset(user, role_name: str):
|
||||||
|
token = passphrase.generate()
|
||||||
|
await _g.db.instance.create_session(
|
||||||
|
user_uuid=user.uuid,
|
||||||
|
key=_tokens.reset_key(token),
|
||||||
|
expires=_authsession.expires(),
|
||||||
|
info={"type": "manual reset", "role": role_name},
|
||||||
|
)
|
||||||
|
return f"{_g.passkey.instance.origin}/auth/{token}", token
|
||||||
|
|
||||||
|
|
||||||
|
async def _main(query: str | None) -> int:
|
||||||
|
try:
|
||||||
|
candidates = await _resolve_targets(query)
|
||||||
|
if not candidates:
|
||||||
|
print("No matching users found")
|
||||||
|
return 1
|
||||||
|
if len(candidates) > 1:
|
||||||
|
print("Multiple matches. Refine your query:")
|
||||||
|
for u, role_name in candidates:
|
||||||
|
print(f" - {u.display_name} ({u.uuid}) role={role_name}")
|
||||||
|
return 2
|
||||||
|
user, role_name = candidates[0]
|
||||||
|
link, token = await _create_reset(user, role_name)
|
||||||
|
print(f"Reset link for {user.display_name} ({user.uuid}):\n{link}\n")
|
||||||
|
return 0
|
||||||
|
except Exception as e: # pragma: no cover
|
||||||
|
print("Failed to create reset link:", e)
|
||||||
|
return 1
|
||||||
|
|
||||||
|
|
||||||
|
def run(query: str | None) -> int:
|
||||||
|
"""Synchronous wrapper for CLI entrypoint."""
|
||||||
|
return asyncio.run(_main(query))
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = ["run"]
|
||||||
@@ -34,6 +34,7 @@ line-length = 88
|
|||||||
select = ["E", "F", "I", "N", "W", "UP"]
|
select = ["E", "F", "I", "N", "W", "UP"]
|
||||||
ignore = ["E501"] # Line too long
|
ignore = ["E501"] # Line too long
|
||||||
isort.known-first-party = ["passkey"]
|
isort.known-first-party = ["passkey"]
|
||||||
|
|
||||||
[project.scripts]
|
[project.scripts]
|
||||||
passkey-auth = "passkey.fastapi.__main__:main"
|
passkey-auth = "passkey.fastapi.__main__:main"
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user