Admin app: guard rails extended, consistent styling, also share styling with main app.
This commit is contained in:
@@ -17,6 +17,7 @@ from sqlalchemy import (
|
||||
String,
|
||||
delete,
|
||||
event,
|
||||
insert,
|
||||
select,
|
||||
update,
|
||||
)
|
||||
@@ -971,8 +972,10 @@ class DB(DatabaseInterface):
|
||||
)
|
||||
if role.permissions:
|
||||
for perm_id in set(role.permissions):
|
||||
session.add(
|
||||
RolePermission(role_uuid=role.uuid.bytes, permission_id=perm_id)
|
||||
await session.execute(
|
||||
insert(RolePermission).values(
|
||||
role_uuid=role.uuid.bytes, permission_id=perm_id
|
||||
)
|
||||
)
|
||||
|
||||
async def delete_role(self, role_uuid: UUID) -> None:
|
||||
@@ -1200,10 +1203,15 @@ class DB(DatabaseInterface):
|
||||
org_perm_result = await session.execute(org_perm_stmt)
|
||||
organization.permissions = [row[0] for row in org_perm_result.fetchall()]
|
||||
|
||||
# Filter effective permissions: only include permissions that the org can grant
|
||||
effective_permissions = [
|
||||
p for p in permissions if p.id in organization.permissions
|
||||
]
|
||||
|
||||
return SessionContext(
|
||||
session=session_obj,
|
||||
user=user_obj,
|
||||
org=organization,
|
||||
role=role,
|
||||
permissions=permissions if permissions else None,
|
||||
permissions=effective_permissions if effective_permissions else None,
|
||||
)
|
||||
|
||||
@@ -77,12 +77,24 @@ async def admin_list_orgs(auth=Cookie(None)):
|
||||
async def admin_create_org(payload: dict = Body(...), auth=Cookie(None)):
|
||||
await authz.verify(auth, ["auth:admin"])
|
||||
from ..db import Org as OrgDC # local import to avoid cycles
|
||||
from ..db import Role as RoleDC # local import to avoid cycles
|
||||
|
||||
org_uuid = uuid4()
|
||||
display_name = payload.get("display_name") or "New Organization"
|
||||
permissions = payload.get("permissions") or []
|
||||
org = OrgDC(uuid=org_uuid, display_name=display_name, permissions=permissions)
|
||||
await db.instance.create_organization(org)
|
||||
|
||||
# Automatically create Administration role with org admin permission
|
||||
role_uuid = uuid4()
|
||||
admin_role = RoleDC(
|
||||
uuid=role_uuid,
|
||||
org_uuid=org_uuid,
|
||||
display_name="Administration",
|
||||
permissions=[f"auth:org:{org_uuid}"],
|
||||
)
|
||||
await db.instance.create_role(admin_role)
|
||||
|
||||
return {"uuid": str(org_uuid)}
|
||||
|
||||
|
||||
@@ -90,7 +102,7 @@ async def admin_create_org(payload: dict = Body(...), auth=Cookie(None)):
|
||||
async def admin_update_org(
|
||||
org_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
|
||||
):
|
||||
await authz.verify(
|
||||
ctx = await authz.verify(
|
||||
auth, ["auth:admin", f"auth:org:{org_uuid}"], match=permutil.has_any
|
||||
)
|
||||
from ..db import Org as OrgDC # local import to avoid cycles
|
||||
@@ -98,6 +110,20 @@ async def admin_update_org(
|
||||
current = await db.instance.get_organization(str(org_uuid))
|
||||
display_name = payload.get("display_name") or current.display_name
|
||||
permissions = payload.get("permissions") or current.permissions or []
|
||||
|
||||
# Sanity check: prevent removing permissions that would break current user's admin access
|
||||
org_admin_perm = f"auth:org:{org_uuid}"
|
||||
|
||||
# If current user is org admin (not global admin), ensure org admin perm remains
|
||||
if (
|
||||
"auth:admin" not in ctx.role.permissions
|
||||
and f"auth:org:{org_uuid}" in ctx.role.permissions
|
||||
):
|
||||
if org_admin_perm not in permissions:
|
||||
raise ValueError(
|
||||
"Cannot remove organization admin permission from your own organization"
|
||||
)
|
||||
|
||||
org = OrgDC(uuid=org_uuid, display_name=display_name, permissions=permissions)
|
||||
await db.instance.update_organization(org)
|
||||
return {"status": "ok"}
|
||||
@@ -110,6 +136,21 @@ async def admin_delete_org(org_uuid: UUID, auth=Cookie(None)):
|
||||
)
|
||||
if ctx.org.uuid == org_uuid:
|
||||
raise ValueError("Cannot delete the organization you belong to")
|
||||
|
||||
# Delete organization-specific permissions
|
||||
org_perm_pattern = f"org:{str(org_uuid).lower()}"
|
||||
all_permissions = await db.instance.list_permissions()
|
||||
for perm in all_permissions:
|
||||
perm_id_lower = perm.id.lower()
|
||||
# Check if permission contains "org:{uuid}" separated by colons or at boundaries
|
||||
if (
|
||||
f":{org_perm_pattern}:" in perm_id_lower
|
||||
or perm_id_lower.startswith(f"{org_perm_pattern}:")
|
||||
or perm_id_lower.endswith(f":{org_perm_pattern}")
|
||||
or perm_id_lower == org_perm_pattern
|
||||
):
|
||||
await db.instance.delete_permission(perm.id)
|
||||
|
||||
await db.instance.delete_organization(org_uuid)
|
||||
return {"status": "ok"}
|
||||
|
||||
@@ -139,7 +180,9 @@ async def admin_remove_org_permission(
|
||||
async def admin_create_role(
|
||||
org_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
|
||||
):
|
||||
await authz.verify(auth, ["auth:admin", f"auth:org:{org_uuid}"])
|
||||
await authz.verify(
|
||||
auth, ["auth:admin", f"auth:org:{org_uuid}"], match=permutil.has_any
|
||||
)
|
||||
from ..db import Role as RoleDC
|
||||
|
||||
role_uuid = uuid4()
|
||||
@@ -166,7 +209,7 @@ async def admin_update_role(
|
||||
org_uuid: UUID, role_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
|
||||
):
|
||||
# Verify caller is global admin or admin of provided org
|
||||
await authz.verify(
|
||||
ctx = await authz.verify(
|
||||
auth, ["auth:admin", f"auth:org:{org_uuid}"], match=permutil.has_any
|
||||
)
|
||||
role = await db.instance.get_role(role_uuid)
|
||||
@@ -175,13 +218,25 @@ async def admin_update_role(
|
||||
from ..db import Role as RoleDC
|
||||
|
||||
display_name = payload.get("display_name") or role.display_name
|
||||
permissions = payload.get("permissions") or role.permissions
|
||||
permissions = payload.get("permissions")
|
||||
if permissions is None:
|
||||
permissions = role.permissions
|
||||
org = await db.instance.get_organization(str(org_uuid))
|
||||
grantable = set(org.permissions or [])
|
||||
existing_permissions = set(role.permissions)
|
||||
for pid in permissions:
|
||||
await db.instance.get_permission(pid)
|
||||
if pid not in grantable:
|
||||
if pid not in existing_permissions and pid not in grantable:
|
||||
raise ValueError(f"Permission not grantable by org: {pid}")
|
||||
|
||||
# Sanity check: prevent admin from removing their own access via role update
|
||||
if ctx.org.uuid == org_uuid and ctx.role.uuid == role_uuid:
|
||||
has_admin_access = (
|
||||
"auth:admin" in permissions or f"auth:org:{org_uuid}" in permissions
|
||||
)
|
||||
if not has_admin_access:
|
||||
raise ValueError("Cannot update your own role to remove admin permissions")
|
||||
|
||||
updated = RoleDC(
|
||||
uuid=role_uuid,
|
||||
org_uuid=org_uuid,
|
||||
@@ -194,12 +249,17 @@ async def admin_update_role(
|
||||
|
||||
@app.delete("/orgs/{org_uuid}/roles/{role_uuid}")
|
||||
async def admin_delete_role(org_uuid: UUID, role_uuid: UUID, auth=Cookie(None)):
|
||||
await authz.verify(
|
||||
ctx = await authz.verify(
|
||||
auth, ["auth:admin", f"auth:org:{org_uuid}"], match=permutil.has_any
|
||||
)
|
||||
role = await db.instance.get_role(role_uuid)
|
||||
if role.org_uuid != org_uuid:
|
||||
raise HTTPException(status_code=404, detail="Role not found in organization")
|
||||
|
||||
# Sanity check: prevent admin from deleting their own role
|
||||
if ctx.role.uuid == role_uuid:
|
||||
raise ValueError("Cannot delete your own role")
|
||||
|
||||
await db.instance.delete_role(role_uuid)
|
||||
return {"status": "ok"}
|
||||
|
||||
@@ -240,7 +300,7 @@ async def admin_create_user(
|
||||
async def admin_update_user_role(
|
||||
org_uuid: UUID, user_uuid: UUID, payload: dict = Body(...), auth=Cookie(None)
|
||||
):
|
||||
await authz.verify(
|
||||
ctx = await authz.verify(
|
||||
auth, ["auth:admin", f"auth:org:{org_uuid}"], match=permutil.has_any
|
||||
)
|
||||
new_role = payload.get("role")
|
||||
@@ -255,6 +315,20 @@ async def admin_update_user_role(
|
||||
roles = await db.instance.get_roles_by_organization(str(org_uuid))
|
||||
if not any(r.display_name == new_role for r in roles):
|
||||
raise ValueError("Role not found in organization")
|
||||
|
||||
# Sanity check: prevent admin from removing their own access
|
||||
if ctx.user.uuid == user_uuid:
|
||||
new_role_obj = next((r for r in roles if r.display_name == new_role), None)
|
||||
if new_role_obj:
|
||||
has_admin_access = (
|
||||
"auth:admin" in new_role_obj.permissions
|
||||
or f"auth:org:{org_uuid}" in new_role_obj.permissions
|
||||
)
|
||||
if not has_admin_access:
|
||||
raise ValueError(
|
||||
"Cannot change your own role to one without admin permissions"
|
||||
)
|
||||
|
||||
await db.instance.update_user_role_in_organization(user_uuid, new_role)
|
||||
return {"status": "ok"}
|
||||
|
||||
@@ -370,14 +444,44 @@ async def admin_update_user_display_name(
|
||||
return {"status": "ok"}
|
||||
|
||||
|
||||
@app.delete("/orgs/{org_uuid}/users/{user_uuid}/credentials/{credential_uuid}")
|
||||
async def admin_delete_user_credential(
|
||||
org_uuid: UUID, user_uuid: UUID, credential_uuid: UUID, auth=Cookie(None)
|
||||
):
|
||||
try:
|
||||
user_org, _role_name = await db.instance.get_user_organization(user_uuid)
|
||||
except ValueError:
|
||||
raise HTTPException(status_code=404, detail="User not found")
|
||||
if user_org.uuid != org_uuid:
|
||||
raise HTTPException(status_code=404, detail="User not found in organization")
|
||||
ctx = await authz.verify(
|
||||
auth, ["auth:admin", f"auth:org:{org_uuid}"], match=permutil.has_any
|
||||
)
|
||||
if (
|
||||
"auth:admin" not in ctx.role.permissions
|
||||
and f"auth:org:{org_uuid}" not in ctx.role.permissions
|
||||
):
|
||||
raise HTTPException(status_code=403, detail="Insufficient permissions")
|
||||
await db.instance.delete_credential(credential_uuid, user_uuid)
|
||||
return {"status": "ok"}
|
||||
|
||||
|
||||
# -------------------- Permissions (global) --------------------
|
||||
|
||||
|
||||
@app.get("/permissions")
|
||||
async def admin_list_permissions(auth=Cookie(None)):
|
||||
await authz.verify(auth, ["auth:admin"], match=permutil.has_any)
|
||||
ctx = await authz.verify(auth, ["auth:admin", "auth:org:*"], match=permutil.has_any)
|
||||
perms = await db.instance.list_permissions()
|
||||
return [{"id": p.id, "display_name": p.display_name} for p in perms]
|
||||
|
||||
# Global admins see all permissions
|
||||
if "auth:admin" in ctx.role.permissions:
|
||||
return [{"id": p.id, "display_name": p.display_name} for p in perms]
|
||||
|
||||
# Org admins only see permissions their org can grant
|
||||
grantable = set(ctx.org.permissions or [])
|
||||
filtered_perms = [p for p in perms if p.id in grantable]
|
||||
return [{"id": p.id, "display_name": p.display_name} for p in filtered_perms]
|
||||
|
||||
|
||||
@app.post("/permissions")
|
||||
@@ -418,6 +522,11 @@ async def admin_rename_permission(payload: dict = Body(...), auth=Cookie(None)):
|
||||
display_name = payload.get("display_name")
|
||||
if not old_id or not new_id:
|
||||
raise ValueError("old_id and new_id required")
|
||||
|
||||
# Sanity check: prevent renaming critical permissions
|
||||
if old_id == "auth:admin":
|
||||
raise ValueError("Cannot rename the master admin permission")
|
||||
|
||||
querysafe.assert_safe(old_id, field="old_id")
|
||||
querysafe.assert_safe(new_id, field="new_id")
|
||||
if display_name is None:
|
||||
@@ -434,5 +543,10 @@ async def admin_rename_permission(payload: dict = Body(...), auth=Cookie(None)):
|
||||
async def admin_delete_permission(permission_id: str, auth=Cookie(None)):
|
||||
await authz.verify(auth, ["auth:admin"])
|
||||
querysafe.assert_safe(permission_id, field="permission_id")
|
||||
|
||||
# Sanity check: prevent deleting critical permissions
|
||||
if permission_id == "auth:admin":
|
||||
raise ValueError("Cannot delete the master admin permission")
|
||||
|
||||
await db.instance.delete_permission(permission_id)
|
||||
return {"status": "ok"}
|
||||
|
||||
@@ -193,10 +193,9 @@ async def api_user_info(reset: str | None = None, auth=Cookie(None)):
|
||||
}
|
||||
effective_permissions = [p.id for p in (ctx.permissions or [])]
|
||||
is_global_admin = "auth:admin" in (role_info["permissions"] or [])
|
||||
if org_info:
|
||||
is_org_admin = f"auth:org:{org_info['uuid']}" in (
|
||||
role_info["permissions"] or []
|
||||
)
|
||||
is_org_admin = any(
|
||||
p.startswith("auth:org:") for p in (role_info["permissions"] or [])
|
||||
)
|
||||
|
||||
return {
|
||||
"authenticated": True,
|
||||
|
||||
Reference in New Issue
Block a user