Clean up of examples. Removes non-core examples, optimizes and restyles remaining to strictly follow PEP 8 styling guidelines. Non-Core examples will be moved to Wiki.

This commit is contained in:
Jeremy Zimmerman 2017-06-01 11:53:05 -07:00
parent 4a1d1a0dc1
commit 4b5320a8f0
38 changed files with 36 additions and 1135 deletions

View File

@ -1,26 +0,0 @@
from sanic import Sanic
from sanic import response
import aiohttp
app = Sanic(__name__)
async def fetch(session, url):
"""
Use session object to perform 'get' request on url
"""
async with session.get(url) as result:
return await result.json()
@app.route('/')
async def handle_request(request):
url = "https://api.github.com/repos/channelcat/sanic"
async with aiohttp.ClientSession() as session:
result = await fetch(session, url)
return response.json(result)
if __name__ == '__main__':
app.run(host="0.0.0.0", port=8000, workers=2)

View File

@ -1,149 +0,0 @@
from sanic import Sanic
from sanic.exceptions import NotFound, URLBuildError
from sanic.response import json
from sanic.views import HTTPMethodView
from asyncorm import configure_orm
from asyncorm.exceptions import QuerysetError
from library.models import Book
from library.serializer import BookSerializer
app = Sanic(name=__name__)
@app.listener('before_server_start')
def orm_configure(sanic, loop):
db_config = {'database': 'sanic_example',
'host': 'localhost',
'user': 'ormdbuser',
'password': 'ormDbPass',
}
# configure_orm needs a dictionary with:
# * the database configuration
# * the application/s where the models are defined
orm_app = configure_orm({'loop': loop, # always use the sanic loop!
'db_config': db_config,
'modules': ['library', ], # list of apps
})
# orm_app is the object that orchestrates the whole ORM
# sync_db should be run only once, better do that as external command
# it creates the tables in the database!!!!
# orm_app.sync_db()
# for all the 404 lets handle the exceptions
@app.exception(NotFound)
def ignore_404s(request, exception):
return json({'method': request.method,
'status': exception.status_code,
'error': exception.args[0],
'results': None,
})
@app.exception(URLBuildError)
def ignore_urlbuilderrors(request, exception):
return json({'method': request.method,
'status': exception.status_code,
'error': exception.args[0],
'results': None,
})
# now the propper sanic workflow
class BooksView(HTTPMethodView):
async def get(self, request):
filtered_by = request.raw_args
if filtered_by:
try:
q_books = Book.objects.filter(**filtered_by)
except AttributeError as e:
raise URLBuildError(e.args[0])
else:
q_books = Book.objects.all()
books = []
async for book in q_books:
books.append(BookSerializer.serialize(book))
return json({'method': request.method,
'status': 200,
'results': books or None,
'count': len(books),
})
async def post(self, request):
# populate the book with the data in the request
book = Book(**request.json)
# and await on save
await book.save()
return json({'method': request.method,
'status': 201,
'results': BookSerializer.serialize(book),
})
class BookView(HTTPMethodView):
async def get_object(self, request, book_id):
try:
# await on database consults
book = await Book.objects.get(**{'id': book_id})
except QuerysetError as e:
raise NotFound(e.args[0])
return book
async def get(self, request, book_id):
# await on database consults
book = await self.get_object(request, book_id)
return json({'method': request.method,
'status': 200,
'results': BookSerializer.serialize(book),
})
async def put(self, request, book_id):
# await on database consults
book = await self.get_object(request, book_id)
# await on save
await book.save(**request.json)
return json({'method': request.method,
'status': 200,
'results': BookSerializer.serialize(book),
})
async def patch(self, request, book_id):
# await on database consults
book = await self.get_object(request, book_id)
# await on save
await book.save(**request.json)
return json({'method': request.method,
'status': 200,
'results': BookSerializer.serialize(book),
})
async def delete(self, request, book_id):
# await on database consults
book = await self.get_object(request, book_id)
# await on its deletion
await book.delete()
return json({'method': request.method,
'status': 200,
'results': None
})
app.add_route(BooksView.as_view(), '/books/')
app.add_route(BookView.as_view(), '/books/<book_id:int>/')
if __name__ == '__main__':
app.run(port=9000, debug=True)

View File

