Major refactoring of HTTP protocol handling (new module http.py added), all requests made streaming. A few compatibility issues and a lot of cleanup to be done remain, 16 tests failing.

This commit is contained in:
L. Kärkkäinen 2020-02-28 18:43:36 +02:00
parent 85c67a0014
commit 85b1ad5732
6 changed files with 529 additions and 443 deletions

View File

@ -930,7 +930,7 @@ class Sanic:
"""
pass
async def handle_request(self, request, write_callback, stream_callback):
async def handle_request(self, request):
"""Take a request from the HTTP Server and return a response object
to be sent back The HTTP Server only expects a response object, so
exception handling must be done here
@ -998,6 +998,7 @@ class Sanic:
# issue a response.
response = None
cancelled = True
raise
except Exception as e:
# -------------------------------------------- #
# Response Generation Failed
@ -1045,19 +1046,52 @@ class Sanic:
if cancelled:
raise CancelledError()
try:
# pass the response to the correct callback
if write_callback is None or isinstance(
response, StreamingHTTPResponse
):
if stream_callback:
await stream_callback(response)
else:
# Should only end here IF it is an ASGI websocket.
# TODO:
# - Add exception handling
if response is None:
pass
elif isinstance(response, StreamingHTTPResponse):
await response.stream(request)
elif isinstance(response, HTTPResponse):
await request.respond(response).send(end_stream=True)
else:
write_callback(response)
raise ServerError(f"Invalid response type {response} (need HTTPResponse)")
except Exception as e:
# -------------------------------------------- #
# Response Generation Failed
# -------------------------------------------- #
try:
response = self.error_handler.response(request, e)
if isawaitable(response):
response = await response
except Exception as e:
if isinstance(e, SanicException):
response = self.error_handler.default(
request=request, exception=e
)
elif self.debug:
response = HTTPResponse(
"Error while handling error: {}\nStack: {}".format(
e, format_exc()
),
status=500,
)
else:
response = HTTPResponse(
"An error occurred while handling an error", status=500
)
# pass the response to the correct callback
if response is None:
pass
elif isinstance(response, StreamingHTTPResponse):
await response.stream(request)
elif isinstance(response, HTTPResponse):
await request.respond(response).send(end_stream=True)
else:
raise ServerError(
f"Invalid response type {response} (need HTTPResponse)")
# -------------------------------------------------------------------- #
# Testing

View File

@ -20,9 +20,8 @@ import sanic.app # noqa
from sanic.compat import Header
from sanic.exceptions import InvalidUsage, ServerError
from sanic.log import logger
from sanic.request import Request
from sanic.request import Request, StreamBuffer
from sanic.response import HTTPResponse, StreamingHTTPResponse
from sanic.server import StreamBuffer
from sanic.websocket import WebSocketConnection
@ -255,19 +254,6 @@ class ASGIApp:
return instance
async def read_body(self) -> bytes:
"""
Read and return the entire body from an incoming ASGI message.
"""
body = b""
more_body = True
while more_body:
message = await self.transport.receive()
body += message.get("body", b"")
more_body = message.get("more_body", False)
return body
async def stream_body(self) -> None:
"""
Read and stream the body in chunks from an incoming ASGI message.
@ -277,13 +263,94 @@ class ASGIApp:
return None
return message.get("body", b"")
def respond(self, response):
headers: List[Tuple[bytes, bytes]] = []
cookies: Dict[str, str] = {}
try:
cookies = {
v.key: v
for _, v in list(
filter(
lambda item: item[0].lower() == "set-cookie",
response.headers.items(),
)
)
}
headers += [
(str(name).encode("latin-1"), str(value).encode("latin-1"))
for name, value in response.headers.items()
if name.lower() not in ["set-cookie"]
]
except AttributeError:
logger.error(
"Invalid response object for url %s, "
"Expected Type: HTTPResponse, Actual Type: %s",
self.request.url,
type(response),
)
exception = ServerError("Invalid response type")
response = self.sanic_app.error_handler.response(
self.request, exception
)
headers = [
(str(name).encode("latin-1"), str(value).encode("latin-1"))
for name, value in response.headers.items()
if name not in (b"Set-Cookie",)
]
if "content-length" not in response.headers and not isinstance(
response, StreamingHTTPResponse
):
headers += [
(b"content-length", str(len(response.body)).encode("latin-1"))
]
if "content-type" not in response.headers:
headers += [
(b"content-type", str(response.content_type).encode("latin-1"))
]
if response.cookies:
cookies.update(
{
v.key: v
for _, v in response.cookies.items()
if v.key not in cookies.keys()
}
)
headers += [
(b"set-cookie", cookie.encode("utf-8"))
for k, cookie in cookies.items()
]
self.response_start = {
"type": "http.response.start",
"status": response.status,
"headers": headers,
}
self.response_body = response.body
return self
async def send(self, data=None, end_stream=None):
if data is None is end_stream:
end_stream = True
if self.response_start:
await self.transport.send(self.response_start)
self.response_start = None
if self.response_body:
data = self.response_body + data if data else self.response_body
self.response_body = None
await self.transport.send({
"type": "http.response.body",
"body": data.encode() if hasattr(data, "encode") else data,
"more_body": not end_stream,
})
async def __call__(self) -> None:
"""
Handle the incoming request.
"""
handler = self.sanic_app.handle_request
callback = None if self.ws else self.stream_callback
await handler(self.request, None, callback)
await self.sanic_app.handle_request(self.request)
async def stream_callback(self, response: HTTPResponse) -> None:
"""

