Merge pull request #560 from miguelgrinberg/cancel-websocket-tasks

cancel websocket tasks if server is stopped
This commit is contained in:
Eli Uriegas 2017-03-16 09:02:30 -05:00 committed by GitHub
commit 879fab120f

View File

@ -1,7 +1,7 @@
import logging import logging
import re import re
import warnings import warnings
from asyncio import get_event_loop from asyncio import get_event_loop, ensure_future, CancelledError
from collections import deque, defaultdict from collections import deque, defaultdict
from functools import partial from functools import partial
from inspect import isawaitable, stack, getmodulename from inspect import isawaitable, stack, getmodulename
@ -54,6 +54,7 @@ class Sanic:
self.listeners = defaultdict(list) self.listeners = defaultdict(list)
self.is_running = False self.is_running = False
self.websocket_enabled = False self.websocket_enabled = False
self.websocket_tasks = []
# Register alternative method names # Register alternative method names
self.go_fast = self.run self.go_fast = self.run
@ -178,7 +179,7 @@ class Sanic:
:param host: :param host:
:return: decorated function :return: decorated function
""" """
self.websocket_enabled = True self.enable_websocket()
# Fix case where the user did not prefix the URL with a / # Fix case where the user did not prefix the URL with a /
# and will probably get confused as to why it's not working # and will probably get confused as to why it's not working
@ -190,11 +191,17 @@ class Sanic:
request.app = self request.app = self
protocol = request.transport.get_protocol() protocol = request.transport.get_protocol()
ws = await protocol.websocket_handshake(request) ws = await protocol.websocket_handshake(request)
# schedule the application handler
# its future is kept in self.websocket_tasks in case it
# needs to be cancelled due to the server being stopped
fut = ensure_future(handler(request, ws, *args, **kwargs))
self.websocket_tasks.append(fut)
try: try:
# invoke the application handler await fut
await handler(request, ws, *args, **kwargs) except (CancelledError, ConnectionClosed):
except ConnectionClosed:
pass pass
self.websocket_tasks.remove(fut)
await ws.close() await ws.close()
self.router.add(uri=uri, handler=websocket_handler, self.router.add(uri=uri, handler=websocket_handler,
@ -213,6 +220,14 @@ class Sanic:
Websocket is enabled automatically if websocket routes are Websocket is enabled automatically if websocket routes are
added to the application. added to the application.
""" """
if not self.websocket_enabled:
# if the server is stopped, we want to cancel any ongoing
# websocket tasks, to allow the server to exit promptly
@self.listener('before_server_stop')
def cancel_websocket_tasks(app, loop):
for task in self.websocket_tasks:
task.cancel()
self.websocket_enabled = enable self.websocket_enabled = enable
def remove_route(self, uri, clean_cache=True, host=None): def remove_route(self, uri, clean_cache=True, host=None):