157 lines
4.2 KiB
Python
157 lines
4.2 KiB
Python
from __future__ import annotations
|
|
|
|
import secrets
|
|
from functools import wraps
|
|
from hashlib import sha256
|
|
from pathlib import Path, PurePath
|
|
from time import time
|
|
|
|
import msgspec
|
|
|
|
|
|
class Config(msgspec.Struct):
|
|
path: Path
|
|
listen: str
|
|
secret: str = secrets.token_hex(12)
|
|
public: bool = False
|
|
users: dict[str, User] = {}
|
|
links: dict[str, Link] = {}
|
|
|
|
|
|
class User(msgspec.Struct, omit_defaults=True):
|
|
privileged: bool = False
|
|
hash: str = ""
|
|
lastSeen: int = 0 # noqa: N815
|
|
|
|
|
|
class Link(msgspec.Struct, omit_defaults=True):
|
|
location: str
|
|
creator: str = ""
|
|
expires: int = 0
|
|
|
|
|
|
config = None
|
|
conffile = Path.home() / ".local/share/cista/db.toml"
|
|
|
|
|
|
def derived_secret(*params, len=8) -> bytes:
|
|
"""Used to derive secret keys from the main secret"""
|
|
# Each part is made the same length by hashing first
|
|
combined = b"".join(
|
|
sha256(p if isinstance(p, bytes) else f"{p}".encode()).digest()
|
|
for p in [config.secret, *params]
|
|
)
|
|
# Output a bytes of the desired length
|
|
return sha256(combined).digest()[:len]
|
|
|
|
|
|
def enc_hook(obj):
|
|
if isinstance(obj, PurePath):
|
|
return obj.as_posix()
|
|
raise TypeError
|
|
|
|
|
|
def dec_hook(typ, obj):
|
|
if typ is Path:
|
|
return Path(obj)
|
|
raise TypeError
|
|
|
|
|
|
def config_update(modify):
|
|
global config
|
|
if not conffile.exists():
|
|
conffile.parent.mkdir(parents=True, exist_ok=True)
|
|
tmpname = conffile.with_suffix(".tmp")
|
|
try:
|
|
f = tmpname.open("xb")
|
|
except FileExistsError:
|
|
if tmpname.stat().st_mtime < time() - 1:
|
|
tmpname.unlink()
|
|
return "collision"
|
|
try:
|
|
# Load, modify and save with atomic replace
|
|
try:
|
|
old = conffile.read_bytes()
|
|
c = msgspec.toml.decode(old, type=Config, dec_hook=dec_hook)
|
|
except FileNotFoundError:
|
|
# No existing config file, make sure we have a folder...
|
|
confdir = conffile.parent
|
|
confdir.mkdir(parents=True, exist_ok=True)
|
|
confdir.chmod(0o700)
|
|
old = b""
|
|
c = None
|
|
c = modify(c)
|
|
new = msgspec.toml.encode(c, enc_hook=enc_hook)
|
|
if old == new:
|
|
f.close()
|
|
tmpname.unlink()
|
|
config = c
|
|
return "read"
|
|
f.write(new)
|
|
f.close()
|
|
tmpname.rename(conffile) # Atomic replace
|
|
except:
|
|
f.close()
|
|
tmpname.unlink()
|
|
raise
|
|
config = c
|
|
return "modified" if old else "created"
|
|
|
|
|
|
def modifies_config(modify):
|
|
"""Decorator for functions that modify the config file"""
|
|
|
|
@wraps(modify)
|
|
def wrapper(*args, **kwargs):
|
|
def m(c):
|
|
return modify(c, *args, **kwargs)
|
|
|
|
# Retry modification in case of write collision
|
|
while (c := config_update(m)) == "collision":
|
|
time.sleep(0.01)
|
|
return c
|
|
|
|
return wrapper
|
|
|
|
|
|
def load_config():
|
|
global config
|
|
config = msgspec.toml.decode(conffile.read_bytes(), type=Config, dec_hook=dec_hook)
|
|
|
|
|
|
@modifies_config
|
|
def update_config(conf: Config, changes: dict) -> Config:
|
|
"""Create/update the config with new values, respecting changes done by others."""
|
|
# Encode into dict, update values with new, convert to Config
|
|
settings = {} if conf is None else msgspec.to_builtins(conf, enc_hook=enc_hook)
|
|
settings.update(changes)
|
|
return msgspec.convert(settings, Config, dec_hook=dec_hook)
|
|
|
|
|
|
@modifies_config
|
|
def update_user(conf: Config, name: str, changes: dict) -> Config:
|
|
"""Create/update a user with new values, respecting changes done by others."""
|
|
# Encode into dict, update values with new, convert to Config
|
|
try:
|
|
u = conf.users[name].__copy__()
|
|
except KeyError:
|
|
u = User()
|
|
if "password" in changes:
|
|
from . import auth
|
|
|
|
auth.set_password(u, changes["password"])
|
|
del changes["password"]
|
|
udict = msgspec.to_builtins(u, enc_hook=enc_hook)
|
|
udict.update(changes)
|
|
settings = msgspec.to_builtins(conf, enc_hook=enc_hook)
|
|
settings["users"][name] = msgspec.convert(udict, User, dec_hook=dec_hook)
|
|
return msgspec.convert(settings, Config, dec_hook=dec_hook)
|
|
|
|
|
|
@modifies_config
|
|
def del_user(conf: Config, name: str) -> Config:
|
|
"""Delete named user account."""
|
|
ret = conf.__copy__()
|
|
ret.users.pop(name)
|
|
return ret
|