7028eae083
* Streaming request by async for.
* Make all requests streaming and preload body for non-streaming handlers.
* Cleanup of code and avoid mixing streaming responses.
* Async http protocol loop.
* Change of test: don't require early bad request error but only after CRLF-CRLF.
* Add back streaming requests.
* Rewritten request body parser.
* Misc. cleanup, down to 4 failing tests.
* All tests OK.
* Entirely remove request body queue.
* Let black f*ckup the layout
* Better testing error messages on protocol errors.
* Remove StreamBuffer tests because the type is about to be removed.
* Remove tests using the deprecated get_headers function that can no longer be supported. Chunked mode is now autodetected, so do not put content-length header if chunked mode is preferred.
* Major refactoring of HTTP protocol handling (new module http.py added), all requests made streaming. A few compatibility issues and a lot of cleanup to be done remain, 16 tests failing.
* Terminate check_timeouts once connection_task finishes.
* Code cleanup, 14 tests failing.
* Much cleanup, 12 failing...
* Even more cleanup and error checking, 8 failing tests.
* Remove keep-alive header from responses. First of all, it should say timeout=<value> which wasn't the case with existing implementation, and secondly none of the other web servers I tried include this header.
* Everything but CustomServer OK.
* Linter
* Disable custom protocol test
* Remove unnecessary variables, optimise performance.
* A test was missing that body_init/body_push/body_finish are never called. Rewritten using receive_body and case switching to make it fail if bypassed.
* Minor fixes.
* Remove unused code.
* Py 3.8 check for deprecated loop argument.
* Fix a middleware cancellation handling test with py38.
* Linter 'n fixes
* Typing
* Stricter handling of request header size
* More specific error messages on Payload Too Large.
* Init http.response = None
* Messages further tuned.
* Always try to consume request body, plus minor cleanup.
* Add a missing check in case of close_if_idle on a dead connection.
* Avoid error messages on PayloadTooLarge.
* Add test for new API.
* json takes str, not bytes
* Default to no maximum request size for streaming handlers.
* Fix chunked mode crash.
* Header values should be strictly ASCII but both UTF-8 and Latin-1 exist. Use UTF-8B to cope with all.
* Refactoring and cleanup.
* Unify response header processing of ASGI and asyncio modes.
* Avoid special handling of StreamingHTTPResponse.
* 35 % speedup in HTTP/1.1 response formatting (not so much overall effect).
* Duplicate set-cookie headers were being produced.
* Cleanup processed_headers some more.
* Linting
* Import ordering
* Response middleware run by async request.respond().
* Need to check if transport is closing to avoid getting stuck in sending loops after peer has disconnected.
* Middleware and error handling refactoring.
* Linter
* Fix tracking of HTTP stage when writing to transport fails.
* Add clarifying comment
* Add a check for request body functions and a test for NotImplementedError.
* Linter and typing
* These must be tuples + hack mypy warnings away.
* New streaming test and minor fixes.
* Constant receive buffer size.
* 256 KiB send and receive buffers.
* Revert "256 KiB send and receive buffers." This reverts commit abc1e3edb2.
* app.handle_exception already sends the response.
* Improved handling of errors during request.
* An odd hack to avoid an httpx limitation that causes test failures.
* Limit request header size to 8 KiB at most.
* Remove unnecessary use of format string.
* Cleanup tests
* Remove artifact
* Fix type checking
* Mark test for skipping
* Cleanup some edge cases
* Add ignore_body flag to safe methods
* Add unit tests for timeout logic
* Fix Mock usage in timeout test
* Change logging test to only logger in handler
* Windows py3.8 logging issue with current testing client
* Add test_header_size_exceeded
* Resolve merge conflicts
* Add request middleware to hard exception handling
* Request middleware on exception handlers
* Linting
* Cleanup deprecations
Co-authored-by: L. Kärkkäinen <tronic@users.noreply.github.com>
Co-authored-by: Adam Hopkins <admhpkns@gmail.com>
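
The header-decoding item above ("Use UTF-8B to cope with all") can be illustrated with a small sketch. It assumes that "UTF-8B" refers to the scheme Python exposes as the `surrogateescape` error handler (PEP 383). The helper names `decode_header` and `encode_header` are hypothetical and are not taken from the commit.

```python
def decode_header(raw: bytes) -> str:
    # Valid UTF-8 decodes normally; any byte that is not valid UTF-8 is
    # mapped to a lone surrogate (U+DC80..U+DCFF) instead of raising.
    return raw.decode("utf-8", errors="surrogateescape")


def encode_header(text: str) -> bytes:
    # The inverse operation: lone surrogates turn back into the original
    # bytes, so the round trip is lossless for arbitrary input.
    return text.encode("utf-8", errors="surrogateescape")


utf8_value = "café".encode("utf-8")       # b"caf\xc3\xa9"
latin1_value = "café".encode("latin-1")   # b"caf\xe9", not valid UTF-8

decoded_utf8 = decode_header(utf8_value)
decoded_latin1 = decode_header(latin1_value)
round_trip = encode_header(decoded_latin1)
```

The point of the design is that neither encoding causes a decode error, and a Latin-1 value can still be recovered byte for byte by re-encoding.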
296 lines
9.6 KiB
Python
import asyncio
import warnings

from inspect import isawaitable
from typing import Any, Awaitable, Callable, MutableMapping, Optional, Union
from urllib.parse import quote

import sanic.app  # noqa

from sanic.compat import Header
from sanic.exceptions import InvalidUsage, ServerError
from sanic.request import Request
from sanic.server import ConnInfo
from sanic.websocket import WebSocketConnection


ASGIScope = MutableMapping[str, Any]
ASGIMessage = MutableMapping[str, Any]
ASGISend = Callable[[ASGIMessage], Awaitable[None]]
ASGIReceive = Callable[[], Awaitable[ASGIMessage]]


class MockProtocol:
    def __init__(self, transport: "MockTransport", loop):
        self.transport = transport
        self._not_paused = asyncio.Event(loop=loop)
        self._not_paused.set()
        self._complete = asyncio.Event(loop=loop)

    def pause_writing(self) -> None:
        self._not_paused.clear()

    def resume_writing(self) -> None:
        self._not_paused.set()

    async def complete(self) -> None:
        self._not_paused.set()
        await self.transport.send(
            {"type": "http.response.body", "body": b"", "more_body": False}
        )

    @property
    def is_complete(self) -> bool:
        return self._complete.is_set()

    async def push_data(self, data: bytes) -> None:
        if not self.is_complete:
            await self.transport.send(
                {"type": "http.response.body", "body": data, "more_body": True}
            )

    async def drain(self) -> None:
        await self._not_paused.wait()


class MockTransport:
    _protocol: Optional[MockProtocol]

    def __init__(
        self, scope: ASGIScope, receive: ASGIReceive, send: ASGISend
    ) -> None:
        self.scope = scope
        self._receive = receive
        self._send = send
        self._protocol = None
        self.loop = None

    def get_protocol(self) -> MockProtocol:
        if not self._protocol:
            self._protocol = MockProtocol(self, self.loop)
        return self._protocol

    def get_extra_info(self, info: str) -> Union[str, bool, None]:
        if info == "peername":
            return self.scope.get("client")
        elif info == "sslcontext":
            return self.scope.get("scheme") in ["https", "wss"]
        return None

    def get_websocket_connection(self) -> WebSocketConnection:
        try:
            return self._websocket_connection
        except AttributeError:
            raise InvalidUsage("Improper websocket connection.")

    def create_websocket_connection(
        self, send: ASGISend, receive: ASGIReceive
    ) -> WebSocketConnection:
        self._websocket_connection = WebSocketConnection(
            send, receive, self.scope.get("subprotocols", [])
        )
        return self._websocket_connection

    def add_task(self) -> None:
        raise NotImplementedError

    async def send(self, data) -> None:
        # TODO:
        # - Validation on data and that it is formatted properly and is valid
        await self._send(data)

    async def receive(self) -> ASGIMessage:
        return await self._receive()


class Lifespan:
    def __init__(self, asgi_app: "ASGIApp") -> None:
        self.asgi_app = asgi_app

        if "before_server_start" in self.asgi_app.sanic_app.listeners:
            warnings.warn(
                'You have set a listener for "before_server_start" '
                "in ASGI mode. "
                "It will be executed as early as possible, but not before "
                "the ASGI server is started."
            )
        if "after_server_stop" in self.asgi_app.sanic_app.listeners:
            warnings.warn(
                'You have set a listener for "after_server_stop" '
                "in ASGI mode. "
                "It will be executed as late as possible, but not after "
                "the ASGI server is stopped."
            )

    async def startup(self) -> None:
        """
        Gather the listeners to fire on server start.
        Because we are using a third-party server and not Sanic server, we do
        not have access to fire anything BEFORE the server starts.
        Therefore, we fire before_server_start and after_server_start
        in sequence since the ASGI lifespan protocol only supports a single
        startup event.
        """
        listeners = self.asgi_app.sanic_app.listeners.get(
            "before_server_start", []
        ) + self.asgi_app.sanic_app.listeners.get("after_server_start", [])

        for handler in listeners:
            response = handler(
                self.asgi_app.sanic_app, self.asgi_app.sanic_app.loop
            )
            if response and isawaitable(response):
                await response

    async def shutdown(self) -> None:
        """
        Gather the listeners to fire on server stop.
        Because we are using a third-party server and not Sanic server, we do
        not have access to fire anything AFTER the server stops.
        Therefore, we fire before_server_stop and after_server_stop
        in sequence since the ASGI lifespan protocol only supports a single
        shutdown event.
        """
        listeners = self.asgi_app.sanic_app.listeners.get(
            "before_server_stop", []
        ) + self.asgi_app.sanic_app.listeners.get("after_server_stop", [])

        for handler in listeners:
            response = handler(
                self.asgi_app.sanic_app, self.asgi_app.sanic_app.loop
            )
            if response and isawaitable(response):
                await response

    async def __call__(
        self, scope: ASGIScope, receive: ASGIReceive, send: ASGISend
    ) -> None:
        message = await receive()
        if message["type"] == "lifespan.startup":
            await self.startup()
            await send({"type": "lifespan.startup.complete"})

        message = await receive()
        if message["type"] == "lifespan.shutdown":
            await self.shutdown()
            await send({"type": "lifespan.shutdown.complete"})


class ASGIApp:
    sanic_app: "sanic.app.Sanic"
    request: Request
    transport: MockTransport
    lifespan: Lifespan
    ws: Optional[WebSocketConnection]

    def __init__(self) -> None:
        self.ws = None

    @classmethod
    async def create(
        cls, sanic_app, scope: ASGIScope, receive: ASGIReceive, send: ASGISend
    ) -> "ASGIApp":
        instance = cls()
        instance.sanic_app = sanic_app
        instance.transport = MockTransport(scope, receive, send)
        instance.transport.loop = sanic_app.loop
        setattr(instance.transport, "add_task", sanic_app.loop.create_task)

        headers = Header(
            [
                (key.decode("latin-1"), value.decode("latin-1"))
                for key, value in scope.get("headers", [])
            ]
        )
        instance.lifespan = Lifespan(instance)

        if scope["type"] == "lifespan":
            await instance.lifespan(scope, receive, send)
        else:
            path = (
                scope["path"][1:]
                if scope["path"].startswith("/")
                else scope["path"]
            )
            url = "/".join([scope.get("root_path", ""), quote(path)])
            url_bytes = url.encode("latin-1")
            url_bytes += b"?" + scope["query_string"]

            if scope["type"] == "http":
                version = scope["http_version"]
                method = scope["method"]
            elif scope["type"] == "websocket":
                version = "1.1"
                method = "GET"

                instance.ws = instance.transport.create_websocket_connection(
                    send, receive
                )
                await instance.ws.accept()
            else:
                raise ServerError("Received unknown ASGI scope")

            request_class = sanic_app.request_class or Request
            instance.request = request_class(
                url_bytes,
                headers,
                version,
                method,
                instance.transport,
                sanic_app,
            )
            instance.request.stream = instance
            instance.request_body = True
            instance.request.conn_info = ConnInfo(instance.transport)

        return instance

    async def read(self) -> Optional[bytes]:
        """
        Read and stream the body in chunks from an incoming ASGI message.
        """
        message = await self.transport.receive()
        body = message.get("body", b"")
        if not message.get("more_body", False):
            # The final message may still carry data, so only signal the end
            # of the stream with None when that message is empty.
            self.request_body = False
            if not body:
                return None
        return body

    async def __aiter__(self):
        while self.request_body:
            data = await self.read()
            if data:
                yield data

    def respond(self, response):
        response.stream, self.response = self, response
        return response

    async def send(self, data, end_stream):
        if self.response:
            response, self.response = self.response, None
            await self.transport.send(
                {
                    "type": "http.response.start",
                    "status": response.status,
                    "headers": response.processed_headers,
                }
            )
            response_body = getattr(response, "body", None)
            if response_body:
                data = response_body + data if data else response_body
        await self.transport.send(
            {
                "type": "http.response.body",
                "body": data.encode() if hasattr(data, "encode") else data,
                "more_body": not end_stream,
            }
        )

    _asgi_single_callable = True  # We conform to ASGI 3.0 single-callable

    async def __call__(self) -> None:
        """
        Handle the incoming request.
        """
        await self.sanic_app.handle_request(self.request)
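
The request-body streaming used by this module follows the ASGI convention that a server delivers the body as a series of `http.request` messages, each with a `body` and a `more_body` flag. The following is a minimal standalone sketch of that convention, not Sanic code: `make_receive`, `iter_body` and `collect` are hypothetical names, and the fake `receive` callable only stands in for what a real ASGI server would provide.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List

Message = Dict[str, Any]
Receive = Callable[[], Awaitable[Message]]


def make_receive(chunks: List[bytes]) -> Receive:
    """Hypothetical stand-in for the receive callable of an ASGI server."""
    pending = list(chunks)

    async def receive() -> Message:
        body = pending.pop(0)
        # more_body stays true for as long as further chunks are queued.
        return {
            "type": "http.request",
            "body": body,
            "more_body": bool(pending),
        }

    return receive


async def iter_body(receive: Receive) -> AsyncIterator[bytes]:
    """Yield non-empty body chunks until a message has more_body false."""
    more_body = True
    while more_body:
        message = await receive()
        more_body = message.get("more_body", False)
        body = message.get("body", b"")
        if body:
            # The body of the final message is yielded as well.
            yield body


async def collect(chunks: List[bytes]) -> bytes:
    receive = make_receive(chunks)
    return b"".join([part async for part in iter_body(receive)])


result = asyncio.run(collect([b"hello ", b"", b"world"]))
```

Empty intermediate chunks are skipped, and a body that arrives in a single message with `more_body` false is still returned in full.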