sanic/sanic/headers.py
L. Kärkkäinen bffdb3b5c2 More robust response datatype handling (#1674)
* HTTP1 header formatting moved to headers.format_headers and rewritten.

- New implementation is one line of code and twice faster than the old one.
- Whole header block encoded to UTF-8 in one pass.
- No longer supports custom encode method on header values.
- Cookie objects now have __str__ in addition to encode, to work with this.

* Linter

* format_http1_response

* Replace encode_body with faster implementation based on f-string.

Benchmarks:

def encode_body(data):
    try:
        # Try to encode it regularly
        return data.encode()
    except AttributeError:
        # Convert it to a str if you can't
        return str(data).encode()

def encode_body2(data):
    return f"{data}".encode()

def encode_body3(data):
    return str(data).encode()

data_str, data_int = "foo", 123

%timeit encode_body(data_int)
928 ns ± 2.96 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

%timeit encode_body2(data_int)
280 ns ± 2.09 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

%timeit encode_body3(data_int)
387 ns ± 1.7 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

%timeit encode_body(data_str)
202 ns ± 1.9 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

%timeit encode_body2(data_str)
197 ns ± 0.507 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)

%timeit encode_body3(data_str)
313 ns ± 1.28 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

* Wtf linter

* Content-type fixes.

* Body encoding sanitation, first pass.
- body/data type autodetection fixed.
- do not repr(body).encode() bytes-ish values.
- support __html__ and _repr_html_ in sanic.response.html().

* <any type>-to-str response autoconversion limited to sanic.response.text() only.

* Workaround MyPy issue.

* Add an empty line to make isort happy.

* Add html test for __html__ and _repr_html_.

* Remove StreamingHTTPResponse.get_headers helper function.

* Add back HTTPResponse Keep-Alive removed by earlier merge or something.

* Revert "Remove StreamingHTTPResponse.get_headers helper function."

Tests depend on this otherwise useless function.

This reverts commit 9651e6ae01.

* Add deprecation warnings; instead of assert for wrong HTTP version, and for non-string response.text.

* Add back missing import.

* Avoid duplicate response header tweaking code.

* Linter errors
2020-01-20 10:34:32 -06:00

201 lines
7.0 KiB
Python

