diff --git a/sanic/response/__init__.py b/sanic/response/__init__.py new file mode 100644 index 00000000..99b93075 --- /dev/null +++ b/sanic/response/__init__.py @@ -0,0 +1,36 @@ +from .convenience import ( + empty, + file, + file_stream, + html, + json, + raw, + redirect, + text, + validate_file, +) +from .types import ( + BaseHTTPResponse, + HTTPResponse, + JSONResponse, + ResponseStream, + json_dumps, +) + + +__all__ = ( + "BaseHTTPResponse", + "HTTPResponse", + "JSONResponse", + "ResponseStream", + "empty", + "json", + "text", + "raw", + "html", + "validate_file", + "file", + "redirect", + "file_stream", + "json_dumps", +) diff --git a/sanic/response.py b/sanic/response/convenience.py similarity index 56% rename from sanic/response.py rename to sanic/response/convenience.py index 8e6c2632..429b3214 100644 --- a/sanic/response.py +++ b/sanic/response/convenience.py @@ -2,212 +2,20 @@ from __future__ import annotations from datetime import datetime, timezone from email.utils import formatdate, parsedate_to_datetime -from functools import partial from mimetypes import guess_type from os import path from pathlib import PurePath from time import time -from typing import ( - TYPE_CHECKING, - Any, - AnyStr, - Callable, - Coroutine, - Dict, - Iterator, - Optional, - Tuple, - TypeVar, - Union, -) +from typing import Any, AnyStr, Callable, Dict, Optional, Union from urllib.parse import quote_plus from sanic.compat import Header, open_async, stat_async from sanic.constants import DEFAULT_HTTP_CONTENT_TYPE -from sanic.cookies import CookieJar -from sanic.exceptions import SanicException, ServerError -from sanic.helpers import ( - Default, - _default, - has_message_body, - remove_entity_headers, -) -from sanic.http import Http +from sanic.helpers import Default, _default from sanic.log import logger from sanic.models.protocol_types import HTMLProtocol, Range - -if TYPE_CHECKING: - from sanic.asgi import ASGIApp - from sanic.http.http3 import HTTPReceiver - from sanic.request import Request -else: - Request = TypeVar("Request") - - -try: - from ujson import dumps as json_dumps -except ImportError: - # This is done in order to ensure that the JSON response is - # kept consistent across both ujson and inbuilt json usage. - from json import dumps - - json_dumps = partial(dumps, separators=(",", ":")) - - -class BaseHTTPResponse: - """ - The base class for all HTTP Responses - """ - - __slots__ = ( - "asgi", - "body", - "content_type", - "stream", - "status", - "headers", - "_cookies", - ) - - _dumps = json_dumps - - def __init__(self): - self.asgi: bool = False - self.body: Optional[bytes] = None - self.content_type: Optional[str] = None - self.stream: Optional[Union[Http, ASGIApp, HTTPReceiver]] = None - self.status: int = None - self.headers = Header({}) - self._cookies: Optional[CookieJar] = None - - def __repr__(self): - class_name = self.__class__.__name__ - return f"<{class_name}: {self.status} {self.content_type}>" - - def _encode_body(self, data: Optional[AnyStr]): - if data is None: - return b"" - return ( - data.encode() if hasattr(data, "encode") else data # type: ignore - ) - - @property - def cookies(self) -> CookieJar: - """ - The response cookies. Cookies should be set and written as follows: - - .. code-block:: python - - response.cookies["test"] = "It worked!" - response.cookies["test"]["domain"] = ".yummy-yummy-cookie.com" - response.cookies["test"]["httponly"] = True - - `See user guide re: cookies - `__ - - :return: the cookie jar - :rtype: CookieJar - """ - if self._cookies is None: - self._cookies = CookieJar(self.headers) - return self._cookies - - @property - def processed_headers(self) -> Iterator[Tuple[bytes, bytes]]: - """ - Obtain a list of header tuples encoded in bytes for sending. - - Add and remove headers based on status and content_type. - - :return: response headers - :rtype: Tuple[Tuple[bytes, bytes], ...] - """ - # TODO: Make a blacklist set of header names and then filter with that - if self.status in (304, 412): # Not Modified, Precondition Failed - self.headers = remove_entity_headers(self.headers) - if has_message_body(self.status): - self.headers.setdefault("content-type", self.content_type) - # Encode headers into bytes - return ( - (name.encode("ascii"), f"{value}".encode(errors="surrogateescape")) - for name, value in self.headers.items() - ) - - async def send( - self, - data: Optional[AnyStr] = None, - end_stream: Optional[bool] = None, - ) -> None: - """ - Send any pending response headers and the given data as body. - - :param data: str or bytes to be written - :param end_stream: whether to close the stream after this block - """ - if data is None and end_stream is None: - end_stream = True - if self.stream is None: - raise SanicException( - "No stream is connected to the response object instance." - ) - if self.stream.send is None: - if end_stream and not data: - return - raise ServerError( - "Response stream was ended, no more response data is " - "allowed to be sent." - ) - data = ( - data.encode() # type: ignore - if hasattr(data, "encode") - else data or b"" - ) - await self.stream.send( - data, # type: ignore - end_stream=end_stream or False, - ) - - -class HTTPResponse(BaseHTTPResponse): - """ - HTTP response to be sent back to the client. - - :param body: the body content to be returned - :type body: Optional[bytes] - :param status: HTTP response number. **Default=200** - :type status: int - :param headers: headers to be returned - :type headers: Optional; - :param content_type: content type to be returned (as a header) - :type content_type: Optional[str] - """ - - __slots__ = () - - def __init__( - self, - body: Optional[AnyStr] = None, - status: int = 200, - headers: Optional[Union[Header, Dict[str, str]]] = None, - content_type: Optional[str] = None, - ): - super().__init__() - - self.content_type: Optional[str] = content_type - self.body = self._encode_body(body) - self.status = status - self.headers = Header(headers or {}) - self._cookies = None - - async def eof(self): - await self.send("", True) - - async def __aenter__(self): - return self.send - - async def __aexit__(self, *_): - await self.eof() +from .types import HTTPResponse, JSONResponse, ResponseStream def empty( @@ -229,7 +37,7 @@ def json( content_type: str = "application/json", dumps: Optional[Callable[..., str]] = None, **kwargs: Any, -) -> HTTPResponse: +) -> JSONResponse: """ Returns response object with body in json format. @@ -238,13 +46,14 @@ def json( :param headers: Custom Headers. :param kwargs: Remaining arguments that are passed to the json encoder. """ - if not dumps: - dumps = BaseHTTPResponse._dumps - return HTTPResponse( - dumps(body, **kwargs), - headers=headers, + + return JSONResponse( + body, status=status, + headers=headers, content_type=content_type, + dumps=dumps, + **kwargs, ) @@ -465,80 +274,6 @@ def redirect( ) -class ResponseStream: - """ - ResponseStream is a compat layer to bridge the gap after the deprecation - of StreamingHTTPResponse. It will be removed when: - - file_stream is moved to new style streaming - - file and file_stream are combined into a single API - """ - - __slots__ = ( - "_cookies", - "content_type", - "headers", - "request", - "response", - "status", - "streaming_fn", - ) - - def __init__( - self, - streaming_fn: Callable[ - [Union[BaseHTTPResponse, ResponseStream]], - Coroutine[Any, Any, None], - ], - status: int = 200, - headers: Optional[Union[Header, Dict[str, str]]] = None, - content_type: Optional[str] = None, - ): - self.streaming_fn = streaming_fn - self.status = status - self.headers = headers or Header() - self.content_type = content_type - self.request: Optional[Request] = None - self._cookies: Optional[CookieJar] = None - - async def write(self, message: str): - await self.response.send(message) - - async def stream(self) -> HTTPResponse: - if not self.request: - raise ServerError("Attempted response to unknown request") - self.response = await self.request.respond( - headers=self.headers, - status=self.status, - content_type=self.content_type, - ) - await self.streaming_fn(self) - return self.response - - async def eof(self) -> None: - await self.response.eof() - - @property - def cookies(self) -> CookieJar: - if self._cookies is None: - self._cookies = CookieJar(self.headers) - return self._cookies - - @property - def processed_headers(self): - return self.response.processed_headers - - @property - def body(self): - return self.response.body - - def __call__(self, request: Request) -> ResponseStream: - self.request = request - return self - - def __await__(self): - return self.stream().__await__() - - async def file_stream( location: Union[str, PurePath], status: int = 200, diff --git a/sanic/response/types.py b/sanic/response/types.py new file mode 100644 index 00000000..fe3c941e --- /dev/null +++ b/sanic/response/types.py @@ -0,0 +1,453 @@ +from __future__ import annotations + +from functools import partial +from typing import ( + TYPE_CHECKING, + Any, + AnyStr, + Callable, + Coroutine, + Dict, + Iterator, + Optional, + Tuple, + TypeVar, + Union, +) + +from sanic.compat import Header +from sanic.cookies import CookieJar +from sanic.exceptions import SanicException, ServerError +from sanic.helpers import ( + Default, + _default, + has_message_body, + remove_entity_headers, +) +from sanic.http import Http + + +if TYPE_CHECKING: + from sanic.asgi import ASGIApp + from sanic.http.http3 import HTTPReceiver + from sanic.request import Request +else: + Request = TypeVar("Request") + + +try: + from ujson import dumps as json_dumps +except ImportError: + # This is done in order to ensure that the JSON response is + # kept consistent across both ujson and inbuilt json usage. + from json import dumps + + json_dumps = partial(dumps, separators=(",", ":")) + + +class BaseHTTPResponse: + """ + The base class for all HTTP Responses + """ + + __slots__ = ( + "asgi", + "body", + "content_type", + "stream", + "status", + "headers", + "_cookies", + ) + + _dumps = json_dumps + + def __init__(self): + self.asgi: bool = False + self.body: Optional[bytes] = None + self.content_type: Optional[str] = None + self.stream: Optional[Union[Http, ASGIApp, HTTPReceiver]] = None + self.status: int = None + self.headers = Header({}) + self._cookies: Optional[CookieJar] = None + + def __repr__(self): + class_name = self.__class__.__name__ + return f"<{class_name}: {self.status} {self.content_type}>" + + def _encode_body(self, data: Optional[AnyStr]): + if data is None: + return b"" + return ( + data.encode() if hasattr(data, "encode") else data # type: ignore + ) + + @property + def cookies(self) -> CookieJar: + """ + The response cookies. Cookies should be set and written as follows: + + .. code-block:: python + + response.cookies["test"] = "It worked!" + response.cookies["test"]["domain"] = ".yummy-yummy-cookie.com" + response.cookies["test"]["httponly"] = True + + `See user guide re: cookies + ` + + :return: the cookie jar + :rtype: CookieJar + """ + if self._cookies is None: + self._cookies = CookieJar(self.headers) + return self._cookies + + @property + def processed_headers(self) -> Iterator[Tuple[bytes, bytes]]: + """ + Obtain a list of header tuples encoded in bytes for sending. + + Add and remove headers based on status and content_type. + + :return: response headers + :rtype: Tuple[Tuple[bytes, bytes], ...] + """ + # TODO: Make a blacklist set of header names and then filter with that + if self.status in (304, 412): # Not Modified, Precondition Failed + self.headers = remove_entity_headers(self.headers) + if has_message_body(self.status): + self.headers.setdefault("content-type", self.content_type) + # Encode headers into bytes + return ( + (name.encode("ascii"), f"{value}".encode(errors="surrogateescape")) + for name, value in self.headers.items() + ) + + async def send( + self, + data: Optional[AnyStr] = None, + end_stream: Optional[bool] = None, + ) -> None: + """ + Send any pending response headers and the given data as body. + + :param data: str or bytes to be written + :param end_stream: whether to close the stream after this block + """ + if data is None and end_stream is None: + end_stream = True + if self.stream is None: + raise SanicException( + "No stream is connected to the response object instance." + ) + if self.stream.send is None: + if end_stream and not data: + return + raise ServerError( + "Response stream was ended, no more response data is " + "allowed to be sent." + ) + data = ( + data.encode() # type: ignore + if hasattr(data, "encode") + else data or b"" + ) + await self.stream.send( + data, # type: ignore + end_stream=end_stream or False, + ) + + +class HTTPResponse(BaseHTTPResponse): + """ + HTTP response to be sent back to the client. + + :param body: the body content to be returned + :type body: Optional[bytes] + :param status: HTTP response number. **Default=200** + :type status: int + :param headers: headers to be returned + :type headers: Optional; + :param content_type: content type to be returned (as a header) + :type content_type: Optional[str] + """ + + __slots__ = () + + def __init__( + self, + body: Optional[Any] = None, + status: int = 200, + headers: Optional[Union[Header, Dict[str, str]]] = None, + content_type: Optional[str] = None, + ): + super().__init__() + + self.content_type: Optional[str] = content_type + self.body = self._encode_body(body) + self.status = status + self.headers = Header(headers or {}) + self._cookies = None + + async def eof(self): + await self.send("", True) + + async def __aenter__(self): + return self.send + + async def __aexit__(self, *_): + await self.eof() + + +class JSONResponse(HTTPResponse): + """ + HTTP response to be sent back to the client, when the response + is of json type. Offers several utilities to manipulate common + json data types. + + :param body: the body content to be returned + :type body: Optional[Any] + :param status: HTTP response number. **Default=200** + :type status: int + :param headers: headers to be returned + :type headers: Optional + :param content_type: content type to be returned (as a header) + :type content_type: Optional[str] + :param dumps: json.dumps function to use + :type dumps: Optional[Callable] + """ + + __slots__ = ( + "_body", + "_body_manually_set", + "_initialized", + "_raw_body", + "_use_dumps", + "_use_dumps_kwargs", + ) + + def __init__( + self, + body: Optional[Any] = None, + status: int = 200, + headers: Optional[Union[Header, Dict[str, str]]] = None, + content_type: Optional[str] = None, + dumps: Optional[Callable[..., str]] = None, + **kwargs: Any, + ): + self._initialized = False + self._body_manually_set = False + + self._use_dumps = dumps or BaseHTTPResponse._dumps + self._use_dumps_kwargs = kwargs + + self._raw_body = body + + super().__init__( + self._encode_body(self._use_dumps(body, **self._use_dumps_kwargs)), + headers=headers, + status=status, + content_type=content_type, + ) + + self._initialized = True + + def _check_body_not_manually_set(self): + if self._body_manually_set: + raise SanicException( + "Cannot use raw_body after body has been manually set." + ) + + @property + def raw_body(self) -> Optional[Any]: + """Returns the raw body, as long as body has not been manually + set previously. + + NOTE: This object should not be mutated, as it will not be + reflected in the response body. If you need to mutate the + response body, consider using one of the provided methods in + this class or alternatively call set_body() with the mutated + object afterwards or set the raw_body property to it. + """ + + self._check_body_not_manually_set() + return self._raw_body + + @raw_body.setter + def raw_body(self, value: Any): + self._body_manually_set = False + self._body = self._encode_body( + self._use_dumps(value, **self._use_dumps_kwargs) + ) + self._raw_body = value + + @property # type: ignore + def body(self) -> Optional[bytes]: # type: ignore + return self._body + + @body.setter + def body(self, value: Optional[bytes]): + self._body = value + if not self._initialized: + return + self._body_manually_set = True + + def set_body( + self, + body: Any, + dumps: Optional[Callable[..., str]] = None, + **dumps_kwargs: Any, + ) -> None: + """Sets a new response body using the given dumps function + and kwargs, or falling back to the defaults given when + creating the object if none are specified. + """ + + self._body_manually_set = False + self._raw_body = body + + use_dumps = dumps or self._use_dumps + use_dumps_kwargs = dumps_kwargs if dumps else self._use_dumps_kwargs + + self._body = self._encode_body(use_dumps(body, **use_dumps_kwargs)) + + def append(self, value: Any) -> None: + """Appends a value to the response raw_body, ensuring that + body is kept up to date. This can only be used if raw_body + is a list. + """ + + self._check_body_not_manually_set() + + if not isinstance(self._raw_body, list): + raise SanicException("Cannot append to a non-list object.") + + self._raw_body.append(value) + self.raw_body = self._raw_body + + def extend(self, value: Any) -> None: + """Extends the response's raw_body with the given values, ensuring + that body is kept up to date. This can only be used if raw_body is + a list. + """ + + self._check_body_not_manually_set() + + if not isinstance(self._raw_body, list): + raise SanicException("Cannot extend a non-list object.") + + self._raw_body.extend(value) + self.raw_body = self._raw_body + + def update(self, *args, **kwargs) -> None: + """Updates the response's raw_body with the given values, ensuring + that body is kept up to date. This can only be used if raw_body is + a dict. + """ + + self._check_body_not_manually_set() + + if not isinstance(self._raw_body, dict): + raise SanicException("Cannot update a non-dict object.") + + self._raw_body.update(*args, **kwargs) + self.raw_body = self._raw_body + + def pop(self, key: Any, default: Any = _default) -> Any: + """Pops a key from the response's raw_body, ensuring that body is + kept up to date. This can only be used if raw_body is a dict or a + list. + """ + + self._check_body_not_manually_set() + + if not isinstance(self._raw_body, (list, dict)): + raise SanicException( + "Cannot pop from a non-list and non-dict object." + ) + + if isinstance(default, Default): + value = self._raw_body.pop(key) + elif isinstance(self._raw_body, list): + raise TypeError("pop doesn't accept a default argument for lists") + else: + value = self._raw_body.pop(key, default) + + self.raw_body = self._raw_body + + return value + + +class ResponseStream: + """ + ResponseStream is a compat layer to bridge the gap after the deprecation + of StreamingHTTPResponse. It will be removed when: + - file_stream is moved to new style streaming + - file and file_stream are combined into a single API + """ + + __slots__ = ( + "_cookies", + "content_type", + "headers", + "request", + "response", + "status", + "streaming_fn", + ) + + def __init__( + self, + streaming_fn: Callable[ + [Union[BaseHTTPResponse, ResponseStream]], + Coroutine[Any, Any, None], + ], + status: int = 200, + headers: Optional[Union[Header, Dict[str, str]]] = None, + content_type: Optional[str] = None, + ): + self.streaming_fn = streaming_fn + self.status = status + self.headers = headers or Header() + self.content_type = content_type + self.request: Optional[Request] = None + self._cookies: Optional[CookieJar] = None + + async def write(self, message: str): + await self.response.send(message) + + async def stream(self) -> HTTPResponse: + if not self.request: + raise ServerError("Attempted response to unknown request") + self.response = await self.request.respond( + headers=self.headers, + status=self.status, + content_type=self.content_type, + ) + await self.streaming_fn(self) + return self.response + + async def eof(self) -> None: + await self.response.eof() + + @property + def cookies(self) -> CookieJar: + if self._cookies is None: + self._cookies = CookieJar(self.headers) + return self._cookies + + @property + def processed_headers(self): + return self.response.processed_headers + + @property + def body(self): + return self.response.body + + def __call__(self, request: Request) -> ResponseStream: + self.request = request + return self + + def __await__(self): + return self.stream().__await__() diff --git a/tests/test_response_json.py b/tests/test_response_json.py new file mode 100644 index 00000000..c89dba42 --- /dev/null +++ b/tests/test_response_json.py @@ -0,0 +1,215 @@ +import json + +from functools import partial +from unittest.mock import Mock + +import pytest + +from sanic import Request, Sanic +from sanic.exceptions import SanicException +from sanic.response import json as json_response +from sanic.response.types import JSONResponse + + +JSON_BODY = {"ok": True} +json_dumps = partial(json.dumps, separators=(",", ":")) + + +@pytest.fixture +def json_app(app: Sanic): + @app.get("/json") + async def handle(request: Request): + return json_response(JSON_BODY) + + return app + + +def test_body_can_be_retrieved(json_app: Sanic): + _, resp = json_app.test_client.get("/json") + assert resp.body == json_dumps(JSON_BODY).encode() + + +def test_body_can_be_set(json_app: Sanic): + new_body = b'{"hello":"world"}' + + @json_app.on_response + def set_body(request: Request, response: JSONResponse): + response.body = new_body + + _, resp = json_app.test_client.get("/json") + assert resp.body == new_body + + +def test_raw_body_can_be_retrieved(json_app: Sanic): + @json_app.on_response + def check_body(request: Request, response: JSONResponse): + assert response.raw_body == JSON_BODY + + json_app.test_client.get("/json") + + +def test_raw_body_can_be_set(json_app: Sanic): + new_body = {"hello": "world"} + + @json_app.on_response + def set_body(request: Request, response: JSONResponse): + response.raw_body = new_body + assert response.raw_body == new_body + assert response.body == json_dumps(new_body).encode() + + json_app.test_client.get("/json") + + +def test_raw_body_cant_be_retrieved_after_body_set(json_app: Sanic): + new_body = b'{"hello":"world"}' + + @json_app.on_response + def check_raw_body(request: Request, response: JSONResponse): + response.body = new_body + with pytest.raises(SanicException): + response.raw_body + + json_app.test_client.get("/json") + + +def test_raw_body_can_be_reset_after_body_set(json_app: Sanic): + new_body = b'{"hello":"world"}' + new_new_body = {"lorem": "ipsum"} + + @json_app.on_response + def set_bodies(request: Request, response: JSONResponse): + response.body = new_body + response.raw_body = new_new_body + + _, resp = json_app.test_client.get("/json") + assert resp.body == json_dumps(new_new_body).encode() + + +def test_set_body_method(json_app: Sanic): + new_body = {"lorem": "ipsum"} + + @json_app.on_response + def set_body(request: Request, response: JSONResponse): + response.set_body(new_body) + + _, resp = json_app.test_client.get("/json") + assert resp.body == json_dumps(new_body).encode() + + +def test_set_body_method_after_body_set(json_app: Sanic): + new_body = b'{"hello":"world"}' + new_new_body = {"lorem": "ipsum"} + + @json_app.on_response + def set_body(request: Request, response: JSONResponse): + response.body = new_body + response.set_body(new_new_body) + + _, resp = json_app.test_client.get("/json") + assert resp.body == json_dumps(new_new_body).encode() + + +def test_custom_dumps_and_kwargs(json_app: Sanic): + custom_dumps = Mock(return_value="custom") + + @json_app.get("/json-custom") + async def handle_custom(request: Request): + return json_response(JSON_BODY, dumps=custom_dumps, prry="platypus") + + _, resp = json_app.test_client.get("/json-custom") + assert resp.body == "custom".encode() + custom_dumps.assert_called_once_with(JSON_BODY, prry="platypus") + + +def test_override_dumps_and_kwargs(json_app: Sanic): + custom_dumps_1 = Mock(return_value="custom1") + custom_dumps_2 = Mock(return_value="custom2") + + @json_app.get("/json-custom") + async def handle_custom(request: Request): + return json_response(JSON_BODY, dumps=custom_dumps_1, prry="platypus") + + @json_app.on_response + def set_body(request: Request, response: JSONResponse): + response.set_body(JSON_BODY, dumps=custom_dumps_2, platypus="prry") + + _, resp = json_app.test_client.get("/json-custom") + + assert resp.body == "custom2".encode() + custom_dumps_1.assert_called_once_with(JSON_BODY, prry="platypus") + custom_dumps_2.assert_called_once_with(JSON_BODY, platypus="prry") + + +def test_append(json_app: Sanic): + @json_app.get("/json-append") + async def handler_append(request: Request): + return json_response(["a", "b"], status=200) + + @json_app.on_response + def do_append(request: Request, response: JSONResponse): + response.append("c") + + _, resp = json_app.test_client.get("/json-append") + assert resp.body == json_dumps(["a", "b", "c"]).encode() + + +def test_extend(json_app: Sanic): + @json_app.get("/json-extend") + async def handler_extend(request: Request): + return json_response(["a", "b"], status=200) + + @json_app.on_response + def do_extend(request: Request, response: JSONResponse): + response.extend(["c", "d"]) + + _, resp = json_app.test_client.get("/json-extend") + assert resp.body == json_dumps(["a", "b", "c", "d"]).encode() + + +def test_update(json_app: Sanic): + @json_app.get("/json-update") + async def handler_update(request: Request): + return json_response({"a": "b"}, status=200) + + @json_app.on_response + def do_update(request: Request, response: JSONResponse): + response.update({"c": "d"}, e="f") + + _, resp = json_app.test_client.get("/json-update") + assert resp.body == json_dumps({"a": "b", "c": "d", "e": "f"}).encode() + + +def test_pop_dict(json_app: Sanic): + @json_app.get("/json-pop") + async def handler_pop(request: Request): + return json_response({"a": "b", "c": "d"}, status=200) + + @json_app.on_response + def do_pop(request: Request, response: JSONResponse): + val = response.pop("c") + assert val == "d" + + val_default = response.pop("e", "f") + assert val_default == "f" + + _, resp = json_app.test_client.get("/json-pop") + assert resp.body == json_dumps({"a": "b"}).encode() + + +def test_pop_list(json_app: Sanic): + @json_app.get("/json-pop") + async def handler_pop(request: Request): + return json_response(["a", "b"], status=200) + + @json_app.on_response + def do_pop(request: Request, response: JSONResponse): + val = response.pop(0) + assert val == "a" + + with pytest.raises( + TypeError, match="pop doesn't accept a default argument for lists" + ): + response.pop(21, "nah nah") + + _, resp = json_app.test_client.get("/json-pop") + assert resp.body == json_dumps(["b"]).encode()