Leo Vasanko
fb03fa5430
Server name may be set in config file. If unset, backend uses the folder name being served. This is shown in page title for site root, and subfolder names are also now shown. New icon of Droppy icon, changing only the color. Reviewed-on: #2
158 lines
4.3 KiB
Python
158 lines
4.3 KiB
Python
from __future__ import annotations
|
|
|
|
import secrets
|
|
from functools import wraps
|
|
from hashlib import sha256
|
|
from pathlib import Path, PurePath
|
|
from time import time
|
|
|
|
import msgspec
|
|
|
|
|
|
class Config(msgspec.Struct):
    """Top-level settings, decoded from and encoded to the TOML config file.

    ``path`` and ``listen`` are required; every other field has a default.
    """

    # Required settings (no defaults)
    path: Path
    listen: str
    # Default is generated once, when this class body is evaluated
    secret: str = secrets.token_hex(12)
    public: bool = False
    # Display name; empty string means "not set"
    name: str = ""
    # Keyed by user name (see update_user / del_user)
    users: dict[str, User] = msgspec.field(default_factory=dict)
    # Keyed by str; presumably the link identifier -- TODO confirm against callers
    links: dict[str, Link] = msgspec.field(default_factory=dict)
|
|
|
|
|
|
class User(msgspec.Struct, omit_defaults=True):
    """A single user account, stored by name in ``Config.users``.

    Declared with ``omit_defaults=True``, so fields that still hold their
    default value are left out when the struct is encoded.
    """

    # Privilege flag; what it grants is decided by code outside this module
    privileged: bool = False
    # Presumably the password hash written by auth.set_password -- TODO confirm
    hash: str = ""
    # Presumably a timestamp of the user's last activity -- TODO confirm unit
    lastSeen: int = 0  # noqa: N815
|
|
|
|
|
|
class Link(msgspec.Struct, omit_defaults=True):
    """A link entry, stored by key in ``Config.links``.

    Declared with ``omit_defaults=True``, so fields that still hold their
    default value are left out when the struct is encoded.
    """

    # Required: what the link points to (presumably a path being served -- TODO confirm)
    location: str
    # Presumably the name of the user who created the link -- TODO confirm
    creator: str = ""
    # Presumably an expiry timestamp, with 0 meaning "never" -- TODO confirm
    expires: int = 0
|
|
|
|
|
|
# Currently loaded Config; stays None until load_config() or config_update() sets it
config = None
# Location of the TOML config file inside the user's home directory
conffile = Path.home() / ".local/share/cista/db.toml"
|
|
|
|
|
|
def derived_secret(*params, len=8) -> bytes:
    """Derive a secret key from the main secret and the given parameters.

    Each part (``config.secret`` followed by ``params``) is hashed on its own
    with SHA-256, so every part contributes a fixed-size digest. The digests
    are then hashed together and the result is truncated.

    Args:
        *params: Extra values mixed into the key. ``bytes`` are used as-is,
            anything else is formatted with ``f"{p}"`` and UTF-8 encoded.
        len: Number of bytes to return (at most 32, the SHA-256 digest size).

    Returns:
        The first ``len`` bytes of the combined SHA-256 digest.
    """
    outer = sha256()
    for part in (config.secret, *params):
        raw = part if isinstance(part, bytes) else f"{part}".encode()
        # Feeding digests one by one equals hashing their concatenation
        outer.update(sha256(raw).digest())
    # Output a bytes of the desired length
    return outer.digest()[:len]
|
|
|
|
|
|
def enc_hook(obj):
    """Encoding hook for msgspec: serialise path objects as POSIX strings.

    Raises:
        TypeError: If ``obj`` is not a ``PurePath`` (or subclass) instance.
    """
    if not isinstance(obj, PurePath):
        raise TypeError
    return obj.as_posix()
|
|
|
|
|
|
def dec_hook(typ, obj):
    """Decoding hook for msgspec: build a ``Path`` from the decoded value.

    Raises:
        TypeError: If ``typ`` is anything other than exactly ``Path``.
    """
    if typ is not Path:
        raise TypeError
    return Path(obj)