@ -1,24 +0,0 @@
from asyncorm import models
BOOK_CHOICES = (
('hard cover', 'hard cover book'),
('paperback', 'paperback book')
)
# This is a simple model definition
class Book(models.Model):
name = models.CharField(max_length=50)
synopsis = models.CharField(max_length=255)
book_type = models.CharField(
max_length=15,
null=True,
choices=BOOK_CHOICES
)
pages = models.IntegerField(null=True)
date_created = models.DateField(auto_now=True)
class Meta():
ordering = ['-name', ]
unique_together = ['name', 'synopsis']

View File

@ -1,15 +0,0 @@
from asyncorm.serializers import ModelSerializer, SerializerMethod
from library.models import Book
class BookSerializer(ModelSerializer):
book_type = SerializerMethod()
def get_book_type(self, instance):
return instance.book_type_display()
class Meta():
model = Book
fields = [
'id', 'name', 'synopsis', 'book_type', 'pages', 'date_created'
]

View File

@ -1,3 +0,0 @@
asyncorm>=0.0.9
sanic==0.5.4

View File

@ -1,6 +1,6 @@
from sanic import Sanic from sanic import Sanic
from sanic import Blueprint from sanic import Blueprint
from sanic.response import json, text from sanic.response import json
app = Sanic(__name__) app = Sanic(__name__)

View File

@ -1,73 +0,0 @@
"""
Example of caching using aiocache package. To run it you will need to install
aiocache with `pip install aiocache` plus a Redis instance running
in localhost:6379
Running this example you will see that the first call lasts 3 seconds and
the rest are instant because the value is retrieved from Redis.
If you want more info about the package check
https://github.com/argaen/aiocache
"""
import asyncio
import uuid
from sanic import Sanic
from sanic.response import json
from sanic.log import log
from aiocache import caches, cached
app = Sanic(__name__)
config = {
"default": {
"cache": "aiocache.RedisCache",
"endpoint": "127.0.0.1",
"timeout": 2,
"namespace": "sanic",
"serializer": {
"class": "aiocache.serializers.JsonSerializer"
}
}
}
@app.listener('before_server_start')
def init_cache(sanic, loop):
caches.set_config(config)
# You can use alias or pass explicit args instead
@cached(key="my_custom_key", ttl=30, alias="default")
async def expensive_call():
log.info("Expensive has been called")
await asyncio.sleep(3)
# You are storing the whole dict under "my_custom_key"
return {"test": str(uuid.uuid4())}
async def get_cache_value():
# This lazy loads a singleton so it will return the same instance every
# time. If you want to create a new instance, you can use
# `caches.create("default")`
cache = caches.get("default")
return await cache.get("my_custom_key")
@app.route("/")
async def test(request):
log.info("Received GET /")
return json(await expensive_call())
@app.route("/retrieve")
async def test(request):
log.info("Received GET /retrieve")
return json(await get_cache_value())
app.run(host="0.0.0.0", port=8000)

View File

@ -1,41 +0,0 @@
from sanic import Sanic
from sanic import response
from tornado.platform.asyncio import BaseAsyncIOLoop, to_asyncio_future
from distributed import LocalCluster, Client
app = Sanic(__name__)
def square(x):
return x**2
@app.listener('after_server_start')
async def setup(app, loop):
# configure tornado use asyncio's loop
ioloop = BaseAsyncIOLoop(loop)
# init distributed client
app.client = Client('tcp://localhost:8786', loop=ioloop, start=False)
await to_asyncio_future(app.client._start())
@app.listener('before_server_stop')
async def stop(app, loop):
await to_asyncio_future(app.client._shutdown())
@app.route('/<value:int>')
async def test(request, value):
future = app.client.submit(square, value)
result = await to_asyncio_future(future._result())
return response.text(f'The square of {value} is {result}')
if __name__ == '__main__':
# Distributed cluster should run somewhere else
with LocalCluster(scheduler_port=8786, nanny=False, n_workers=2,
threads_per_worker=1) as cluster:
app.run(host="0.0.0.0", port=8000)

View File

