Add credentials property to Request objects (#2357)
This commit is contained in:
@@ -2,7 +2,7 @@ from __future__ import annotations
|
||||
|
||||
import re
|
||||
|
||||
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
|
||||
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
|
||||
from urllib.parse import unquote
|
||||
|
||||
from sanic.exceptions import InvalidHeader
|
||||
@@ -394,3 +394,17 @@ def parse_accept(accept: str) -> AcceptContainer:
|
||||
return AcceptContainer(
|
||||
sorted(accept_list, key=_sort_accept_value, reverse=True)
|
||||
)
|
||||
|
||||
|
||||
def parse_credentials(
|
||||
header: Optional[str],
|
||||
prefixes: Union[List, Tuple, Set] = None,
|
||||
) -> Tuple[Optional[str], Optional[str]]:
|
||||
"""Parses any header with the aim to retrieve any credentials from it."""
|
||||
if not prefixes or not isinstance(prefixes, (list, tuple, set)):
|
||||
prefixes = ("Basic", "Bearer", "Token")
|
||||
if header is not None:
|
||||
for prefix in prefixes:
|
||||
if prefix in header:
|
||||
return prefix, header.partition(prefix)[-1].strip()
|
||||
return None, header
|
||||
|
||||
35
sanic/models/http_types.py
Normal file
35
sanic/models/http_types.py
Normal file
@@ -0,0 +1,35 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from base64 import b64decode
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Optional
|
||||
|
||||
|
||||
@dataclass()
|
||||
class Credentials:
|
||||
auth_type: Optional[str]
|
||||
token: Optional[str]
|
||||
_username: Optional[str] = field(default=None)
|
||||
_password: Optional[str] = field(default=None)
|
||||
|
||||
def __post_init__(self):
|
||||
if self._auth_is_basic:
|
||||
self._username, self._password = (
|
||||
b64decode(self.token.encode("utf-8")).decode().split(":")
|
||||
)
|
||||
|
||||
@property
|
||||
def username(self):
|
||||
if not self._auth_is_basic:
|
||||
raise AttributeError("Username is available for Basic Auth only")
|
||||
return self._username
|
||||
|
||||
@property
|
||||
def password(self):
|
||||
if not self._auth_is_basic:
|
||||
raise AttributeError("Password is available for Basic Auth only")
|
||||
return self._password
|
||||
|
||||
@property
|
||||
def _auth_is_basic(self) -> bool:
|
||||
return self.auth_type == "Basic"
|
||||
@@ -14,6 +14,8 @@ from typing import (
|
||||
|
||||
from sanic_routing.route import Route # type: ignore
|
||||
|
||||
from sanic.models.http_types import Credentials
|
||||
|
||||
|
||||
if TYPE_CHECKING: # no cov
|
||||
from sanic.server import ConnInfo
|
||||
@@ -37,6 +39,7 @@ from sanic.headers import (
|
||||
Options,
|
||||
parse_accept,
|
||||
parse_content_header,
|
||||
parse_credentials,
|
||||
parse_forwarded,
|
||||
parse_host,
|
||||
parse_xforwarded,
|
||||
@@ -98,11 +101,13 @@ class Request:
|
||||
"method",
|
||||
"parsed_accept",
|
||||
"parsed_args",
|
||||
"parsed_not_grouped_args",
|
||||
"parsed_credentials",
|
||||
"parsed_files",
|
||||
"parsed_form",
|
||||
"parsed_json",
|
||||
"parsed_forwarded",
|
||||
"parsed_json",
|
||||
"parsed_not_grouped_args",
|
||||
"parsed_token",
|
||||
"raw_url",
|
||||
"responded",
|
||||
"request_middleware_started",
|
||||
@@ -122,6 +127,7 @@ class Request:
|
||||
app: Sanic,
|
||||
head: bytes = b"",
|
||||
):
|
||||
|
||||
self.raw_url = url_bytes
|
||||
# TODO: Content-Encoding detection
|
||||
self._parsed_url = parse_url(url_bytes)
|
||||
@@ -141,9 +147,11 @@ class Request:
|
||||
self.ctx = SimpleNamespace()
|
||||
self.parsed_forwarded: Optional[Options] = None
|
||||
self.parsed_accept: Optional[AcceptContainer] = None
|
||||
self.parsed_credentials: Optional[Credentials] = None
|
||||
self.parsed_json = None
|
||||
self.parsed_form = None
|
||||
self.parsed_files = None
|
||||
self.parsed_token: Optional[str] = None
|
||||
self.parsed_args: DefaultDict[
|
||||
Tuple[bool, bool, str, str], RequestParameters
|
||||
] = defaultdict(RequestParameters)
|
||||
@@ -332,20 +340,41 @@ class Request:
|
||||
return self.parsed_accept
|
||||
|
||||
@property
|
||||
def token(self):
|
||||
def token(self) -> Optional[str]:
|
||||
"""Attempt to return the auth header token.
|
||||
|
||||
:return: token related to request
|
||||
"""
|
||||
prefixes = ("Bearer", "Token")
|
||||
auth_header = self.headers.getone("authorization", None)
|
||||
if self.parsed_token is None:
|
||||
prefixes = ("Bearer", "Token")
|
||||
_, token = parse_credentials(
|
||||
self.headers.getone("authorization", None), prefixes
|
||||
)
|
||||
self.parsed_token = token
|
||||
return self.parsed_token
|
||||
|
||||
if auth_header is not None:
|
||||
for prefix in prefixes:
|
||||
if prefix in auth_header:
|
||||
return auth_header.partition(prefix)[-1].strip()
|
||||
@property
|
||||
def credentials(self) -> Optional[Credentials]:
|
||||
"""Attempt to return the auth header value.
|
||||
|
||||
return auth_header
|
||||
Covers NoAuth, Basic Auth, Bearer Token, Api Token authentication
|
||||
schemas.
|
||||
|
||||
:return: A named tuple with token or username and password related
|
||||
to request
|
||||
"""
|
||||
if self.parsed_credentials is None:
|
||||
try:
|
||||
prefix, credentials = parse_credentials(
|
||||
self.headers.getone("authorization", None)
|
||||
)
|
||||
if credentials:
|
||||
self.parsed_credentials = Credentials(
|
||||
auth_type=prefix, token=credentials
|
||||
)
|
||||
except ValueError:
|
||||
pass
|
||||
return self.parsed_credentials
|
||||
|
||||
@property
|
||||
def form(self):
|
||||
|
||||
Reference in New Issue
Block a user