Added multiprocessing

This commit is contained in:
Channel Cat 2016-10-18 01:22:49 -07:00
parent 18aa937f29
commit 6f105a647e
10 changed files with 262 additions and 19 deletions

View File

@ -47,6 +47,7 @@ app.run(host="0.0.0.0", port=8000)
* [Middleware](docs/middleware.md) * [Middleware](docs/middleware.md)
* [Exceptions](docs/exceptions.md) * [Exceptions](docs/exceptions.md)
* [Blueprints](docs/blueprints.md) * [Blueprints](docs/blueprints.md)
* [Deploying](docs/deploying.md)
* [Contributing](docs/contributing.md) * [Contributing](docs/contributing.md)
* [License](LICENSE) * [License](LICENSE)

35
docs/deploying.md Normal file
View File

@ -0,0 +1,35 @@
# Deploying
When it comes to deploying Sanic, there's not much to it, but there are
a few things to take note of.
## Workers
By default, Sanic listens in the main process using only 1 CPU core.
To crank up the juice, just specify the number of workers in the run
arguments like so:
```python
app.run(host='0.0.0.0', port=1337, workers=4)
```
Sanic will automatically spin up multiple processes and route
traffic between them. We recommend as many workers as you have
available cores.
## Running via Command
If you like using command line arguments, you can launch a sanic server
by executing the module. For example, if you initialized sanic as
app in a file named server.py, you could run the server like so:
`python -m sanic server.app --host=0.0.0.0 --port=1337 --workers=4`
With this way of running sanic, it is not necessary to run app.run in
your python file. If you do, just make sure you wrap it in name == main
like so:
```python
if __name__ == '__main__':
app.run(host='0.0.0.0', port=1337, workers=4)
```

View File

@ -8,3 +8,4 @@ tox
gunicorn gunicorn
bottle bottle
kyoukai kyoukai
falcon

36
sanic/__main__.py Normal file
View File

@ -0,0 +1,36 @@
from argparse import ArgumentParser
from importlib import import_module
from .log import log
from .sanic import Sanic
if __name__ == "__main__":
parser = ArgumentParser(prog='sanic')
parser.add_argument('--host', dest='host', type=str, default='127.0.0.1')
parser.add_argument('--port', dest='port', type=int, default=8000)
parser.add_argument('--workers', dest='workers', type=int, default=1, )
parser.add_argument('--debug', dest='debug', action="store_true")
parser.add_argument('module')
args = parser.parse_args()
try:
module_parts = args.module.split(".")
module_name = ".".join(module_parts[:-1])
app_name = module_parts[-1]
module = import_module(module_name)
app = getattr(module, app_name, None)
if type(app) is not Sanic:
raise ValueError("Module is not a Sanic app, it is a {}. "
"Perhaps you meant {}.app?"
.format(type(app).__name__, args.module))
app.run(host=args.host, port=args.port,
workers=args.workers, debug=args.debug)
except ImportError:
log.error("No module named {} found.\n"
" Example File: project/sanic_server.py -> app\n"
" Example Module: project.sanic_server.app"
.format(module_name))
except ValueError as e:
log.error("{}".format(e))

View File

@ -1,5 +1,9 @@
import asyncio from argparse import ArgumentParser
from asyncio import get_event_loop
from inspect import isawaitable from inspect import isawaitable
from multiprocessing import Process, Event
from signal import signal, SIGTERM, SIGINT
from time import sleep
from traceback import format_exc from traceback import format_exc
from .config import Config from .config import Config
@ -167,7 +171,7 @@ class Sanic:
# -------------------------------------------------------------------- # # -------------------------------------------------------------------- #
def run(self, host="127.0.0.1", port=8000, debug=False, after_start=None, def run(self, host="127.0.0.1", port=8000, debug=False, after_start=None,
before_stop=None): before_stop=None, sock=None, workers=1):
""" """
Runs the HTTP Server and listens until keyboard interrupt or term Runs the HTTP Server and listens until keyboard interrupt or term
signal. On termination, drains connections before closing. signal. On termination, drains connections before closing.
@ -178,11 +182,24 @@ class Sanic:
listening listening
:param before_stop: Function to be executed when a stop signal is :param before_stop: Function to be executed when a stop signal is
received before it is respected received before it is respected
:param sock: Socket for the server to accept connections from
:param workers: Number of processes
received before it is respected
:return: Nothing :return: Nothing
""" """
self.error_handler.debug = True self.error_handler.debug = True
self.debug = debug self.debug = debug
server_settings = {
'host': host,
'port': port,
'sock': sock,
'debug': debug,
'request_handler': self.handle_request,
'request_timeout': self.config.REQUEST_TIMEOUT,
'request_max_size': self.config.REQUEST_MAX_SIZE,
}
if debug: if debug:
log.setLevel(logging.DEBUG) log.setLevel(logging.DEBUG)
log.debug(self.config.LOGO) log.debug(self.config.LOGO)
@ -191,23 +208,61 @@ class Sanic:
log.info('Goin\' Fast @ http://{}:{}'.format(host, port)) log.info('Goin\' Fast @ http://{}:{}'.format(host, port))
try: try:
serve( if workers == 1:
host=host, server_settings['after_start'] = after_start
port=port, server_settings['before_stop'] = before_stop
debug=debug, serve(**server_settings)
after_start=after_start, else:
before_stop=before_stop, log.info('Spinning up {} workers...'.format(workers))
request_handler=self.handle_request,
request_timeout=self.config.REQUEST_TIMEOUT, self.serve_multiple(server_settings, workers)
request_max_size=self.config.REQUEST_MAX_SIZE,
)
except Exception as e: except Exception as e:
log.exception( log.exception(
'Experienced exception while trying to serve: {}'.format(e)) 'Experienced exception while trying to serve: {}'.format(e))
pass pass
log.info("Server Stopped")
def stop(self): def stop(self):
""" """
This kills the Sanic This kills the Sanic
""" """
asyncio.get_event_loop().stop() get_event_loop().stop()
@staticmethod
def serve_multiple(server_settings, workers, stop_event=None):
"""
Starts multiple server processes simultaneously. Stops on interrupt
and terminate signals, and drains connections when complete.
:param server_settings: kw arguments to be passed to the serve function
:param workers: number of workers to launch
:param stop_event: if provided, is used as a stop signal
:return:
"""
server_settings['reuse_port'] = True
# Create a stop event to be triggered by a signal
if not stop_event:
stop_event = Event()
signal(SIGINT, lambda s, f: stop_event.set())
signal(SIGTERM, lambda s, f: stop_event.set())
processes = []
for w in range(workers):
process = Process(target=serve, kwargs=server_settings)
process.start()
processes.append(process)
# Infinitely wait for the stop event
try:
while not stop_event.is_set():
sleep(0.3)
except:
pass
log.info('Spinning down workers...')
for process in processes:
process.terminate()
for process in processes:
process.join()

