Merge pull request #1423 from yunstanford/request-streaming-support
basic request streaming support with flow control
This commit is contained in:
@@ -1071,6 +1071,7 @@ class Sanic:
|
||||
"response_timeout": self.config.RESPONSE_TIMEOUT,
|
||||
"keep_alive_timeout": self.config.KEEP_ALIVE_TIMEOUT,
|
||||
"request_max_size": self.config.REQUEST_MAX_SIZE,
|
||||
"request_buffer_queue_size": self.config.REQUEST_BUFFER_QUEUE_SIZE,
|
||||
"keep_alive": self.config.KEEP_ALIVE,
|
||||
"loop": loop,
|
||||
"register_sys_signals": register_sys_signals,
|
||||
|
||||
@@ -32,6 +32,7 @@ class Config(dict):
|
||||
▀▀▄▄▀
|
||||
"""
|
||||
self.REQUEST_MAX_SIZE = 100000000 # 100 megabytes
|
||||
self.REQUEST_BUFFER_QUEUE_SIZE = 100
|
||||
self.REQUEST_TIMEOUT = 60 # 60 seconds
|
||||
self.RESPONSE_TIMEOUT = 60 # 60 seconds
|
||||
self.KEEP_ALIVE = keep_alive
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import asyncio
|
||||
import json
|
||||
import sys
|
||||
|
||||
@@ -47,6 +48,23 @@ class RequestParameters(dict):
|
||||
return super().get(name, default)
|
||||
|
||||
|
||||
class StreamBuffer:
|
||||
def __init__(self, buffer_size=100):
|
||||
self._queue = asyncio.Queue(buffer_size)
|
||||
|
||||
async def read(self):
|
||||
""" Stop reading when gets None """
|
||||
payload = await self._queue.get()
|
||||
self._queue.task_done()
|
||||
return payload
|
||||
|
||||
async def put(self, payload):
|
||||
await self._queue.put(payload)
|
||||
|
||||
def is_full(self):
|
||||
return self._queue.full()
|
||||
|
||||
|
||||
class Request(dict):
|
||||
"""Properties of an HTTP request such as URL, headers, etc."""
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ from sanic.exceptions import (
|
||||
ServiceUnavailable,
|
||||
)
|
||||
from sanic.log import access_logger, logger
|
||||
from sanic.request import Request
|
||||
from sanic.request import Request, StreamBuffer
|
||||
from sanic.response import HTTPResponse
|
||||
|
||||
|
||||
@@ -59,6 +59,7 @@ class HttpProtocol(asyncio.Protocol):
|
||||
"response_timeout",
|
||||
"keep_alive_timeout",
|
||||
"request_max_size",
|
||||
"request_buffer_queue_size",
|
||||
"request_class",
|
||||
"is_request_stream",
|
||||
"router",
|
||||
@@ -89,11 +90,12 @@ class HttpProtocol(asyncio.Protocol):
|
||||
request_handler,
|
||||
error_handler,
|
||||
signal=Signal(),
|
||||
connections=set(),
|
||||
connections=None,
|
||||
request_timeout=60,
|
||||
response_timeout=60,
|
||||
keep_alive_timeout=5,
|
||||
request_max_size=None,
|
||||
request_buffer_queue_size=100,
|
||||
request_class=None,
|
||||
access_log=True,
|
||||
keep_alive=True,
|
||||
@@ -112,10 +114,11 @@ class HttpProtocol(asyncio.Protocol):
|
||||
self.router = router
|
||||
self.signal = signal
|
||||
self.access_log = access_log
|
||||
self.connections = connections
|
||||
self.connections = connections or set()
|
||||
self.request_handler = request_handler
|
||||
self.error_handler = error_handler
|
||||
self.request_timeout = request_timeout
|
||||
self.request_buffer_queue_size = request_buffer_queue_size
|
||||
self.response_timeout = response_timeout
|
||||
self.keep_alive_timeout = keep_alive_timeout
|
||||
self.request_max_size = request_max_size
|
||||
@@ -298,16 +301,26 @@ class HttpProtocol(asyncio.Protocol):
|
||||
self.request
|
||||
)
|
||||
if self._is_stream_handler:
|
||||
self.request.stream = asyncio.Queue()
|
||||
self.request.stream = StreamBuffer(
|
||||
self.request_buffer_queue_size
|
||||
)
|
||||
self.execute_request_handler()
|
||||
|
||||
def on_body(self, body):
|
||||
if self.is_request_stream and self._is_stream_handler:
|
||||
self._request_stream_task = self.loop.create_task(
|
||||
self.request.stream.put(body)
|
||||
self.body_append(body)
|
||||
)
|
||||
return
|
||||
self.request.body_push(body)
|
||||
else:
|
||||
self.request.body_push(body)
|
||||
|
||||
async def body_append(self, body):
|
||||
if self.request.stream.is_full():
|
||||
self.transport.pause_reading()
|
||||
await self.request.stream.put(body)
|
||||
self.transport.resume_reading()
|
||||
else:
|
||||
await self.request.stream.put(body)
|
||||
|
||||
def on_message_complete(self):
|
||||
# Entire request (headers and whole body) is received.
|
||||
@@ -575,6 +588,7 @@ def serve(
|
||||
ssl=None,
|
||||
sock=None,
|
||||
request_max_size=None,
|
||||
request_buffer_queue_size=100,
|
||||
reuse_port=False,
|
||||
loop=None,
|
||||
protocol=HttpProtocol,
|
||||
@@ -635,6 +649,7 @@ def serve(
|
||||
outgoing bytes, the low-water limit is a
|
||||
quarter of the high-water limit.
|
||||
:param is_request_stream: disable/enable Request.stream
|
||||
:param request_buffer_queue_size: streaming request buffer queue size
|
||||
:param router: Router object
|
||||
:param graceful_shutdown_timeout: How long take to Force close non-idle
|
||||
connection
|
||||
|
||||
Reference in New Issue
Block a user