cista-storage/cista/config.py

158 lines
4.2 KiB
Python
Raw Normal View History

from __future__ import annotations
import secrets
from functools import wraps
from hashlib import sha256
from pathlib import Path, PurePath
from time import time
import msgspec
class Config(msgspec.Struct):
path: Path
listen: str
secret: str = secrets.token_hex(12)
public: bool = False
users: dict[str, User] = {}
links: dict[str, Link] = {}
class User(msgspec.Struct, omit_defaults=True):
privileged: bool = False
hash: str = ""
lastSeen: int = 0 # noqa: N815
class Link(msgspec.Struct, omit_defaults=True):
location: str
creator: str = ""
expires: int = 0
config = None
conffile = Path.home() / ".local/share/cista/db.toml"
def derived_secret(*params, len=8) -> bytes:
"""Used to derive secret keys from the main secret"""
# Each part is made the same length by hashing first
combined = b"".join(
sha256(p if isinstance(p, bytes) else f"{p}".encode()).digest()
for p in [config.secret, *params]
)
# Output a bytes of the desired length
return sha256(combined).digest()[:len]
def enc_hook(obj):
if isinstance(obj, PurePath):
return obj.as_posix()
raise TypeError
def dec_hook(typ, obj):
if typ is Path:
return Path(obj)
raise TypeError
def config_update(modify):
global config
if not conffile.exists():
conffile.parent.mkdir(parents=True, exist_ok=True)
tmpname = conffile.with_suffix(".tmp")
try:
f = tmpname.open("xb")
except FileExistsError:
if tmpname.stat().st_mtime < time() - 1:
tmpname.unlink()
return "collision"
try:
# Load, modify and save with atomic replace
try:
old = conffile.read_bytes()
c = msgspec.toml.decode(old, type=Config, dec_hook=dec_hook)
except FileNotFoundError:
# No existing config file, make sure we have a folder...
confdir = conffile.parent
confdir.mkdir(parents=True, exist_ok=True)
confdir.chmod(0o700)
old = b""
c = None
c = modify(c)
new = msgspec.toml.encode(c, enc_hook=enc_hook)
if old == new:
f.close()
tmpname.unlink()
config = c
return "read"
f.write(new)
f.close()
tmpname.rename(conffile) # Atomic replace
except:
f.close()
tmpname.unlink()
raise
config = c
return "modified" if old else "created"
def modifies_config(modify):
"""Decorator for functions that modify the config file"""
@wraps(modify)
def wrapper(*args, **kwargs):
def m(c):
return modify(c, *args, **kwargs)
# Retry modification in case of write collision
while (c := config_update(m)) == "collision":
time.sleep(0.01)
return c
return wrapper
def load_config():
global config
config = msgspec.toml.decode(conffile.read_bytes(), type=Config, dec_hook=dec_hook)
@modifies_config
def update_config(conf: Config, changes: dict) -> Config:
"""Create/update the config with new values, respecting changes done by others."""
# Encode into dict, update values with new, convert to Config
settings = {} if conf is None else msgspec.to_builtins(conf, enc_hook=enc_hook)
settings.update(changes)
return msgspec.convert(settings, Config, dec_hook=dec_hook)
@modifies_config
def update_user(conf: Config, name: str, changes: dict) -> Config:
"""Create/update a user with new values, respecting changes done by others."""
# Encode into dict, update values with new, convert to Config
try:
u = conf.users[name].__copy__()
except KeyError:
u = User()
if "password" in changes:
from . import auth
auth.set_password(u, changes["password"])
del changes["password"]
udict = msgspec.to_builtins(u, enc_hook=enc_hook)
udict.update(changes)
settings = msgspec.to_builtins(conf, enc_hook=enc_hook)
settings["users"][name] = msgspec.convert(udict, User, dec_hook=dec_hook)
return msgspec.convert(settings, Config, dec_hook=dec_hook)
@modifies_config
def del_user(conf: Config, name: str) -> Config:
"""Delete named user account."""
ret = conf.__copy__()
ret.users.pop(name)
return ret