@ -1,136 +0,0 @@
# This demo requires aioredis and environmental variables established in ENV_VARS
import json
import logging
import os
from datetime import datetime
import aioredis
import sanic
from sanic import Sanic
ENV_VARS = ["REDIS_HOST", "REDIS_PORT",
"REDIS_MINPOOL", "REDIS_MAXPOOL",
"REDIS_PASS", "APP_LOGFILE"]
app = Sanic(name=__name__)
logger = None
@app.middleware("request")
async def log_uri(request):
# Simple middleware to log the URI endpoint that was called
logger.info("URI called: {0}".format(request.url))
@app.listener('before_server_start')
async def before_server_start(app, loop):
logger.info("Starting redis pool")
app.redis_pool = await aioredis.create_pool(
(app.config.REDIS_HOST, int(app.config.REDIS_PORT)),
minsize=int(app.config.REDIS_MINPOOL),
maxsize=int(app.config.REDIS_MAXPOOL),
password=app.config.REDIS_PASS)
@app.listener('after_server_stop')
async def after_server_stop(app, loop):
logger.info("Closing redis pool")
app.redis_pool.close()
await app.redis_pool.wait_closed()
@app.middleware("request")
async def attach_db_connectors(request):
# Just put the db objects in the request for easier access
logger.info("Passing redis pool to request object")
request["redis"] = request.app.redis_pool
@app.route("/state/<user_id>", methods=["GET"])
async def access_state(request, user_id):
try:
# Check to see if the value is in cache, if so lets return that
with await request["redis"] as redis_conn:
state = await redis_conn.get(user_id, encoding="utf-8")
if state:
return sanic.response.json({"msg": "Success",
"status": 200,
"success": True,
"data": json.loads(state),
"finished_at": datetime.now().isoformat()})
# Then state object is not in redis
logger.critical("Unable to find user_data in cache.")
return sanic.response.HTTPResponse({"msg": "User state not found",
"success": False,
"status": 404,
"finished_at": datetime.now().isoformat()}, status=404)
except aioredis.ProtocolError:
logger.critical("Unable to connect to state cache")
return sanic.response.HTTPResponse({"msg": "Internal Server Error",
"status": 500,
"success": False,
"finished_at": datetime.now().isoformat()}, status=500)
@app.route("/state/<user_id>/push", methods=["POST"])
async def set_state(request, user_id):
try:
# Pull a connection from the pool
with await request["redis"] as redis_conn:
# Set the value in cache to your new value
await redis_conn.set(user_id, json.dumps(request.json), expire=1800)
logger.info("Successfully pushed state to cache")
return sanic.response.HTTPResponse({"msg": "Successfully pushed state to cache",
"success": True,
"status": 200,
"finished_at": datetime.now().isoformat()})
except aioredis.ProtocolError:
logger.critical("Unable to connect to state cache")
return sanic.response.HTTPResponse({"msg": "Internal Server Error",
"status": 500,
"success": False,
"finished_at": datetime.now().isoformat()}, status=500)
def configure():
# Setup environment variables
env_vars = [os.environ.get(v, None) for v in ENV_VARS]
if not all(env_vars):
# Send back environment variables that were not set
return False, ", ".join([ENV_VARS[i] for i, flag in env_vars if not flag])
else:
# Add all the env vars to our app config
app.config.update({k: v for k, v in zip(ENV_VARS, env_vars)})
setup_logging()
return True, None
def setup_logging():
logging_format = "[%(asctime)s] %(process)d-%(levelname)s "
logging_format += "%(module)s::%(funcName)s():l%(lineno)d: "
logging_format += "%(message)s"
logging.basicConfig(
filename=app.config.APP_LOGFILE,
format=logging_format,
level=logging.DEBUG)
def main(result, missing):
if result:
try:
app.run(host="0.0.0.0", port=8080, debug=True)
except:
logging.critical("User killed server. Closing")
else:
logging.critical("Unable to start. Missing environment variables [{0}]".format(missing))
if __name__ == "__main__":
result, missing = configure()
logger = logging.getLogger()
main(result, missing)

View File

@ -37,7 +37,6 @@ server's error_handler to an instance of our CustomHandler
""" """
from sanic import Sanic from sanic import Sanic
from sanic import response
app = Sanic(__name__) app = Sanic(__name__)
@ -49,8 +48,7 @@ app.error_handler = handler
async def test(request): async def test(request):
# Here, something occurs which causes an unexpected exception # Here, something occurs which causes an unexpected exception
# This exception will flow to our custom handler. # This exception will flow to our custom handler.
1 / 0 raise SanicException('You Broke It!')
return response.json({"test": True})
if __name__ == '__main__':
app.run(host="0.0.0.0", port=8000, debug=True) app.run(host="127.0.0.1", port=8000, debug=True)

View File

