Merge pull request #1475 from tomchristie/asgi-refactor-attempt

ASGI refactoring attempt
This commit is contained in:
7
2019-06-20 16:33:44 -07:00
committed by GitHub
23 changed files with 2068 additions and 61 deletions

View File

@@ -15,6 +15,7 @@ from typing import Any, Optional, Type, Union
from urllib.parse import urlencode, urlunparse
from sanic import reloader_helpers
from sanic.asgi import ASGIApp
from sanic.blueprint_group import BlueprintGroup
from sanic.config import BASE_LOGO, Config
from sanic.constants import HTTP_METHODS
@@ -25,7 +26,7 @@ from sanic.response import HTTPResponse, StreamingHTTPResponse
from sanic.router import Router
from sanic.server import HttpProtocol, Signal, serve, serve_multiple
from sanic.static import register as static_register
from sanic.testing import SanicTestClient
from sanic.testing import SanicASGITestClient, SanicTestClient
from sanic.views import CompositionView
from sanic.websocket import ConnectionClosed, WebSocketProtocol
@@ -53,6 +54,7 @@ class Sanic:
logging.config.dictConfig(log_config or LOGGING_CONFIG_DEFAULTS)
self.name = name
self.asgi = False
self.router = router or Router()
self.request_class = request_class
self.error_handler = error_handler or ErrorHandler()
@@ -80,7 +82,7 @@ class Sanic:
Only supported when using the `app.run` method.
"""
if not self.is_running:
if not self.is_running and self.asgi is False:
raise SanicException(
"Loop can only be retrieved after the app has started "
"running. Not supported with `create_server` function"
@@ -469,13 +471,23 @@ class Sanic:
getattr(handler, "__blueprintname__", "")
+ handler.__name__
)
try:
protocol = request.transport.get_protocol()
except AttributeError:
# On Python3.5 the Transport classes in asyncio do not
# have a get_protocol() method as in uvloop
protocol = request.transport._protocol
ws = await protocol.websocket_handshake(request, subprotocols)
pass
if self.asgi:
ws = request.transport.get_websocket_connection()
else:
try:
protocol = request.transport.get_protocol()
except AttributeError:
# On Python3.5 the Transport classes in asyncio do not
# have a get_protocol() method as in uvloop
protocol = request.transport._protocol
protocol.app = self
ws = await protocol.websocket_handshake(
request, subprotocols
)
# schedule the application handler
# its future is kept in self.websocket_tasks in case it
@@ -983,8 +995,16 @@ class Sanic:
raise CancelledError()
# pass the response to the correct callback
if isinstance(response, StreamingHTTPResponse):
await stream_callback(response)
if write_callback is None or isinstance(
response, StreamingHTTPResponse
):
if stream_callback:
await stream_callback(response)
else:
# Should only end here IF it is an ASGI websocket.
# TODO:
# - Add exception handling
pass
else:
write_callback(response)
@@ -996,6 +1016,10 @@ class Sanic:
def test_client(self):
return SanicTestClient(self)
@property
def asgi_client(self):
return SanicASGITestClient(self)
# -------------------------------------------------------------------- #
# Execution
# -------------------------------------------------------------------- #
@@ -1122,10 +1146,6 @@ class Sanic:
"""This kills the Sanic"""
get_event_loop().stop()
def __call__(self):
"""gunicorn compatibility"""
return self
async def create_server(
self,
host: Optional[str] = None,
@@ -1367,3 +1387,15 @@ class Sanic:
def _build_endpoint_name(self, *parts):
parts = [self.name, *parts]
return ".".join(parts)
# -------------------------------------------------------------------- #
# ASGI
# -------------------------------------------------------------------- #
async def __call__(self, scope, receive, send):
"""To be ASGI compliant, our instance must be a callable that accepts
three arguments: scope, receive, send. See the ASGI reference for more
details: https://asgi.readthedocs.io/en/latest/"""
self.asgi = True
asgi_app = await ASGIApp.create(self, scope, receive, send)
await asgi_app()

350
sanic/asgi.py Normal file
View File

