From 9eb48c2b0dcec3be097a165c8c1fc6feaccd203a Mon Sep 17 00:00:00 2001 From: Szucs Krisztian Date: Sun, 16 Apr 2017 18:11:24 +0200 Subject: [PATCH 1/3] dask distributed example --- examples/dask_distributed.py | 40 ++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 examples/dask_distributed.py diff --git a/examples/dask_distributed.py b/examples/dask_distributed.py new file mode 100644 index 00000000..a7ee5079 --- /dev/null +++ b/examples/dask_distributed.py @@ -0,0 +1,40 @@ +from sanic import Sanic +from sanic import response + +from tornado.platform.asyncio import BaseAsyncIOLoop, to_asyncio_future +from distributed import LocalCluster, Client + + +app = Sanic(__name__) + + +def square(x): + return x**2 + + +@app.listener('after_server_start') +async def setup(app, loop): + # configure tornado use asyncio's loop + ioloop = BaseAsyncIOLoop(loop) + + # init distributed client + app.client = Client('tcp://localhost:8786', loop=ioloop, start=False) + await to_asyncio_future(app.client._start()) + + +@app.listener('before_server_stop') +async def stop(app, loop): + await to_asyncio_future(app.client._shutdown()) + + +@app.route('/') +async def test(request, value): + future = app.client.submit(square, value) + result = await to_asyncio_future(future._result()) + return response.text(f'The square of {value} is {result}') + + +if __name__ == '__main__': + # Distributed cluster should be run somewhere else + with LocalCluster(scheduler_port=8786, processes=False) as cluster: + app.run(host="0.0.0.0", port=8000) From 5b22d1486a460b480656b4388963b530399cac85 Mon Sep 17 00:00:00 2001 From: Szucs Krisztian Date: Sun, 16 Apr 2017 18:13:00 +0200 Subject: [PATCH 2/3] fix syntax error in comment --- examples/dask_distributed.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/dask_distributed.py b/examples/dask_distributed.py index a7ee5079..6de7c8ca 100644 --- a/examples/dask_distributed.py +++ b/examples/dask_distributed.py @@ -35,6 +35,6 @@ async def test(request, value): if __name__ == '__main__': - # Distributed cluster should be run somewhere else + # Distributed cluster should run somewhere else with LocalCluster(scheduler_port=8786, processes=False) as cluster: app.run(host="0.0.0.0", port=8000) From 1b939a6823343270f0e2dc09a8670dccc0a792a7 Mon Sep 17 00:00:00 2001 From: Szucs Krisztian Date: Mon, 17 Apr 2017 11:05:19 +0200 Subject: [PATCH 3/3] work with distributed 1.16.1 --- examples/dask_distributed.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/examples/dask_distributed.py b/examples/dask_distributed.py index 6de7c8ca..ef3fe423 100644 --- a/examples/dask_distributed.py +++ b/examples/dask_distributed.py @@ -36,5 +36,6 @@ async def test(request, value): if __name__ == '__main__': # Distributed cluster should run somewhere else - with LocalCluster(scheduler_port=8786, processes=False) as cluster: + with LocalCluster(scheduler_port=8786, nanny=False, n_workers=2, + threads_per_worker=1) as cluster: app.run(host="0.0.0.0", port=8000)