Refactoring permissions checks.

This commit is contained in:
Leo Vasanko 2025-09-02 17:28:26 -06:00
parent 3cd6a59b26
commit bfc777fb56
4 changed files with 160 additions and 118 deletions

View File

@ -1,26 +1,13 @@
"""Admin sub-application.
All admin API endpoints previously under /auth/admin/* are now implemented
in this standalone FastAPI app which is mounted by the main application at
the /auth/admin path prefix. The routes defined here therefore omit the
"/auth/admin" prefix and start at root (e.g. "/orgs" becomes
"/auth/admin/orgs" once mounted).
"""
from __future__ import annotations
import contextlib
import logging import logging
from uuid import UUID, uuid4 from uuid import UUID, uuid4
from fastapi import Body, Cookie, FastAPI, HTTPException from fastapi import Body, Cookie, FastAPI, HTTPException
from fastapi.responses import FileResponse, JSONResponse from fastapi.responses import FileResponse, JSONResponse
from ..authsession import expires, get_session from ..authsession import expires
from ..globals import db from ..globals import db
from ..globals import passkey as global_passkey from ..globals import passkey as global_passkey
from ..util import frontend, passphrase, querysafe, tokens from ..util import frontend, passphrase, permutil, querysafe, tokens
from ..util.tokens import session_key
app = FastAPI() app = FastAPI()
@ -36,33 +23,12 @@ async def general_exception_handler(_request, exc: Exception):
return JSONResponse(status_code=500, content={"detail": "Internal server error"}) return JSONResponse(status_code=500, content={"detail": "Internal server error"})
async def _get_ctx_and_admin_flags(auth_cookie: str):
"""Helper to get session context and admin flags from cookie."""
if not auth_cookie:
raise ValueError("Not authenticated")
ctx = await db.instance.get_session_context(session_key(auth_cookie))
if not ctx:
raise ValueError("Not authenticated")
role_perm_ids = set(ctx.role.permissions or [])
org_uuid_str = str(ctx.org.uuid)
is_global_admin = "auth:admin" in role_perm_ids
is_org_admin = f"auth:org:{org_uuid_str}" in role_perm_ids
return ctx, is_global_admin, is_org_admin
@app.get("/") @app.get("/")
async def admin_frontend(auth=Cookie(None)): async def admin_frontend(auth=Cookie(None)):
"""Serve the admin SPA root if an authorized session exists.""" ctx = await permutil.session_context(auth)
if auth: if permutil.has_any(ctx, ["auth:admin", "auth:org:*"]):
with contextlib.suppress(ValueError):
s = await get_session(auth)
if s.info and s.info.get("type") == "authenticated":
return FileResponse(frontend.file("admin/index.html")) return FileResponse(frontend.file("admin/index.html"))
return FileResponse( return FileResponse(frontend.file("index.html"), status_code=401 if ctx else 403)
frontend.file("index.html"),
status_code=401,
headers={"WWW-Authenticate": "Bearer"},
)
# -------------------- Organizations -------------------- # -------------------- Organizations --------------------
@ -70,11 +36,11 @@ async def admin_frontend(auth=Cookie(None)):
@app.get("/orgs") @app.get("/orgs")
async def admin_list_orgs(auth=Cookie(None)): async def admin_list_orgs(auth=Cookie(None)):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth) ctx = await permutil.session_context(auth)
if not (is_global_admin or is_org_admin): if not permutil.has_any(ctx, ["auth:admin", "auth:org:*"]):
raise ValueError("Insufficient permissions") raise ValueError("Insufficient permissions")
orgs = await db.instance.list_organizations() orgs = await db.instance.list_organizations()
if not is_global_admin: # limit org admin to their own org if not permutil.has_any(ctx, ["auth:admin"]): # limit org admin to their own org
orgs = [o for o in orgs if o.uuid == ctx.org.uuid] orgs = [o for o in orgs if o.uuid == ctx.org.uuid]
def role_to_dict(r): def role_to_dict(r):
@ -109,8 +75,8 @@ async def admin_list_orgs(auth=Cookie(None)):
@app.post("/orgs") @app.post("/orgs")
async def admin_create_org(payload: dict = Body(...), auth=Cookie(None)): async def admin_create_org(payload: dict = Body(...), auth=Cookie(None)):
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth) ctx = await permutil.session_context(auth)
if not is_global_admin: if not permutil.has_any(ctx, ["auth:admin"]):
raise ValueError("Global admin required") raise ValueError("Global admin required")
from ..db import Org as OrgDC # local import to avoid cycles from ..db import Org as OrgDC # local import to avoid cycles
@ -126,8 +92,8 @@ async def admin_create_org(payload: dict = Body(...), auth=Cookie(None)):
async def admin_update_org( async def admin_update_org(
org_uuid: UUID, payload: dict = Body(...), auth=Cookie(None) org_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
): ):
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth) ctx = await permutil.session_context(auth)
if not is_global_admin: if not permutil.has_any(ctx, ["auth:admin"]):
raise ValueError("Global admin required") raise ValueError("Global admin required")
from ..db import Org as OrgDC # local import to avoid cycles from ..db import Org as OrgDC # local import to avoid cycles
@ -141,8 +107,8 @@ async def admin_update_org(
@app.delete("/orgs/{org_uuid}") @app.delete("/orgs/{org_uuid}")
async def admin_delete_org(org_uuid: UUID, auth=Cookie(None)): async def admin_delete_org(org_uuid: UUID, auth=Cookie(None)):
ctx, is_global_admin, _ = await _get_ctx_and_admin_flags(auth) ctx = await permutil.session_context(auth)
if not is_global_admin: if not permutil.has_any(ctx, ["auth:admin"]):
raise ValueError("Global admin required") raise ValueError("Global admin required")
try: try:
acting_org_uuid = ctx.org.uuid if ctx.org else None acting_org_uuid = ctx.org.uuid if ctx.org else None
@ -158,8 +124,13 @@ async def admin_delete_org(org_uuid: UUID, auth=Cookie(None)):
async def admin_add_org_permission( async def admin_add_org_permission(
org_uuid: UUID, permission_id: str, auth=Cookie(None) org_uuid: UUID, permission_id: str, auth=Cookie(None)
): ):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth) ctx = await permutil.session_context(auth)
if not (is_global_admin or (is_org_admin and ctx.org.uuid == org_uuid)): if not (
permutil.has_any(ctx, ["auth:admin"])
or (
permutil.has_any(ctx, [f"auth:org:{org_uuid}"]) and ctx.org.uuid == org_uuid
)
):
raise ValueError("Insufficient permissions") raise ValueError("Insufficient permissions")
querysafe.assert_safe(permission_id, field="permission_id") querysafe.assert_safe(permission_id, field="permission_id")
await db.instance.add_permission_to_organization(str(org_uuid), permission_id) await db.instance.add_permission_to_organization(str(org_uuid), permission_id)
@ -170,8 +141,14 @@ async def admin_add_org_permission(
async def admin_remove_org_permission( async def admin_remove_org_permission(
org_uuid: UUID, permission_id: str, auth=Cookie(None) org_uuid: UUID, permission_id: str, auth=Cookie(None)
): ):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth) ctx = await permutil.session_context(auth)
if not (is_global_admin or (is_org_admin and ctx.org.uuid == org_uuid)): if not ctx or not (
permutil.has_any(ctx, ["auth:admin"])
or (
permutil.has_any(ctx, [f"auth:org:{org_uuid}"])
and getattr(ctx.org, "uuid", None) == org_uuid
)
):
raise ValueError("Insufficient permissions") raise ValueError("Insufficient permissions")
querysafe.assert_safe(permission_id, field="permission_id") querysafe.assert_safe(permission_id, field="permission_id")
await db.instance.remove_permission_from_organization(str(org_uuid), permission_id) await db.instance.remove_permission_from_organization(str(org_uuid), permission_id)
@ -185,17 +162,23 @@ async def admin_remove_org_permission(
async def admin_create_role( async def admin_create_role(
org_uuid: UUID, payload: dict = Body(...), auth=Cookie(None) org_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
): ):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth) ctx = await permutil.session_context(auth)
if not (is_global_admin or (is_org_admin and ctx.org.uuid == org_uuid)): if not ctx or not (
permutil.has_any(ctx, ["auth:admin"])
or (
permutil.has_any(ctx, [f"auth:org:{org_uuid}"])
and getattr(ctx.org, "uuid", None) == org_uuid
)
):
raise ValueError("Insufficient permissions") raise ValueError("Insufficient permissions")
from ..db import Role as RoleDC from ..db import Role as RoleDC
role_uuid = uuid4() role_uuid = uuid4()
display_name = payload.get("display_name") or "New Role" display_name = payload.get("display_name") or "New Role"
permissions = payload.get("permissions") or [] perms = payload.get("permissions") or []
org = await db.instance.get_organization(str(org_uuid)) org = await db.instance.get_organization(str(org_uuid))
grantable = set(org.permissions or []) grantable = set(org.permissions or [])
for pid in permissions: for pid in perms:
await db.instance.get_permission(pid) await db.instance.get_permission(pid)
if pid not in grantable: if pid not in grantable:
raise ValueError(f"Permission not grantable by org: {pid}") raise ValueError(f"Permission not grantable by org: {pid}")
@ -203,7 +186,7 @@ async def admin_create_role(
uuid=role_uuid, uuid=role_uuid,
org_uuid=org_uuid, org_uuid=org_uuid,
display_name=display_name, display_name=display_name,
permissions=permissions, permissions=perms,
) )
await db.instance.create_role(role) await db.instance.create_role(role)
return {"uuid": str(role_uuid)} return {"uuid": str(role_uuid)}
@ -213,9 +196,15 @@ async def admin_create_role(
async def admin_update_role( async def admin_update_role(
role_uuid: UUID, payload: dict = Body(...), auth=Cookie(None) role_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
): ):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth) ctx = await permutil.session_context(auth)
role = await db.instance.get_role(role_uuid) role = await db.instance.get_role(role_uuid)
if not (is_global_admin or (is_org_admin and role.org_uuid == ctx.org.uuid)): if not ctx or not (
permutil.has_any(ctx, ["auth:admin"])
or (
permutil.has_any(ctx, [f"auth:org:{role.org_uuid}"])
and getattr(ctx.org, "uuid", None) == role.org_uuid
)
):
raise ValueError("Insufficient permissions") raise ValueError("Insufficient permissions")
from ..db import Role as RoleDC from ..db import Role as RoleDC
@ -239,9 +228,15 @@ async def admin_update_role(
@app.delete("/roles/{role_uuid}") @app.delete("/roles/{role_uuid}")
async def admin_delete_role(role_uuid: UUID, auth=Cookie(None)): async def admin_delete_role(role_uuid: UUID, auth=Cookie(None)):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth) ctx = await permutil.session_context(auth)
role = await db.instance.get_role(role_uuid) role = await db.instance.get_role(role_uuid)
if not (is_global_admin or (is_org_admin and role.org_uuid == ctx.org.uuid)): if not ctx or not (
permutil.has_any(ctx, ["auth:admin"])
or (
permutil.has_any(ctx, [f"auth:org:{role.org_uuid}"])
and getattr(ctx.org, "uuid", None) == role.org_uuid
)
):
raise ValueError("Insufficient permissions") raise ValueError("Insufficient permissions")
await db.instance.delete_role(role_uuid) await db.instance.delete_role(role_uuid)
return {"status": "ok"} return {"status": "ok"}
@ -254,8 +249,14 @@ async def admin_delete_role(role_uuid: UUID, auth=Cookie(None)):
async def admin_create_user( async def admin_create_user(
org_uuid: UUID, payload: dict = Body(...), auth=Cookie(None) org_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
): ):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth) ctx = await permutil.session_context(auth)
if not (is_global_admin or (is_org_admin and ctx.org.uuid == org_uuid)): if not ctx or not (
permutil.has_any(ctx, ["auth:admin"])
or (
permutil.has_any(ctx, [f"auth:org:{org_uuid}"])
and getattr(ctx.org, "uuid", None) == org_uuid
)
):
raise ValueError("Insufficient permissions") raise ValueError("Insufficient permissions")
display_name = payload.get("display_name") display_name = payload.get("display_name")
role_name = payload.get("role") role_name = payload.get("role")
@ -283,8 +284,14 @@ async def admin_create_user(
async def admin_update_user_role( async def admin_update_user_role(
org_uuid: UUID, user_uuid: UUID, payload: dict = Body(...), auth=Cookie(None) org_uuid: UUID, user_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
): ):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth) ctx = await permutil.session_context(auth)
if not (is_global_admin or (is_org_admin and ctx.org.uuid == org_uuid)): if not ctx or not (
permutil.has_any(ctx, ["auth:admin"])
or (
permutil.has_any(ctx, [f"auth:org:{org_uuid}"])
and getattr(ctx.org, "uuid", None) == org_uuid
)
):
raise ValueError("Insufficient permissions") raise ValueError("Insufficient permissions")
new_role = payload.get("role") new_role = payload.get("role")
if not new_role: if not new_role:
@ -304,12 +311,18 @@ async def admin_update_user_role(
@app.post("/users/{user_uuid}/create-link") @app.post("/users/{user_uuid}/create-link")
async def admin_create_user_registration_link(user_uuid: UUID, auth=Cookie(None)): async def admin_create_user_registration_link(user_uuid: UUID, auth=Cookie(None)):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth) ctx = await permutil.session_context(auth)
try: try:
user_org, _role_name = await db.instance.get_user_organization(user_uuid) user_org, _role_name = await db.instance.get_user_organization(user_uuid)
except ValueError: except ValueError:
raise HTTPException(status_code=404, detail="User not found") raise HTTPException(status_code=404, detail="User not found")
if not (is_global_admin or (is_org_admin and user_org.uuid == ctx.org.uuid)): if not ctx or not (
permutil.has_any(ctx, ["auth:admin"])
or (
permutil.has_any(ctx, [f"auth:org:{user_org.uuid}"])
and getattr(ctx.org, "uuid", None) == user_org.uuid
)
):
raise HTTPException(status_code=403, detail="Insufficient permissions") raise HTTPException(status_code=403, detail="Insufficient permissions")
token = passphrase.generate() token = passphrase.generate()
await db.instance.create_session( await db.instance.create_session(
@ -325,12 +338,18 @@ async def admin_create_user_registration_link(user_uuid: UUID, auth=Cookie(None)
@app.get("/users/{user_uuid}") @app.get("/users/{user_uuid}")
async def admin_get_user_detail(user_uuid: UUID, auth=Cookie(None)): async def admin_get_user_detail(user_uuid: UUID, auth=Cookie(None)):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth) ctx = await permutil.session_context(auth)
try: try:
user_org, role_name = await db.instance.get_user_organization(user_uuid) user_org, role_name = await db.instance.get_user_organization(user_uuid)
except ValueError: except ValueError:
raise HTTPException(status_code=404, detail="User not found") raise HTTPException(status_code=404, detail="User not found")
if not (is_global_admin or (is_org_admin and user_org.uuid == ctx.org.uuid)): if not ctx or not (
permutil.has_any(ctx, ["auth:admin"])
or (
permutil.has_any(ctx, [f"auth:org:{user_org.uuid}"])
and getattr(ctx.org, "uuid", None) == user_org.uuid
)
):
raise HTTPException(status_code=403, detail="Insufficient permissions") raise HTTPException(status_code=403, detail="Insufficient permissions")
user = await db.instance.get_user_by_uuid(user_uuid) user = await db.instance.get_user_by_uuid(user_uuid)
cred_ids = await db.instance.get_credentials_by_user_uuid(user_uuid) cred_ids = await db.instance.get_credentials_by_user_uuid(user_uuid)
@ -374,12 +393,18 @@ async def admin_get_user_detail(user_uuid: UUID, auth=Cookie(None)):
async def admin_update_user_display_name( async def admin_update_user_display_name(
user_uuid: UUID, payload: dict = Body(...), auth=Cookie(None) user_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
): ):
ctx, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth) ctx = await permutil.session_context(auth)
try: try:
user_org, _role_name = await db.instance.get_user_organization(user_uuid) user_org, _role_name = await db.instance.get_user_organization(user_uuid)
except ValueError: except ValueError:
raise HTTPException(status_code=404, detail="User not found") raise HTTPException(status_code=404, detail="User not found")
if not (is_global_admin or (is_org_admin and user_org.uuid == ctx.org.uuid)): if not ctx or not (
permutil.has_any(ctx, ["auth:admin"])
or (
permutil.has_any(ctx, [f"auth:org:{user_org.uuid}"])
and getattr(ctx.org, "uuid", None) == user_org.uuid
)
):
raise HTTPException(status_code=403, detail="Insufficient permissions") raise HTTPException(status_code=403, detail="Insufficient permissions")
new_name = (payload.get("display_name") or "").strip() new_name = (payload.get("display_name") or "").strip()
if not new_name: if not new_name:
@ -395,8 +420,8 @@ async def admin_update_user_display_name(
@app.get("/permissions") @app.get("/permissions")
async def admin_list_permissions(auth=Cookie(None)): async def admin_list_permissions(auth=Cookie(None)):
_, is_global_admin, is_org_admin = await _get_ctx_and_admin_flags(auth) ctx = await permutil.session_context(auth)
if not (is_global_admin or is_org_admin): if not ctx or not permutil.has_any(ctx, ["auth:admin", "auth:org:*"]):
raise ValueError("Insufficient permissions") raise ValueError("Insufficient permissions")
perms = await db.instance.list_permissions() perms = await db.instance.list_permissions()
return [{"id": p.id, "display_name": p.display_name} for p in perms] return [{"id": p.id, "display_name": p.display_name} for p in perms]
@ -404,8 +429,8 @@ async def admin_list_permissions(auth=Cookie(None)):
@app.post("/permissions") @app.post("/permissions")
async def admin_create_permission(payload: dict = Body(...), auth=Cookie(None)): async def admin_create_permission(payload: dict = Body(...), auth=Cookie(None)):
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth) ctx = await permutil.session_context(auth)
if not is_global_admin: if not ctx or not permutil.has_any(ctx, ["auth:admin"]):
raise ValueError("Global admin required") raise ValueError("Global admin required")
from ..db import Permission as PermDC from ..db import Permission as PermDC
@ -422,8 +447,8 @@ async def admin_create_permission(payload: dict = Body(...), auth=Cookie(None)):
async def admin_update_permission( async def admin_update_permission(
permission_id: str, display_name: str, auth=Cookie(None) permission_id: str, display_name: str, auth=Cookie(None)
): ):
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth) ctx = await permutil.session_context(auth)
if not is_global_admin: if not ctx or not permutil.has_any(ctx, ["auth:admin"]):
raise ValueError("Global admin required") raise ValueError("Global admin required")
from ..db import Permission as PermDC from ..db import Permission as PermDC
@ -438,8 +463,8 @@ async def admin_update_permission(
@app.post("/permission/rename") @app.post("/permission/rename")
async def admin_rename_permission(payload: dict = Body(...), auth=Cookie(None)): async def admin_rename_permission(payload: dict = Body(...), auth=Cookie(None)):
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth) ctx = await permutil.session_context(auth)
if not is_global_admin: if not ctx or not permutil.has_any(ctx, ["auth:admin"]):
raise ValueError("Global admin required") raise ValueError("Global admin required")
old_id = payload.get("old_id") old_id = payload.get("old_id")
new_id = payload.get("new_id") new_id = payload.get("new_id")
@ -460,8 +485,8 @@ async def admin_rename_permission(payload: dict = Body(...), auth=Cookie(None)):
@app.delete("/permission") @app.delete("/permission")
async def admin_delete_permission(permission_id: str, auth=Cookie(None)): async def admin_delete_permission(permission_id: str, auth=Cookie(None)):
_, is_global_admin, _ = await _get_ctx_and_admin_flags(auth) ctx = await permutil.session_context(auth)
if not is_global_admin: if not ctx or not permutil.has_any(ctx, ["auth:admin"]):
raise ValueError("Global admin required") raise ValueError("Global admin required")
querysafe.assert_safe(permission_id, field="permission_id") querysafe.assert_safe(permission_id, field="permission_id")
await db.instance.delete_permission(permission_id) await db.instance.delete_permission(permission_id)

