Completely revamped file list format. Fixes to zip creation.
This commit is contained in:
parent
a70549e6ec
commit
ef5e37187d
|
@ -104,11 +104,11 @@ async def watch(req, ws):
|
||||||
)
|
)
|
||||||
uuid = token_bytes(16)
|
uuid = token_bytes(16)
|
||||||
try:
|
try:
|
||||||
with watching.tree_lock:
|
with watching.state.lock:
|
||||||
q = watching.pubsub[uuid] = asyncio.Queue()
|
q = watching.pubsub[uuid] = asyncio.Queue()
|
||||||
# Init with disk usage and full tree
|
# Init with disk usage and full tree
|
||||||
await ws.send(watching.format_du())
|
await ws.send(watching.format_du(watching.state.space))
|
||||||
await ws.send(watching.format_tree())
|
await ws.send(watching.format_tree(watching.state.root))
|
||||||
# Send updates
|
# Send updates
|
||||||
while True:
|
while True:
|
||||||
await ws.send(await q.get())
|
await ws.send(await q.get())
|
||||||
|
|
72
cista/app.py
72
cista/app.py
|
@ -1,10 +1,8 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import datetime
|
import datetime
|
||||||
import mimetypes
|
import mimetypes
|
||||||
from collections import deque
|
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
from importlib.resources import files
|
from pathlib import Path, PurePath, PurePosixPath
|
||||||
from pathlib import Path, PurePath
|
|
||||||
from stat import S_IFDIR, S_IFREG
|
from stat import S_IFDIR, S_IFREG
|
||||||
from urllib.parse import unquote
|
from urllib.parse import unquote
|
||||||
from wsgiref.handlers import format_date_time
|
from wsgiref.handlers import format_date_time
|
||||||
|
@ -12,15 +10,13 @@ from wsgiref.handlers import format_date_time
|
||||||
import brotli
|
import brotli
|
||||||
import sanic.helpers
|
import sanic.helpers
|
||||||
from blake3 import blake3
|
from blake3 import blake3
|
||||||
from natsort import natsorted, ns
|
|
||||||
from sanic import Blueprint, Sanic, empty, raw
|
from sanic import Blueprint, Sanic, empty, raw
|
||||||
from sanic.exceptions import Forbidden, NotFound, ServerError, ServiceUnavailable
|
from sanic.exceptions import Forbidden, NotFound, ServerError
|
||||||
from sanic.log import logging
|
from sanic.log import logging
|
||||||
from stream_zip import ZIP_AUTO, stream_zip
|
from stream_zip import ZIP_AUTO, stream_zip
|
||||||
|
|
||||||
from cista import auth, config, session, watching
|
from cista import auth, config, session, watching
|
||||||
from cista.api import bp
|
from cista.api import bp
|
||||||
from cista.protocol import DirEntry, DirList
|
|
||||||
from cista.util.apphelpers import handle_sanic_exception
|
from cista.util.apphelpers import handle_sanic_exception
|
||||||
|
|
||||||
# Workaround until Sanic PR #2824 is merged
|
# Workaround until Sanic PR #2824 is merged
|
||||||
|
@ -186,28 +182,26 @@ async def wwwroot(req, path=""):
|
||||||
return raw(data, headers=headers)
|
return raw(data, headers=headers)
|
||||||
|
|
||||||
|
|
||||||
def get_files(wanted: set) -> list:
|
def get_files(wanted: set) -> list[tuple[PurePosixPath, Path]]:
|
||||||
if not isinstance(watching.tree[""], DirEntry):
|
loc = PurePosixPath()
|
||||||
raise ServiceUnavailable(headers={"retry-after": 1})
|
idx = 0
|
||||||
root = Path(watching.rootpath)
|
ret = []
|
||||||
with watching.tree_lock:
|
level: int | None = None
|
||||||
q: deque[tuple[list[str], None | list[str], DirList]] = deque(
|
parent: PurePosixPath | None = None
|
||||||
[([], None, watching.tree[""].dir)]
|
with watching.state.lock:
|
||||||
)
|
root = watching.state.root
|
||||||
while q:
|
while idx < len(root):
|
||||||
locpar, relpar, d = q.pop()
|
f = root[idx]
|
||||||
for name, attr in d.items():
|
loc = PurePosixPath(*loc.parts[: f.level - 1]) / f.name
|
||||||
loc = [*locpar, name]
|
if parent is not None and f.level <= level:
|
||||||
rel = None
|
level = parent = None
|
||||||
if relpar or attr.key in wanted:
|
if f.key in wanted:
|
||||||
rel = [*relpar, name] if relpar else [name]
|
level, parent = f.level, loc.parent
|
||||||
wanted.discard(attr.key)
|
if parent is not None:
|
||||||
isdir = isinstance(attr, DirEntry)
|
wanted.discard(f.key)
|
||||||
if isdir:
|
ret.append((loc.relative_to(parent), watching.rootpath / loc))
|
||||||
q.append((loc, rel, attr.dir))
|
idx += 1
|
||||||
if rel:
|
return ret
|
||||||
files.append(("/".join(rel), root.joinpath(*loc)))
|
|
||||||
return natsorted(files, key=lambda f: "/".join(f[0]), alg=ns.IGNORECASE)
|
|
||||||
|
|
||||||
|
|
||||||
@app.get("/zip/<keys>/<zipfile:ext=zip>")
|
@app.get("/zip/<keys>/<zipfile:ext=zip>")
|
||||||
|
@ -220,7 +214,7 @@ async def zip_download(req, keys, zipfile, ext):
|
||||||
if not files:
|
if not files:
|
||||||
raise NotFound(
|
raise NotFound(
|
||||||
"No files found",
|
"No files found",
|
||||||
context={"keys": keys, "zipfile": zipfile, "wanted": wanted},
|
context={"keys": keys, "zipfile": f"{zipfile}.{ext}", "wanted": wanted},
|
||||||
)
|
)
|
||||||
if wanted:
|
if wanted:
|
||||||
raise NotFound("Files not found", context={"missing": wanted})
|
raise NotFound("Files not found", context={"missing": wanted})
|
||||||
|
@ -230,20 +224,25 @@ async def zip_download(req, keys, zipfile, ext):
|
||||||
s = p.stat()
|
s = p.stat()
|
||||||
size = s.st_size
|
size = s.st_size
|
||||||
modified = datetime.datetime.fromtimestamp(s.st_mtime, datetime.UTC)
|
modified = datetime.datetime.fromtimestamp(s.st_mtime, datetime.UTC)
|
||||||
|
name = rel.as_posix()
|
||||||
if p.is_dir():
|
if p.is_dir():
|
||||||
yield rel, modified, S_IFDIR | 0o755, ZIP_AUTO(size), b""
|
yield f"{name}/", modified, S_IFDIR | 0o755, ZIP_AUTO(size), iter(b"")
|
||||||
else:
|
else:
|
||||||
yield rel, modified, S_IFREG | 0o644, ZIP_AUTO(size), contents(p)
|
yield name, modified, S_IFREG | 0o644, ZIP_AUTO(size), contents(p, size)
|
||||||
|
|
||||||
def contents(name):
|
def contents(name, size):
|
||||||
with name.open("rb") as f:
|
with name.open("rb") as f:
|
||||||
while chunk := f.read(65536):
|
while size > 0 and (chunk := f.read(min(size, 1 << 20))):
|
||||||
|
size -= len(chunk)
|
||||||
yield chunk
|
yield chunk
|
||||||
|
assert size == 0
|
||||||
|
|
||||||
def worker():
|
def worker():
|
||||||
try:
|
try:
|
||||||
|
with open("test2.zip", "wb") as f:
|
||||||
for chunk in stream_zip(local_files(files)):
|
for chunk in stream_zip(local_files(files)):
|
||||||
asyncio.run_coroutine_threadsafe(queue.put(chunk), loop)
|
f.write(chunk)
|
||||||
|
asyncio.run_coroutine_threadsafe(queue.put(chunk), loop).result()
|
||||||
except Exception:
|
except Exception:
|
||||||
logging.exception("Error streaming ZIP")
|
logging.exception("Error streaming ZIP")
|
||||||
raise
|
raise
|
||||||
|
@ -256,7 +255,10 @@ async def zip_download(req, keys, zipfile, ext):
|
||||||
thread = loop.run_in_executor(app.ctx.threadexec, worker)
|
thread = loop.run_in_executor(app.ctx.threadexec, worker)
|
||||||
|
|
||||||
# Stream the response
|
# Stream the response
|
||||||
res = await req.respond(content_type="application/zip")
|
res = await req.respond(
|
||||||
|
content_type="application/zip",
|
||||||
|
headers={"cache-control": "no-store"},
|
||||||
|
)
|
||||||
while chunk := await queue.get():
|
while chunk := await queue.get():
|
||||||
await res.send(chunk)
|
await res.send(chunk)
|
||||||
|
|
||||||
|
|
|
@ -112,47 +112,36 @@ class ErrorMsg(msgspec.Struct):
|
||||||
## Directory listings
|
## Directory listings
|
||||||
|
|
||||||
|
|
||||||
class FileEntry(msgspec.Struct):
|
class FileEntry(msgspec.Struct, array_like=True):
|
||||||
key: str
|
level: int
|
||||||
size: int
|
|
||||||
mtime: int
|
|
||||||
|
|
||||||
|
|
||||||
class DirEntry(msgspec.Struct):
|
|
||||||
key: str
|
|
||||||
size: int
|
|
||||||
mtime: int
|
|
||||||
dir: DirList
|
|
||||||
|
|
||||||
def __getitem__(self, name):
|
|
||||||
return self.dir[name]
|
|
||||||
|
|
||||||
def __setitem__(self, name, value):
|
|
||||||
self.dir[name] = value
|
|
||||||
|
|
||||||
def __contains__(self, name):
|
|
||||||
return name in self.dir
|
|
||||||
|
|
||||||
def __delitem__(self, name):
|
|
||||||
del self.dir[name]
|
|
||||||
|
|
||||||
@property
|
|
||||||
def props(self):
|
|
||||||
return {k: v for k, v in self.__struct_fields__ if k != "dir"}
|
|
||||||
|
|
||||||
|
|
||||||
DirList = dict[str, FileEntry | DirEntry]
|
|
||||||
|
|
||||||
|
|
||||||
class UpdateEntry(msgspec.Struct, omit_defaults=True):
|
|
||||||
"""Updates the named entry in the tree. Fields that are set replace old values. A list of entries recurses directories."""
|
|
||||||
|
|
||||||
name: str
|
name: str
|
||||||
key: str
|
key: str
|
||||||
deleted: bool = False
|
mtime: int
|
||||||
size: int | None = None
|
size: int
|
||||||
mtime: int | None = None
|
isfile: int
|
||||||
dir: DirList | None = None
|
|
||||||
|
|
||||||
|
class Update(msgspec.Struct, array_like=True):
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
class UpdKeep(Update, tag="k"):
|
||||||
|
count: int
|
||||||
|
|
||||||
|
|
||||||
|
class UpdDel(Update, tag="d"):
|
||||||
|
count: int
|
||||||
|
|
||||||
|
|
||||||
|
class UpdIns(Update, tag="i"):
|
||||||
|
items: list[FileEntry]
|
||||||
|
|
||||||
|
|
||||||
|
class Space(msgspec.Struct):
|
||||||
|
disk: int
|
||||||
|
free: int
|
||||||
|
usage: int
|
||||||
|
storage: int
|
||||||
|
|
||||||
|
|
||||||
def make_dir_data(root):
|
def make_dir_data(root):
|
||||||
|
|
|
@ -1,20 +1,133 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import shutil
|
import shutil
|
||||||
|
import stat
|
||||||
import sys
|
import sys
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
from os import stat_result
|
||||||
from pathlib import Path, PurePosixPath
|
from pathlib import Path, PurePosixPath
|
||||||
|
|
||||||
import msgspec
|
import msgspec
|
||||||
|
from natsort import humansorted, natsort_keygen, ns
|
||||||
from sanic.log import logging
|
from sanic.log import logging
|
||||||
|
|
||||||
from cista import config
|
from cista import config
|
||||||
from cista.fileio import fuid
|
from cista.fileio import fuid
|
||||||
from cista.protocol import DirEntry, FileEntry, UpdateEntry
|
from cista.protocol import FileEntry, Space, UpdDel, UpdIns, UpdKeep
|
||||||
|
|
||||||
pubsub = {}
|
pubsub = {}
|
||||||
tree = {"": None}
|
sortkey = natsort_keygen(alg=ns.LOCALE)
|
||||||
tree_lock = threading.Lock()
|
|
||||||
|
|
||||||
|
class State:
|
||||||
|
def __init__(self):
|
||||||
|
self.lock = threading.RLock()
|
||||||
|
self._space = Space(0, 0, 0, 0)
|
||||||
|
self._listing: list[FileEntry] = []
|
||||||
|
|
||||||
|
@property
|
||||||
|
def space(self):
|
||||||
|
with self.lock:
|
||||||
|
return self._space
|
||||||
|
|
||||||
|
@space.setter
|
||||||
|
def space(self, space):
|
||||||
|
with self.lock:
|
||||||
|
self._space = space
|
||||||
|
|
||||||
|
@property
|
||||||
|
def root(self) -> list[FileEntry]:
|
||||||
|
with self.lock:
|
||||||
|
return self._listing[:]
|
||||||
|
|
||||||
|
@root.setter
|
||||||
|
def root(self, listing: list[FileEntry]):
|
||||||
|
with self.lock:
|
||||||
|
self._listing = listing
|
||||||
|
|
||||||
|
def _slice(self, idx: PurePosixPath | tuple[PurePosixPath, int]):
|
||||||
|
relpath, relfile = idx if isinstance(idx, tuple) else (idx, 0)
|
||||||
|
begin, end = 0, len(self._listing)
|
||||||
|
level = 0
|
||||||
|
isfile = 0
|
||||||
|
while level < len(relpath.parts):
|
||||||
|
# Enter a subdirectory
|
||||||
|
level += 1
|
||||||
|
begin += 1
|
||||||
|
if level == len(relpath.parts):
|
||||||
|
isfile = relfile
|
||||||
|
name = relpath.parts[level - 1]
|
||||||
|
namesort = sortkey(name)
|
||||||
|
r = self._listing[begin]
|
||||||
|
assert r.level == level
|
||||||
|
# Iterate over items at this level
|
||||||
|
while (
|
||||||
|
begin < end
|
||||||
|
and r.name != name
|
||||||
|
and r.isfile <= isfile
|
||||||
|
and sortkey(r.name) < namesort
|
||||||
|
):
|
||||||
|
# Skip contents
|
||||||
|
begin += 1
|
||||||
|
while begin < end and self._listing[begin].level > level:
|
||||||
|
begin += 1
|
||||||
|
# Not found?
|
||||||
|
if begin == end or self._listing[begin].level < level:
|
||||||
|
return slice(begin, begin)
|
||||||
|
r = self._listing[begin]
|
||||||
|
# Not found?
|
||||||
|
if begin == end or r.name != name:
|
||||||
|
return slice(begin, begin)
|
||||||
|
# Found an item, now find its end
|
||||||
|
for end in range(begin + 1, len(self._listing)):
|
||||||
|
if self._listing[end].level <= level:
|
||||||
|
break
|
||||||
|
return slice(begin, end)
|
||||||
|
|
||||||
|
def __getitem__(self, index: PurePosixPath | tuple[PurePosixPath, int]):
|
||||||
|
with self.lock:
|
||||||
|
print(self._slice(index))
|
||||||
|
return self._listing[self._slice(index)]
|
||||||
|
|
||||||
|
def __setitem__(
|
||||||
|
self, index: tuple[PurePosixPath, int], value: list[FileEntry]
|
||||||
|
) -> None:
|
||||||
|
rel, isfile = index
|
||||||
|
with self.lock:
|
||||||
|
if rel.parts:
|
||||||
|
parent = self._slice(rel.parent)
|
||||||
|
if parent.start == parent.stop:
|
||||||
|
raise ValueError(
|
||||||
|
f"Parent folder {rel.as_posix()} is missing for {rel.name}"
|
||||||
|
)
|
||||||
|
self._listing[self._slice(index)] = value
|
||||||
|
|
||||||
|
def __delitem__(self, relpath: PurePosixPath):
|
||||||
|
with self.lock:
|
||||||
|
del self._listing[self._slice(relpath)]
|
||||||
|
|
||||||
|
def _index(self, rel: PurePosixPath):
|
||||||
|
idx = 0
|
||||||
|
ret = []
|
||||||
|
|
||||||
|
def _dir(self, idx: int):
|
||||||
|
level = self._listing[idx].level + 1
|
||||||
|
end = len(self._listing)
|
||||||
|
idx += 1
|
||||||
|
ret = []
|
||||||
|
while idx < end and (r := self._listing[idx]).level >= level:
|
||||||
|
if r.level == level:
|
||||||
|
ret.append(idx)
|
||||||
|
return ret, idx
|
||||||
|
|
||||||
|
def update(self, rel: PurePosixPath, value: FileEntry):
|
||||||
|
begin = 0
|
||||||
|
parents = []
|
||||||
|
while self._listing[begin].level < len(rel.parts):
|
||||||
|
parents.append(begin)
|
||||||
|
|
||||||
|
|
||||||
|
state = State()
|
||||||
rootpath: Path = None # type: ignore
|
rootpath: Path = None # type: ignore
|
||||||
quit = False
|
quit = False
|
||||||
modified_flags = (
|
modified_flags = (
|
||||||
|
@ -26,23 +139,22 @@ modified_flags = (
|
||||||
"IN_MOVED_FROM",
|
"IN_MOVED_FROM",
|
||||||
"IN_MOVED_TO",
|
"IN_MOVED_TO",
|
||||||
)
|
)
|
||||||
disk_usage = None
|
|
||||||
|
|
||||||
|
|
||||||
def watcher_thread(loop):
|
def watcher_thread(loop):
|
||||||
global disk_usage, rootpath
|
global rootpath
|
||||||
import inotify.adapters
|
import inotify.adapters
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
rootpath = config.config.path
|
rootpath = config.config.path
|
||||||
i = inotify.adapters.InotifyTree(rootpath.as_posix())
|
i = inotify.adapters.InotifyTree(rootpath.as_posix())
|
||||||
old = format_tree() if tree[""] else None
|
|
||||||
with tree_lock:
|
|
||||||
# Initialize the tree from filesystem
|
# Initialize the tree from filesystem
|
||||||
tree[""] = walk(rootpath)
|
old, new = state.root, walk()
|
||||||
msg = format_tree()
|
if old != new:
|
||||||
if msg != old:
|
with state.lock:
|
||||||
asyncio.run_coroutine_threadsafe(broadcast(msg), loop)
|
state.root = new
|
||||||
|
msg = format_tree(new)
|
||||||
|
asyncio.run_coroutine_threadsafe(broadcast(msg), loop).result()
|
||||||
|
|
||||||
# The watching is not entirely reliable, so do a full refresh every minute
|
# The watching is not entirely reliable, so do a full refresh every minute
|
||||||
refreshdl = time.monotonic() + 60.0
|
refreshdl = time.monotonic() + 60.0
|
||||||
|
@ -52,9 +164,12 @@ def watcher_thread(loop):
|
||||||
return
|
return
|
||||||
# Disk usage update
|
# Disk usage update
|
||||||
du = shutil.disk_usage(rootpath)
|
du = shutil.disk_usage(rootpath)
|
||||||
if du != disk_usage:
|
space = Space(*du, storage=state.root[0].size)
|
||||||
disk_usage = du
|
if space != state.space:
|
||||||
asyncio.run_coroutine_threadsafe(broadcast(format_du()), loop)
|
state.space = space
|
||||||
|
asyncio.run_coroutine_threadsafe(
|
||||||
|
broadcast(format_du(space)), loop
|
||||||
|
).result()
|
||||||
break
|
break
|
||||||
# Do a full refresh?
|
# Do a full refresh?
|
||||||
if time.monotonic() > refreshdl:
|
if time.monotonic() > refreshdl:
|
||||||
|
@ -75,141 +190,136 @@ def watcher_thread(loop):
|
||||||
|
|
||||||
|
|
||||||
def watcher_thread_poll(loop):
|
def watcher_thread_poll(loop):
|
||||||
global disk_usage, rootpath
|
global rootpath
|
||||||
|
|
||||||
while not quit:
|
while not quit:
|
||||||
rootpath = config.config.path
|
rootpath = config.config.path
|
||||||
old = format_tree() if tree[""] else None
|
old = state.root
|
||||||
with tree_lock:
|
new = walk()
|
||||||
# Initialize the tree from filesystem
|
if old != new:
|
||||||
tree[""] = walk(rootpath)
|
state.root = new
|
||||||
msg = format_tree()
|
asyncio.run_coroutine_threadsafe(broadcast(format_tree(new)), loop).result()
|
||||||
if msg != old:
|
|
||||||
asyncio.run_coroutine_threadsafe(broadcast(msg), loop)
|
|
||||||
|
|
||||||
# Disk usage update
|
# Disk usage update
|
||||||
du = shutil.disk_usage(rootpath)
|
du = shutil.disk_usage(rootpath)
|
||||||
if du != disk_usage:
|
space = Space(*du, storage=state.root[0].size)
|
||||||
disk_usage = du
|
if space != state.space:
|
||||||
asyncio.run_coroutine_threadsafe(broadcast(format_du()), loop)
|
state.space = space
|
||||||
|
asyncio.run_coroutine_threadsafe(broadcast(format_du(space)), loop).result()
|
||||||
|
|
||||||
time.sleep(1.0)
|
time.sleep(2.0)
|
||||||
|
|
||||||
|
|
||||||
def format_du():
|
def format_du(usage):
|
||||||
return msgspec.json.encode(
|
return msgspec.json.encode({"space": usage}).decode()
|
||||||
{
|
|
||||||
"space": {
|
|
||||||
"disk": disk_usage.total,
|
|
||||||
"used": disk_usage.used,
|
|
||||||
"free": disk_usage.free,
|
|
||||||
"storage": tree[""].size,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
).decode()
|
|
||||||
|
|
||||||
|
|
||||||
def format_tree():
|
def format_tree(root):
|
||||||
root = tree[""]
|
|
||||||
return msgspec.json.encode({"root": root}).decode()
|
return msgspec.json.encode({"root": root}).decode()
|
||||||
|
|
||||||
|
|
||||||
def walk(path: Path) -> DirEntry | FileEntry | None:
|
def walk(rel=PurePosixPath()) -> list[FileEntry]: # noqa: B008
|
||||||
|
path = rootpath / rel
|
||||||
try:
|
try:
|
||||||
s = path.stat()
|
st = path.stat()
|
||||||
key = fuid(s)
|
except OSError:
|
||||||
assert key, repr(key)
|
return []
|
||||||
mtime = int(s.st_mtime)
|
return _walk(rel, int(not stat.S_ISDIR(st.st_mode)), st)
|
||||||
if path.is_file():
|
|
||||||
return FileEntry(key, s.st_size, mtime)
|
|
||||||
|
|
||||||
tree = {
|
|
||||||
p.name: v
|
def _walk(rel: PurePosixPath, isfile: int, st: stat_result) -> list[FileEntry]:
|
||||||
for p in path.iterdir()
|
entry = FileEntry(
|
||||||
if not p.name.startswith(".")
|
level=len(rel.parts),
|
||||||
if (v := walk(p)) is not None
|
name=rel.name,
|
||||||
}
|
key=fuid(st),
|
||||||
if tree:
|
mtime=int(st.st_mtime),
|
||||||
size = sum(v.size for v in tree.values())
|
size=st.st_size if isfile else 0,
|
||||||
mtime = max(mtime, *(v.mtime for v in tree.values()))
|
isfile=isfile,
|
||||||
else:
|
)
|
||||||
size = 0
|
if isfile:
|
||||||
return DirEntry(key, size, mtime, tree)
|
return [entry]
|
||||||
|
ret = [entry]
|
||||||
|
path = rootpath / rel
|
||||||
|
try:
|
||||||
|
li = []
|
||||||
|
for f in path.iterdir():
|
||||||
|
if f.name.startswith("."):
|
||||||
|
continue # No dotfiles
|
||||||
|
s = f.stat()
|
||||||
|
li.append((int(not stat.S_ISDIR(s.st_mode)), f.name, s))
|
||||||
|
for [isfile, name, s] in humansorted(li):
|
||||||
|
subtree = _walk(rel / name, isfile, s)
|
||||||
|
child = subtree[0]
|
||||||
|
entry.mtime = max(entry.mtime, child.mtime)
|
||||||
|
entry.size += child.size
|
||||||
|
ret.extend(subtree)
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
return None
|
pass # Things may be rapidly in motion
|
||||||
except OSError as e:
|
except OSError as e:
|
||||||
print("OS error walking path", path, e)
|
print("OS error walking path", path, e)
|
||||||
return None
|
return ret
|
||||||
|
|
||||||
|
|
||||||
def update(relpath: PurePosixPath, loop):
|
def update(relpath: PurePosixPath, loop):
|
||||||
"""Called by inotify updates, check the filesystem and broadcast any changes."""
|
"""Called by inotify updates, check the filesystem and broadcast any changes."""
|
||||||
if rootpath is None or relpath is None:
|
if rootpath is None or relpath is None:
|
||||||
print("ERROR", rootpath, relpath)
|
print("ERROR", rootpath, relpath)
|
||||||
new = walk(rootpath / relpath)
|
new = walk(relpath)
|
||||||
with tree_lock:
|
old = state[relpath]
|
||||||
update = update_internal(relpath, new)
|
|
||||||
if not update:
|
|
||||||
return # No changes
|
|
||||||
msg = msgspec.json.encode({"update": update}).decode()
|
|
||||||
asyncio.run_coroutine_threadsafe(broadcast(msg), loop)
|
|
||||||
|
|
||||||
|
|
||||||
def update_internal(
|
|
||||||
relpath: PurePosixPath,
|
|
||||||
new: DirEntry | FileEntry | None,
|
|
||||||
) -> list[UpdateEntry]:
|
|
||||||
path = "", *relpath.parts
|
|
||||||
old = tree
|
|
||||||
elems = []
|
|
||||||
for name in path:
|
|
||||||
if name not in old:
|
|
||||||
# File or folder created
|
|
||||||
old = None
|
|
||||||
elems.append((name, None))
|
|
||||||
if len(elems) < len(path):
|
|
||||||
# We got a notify for an item whose parent is not in tree
|
|
||||||
print("Tree out of sync DEBUG", relpath)
|
|
||||||
print(elems)
|
|
||||||
print("Current tree:")
|
|
||||||
print(tree[""])
|
|
||||||
print("Walking all:")
|
|
||||||
print(walk(rootpath))
|
|
||||||
raise ValueError("Tree out of sync")
|
|
||||||
break
|
|
||||||
old = old[name]
|
|
||||||
elems.append((name, old))
|
|
||||||
if old == new:
|
if old == new:
|
||||||
return []
|
return
|
||||||
mt = new.mtime if new else 0
|
old = state.root
|
||||||
szdiff = (new.size if new else 0) - (old.size if old else 0)
|
|
||||||
# Update parents
|
|
||||||
update = []
|
|
||||||
for name, entry in elems[:-1]:
|
|
||||||
u = UpdateEntry(name, entry.key)
|
|
||||||
if szdiff:
|
|
||||||
entry.size += szdiff
|
|
||||||
u.size = entry.size
|
|
||||||
if mt > entry.mtime:
|
|
||||||
u.mtime = entry.mtime = mt
|
|
||||||
update.append(u)
|
|
||||||
# The last element is the one that changed
|
|
||||||
name, entry = elems[-1]
|
|
||||||
parent = elems[-2][1] if len(elems) > 1 else tree
|
|
||||||
u = UpdateEntry(name, new.key if new else entry.key)
|
|
||||||
if new:
|
if new:
|
||||||
parent[name] = new
|
state[relpath, new[0].isfile] = new
|
||||||
if u.size != new.size:
|
|
||||||
u.size = new.size
|
|
||||||
if u.mtime != new.mtime:
|
|
||||||
u.mtime = new.mtime
|
|
||||||
if isinstance(new, DirEntry) and u.dir != new.dir:
|
|
||||||
u.dir = new.dir
|
|
||||||
else:
|
else:
|
||||||
del parent[name]
|
del state[relpath]
|
||||||
u.deleted = True
|
# FIXME: broadcast format_update()
|
||||||
update.append(u)
|
msg = format_update(old, state.root)
|
||||||
return update
|
asyncio.run_coroutine_threadsafe(broadcast(msg), loop).result()
|
||||||
|
|
||||||
|
|
||||||
|
def format_update(old, new):
|
||||||
|
# Make keep/del/insert diff until one of the lists ends
|
||||||
|
oidx, nidx = 0, 0
|
||||||
|
update = []
|
||||||
|
keep_count = 0
|
||||||
|
while oidx < len(old) and nidx < len(new):
|
||||||
|
if old[oidx] == new[nidx]:
|
||||||
|
keep_count += 1
|
||||||
|
oidx += 1
|
||||||
|
nidx += 1
|
||||||
|
continue
|
||||||
|
if keep_count > 0:
|
||||||
|
update.append(UpdKeep(keep_count))
|
||||||
|
keep_count = 0
|
||||||
|
|
||||||
|
del_count = 0
|
||||||
|
rest = new[nidx:]
|
||||||
|
while old[oidx] not in rest:
|
||||||
|
del_count += 1
|
||||||
|
oidx += 1
|
||||||
|
|
||||||
|
if del_count:
|
||||||
|
update.append(UpdDel(del_count))
|
||||||
|
oidx += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
insert_items = []
|
||||||
|
rest = old[oidx:]
|
||||||
|
while nidx < len(new) and new[nidx] not in rest:
|
||||||
|
insert_items.append(new[nidx])
|
||||||
|
nidx += 1
|
||||||
|
update.append(UpdIns(insert_items))
|
||||||
|
|
||||||
|
# Diff any remaining
|
||||||
|
if keep_count > 0:
|
||||||
|
update.append(UpdKeep(keep_count))
|
||||||
|
if oidx < len(old):
|
||||||
|
update.append(UpdDel(len(old) - oidx))
|
||||||
|
elif nidx < len(new):
|
||||||
|
update.append(UpdIns(new[nidx:]))
|
||||||
|
|
||||||
|
return msgspec.json.encode({"update": update}).decode()
|
||||||
|
|
||||||
|
|
||||||
async def broadcast(msg):
|
async def broadcast(msg):
|
||||||
|
|
Loading…
Reference in New Issue
Block a user