Added IPware algorithm
This commit is contained in:
parent
15b6980a1a
commit
bdf66cbb3f
79
sanic/ip.py
Normal file
79
sanic/ip.py
Normal file
|
@ -0,0 +1,79 @@
|
||||||
|
import socket
|
||||||
|
|
||||||
|
# CAPS R OK BCUZ STR.CASEFOLD
|
||||||
|
HEADER_PRECEDENCE_ORDER = (
|
||||||
|
'HTTP_X_FORWARDED_FOR', 'X_FORWARDED_FOR',
|
||||||
|
# (client, proxy1, proxy2) OR (proxy2, proxy1, client)
|
||||||
|
'HTTP_CLIENT_IP',
|
||||||
|
'HTTP_X_REAL_IP',
|
||||||
|
'HTTP_X_FORWARDED',
|
||||||
|
'HTTP_X_CLUSTER_CLIENT_IP',
|
||||||
|
'HTTP_FORWARDED_FOR',
|
||||||
|
'HTTP_FORWARDED',
|
||||||
|
'HTTP_VIA',
|
||||||
|
'REMOTE_ADDR',
|
||||||
|
)
|
||||||
|
|
||||||
|
# Private IP addresses
|
||||||
|
# http://en.wikipedia.org/wiki/List_of_assigned_/8_IPv4_address_blocks
|
||||||
|
# http://www.ietf.org/rfc/rfc3330.txt (IPv4)
|
||||||
|
# http://www.ietf.org/rfc/rfc5156.txt (IPv6)
|
||||||
|
# Regex would be ideal here, but this is keeping it simple
|
||||||
|
PRIVATE_IP_PREFIX = (
|
||||||
|
'0.', # externally non-routable
|
||||||
|
'10.', # class A private block
|
||||||
|
'169.254.', # link-local block
|
||||||
|
'172.16.', '172.17.', '172.18.', '172.19.',
|
||||||
|
'172.20.', '172.21.', '172.22.', '172.23.',
|
||||||
|
'172.24.', '172.25.', '172.26.', '172.27.',
|
||||||
|
'172.28.', '172.29.', '172.30.', '172.31.', # class B private blocks
|
||||||
|
'192.0.2.', # reserved for documentation and example code
|
||||||
|
'192.168.', # class C private block
|
||||||
|
'255.255.255.', # IPv4 broadcast address
|
||||||
|
'2001:db8:', # reserved for documentation and example code
|
||||||
|
'fc00:', # IPv6 private block
|
||||||
|
'fe80:', # link-local unicast
|
||||||
|
'ff00:', # IPv6 multicast
|
||||||
|
)
|
||||||
|
|
||||||
|
LOOPBACK_PREFIX = (
|
||||||
|
'127.', # IPv4 loopback device
|
||||||
|
'::1', # IPv6 loopback device
|
||||||
|
)
|
||||||
|
|
||||||
|
NON_PUBLIC_IP_PREFIX = PRIVATE_IP_PREFIX + LOOPBACK_PREFIX
|
||||||
|
|
||||||
|
|
||||||
|
def is_valid_ipv4(ip_str):
|
||||||
|
"""
|
||||||
|
Check the validity of an IPv4 address
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
socket.inet_pton(socket.AF_INET, ip_str)
|
||||||
|
except AttributeError: # pragma: no cover
|
||||||
|
try: # Fall-back on legacy API or False
|
||||||
|
socket.inet_aton(ip_str)
|
||||||
|
except (AttributeError, socket.error):
|
||||||
|
return False
|
||||||
|
return ip_str.count('.') == 3
|
||||||
|
except socket.error:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def is_valid_ipv6(ip_str):
|
||||||
|
"""
|
||||||
|
Check the validity of an IPv6 address
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
socket.inet_pton(socket.AF_INET6, ip_str)
|
||||||
|
except socket.error:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def is_valid_ip(ip_str):
|
||||||
|
"""
|
||||||
|
Check the validity of an IP address
|
||||||
|
"""
|
||||||
|
return is_valid_ipv4(ip_str) or is_valid_ipv6(ip_str)
|
|
@ -5,6 +5,7 @@ from collections import namedtuple
|
||||||
from http.cookies import SimpleCookie
|
from http.cookies import SimpleCookie
|
||||||
from httptools import parse_url
|
from httptools import parse_url
|
||||||
from urllib.parse import parse_qs, urlunparse
|
from urllib.parse import parse_qs, urlunparse
|
||||||
|
from .ip import is_valid_ip, HEADER_PRECEDENCE_ORDER, NON_PUBLIC_IP_PREFIX
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from ujson import loads as json_loads
|
from ujson import loads as json_loads
|
||||||
|
@ -166,11 +167,23 @@ class Request(dict):
|
||||||
return self._cookies
|
return self._cookies
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def ip(self):
|
def ip(self, right_most_proxy=False):
|
||||||
if not hasattr(self, '_ip'):
|
# Need attr to differentiate the right_most_proxy thing
|
||||||
self._ip = (self.transport.get_extra_info('peername') or
|
# Or we could use a separate method for right_most_proxy
|
||||||
(None, None))
|
attr = f'_ip{right_most_proxy}'
|
||||||
return self._ip
|
if not hasattr(self, attr):
|
||||||
|
setattr(self, attr, None)
|
||||||
|
for key in HEADER_PRECEDENCE_ORDER:
|
||||||
|
value = self.headers.get(key, self.headers.get(key.replace('_', '-'), '')).strip()
|
||||||
|
if value is not None and value != '':
|
||||||
|
ips = [ip.strip().lower() for ip in value.split(',')]
|
||||||
|
if right_most_proxy and len(ips) > 1:
|
||||||
|
ips = reversed(ips)
|
||||||
|
for ip_str in ips:
|
||||||
|
if ip_str and is_valid_ip(ip_str):
|
||||||
|
if not ip_str.startswith(NON_PUBLIC_IP_PREFIX):
|
||||||
|
setattr(self, attr, ip_str)
|
||||||
|
return getattr(self, attr)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def remote_addr(self):
|
def remote_addr(self):
|
||||||
|
|
Loading…
Reference in New Issue
Block a user