143 lines
4.9 KiB
Python
143 lines
4.9 KiB
Python
import asyncio
|
|
import gc
|
|
import io
|
|
import mimetypes
|
|
import urllib.parse
|
|
from pathlib import PurePosixPath
|
|
from urllib.parse import unquote
|
|
from wsgiref.handlers import format_date_time
|
|
import av
|
|
import av.datasets
|
|
import fitz # PyMuPDF
|
|
from PIL import Image
|
|
from sanic import Blueprint, empty, raw
|
|
from sanic.exceptions import NotFound
|
|
from sanic.log import logger
|
|
|
|
from cista import config
|
|
from cista.util.filename import sanitize
|
|
import pillow_heif
|
|
|
|
pillow_heif.register_heif_opener()
|
|
|
|
bp = Blueprint("preview", url_prefix="/preview")
|
|
|
|
|
|
@bp.get("/<path:path>")
|
|
async def preview(req, path):
|
|
"""Preview a file"""
|
|
maxsize = int(req.args.get("px", 1024))
|
|
maxzoom = float(req.args.get("zoom", 2.0))
|
|
quality = int(req.args.get("q", 40))
|
|
rel = PurePosixPath(sanitize(unquote(path)))
|
|
path = config.config.path / rel
|
|
stat = path.lstat()
|
|
etag = config.derived_secret(
|
|
"preview", rel, stat.st_mtime_ns, quality, maxsize, maxzoom
|
|
).hex()
|
|
savename = PurePosixPath(path.name).with_suffix(".avif")
|
|
headers = {
|
|
"etag": etag,
|
|
"last-modified": format_date_time(stat.st_mtime),
|
|
"cache-control": "max-age=604800, immutable"
|
|
+ ("" if config.config.public else ", private"),
|
|
"content-type": "image/avif",
|
|
"content-disposition": f"inline; filename*=UTF-8''{urllib.parse.quote(savename.as_posix())}",
|
|
}
|
|
if req.headers.if_none_match == etag:
|
|
# The client has it cached, respond 304 Not Modified
|
|
return empty(304, headers=headers)
|
|
|
|
if not path.is_file():
|
|
raise NotFound("File not found")
|
|
|
|
img = await asyncio.get_event_loop().run_in_executor(
|
|
req.app.ctx.threadexec, dispatch, path, quality, maxsize, maxzoom
|
|
)
|
|
return raw(img, headers=headers)
|
|
|
|
|
|
def dispatch(path, quality, maxsize, maxzoom):
|
|
if path.suffix.lower() in (".pdf", ".xps", ".epub", ".mobi"):
|
|
return process_pdf(path, quality=quality, maxsize=maxsize, maxzoom=maxzoom)
|
|
if mimetypes.guess_type(path.name)[0].startswith("video/"):
|
|
return process_video(path, quality=quality, maxsize=maxsize)
|
|
return process_image(path, quality=quality, maxsize=maxsize)
|
|
|
|
|
|
def process_image(path, *, maxsize, quality):
|
|
img = Image.open(path)
|
|
w, h = img.size
|
|
img.thumbnail((min(w, maxsize), min(h, maxsize)))
|
|
# Fix rotation based on EXIF data
|
|
try:
|
|
rotate_values = {3: 180, 6: 270, 8: 90}
|
|
orientation = img._getexif().get(274)
|
|
if orientation in rotate_values:
|
|
logger.debug(f"Rotating preview {path} by {rotate_values[orientation]}")
|
|
img = img.rotate(rotate_values[orientation], expand=True)
|
|
except AttributeError:
|
|
...
|
|
except Exception as e:
|
|
logger.error(f"Error rotating preview image: {e}")
|
|
# Save as webp
|
|
imgdata = io.BytesIO()
|
|
img.save(imgdata, format="avif", quality=quality, method=4)
|
|
return imgdata.getvalue()
|
|
|
|
|
|
def process_pdf(path, *, maxsize, maxzoom, quality, page_number=0):
|
|
pdf = fitz.open(path)
|
|
page = pdf.load_page(page_number)
|
|
w, h = page.rect[2:4]
|
|
zoom = min(maxsize / w, maxsize / h, maxzoom)
|
|
mat = fitz.Matrix(zoom, zoom)
|
|
pix = page.get_pixmap(matrix=mat)
|
|
return pix.pil_tobytes(format="avif", quality=quality, method=4)
|
|
|
|
|
|
def process_video(path, *, maxsize, quality):
|
|
frame = None
|
|
imgdata = io.BytesIO()
|
|
istream = ostream = icontainer = ocontainer = icc = occ = frame = None
|
|
with (
|
|
av.open(str(path)) as icontainer,
|
|
av.open(imgdata, "w", format="avif") as ocontainer,
|
|
):
|
|
istream = icontainer.streams.video[0]
|
|
istream.codec_context.skip_frame = "NONKEY"
|
|
icontainer.seek((icontainer.duration or 0) // 8)
|
|
for frame in icontainer.decode(istream):
|
|
if frame.dts is not None:
|
|
break
|
|
else:
|
|
raise RuntimeError("No frames found in video")
|
|
|
|
# Resize frame to thumbnail size
|
|
if frame.width > maxsize or frame.height > maxsize:
|
|
scale_factor = min(maxsize / frame.width, maxsize / frame.height)
|
|
new_width = int(frame.width * scale_factor)
|
|
new_height = int(frame.height * scale_factor)
|
|
frame = frame.reformat(width=new_width, height=new_height)
|
|
|
|
ostream = ocontainer.add_stream("av1", options={"quality": str(quality)})
|
|
assert isinstance(ostream, av.VideoStream)
|
|
ostream.width = frame.width
|
|
ostream.height = frame.height
|
|
icc = istream.codec_context
|
|
occ = ostream.codec_context
|
|
|
|
# Copy HDR metadata from input video stream
|
|
occ.color_primaries = icc.color_primaries
|
|
occ.color_trc = icc.color_trc
|
|
occ.colorspace = icc.colorspace
|
|
occ.color_range = icc.color_range
|
|
|
|
ocontainer.mux(ostream.encode(frame))
|
|
ocontainer.mux(ostream.encode(None)) # Flush the stream
|
|
|
|
ret = imgdata.getvalue()
|
|
del imgdata, istream, ostream, icc, occ, frame
|
|
gc.collect()
|
|
return ret
|