Moved request handling into sanic

This commit is contained in:
Channel Cat 2016-10-08 19:45:11 -07:00
parent b0d38f8a04
commit 8fbc6c2c4e
4 changed files with 101 additions and 77 deletions

View File

@ -21,4 +21,4 @@ class Config:
""" """
REQUEST_MAX_SIZE = 100000000 # 100 megababies REQUEST_MAX_SIZE = 100000000 # 100 megababies
KEEP_ALIVE_TIMEOUT = 60 # 60 seconds REQUEST_TIMEOUT = 60 # 60 seconds

View File

@ -1,9 +1,11 @@
from .config import Config from .config import Config
from .exceptions import Handler from .exceptions import Handler
from .log import log from .log import log, logging
from .response import HTTPResponse from .response import HTTPResponse
from .router import Router from .router import Router
from .server import serve from .server import serve
from .exceptions import ServerError
from inspect import isawaitable
class Sanic: class Sanic:
name = None name = None
@ -25,14 +27,54 @@ class Sanic:
return response return response
def handler(self, *args, **kwargs): def exception(self, *args, **kwargs):
def response(handler): def response(handler):
self.error_handler.add(*args, **kwargs) self.error_handler.add(*args, **kwargs)
return handler return handler
return response return response
async def handle_request(self, request, respond):
try:
handler = self.router.get(request)
if handler is None:
raise ServerError("'None' was returned while requesting a handler from the router")
response = handler(request)
# Check if the handler is asynchronous
if isawaitable(response):
response = await response
except Exception as e:
try:
response = self.error_handler.response(request, e)
except Exception as e:
if self.debug:
response = HTTPResponse("Error while handling error: {}\nStack: {}".format(e, format_exc()))
else:
response = HTTPResponse("An error occured while handling an error")
respond(response)
def run(self, host="127.0.0.1", port=8000, debug=False, on_start=None, on_stop=None): def run(self, host="127.0.0.1", port=8000, debug=False, on_start=None, on_stop=None):
self.error_handler.debug=True self.error_handler.debug=True
self.debug = debug self.debug = debug
return serve(sanic=self, host=host, port=port, debug=debug, on_start=on_start, on_stop=on_stop)
if debug:
log.setLevel(logging.DEBUG)
log.debug(self.config.LOGO)
# Serve
log.info('Goin\' Fast @ {}:{}'.format(host, port))
return serve(
host=host,
port=port,
debug=debug,
on_start=on_start,
on_stop=on_stop,
request_handler=self.handle_request,
request_timeout=self.config.REQUEST_TIMEOUT,
request_max_size=self.config.REQUEST_MAX_SIZE,
)

View File

