Restructuring as a Python package.

This commit is contained in:
Leo Vasanko 2023-10-15 01:29:50 +03:00 committed by Leo Vasanko
parent 386ae8f5b1
commit f697d96c89
13 changed files with 509 additions and 239 deletions

1
.gitignore vendored
View File

@ -1,3 +1,4 @@
.* .*
!.gitignore !.gitignore
__pycache__/ __pycache__/
*.egg-info/

0
README.md Normal file
View File

1
cista/__init__.py Normal file
View File

@ -0,0 +1 @@
from .app import app

122
cista/app.py Normal file
View File

@ -0,0 +1,122 @@
import asyncio
import logging
from pathlib import Path
from tarfile import DEFAULT_FORMAT
from typing import ParamSpecKwargs
import msgspec
from importlib.resources import files
from sanic import Sanic
from sanic.response import html
from sanic.log import logger
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from . import watching
from .fileio import ROOT, FileServer
from .protocol import ErrorMsg, FileRange, StatusMsg
app = Sanic("cista")
fileserver = FileServer()
def asend(ws, msg):
return ws.send(msg if isinstance(msg, bytes) else msgspec.json.encode(msg).decode())
@app.before_server_start
async def start_fileserver(app, _):
await fileserver.start()
@app.after_server_stop
async def stop_fileserver(app, _):
await fileserver.stop()
@app.before_server_start
async def start_watcher(app, _):
class Handler(FileSystemEventHandler):
def on_any_event(self, event):
watching.update(Path(event.src_path).relative_to(ROOT))
app.ctx.observer = Observer()
app.ctx.observer.schedule(Handler(), str(ROOT), recursive=True)
app.ctx.observer.start()
@app.after_server_stop
async def stop_watcher(app, _):
app.ctx.observer.stop()
app.ctx.observer.join()
@app.get("/")
async def index_page(request):
index = files("cista").joinpath("static", "index.html").read_text()
return html(index)
app.static("/files", ROOT, use_content_range=True, stream_large_files=True, directory_view=True)
@app.websocket('/api/watch')
async def watch(request, ws):
try:
q = watching.pubsub[ws] = asyncio.Queue()
await asend(ws, {"root": watching.tree})
while True:
await asend(ws, await q.get())
finally:
del watching.pubsub[ws]
@app.websocket('/api/upload')
async def upload(request, ws):
alink = fileserver.alink
url = request.url_for("upload")
while True:
req = None
try:
text = await ws.recv()
if not isinstance(text, str):
raise ValueError(f"Expected JSON control, got binary len(data) = {len(text)}")
req = msgspec.json.decode(text, type=FileRange)
pos = req.start
while pos < req.end and (data := await ws.recv()) and isinstance(data, bytes):
pos += await alink(("upload", req.name, pos, data, req.size))
if pos != req.end:
d = f"{len(data)} bytes" if isinstance(data, bytes) else data
raise ValueError(f"Expected {req.end - pos} more bytes, got {d}")
# Report success
res = StatusMsg(status="upload", url=url, req=req)
await asend(ws, res)
print(res)
except Exception as e:
res = ErrorMsg(error=str(e), url=url, req=req)
await asend(ws, res)
logger.exception(repr(res), e)
return
@app.websocket('/api/download')
async def download(request, ws):
alink = fileserver.alink
url = request.url_for("download")
while True:
req = None
try:
text = await ws.recv()
if not isinstance(text, str):
raise ValueError(f"Expected JSON control, got binary len(data) = {len(text)}")
req = msgspec.json.decode(text, type=FileRange)
print("download", req)
pos = req.start
while pos < req.end:
end = min(req.end, pos + (1<<20))
data = await alink(("download", req.name, pos, end))
await asend(ws, data)
pos += len(data)
# Report success
res = StatusMsg(status="download", url=url, req=req)
await asend(ws, res)
print(res)
except Exception as e:
res = ErrorMsg(error=str(e), url=url, req=req)
await asend(ws, res)
logger.exception(repr(res), e)
return

View File

@ -1,6 +1,7 @@
import asyncio import asyncio
from contextlib import suppress from contextlib import suppress
class AsyncLink: class AsyncLink:
"""Facilitate two-way connection between asyncio and a worker thread.""" """Facilitate two-way connection between asyncio and a worker thread."""

95
cista/fileio.py Normal file
View File

@ -0,0 +1,95 @@
import asyncio
import os
import unicodedata
from pathlib import Path
from pathvalidate import sanitize_filepath
from .asynclink import AsyncLink
from .lrucache import LRUCache
ROOT = Path(os.environ.get("STORAGE", Path.cwd()))
def sanitize_filename(filename):
filename = unicodedata.normalize("NFC", filename)
filename = sanitize_filepath(filename)
filename = filename.replace("/", "-")
return filename
class File:
def __init__(self, filename):
self.path = ROOT / filename
self.fd = None
self.writable = False
def open_ro(self):
self.close()
self.fd = os.open(self.path, os.O_RDONLY)
def open_rw(self):
self.close()
self.path.parent.mkdir(parents=True, exist_ok=True)
self.fd = os.open(self.path, os.O_RDWR | os.O_CREAT)
self.writable = True
def write(self, pos, buffer, *, file_size=None):
if not self.writable:
# Create/open file
self.open_rw()
if file_size is not None:
os.ftruncate(self.fd, file_size)
os.lseek(self.fd, pos, os.SEEK_SET)
os.write(self.fd, buffer)
def __getitem__(self, slice):
if self.fd is None:
self.open_ro()
os.lseek(self.fd, slice.start, os.SEEK_SET)
l = slice.stop - slice.start
data = os.read(self.fd, l)
if len(data) < l: raise EOFError("Error reading requested range")
return data
def close(self):
if self.fd is not None:
os.close(self.fd)
self.fd = self.writable = None
def __del__(self):
self.close()
class FileServer:
async def start(self):
self.alink = AsyncLink()
self.worker = asyncio.get_event_loop().run_in_executor(None, self.worker_thread, self.alink.to_sync)
self.cache = LRUCache(File, capacity=10, maxage=5.0)
async def stop(self):
await self.alink.stop()
await self.worker
def worker_thread(self, slink):
try:
for req in slink:
with req as (command, *args):
if command == "upload":
req.set_result(self.upload(*args))
elif command == "download":
req.set_result(self.download(*args))
else:
raise NotImplementedError(f"Unhandled {command=} {args}")
finally:
self.cache.close()
def upload(self, name, pos, data, file_size):
name = sanitize_filename(name)
f = self.cache[name]
f.write(pos, data, file_size=file_size)
return len(data)
def download(self, name, start, end):
name = sanitize_filename(name)
f = self.cache[name]
return f[start: end]

View File

@ -1,5 +1,6 @@
from time import monotonic from time import monotonic
class LRUCache: class LRUCache:
def __init__(self, open: callable, *, capacity: int, maxage: float): def __init__(self, open: callable, *, capacity: int, maxage: float):
self.open = open self.open = open
@ -27,3 +28,7 @@ class LRUCache:
ts = monotonic() - self.maxage ts = monotonic() - self.maxage
while len(self.cache) > self.capacity or self.cache and self.cache[-1][2] < ts: while len(self.cache) > self.capacity or self.cache and self.cache[-1][2] < ts:
self.cache.pop()[1].close() self.cache.pop()[1].close()
def close(self):
self.capacity = 0
self.expire_items()

46
cista/protocol.py Normal file
View File

@ -0,0 +1,46 @@
from __future__ import annotations
from typing import Dict, Tuple, Union
import msgspec
## File uploads and downloads
class FileRange(msgspec.Struct):
name: str
size: int
start: int
end: int
class ErrorMsg(msgspec.Struct):
error: str
req: FileRange
url: str
class StatusMsg(msgspec.Struct):
status: str
url: str
req: FileRange
## Directory listings
class FileEntry(msgspec.Struct):
size: int
mtime: int
class DirEntry(msgspec.Struct):
size: int
mtime: int
dir: Dict[str, Union[FileEntry, DirEntry]]
def make_dir_data(root):
if len(root) == 2:
return FileEntry(*root)
size, mtime, listing = root
converted = {}
for name, data in listing.items():
converted[name] = make_dir_data(data)
sz = sum(x.size for x in converted.values())
mt = max(x.mtime for x in converted.values())
return DirEntry(sz, max(mt, mtime), converted)