@ -1,34 +0,0 @@
# Render templates in a Flask like way from a "template" directory in
# the project
from sanic import Sanic
from sanic import response
from jinja2 import Environment, PackageLoader, select_autoescape
import sys
# Enabling async template execution which allows you to take advantage
# of newer Python features requires Python 3.6 or later.
enable_async = sys.version_info >= (3, 6)
app = Sanic(__name__)
# Load the template environment with async support
template_env = Environment(
loader=PackageLoader('jinja_example', 'templates'),
autoescape=select_autoescape(['html', 'xml']),
enable_async=enable_async
)
# Load the template from file
template = template_env.get_template("example_template.html")
@app.route('/')
async def test(request):
rendered_template = await template.render_async(
knights='that say nih; asynchronously')
return response.html(rendered_template)
app.run(host="0.0.0.0", port=8080, debug=True)

View File

@ -1,8 +0,0 @@
aiofiles==0.3.1
httptools==0.0.9
Jinja2==2.9.6
MarkupSafe==1.0
sanic==0.5.2
ujson==1.35
uvloop==0.8.0
websockets==3.3

View File

@ -1,10 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<head>
<title>My Webpage</title>
</head>
<body>
<h1>Hello World</h1>
<p>knights - {{ knights }}</p>
</body>
</html>

View File

@ -8,11 +8,12 @@ app = Sanic(__name__)
sem = None sem = None
@app.listener('before_server_start') @app.listener('before_server_start')
def init(sanic, loop): def init(sanic, loop):
global sem global sem
CONCURRENCY_PER_WORKER = 4 concurrency_per_worker = 4
sem = asyncio.Semaphore(CONCURRENCY_PER_WORKER, loop=loop) sem = asyncio.Semaphore(concurrency_per_worker, loop=loop)
async def bounded_fetch(session, url): async def bounded_fetch(session, url):
""" """

View File