View File

@ -19,7 +19,7 @@ from .. import aaguid
from ..authsession import delete_credential, expires, get_reset, get_session from ..authsession import delete_credential, expires, get_reset, get_session
from ..globals import db from ..globals import db
from ..globals import passkey as global_passkey from ..globals import passkey as global_passkey
from ..util import passphrase, tokens from ..util import passphrase, permutil, tokens
from ..util.tokens import session_key from ..util.tokens import session_key
from . import authz, session from . import authz, session
@ -42,13 +42,13 @@ async def general_exception_handler(
@app.post("/validate") @app.post("/validate")
async def validate_token(perm=Query(None), auth=Cookie(None)): async def validate_token(perm: list[str] = Query([]), auth=Cookie(None)):
s = await authz.verify(auth, perm) ctx = await authz.verify(auth, perm)
return {"valid": True, "user_uuid": str(s.user_uuid)} return {"valid": True, "user_uuid": str(ctx.session.user_uuid)}
@app.get("/forward") @app.get("/forward")
async def forward_authentication(perm=Query(None), auth=Cookie(None)): async def forward_authentication(perm: list[str] = Query([]), auth=Cookie(None)):
"""Forward auth validation for Caddy/Nginx (moved from /auth/forward-auth). """Forward auth validation for Caddy/Nginx (moved from /auth/forward-auth).
Query Params: Query Params:
@ -58,8 +58,10 @@ async def forward_authentication(perm=Query(None), auth=Cookie(None)):
Failure (unauthenticated / unauthorized): 4xx JSON body with detail. Failure (unauthenticated / unauthorized): 4xx JSON body with detail.
""" """
try: try:
s = await authz.verify(auth, perm) ctx = await authz.verify(auth, perm)
return Response(status_code=204, headers={"x-auth-user-uuid": str(s.user_uuid)}) return Response(
status_code=204, headers={"x-auth-user-uuid": str(ctx.session.user_uuid)}
)
except HTTPException as e: # pass through explicitly except HTTPException as e: # pass through explicitly
raise e raise e
@ -96,7 +98,8 @@ async def api_user_info(reset: str | None = None, auth=Cookie(None)):
} }
assert authenticated and auth is not None assert authenticated and auth is not None
ctx = await db.instance.get_session_context(session_key(auth))
ctx = await permutil.session_context(auth)
credential_ids = await db.instance.get_credentials_by_user_uuid(s.user_uuid) credential_ids = await db.instance.get_credentials_by_user_uuid(s.user_uuid)
credentials: list[dict] = [] credentials: list[dict] = []
user_aaguids: set[str] = set() user_aaguids: set[str] = set()

