37 lines
986 B
Python
37 lines
986 B
Python
|
import io
|
||
|
|
||
|
from sanic.response import text
|
||
|
|
||
|
data = "abc" * 10_000_000
|
||
|
|
||
|
|
||
|
def test_request_buffer_queue_size(app):
|
||
|
default_buf_qsz = app.config.get("REQUEST_BUFFER_QUEUE_SIZE")
|
||
|
qsz = 1
|
||
|
while qsz == default_buf_qsz:
|
||
|
qsz += 1
|
||
|
app.config.REQUEST_BUFFER_QUEUE_SIZE = qsz
|
||
|
|
||
|
@app.post("/post", stream=True)
|
||
|
async def post(request):
|
||
|
assert request.stream.buffer_size == qsz
|
||
|
print("request.stream.buffer_size =", request.stream.buffer_size)
|
||
|
|
||
|
bio = io.BytesIO()
|
||
|
while True:
|
||
|
bdata = await request.stream.read()
|
||
|
if not bdata:
|
||
|
break
|
||
|
bio.write(bdata)
|
||
|
|
||
|
head = bdata[:3].decode("utf-8")
|
||
|
tail = bdata[3:][-3:].decode("utf-8")
|
||
|
print(head, "...", tail)
|
||
|
|
||
|
bio.seek(0)
|
||
|
return text(bio.read().decode("utf-8"))
|
||
|
|
||
|
request, response = app.test_client.post("/post", data=data)
|
||
|
assert response.status == 200
|
||
|
assert response.text == data
|