Add Request.stream

This commit is contained in:
38elements 2017-05-05 20:09:32 +09:00
parent 7cf3d49f00
commit ef2cc7ebf5
9 changed files with 220 additions and 12 deletions

View File

@ -30,3 +30,50 @@ async def index(request):
return stream(stream_from_db) return stream(stream_from_db)
``` ```
## Request Streaming
Sanic allows you to get request data by stream, as below. When the request ends, `request.stream.get()` returns `None`.
```
from sanic import Sanic
from sanic.blueprints import Blueprint
from sanic.response import stream, text
bp = Blueprint('blueprint_request_stream')
app = Sanic('request_stream', is_request_stream=True)
@app.stream('/stream')
async def handler(request):
async def streaming(response):
while True:
body = await request.stream.get()
if body is None:
break
body = body.decode('utf-8').replace('1', 'A')
response.write(body)
return stream(streaming)
@app.get('/get')
async def get(request):
return text('OK')
@bp.stream('/bp_stream')
async def bp_handler(request):
result = ''
while True:
body = await request.stream.get()
if body is None:
break
result += body.decode('utf-8').replace('1', 'A')
return text(result)
app.blueprint(bp)
if __name__ == '__main__':
app.run(host='127.0.0.1', port=8000)
```

View File

@ -0,0 +1,10 @@
import requests
# Warning: This is a heavy process.
data = ""
for i in range(1, 250000):
data += str(i)
r = requests.post('http://127.0.0.1:8000/bp_stream', data=data)
print(r.text)

View File

@ -0,0 +1,40 @@
from sanic import Sanic
from sanic.blueprints import Blueprint
from sanic.response import stream, text
bp = Blueprint('blueprint_request_stream')
app = Sanic('request_stream')
@app.stream('/stream')
async def handler(request):
async def streaming(response):
while True:
body = await request.stream.get()
if body is None:
break
body = body.decode('utf-8').replace('1', 'A')
response.write(body)
return stream(streaming)
@app.get('/get')
async def get(request):
return text('OK')
@bp.stream('/bp_stream')
async def bp_handler(request):
result = ''
while True:
body = await request.stream.get()
if body is None:
break
result += body.decode('utf-8').replace('1', 'A')
return text(result)
app.blueprint(bp)
if __name__ == '__main__':
app.run(host='127.0.0.1', port=8000)

View File

