From 101151b4195117d01a584667510a2c51b7a661ff Mon Sep 17 00:00:00 2001 From: Sergey Rybakov <64586748+SerGeRybakov@users.noreply.github.com> Date: Thu, 6 Jan 2022 20:14:52 +0300 Subject: [PATCH] Add credentials property to Request objects (#2357) --- sanic/headers.py | 16 +++- sanic/models/http_types.py | 35 +++++++++ sanic/request.py | 49 +++++++++--- tests/conftest.py | 5 ++ tests/test_requests.py | 151 +++++++++++++++++++------------------ 5 files changed, 170 insertions(+), 86 deletions(-) create mode 100644 sanic/models/http_types.py diff --git a/sanic/headers.py b/sanic/headers.py index b744974c..b4457653 100644 --- a/sanic/headers.py +++ b/sanic/headers.py @@ -2,7 +2,7 @@ from __future__ import annotations import re -from typing import Any, Dict, Iterable, List, Optional, Tuple, Union +from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union from urllib.parse import unquote from sanic.exceptions import InvalidHeader @@ -394,3 +394,17 @@ def parse_accept(accept: str) -> AcceptContainer: return AcceptContainer( sorted(accept_list, key=_sort_accept_value, reverse=True) ) + + +def parse_credentials( + header: Optional[str], + prefixes: Union[List, Tuple, Set] = None, +) -> Tuple[Optional[str], Optional[str]]: + """Parses any header with the aim to retrieve any credentials from it.""" + if not prefixes or not isinstance(prefixes, (list, tuple, set)): + prefixes = ("Basic", "Bearer", "Token") + if header is not None: + for prefix in prefixes: + if prefix in header: + return prefix, header.partition(prefix)[-1].strip() + return None, header diff --git a/sanic/models/http_types.py b/sanic/models/http_types.py new file mode 100644 index 00000000..595eaf0e --- /dev/null +++ b/sanic/models/http_types.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +from base64 import b64decode +from dataclasses import dataclass, field +from typing import Optional + + +@dataclass() +class Credentials: + auth_type: Optional[str] + token: Optional[str] + _username: Optional[str] = field(default=None) + _password: Optional[str] = field(default=None) + + def __post_init__(self): + if self._auth_is_basic: + self._username, self._password = ( + b64decode(self.token.encode("utf-8")).decode().split(":") + ) + + @property + def username(self): + if not self._auth_is_basic: + raise AttributeError("Username is available for Basic Auth only") + return self._username + + @property + def password(self): + if not self._auth_is_basic: + raise AttributeError("Password is available for Basic Auth only") + return self._password + + @property + def _auth_is_basic(self) -> bool: + return self.auth_type == "Basic" diff --git a/sanic/request.py b/sanic/request.py index 97ab9982..f733950f 100644 --- a/sanic/request.py +++ b/sanic/request.py @@ -14,6 +14,8 @@ from typing import ( from sanic_routing.route import Route # type: ignore +from sanic.models.http_types import Credentials + if TYPE_CHECKING: # no cov from sanic.server import ConnInfo @@ -37,6 +39,7 @@ from sanic.headers import ( Options, parse_accept, parse_content_header, + parse_credentials, parse_forwarded, parse_host, parse_xforwarded, @@ -98,11 +101,13 @@ class Request: "method", "parsed_accept", "parsed_args", - "parsed_not_grouped_args", + "parsed_credentials", "parsed_files", "parsed_form", - "parsed_json", "parsed_forwarded", + "parsed_json", + "parsed_not_grouped_args", + "parsed_token", "raw_url", "responded", "request_middleware_started", @@ -122,6 +127,7 @@ class Request: app: Sanic, head: bytes = b"", ): + self.raw_url = url_bytes # TODO: Content-Encoding detection self._parsed_url = parse_url(url_bytes) @@ -141,9 +147,11 @@ class Request: self.ctx = SimpleNamespace() self.parsed_forwarded: Optional[Options] = None self.parsed_accept: Optional[AcceptContainer] = None + self.parsed_credentials: Optional[Credentials] = None self.parsed_json = None self.parsed_form = None self.parsed_files = None + self.parsed_token: Optional[str] = None self.parsed_args: DefaultDict[ Tuple[bool, bool, str, str], RequestParameters ] = defaultdict(RequestParameters) @@ -332,20 +340,41 @@ class Request: return self.parsed_accept @property - def token(self): + def token(self) -> Optional[str]: """Attempt to return the auth header token. :return: token related to request """ - prefixes = ("Bearer", "Token") - auth_header = self.headers.getone("authorization", None) + if self.parsed_token is None: + prefixes = ("Bearer", "Token") + _, token = parse_credentials( + self.headers.getone("authorization", None), prefixes + ) + self.parsed_token = token + return self.parsed_token - if auth_header is not None: - for prefix in prefixes: - if prefix in auth_header: - return auth_header.partition(prefix)[-1].strip() + @property + def credentials(self) -> Optional[Credentials]: + """Attempt to return the auth header value. - return auth_header + Covers NoAuth, Basic Auth, Bearer Token, Api Token authentication + schemas. + + :return: A named tuple with token or username and password related + to request + """ + if self.parsed_credentials is None: + try: + prefix, credentials = parse_credentials( + self.headers.getone("authorization", None) + ) + if credentials: + self.parsed_credentials = Credentials( + auth_type=prefix, token=credentials + ) + except ValueError: + pass + return self.parsed_credentials @property def form(self): diff --git a/tests/conftest.py b/tests/conftest.py index 22decde5..09194e6e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ import asyncio +import base64 import logging import random import re @@ -204,3 +205,7 @@ def sanic_ext(ext_instance): # noqa yield sanic_ext with suppress(KeyError): del sys.modules["sanic_ext"] + + +def encode_basic_auth_credentials(username, password): + return base64.b64encode(f"{username}:{password}".encode()).decode("ascii") diff --git a/tests/test_requests.py b/tests/test_requests.py index c8f6e3f0..0b15d8d6 100644 --- a/tests/test_requests.py +++ b/tests/test_requests.py @@ -15,9 +15,10 @@ from sanic_testing.testing import ( ) from sanic import Blueprint, Sanic -from sanic.exceptions import SanicException, ServerError -from sanic.request import DEFAULT_HTTP_CONTENT_TYPE, Request, RequestParameters +from sanic.exceptions import ServerError +from sanic.request import DEFAULT_HTTP_CONTENT_TYPE, RequestParameters from sanic.response import html, json, text +from tests.conftest import encode_basic_auth_credentials # ------------------------------------------------------------ # @@ -362,93 +363,95 @@ async def test_uri_template_asgi(app): assert request.uri_template == "/foo//bar/" -def test_token(app): +@pytest.mark.parametrize( + ("auth_type", "token"), + [ + # uuid4 generated token set in "Authorization" header + (None, "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"), + # uuid4 generated token with API Token authorization + ("Token", "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"), + # uuid4 generated token with Bearer Token authorization + ("Bearer", "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"), + # no Authorization header + (None, None), + ], +) +def test_token(app, auth_type, token): @app.route("/") async def handler(request): return text("OK") - # uuid4 generated token. - token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf" - headers = { - "content-type": "application/json", - "Authorization": f"{token}", - } + if token: + headers = { + "content-type": "application/json", + "Authorization": f"{auth_type} {token}" + if auth_type + else f"{token}", + } + else: + headers = {"content-type": "application/json"} request, response = app.test_client.get("/", headers=headers) - assert request.token == token - token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf" - headers = { - "content-type": "application/json", - "Authorization": f"Token {token}", - } - request, response = app.test_client.get("/", headers=headers) - - assert request.token == token - - token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf" - headers = { - "content-type": "application/json", - "Authorization": f"Bearer {token}", - } - - request, response = app.test_client.get("/", headers=headers) - - assert request.token == token - - # no Authorization headers - headers = {"content-type": "application/json"} - - request, response = app.test_client.get("/", headers=headers) - - assert request.token is None - - -@pytest.mark.asyncio -async def test_token_asgi(app): +@pytest.mark.parametrize( + ("auth_type", "token", "username", "password"), + [ + # uuid4 generated token set in "Authorization" header + (None, "a1d895e0-553a-421a-8e22-5ff8ecb48cbf", None, None), + # uuid4 generated token with API Token authorization + ("Token", "a1d895e0-553a-421a-8e22-5ff8ecb48cbf", None, None), + # uuid4 generated token with Bearer Token authorization + ("Bearer", "a1d895e0-553a-421a-8e22-5ff8ecb48cbf", None, None), + # username and password with Basic Auth authorization + ( + "Basic", + encode_basic_auth_credentials("some_username", "some_pass"), + "some_username", + "some_pass", + ), + # no Authorization header + (None, None, None, None), + ], +) +def test_credentials(app, capfd, auth_type, token, username, password): @app.route("/") async def handler(request): return text("OK") - # uuid4 generated token. - token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf" - headers = { - "content-type": "application/json", - "Authorization": f"{token}", - } + if token: + headers = { + "content-type": "application/json", + "Authorization": f"{auth_type} {token}" + if auth_type + else f"{token}", + } + else: + headers = {"content-type": "application/json"} - request, response = await app.asgi_client.get("/", headers=headers) + request, response = app.test_client.get("/", headers=headers) - assert request.token == token + if auth_type == "Basic": + assert request.credentials.username == username + assert request.credentials.password == password + else: + _, err = capfd.readouterr() + with pytest.raises(AttributeError): + request.credentials.password + assert "Password is available for Basic Auth only" in err + request.credentials.username + assert "Username is available for Basic Auth only" in err - token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf" - headers = { - "content-type": "application/json", - "Authorization": f"Token {token}", - } - - request, response = await app.asgi_client.get("/", headers=headers) - - assert request.token == token - - token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf" - headers = { - "content-type": "application/json", - "Authorization": f"Bearer {token}", - } - - request, response = await app.asgi_client.get("/", headers=headers) - - assert request.token == token - - # no Authorization headers - headers = {"content-type": "application/json"} - - request, response = await app.asgi_client.get("/", headers=headers) - - assert request.token is None + if token: + assert request.credentials.token == token + assert request.credentials.auth_type == auth_type + else: + assert request.credentials is None + assert not hasattr(request.credentials, "token") + assert not hasattr(request.credentials, "auth_type") + assert not hasattr(request.credentials, "_username") + assert not hasattr(request.credentials, "_password") def test_content_type(app): @@ -1714,7 +1717,6 @@ async def test_request_query_args_custom_parsing_asgi(app): def test_request_cookies(app): - cookies = {"test": "OK"} @app.get("/") @@ -1729,7 +1731,6 @@ def test_request_cookies(app): @pytest.mark.asyncio async def test_request_cookies_asgi(app): - cookies = {"test": "OK"} @app.get("/")