From f1c2d0e04283f0945e923fa3a01c008213e43a0c Mon Sep 17 00:00:00 2001 From: Adam Hopkins Date: Sun, 12 Dec 2021 09:07:25 +0200 Subject: [PATCH] WIP to begin http3 --- sanic/app.py | 9 ++ sanic/http/__init__.py | 5 + sanic/http/constants.py | 25 +++++ sanic/{http.py => http/http1.py} | 21 +--- sanic/http/http3.py | 136 ++++++++++++++++++++++++ sanic/server/protocols/http_protocol.py | 76 ++++++++++--- sanic/server/runners.py | 44 +++++++- 7 files changed, 280 insertions(+), 36 deletions(-) create mode 100644 sanic/http/__init__.py create mode 100644 sanic/http/constants.py rename sanic/{http.py => http/http1.py} (96%) create mode 100644 sanic/http/http3.py diff --git a/sanic/app.py b/sanic/app.py index f0230165..a0eca9c2 100644 --- a/sanic/app.py +++ b/sanic/app.py @@ -35,6 +35,7 @@ from typing import ( Dict, Iterable, List, + Literal, Optional, Set, Tuple, @@ -68,6 +69,7 @@ from sanic.exceptions import ( ) from sanic.handlers import ErrorHandler from sanic.http import Stage +from sanic.http.constants import HTTP from sanic.log import LOGGING_CONFIG_DEFAULTS, Colors, error_logger, logger from sanic.mixins.listeners import ListenerEvent from sanic.models.futures import ( @@ -1050,6 +1052,7 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta): fast: bool = False, verbosity: int = 0, motd_display: Optional[Dict[str, str]] = None, + version: HTTP = HTTP.VERSION_1, ) -> None: """ Run the HTTP Server and listen until keyboard interrupt or term @@ -1156,6 +1159,7 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta): protocol=protocol, backlog=backlog, register_sys_signals=register_sys_signals, + version=version, ) try: @@ -1383,6 +1387,7 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta): backlog: int = 100, register_sys_signals: bool = True, run_async: bool = False, + version: Union[HTTP, Literal[1], Literal[3]] = HTTP.VERSION_1, ): """Helper function used by `run` and `create_server`.""" if self.config.PROXIES_COUNT and self.config.PROXIES_COUNT < 0: @@ -1392,6 +1397,9 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta): "#proxy-configuration" ) + if isinstance(version, int): + version = HTTP(version) + self.debug = debug self.state.host = host self.state.port = port @@ -1425,6 +1433,7 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta): "loop": loop, "register_sys_signals": register_sys_signals, "backlog": backlog, + "version": version, } self.motd(serve_location) diff --git a/sanic/http/__init__.py b/sanic/http/__init__.py new file mode 100644 index 00000000..8a961029 --- /dev/null +++ b/sanic/http/__init__.py @@ -0,0 +1,5 @@ +from .constants import Stage +from .http1 import Http + + +__all__ = ("Http", "Stage") diff --git a/sanic/http/constants.py b/sanic/http/constants.py new file mode 100644 index 00000000..35890712 --- /dev/null +++ b/sanic/http/constants.py @@ -0,0 +1,25 @@ +from enum import Enum + + +class Stage(Enum): + """ + Enum for representing the stage of the request/response cycle + + | ``IDLE`` Waiting for request + | ``REQUEST`` Request headers being received + | ``HANDLER`` Headers done, handler running + | ``RESPONSE`` Response headers sent, body in progress + | ``FAILED`` Unrecoverable state (error while sending response) + | + """ + + IDLE = 0 # Waiting for request + REQUEST = 1 # Request headers being received + HANDLER = 3 # Headers done, handler running + RESPONSE = 4 # Response headers sent, body in progress + FAILED = 100 # Unrecoverable state (error while sending response) + + +class HTTP(Enum): + VERSION_1 = 1 + VERSION_3 = 3 diff --git a/sanic/http.py b/sanic/http/http1.py similarity index 96% rename from sanic/http.py rename to sanic/http/http1.py index 86f23fe3..c00929be 100644 --- a/sanic/http.py +++ b/sanic/http/http1.py @@ -8,7 +8,6 @@ if TYPE_CHECKING: from sanic.response import BaseHTTPResponse from asyncio import CancelledError, sleep -from enum import Enum from sanic.compat import Header from sanic.exceptions import ( @@ -20,29 +19,11 @@ from sanic.exceptions import ( ) from sanic.headers import format_http1_response from sanic.helpers import has_message_body +from sanic.http.constants import Stage from sanic.log import access_logger, error_logger, logger from sanic.touchup import TouchUpMeta -class Stage(Enum): - """ - Enum for representing the stage of the request/response cycle - - | ``IDLE`` Waiting for request - | ``REQUEST`` Request headers being received - | ``HANDLER`` Headers done, handler running - | ``RESPONSE`` Response headers sent, body in progress - | ``FAILED`` Unrecoverable state (error while sending response) - | - """ - - IDLE = 0 # Waiting for request - REQUEST = 1 # Request headers being received - HANDLER = 3 # Headers done, handler running - RESPONSE = 4 # Response headers sent, body in progress - FAILED = 100 # Unrecoverable state (error while sending response) - - HTTP_CONTINUE = b"HTTP/1.1 100 Continue\r\n\r\n" diff --git a/sanic/http/http3.py b/sanic/http/http3.py new file mode 100644 index 00000000..cc82a51d --- /dev/null +++ b/sanic/http/http3.py @@ -0,0 +1,136 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Callable, Dict, Optional, Union + +from aioquic.h0.connection import H0_ALPN, H0Connection +from aioquic.h3.connection import H3_ALPN, H3Connection +from aioquic.h3.events import H3Event + +# from aioquic.h3.events import ( +# DatagramReceived, +# DataReceived, +# H3Event, +# HeadersReceived, +# WebTransportStreamDataReceived, +# ) +from aioquic.quic.configuration import QuicConfiguration + +# from aioquic.quic.events import ( +# DatagramFrameReceived, +# ProtocolNegotiated, +# QuicEvent, +# ) +from aioquic.tls import SessionTicket + + +if TYPE_CHECKING: + from sanic.request import Request + +# from sanic.compat import Header +from sanic.log import logger +from sanic.response import BaseHTTPResponse + + +HttpConnection = Union[H0Connection, H3Connection] + + +async def handler(request: Request): + logger.info(f"Request received: {request}") + response = await request.app.handle_request(request) + logger.info(f"Build response: {response=}") + + +class Transport: + ... + + +class Http3: + def __init__( + self, + connection: HttpConnection, + transmit: Callable[[], None], + ) -> None: + self.request_body = None + self.connection = connection + self.transmit = transmit + + def http_event_received(self, event: H3Event) -> None: + print("[http_event_received]:", event) + # if isinstance(event, HeadersReceived): + # method, path, *rem = event.headers + # headers = Header(((k.decode(), v.decode()) for k, v in rem)) + # method = method[1].decode() + # path = path[1] + # scheme = headers.pop(":scheme") + # authority = headers.pop(":authority") + # print(f"{headers=}") + # print(f"{method=}") + # print(f"{path=}") + # print(f"{scheme=}") + # print(f"{authority=}") + # if authority: + # headers["host"] = authority + + # request = Request( + # path, headers, "3", method, Transport(), app, b"" + # ) + # request.stream = Stream( + # connection=self._http, transmit=self.transmit + # ) + # print(f"{request=}") + + # asyncio.ensure_future(handler(request)) + + async def send(self, data: bytes, end_stream: bool) -> None: + print(f"[send]: {data=} {end_stream=}") + print(self.response.headers) + # self.connection.send_headers( + # stream_id=0, + # headers=[ + # (b":status", str(self.response.status).encode()), + # *( + # (k.encode(), v.encode()) + # for k, v in self.response.headers.items() + # ), + # ], + # ) + # self.connection.send_data( + # stream_id=0, + # data=data, + # end_stream=end_stream, + # ) + # self.transmit() + + def respond(self, response: BaseHTTPResponse) -> BaseHTTPResponse: + print(f"[respond]: {response=}") + self.response, response.stream = response, self + return response + + +class SessionTicketStore: + """ + Simple in-memory store for session tickets. + """ + + def __init__(self) -> None: + self.tickets: Dict[bytes, SessionTicket] = {} + + def add(self, ticket: SessionTicket) -> None: + self.tickets[ticket.ticket] = ticket + + def pop(self, label: bytes) -> Optional[SessionTicket]: + return self.tickets.pop(label, None) + + +def get_config(): + config = QuicConfiguration( + alpn_protocols=H3_ALPN + H0_ALPN + ["siduck"], + is_client=False, + max_datagram_frame_size=65536, + ) + config.load_cert_chain("./cert.pem", "./key.pem", password="qqqqqqqq") + return config + + +def get_ticket_store(): + return SessionTicketStore() diff --git a/sanic/server/protocols/http_protocol.py b/sanic/server/protocols/http_protocol.py index 409f5e4b..551603e0 100644 --- a/sanic/server/protocols/http_protocol.py +++ b/sanic/server/protocols/http_protocol.py @@ -2,6 +2,9 @@ from __future__ import annotations from typing import TYPE_CHECKING, Optional +from aioquic.h3.connection import H3_ALPN, H3Connection + +from sanic.http.http3 import Http3 from sanic.touchup.meta import TouchUpMeta @@ -11,6 +14,10 @@ if TYPE_CHECKING: from asyncio import CancelledError from time import monotonic as current_time +from aioquic.asyncio import QuicConnectionProtocol +from aioquic.h3.events import H3Event +from aioquic.quic.events import ProtocolNegotiated, QuicEvent + from sanic.exceptions import RequestTimeout, ServiceUnavailable from sanic.http import Http, Stage from sanic.log import error_logger, logger @@ -19,12 +26,35 @@ from sanic.request import Request from sanic.server.protocols.base_protocol import SanicProtocol -class HttpProtocol(SanicProtocol, metaclass=TouchUpMeta): +class HttpProtocolMixin: + def _setup_connection(self, *args, **kwargs): + self._http = self.HTTP_CLASS(self, *args, **kwargs) + self._time = current_time() + try: + self.check_timeouts() + except AttributeError: + ... + + def _setup(self): + self.request: Optional[Request] = None + self.access_log = self.app.config.ACCESS_LOG + self.request_handler = self.app.handle_request + self.error_handler = self.app.error_handler + self.request_timeout = self.app.config.REQUEST_TIMEOUT + self.response_timeout = self.app.config.RESPONSE_TIMEOUT + self.keep_alive_timeout = self.app.config.KEEP_ALIVE_TIMEOUT + self.request_max_size = self.app.config.REQUEST_MAX_SIZE + self.request_class = self.app.request_class or Request + + +class HttpProtocol(HttpProtocolMixin, SanicProtocol, metaclass=TouchUpMeta): """ This class provides implements the HTTP 1.1 protocol on top of our Sanic Server transport """ + HTTP_CLASS = Http + __touchup__ = ( "send", "connection_task", @@ -70,25 +100,12 @@ class HttpProtocol(SanicProtocol, metaclass=TouchUpMeta): unix=unix, ) self.url = None - self.request: Optional[Request] = None - self.access_log = self.app.config.ACCESS_LOG - self.request_handler = self.app.handle_request - self.error_handler = self.app.error_handler - self.request_timeout = self.app.config.REQUEST_TIMEOUT - self.response_timeout = self.app.config.RESPONSE_TIMEOUT - self.keep_alive_timeout = self.app.config.KEEP_ALIVE_TIMEOUT - self.request_max_size = self.app.config.REQUEST_MAX_SIZE - self.request_class = self.app.request_class or Request self.state = state if state else {} + self._setup() if "requests_count" not in self.state: self.state["requests_count"] = 0 self._exception = None - def _setup_connection(self): - self._http = Http(self) - self._time = current_time() - self.check_timeouts() - async def connection_task(self): # no cov """ Run a HTTP connection. @@ -236,3 +253,32 @@ class HttpProtocol(SanicProtocol, metaclass=TouchUpMeta): self._data_received.set() except Exception: error_logger.exception("protocol.data_received") + + +class Http3Protocol(HttpProtocolMixin, QuicConnectionProtocol): + HTTP_CLASS = Http3 + + def __init__(self, *args, app: Sanic, **kwargs) -> None: + self.app = app + super().__init__(*args, **kwargs) + self._setup() + self._connection = None + + def quic_event_received(self, event: QuicEvent) -> None: + print("[quic_event_received]:", event) + if isinstance(event, ProtocolNegotiated): + self._setup_connection(transmit=self.transmit) + if event.alpn_protocol in H3_ALPN: + self._connection = H3Connection( + self._quic, enable_webtransport=True + ) + # elif event.alpn_protocol in H0_ALPN: + # self._http = H0Connection(self._quic) + # elif isinstance(event, DatagramFrameReceived): + # if event.data == b"quack": + # self._quic.send_datagram_frame(b"quack-ack") + + # pass event to the HTTP layer + if self._connection is not None: + for http_event in self._connection.handle_event(event): + self._http.http_event_received(http_event) diff --git a/sanic/server/runners.py b/sanic/server/runners.py index 94a29328..c5fbd2f2 100644 --- a/sanic/server/runners.py +++ b/sanic/server/runners.py @@ -4,6 +4,7 @@ from ssl import SSLContext from typing import TYPE_CHECKING, Dict, Optional, Type, Union from sanic.config import Config +from sanic.http.constants import HTTP from sanic.server.events import trigger_events @@ -19,11 +20,14 @@ from functools import partial from signal import SIG_IGN, SIGINT, SIGTERM, Signals from signal import signal as signal_func +from aioquic.asyncio import serve as quic_serve + from sanic.compat import OS_IS_WINDOWS, ctrlc_workaround_for_windows +from sanic.http.http3 import get_config, get_ticket_store from sanic.log import error_logger, logger from sanic.models.server_types import Signal from sanic.server.async_server import AsyncioServer -from sanic.server.protocols.http_protocol import HttpProtocol +from sanic.server.protocols.http_protocol import Http3Protocol, HttpProtocol from sanic.server.socket import ( bind_socket, bind_unix_socket, @@ -49,6 +53,7 @@ def serve( signal=Signal(), state=None, asyncio_server_kwargs=None, + version=HTTP.VERSION_1, ): """Start asynchronous HTTP Server on an individual process. @@ -85,6 +90,9 @@ def serve( app.asgi = False + if version is HTTP.VERSION_3: + return serve_http_3(host, port, app, loop) + connections = connections if connections is not None else set() protocol_kwargs = _build_protocol_kwargs(protocol, app.config) server = partial( @@ -185,6 +193,40 @@ def serve( remove_unix_socket(unix) +def serve_http_3(host, port, app, loop): + protocol = partial(Http3Protocol, app=app) + ticket_store = get_ticket_store() + config = get_config() + coro = quic_serve( + host, + port, + configuration=config, + create_protocol=protocol, + session_ticket_fetcher=ticket_store.pop, + session_ticket_handler=ticket_store.add, + ) + server = AsyncioServer(app, loop, coro, []) + loop.run_until_complete(server.startup()) + loop.run_until_complete(server.before_start()) + loop.run_until_complete(server) + loop.run_until_complete(server.after_start()) + + pid = os.getpid() + try: + logger.info("Starting worker [%s]", pid) + loop.run_forever() + except KeyboardInterrupt: + pass + finally: + logger.info("Stopping worker [%s]", pid) + + loop.run_until_complete(server.before_stop()) + + # DO close connections here + + loop.run_until_complete(server.after_stop()) + + def serve_single(server_settings): main_start = server_settings.pop("main_start", None) main_stop = server_settings.pop("main_stop", None)