@ -60,6 +60,7 @@ class Sanic:
self.sock = None self.sock = None
self.listeners = defaultdict(list) self.listeners = defaultdict(list)
self.is_running = False self.is_running = False
self.is_request_stream = False
self.websocket_enabled = False self.websocket_enabled = False
self.websocket_tasks = [] self.websocket_tasks = []
@ -110,12 +111,14 @@ class Sanic:
# Decorator # Decorator
def route(self, uri, methods=frozenset({'GET'}), host=None, def route(self, uri, methods=frozenset({'GET'}), host=None,
strict_slashes=False): strict_slashes=False, stream=False):
"""Decorate a function to be registered as a route """Decorate a function to be registered as a route
:param uri: path of the URL :param uri: path of the URL
:param methods: list or tuple of methods allowed :param methods: list or tuple of methods allowed
:param host: :param host:
:param strict_slashes:
:param stream:
:return: decorated function :return: decorated function
""" """
@ -124,7 +127,11 @@ class Sanic:
if not uri.startswith('/'): if not uri.startswith('/'):
uri = '/' + uri uri = '/' + uri
if stream:
self.is_request_stream = True
def response(handler): def response(handler):
handler.is_stream = stream
self.router.add(uri=uri, methods=methods, handler=handler, self.router.add(uri=uri, methods=methods, handler=handler,
host=host, strict_slashes=strict_slashes) host=host, strict_slashes=strict_slashes)
return handler return handler
@ -160,6 +167,13 @@ class Sanic:
return self.route(uri, methods=frozenset({"DELETE"}), host=host, return self.route(uri, methods=frozenset({"DELETE"}), host=host,
strict_slashes=strict_slashes) strict_slashes=strict_slashes)
def stream(
self, uri, methods=frozenset({"POST"}), host=None,
strict_slashes=False):
return self.route(uri, methods=methods, host=host,
strict_slashes=strict_slashes,
stream=True)
def add_route(self, handler, uri, methods=frozenset({'GET'}), host=None, def add_route(self, handler, uri, methods=frozenset({'GET'}), host=None,
strict_slashes=False): strict_slashes=False):
"""A helper method to register class instance or """A helper method to register class instance or
@ -651,6 +665,8 @@ class Sanic:
server_settings = { server_settings = {
'protocol': protocol, 'protocol': protocol,
'request_class': self.request_class, 'request_class': self.request_class,
'is_request_stream': self.is_request_stream,
'router': self.router,
'host': host, 'host': host,
'port': port, 'port': port,
'sock': sock, 'sock': sock,

View File

@ -5,7 +5,7 @@ from sanic.views import CompositionView
FutureRoute = namedtuple('Route', FutureRoute = namedtuple('Route',
['handler', 'uri', 'methods', ['handler', 'uri', 'methods',
'host', 'strict_slashes']) 'host', 'strict_slashes', 'stream'])
FutureListener = namedtuple('Listener', ['handler', 'uri', 'methods', 'host']) FutureListener = namedtuple('Listener', ['handler', 'uri', 'methods', 'host'])
FutureMiddleware = namedtuple('Route', ['middleware', 'args', 'kwargs']) FutureMiddleware = namedtuple('Route', ['middleware', 'args', 'kwargs'])
FutureException = namedtuple('Route', ['handler', 'args', 'kwargs']) FutureException = namedtuple('Route', ['handler', 'args', 'kwargs'])
@ -47,7 +47,8 @@ class Blueprint:
uri=uri[1:] if uri.startswith('//') else uri, uri=uri[1:] if uri.startswith('//') else uri,
methods=future.methods, methods=future.methods,
host=future.host or self.host, host=future.host or self.host,
strict_slashes=future.strict_slashes strict_slashes=future.strict_slashes,
stream=future.stream
)(future.handler) )(future.handler)
for future in self.websocket_routes: for future in self.websocket_routes:
@ -87,14 +88,15 @@ class Blueprint:
app.listener(event)(listener) app.listener(event)(listener)
def route(self, uri, methods=frozenset({'GET'}), host=None, def route(self, uri, methods=frozenset({'GET'}), host=None,
strict_slashes=False): strict_slashes=False, stream=False):
"""Create a blueprint route from a decorated function. """Create a blueprint route from a decorated function.
:param uri: endpoint at which the route will be accessible. :param uri: endpoint at which the route will be accessible.
:param methods: list of acceptable HTTP methods. :param methods: list of acceptable HTTP methods.
""" """
def decorator(handler): def decorator(handler):
route = FutureRoute(handler, uri, methods, host, strict_slashes) route = FutureRoute(
handler, uri, methods, host, strict_slashes, stream)
self.routes.append(route) self.routes.append(route)
return handler return handler
return decorator return decorator
@ -131,7 +133,7 @@ class Blueprint:
:param uri: endpoint at which the route will be accessible. :param uri: endpoint at which the route will be accessible.
""" """
def decorator(handler): def decorator(handler):
route = FutureRoute(handler, uri, [], host, strict_slashes) route = FutureRoute(handler, uri, [], host, strict_slashes, False)
self.websocket_routes.append(route) self.websocket_routes.append(route)
return handler return handler
return decorator return decorator
@ -217,3 +219,7 @@ class Blueprint:
def delete(self, uri, host=None, strict_slashes=False): def delete(self, uri, host=None, strict_slashes=False):
return self.route(uri, methods=["DELETE"], host=host, return self.route(uri, methods=["DELETE"], host=host,
strict_slashes=strict_slashes) strict_slashes=strict_slashes)
def stream(self, uri, methods=["POST"], host=None, strict_slashes=False):
return self.route(uri, methods=methods, host=host,
strict_slashes=strict_slashes, stream=True)

View File

