fix stop event

This commit is contained in:
Raphael Deem 2017-01-07 15:46:43 -08:00
parent ed8e3f237c
commit dd28d70680
2 changed files with 17 additions and 49 deletions

View File

@ -3,7 +3,6 @@ from collections import deque
from functools import partial from functools import partial
from inspect import isawaitable, stack, getmodulename from inspect import isawaitable, stack, getmodulename
from multiprocessing import Process, Event from multiprocessing import Process, Event
from select import select
from signal import signal, SIGTERM, SIGINT from signal import signal, SIGTERM, SIGINT
from traceback import format_exc from traceback import format_exc
import logging import logging
@ -41,6 +40,8 @@ class Sanic:
self._blueprint_order = [] self._blueprint_order = []
self.loop = None self.loop = None
self.debug = None self.debug = None
self.sock = None
self.processes = None
# Register alternative method names # Register alternative method names
self.go_fast = self.run self.go_fast = self.run
@ -333,9 +334,12 @@ class Sanic:
""" """
This kills the Sanic This kills the Sanic
""" """
if self.processes is not None:
for process in self.processes:
process.terminate()
self.sock.close()
get_event_loop().stop() get_event_loop().stop()
@staticmethod
def serve_multiple(self, server_settings, workers, stop_event=None): def serve_multiple(self, server_settings, workers, stop_event=None):
""" """
Starts multiple server processes simultaneously. Stops on interrupt Starts multiple server processes simultaneously. Stops on interrupt
@ -348,28 +352,25 @@ class Sanic:
server_settings['reuse_port'] = True server_settings['reuse_port'] = True
# Create a stop event to be triggered by a signal # Create a stop event to be triggered by a signal
if not stop_event: if stop_event is None:
stop_event = Event() stop_event = Event()
signal(SIGINT, lambda s, f: stop_event.set()) signal(SIGINT, lambda s, f: stop_event.set())
signal(SIGTERM, lambda s, f: stop_event.set()) signal(SIGTERM, lambda s, f: stop_event.set())
sock = socket() self.sock = socket()
#sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) self.sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind((server_settings['host'], server_settings['port'])) self.sock.bind((server_settings['host'], server_settings['port']))
set_inheritable(sock.fileno(), True) set_inheritable(self.sock.fileno(), True)
server_settings['sock'] = sock server_settings['sock'] = self.sock
server_settings['host'] = None server_settings['host'] = None
server_settings['port'] = None server_settings['port'] = None
processes = [] self.processes = []
for _ in range(workers): for _ in range(workers):
process = Process(target=serve, kwargs=server_settings) process = Process(target=serve, kwargs=server_settings)
process.daemon = True process.daemon = True
process.start() process.start()
processes.append(process) self.processes.append(process)
for process in processes: for process in self.processes:
process.terminate()
for process in processes:
process.join() process.join()

View File

@ -1,13 +1,9 @@
from multiprocessing import Array, Event, Process from multiprocessing import Array, Event, Process
from time import sleep, time from time import sleep, time
from ujson import loads as json_loads from ujson import loads as json_loads
from asyncio import get_event_loop
from os import killpg, kill
from signal import SIGUSR1, signal, SIGINT, SIGTERM, SIGKILL
from sanic import Sanic from sanic import Sanic
from sanic.response import json, text from sanic.response import json
from sanic.exceptions import Handler
from sanic.utils import local_request, HOST, PORT from sanic.utils import local_request, HOST, PORT
@ -54,12 +50,11 @@ def skip_test_multiprocessing():
except: except:
raise ValueError("Expected JSON response but got '{}'".format(response)) raise ValueError("Expected JSON response but got '{}'".format(response))
stop_event.set()
assert results.get('test') == True assert results.get('test') == True
def test_drain_connections(): def test_drain_connections():
app = Sanic('test_stop') app = Sanic('test_json')
@app.route('/') @app.route('/')
async def handler(request): async def handler(request):
@ -80,31 +75,3 @@ def test_drain_connections():
end = time() end = time()
assert end - start < 0.05 assert end - start < 0.05
def skip_test_workers():
app = Sanic('test_workers')
@app.route('/')
async def handler(request):
return text('ok')
stop_event = Event()
d = []
async def after_start(*args, **kwargs):
http_response = await local_request('get', '/')
d.append(http_response.text)
stop_event.set()
p = Process(target=app.run, kwargs={'host':HOST,
'port':PORT,
'after_start': after_start,
'workers':2,
'stop_event':stop_event})
p.start()
loop = get_event_loop()
loop.run_until_complete(after_start())
#killpg(p.pid, SIGUSR1)
kill(p.pid, SIGUSR1)
assert d[0] == 1