sanic/tests/test_request_buffer_queue_size.py

38 lines
987 B
Python
Raw Normal View History

import io
from sanic.response import text
data = "abc" * 10_000_000
def test_request_buffer_queue_size(app):
default_buf_qsz = app.config.get("REQUEST_BUFFER_QUEUE_SIZE")
qsz = 1
while qsz == default_buf_qsz:
qsz += 1
app.config.REQUEST_BUFFER_QUEUE_SIZE = qsz
@app.post("/post", stream=True)
async def post(request):
assert request.stream.buffer_size == qsz
print("request.stream.buffer_size =", request.stream.buffer_size)
bio = io.BytesIO()
while True:
bdata = await request.stream.read()
if not bdata:
break
bio.write(bdata)
head = bdata[:3].decode("utf-8")
tail = bdata[3:][-3:].decode("utf-8")
print(head, "...", tail)
bio.seek(0)
return text(bio.read().decode("utf-8"))
request, response = app.test_client.post("/post", data=data)
assert response.status == 200
assert response.text == data