@ -38,7 +38,7 @@ class Request(dict):
__slots__ = ( __slots__ = (
'app', 'headers', 'version', 'method', '_cookies', 'transport', 'app', 'headers', 'version', 'method', '_cookies', 'transport',
'body', 'parsed_json', 'parsed_args', 'parsed_form', 'parsed_files', 'body', 'parsed_json', 'parsed_args', 'parsed_form', 'parsed_files',
'_ip', '_parsed_url', 'uri_template' '_ip', '_parsed_url', 'uri_template', 'stream'
) )
def __init__(self, url_bytes, headers, version, method, transport): def __init__(self, url_bytes, headers, version, method, transport):
@ -59,6 +59,7 @@ class Request(dict):
self.parsed_args = None self.parsed_args = None
self.uri_template = None self.uri_template = None
self._cookies = None self._cookies = None
self.stream = None
@property @property
def json(self): def json(self):

View File

@ -345,3 +345,11 @@ class Router:
if hasattr(route_handler, 'handlers'): if hasattr(route_handler, 'handlers'):
route_handler = route_handler.handlers[method] route_handler = route_handler.handlers[method]
return route_handler, [], kwargs, route.uri return route_handler, [], kwargs, route.uri
def is_stream_handler(self, request):
""" Handler for request is stream or not.
:param request: Request object
:return: bool
"""
handler = self.get(request)[0]
return handler.is_stream

View File

