Stricter charset handling and escaping of request URLs (#2710)

Co-authored-by: L. Karkkainen <tronic@users.noreply.github.com>
This commit is contained in:
L. Kärkkäinen 2023-03-21 17:55:21 +00:00 committed by GitHub
parent 1a63b9bec0
commit 932088e37e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 123 additions and 17 deletions

View File

@ -3,7 +3,6 @@ from __future__ import annotations
import warnings
from typing import TYPE_CHECKING, Optional
from urllib.parse import quote
from sanic.compat import Header
from sanic.exceptions import BadRequest, ServerError
@ -146,14 +145,6 @@ class ASGIApp:
raise BadRequest(
"Header names can only contain US-ASCII characters"
)
path = (
scope["path"][1:]
if scope["path"].startswith("/")
else scope["path"]
)
url = "/".join([scope.get("root_path", ""), quote(path)])
url_bytes = url.encode("latin-1")
url_bytes += b"?" + scope["query_string"]
if scope["type"] == "http":
version = scope["http_version"]
@ -168,6 +159,13 @@ class ASGIApp:
else:
raise ServerError("Received unknown ASGI scope")
url_bytes, query = scope["raw_path"], scope["query_string"]
if query:
# httpx ASGI client sends query string as part of raw_path
url_bytes = url_bytes.split(b"?", 1)[0]
# All servers send them separately
url_bytes = b"%b?%b" % (url_bytes, query)
request_class = sanic_app.request_class or Request
instance.request = request_class(
url_bytes,

View File

@ -240,9 +240,14 @@ class Http(Stream, metaclass=TouchUpMeta):
headers_instance.getone("upgrade", "").lower() == "websocket"
)
try:
url_bytes = self.url.encode("ASCII")
except UnicodeEncodeError:
raise BadRequest("URL may only contain US-ASCII characters.")
# Prepare a Request object
request = self.protocol.request_class(
url_bytes=self.url.encode(),
url_bytes=url_bytes,
headers=headers_instance,
head=bytes(head),
version=protocol[5:],
@ -445,9 +450,18 @@ class Http(Stream, metaclass=TouchUpMeta):
bogus response for error handling use.
"""
# Reformat any URL already received with \xHH escapes for better logs
url_bytes = (
self.url.encode(errors="surrogateescape")
.decode("ASCII", errors="backslashreplace")
.encode("ASCII")
if self.url
else b"*"
)
# FIXME: Avoid this by refactoring error handling and response code
self.request = self.protocol.request_class(
url_bytes=self.url.encode() if self.url else b"*",
url_bytes=url_bytes,
headers=Header({}),
version="1.1",
method="NONE",

View File

@ -18,7 +18,12 @@ from typing import (
from sanic.compat import Header
from sanic.constants import LocalCertCreator
from sanic.exceptions import PayloadTooLarge, SanicException, ServerError
from sanic.exceptions import (
BadRequest,
PayloadTooLarge,
SanicException,
ServerError,
)
from sanic.helpers import has_message_body
from sanic.http.constants import Stage
from sanic.http.stream import Stream
@ -333,7 +338,17 @@ class Http3:
return self.receivers[stream_id]
def _make_request(self, event: HeadersReceived) -> Request:
headers = Header(((k.decode(), v.decode()) for k, v in event.headers))
try:
headers = Header(
(
(k.decode("ASCII"), v.decode(errors="surrogateescape"))
for k, v in event.headers
)
)
except UnicodeDecodeError:
raise BadRequest(
"Header names may only contain US-ASCII characters."
)
method = headers[":method"]
path = headers[":path"]
scheme = headers.pop(":scheme", "")
@ -342,9 +357,14 @@ class Http3:
if authority:
headers["host"] = authority
try:
url_bytes = path.encode("ASCII")
except UnicodeEncodeError:
raise BadRequest("URL may only contain US-ASCII characters.")
transport = HTTP3Transport(self.protocol)
request = self.protocol.request_class(
path.encode(),
url_bytes,
headers,
"3",
method,

View File

@ -133,7 +133,8 @@ class Request:
try:
self._parsed_url = parse_url(url_bytes)
except HttpParserInvalidURLError:
raise BadURL(f"Bad URL: {url_bytes.decode()}")
url = url_bytes.decode(errors="backslashreplace")
raise BadURL(f"Bad URL: {url}")
self._id: Optional[Union[uuid.UUID, str, int]] = None
self._name: Optional[str] = None
self._stream_id = stream_id

View File

@ -116,7 +116,7 @@ requirements = [
]
tests_require = [
"sanic-testing>=22.9.0",
"sanic-testing@git+https://github.com/sanic-org/sanic-testing.git@main#egg=sanic-testing>=22.12.0",
"pytest==7.1.*",
"coverage",
"beautifulsoup4",

View File

@ -11,7 +11,7 @@ from aioquic.quic.events import ProtocolNegotiated
from sanic import Request, Sanic
from sanic.compat import Header
from sanic.config import DEFAULT_CONFIG
from sanic.exceptions import PayloadTooLarge
from sanic.exceptions import BadRequest, PayloadTooLarge
from sanic.http.constants import Stage
from sanic.http.http3 import Http3, HTTPReceiver
from sanic.models.server_types import ConnInfo
@ -292,3 +292,48 @@ def test_request_conn_info(app):
receiver = http3.get_receiver_by_stream_id(1)
assert isinstance(receiver.request.conn_info, ConnInfo)
def test_request_header_encoding(app):
protocol = generate_protocol(app)
http3 = Http3(protocol, protocol.transmit)
with pytest.raises(BadRequest) as exc_info:
http3.http_event_received(
HeadersReceived(
[
(b":method", b"GET"),
(b":path", b"/location"),
(b":scheme", b"https"),
(b":authority", b"localhost:8443"),
("foo\u00A0".encode(), b"bar"),
],
1,
False,
)
)
assert exc_info.value.status_code == 400
assert (
str(exc_info.value)
== "Header names may only contain US-ASCII characters."
)
def test_request_url_encoding(app):
protocol = generate_protocol(app)
http3 = Http3(protocol, protocol.transmit)
with pytest.raises(BadRequest) as exc_info:
http3.http_event_received(
HeadersReceived(
[
(b":method", b"GET"),
(b":path", b"/location\xA0"),
(b":scheme", b"https"),
(b":authority", b"localhost:8443"),
(b"foo", b"bar"),
],
1,
False,
)
)
assert exc_info.value.status_code == 400
assert str(exc_info.value) == "URL may only contain US-ASCII characters."

View File

@ -652,3 +652,17 @@ async def test_asgi_headers_decoding(app: Sanic, monkeypatch: MonkeyPatch):
_, response = await app.asgi_client.get("/", headers={"Test-Header": "😅"})
assert response.status_code == 200
@pytest.mark.asyncio
async def test_asgi_url_decoding(app):
@app.get("/dir/<name>", unquote=True)
def _request(request: Request, name):
return text(name)
# 2F should not become a path separator (unquoted later)
_, response = await app.asgi_client.get("/dir/some%2Fpath")
assert response.text == "some/path"
_, response = await app.asgi_client.get("/dir/some%F0%9F%98%80path")
assert response.text == "some😀path"

View File

@ -98,3 +98,17 @@ def test_transfer_chunked(client):
data = stdjson.loads(body)
assert data == ["foo", "bar"]
def test_url_encoding(client):
client.send(
"""
GET /invalid\xA0url HTTP/1.1
"""
)
response = client.recv()
headers, body = response.rsplit(b"\r\n\r\n", 1)
assert b"400 Bad Request" in headers
assert b"URL may only contain US-ASCII characters." in body