@ -1,14 +1,5 @@
import argparse
import sys
import asyncio import asyncio
import signal
import functools
import httptools
import logging
from inspect import isawaitable from inspect import isawaitable
from ujson import loads as json_loads
from traceback import format_exc
from time import time
import httptools import httptools
try: try:
@ -16,40 +7,45 @@ try:
except: except:
async_loop = asyncio async_loop = asyncio
from socket import *
from .log import log from .log import log
from .exceptions import ServerError
from .response import HTTPResponse
from .request import Request from .request import Request
class Signal:
stopped = False
class HttpProtocol(asyncio.Protocol): class HttpProtocol(asyncio.Protocol):
__slots__ = ('loop', 'transport', # event loop, connection __slots__ = ('loop', 'transport', 'connections', 'signal', # event loop, connection
'parser', 'request', 'url', 'headers', # request params 'parser', 'request', 'url', 'headers', # request params
'sanic', # router and config object 'request_handler', 'request_timeout', 'request_max_size', # request config
'_total_body_size', '_timeout_handler') # connection management '_total_request_size', '_timeout_handler') # connection management
def __init__(self, *, sanic, loop): def __init__(self, *, loop, request_handler, signal=Signal(), connections={}, request_timeout=60, request_max_size=None):
self.loop = loop self.loop = loop
self.transport = None self.transport = None
self.request = None self.request = None
self.parser = None self.parser = None
self.url = None self.url = None
self.headers = None self.headers = None
self.sanic = sanic self.signal = signal
self.connections = connections
self.request_handler = request_handler
self.request_timeout = request_timeout
self.request_max_size = request_max_size
self._total_request_size = 0 self._total_request_size = 0
self._timeout_handler = None
# -------------------------------------------- # # -------------------------------------------- #
# Connection # Connection
# -------------------------------------------- # # -------------------------------------------- #
def connection_made(self, transport): def connection_made(self, transport):
self._timeout_handler = self.loop.call_later(self.sanic.config.KEEP_ALIVE_TIMEOUT, self.connection_timeout) self.connections[self] = True
self._timeout_handler = self.loop.call_later(self.request_timeout, self.connection_timeout)
self.transport = transport self.transport = transport
#TODO: handle connection timeout
def connection_lost(self, exc): def connection_lost(self, exc):
del self.connections[self]
self._timeout_handler.cancel() self._timeout_handler.cancel()
self.cleanup() self.cleanup()
@ -63,7 +59,7 @@ class HttpProtocol(asyncio.Protocol):
def data_received(self, data): def data_received(self, data):
# Check for the request itself getting too large and exceeding memory limits # Check for the request itself getting too large and exceeding memory limits
self._total_request_size += len(data) self._total_request_size += len(data)
if self._total_request_size > self.sanic.config.REQUEST_MAX_SIZE: if self._total_request_size > self.request_max_size:
return self.bail_out("Request too large ({}), connection closed".format(self._total_request_size)) return self.bail_out("Request too large ({}), connection closed".format(self._total_request_size))
# Create parser if this is the first time we're receiving data # Create parser if this is the first time we're receiving data
@ -82,7 +78,7 @@ class HttpProtocol(asyncio.Protocol):
self.url = url self.url = url
def on_header(self, name, value): def on_header(self, name, value):
if name == 'Content-Length' and int(value) > self.sanic.config.REQUEST_MAX_SIZE: if name == 'Content-Length' and int(value) > self.request_max_size:
return self.bail_out("Request body too large ({}), connection closed".format(value)) return self.bail_out("Request body too large ({}), connection closed".format(value))
self.headers.append((name, value.decode('utf-8'))) self.headers.append((name, value.decode('utf-8')))
@ -98,41 +94,16 @@ class HttpProtocol(asyncio.Protocol):
def on_body(self, body): def on_body(self, body):
self.request.body = body self.request.body = body
def on_message_complete(self): def on_message_complete(self):
self.loop.create_task(self.get_response()) self.loop.create_task(self.request_handler(self.request, self.write_response))
# -------------------------------------------- # # -------------------------------------------- #
# Responding # Responding
# -------------------------------------------- # # -------------------------------------------- #
async def get_response(self):
try:
handler = self.sanic.router.get(self.request)
if handler is None:
raise ServerError("'None' was returned while requesting a handler from the router")
response = handler(self.request)
# Check if the handler is asynchronous
if isawaitable(response):
response = await response
except Exception as e:
try:
response = self.sanic.error_handler.response(self.request, e)
except Exception as e:
if self.sanic.debug:
response = HTTPResponse("Error while handling error: {}\nStack: {}".format(e, format_exc()))
else:
response = HTTPResponse("An error occured while handling an error")
self.write_response(response)
def write_response(self, response): def write_response(self, response):
#print("response - {} - {}".format(self.n, self.request))
try: try:
keep_alive = self.parser.should_keep_alive() keep_alive = self.parser.should_keep_alive() and not self.signal.stopped
self.transport.write(response.output(self.request.version, keep_alive, self.sanic.config.KEEP_ALIVE_TIMEOUT)) self.transport.write(response.output(self.request.version, keep_alive, self.request_timeout))
#print("KA - {}".format(self.parser.should_keep_alive()))
if not keep_alive: if not keep_alive:
self.transport.close() self.transport.close()
else: else:
@ -140,8 +111,8 @@ class HttpProtocol(asyncio.Protocol):
except Exception as e: except Exception as e:
self.bail_out("Writing request failed, connection closed {}".format(e)) self.bail_out("Writing request failed, connection closed {}".format(e))
def bail_out(self, error): def bail_out(self, message):
log.error(error) log.error(message)
self.transport.close() self.transport.close()
def cleanup(self): def cleanup(self):
@ -149,39 +120,53 @@ class HttpProtocol(asyncio.Protocol):
self.request = None self.request = None
self.url = None self.url = None
self.headers = None self.headers = None
self._total_body_size = 0 self._total_request_size = 0
def serve(sanic, host, port, debug=False, on_start=None, on_stop=None): def serve(host, port, request_handler, on_start=None, on_stop=None, debug=False, request_timeout=60, request_max_size=None):
# Create Event Loop # Create Event Loop
loop = async_loop.new_event_loop() loop = async_loop.new_event_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
loop.set_debug(debug) loop.set_debug(debug)
if debug:
log.setLevel(logging.DEBUG)
log.debug(sanic.config.LOGO)
# Serve
log.info('Goin\' Fast @ {}:{}'.format(host, port))
# Run the on_start function if provided # Run the on_start function if provided
if on_start: if on_start:
result = on_start(sanic, loop) result = on_start(loop)
if isawaitable(result): if isawaitable(result):
loop.run_until_complete(result) loop.run_until_complete(result)
server_coroutine = loop.create_server(lambda: HttpProtocol(loop=loop, sanic=sanic), host, port) connections = {}
#connection_timeout_coroutine = signal = Signal()
server_loop = loop.run_until_complete(server_coroutine) server_coroutine = loop.create_server(lambda: HttpProtocol(
loop=loop,
connections = connections,
signal = signal,
request_handler=request_handler,
request_timeout=request_timeout,
request_max_size=request_max_size,
), host, port)
http_server = loop.run_until_complete(server_coroutine)
try: try:
loop.run_forever() loop.run_forever()
except Exception:
pass
finally: finally:
log.info("Stop requested, draining connections...")
# Run the on_stop function if provided # Run the on_stop function if provided
if on_stop: if on_stop:
result = on_stop(sanic, loop) result = on_stop(loop)
if isawaitable(result): if isawaitable(result):
loop.run_until_complete(result) loop.run_until_complete(result)
# Wait for event loop to finish and all connections to drain # Wait for event loop to finish and all connections to drain
server_loop.close() http_server.close()
loop.close() loop.run_until_complete(http_server.wait_closed())
# Complete all tasks on the loop
signal.stopped = True
while connections:
loop.run_until_complete(asyncio.sleep(0.1))
loop.close()
log.info("Server Stopped")

View File

@ -11,13 +11,10 @@ setup(
author='Channel Cat', author='Channel Cat',
author_email='channelcat@gmail.com', author_email='channelcat@gmail.com',
description='A microframework based on uvloop and httptools', description='A microframework based on uvloop and httptools',
#long_description=,
packages=['sanic'], packages=['sanic'],
#include_package_data=True,
#zip_safe=False,
platforms='any', platforms='any',
install_requires=[ install_requires=[
'uvloop>=0.5.3', #'uvloop>=0.5.3',
'httptools>=0.0.9', 'httptools>=0.0.9',
'ujson>=1.35', 'ujson>=1.35',
], ],