View File

@ -158,8 +158,8 @@ class HttpProtocol(asyncio.Protocol):
def serve(host, port, request_handler, after_start=None, before_stop=None, def serve(host, port, request_handler, after_start=None, before_stop=None,
debug=False, request_timeout=60, debug=False, request_timeout=60, sock=None,
request_max_size=None): request_max_size=None, reuse_port=False):
# Create Event Loop # Create Event Loop
loop = async_loop.new_event_loop() loop = async_loop.new_event_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
@ -176,7 +176,7 @@ def serve(host, port, request_handler, after_start=None, before_stop=None,
request_handler=request_handler, request_handler=request_handler,
request_timeout=request_timeout, request_timeout=request_timeout,
request_max_size=request_max_size, request_max_size=request_max_size,
), host, port) ), host, port, reuse_port=reuse_port, sock=sock)
try: try:
http_server = loop.run_until_complete(server_coroutine) http_server = loop.run_until_complete(server_coroutine)
except Exception as e: except Exception as e:
@ -217,4 +217,3 @@ def serve(host, port, request_handler, after_start=None, before_stop=None,
loop.run_until_complete(asyncio.sleep(0.1)) loop.run_until_complete(asyncio.sleep(0.1))
loop.close() loop.close()
log.info("Server Stopped")

52
test.py Normal file
View File

@ -0,0 +1,52 @@
from multiprocessing import Array, Event, Process
from time import sleep
from ujson import loads as json_loads
from sanic import Sanic
from sanic.response import json
from sanic.utils import local_request, HOST, PORT
# ------------------------------------------------------------ #
# GET
# ------------------------------------------------------------ #
def test_json():
app = Sanic('test_json')
response = Array('c', 50)
@app.route('/')
async def handler(request):
return json({"test": True})
stop_event = Event()
async def after_start(*args, **kwargs):
http_response = await local_request('get', '/')
response.value = http_response.text.encode()
stop_event.set()
def rescue_crew():
sleep(5)
stop_event.set()
rescue_process = Process(target=rescue_crew)
rescue_process.start()
app.serve_multiple({
'host': HOST,
'port': PORT,
'after_start': after_start,
'request_handler': app.handle_request,
'request_max_size': 100000,
}, workers=2, stop_event=stop_event)
rescue_process.terminate()
try:
results = json_loads(response.value)
except:
raise ValueError("Expected JSON response but got '{}'".format(response))
assert results.get('test') == True
test_json()

View File

@ -0,0 +1,11 @@
# Run with: gunicorn --workers=1 --worker-class=meinheld.gmeinheld.MeinheldWorker falc:app
import falcon
import ujson as json
class TestResource:
def on_get(self, req, resp):
resp.body = json.dumps({"test": True})
app = falcon.API()
app.add_route('/', TestResource())

View File

@ -15,5 +15,5 @@ app = Sanic("test")
async def test(request): async def test(request):
return json({"test": True}) return json({"test": True})
if __name__ == '__main__':
app.run(host="0.0.0.0", port=sys.argv[1]) app.run(host="0.0.0.0", port=sys.argv[1])

View File

@ -0,0 +1,53 @@
from multiprocessing import Array, Event, Process
from time import sleep
from ujson import loads as json_loads
from sanic import Sanic
from sanic.response import json
from sanic.utils import local_request, HOST, PORT
# ------------------------------------------------------------ #
# GET
# ------------------------------------------------------------ #
# TODO: Figure out why this freezes on pytest but not when
# executed via interpreter
def skip_test_multiprocessing():
app = Sanic('test_json')
response = Array('c', 50)
@app.route('/')
async def handler(request):
return json({"test": True})
stop_event = Event()
async def after_start(*args, **kwargs):
http_response = await local_request('get', '/')
response.value = http_response.text.encode()
stop_event.set()
def rescue_crew():
sleep(5)
stop_event.set()
rescue_process = Process(target=rescue_crew)
rescue_process.start()
app.serve_multiple({
'host': HOST,
'port': PORT,
'after_start': after_start,
'request_handler': app.handle_request,
'request_max_size': 100000,
}, workers=2, stop_event=stop_event)
rescue_process.terminate()
try:
results = json_loads(response.value)
except:
raise ValueError("Expected JSON response but got '{}'".format(response))
assert results.get('test') == True