sanic/sanic/response.py
L. Kärkkäinen 7028eae083
Streaming Server (#1876)
* Streaming request by async for.

* Make all requests streaming and preload body for non-streaming handlers.

* Cleanup of code and avoid mixing streaming responses.

* Async http protocol loop.

* Change of test: don't require early bad request error but only after CRLF-CRLF.

* Add back streaming requests.

* Rewritten request body parser.

* Misc. cleanup, down to 4 failing tests.

* All tests OK.

* Entirely remove request body queue.

* Let black f*ckup the layout

* Better testing error messages on protocol errors.

* Remove StreamBuffer tests because the type is about to be removed.

* Remove tests using the deprecated get_headers function that can no longer be supported. Chunked mode is now autodetected, so do not put content-length header if chunked mode is preferred.

* Major refactoring of HTTP protocol handling (new module http.py added), all requests made streaming. A few compatibility issues and a lot of cleanup to be done remain, 16 tests failing.

* Terminate check_timeouts once connection_task finishes.

* Code cleanup, 14 tests failing.

* Much cleanup, 12 failing...

* Even more cleanup and error checking, 8 failing tests.

* Remove keep-alive header from responses. First of all, it should say timeout=<value> which wasn't the case with existing implementation, and secondly none of the other web servers I tried include this header.

* Everything but CustomServer OK.

* Linter

* Disable custom protocol test

* Remove unnecessary variables, optimise performance.

* A test was missing that body_init/body_push/body_finish are never called. Rewritten using receive_body and case switching to make it fail if bypassed.

* Minor fixes.

* Remove unused code.

* Py 3.8 check for deprecated loop argument.

* Fix a middleware cancellation handling test with py38.

* Linter 'n fixes

* Typing

* Stricter handling of request header size

* More specific error messages on Payload Too Large.

* Init http.response = None

* Messages further tuned.

* Always try to consume request body, plus minor cleanup.

* Add a missing check in case of close_if_idle on a dead connection.

* Avoid error messages on PayloadTooLarge.

* Add test for new API.

* json takes str, not bytes

* Default to no maximum request size for streaming handlers.

* Fix chunked mode crash.

* Header values should be strictly ASCII but both UTF-8 and Latin-1 exist. Use UTF-8B to cope with all.

* Refactoring and cleanup.

* Unify response header processing of ASGI and asyncio modes.

* Avoid special handling of StreamingHTTPResponse.

* 35 % speedup in HTTP/1.1 response formatting (not so much overall effect).

* Duplicate set-cookie headers were being produced.

* Cleanup processed_headers some more.

* Linting

* Import ordering

* Response middleware run by async request.respond().

* Need to check if transport is closing to avoid getting stuck in sending loops after peer has disconnected.

* Middleware and error handling refactoring.

* Linter

* Fix tracking of HTTP stage when writing to transport fails.

* Add clarifying comment

* Add a check for request body functions and a test for NotImplementedError.

* Linter and typing

* These must be tuples + hack mypy warnings away.

* New streaming test and minor fixes.

* Constant receive buffer size.

* 256 KiB send and receive buffers.

* Revert "256 KiB send and receive buffers."

This reverts commit abc1e3edb2.

* app.handle_exception already sends the response.

* Improved handling of errors during request.

* An odd hack to avoid an httpx limitation that causes test failures.

* Limit request header size to 8 KiB at most.

* Remove unnecessary use of format string.

* Cleanup tests

* Remove artifact

* Fix type checking

* Mark test for skipping

* Cleanup some edge cases

* Add ignore_body flag to safe methods

* Add unit tests for timeout logic

* Fix Mock usage in timeout test

* Change logging test to only logger in handler

* Windows py3.8 logging issue with current testing client

* Add test_header_size_exceeded

* Resolve merge conflicts

* Add request middleware to hard exception handling

* Request middleware on exception handlers

* Linting

* Cleanup deprecations

Co-authored-by: L. Kärkkäinen <tronic@users.noreply.github.com>
Co-authored-by: Adam Hopkins <admhpkns@gmail.com>
2021-01-11 00:45:36 +02:00
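
The UTF-8B handling of header values mentioned in the commit message can be illustrated with a minimal sketch. It assumes that UTF-8B corresponds to Python's `surrogateescape` error handler, which is what `processed_headers` in this file passes to `str.encode`. The helper name `encode_header_value` is hypothetical and not part of Sanic.

```python
def encode_header_value(value):
    """Hypothetical helper mirroring the value encoding in processed_headers."""
    # f"{value}" stringifies non-str values (e.g. ints) before encoding.
    return f"{value}".encode(errors="surrogateescape")


# Latin-1 bytes that are not valid UTF-8 survive a decode/encode round trip:
# the invalid byte 0xE9 becomes a lone surrogate and is restored on encode.
raw = b"caf\xe9"
decoded = raw.decode(errors="surrogateescape")
roundtrip = encode_header_value(decoded)

# Ordinary text is encoded as plain UTF-8.
utf8 = encode_header_value("café")

print(roundtrip)  # b'caf\xe9'
print(utf8)       # b'caf\xc3\xa9'
```

With this approach a header value that arrived as Latin-1 bytes is sent back byte for byte, while values written as normal Python strings go out as UTF-8.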


from functools import partial
from mimetypes import guess_type
from os import path
from urllib.parse import quote_plus
from warnings import warn

from sanic.compat import Header, open_async
from sanic.cookies import CookieJar
from sanic.helpers import has_message_body, remove_entity_headers


try:
    from ujson import dumps as json_dumps
except ImportError:
    # This is done in order to ensure that the JSON response is
    # kept consistent across both ujson and inbuilt json usage.
    from json import dumps

    json_dumps = partial(dumps, separators=(",", ":"))


class BaseHTTPResponse:
    def __init__(self):
        self.asgi = False

    def _encode_body(self, data):
        if data is None:
            return b""
        return data.encode() if hasattr(data, "encode") else data

    @property
    def cookies(self):
        if self._cookies is None:
            self._cookies = CookieJar(self.headers)
        return self._cookies

    @property
    def processed_headers(self):
        """Obtain an iterable of header tuples encoded in bytes for sending.

        Add and remove headers based on status and content_type.
        """
        # TODO: Make a blacklist set of header names and then filter with that
        if self.status in (304, 412):  # Not Modified, Precondition Failed
            self.headers = remove_entity_headers(self.headers)
        if has_message_body(self.status):
            self.headers.setdefault("content-type", self.content_type)
        # Encode headers into bytes
        return (
            (name.encode("ascii"), f"{value}".encode(errors="surrogateescape"))
            for name, value in self.headers.items()
        )

    async def send(self, data=None, end_stream=None):
        """Send any pending response headers and the given data as body.

        :param data: str or bytes to be written
        :param end_stream: whether to close the stream after this block
        """
        if data is None and end_stream is None:
            end_stream = True
        if end_stream and not data and self.stream.send is None:
            return
        data = data.encode() if hasattr(data, "encode") else data or b""
        await self.stream.send(data, end_stream=end_stream)


class StreamingHTTPResponse(BaseHTTPResponse):
    """Old style streaming response. Use `request.respond()` instead of this in
    new code to avoid the callback."""

    __slots__ = (
        "streaming_fn",
        "status",
        "content_type",
        "headers",
        "_cookies",
    )

    def __init__(
        self,
        streaming_fn,
        status=200,
        headers=None,
        content_type="text/plain; charset=utf-8",
        chunked="deprecated",
    ):
        if chunked != "deprecated":
            warn(
                "The chunked argument has been deprecated and will be "
                "removed in v21.6"
            )

        super().__init__()

        self.content_type = content_type
        self.streaming_fn = streaming_fn
        self.status = status
        self.headers = Header(headers or {})
        self._cookies = None

    async def write(self, data):
        """Writes a chunk of data to the streaming response.

        :param data: str or bytes-ish data to be written.
        """
        await super().send(self._encode_body(data))

    async def send(self, *args, **kwargs):
        if self.streaming_fn is not None:
            await self.streaming_fn(self)
            self.streaming_fn = None
        await super().send(*args, **kwargs)


class HTTPResponse(BaseHTTPResponse):
    __slots__ = ("body", "status", "content_type", "headers", "_cookies")

    def __init__(
        self,
        body=None,
        status=200,
        headers=None,
        content_type=None,
    ):
        super().__init__()

        self.content_type = content_type
        self.body = self._encode_body(body)
        self.status = status
        self.headers = Header(headers or {})
        self._cookies = None


def empty(status=204, headers=None):
    """
    Returns an empty response to the client.

    :param status: Response code.
    :param headers: Custom Headers.
    """
    return HTTPResponse(body=b"", status=status, headers=headers)


def json(
    body,
    status=200,
    headers=None,
    content_type="application/json",
    dumps=json_dumps,
    **kwargs,
):
    """
    Returns response object with body in json format.

    :param body: Response data to be serialized.
    :param status: Response code.
    :param headers: Custom Headers.
    :param kwargs: Remaining arguments that are passed to the json encoder.
    """
    return HTTPResponse(
        dumps(body, **kwargs),
        headers=headers,
        status=status,
        content_type=content_type,
    )


def text(
    body, status=200, headers=None, content_type="text/plain; charset=utf-8"
):
    """
    Returns response object with body in text format.

    :param body: Response data to be encoded.
    :param status: Response code.
    :param headers: Custom Headers.
    :param content_type: the content type (string) of the response
    """
    if not isinstance(body, str):
        raise TypeError(
            f"Bad body type. Expected str, got {type(body).__name__}"
        )

    return HTTPResponse(
        body, status=status, headers=headers, content_type=content_type
    )


def raw(
    body, status=200, headers=None, content_type="application/octet-stream"
):
    """
    Returns response object without encoding the body.

    :param body: Response data.
    :param status: Response code.
    :param headers: Custom Headers.
    :param content_type: the content type (string) of the response.
    """
    return HTTPResponse(
        body=body,
        status=status,
        headers=headers,
        content_type=content_type,
    )


def html(body, status=200, headers=None):
    """
    Returns response object with body in html format.

    :param body: str or bytes-ish, or an object with __html__ or _repr_html_.
    :param status: Response code.
    :param headers: Custom Headers.
    """
    if hasattr(body, "__html__"):
        body = body.__html__()
    elif hasattr(body, "_repr_html_"):
        body = body._repr_html_()

    return HTTPResponse(
        body,
        status=status,
        headers=headers,
        content_type="text/html; charset=utf-8",
    )


async def file(
    location,
    status=200,
    mime_type=None,
    headers=None,
    filename=None,
    _range=None,
):
    """Return a response object with file data.

    :param location: Location of file on system.
    :param mime_type: Specific mime_type.
    :param headers: Custom Headers.
    :param filename: Override filename.
    :param _range:
    """
    headers = headers or {}
    if filename:
        headers.setdefault(
            "Content-Disposition", f'attachment; filename="{filename}"'
        )
    filename = filename or path.split(location)[-1]

    async with await open_async(location, mode="rb") as f:
        if _range:
            await f.seek(_range.start)
            out_stream = await f.read(_range.size)
            headers[
                "Content-Range"
            ] = f"bytes {_range.start}-{_range.end}/{_range.total}"
            status = 206
        else:
            out_stream = await f.read()

    mime_type = mime_type or guess_type(filename)[0] or "text/plain"
    return HTTPResponse(
        body=out_stream,
        status=status,
        headers=headers,
        content_type=mime_type,
    )


async def file_stream(
    location,
    status=200,
    chunk_size=4096,
    mime_type=None,
    headers=None,
    filename=None,
    chunked="deprecated",
    _range=None,
):
    """Return a streaming response object with file data.

    :param location: Location of file on system.
    :param chunk_size: The size of each chunk in the stream (in bytes)
    :param mime_type: Specific mime_type.
    :param headers: Custom Headers.
    :param filename: Override filename.
    :param chunked: Deprecated
    :param _range:
    """
    if chunked != "deprecated":
        warn(
            "The chunked argument has been deprecated and will be "
            "removed in v21.6"
        )

    headers = headers or {}
    if filename:
        headers.setdefault(
            "Content-Disposition", f'attachment; filename="{filename}"'
        )
    filename = filename or path.split(location)[-1]
    mime_type = mime_type or guess_type(filename)[0] or "text/plain"
    if _range:
        start = _range.start
        end = _range.end
        total = _range.total

        headers["Content-Range"] = f"bytes {start}-{end}/{total}"
        status = 206

    async def _streaming_fn(response):
        async with await open_async(location, mode="rb") as f:
            if _range:
                await f.seek(_range.start)
                to_send = _range.size
                while to_send > 0:
                    # Never read past the end of the requested range: cap
                    # each read at the number of bytes still to be sent.
                    content = await f.read(min(to_send, chunk_size))
                    if len(content) < 1:
                        break
                    to_send -= len(content)
                    await response.write(content)
            else:
                while True:
                    content = await f.read(chunk_size)
                    if len(content) < 1:
                        break
                    await response.write(content)

    return StreamingHTTPResponse(
        streaming_fn=_streaming_fn,
        status=status,
        headers=headers,
        content_type=mime_type,
    )


def stream(
    streaming_fn,
    status=200,
    headers=None,
    content_type="text/plain; charset=utf-8",
    chunked="deprecated",
):
    """Accepts a coroutine `streaming_fn` which can be used to
    write chunks to a streaming response. Returns a `StreamingHTTPResponse`.

    Example usage::

        @app.route("/")
        async def index(request):
            async def streaming_fn(response):
                await response.write('foo')
                await response.write('bar')

            return stream(streaming_fn, content_type='text/plain')

    :param streaming_fn: A coroutine that accepts a response and
        writes content to that response.
    :param content_type: The content type (string) of the response.
    :param headers: Custom Headers.
    :param chunked: Deprecated
    """
    if chunked != "deprecated":
        warn(
            "The chunked argument has been deprecated and will be "
            "removed in v21.6"
        )

    return StreamingHTTPResponse(
        streaming_fn,
        headers=headers,
        content_type=content_type,
        status=status,
    )


def redirect(
    to, headers=None, status=302, content_type="text/html; charset=utf-8"
):
    """Return a response that redirects the client (302 by default).

    :param to: path or fully qualified URL to redirect to
    :param headers: optional dict of headers to include in the response
    :param status: status code (int) of the response, defaults to 302
    :param content_type: the content type (string) of the response
    :returns: the redirecting Response
    """
    headers = headers or {}

    # URL Quote the URL before redirecting
    safe_to = quote_plus(to, safe=":/%#?&=@[]!$&'()*+,;")

    # According to RFC 7231, a relative URI is now permitted.
    headers["Location"] = safe_to

    return HTTPResponse(
        status=status, headers=headers, content_type=content_type
    )
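

The quoting step in `redirect()` can be tried on its own with the standard library. The sketch below reuses the same `safe` character set as the function above; the helper name `quote_location` and the sample paths are hypothetical, chosen only for illustration.

```python
from urllib.parse import quote_plus

# Same set of characters that redirect() leaves unescaped.
SAFE = ":/%#?&=@[]!$&'()*+,;"


def quote_location(to):
    """Hypothetical helper: quote a redirect target as redirect() does."""
    return quote_plus(to, safe=SAFE)


# Spaces become "+", while URL delimiters such as "?", "&" and "=" are kept.
print(quote_location("/search?q=a b&lang=en"))  # /search?q=a+b&lang=en

# Non-ASCII characters are percent-encoded as UTF-8.
print(quote_location("/caf\u00e9"))  # /caf%C3%A9

# "%" is in the safe set, so already-encoded input is not encoded twice.
print(quote_location("/a%20b"))  # /a%20b
```

Because `%` is treated as safe, a literal percent sign in the target is passed through unchanged, so callers that need one in the `Location` header would have to encode it themselves.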