WIP to begin http3

This commit is contained in:
Adam Hopkins 2021-12-12 09:07:25 +02:00
parent 96c027bad5
commit f1c2d0e042
No known key found for this signature in database
GPG Key ID: 9F85EE6C807303FB
7 changed files with 280 additions and 36 deletions

View File

@ -35,6 +35,7 @@ from typing import (
Dict, Dict,
Iterable, Iterable,
List, List,
Literal,
Optional, Optional,
Set, Set,
Tuple, Tuple,
@ -68,6 +69,7 @@ from sanic.exceptions import (
) )
from sanic.handlers import ErrorHandler from sanic.handlers import ErrorHandler
from sanic.http import Stage from sanic.http import Stage
from sanic.http.constants import HTTP
from sanic.log import LOGGING_CONFIG_DEFAULTS, Colors, error_logger, logger from sanic.log import LOGGING_CONFIG_DEFAULTS, Colors, error_logger, logger
from sanic.mixins.listeners import ListenerEvent from sanic.mixins.listeners import ListenerEvent
from sanic.models.futures import ( from sanic.models.futures import (
@ -1050,6 +1052,7 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta):
fast: bool = False, fast: bool = False,
verbosity: int = 0, verbosity: int = 0,
motd_display: Optional[Dict[str, str]] = None, motd_display: Optional[Dict[str, str]] = None,
version: HTTP = HTTP.VERSION_1,
) -> None: ) -> None:
""" """
Run the HTTP Server and listen until keyboard interrupt or term Run the HTTP Server and listen until keyboard interrupt or term
@ -1156,6 +1159,7 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta):
protocol=protocol, protocol=protocol,
backlog=backlog, backlog=backlog,
register_sys_signals=register_sys_signals, register_sys_signals=register_sys_signals,
version=version,
) )
try: try:
@ -1383,6 +1387,7 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta):
backlog: int = 100, backlog: int = 100,
register_sys_signals: bool = True, register_sys_signals: bool = True,
run_async: bool = False, run_async: bool = False,
version: Union[HTTP, Literal[1], Literal[3]] = HTTP.VERSION_1,
): ):
"""Helper function used by `run` and `create_server`.""" """Helper function used by `run` and `create_server`."""
if self.config.PROXIES_COUNT and self.config.PROXIES_COUNT < 0: if self.config.PROXIES_COUNT and self.config.PROXIES_COUNT < 0:
@ -1392,6 +1397,9 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta):
"#proxy-configuration" "#proxy-configuration"
) )
if isinstance(version, int):
version = HTTP(version)
self.debug = debug self.debug = debug
self.state.host = host self.state.host = host
self.state.port = port self.state.port = port
@ -1425,6 +1433,7 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta):
"loop": loop, "loop": loop,
"register_sys_signals": register_sys_signals, "register_sys_signals": register_sys_signals,
"backlog": backlog, "backlog": backlog,
"version": version,
} }
self.motd(serve_location) self.motd(serve_location)

5
sanic/http/__init__.py Normal file
View File

@ -0,0 +1,5 @@
from .constants import Stage
from .http1 import Http
__all__ = ("Http", "Stage")

25
sanic/http/constants.py Normal file
View File

@ -0,0 +1,25 @@
from enum import Enum
class Stage(Enum):
"""
Enum for representing the stage of the request/response cycle
| ``IDLE`` Waiting for request
| ``REQUEST`` Request headers being received
| ``HANDLER`` Headers done, handler running
| ``RESPONSE`` Response headers sent, body in progress
| ``FAILED`` Unrecoverable state (error while sending response)
|
"""
IDLE = 0 # Waiting for request
REQUEST = 1 # Request headers being received
HANDLER = 3 # Headers done, handler running
RESPONSE = 4 # Response headers sent, body in progress
FAILED = 100 # Unrecoverable state (error while sending response)
class HTTP(Enum):
VERSION_1 = 1
VERSION_3 = 3

View File