@@ -0,0 +1,350 @@
import asyncio
import warnings
from http.cookies import SimpleCookie
from inspect import isawaitable
from typing import Any, Awaitable, Callable, MutableMapping, Union
from urllib.parse import quote
from multidict import CIMultiDict
from sanic.exceptions import InvalidUsage, ServerError
from sanic.log import logger
from sanic.request import Request
from sanic.response import HTTPResponse, StreamingHTTPResponse
from sanic.server import StreamBuffer
from sanic.websocket import WebSocketConnection
ASGIScope = MutableMapping[str, Any]
ASGIMessage = MutableMapping[str, Any]
ASGISend = Callable[[ASGIMessage], Awaitable[None]]
ASGIReceive = Callable[[], Awaitable[ASGIMessage]]
class MockProtocol:
def __init__(self, transport: "MockTransport", loop):
self.transport = transport
self._not_paused = asyncio.Event(loop=loop)
self._not_paused.set()
self._complete = asyncio.Event(loop=loop)
def pause_writing(self) -> None:
self._not_paused.clear()
def resume_writing(self) -> None:
self._not_paused.set()
async def complete(self) -> None:
self._not_paused.set()
await self.transport.send(
{"type": "http.response.body", "body": b"", "more_body": False}
)
@property
def is_complete(self) -> bool:
return self._complete.is_set()
async def push_data(self, data: bytes) -> None:
if not self.is_complete:
await self.transport.send(
{"type": "http.response.body", "body": data, "more_body": True}
)
async def drain(self) -> None:
await self._not_paused.wait()
class MockTransport:
def __init__(
self, scope: ASGIScope, receive: ASGIReceive, send: ASGISend
) -> None:
self.scope = scope
self._receive = receive
self._send = send
self._protocol = None
self.loop = None
def get_protocol(self) -> MockProtocol:
if not self._protocol:
self._protocol = MockProtocol(self, self.loop)
return self._protocol
def get_extra_info(self, info: str) -> Union[str, bool]:
if info == "peername":
return self.scope.get("server")
elif info == "sslcontext":
return self.scope.get("scheme") in ["https", "wss"]
def get_websocket_connection(self) -> WebSocketConnection:
try:
return self._websocket_connection
except AttributeError:
raise InvalidUsage("Improper websocket connection.")
def create_websocket_connection(
self, send: ASGISend, receive: ASGIReceive
) -> WebSocketConnection:
self._websocket_connection = WebSocketConnection(send, receive)
return self._websocket_connection
def add_task(self) -> None: # noqa
raise NotImplementedError
async def send(self, data) -> None:
# TODO:
# - Validation on data and that it is formatted properly and is valid
await self._send(data)
async def receive(self) -> ASGIMessage:
return await self._receive()
class Lifespan:
def __init__(self, asgi_app: "ASGIApp") -> None:
self.asgi_app = asgi_app
if "before_server_start" in self.asgi_app.sanic_app.listeners:
warnings.warn(
'You have set a listener for "before_server_start" '
"in ASGI mode. "
"It will be executed as early as possible, but not before "
"the ASGI server is started."
)
if "after_server_stop" in self.asgi_app.sanic_app.listeners:
warnings.warn(
'You have set a listener for "after_server_stop" '
"in ASGI mode. "
"It will be executed as late as possible, but not after "
"the ASGI server is stopped."
)
# async def pre_startup(self) -> None:
# for handler in self.asgi_app.sanic_app.listeners[
# "before_server_start"
# ]:
# response = handler(
# self.asgi_app.sanic_app, self.asgi_app.sanic_app.loop
# )
# if isawaitable(response):
# await response
async def startup(self) -> None:
for handler in self.asgi_app.sanic_app.listeners[
"before_server_start"
]:
response = handler(
self.asgi_app.sanic_app, self.asgi_app.sanic_app.loop
)
if isawaitable(response):
await response
for handler in self.asgi_app.sanic_app.listeners["after_server_start"]:
response = handler(
self.asgi_app.sanic_app, self.asgi_app.sanic_app.loop
)
if isawaitable(response):
await response
async def shutdown(self) -> None:
for handler in self.asgi_app.sanic_app.listeners["before_server_stop"]:
response = handler(
self.asgi_app.sanic_app, self.asgi_app.sanic_app.loop
)
if isawaitable(response):
await response
for handler in self.asgi_app.sanic_app.listeners["after_server_stop"]:
response = handler(
self.asgi_app.sanic_app, self.asgi_app.sanic_app.loop
)
if isawaitable(response):
await response
async def __call__(
self, scope: ASGIScope, receive: ASGIReceive, send: ASGISend
) -> None:
message = await receive()
if message["type"] == "lifespan.startup":
await self.startup()
await send({"type": "lifespan.startup.complete"})
message = await receive()
if message["type"] == "lifespan.shutdown":
await self.shutdown()
await send({"type": "lifespan.shutdown.complete"})
class ASGIApp:
def __init__(self) -> None:
self.ws = None
@classmethod
async def create(
cls, sanic_app, scope: ASGIScope, receive: ASGIReceive, send: ASGISend
) -> "ASGIApp":
instance = cls()
instance.sanic_app = sanic_app
instance.transport = MockTransport(scope, receive, send)
instance.transport.add_task = sanic_app.loop.create_task
instance.transport.loop = sanic_app.loop
headers = CIMultiDict(
[
(key.decode("latin-1"), value.decode("latin-1"))
for key, value in scope.get("headers", [])
]
)
instance.do_stream = (
True if headers.get("expect") == "100-continue" else False
)
instance.lifespan = Lifespan(instance)
if scope["type"] == "lifespan":
await instance.lifespan(scope, receive, send)
else:
url_bytes = scope.get("root_path", "") + quote(scope["path"])
url_bytes = url_bytes.encode("latin-1")
url_bytes += b"?" + scope["query_string"]
if scope["type"] == "http":
version = scope["http_version"]
method = scope["method"]
elif scope["type"] == "websocket":
version = "1.1"
method = "GET"
instance.ws = instance.transport.create_websocket_connection(
send, receive
)
await instance.ws.accept()
else:
pass
# TODO:
# - close connection
instance.request = Request(
url_bytes,
headers,
version,
method,
instance.transport,
sanic_app,
)
if sanic_app.is_request_stream:
is_stream_handler = sanic_app.router.is_stream_handler(
instance.request
)
if is_stream_handler:
instance.request.stream = StreamBuffer(
sanic_app.config.REQUEST_BUFFER_QUEUE_SIZE
)
instance.do_stream = True
return instance
async def read_body(self) -> bytes:
"""
Read and return the entire body from an incoming ASGI message.
"""
body = b""
more_body = True
while more_body:
message = await self.transport.receive()
body += message.get("body", b"")
more_body = message.get("more_body", False)
return body
async def stream_body(self) -> None:
"""
Read and stream the body in chunks from an incoming ASGI message.
"""
more_body = True
while more_body:
message = await self.transport.receive()
chunk = message.get("body", b"")
await self.request.stream.put(chunk)
more_body = message.get("more_body", False)
await self.request.stream.put(None)
async def __call__(self) -> None:
"""
Handle the incoming request.
"""
if not self.do_stream:
self.request.body = await self.read_body()
else:
self.sanic_app.loop.create_task(self.stream_body())
handler = self.sanic_app.handle_request
callback = None if self.ws else self.stream_callback
await handler(self.request, None, callback)
async def stream_callback(self, response: HTTPResponse) -> None:
"""
Write the response.
"""
try:
headers = [
(str(name).encode("latin-1"), str(value).encode("latin-1"))
for name, value in response.headers.items()
]
except AttributeError:
logger.error(
"Invalid response object for url %s, "
"Expected Type: HTTPResponse, Actual Type: %s",
self.request.url,
type(response),
)
exception = ServerError("Invalid response type")
response = self.sanic_app.error_handler.response(
self.request, exception
)
headers = [
(str(name).encode("latin-1"), str(value).encode("latin-1"))
for name, value in response.headers.items()
if name not in (b"Set-Cookie",)
]
if "content-length" not in response.headers and not isinstance(
response, StreamingHTTPResponse
):
headers += [
(b"content-length", str(len(response.body)).encode("latin-1"))
]
if response.cookies:
cookies = SimpleCookie()
cookies.load(response.cookies)
headers += [
(b"set-cookie", cookie.encode("utf-8"))
for name, cookie in response.cookies.items()
]
await self.transport.send(
{
"type": "http.response.start",
"status": response.status,
"headers": headers,
}
)
if isinstance(response, StreamingHTTPResponse):
response.protocol = self.transport.get_protocol()
await response.stream()
await response.protocol.complete()
else:
await self.transport.send(
{
"type": "http.response.body",
"body": response.body,
"more_body": False,
}
)

