cancel websocket tasks if server is stopped

This commit is contained in:
Miguel Grinberg 2017-03-15 22:27:13 -07:00
parent fa69892f70
commit fd823c63ab

View File

@ -1,7 +1,7 @@
import logging import logging
import re import re
import warnings import warnings
from asyncio import get_event_loop from asyncio import get_event_loop, ensure_future, CancelledError
from collections import deque, defaultdict from collections import deque, defaultdict
from functools import partial from functools import partial
from inspect import isawaitable, stack, getmodulename from inspect import isawaitable, stack, getmodulename
@ -54,6 +54,7 @@ class Sanic:
self.listeners = defaultdict(list) self.listeners = defaultdict(list)
self.is_running = False self.is_running = False
self.websocket_enabled = False self.websocket_enabled = False
self.websocket_tasks = []
# Register alternative method names # Register alternative method names
self.go_fast = self.run self.go_fast = self.run
@ -178,7 +179,7 @@ class Sanic:
:param host: :param host:
:return: decorated function :return: decorated function
""" """
self.websocket_enabled = True self.enable_websocket()
# Fix case where the user did not prefix the URL with a / # Fix case where the user did not prefix the URL with a /
# and will probably get confused as to why it's not working # and will probably get confused as to why it's not working
@ -190,11 +191,17 @@ class Sanic:
request.app = self request.app = self
protocol = request.transport.get_protocol() protocol = request.transport.get_protocol()
ws = await protocol.websocket_handshake(request) ws = await protocol.websocket_handshake(request)
# schedule the application handler
# its future is kept in self.websocket_tasks in case it
# needs to be cancelled due to the server being stopped
fut = ensure_future(handler(request, ws, *args, **kwargs))
self.websocket_tasks.append(fut)
try: try:
# invoke the application handler await fut
await handler(request, ws, *args, **kwargs) except (CancelledError, ConnectionClosed):
except ConnectionClosed:
pass pass
self.websocket_tasks.remove(fut)
await ws.close() await ws.close()
self.router.add(uri=uri, handler=websocket_handler, self.router.add(uri=uri, handler=websocket_handler,
@ -213,6 +220,14 @@ class Sanic:
Websocket is enabled automatically if websocket routes are Websocket is enabled automatically if websocket routes are
added to the application. added to the application.
""" """
if not self.websocket_enabled:
# if the server is stopped, we want to cancel any ongoing
# websocket tasks, to allow the server to exit promptly
@self.listener('before_server_stop')
def cancel_websocket_tasks(app, loop):
for task in self.websocket_tasks:
task.cancel()
self.websocket_enabled = enable self.websocket_enabled = enable
def remove_route(self, uri, clean_cache=True, host=None): def remove_route(self, uri, clean_cache=True, host=None):