import sys import json import socket from cgi import parse_header from collections import namedtuple from http.cookies import SimpleCookie from httptools import parse_url from urllib.parse import parse_qs, urlunparse try: from ujson import loads as json_loads except ImportError: if sys.version_info[:2] == (3, 5): def json_loads(data): # on Python 3.5 json.loads only supports str not bytes return json.loads(data.decode()) else: json_loads = json.loads from sanic.exceptions import InvalidUsage from sanic.log import error_logger DEFAULT_HTTP_CONTENT_TYPE = "application/octet-stream" # HTTP/1.1: https://www.w3.org/Protocols/rfc2616/rfc2616-sec7.html#sec7.2.1 # > If the media type remains unknown, the recipient SHOULD treat it # > as type "application/octet-stream" class RequestParameters(dict): """Hosts a dict with lists as values where get returns the first value of the list and getlist returns the whole shebang """ def get(self, name, default=None): """Return the first value, either the default or actual""" return super().get(name, [default])[0] def getlist(self, name, default=None): """Return the entire list""" return super().get(name, default) class Request(dict): """Properties of an HTTP request such as URL, headers, etc.""" __slots__ = ( 'app', 'headers', 'version', 'method', '_cookies', 'transport', 'body', 'parsed_json', 'parsed_args', 'parsed_form', 'parsed_files', '_ip', '_parsed_url', 'uri_template', 'stream', '_remote_addr', '_socket', '_port' ) def __init__(self, url_bytes, headers, version, method, transport): # TODO: Content-Encoding detection self._parsed_url = parse_url(url_bytes) self.app = None self.headers = headers self.version = version self.method = method self.transport = transport # Init but do not inhale self.body = [] self.parsed_json = None self.parsed_form = None self.parsed_files = None self.parsed_args = None self.uri_template = None self._cookies = None self.stream = None def __repr__(self): if self.method is None or not self.path: return '<{0}>'.format(self.__class__.__name__) return '<{0}: {1} {2}>'.format(self.__class__.__name__, self.method, self.path) @property def json(self): if self.parsed_json is None: self.load_json() return self.parsed_json def load_json(self, loads=json_loads): try: self.parsed_json = loads(self.body) except Exception: if not self.body: return None raise InvalidUsage("Failed when parsing body as json") return self.parsed_json @property def token(self): """Attempt to return the auth header token. :return: token related to request """ prefixes = ('Bearer', 'Token') auth_header = self.headers.get('Authorization') if auth_header is not None: for prefix in prefixes: if prefix in auth_header: return auth_header.partition(prefix)[-1].strip() return auth_header @property def form(self): if self.parsed_form is None: self.parsed_form = RequestParameters() self.parsed_files = RequestParameters() content_type = self.headers.get( 'Content-Type', DEFAULT_HTTP_CONTENT_TYPE) content_type, parameters = parse_header(content_type) try: if content_type == 'application/x-www-form-urlencoded': self.parsed_form = RequestParameters( parse_qs(self.body.decode('utf-8'))) elif content_type == 'multipart/form-data': # TODO: Stream this instead of reading to/from memory boundary = parameters['boundary'].encode('utf-8') self.parsed_form, self.parsed_files = ( parse_multipart_form(self.body, boundary)) except Exception: error_logger.exception("Failed when parsing form") return self.parsed_form @property def files(self): if self.parsed_files is None: self.form # compute form to get files return self.parsed_files @property def args(self): if self.parsed_args is None: if self.query_string: self.parsed_args = RequestParameters( parse_qs(self.query_string)) else: self.parsed_args = RequestParameters() return self.parsed_args @property def raw_args(self): return {k: v[0] for k, v in self.args.items()} @property def cookies(self): if self._cookies is None: cookie = self.headers.get('Cookie') if cookie is not None: cookies = SimpleCookie() cookies.load(cookie) self._cookies = {name: cookie.value for name, cookie in cookies.items()} else: self._cookies = {} return self._cookies @property def ip(self): if not hasattr(self, '_socket'): self._get_address() return self._ip @property def port(self): if not hasattr(self, '_socket'): self._get_address() return self._port @property def socket(self): if not hasattr(self, '_socket'): self._get_socket() return self._socket def _get_address(self): sock = self.transport.get_extra_info('socket') if sock.family == socket.AF_INET: self._socket = (self.transport.get_extra_info('peername') or (None, None)) self._ip, self._port = self._socket elif sock.family == socket.AF_INET6: self._socket = (self.transport.get_extra_info('peername') or (None, None, None, None)) self._ip, self._port, *_ = self._socket else: self._ip, self._port = (None, None) @property def remote_addr(self): """Attempt to return the original client ip based on X-Forwarded-For. :return: original client ip. """ if not hasattr(self, '_remote_addr'): forwarded_for = self.headers.get('X-Forwarded-For', '').split(',') remote_addrs = [ addr for addr in [ addr.strip() for addr in forwarded_for ] if addr ] if len(remote_addrs) > 0: self._remote_addr = remote_addrs[0] else: self._remote_addr = '' return self._remote_addr @property def scheme(self): if self.app.websocket_enabled \ and self.headers.get('upgrade') == 'websocket': scheme = 'ws' else: scheme = 'http' if self.transport.get_extra_info('sslcontext'): scheme += 's' return scheme @property def host(self): # it appears that httptools doesn't return the host # so pull it from the headers return self.headers.get('Host', '') @property def content_type(self): return self.headers.get('Content-Type', DEFAULT_HTTP_CONTENT_TYPE) @property def match_info(self): """return matched info after resolving route""" return self.app.router.get(self)[2] @property def path(self): return self._parsed_url.path.decode('utf-8') @property def query_string(self): if self._parsed_url.query: return self._parsed_url.query.decode('utf-8') else: return '' @property def url(self): return urlunparse(( self.scheme, self.host, self.path, None, self.query_string, None)) File = namedtuple('File', ['type', 'body', 'name']) def parse_multipart_form(body, boundary): """Parse a request body and returns fields and files :param body: bytes request body :param boundary: bytes multipart boundary :return: fields (RequestParameters), files (RequestParameters) """ files = RequestParameters() fields = RequestParameters() form_parts = body.split(boundary) for form_part in form_parts[1:-1]: file_name = None file_type = None field_name = None line_index = 2 line_end_index = 0 while not line_end_index == -1: line_end_index = form_part.find(b'\r\n', line_index) form_line = form_part[line_index:line_end_index].decode('utf-8') line_index = line_end_index + 2 if not form_line: break colon_index = form_line.index(':') form_header_field = form_line[0:colon_index].lower() form_header_value, form_parameters = parse_header( form_line[colon_index + 2:]) if form_header_field == 'content-disposition': if 'filename' in form_parameters: file_name = form_parameters['filename'] field_name = form_parameters.get('name') elif form_header_field == 'content-type': file_type = form_header_value post_data = form_part[line_index:-4] if file_name or file_type: file = File(type=file_type, name=file_name, body=post_data) if field_name in files: files[field_name].append(file) else: files[field_name] = [file] else: value = post_data.decode('utf-8') if field_name in fields: fields[field_name].append(value) else: fields[field_name] = [value] return fields, files