Remove tests using the deprecated get_headers function that can no longer be supported. Chunked mode is now autodetected, so do not put content-length header if chunked mode is preferred.
This commit is contained in:
		| @@ -198,7 +198,6 @@ def streaming_app(app): | ||||
|     async def test(request): | ||||
|         return stream( | ||||
|             sample_streaming_fn, | ||||
|             headers={"Content-Length": "7"}, | ||||
|             content_type="text/csv", | ||||
|         ) | ||||
|  | ||||
| @@ -247,114 +246,6 @@ def test_non_chunked_streaming_returns_correct_content( | ||||
|     assert response.text == "foo,bar" | ||||
|  | ||||
|  | ||||
| @pytest.mark.parametrize("status", [200, 201, 400, 401]) | ||||
| def test_stream_response_status_returns_correct_headers(status): | ||||
|     response = StreamingHTTPResponse(sample_streaming_fn, status=status) | ||||
|     headers = response.get_headers() | ||||
|     assert b"HTTP/1.1 %s" % str(status).encode() in headers | ||||
|  | ||||
|  | ||||
| @pytest.mark.parametrize("keep_alive_timeout", [10, 20, 30]) | ||||
| def test_stream_response_keep_alive_returns_correct_headers( | ||||
|     keep_alive_timeout | ||||
| ): | ||||
|     response = StreamingHTTPResponse(sample_streaming_fn) | ||||
|     headers = response.get_headers( | ||||
|         keep_alive=True, keep_alive_timeout=keep_alive_timeout | ||||
|     ) | ||||
|  | ||||
|     assert b"Keep-Alive: %s\r\n" % str(keep_alive_timeout).encode() in headers | ||||
|  | ||||
|  | ||||
| def test_stream_response_includes_chunked_header_http11(): | ||||
|     response = StreamingHTTPResponse(sample_streaming_fn) | ||||
|     headers = response.get_headers(version="1.1") | ||||
|     assert b"Transfer-Encoding: chunked\r\n" in headers | ||||
|  | ||||
|  | ||||
| def test_stream_response_does_not_include_chunked_header_http10(): | ||||
|     response = StreamingHTTPResponse(sample_streaming_fn) | ||||
|     headers = response.get_headers(version="1.0") | ||||
|     assert b"Transfer-Encoding: chunked\r\n" not in headers | ||||
|  | ||||
|  | ||||
| def test_stream_response_does_not_include_chunked_header_if_disabled(): | ||||
|     response = StreamingHTTPResponse(sample_streaming_fn, chunked=False) | ||||
|     headers = response.get_headers(version="1.1") | ||||
|     assert b"Transfer-Encoding: chunked\r\n" not in headers | ||||
|  | ||||
|  | ||||
| def test_stream_response_writes_correct_content_to_transport_when_chunked( | ||||
|     streaming_app | ||||
| ): | ||||
|     response = StreamingHTTPResponse(sample_streaming_fn) | ||||
|     response.protocol = MagicMock(HttpProtocol) | ||||
|     response.protocol.transport = MagicMock(asyncio.Transport) | ||||
|  | ||||
|     async def mock_drain(): | ||||
|         pass | ||||
|  | ||||
|     async def mock_push_data(data): | ||||
|         response.protocol.transport.write(data) | ||||
|  | ||||
|     response.protocol.push_data = mock_push_data | ||||
|     response.protocol.drain = mock_drain | ||||
|  | ||||
|     @streaming_app.listener("after_server_start") | ||||
|     async def run_stream(app, loop): | ||||
|         await response.stream() | ||||
|         assert response.protocol.transport.write.call_args_list[1][0][0] == ( | ||||
|             b"4\r\nfoo,\r\n" | ||||
|         ) | ||||
|  | ||||
|         assert response.protocol.transport.write.call_args_list[2][0][0] == ( | ||||
|             b"3\r\nbar\r\n" | ||||
|         ) | ||||
|  | ||||
|         assert response.protocol.transport.write.call_args_list[3][0][0] == ( | ||||
|             b"0\r\n\r\n" | ||||
|         ) | ||||
|  | ||||
|         assert len(response.protocol.transport.write.call_args_list) == 4 | ||||
|  | ||||
|         app.stop() | ||||
|  | ||||
|     streaming_app.run(host=HOST, port=PORT) | ||||
|  | ||||
|  | ||||
| def test_stream_response_writes_correct_content_to_transport_when_not_chunked( | ||||
|     streaming_app, | ||||
| ): | ||||
|     response = StreamingHTTPResponse(sample_streaming_fn) | ||||
|     response.protocol = MagicMock(HttpProtocol) | ||||
|     response.protocol.transport = MagicMock(asyncio.Transport) | ||||
|  | ||||
|     async def mock_drain(): | ||||
|         pass | ||||
|  | ||||
|     async def mock_push_data(data): | ||||
|         response.protocol.transport.write(data) | ||||
|  | ||||
|     response.protocol.push_data = mock_push_data | ||||
|     response.protocol.drain = mock_drain | ||||
|  | ||||
|     @streaming_app.listener("after_server_start") | ||||
|     async def run_stream(app, loop): | ||||
|         await response.stream(version="1.0") | ||||
|         assert response.protocol.transport.write.call_args_list[1][0][0] == ( | ||||
|             b"foo," | ||||
|         ) | ||||
|  | ||||
|         assert response.protocol.transport.write.call_args_list[2][0][0] == ( | ||||
|             b"bar" | ||||
|         ) | ||||
|  | ||||
|         assert len(response.protocol.transport.write.call_args_list) == 3 | ||||
|  | ||||
|         app.stop() | ||||
|  | ||||
|     streaming_app.run(host=HOST, port=PORT) | ||||
|  | ||||
|  | ||||
| def test_stream_response_with_cookies(app): | ||||
|     @app.route("/") | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 L. Kärkkäinen
					L. Kärkkäinen