from unittest.mock import Mock import pytest from sanic import Sanic, headers, json, text from sanic.exceptions import InvalidHeader, PayloadTooLarge from sanic.http import Http from sanic.request import Request def make_request(headers) -> Request: return Request(b"/", headers, "1.1", "GET", None, None) @pytest.fixture def raised_ceiling(): Http.HEADER_CEILING = 32_768 yield Http.HEADER_CEILING = 16_384 @pytest.mark.parametrize( "input, expected", [ ("text/plain", ("text/plain", {})), ("text/vnd.just.made.this.up ; ", ("text/vnd.just.made.this.up", {})), ( "text/plain;charset=us-ascii", ("text/plain", {"charset": "us-ascii"}), ), ( 'text/plain ; charset="us-ascii"', ("text/plain", {"charset": "us-ascii"}), ), ( 'text/plain ; charset="us-ascii"; another=opt', ("text/plain", {"charset": "us-ascii", "another": "opt"}), ), ( 'attachment; filename="silly.txt"', ("attachment", {"filename": "silly.txt"}), ), ( 'attachment; filename="strange;name"', ("attachment", {"filename": "strange;name"}), ), ( 'attachment; filename="strange;name";size=123;', ("attachment", {"filename": "strange;name", "size": "123"}), ), ( 'form-data; name="foo"; value="%22\\%0D%0A"', ("form-data", {"name": "foo", "value": '"\\\n'}), ), # with Unicode filename! ( # Chrome, Firefox: # Content-Disposition: form-data; name="foo%22;bar\"; filename="😀" 'form-data; name="foo%22;bar\\"; filename="😀"', ("form-data", {"name": 'foo";bar\\', "filename": "😀"}), # cgi: ('form-data', {'name': 'foo%22;bar"; filename="😀'}) # werkzeug (pre 2.3.0): ('form-data', {'name': 'foo%22;bar"; filename='}) ), ], ) def test_parse_headers(input, expected): assert headers.parse_content_header(input) == expected @pytest.mark.asyncio async def test_header_size_exceeded(): recv_buffer = bytearray() async def _receive_more(): nonlocal recv_buffer recv_buffer += b"123" protocol = Mock() Http.set_header_max_size(1) http = Http(protocol) http._receive_more = _receive_more http.recv_buffer = recv_buffer with pytest.raises(PayloadTooLarge): await http.http1_request_header() @pytest.mark.asyncio async def test_header_size_increased_okay(): recv_buffer = bytearray() async def _receive_more(): nonlocal recv_buffer recv_buffer += b"123" protocol = Mock() Http.set_header_max_size(12_288) http = Http(protocol) http._receive_more = _receive_more http.recv_buffer = recv_buffer with pytest.raises(PayloadTooLarge): await http.http1_request_header() assert len(recv_buffer) == 12_291 @pytest.mark.asyncio async def test_header_size_exceeded_maxed_out(): recv_buffer = bytearray() async def _receive_more(): nonlocal recv_buffer recv_buffer += b"123" protocol = Mock() Http.set_header_max_size(18_432) http = Http(protocol) http._receive_more = _receive_more http.recv_buffer = recv_buffer with pytest.raises(PayloadTooLarge): await http.http1_request_header() assert len(recv_buffer) == 16_389 @pytest.mark.asyncio async def test_header_size_exceeded_raised_ceiling(raised_ceiling): recv_buffer = bytearray() async def _receive_more(): nonlocal recv_buffer recv_buffer += b"123" protocol = Mock() http = Http(protocol) Http.set_header_max_size(65_536) http._receive_more = _receive_more http.recv_buffer = recv_buffer with pytest.raises(PayloadTooLarge): await http.http1_request_header() assert len(recv_buffer) == 32_772 def test_raw_headers(app): app.route("/")(lambda _: text("")) request, _ = app.test_client.get( "/", headers={ "FOO": "bar", "Host": "", "User-Agent": "Sanic-Testing", }, ) assert b"Host:" in request.raw_headers assert b"Accept: */*" in request.raw_headers assert b"Accept-Encoding: gzip, deflate" in request.raw_headers assert b"Connection: keep-alive" in request.raw_headers assert b"User-Agent: Sanic-Testing" in request.raw_headers assert b"FOO: bar" in request.raw_headers def test_request_line(app): app.route("/")(lambda _: text("")) request, _ = app.test_client.get( "/", headers={ "FOO": "bar", "Host": "", "User-Agent": "Sanic-Testing", }, ) assert request.request_line == b"GET / HTTP/1.1" @pytest.mark.parametrize( "raw,expected_subtype", ( ("show/first, show/second", "first"), ("show/*, show/first", "first"), ("*/*, show/first", "first"), ("*/*, show/*", "*"), ("other/*; q=0.1, show/*; q=0.2", "*"), ("show/first; q=0.5, show/second; q=0.5", "first"), ("show/first; foo=bar, show/second; foo=bar", "first"), ("show/second, show/first; foo=bar", "first"), ("show/second; q=0.5, show/first; foo=bar; q=0.5", "first"), ("show/second; q=0.5, show/first; q=1.0", "first"), ("show/first, show/second; q=1.0", "second"), ), ) def test_parse_accept_ordered_okay(raw, expected_subtype): ordered = headers.parse_accept(raw) assert ordered[0].type == "show" assert ordered[0].subtype == expected_subtype @pytest.mark.parametrize( "raw", ( "missing", "missing/", "/missing", "/", ), ) def test_bad_accept(raw): with pytest.raises(InvalidHeader): headers.parse_accept(raw) def test_empty_accept(): a = headers.parse_accept("") assert a == [] assert not a.match("*/*") def test_wildcard_accept_set_ok(): accept = headers.parse_accept("*/*")[0] assert accept.type == "*" assert accept.subtype == "*" assert accept.has_wildcard accept = headers.parse_accept("foo/*")[0] assert accept.type == "foo" assert accept.subtype == "*" assert accept.has_wildcard accept = headers.parse_accept("foo/bar")[0] assert accept.type == "foo" assert accept.subtype == "bar" assert not accept.has_wildcard def test_accept_parsed_against_str(): accept = headers.Matched.parse("foo/bar") assert accept == "foo/bar; q=0.1" def test_media_type_matching(): assert headers.MediaType("foo", "bar").match(headers.MediaType("foo", "bar")) assert headers.MediaType("foo", "bar").match("foo/bar") @pytest.mark.parametrize( "value,other,outcome", ( # ALLOW BOTH ("foo/bar", "foo/bar", True), ("foo/bar", headers.Matched.parse("foo/bar"), True), ("foo/bar", "foo/*", True), ("foo/bar", headers.Matched.parse("foo/*"), True), ("foo/bar", "*/*", True), ("foo/bar", headers.Matched.parse("*/*"), True), ("foo/*", "foo/bar", True), ("foo/*", headers.Matched.parse("foo/bar"), True), ("foo/*", "foo/*", True), ("foo/*", headers.Matched.parse("foo/*"), True), ("foo/*", "*/*", True), ("foo/*", headers.Matched.parse("*/*"), True), ("*/*", "foo/bar", True), ("*/*", headers.Matched.parse("foo/bar"), True), ("*/*", "foo/*", True), ("*/*", headers.Matched.parse("foo/*"), True), ("*/*", "*/*", True), ("*/*", headers.Matched.parse("*/*"), True), ), ) def test_accept_matching(value, other, outcome): assert bool(headers.Matched.parse(value).match(other)) is outcome @pytest.mark.parametrize("value", ("foo/bar", "foo/*", "*/*")) def test_value_in_accept(value): acceptable = headers.parse_accept(value) assert acceptable.match("foo/bar") assert acceptable.match("foo/*") assert acceptable.match("*/*") @pytest.mark.parametrize("value", ("foo/bar", "foo/*")) def test_value_not_in_accept(value): acceptable = headers.parse_accept(value) assert not acceptable.match("no/match") assert not acceptable.match("no/*") assert "*/*" not in acceptable assert "*/bar" not in acceptable @pytest.mark.parametrize( "header,expected", ( ( "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8", # noqa: E501 [ "text/html", "application/xhtml+xml", "image/avif", "image/webp", "application/xml;q=0.9", "*/*;q=0.8", ], ), ), ) def test_browser_headers_general(header, expected): request = Request(b"/", {"accept": header}, "1.1", "GET", None, None) assert [str(item) for item in request.accept] == expected @pytest.mark.parametrize( "header,expected", ( ( "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8", # noqa: E501 [ ("text/html", 1.0), ("application/xhtml+xml", 1.0), ("image/avif", 1.0), ("image/webp", 1.0), ("application/xml", 0.9), ("*/*", 0.8), ], ), ), ) def test_browser_headers_specific(header, expected): mimes = [e[0] for e in expected] qs = [e[1] for e in expected] request = Request(b"/", {"accept": header}, "1.1", "GET", None, None) assert request.accept == mimes for a, m, q in zip(request.accept, mimes, qs): assert a == m assert a.mime == m assert a.q == q @pytest.mark.parametrize( "raw", ( "text/html, application/xhtml+xml, application/xml;q=0.9, */*;q=0.8", "application/xml;q=0.9, */*;q=0.8, text/html, application/xhtml+xml", ( "foo/bar;q=0.9, */*;q=0.8, text/html=0.8, " "text/plain, application/xhtml+xml" ), ), ) def test_accept_ordering(raw): """Should sort by q but also be stable.""" accept = headers.parse_accept(raw) assert accept[0].type == "text" raw1 = ", ".join(str(a) for a in accept) accept = headers.parse_accept(raw1) raw2 = ", ".join(str(a) for a in accept) assert raw1 == raw2 def test_not_accept_wildcard(): accept = headers.parse_accept("*/*, foo/*, */bar, foo/bar;q=0.1") assert not accept.match("text/html", "foo/foo", "bar/bar", accept_wildcards=False) # Should ignore wildcards in accept but still matches them from mimes m = accept.match("text/plain", "*/*", accept_wildcards=False) assert m.mime == "*/*" assert m.match("*/*") assert m.header == "foo/bar" assert not accept.match("text/html", "foo/foo", "bar/bar", accept_wildcards=False) def test_accept_misc(): header = "foo/bar;q=0.0, */plain;param=123, text/plain, text/*, foo/bar;q=0.5" a = headers.parse_accept(header) assert repr(a) == ( "[*/plain;param=123, text/plain, text/*, " "foo/bar;q=0.5, foo/bar;q=0.0]" ) # noqa: E501 assert str(a) == ( "*/plain;param=123, text/plain, text/*, " "foo/bar;q=0.5, foo/bar;q=0.0" ) # noqa: E501 # q=1 types don't match foo/bar but match the two others, # text/* comes first and matches */plain because it # comes first in the header m = a.match("foo/bar", "text/*", "text/plain") assert repr(m) == "" assert m == "text/*" assert m.mime == "text/*" assert m.header.mime == "*/plain" assert m.header.type == "*" assert m.header.subtype == "plain" assert m.header.q == 1.0 assert m.header.params == {"param": "123"} # Matches object against another Matched object (by mime and header) assert m == a.match("text/*") # Against unsupported type falls back to object id matching assert m != 123 # Matches the highest q value m = a.match("foo/bar") assert repr(m) == "" assert m == "foo/bar" assert m == "foo/bar;q=0.5" # Matching nothing special case m = a.match() assert m == "" assert m.header is None # No header means anything a = headers.parse_accept(None) assert a == ["*/*"] assert a.match("foo/bar") # Empty header means nothing a = headers.parse_accept("") assert a == [] assert not a.match("foo/bar") @pytest.mark.parametrize( "headers,expected", ( ({"foo": "bar"}, "bar"), ((("foo", "bar"), ("foo", "baz")), "bar,baz"), ({}, ""), ), ) def test_field_simple_accessor(headers, expected): request = make_request(headers) assert == request.headers.foo_ == expected @pytest.mark.parametrize( "headers,expected", ( ({"foo-bar": "bar"}, "bar"), ((("foo-bar", "bar"), ("foo-bar", "baz")), "bar,baz"), ), ) def test_field_hyphenated_accessor(headers, expected): request = make_request(headers) assert request.headers.foo_bar == request.headers.foo_bar_ == expected def test_bad_accessor(): request = make_request({}) msg = "'Header' object has no attribute '_foo'" with pytest.raises(AttributeError, match=msg): request.headers._foo def test_multiple_fields_accessor(app: Sanic): @app.get("") async def handler(request: Request): return json({"field": request.headers.example_field}) _, response = app.test_client.get( "/", headers=(("Example-Field", "Foo, Bar"), ("Example-Field", "Baz")) ) assert response.json["field"] == "Foo, Bar,Baz"