Merge pull request #57 from channelcat/multiprocessing

Added multiprocessing
This commit is contained in:
Channel Cat 2016-10-18 01:48:56 -07:00 committed by GitHub
commit a904a57fa2
11 changed files with 268 additions and 19 deletions

7
CHANGES Normal file
View File

@ -0,0 +1,7 @@
Version 0.1
-----------
- 0.1.4 - Multiprocessing
- 0.1.3 - Blueprint support
- 0.1.1 - 0.1.2 - Struggling to update pypi via CI
Released to public.

View File

@ -49,6 +49,7 @@ app.run(host="0.0.0.0", port=8000)
* [Middleware](docs/middleware.md)
* [Exceptions](docs/exceptions.md)
* [Blueprints](docs/blueprints.md)
* [Deploying](docs/deploying.md)
* [Contributing](docs/contributing.md)
* [License](LICENSE)

35
docs/deploying.md Normal file
View File

@ -0,0 +1,35 @@
# Deploying
When it comes to deploying Sanic, there's not much to it, but there are
a few things to take note of.
## Workers
By default, Sanic listens in the main process using only 1 CPU core.
To crank up the juice, just specify the number of workers in the run
arguments like so:
```python
app.run(host='0.0.0.0', port=1337, workers=4)
```
Sanic will automatically spin up multiple processes and route
traffic between them. We recommend as many workers as you have
available cores.
## Running via Command
If you like using command line arguments, you can launch a sanic server
by executing the module. For example, if you initialized sanic as
app in a file named server.py, you could run the server like so:
`python -m sanic server.app --host=0.0.0.0 --port=1337 --workers=4`
With this way of running sanic, it is not necessary to run app.run in
your python file. If you do, just make sure you wrap it in name == main
like so:
```python
if __name__ == '__main__':
app.run(host='0.0.0.0', port=1337, workers=4)
```

View File

@ -8,3 +8,4 @@ tox
gunicorn
bottle
kyoukai
falcon

36
sanic/__main__.py Normal file
View File

@ -0,0 +1,36 @@
from argparse import ArgumentParser
from importlib import import_module
from .log import log
from .sanic import Sanic
if __name__ == "__main__":
parser = ArgumentParser(prog='sanic')
parser.add_argument('--host', dest='host', type=str, default='127.0.0.1')
parser.add_argument('--port', dest='port', type=int, default=8000)
parser.add_argument('--workers', dest='workers', type=int, default=1, )
parser.add_argument('--debug', dest='debug', action="store_true")
parser.add_argument('module')
args = parser.parse_args()
try:
module_parts = args.module.split(".")
module_name = ".".join(module_parts[:-1])
app_name = module_parts[-1]
module = import_module(module_name)
app = getattr(module, app_name, None)
if type(app) is not Sanic:
raise ValueError("Module is not a Sanic app, it is a {}. "
"Perhaps you meant {}.app?"
.format(type(app).__name__, args.module))
app.run(host=args.host, port=args.port,
workers=args.workers, debug=args.debug)
except ImportError:
log.error("No module named {} found.\n"
" Example File: project/sanic_server.py -> app\n"
" Example Module: project.sanic_server.app"
.format(module_name))
except ValueError as e:
log.error("{}".format(e))

View File

@ -1,5 +1,8 @@
import asyncio
from asyncio import get_event_loop
from inspect import isawaitable
from multiprocessing import Process, Event
from signal import signal, SIGTERM, SIGINT
from time import sleep
from traceback import format_exc
from .config import Config
@ -167,7 +170,7 @@ class Sanic:
# -------------------------------------------------------------------- #
def run(self, host="127.0.0.1", port=8000, debug=False, after_start=None,
before_stop=None):
before_stop=None, sock=None, workers=1):
"""
Runs the HTTP Server and listens until keyboard interrupt or term
signal. On termination, drains connections before closing.
@ -178,11 +181,24 @@ class Sanic:
listening
:param before_stop: Function to be executed when a stop signal is
received before it is respected
:param sock: Socket for the server to accept connections from
:param workers: Number of processes
received before it is respected
:return: Nothing
"""
self.error_handler.debug = True
self.debug = debug
server_settings = {
'host': host,
'port': port,
'sock': sock,
'debug': debug,
'request_handler': self.handle_request,
'request_timeout': self.config.REQUEST_TIMEOUT,
'request_max_size': self.config.REQUEST_MAX_SIZE,
}
if debug:
log.setLevel(logging.DEBUG)
log.debug(self.config.LOGO)
@ -191,23 +207,61 @@ class Sanic:
log.info('Goin\' Fast @ http://{}:{}'.format(host, port))
try:
serve(
host=host,
port=port,
debug=debug,
after_start=after_start,
before_stop=before_stop,
request_handler=self.handle_request,
request_timeout=self.config.REQUEST_TIMEOUT,
request_max_size=self.config.REQUEST_MAX_SIZE,
)
if workers == 1:
server_settings['after_start'] = after_start
server_settings['before_stop'] = before_stop
serve(**server_settings)
else:
log.info('Spinning up {} workers...'.format(workers))
self.serve_multiple(server_settings, workers)
except Exception as e:
log.exception(
'Experienced exception while trying to serve: {}'.format(e))
pass
log.info("Server Stopped")
def stop(self):
"""
This kills the Sanic
"""
asyncio.get_event_loop().stop()
get_event_loop().stop()
@staticmethod
def serve_multiple(server_settings, workers, stop_event=None):
"""
Starts multiple server processes simultaneously. Stops on interrupt
and terminate signals, and drains connections when complete.
:param server_settings: kw arguments to be passed to the serve function
:param workers: number of workers to launch
:param stop_event: if provided, is used as a stop signal
:return:
"""
server_settings['reuse_port'] = True
# Create a stop event to be triggered by a signal
if not stop_event:
stop_event = Event()
signal(SIGINT, lambda s, f: stop_event.set())
signal(SIGTERM, lambda s, f: stop_event.set())
processes = []
for w in range(workers):
process = Process(target=serve, kwargs=server_settings)
process.start()
processes.append(process)
# Infinitely wait for the stop event
try:
while not stop_event.is_set():
sleep(0.3)
except:
pass
log.info('Spinning down workers...')
for process in processes:
process.terminate()
for process in processes:
process.join()