154
cista/static/index.html Normal file
View File

@ -0,0 +1,154 @@
<!DOCTYPE html>
<title>Storage</title>
<div>
<h2>Quick file upload</h2>
<p>Uses parallel WebSocket connections for increased bandwidth /api/upload</p>
<input type=file id=fileInput>
<progress id=progressBar value=0 max=1></progress>
</div>
<div>
<h2>File downloads (websocket)</h2>
<ul id=file_list></ul>
</div>
<h2>File listings</h2>
<p>Plain HTML browser <a href=/files/>/files/</a></p>
<p>JSON list updated via WebSocket /api/watch:</p>
<textarea id=list style="padding: 1em; width: 80ch; height: 40ch;"></textarea>
<script>
const list = document.getElementById("list")
let files = {}
function createWatchSocket() {
const wsurl = new URL("/api/watch", location.href.replace(/^http/, 'ws'))
const ws = new WebSocket(wsurl)
ws.onmessage = event => {
msg = JSON.parse(event.data)
console.log("Watch", msg)
if (msg.root) {
files = msg.root
file_list(files)
} else if (msg.update) {
const {path, data} = msg.update
for (const p of path.split("/")) {
// TODO update files at path with new data
}
}
list.value = JSON.stringify(files)
}
}
function file_list(files) {
const ul = document.getElementById("file_list")
ul.innerHTML = ""
const dir = ""
let ptr = files.dir
console.log(ptr)
for (const name of Object.keys(ptr)) {
if (ptr[name].dir) continue
const {size, mtime} = ptr[name]
const li = document.createElement("li")
const a = document.createElement("a")
ul.appendChild(li)
li.appendChild(a)
a.textContent = name
a.href = name
a.onclick = event => {
event.preventDefault()
download(name, size)
}
}
}
createWatchSocket()
async function download(name, size) {
const wsurl = new URL("/api/download", location.href.replace(/^http/, 'ws'))
const ws = new WebSocket(wsurl)
ws.binaryType = 'arraybuffer'
ws.onopen = () => {
console.log("Download socket connected")
ws.send(JSON.stringify({name, start: 0, end: size, size}))
}
ws.onmessage = event => {
const data = event.data
console.log("Download", data)
const blob = new Blob([data], {type: "application/octet-stream"})
const url = URL.createObjectURL(blob)
const a = document.createElement("a")
a.href = url
a.download = name
a.click()
ws.close()
}
ws.onclose = () => {
console.log("Download socket disconnected")
}
}
const fileInput = document.getElementById("fileInput")
const progress = document.getElementById("progressBar")
const numConnections = 2
const chunkSize = 1<<20
const wsConnections = new Set()
for (let i = 0; i < numConnections; i++) createUploadWS()
function createUploadWS() {
const wsurl = new URL("/api/upload", location.href.replace(/^http/, 'ws'))
const ws = new WebSocket(wsurl)
ws.binaryType = 'arraybuffer'
ws.onopen = () => {
wsConnections.add(ws)
console.log("Upload socket connected")
}
ws.onmessage = event => {
msg = JSON.parse(event.data)
if (msg.written) progress.value += +msg.written
else console.log(`Error: ${msg.error}`)
}
ws.onclose = () => {
wsConnections.delete(ws)
console.log("Upload socket disconnected, reconnecting...")
setTimeout(createUploadWS, 1000)
}
}
async function load(file, start, end) {
const reader = new FileReader()
const load = new Promise(resolve => reader.onload = resolve)
reader.readAsArrayBuffer(file.slice(start, end))
const event = await load
return event.target.result
}
async function sendChunk(file, start, end, ws) {
const chunk = await load(file, start, end)
ws.send(JSON.stringify({
name: file.name,
size: file.size,
start: start,
end: end
}))
ws.send(chunk)
}
fileInput.addEventListener("change", async function() {
const file = this.files[0]
const numChunks = Math.ceil(file.size / chunkSize)
progress.value = 0
progress.max = file.size
console.log(wsConnections)
for (let i = 0; i < numChunks; i++) {
const ws = Array.from(wsConnections)[i % wsConnections.size]
const start = i * chunkSize
const end = Math.min(file.size, start + chunkSize)
const res = await sendChunk(file, start, end, ws)
}
})
</script>

