From 85b1ad57322da51d5f1d669689e2d3b7fc900de9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=2E=20K=C3=A4rkk=C3=A4inen?= Date: Fri, 28 Feb 2020 18:43:36 +0200 Subject: [PATCH] Major refactoring of HTTP protocol handling (new module http.py added), all requests made streaming. A few compatibility issues and a lot of cleanup to be done remain, 16 tests failing. --- sanic/app.py | 60 ++++++-- sanic/asgi.py | 103 ++++++++++--- sanic/http.py | 329 +++++++++++++++++++++++++++++++++++++++ sanic/request.py | 21 ++- sanic/response.py | 75 +-------- sanic/server.py | 384 ++++++---------------------------------------- 6 files changed, 529 insertions(+), 443 deletions(-) create mode 100644 sanic/http.py diff --git a/sanic/app.py b/sanic/app.py index 7bcc90d1..beabcdfc 100644 --- a/sanic/app.py +++ b/sanic/app.py @@ -930,7 +930,7 @@ class Sanic: """ pass - async def handle_request(self, request, write_callback, stream_callback): + async def handle_request(self, request): """Take a request from the HTTP Server and return a response object to be sent back The HTTP Server only expects a response object, so exception handling must be done here @@ -998,6 +998,7 @@ class Sanic: # issue a response. response = None cancelled = True + raise except Exception as e: # -------------------------------------------- # # Response Generation Failed @@ -1045,19 +1046,52 @@ class Sanic: if cancelled: raise CancelledError() - # pass the response to the correct callback - if write_callback is None or isinstance( - response, StreamingHTTPResponse - ): - if stream_callback: - await stream_callback(response) - else: - # Should only end here IF it is an ASGI websocket. - # TODO: - # - Add exception handling + try: + # pass the response to the correct callback + if response is None: pass - else: - write_callback(response) + elif isinstance(response, StreamingHTTPResponse): + await response.stream(request) + elif isinstance(response, HTTPResponse): + await request.respond(response).send(end_stream=True) + else: + raise ServerError(f"Invalid response type {response} (need HTTPResponse)") + except Exception as e: + # -------------------------------------------- # + # Response Generation Failed + # -------------------------------------------- # + + try: + response = self.error_handler.response(request, e) + if isawaitable(response): + response = await response + except Exception as e: + if isinstance(e, SanicException): + response = self.error_handler.default( + request=request, exception=e + ) + elif self.debug: + response = HTTPResponse( + "Error while handling error: {}\nStack: {}".format( + e, format_exc() + ), + status=500, + ) + else: + response = HTTPResponse( + "An error occurred while handling an error", status=500 + ) + + # pass the response to the correct callback + if response is None: + pass + elif isinstance(response, StreamingHTTPResponse): + await response.stream(request) + elif isinstance(response, HTTPResponse): + await request.respond(response).send(end_stream=True) + else: + raise ServerError( + f"Invalid response type {response} (need HTTPResponse)") # -------------------------------------------------------------------- # # Testing diff --git a/sanic/asgi.py b/sanic/asgi.py index 6800b5a8..8213fdca 100644 --- a/sanic/asgi.py +++ b/sanic/asgi.py @@ -20,9 +20,8 @@ import sanic.app # noqa from sanic.compat import Header from sanic.exceptions import InvalidUsage, ServerError from sanic.log import logger -from sanic.request import Request +from sanic.request import Request, StreamBuffer from sanic.response import 
HTTPResponse, StreamingHTTPResponse -from sanic.server import StreamBuffer from sanic.websocket import WebSocketConnection @@ -255,19 +254,6 @@ class ASGIApp: return instance - async def read_body(self) -> bytes: - """ - Read and return the entire body from an incoming ASGI message. - """ - body = b"" - more_body = True - while more_body: - message = await self.transport.receive() - body += message.get("body", b"") - more_body = message.get("more_body", False) - - return body - async def stream_body(self) -> None: """ Read and stream the body in chunks from an incoming ASGI message. @@ -277,13 +263,94 @@ class ASGIApp: return None return message.get("body", b"") + def respond(self, response): + headers: List[Tuple[bytes, bytes]] = [] + cookies: Dict[str, str] = {} + try: + cookies = { + v.key: v + for _, v in list( + filter( + lambda item: item[0].lower() == "set-cookie", + response.headers.items(), + ) + ) + } + headers += [ + (str(name).encode("latin-1"), str(value).encode("latin-1")) + for name, value in response.headers.items() + if name.lower() not in ["set-cookie"] + ] + except AttributeError: + logger.error( + "Invalid response object for url %s, " + "Expected Type: HTTPResponse, Actual Type: %s", + self.request.url, + type(response), + ) + exception = ServerError("Invalid response type") + response = self.sanic_app.error_handler.response( + self.request, exception + ) + headers = [ + (str(name).encode("latin-1"), str(value).encode("latin-1")) + for name, value in response.headers.items() + if name not in (b"Set-Cookie",) + ] + + if "content-length" not in response.headers and not isinstance( + response, StreamingHTTPResponse + ): + headers += [ + (b"content-length", str(len(response.body)).encode("latin-1")) + ] + + if "content-type" not in response.headers: + headers += [ + (b"content-type", str(response.content_type).encode("latin-1")) + ] + + if response.cookies: + cookies.update( + { + v.key: v + for _, v in response.cookies.items() + if v.key not in cookies.keys() + } + ) + + headers += [ + (b"set-cookie", cookie.encode("utf-8")) + for k, cookie in cookies.items() + ] + self.response_start = { + "type": "http.response.start", + "status": response.status, + "headers": headers, + } + self.response_body = response.body + return self + + async def send(self, data=None, end_stream=None): + if data is None is end_stream: + end_stream = True + if self.response_start: + await self.transport.send(self.response_start) + self.response_start = None + if self.response_body: + data = self.response_body + data if data else self.response_body + self.response_body = None + await self.transport.send({ + "type": "http.response.body", + "body": data.encode() if hasattr(data, "encode") else data, + "more_body": not end_stream, + }) + async def __call__(self) -> None: """ Handle the incoming request. 
""" - handler = self.sanic_app.handle_request - callback = None if self.ws else self.stream_callback - await handler(self.request, None, callback) + await self.sanic_app.handle_request(self.request) async def stream_callback(self, response: HTTPResponse) -> None: """ diff --git a/sanic/http.py b/sanic/http.py new file mode 100644 index 00000000..8c0ecae9 --- /dev/null +++ b/sanic/http.py @@ -0,0 +1,329 @@ +from enum import Enum + +from sanic.exceptions import ( + HeaderExpectationFailed, + InvalidUsage, + PayloadTooLarge, + RequestTimeout, + SanicException, + ServerError, + ServiceUnavailable, +) +from sanic.headers import format_http1, format_http1_response +from sanic.helpers import has_message_body, remove_entity_headers +from sanic.log import access_logger, logger +from sanic.request import Request +from sanic.response import HTTPResponse +from sanic.compat import Header + + +class Lifespan(Enum): + IDLE = 0 # Waiting for request + REQUEST = 1 # Request headers being received + HANDLER = 3 # Headers done, handler running + RESPONSE = 4 # Response headers sent, body in progress + FAILED = 100 # Unrecoverable state (error while sending response) + + +HTTP_CONTINUE = b"HTTP/1.1 100 Continue\r\n\r\n" + +class Http: + def __init__(self, protocol): + self._send = protocol.push_data + self._receive_more = protocol.receive_more + self.protocol = protocol + self.recv_buffer = bytearray() + self.expecting_continue = False + # Note: connections are initially in request mode and do not obey + # keep-alive timeout like with some other servers. + self.lifespan = Lifespan.REQUEST + + async def http1(self): + """HTTP 1.1 connection handler""" + buf = self.recv_buffer + self.keep_alive = True + url = None + while self.keep_alive: + # Read request header + pos = 0 + while len(buf) < self.protocol.request_max_size: + if buf: + pos = buf.find(b"\r\n\r\n", pos) + if pos >= 0: + break + pos = max(0, len(buf) - 3) + await self._receive_more() + if self.lifespan is Lifespan.IDLE: + self.lifespan = Lifespan.REQUEST + else: + self.lifespan = Lifespan.HANDLER + raise PayloadTooLarge("Payload Too Large") + + self.protocol._total_request_size = pos + 4 + + try: + reqline, *headers = buf[:pos].decode().split("\r\n") + method, url, protocol = reqline.split(" ") + if protocol not in ("HTTP/1.0", "HTTP/1.1"): + raise Exception + self.head_only = method.upper() == "HEAD" + headers = Header( + (name.lower(), value.lstrip()) + for name, value in (h.split(":", 1) for h in headers) + ) + except: + self.lifespan = Lifespan.HANDLER + raise InvalidUsage("Bad Request") + + # Prepare a request object from the header received + request = self.protocol.request_class( + url_bytes=url.encode(), + headers=headers, + version=protocol[-3:], + method=method, + transport=self.protocol.transport, + app=self.protocol.app, + ) + request.stream = self + self.protocol.state["requests_count"] += 1 + self.protocol.url = url + self.protocol.request = request + self.keep_alive = ( + protocol == "HTTP/1.1" + or headers.get("connection", "").lower() == "keep-alive" + ) + # Prepare for request body + body = headers.get("transfer-encoding") == "chunked" or int( + headers.get("content-length", 0) + ) + self.request_chunked = False + self.request_bytes_left = 0 + self.lifespan = Lifespan.HANDLER + if body: + expect = headers.get("expect") + if expect: + if expect.lower() == "100-continue": + self.expecting_continue = True + else: + raise HeaderExpectationFailed(f"Unknown Expect: {expect}") + request.stream = self + if body is True: + 
self.request_chunked = True + pos -= 2 # One CRLF stays in buffer + else: + self.request_bytes_left = body + # Remove header and its trailing CRLF + del buf[: pos + 4] + + # Run handler + try: + await self.protocol.request_handler(request) + except Exception: + logger.exception("Uncaught from app/handler") + await self.write_error(ServerError("Internal Server Error")) + if self.lifespan is Lifespan.IDLE: + continue + + if self.lifespan is Lifespan.HANDLER: + await self.respond(HTTPResponse(status=204)).send(end_stream=True) + + # Finish sending a response (if no error) + elif self.lifespan is Lifespan.RESPONSE: + await self.send(end_stream=True) + + # Consume any remaining request body + if self.request_bytes_left or self.request_chunked: + logger.error( + f"Handler of {method} {url} did not consume request body." + ) + while await self.read(): + pass + + self.lifespan = Lifespan.IDLE + + async def write_error(self, e): + if self.lifespan is Lifespan.HANDLER: + try: + response = HTTPResponse(f"{e}", e.status_code, content_type="text/plain") + await self.respond(response).send(end_stream=True) + except: + logger.exception("Error sending error") + + # Request methods + + async def __aiter__(self): + while True: + data = await self.read() + if not data: + return + yield data + + async def read(self): + # Send a 100-continue if needed + if self.expecting_continue: + self.expecting_continue = False + await self._send(HTTP_CONTINUE) + # Receive request body chunk + buf = self.recv_buffer + if self.request_chunked and self.request_bytes_left == 0: + # Process a chunk header: \r\n[;]\r\n + while True: + pos = buf.find(b"\r\n", 3) + if pos != -1: + break + if len(buf) > 64: + self.keep_alive = False + raise InvalidUsage("Bad chunked encoding") + await self._receive_more() + try: + size = int(buf[2:pos].split(b";", 1)[0].decode(), 16) + except: + self.keep_alive = False + raise InvalidUsage("Bad chunked encoding") + self.request_bytes_left = size + self.protocol._total_request_size += pos + 2 + del buf[: pos + 2] + if self.request_bytes_left <= 0: + self.request_chunked = False + return None + # At this point we are good to read/return _request_bytes_left + if self.request_bytes_left: + if not buf: + await self._receive_more() + data = bytes(buf[: self.request_bytes_left]) + size = len(data) + del buf[:size] + self.request_bytes_left -= size + self.protocol._total_request_size += size + if self.protocol._total_request_size > self.protocol.request_max_size: + self.keep_alive = False + raise PayloadTooLarge("Payload Too Large") + return data + return None + + + # Response methods + + def respond(self, response): + """Initiate new streaming response. + + Nothing is sent until the first send() call on the returned object, and + calling this function multiple times will just alter the response to be + given.""" + if self.lifespan is not Lifespan.HANDLER: + self.lifespan = Lifespan.FAILED + raise RuntimeError("Response already started") + if not isinstance(response.status, int) or response.status < 200: + raise RuntimeError(f"Invalid response status {response.status!r}") + self.response = response + return self + + async def send(self, data=None, end_stream=None): + """Send any pending response headers and the given data as body. 
+ :param data: str or bytes to be written + :end_stream: whether to close the stream after this block + """ + if data is None and end_stream is None: + end_stream = True + data = self.data_to_send(data, end_stream) + if data is None: + return + await self._send(data) + + def data_to_send(self, data, end_stream): + """Format output data bytes for given body data. + Headers are prepended to the first output block and then cleared. + :param data: str or bytes to be written + :return: bytes to send, or None if there is nothing to send + """ + data = data.encode() if hasattr(data, "encode") else data + size = len(data) if data is not None else 0 + + # Headers not yet sent? + if self.lifespan is Lifespan.HANDLER: + if self.response.body: + data = self.response.body + data if data else self.response.body + size = len(data) + r = self.response + status = r.status + headers = r.headers + if r.content_type and "content-type" not in headers: + headers["content-type"] = r.content_type + # Not Modified, Precondition Failed + if status in (304, 412): + headers = remove_entity_headers(headers) + if not has_message_body(status): + # Header-only response status + if ( + size > 0 + or not end_stream + or "content-length" in headers + or "transfer-encoding" in headers + ): + # TODO: This matches old Sanic operation but possibly + # an exception would be more appropriate? + data = None + size = 0 + end_stream = True + #raise ServerError( + # f"A {status} response may only have headers, no body." + #) + elif self.head_only and "content-length" in headers: + pass + elif end_stream: + # Non-streaming response (all in one block) + headers["content-length"] = size + elif "content-length" in headers: + # Streaming response with size known in advance + self.response_bytes_left = int(headers["content-length"]) - size + else: + # Length not known, use chunked encoding + headers["transfer-encoding"] = "chunked" + data = b"%x\r\n%b\r\n" % (size, data) if size else None + self.response_bytes_left = True + self.headers = None + if self.head_only: + data = None + self.response_bytes_left = None + if self.keep_alive: + headers["connection"] = "keep-alive" + headers["keep-alive"] = self.protocol.keep_alive_timeout + else: + headers["connection"] = "close" + ret = format_http1_response(status, headers.items(), data or b"") + # Send a 100-continue if expected and not Expectation Failed + if self.expecting_continue: + self.expecting_continue = False + if status != 417: + ret = HTTP_CONTINUE + ret + # Send response + self.lifespan = Lifespan.IDLE if end_stream else Lifespan.RESPONSE + return ret + + # HEAD request: don't send body + if self.head_only: + return None + + if self.lifespan is not Lifespan.RESPONSE: + if size: + raise RuntimeError("Cannot send data to a closed stream") + return + + # Chunked encoding + if self.response_bytes_left is True: + if end_stream: + self.response_bytes_left = None + self.lifespan = Lifespan.IDLE + if size: + return b"%x\r\n%b\r\n0\r\n\r\n" % (size, data) + return b"0\r\n\r\n" + return b"%x\r\n%b\r\n" % (size, data) if size else None + + # Normal encoding + else: + self.response_bytes_left -= size + if self.response_bytes_left <= 0: + if self.response_bytes_left < 0: + raise ServerError("Response was bigger than content-length") + self.lifespan = Lifespan.IDLE + return data if size else None diff --git a/sanic/request.py b/sanic/request.py index bcfd6bec..097808eb 100644 --- a/sanic/request.py +++ b/sanic/request.py @@ -9,6 +9,7 @@ from urllib.parse import parse_qs, parse_qsl, unquote, 
urlunparse from httptools import parse_url # type: ignore +from sanic.compat import Header from sanic.exceptions import InvalidUsage from sanic.headers import ( parse_content_header, @@ -16,6 +17,7 @@ from sanic.headers import ( parse_host, parse_xforwarded, ) +from sanic.response import HTTPResponse from sanic.log import error_logger, logger @@ -25,7 +27,6 @@ except ImportError: from json import loads as json_loads # type: ignore DEFAULT_HTTP_CONTENT_TYPE = "application/octet-stream" -EXPECT_HEADER = "EXPECT" # HTTP/1.1: https://www.w3.org/Protocols/rfc2616/rfc2616-sec7.html#sec7.2.1 # > If the media type remains unknown, the recipient SHOULD treat it @@ -47,12 +48,9 @@ class RequestParameters(dict): class StreamBuffer: - def __init__(self, protocol=None): - self._protocol = protocol - - async def read(self): - """ Stop reading when gets None """ - return await self._protocol.stream_body() + def __init__(self, protocol): + self.read = protocol.stream_body + self.respond = protocol.respond async def __aiter__(self): while True: @@ -147,6 +145,15 @@ class Request: Custom context is now stored in `request.custom_context.yourkey`""" setattr(self.ctx, key, value) + def respond(self, status=200, headers=None, content_type=DEFAULT_HTTP_CONTENT_TYPE): + return self.stream.respond( + status if isinstance(status, HTTPResponse) else HTTPResponse( + status=status, + headers=headers, + content_type=content_type, + ) + ) + async def receive_body(self): self.body = b"".join([data async for data in self.stream]) diff --git a/sanic/response.py b/sanic/response.py index 9e1a4437..c71a5376 100644 --- a/sanic/response.py +++ b/sanic/response.py @@ -34,37 +34,6 @@ class BaseHTTPResponse: self._cookies = CookieJar(self.headers) return self._cookies - def get_headers( - self, - version="1.1", - keep_alive=False, - keep_alive_timeout=None, - body=b"", - ): - """.. deprecated:: 20.3: - This function is not public API and will be removed.""" - if version != "1.1": - warnings.warn( - "Only HTTP/1.1 is currently supported (got {version})", - DeprecationWarning, - ) - - # self.headers get priority over content_type - if self.content_type and "Content-Type" not in self.headers: - self.headers["Content-Type"] = self.content_type - - if keep_alive: - self.headers["Connection"] = "keep-alive" - if keep_alive_timeout is not None: - self.headers["Keep-Alive"] = keep_alive_timeout - else: - self.headers["Connection"] = "close" - - if self.status in (304, 412): - self.headers = remove_entity_headers(self.headers) - - return format_http1_response(self.status, self.headers.items(), body) - class StreamingHTTPResponse(BaseHTTPResponse): __slots__ = ( @@ -75,6 +44,7 @@ class StreamingHTTPResponse(BaseHTTPResponse): "headers", "chunked", "_cookies", + "send", ) def __init__( @@ -97,44 +67,15 @@ class StreamingHTTPResponse(BaseHTTPResponse): :param data: str or bytes-ish data to be written. """ - data = self._encode_body(data) + await self.send(self._encode_body(data)) - if self.chunked: - await self.protocol.push_data(b"%x\r\n%b\r\n" % (len(data), data)) - else: - await self.protocol.push_data(data) - await self.protocol.drain() - - async def stream( - self, version="1.1", keep_alive=False, keep_alive_timeout=None - ): - """Streams headers, runs the `streaming_fn` callback that writes - content to the response body, then finalizes the response body. 
- """ - if version != "1.1": - self.chunked = False - headers = self.get_headers( - version, - keep_alive=keep_alive, - keep_alive_timeout=keep_alive_timeout, - ) - await self.protocol.push_data(headers) - await self.protocol.drain() + async def stream(self, request): + self.send = request.respond( + self.status, + self.headers, + self.content_type, + ).send await self.streaming_fn(self) - if self.chunked: - await self.protocol.push_data(b"0\r\n\r\n") - # no need to await drain here after this write, because it is the - # very last thing we write and nothing needs to wait for it. - - def get_headers( - self, version="1.1", keep_alive=False, keep_alive_timeout=None - ): - if self.chunked and version == "1.1": - self.headers["Transfer-Encoding"] = "chunked" - self.headers.pop("Content-Length", None) - - return super().get_headers(version, keep_alive, keep_alive_timeout) - class HTTPResponse(BaseHTTPResponse): __slots__ = ("body", "status", "content_type", "headers", "_cookies") diff --git a/sanic/server.py b/sanic/server.py index c86f9bcd..af146a01 100644 --- a/sanic/server.py +++ b/sanic/server.py @@ -11,11 +11,7 @@ from multiprocessing import Process from signal import SIG_IGN, SIGINT, SIGTERM, Signals from signal import signal as signal_func from socket import SO_REUSEADDR, SOL_SOCKET, socket -from time import monotonic as current_time -from time import time - -from httptools import HttpRequestParser # type: ignore -from httptools.parser.errors import HttpParserError # type: ignore +from time import time, monotonic as current_time from sanic.compat import Header from sanic.exceptions import ( @@ -27,9 +23,9 @@ from sanic.exceptions import ( ServerError, ServiceUnavailable, ) +from sanic.http import Http, Lifespan from sanic.log import access_logger, logger -from sanic.request import EXPECT_HEADER, Request, StreamBuffer -from sanic.response import HTTPResponse +from sanic.request import Request try: @@ -45,14 +41,6 @@ class Signal: stopped = False -class Status(enum.Enum): - IDLE = 0 # Waiting for request - REQUEST = 1 # Request headers being received - EXPECT = 2 # Sender wants 100-continue - HANDLER = 3 # Headers done, handler running - RESPONSE = 4 # Response headers sent - - class HttpProtocol(asyncio.Protocol): """ This class provides a basic HTTP implementation of the sanic framework. 
@@ -82,19 +70,17 @@ class HttpProtocol(asyncio.Protocol): "access_log", # connection management "_total_request_size", - "_request_bytes_left", - "_status", - "_time", "_last_response_time", "keep_alive", "state", "url", "_debug", "_handler_task", - "_buffer", "_can_write", "_data_received", + "_time", "_task", + "_http", "_exception", ) @@ -145,10 +131,10 @@ class HttpProtocol(asyncio.Protocol): if "requests_count" not in self.state: self.state["requests_count"] = 0 self._debug = debug - self._buffer = bytearray() self._data_received = asyncio.Event(loop=deprecated_loop) self._can_write = asyncio.Event(loop=deprecated_loop) self._can_write.set() + self._exception = None # -------------------------------------------- # # Connection @@ -157,14 +143,17 @@ class HttpProtocol(asyncio.Protocol): def connection_made(self, transport): self.connections.add(self) self.transport = transport - self._status, self._time = Status.IDLE, current_time() + #self.check_timeouts() + self._http = Http(self) + self._task = self.loop.create_task(self.connection_task()) + self._time = current_time() self.check_timeouts() - self._task = self.loop.create_task(self.http1()) def connection_lost(self, exc): self.connections.discard(self) if self._task: self._task.cancel() + self._task = None def pause_writing(self): self._can_write.clear() @@ -183,349 +172,66 @@ class HttpProtocol(asyncio.Protocol): # -------------------------------------------- # def data_received(self, data): + self._time = current_time() if not data: return self.close() - self._buffer += data - if len(self._buffer) > self.request_max_size: + self._http.recv_buffer += data + if len(self._http.recv_buffer) > self.request_max_size: self.transport.pause_reading() if self._data_received: self._data_received.set() - def check_timeouts(self): - """Runs itself once a second to enforce any expired timeouts.""" - duration = current_time() - self._time - status = self._status - if status == Status.IDLE and duration > self.keep_alive_timeout: - logger.debug("KeepAlive Timeout. Closing connection.") - elif status == Status.REQUEST and duration > self.request_timeout: - self._exception = RequestTimeout("Request Timeout") - elif ( - status.value > Status.REQUEST.value - and duration > self.response_timeout - ): - self._exception = ServiceUnavailable("Response Timeout") - else: - self.loop.call_later(0.1, self.check_timeouts) - return - self._task.cancel() - - async def http1(self): - """HTTP 1.1 connection handler""" + async def connection_task(self): try: - self._exception = None - buf = self._buffer - # Note: connections are initially in request mode and do not obey - # keep-alive timeout like with some other servers. 
- self._status = Status.REQUEST - while self.keep_alive: - # Read request header - pos = 0 - self._time = current_time() - while len(buf) < self.request_max_size: - if buf: - self._status = Status.REQUEST - pos = buf.find(b"\r\n\r\n", pos) - if pos >= 0: - break - pos = max(0, len(buf) - 3) - await self.receive_more() - else: - raise PayloadTooLarge("Payload Too Large") - - self._total_request_size = pos + 4 - try: - reqline, *headers = buf[:pos].decode().split("\r\n") - method, self.url, protocol = reqline.split(" ") - if protocol not in ("HTTP/1.0", "HTTP/1.1"): - raise Exception - headers = Header( - (name.lower(), value.lstrip()) - for name, value in (h.split(":", 1) for h in headers) - ) - except: - raise InvalidUsage("Bad Request") - - self.state["requests_count"] += 1 - # Prepare a request object from the header received - self.request = self.request_class( - url_bytes=self.url.encode(), - headers=headers, - version=protocol[-3:], - method=method, - transport=self.transport, - app=self.app, - ) - if headers.get("connection", "").lower() == "close": - self.keep_alive = False - # Prepare for request body - body = headers.get("transfer-encoding") == "chunked" or int( - headers.get("content-length", 0) - ) - self._request_chunked = False - self._request_bytes_left = 0 - if body: - if headers.get(EXPECT_HEADER): - self._status = Status.EXPECT - self.expect_handler() - self.request.stream = StreamBuffer(protocol=self) - if body is True: - self._request_chunked = True - pos -= 2 # One CRLF stays in buffer - else: - self._request_bytes_left = body - # Remove header and its trailing CRLF - del buf[: pos + 4] - # Run handler - self._status, self._time = Status.HANDLER, current_time() - await self.request_handler( - self.request, self.write_response, self.stream_response - ) - # Consume any remaining request body - if self._request_bytes_left or self._request_chunked: - logger.error( - f"Handler of {method} {self.url} did not consume request body." 
- ) - while await self.stream_body(): - pass - self._status, self._time = Status.IDLE, current_time() + await self._http.http1() except asyncio.CancelledError: - self.write_error( + await self._http.write_error( self._exception or ServiceUnavailable("Request handler cancelled") ) except SanicException as e: - self.write_error(e) - except Exception as e: - logger.error(f"Uncaught {e!r} handling URL {self.url}") + await self._http.write_error(e) + except BaseException as e: + logger.exception(f"Uncaught exception while handling URL {url}") finally: - self.close() - - async def stream_body(self): - buf = self._buffer - if self._request_chunked and self._request_bytes_left == 0: - # Process a chunk header: \r\n[;]\r\n - while True: - pos = buf.find(b"\r\n", 3) - if pos != -1: - break - if len(buf) > 64: - self.keep_alive = False - raise InvalidUsage("Bad chunked encoding") - await self.receive_more() try: - size = int(buf[2:pos].split(b";", 1)[0].decode(), 16) + self.close() except: - self.keep_alive = False - raise InvalidUsage("Bad chunked encoding") - self._request_bytes_left = size - self._total_request_size += pos + 2 - del buf[: pos + 2] - if self._request_bytes_left <= 0: - self._request_chunked = False - return None - # At this point we are good to read/return _request_bytes_left - if self._request_bytes_left: - if not buf: - await self.receive_more() - data = bytes(buf[: self._request_bytes_left]) - size = len(data) - del buf[:size] - self._request_bytes_left -= size - self._total_request_size += size - if self._total_request_size > self.request_max_size: - self.keep_alive = False - raise PayloadTooLarge("Payload Too Large") - return data - return None + logger.exception("Closing failed") - def expect_handler(self): - """ - Handler for Expect Header. - """ - expect = self.request.headers.get(EXPECT_HEADER) - if self.request.version == "1.1": - if expect.lower() == "100-continue": - self.transport.write(b"HTTP/1.1 100 Continue\r\n\r\n") - else: - raise HeaderExpectationFailed(f"Unknown Expect: {expect}") - - # -------------------------------------------- # - # Responding - # -------------------------------------------- # - def log_response(self, response): - """ - Helper method provided to enable the logging of responses in case if - the :attr:`HttpProtocol.access_log` is enabled. - - :param response: Response generated for the current request - - :type response: :class:`sanic.response.HTTPResponse` or - :class:`sanic.response.StreamingHTTPResponse` - - :return: None - """ - if self.access_log: - extra = {"status": getattr(response, "status", 0)} - - if isinstance(response, HTTPResponse): - extra["byte"] = len(response.body) - else: - extra["byte"] = -1 - - extra["host"] = "UNKNOWN" - if self.request is not None: - if self.request.ip: - extra["host"] = "{0}:{1}".format( - self.request.ip, self.request.port - ) - - extra["request"] = "{0} {1}".format( - self.request.method, self.request.url - ) - else: - extra["request"] = "nil" - - access_logger.info("", extra=extra) - - def write_response(self, response): - """ - Writes response content synchronously to the transport. 
- """ - try: - self._status, self._time = Status.RESPONSE, current_time() - self._last_response_time = self._time - self.transport.write( - response.output( - "1.1", self.keep_alive, self.keep_alive_timeout - ) - ) - self.log_response(response) - except AttributeError: - if isinstance(response, HTTPResponse): - raise - res_type = type(response).__name__ - logger.error( - f"Invalid response object for url {self.url!r}, " - f"Expected Type: HTTPResponse, Actual Type: {res_type}" - ) - self.write_error(ServerError("Invalid response type")) - except RuntimeError: - if self._debug: - logger.error( - "Connection lost before response written @ %s", - self.request.ip, - ) - self.keep_alive = False - except Exception as e: - self.bail_out( - "Writing response failed, connection closed {}".format(repr(e)) - ) - finally: - if not self.keep_alive: - self.transport.close() - self.transport = None - else: - self._last_response_time = time() + def check_timeouts(self): + """Runs itself once a second to enforce any expired timeouts.""" + duration = current_time() - self._time + lifespan = self._http.lifespan + if lifespan == Lifespan.IDLE and duration > self.keep_alive_timeout: + logger.debug("KeepAlive Timeout. Closing connection.") + elif lifespan == Lifespan.REQUEST and duration > self.request_timeout: + self._exception = RequestTimeout("Request Timeout") + elif ( + lifespan.value > Lifespan.REQUEST.value + and duration > self.response_timeout + ): + self._exception = ServiceUnavailable("Response Timeout") + else: + self.loop.call_later(1.0, self.check_timeouts) + return + self._task.cancel() async def drain(self): await self._can_write.wait() async def push_data(self, data): + self._time = current_time() + await self.drain() self.transport.write(data) - async def stream_response(self, response): - """ - Streams a response to the client asynchronously. Attaches - the transport to the response so the response consumer can - write to the response as needed. - """ - try: - self._status, self._time = Status.RESPONSE, current_time() - response.protocol = self - await response.stream( - "1.1", self.keep_alive, self.keep_alive_timeout - ) - self.log_response(response) - except AttributeError: - logger.error( - "Invalid response object for url %s, " - "Expected Type: HTTPResponse, Actual Type: %s", - self.url, - type(response), - ) - self.write_error(ServerError("Invalid response type")) - except RuntimeError: - if self._debug: - logger.error( - "Connection lost before response written @ %s", - self.request.ip, - ) - self.keep_alive = False - except Exception as e: - self.bail_out( - "Writing response failed, connection closed {}".format(repr(e)) - ) - - def write_error(self, exception): - # An error _is_ a response. - # Don't throw a response timeout, when a response _is_ given. 
- response = None - try: - response = self.error_handler.response(self.request, exception) - self.transport.write(response.output("1.1")) - except RuntimeError: - if self._debug: - logger.error( - "Connection lost before error written @ %s", - self.request.ip if self.request else "Unknown", - ) - except Exception as e: - self.bail_out( - "Writing error failed, connection closed {}".format(repr(e)), - from_error=True, - ) - finally: - if self.keep_alive or getattr(response, "status") == 408: - self.log_response(response) - self.keep_alive = False - - def bail_out(self, message, from_error=False): - """ - In case if the transport pipes are closed and the sanic app encounters - an error while writing data to the transport pipe, we log the error - with proper details. - - :param message: Error message to display - :param from_error: If the bail out was invoked while handling an - exception scenario. - - :type message: str - :type from_error: bool - - :return: None - """ - if from_error or self.transport is None or self.transport.is_closing(): - logger.error( - "Transport closed @ %s and exception " - "experienced during error handling", - ( - self.transport.get_extra_info("peername") - if self.transport is not None - else "N/A" - ), - ) - logger.debug("Exception:", exc_info=True) - else: - self.write_error(ServerError(message)) - logger.error(message) - def close_if_idle(self): """Close the connection if a request is not being sent or received :return: boolean - True if closed, false if staying open """ - if self._status == Status.IDLE: + if self._http.lifespan == Lifespan.IDLE: self.close() return True return False @@ -536,9 +242,11 @@ class HttpProtocol(asyncio.Protocol): """ if self.transport is not None: try: - self.keep_alive = False - self._task.cancel() + if self._task: + self._task.cancel() + self._task = None self.transport.close() + self.resume_writing() finally: self.transport = None
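
Notes (illustration only, not part of the diff): the push-style response API added here — Request.respond() building an HTTPResponse and handing it to the underlying stream (Http on HTTP/1.1, ASGIApp on ASGI), with send() flushing the headers on its first call — can be driven from a handler roughly as sketched below. The app name, route and payload are illustrative; only request.respond() and send() come from this patch.

from sanic import Sanic

app = Sanic("respond_send_demo")  # illustrative app/route names, not from the patch

@app.route("/push")
async def push(request):
    # Request.respond() wraps status/headers/content_type in an HTTPResponse and
    # passes it to the stream's respond(); nothing is written to the wire yet.
    resp = request.respond(content_type="text/plain")
    # The first send() emits the response headers together with this block; with
    # no content-length given, the HTTP/1.1 path falls back to chunked encoding.
    await resp.send("first part\n")
    await resp.send("second part\n")
    # Explicitly finish the stream (sends the chunked terminator on HTTP/1.1).
    await resp.send(end_stream=True)

The final send(end_stream=True) may also be omitted: Http.http1() finishes any response left in the RESPONSE state after the handler returns, and a handler that has already responded this way can simply return nothing.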
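
Existing streaming handlers are meant to keep working: StreamingHTTPResponse.stream() now takes the request and wires write() through the same respond().send path. A sketch of a conventional streaming handler on this branch, assuming the existing sanic.response.stream helper (not touched in this diff) and the app object from the previous sketch:

from sanic.response import stream  # assumed existing helper, unchanged here

@app.route("/legacy-stream")
async def legacy(request):
    async def streaming_fn(response):
        # write() now encodes the data and forwards it to request.respond(...).send()
        await response.write("first part\n")
        await response.write("second part\n")
    return stream(streaming_fn, content_type="text/plain")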
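
The wire framing on the HTTP/1.1 path is plain RFC 7230 chunked encoding: Http.data_to_send() emits each block as <size-hex>\r\n<data>\r\n with a final 0\r\n\r\n, and Http.read() parses the same shape (plus optional chunk extensions) for request bodies. A stand-alone illustration of that framing, independent of Sanic:

def frame_chunk(data: bytes) -> bytes:
    # Same formatting used by Http.data_to_send() for outgoing chunks.
    return b"%x\r\n%b\r\n" % (len(data), data)

assert frame_chunk(b"hello") == b"5\r\nhello\r\n"
terminator = b"0\r\n\r\n"  # zero-size chunk ends the body

# Request side: Http.read() expects "\r\n<size-hex>[;extensions]\r\n" ahead of
# each chunk (one CRLF from the previous chunk is left in the buffer).
header = b"\r\n5;name=value\r\n"
size = int(header[2:header.find(b"\r\n", 3)].split(b";", 1)[0].decode(), 16)
assert size == 5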
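
On the ASGI side, the new ASGIApp.respond()/send() pair translates the same calls into standard ASGI messages: one http.response.start carrying the status and encoded headers, then http.response.body messages whose more_body flag mirrors end_stream. For a short non-streaming plain-text response, the transport would receive approximately the following (header list simplified for illustration):

expected_messages = [
    {
        "type": "http.response.start",
        "status": 200,
        "headers": [(b"content-length", b"5"), (b"content-type", b"text/plain")],
    },
    {
        "type": "http.response.body",
        "body": b"hello",
        "more_body": False,  # end_stream=True on the final send()
    },
]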