Streaming send

This commit is contained in:
Adam Hopkins 2022-02-24 09:59:47 +02:00
parent 06035c8d51
commit 2262c692ba
No known key found for this signature in database
GPG Key ID: 9F85EE6C807303FB
4 changed files with 67 additions and 37 deletions

View File

@ -920,6 +920,7 @@ class Sanic(BaseSanic, RunnerMixin, metaclass=TouchUpMeta):
if isawaitable(response): if isawaitable(response):
response = await response response = await response
print(f"{response=}", request.responded)
if request.responded: if request.responded:
if response is not None: if response is not None:
error_logger.error( error_logger.error(
@ -945,6 +946,7 @@ class Sanic(BaseSanic, RunnerMixin, metaclass=TouchUpMeta):
"response": response, "response": response,
}, },
) )
...
await response.send(end_stream=True) await response.send(end_stream=True)
elif isinstance(response, ResponseStream): elif isinstance(response, ResponseStream):
resp = await response(request) resp = await response(request)

View File

@ -4,6 +4,7 @@ import asyncio
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from ssl import SSLContext from ssl import SSLContext
from sys import exc_info
from typing import TYPE_CHECKING, Callable, Dict, Optional, Tuple, Union from typing import TYPE_CHECKING, Callable, Dict, Optional, Tuple, Union
from aioquic.h0.connection import H0_ALPN, H0Connection from aioquic.h0.connection import H0_ALPN, H0Connection
@ -39,7 +40,7 @@ if TYPE_CHECKING:
from sanic.http.constants import Stage from sanic.http.constants import Stage
# from sanic.application.state import Mode # from sanic.application.state import Mode
from sanic.log import Colors, logger from sanic.log import Colors, error_logger, logger
HttpConnection = Union[H0Connection, H3Connection] HttpConnection = Union[H0Connection, H3Connection]
@ -54,7 +55,6 @@ class Receiver(ABC):
self.transmit = transmit self.transmit = transmit
self.protocol = protocol self.protocol = protocol
self.request = request self.request = request
self.queue: asyncio.Queue[Tuple[bytes, bool]] = asyncio.Queue()
@abstractmethod @abstractmethod
async def run(self): async def run(self):
@ -69,42 +69,23 @@ class HTTPReceiver(Receiver):
self.request_body = None self.request_body = None
self.stage = Stage.IDLE self.stage = Stage.IDLE
self.headers_sent = False self.headers_sent = False
self.response = None
async def run(self): async def run(self):
self.stage = Stage.HANDLER self.stage = Stage.HANDLER
logger.info(f"Request received: {self.request}") try:
logger.info(f">>> Request received: {self.request}")
await self.protocol.request_handler(self.request) await self.protocol.request_handler(self.request)
except Exception:
# TODO:
# - Handler errors
raise
else:
self.stage = Stage.RESPONSE self.stage = Stage.RESPONSE
while self.stage is Stage.RESPONSE:
if not self.headers_sent:
self.send_headers()
data, end_stream = await self.queue.get()
# Chunked
size = len(data)
if end_stream:
data = (
b"%x\r\n%b\r\n0\r\n\r\n" % (size, data)
if size
else b"0\r\n\r\n"
)
elif size:
data = b"%x\r\n%b\r\n" % (size, data)
self.protocol.connection.send_data(
stream_id=self.request.stream_id,
data=data,
end_stream=end_stream,
)
self.transmit()
if end_stream:
self.stage = Stage.IDLE
def send_headers(self) -> None: def send_headers(self) -> None:
print(f"{Colors.RED}SEND HEADERS{Colors.END}")
response = self.request.stream.response response = self.request.stream.response
self.protocol.connection.send_headers( self.protocol.connection.send_headers(
stream_id=self.request.stream_id, stream_id=self.request.stream_id,
@ -117,9 +98,22 @@ class HTTPReceiver(Receiver):
], ],
) )
self.headers_sent = True self.headers_sent = True
self.stage = Stage.RESPONSE
async def respond(self, response: BaseHTTPResponse) -> BaseHTTPResponse: if self.response.body:
self._send(self.response.body, False)
def respond(self, response: BaseHTTPResponse) -> BaseHTTPResponse:
print(f"{Colors.BLUE}[respond]: {Colors.GREEN}{response=}{Colors.END}") print(f"{Colors.BLUE}[respond]: {Colors.GREEN}{response=}{Colors.END}")
if self.stage is not Stage.HANDLER:
self.stage = Stage.FAILED
raise RuntimeError("Response already started")
# Disconnect any earlier but unused response object
if self.response is not None:
self.response.stream = None
self.response, response.stream = response, self self.response, response.stream = response, self
return response return response
@ -128,7 +122,37 @@ class HTTPReceiver(Receiver):
print( print(
f"{Colors.BLUE}[send]: {Colors.GREEN}{data=} {end_stream=}{Colors.END}" f"{Colors.BLUE}[send]: {Colors.GREEN}{data=} {end_stream=}{Colors.END}"
) )
self.queue.put_nowait((data, end_stream)) self._send(data, end_stream)
def _send(self, data: bytes, end_stream: bool) -> None:
if not self.headers_sent:
self.send_headers()
if self.stage is not Stage.RESPONSE:
raise Exception(f"not ready to send: {self.stage}")
print(f"{data=}")
# Chunked
size = len(data)
if end_stream:
data = (
b"%x\r\n%b\r\n0\r\n\r\n" % (size, data)
if size
else b"0\r\n\r\n"
)
elif size:
data = b"%x\r\n%b\r\n" % (size, data)
print(f"{Colors.RED}TRANSMITTING{Colors.END}")
self.protocol.connection.send_data(
stream_id=self.request.stream_id,
data=data,
end_stream=end_stream,
)
self.transmit()
if end_stream:
self.stage = Stage.IDLE
class WebsocketReceiver(Receiver): class WebsocketReceiver(Receiver):
@ -170,6 +194,10 @@ class Http3:
if isinstance(event, HeadersReceived) and created_new: if isinstance(event, HeadersReceived) and created_new:
asyncio.ensure_future(receiver.run()) asyncio.ensure_future(receiver.run())
elif isinstance(event, DataReceived):
# event.stream_ended
# TEMP
receiver.request.body = event.data
else: else:
print(f"{Colors.RED}DOING NOTHING{Colors.END}") print(f"{Colors.RED}DOING NOTHING{Colors.END}")

View File

@ -59,10 +59,10 @@ LOGGING_CONFIG_DEFAULTS: Dict[str, Any] = dict( # no cov
class Colors(str, Enum): # no cov class Colors(str, Enum): # no cov
END = "\033[0m" END = "\033[0m"
BLUE = "\033[01;34m" RED = "\033[01;31m"
GREEN = "\033[01;32m" GREEN = "\033[01;32m"
YELLOW = "\033[01;33m" YELLOW = "\033[01;33m"
RED = "\033[01;34m" BLUE = "\033[01;34m"
PURPLE = "\033[01;35m" PURPLE = "\033[01;35m"

View File

@ -122,7 +122,7 @@ class BaseHTTPResponse:
:param data: str or bytes to be written :param data: str or bytes to be written
:param end_stream: whether to close the stream after this block :param end_stream: whether to close the stream after this block
""" """
print(f">>> BaseHTTPResponse: {data=} {end_stream=} {self.body=}") print(f">>> BaseHTTPResponse: {data=} {end_stream=}")
if data is None and end_stream is None: if data is None and end_stream is None:
end_stream = True end_stream = True
if self.stream is None: if self.stream is None: