From 3ead529693d7279a8aeb6c263f8c5c2484f4bdb6 Mon Sep 17 00:00:00 2001 From: Adam Hopkins Date: Mon, 27 May 2019 00:57:50 +0300 Subject: [PATCH] Setup streaming on ASGI --- examples/run_asgi.py | 28 +++++++++++++++++++--------- sanic/app.py | 4 ++-- sanic/asgi.py | 41 ++++++++++++++++++++++++++++++++++------- sanic/websocket.py | 2 +- 4 files changed, 56 insertions(+), 19 deletions(-) diff --git a/examples/run_asgi.py b/examples/run_asgi.py index 4e7e838c..81333383 100644 --- a/examples/run_asgi.py +++ b/examples/run_asgi.py @@ -6,22 +6,24 @@ $ hypercorn run_asgi:app """ -from sanic import Sanic -from sanic.response import text +import os +from sanic import Sanic, response app = Sanic(__name__) -@app.route("/") + +@app.route("/text") def handler(request): - return text("Hello") + return response.text("Hello") -@app.route("/foo") + +@app.route("/json") def handler_foo(request): - return text("bar") + return response.text("bar") -@app.websocket('/feed') +@app.websocket("/ws") async def feed(request, ws): name = "" while True: @@ -33,5 +35,13 @@ async def feed(request, ws): break -if __name__ == '__main__': - app.run(debug=True) +@app.route("/file") +async def test_file(request): + return await response.file(os.path.abspath("setup.py")) + + +@app.route("/file_stream") +async def test_file_stream(request): + return await response.file_stream( + os.path.abspath("setup.py"), chunk_size=1024 + ) diff --git a/sanic/app.py b/sanic/app.py index 5e1b87c3..5952aff6 100644 --- a/sanic/app.py +++ b/sanic/app.py @@ -82,7 +82,7 @@ class Sanic: Only supported when using the `app.run` method. """ - if not self.is_running: + if not self.is_running and self.asgi is False: raise SanicException( "Loop can only be retrieved after the app has started " "running. Not supported with `create_server` function" @@ -997,7 +997,7 @@ class Sanic: if stream_callback: await stream_callback(response) else: - # Should only end here IF it is an ASGI websocket. + # Should only end here IF it is an ASGI websocket. # TODO: # - Add exception handling pass diff --git a/sanic/asgi.py b/sanic/asgi.py index 6e9be0e7..fa853f5a 100644 --- a/sanic/asgi.py +++ b/sanic/asgi.py @@ -5,13 +5,14 @@ from multidict import CIMultiDict from sanic.request import Request from sanic.response import HTTPResponse, StreamingHTTPResponse from sanic.websocket import WebSocketConnection - +from sanic.server import StreamBuffer ASGIScope = MutableMapping[str, Any] ASGIMessage = MutableMapping[str, Any] ASGISend = Callable[[ASGIMessage], Awaitable[None]] ASGIReceive = Callable[[], Awaitable[ASGIMessage]] + class MockTransport: def __init__(self, scope: ASGIScope) -> None: self.scope = scope @@ -26,9 +27,7 @@ class MockTransport: return self._websocket_connection def create_websocket_connection( - self, - send: ASGISend, - receive: ASGIReceive, + self, send: ASGISend, receive: ASGIReceive ) -> WebSocketConnection: self._websocket_connection = WebSocketConnection(send, receive) return self._websocket_connection @@ -39,7 +38,9 @@ class ASGIApp: self.ws = None @classmethod - async def create(cls, sanic_app, scope: ASGIScope, receive: ASGIReceive, send: ASGISend) -> "ASGIApp": + async def create( + cls, sanic_app, scope: ASGIScope, receive: ASGIReceive, send: ASGISend + ) -> "ASGIApp": instance = cls() instance.sanic_app = sanic_app instance.receive = receive @@ -55,6 +56,10 @@ class ASGIApp: ] ) + instance.do_stream = ( + True if headers.get("expect") == "100-continue" else False + ) + transport = MockTransport(scope) if scope["type"] == "http": @@ -75,6 +80,9 @@ class ASGIApp: url_bytes, headers, version, method, transport, sanic_app ) + if sanic_app.is_request_stream: + instance.request.stream = StreamBuffer() + return instance async def read_body(self) -> bytes: @@ -83,7 +91,6 @@ class ASGIApp: """ body = b"" more_body = True - while more_body: message = await self.receive() body += message.get("body", b"") @@ -91,11 +98,31 @@ class ASGIApp: return body + async def stream_body(self) -> None: + """ + Read and stream the body in chunks from an incoming ASGI message. + """ + more_body = True + + while more_body: + message = await self.receive() + chunk = message.get("body", b"") + await self.request.stream.put(chunk) + # self.sanic_app.loop.create_task(self.request.stream.put(chunk)) + + more_body = message.get("more_body", False) + + await self.request.stream.put(None) + async def __call__(self) -> None: """ Handle the incoming request. """ - self.request.body = await self.read_body() + if not self.do_stream: + self.request.body = await self.read_body() + else: + self.sanic_app.loop.create_task(self.stream_body()) + handler = self.sanic_app.handle_request callback = None if self.ws else self.stream_callback await handler(self.request, None, callback) diff --git a/sanic/websocket.py b/sanic/websocket.py index e4c693ff..ff321284 100644 --- a/sanic/websocket.py +++ b/sanic/websocket.py @@ -24,7 +24,7 @@ class WebSocketProtocol(HttpProtocol): ): super().__init__(*args, **kwargs) self.websocket = None - self.app = None + # self.app = None self.websocket_timeout = websocket_timeout self.websocket_max_size = websocket_max_size self.websocket_max_queue = websocket_max_queue