Recycling gunicorn worker (#800)

* add recycling feature to gunicorn worker

* add unit tests

* add more unit tests, and remove redundant trigger_events call

* fixed up unit tests

* make flake8 happy

* address feedbacks

* make flake8 happy

* add doc
This commit is contained in:
7 2017-06-22 13:26:50 -07:00 committed by Eli Uriegas
parent b5369e611c
commit f049a4ca67
4 changed files with 118 additions and 10 deletions

View File

@ -57,6 +57,12 @@ for Gunicorn `worker-class` argument:
gunicorn myapp:app --bind 0.0.0.0:1337 --worker-class sanic.worker.GunicornWorker gunicorn myapp:app --bind 0.0.0.0:1337 --worker-class sanic.worker.GunicornWorker
``` ```
If your application suffers from memory leaks, you can configure Gunicorn to gracefully restart a worker
after it has processed a given number of requests. This can be a convenient way to help limit the effects
of the memory leak.
See the [Gunicorn Docs](http://docs.gunicorn.org/en/latest/settings.html#max-requests) for more information.
## Asynchronous support ## Asynchronous support
This is suitable if you *need* to share the sanic process with other applications, in particular the `loop`. This is suitable if you *need* to share the sanic process with other applications, in particular the `loop`.
However be advised that this method does not support using multiple processes, and is not the preferred way However be advised that this method does not support using multiple processes, and is not the preferred way

View File

@ -75,7 +75,7 @@ class HttpProtocol(asyncio.Protocol):
signal=Signal(), connections=set(), request_timeout=60, signal=Signal(), connections=set(), request_timeout=60,
request_max_size=None, request_class=None, has_log=True, request_max_size=None, request_class=None, has_log=True,
keep_alive=True, is_request_stream=False, router=None, keep_alive=True, is_request_stream=False, router=None,
**kwargs): state=None, **kwargs):
self.loop = loop self.loop = loop
self.transport = None self.transport = None
self.request = None self.request = None
@ -99,6 +99,9 @@ class HttpProtocol(asyncio.Protocol):
self._request_handler_task = None self._request_handler_task = None
self._request_stream_task = None self._request_stream_task = None
self._keep_alive = keep_alive self._keep_alive = keep_alive
self.state = state if state else {}
if 'requests_count' not in self.state:
self.state['requests_count'] = 0
@property @property
def keep_alive(self): def keep_alive(self):
@ -154,6 +157,9 @@ class HttpProtocol(asyncio.Protocol):
self.headers = [] self.headers = []
self.parser = HttpRequestParser(self) self.parser = HttpRequestParser(self)
# requests count
self.state['requests_count'] = self.state['requests_count'] + 1
# Parse request chunk or close connection # Parse request chunk or close connection
try: try:
self.parser.feed_data(data) self.parser.feed_data(data)
@ -389,7 +395,7 @@ def serve(host, port, request_handler, error_handler, before_start=None,
register_sys_signals=True, run_async=False, connections=None, register_sys_signals=True, run_async=False, connections=None,
signal=Signal(), request_class=None, has_log=True, keep_alive=True, signal=Signal(), request_class=None, has_log=True, keep_alive=True,
is_request_stream=False, router=None, websocket_max_size=None, is_request_stream=False, router=None, websocket_max_size=None,
websocket_max_queue=None): websocket_max_queue=None, state=None):
"""Start asynchronous HTTP Server on an individual process. """Start asynchronous HTTP Server on an individual process.
:param host: Address to host on :param host: Address to host on
@ -427,8 +433,6 @@ def serve(host, port, request_handler, error_handler, before_start=None,
if debug: if debug:
loop.set_debug(debug) loop.set_debug(debug)
trigger_events(before_start, loop)
connections = connections if connections is not None else set() connections = connections if connections is not None else set()
server = partial( server = partial(
protocol, protocol,
@ -445,7 +449,8 @@ def serve(host, port, request_handler, error_handler, before_start=None,
is_request_stream=is_request_stream, is_request_stream=is_request_stream,
router=router, router=router,
websocket_max_size=websocket_max_size, websocket_max_size=websocket_max_size,
websocket_max_queue=websocket_max_queue websocket_max_queue=websocket_max_queue,
state=state
) )
server_coroutine = loop.create_server( server_coroutine = loop.create_server(
@ -457,6 +462,7 @@ def serve(host, port, request_handler, error_handler, before_start=None,
sock=sock, sock=sock,
backlog=backlog backlog=backlog
) )
# Instead of pulling time at the end of every request, # Instead of pulling time at the end of every request,
# pull it once per minute # pull it once per minute
loop.call_soon(partial(update_current_time, loop)) loop.call_soon(partial(update_current_time, loop))
@ -464,6 +470,8 @@ def serve(host, port, request_handler, error_handler, before_start=None,
if run_async: if run_async:
return server_coroutine return server_coroutine
trigger_events(before_start, loop)
try: try:
http_server = loop.run_until_complete(server_coroutine) http_server = loop.run_until_complete(server_coroutine)
except: except:

View File

@ -29,7 +29,7 @@ class GunicornWorker(base.Worker):
self.ssl_context = self._create_ssl_context(cfg) self.ssl_context = self._create_ssl_context(cfg)
else: else:
self.ssl_context = None self.ssl_context = None
self.servers = [] self.servers = {}
self.connections = set() self.connections = set()
self.exit_code = 0 self.exit_code = 0
self.signal = Signal() self.signal = Signal()
@ -96,11 +96,16 @@ class GunicornWorker(base.Worker):
async def _run(self): async def _run(self):
for sock in self.sockets: for sock in self.sockets:
self.servers.append(await serve( state = dict(requests_count=0)
self._server_settings["host"] = None
self._server_settings["port"] = None
server = await serve(
sock=sock, sock=sock,
connections=self.connections, connections=self.connections,
state=state,
**self._server_settings **self._server_settings
)) )
self.servers[server] = state
async def _check_alive(self): async def _check_alive(self):
# If our parent changed then we shut down. # If our parent changed then we shut down.
@ -109,7 +114,15 @@ class GunicornWorker(base.Worker):
while self.alive: while self.alive:
self.notify() self.notify()
if pid == os.getpid() and self.ppid != os.getppid(): req_count = sum(
self.servers[srv]["requests_count"] for srv in self.servers
)
if self.max_requests and req_count > self.max_requests:
self.alive = False
self.log.info(
"Max requests exceeded, shutting down: %s", self
)
elif pid == os.getpid() and self.ppid != os.getppid():
self.alive = False self.alive = False
self.log.info("Parent changed, shutting down: %s", self) self.log.info("Parent changed, shutting down: %s", self)
else: else:
@ -166,3 +179,4 @@ class GunicornWorker(base.Worker):
self.alive = False self.alive = False
self.exit_code = 1 self.exit_code = 1
self.cfg.worker_abort(self) self.cfg.worker_abort(self)
sys.exit(1)

View File

@ -3,7 +3,11 @@ import json
import shlex import shlex
import subprocess import subprocess
import urllib.request import urllib.request
from unittest import mock
from sanic.worker import GunicornWorker
from sanic.app import Sanic
import asyncio
import logging
import pytest import pytest
@ -20,3 +24,79 @@ def test_gunicorn_worker(gunicorn_worker):
with urllib.request.urlopen('http://localhost:1337/') as f: with urllib.request.urlopen('http://localhost:1337/') as f:
res = json.loads(f.read(100).decode()) res = json.loads(f.read(100).decode())
assert res['test'] assert res['test']
class GunicornTestWorker(GunicornWorker):
def __init__(self):
self.app = mock.Mock()
self.app.callable = Sanic("test_gunicorn_worker")
self.servers = {}
self.exit_code = 0
self.cfg = mock.Mock()
self.notify = mock.Mock()
@pytest.fixture
def worker():
return GunicornTestWorker()
def test_worker_init_process(worker):
with mock.patch('sanic.worker.asyncio') as mock_asyncio:
try:
worker.init_process()
except TypeError:
pass
assert mock_asyncio.get_event_loop.return_value.close.called
assert mock_asyncio.new_event_loop.called
assert mock_asyncio.set_event_loop.called
def test_worker_init_signals(worker):
worker.loop = mock.Mock()
worker.init_signals()
assert worker.loop.add_signal_handler.called
def test_handle_abort(worker):
with mock.patch('sanic.worker.sys') as mock_sys:
worker.handle_abort(object(), object())
assert not worker.alive
assert worker.exit_code == 1
mock_sys.exit.assert_called_with(1)
def test_handle_quit(worker):
worker.handle_quit(object(), object())
assert not worker.alive
assert worker.exit_code == 0
def test_run_max_requests_exceeded(worker):
loop = asyncio.new_event_loop()
worker.ppid = 1
worker.alive = True
sock = mock.Mock()
sock.cfg_addr = ('localhost', 8080)
worker.sockets = [sock]
worker.wsgi = mock.Mock()
worker.connections = set()
worker.log = mock.Mock()
worker.loop = loop
worker.servers = {
"server1": {"requests_count": 14},
"server2": {"requests_count": 15},
}
worker.max_requests = 10
worker._run = mock.Mock(wraps=asyncio.coroutine(lambda *a, **kw: None))
# exceeding request count
_runner = asyncio.ensure_future(worker._check_alive(), loop=loop)
loop.run_until_complete(_runner)
assert worker.alive == False
worker.notify.assert_called_with()
worker.log.info.assert_called_with("Max requests exceeded, shutting down: %s",
worker)