@ -7,6 +7,7 @@ from sanic import response
app = Sanic(__name__) app = Sanic(__name__)
@app.route('/') @app.route('/')
def handle_request(request): def handle_request(request):
return response.json( return response.json(
@ -15,6 +16,7 @@ def handle_request(request):
status=200 status=200
) )
@app.route('/unauthorized') @app.route('/unauthorized')
def handle_request(request): def handle_request(request):
return response.json( return response.json(

View File

@ -14,6 +14,8 @@ log = logging.getLogger()
# Set logger to override default basicConfig # Set logger to override default basicConfig
sanic = Sanic() sanic = Sanic()
@sanic.route("/") @sanic.route("/")
def test(request): def test(request):
log.info("received request; responding with 'hey'") log.info("received request; responding with 'hey'")

View File

@ -1,25 +0,0 @@
from sanic import Sanic
from sanic.response import html
import plotly
import plotly.graph_objs as go
app = Sanic(__name__)
@app.route('/')
async def index(request):
trace1 = go.Scatter(
x=[0, 1, 2, 3, 4, 5],
y=[1.5, 1, 1.3, 0.7, 0.8, 0.9]
)
trace2 = go.Bar(
x=[0, 1, 2, 3, 4, 5],
y=[1, 0.5, 0.7, -1.2, 0.3, 0.4]
)
data = [trace1, trace2]
return html(plotly.offline.plot(data, auto_open=False, output_type='div'))
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8000, debug=True)

View File

@ -1,2 +0,0 @@
plotly>=2.0.7
sanic>=0.5.0

View File

@ -8,6 +8,7 @@ app = Sanic(__name__)
def handle_request(request): def handle_request(request):
return response.redirect('/redirect') return response.redirect('/redirect')
@app.route('/redirect') @app.route('/redirect')
async def test(request): async def test(request):
return response.json({"Redirected": True}) return response.json({"Redirected": True})

View File

@ -1,10 +0,0 @@
import requests
# Warning: This is a heavy process.
data = ""
for i in range(1, 250000):
data += str(i)
r = requests.post('http://127.0.0.1:8000/stream', data=data)
print(r.text)

View File

@ -1,65 +0,0 @@
from sanic import Sanic
from sanic.views import CompositionView
from sanic.views import HTTPMethodView
from sanic.views import stream as stream_decorator
from sanic.blueprints import Blueprint
from sanic.response import stream, text
bp = Blueprint('blueprint_request_stream')
app = Sanic('request_stream')
class SimpleView(HTTPMethodView):
@stream_decorator
async def post(self, request):
result = ''
while True:
body = await request.stream.get()
if body is None:
break
result += body.decode('utf-8')
return text(result)
@app.post('/stream', stream=True)
async def handler(request):
async def streaming(response):
while True:
body = await request.stream.get()
if body is None:
break
body = body.decode('utf-8').replace('1', 'A')
response.write(body)
return stream(streaming)
@bp.put('/bp_stream', stream=True)
async def bp_handler(request):
result = ''
while True:
body = await request.stream.get()
if body is None:
break
result += body.decode('utf-8').replace('1', 'A')
return text(result)
async def post_handler(request):
result = ''
while True:
body = await request.stream.get()
if body is None:
break
result += body.decode('utf-8')
return text(result)
app.blueprint(bp)
app.add_route(SimpleView.as_view(), '/method_view')
view = CompositionView()
view.add(['POST'], post_handler, stream=True)
app.add_route(view, '/composition_view')
if __name__ == '__main__':
app.run(host='127.0.0.1', port=8000)

View File

@ -1,12 +1,12 @@
from sanic import Sanic from sanic import Sanic
from sanic import response from sanic import response
from multiprocessing import Event
from signal import signal, SIGINT from signal import signal, SIGINT
import asyncio import asyncio
import uvloop import uvloop
app = Sanic(__name__) app = Sanic(__name__)
@app.route("/") @app.route("/")
async def test(request): async def test(request):
return response.json({"answer": "42"}) return response.json({"answer": "42"})

View File

@ -1,60 +0,0 @@
# encoding: utf-8
"""
You need the aiomysql
"""
import os
import aiomysql
from sanic import Sanic
from sanic.response import json
database_name = os.environ['DATABASE_NAME']
database_host = os.environ['DATABASE_HOST']
database_user = os.environ['DATABASE_USER']
database_password = os.environ['DATABASE_PASSWORD']
app = Sanic()
@app.listener("before_server_start")
async def get_pool(app, loop):
"""
the first param is the global instance ,
so we can store our connection pool in it .
and it can be used by different request
:param args:
:param kwargs:
:return:
"""
app.pool = {
"aiomysql": await aiomysql.create_pool(host=database_host, user=database_user, password=database_password,
db=database_name,
maxsize=5)}
async with app.pool['aiomysql'].acquire() as conn:
async with conn.cursor() as cur:
await cur.execute('DROP TABLE IF EXISTS sanic_polls')
await cur.execute("""CREATE TABLE sanic_polls (
id serial primary key,
question varchar(50),
pub_date timestamp
);""")
for i in range(0, 100):
await cur.execute("""INSERT INTO sanic_polls
(id, question, pub_date) VALUES ({}, {}, now())
""".format(i, i))
@app.route("/")
async def test():
data = {}
async with app.pool['aiomysql'].acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute("SELECT question, pub_date FROM sanic_polls")
result = await cur.fetchall()
if result or len(result) > 0:
data['data'] = result
return json(data)
if __name__ == '__main__':
app.run(host="127.0.0.1", workers=4, port=12000)

View File

@ -1,61 +0,0 @@
from sanic import Sanic
from sanic.response import json
from aiopeewee import AioModel, AioMySQLDatabase, model_to_dict
from peewee import CharField, TextField, DateTimeField
from peewee import ForeignKeyField, PrimaryKeyField
db = AioMySQLDatabase('test', user='root', password='',
host='127.0.0.1', port=3306)
class User(AioModel):
username = CharField()
class Meta:
database = db
class Blog(AioModel):
user = ForeignKeyField(User)
title = CharField(max_length=25)
content = TextField(default='')
pub_date = DateTimeField(null=True)
pk = PrimaryKeyField()
class Meta:
database = db
app = Sanic(__name__)
@app.listener('before_server_start')
async def setup(app, loop):
# create connection pool
await db.connect(loop)
# create table if not exists
await db.create_tables([User, Blog], safe=True)
@app.listener('before_server_stop')
async def stop(app, loop):
# close connection pool
await db.close()
@app.post('/users')
async def add_user(request):
user = await User.create(**request.json)
return json(await model_to_dict(user))
@app.get('/users/count')
async def user_count(request):
count = await User.select().count()
return json({'count': count})
if __name__ == '__main__':
app.run(host="0.0.0.0", port=8000)

View File

@ -1,65 +0,0 @@
""" To run this example you need additional aiopg package
"""
import os
import asyncio
import uvloop
import aiopg
from sanic import Sanic
from sanic.response import json
database_name = os.environ['DATABASE_NAME']
database_host = os.environ['DATABASE_HOST']
database_user = os.environ['DATABASE_USER']
database_password = os.environ['DATABASE_PASSWORD']
connection = 'postgres://{0}:{1}@{2}/{3}'.format(database_user,
database_password,
database_host,
database_name)
async def get_pool():
return await aiopg.create_pool(connection)
app = Sanic(name=__name__)
@app.listener('before_server_start')
async def prepare_db(app, loop):
"""
Let's create some table and add some data
"""
async with aiopg.create_pool(connection) as pool:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute('DROP TABLE IF EXISTS sanic_polls')
await cur.execute("""CREATE TABLE sanic_polls (
id serial primary key,
question varchar(50),
pub_date timestamp
);""")
for i in range(0, 100):
await cur.execute("""INSERT INTO sanic_polls
(id, question, pub_date) VALUES ({}, {}, now())
""".format(i, i))
@app.route("/")
async def handle(request):
result = []
async def test_select():
async with aiopg.create_pool(connection) as pool:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT question, pub_date FROM sanic_polls")
async for row in cur:
result.append({"question": row[0], "pub_date": row[1]})
res = await test_select()
return json({'polls': result})
if __name__ == '__main__':
app.run(host='0.0.0.0',
port=8000,
debug=True)

View File

@ -1,67 +0,0 @@
""" To run this example you need additional aiopg package
"""
import os
import asyncio
import datetime
import uvloop
from aiopg.sa import create_engine
import sqlalchemy as sa
from sanic import Sanic
from sanic.response import json
database_name = os.environ['DATABASE_NAME']
database_host = os.environ['DATABASE_HOST']
database_user = os.environ['DATABASE_USER']
database_password = os.environ['DATABASE_PASSWORD']
connection = 'postgres://{0}:{1}@{2}/{3}'.format(database_user,
database_password,
database_host,
database_name)
metadata = sa.MetaData()
polls = sa.Table('sanic_polls', metadata,
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('question', sa.String(50)),
sa.Column("pub_date", sa.DateTime))
app = Sanic(name=__name__)
@app.listener('before_server_start')
async def prepare_db(app, loop):
""" Let's add some data
"""
async with create_engine(connection) as engine:
async with engine.acquire() as conn:
await conn.execute('DROP TABLE IF EXISTS sanic_polls')
await conn.execute("""CREATE TABLE sanic_polls (
id serial primary key,
question varchar(50),
pub_date timestamp
);""")
for i in range(0, 100):
await conn.execute(
polls.insert().values(question=i,
pub_date=datetime.datetime.now())
)
@app.route("/")
async def handle(request):
async with create_engine(connection) as engine:
async with engine.acquire() as conn:
result = []
async for row in conn.execute(polls.select()):
result.append({"question": row.question,
"pub_date": row.pub_date})
return json({"polls": result})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8000)