@ -8,7 +8,6 @@ if TYPE_CHECKING:
from sanic.response import BaseHTTPResponse from sanic.response import BaseHTTPResponse
from asyncio import CancelledError, sleep from asyncio import CancelledError, sleep
from enum import Enum
from sanic.compat import Header from sanic.compat import Header
from sanic.exceptions import ( from sanic.exceptions import (
@ -20,29 +19,11 @@ from sanic.exceptions import (
) )
from sanic.headers import format_http1_response from sanic.headers import format_http1_response
from sanic.helpers import has_message_body from sanic.helpers import has_message_body
from sanic.http.constants import Stage
from sanic.log import access_logger, error_logger, logger from sanic.log import access_logger, error_logger, logger
from sanic.touchup import TouchUpMeta from sanic.touchup import TouchUpMeta
class Stage(Enum):
"""
Enum for representing the stage of the request/response cycle
| ``IDLE`` Waiting for request
| ``REQUEST`` Request headers being received
| ``HANDLER`` Headers done, handler running
| ``RESPONSE`` Response headers sent, body in progress
| ``FAILED`` Unrecoverable state (error while sending response)
|
"""
IDLE = 0 # Waiting for request
REQUEST = 1 # Request headers being received
HANDLER = 3 # Headers done, handler running
RESPONSE = 4 # Response headers sent, body in progress
FAILED = 100 # Unrecoverable state (error while sending response)
HTTP_CONTINUE = b"HTTP/1.1 100 Continue\r\n\r\n" HTTP_CONTINUE = b"HTTP/1.1 100 Continue\r\n\r\n"

136
sanic/http/http3.py Normal file
View File

@ -0,0 +1,136 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Callable, Dict, Optional, Union
from aioquic.h0.connection import H0_ALPN, H0Connection
from aioquic.h3.connection import H3_ALPN, H3Connection
from aioquic.h3.events import H3Event
# from aioquic.h3.events import (
# DatagramReceived,
# DataReceived,
# H3Event,
# HeadersReceived,
# WebTransportStreamDataReceived,
# )
from aioquic.quic.configuration import QuicConfiguration
# from aioquic.quic.events import (
# DatagramFrameReceived,
# ProtocolNegotiated,
# QuicEvent,
# )
from aioquic.tls import SessionTicket
if TYPE_CHECKING:
from sanic.request import Request
# from sanic.compat import Header
from sanic.log import logger
from sanic.response import BaseHTTPResponse
HttpConnection = Union[H0Connection, H3Connection]
async def handler(request: Request):
logger.info(f"Request received: {request}")
response = await request.app.handle_request(request)
logger.info(f"Build response: {response=}")
class Transport:
...
class Http3:
def __init__(
self,
connection: HttpConnection,
transmit: Callable[[], None],
) -> None:
self.request_body = None
self.connection = connection
self.transmit = transmit
def http_event_received(self, event: H3Event) -> None:
print("[http_event_received]:", event)
# if isinstance(event, HeadersReceived):
# method, path, *rem = event.headers
# headers = Header(((k.decode(), v.decode()) for k, v in rem))
# method = method[1].decode()
# path = path[1]
# scheme = headers.pop(":scheme")
# authority = headers.pop(":authority")
# print(f"{headers=}")
# print(f"{method=}")
# print(f"{path=}")
# print(f"{scheme=}")
# print(f"{authority=}")
# if authority:
# headers["host"] = authority
# request = Request(
# path, headers, "3", method, Transport(), app, b""
# )
# request.stream = Stream(
# connection=self._http, transmit=self.transmit
# )
# print(f"{request=}")
# asyncio.ensure_future(handler(request))
async def send(self, data: bytes, end_stream: bool) -> None:
print(f"[send]: {data=} {end_stream=}")
print(self.response.headers)
# self.connection.send_headers(
# stream_id=0,
# headers=[
# (b":status", str(self.response.status).encode()),
# *(
# (k.encode(), v.encode())
# for k, v in self.response.headers.items()
# ),
# ],
# )
# self.connection.send_data(
# stream_id=0,
# data=data,
# end_stream=end_stream,
# )
# self.transmit()
def respond(self, response: BaseHTTPResponse) -> BaseHTTPResponse:
print(f"[respond]: {response=}")
self.response, response.stream = response, self
return response
class SessionTicketStore:
"""
Simple in-memory store for session tickets.
"""
def __init__(self) -> None:
self.tickets: Dict[bytes, SessionTicket] = {}
def add(self, ticket: SessionTicket) -> None:
self.tickets[ticket.ticket] = ticket
def pop(self, label: bytes) -> Optional[SessionTicket]:
return self.tickets.pop(label, None)
def get_config():
config = QuicConfiguration(
alpn_protocols=H3_ALPN + H0_ALPN + ["siduck"],
is_client=False,
max_datagram_frame_size=65536,
)
config.load_cert_chain("./cert.pem", "./key.pem", password="qqqqqqqq")
return config
def get_ticket_store():
return SessionTicketStore()

View File

@ -2,6 +2,9 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Optional from typing import TYPE_CHECKING, Optional
from aioquic.h3.connection import H3_ALPN, H3Connection
from sanic.http.http3 import Http3
from sanic.touchup.meta import TouchUpMeta from sanic.touchup.meta import TouchUpMeta
@ -11,6 +14,10 @@ if TYPE_CHECKING:
from asyncio import CancelledError from asyncio import CancelledError
from time import monotonic as current_time from time import monotonic as current_time
from aioquic.asyncio import QuicConnectionProtocol
from aioquic.h3.events import H3Event
from aioquic.quic.events import ProtocolNegotiated, QuicEvent
from sanic.exceptions import RequestTimeout, ServiceUnavailable from sanic.exceptions import RequestTimeout, ServiceUnavailable
from sanic.http import Http, Stage from sanic.http import Http, Stage
from sanic.log import error_logger, logger from sanic.log import error_logger, logger
@ -19,12 +26,35 @@ from sanic.request import Request
from sanic.server.protocols.base_protocol import SanicProtocol from sanic.server.protocols.base_protocol import SanicProtocol
class HttpProtocol(SanicProtocol, metaclass=TouchUpMeta): class HttpProtocolMixin:
def _setup_connection(self, *args, **kwargs):
self._http = self.HTTP_CLASS(self, *args, **kwargs)
self._time = current_time()
try:
self.check_timeouts()
except AttributeError:
...
def _setup(self):
self.request: Optional[Request] = None
self.access_log = self.app.config.ACCESS_LOG
self.request_handler = self.app.handle_request
self.error_handler = self.app.error_handler
self.request_timeout = self.app.config.REQUEST_TIMEOUT
self.response_timeout = self.app.config.RESPONSE_TIMEOUT
self.keep_alive_timeout = self.app.config.KEEP_ALIVE_TIMEOUT
self.request_max_size = self.app.config.REQUEST_MAX_SIZE
self.request_class = self.app.request_class or Request
class HttpProtocol(HttpProtocolMixin, SanicProtocol, metaclass=TouchUpMeta):
""" """
This class provides implements the HTTP 1.1 protocol on top of our This class provides implements the HTTP 1.1 protocol on top of our
Sanic Server transport Sanic Server transport
""" """
HTTP_CLASS = Http
__touchup__ = ( __touchup__ = (
"send", "send",
"connection_task", "connection_task",
@ -70,25 +100,12 @@ class HttpProtocol(SanicProtocol, metaclass=TouchUpMeta):
unix=unix, unix=unix,
) )
self.url = None self.url = None
self.request: Optional[Request] = None
self.access_log = self.app.config.ACCESS_LOG
self.request_handler = self.app.handle_request
self.error_handler = self.app.error_handler
self.request_timeout = self.app.config.REQUEST_TIMEOUT
self.response_timeout = self.app.config.RESPONSE_TIMEOUT
self.keep_alive_timeout = self.app.config.KEEP_ALIVE_TIMEOUT
self.request_max_size = self.app.config.REQUEST_MAX_SIZE
self.request_class = self.app.request_class or Request
self.state = state if state else {} self.state = state if state else {}
self._setup()
if "requests_count" not in self.state: if "requests_count" not in self.state:
self.state["requests_count"] = 0 self.state["requests_count"] = 0
self._exception = None self._exception = None
def _setup_connection(self):
self._http = Http(self)
self._time = current_time()
self.check_timeouts()
async def connection_task(self): # no cov async def connection_task(self): # no cov
""" """
Run a HTTP connection. Run a HTTP connection.
@ -236,3 +253,32 @@ class HttpProtocol(SanicProtocol, metaclass=TouchUpMeta):
self._data_received.set() self._data_received.set()
except Exception: except Exception:
error_logger.exception("protocol.data_received") error_logger.exception("protocol.data_received")
class Http3Protocol(HttpProtocolMixin, QuicConnectionProtocol):
HTTP_CLASS = Http3
def __init__(self, *args, app: Sanic, **kwargs) -> None:
self.app = app
super().__init__(*args, **kwargs)
self._setup()
self._connection = None
def quic_event_received(self, event: QuicEvent) -> None:
print("[quic_event_received]:", event)
if isinstance(event, ProtocolNegotiated):
self._setup_connection(transmit=self.transmit)
if event.alpn_protocol in H3_ALPN:
self._connection = H3Connection(
self._quic, enable_webtransport=True
)
# elif event.alpn_protocol in H0_ALPN:
# self._http = H0Connection(self._quic)
# elif isinstance(event, DatagramFrameReceived):
# if event.data == b"quack":
# self._quic.send_datagram_frame(b"quack-ack")
# pass event to the HTTP layer
if self._connection is not None:
for http_event in self._connection.handle_event(event):
self._http.http_event_received(http_event)

View File

@ -4,6 +4,7 @@ from ssl import SSLContext
from typing import TYPE_CHECKING, Dict, Optional, Type, Union from typing import TYPE_CHECKING, Dict, Optional, Type, Union
from sanic.config import Config from sanic.config import Config
from sanic.http.constants import HTTP
from sanic.server.events import trigger_events from sanic.server.events import trigger_events
@ -19,11 +20,14 @@ from functools import partial
from signal import SIG_IGN, SIGINT, SIGTERM, Signals from signal import SIG_IGN, SIGINT, SIGTERM, Signals
from signal import signal as signal_func from signal import signal as signal_func
from aioquic.asyncio import serve as quic_serve
from sanic.compat import OS_IS_WINDOWS, ctrlc_workaround_for_windows from sanic.compat import OS_IS_WINDOWS, ctrlc_workaround_for_windows
from sanic.http.http3 import get_config, get_ticket_store
from sanic.log import error_logger, logger from sanic.log import error_logger, logger
from sanic.models.server_types import Signal from sanic.models.server_types import Signal
from sanic.server.async_server import AsyncioServer from sanic.server.async_server import AsyncioServer
from sanic.server.protocols.http_protocol import HttpProtocol from sanic.server.protocols.http_protocol import Http3Protocol, HttpProtocol
from sanic.server.socket import ( from sanic.server.socket import (
bind_socket, bind_socket,
bind_unix_socket, bind_unix_socket,
@ -49,6 +53,7 @@ def serve(
signal=Signal(), signal=Signal(),
state=None, state=None,
asyncio_server_kwargs=None, asyncio_server_kwargs=None,
version=HTTP.VERSION_1,
): ):
"""Start asynchronous HTTP Server on an individual process. """Start asynchronous HTTP Server on an individual process.
@ -85,6 +90,9 @@ def serve(
app.asgi = False app.asgi = False
if version is HTTP.VERSION_3:
return serve_http_3(host, port, app, loop)
connections = connections if connections is not None else set() connections = connections if connections is not None else set()
protocol_kwargs = _build_protocol_kwargs(protocol, app.config) protocol_kwargs = _build_protocol_kwargs(protocol, app.config)
server = partial( server = partial(
@ -185,6 +193,40 @@ def serve(
remove_unix_socket(unix) remove_unix_socket(unix)
def serve_http_3(host, port, app, loop):
protocol = partial(Http3Protocol, app=app)
ticket_store = get_ticket_store()
config = get_config()
coro = quic_serve(
host,
port,
configuration=config,
create_protocol=protocol,
session_ticket_fetcher=ticket_store.pop,
session_ticket_handler=ticket_store.add,
)
server = AsyncioServer(app, loop, coro, [])
loop.run_until_complete(server.startup())
loop.run_until_complete(server.before_start())
loop.run_until_complete(server)
loop.run_until_complete(server.after_start())
pid = os.getpid()
try:
logger.info("Starting worker [%s]", pid)
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
logger.info("Stopping worker [%s]", pid)
loop.run_until_complete(server.before_stop())
# DO close connections here
loop.run_until_complete(server.after_stop())
def serve_single(server_settings): def serve_single(server_settings):
main_start = server_settings.pop("main_start", None) main_start = server_settings.pop("main_start", None)
main_stop = server_settings.pop("main_stop", None) main_stop = server_settings.pop("main_stop", None)