fix: GIT-1623: handle cookie duplication and serialization issue
Signed-off-by: Harsha Narayana <harsha2k4@gmail.com>
This commit is contained in:
		| @@ -1,7 +1,6 @@ | ||||
| import asyncio | ||||
| import warnings | ||||
|  | ||||
| from http.cookies import SimpleCookie | ||||
| from inspect import isawaitable | ||||
| from typing import Any, Awaitable, Callable, MutableMapping, Union | ||||
| from urllib.parse import quote | ||||
| @@ -288,11 +287,22 @@ class ASGIApp: | ||||
|         """ | ||||
|         Write the response. | ||||
|         """ | ||||
|  | ||||
|         headers = [] | ||||
|         cookies = dict() | ||||
|         try: | ||||
|             headers = [ | ||||
|             cookies = { | ||||
|                 v.key: v | ||||
|                 for _, v in list( | ||||
|                     filter( | ||||
|                         lambda item: item[0].lower() == "set-cookie", | ||||
|                         response.headers.items(), | ||||
|                     ) | ||||
|                 ) | ||||
|             } | ||||
|             headers += [ | ||||
|                 (str(name).encode("latin-1"), str(value).encode("latin-1")) | ||||
|                 for name, value in response.headers.items() | ||||
|                 if name.lower() not in ["set-cookie"] | ||||
|             ] | ||||
|         except AttributeError: | ||||
|             logger.error( | ||||
| @@ -319,12 +329,18 @@ class ASGIApp: | ||||
|             ] | ||||
|  | ||||
|         if response.cookies: | ||||
|             cookies = SimpleCookie() | ||||
|             cookies.load(response.cookies) | ||||
|             headers += [ | ||||
|                 (b"set-cookie", cookie.encode("utf-8")) | ||||
|                 for name, cookie in response.cookies.items() | ||||
|             ] | ||||
|             cookies.update( | ||||
|                 { | ||||
|                     v.key: v | ||||
|                     for _, v in response.cookies.items() | ||||
|                     if v.key not in cookies.keys() | ||||
|                 } | ||||
|             ) | ||||
|  | ||||
|         headers += [ | ||||
|             (b"set-cookie", cookie.encode("utf-8")) | ||||
|             for k, cookie in cookies.items() | ||||
|         ] | ||||
|  | ||||
|         await self.transport.send( | ||||
|             { | ||||
|   | ||||
| @@ -229,3 +229,30 @@ async def test_request_class_custom(): | ||||
|  | ||||
|     _, response = await app.asgi_client.get("/custom") | ||||
|     assert response.body == b"MyCustomRequest" | ||||
|  | ||||
|  | ||||
| @pytest.mark.asyncio | ||||
| async def test_cookie_customization(app): | ||||
|     @app.get("/cookie") | ||||
|     def get_cookie(request): | ||||
|         response = text("There's a cookie up in this response") | ||||
|         response.cookies["test"] = "Cookie1" | ||||
|         response.cookies["test"]["httponly"] = True | ||||
|  | ||||
|         response.cookies["c2"] = "Cookie2" | ||||
|         response.cookies["c2"]["httponly"] = False | ||||
|  | ||||
|         return response | ||||
|  | ||||
|     _, response = await app.asgi_client.get("/cookie") | ||||
|     cookie_map = { | ||||
|         "test": {"value": "Cookie1", "HttpOnly": True}, | ||||
|         "c2": {"value": "Cookie2", "HttpOnly": False}, | ||||
|     } | ||||
|  | ||||
|     for k, v in ( | ||||
|         response.cookies._cookies.get("mockserver.local").get("/").items() | ||||
|     ): | ||||
|         assert cookie_map.get(k).get("value") == v.value | ||||
|         if cookie_map.get(k).get("HttpOnly"): | ||||
|             assert "HttpOnly" in v._rest.keys() | ||||
|   | ||||
| @@ -635,13 +635,15 @@ def test_forwarded_scheme(app): | ||||
|         return text(request.remote_addr) | ||||
|  | ||||
|     request, response = app.test_client.get("/") | ||||
|     assert request.scheme == 'http' | ||||
|     assert request.scheme == "http" | ||||
|  | ||||
|     request, response = app.test_client.get("/", headers={'X-Forwarded-Proto': 'https'}) | ||||
|     assert request.scheme == 'https' | ||||
|     request, response = app.test_client.get( | ||||
|         "/", headers={"X-Forwarded-Proto": "https"} | ||||
|     ) | ||||
|     assert request.scheme == "https" | ||||
|  | ||||
|     request, response = app.test_client.get("/", headers={'X-Scheme': 'https'}) | ||||
|     assert request.scheme == 'https' | ||||
|     request, response = app.test_client.get("/", headers={"X-Scheme": "https"}) | ||||
|     assert request.scheme == "https" | ||||
|  | ||||
|  | ||||
| def test_match_info(app): | ||||
| @@ -1677,7 +1679,7 @@ def test_request_server_name(app): | ||||
|         return text("OK") | ||||
|  | ||||
|     request, response = app.test_client.get("/") | ||||
|     assert request.server_name == '127.0.0.1' | ||||
|     assert request.server_name == "127.0.0.1" | ||||
|  | ||||
|  | ||||
| def test_request_server_name_in_host_header(app): | ||||
| @@ -1685,8 +1687,10 @@ def test_request_server_name_in_host_header(app): | ||||
|     def handler(request): | ||||
|         return text("OK") | ||||
|  | ||||
|     request, response = app.test_client.get("/", headers={'Host': 'my_server:5555'}) | ||||
|     assert request.server_name == 'my_server' | ||||
|     request, response = app.test_client.get( | ||||
|         "/", headers={"Host": "my_server:5555"} | ||||
|     ) | ||||
|     assert request.server_name == "my_server" | ||||
|  | ||||
|  | ||||
| def test_request_server_name_forwarded(app): | ||||
| @@ -1694,11 +1698,11 @@ def test_request_server_name_forwarded(app): | ||||
|     def handler(request): | ||||
|         return text("OK") | ||||
|  | ||||
|     request, response = app.test_client.get("/", headers={ | ||||
|         'Host': 'my_server:5555', | ||||
|         'X-Forwarded-Host': 'your_server' | ||||
|     }) | ||||
|     assert request.server_name == 'your_server' | ||||
|     request, response = app.test_client.get( | ||||
|         "/", | ||||
|         headers={"Host": "my_server:5555", "X-Forwarded-Host": "your_server"}, | ||||
|     ) | ||||
|     assert request.server_name == "your_server" | ||||
|  | ||||
|  | ||||
| def test_request_server_port(app): | ||||
| @@ -1706,9 +1710,7 @@ def test_request_server_port(app): | ||||
|     def handler(request): | ||||
|         return text("OK") | ||||
|  | ||||
|     request, response = app.test_client.get("/", headers={ | ||||
|         'Host': 'my_server' | ||||
|     }) | ||||
|     request, response = app.test_client.get("/", headers={"Host": "my_server"}) | ||||
|     assert request.server_port == app.test_client.port | ||||
|  | ||||
|  | ||||
| @@ -1717,9 +1719,9 @@ def test_request_server_port_in_host_header(app): | ||||
|     def handler(request): | ||||
|         return text("OK") | ||||
|  | ||||
|     request, response = app.test_client.get("/", headers={ | ||||
|         'Host': 'my_server:5555' | ||||
|     }) | ||||
|     request, response = app.test_client.get( | ||||
|         "/", headers={"Host": "my_server:5555"} | ||||
|     ) | ||||
|     assert request.server_port == 5555 | ||||
|  | ||||
|  | ||||
| @@ -1728,10 +1730,9 @@ def test_request_server_port_forwarded(app): | ||||
|     def handler(request): | ||||
|         return text("OK") | ||||
|  | ||||
|     request, response = app.test_client.get("/", headers={ | ||||
|         'Host': 'my_server:5555', | ||||
|         'X-Forwarded-Port': '4444' | ||||
|     }) | ||||
|     request, response = app.test_client.get( | ||||
|         "/", headers={"Host": "my_server:5555", "X-Forwarded-Port": "4444"} | ||||
|     ) | ||||
|     assert request.server_port == 4444 | ||||
|  | ||||
|  | ||||
| @@ -1754,29 +1755,34 @@ def test_url_for_with_forwarded_request(app): | ||||
|     def view_name(request): | ||||
|         return text("OK") | ||||
|  | ||||
|     request, response = app.test_client.get("/", headers={ | ||||
|         'X-Forwarded-Proto': 'https', | ||||
|     }) | ||||
|     assert app.url_for('view_name') == '/another_view' | ||||
|     assert app.url_for('view_name', _external=True) == 'http:///another_view' | ||||
|     assert request.url_for('view_name') == 'https://127.0.0.1:{}/another_view'.format(app.test_client.port) | ||||
|     request, response = app.test_client.get( | ||||
|         "/", headers={"X-Forwarded-Proto": "https"} | ||||
|     ) | ||||
|     assert app.url_for("view_name") == "/another_view" | ||||
|     assert app.url_for("view_name", _external=True) == "http:///another_view" | ||||
|     assert request.url_for( | ||||
|         "view_name" | ||||
|     ) == "https://127.0.0.1:{}/another_view".format(app.test_client.port) | ||||
|  | ||||
|     app.config.SERVER_NAME = "my_server" | ||||
|     request, response = app.test_client.get("/", headers={ | ||||
|         'X-Forwarded-Proto': 'https', | ||||
|         'X-Forwarded-Port': '6789', | ||||
|     }) | ||||
|     assert app.url_for('view_name') == '/another_view' | ||||
|     assert app.url_for('view_name', _external=True) == 'http://my_server/another_view' | ||||
|     assert request.url_for('view_name') == 'https://my_server:6789/another_view' | ||||
|     request, response = app.test_client.get( | ||||
|         "/", headers={"X-Forwarded-Proto": "https", "X-Forwarded-Port": "6789"} | ||||
|     ) | ||||
|     assert app.url_for("view_name") == "/another_view" | ||||
|     assert ( | ||||
|         app.url_for("view_name", _external=True) | ||||
|         == "http://my_server/another_view" | ||||
|     ) | ||||
|     assert ( | ||||
|         request.url_for("view_name") == "https://my_server:6789/another_view" | ||||
|     ) | ||||
|  | ||||
|     request, response = app.test_client.get( | ||||
|         "/", headers={"X-Forwarded-Proto": "https", "X-Forwarded-Port": "443"} | ||||
|     ) | ||||
|     assert request.url_for("view_name") == "https://my_server/another_view" | ||||
|  | ||||
|     request, response = app.test_client.get("/", headers={ | ||||
|         'X-Forwarded-Proto': 'https', | ||||
|         'X-Forwarded-Port': '443', | ||||
|     }) | ||||
|     assert request.url_for('view_name') == 'https://my_server/another_view' | ||||
|  | ||||
|      | ||||
| @pytest.mark.asyncio | ||||
| async def test_request_form_invalid_content_type_asgi(app): | ||||
|     @app.route("/", methods=["POST"]) | ||||
| @@ -1787,7 +1793,7 @@ async def test_request_form_invalid_content_type_asgi(app): | ||||
|  | ||||
|     assert request.form == {} | ||||
|  | ||||
|      | ||||
|  | ||||
| def test_endpoint_basic(): | ||||
|     app = Sanic() | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Harsha Narayana
					Harsha Narayana