Make all requests streaming and preload body for non-streaming handlers.

This commit is contained in:
L. Kärkkäinen
2020-02-21 13:28:50 +02:00
parent 6279eac3d1
commit fe64a2764d
11 changed files with 41 additions and 177 deletions

View File

@@ -82,7 +82,6 @@ class Sanic:
self.strict_slashes = strict_slashes
self.listeners = defaultdict(list)
self.is_running = False
self.is_request_stream = False
self.websocket_enabled = False
self.websocket_tasks = set()
self.named_request_middleware = {}
@@ -187,9 +186,6 @@ class Sanic:
if not uri.startswith("/"):
uri = "/" + uri
if stream:
self.is_request_stream = True
if strict_slashes is None:
strict_slashes = self.strict_slashes
@@ -956,6 +952,10 @@ class Sanic:
# Fetch handler from router
handler, args, kwargs, uri, name = self.router.get(request)
# Non-streaming handlers have their body preloaded
if not self.router.is_stream_handler(request):
await request.receive_body()
# -------------------------------------------- #
# Request Middleware
# -------------------------------------------- #
@@ -1381,7 +1381,7 @@ class Sanic:
server_settings = {
"protocol": protocol,
"request_class": self.request_class,
"is_request_stream": self.is_request_stream,
"is_request_stream": True,
"router": self.router,
"host": host,
"port": port,

View File

@@ -190,7 +190,6 @@ class ASGIApp:
sanic_app: "sanic.app.Sanic"
request: Request
transport: MockTransport
do_stream: bool
lifespan: Lifespan
ws: Optional[WebSocketConnection]
@@ -213,9 +212,6 @@ class ASGIApp:
for key, value in scope.get("headers", [])
]
)
instance.do_stream = (
True if headers.get("expect") == "100-continue" else False
)
instance.lifespan = Lifespan(instance)
if scope["type"] == "lifespan":
@@ -256,15 +252,9 @@ class ASGIApp:
sanic_app,
)
if sanic_app.is_request_stream:
is_stream_handler = sanic_app.router.is_stream_handler(
instance.request
)
if is_stream_handler:
instance.request.stream = StreamBuffer(
sanic_app.config.REQUEST_BUFFER_QUEUE_SIZE
)
instance.do_stream = True
instance.request.stream = StreamBuffer(
sanic_app.config.REQUEST_BUFFER_QUEUE_SIZE
)
return instance
@@ -300,10 +290,7 @@ class ASGIApp:
"""
Handle the incoming request.
"""
if not self.do_stream:
self.request.body = await self.read_body()
else:
self.sanic_app.loop.create_task(self.stream_body())
self.sanic_app.loop.create_task(self.stream_body())
handler = self.sanic_app.handle_request
callback = None if self.ws else self.stream_callback

View File

@@ -116,7 +116,7 @@ class Request:
self.transport = transport
# Init but do not inhale
self.body_init()
self.body = None
self.ctx = SimpleNamespace()
self.parsed_forwarded = None
self.parsed_json = None
@@ -159,17 +159,7 @@ class Request:
Custom context is now stored in `request.custom_context.yourkey`"""
setattr(self.ctx, key, value)
def body_init(self):
self.body = []
def body_push(self, data):
self.body.append(data)
def body_finish(self):
self.body = b"".join(self.body)
async def receive_body(self):
assert self.body == []
self.body = b"".join([data async for data in self.stream])
@property

View File

@@ -328,14 +328,8 @@ class HttpProtocol(asyncio.Protocol):
self.expect_handler()
if self.is_request_stream:
self._is_stream_handler = self.router.is_stream_handler(
self.request
)
if self._is_stream_handler:
self.request.stream = StreamBuffer(
self.request_buffer_queue_size
)
self.execute_request_handler()
self.request.stream = StreamBuffer(self.request_buffer_queue_size)
self.execute_request_handler()
def expect_handler(self):
"""
@@ -353,21 +347,18 @@ class HttpProtocol(asyncio.Protocol):
)
def on_body(self, body):
if self.is_request_stream and self._is_stream_handler:
# body chunks can be put into asyncio.Queue out of order if
# multiple tasks put concurrently and the queue is full in python
# 3.7. so we should not create more than one task putting into the
# queue simultaneously.
self._body_chunks.append(body)
if (
not self._request_stream_task
or self._request_stream_task.done()
):
self._request_stream_task = self.loop.create_task(
self.stream_append()
)
else:
self.request.body_push(body)
# body chunks can be put into asyncio.Queue out of order if
# multiple tasks put concurrently and the queue is full in python
# 3.7. so we should not create more than one task putting into the
# queue simultaneously.
self._body_chunks.append(body)
if (
not self._request_stream_task
or self._request_stream_task.done()
):
self._request_stream_task = self.loop.create_task(
self.stream_append()
)
async def body_append(self, body):
if (
@@ -385,7 +376,7 @@ class HttpProtocol(asyncio.Protocol):
await self.request.stream.put(body)
async def stream_append(self):
while self._body_chunks:
while self._body_chunks and self.request:
body = self._body_chunks.popleft()
if self.request.stream.is_full():
self.transport.pause_reading()
@@ -393,6 +384,7 @@ class HttpProtocol(asyncio.Protocol):
self.transport.resume_reading()
else:
await self.request.stream.put(body)
self._body_chunks.clear()
def on_message_complete(self):
# Entire request (headers and whole body) is received.
@@ -400,18 +392,15 @@ class HttpProtocol(asyncio.Protocol):
if self._request_timeout_handler:
self._request_timeout_handler.cancel()
self._request_timeout_handler = None
if self.is_request_stream and self._is_stream_handler:
self._body_chunks.append(None)
if (
not self._request_stream_task
or self._request_stream_task.done()
):
self._request_stream_task = self.loop.create_task(
self.stream_append()
)
return
self.request.body_finish()
self.execute_request_handler()
self._body_chunks.append(None)
if (
not self._request_stream_task
or self._request_stream_task.done()
):
self._request_stream_task = self.loop.create_task(
self.stream_append()
)
def execute_request_handler(self):
"""
@@ -639,7 +628,6 @@ class HttpProtocol(asyncio.Protocol):
self._request_handler_task = None
self._request_stream_task = None
self._total_request_size = 0
self._is_stream_handler = False
def close_if_idle(self):
"""Close the connection if a request is not being sent or received