add StreamBuffer for request flow control

This commit is contained in:
Yun Xu 2018-12-03 22:19:26 -08:00
parent e955e833c4
commit 181adebf82
2 changed files with 40 additions and 5 deletions

View File

@ -1,5 +1,6 @@
import json import json
import sys import sys
import asyncio
from cgi import parse_header from cgi import parse_header
from collections import namedtuple from collections import namedtuple
@ -47,6 +48,25 @@ class RequestParameters(dict):
return super().get(name, default) return super().get(name, default)
class StreamBuffer:
def __init__(self, buffer_size=None):
self._buffer_size = buffer_size or 100
self._queue = asyncio.Queue()
async def read(self):
""" Stop reading when gets None """
payload = await self._queue.get()
self._queue.task_done()
return payload
async def put(self, payload):
await self._queue.put(payload)
def is_full(self):
return self._queue.full()
class Request(dict): class Request(dict):
"""Properties of an HTTP request such as URL, headers, etc.""" """Properties of an HTTP request such as URL, headers, etc."""
@ -254,6 +274,10 @@ class Request(dict):
self._remote_addr = "" self._remote_addr = ""
return self._remote_addr return self._remote_addr
@property
def stream(self):
return self.stream
@property @property
def scheme(self): def scheme(self):
if ( if (

View File

@ -22,7 +22,7 @@ from sanic.exceptions import (
ServiceUnavailable, ServiceUnavailable,
) )
from sanic.log import access_logger, logger from sanic.log import access_logger, logger
from sanic.request import Request from sanic.request import Request, StreamBuffer
from sanic.response import HTTPResponse from sanic.response import HTTPResponse
@ -59,6 +59,7 @@ class HttpProtocol(asyncio.Protocol):
"response_timeout", "response_timeout",
"keep_alive_timeout", "keep_alive_timeout",
"request_max_size", "request_max_size",
"request_body_buffer_queue_size",
"request_class", "request_class",
"is_request_stream", "is_request_stream",
"router", "router",
@ -82,8 +83,9 @@ class HttpProtocol(asyncio.Protocol):
request_handler, request_handler,
error_handler, error_handler,
signal=Signal(), signal=Signal(),
connections=set(), connections=None,
request_timeout=60, request_timeout=60,
request_body_buffer_queue_size=100,
response_timeout=60, response_timeout=60,
keep_alive_timeout=5, keep_alive_timeout=5,
request_max_size=None, request_max_size=None,
@ -105,10 +107,11 @@ class HttpProtocol(asyncio.Protocol):
self.router = router self.router = router
self.signal = signal self.signal = signal
self.access_log = access_log self.access_log = access_log
self.connections = connections self.connections = connections or set()
self.request_handler = request_handler self.request_handler = request_handler
self.error_handler = error_handler self.error_handler = error_handler
self.request_timeout = request_timeout self.request_timeout = request_timeout
self.request_body_buffer_queue_size = request_body_buffer_queue_size
self.response_timeout = response_timeout self.response_timeout = response_timeout
self.keep_alive_timeout = keep_alive_timeout self.keep_alive_timeout = keep_alive_timeout
self.request_max_size = request_max_size self.request_max_size = request_max_size
@ -291,17 +294,25 @@ class HttpProtocol(asyncio.Protocol):
self.request self.request
) )
if self._is_stream_handler: if self._is_stream_handler:
self.request.stream = asyncio.Queue() self.request.stream = StreamBuffer(self.request_body_buffer_queue_size)
self.execute_request_handler() self.execute_request_handler()
def on_body(self, body): def on_body(self, body):
if self.is_request_stream and self._is_stream_handler: if self.is_request_stream and self._is_stream_handler:
self._request_stream_task = self.loop.create_task( self._request_stream_task = self.loop.create_task(
self.request.stream.put(body) self.body_append(body)
) )
return return
self.request.body_push(body) self.request.body_push(body)
async def body_append(self, body):
if self.request.stream.is_full():
self.transport.pause_reading()
await self.request.stream.put(body)
self.transport.resume_reading()
else:
await self.request.stream.put(body)
def on_message_complete(self): def on_message_complete(self):
# Entire request (headers and whole body) is received. # Entire request (headers and whole body) is received.
# We can cancel and remove the request timeout handler now. # We can cancel and remove the request timeout handler now.