View File

@ -1,34 +0,0 @@
""" To run this example you need additional aioredis package
"""
from sanic import Sanic, response
import aioredis
app = Sanic(__name__)
@app.route("/")
async def handle(request):
async with request.app.redis_pool.get() as redis:
await redis.set('test-my-key', 'value')
val = await redis.get('test-my-key')
return response.text(val.decode('utf-8'))
@app.listener('before_server_start')
async def before_server_start(app, loop):
app.redis_pool = await aioredis.create_pool(
('localhost', 6379),
minsize=5,
maxsize=10,
loop=loop
)
@app.listener('after_server_stop')
async def after_server_stop(app, loop):
app.redis_pool.close()
await app.redis_pool.wait_closed()
if __name__ == '__main__':
app.run(host="0.0.0.0", port=8000)

View File

@ -1,51 +0,0 @@
import os
import asyncio
import uvloop
from asyncpg import connect, create_pool
from sanic import Sanic
from sanic.response import json
DB_CONFIG = {
'host': '<host>',
'user': '<user>',
'password': '<password>',
'port': '<port>',
'database': '<database>'
}
def jsonify(records):
"""
Parse asyncpg record response into JSON format
"""
return [dict(r.items()) for r in records]
app = Sanic(__name__)
@app.listener('before_server_start')
async def register_db(app, loop):
app.pool = await create_pool(**DB_CONFIG, loop=loop, max_size=100)
async with app.pool.acquire() as connection:
await connection.execute('DROP TABLE IF EXISTS sanic_post')
await connection.execute("""CREATE TABLE sanic_post (
id serial primary key,
content varchar(50),
post_date timestamp
);""")
for i in range(0, 1000):
await connection.execute(f"""INSERT INTO sanic_post
(id, content, post_date) VALUES ({i}, {i}, now())""")
@app.get('/')
async def root_get(request):
async with app.pool.acquire() as connection:
results = await connection.fetch('SELECT * FROM sanic_post')
return json({'posts': jsonify(results)})
if __name__ == '__main__':
app.run(host='127.0.0.1', port=8080)