View File

@@ -87,9 +87,9 @@ class StreamingHTTPResponse(BaseHTTPResponse):
data = self._encode_body(data)
if self.chunked:
self.protocol.push_data(b"%x\r\n%b\r\n" % (len(data), data))
await self.protocol.push_data(b"%x\r\n%b\r\n" % (len(data), data))
else:
self.protocol.push_data(data)
await self.protocol.push_data(data)
await self.protocol.drain()
async def stream(
@@ -105,11 +105,11 @@ class StreamingHTTPResponse(BaseHTTPResponse):
keep_alive=keep_alive,
keep_alive_timeout=keep_alive_timeout,
)
self.protocol.push_data(headers)
await self.protocol.push_data(headers)
await self.protocol.drain()
await self.streaming_fn(self)
if self.chunked:
self.protocol.push_data(b"0\r\n\r\n")
await self.protocol.push_data(b"0\r\n\r\n")
# no need to await drain here after this write, because it is the
# very last thing we write and nothing needs to wait for it.

View File

@@ -406,6 +406,7 @@ class Router:
if not self.hosts:
return self._get(request.path, request.method, "")
# virtual hosts specified; try to match route to the host header
try:
return self._get(
request.path, request.method, request.headers.get("Host", "")

View File

@@ -477,7 +477,7 @@ class HttpProtocol(asyncio.Protocol):
async def drain(self):
await self._not_paused.wait()
def push_data(self, data):
async def push_data(self, data):
self.transport.write(data)
async def stream_response(self, response):
@@ -728,6 +728,8 @@ def serve(
if debug:
loop.set_debug(debug)
app.asgi = False
connections = connections if connections is not None else set()
server = partial(
protocol,

View File

@@ -1,14 +1,22 @@
import asyncio
import types
import typing
from json import JSONDecodeError
from socket import socket
from urllib.parse import unquote, urlsplit
import httpcore
import requests_async as requests
import websockets
from sanic.asgi import ASGIApp
from sanic.exceptions import MethodNotSupported
from sanic.log import logger
from sanic.response import text
ASGI_HOST = "mockserver"
HOST = "127.0.0.1"
PORT = 42101
@@ -64,7 +72,7 @@ class SanicTestClient:
debug=False,
server_kwargs={"auto_reload": False},
*request_args,
**request_kwargs
**request_kwargs,
):
results = [None, None]
exceptions = []
@@ -128,7 +136,7 @@ class SanicTestClient:
try:
request, response = results
return request, response
except BaseException:
except BaseException: # noqa
raise ValueError(
"Request and response object expected, got ({})".format(
results
@@ -137,7 +145,7 @@ class SanicTestClient:
else:
try:
return results[-1]
except BaseException:
except BaseException: # noqa
raise ValueError(
"Request object expected, got ({})".format(results)
)
@@ -165,3 +173,251 @@ class SanicTestClient:
def websocket(self, *args, **kwargs):
return self._sanic_endpoint_test("websocket", *args, **kwargs)
class SanicASGIAdapter(requests.asgi.ASGIAdapter): # noqa
async def send( # type: ignore
self,
request: requests.PreparedRequest,
gather_return: bool = False,
*args: typing.Any,
**kwargs: typing.Any,
) -> requests.Response:
"""This method is taken MOSTLY verbatim from requests-asyn. The
difference is the capturing of a response on the ASGI call and then
returning it on the response object. This is implemented to achieve:
request, response = await app.asgi_client.get("/")
You can see the original code here:
https://github.com/encode/requests-async/blob/614f40f77f19e6c6da8a212ae799107b0384dbf9/requests_async/asgi.py#L51""" # noqa
scheme, netloc, path, query, fragment = urlsplit(
request.url
) # type: ignore
default_port = {"http": 80, "ws": 80, "https": 443, "wss": 443}[scheme]
if ":" in netloc:
host, port_string = netloc.split(":", 1)
port = int(port_string)
else:
host = netloc
port = default_port
# Include the 'host' header.
if "host" in request.headers:
headers = [] # type: typing.List[typing.Tuple[bytes, bytes]]
elif port == default_port:
headers = [(b"host", host.encode())]
else:
headers = [(b"host", (f"{host}:{port}").encode())]
# Include other request headers.
headers += [
(key.lower().encode(), value.encode())
for key, value in request.headers.items()
]
no_response = False
if scheme in {"ws", "wss"}:
subprotocol = request.headers.get("sec-websocket-protocol", None)
if subprotocol is None:
subprotocols = [] # type: typing.Sequence[str]
else:
subprotocols = [
value.strip() for value in subprotocol.split(",")
]
scope = {
"type": "websocket",
"path": unquote(path),
"root_path": "",
"scheme": scheme,
"query_string": query.encode(),
"headers": headers,
"client": ["testclient", 50000],
"server": [host, port],
"subprotocols": subprotocols,
}
no_response = True
else:
scope = {
"type": "http",
"http_version": "1.1",
"method": request.method,
"path": unquote(path),
"root_path": "",
"scheme": scheme,
"query_string": query.encode(),
"headers": headers,
"client": ["testclient", 50000],
"server": [host, port],
"extensions": {"http.response.template": {}},
}
async def receive():
nonlocal request_complete, response_complete
if request_complete:
while not response_complete:
await asyncio.sleep(0.0001)
return {"type": "http.disconnect"}
body = request.body
if isinstance(body, str):
body_bytes = body.encode("utf-8") # type: bytes
elif body is None:
body_bytes = b""
elif isinstance(body, types.GeneratorType):
try:
chunk = body.send(None)
if isinstance(chunk, str):
chunk = chunk.encode("utf-8")
return {
"type": "http.request",
"body": chunk,
"more_body": True,
}
except StopIteration:
request_complete = True
return {"type": "http.request", "body": b""}
else:
body_bytes = body
request_complete = True
return {"type": "http.request", "body": body_bytes}
async def send(message) -> None:
nonlocal raw_kwargs, response_started, response_complete, template, context # noqa
if message["type"] == "http.response.start":
assert (
not response_started
), 'Received multiple "http.response.start" messages.'
raw_kwargs["status_code"] = message["status"]
raw_kwargs["headers"] = message["headers"]
response_started = True
elif message["type"] == "http.response.body":
assert response_started, (
'Received "http.response.body" '
'without "http.response.start".'
)
assert (
not response_complete
), 'Received "http.response.body" after response completed.'
body = message.get("body", b"")
more_body = message.get("more_body", False)
if request.method != "HEAD":
raw_kwargs["content"] += body
if not more_body:
response_complete = True
elif message["type"] == "http.response.template":
template = message["template"]
context = message["context"]
request_complete = False
response_started = False
response_complete = False
raw_kwargs = {"content": b""} # type: typing.Dict[str, typing.Any]
template = None
context = None
return_value = None
try:
return_value = await self.app(scope, receive, send)
except BaseException as exc:
if not self.suppress_exceptions:
raise exc from None
if no_response:
response_started = True
raw_kwargs = {"status_code": 204, "headers": []}
if not self.suppress_exceptions:
assert response_started, "TestClient did not receive any response."
elif not response_started:
raw_kwargs = {"status_code": 500, "headers": []}
raw = httpcore.Response(**raw_kwargs)
response = self.build_response(request, raw)
if template is not None:
response.template = template
response.context = context
if gather_return:
response.return_value = return_value
return response
class TestASGIApp(ASGIApp):
async def __call__(self):
await super().__call__()
return self.request
async def app_call_with_return(self, scope, receive, send):
asgi_app = await TestASGIApp.create(self, scope, receive, send)
return await asgi_app()
class SanicASGITestClient(requests.ASGISession):
def __init__(
self,
app,
base_url: str = "http://{}".format(ASGI_HOST),
suppress_exceptions: bool = False,
) -> None:
app.__class__.__call__ = app_call_with_return
app.asgi = True
super().__init__(app)
adapter = SanicASGIAdapter(
app, suppress_exceptions=suppress_exceptions
)
self.mount("http://", adapter)
self.mount("https://", adapter)
self.mount("ws://", adapter)
self.mount("wss://", adapter)
self.headers.update({"user-agent": "testclient"})
self.app = app
self.base_url = base_url
async def request(self, method, url, gather_request=True, *args, **kwargs):
self.gather_request = gather_request
response = await super().request(method, url, *args, **kwargs)
response.status = response.status_code
response.body = response.content
response.content_type = response.headers.get("content-type")
if hasattr(response, "return_value"):
request = response.return_value
del response.return_value
return request, response
return response
def merge_environment_settings(self, *args, **kwargs):
settings = super().merge_environment_settings(*args, **kwargs)
settings.update({"gather_return": self.gather_request})
return settings
async def websocket(self, uri, subprotocols=None, *args, **kwargs):
if uri.startswith(("ws:", "wss:")):
url = uri
else:
uri = uri if uri.startswith("/") else "/{uri}".format(uri=uri)
url = "ws://testserver{uri}".format(uri=uri)
headers = kwargs.get("headers", {})
headers.setdefault("connection", "upgrade")
headers.setdefault("sec-websocket-key", "testserver==")
headers.setdefault("sec-websocket-version", "13")
if subprotocols is not None:
headers.setdefault(
"sec-websocket-protocol", ", ".join(subprotocols)
)
kwargs["headers"] = headers
return await self.request("websocket", url, **kwargs)

View File

@@ -1,3 +1,5 @@
from typing import Any, Awaitable, Callable, MutableMapping, Optional, Union
from httptools import HttpParserUpgrade
from websockets import ConnectionClosed # noqa
from websockets import InvalidHandshake, WebSocketCommonProtocol, handshake
@@ -6,6 +8,9 @@ from sanic.exceptions import InvalidUsage
from sanic.server import HttpProtocol
ASIMessage = MutableMapping[str, Any]
class WebSocketProtocol(HttpProtocol):
def __init__(
self,
@@ -19,6 +24,7 @@ class WebSocketProtocol(HttpProtocol):
):
super().__init__(*args, **kwargs)
self.websocket = None
# self.app = None
self.websocket_timeout = websocket_timeout
self.websocket_max_size = websocket_max_size
self.websocket_max_queue = websocket_max_queue
@@ -103,3 +109,45 @@ class WebSocketProtocol(HttpProtocol):
self.websocket.connection_made(request.transport)
self.websocket.connection_open()
return self.websocket
class WebSocketConnection:
# TODO
# - Implement ping/pong
def __init__(
self,
send: Callable[[ASIMessage], Awaitable[None]],
receive: Callable[[], Awaitable[ASIMessage]],
) -> None:
self._send = send
self._receive = receive
async def send(self, data: Union[str, bytes], *args, **kwargs) -> None:
message = {"type": "websocket.send"}
try:
data.decode()
except AttributeError:
message.update({"text": str(data)})
else:
message.update({"bytes": data})
await self._send(message)
async def recv(self, *args, **kwargs) -> Optional[str]:
message = await self._receive()
if message["type"] == "websocket.receive":
return message["text"]
elif message["type"] == "websocket.disconnect":
pass
receive = recv
async def accept(self) -> None:
await self._send({"type": "websocket.accept", "subprotocol": ""})
async def close(self) -> None:
pass