sanic/tests/test_requests.py
Serge Fedoruk 2a15583b87 add Request.not_grouped_args, deprecation warning Request.raw_args (#1476)
* add Request.not_grouped_args, deprecation warning Request.raw_args

* add 1 more test for coverage

* custom parser for Request.args and Request.query_args, some additional tests

* add docs for custom queryset parsing

* fix import sorting

* docstrings for get_query_args and get_args methods

* lost import
2019-03-14 09:04:05 -05:00

836 lines
22 KiB
Python

import logging
import os
import ssl
from json import dumps as json_dumps
from json import loads as json_loads
from urllib.parse import urlparse
import pytest
from sanic import Sanic
from sanic import Blueprint
from sanic.exceptions import ServerError
from sanic.request import DEFAULT_HTTP_CONTENT_TYPE, RequestParameters
from sanic.response import json, text
from sanic.testing import HOST, PORT
# ------------------------------------------------------------ #
# GET
# ------------------------------------------------------------ #
def test_sync(app):
@app.route("/")
def handler(request):
return text("Hello")
request, response = app.test_client.get("/")
assert response.text == "Hello"
def test_remote_address(app):
@app.route("/")
def handler(request):
return text("{}".format(request.ip))
request, response = app.test_client.get("/")
assert response.text == "127.0.0.1"
def test_text(app):
@app.route("/")
async def handler(request):
return text("Hello")
request, response = app.test_client.get("/")
assert response.text == "Hello"
def test_headers(app):
@app.route("/")
async def handler(request):
headers = {"spam": "great"}
return text("Hello", headers=headers)
request, response = app.test_client.get("/")
assert response.headers.get("spam") == "great"
def test_non_str_headers(app):
@app.route("/")
async def handler(request):
headers = {"answer": 42}
return text("Hello", headers=headers)
request, response = app.test_client.get("/")
assert response.headers.get("answer") == "42"
def test_invalid_response(app):
@app.exception(ServerError)
def handler_exception(request, exception):
return text("Internal Server Error.", 500)
@app.route("/")
async def handler(request):
return "This should fail"
request, response = app.test_client.get("/")
assert response.status == 500
assert response.text == "Internal Server Error."
def test_json(app):
@app.route("/")
async def handler(request):
return json({"test": True})
request, response = app.test_client.get("/")
results = json_loads(response.text)
assert results.get("test") is True
def test_empty_json(app):
@app.route("/")
async def handler(request):
assert request.json is None
return json(request.json)
request, response = app.test_client.get("/")
assert response.status == 200
assert response.text == "null"
def test_invalid_json(app):
@app.route("/")
async def handler(request):
return json(request.json)
data = "I am not json"
request, response = app.test_client.get("/", data=data)
assert response.status == 400
def test_query_string(app):
@app.route("/")
async def handler(request):
return text("OK")
request, response = app.test_client.get(
"/", params=[("test1", "1"), ("test2", "false"), ("test2", "true")]
)
assert request.args.get("test1") == "1"
assert request.args.get("test2") == "false"
assert request.args.getlist("test2") == ["false", "true"]
assert request.args.getlist("test1") == ["1"]
assert request.args.get("test3", default="My value") == "My value"
def test_uri_template(app):
@app.route("/foo/<id:int>/bar/<name:[A-z]+>")
async def handler(request, id, name):
return text("OK")
request, response = app.test_client.get("/foo/123/bar/baz")
assert request.uri_template == "/foo/<id:int>/bar/<name:[A-z]+>"
def test_token(app):
@app.route("/")
async def handler(request):
return text("OK")
# uuid4 generated token.
token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
headers = {
"content-type": "application/json",
"Authorization": "{}".format(token),
}
request, response = app.test_client.get("/", headers=headers)
assert request.token == token
token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
headers = {
"content-type": "application/json",
"Authorization": "Token {}".format(token),
}
request, response = app.test_client.get("/", headers=headers)
assert request.token == token
token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
headers = {
"content-type": "application/json",
"Authorization": "Bearer {}".format(token),
}
request, response = app.test_client.get("/", headers=headers)
assert request.token == token
# no Authorization headers
headers = {"content-type": "application/json"}
request, response = app.test_client.get("/", headers=headers)
assert request.token is None
def test_content_type(app):
@app.route("/")
async def handler(request):
return text(request.content_type)
request, response = app.test_client.get("/")
assert request.content_type == DEFAULT_HTTP_CONTENT_TYPE
assert response.text == DEFAULT_HTTP_CONTENT_TYPE
headers = {"content-type": "application/json"}
request, response = app.test_client.get("/", headers=headers)
assert request.content_type == "application/json"
assert response.text == "application/json"
def test_remote_addr(app):
@app.route("/")
async def handler(request):
return text(request.remote_addr)
headers = {"X-Forwarded-For": "127.0.0.1, 127.0.1.2"}
request, response = app.test_client.get("/", headers=headers)
assert request.remote_addr == "127.0.0.1"
assert response.text == "127.0.0.1"
request, response = app.test_client.get("/")
assert request.remote_addr == ""
assert response.text == ""
headers = {"X-Forwarded-For": "127.0.0.1, , ,,127.0.1.2"}
request, response = app.test_client.get("/", headers=headers)
assert request.remote_addr == "127.0.0.1"
assert response.text == "127.0.0.1"
def test_match_info(app):
@app.route("/api/v1/user/<user_id>/")
async def handler(request, user_id):
return json(request.match_info)
request, response = app.test_client.get("/api/v1/user/sanic_user/")
assert request.match_info == {"user_id": "sanic_user"}
assert json_loads(response.text) == {"user_id": "sanic_user"}
# ------------------------------------------------------------ #
# POST
# ------------------------------------------------------------ #
def test_post_json(app):
@app.route("/", methods=["POST"])
async def handler(request):
return text("OK")
payload = {"test": "OK"}
headers = {"content-type": "application/json"}
request, response = app.test_client.post(
"/", data=json_dumps(payload), headers=headers
)
assert request.json.get("test") == "OK"
assert request.json.get("test") == "OK" # for request.parsed_json
assert response.text == "OK"
def test_post_form_urlencoded(app):
@app.route("/", methods=["POST"])
async def handler(request):
return text("OK")
payload = "test=OK"
headers = {"content-type": "application/x-www-form-urlencoded"}
request, response = app.test_client.post(
"/", data=payload, headers=headers
)
assert request.form.get("test") == "OK"
assert request.form.get("test") == "OK" # For request.parsed_form
@pytest.mark.parametrize(
"payload",
[
"------sanic\r\n"
'Content-Disposition: form-data; name="test"\r\n'
"\r\n"
"OK\r\n"
"------sanic--\r\n",
"------sanic\r\n"
'content-disposition: form-data; name="test"\r\n'
"\r\n"
"OK\r\n"
"------sanic--\r\n",
],
)
def test_post_form_multipart_form_data(app, payload):
@app.route("/", methods=["POST"])
async def handler(request):
return text("OK")
headers = {"content-type": "multipart/form-data; boundary=----sanic"}
request, response = app.test_client.post(data=payload, headers=headers)
assert request.form.get("test") == "OK"
@pytest.mark.parametrize(
"path,query,expected_url",
[
("/foo", "", "http://{}:{}/foo"),
("/bar/baz", "", "http://{}:{}/bar/baz"),
("/moo/boo", "arg1=val1", "http://{}:{}/moo/boo?arg1=val1"),
],
)
def test_url_attributes_no_ssl(app, path, query, expected_url):
async def handler(request):
return text("OK")
app.add_route(handler, path)
request, response = app.test_client.get(path + "?{}".format(query))
assert request.url == expected_url.format(HOST, PORT)
parsed = urlparse(request.url)
assert parsed.scheme == request.scheme
assert parsed.path == request.path
assert parsed.query == request.query_string
assert parsed.netloc == request.host
@pytest.mark.parametrize(
"path,query,expected_url",
[
("/foo", "", "https://{}:{}/foo"),
("/bar/baz", "", "https://{}:{}/bar/baz"),
("/moo/boo", "arg1=val1", "https://{}:{}/moo/boo?arg1=val1"),
],
)
def test_url_attributes_with_ssl_context(app, path, query, expected_url):
current_dir = os.path.dirname(os.path.realpath(__file__))
context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
context.load_cert_chain(
os.path.join(current_dir, "certs/selfsigned.cert"),
keyfile=os.path.join(current_dir, "certs/selfsigned.key"),
)
async def handler(request):
return text("OK")
app.add_route(handler, path)
request, response = app.test_client.get(
"https://{}:{}".format(HOST, PORT) + path + "?{}".format(query),
server_kwargs={"ssl": context},
)
assert request.url == expected_url.format(HOST, PORT)
parsed = urlparse(request.url)
assert parsed.scheme == request.scheme
assert parsed.path == request.path
assert parsed.query == request.query_string
assert parsed.netloc == request.host
@pytest.mark.parametrize(
"path,query,expected_url",
[
("/foo", "", "https://{}:{}/foo"),
("/bar/baz", "", "https://{}:{}/bar/baz"),
("/moo/boo", "arg1=val1", "https://{}:{}/moo/boo?arg1=val1"),
],
)
def test_url_attributes_with_ssl_dict(app, path, query, expected_url):
current_dir = os.path.dirname(os.path.realpath(__file__))
ssl_cert = os.path.join(current_dir, "certs/selfsigned.cert")
ssl_key = os.path.join(current_dir, "certs/selfsigned.key")
ssl_dict = {"cert": ssl_cert, "key": ssl_key}
async def handler(request):
return text("OK")
app.add_route(handler, path)
request, response = app.test_client.get(
"https://{}:{}".format(HOST, PORT) + path + "?{}".format(query),
server_kwargs={"ssl": ssl_dict},
)
assert request.url == expected_url.format(HOST, PORT)
parsed = urlparse(request.url)
assert parsed.scheme == request.scheme
assert parsed.path == request.path
assert parsed.query == request.query_string
assert parsed.netloc == request.host
def test_invalid_ssl_dict(app):
@app.get("/test")
async def handler(request):
return text("ssl test")
ssl_dict = {"cert": None, "key": None}
with pytest.raises(ValueError) as excinfo:
request, response = app.test_client.get(
"/test", server_kwargs={"ssl": ssl_dict}
)
assert str(excinfo.value) == "SSLContext or certificate and key required."
def test_form_with_multiple_values(app):
@app.route("/", methods=["POST"])
async def handler(request):
return text("OK")
payload = "selectedItems=v1&selectedItems=v2&selectedItems=v3"
headers = {"content-type": "application/x-www-form-urlencoded"}
request, response = app.test_client.post(
"/", data=payload, headers=headers
)
assert request.form.getlist("selectedItems") == ["v1", "v2", "v3"]
def test_request_string_representation(app):
@app.route("/", methods=["GET"])
async def get(request):
return text("OK")
request, _ = app.test_client.get("/")
assert repr(request) == "<Request: GET />"
@pytest.mark.parametrize(
"payload,filename",
[
("------sanic\r\n"
'Content-Disposition: form-data; filename="filename"; name="test"\r\n'
"\r\n"
"OK\r\n"
"------sanic--\r\n", "filename"),
("------sanic\r\n"
'content-disposition: form-data; filename="filename"; name="test"\r\n'
"\r\n"
'content-type: application/json; {"field": "value"}\r\n'
"------sanic--\r\n", "filename"),
("------sanic\r\n"
'Content-Disposition: form-data; filename=""; name="test"\r\n'
"\r\n"
"OK\r\n"
"------sanic--\r\n", ""),
("------sanic\r\n"
'content-disposition: form-data; filename=""; name="test"\r\n'
"\r\n"
'content-type: application/json; {"field": "value"}\r\n'
"------sanic--\r\n", ""),
("------sanic\r\n"
'Content-Disposition: form-data; filename*="utf-8\'\'filename_%C2%A0_test"; name="test"\r\n'
"\r\n"
"OK\r\n"
"------sanic--\r\n", "filename_\u00A0_test"),
("------sanic\r\n"
'content-disposition: form-data; filename*="utf-8\'\'filename_%C2%A0_test"; name="test"\r\n'
"\r\n"
'content-type: application/json; {"field": "value"}\r\n'
"------sanic--\r\n", "filename_\u00A0_test"),
],
)
def test_request_multipart_files(app, payload, filename):
@app.route("/", methods=["POST"])
async def post(request):
return text("OK")
headers = {"content-type": "multipart/form-data; boundary=----sanic"}
request, _ = app.test_client.post(data=payload, headers=headers)
assert request.files.get("test").name == filename
def test_request_multipart_file_with_json_content_type(app):
@app.route("/", methods=["POST"])
async def post(request):
return text("OK")
payload = (
"------sanic\r\n"
'Content-Disposition: form-data; name="file"; filename="test.json"\r\n'
"Content-Type: application/json\r\n"
"Content-Length: 0"
"\r\n"
"\r\n"
"------sanic--"
)
headers = {"content-type": "multipart/form-data; boundary=------sanic"}
request, _ = app.test_client.post(data=payload, headers=headers)
assert request.files.get("file").type == "application/json"
def test_request_multipart_file_without_field_name(app, caplog):
@app.route("/", methods=["POST"])
async def post(request):
return text("OK")
payload = (
'------sanic\r\nContent-Disposition: form-data; filename="test.json"'
"\r\nContent-Type: application/json\r\n\r\n\r\n------sanic--"
)
headers = {"content-type": "multipart/form-data; boundary=------sanic"}
request, _ = app.test_client.post(
data=payload, headers=headers, debug=True
)
with caplog.at_level(logging.DEBUG):
request.form
assert caplog.record_tuples[-1] == (
"sanic.root",
logging.DEBUG,
"Form-data field does not have a 'name' parameter "
"in the Content-Disposition header",
)
def test_request_multipart_file_duplicate_filed_name(app):
@app.route("/", methods=["POST"])
async def post(request):
return text("OK")
payload = (
"--e73ffaa8b1b2472b8ec848de833cb05b\r\n"
'Content-Disposition: form-data; name="file"\r\n'
"Content-Type: application/octet-stream\r\n"
"Content-Length: 15\r\n"
"\r\n"
'{"test":"json"}\r\n'
"--e73ffaa8b1b2472b8ec848de833cb05b\r\n"
'Content-Disposition: form-data; name="file"\r\n'
"Content-Type: application/octet-stream\r\n"
"Content-Length: 15\r\n"
"\r\n"
'{"test":"json2"}\r\n'
"--e73ffaa8b1b2472b8ec848de833cb05b--\r\n"
)
headers = {
"Content-Type": "multipart/form-data; boundary=e73ffaa8b1b2472b8ec848de833cb05b"
}
request, _ = app.test_client.post(
data=payload, headers=headers, debug=True
)
assert request.form.getlist("file") == [
'{"test":"json"}',
'{"test":"json2"}',
]
def test_request_multipart_with_multiple_files_and_type(app):
@app.route("/", methods=["POST"])
async def post(request):
return text("OK")
payload = (
'------sanic\r\nContent-Disposition: form-data; name="file"; filename="test.json"'
"\r\nContent-Type: application/json\r\n\r\n\r\n"
'------sanic\r\nContent-Disposition: form-data; name="file"; filename="some_file.pdf"\r\n'
"Content-Type: application/pdf\r\n\r\n\r\n------sanic--"
)
headers = {"content-type": "multipart/form-data; boundary=------sanic"}
request, _ = app.test_client.post(data=payload, headers=headers)
assert len(request.files.getlist("file")) == 2
assert request.files.getlist("file")[0].type == "application/json"
assert request.files.getlist("file")[1].type == "application/pdf"
def test_request_repr(app):
@app.get("/")
def handler(request):
return text("pass")
request, response = app.test_client.get("/")
assert repr(request) == "<Request: GET />"
request.method = None
assert repr(request) == "<Request: None />"
def test_request_bool(app):
@app.get("/")
def handler(request):
return text("pass")
request, response = app.test_client.get("/")
assert bool(request)
request.transport = False
assert not bool(request)
def test_request_parsing_form_failed(app, caplog):
@app.route("/", methods=["POST"])
async def handler(request):
return text("OK")
payload = "test=OK"
headers = {"content-type": "multipart/form-data"}
request, response = app.test_client.post(
"/", data=payload, headers=headers
)
with caplog.at_level(logging.ERROR):
request.form
assert caplog.record_tuples[-1] == (
"sanic.error",
logging.ERROR,
"Failed when parsing form",
)
def test_request_args_no_query_string(app):
@app.get("/")
def handler(request):
return text("pass")
request, response = app.test_client.get("/")
assert request.args == {}
def test_request_raw_args(app):
params = {"test": "OK"}
@app.get("/")
def handler(request):
return text("pass")
request, response = app.test_client.get("/", params=params)
assert request.raw_args == params
def test_request_query_args(app):
# test multiple params with the same key
params = [('test', 'value1'), ('test', 'value2')]
@app.get("/")
def handler(request):
return text("pass")
request, response = app.test_client.get("/", params=params)
assert request.query_args == params
# test cached value
assert request.parsed_not_grouped_args[(False, False, "utf-8", "replace")] == request.query_args
# test params directly in the url
request, response = app.test_client.get("/?test=value1&test=value2")
assert request.query_args == params
# test unique params
params = [('test1', 'value1'), ('test2', 'value2')]
request, response = app.test_client.get("/", params=params)
assert request.query_args == params
# test no params
request, response = app.test_client.get("/")
assert not request.query_args
def test_request_query_args_custom_parsing(app):
@app.get("/")
def handler(request):
return text("pass")
request, response = app.test_client.get("/?test1=value1&test2=&test3=value3")
assert request.get_query_args(
keep_blank_values=True
) == [
('test1', 'value1'), ('test2', ''), ('test3', 'value3')
]
assert request.query_args == [
('test1', 'value1'), ('test3', 'value3')
]
assert request.get_query_args(
keep_blank_values=False
) == [
('test1', 'value1'), ('test3', 'value3')
]
assert request.get_args(
keep_blank_values=True
) == RequestParameters(
{"test1": ["value1"], "test2": [""], "test3": ["value3"]}
)
assert request.args == RequestParameters(
{"test1": ["value1"], "test3": ["value3"]}
)
assert request.get_args(
keep_blank_values=False
) == RequestParameters(
{"test1": ["value1"], "test3": ["value3"]}
)
def test_request_cookies(app):
cookies = {"test": "OK"}
@app.get("/")
def handler(request):
return text("OK")
request, response = app.test_client.get("/", cookies=cookies)
assert request.cookies == cookies
assert request.cookies == cookies # For request._cookies
def test_request_cookies_without_cookies(app):
@app.get("/")
def handler(request):
return text("OK")
request, response = app.test_client.get("/")
assert request.cookies == {}
def test_request_port(app):
@app.get("/")
def handler(request):
return text("OK")
request, response = app.test_client.get("/")
port = request.port
assert isinstance(port, int)
delattr(request, "_socket")
delattr(request, "_port")
port = request.port
assert isinstance(port, int)
assert hasattr(request, "_socket")
assert hasattr(request, "_port")
def test_request_socket(app):
@app.get("/")
def handler(request):
return text("OK")
request, response = app.test_client.get("/")
socket = request.socket
assert isinstance(socket, tuple)
ip = socket[0]
port = socket[1]
assert ip == request.ip
assert port == request.port
delattr(request, "_socket")
socket = request.socket
assert isinstance(socket, tuple)
assert hasattr(request, "_socket")
def test_request_form_invalid_content_type(app):
@app.route("/", methods=["POST"])
async def post(request):
return text("OK")
request, response = app.test_client.post("/", json={"test": "OK"})
assert request.form == {}
def test_endpoint_basic():
app = Sanic()
@app.route("/")
def my_unique_handler(request):
return text("Hello")
request, response = app.test_client.get("/")
assert request.endpoint == "test_requests.my_unique_handler"
def test_endpoint_named_app():
app = Sanic("named")
@app.route("/")
def my_unique_handler(request):
return text("Hello")
request, response = app.test_client.get("/")
assert request.endpoint == "named.my_unique_handler"
def test_endpoint_blueprint():
bp = Blueprint("my_blueprint", url_prefix="/bp")
@bp.route("/")
async def bp_root(request):
return text("Hello")
app = Sanic("named")
app.blueprint(bp)
request, response = app.test_client.get("/bp")
assert request.endpoint == "named.my_blueprint.bp_root"