Preserve blank form values for urlencoded forms (option) (#2439)
* task(request.form): Add tests for blank values * fix(request): abstract form property to implement get_form(), allow for preserving of blanks * fix(request): hinting for parsed_form * fix(request): typing for parsed_files * fix(request): ignore type assumption * fix(request): mypy typechecking caused E501 when type set to ignore * fix(request): mypy is too stupid to parse continuations * fix(request): formatting * fix(request): fix annotation and return for get_form() * fix(request): linting, hinting
This commit is contained in:
parent
3a6cc7389c
commit
78b6723149
|
@ -152,8 +152,8 @@ class Request:
|
|||
self.parsed_accept: Optional[AcceptContainer] = None
|
||||
self.parsed_credentials: Optional[Credentials] = None
|
||||
self.parsed_json = None
|
||||
self.parsed_form = None
|
||||
self.parsed_files = None
|
||||
self.parsed_form: Optional[RequestParameters] = None
|
||||
self.parsed_files: Optional[RequestParameters] = None
|
||||
self.parsed_token: Optional[str] = None
|
||||
self.parsed_args: DefaultDict[
|
||||
Tuple[bool, bool, str, str], RequestParameters
|
||||
|
@ -426,28 +426,40 @@ class Request:
|
|||
pass
|
||||
return self.parsed_credentials
|
||||
|
||||
def get_form(
|
||||
self, keep_blank_values: bool = False
|
||||
) -> Optional[RequestParameters]:
|
||||
self.parsed_form = RequestParameters()
|
||||
self.parsed_files = RequestParameters()
|
||||
content_type = self.headers.getone(
|
||||
"content-type", DEFAULT_HTTP_CONTENT_TYPE
|
||||
)
|
||||
content_type, parameters = parse_content_header(content_type)
|
||||
try:
|
||||
if content_type == "application/x-www-form-urlencoded":
|
||||
self.parsed_form = RequestParameters(
|
||||
parse_qs(
|
||||
self.body.decode("utf-8"),
|
||||
keep_blank_values=keep_blank_values,
|
||||
)
|
||||
)
|
||||
elif content_type == "multipart/form-data":
|
||||
# TODO: Stream this instead of reading to/from memory
|
||||
boundary = parameters["boundary"].encode( # type: ignore
|
||||
"utf-8"
|
||||
) # type: ignore
|
||||
self.parsed_form, self.parsed_files = parse_multipart_form(
|
||||
self.body, boundary
|
||||
)
|
||||
except Exception:
|
||||
error_logger.exception("Failed when parsing form")
|
||||
|
||||
return self.parsed_form
|
||||
|
||||
@property
|
||||
def form(self):
|
||||
if self.parsed_form is None:
|
||||
self.parsed_form = RequestParameters()
|
||||
self.parsed_files = RequestParameters()
|
||||
content_type = self.headers.getone(
|
||||
"content-type", DEFAULT_HTTP_CONTENT_TYPE
|
||||
)
|
||||
content_type, parameters = parse_content_header(content_type)
|
||||
try:
|
||||
if content_type == "application/x-www-form-urlencoded":
|
||||
self.parsed_form = RequestParameters(
|
||||
parse_qs(self.body.decode("utf-8"))
|
||||
)
|
||||
elif content_type == "multipart/form-data":
|
||||
# TODO: Stream this instead of reading to/from memory
|
||||
boundary = parameters["boundary"].encode("utf-8")
|
||||
self.parsed_form, self.parsed_files = parse_multipart_form(
|
||||
self.body, boundary
|
||||
)
|
||||
except Exception:
|
||||
error_logger.exception("Failed when parsing form")
|
||||
self.get_form()
|
||||
|
||||
return self.parsed_form
|
||||
|
||||
|
|
|
@ -1016,6 +1016,72 @@ async def test_post_form_urlencoded_asgi(app):
|
|||
assert request.form.get("test") == "OK" # For request.parsed_form
|
||||
|
||||
|
||||
def test_post_form_urlencoded_keep_blanks(app):
|
||||
@app.route("/", methods=["POST"])
|
||||
async def handler(request):
|
||||
request.get_form(keep_blank_values=True)
|
||||
return text("OK")
|
||||
|
||||
payload = "test="
|
||||
headers = {"content-type": "application/x-www-form-urlencoded"}
|
||||
|
||||
request, response = app.test_client.post(
|
||||
"/", data=payload, headers=headers
|
||||
)
|
||||
|
||||
assert request.form.get("test") == ""
|
||||
assert request.form.get("test") == "" # For request.parsed_form
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_post_form_urlencoded_keep_blanks_asgi(app):
|
||||
@app.route("/", methods=["POST"])
|
||||
async def handler(request):
|
||||
request.get_form(keep_blank_values=True)
|
||||
return text("OK")
|
||||
|
||||
payload = "test="
|
||||
headers = {"content-type": "application/x-www-form-urlencoded"}
|
||||
|
||||
request, response = await app.asgi_client.post(
|
||||
"/", data=payload, headers=headers
|
||||
)
|
||||
|
||||
assert request.form.get("test") == ""
|
||||
assert request.form.get("test") == "" # For request.parsed_form
|
||||
|
||||
|
||||
|
||||
def test_post_form_urlencoded_drop_blanks(app):
|
||||
@app.route("/", methods=["POST"])
|
||||
async def handler(request):
|
||||
return text("OK")
|
||||
|
||||
payload = "test="
|
||||
headers = {"content-type": "application/x-www-form-urlencoded"}
|
||||
|
||||
request, response = app.test_client.post(
|
||||
"/", data=payload, headers=headers
|
||||
)
|
||||
|
||||
assert "test" not in request.form.keys()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_post_form_urlencoded_drop_blanks_asgi(app):
|
||||
@app.route("/", methods=["POST"])
|
||||
async def handler(request):
|
||||
return text("OK")
|
||||
|
||||
payload = "test="
|
||||
headers = {"content-type": "application/x-www-form-urlencoded"}
|
||||
|
||||
request, response = await app.asgi_client.post(
|
||||
"/", data=payload, headers=headers
|
||||
)
|
||||
|
||||
assert "test" not in request.form.keys()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"payload",
|
||||
[
|
||||
|
|
Loading…
Reference in New Issue
Block a user