New streaming test and minor fixes.

This commit is contained in:
L. Kärkkäinen 2020-03-26 16:23:40 +02:00
parent 1aac4f546b
commit 01480c437b
3 changed files with 91 additions and 2 deletions

View File

@ -226,7 +226,9 @@ class Http:
data = b"" data = b""
self.response_func = self.head_response_ignored self.response_func = self.head_response_ignored
headers["connection"] = "keep-alive" if self.keep_alive else "close" headers["connection"] = "keep-alive" if self.keep_alive else "close"
ret = format_http1_response(status, res.processed_headers) + data ret = format_http1_response(status, res.processed_headers)
if data:
ret += data
# Send a 100-continue if expected and not Expectation Failed # Send a 100-continue if expected and not Expectation Failed
if self.expecting_continue: if self.expecting_continue:
self.expecting_continue = False self.expecting_continue = False

View File

@ -54,6 +54,8 @@ class BaseHTTPResponse:
""" """
if data is None and end_stream is None: if data is None and end_stream is None:
end_stream = True end_stream = True
if end_stream and not data and self.stream.send is None:
return
data = data.encode() if hasattr(data, "encode") else data or b"" data = data.encode() if hasattr(data, "encode") else data or b""
await self.stream.send(data, end_stream=end_stream) await self.stream.send(data, end_stream=end_stream)

View File

@ -1,5 +1,11 @@
import asyncio
from contextlib import closing
from socket import socket
import pytest import pytest
from sanic import Sanic
from sanic.blueprints import Blueprint from sanic.blueprints import Blueprint
from sanic.exceptions import HeaderExpectationFailed from sanic.exceptions import HeaderExpectationFailed
from sanic.response import json, stream, text from sanic.response import json, stream, text
@ -582,5 +588,84 @@ def test_streaming_new_api(app):
assert response.status == 200 assert response.status == 200
res = response.json res = response.json
assert isinstance(res, list) assert isinstance(res, list)
assert len(res) > 1
assert "".join(res) == data assert "".join(res) == data
def test_streaming_echo():
"""2-way streaming chat between server and client."""
app = Sanic(name=__name__)
@app.post("/echo", stream=True)
async def handler(request):
res = await request.respond(content_type="text/plain; charset=utf-8")
# Send headers
await res.send(end_stream=False)
# Echo back data (case swapped)
async for data in request.stream:
await res.send(data.swapcase())
# Add EOF marker after successful operation
await res.send(b"-", end_stream=True)
@app.listener("after_server_start")
async def client_task(app, loop):
try:
reader, writer = await asyncio.open_connection(*addr)
await client(app, reader, writer)
finally:
writer.close()
app.stop()
async def client(app, reader, writer):
# Unfortunately httpx does not support 2-way streaming, so do it by hand.
host = f"host: {addr[0]}:{addr[1]}\r\n".encode()
writer.write(
b"POST /echo HTTP/1.1\r\n" + host + b"content-length: 2\r\n"
b"content-type: text/plain; charset=utf-8\r\n"
b"\r\n"
)
# Read response
res = b""
while not b"\r\n\r\n" in res:
res += await reader.read(4096)
assert res.startswith(b"HTTP/1.1 200 OK\r\n")
assert res.endswith(b"\r\n\r\n")
buffer = b""
async def read_chunk():
nonlocal buffer
while not b"\r\n" in buffer:
data = await reader.read(4096)
assert data
buffer += data
size, buffer = buffer.split(b"\r\n", 1)
size = int(size, 16)
if size == 0:
return None
while len(buffer) < size + 2:
data = await reader.read(4096)
assert data
buffer += data
print(res)
assert buffer[size : size + 2] == b"\r\n"
ret, buffer = buffer[:size], buffer[size + 2 :]
return ret
# Chat with server
writer.write(b"a")
res = await read_chunk()
assert res == b"A"
writer.write(b"b")
res = await read_chunk()
assert res == b"B"
res = await read_chunk()
assert res == b"-"
res = await read_chunk()
assert res == None
# Use random port for tests
with closing(socket()) as sock:
sock.bind(("127.0.0.1", 0))
addr = sock.getsockname()
app.run(sock=sock, access_log=False)