From 181adebf822e4d3be9905e70475cd9c503867dea Mon Sep 17 00:00:00 2001 From: Yun Xu Date: Mon, 3 Dec 2018 22:19:26 -0800 Subject: [PATCH 1/7] add StreamBuffer for request flow control --- sanic/request.py | 24 ++++++++++++++++++++++++ sanic/server.py | 21 ++++++++++++++++----- 2 files changed, 40 insertions(+), 5 deletions(-) diff --git a/sanic/request.py b/sanic/request.py index e775596a..8ba25236 100644 --- a/sanic/request.py +++ b/sanic/request.py @@ -1,5 +1,6 @@ import json import sys +import asyncio from cgi import parse_header from collections import namedtuple @@ -47,6 +48,25 @@ class RequestParameters(dict): return super().get(name, default) +class StreamBuffer: + + def __init__(self, buffer_size=None): + self._buffer_size = buffer_size or 100 + self._queue = asyncio.Queue() + + async def read(self): + """ Stop reading when gets None """ + payload = await self._queue.get() + self._queue.task_done() + return payload + + async def put(self, payload): + await self._queue.put(payload) + + def is_full(self): + return self._queue.full() + + class Request(dict): """Properties of an HTTP request such as URL, headers, etc.""" @@ -254,6 +274,10 @@ class Request(dict): self._remote_addr = "" return self._remote_addr + @property + def stream(self): + return self.stream + @property def scheme(self): if ( diff --git a/sanic/server.py b/sanic/server.py index dc197941..9e1bdd12 100644 --- a/sanic/server.py +++ b/sanic/server.py @@ -22,7 +22,7 @@ from sanic.exceptions import ( ServiceUnavailable, ) from sanic.log import access_logger, logger -from sanic.request import Request +from sanic.request import Request, StreamBuffer from sanic.response import HTTPResponse @@ -59,6 +59,7 @@ class HttpProtocol(asyncio.Protocol): "response_timeout", "keep_alive_timeout", "request_max_size", + "request_body_buffer_queue_size", "request_class", "is_request_stream", "router", @@ -82,8 +83,9 @@ class HttpProtocol(asyncio.Protocol): request_handler, error_handler, signal=Signal(), - connections=set(), + connections=None, request_timeout=60, + request_body_buffer_queue_size=100, response_timeout=60, keep_alive_timeout=5, request_max_size=None, @@ -105,10 +107,11 @@ class HttpProtocol(asyncio.Protocol): self.router = router self.signal = signal self.access_log = access_log - self.connections = connections + self.connections = connections or set() self.request_handler = request_handler self.error_handler = error_handler self.request_timeout = request_timeout + self.request_body_buffer_queue_size = request_body_buffer_queue_size self.response_timeout = response_timeout self.keep_alive_timeout = keep_alive_timeout self.request_max_size = request_max_size @@ -291,17 +294,25 @@ class HttpProtocol(asyncio.Protocol): self.request ) if self._is_stream_handler: - self.request.stream = asyncio.Queue() + self.request.stream = StreamBuffer(self.request_body_buffer_queue_size) self.execute_request_handler() def on_body(self, body): if self.is_request_stream and self._is_stream_handler: self._request_stream_task = self.loop.create_task( - self.request.stream.put(body) + self.body_append(body) ) return self.request.body_push(body) + async def body_append(self, body): + if self.request.stream.is_full(): + self.transport.pause_reading() + await self.request.stream.put(body) + self.transport.resume_reading() + else: + await self.request.stream.put(body) + def on_message_complete(self): # Entire request (headers and whole body) is received. # We can cancel and remove the request timeout handler now. From 268d254d85fe40f223ac6b00dd42debd75933dfc Mon Sep 17 00:00:00 2001 From: Yun Xu Date: Mon, 3 Dec 2018 22:28:22 -0800 Subject: [PATCH 2/7] fix unit tests --- sanic/request.py | 4 --- tests/test_request_stream.py | 54 +++++++++++++++++++----------------- 2 files changed, 28 insertions(+), 30 deletions(-) diff --git a/sanic/request.py b/sanic/request.py index 8ba25236..51d872e1 100644 --- a/sanic/request.py +++ b/sanic/request.py @@ -274,10 +274,6 @@ class Request(dict): self._remote_addr = "" return self._remote_addr - @property - def stream(self): - return self.stream - @property def scheme(self): if ( diff --git a/tests/test_request_stream.py b/tests/test_request_stream.py index f46e235b..ae50a9f1 100644 --- a/tests/test_request_stream.py +++ b/tests/test_request_stream.py @@ -4,6 +4,8 @@ from sanic.views import CompositionView from sanic.views import HTTPMethodView from sanic.views import stream as stream_decorator from sanic.response import stream, text +from sanic.request import StreamBuffer + data = "abc" * 100000 @@ -19,10 +21,10 @@ def test_request_stream_method_view(app): @stream_decorator async def post(self, request): - assert isinstance(request.stream, asyncio.Queue) + assert isinstance(request.stream, StreamBuffer) result = '' while True: - body = await request.stream.get() + body = await request.stream.read() if body is None: break result += body.decode('utf-8') @@ -71,11 +73,11 @@ def test_request_stream_app(app): @app.post('/post/', stream=True) async def post(request, id): - assert isinstance(request.stream, asyncio.Queue) + assert isinstance(request.stream, StreamBuffer) async def streaming(response): while True: - body = await request.stream.get() + body = await request.stream.read() if body is None: break await response.write(body.decode('utf-8')) @@ -88,11 +90,11 @@ def test_request_stream_app(app): @app.put('/put', stream=True) async def put(request): - assert isinstance(request.stream, asyncio.Queue) + assert isinstance(request.stream, StreamBuffer) async def streaming(response): while True: - body = await request.stream.get() + body = await request.stream.read() if body is None: break await response.write(body.decode('utf-8')) @@ -105,11 +107,11 @@ def test_request_stream_app(app): @app.patch('/patch', stream=True) async def patch(request): - assert isinstance(request.stream, asyncio.Queue) + assert isinstance(request.stream, StreamBuffer) async def streaming(response): while True: - body = await request.stream.get() + body = await request.stream.read() if body is None: break await response.write(body.decode('utf-8')) @@ -163,11 +165,11 @@ def test_request_stream_handle_exception(app): @app.post('/post/', stream=True) async def post(request, id): - assert isinstance(request.stream, asyncio.Queue) + assert isinstance(request.stream, StreamBuffer) async def streaming(response): while True: - body = await request.stream.get() + body = await request.stream.read() if body is None: break await response.write(body.decode('utf-8')) @@ -216,11 +218,11 @@ def test_request_stream_blueprint(app): @bp.post('/post/', stream=True) async def post(request, id): - assert isinstance(request.stream, asyncio.Queue) + assert isinstance(request.stream, StreamBuffer) async def streaming(response): while True: - body = await request.stream.get() + body = await request.stream.read() if body is None: break await response.write(body.decode('utf-8')) @@ -233,11 +235,11 @@ def test_request_stream_blueprint(app): @bp.put('/put', stream=True) async def put(request): - assert isinstance(request.stream, asyncio.Queue) + assert isinstance(request.stream, StreamBuffer) async def streaming(response): while True: - body = await request.stream.get() + body = await request.stream.read() if body is None: break await response.write(body.decode('utf-8')) @@ -250,11 +252,11 @@ def test_request_stream_blueprint(app): @bp.patch('/patch', stream=True) async def patch(request): - assert isinstance(request.stream, asyncio.Queue) + assert isinstance(request.stream, StreamBuffer) async def streaming(response): while True: - body = await request.stream.get() + body = await request.stream.read() if body is None: break await response.write(body.decode('utf-8')) @@ -313,10 +315,10 @@ def test_request_stream_composition_view(app): return text('OK') async def post_handler(request): - assert isinstance(request.stream, asyncio.Queue) + assert isinstance(request.stream, StreamBuffer) result = '' while True: - body = await request.stream.get() + body = await request.stream.read() if body is None: break result += body.decode('utf-8') @@ -350,10 +352,10 @@ def test_request_stream(app): @stream_decorator async def post(self, request): - assert isinstance(request.stream, asyncio.Queue) + assert isinstance(request.stream, StreamBuffer) result = '' while True: - body = await request.stream.get() + body = await request.stream.read() if body is None: break result += body.decode('utf-8') @@ -361,11 +363,11 @@ def test_request_stream(app): @app.post('/stream', stream=True) async def handler(request): - assert isinstance(request.stream, asyncio.Queue) + assert isinstance(request.stream, StreamBuffer) async def streaming(response): while True: - body = await request.stream.get() + body = await request.stream.read() if body is None: break await response.write(body.decode('utf-8')) @@ -378,10 +380,10 @@ def test_request_stream(app): @bp.post('/bp_stream', stream=True) async def bp_stream(request): - assert isinstance(request.stream, asyncio.Queue) + assert isinstance(request.stream, StreamBuffer) result = '' while True: - body = await request.stream.get() + body = await request.stream.read() if body is None: break result += body.decode('utf-8') @@ -397,10 +399,10 @@ def test_request_stream(app): return text('OK') async def post_handler(request): - assert isinstance(request.stream, asyncio.Queue) + assert isinstance(request.stream, StreamBuffer) result = '' while True: - body = await request.stream.get() + body = await request.stream.read() if body is None: break result += body.decode('utf-8') From fca7cb9fb03c63c6b18257e9429618d5b1b58917 Mon Sep 17 00:00:00 2001 From: Yun Xu Date: Mon, 3 Dec 2018 22:51:09 -0800 Subject: [PATCH 3/7] update request streaming doc --- docs/sanic/streaming.md | 10 +++++----- tests/test_request_stream.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/sanic/streaming.md b/docs/sanic/streaming.md index bf3ca664..53eebad7 100644 --- a/docs/sanic/streaming.md +++ b/docs/sanic/streaming.md @@ -2,7 +2,7 @@ ## Request Streaming -Sanic allows you to get request data by stream, as below. When the request ends, `request.stream.get()` returns `None`. Only post, put and patch decorator have stream argument. +Sanic allows you to get request data by stream, as below. When the request ends, `await request.stream.read()` returns `None`. Only post, put and patch decorator have stream argument. ```python from sanic import Sanic @@ -22,7 +22,7 @@ class SimpleView(HTTPMethodView): async def post(self, request): result = '' while True: - body = await request.stream.get() + body = await request.stream.read() if body is None: break result += body.decode('utf-8') @@ -33,7 +33,7 @@ class SimpleView(HTTPMethodView): async def handler(request): async def streaming(response): while True: - body = await request.stream.get() + body = await request.stream.read() if body is None: break body = body.decode('utf-8').replace('1', 'A') @@ -45,7 +45,7 @@ async def handler(request): async def bp_handler(request): result = '' while True: - body = await request.stream.get() + body = await request.stream.read() if body is None: break result += body.decode('utf-8').replace('1', 'A') @@ -55,7 +55,7 @@ async def bp_handler(request): async def post_handler(request): result = '' while True: - body = await request.stream.get() + body = await request.stream.read() if body is None: break result += body.decode('utf-8') diff --git a/tests/test_request_stream.py b/tests/test_request_stream.py index ae50a9f1..ecf201f2 100644 --- a/tests/test_request_stream.py +++ b/tests/test_request_stream.py @@ -7,7 +7,7 @@ from sanic.response import stream, text from sanic.request import StreamBuffer -data = "abc" * 100000 +data = "abc" * 10000000 def test_request_stream_method_view(app): From 7c9957e058a9ea3f11fd364c76488603348953ad Mon Sep 17 00:00:00 2001 From: Yun Xu Date: Mon, 3 Dec 2018 23:03:14 -0800 Subject: [PATCH 4/7] update README.rst (clean up badges) --- README.rst | 55 +++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 38 insertions(+), 17 deletions(-) diff --git a/README.rst b/README.rst index 311be757..b064612d 100644 --- a/README.rst +++ b/README.rst @@ -1,7 +1,44 @@ Sanic ===== -|Join the chat at https://gitter.im/sanic-python/Lobby| |Build Status| |AppVeyor Build Status| |Documentation| |Codecov| |PyPI| |PyPI version| |Code style black| +.. start-badges + +.. list-table:: + :stub-columns: 1 + + * - Build + - | |Build Status| |AppVeyor Build Status| |Codecov| + * - Docs + - |Documentation| + * - Package + - | |PyPI| |PyPI version| |Wheel| |Supported implementations| |Code style black| + * - Support + - |Join the chat at https://gitter.im/sanic-python/Lobby| + +.. |Join the chat at https://gitter.im/sanic-python/Lobby| image:: https://badges.gitter.im/sanic-python/Lobby.svg + :target: https://gitter.im/sanic-python/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge +.. |Codecov| image:: https://codecov.io/gh/huge-success/sanic/branch/master/graph/badge.svg + :target: https://codecov.io/gh/huge-success/sanic +.. |Build Status| image:: https://travis-ci.org/huge-success/sanic.svg?branch=master + :target: https://travis-ci.org/huge-success/sanic +.. |AppVeyor Build Status| image:: https://ci.appveyor.com/api/projects/status/d8pt3ids0ynexi8c/branch/master?svg=true + :target: https://ci.appveyor.com/project/huge-success/sanic +.. |Documentation| image:: https://readthedocs.org/projects/sanic/badge/?version=latest + :target: http://sanic.readthedocs.io/en/latest/?badge=latest +.. |PyPI| image:: https://img.shields.io/pypi/v/sanic.svg + :target: https://pypi.python.org/pypi/sanic/ +.. |PyPI version| image:: https://img.shields.io/pypi/pyversions/sanic.svg + :target: https://pypi.python.org/pypi/sanic/ +.. |Code style black| image:: https://img.shields.io/badge/code%20style-black-000000.svg + :target: https://github.com/ambv/black +.. |Wheel| image:: https://img.shields.io/pypi/wheel/sanic.svg + :alt: PyPI Wheel + :target: https://pypi.python.org/pypi/sanic +.. |Supported implementations| image:: https://img.shields.io/pypi/implementation/sanic.svg + :alt: Supported implementations + :target: https://pypi.python.org/pypi/sanic + +.. end-badges Sanic is a Flask-like Python 3.5+ web server that's written to go fast. It's based on the work done by the amazing folks at magicstack, and was inspired by `this article `_. @@ -45,22 +82,6 @@ Documentation `Documentation on Readthedocs `_. -.. |Join the chat at https://gitter.im/sanic-python/Lobby| image:: https://badges.gitter.im/sanic-python/Lobby.svg - :target: https://gitter.im/sanic-python/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge -.. |Codecov| image:: https://codecov.io/gh/huge-success/sanic/branch/master/graph/badge.svg - :target: https://codecov.io/gh/huge-success/sanic -.. |Build Status| image:: https://travis-ci.org/huge-success/sanic.svg?branch=master - :target: https://travis-ci.org/huge-success/sanic -.. |AppVeyor Build Status| image:: https://ci.appveyor.com/api/projects/status/d8pt3ids0ynexi8c/branch/master?svg=true - :target: https://ci.appveyor.com/project/huge-success/sanic -.. |Documentation| image:: https://readthedocs.org/projects/sanic/badge/?version=latest - :target: http://sanic.readthedocs.io/en/latest/?badge=latest -.. |PyPI| image:: https://img.shields.io/pypi/v/sanic.svg - :target: https://pypi.python.org/pypi/sanic/ -.. |PyPI version| image:: https://img.shields.io/pypi/pyversions/sanic.svg - :target: https://pypi.python.org/pypi/sanic/ -.. |Code style black| image:: https://img.shields.io/badge/code%20style-black-000000.svg - :target: https://github.com/ambv/black Questions and Discussion ------------------------ From b5287184e96cbc571bbaaa87e1c7fe396d7e9866 Mon Sep 17 00:00:00 2001 From: Yun Xu Date: Mon, 3 Dec 2018 23:16:45 -0800 Subject: [PATCH 5/7] fix lint fix isort --- sanic/request.py | 3 +-- sanic/server.py | 4 +++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sanic/request.py b/sanic/request.py index 51d872e1..faa63137 100644 --- a/sanic/request.py +++ b/sanic/request.py @@ -1,6 +1,6 @@ +import asyncio import json import sys -import asyncio from cgi import parse_header from collections import namedtuple @@ -49,7 +49,6 @@ class RequestParameters(dict): class StreamBuffer: - def __init__(self, buffer_size=None): self._buffer_size = buffer_size or 100 self._queue = asyncio.Queue() diff --git a/sanic/server.py b/sanic/server.py index 9e1bdd12..ff87198e 100644 --- a/sanic/server.py +++ b/sanic/server.py @@ -294,7 +294,9 @@ class HttpProtocol(asyncio.Protocol): self.request ) if self._is_stream_handler: - self.request.stream = StreamBuffer(self.request_body_buffer_queue_size) + self.request.stream = StreamBuffer( + self.request_body_buffer_queue_size + ) self.execute_request_handler() def on_body(self, body): From 1bfbc67c623e4fc2cd06fb03c9dcc49bebd9eb1e Mon Sep 17 00:00:00 2001 From: Yun Xu Date: Tue, 4 Dec 2018 20:12:04 -0800 Subject: [PATCH 6/7] expose request_buffer_queue_size to be configurable and update documentation fix StreamBuffer buffer_size --- docs/sanic/config.md | 1 + sanic/app.py | 1 + sanic/config.py | 1 + sanic/request.py | 5 ++--- sanic/server.py | 10 ++++++---- 5 files changed, 11 insertions(+), 7 deletions(-) diff --git a/docs/sanic/config.md b/docs/sanic/config.md index c16e2397..5f87348a 100644 --- a/docs/sanic/config.md +++ b/docs/sanic/config.md @@ -88,6 +88,7 @@ Out of the box there are just a few predefined values which can be overwritten w | Variable | Default | Description | | ------------------------- | --------- | ------------------------------------------------------ | | REQUEST_MAX_SIZE | 100000000 | How big a request may be (bytes) | + | REQUEST_BUFFER_QUEUE_SIZE | 100 | Request streaming buffer queue size | | REQUEST_TIMEOUT | 60 | How long a request can take to arrive (sec) | | RESPONSE_TIMEOUT | 60 | How long a response can take to process (sec) | | KEEP_ALIVE | True | Disables keep-alive when False | diff --git a/sanic/app.py b/sanic/app.py index c2a24464..7a4af641 100644 --- a/sanic/app.py +++ b/sanic/app.py @@ -1071,6 +1071,7 @@ class Sanic: "response_timeout": self.config.RESPONSE_TIMEOUT, "keep_alive_timeout": self.config.KEEP_ALIVE_TIMEOUT, "request_max_size": self.config.REQUEST_MAX_SIZE, + "request_buffer_queue_size": self.config.REQUEST_BUFFER_QUEUE_SIZE, "keep_alive": self.config.KEEP_ALIVE, "loop": loop, "register_sys_signals": register_sys_signals, diff --git a/sanic/config.py b/sanic/config.py index 0a53a607..15341829 100644 --- a/sanic/config.py +++ b/sanic/config.py @@ -32,6 +32,7 @@ class Config(dict): ▀▀▄▄▀ """ self.REQUEST_MAX_SIZE = 100000000 # 100 megabytes + self.REQUEST_BUFFER_QUEUE_SIZE = 100 self.REQUEST_TIMEOUT = 60 # 60 seconds self.RESPONSE_TIMEOUT = 60 # 60 seconds self.KEEP_ALIVE = keep_alive diff --git a/sanic/request.py b/sanic/request.py index faa63137..fc392c11 100644 --- a/sanic/request.py +++ b/sanic/request.py @@ -49,9 +49,8 @@ class RequestParameters(dict): class StreamBuffer: - def __init__(self, buffer_size=None): - self._buffer_size = buffer_size or 100 - self._queue = asyncio.Queue() + def __init__(self, buffer_size=100): + self._queue = asyncio.Queue(buffer_size) async def read(self): """ Stop reading when gets None """ diff --git a/sanic/server.py b/sanic/server.py index ff87198e..af253775 100644 --- a/sanic/server.py +++ b/sanic/server.py @@ -59,7 +59,7 @@ class HttpProtocol(asyncio.Protocol): "response_timeout", "keep_alive_timeout", "request_max_size", - "request_body_buffer_queue_size", + "request_buffer_queue_size", "request_class", "is_request_stream", "router", @@ -85,10 +85,10 @@ class HttpProtocol(asyncio.Protocol): signal=Signal(), connections=None, request_timeout=60, - request_body_buffer_queue_size=100, response_timeout=60, keep_alive_timeout=5, request_max_size=None, + request_buffer_queue_size=100, request_class=None, access_log=True, keep_alive=True, @@ -111,7 +111,7 @@ class HttpProtocol(asyncio.Protocol): self.request_handler = request_handler self.error_handler = error_handler self.request_timeout = request_timeout - self.request_body_buffer_queue_size = request_body_buffer_queue_size + self.request_buffer_queue_size = request_buffer_queue_size self.response_timeout = response_timeout self.keep_alive_timeout = keep_alive_timeout self.request_max_size = request_max_size @@ -295,7 +295,7 @@ class HttpProtocol(asyncio.Protocol): ) if self._is_stream_handler: self.request.stream = StreamBuffer( - self.request_body_buffer_queue_size + self.request_buffer_queue_size ) self.execute_request_handler() @@ -581,6 +581,7 @@ def serve( ssl=None, sock=None, request_max_size=None, + request_buffer_queue_size=100, reuse_port=False, loop=None, protocol=HttpProtocol, @@ -641,6 +642,7 @@ def serve( outgoing bytes, the low-water limit is a quarter of the high-water limit. :param is_request_stream: disable/enable Request.stream + :param request_buffer_queue_size: streaming request buffer queue size :param router: Router object :param graceful_shutdown_timeout: How long take to Force close non-idle connection From 956793e066ff4ceff6c443979510ff4145b4814e Mon Sep 17 00:00:00 2001 From: Yun Xu Date: Sun, 9 Dec 2018 15:18:33 -0800 Subject: [PATCH 7/7] address review feedback, small code refactoring --- sanic/server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sanic/server.py b/sanic/server.py index af253775..bb93f816 100644 --- a/sanic/server.py +++ b/sanic/server.py @@ -304,8 +304,8 @@ class HttpProtocol(asyncio.Protocol): self._request_stream_task = self.loop.create_task( self.body_append(body) ) - return - self.request.body_push(body) + else: + self.request.body_push(body) async def body_append(self, body): if self.request.stream.is_full():