58
cista/watching.py Normal file
View File

@ -0,0 +1,58 @@
import secrets
from hashlib import sha256
from pathlib import Path, PurePosixPath
import msgspec
from .fileio import ROOT
from .protocol import DirEntry, FileEntry
secret = secrets.token_bytes(8)
pubsub = {}
def fuid(stat):
return sha256((stat.st_dev << 32 | stat.st_ino).to_bytes(8, 'big') + secret).hexdigest()[:16]
def walk(path: Path = ROOT):
try:
s = path.stat()
mtime = int(s.st_mtime)
if path.is_file():
return FileEntry(s.st_size, mtime)
tree = {p.name: v for p in path.iterdir() if not p.name.startswith('.') if (v := walk(p)) is not None}
if tree:
size = sum(v.size for v in tree.values())
mtime = max(mtime, max(v.mtime for v in tree.values()))
else:
size = 0
return DirEntry(size, mtime, tree)
except OSError as e:
print("OS error walking path", path, e)
return None
tree = walk()
def update(relpath: PurePosixPath):
ptr = tree.dir
path = ROOT
name = ""
for name in relpath.parts[:-1]:
path /= name
try:
ptr = ptr[name].dir
except KeyError:
break
new = walk(path)
old = ptr.pop(name, None)
if new is not None:
ptr[name] = new
if old == new:
return
print("Update", relpath)
# TODO: update parents size/mtime
msg = msgspec.json.encode({"update": {
"path": relpath.as_posix(),
"data": new,
}})
for queue in pubsub.values(): queue.put_nowait(msg)

View File

@ -1,156 +0,0 @@
from tarfile import DEFAULT_FORMAT
from sanic import Sanic
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from pathlib import Path, PurePosixPath
from hashlib import sha256
import secrets
import json
import unicodedata
import asyncio
from pathvalidate import sanitize_filepath
import os
ROOT = Path(os.environ.get("STORAGE", Path.cwd()))
secret = secrets.token_bytes(8)
def fuid(stat):
return sha256((stat.st_dev << 32 | stat.st_ino).to_bytes(8, 'big') + secret).hexdigest()[:16]
def walk(path: Path = ROOT):
try:
s = path.stat()
mtime = int(s.st_mtime)
if path.is_file():
return s.st_size, mtime
tree = {p.name: v for p in path.iterdir() if not p.name.startswith('.') if (v := walk(p)) is not None}
if tree:
size = sum(v[0] for v in tree.values())
mtime = max(v[1] for v in tree.values())
else:
size = 0
return size, mtime, tree
except OSError as e:
return None
tree = walk()
def update(relpath: PurePosixPath):
ptr = tree[2]
path = ROOT
name = ""
for name in relpath.parts[:-1]:
path /= name
try:
ptr = ptr[name][2]
except KeyError:
break
new = walk(path)
old = ptr.pop(name, None)
if new is not None:
ptr[name] = new
if old == new:
return
print("Update", relpath, new)
# TODO: update parents size/mtime
msg = json.dumps({"update": {
"path": relpath.as_posix(),
"data": new,
}})
for queue in watchers.values(): queue.put_nowait(msg)
app = Sanic(__name__)
@app.before_server_start
async def start_watcher(app, _):
class Handler(FileSystemEventHandler):
def on_any_event(self, event):
update(Path(event.src_path).relative_to(ROOT))
app.ctx.observer = Observer()
app.ctx.observer.schedule(Handler(), str(ROOT), recursive=True)
app.ctx.observer.start()
@app.after_server_stop
async def stop_watcher(app, _):
app.ctx.observer.stop()
app.ctx.observer.join()
app.static('/', "index.html", name="indexhtml")
app.static("/files", ROOT, use_content_range=True, stream_large_files=True, directory_view=True)
watchers = {}
@app.websocket('/api/watch')
async def watch(request, ws):
try:
q = watchers[ws] = asyncio.Queue()
await ws.send(json.dumps({"root": tree}))
while True:
await ws.send(await q.get())
finally:
del watchers[ws]
@app.websocket('/api/upload')
async def upload(request, ws):
file = None
filename = None
left = 0
msg = {}
try:
async for data in ws:
if isinstance(data, bytes):
if not file:
print(f"No file open, received {len(data)} bytes")
break
if len(data) > left:
msg["error"] = "Too much data"
ws.send(json.dumps(msg))
return
left -= len(data)
file.write(data)
if left == 0:
msg["written"] = end - start
await ws.send(json.dumps(msg))
msg = {}
continue
msg = json.loads(data)
name = str(msg['name'])
size = int(msg['size'])
start = int(msg['start'])
end = int(msg['end'])
if not 0 <= start < end <= size:
msg["error"] = "Invalid range"
ws.send(json.dumps(msg))
return
left = end - start
if filename != name:
if file:
file.close()
file, filename = None, None
file = openfile(name)
file.truncate(size)
filename = name
file.seek(start)
finally:
if file:
file.close()
def openfile(name):
# Name sanitation & security
name = unicodedata.normalize("NFC", name).replace("\\", "")
name = sanitize_filepath(name)
p = PurePosixPath(name)
if p.is_absolute() or any(n.startswith(".") for n in p.parts):
raise ValueError("Invalid filename")
# Create/open file
path = ROOT / p
path.parent.mkdir(parents=True, exist_ok=True)
try:
file = path.open("xb+") # create new file
except FileExistsError:
file = path.open("rb+") # write to existing file (along with other workers)
return file

