From 181adebf822e4d3be9905e70475cd9c503867dea Mon Sep 17 00:00:00 2001 From: Yun Xu Date: Mon, 3 Dec 2018 22:19:26 -0800 Subject: [PATCH] add StreamBuffer for request flow control --- sanic/request.py | 24 ++++++++++++++++++++++++ sanic/server.py | 21 ++++++++++++++++----- 2 files changed, 40 insertions(+), 5 deletions(-) diff --git a/sanic/request.py b/sanic/request.py index e775596a..8ba25236 100644 --- a/sanic/request.py +++ b/sanic/request.py @@ -1,5 +1,6 @@ import json import sys +import asyncio from cgi import parse_header from collections import namedtuple @@ -47,6 +48,25 @@ class RequestParameters(dict): return super().get(name, default) +class StreamBuffer: + + def __init__(self, buffer_size=None): + self._buffer_size = buffer_size or 100 + self._queue = asyncio.Queue() + + async def read(self): + """ Stop reading when gets None """ + payload = await self._queue.get() + self._queue.task_done() + return payload + + async def put(self, payload): + await self._queue.put(payload) + + def is_full(self): + return self._queue.full() + + class Request(dict): """Properties of an HTTP request such as URL, headers, etc.""" @@ -254,6 +274,10 @@ class Request(dict): self._remote_addr = "" return self._remote_addr + @property + def stream(self): + return self.stream + @property def scheme(self): if ( diff --git a/sanic/server.py b/sanic/server.py index dc197941..9e1bdd12 100644 --- a/sanic/server.py +++ b/sanic/server.py @@ -22,7 +22,7 @@ from sanic.exceptions import ( ServiceUnavailable, ) from sanic.log import access_logger, logger -from sanic.request import Request +from sanic.request import Request, StreamBuffer from sanic.response import HTTPResponse @@ -59,6 +59,7 @@ class HttpProtocol(asyncio.Protocol): "response_timeout", "keep_alive_timeout", "request_max_size", + "request_body_buffer_queue_size", "request_class", "is_request_stream", "router", @@ -82,8 +83,9 @@ class HttpProtocol(asyncio.Protocol): request_handler, error_handler, signal=Signal(), - connections=set(), + connections=None, request_timeout=60, + request_body_buffer_queue_size=100, response_timeout=60, keep_alive_timeout=5, request_max_size=None, @@ -105,10 +107,11 @@ class HttpProtocol(asyncio.Protocol): self.router = router self.signal = signal self.access_log = access_log - self.connections = connections + self.connections = connections or set() self.request_handler = request_handler self.error_handler = error_handler self.request_timeout = request_timeout + self.request_body_buffer_queue_size = request_body_buffer_queue_size self.response_timeout = response_timeout self.keep_alive_timeout = keep_alive_timeout self.request_max_size = request_max_size @@ -291,17 +294,25 @@ class HttpProtocol(asyncio.Protocol): self.request ) if self._is_stream_handler: - self.request.stream = asyncio.Queue() + self.request.stream = StreamBuffer(self.request_body_buffer_queue_size) self.execute_request_handler() def on_body(self, body): if self.is_request_stream and self._is_stream_handler: self._request_stream_task = self.loop.create_task( - self.request.stream.put(body) + self.body_append(body) ) return self.request.body_push(body) + async def body_append(self, body): + if self.request.stream.is_full(): + self.transport.pause_reading() + await self.request.stream.put(body) + self.transport.resume_reading() + else: + await self.request.stream.put(body) + def on_message_complete(self): # Entire request (headers and whole body) is received. # We can cancel and remove the request timeout handler now.