View File

@ -1,41 +0,0 @@
""" sanic motor (async driver for mongodb) example
Required packages:
pymongo==3.4.0
motor==1.1
sanic==0.2.0
"""
from sanic import Sanic
from sanic import response
app = Sanic('motor_mongodb')
def get_db():
from motor.motor_asyncio import AsyncIOMotorClient
mongo_uri = "mongodb://127.0.0.1:27017/test"
client = AsyncIOMotorClient(mongo_uri)
return client['test']
@app.route('/objects', methods=['GET'])
async def get(request):
db = get_db()
docs = await db.test_col.find().to_list(length=100)
for doc in docs:
doc['id'] = str(doc['_id'])
del doc['_id']
return response.json(docs)
@app.route('/post', methods=['POST'])
async def new(request):
doc = request.json
print(doc)
db = get_db()
object_id = await db.test_col.save(doc)
return response.json({'object_id': str(object_id)})
if __name__ == "__main__":
app.run(host='0.0.0.0', port=8000, debug=True)

View File

@ -1,116 +0,0 @@
## You need the following additional packages for this example
# aiopg
# peewee_async
# peewee
## sanic imports
from sanic import Sanic
from sanic.response import json
## peewee_async related imports
import peewee
from peewee import Model, BaseModel
from peewee_async import Manager, PostgresqlDatabase, execute
from functools import partial
# we instantiate a custom loop so we can pass it to our db manager
## from peewee_async docs:
# Also theres no need to connect and re-connect before executing async queries
# with manager! Its all automatic. But you can run Manager.connect() or
# Manager.close() when you need it.
class AsyncManager(Manager):
"""Inherit the peewee_async manager with our own object
configuration
database.allow_sync = False
"""
def __init__(self, _model_class, *args, **kwargs):
super(AsyncManager, self).__init__(*args, **kwargs)
self._model_class = _model_class
self.database.allow_sync = False
def _do_fill(self, method, *args, **kwargs):
_class_method = getattr(super(AsyncManager, self), method)
pf = partial(_class_method, self._model_class)
return pf(*args, **kwargs)
def new(self, *args, **kwargs):
return self._do_fill('create', *args, **kwargs)
def get(self, *args, **kwargs):
return self._do_fill('get', *args, **kwargs)
def execute(self, query):
return execute(query)
def _get_meta_db_class(db):
"""creating a declartive class model for db"""
class _BlockedMeta(BaseModel):
def __new__(cls, name, bases, attrs):
_instance = super(_BlockedMeta, cls).__new__(cls, name, bases, attrs)
_instance.objects = AsyncManager(_instance, db)
return _instance
class _Base(Model, metaclass=_BlockedMeta):
def to_dict(self):
return self._data
class Meta:
database=db
return _Base
def declarative_base(*args, **kwargs):
"""Returns a new Modeled Class after inheriting meta and Model classes"""
db = PostgresqlDatabase(*args, **kwargs)
return _get_meta_db_class(db)
AsyncBaseModel = declarative_base(database='test',
host='127.0.0.1',
user='postgres',
password='mysecretpassword')
# let's create a simple key value store:
class KeyValue(AsyncBaseModel):
key = peewee.CharField(max_length=40, unique=True)
text = peewee.TextField(default='')
app = Sanic('peewee_example')
@app.route('/post/<key>/<value>')
async def post(request, key, value):
"""
Save get parameters to database
"""
obj = await KeyValue.objects.new(key=key, text=value)
return json({'object_id': obj.id})
@app.route('/get')
async def get(request):
"""
Load all objects from database
"""
all_objects = await KeyValue.objects.execute(KeyValue.select())
serialized_obj = []
for obj in all_objects:
serialized_obj.append({
'id': obj.id,
'key': obj.key,
'value': obj.text}
)
return json({'objects': serialized_obj})
if __name__ == "__main__":
app.run(host='0.0.0.0', port=8000)