import re
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from urllib.parse import unquote
from sanic.helpers import STATUS_CODES
HeaderIterable = Iterable[Tuple[str, Any]] # Values convertible to str
Options = Dict[str, Union[int, str]] # key=value fields in various headers
OptionsIterable = Iterable[Tuple[str, str]] # May contain duplicate keys
_token, _quoted = r"([\w!#$%&'*+\-.^_`|~]+)", r'"([^"]*)"'
_param = re.compile(fr";\s*{_token}=(?:{_token}|{_quoted})", re.ASCII)
_firefox_quote_escape = re.compile(r'\\"(?!; |\s*$)')
_ipv6 = "(?:[0-9A-Fa-f]{0,4}:){2,7}[0-9A-Fa-f]{0,4}"
_ipv6_re = re.compile(_ipv6)
_host_re = re.compile(
r"((?:\[" + _ipv6 + r"\])|[a-zA-Z0-9.\-]{1,253})(?::(\d{1,5}))?"
)
# RFC's quoted-pair escapes are mostly ignored by browsers. Chrome, Firefox and
# curl all have different escaping, that we try to handle as well as possible,
# even though no client espaces in a way that would allow perfect handling.
# For more information, consult ../tests/test_requests.py
def parse_content_header(value: str) -> Tuple[str, Options]:
"""Parse content-type and content-disposition header values.
E.g. 'form-data; name=upload; filename=\"file.txt\"' to
('form-data', {'name': 'upload', 'filename': 'file.txt'})
Mostly identical to cgi.parse_header and werkzeug.parse_options_header
but runs faster and handles special characters better. Unescapes quotes.
"""
value = _firefox_quote_escape.sub("%22", value)
pos = value.find(";")
if pos == -1:
options: Dict[str, Union[int, str]] = {}
else:
options = {
m.group(1).lower(): m.group(2) or m.group(3).replace("%22", '"')
for m in _param.finditer(value[pos:])
}
value = value[:pos]
return value.strip().lower(), options
# https://tools.ietf.org/html/rfc7230#section-3.2.6 and
# https://tools.ietf.org/html/rfc7239#section-4
# This regex is for *reversed* strings because that works much faster for
# right-to-left matching than the other way around. Be wary that all things are
# a bit backwards! _rparam matches forwarded pairs alike ";key=value"
_rparam = re.compile(f"(?:{_token}|{_quoted})={_token}\\s*($|[;,])", re.ASCII)
def parse_forwarded(headers, config) -> Optional[Options]:
"""Parse RFC 7239 Forwarded headers.
The value of `by` or `secret` must match `config.FORWARDED_SECRET`
:return: dict with keys and values, or None if nothing matched
"""
header = headers.getall("forwarded", None)
secret = config.FORWARDED_SECRET
if header is None or not secret:
return None
header = ",".join(header) # Join multiple header lines
if secret not in header:
return None
# Loop over <separator><key>=<value> elements from right to left
sep = pos = None
options: List[Tuple[str, str]] = []
found = False
for m in _rparam.finditer(header[::-1]):
# Start of new element? (on parser skips and non-semicolon right sep)
if m.start() != pos or sep != ";":
# Was the previous element (from right) what we wanted?
if found:
break
# Clear values and parse as new element
del options[:]
pos = m.end()
val_token, val_quoted, key, sep = m.groups()
key = key.lower()[::-1]
val = (val_token or val_quoted.replace('"\\', '"'))[::-1]
options.append((key, val))
if key in ("secret", "by") and val == secret:
found = True
# Check if we would return on next round, to avoid useless parse
if found and sep != ";":
break
# If secret was found, return the matching options in left-to-right order
return fwd_normalize(reversed(options)) if found else None
def parse_xforwarded(headers, config) -> Optional[Options]:
"""Parse traditional proxy headers."""
real_ip_header = config.REAL_IP_HEADER
proxies_count = config.PROXIES_COUNT
addr = real_ip_header and headers.get(real_ip_header)
if not addr and proxies_count:
assert proxies_count > 0
try:
# Combine, split and filter multiple headers' entries
forwarded_for = headers.getall(config.FORWARDED_FOR_HEADER)
proxies = [
p
for p in (
p.strip() for h in forwarded_for for p in h.split(",")
)
if p
]
addr = proxies[-proxies_count]
except (KeyError, IndexError):
pass
# No processing of other headers if no address is found
if not addr:
return None
def options():
yield "for", addr
for key, header in (
("proto", "x-scheme"),
("proto", "x-forwarded-proto"), # Overrides X-Scheme if present
("host", "x-forwarded-host"),
("port", "x-forwarded-port"),
("path", "x-forwarded-path"),
):
yield key, headers.get(header)
return fwd_normalize(options())
def fwd_normalize(fwd: OptionsIterable) -> Options:
"""Normalize and convert values extracted from forwarded headers."""
ret: Dict[str, Union[int, str]] = {}
for key, val in fwd:
if val is not None:
try:
if key in ("by", "for"):
ret[key] = fwd_normalize_address(val)
elif key in ("host", "proto"):
ret[key] = val.lower()
elif key == "port":
ret[key] = int(val)
elif key == "path":
ret[key] = unquote(val)
else:
ret[key] = val
except ValueError:
pass
return ret
def fwd_normalize_address(addr: str) -> str:
"""Normalize address fields of proxy headers."""
if addr == "unknown":
raise ValueError() # omit unknown value identifiers
if addr.startswith("_"):
return addr # do not lower-case obfuscated strings
if _ipv6_re.fullmatch(addr):
addr = f"[{addr}]" # bracket IPv6
return addr.lower()
def parse_host(host: str) -> Tuple[Optional[str], Optional[int]]:
"""Split host:port into hostname and port.
:return: None in place of missing elements
"""
m = _host_re.fullmatch(host)
if not m:
return None, None
host, port = m.groups()
return host.lower(), int(port) if port is not None else None
def format_http1(headers: HeaderIterable) -> bytes:
"""Convert a headers iterable into HTTP/1 header format.
- Outputs UTF-8 bytes where each header line ends with \\r\\n.
- Values are converted into strings if necessary.
"""
return "".join(f"{name}: {val}\r\n" for name, val in headers).encode()
def format_http1_response(
status: int, headers: HeaderIterable, body=b""
) -> bytes:
"""Format a full HTTP/1.1 response.
- If `body` is included, content-length must be specified in headers.
"""
headerbytes = format_http1(headers)
return b"HTTP/1.1 %d %b\r\n%b\r\n%b" % (
status,
STATUS_CODES.get(status, b"UNKNOWN"),
headerbytes,
body,
)