View File

@ -158,8 +158,8 @@ class HttpProtocol(asyncio.Protocol):
def serve(host, port, request_handler, after_start=None, before_stop=None,
debug=False, request_timeout=60,
request_max_size=None):
debug=False, request_timeout=60, sock=None,
request_max_size=None, reuse_port=False):
# Create Event Loop
loop = async_loop.new_event_loop()
asyncio.set_event_loop(loop)
@ -176,7 +176,7 @@ def serve(host, port, request_handler, after_start=None, before_stop=None,
request_handler=request_handler,
request_timeout=request_timeout,
request_max_size=request_max_size,
), host, port)
), host, port, reuse_port=reuse_port, sock=sock)
try:
http_server = loop.run_until_complete(server_coroutine)
except Exception as e:
@ -217,4 +217,3 @@ def serve(host, port, request_handler, after_start=None, before_stop=None,
loop.run_until_complete(asyncio.sleep(0.1))
loop.close()
log.info("Server Stopped")

52
test.py Normal file
View File

@ -0,0 +1,52 @@
from multiprocessing import Array, Event, Process
from time import sleep
from ujson import loads as json_loads
from sanic import Sanic
from sanic.response import json
from sanic.utils import local_request, HOST, PORT
# ------------------------------------------------------------ #
# GET
# ------------------------------------------------------------ #
def test_json():
app = Sanic('test_json')
response = Array('c', 50)
@app.route('/')
async def handler(request):
return json({"test": True})
stop_event = Event()
async def after_start(*args, **kwargs):
http_response = await local_request('get', '/')
response.value = http_response.text.encode()
stop_event.set()
def rescue_crew():
sleep(5)
stop_event.set()
rescue_process = Process(target=rescue_crew)
rescue_process.start()
app.serve_multiple({
'host': HOST,
'port': PORT,
'after_start': after_start,
'request_handler': app.handle_request,
'request_max_size': 100000,
}, workers=2, stop_event=stop_event)
rescue_process.terminate()
try:
results = json_loads(response.value)
except:
raise ValueError("Expected JSON response but got '{}'".format(response))
assert results.get('test') == True
test_json()

View File

@ -0,0 +1,11 @@
# Run with: gunicorn --workers=1 --worker-class=meinheld.gmeinheld.MeinheldWorker falc:app
import falcon
import ujson as json
class TestResource:
def on_get(self, req, resp):
resp.body = json.dumps({"test": True})
app = falcon.API()
app.add_route('/', TestResource())

View File

@ -15,5 +15,5 @@ app = Sanic("test")
async def test(request):
return json({"test": True})
app.run(host="0.0.0.0", port=sys.argv[1])
if __name__ == '__main__':
app.run(host="0.0.0.0", port=sys.argv[1])

View File

@ -0,0 +1,53 @@
from multiprocessing import Array, Event, Process
from time import sleep
from ujson import loads as json_loads
from sanic import Sanic
from sanic.response import json
from sanic.utils import local_request, HOST, PORT
# ------------------------------------------------------------ #
# GET
# ------------------------------------------------------------ #
# TODO: Figure out why this freezes on pytest but not when
# executed via interpreter
def skip_test_multiprocessing():
app = Sanic('test_json')
response = Array('c', 50)
@app.route('/')
async def handler(request):
return json({"test": True})
stop_event = Event()
async def after_start(*args, **kwargs):
http_response = await local_request('get', '/')
response.value = http_response.text.encode()
stop_event.set()
def rescue_crew():
sleep(5)
stop_event.set()
rescue_process = Process(target=rescue_crew)
rescue_process.start()
app.serve_multiple({
'host': HOST,
'port': PORT,
'after_start': after_start,
'request_handler': app.handle_request,
'request_max_size': 100000,
}, workers=2, stop_event=stop_event)
rescue_process.terminate()
try:
results = json_loads(response.value)
except:
raise ValueError("Expected JSON response but got '{}'".format(response))
assert results.get('test') == True