All tests OK.

This commit is contained in:
L. Kärkkäinen 2020-02-26 19:00:38 +02:00
parent 6b9f0ece7c
commit b87364bd91
3 changed files with 24 additions and 19 deletions

View File

@ -253,7 +253,8 @@ class ASGIApp:
) )
instance.request.stream = StreamBuffer( instance.request.stream = StreamBuffer(
sanic_app.config.REQUEST_BUFFER_QUEUE_SIZE sanic_app.config.REQUEST_BUFFER_QUEUE_SIZE,
protocol=instance
) )
return instance return instance
@ -275,23 +276,15 @@ class ASGIApp:
""" """
Read and stream the body in chunks from an incoming ASGI message. Read and stream the body in chunks from an incoming ASGI message.
""" """
more_body = True message = await self.transport.receive()
if not message.get("more_body", False):
while more_body: return None
message = await self.transport.receive() return message.get("body", b"")
chunk = message.get("body", b"")
await self.request.stream.put(chunk)
more_body = message.get("more_body", False)
await self.request.stream.put(None)
async def __call__(self) -> None: async def __call__(self) -> None:
""" """
Handle the incoming request. Handle the incoming request.
""" """
self.sanic_app.loop.create_task(self.stream_body())
handler = self.sanic_app.handle_request handler = self.sanic_app.handle_request
callback = None if self.ws else self.stream_callback callback = None if self.ws else self.stream_callback
await handler(self.request, None, callback) await handler(self.request, None, callback)

View File

@ -54,7 +54,7 @@ class StreamBuffer:
async def read(self): async def read(self):
""" Stop reading when gets None """ """ Stop reading when gets None """
if self._protocol: if self._protocol:
return await self._protocol.request_body() return await self._protocol.stream_body()
payload = await self._queue.get() payload = await self._queue.get()
self._queue.task_done() self._queue.task_done()
return payload return payload

View File

@ -94,6 +94,7 @@ class HttpProtocol(asyncio.Protocol):
"_can_write", "_can_write",
"_data_received", "_data_received",
"_task", "_task",
"_exception",
) )
def __init__( def __init__(
@ -197,19 +198,23 @@ class HttpProtocol(asyncio.Protocol):
if (status == Status.IDLE and duration > self.keep_alive_timeout): if (status == Status.IDLE and duration > self.keep_alive_timeout):
logger.debug("KeepAlive Timeout. Closing connection.") logger.debug("KeepAlive Timeout. Closing connection.")
elif (status == Status.REQUEST and duration > self.request_timeout): elif (status == Status.REQUEST and duration > self.request_timeout):
self.write_error(RequestTimeout("Request Timeout")) self._exception = RequestTimeout("Request Timeout")
elif (status.value > Status.REQUEST.value and duration > self.response_timeout): elif (status.value > Status.REQUEST.value and duration > self.response_timeout):
self.write_error(ServiceUnavailable("Response Timeout")) self._exception = ServiceUnavailable("Response Timeout")
else: else:
self.loop.call_later(.1, self.check_timeouts) self.loop.call_later(.1, self.check_timeouts)
return return
self.close() self._task.cancel()
async def http1(self): async def http1(self):
"""HTTP 1.1 connection handler""" """HTTP 1.1 connection handler"""
try: try:
self._exception = None
buf = self._buffer buf = self._buffer
# Note: connections are initially in request mode and do not obey
# keep-alive timeout like with some other servers.
self._status = Status.REQUEST
while self.keep_alive: while self.keep_alive:
# Read request header # Read request header
pos = 0 pos = 0
@ -248,6 +253,8 @@ class HttpProtocol(asyncio.Protocol):
transport=self.transport, transport=self.transport,
app=self.app, app=self.app,
) )
if headers.get("connection", "").lower() == "close":
self.keep_alive = False
# Prepare for request body # Prepare for request body
body = ( body = (
headers.get("transfer-encoding") == "chunked" headers.get("transfer-encoding") == "chunked"
@ -277,8 +284,13 @@ class HttpProtocol(asyncio.Protocol):
# Consume any remaining request body # Consume any remaining request body
if self._request_bytes_left or self._request_chunked: if self._request_bytes_left or self._request_chunked:
logger.error(f"Handler of {method} {self.url} did not consume request body.") logger.error(f"Handler of {method} {self.url} did not consume request body.")
while await self.request_body(): pass while await self.stream_body(): pass
self._status, self._time = Status.IDLE, current_time() self._status, self._time = Status.IDLE, current_time()
except asyncio.CancelledError:
self.write_error(
self._exception or
ServiceUnavailable("Request handler cancelled")
)
except SanicException as e: except SanicException as e:
self.write_error(e) self.write_error(e)
except Exception as e: except Exception as e:
@ -286,7 +298,7 @@ class HttpProtocol(asyncio.Protocol):
finally: finally:
self.close() self.close()
async def request_body(self): async def stream_body(self):
buf = self._buffer buf = self._buffer
if self._request_chunked and self._request_bytes_left == 0: if self._request_chunked and self._request_bytes_left == 0:
# Process a chunk header: \r\n<size>[;<chunk extensions>]\r\n # Process a chunk header: \r\n<size>[;<chunk extensions>]\r\n