View File

@ -1,83 +0,0 @@
import asyncio
import os
from asynclink import AsyncLink
from lrucache import LRUCache
class File:
def __init__(self, filename):
self.filename = filename
self.fd = None
self.writable = False
def open_ro(self):
self.close()
self.fd = os.open(self.filename, os.O_RDONLY)
def open_rw(self):
self.close()
self.fd = os.open(self.filename, os.O_RDWR | os.O_CREAT)
self.writable = True
def write(self, pos, buffer, *, file_size=None):
if not self.writable:
self.open_rw()
if file_size is not None:
os.ftruncate(self.fd, file_size)
os.lseek(self.fd, pos, os.SEEK_SET)
os.write(self.fd, buffer)
def __getitem__(self, slice):
if self.fd is None:
self.open_ro()
os.lseek(self.fd, slice.start, os.SEEK_SET)
l = slice.stop - slice.start
data = os.read(self.fd, l)
if len(data) < l: raise EOFError("Error reading requested range")
return data
def close(self):
if self.fd is not None:
os.close(self.fd)
self.fd = self.writable = None
def __del__(self):
self.close()
class FileServer:
async def start(self):
self.alink = AsyncLink()
self.worker = asyncio.get_event_loop().run_in_executor(None, self.worker_thread, alink.to_sync)
async def stop(self):
await self.alink.stop()
await self.worker
def worker_thread(self, slink):
cache = LRUCache(File, capacity=10, maxage=5.0)
for req in slink:
with req as (command, msg, data):
if command == "upload":
req.set_result(self.upload(msg, data))
else:
raise NotImplementedError(f"Unhandled {command=} {msg}")
def upload(self, msg, data):
name = str(msg['name'])
size = int(msg['size'])
start = int(msg['start'])
end = int(msg['end'])
if not 0 <= start < end <= size:
raise OverflowError("Invalid range")
if end - start > 1<<20:
raise OverflowError("Too much data, max 1 MiB.")
if end - start != len(data):
raise ValueError("Data length does not match range")
f = self.cache[name]
f.write(start, data, file_size=size)
return {"written": len(data), **msg}

26
setup.py Normal file
View File

@ -0,0 +1,26 @@
# Install package
import setuptools
import pathlib
doc = pathlib.Path(__file__).parent.joinpath('README.md').read_text(encoding="UTF-8")
setuptools.setup(
name="cista",
version="0.0.1",
author="Vasanko lda",
description="Dropbox-like file server with modern web interface",
long_description=doc,
long_description_content_type="text/markdown",
url="",
packages=setuptools.find_packages(),
classifiers=[
"Programming Language :: Python :: 3",
"License :: OSI Approved :: Apache Software License",
],
install_requires=[
"sanic",
"msgspec",
"watchdog",
"pathvalidate",
],
)