diff --git a/docs/sanic/streaming.md b/docs/sanic/streaming.md index c13fffb3..4614aa46 100644 --- a/docs/sanic/streaming.md +++ b/docs/sanic/streaming.md @@ -29,4 +29,51 @@ async def index(request): response.write(record[0]) return stream(stream_from_db) -``` \ No newline at end of file +``` + +## Request Streaming + +Sanic allows you to get request data by stream, as below. When the request ends, `request.stream.get()` returns `None`. + +``` +from sanic import Sanic +from sanic.blueprints import Blueprint +from sanic.response import stream, text + +bp = Blueprint('blueprint_request_stream') +app = Sanic('request_stream', is_request_stream=True) + + +@app.stream('/stream') +async def handler(request): + async def streaming(response): + while True: + body = await request.stream.get() + if body is None: + break + body = body.decode('utf-8').replace('1', 'A') + response.write(body) + return stream(streaming) + + +@app.get('/get') +async def get(request): + return text('OK') + + +@bp.stream('/bp_stream') +async def bp_handler(request): + result = '' + while True: + body = await request.stream.get() + if body is None: + break + result += body.decode('utf-8').replace('1', 'A') + return text(result) + +app.blueprint(bp) + + +if __name__ == '__main__': + app.run(host='127.0.0.1', port=8000) +``` diff --git a/examples/request_stream/client.py b/examples/request_stream/client.py new file mode 100644 index 00000000..58535b7b --- /dev/null +++ b/examples/request_stream/client.py @@ -0,0 +1,10 @@ +import requests + +# Warning: This is a heavy process. + +data = "" +for i in range(1, 250000): + data += str(i) + +r = requests.post('http://127.0.0.1:8000/bp_stream', data=data) +print(r.text) diff --git a/examples/request_stream/server.py b/examples/request_stream/server.py new file mode 100644 index 00000000..e6cf7977 --- /dev/null +++ b/examples/request_stream/server.py @@ -0,0 +1,40 @@ +from sanic import Sanic +from sanic.blueprints import Blueprint +from sanic.response import stream, text + +bp = Blueprint('blueprint_request_stream') +app = Sanic('request_stream') + + +@app.stream('/stream') +async def handler(request): + async def streaming(response): + while True: + body = await request.stream.get() + if body is None: + break + body = body.decode('utf-8').replace('1', 'A') + response.write(body) + return stream(streaming) + + +@app.get('/get') +async def get(request): + return text('OK') + + +@bp.stream('/bp_stream') +async def bp_handler(request): + result = '' + while True: + body = await request.stream.get() + if body is None: + break + result += body.decode('utf-8').replace('1', 'A') + return text(result) + +app.blueprint(bp) + + +if __name__ == '__main__': + app.run(host='127.0.0.1', port=8000) diff --git a/sanic/app.py b/sanic/app.py index 8d846c3e..4ae186ed 100644 --- a/sanic/app.py +++ b/sanic/app.py @@ -60,6 +60,7 @@ class Sanic: self.sock = None self.listeners = defaultdict(list) self.is_running = False + self.is_request_stream = False self.websocket_enabled = False self.websocket_tasks = [] @@ -110,12 +111,14 @@ class Sanic: # Decorator def route(self, uri, methods=frozenset({'GET'}), host=None, - strict_slashes=False): + strict_slashes=False, stream=False): """Decorate a function to be registered as a route :param uri: path of the URL :param methods: list or tuple of methods allowed :param host: + :param strict_slashes: + :param stream: :return: decorated function """ @@ -124,7 +127,11 @@ class Sanic: if not uri.startswith('/'): uri = '/' + uri + if stream: + self.is_request_stream = True + def response(handler): + handler.is_stream = stream self.router.add(uri=uri, methods=methods, handler=handler, host=host, strict_slashes=strict_slashes) return handler @@ -160,6 +167,13 @@ class Sanic: return self.route(uri, methods=frozenset({"DELETE"}), host=host, strict_slashes=strict_slashes) + def stream( + self, uri, methods=frozenset({"POST"}), host=None, + strict_slashes=False): + return self.route(uri, methods=methods, host=host, + strict_slashes=strict_slashes, + stream=True) + def add_route(self, handler, uri, methods=frozenset({'GET'}), host=None, strict_slashes=False): """A helper method to register class instance or @@ -651,6 +665,8 @@ class Sanic: server_settings = { 'protocol': protocol, 'request_class': self.request_class, + 'is_request_stream': self.is_request_stream, + 'router': self.router, 'host': host, 'port': port, 'sock': sock, diff --git a/sanic/blueprints.py b/sanic/blueprints.py index 7e9953e0..fe1bbf90 100644 --- a/sanic/blueprints.py +++ b/sanic/blueprints.py @@ -5,7 +5,7 @@ from sanic.views import CompositionView FutureRoute = namedtuple('Route', ['handler', 'uri', 'methods', - 'host', 'strict_slashes']) + 'host', 'strict_slashes', 'stream']) FutureListener = namedtuple('Listener', ['handler', 'uri', 'methods', 'host']) FutureMiddleware = namedtuple('Route', ['middleware', 'args', 'kwargs']) FutureException = namedtuple('Route', ['handler', 'args', 'kwargs']) @@ -47,7 +47,8 @@ class Blueprint: uri=uri[1:] if uri.startswith('//') else uri, methods=future.methods, host=future.host or self.host, - strict_slashes=future.strict_slashes + strict_slashes=future.strict_slashes, + stream=future.stream )(future.handler) for future in self.websocket_routes: @@ -87,14 +88,15 @@ class Blueprint: app.listener(event)(listener) def route(self, uri, methods=frozenset({'GET'}), host=None, - strict_slashes=False): + strict_slashes=False, stream=False): """Create a blueprint route from a decorated function. :param uri: endpoint at which the route will be accessible. :param methods: list of acceptable HTTP methods. """ def decorator(handler): - route = FutureRoute(handler, uri, methods, host, strict_slashes) + route = FutureRoute( + handler, uri, methods, host, strict_slashes, stream) self.routes.append(route) return handler return decorator @@ -131,7 +133,7 @@ class Blueprint: :param uri: endpoint at which the route will be accessible. """ def decorator(handler): - route = FutureRoute(handler, uri, [], host, strict_slashes) + route = FutureRoute(handler, uri, [], host, strict_slashes, False) self.websocket_routes.append(route) return handler return decorator @@ -217,3 +219,7 @@ class Blueprint: def delete(self, uri, host=None, strict_slashes=False): return self.route(uri, methods=["DELETE"], host=host, strict_slashes=strict_slashes) + + def stream(self, uri, methods=["POST"], host=None, strict_slashes=False): + return self.route(uri, methods=methods, host=host, + strict_slashes=strict_slashes, stream=True) diff --git a/sanic/request.py b/sanic/request.py index 57b4773c..7aaed247 100644 --- a/sanic/request.py +++ b/sanic/request.py @@ -38,7 +38,7 @@ class Request(dict): __slots__ = ( 'app', 'headers', 'version', 'method', '_cookies', 'transport', 'body', 'parsed_json', 'parsed_args', 'parsed_form', 'parsed_files', - '_ip', '_parsed_url', 'uri_template' + '_ip', '_parsed_url', 'uri_template', 'stream' ) def __init__(self, url_bytes, headers, version, method, transport): @@ -59,6 +59,7 @@ class Request(dict): self.parsed_args = None self.uri_template = None self._cookies = None + self.stream = None @property def json(self): diff --git a/sanic/router.py b/sanic/router.py index e9531be7..2d9ae616 100644 --- a/sanic/router.py +++ b/sanic/router.py @@ -345,3 +345,11 @@ class Router: if hasattr(route_handler, 'handlers'): route_handler = route_handler.handlers[method] return route_handler, [], kwargs, route.uri + + def is_stream_handler(self, request): + """ Handler for request is stream or not. + :param request: Request object + :return: bool + """ + handler = self.get(request)[0] + return handler.is_stream diff --git a/sanic/server.py b/sanic/server.py index 96b8e91c..072f23a7 100644 --- a/sanic/server.py +++ b/sanic/server.py @@ -64,22 +64,24 @@ class HttpProtocol(asyncio.Protocol): 'parser', 'request', 'url', 'headers', # request config 'request_handler', 'request_timeout', 'request_max_size', - 'request_class', + 'request_class', 'is_request_stream', 'router', # enable or disable access log / error log purpose 'has_log', # connection management - '_total_request_size', '_timeout_handler', '_last_communication_time') + '_total_request_size', '_timeout_handler', '_last_communication_time', + '_is_stream_handler') def __init__(self, *, loop, request_handler, error_handler, signal=Signal(), connections=set(), request_timeout=60, request_max_size=None, request_class=None, has_log=True, - keep_alive=True): + keep_alive=True, is_request_stream=False, router=None): self.loop = loop self.transport = None self.request = None self.parser = None self.url = None self.headers = None + self.router = router self.signal = signal self.has_log = has_log self.connections = connections @@ -88,10 +90,13 @@ class HttpProtocol(asyncio.Protocol): self.request_timeout = request_timeout self.request_max_size = request_max_size self.request_class = request_class or Request + self.is_request_stream = is_request_stream + self._is_stream_handler = False self._total_request_size = 0 self._timeout_handler = None self._last_request_time = None self._request_handler_task = None + self._request_stream_task = None self._keep_alive = keep_alive @property @@ -123,6 +128,8 @@ class HttpProtocol(asyncio.Protocol): self._timeout_handler = ( self.loop.call_later(time_left, self.connection_timeout)) else: + if self._request_stream_task: + self._request_stream_task.cancel() if self._request_handler_task: self._request_handler_task.cancel() exception = RequestTimeout('Request Timeout') @@ -171,13 +178,29 @@ class HttpProtocol(asyncio.Protocol): method=self.parser.get_method().decode(), transport=self.transport ) + if self.is_request_stream: + self._is_stream_handler = self.router.is_stream_handler( + self.request) + if self._is_stream_handler: + self.request.stream = asyncio.Queue() + self.execute_request_handler() def on_body(self, body): + if self.is_request_stream and self._is_stream_handler: + self._request_stream_task = self.loop.create_task( + self.request.stream.put(body)) + return self.request.body.append(body) def on_message_complete(self): + if self.is_request_stream and self._is_stream_handler: + self._request_stream_task = self.loop.create_task( + self.request.stream.put(None)) + return self.request.body = b''.join(self.request.body) + self.execute_request_handler() + def execute_request_handler(self): self._request_handler_task = self.loop.create_task( self.request_handler( self.request, @@ -317,7 +340,9 @@ class HttpProtocol(asyncio.Protocol): self.url = None self.headers = None self._request_handler_task = None + self._request_stream_task = None self._total_request_size = 0 + self._is_stream_handler = False def close_if_idle(self): """Close the connection if a request is not being sent or received @@ -359,7 +384,8 @@ def serve(host, port, request_handler, error_handler, before_start=None, request_timeout=60, ssl=None, sock=None, request_max_size=None, reuse_port=False, loop=None, protocol=HttpProtocol, backlog=100, register_sys_signals=True, run_async=False, connections=None, - signal=Signal(), request_class=None, has_log=True, keep_alive=True): + signal=Signal(), request_class=None, has_log=True, keep_alive=True, + is_request_stream=False, router=None): """Start asynchronous HTTP Server on an individual process. :param host: Address to host on @@ -386,6 +412,8 @@ def serve(host, port, request_handler, error_handler, before_start=None, :param protocol: subclass of asyncio protocol class :param request_class: Request class to use :param has_log: disable/enable access log and error log + :param is_request_stream: disable/enable Request.stream + :param router: Router object :return: Nothing """ if not run_async: @@ -410,6 +438,8 @@ def serve(host, port, request_handler, error_handler, before_start=None, request_class=request_class, has_log=has_log, keep_alive=keep_alive, + is_request_stream=is_request_stream, + router=router, ) server_coroutine = loop.create_server( diff --git a/tests/test_request_stream.py b/tests/test_request_stream.py new file mode 100644 index 00000000..652299bc --- /dev/null +++ b/tests/test_request_stream.py @@ -0,0 +1,50 @@ +from sanic import Sanic +from sanic.blueprints import Blueprint +from sanic.response import stream, text + +bp = Blueprint('test_blueprint_request_stream') +app = Sanic('test_request_stream') + + +@app.stream('/stream') +async def handler(request): + async def streaming(response): + while True: + body = await request.stream.get() + if body is None: + break + response.write(body.decode('utf-8')) + return stream(streaming) + + +@app.get('/get') +async def get(request): + return text('OK') + + +@bp.stream('/bp_stream') +async def bp_handler(request): + result = '' + while True: + body = await request.stream.get() + if body is None: + break + result += body.decode('utf-8') + return text(result) + +app.blueprint(bp) + + +def test_request_stream(): + data = "abc" * 100000 + request, response = app.test_client.post('/stream', data=data) + assert response.status == 200 + assert response.text == data + + request, response = app.test_client.get('/get') + assert response.status == 200 + assert response.text == 'OK' + + request, response = app.test_client.post('/bp_stream', data=data) + assert response.status == 200 + assert response.text == data