@ -64,22 +64,24 @@ class HttpProtocol(asyncio.Protocol):
'parser', 'request', 'url', 'headers', 'parser', 'request', 'url', 'headers',
# request config # request config
'request_handler', 'request_timeout', 'request_max_size', 'request_handler', 'request_timeout', 'request_max_size',
'request_class', 'request_class', 'is_request_stream', 'router',
# enable or disable access log / error log purpose # enable or disable access log / error log purpose
'has_log', 'has_log',
# connection management # connection management
'_total_request_size', '_timeout_handler', '_last_communication_time') '_total_request_size', '_timeout_handler', '_last_communication_time',
'_is_stream_handler')
def __init__(self, *, loop, request_handler, error_handler, def __init__(self, *, loop, request_handler, error_handler,
signal=Signal(), connections=set(), request_timeout=60, signal=Signal(), connections=set(), request_timeout=60,
request_max_size=None, request_class=None, has_log=True, request_max_size=None, request_class=None, has_log=True,
keep_alive=True): keep_alive=True, is_request_stream=False, router=None):
self.loop = loop self.loop = loop
self.transport = None self.transport = None
self.request = None self.request = None
self.parser = None self.parser = None
self.url = None self.url = None
self.headers = None self.headers = None
self.router = router
self.signal = signal self.signal = signal
self.has_log = has_log self.has_log = has_log
self.connections = connections self.connections = connections
@ -88,10 +90,13 @@ class HttpProtocol(asyncio.Protocol):
self.request_timeout = request_timeout self.request_timeout = request_timeout
self.request_max_size = request_max_size self.request_max_size = request_max_size
self.request_class = request_class or Request self.request_class = request_class or Request
self.is_request_stream = is_request_stream
self._is_stream_handler = False
self._total_request_size = 0 self._total_request_size = 0
self._timeout_handler = None self._timeout_handler = None
self._last_request_time = None self._last_request_time = None
self._request_handler_task = None self._request_handler_task = None
self._request_stream_task = None
self._keep_alive = keep_alive self._keep_alive = keep_alive
@property @property
@ -123,6 +128,8 @@ class HttpProtocol(asyncio.Protocol):
self._timeout_handler = ( self._timeout_handler = (
self.loop.call_later(time_left, self.connection_timeout)) self.loop.call_later(time_left, self.connection_timeout))
else: else:
if self._request_stream_task:
self._request_stream_task.cancel()
if self._request_handler_task: if self._request_handler_task:
self._request_handler_task.cancel() self._request_handler_task.cancel()
exception = RequestTimeout('Request Timeout') exception = RequestTimeout('Request Timeout')
@ -171,13 +178,29 @@ class HttpProtocol(asyncio.Protocol):
method=self.parser.get_method().decode(), method=self.parser.get_method().decode(),
transport=self.transport transport=self.transport
) )
if self.is_request_stream:
self._is_stream_handler = self.router.is_stream_handler(
self.request)
if self._is_stream_handler:
self.request.stream = asyncio.Queue()
self.execute_request_handler()
def on_body(self, body): def on_body(self, body):
if self.is_request_stream and self._is_stream_handler:
self._request_stream_task = self.loop.create_task(
self.request.stream.put(body))
return
self.request.body.append(body) self.request.body.append(body)
def on_message_complete(self): def on_message_complete(self):
if self.is_request_stream and self._is_stream_handler:
self._request_stream_task = self.loop.create_task(
self.request.stream.put(None))
return
self.request.body = b''.join(self.request.body) self.request.body = b''.join(self.request.body)
self.execute_request_handler()
def execute_request_handler(self):
self._request_handler_task = self.loop.create_task( self._request_handler_task = self.loop.create_task(
self.request_handler( self.request_handler(
self.request, self.request,
@ -317,7 +340,9 @@ class HttpProtocol(asyncio.Protocol):
self.url = None self.url = None
self.headers = None self.headers = None
self._request_handler_task = None self._request_handler_task = None
self._request_stream_task = None
self._total_request_size = 0 self._total_request_size = 0
self._is_stream_handler = False
def close_if_idle(self): def close_if_idle(self):
"""Close the connection if a request is not being sent or received """Close the connection if a request is not being sent or received
@ -359,7 +384,8 @@ def serve(host, port, request_handler, error_handler, before_start=None,
request_timeout=60, ssl=None, sock=None, request_max_size=None, request_timeout=60, ssl=None, sock=None, request_max_size=None,
reuse_port=False, loop=None, protocol=HttpProtocol, backlog=100, reuse_port=False, loop=None, protocol=HttpProtocol, backlog=100,
register_sys_signals=True, run_async=False, connections=None, register_sys_signals=True, run_async=False, connections=None,
signal=Signal(), request_class=None, has_log=True, keep_alive=True): signal=Signal(), request_class=None, has_log=True, keep_alive=True,
is_request_stream=False, router=None):
"""Start asynchronous HTTP Server on an individual process. """Start asynchronous HTTP Server on an individual process.
:param host: Address to host on :param host: Address to host on
@ -386,6 +412,8 @@ def serve(host, port, request_handler, error_handler, before_start=None,
:param protocol: subclass of asyncio protocol class :param protocol: subclass of asyncio protocol class
:param request_class: Request class to use :param request_class: Request class to use
:param has_log: disable/enable access log and error log :param has_log: disable/enable access log and error log
:param is_request_stream: disable/enable Request.stream
:param router: Router object
:return: Nothing :return: Nothing
""" """
if not run_async: if not run_async:
@ -410,6 +438,8 @@ def serve(host, port, request_handler, error_handler, before_start=None,
request_class=request_class, request_class=request_class,
has_log=has_log, has_log=has_log,
keep_alive=keep_alive, keep_alive=keep_alive,
is_request_stream=is_request_stream,
router=router,
) )
server_coroutine = loop.create_server( server_coroutine = loop.create_server(

View File

@ -0,0 +1,50 @@
from sanic import Sanic
from sanic.blueprints import Blueprint
from sanic.response import stream, text
bp = Blueprint('test_blueprint_request_stream')
app = Sanic('test_request_stream')
@app.stream('/stream')
async def handler(request):
async def streaming(response):
while True:
body = await request.stream.get()
if body is None:
break
response.write(body.decode('utf-8'))
return stream(streaming)
@app.get('/get')
async def get(request):
return text('OK')
@bp.stream('/bp_stream')
async def bp_handler(request):
result = ''
while True:
body = await request.stream.get()
if body is None:
break
result += body.decode('utf-8')
return text(result)
app.blueprint(bp)
def test_request_stream():
data = "abc" * 100000
request, response = app.test_client.post('/stream', data=data)
assert response.status == 200
assert response.text == data
request, response = app.test_client.get('/get')
assert response.status == 200
assert response.text == 'OK'
request, response = app.test_client.post('/bp_stream', data=data)
assert response.status == 200
assert response.text == data