|
|
|
|
|
|
def config_update(modify):
    """Load the config file, apply ``modify`` and save the result atomically.

    A ``.tmp`` sibling of the config file is created exclusively and doubles
    as a write lock: while it exists, other callers get ``"collision"``. The
    new content is written to that file, which then replaces the config file.
    On success the module-level ``config`` is updated to the new value.

    Args:
        modify: Callable that receives the current ``Config`` (or ``None``
            when no config file exists yet) and returns the new ``Config``.

    Returns:
        ``"collision"`` if the temporary file already existed (nothing was
        changed), ``"read"`` if the encoded result equals the file content,
        ``"created"`` if no config file existed before, ``"modified"``
        otherwise.

    Raises:
        Any exception raised while decoding, modifying, encoding or writing
        is re-raised after the temporary file has been removed.
    """
    global config
    if not conffile.exists():
        conffile.parent.mkdir(parents=True, exist_ok=True)
    tmpname = conffile.with_suffix(".tmp")
    try:
        f = tmpname.open("xb")
    except FileExistsError:
        # Someone else holds the lock. Remove it if it is older than a second
        # (treated as stale). The other writer may finish and remove the file
        # at any moment, so a vanished file is not an error here.
        try:
            stale = tmpname.stat().st_mtime < time() - 1
        except FileNotFoundError:
            stale = False
        if stale:
            tmpname.unlink(missing_ok=True)
        return "collision"
    try:
        # Load, modify and save with atomic replace
        try:
            old = conffile.read_bytes()
            c = msgspec.toml.decode(old, type=Config, dec_hook=dec_hook)
        except FileNotFoundError:
            # No existing config file, make sure we have a folder...
            confdir = conffile.parent
            confdir.mkdir(parents=True, exist_ok=True)
            confdir.chmod(0o700)
            old = b""
            c = None
        c = modify(c)
        new = msgspec.toml.encode(c, enc_hook=enc_hook)
        if old == new:
            # Nothing to write: release the lock and just refresh the global
            f.close()
            tmpname.unlink()
            config = c
            return "read"
        f.write(new)
        f.close()
        # Path.replace overwrites the target on every platform, whereas
        # Path.rename fails on Windows when the config file already exists.
        tmpname.replace(conffile)
    except BaseException:
        # Release the lock on any failure (including KeyboardInterrupt).
        # missing_ok keeps a vanished temp file from masking the real error.
        f.close()
        tmpname.unlink(missing_ok=True)
        raise
    config = c
    return "modified" if old else "created"
|
|
|
|
|
|
def modifies_config(modify):
    """Decorator for functions that modify the config file.

    The decorated function must accept the current config as its first
    argument and return the new config. The resulting wrapper is called
    *without* that first argument; it runs the function through
    ``config_update`` and retries for as long as that reports a write
    collision.

    Returns:
        A wrapper that returns the final ``config_update`` status string
        (anything other than ``"collision"``).
    """
    # The module-level name ``time`` is the *function* time.time (imported via
    # ``from time import time``), so ``time.sleep`` would raise AttributeError.
    from time import sleep

    @wraps(modify)
    def wrapper(*args, **kwargs):
        def m(c):
            return modify(c, *args, **kwargs)

        # Retry modification in case of write collision
        while (c := config_update(m)) == "collision":
            sleep(0.01)
        return c

    return wrapper
|
|
|
|
|
|
def load_config():
    """Read the config file and store the decoded ``Config`` in ``config``."""
    global config
    raw = conffile.read_bytes()
    config = msgspec.toml.decode(raw, type=Config, dec_hook=dec_hook)
|
|
|
|
|
|
@modifies_config
def update_config(conf: Config, changes: dict) -> Config:
    """Create/update the config with new values, respecting changes done by others.

    Because of ``@modifies_config``, callers pass only ``changes`` and get
    back the status string of the update rather than the ``Config``.

    Args:
        conf: Current config, or ``None`` when no config file exists yet.
        changes: Top-level settings to set or overwrite.

    Returns:
        A new ``Config`` built from the merged settings.
    """
    # Start from the existing settings as plain builtins (or from nothing)
    if conf is None:
        merged = {}
    else:
        merged = msgspec.to_builtins(conf, enc_hook=enc_hook)
    # New values win over stored ones; conversion back validates the result
    merged.update(changes)
    return msgspec.convert(merged, Config, dec_hook=dec_hook)
|
|
|
|
|
|
@modifies_config
def update_user(conf: Config, name: str, changes: dict) -> Config:
    """Create/update a user with new values, respecting changes done by others.

    Because of ``@modifies_config``, callers pass only ``name`` and
    ``changes`` and get back the status string of the update.

    Args:
        conf: Current config; it must already exist (``conf.users`` is read).
        name: Name of the user to create or update.
        changes: ``User`` fields to set. A ``"password"`` key is not stored
            directly but handed to ``auth.set_password``. The dict passed in
            is left unmodified.

    Returns:
        A new ``Config`` with the user entry created or replaced.
    """
    # Work on a copy: the "password" key is removed below and the caller's
    # dict must not change as a side effect.
    changes = dict(changes)
    try:
        u = conf.users[name].__copy__()
    except KeyError:
        u = User()
    if "password" in changes:
        from . import auth

        auth.set_password(u, changes.pop("password"))
    # Encode into dict, update values with new, convert to Config
    udict = msgspec.to_builtins(u, enc_hook=enc_hook)
    udict.update(changes)
    settings = msgspec.to_builtins(conf, enc_hook=enc_hook)
    settings["users"][name] = msgspec.convert(udict, User, dec_hook=dec_hook)
    return msgspec.convert(settings, Config, dec_hook=dec_hook)
|
|
|
|
|
|
@modifies_config
def del_user(conf: Config, name: str) -> Config:
    """Delete named user account.

    Because of ``@modifies_config``, callers pass only ``name`` and get back
    the status string of the update.

    Args:
        conf: Current config; it is not modified.
        name: Name of the user to remove.

    Returns:
        A copy of ``conf`` whose ``users`` mapping lacks ``name``.

    Raises:
        KeyError: If no user called ``name`` exists.
    """
    if name not in conf.users:
        raise KeyError(name)
    ret = conf.__copy__()
    # __copy__ is shallow, so popping from ret.users would also change the
    # dict owned by ``conf``; give the copy its own mapping instead.
    ret.users = {k: v for k, v in conf.users.items() if k != name}
    return ret
|