From 9f2ba26e9d42ff5a4072fe4d56208ab86b13e895 Mon Sep 17 00:00:00 2001 From: 38elements Date: Fri, 5 May 2017 20:09:32 +0900 Subject: [PATCH] Add Request.stream --- docs/sanic/streaming.md | 29 ++++++++++++++++++++++++++++- examples/request_stream/client.py | 10 ++++++++++ examples/request_stream/server.py | 21 +++++++++++++++++++++ sanic/app.py | 4 +++- sanic/request.py | 3 ++- sanic/server.py | 31 ++++++++++++++++++++++++++----- tests/test_request_stream.py | 26 ++++++++++++++++++++++++++ 7 files changed, 116 insertions(+), 8 deletions(-) create mode 100644 examples/request_stream/client.py create mode 100644 examples/request_stream/server.py create mode 100644 tests/test_request_stream.py diff --git a/docs/sanic/streaming.md b/docs/sanic/streaming.md index c13fffb3..4a40dc26 100644 --- a/docs/sanic/streaming.md +++ b/docs/sanic/streaming.md @@ -29,4 +29,31 @@ async def index(request): response.write(record[0]) return stream(stream_from_db) -``` \ No newline at end of file +``` + +## Request Streaming + +Sanic allows you to get request data by stream, as below. It is necessary to set `is_request_stream` to True. When the request ends, `request.stream.get()` returns `None`. It is not able to use common request and request stream in same application. + +``` +from sanic import Sanic +from sanic.response import stream + +app = Sanic(__name__, is_request_stream=True) + + +@app.post('/stream') +async def handler(request): + async def sample_streaming_fn(response): + while True: + body = await request.stream.get() + if body is None: + break + body = body.decode('utf-8').replace('1', 'A') + response.write(body) + return stream(sample_streaming_fn) + + +if __name__ == '__main__': + app.run(host='127.0.0.1', port=8000) +``` diff --git a/examples/request_stream/client.py b/examples/request_stream/client.py new file mode 100644 index 00000000..30fcff06 --- /dev/null +++ b/examples/request_stream/client.py @@ -0,0 +1,10 @@ +import requests + +# Warning: This is a heavy process. + +data = "" +for i in range(1, 250000): + data += str(i) + +r = requests.post('http://127.0.0.1:8000/stream', data=data) +print(r.text) diff --git a/examples/request_stream/server.py b/examples/request_stream/server.py new file mode 100644 index 00000000..4c57557c --- /dev/null +++ b/examples/request_stream/server.py @@ -0,0 +1,21 @@ +from sanic import Sanic +from sanic.response import stream + +app = Sanic(__name__, is_request_stream=True) + + +@app.post('/stream') +async def handler(request): + async def sample_streaming_fn(response): + while True: + body = await request.stream.get() + if body is None: + break + print('Hello!') + body = body.decode('utf-8').replace('1', 'A') + response.write(body) + return stream(sample_streaming_fn) + + +if __name__ == '__main__': + app.run(host='127.0.0.1', port=8000) diff --git a/sanic/app.py b/sanic/app.py index 8d846c3e..b77f9b96 100644 --- a/sanic/app.py +++ b/sanic/app.py @@ -27,7 +27,7 @@ from sanic.websocket import WebSocketProtocol, ConnectionClosed class Sanic: def __init__(self, name=None, router=None, error_handler=None, - load_env=True, request_class=None, + load_env=True, request_class=None, is_request_stream=False, log_config=LOGGING): if log_config: logging.config.dictConfig(log_config) @@ -60,6 +60,7 @@ class Sanic: self.sock = None self.listeners = defaultdict(list) self.is_running = False + self.is_request_stream = is_request_stream self.websocket_enabled = False self.websocket_tasks = [] @@ -651,6 +652,7 @@ class Sanic: server_settings = { 'protocol': protocol, 'request_class': self.request_class, + 'is_request_stream': self.is_request_stream, 'host': host, 'port': port, 'sock': sock, diff --git a/sanic/request.py b/sanic/request.py index 57b4773c..7aaed247 100644 --- a/sanic/request.py +++ b/sanic/request.py @@ -38,7 +38,7 @@ class Request(dict): __slots__ = ( 'app', 'headers', 'version', 'method', '_cookies', 'transport', 'body', 'parsed_json', 'parsed_args', 'parsed_form', 'parsed_files', - '_ip', '_parsed_url', 'uri_template' + '_ip', '_parsed_url', 'uri_template', 'stream' ) def __init__(self, url_bytes, headers, version, method, transport): @@ -59,6 +59,7 @@ class Request(dict): self.parsed_args = None self.uri_template = None self._cookies = None + self.stream = None @property def json(self): diff --git a/sanic/server.py b/sanic/server.py index 96b8e91c..b5fcc5e2 100644 --- a/sanic/server.py +++ b/sanic/server.py @@ -64,7 +64,7 @@ class HttpProtocol(asyncio.Protocol): 'parser', 'request', 'url', 'headers', # request config 'request_handler', 'request_timeout', 'request_max_size', - 'request_class', + 'request_class', 'is_request_stream', # enable or disable access log / error log purpose 'has_log', # connection management @@ -73,7 +73,7 @@ class HttpProtocol(asyncio.Protocol): def __init__(self, *, loop, request_handler, error_handler, signal=Signal(), connections=set(), request_timeout=60, request_max_size=None, request_class=None, has_log=True, - keep_alive=True): + keep_alive=True, is_request_stream=False): self.loop = loop self.transport = None self.request = None @@ -88,10 +88,12 @@ class HttpProtocol(asyncio.Protocol): self.request_timeout = request_timeout self.request_max_size = request_max_size self.request_class = request_class or Request + self.is_request_stream = is_request_stream self._total_request_size = 0 self._timeout_handler = None self._last_request_time = None self._request_handler_task = None + self._request_stream_task = None self._keep_alive = keep_alive @property @@ -123,6 +125,8 @@ class HttpProtocol(asyncio.Protocol): self._timeout_handler = ( self.loop.call_later(time_left, self.connection_timeout)) else: + if self._request_stream_task: + self._request_stream_task.cancel() if self._request_handler_task: self._request_handler_task.cancel() exception = RequestTimeout('Request Timeout') @@ -171,13 +175,26 @@ class HttpProtocol(asyncio.Protocol): method=self.parser.get_method().decode(), transport=self.transport ) + if self.is_request_stream: + self.request.stream = asyncio.Queue() + self.execute_request_handler() def on_body(self, body): - self.request.body.append(body) + if self.is_request_stream: + self._request_stream_task = self.loop.create_task( + self.request.stream.put(body)) + else: + self.request.body.append(body) def on_message_complete(self): - self.request.body = b''.join(self.request.body) + if self.is_request_stream: + self._request_stream_task = self.loop.create_task( + self.request.stream.put(None)) + else: + self.request.body = b''.join(self.request.body) + self.execute_request_handler() + def execute_request_handler(self): self._request_handler_task = self.loop.create_task( self.request_handler( self.request, @@ -317,6 +334,7 @@ class HttpProtocol(asyncio.Protocol): self.url = None self.headers = None self._request_handler_task = None + self._request_stream_task = None self._total_request_size = 0 def close_if_idle(self): @@ -359,7 +377,8 @@ def serve(host, port, request_handler, error_handler, before_start=None, request_timeout=60, ssl=None, sock=None, request_max_size=None, reuse_port=False, loop=None, protocol=HttpProtocol, backlog=100, register_sys_signals=True, run_async=False, connections=None, - signal=Signal(), request_class=None, has_log=True, keep_alive=True): + signal=Signal(), request_class=None, has_log=True, keep_alive=True, + is_request_stream=False): """Start asynchronous HTTP Server on an individual process. :param host: Address to host on @@ -386,6 +405,7 @@ def serve(host, port, request_handler, error_handler, before_start=None, :param protocol: subclass of asyncio protocol class :param request_class: Request class to use :param has_log: disable/enable access log and error log + :param is_request_stream: disable/enable Request.stream :return: Nothing """ if not run_async: @@ -410,6 +430,7 @@ def serve(host, port, request_handler, error_handler, before_start=None, request_class=request_class, has_log=has_log, keep_alive=keep_alive, + is_request_stream=is_request_stream, ) server_coroutine = loop.create_server( diff --git a/tests/test_request_stream.py b/tests/test_request_stream.py new file mode 100644 index 00000000..794864e0 --- /dev/null +++ b/tests/test_request_stream.py @@ -0,0 +1,26 @@ +from sanic import Sanic +from sanic.response import stream + +app = Sanic('test_request_stream', is_request_stream=True) + + +@app.post('/stream') +async def handler(request): + async def sample_streaming_fn(response): + while True: + body = await request.stream.get() + if body is None: + break + body = body.decode('utf-8').replace('1', 'A') + response.write(body) + return stream(sample_streaming_fn) + + +def test_request_stream(): + data = "" + for i in range(1, 250000): + data += str(i) + request, response = app.test_client.post('/stream', data=data) + text = data.replace('1', 'A') + assert response.status == 200 + assert response.text == text