120f0262f7
* Fix Ctrl+C on Windows.
* Disable testing of a function N/A on Windows.
* Add test for coverage, avoid crash on missing _stopping.
* Initialise StreamingHTTPResponse.protocol = None
* Improved comments.
* Reduce amount of data in test_request_stream to avoid failures on Windows.
* The Windows test doesn't work on Windows :(
* Use port numbers more likely to be free than 8000.
* Disable the other signal tests on Windows as well.
* Windows doesn't properly support SO_REUSEADDR, so that's disabled in Python, and thus rebinding fails. For successful testing, reuse port instead.
* app.run argument handling: added server kwargs (alike create_server), added warning on extra kwargs, made auto_reload explicit argument. Another go at Windows tests
* Revert "app.run argument handling: added server kwargs (alike create_server), added warning on extra kwargs, made auto_reload explicit argument. Another go at Windows tests"
This reverts commit dc5d682448.
* Use random test server port on most tests. Should avoid port/addr reuse issues.
* Another test to random port instead of 8000.
* Fix deprecation warnings about missing name on Sanic() in tests.
* Linter and typing
* Increase test coverage
* Rewrite test for ctrlc_windows_workaround
* py36 compat
* py36 compat
* py36 compat
* Don't rely on loop internals but add a stopping flag to app.
* App may be restarted.
* py36 compat
* Linter
* Add a constant for OS checking.
Co-authored-by: L. Kärkkäinen <tronic@users.noreply.github.com>
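The "random test server port" items above can be illustrated with a minimal sketch. It assumes the usual technique of binding to port 0 so the OS picks a free port; `find_free_port` is a hypothetical helper name and is not taken from this commit, which may select ports differently.

```python
import socket


def find_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for a currently unused TCP port (hypothetical helper)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Binding to port 0 lets the OS choose any free ephemeral port.
        sock.bind((host, 0))
        # getsockname() returns (host, port) for AF_INET sockets.
        return sock.getsockname()[1]


port = find_free_port()
```

The port is released when the socket closes, so another process could in principle claim it before the test server binds; a sketch like this reduces collisions on a fixed port such as 8000 but does not rule them out.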
54 lines
1.4 KiB
Python
from io import BytesIO

from sanic import Sanic
from sanic.request import Request
from sanic.response import json_dumps, text


class CustomRequest(Request):
    __slots__ = ("body_buffer",)

    def body_init(self):
        self.body_buffer = BytesIO()

    def body_push(self, data):
        self.body_buffer.write(data)

    def body_finish(self):
        self.body = self.body_buffer.getvalue()
        self.body_buffer.close()


def test_custom_request():
    app = Sanic(name=__name__, request_class=CustomRequest)

    @app.route("/post", methods=["POST"])
    async def post_handler(request):
        return text("OK")

    @app.route("/get")
    async def get_handler(request):
        return text("OK")

    payload = {"test": "OK"}
    headers = {"content-type": "application/json"}

    request, response = app.test_client.post(
        "/post", data=json_dumps(payload), headers=headers
    )

    assert isinstance(request.body_buffer, BytesIO)
    assert request.body_buffer.closed
    assert request.body == b'{"test":"OK"}'
    assert request.json.get("test") == "OK"
    assert response.text == "OK"
    assert response.status == 200

    request, response = app.test_client.get("/get")

    assert isinstance(request.body_buffer, BytesIO)
    assert request.body_buffer.closed
    assert request.body == b""
    assert response.text == "OK"
    assert response.status == 200
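The hook lifecycle that the test exercises can be sketched without a running server. This is only an illustration under an assumption: that the server calls `body_init`, then `body_push` once per received chunk, then `body_finish`, which is what the assertions in the test imply. `BufferedBody` and `feed` are hypothetical names and are not part of Sanic.

```python
from io import BytesIO


class BufferedBody:
    """Hypothetical stand-in for a request object; not part of Sanic."""

    def body_init(self):
        # Start with an empty in-memory buffer.
        self.body_buffer = BytesIO()

    def body_push(self, data: bytes):
        # Append one received chunk to the buffer.
        self.body_buffer.write(data)

    def body_finish(self):
        # Snapshot the accumulated bytes, then release the buffer.
        self.body = self.body_buffer.getvalue()
        self.body_buffer.close()


def feed(chunks):
    """Drive the three hooks in the assumed init/push/finish order."""
    req = BufferedBody()
    req.body_init()
    for chunk in chunks:
        req.body_push(chunk)
    req.body_finish()
    return req


req = feed([b'{"test":', b'"OK"}'])
empty = feed([])
```

With no chunks at all, `body` ends up as `b""` and the buffer is still closed, which mirrors what the test asserts for the GET request.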