Compare commits

...

8 Commits

Author SHA1 Message Date
Adam Hopkins
b2c0eed24d
Bump version 2022-08-11 10:04:06 +03:00
Adam Hopkins
3abe4f885e
Always show server location in ASGI (#2522)
Co-authored-by: Adam Hopkins <adam@amhopkins.com>
Co-authored-by: Zhiwei Liang <zhi.wei.liang@outlook.com>
Co-authored-by: Néstor Pérez <25409753+prryplatypus@users.noreply.github.com>
2022-08-11 10:03:50 +03:00
Adam Hopkins
f4c8252185
Always show server location in ASGI 2022-08-07 23:15:05 +03:00
Adam Hopkins
daa1f8f2d5
Bump version 2022-07-31 14:16:48 +03:00
Adam Hopkins
0901d3188a
Use path.parts instead of match (#2508) 2022-07-31 12:55:50 +03:00
Adam Hopkins
0985d130e2
Use pathlib for path resolution (#2506) 2022-07-31 12:55:50 +03:00
Adam Hopkins
7e0b0deb11
Fix dotted test 2022-07-31 12:55:49 +03:00
Néstor Pérez
e54ac3c6fd
Prevent directory traversion with static files (#2495)
Co-authored-by: Adam Hopkins <adam@amhopkins.com>
Co-authored-by: Zhiwei Liang <zhi.wei.liang@outlook.com>
2022-07-31 12:55:49 +03:00
8 changed files with 119 additions and 44 deletions

View File

@ -4,6 +4,7 @@ coverage:
default: default:
target: auto target: auto
threshold: 0.75 threshold: 0.75
informational: true
project: project:
default: default:
target: auto target: auto

View File

@ -1 +1 @@
__version__ = "22.6.0" __version__ = "22.6.2"

View File

@ -1315,7 +1315,7 @@ class Sanic(BaseSanic, RunnerMixin, metaclass=TouchUpMeta):
self.config.update_config(config) self.config.update_config(config)
@property @property
def asgi(self): def asgi(self) -> bool:
return self.state.asgi return self.state.asgi
@asgi.setter @asgi.setter

View File

@ -4,8 +4,7 @@ from functools import partial, wraps
from inspect import getsource, signature from inspect import getsource, signature
from mimetypes import guess_type from mimetypes import guess_type
from os import path from os import path
from pathlib import PurePath from pathlib import Path, PurePath
from re import sub
from textwrap import dedent from textwrap import dedent
from time import gmtime, strftime from time import gmtime, strftime
from typing import ( from typing import (
@ -27,12 +26,7 @@ from sanic.base.meta import SanicMeta
from sanic.compat import stat_async from sanic.compat import stat_async
from sanic.constants import DEFAULT_HTTP_CONTENT_TYPE, HTTP_METHODS from sanic.constants import DEFAULT_HTTP_CONTENT_TYPE, HTTP_METHODS
from sanic.errorpages import RESPONSE_MAPPING from sanic.errorpages import RESPONSE_MAPPING
from sanic.exceptions import ( from sanic.exceptions import FileNotFound, HeaderNotFound, RangeNotSatisfiable
BadRequest,
FileNotFound,
HeaderNotFound,
RangeNotSatisfiable,
)
from sanic.handlers import ContentRangeHandler from sanic.handlers import ContentRangeHandler
from sanic.log import error_logger from sanic.log import error_logger
from sanic.models.futures import FutureRoute, FutureStatic from sanic.models.futures import FutureRoute, FutureStatic
@ -806,32 +800,40 @@ class RouteMixin(metaclass=SanicMeta):
content_type=None, content_type=None,
__file_uri__=None, __file_uri__=None,
): ):
# Using this to determine if the URL is trying to break out of the path
# served. os.path.realpath seems to be very slow
if __file_uri__ and "../" in __file_uri__:
raise BadRequest("Invalid URL")
# Merge served directory and requested file if provided # Merge served directory and requested file if provided
# Strip all / that in the beginning of the URL to help prevent python file_path_raw = Path(unquote(file_or_directory))
# from herping a derp and treating the uri as an absolute path root_path = file_path = file_path_raw.resolve()
root_path = file_path = file_or_directory not_found = FileNotFound(
if __file_uri__:
file_path = path.join(
file_or_directory, sub("^[/]*", "", __file_uri__)
)
# URL decode the path sent by the browser otherwise we won't be able to
# match filenames which got encoded (filenames with spaces etc)
file_path = path.abspath(unquote(file_path))
if not file_path.startswith(path.abspath(unquote(root_path))):
error_logger.exception(
f"File not found: path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise FileNotFound(
"File not found", "File not found",
path=file_or_directory, path=file_or_directory,
relative_url=__file_uri__, relative_url=__file_uri__,
) )
if __file_uri__:
# Strip all / that in the beginning of the URL to help prevent
# python from herping a derp and treating the uri as an
# absolute path
unquoted_file_uri = unquote(__file_uri__).lstrip("/")
file_path_raw = Path(file_or_directory, unquoted_file_uri)
file_path = file_path_raw.resolve()
if (
file_path < root_path and not file_path_raw.is_symlink()
) or ".." in file_path_raw.parts:
error_logger.exception(
f"File not found: path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise not_found
try:
file_path.relative_to(root_path)
except ValueError:
if not file_path_raw.is_symlink():
error_logger.exception(
f"File not found: path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise not_found
try: try:
headers = {} headers = {}
# Check if the client has been sent this file before # Check if the client has been sent this file before
@ -899,11 +901,7 @@ class RouteMixin(metaclass=SanicMeta):
except RangeNotSatisfiable: except RangeNotSatisfiable:
raise raise
except FileNotFoundError: except FileNotFoundError:
raise FileNotFound( raise not_found
"File not found",
path=file_or_directory,
relative_url=__file_uri__,
)
except Exception: except Exception:
error_logger.exception( error_logger.exception(
f"Exception in static request handler: " f"Exception in static request handler: "

View File

@ -69,6 +69,7 @@ else:
class RunnerMixin(metaclass=SanicMeta): class RunnerMixin(metaclass=SanicMeta):
_app_registry: Dict[str, Sanic] _app_registry: Dict[str, Sanic]
asgi: bool
config: Config config: Config
listeners: Dict[str, List[ListenerType[Any]]] listeners: Dict[str, List[ListenerType[Any]]]
state: ApplicationState state: ApplicationState
@ -525,7 +526,7 @@ class RunnerMixin(metaclass=SanicMeta):
) )
) )
else: else:
server = "" server = "ASGI" if self.asgi else "unknown" # type: ignore
display = { display = {
"mode": " ".join(mode), "mode": " ".join(mode),
@ -571,8 +572,12 @@ class RunnerMixin(metaclass=SanicMeta):
@property @property
def serve_location(self) -> str: def serve_location(self) -> str:
try:
server_settings = self.state.server_info[0].settings server_settings = self.state.server_info[0].settings
return self.get_server_location(server_settings) return self.get_server_location(server_settings)
except IndexError:
location = "ASGI" if self.asgi else "unknown" # type: ignore
return f"http://<{location}>"
@staticmethod @staticmethod
def get_server_location( def get_server_location(

View File

@ -546,3 +546,13 @@ async def test_signals_triggered(app):
assert response.status_code == 200 assert response.status_code == 200
assert response.text == "test_signals_triggered" assert response.text == "test_signals_triggered"
assert signals_triggered == signals_expected assert signals_triggered == signals_expected
@pytest.mark.asyncio
async def test_asgi_serve_location(app):
@app.get("/")
def _request(request: Request):
return text(request.app.serve_location)
_, response = await app.asgi_client.get("/")
assert response.text == "http://<ASGI>"

View File

@ -2,6 +2,7 @@ import json as stdjson
from collections import namedtuple from collections import namedtuple
from pathlib import Path from pathlib import Path
from sys import version_info
import pytest import pytest
@ -74,7 +75,10 @@ def test_full_message(client):
""" """
) )
response = client.recv() response = client.recv()
assert len(response) == 151
# AltSvcCheck touchup removes the Alt-Svc header from the
# response in the Python 3.9+ in this case
assert len(response) == (151 if version_info < (3, 9) else 140)
assert b"200 OK" in response assert b"200 OK" in response

View File

@ -1,6 +1,7 @@
import inspect import inspect
import logging import logging
import os import os
import sys
from collections import Counter from collections import Counter
from pathlib import Path from pathlib import Path
@ -8,7 +9,7 @@ from time import gmtime, strftime
import pytest import pytest
from sanic import text from sanic import Sanic, text
from sanic.exceptions import FileNotFound from sanic.exceptions import FileNotFound
@ -21,6 +22,22 @@ def static_file_directory():
return static_directory return static_directory
@pytest.fixture(scope="module")
def double_dotted_directory_file(static_file_directory: str):
"""Generate double dotted directory and its files"""
if sys.platform == "win32":
raise Exception("Windows doesn't support double dotted directories")
file_path = Path(static_file_directory) / "dotted.." / "dot.txt"
double_dotted_dir = file_path.parent
Path.mkdir(double_dotted_dir, exist_ok=True)
with open(file_path, "w") as f:
f.write("DOT\n")
yield file_path
Path.unlink(file_path)
Path.rmdir(double_dotted_dir)
def get_file_path(static_file_directory, file_name): def get_file_path(static_file_directory, file_name):
return os.path.join(static_file_directory, file_name) return os.path.join(static_file_directory, file_name)
@ -578,3 +595,43 @@ def test_resource_type_dir(app, static_file_directory):
def test_resource_type_unknown(app, static_file_directory, caplog): def test_resource_type_unknown(app, static_file_directory, caplog):
with pytest.raises(ValueError): with pytest.raises(ValueError):
app.static("/static", static_file_directory, resource_type="unknown") app.static("/static", static_file_directory, resource_type="unknown")
@pytest.mark.skipif(
sys.platform == "win32",
reason="Windows does not support double dotted directories",
)
def test_dotted_dir_ok(
app: Sanic, static_file_directory: str, double_dotted_directory_file: Path
):
app.static("/foo", static_file_directory)
dot_relative_path = str(
double_dotted_directory_file.relative_to(static_file_directory)
)
_, response = app.test_client.get("/foo/" + dot_relative_path)
assert response.status == 200
assert response.body == b"DOT\n"
def test_breakout(app: Sanic, static_file_directory: str):
app.static("/foo", static_file_directory)
_, response = app.test_client.get("/foo/..%2Ffake/server.py")
assert response.status == 404
_, response = app.test_client.get("/foo/..%2Fstatic/test.file")
assert response.status == 404
@pytest.mark.skipif(
sys.platform != "win32", reason="Block backslash on Windows only"
)
def test_double_backslash_prohibited_on_win32(
app: Sanic, static_file_directory: str
):
app.static("/foo", static_file_directory)
_, response = app.test_client.get("/foo/static/..\\static/test.file")
assert response.status == 404
_, response = app.test_client.get("/foo/static\\../static/test.file")
assert response.status == 404