Merge pull request #545 from messense/feature/gunicorn-worker
Addition of a gunicorn worker
This commit is contained in:
commit
166f77cb86
|
@ -9,4 +9,5 @@ async def test(request):
|
||||||
return json({"test": True})
|
return json({"test": True})
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
app.run(host="0.0.0.0", port=8000)
|
app.run(host="0.0.0.0", port=8000)
|
||||||
|
|
|
@ -70,6 +70,11 @@ def query_string(request):
|
||||||
# Run Server
|
# Run Server
|
||||||
# ----------------------------------------------- #
|
# ----------------------------------------------- #
|
||||||
|
|
||||||
|
@app.listener('before_server_start')
|
||||||
|
def before_start(app, loop):
|
||||||
|
log.info("SERVER STARTING")
|
||||||
|
|
||||||
|
|
||||||
@app.listener('after_server_start')
|
@app.listener('after_server_start')
|
||||||
def after_start(app, loop):
|
def after_start(app, loop):
|
||||||
log.info("OH OH OH OH OHHHHHHHH")
|
log.info("OH OH OH OH OHHHHHHHH")
|
||||||
|
@ -77,7 +82,13 @@ def after_start(app, loop):
|
||||||
|
|
||||||
@app.listener('before_server_stop')
|
@app.listener('before_server_stop')
|
||||||
def before_stop(app, loop):
|
def before_stop(app, loop):
|
||||||
|
log.info("SERVER STOPPING")
|
||||||
|
|
||||||
|
|
||||||
|
@app.listener('after_server_stop')
|
||||||
|
def after_stop(app, loop):
|
||||||
log.info("TRIED EVERYTHING")
|
log.info("TRIED EVERYTHING")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
app.run(host="0.0.0.0", port=8000, debug=True)
|
app.run(host="0.0.0.0", port=8000, debug=True)
|
||||||
|
|
|
@ -578,6 +578,10 @@ class Sanic:
|
||||||
"""This kills the Sanic"""
|
"""This kills the Sanic"""
|
||||||
get_event_loop().stop()
|
get_event_loop().stop()
|
||||||
|
|
||||||
|
def __call__(self):
|
||||||
|
"""gunicorn compatibility"""
|
||||||
|
return self
|
||||||
|
|
||||||
async def create_server(self, host="127.0.0.1", port=8000, debug=False,
|
async def create_server(self, host="127.0.0.1", port=8000, debug=False,
|
||||||
before_start=None, after_start=None,
|
before_start=None, after_start=None,
|
||||||
before_stop=None, after_stop=None, ssl=None,
|
before_stop=None, after_stop=None, ssl=None,
|
||||||
|
@ -686,6 +690,7 @@ class Sanic:
|
||||||
server_settings['run_async'] = True
|
server_settings['run_async'] = True
|
||||||
|
|
||||||
# Serve
|
# Serve
|
||||||
|
if host and port:
|
||||||
proto = "http"
|
proto = "http"
|
||||||
if ssl is not None:
|
if ssl is not None:
|
||||||
proto = "https"
|
proto = "https"
|
||||||
|
|
|
@ -313,7 +313,8 @@ def serve(host, port, request_handler, error_handler, before_start=None,
|
||||||
after_start=None, before_stop=None, after_stop=None, debug=False,
|
after_start=None, before_stop=None, after_stop=None, debug=False,
|
||||||
request_timeout=60, ssl=None, sock=None, request_max_size=None,
|
request_timeout=60, ssl=None, sock=None, request_max_size=None,
|
||||||
reuse_port=False, loop=None, protocol=HttpProtocol, backlog=100,
|
reuse_port=False, loop=None, protocol=HttpProtocol, backlog=100,
|
||||||
register_sys_signals=True, run_async=False):
|
register_sys_signals=True, run_async=False, connections=None,
|
||||||
|
signal=Signal()):
|
||||||
"""Start asynchronous HTTP Server on an individual process.
|
"""Start asynchronous HTTP Server on an individual process.
|
||||||
|
|
||||||
:param host: Address to host on
|
:param host: Address to host on
|
||||||
|
@ -349,8 +350,7 @@ def serve(host, port, request_handler, error_handler, before_start=None,
|
||||||
|
|
||||||
trigger_events(before_start, loop)
|
trigger_events(before_start, loop)
|
||||||
|
|
||||||
connections = set()
|
connections = connections if connections is not None else set()
|
||||||
signal = Signal()
|
|
||||||
server = partial(
|
server = partial(
|
||||||
protocol,
|
protocol,
|
||||||
loop=loop,
|
loop=loop,
|
||||||
|
|
166
sanic/worker.py
Normal file
166
sanic/worker.py
Normal file
|
@ -0,0 +1,166 @@
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import signal
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
try:
|
||||||
|
import ssl
|
||||||
|
except ImportError:
|
||||||
|
ssl = None
|
||||||
|
|
||||||
|
import uvloop
|
||||||
|
import gunicorn.workers.base as base
|
||||||
|
|
||||||
|
from sanic.server import trigger_events, serve, HttpProtocol, Signal
|
||||||
|
from sanic.websocket import WebSocketProtocol
|
||||||
|
|
||||||
|
|
||||||
|
class GunicornWorker(base.Worker):
|
||||||
|
|
||||||
|
def __init__(self, *args, **kw): # pragma: no cover
|
||||||
|
super().__init__(*args, **kw)
|
||||||
|
cfg = self.cfg
|
||||||
|
if cfg.is_ssl:
|
||||||
|
self.ssl_context = self._create_ssl_context(cfg)
|
||||||
|
else:
|
||||||
|
self.ssl_context = None
|
||||||
|
self.servers = []
|
||||||
|
self.connections = set()
|
||||||
|
self.exit_code = 0
|
||||||
|
self.signal = Signal()
|
||||||
|
|
||||||
|
def init_process(self):
|
||||||
|
# create new event_loop after fork
|
||||||
|
asyncio.get_event_loop().close()
|
||||||
|
|
||||||
|
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
|
||||||
|
self.loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(self.loop)
|
||||||
|
|
||||||
|
super().init_process()
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
is_debug = self.log.loglevel == logging.DEBUG
|
||||||
|
protocol = (WebSocketProtocol if self.app.callable.websocket_enabled
|
||||||
|
else HttpProtocol)
|
||||||
|
self._server_settings = self.app.callable._helper(
|
||||||
|
host=None,
|
||||||
|
port=None,
|
||||||
|
loop=self.loop,
|
||||||
|
debug=is_debug,
|
||||||
|
protocol=protocol,
|
||||||
|
ssl=self.ssl_context,
|
||||||
|
run_async=True
|
||||||
|
)
|
||||||
|
self._server_settings.pop('sock')
|
||||||
|
trigger_events(self._server_settings.get('before_start', []),
|
||||||
|
self.loop)
|
||||||
|
self._server_settings['before_start'] = ()
|
||||||
|
|
||||||
|
self._runner = asyncio.ensure_future(self._run(), loop=self.loop)
|
||||||
|
try:
|
||||||
|
self.loop.run_until_complete(self._runner)
|
||||||
|
self.app.callable.is_running = True
|
||||||
|
trigger_events(self._server_settings.get('after_start', []),
|
||||||
|
self.loop)
|
||||||
|
self.loop.run_until_complete(self._check_alive())
|
||||||
|
trigger_events(self._server_settings.get('before_stop', []),
|
||||||
|
self.loop)
|
||||||
|
self.loop.run_until_complete(self.close())
|
||||||
|
finally:
|
||||||
|
trigger_events(self._server_settings.get('after_stop', []),
|
||||||
|
self.loop)
|
||||||
|
self.loop.close()
|
||||||
|
|
||||||
|
sys.exit(self.exit_code)
|
||||||
|
|
||||||
|
async def close(self):
|
||||||
|
if self.servers:
|
||||||
|
# stop accepting connections
|
||||||
|
self.log.info("Stopping server: %s, connections: %s",
|
||||||
|
self.pid, len(self.connections))
|
||||||
|
for server in self.servers:
|
||||||
|
server.close()
|
||||||
|
await server.wait_closed()
|
||||||
|
self.servers.clear()
|
||||||
|
|
||||||
|
# prepare connections for closing
|
||||||
|
self.signal.stopped = True
|
||||||
|
for conn in self.connections:
|
||||||
|
conn.close_if_idle()
|
||||||
|
|
||||||
|
while self.connections:
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
|
||||||
|
async def _run(self):
|
||||||
|
for sock in self.sockets:
|
||||||
|
self.servers.append(await serve(
|
||||||
|
sock=sock,
|
||||||
|
connections=self.connections,
|
||||||
|
signal=self.signal,
|
||||||
|
**self._server_settings
|
||||||
|
))
|
||||||
|
|
||||||
|
async def _check_alive(self):
|
||||||
|
# If our parent changed then we shut down.
|
||||||
|
pid = os.getpid()
|
||||||
|
try:
|
||||||
|
while self.alive:
|
||||||
|
self.notify()
|
||||||
|
|
||||||
|
if pid == os.getpid() and self.ppid != os.getppid():
|
||||||
|
self.alive = False
|
||||||
|
self.log.info("Parent changed, shutting down: %s", self)
|
||||||
|
else:
|
||||||
|
await asyncio.sleep(1.0, loop=self.loop)
|
||||||
|
except (Exception, BaseException, GeneratorExit, KeyboardInterrupt):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _create_ssl_context(cfg):
|
||||||
|
""" Creates SSLContext instance for usage in asyncio.create_server.
|
||||||
|
See ssl.SSLSocket.__init__ for more details.
|
||||||
|
"""
|
||||||
|
ctx = ssl.SSLContext(cfg.ssl_version)
|
||||||
|
ctx.load_cert_chain(cfg.certfile, cfg.keyfile)
|
||||||
|
ctx.verify_mode = cfg.cert_reqs
|
||||||
|
if cfg.ca_certs:
|
||||||
|
ctx.load_verify_locations(cfg.ca_certs)
|
||||||
|
if cfg.ciphers:
|
||||||
|
ctx.set_ciphers(cfg.ciphers)
|
||||||
|
return ctx
|
||||||
|
|
||||||
|
def init_signals(self):
|
||||||
|
# Set up signals through the event loop API.
|
||||||
|
|
||||||
|
self.loop.add_signal_handler(signal.SIGQUIT, self.handle_quit,
|
||||||
|
signal.SIGQUIT, None)
|
||||||
|
|
||||||
|
self.loop.add_signal_handler(signal.SIGTERM, self.handle_exit,
|
||||||
|
signal.SIGTERM, None)
|
||||||
|
|
||||||
|
self.loop.add_signal_handler(signal.SIGINT, self.handle_quit,
|
||||||
|
signal.SIGINT, None)
|
||||||
|
|
||||||
|
self.loop.add_signal_handler(signal.SIGWINCH, self.handle_winch,
|
||||||
|
signal.SIGWINCH, None)
|
||||||
|
|
||||||
|
self.loop.add_signal_handler(signal.SIGUSR1, self.handle_usr1,
|
||||||
|
signal.SIGUSR1, None)
|
||||||
|
|
||||||
|
self.loop.add_signal_handler(signal.SIGABRT, self.handle_abort,
|
||||||
|
signal.SIGABRT, None)
|
||||||
|
|
||||||
|
# Don't let SIGTERM and SIGUSR1 disturb active requests
|
||||||
|
# by interrupting system calls
|
||||||
|
signal.siginterrupt(signal.SIGTERM, False)
|
||||||
|
signal.siginterrupt(signal.SIGUSR1, False)
|
||||||
|
|
||||||
|
def handle_quit(self, sig, frame):
|
||||||
|
self.alive = False
|
||||||
|
self.cfg.worker_int(self)
|
||||||
|
|
||||||
|
def handle_abort(self, sig, frame):
|
||||||
|
self.alive = False
|
||||||
|
self.exit_code = 1
|
||||||
|
self.cfg.worker_abort(self)
|
Loading…
Reference in New Issue
Block a user