Entirely remove request body queue.
This commit is contained in:
parent
b87364bd91
commit
29c6f3c49f
|
@ -251,11 +251,7 @@ class ASGIApp:
|
||||||
instance.transport,
|
instance.transport,
|
||||||
sanic_app,
|
sanic_app,
|
||||||
)
|
)
|
||||||
|
instance.request.stream = StreamBuffer(protocol=instance)
|
||||||
instance.request.stream = StreamBuffer(
|
|
||||||
sanic_app.config.REQUEST_BUFFER_QUEUE_SIZE,
|
|
||||||
protocol=instance
|
|
||||||
)
|
|
||||||
|
|
||||||
return instance
|
return instance
|
||||||
|
|
||||||
|
|
|
@ -47,17 +47,12 @@ class RequestParameters(dict):
|
||||||
|
|
||||||
|
|
||||||
class StreamBuffer:
|
class StreamBuffer:
|
||||||
def __init__(self, buffer_size=100, protocol=None):
|
def __init__(self, protocol=None):
|
||||||
self._queue = asyncio.Queue(buffer_size)
|
|
||||||
self._protocol = protocol
|
self._protocol = protocol
|
||||||
|
|
||||||
async def read(self):
|
async def read(self):
|
||||||
""" Stop reading when gets None """
|
""" Stop reading when gets None """
|
||||||
if self._protocol:
|
return await self._protocol.stream_body()
|
||||||
return await self._protocol.stream_body()
|
|
||||||
payload = await self._queue.get()
|
|
||||||
self._queue.task_done()
|
|
||||||
return payload
|
|
||||||
|
|
||||||
async def __aiter__(self):
|
async def __aiter__(self):
|
||||||
while True:
|
while True:
|
||||||
|
@ -66,16 +61,6 @@ class StreamBuffer:
|
||||||
return
|
return
|
||||||
yield data
|
yield data
|
||||||
|
|
||||||
async def put(self, payload):
|
|
||||||
await self._queue.put(payload)
|
|
||||||
|
|
||||||
def is_full(self):
|
|
||||||
return self._queue.full()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def buffer_size(self):
|
|
||||||
return self._queue.maxsize
|
|
||||||
|
|
||||||
|
|
||||||
class Request:
|
class Request:
|
||||||
"""Properties of an HTTP request such as URL, headers, etc."""
|
"""Properties of an HTTP request such as URL, headers, etc."""
|
||||||
|
|
|
@ -266,9 +266,7 @@ class HttpProtocol(asyncio.Protocol):
|
||||||
if headers.get(EXPECT_HEADER):
|
if headers.get(EXPECT_HEADER):
|
||||||
self._status = Status.EXPECT
|
self._status = Status.EXPECT
|
||||||
self.expect_handler()
|
self.expect_handler()
|
||||||
self.request.stream = StreamBuffer(
|
self.request.stream = StreamBuffer(protocol=self)
|
||||||
self.request_buffer_queue_size, protocol=self,
|
|
||||||
)
|
|
||||||
if body is True:
|
if body is True:
|
||||||
self._request_chunked = True
|
self._request_chunked = True
|
||||||
pos -= 2 # One CRLF stays in buffer
|
pos -= 2 # One CRLF stays in buffer
|
||||||
|
|
|
@ -1,37 +0,0 @@
|
||||||
import io
|
|
||||||
|
|
||||||
from sanic.response import text
|
|
||||||
|
|
||||||
|
|
||||||
data = "abc" * 10_000_000
|
|
||||||
|
|
||||||
|
|
||||||
def test_request_buffer_queue_size(app):
|
|
||||||
default_buf_qsz = app.config.get("REQUEST_BUFFER_QUEUE_SIZE")
|
|
||||||
qsz = 1
|
|
||||||
while qsz == default_buf_qsz:
|
|
||||||
qsz += 1
|
|
||||||
app.config.REQUEST_BUFFER_QUEUE_SIZE = qsz
|
|
||||||
|
|
||||||
@app.post("/post", stream=True)
|
|
||||||
async def post(request):
|
|
||||||
assert request.stream.buffer_size == qsz
|
|
||||||
print("request.stream.buffer_size =", request.stream.buffer_size)
|
|
||||||
|
|
||||||
bio = io.BytesIO()
|
|
||||||
while True:
|
|
||||||
bdata = await request.stream.read()
|
|
||||||
if not bdata:
|
|
||||||
break
|
|
||||||
bio.write(bdata)
|
|
||||||
|
|
||||||
head = bdata[:3].decode("utf-8")
|
|
||||||
tail = bdata[3:][-3:].decode("utf-8")
|
|
||||||
print(head, "...", tail)
|
|
||||||
|
|
||||||
bio.seek(0)
|
|
||||||
return text(bio.read().decode("utf-8"))
|
|
||||||
|
|
||||||
request, response = app.test_client.post("/post", data=data)
|
|
||||||
assert response.status == 200
|
|
||||||
assert response.text == data
|
|
Loading…
Reference in New Issue
Block a user