Fixing cascade.

This commit is contained in:
Leo Vasanko
2025-08-30 14:07:32 -06:00
parent f3e3679b6d
commit 4f094a7016
6 changed files with 209 additions and 17 deletions

View File

@@ -242,6 +242,18 @@ class DatabaseInterface(ABC):
async def delete_permission(self, permission_id: str) -> None:
"""Delete permission by ID."""
@abstractmethod
async def rename_permission(
self, old_id: str, new_id: str, display_name: str
) -> None:
"""Rename a permission's ID (and display name) updating all references.
This must update:
- permissions.id (primary key)
- org_permissions.permission_id
- role_permissions.permission_id
"""
@abstractmethod
async def add_permission_to_organization(
self, org_id: str, permission_id: str

View File

@@ -16,6 +16,7 @@ from sqlalchemy import (
LargeBinary,
String,
delete,
event,
select,
update,
)
@@ -226,6 +227,18 @@ class DB(DatabaseInterface):
def __init__(self, db_path: str = DB_PATH):
"""Initialize with database path."""
self.engine = create_async_engine(db_path, echo=False)
# Ensure SQLite foreign key enforcement is ON for every new connection
if db_path.startswith("sqlite"):
@event.listens_for(self.engine.sync_engine, "connect")
def _fk_on(dbapi_connection, connection_record): # type: ignore
try:
cursor = dbapi_connection.cursor()
cursor.execute("PRAGMA foreign_keys=ON;")
cursor.close()
except Exception:
pass
self.async_session_factory = async_sessionmaker(
self.engine, expire_on_commit=False
)
@@ -750,6 +763,63 @@ class DB(DatabaseInterface):
)
await session.execute(stmt)
async def rename_permission(
self, old_id: str, new_id: str, display_name: str
) -> None:
"""Rename a permission's primary key and update referencing tables.
Approach: insert new row (if id changes), update FKs, delete old row.
Wrapped in a transaction; will raise on conflict.
"""
if old_id == new_id:
# Just update display name
async with self.session() as session:
stmt = (
update(PermissionModel)
.where(PermissionModel.id == old_id)
.values(display_name=display_name)
)
await session.execute(stmt)
return
async with self.session() as session:
# Ensure old exists
existing_old = await session.execute(
select(PermissionModel).where(PermissionModel.id == old_id)
)
if not existing_old.scalar_one_or_none():
raise ValueError("Original permission not found")
# Check new not taken
existing_new = await session.execute(
select(PermissionModel).where(PermissionModel.id == new_id)
)
if existing_new.scalar_one_or_none():
raise ValueError("New permission id already exists")
# Create new permission row first
session.add(PermissionModel(id=new_id, display_name=display_name))
await session.flush()
# Update org_permissions
await session.execute(
update(OrgPermission)
.where(OrgPermission.permission_id == old_id)
.values(permission_id=new_id)
)
await session.flush()
# Update role_permissions
await session.execute(
update(RolePermission)
.where(RolePermission.permission_id == old_id)
.values(permission_id=new_id)
)
await session.flush()
# Delete old permission row
await session.execute(
delete(PermissionModel).where(PermissionModel.id == old_id)
)
await session.flush()
async def delete_permission(self, permission_id: str) -> None:
async with self.session() as session:
stmt = delete(PermissionModel).where(PermissionModel.id == permission_id)