View File

@ -1,39 +1,25 @@
"""Authorization utilities shared across FastAPI endpoints.
Provides helper(s) to validate a session token (from cookie) and optionally
enforce that the user possesses a given permission (either via their role or
their organization level permissions).
"""
from fastapi import HTTPException from fastapi import HTTPException
from ..authsession import get_session from ..util import permutil
from ..globals import db
from ..util.tokens import session_key
async def verify(auth: str | None, perm: list[str] | str | None): async def verify(auth: str | None, perm: list[str], match=permutil.has_all):
"""Validate session token and optional list of required permissions. """Validate session token and optional list of required permissions.
Returns the Session object on success. Raises HTTPException on failure. Returns the session context.
Raises HTTPException on failure:
401: unauthenticated / invalid session 401: unauthenticated / invalid session
403: one or more required permissions missing 403: required permissions missing
""" """
if not auth: if not auth:
raise HTTPException(status_code=401, detail="Authentication required") raise HTTPException(status_code=401, detail="Authentication required")
session = await get_session(auth)
if perm is not None: ctx = await permutil.session_context(auth)
if isinstance(perm, str):
perm = [perm]
ctx = await db.instance.get_session_context(session_key(auth))
if not ctx: if not ctx:
raise HTTPException(status_code=401, detail="Session not found") raise HTTPException(status_code=401, detail="Session not found")
available = set(ctx.role.permissions or []) | (
set(ctx.org.permissions or []) if ctx.org else set() if not match(ctx, perm):
)
if any(p not in available for p in perm):
raise HTTPException(status_code=403, detail="Permission required") raise HTTPException(status_code=403, detail="Permission required")
return session
return ctx
__all__ = ["verify"]

28
passkey/util/permutil.py Normal file
View File

@ -0,0 +1,28 @@
"""Minimal permission helpers with '*' wildcard support (no DB expansion)."""
from collections.abc import Sequence
from fnmatch import fnmatchcase
from ..globals import db
from .tokens import session_key
__all__ = ["has_any", "has_all", "session_context"]
def _match(perms: set[str], patterns: Sequence[str]):
return (
any(fnmatchcase(p, pat) for p in perms) if "*" in pat else False
for pat in patterns
)
def has_any(ctx, patterns: Sequence[str]) -> bool:
return any(_match(ctx.role.permissions, patterns)) if ctx else False
def has_all(ctx, patterns: Sequence[str]) -> bool:
return all(_match(ctx.role.permissions, patterns)) if ctx else False
async def session_context(auth: str | None):
return await db.instance.get_session_context(session_key(auth)) if auth else None