Move HTTP streaming events to receiver
This commit is contained in:
parent
d26d79c182
commit
06035c8d51
@ -2,9 +2,9 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
||||||
from abc import ABC
|
from abc import ABC, abstractmethod
|
||||||
from ssl import SSLContext
|
from ssl import SSLContext
|
||||||
from typing import TYPE_CHECKING, Callable, Dict, Optional, Union
|
from typing import TYPE_CHECKING, Callable, Dict, Optional, Tuple, Union
|
||||||
|
|
||||||
from aioquic.h0.connection import H0_ALPN, H0Connection
|
from aioquic.h0.connection import H0_ALPN, H0Connection
|
||||||
from aioquic.h3.connection import H3_ALPN, H3Connection
|
from aioquic.h3.connection import H3_ALPN, H3Connection
|
||||||
@ -32,12 +32,14 @@ from sanic.http.tls import CertSimple
|
|||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from sanic import Sanic
|
from sanic import Sanic
|
||||||
from sanic.request import Request
|
from sanic.request import Request
|
||||||
from sanic.response import BaseHTTPResponse
|
from sanic.response import BaseHTTPResponse, HTTPResponse
|
||||||
from sanic.server.protocols.http_protocol import Http3Protocol
|
from sanic.server.protocols.http_protocol import Http3Protocol
|
||||||
|
|
||||||
# from sanic.compat import Header
|
# from sanic.compat import Header
|
||||||
|
from sanic.http.constants import Stage
|
||||||
|
|
||||||
# from sanic.application.state import Mode
|
# from sanic.application.state import Mode
|
||||||
from sanic.log import logger
|
from sanic.log import Colors, logger
|
||||||
|
|
||||||
|
|
||||||
HttpConnection = Union[H0Connection, H3Connection]
|
HttpConnection = Union[H0Connection, H3Connection]
|
||||||
@ -48,23 +50,94 @@ class Transport:
|
|||||||
|
|
||||||
|
|
||||||
class Receiver(ABC):
|
class Receiver(ABC):
|
||||||
def __init__(self, transmit, protocol, request) -> None:
|
def __init__(self, transmit, protocol, request: Request) -> None:
|
||||||
self.transmit = transmit
|
self.transmit = transmit
|
||||||
self.protocol = protocol
|
self.protocol = protocol
|
||||||
self.request = request
|
self.request = request
|
||||||
|
self.queue: asyncio.Queue[Tuple[bytes, bool]] = asyncio.Queue()
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
async def run(self):
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
class HTTPReceiver(Receiver):
|
class HTTPReceiver(Receiver):
|
||||||
async def respond(self):
|
stage: Stage
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs) -> None:
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self.request_body = None
|
||||||
|
self.stage = Stage.IDLE
|
||||||
|
self.headers_sent = False
|
||||||
|
|
||||||
|
async def run(self):
|
||||||
|
self.stage = Stage.HANDLER
|
||||||
|
|
||||||
logger.info(f"Request received: {self.request}")
|
logger.info(f"Request received: {self.request}")
|
||||||
await self.protocol.app.handle_request(self.request)
|
await self.protocol.request_handler(self.request)
|
||||||
|
|
||||||
|
self.stage = Stage.RESPONSE
|
||||||
|
|
||||||
|
while self.stage is Stage.RESPONSE:
|
||||||
|
if not self.headers_sent:
|
||||||
|
self.send_headers()
|
||||||
|
data, end_stream = await self.queue.get()
|
||||||
|
|
||||||
|
# Chunked
|
||||||
|
size = len(data)
|
||||||
|
if end_stream:
|
||||||
|
data = (
|
||||||
|
b"%x\r\n%b\r\n0\r\n\r\n" % (size, data)
|
||||||
|
if size
|
||||||
|
else b"0\r\n\r\n"
|
||||||
|
)
|
||||||
|
elif size:
|
||||||
|
data = b"%x\r\n%b\r\n" % (size, data)
|
||||||
|
|
||||||
|
self.protocol.connection.send_data(
|
||||||
|
stream_id=self.request.stream_id,
|
||||||
|
data=data,
|
||||||
|
end_stream=end_stream,
|
||||||
|
)
|
||||||
|
self.transmit()
|
||||||
|
|
||||||
|
if end_stream:
|
||||||
|
self.stage = Stage.IDLE
|
||||||
|
|
||||||
|
def send_headers(self) -> None:
|
||||||
|
response = self.request.stream.response
|
||||||
|
self.protocol.connection.send_headers(
|
||||||
|
stream_id=self.request.stream_id,
|
||||||
|
headers=[
|
||||||
|
(b":status", str(response.status).encode()),
|
||||||
|
*(
|
||||||
|
(k.encode(), v.encode())
|
||||||
|
for k, v in response.headers.items()
|
||||||
|
),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
self.headers_sent = True
|
||||||
|
|
||||||
|
async def respond(self, response: BaseHTTPResponse) -> BaseHTTPResponse:
|
||||||
|
print(f"{Colors.BLUE}[respond]: {Colors.GREEN}{response=}{Colors.END}")
|
||||||
|
self.response, response.stream = response, self
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
|
async def send(self, data: bytes, end_stream: bool) -> None:
|
||||||
|
print(
|
||||||
|
f"{Colors.BLUE}[send]: {Colors.GREEN}{data=} {end_stream=}{Colors.END}"
|
||||||
|
)
|
||||||
|
self.queue.put_nowait((data, end_stream))
|
||||||
|
|
||||||
|
|
||||||
class WebsocketReceiver(Receiver):
|
class WebsocketReceiver(Receiver):
|
||||||
|
async def run(self):
|
||||||
...
|
...
|
||||||
|
|
||||||
|
|
||||||
class WebTransportReceiver(Receiver):
|
class WebTransportReceiver(Receiver):
|
||||||
|
async def run(self):
|
||||||
...
|
...
|
||||||
|
|
||||||
|
|
||||||
@ -81,37 +154,48 @@ class Http3:
|
|||||||
protocol: Http3Protocol,
|
protocol: Http3Protocol,
|
||||||
transmit: Callable[[], None],
|
transmit: Callable[[], None],
|
||||||
) -> None:
|
) -> None:
|
||||||
self.request_body = None
|
# self.request_body = None
|
||||||
self.request: Optional[Request] = None
|
# self.request: Optional[Request] = None
|
||||||
self.protocol = protocol
|
self.protocol = protocol
|
||||||
self.transmit = transmit
|
self.transmit = transmit
|
||||||
self.receivers: Dict[int, Receiver] = {}
|
self.receivers: Dict[int, Receiver] = {}
|
||||||
|
|
||||||
def http_event_received(self, event: H3Event) -> None:
|
def http_event_received(self, event: H3Event) -> None:
|
||||||
print("[http_event_received]:", event)
|
print(
|
||||||
receiver = self.get_or_make_receiver(event)
|
f"{Colors.BLUE}[http_event_received]: "
|
||||||
|
f"{Colors.YELLOW}{event}{Colors.END}"
|
||||||
|
)
|
||||||
|
receiver, created_new = self.get_or_make_receiver(event)
|
||||||
print(f"{receiver=}")
|
print(f"{receiver=}")
|
||||||
|
|
||||||
# asyncio.ensure_future(handler(request))
|
if isinstance(event, HeadersReceived) and created_new:
|
||||||
|
asyncio.ensure_future(receiver.run())
|
||||||
|
else:
|
||||||
|
print(f"{Colors.RED}DOING NOTHING{Colors.END}")
|
||||||
|
|
||||||
def get_or_make_receiver(self, event: H3Event) -> Receiver:
|
def get_or_make_receiver(self, event: H3Event) -> Tuple[Receiver, bool]:
|
||||||
if (
|
if (
|
||||||
isinstance(event, HeadersReceived)
|
isinstance(event, HeadersReceived)
|
||||||
and event.stream_id not in self.receivers
|
and event.stream_id not in self.receivers
|
||||||
):
|
):
|
||||||
self.request = self._make_request(event)
|
request = self._make_request(event)
|
||||||
receiver = HTTPReceiver(self.transmit, self.protocol, self.request)
|
receiver = HTTPReceiver(self.transmit, self.protocol, request)
|
||||||
|
request.stream = receiver
|
||||||
|
|
||||||
self.receivers[event.stream_id] = receiver
|
self.receivers[event.stream_id] = receiver
|
||||||
asyncio.ensure_future(receiver.respond())
|
return receiver, True
|
||||||
else:
|
else:
|
||||||
ident = getattr(event, self.HANDLER_PROPERTY_MAPPING[type(event)])
|
ident = getattr(event, self.HANDLER_PROPERTY_MAPPING[type(event)])
|
||||||
return self.receivers[ident]
|
return self.receivers[ident], False
|
||||||
|
|
||||||
|
def get_receiver_by_stream_id(self, stream_id: int) -> Receiver:
|
||||||
|
return self.receivers[stream_id]
|
||||||
|
|
||||||
def _make_request(self, event: HeadersReceived) -> Request:
|
def _make_request(self, event: HeadersReceived) -> Request:
|
||||||
method, path, *rem = event.headers
|
method_header, path_header, *rem = event.headers
|
||||||
headers = Header(((k.decode(), v.decode()) for k, v in rem))
|
headers = Header(((k.decode(), v.decode()) for k, v in rem))
|
||||||
method = method[1].decode()
|
method = method_header[1].decode()
|
||||||
path = path[1]
|
path = path_header[1]
|
||||||
scheme = headers.pop(":scheme")
|
scheme = headers.pop(":scheme")
|
||||||
authority = headers.pop(":authority")
|
authority = headers.pop(":authority")
|
||||||
print(f"{headers=}")
|
print(f"{headers=}")
|
||||||
@ -125,44 +209,10 @@ class Http3:
|
|||||||
request = self.protocol.request_class(
|
request = self.protocol.request_class(
|
||||||
path, headers, "3", method, Transport(), self.protocol.app, b""
|
path, headers, "3", method, Transport(), self.protocol.app, b""
|
||||||
)
|
)
|
||||||
request.stream = self
|
request._stream_id = event.stream_id
|
||||||
print(f"{request=}")
|
print(f"{request=}")
|
||||||
return request
|
return request
|
||||||
|
|
||||||
async def respond(self, response: BaseHTTPResponse) -> BaseHTTPResponse:
|
|
||||||
print(f"[respond]: {response=}")
|
|
||||||
response.headers.update({"foo": "bar"})
|
|
||||||
self.response, response.stream = response, self
|
|
||||||
|
|
||||||
# Need more appropriate place to send these
|
|
||||||
self.protocol.connection.send_headers(
|
|
||||||
stream_id=0,
|
|
||||||
headers=[
|
|
||||||
(b":status", str(self.response.status).encode()),
|
|
||||||
*(
|
|
||||||
(k.encode(), v.encode())
|
|
||||||
for k, v in self.response.headers.items()
|
|
||||||
),
|
|
||||||
],
|
|
||||||
)
|
|
||||||
# TEMP
|
|
||||||
await self.drain(response)
|
|
||||||
|
|
||||||
return response
|
|
||||||
|
|
||||||
async def drain(self, response: BaseHTTPResponse) -> None:
|
|
||||||
await self.send(response.body, False)
|
|
||||||
|
|
||||||
async def send(self, data: bytes, end_stream: bool) -> None:
|
|
||||||
print(f"[send]: {data=} {end_stream=}")
|
|
||||||
print(self.response.headers)
|
|
||||||
self.protocol.connection.send_data(
|
|
||||||
stream_id=0,
|
|
||||||
data=data,
|
|
||||||
end_stream=end_stream,
|
|
||||||
)
|
|
||||||
self.transmit()
|
|
||||||
|
|
||||||
|
|
||||||
class SessionTicketStore:
|
class SessionTicketStore:
|
||||||
"""
|
"""
|
||||||
|
@ -62,7 +62,8 @@ class Colors(str, Enum): # no cov
|
|||||||
BLUE = "\033[01;34m"
|
BLUE = "\033[01;34m"
|
||||||
GREEN = "\033[01;32m"
|
GREEN = "\033[01;32m"
|
||||||
YELLOW = "\033[01;33m"
|
YELLOW = "\033[01;33m"
|
||||||
RED = "\033[01;31m"
|
RED = "\033[01;34m"
|
||||||
|
PURPLE = "\033[01;35m"
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger("sanic.root") # no cov
|
logger = logging.getLogger("sanic.root") # no cov
|
||||||
|
@ -13,8 +13,9 @@ from typing import (
|
|||||||
Union,
|
Union,
|
||||||
)
|
)
|
||||||
|
|
||||||
from sanic_routing.route import Route # type: ignore
|
from sanic_routing.route import Route
|
||||||
|
|
||||||
|
from sanic.http.http3 import HTTPReceiver # type: ignore
|
||||||
from sanic.models.http_types import Credentials
|
from sanic.models.http_types import Credentials
|
||||||
|
|
||||||
|
|
||||||
@ -91,6 +92,7 @@ class Request:
|
|||||||
"_protocol",
|
"_protocol",
|
||||||
"_remote_addr",
|
"_remote_addr",
|
||||||
"_socket",
|
"_socket",
|
||||||
|
"_stream_id",
|
||||||
"_match_info",
|
"_match_info",
|
||||||
"_name",
|
"_name",
|
||||||
"app",
|
"app",
|
||||||
@ -127,6 +129,7 @@ class Request:
|
|||||||
transport: TransportProtocol,
|
transport: TransportProtocol,
|
||||||
app: Sanic,
|
app: Sanic,
|
||||||
head: bytes = b"",
|
head: bytes = b"",
|
||||||
|
stream_id: int = 0,
|
||||||
):
|
):
|
||||||
|
|
||||||
self.raw_url = url_bytes
|
self.raw_url = url_bytes
|
||||||
@ -134,6 +137,7 @@ class Request:
|
|||||||
self._parsed_url = parse_url(url_bytes)
|
self._parsed_url = parse_url(url_bytes)
|
||||||
self._id: Optional[Union[uuid.UUID, str, int]] = None
|
self._id: Optional[Union[uuid.UUID, str, int]] = None
|
||||||
self._name: Optional[str] = None
|
self._name: Optional[str] = None
|
||||||
|
self._stream_id = stream_id
|
||||||
self.app = app
|
self.app = app
|
||||||
|
|
||||||
self.headers = Header(headers)
|
self.headers = Header(headers)
|
||||||
@ -162,7 +166,9 @@ class Request:
|
|||||||
self.request_middleware_started = False
|
self.request_middleware_started = False
|
||||||
self._cookies: Optional[Dict[str, str]] = None
|
self._cookies: Optional[Dict[str, str]] = None
|
||||||
self._match_info: Dict[str, Any] = {}
|
self._match_info: Dict[str, Any] = {}
|
||||||
self.stream: Optional[Http] = None
|
# TODO:
|
||||||
|
# - Create an ABC (called Stream) for Http and HTTPReceiver to subclass
|
||||||
|
self.stream: Optional[Union[Http, HTTPReceiver]] = None
|
||||||
self.route: Optional[Route] = None
|
self.route: Optional[Route] = None
|
||||||
self._protocol = None
|
self._protocol = None
|
||||||
self.responded: bool = False
|
self.responded: bool = False
|
||||||
@ -175,6 +181,10 @@ class Request:
|
|||||||
def generate_id(*_):
|
def generate_id(*_):
|
||||||
return uuid.uuid4()
|
return uuid.uuid4()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def stream_id(self):
|
||||||
|
return self._stream_id
|
||||||
|
|
||||||
def reset_response(self):
|
def reset_response(self):
|
||||||
try:
|
try:
|
||||||
if (
|
if (
|
||||||
|
@ -20,7 +20,7 @@ from aioquic.quic.events import ProtocolNegotiated, QuicEvent
|
|||||||
|
|
||||||
from sanic.exceptions import RequestTimeout, ServiceUnavailable
|
from sanic.exceptions import RequestTimeout, ServiceUnavailable
|
||||||
from sanic.http import Http, Stage
|
from sanic.http import Http, Stage
|
||||||
from sanic.log import error_logger, logger
|
from sanic.log import Colors, error_logger, logger
|
||||||
from sanic.models.server_types import ConnInfo
|
from sanic.models.server_types import ConnInfo
|
||||||
from sanic.request import Request
|
from sanic.request import Request
|
||||||
from sanic.server.protocols.base_protocol import SanicProtocol
|
from sanic.server.protocols.base_protocol import SanicProtocol
|
||||||
@ -265,7 +265,9 @@ class Http3Protocol(HttpProtocolMixin, QuicConnectionProtocol):
|
|||||||
self._connection: Optional[H3Connection] = None
|
self._connection: Optional[H3Connection] = None
|
||||||
|
|
||||||
def quic_event_received(self, event: QuicEvent) -> None:
|
def quic_event_received(self, event: QuicEvent) -> None:
|
||||||
print("[quic_event_received]:", event)
|
print(
|
||||||
|
f"{Colors.BLUE}[quic_event_received]: {Colors.PURPLE}{event}{Colors.END}"
|
||||||
|
)
|
||||||
if isinstance(event, ProtocolNegotiated):
|
if isinstance(event, ProtocolNegotiated):
|
||||||
self._setup_connection(transmit=self.transmit)
|
self._setup_connection(transmit=self.transmit)
|
||||||
if event.alpn_protocol in H3_ALPN:
|
if event.alpn_protocol in H3_ALPN:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user