From 06035c8d51e034e43460fe02d63e0c56e59f9af7 Mon Sep 17 00:00:00 2001 From: Adam Hopkins Date: Wed, 23 Feb 2022 14:42:11 +0200 Subject: [PATCH] Move HTTP streaming events to receiver --- sanic/http/http3.py | 164 ++++++++++++++++-------- sanic/log.py | 3 +- sanic/request.py | 14 +- sanic/server/protocols/http_protocol.py | 6 +- 4 files changed, 125 insertions(+), 62 deletions(-) diff --git a/sanic/http/http3.py b/sanic/http/http3.py index b954566f..be639d59 100644 --- a/sanic/http/http3.py +++ b/sanic/http/http3.py @@ -2,9 +2,9 @@ from __future__ import annotations import asyncio -from abc import ABC +from abc import ABC, abstractmethod from ssl import SSLContext -from typing import TYPE_CHECKING, Callable, Dict, Optional, Union +from typing import TYPE_CHECKING, Callable, Dict, Optional, Tuple, Union from aioquic.h0.connection import H0_ALPN, H0Connection from aioquic.h3.connection import H3_ALPN, H3Connection @@ -32,12 +32,14 @@ from sanic.http.tls import CertSimple if TYPE_CHECKING: from sanic import Sanic from sanic.request import Request - from sanic.response import BaseHTTPResponse + from sanic.response import BaseHTTPResponse, HTTPResponse from sanic.server.protocols.http_protocol import Http3Protocol # from sanic.compat import Header +from sanic.http.constants import Stage + # from sanic.application.state import Mode -from sanic.log import logger +from sanic.log import Colors, logger HttpConnection = Union[H0Connection, H3Connection] @@ -48,24 +50,95 @@ class Transport: class Receiver(ABC): - def __init__(self, transmit, protocol, request) -> None: + def __init__(self, transmit, protocol, request: Request) -> None: self.transmit = transmit self.protocol = protocol self.request = request + self.queue: asyncio.Queue[Tuple[bytes, bool]] = asyncio.Queue() + + @abstractmethod + async def run(self): + ... class HTTPReceiver(Receiver): - async def respond(self): + stage: Stage + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.request_body = None + self.stage = Stage.IDLE + self.headers_sent = False + + async def run(self): + self.stage = Stage.HANDLER + logger.info(f"Request received: {self.request}") - await self.protocol.app.handle_request(self.request) + await self.protocol.request_handler(self.request) + + self.stage = Stage.RESPONSE + + while self.stage is Stage.RESPONSE: + if not self.headers_sent: + self.send_headers() + data, end_stream = await self.queue.get() + + # Chunked + size = len(data) + if end_stream: + data = ( + b"%x\r\n%b\r\n0\r\n\r\n" % (size, data) + if size + else b"0\r\n\r\n" + ) + elif size: + data = b"%x\r\n%b\r\n" % (size, data) + + self.protocol.connection.send_data( + stream_id=self.request.stream_id, + data=data, + end_stream=end_stream, + ) + self.transmit() + + if end_stream: + self.stage = Stage.IDLE + + def send_headers(self) -> None: + response = self.request.stream.response + self.protocol.connection.send_headers( + stream_id=self.request.stream_id, + headers=[ + (b":status", str(response.status).encode()), + *( + (k.encode(), v.encode()) + for k, v in response.headers.items() + ), + ], + ) + self.headers_sent = True + + async def respond(self, response: BaseHTTPResponse) -> BaseHTTPResponse: + print(f"{Colors.BLUE}[respond]: {Colors.GREEN}{response=}{Colors.END}") + self.response, response.stream = response, self + + return response + + async def send(self, data: bytes, end_stream: bool) -> None: + print( + f"{Colors.BLUE}[send]: {Colors.GREEN}{data=} {end_stream=}{Colors.END}" + ) + self.queue.put_nowait((data, end_stream)) class WebsocketReceiver(Receiver): - ... + async def run(self): + ... class WebTransportReceiver(Receiver): - ... + async def run(self): + ... class Http3: @@ -81,37 +154,48 @@ class Http3: protocol: Http3Protocol, transmit: Callable[[], None], ) -> None: - self.request_body = None - self.request: Optional[Request] = None + # self.request_body = None + # self.request: Optional[Request] = None self.protocol = protocol self.transmit = transmit self.receivers: Dict[int, Receiver] = {} def http_event_received(self, event: H3Event) -> None: - print("[http_event_received]:", event) - receiver = self.get_or_make_receiver(event) + print( + f"{Colors.BLUE}[http_event_received]: " + f"{Colors.YELLOW}{event}{Colors.END}" + ) + receiver, created_new = self.get_or_make_receiver(event) print(f"{receiver=}") - # asyncio.ensure_future(handler(request)) + if isinstance(event, HeadersReceived) and created_new: + asyncio.ensure_future(receiver.run()) + else: + print(f"{Colors.RED}DOING NOTHING{Colors.END}") - def get_or_make_receiver(self, event: H3Event) -> Receiver: + def get_or_make_receiver(self, event: H3Event) -> Tuple[Receiver, bool]: if ( isinstance(event, HeadersReceived) and event.stream_id not in self.receivers ): - self.request = self._make_request(event) - receiver = HTTPReceiver(self.transmit, self.protocol, self.request) + request = self._make_request(event) + receiver = HTTPReceiver(self.transmit, self.protocol, request) + request.stream = receiver + self.receivers[event.stream_id] = receiver - asyncio.ensure_future(receiver.respond()) + return receiver, True else: ident = getattr(event, self.HANDLER_PROPERTY_MAPPING[type(event)]) - return self.receivers[ident] + return self.receivers[ident], False + + def get_receiver_by_stream_id(self, stream_id: int) -> Receiver: + return self.receivers[stream_id] def _make_request(self, event: HeadersReceived) -> Request: - method, path, *rem = event.headers + method_header, path_header, *rem = event.headers headers = Header(((k.decode(), v.decode()) for k, v in rem)) - method = method[1].decode() - path = path[1] + method = method_header[1].decode() + path = path_header[1] scheme = headers.pop(":scheme") authority = headers.pop(":authority") print(f"{headers=}") @@ -125,44 +209,10 @@ class Http3: request = self.protocol.request_class( path, headers, "3", method, Transport(), self.protocol.app, b"" ) - request.stream = self + request._stream_id = event.stream_id print(f"{request=}") return request - async def respond(self, response: BaseHTTPResponse) -> BaseHTTPResponse: - print(f"[respond]: {response=}") - response.headers.update({"foo": "bar"}) - self.response, response.stream = response, self - - # Need more appropriate place to send these - self.protocol.connection.send_headers( - stream_id=0, - headers=[ - (b":status", str(self.response.status).encode()), - *( - (k.encode(), v.encode()) - for k, v in self.response.headers.items() - ), - ], - ) - # TEMP - await self.drain(response) - - return response - - async def drain(self, response: BaseHTTPResponse) -> None: - await self.send(response.body, False) - - async def send(self, data: bytes, end_stream: bool) -> None: - print(f"[send]: {data=} {end_stream=}") - print(self.response.headers) - self.protocol.connection.send_data( - stream_id=0, - data=data, - end_stream=end_stream, - ) - self.transmit() - class SessionTicketStore: """ diff --git a/sanic/log.py b/sanic/log.py index 4b3b960c..d5f991e6 100644 --- a/sanic/log.py +++ b/sanic/log.py @@ -62,7 +62,8 @@ class Colors(str, Enum): # no cov BLUE = "\033[01;34m" GREEN = "\033[01;32m" YELLOW = "\033[01;33m" - RED = "\033[01;31m" + RED = "\033[01;34m" + PURPLE = "\033[01;35m" logger = logging.getLogger("sanic.root") # no cov diff --git a/sanic/request.py b/sanic/request.py index a1ce91c3..ef911356 100644 --- a/sanic/request.py +++ b/sanic/request.py @@ -13,8 +13,9 @@ from typing import ( Union, ) -from sanic_routing.route import Route # type: ignore +from sanic_routing.route import Route +from sanic.http.http3 import HTTPReceiver # type: ignore from sanic.models.http_types import Credentials @@ -91,6 +92,7 @@ class Request: "_protocol", "_remote_addr", "_socket", + "_stream_id", "_match_info", "_name", "app", @@ -127,6 +129,7 @@ class Request: transport: TransportProtocol, app: Sanic, head: bytes = b"", + stream_id: int = 0, ): self.raw_url = url_bytes @@ -134,6 +137,7 @@ class Request: self._parsed_url = parse_url(url_bytes) self._id: Optional[Union[uuid.UUID, str, int]] = None self._name: Optional[str] = None + self._stream_id = stream_id self.app = app self.headers = Header(headers) @@ -162,7 +166,9 @@ class Request: self.request_middleware_started = False self._cookies: Optional[Dict[str, str]] = None self._match_info: Dict[str, Any] = {} - self.stream: Optional[Http] = None + # TODO: + # - Create an ABC (called Stream) for Http and HTTPReceiver to subclass + self.stream: Optional[Union[Http, HTTPReceiver]] = None self.route: Optional[Route] = None self._protocol = None self.responded: bool = False @@ -175,6 +181,10 @@ class Request: def generate_id(*_): return uuid.uuid4() + @property + def stream_id(self): + return self._stream_id + def reset_response(self): try: if ( diff --git a/sanic/server/protocols/http_protocol.py b/sanic/server/protocols/http_protocol.py index c215208a..e522f3dc 100644 --- a/sanic/server/protocols/http_protocol.py +++ b/sanic/server/protocols/http_protocol.py @@ -20,7 +20,7 @@ from aioquic.quic.events import ProtocolNegotiated, QuicEvent from sanic.exceptions import RequestTimeout, ServiceUnavailable from sanic.http import Http, Stage -from sanic.log import error_logger, logger +from sanic.log import Colors, error_logger, logger from sanic.models.server_types import ConnInfo from sanic.request import Request from sanic.server.protocols.base_protocol import SanicProtocol @@ -265,7 +265,9 @@ class Http3Protocol(HttpProtocolMixin, QuicConnectionProtocol): self._connection: Optional[H3Connection] = None def quic_event_received(self, event: QuicEvent) -> None: - print("[quic_event_received]:", event) + print( + f"{Colors.BLUE}[quic_event_received]: {Colors.PURPLE}{event}{Colors.END}" + ) if isinstance(event, ProtocolNegotiated): self._setup_connection(transmit=self.transmit) if event.alpn_protocol in H3_ALPN: