135 lines
3.4 KiB
Python
Executable File
135 lines
3.4 KiB
Python
Executable File
from __future__ import annotations
|
|
|
|
import shutil
|
|
|
|
import msgspec
|
|
from sanic import BadRequest
|
|
|
|
from cista import config
|
|
from cista.fileio import sanitize_filename
|
|
|
|
## Control commands
|
|
|
|
class ControlBase(msgspec.Struct, tag_field="op", tag=str.lower):
|
|
def __call__(self):
|
|
raise NotImplementedError
|
|
|
|
class MkDir(ControlBase):
|
|
path: str
|
|
def __call__(self):
|
|
path = config.config.path / sanitize_filename(self.path)
|
|
path.mkdir(parents=False, exist_ok=False)
|
|
|
|
class Rename(ControlBase):
|
|
path: str
|
|
to: str
|
|
def __call__(self):
|
|
to = sanitize_filename(self.to)
|
|
if "/" in to:
|
|
raise BadRequest("Rename 'to' name should only contain filename, not path")
|
|
path = config.config.path / sanitize_filename(self.path)
|
|
path.rename(path.with_name(to))
|
|
|
|
class Rm(ControlBase):
|
|
sel: list[str]
|
|
def __call__(self):
|
|
root = config.config.path
|
|
sel = [root / sanitize_filename(p) for p in self.sel]
|
|
for p in sel:
|
|
shutil.rmtree(p, ignore_errors=True)
|
|
|
|
class Mv(ControlBase):
|
|
sel: list[str]
|
|
dst: str
|
|
def __call__(self):
|
|
root = config.config.path
|
|
sel = [root / sanitize_filename(p) for p in self.sel]
|
|
dst = root / sanitize_filename(self.dst)
|
|
if not dst.is_dir():
|
|
raise BadRequest("The destination must be a directory")
|
|
for p in sel:
|
|
shutil.move(p, dst)
|
|
|
|
class Cp(ControlBase):
|
|
sel: list[str]
|
|
dst: str
|
|
def __call__(self):
|
|
root = config.config.path
|
|
sel = [root / sanitize_filename(p) for p in self.sel]
|
|
dst = root / sanitize_filename(self.dst)
|
|
if not dst.is_dir():
|
|
raise BadRequest("The destination must be a directory")
|
|
for p in sel:
|
|
# Note: copies as dst rather than in dst unless name is appended.
|
|
shutil.copytree(p, dst / p.name, dirs_exist_ok=True, ignore_dangling_symlinks=True)
|
|
|
|
## File uploads and downloads
|
|
|
|
class FileRange(msgspec.Struct):
|
|
name: str
|
|
size: int
|
|
start: int
|
|
end: int
|
|
|
|
class ErrorMsg(msgspec.Struct):
|
|
error: str
|
|
req: FileRange
|
|
|
|
class StatusMsg(msgspec.Struct):
|
|
status: str
|
|
req: FileRange
|
|
|
|
|
|
## Directory listings
|
|
|
|
class FileEntry(msgspec.Struct):
|
|
size: int
|
|
mtime: int
|
|
|
|
class DirEntry(msgspec.Struct):
|
|
size: int
|
|
mtime: int
|
|
dir: DirList
|
|
|
|
def __getitem__(self, name):
|
|
return self.dir[name]
|
|
|
|
def __setitem__(self, name, value):
|
|
self.dir[name] = value
|
|
|
|
def __contains__(self, name):
|
|
return name in self.dir
|
|
|
|
def __delitem__(self, name):
|
|
del self.dir[name]
|
|
|
|
@property
|
|
def props(self):
|
|
return {
|
|
k: v
|
|
for k, v in self.__struct_fields__
|
|
if k != "dir"
|
|
}
|
|
|
|
DirList = dict[str, FileEntry | DirEntry]
|
|
|
|
|
|
class UpdateEntry(msgspec.Struct, omit_defaults=True):
|
|
"""Updates the named entry in the tree. Fields that are set replace old values. A list of entries recurses directories."""
|
|
name: str = ""
|
|
deleted: bool = False
|
|
size: int | None = None
|
|
mtime: int | None = None
|
|
dir: DirList | None = None
|
|
|
|
def make_dir_data(root):
|
|
if len(root) == 2:
|
|
return FileEntry(*root)
|
|
size, mtime, listing = root
|
|
converted = {}
|
|
for name, data in listing.items():
|
|
converted[name] = make_dir_data(data)
|
|
sz = sum(x.size for x in converted.values())
|
|
mt = max(x.mtime for x in converted.values())
|
|
return DirEntry(sz, max(mt, mtime), converted)
|