View File

@ -19,24 +19,27 @@ def test_sync(request):
@app.route("/dynamic/<name>/<id:int>") @app.route("/dynamic/<name>/<id:int>")
def test_params(request, name, id): def test_params(request, name, i):
return response.text("yeehaww {} {}".format(name, id)) return response.text("yeehaww {} {}".format(name, i))
@app.route("/exception") @app.route("/exception")
def exception(request): def exception(request):
raise ServerError("It's dead jim") raise ServerError("It's dead jim")
@app.route("/await") @app.route("/await")
async def test_await(request): async def test_await(request):
import asyncio import asyncio
await asyncio.sleep(5) await asyncio.sleep(5)
return response.text("I'm feeling sleepy") return response.text("I'm feeling sleepy")
@app.route("/file") @app.route("/file")
async def test_file(request): async def test_file(request):
return await response.file(os.path.abspath("setup.py")) return await response.file(os.path.abspath("setup.py"))
@app.route("/file_stream") @app.route("/file_stream")
async def test_file_stream(request): async def test_file_stream(request):
return await response.file_stream(os.path.abspath("setup.py"), return await response.file_stream(os.path.abspath("setup.py"),
@ -46,9 +49,11 @@ async def test_file_stream(request):
# Exceptions # Exceptions
# ----------------------------------------------- # # ----------------------------------------------- #
@app.exception(ServerError) @app.exception(ServerError)
async def test(request, exception): async def test(request, exception):
return response.json({"exception": "{}".format(exception), "status": exception.status_code}, status=exception.status_code) return response.json({"exception": "{}".format(exception), "status": exception.status_code},
status=exception.status_code)
# ----------------------------------------------- # # ----------------------------------------------- #
@ -67,7 +72,8 @@ def post_json(request):
@app.route("/query_string") @app.route("/query_string")
def query_string(request): def query_string(request):
return response.json({"parsed": True, "args": request.args, "url": request.url, "query_string": request.query_string}) return response.json({"parsed": True, "args": request.args, "url": request.url,
"query_string": request.query_string})
# ----------------------------------------------- # # ----------------------------------------------- #

View File

@ -1,11 +1,11 @@
from sanic import Sanic from sanic import Sanic
from sanic import response from sanic import response
import socket import socket
import sys
import os import os
app = Sanic(__name__) app = Sanic(__name__)
@app.route("/test") @app.route("/test")
async def test(request): async def test(request):
return response.text("OK") return response.text("OK")

View File

@ -3,6 +3,7 @@ from sanic import response
app = Sanic(__name__) app = Sanic(__name__)
@app.route('/') @app.route('/')
async def index(request): async def index(request):
# generate a URL for the endpoint `post_handler` # generate a URL for the endpoint `post_handler`
@ -10,6 +11,7 @@ async def index(request):
# the URL is `/posts/5`, redirect to it # the URL is `/posts/5`, redirect to it
return response.redirect(url) return response.redirect(url)
@app.route('/posts/<post_id>') @app.route('/posts/<post_id>')
async def post_handler(request, post_id): async def post_handler(request, post_id):
return response.text('Post - {}'.format(post_id)) return response.text('Post - {}'.format(post_id))

View File

@ -11,20 +11,24 @@ from sanic.blueprints import Blueprint
app = Sanic() app = Sanic()
bp = Blueprint("bp", host="bp.example.com") bp = Blueprint("bp", host="bp.example.com")
@app.route('/', host=["example.com", @app.route('/', host=["example.com",
"somethingelse.com", "somethingelse.com",
"therestofyourdomains.com"]) "therestofyourdomains.com"])
async def hello(request): async def hello(request):
return response.text("Some defaults") return response.text("Some defaults")
@app.route('/', host="sub.example.com") @app.route('/', host="sub.example.com")
async def hello(request): async def hello(request):
return response.text("42") return response.text("42")
@bp.route("/question") @bp.route("/question")
async def hello(request): async def hello(request):
return response.text("What is the meaning of life?") return response.text("What is the meaning of life?")
@bp.route("/answer") @bp.route("/answer")
async def hello(request): async def hello(request):
return response.text("42") return response.text("42")

View File

@ -20,4 +20,5 @@ async def feed(request, ws):
if __name__ == '__main__': if __name__ == '__main__':
app.run() app.run(host="0.0.0.0", port=8000, debug=True)