Centralised error handling & convenience.

This commit is contained in:
Leo Vasanko 2025-08-06 10:44:57 -06:00
parent 42545c07d2
commit c9ae53ef79
3 changed files with 94 additions and 113 deletions

View File

@ -30,78 +30,67 @@ def register_api_routes(app: FastAPI):
@app.post("/auth/validate")
async def validate_token(response: Response, auth=Cookie(None)):
"""Lightweight token validation endpoint."""
try:
s = await get_session(auth)
return {
"valid": True,
"user_uuid": str(s.user_uuid),
}
except ValueError:
response.status_code = 401
return {"valid": False}
s = await get_session(auth)
return {
"valid": True,
"user_uuid": str(s.user_uuid),
}
@app.post("/auth/user-info")
async def api_user_info(response: Response, auth=Cookie(None)):
"""Get full user information for the authenticated user."""
try:
reset = passphrase.is_well_formed(auth)
s = await (get_reset if reset else get_session)(auth)
u = await db.instance.get_user_by_uuid(s.user_uuid)
# Get all credentials for the user
credential_ids = await db.instance.get_credentials_by_user_uuid(s.user_uuid)
reset = passphrase.is_well_formed(auth)
s = await (get_reset if reset else get_session)(auth)
u = await db.instance.get_user_by_uuid(s.user_uuid)
# Get all credentials for the user
credential_ids = await db.instance.get_credentials_by_user_uuid(s.user_uuid)
credentials = []
user_aaguids = set()
credentials = []
user_aaguids = set()
for cred_id in credential_ids:
c = await db.instance.get_credential_by_id(cred_id)
for cred_id in credential_ids:
c = await db.instance.get_credential_by_id(cred_id)
# Convert AAGUID to string format
aaguid_str = str(c.aaguid)
user_aaguids.add(aaguid_str)
# Convert AAGUID to string format
aaguid_str = str(c.aaguid)
user_aaguids.add(aaguid_str)
# Check if this is the current session credential
is_current_session = s.credential_uuid == c.uuid
# Check if this is the current session credential
is_current_session = s.credential_uuid == c.uuid
credentials.append(
{
"credential_uuid": str(c.uuid),
"aaguid": aaguid_str,
"created_at": c.created_at.isoformat(),
"last_used": c.last_used.isoformat() if c.last_used else None,
"last_verified": c.last_verified.isoformat()
if c.last_verified
else None,
"sign_count": c.sign_count,
"is_current_session": is_current_session,
}
)
credentials.append(
{
"credential_uuid": str(c.uuid),
"aaguid": aaguid_str,
"created_at": c.created_at.isoformat(),
"last_used": c.last_used.isoformat() if c.last_used else None,
"last_verified": c.last_verified.isoformat()
if c.last_verified
else None,
"sign_count": c.sign_count,
"is_current_session": is_current_session,
}
)
# Get AAGUID information for only the AAGUIDs that the user has
aaguid_info = aaguid.filter(user_aaguids)
# Get AAGUID information for only the AAGUIDs that the user has
aaguid_info = aaguid.filter(user_aaguids)
# Sort credentials by creation date (earliest first, most recently created last)
credentials.sort(key=lambda cred: cred["created_at"])
# Sort credentials by creation date (earliest first, most recently created last)
credentials.sort(key=lambda cred: cred["created_at"])
return {
"authenticated": not reset,
"session_type": s.info["type"],
"user": {
"user_uuid": str(u.uuid),
"user_name": u.display_name,
"created_at": u.created_at.isoformat() if u.created_at else None,
"last_seen": u.last_seen.isoformat() if u.last_seen else None,
"visits": u.visits,
},
"credentials": credentials,
"aaguid_info": aaguid_info,
}
except ValueError as e:
response.status_code = 400
return {"detail": f"Failed to get user info: {e}"}
except Exception:
response.status_code = 500
return {"detail": "Failed to get user info"}
return {
"authenticated": not reset,
"session_type": s.info["type"],
"user": {
"user_uuid": str(u.uuid),
"user_name": u.display_name,
"created_at": u.created_at.isoformat() if u.created_at else None,
"last_seen": u.last_seen.isoformat() if u.last_seen else None,
"visits": u.visits,
},
"credentials": credentials,
"aaguid_info": aaguid_info,
}
@app.post("/auth/logout")
async def api_logout(response: Response, auth=Cookie(None)):
@ -119,36 +108,20 @@ def register_api_routes(app: FastAPI):
@app.post("/auth/set-session")
async def api_set_session(response: Response, auth=Depends(bearer_auth)):
"""Set session cookie from Authorization header. Fetched after login by WebSocket."""
try:
user = await get_session(auth.credentials)
if not user:
raise ValueError("Invalid Authorization header.")
session.set_session_cookie(response, auth.credentials)
user = await get_session(auth.credentials)
if not user:
raise ValueError("Invalid Authorization header.")
session.set_session_cookie(response, auth.credentials)
return {
"message": "Session cookie set successfully",
"user_uuid": str(user.user_uuid),
}
except ValueError as e:
response.status_code = 400
return {"detail": str(e)}
except Exception:
response.status_code = 500
return {"detail": "Failed to set session"}
return {
"message": "Session cookie set successfully",
"user_uuid": str(user.user_uuid),
}
@app.delete("/auth/credential/{uuid}")
async def api_delete_credential(
response: Response, uuid: UUID, auth: str = Cookie(None)
):
"""Delete a specific credential for the current user."""
try:
await delete_credential(uuid, auth)
return {"message": "Credential deleted successfully"}
except ValueError as e:
response.status_code = 400
return {"detail": str(e)}
except Exception:
response.status_code = 500
return {"detail": "Failed to delete credential"}
await delete_credential(uuid, auth)
return {"message": "Credential deleted successfully"}