329
sanic/http.py Normal file
View File

@ -0,0 +1,329 @@
from enum import Enum
from sanic.exceptions import (
HeaderExpectationFailed,
InvalidUsage,
PayloadTooLarge,
RequestTimeout,
SanicException,
ServerError,
ServiceUnavailable,
)
from sanic.headers import format_http1, format_http1_response
from sanic.helpers import has_message_body, remove_entity_headers
from sanic.log import access_logger, logger
from sanic.request import Request
from sanic.response import HTTPResponse
from sanic.compat import Header
class Lifespan(Enum):
IDLE = 0 # Waiting for request
REQUEST = 1 # Request headers being received
HANDLER = 3 # Headers done, handler running
RESPONSE = 4 # Response headers sent, body in progress
FAILED = 100 # Unrecoverable state (error while sending response)
HTTP_CONTINUE = b"HTTP/1.1 100 Continue\r\n\r\n"
class Http:
def __init__(self, protocol):
self._send = protocol.push_data
self._receive_more = protocol.receive_more
self.protocol = protocol
self.recv_buffer = bytearray()
self.expecting_continue = False
# Note: connections are initially in request mode and do not obey
# keep-alive timeout like with some other servers.
self.lifespan = Lifespan.REQUEST
async def http1(self):
"""HTTP 1.1 connection handler"""
buf = self.recv_buffer
self.keep_alive = True
url = None
while self.keep_alive:
# Read request header
pos = 0
while len(buf) < self.protocol.request_max_size:
if buf:
pos = buf.find(b"\r\n\r\n", pos)
if pos >= 0:
break
pos = max(0, len(buf) - 3)
await self._receive_more()
if self.lifespan is Lifespan.IDLE:
self.lifespan = Lifespan.REQUEST
else:
self.lifespan = Lifespan.HANDLER
raise PayloadTooLarge("Payload Too Large")
self.protocol._total_request_size = pos + 4
try:
reqline, *headers = buf[:pos].decode().split("\r\n")
method, url, protocol = reqline.split(" ")
if protocol not in ("HTTP/1.0", "HTTP/1.1"):
raise Exception
self.head_only = method.upper() == "HEAD"
headers = Header(
(name.lower(), value.lstrip())
for name, value in (h.split(":", 1) for h in headers)
)
except:
self.lifespan = Lifespan.HANDLER
raise InvalidUsage("Bad Request")
# Prepare a request object from the header received
request = self.protocol.request_class(
url_bytes=url.encode(),
headers=headers,
version=protocol[-3:],
method=method,
transport=self.protocol.transport,
app=self.protocol.app,
)
request.stream = self
self.protocol.state["requests_count"] += 1
self.protocol.url = url
self.protocol.request = request
self.keep_alive = (
protocol == "HTTP/1.1"
or headers.get("connection", "").lower() == "keep-alive"
)
# Prepare for request body
body = headers.get("transfer-encoding") == "chunked" or int(
headers.get("content-length", 0)
)
self.request_chunked = False
self.request_bytes_left = 0
self.lifespan = Lifespan.HANDLER
if body:
expect = headers.get("expect")
if expect:
if expect.lower() == "100-continue":
self.expecting_continue = True
else:
raise HeaderExpectationFailed(f"Unknown Expect: {expect}")
request.stream = self
if body is True:
self.request_chunked = True
pos -= 2 # One CRLF stays in buffer
else:
self.request_bytes_left = body
# Remove header and its trailing CRLF
del buf[: pos + 4]
# Run handler
try:
await self.protocol.request_handler(request)
except Exception:
logger.exception("Uncaught from app/handler")
await self.write_error(ServerError("Internal Server Error"))
if self.lifespan is Lifespan.IDLE:
continue
if self.lifespan is Lifespan.HANDLER:
await self.respond(HTTPResponse(status=204)).send(end_stream=True)
# Finish sending a response (if no error)
elif self.lifespan is Lifespan.RESPONSE:
await self.send(end_stream=True)
# Consume any remaining request body
if self.request_bytes_left or self.request_chunked:
logger.error(
f"Handler of {method} {url} did not consume request body."
)
while await self.read():
pass
self.lifespan = Lifespan.IDLE
async def write_error(self, e):
if self.lifespan is Lifespan.HANDLER:
try:
response = HTTPResponse(f"{e}", e.status_code, content_type="text/plain")
await self.respond(response).send(end_stream=True)
except:
logger.exception("Error sending error")
# Request methods
async def __aiter__(self):
while True:
data = await self.read()
if not data:
return
yield data
async def read(self):
# Send a 100-continue if needed
if self.expecting_continue:
self.expecting_continue = False
await self._send(HTTP_CONTINUE)
# Receive request body chunk
buf = self.recv_buffer
if self.request_chunked and self.request_bytes_left == 0:
# Process a chunk header: \r\n<size>[;<chunk extensions>]\r\n
while True:
pos = buf.find(b"\r\n", 3)
if pos != -1:
break
if len(buf) > 64:
self.keep_alive = False
raise InvalidUsage("Bad chunked encoding")
await self._receive_more()
try:
size = int(buf[2:pos].split(b";", 1)[0].decode(), 16)
except:
self.keep_alive = False
raise InvalidUsage("Bad chunked encoding")
self.request_bytes_left = size
self.protocol._total_request_size += pos + 2
del buf[: pos + 2]
if self.request_bytes_left <= 0:
self.request_chunked = False
return None
# At this point we are good to read/return _request_bytes_left
if self.request_bytes_left:
if not buf:
await self._receive_more()
data = bytes(buf[: self.request_bytes_left])
size = len(data)
del buf[:size]
self.request_bytes_left -= size
self.protocol._total_request_size += size
if self.protocol._total_request_size > self.protocol.request_max_size:
self.keep_alive = False
raise PayloadTooLarge("Payload Too Large")
return data
return None
# Response methods
def respond(self, response):
"""Initiate new streaming response.
Nothing is sent until the first send() call on the returned object, and
calling this function multiple times will just alter the response to be
given."""
if self.lifespan is not Lifespan.HANDLER:
self.lifespan = Lifespan.FAILED
raise RuntimeError("Response already started")
if not isinstance(response.status, int) or response.status < 200:
raise RuntimeError(f"Invalid response status {response.status!r}")
self.response = response
return self
async def send(self, data=None, end_stream=None):
"""Send any pending response headers and the given data as body.
:param data: str or bytes to be written
:end_stream: whether to close the stream after this block
"""
if data is None and end_stream is None:
end_stream = True
data = self.data_to_send(data, end_stream)
if data is None:
return
await self._send(data)
def data_to_send(self, data, end_stream):
"""Format output data bytes for given body data.
Headers are prepended to the first output block and then cleared.
:param data: str or bytes to be written
:return: bytes to send, or None if there is nothing to send
"""
data = data.encode() if hasattr(data, "encode") else data
size = len(data) if data is not None else 0
# Headers not yet sent?
if self.lifespan is Lifespan.HANDLER:
if self.response.body:
data = self.response.body + data if data else self.response.body
size = len(data)
r = self.response
status = r.status
headers = r.headers
if r.content_type and "content-type" not in headers:
headers["content-type"] = r.content_type
# Not Modified, Precondition Failed
if status in (304, 412):
headers = remove_entity_headers(headers)
if not has_message_body(status):
# Header-only response status
if (
size > 0
or not end_stream
or "content-length" in headers
or "transfer-encoding" in headers
):
# TODO: This matches old Sanic operation but possibly
# an exception would be more appropriate?
data = None
size = 0
end_stream = True
#raise ServerError(
# f"A {status} response may only have headers, no body."
#)
elif self.head_only and "content-length" in headers:
pass
elif end_stream:
# Non-streaming response (all in one block)
headers["content-length"] = size
elif "content-length" in headers:
# Streaming response with size known in advance
self.response_bytes_left = int(headers["content-length"]) - size
else:
# Length not known, use chunked encoding
headers["transfer-encoding"] = "chunked"
data = b"%x\r\n%b\r\n" % (size, data) if size else None
self.response_bytes_left = True
self.headers = None
if self.head_only:
data = None
self.response_bytes_left = None
if self.keep_alive:
headers["connection"] = "keep-alive"
headers["keep-alive"] = self.protocol.keep_alive_timeout
else:
headers["connection"] = "close"
ret = format_http1_response(status, headers.items(), data or b"")
# Send a 100-continue if expected and not Expectation Failed
if self.expecting_continue:
self.expecting_continue = False
if status != 417:
ret = HTTP_CONTINUE + ret
# Send response
self.lifespan = Lifespan.IDLE if end_stream else Lifespan.RESPONSE
return ret
# HEAD request: don't send body
if self.head_only:
return None
if self.lifespan is not Lifespan.RESPONSE:
if size:
raise RuntimeError("Cannot send data to a closed stream")
return
# Chunked encoding
if self.response_bytes_left is True:
if end_stream:
self.response_bytes_left = None
self.lifespan = Lifespan.IDLE
if size:
return b"%x\r\n%b\r\n0\r\n\r\n" % (size, data)
return b"0\r\n\r\n"
return b"%x\r\n%b\r\n" % (size, data) if size else None
# Normal encoding
else:
self.response_bytes_left -= size
if self.response_bytes_left <= 0:
if self.response_bytes_left < 0:
raise ServerError("Response was bigger than content-length")
self.lifespan = Lifespan.IDLE
return data if size else None

