A test was missing that body_init/body_push/body_finish are never called. Rewritten using receive_body and case switching to make it fail if bypassed.

This commit is contained in:
L. Kärkkäinen 2020-03-01 16:34:20 +02:00
parent c0a0b50bc1
commit 5a96996003

View File

@ -6,18 +6,13 @@ from sanic.response import json_dumps, text
class CustomRequest(Request):
__slots__ = ("body_buffer",)
def body_init(self):
self.body_buffer = BytesIO()
def body_push(self, data):
self.body_buffer.write(data)
def body_finish(self):
self.body = self.body_buffer.getvalue()
self.body_buffer.close()
"""Alternative implementation for loading body (non-streaming handlers)"""
async def receive_body(self):
buffer = BytesIO()
async for data in self.stream:
buffer.write(data)
self.body = buffer.getvalue().upper()
buffer.close()
def test_custom_request():
app = Sanic(request_class=CustomRequest)
@ -37,8 +32,8 @@ def test_custom_request():
"/post", data=json_dumps(payload), headers=headers
)
assert request.body == b'{"test":"OK"}'
assert request.json.get("test") == "OK"
assert request.body == b'{"TEST":"OK"}'
assert request.json.get("TEST") == "OK"
assert response.text == "OK"
assert response.status == 200