sanic/tests/test_worker.py

141 lines
3.8 KiB
Python

import time
import json
import shlex
import subprocess
import urllib.request
from unittest import mock
from sanic.worker import GunicornWorker
from sanic.app import Sanic
import asyncio
import pytest
@pytest.fixture(scope='module')
def gunicorn_worker():
    """Run the example app under gunicorn for the lifetime of the module.

    Launches ``gunicorn`` with ``sanic.worker.GunicornWorker`` bound to
    127.0.0.1:1337, sleeps a fixed three seconds so the server can start,
    then yields (no value) to the tests. On teardown the process is stopped
    and reaped.
    """
    command = (
        'gunicorn '
        '--bind 127.0.0.1:1337 '
        '--worker-class sanic.worker.GunicornWorker '
        'examples.simple_server:app'
    )
    worker = subprocess.Popen(shlex.split(command))
    try:
        time.sleep(3)
        yield
    finally:
        # Ask the gunicorn master to shut down first so it can stop its own
        # workers; a bare kill() would leave them to notice the missing
        # parent on their own and would never reap the child process.
        worker.terminate()
        try:
            worker.wait(timeout=10)
        except subprocess.TimeoutExpired:
            # Graceful shutdown took too long: force it and reap.
            worker.kill()
            worker.wait()
def test_gunicorn_worker(gunicorn_worker):
    """The gunicorn-hosted app returns JSON with a truthy ``test`` key."""
    with urllib.request.urlopen('http://localhost:1337/') as response:
        # Only the first 100 bytes of the body are read and parsed.
        raw_body = response.read(100)
        payload = json.loads(raw_body.decode())
    assert payload['test']
class GunicornTestWorker(GunicornWorker):
    """``GunicornWorker`` subclass wired up with mocks for unit tests.

    ``__init__`` does not call the parent initializer; it only sets the
    attributes below, so tests must assign anything else they need.
    """

    def __init__(self):
        # Mocked gunicorn application whose callable is a real Sanic app.
        fake_app = mock.Mock()
        fake_app.callable = Sanic("test_gunicorn_worker")
        self.app = fake_app

        # Mocked collaborators.
        self.cfg = mock.Mock()
        self.notify = mock.Mock()

        # Plain initial state.
        self.servers = {}
        self.exit_code = 0
@pytest.fixture
def worker():
    """Provide a newly constructed ``GunicornTestWorker``."""
    test_worker = GunicornTestWorker()
    return test_worker
def test_worker_init_process(worker):
    """``init_process`` closes the current loop and creates/sets a new one.

    ``asyncio`` is patched inside ``sanic.worker`` so only the calls made on
    it are inspected.
    """
    with mock.patch('sanic.worker.asyncio') as patched_asyncio:
        try:
            worker.init_process()
        except TypeError:
            # A TypeError from init_process is tolerated; only the asyncio
            # calls checked below matter to this test.
            pass

        previous_loop = patched_asyncio.get_event_loop.return_value
        assert previous_loop.close.called
        assert patched_asyncio.new_event_loop.called
        assert patched_asyncio.set_event_loop.called
def test_worker_init_signals(worker):
    """``init_signals`` calls ``add_signal_handler`` on the worker's loop."""
    fake_loop = mock.Mock()
    worker.loop = fake_loop

    worker.init_signals()

    handler_registered = worker.loop.add_signal_handler.called
    assert handler_registered
def test_handle_abort(worker):
    """``handle_abort`` stops the worker and exits with status 1.

    ``sys`` is patched inside ``sanic.worker`` so the exit call is recorded
    instead of terminating the test run.
    """
    with mock.patch('sanic.worker.sys') as patched_sys:
        # The two positional arguments are opaque placeholders.
        first_arg, second_arg = object(), object()
        worker.handle_abort(first_arg, second_arg)

        assert not worker.alive
        assert worker.exit_code == 1
        patched_sys.exit.assert_called_with(1)
def test_handle_quit(worker):
    """``handle_quit`` marks the worker not alive with exit code 0."""
    # The two positional arguments are opaque placeholders.
    first_arg, second_arg = object(), object()
    worker.handle_quit(first_arg, second_arg)

    assert not worker.alive
    assert worker.exit_code == 0
def test_run_max_requests_exceeded(worker):
    """``_check_alive`` shuts the worker down when requests exceed the max.

    The worker is given two servers whose ``requests_count`` values (14 and
    15) are each above ``max_requests`` (10); after ``_check_alive`` runs to
    completion the worker must be not alive, must have called ``notify`` and
    must have logged the "Max requests exceeded" message.
    """
    async def _noop(*args, **kwargs):
        # Awaitable no-op; replaces asyncio.coroutine(lambda ...), which was
        # removed in Python 3.11.
        return None

    loop = asyncio.new_event_loop()
    worker.ppid = 1
    worker.alive = True
    sock = mock.Mock()
    sock.cfg_addr = ('localhost', 8080)
    worker.sockets = [sock]
    worker.wsgi = mock.Mock()
    worker.connections = set()
    worker.log = mock.Mock()
    worker.loop = loop
    worker.servers = {
        "server1": {"requests_count": 14},
        "server2": {"requests_count": 15},
    }
    worker.max_requests = 10
    worker._run = mock.Mock(wraps=_noop)

    try:
        # exceeding request count
        _runner = asyncio.ensure_future(worker._check_alive(), loop=loop)
        loop.run_until_complete(_runner)
    finally:
        # Release the private loop even if the coroutine raises.
        loop.close()

    assert not worker.alive
    worker.notify.assert_called_with()
    worker.log.info.assert_called_with(
        "Max requests exceeded, shutting down: %s", worker
    )
def test_worker_close(worker):
    """``close`` stops servers, flags the signal and closes websockets.

    The worker gets one mocked server and one mocked connection that has a
    ``websocket``. After ``close`` runs to completion, ``signal.stopped``
    must be truthy, the websocket's ``close_connection`` must have been
    called and ``worker.servers`` must be empty.
    """
    async def _noop(*args, **kwargs):
        # Awaitable no-op; replaces asyncio.coroutine(lambda ...), which was
        # removed in Python 3.11.
        return None

    loop = asyncio.new_event_loop()
    worker.ppid = 1
    worker.pid = 2
    worker.cfg.graceful_timeout = 1.0
    worker.signal = mock.Mock()
    worker.signal.stopped = False
    worker.wsgi = mock.Mock()
    conn = mock.Mock()
    conn.websocket = mock.Mock()
    conn.websocket.close_connection = mock.Mock(wraps=_noop)
    worker.connections = {conn}
    worker.log = mock.Mock()
    worker.loop = loop
    server = mock.Mock()
    server.close = mock.Mock(wraps=lambda *a, **kw: None)
    server.wait_closed = mock.Mock(wraps=_noop)
    worker.servers = {
        server: {"requests_count": 14},
    }
    worker.max_requests = 10

    try:
        # Patch asyncio.sleep only while close() runs so that the real
        # function is restored afterwards and other tests are not affected.
        with mock.patch.object(asyncio, "sleep", mock.Mock(wraps=_noop)):
            # close worker
            _close = asyncio.ensure_future(worker.close(), loop=loop)
            loop.run_until_complete(_close)
    finally:
        # Release the private loop even if the coroutine raises.
        loop.close()

    assert worker.signal.stopped
    assert conn.websocket.close_connection.called
    assert len(worker.servers) == 0