Compare commits
4 Commits
7d55a43119
...
65c6ed6a17
Author | SHA1 | Date | |
---|---|---|---|
![]() |
65c6ed6a17 | ||
![]() |
44428eec71 | ||
![]() |
c47ff317c3 | ||
![]() |
0de8b99c02 |
@ -6,9 +6,11 @@ import urllib.parse
|
||||
from pathlib import PurePosixPath
|
||||
from urllib.parse import unquote
|
||||
from wsgiref.handlers import format_date_time
|
||||
|
||||
import av
|
||||
import av.datasets
|
||||
import fitz # PyMuPDF
|
||||
import numpy as np
|
||||
import pillow_heif
|
||||
from PIL import Image
|
||||
from sanic import Blueprint, empty, raw
|
||||
from sanic.exceptions import NotFound
|
||||
@ -16,7 +18,6 @@ from sanic.log import logger
|
||||
|
||||
from cista import config
|
||||
from cista.util.filename import sanitize
|
||||
import pillow_heif
|
||||
|
||||
pillow_heif.register_heif_opener()
|
||||
|
||||
@ -60,7 +61,8 @@ async def preview(req, path):
|
||||
def dispatch(path, quality, maxsize, maxzoom):
|
||||
if path.suffix.lower() in (".pdf", ".xps", ".epub", ".mobi"):
|
||||
return process_pdf(path, quality=quality, maxsize=maxsize, maxzoom=maxzoom)
|
||||
if mimetypes.guess_type(path.name)[0].startswith("video/"):
|
||||
type, _ = mimetypes.guess_type(path.name)
|
||||
if type and type.startswith("video/"):
|
||||
return process_video(path, quality=quality, maxsize=maxsize)
|
||||
return process_image(path, quality=quality, maxsize=maxsize)
|
||||
|
||||
@ -72,17 +74,16 @@ def process_image(path, *, maxsize, quality):
|
||||
# Fix rotation based on EXIF data
|
||||
try:
|
||||
rotate_values = {3: 180, 6: 270, 8: 90}
|
||||
orientation = img._getexif().get(274)
|
||||
orientation = img.getexif().get(274)
|
||||
if orientation in rotate_values:
|
||||
logger.debug(f"Rotating preview {path} by {rotate_values[orientation]}")
|
||||
img = img.rotate(rotate_values[orientation], expand=True)
|
||||
except AttributeError:
|
||||
...
|
||||
except Exception as e:
|
||||
logger.error(f"Error rotating preview image: {e}")
|
||||
# Save as webp
|
||||
imgdata = io.BytesIO()
|
||||
img.save(imgdata, format="avif", quality=quality, method=4)
|
||||
print("Image quality", quality)
|
||||
img.save(imgdata, format="avif", quality=quality)
|
||||
return imgdata.getvalue()
|
||||
|
||||
|
||||
@ -99,14 +100,15 @@ def process_pdf(path, *, maxsize, maxzoom, quality, page_number=0):
|
||||
def process_video(path, *, maxsize, quality):
|
||||
frame = None
|
||||
imgdata = io.BytesIO()
|
||||
istream = ostream = icc = occ = frame = None
|
||||
with (
|
||||
av.open(str(path)) as container,
|
||||
av.open(str(path)) as icontainer,
|
||||
av.open(imgdata, "w", format="avif") as ocontainer,
|
||||
):
|
||||
istream = container.streams.video[0]
|
||||
istream = icontainer.streams.video[0]
|
||||
istream.codec_context.skip_frame = "NONKEY"
|
||||
container.seek((container.duration or 0) // 8)
|
||||
for frame in container.decode(istream):
|
||||
icontainer.seek((icontainer.duration or 0) // 8)
|
||||
for frame in icontainer.decode(istream):
|
||||
if frame.dts is not None:
|
||||
break
|
||||
else:
|
||||
@ -119,7 +121,38 @@ def process_video(path, *, maxsize, quality):
|
||||
new_height = int(frame.height * scale_factor)
|
||||
frame = frame.reformat(width=new_width, height=new_height)
|
||||
|
||||
ostream = ocontainer.add_stream("av1", options={"quality": str(quality)})
|
||||
# Simple rotation detection and logging
|
||||
if frame.rotation:
|
||||
try:
|
||||
fplanes = frame.to_ndarray()
|
||||
# Split into Y, U, V planes of proper dimensions
|
||||
planes = [
|
||||
fplanes[: frame.height],
|
||||
fplanes[frame.height : frame.height + frame.height // 4].reshape(
|
||||
frame.height // 2, frame.width // 2
|
||||
),
|
||||
fplanes[frame.height + frame.height // 4 :].reshape(
|
||||
frame.height // 2, frame.width // 2
|
||||
),
|
||||
]
|
||||
# Rotate
|
||||
planes = [np.rot90(p, frame.rotation // 90) for p in planes]
|
||||
# Restore PyAV format
|
||||
planes = np.hstack([p.flat for p in planes]).reshape(
|
||||
-1, planes[0].shape[1]
|
||||
)
|
||||
frame = av.VideoFrame.from_ndarray(planes, format=frame.format.name)
|
||||
del planes, fplanes
|
||||
except Exception as e:
|
||||
if "not yet supported" in str(e):
|
||||
logger.warning(
|
||||
f"Not rotating {path.name} preview image by {frame.rotation}°:\n PyAV: {e}"
|
||||
)
|
||||
else:
|
||||
logger.exception(f"Error rotating video frame: {e}")
|
||||
|
||||
crf = str(int(63 * (1 - quality / 100) ** 2)) # Closely matching PIL quality-%
|
||||
ostream = ocontainer.add_stream("av1", options={"crf": crf})
|
||||
assert isinstance(ostream, av.VideoStream)
|
||||
ostream.width = frame.width
|
||||
ostream.height = frame.height
|
||||
@ -136,6 +169,6 @@ def process_video(path, *, maxsize, quality):
|
||||
ocontainer.mux(ostream.encode(None)) # Flush the stream
|
||||
|
||||
ret = imgdata.getvalue()
|
||||
del imgdata
|
||||
del imgdata, istream, ostream, icc, occ, frame
|
||||
gc.collect()
|
||||
return ret
|
||||
|
@ -127,8 +127,7 @@ class FileEntry(msgspec.Struct, array_like=True, frozen=True):
|
||||
return f"{self.name} ({self.size}, {self.mtime})"
|
||||
|
||||
|
||||
class Update(msgspec.Struct, array_like=True):
|
||||
...
|
||||
class Update(msgspec.Struct, array_like=True): ...
|
||||
|
||||
|
||||
class UpdKeep(Update, tag="k"):
|
||||
|
@ -22,6 +22,7 @@ dependencies = [
|
||||
"inotify",
|
||||
"msgspec",
|
||||
"natsort",
|
||||
"numpy>=2.3.2",
|
||||
"pathvalidate",
|
||||
"pillow",
|
||||
"pillow-heif>=1.1.0",
|
||||
@ -71,11 +72,9 @@ testpaths = [
|
||||
"tests",
|
||||
]
|
||||
|
||||
[tool.ruff.isort]
|
||||
known-first-party = ["cista"]
|
||||
|
||||
[tool.ruff.per-file-ignores]
|
||||
"tests/*" = ["S", "ANN", "D", "INP"]
|
||||
[tool.ruff.lint]
|
||||
isort.known-first-party = ["cista"]
|
||||
per-file-ignores."tests/*" = ["S", "ANN", "D", "INP"]
|
||||
|
||||
[dependency-groups]
|
||||
dev = [
|
||||
|
Loading…
x
Reference in New Issue
Block a user