View File

@ -1,9 +1,10 @@
import contextlib
import logging
from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import Cookie, FastAPI, Request, Response
from fastapi.responses import FileResponse
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from ..authsession import get_session
@ -30,6 +31,21 @@ async def lifespan(app: FastAPI):
app = FastAPI(lifespan=lifespan)
# Global exception handlers
@app.exception_handler(ValueError)
async def value_error_handler(request: Request, exc: ValueError):
"""Handle ValueError exceptions globally with 400 status code."""
return JSONResponse(status_code=400, content={"detail": str(exc)})
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
"""Handle all other exceptions globally with 500 status code."""
logging.exception("Internal Server Error")
return JSONResponse(status_code=500, content={"detail": "Internal server error"})
# Mount the WebSocket subapp
app.mount("/auth/ws", ws.app)

View File

@ -15,35 +15,27 @@ def register_reset_routes(app):
@app.post("/auth/create-link")
async def api_create_link(request: Request, response: Response, auth=Cookie(None)):
"""Create a device addition link for the authenticated user."""
try:
# Require authentication
s = await get_session(auth)
# Require authentication
s = await get_session(auth)
# Generate a human-readable token
token = passphrase.generate() # e.g., "cross.rotate.yin.note.evoke"
await db.instance.create_session(
user_uuid=s.user_uuid,
key=tokens.reset_key(token),
expires=expires(),
info=session.infodict(request, "device addition"),
)
# Generate a human-readable token
token = passphrase.generate() # e.g., "cross.rotate.yin.note.evoke"
await db.instance.create_session(
user_uuid=s.user_uuid,
key=tokens.reset_key(token),
expires=expires(),
info=session.infodict(request, "device addition"),
)
# Generate the device addition link with pretty URL
path = request.url.path.removesuffix("create-link") + token
url = f"{request.headers['origin']}{path}"
# Generate the device addition link with pretty URL
path = request.url.path.removesuffix("create-link") + token
url = f"{request.headers['origin']}{path}"
return {
"message": "Registration link generated successfully",
"url": url,
"expires": expires().isoformat(),
}
except ValueError:
response.status_code = 401
return {"detail": "Authentication required"}
except Exception as e:
response.status_code = 500
return {"detail": f"Failed to create registration link: {str(e)}"}
return {
"message": "Registration link generated successfully",
"url": url,
"expires": expires().isoformat(),
}
@app.get("/auth/{reset_token}")
async def reset_authentication(