View File

@ -9,6 +9,7 @@ from urllib.parse import parse_qs, parse_qsl, unquote, urlunparse
from httptools import parse_url # type: ignore
from sanic.compat import Header
from sanic.exceptions import InvalidUsage
from sanic.headers import (
parse_content_header,
@ -16,6 +17,7 @@ from sanic.headers import (
parse_host,
parse_xforwarded,
)
from sanic.response import HTTPResponse
from sanic.log import error_logger, logger
@ -25,7 +27,6 @@ except ImportError:
from json import loads as json_loads # type: ignore
DEFAULT_HTTP_CONTENT_TYPE = "application/octet-stream"
EXPECT_HEADER = "EXPECT"
# HTTP/1.1: https://www.w3.org/Protocols/rfc2616/rfc2616-sec7.html#sec7.2.1
# > If the media type remains unknown, the recipient SHOULD treat it
@ -47,12 +48,9 @@ class RequestParameters(dict):
class StreamBuffer:
def __init__(self, protocol=None):
self._protocol = protocol
async def read(self):
""" Stop reading when gets None """
return await self._protocol.stream_body()
def __init__(self, protocol):
self.read = protocol.stream_body
self.respond = protocol.respond
async def __aiter__(self):
while True:
@ -147,6 +145,15 @@ class Request:
Custom context is now stored in `request.custom_context.yourkey`"""
setattr(self.ctx, key, value)
def respond(self, status=200, headers=None, content_type=DEFAULT_HTTP_CONTENT_TYPE):
return self.stream.respond(
status if isinstance(status, HTTPResponse) else HTTPResponse(
status=status,
headers=headers,
content_type=content_type,
)
)
async def receive_body(self):
self.body = b"".join([data async for data in self.stream])

View File

@ -34,37 +34,6 @@ class BaseHTTPResponse:
self._cookies = CookieJar(self.headers)
return self._cookies
def get_headers(
self,
version="1.1",
keep_alive=False,
keep_alive_timeout=None,
body=b"",
):
""".. deprecated:: 20.3:
This function is not public API and will be removed."""
if version != "1.1":
warnings.warn(
"Only HTTP/1.1 is currently supported (got {version})",
DeprecationWarning,
)
# self.headers get priority over content_type
if self.content_type and "Content-Type" not in self.headers:
self.headers["Content-Type"] = self.content_type
if keep_alive:
self.headers["Connection"] = "keep-alive"
if keep_alive_timeout is not None:
self.headers["Keep-Alive"] = keep_alive_timeout
else:
self.headers["Connection"] = "close"
if self.status in (304, 412):
self.headers = remove_entity_headers(self.headers)
return format_http1_response(self.status, self.headers.items(), body)
class StreamingHTTPResponse(BaseHTTPResponse):
__slots__ = (
@ -75,6 +44,7 @@ class StreamingHTTPResponse(BaseHTTPResponse):
"headers",
"chunked",
"_cookies",
"send",
)
def __init__(
@ -97,44 +67,15 @@ class StreamingHTTPResponse(BaseHTTPResponse):
:param data: str or bytes-ish data to be written.
"""
data = self._encode_body(data)
await self.send(self._encode_body(data))
if self.chunked:
await self.protocol.push_data(b"%x\r\n%b\r\n" % (len(data), data))
else:
await self.protocol.push_data(data)
await self.protocol.drain()
async def stream(
self, version="1.1", keep_alive=False, keep_alive_timeout=None
):
"""Streams headers, runs the `streaming_fn` callback that writes
content to the response body, then finalizes the response body.
"""
if version != "1.1":
self.chunked = False
headers = self.get_headers(
version,
keep_alive=keep_alive,
keep_alive_timeout=keep_alive_timeout,
)
await self.protocol.push_data(headers)
await self.protocol.drain()
async def stream(self, request):
self.send = request.respond(
self.status,
self.headers,
self.content_type,
).send
await self.streaming_fn(self)
if self.chunked:
await self.protocol.push_data(b"0\r\n\r\n")
# no need to await drain here after this write, because it is the
# very last thing we write and nothing needs to wait for it.
def get_headers(
self, version="1.1", keep_alive=False, keep_alive_timeout=None
):
if self.chunked and version == "1.1":
self.headers["Transfer-Encoding"] = "chunked"
self.headers.pop("Content-Length", None)
return super().get_headers(version, keep_alive, keep_alive_timeout)
class HTTPResponse(BaseHTTPResponse):
__slots__ = ("body", "status", "content_type", "headers", "_cookies")

View File

@ -11,11 +11,7 @@ from multiprocessing import Process
from signal import SIG_IGN, SIGINT, SIGTERM, Signals
from signal import signal as signal_func
from socket import SO_REUSEADDR, SOL_SOCKET, socket
from time import monotonic as current_time
from time import time
from httptools import HttpRequestParser # type: ignore
from httptools.parser.errors import HttpParserError # type: ignore
from time import time, monotonic as current_time
from sanic.compat import Header
from sanic.exceptions import (
@ -27,9 +23,9 @@ from sanic.exceptions import (
ServerError,
ServiceUnavailable,
)
from sanic.http import Http, Lifespan
from sanic.log import access_logger, logger
from sanic.request import EXPECT_HEADER, Request, StreamBuffer
from sanic.response import HTTPResponse
from sanic.request import Request
try:
@ -45,14 +41,6 @@ class Signal:
stopped = False
class Status(enum.Enum):
IDLE = 0 # Waiting for request
REQUEST = 1 # Request headers being received
EXPECT = 2 # Sender wants 100-continue
HANDLER = 3 # Headers done, handler running
RESPONSE = 4 # Response headers sent
class HttpProtocol(asyncio.Protocol):
"""
This class provides a basic HTTP implementation of the sanic framework.
@ -82,19 +70,17 @@ class HttpProtocol(asyncio.Protocol):
"access_log",
# connection management
"_total_request_size",
"_request_bytes_left",
"_status",
"_time",
"_last_response_time",
"keep_alive",
"state",
"url",
"_debug",
"_handler_task",
"_buffer",
"_can_write",
"_data_received",
"_time",
"_task",
"_http",
"_exception",
)
@ -145,10 +131,10 @@ class HttpProtocol(asyncio.Protocol):
if "requests_count" not in self.state:
self.state["requests_count"] = 0
self._debug = debug
self._buffer = bytearray()
self._data_received = asyncio.Event(loop=deprecated_loop)
self._can_write = asyncio.Event(loop=deprecated_loop)
self._can_write.set()
self._exception = None
# -------------------------------------------- #
# Connection
@ -157,14 +143,17 @@ class HttpProtocol(asyncio.Protocol):
def connection_made(self, transport):
self.connections.add(self)
self.transport = transport
self._status, self._time = Status.IDLE, current_time()
#self.check_timeouts()
self._http = Http(self)
self._task = self.loop.create_task(self.connection_task())
self._time = current_time()
self.check_timeouts()
self._task = self.loop.create_task(self.http1())
def connection_lost(self, exc):
self.connections.discard(self)
if self._task:
self._task.cancel()
self._task = None
def pause_writing(self):
self._can_write.clear()
@ -183,349 +172,66 @@ class HttpProtocol(asyncio.Protocol):
# -------------------------------------------- #
def data_received(self, data):
self._time = current_time()
if not data:
return self.close()
self._buffer += data
if len(self._buffer) > self.request_max_size:
self._http.recv_buffer += data
if len(self._http.recv_buffer) > self.request_max_size:
self.transport.pause_reading()
if self._data_received:
self._data_received.set()
def check_timeouts(self):
"""Runs itself once a second to enforce any expired timeouts."""
duration = current_time() - self._time
status = self._status
if status == Status.IDLE and duration > self.keep_alive_timeout:
logger.debug("KeepAlive Timeout. Closing connection.")
elif status == Status.REQUEST and duration > self.request_timeout:
self._exception = RequestTimeout("Request Timeout")
elif (
status.value > Status.REQUEST.value
and duration > self.response_timeout
):
self._exception = ServiceUnavailable("Response Timeout")
else:
self.loop.call_later(0.1, self.check_timeouts)
return
self._task.cancel()
async def http1(self):
"""HTTP 1.1 connection handler"""
async def connection_task(self):
try:
self._exception = None
buf = self._buffer
# Note: connections are initially in request mode and do not obey
# keep-alive timeout like with some other servers.
self._status = Status.REQUEST
while self.keep_alive:
# Read request header
pos = 0
self._time = current_time()
while len(buf) < self.request_max_size:
if buf:
self._status = Status.REQUEST
pos = buf.find(b"\r\n\r\n", pos)
if pos >= 0:
break
pos = max(0, len(buf) - 3)
await self.receive_more()
else:
raise PayloadTooLarge("Payload Too Large")
self._total_request_size = pos + 4
try:
reqline, *headers = buf[:pos].decode().split("\r\n")
method, self.url, protocol = reqline.split(" ")
if protocol not in ("HTTP/1.0", "HTTP/1.1"):
raise Exception
headers = Header(
(name.lower(), value.lstrip())
for name, value in (h.split(":", 1) for h in headers)
)
except:
raise InvalidUsage("Bad Request")
self.state["requests_count"] += 1
# Prepare a request object from the header received
self.request = self.request_class(
url_bytes=self.url.encode(),
headers=headers,
version=protocol[-3:],
method=method,
transport=self.transport,
app=self.app,
)
if headers.get("connection", "").lower() == "close":
self.keep_alive = False
# Prepare for request body
body = headers.get("transfer-encoding") == "chunked" or int(
headers.get("content-length", 0)
)
self._request_chunked = False
self._request_bytes_left = 0
if body:
if headers.get(EXPECT_HEADER):
self._status = Status.EXPECT
self.expect_handler()
self.request.stream = StreamBuffer(protocol=self)
if body is True:
self._request_chunked = True
pos -= 2 # One CRLF stays in buffer
else:
self._request_bytes_left = body
# Remove header and its trailing CRLF
del buf[: pos + 4]
# Run handler
self._status, self._time = Status.HANDLER, current_time()
await self.request_handler(
self.request, self.write_response, self.stream_response
)
# Consume any remaining request body
if self._request_bytes_left or self._request_chunked:
logger.error(
f"Handler of {method} {self.url} did not consume request body."
)
while await self.stream_body():
pass
self._status, self._time = Status.IDLE, current_time()
await self._http.http1()
except asyncio.CancelledError:
self.write_error(
await self._http.write_error(
self._exception
or ServiceUnavailable("Request handler cancelled")
)
except SanicException as e:
self.write_error(e)
except Exception as e:
logger.error(f"Uncaught {e!r} handling URL {self.url}")
await self._http.write_error(e)
except BaseException as e:
logger.exception(f"Uncaught exception while handling URL {url}")
finally:
try:
self.close()
async def stream_body(self):
buf = self._buffer
if self._request_chunked and self._request_bytes_left == 0:
# Process a chunk header: \r\n<size>[;<chunk extensions>]\r\n
while True:
pos = buf.find(b"\r\n", 3)
if pos != -1:
break
if len(buf) > 64:
self.keep_alive = False
raise InvalidUsage("Bad chunked encoding")
await self.receive_more()
try:
size = int(buf[2:pos].split(b";", 1)[0].decode(), 16)
except:
self.keep_alive = False
raise InvalidUsage("Bad chunked encoding")
self._request_bytes_left = size
self._total_request_size += pos + 2
del buf[: pos + 2]
if self._request_bytes_left <= 0:
self._request_chunked = False
return None
# At this point we are good to read/return _request_bytes_left
if self._request_bytes_left:
if not buf:
await self.receive_more()
data = bytes(buf[: self._request_bytes_left])
size = len(data)
del buf[:size]
self._request_bytes_left -= size
self._total_request_size += size
if self._total_request_size > self.request_max_size:
self.keep_alive = False
raise PayloadTooLarge("Payload Too Large")
return data
return None
logger.exception("Closing failed")
def expect_handler(self):
"""
Handler for Expect Header.
"""
expect = self.request.headers.get(EXPECT_HEADER)
if self.request.version == "1.1":
if expect.lower() == "100-continue":
self.transport.write(b"HTTP/1.1 100 Continue\r\n\r\n")
def check_timeouts(self):
"""Runs itself once a second to enforce any expired timeouts."""
duration = current_time() - self._time
lifespan = self._http.lifespan
if lifespan == Lifespan.IDLE and duration > self.keep_alive_timeout:
logger.debug("KeepAlive Timeout. Closing connection.")
elif lifespan == Lifespan.REQUEST and duration > self.request_timeout:
self._exception = RequestTimeout("Request Timeout")
elif (
lifespan.value > Lifespan.REQUEST.value
and duration > self.response_timeout
):
self._exception = ServiceUnavailable("Response Timeout")
else:
raise HeaderExpectationFailed(f"Unknown Expect: {expect}")
# -------------------------------------------- #
# Responding
# -------------------------------------------- #
def log_response(self, response):
"""
Helper method provided to enable the logging of responses in case if
the :attr:`HttpProtocol.access_log` is enabled.
:param response: Response generated for the current request
:type response: :class:`sanic.response.HTTPResponse` or
:class:`sanic.response.StreamingHTTPResponse`
:return: None
"""
if self.access_log:
extra = {"status": getattr(response, "status", 0)}
if isinstance(response, HTTPResponse):
extra["byte"] = len(response.body)
else:
extra["byte"] = -1
extra["host"] = "UNKNOWN"
if self.request is not None:
if self.request.ip:
extra["host"] = "{0}:{1}".format(
self.request.ip, self.request.port
)
extra["request"] = "{0} {1}".format(
self.request.method, self.request.url
)
else:
extra["request"] = "nil"
access_logger.info("", extra=extra)
def write_response(self, response):
"""
Writes response content synchronously to the transport.
"""
try:
self._status, self._time = Status.RESPONSE, current_time()
self._last_response_time = self._time
self.transport.write(
response.output(
"1.1", self.keep_alive, self.keep_alive_timeout
)
)
self.log_response(response)
except AttributeError:
if isinstance(response, HTTPResponse):
raise
res_type = type(response).__name__
logger.error(
f"Invalid response object for url {self.url!r}, "
f"Expected Type: HTTPResponse, Actual Type: {res_type}"
)
self.write_error(ServerError("Invalid response type"))
except RuntimeError:
if self._debug:
logger.error(
"Connection lost before response written @ %s",
self.request.ip,
)
self.keep_alive = False
except Exception as e:
self.bail_out(
"Writing response failed, connection closed {}".format(repr(e))
)
finally:
if not self.keep_alive:
self.transport.close()
self.transport = None
else:
self._last_response_time = time()
self.loop.call_later(1.0, self.check_timeouts)
return
self._task.cancel()
async def drain(self):
await self._can_write.wait()
async def push_data(self, data):
self._time = current_time()
await self.drain()
self.transport.write(data)
async def stream_response(self, response):
"""
Streams a response to the client asynchronously. Attaches
the transport to the response so the response consumer can
write to the response as needed.
"""
try:
self._status, self._time = Status.RESPONSE, current_time()
response.protocol = self
await response.stream(
"1.1", self.keep_alive, self.keep_alive_timeout
)
self.log_response(response)
except AttributeError:
logger.error(
"Invalid response object for url %s, "
"Expected Type: HTTPResponse, Actual Type: %s",
self.url,
type(response),
)
self.write_error(ServerError("Invalid response type"))
except RuntimeError:
if self._debug:
logger.error(
"Connection lost before response written @ %s",
self.request.ip,
)
self.keep_alive = False
except Exception as e:
self.bail_out(
"Writing response failed, connection closed {}".format(repr(e))
)
def write_error(self, exception):
# An error _is_ a response.
# Don't throw a response timeout, when a response _is_ given.
response = None
try:
response = self.error_handler.response(self.request, exception)
self.transport.write(response.output("1.1"))
except RuntimeError:
if self._debug:
logger.error(
"Connection lost before error written @ %s",
self.request.ip if self.request else "Unknown",
)
except Exception as e:
self.bail_out(
"Writing error failed, connection closed {}".format(repr(e)),
from_error=True,
)
finally:
if self.keep_alive or getattr(response, "status") == 408:
self.log_response(response)
self.keep_alive = False
def bail_out(self, message, from_error=False):
"""
In case if the transport pipes are closed and the sanic app encounters
an error while writing data to the transport pipe, we log the error
with proper details.
:param message: Error message to display
:param from_error: If the bail out was invoked while handling an
exception scenario.
:type message: str
:type from_error: bool
:return: None
"""
if from_error or self.transport is None or self.transport.is_closing():
logger.error(
"Transport closed @ %s and exception "
"experienced during error handling",
(
self.transport.get_extra_info("peername")
if self.transport is not None
else "N/A"
),
)
logger.debug("Exception:", exc_info=True)
else:
self.write_error(ServerError(message))
logger.error(message)
def close_if_idle(self):
"""Close the connection if a request is not being sent or received
:return: boolean - True if closed, false if staying open
"""
if self._status == Status.IDLE:
if self._http.lifespan == Lifespan.IDLE:
self.close()
return True
return False
@ -536,9 +242,11 @@ class HttpProtocol(asyncio.Protocol):
"""
if self.transport is not None:
try:
self.keep_alive = False
if self._task:
self._task.cancel()
self._task = None
self.transport.close()
self.resume_writing()
finally:
self.transport = None