Prepare initial websocket support

This commit is contained in:
Adam Hopkins 2019-05-22 01:42:19 +03:00
parent 8a56da84e6
commit 7b8e3624b8
10 changed files with 207 additions and 995 deletions

37
examples/run_asgi.py Normal file
View File

@ -0,0 +1,37 @@
"""
1. Create a simple Sanic app
2. Run with an ASGI server:
$ uvicorn run_asgi:app
or
$ hypercorn run_asgi:app
"""
from sanic import Sanic
from sanic.response import text
app = Sanic(__name__)
@app.route("/")
def handler(request):
return text("Hello")
@app.route("/foo")
def handler_foo(request):
return text("bar")
@app.websocket('/feed')
async def feed(request, ws):
name = "<someone>"
while True:
data = f"Hello {name}"
await ws.send(data)
name = await ws.recv()
if not name:
break
if __name__ == '__main__':
app.run(debug=True)

View File

@ -8,7 +8,6 @@ from asyncio import CancelledError, Protocol, ensure_future, get_event_loop
from collections import defaultdict, deque from collections import defaultdict, deque
from functools import partial from functools import partial
from inspect import getmodulename, isawaitable, signature, stack from inspect import getmodulename, isawaitable, signature, stack
from multidict import CIMultiDict
from socket import socket from socket import socket
from ssl import Purpose, SSLContext, create_default_context from ssl import Purpose, SSLContext, create_default_context
from traceback import format_exc from traceback import format_exc
@ -24,11 +23,10 @@ from sanic.exceptions import SanicException, ServerError, URLBuildError
from sanic.handlers import ErrorHandler from sanic.handlers import ErrorHandler
from sanic.log import LOGGING_CONFIG_DEFAULTS, error_logger, logger from sanic.log import LOGGING_CONFIG_DEFAULTS, error_logger, logger
from sanic.response import HTTPResponse, StreamingHTTPResponse from sanic.response import HTTPResponse, StreamingHTTPResponse
from sanic.request import Request
from sanic.router import Router from sanic.router import Router
from sanic.server import HttpProtocol, Signal, serve, serve_multiple from sanic.server import HttpProtocol, Signal, serve, serve_multiple
from sanic.static import register as static_register from sanic.static import register as static_register
from sanic.testing import SanicTestClient, SanicASGITestClient from sanic.testing import SanicASGITestClient, SanicTestClient
from sanic.views import CompositionView from sanic.views import CompositionView
from sanic.websocket import ConnectionClosed, WebSocketProtocol from sanic.websocket import ConnectionClosed, WebSocketProtocol
@ -56,6 +54,7 @@ class Sanic:
logging.config.dictConfig(log_config or LOGGING_CONFIG_DEFAULTS) logging.config.dictConfig(log_config or LOGGING_CONFIG_DEFAULTS)
self.name = name self.name = name
self.asgi = True
self.router = router or Router() self.router = router or Router()
self.request_class = request_class self.request_class = request_class
self.error_handler = error_handler or ErrorHandler() self.error_handler = error_handler or ErrorHandler()
@ -468,13 +467,23 @@ class Sanic:
getattr(handler, "__blueprintname__", "") getattr(handler, "__blueprintname__", "")
+ handler.__name__ + handler.__name__
) )
try:
protocol = request.transport.get_protocol() pass
except AttributeError:
# On Python3.5 the Transport classes in asyncio do not if self.asgi:
# have a get_protocol() method as in uvloop ws = request.transport.get_websocket_connection()
protocol = request.transport._protocol else:
ws = await protocol.websocket_handshake(request, subprotocols) try:
protocol = request.transport.get_protocol()
except AttributeError:
# On Python3.5 the Transport classes in asyncio do not
# have a get_protocol() method as in uvloop
protocol = request.transport._protocol
protocol.app = self
ws = await protocol.websocket_handshake(
request, subprotocols
)
# schedule the application handler # schedule the application handler
# its future is kept in self.websocket_tasks in case it # its future is kept in self.websocket_tasks in case it
@ -985,7 +994,13 @@ class Sanic:
if write_callback is None or isinstance( if write_callback is None or isinstance(
response, StreamingHTTPResponse response, StreamingHTTPResponse
): ):
await stream_callback(response) if stream_callback:
await stream_callback(response)
else:
# Should only end here IF it is an ASGI websocket.
# TODO:
# - Add exception handling
pass
else: else:
write_callback(response) write_callback(response)
@ -1374,5 +1389,5 @@ class Sanic:
# -------------------------------------------------------------------- # # -------------------------------------------------------------------- #
async def __call__(self, scope, receive, send): async def __call__(self, scope, receive, send):
asgi_app = ASGIApp(self, scope, receive, send) asgi_app = await ASGIApp.create(self, scope, receive, send)
await asgi_app() await asgi_app()

View File

@ -1,24 +1,50 @@
from sanic.request import Request from typing import Any, Awaitable, Callable, MutableMapping, Union
from multidict import CIMultiDict
from sanic.response import StreamingHTTPResponse
from multidict import CIMultiDict
from sanic.request import Request
from sanic.response import HTTPResponse, StreamingHTTPResponse
from sanic.websocket import WebSocketConnection
ASGIScope = MutableMapping[str, Any]
ASGIMessage = MutableMapping[str, Any]
ASGISend = Callable[[ASGIMessage], Awaitable[None]]
ASGIReceive = Callable[[], Awaitable[ASGIMessage]]
class MockTransport: class MockTransport:
def __init__(self, scope): def __init__(self, scope: ASGIScope) -> None:
self.scope = scope self.scope = scope
def get_extra_info(self, info): def get_extra_info(self, info: str) -> Union[str, bool]:
if info == "peername": if info == "peername":
return self.scope.get("server") return self.scope.get("server")
elif info == "sslcontext": elif info == "sslcontext":
return self.scope.get("scheme") in ["https", "wss"] return self.scope.get("scheme") in ["https", "wss"]
def get_websocket_connection(self) -> WebSocketConnection:
return self._websocket_connection
def create_websocket_connection(
self,
send: ASGISend,
receive: ASGIReceive,
) -> WebSocketConnection:
self._websocket_connection = WebSocketConnection(send, receive)
return self._websocket_connection
class ASGIApp: class ASGIApp:
def __init__(self, sanic_app, scope, receive, send): def __init__(self) -> None:
self.sanic_app = sanic_app self.ws = None
self.receive = receive
self.send = send @classmethod
async def create(cls, sanic_app, scope: ASGIScope, receive: ASGIReceive, send: ASGISend) -> "ASGIApp":
instance = cls()
instance.sanic_app = sanic_app
instance.receive = receive
instance.send = send
url_bytes = scope.get("root_path", "") + scope["path"] url_bytes = scope.get("root_path", "") + scope["path"]
url_bytes = url_bytes.encode("latin-1") url_bytes = url_bytes.encode("latin-1")
url_bytes += scope["query_string"] url_bytes += scope["query_string"]
@ -28,18 +54,30 @@ class ASGIApp:
for key, value in scope.get("headers", []) for key, value in scope.get("headers", [])
] ]
) )
version = scope["http_version"]
method = scope["method"] transport = MockTransport(scope)
self.request = Request(
url_bytes, if scope["type"] == "http":
headers, version = scope["http_version"]
version, method = scope["method"]
method, elif scope["type"] == "websocket":
MockTransport(scope), version = "1.1"
sanic_app, method = "GET"
instance.ws = transport.create_websocket_connection(send, receive)
await instance.ws.accept()
else:
pass
# TODO:
# - close connection
instance.request = Request(
url_bytes, headers, version, method, transport, sanic_app
) )
async def read_body(self): return instance
async def read_body(self) -> bytes:
""" """
Read and return the entire body from an incoming ASGI message. Read and return the entire body from an incoming ASGI message.
""" """
@ -53,15 +91,16 @@ class ASGIApp:
return body return body
async def __call__(self): async def __call__(self) -> None:
""" """
Handle the incoming request. Handle the incoming request.
""" """
self.request.body = await self.read_body() self.request.body = await self.read_body()
handler = self.sanic_app.handle_request handler = self.sanic_app.handle_request
await handler(self.request, None, self.stream_callback) callback = None if self.ws else self.stream_callback
await handler(self.request, None, callback)
async def stream_callback(self, response): async def stream_callback(self, response: HTTPResponse) -> None:
""" """
Write the response. Write the response.
""" """

View File

@ -708,6 +708,8 @@ def serve(
if debug: if debug:
loop.set_debug(debug) loop.set_debug(debug)
app.asgi = False
connections = connections if connections is not None else set() connections = connections if connections is not None else set()
server = partial( server = partial(
protocol, protocol,

View File

@ -1,16 +1,21 @@
import typing
import types
import asyncio
from json import JSONDecodeError from json import JSONDecodeError
from socket import socket from socket import socket
from urllib.parse import unquote, urljoin, urlsplit from urllib.parse import unquote, urlsplit
import httpcore import httpcore
import requests_async as requests import requests_async as requests
import typing
import websockets import websockets
from sanic.asgi import ASGIApp from sanic.asgi import ASGIApp
from sanic.exceptions import MethodNotSupported from sanic.exceptions import MethodNotSupported
from sanic.log import logger from sanic.log import logger
from sanic.response import text from sanic.response import text
HOST = "127.0.0.1" HOST = "127.0.0.1"
PORT = 42101 PORT = 42101
@ -314,7 +319,7 @@ class TestASGIApp(ASGIApp):
async def app_call_with_return(self, scope, receive, send): async def app_call_with_return(self, scope, receive, send):
asgi_app = TestASGIApp(self, scope, receive, send) asgi_app = await TestASGIApp.create(self, scope, receive, send)
return await asgi_app() return await asgi_app()

View File

@ -1,3 +1,5 @@
from typing import Any, Awaitable, Callable, MutableMapping, Optional, Union
from httptools import HttpParserUpgrade from httptools import HttpParserUpgrade
from websockets import ConnectionClosed # noqa from websockets import ConnectionClosed # noqa
from websockets import InvalidHandshake, WebSocketCommonProtocol, handshake from websockets import InvalidHandshake, WebSocketCommonProtocol, handshake
@ -6,6 +8,9 @@ from sanic.exceptions import InvalidUsage
from sanic.server import HttpProtocol from sanic.server import HttpProtocol
ASIMessage = MutableMapping[str, Any]
class WebSocketProtocol(HttpProtocol): class WebSocketProtocol(HttpProtocol):
def __init__( def __init__(
self, self,
@ -19,6 +24,7 @@ class WebSocketProtocol(HttpProtocol):
): ):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.websocket = None self.websocket = None
self.app = None
self.websocket_timeout = websocket_timeout self.websocket_timeout = websocket_timeout
self.websocket_max_size = websocket_max_size self.websocket_max_size = websocket_max_size
self.websocket_max_queue = websocket_max_queue self.websocket_max_queue = websocket_max_queue
@ -103,3 +109,46 @@ class WebSocketProtocol(HttpProtocol):
self.websocket.connection_made(request.transport) self.websocket.connection_made(request.transport)
self.websocket.connection_open() self.websocket.connection_open()
return self.websocket return self.websocket
class WebSocketConnection:
# TODO
# - Implement ping/pong
def __init__(
self,
send: Callable[[ASIMessage], Awaitable[None]],
receive: Callable[[], Awaitable[ASIMessage]],
) -> None:
self._send = send
self._receive = receive
async def send(self, data: Union[str, bytes], *args, **kwargs) -> None:
message = {"type": "websocket.send"}
try:
data.decode()
except AttributeError:
message.update({"text": str(data)})
else:
message.update({"bytes": data})
await self._send(message)
async def recv(self, *args, **kwargs) -> Optional[str]:
message = await self._receive()
if message["type"] == "websocket.receive":
return message["text"]
elif message["type"] == "websocket.disconnect":
pass
# await self._send({
# "type": "websocket.close"
# })
async def accept(self) -> None:
await self._send({"type": "websocket.accept", "subprotocol": ""})
async def close(self) -> None:
pass

View File

@ -1,956 +1,5 @@
import pytest
from sanic.testing import SanicASGITestClient from sanic.testing import SanicASGITestClient
from sanic.response import text
def asgi_client_instantiation(app): def asgi_client_instantiation(app):
assert isinstance(app.asgi_client, SanicASGITestClient) assert isinstance(app.asgi_client, SanicASGITestClient)
# import logging
# import os
# import ssl
# from json import dumps as json_dumps
# from json import loads as json_loads
# from urllib.parse import urlparse
# import pytest
# from sanic import Blueprint, Sanic
# from sanic.exceptions import ServerError
# from sanic.request import DEFAULT_HTTP_CONTENT_TYPE, RequestParameters
# from sanic.response import json, text
# from sanic.testing import HOST, PORT
# ------------------------------------------------------------ #
# GET - Adapted from test_requests.py
# ------------------------------------------------------------ #
@pytest.mark.asyncio
async def test_basic_request(app):
@app.route("/")
def handler(request):
return text("Hello")
_, response = await app.asgi_client.get("/")
assert response.text == "Hello"
@pytest.mark.asyncio
async def test_ip(app):
@app.route("/")
def handler(request):
return text("{}".format(request.ip))
request, response = await app.asgi_client.get("/")
assert response.text == "mockserver"
@pytest.mark.asyncio
def test_text(app):
@app.route("/")
async def handler(request):
return text("Hello")
request, response = await app.asgi_client.get("/")
assert response.text == "Hello"
# def test_headers(app):
# @app.route("/")
# async def handler(request):
# headers = {"spam": "great"}
# return text("Hello", headers=headers)
# request, response = app.asgi_client.get("/")
# assert response.headers.get("spam") == "great"
# def test_non_str_headers(app):
# @app.route("/")
# async def handler(request):
# headers = {"answer": 42}
# return text("Hello", headers=headers)
# request, response = app.asgi_client.get("/")
# assert response.headers.get("answer") == "42"
# def test_invalid_response(app):
# @app.exception(ServerError)
# def handler_exception(request, exception):
# return text("Internal Server Error.", 500)
# @app.route("/")
# async def handler(request):
# return "This should fail"
# request, response = app.asgi_client.get("/")
# assert response.status == 500
# assert response.text == "Internal Server Error."
# def test_json(app):
# @app.route("/")
# async def handler(request):
# return json({"test": True})
# request, response = app.asgi_client.get("/")
# results = json_loads(response.text)
# assert results.get("test") is True
# def test_empty_json(app):
# @app.route("/")
# async def handler(request):
# assert request.json is None
# return json(request.json)
# request, response = app.asgi_client.get("/")
# assert response.status == 200
# assert response.text == "null"
# def test_invalid_json(app):
# @app.route("/")
# async def handler(request):
# return json(request.json)
# data = "I am not json"
# request, response = app.asgi_client.get("/", data=data)
# assert response.status == 400
# def test_query_string(app):
# @app.route("/")
# async def handler(request):
# return text("OK")
# request, response = app.asgi_client.get(
# "/", params=[("test1", "1"), ("test2", "false"), ("test2", "true")]
# )
# assert request.args.get("test1") == "1"
# assert request.args.get("test2") == "false"
# assert request.args.getlist("test2") == ["false", "true"]
# assert request.args.getlist("test1") == ["1"]
# assert request.args.get("test3", default="My value") == "My value"
# def test_uri_template(app):
# @app.route("/foo/<id:int>/bar/<name:[A-z]+>")
# async def handler(request, id, name):
# return text("OK")
# request, response = app.asgi_client.get("/foo/123/bar/baz")
# assert request.uri_template == "/foo/<id:int>/bar/<name:[A-z]+>"
# def test_token(app):
# @app.route("/")
# async def handler(request):
# return text("OK")
# # uuid4 generated token.
# token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
# headers = {
# "content-type": "application/json",
# "Authorization": "{}".format(token),
# }
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.token == token
# token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
# headers = {
# "content-type": "application/json",
# "Authorization": "Token {}".format(token),
# }
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.token == token
# token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
# headers = {
# "content-type": "application/json",
# "Authorization": "Bearer {}".format(token),
# }
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.token == token
# # no Authorization headers
# headers = {"content-type": "application/json"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.token is None
# def test_content_type(app):
# @app.route("/")
# async def handler(request):
# return text(request.content_type)
# request, response = app.asgi_client.get("/")
# assert request.content_type == DEFAULT_HTTP_CONTENT_TYPE
# assert response.text == DEFAULT_HTTP_CONTENT_TYPE
# headers = {"content-type": "application/json"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.content_type == "application/json"
# assert response.text == "application/json"
# def test_remote_addr_with_two_proxies(app):
# app.config.PROXIES_COUNT = 2
# @app.route("/")
# async def handler(request):
# return text(request.remote_addr)
# headers = {"X-Real-IP": "127.0.0.2", "X-Forwarded-For": "127.0.1.1"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == "127.0.0.2"
# assert response.text == "127.0.0.2"
# headers = {"X-Forwarded-For": "127.0.1.1"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == ""
# assert response.text == ""
# headers = {"X-Forwarded-For": "127.0.0.1, 127.0.1.2"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == "127.0.0.1"
# assert response.text == "127.0.0.1"
# request, response = app.asgi_client.get("/")
# assert request.remote_addr == ""
# assert response.text == ""
# headers = {"X-Forwarded-For": "127.0.0.1, , ,,127.0.1.2"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == "127.0.0.1"
# assert response.text == "127.0.0.1"
# headers = {
# "X-Forwarded-For": ", 127.0.2.2, , ,127.0.0.1, , ,,127.0.1.2"
# }
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == "127.0.0.1"
# assert response.text == "127.0.0.1"
# def test_remote_addr_with_infinite_number_of_proxies(app):
# app.config.PROXIES_COUNT = -1
# @app.route("/")
# async def handler(request):
# return text(request.remote_addr)
# headers = {"X-Real-IP": "127.0.0.2", "X-Forwarded-For": "127.0.1.1"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == "127.0.0.2"
# assert response.text == "127.0.0.2"
# headers = {"X-Forwarded-For": "127.0.1.1"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == "127.0.1.1"
# assert response.text == "127.0.1.1"
# headers = {
# "X-Forwarded-For": "127.0.0.5, 127.0.0.4, 127.0.0.3, 127.0.0.2, 127.0.0.1"
# }
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == "127.0.0.5"
# assert response.text == "127.0.0.5"
# def test_remote_addr_without_proxy(app):
# app.config.PROXIES_COUNT = 0
# @app.route("/")
# async def handler(request):
# return text(request.remote_addr)
# headers = {"X-Real-IP": "127.0.0.2", "X-Forwarded-For": "127.0.1.1"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == ""
# assert response.text == ""
# headers = {"X-Forwarded-For": "127.0.1.1"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == ""
# assert response.text == ""
# headers = {"X-Forwarded-For": "127.0.0.1, 127.0.1.2"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == ""
# assert response.text == ""
# def test_remote_addr_custom_headers(app):
# app.config.PROXIES_COUNT = 1
# app.config.REAL_IP_HEADER = "Client-IP"
# app.config.FORWARDED_FOR_HEADER = "Forwarded"
# @app.route("/")
# async def handler(request):
# return text(request.remote_addr)
# headers = {"X-Real-IP": "127.0.0.2", "Forwarded": "127.0.1.1"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == "127.0.1.1"
# assert response.text == "127.0.1.1"
# headers = {"X-Forwarded-For": "127.0.1.1"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == ""
# assert response.text == ""
# headers = {"Client-IP": "127.0.0.2", "Forwarded": "127.0.1.1"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == "127.0.0.2"
# assert response.text == "127.0.0.2"
# def test_match_info(app):
# @app.route("/api/v1/user/<user_id>/")
# async def handler(request, user_id):
# return json(request.match_info)
# request, response = app.asgi_client.get("/api/v1/user/sanic_user/")
# assert request.match_info == {"user_id": "sanic_user"}
# assert json_loads(response.text) == {"user_id": "sanic_user"}
# # ------------------------------------------------------------ #
# # POST
# # ------------------------------------------------------------ #
# def test_post_json(app):
# @app.route("/", methods=["POST"])
# async def handler(request):
# return text("OK")
# payload = {"test": "OK"}
# headers = {"content-type": "application/json"}
# request, response = app.asgi_client.post(
# "/", data=json_dumps(payload), headers=headers
# )
# assert request.json.get("test") == "OK"
# assert request.json.get("test") == "OK" # for request.parsed_json
# assert response.text == "OK"
# def test_post_form_urlencoded(app):
# @app.route("/", methods=["POST"])
# async def handler(request):
# return text("OK")
# payload = "test=OK"
# headers = {"content-type": "application/x-www-form-urlencoded"}
# request, response = app.asgi_client.post(
# "/", data=payload, headers=headers
# )
# assert request.form.get("test") == "OK"
# assert request.form.get("test") == "OK" # For request.parsed_form
# @pytest.mark.parametrize(
# "payload",
# [
# "------sanic\r\n"
# 'Content-Disposition: form-data; name="test"\r\n'
# "\r\n"
# "OK\r\n"
# "------sanic--\r\n",
# "------sanic\r\n"
# 'content-disposition: form-data; name="test"\r\n'
# "\r\n"
# "OK\r\n"
# "------sanic--\r\n",
# ],
# )
# def test_post_form_multipart_form_data(app, payload):
# @app.route("/", methods=["POST"])
# async def handler(request):
# return text("OK")
# headers = {"content-type": "multipart/form-data; boundary=----sanic"}
# request, response = app.asgi_client.post(data=payload, headers=headers)
# assert request.form.get("test") == "OK"
# @pytest.mark.parametrize(
# "path,query,expected_url",
# [
# ("/foo", "", "http://{}:{}/foo"),
# ("/bar/baz", "", "http://{}:{}/bar/baz"),
# ("/moo/boo", "arg1=val1", "http://{}:{}/moo/boo?arg1=val1"),
# ],
# )
# def test_url_attributes_no_ssl(app, path, query, expected_url):
# async def handler(request):
# return text("OK")
# app.add_route(handler, path)
# request, response = app.asgi_client.get(path + "?{}".format(query))
# assert request.url == expected_url.format(HOST, PORT)
# parsed = urlparse(request.url)
# assert parsed.scheme == request.scheme
# assert parsed.path == request.path
# assert parsed.query == request.query_string
# assert parsed.netloc == request.host
# @pytest.mark.parametrize(
# "path,query,expected_url",
# [
# ("/foo", "", "https://{}:{}/foo"),
# ("/bar/baz", "", "https://{}:{}/bar/baz"),
# ("/moo/boo", "arg1=val1", "https://{}:{}/moo/boo?arg1=val1"),
# ],
# )
# def test_url_attributes_with_ssl_context(app, path, query, expected_url):
# current_dir = os.path.dirname(os.path.realpath(__file__))
# context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
# context.load_cert_chain(
# os.path.join(current_dir, "certs/selfsigned.cert"),
# keyfile=os.path.join(current_dir, "certs/selfsigned.key"),
# )
# async def handler(request):
# return text("OK")
# app.add_route(handler, path)
# request, response = app.asgi_client.get(
# "https://{}:{}".format(HOST, PORT) + path + "?{}".format(query),
# server_kwargs={"ssl": context},
# )
# assert request.url == expected_url.format(HOST, PORT)
# parsed = urlparse(request.url)
# assert parsed.scheme == request.scheme
# assert parsed.path == request.path
# assert parsed.query == request.query_string
# assert parsed.netloc == request.host
# @pytest.mark.parametrize(
# "path,query,expected_url",
# [
# ("/foo", "", "https://{}:{}/foo"),
# ("/bar/baz", "", "https://{}:{}/bar/baz"),
# ("/moo/boo", "arg1=val1", "https://{}:{}/moo/boo?arg1=val1"),
# ],
# )
# def test_url_attributes_with_ssl_dict(app, path, query, expected_url):
# current_dir = os.path.dirname(os.path.realpath(__file__))
# ssl_cert = os.path.join(current_dir, "certs/selfsigned.cert")
# ssl_key = os.path.join(current_dir, "certs/selfsigned.key")
# ssl_dict = {"cert": ssl_cert, "key": ssl_key}
# async def handler(request):
# return text("OK")
# app.add_route(handler, path)
# request, response = app.asgi_client.get(
# "https://{}:{}".format(HOST, PORT) + path + "?{}".format(query),
# server_kwargs={"ssl": ssl_dict},
# )
# assert request.url == expected_url.format(HOST, PORT)
# parsed = urlparse(request.url)
# assert parsed.scheme == request.scheme
# assert parsed.path == request.path
# assert parsed.query == request.query_string
# assert parsed.netloc == request.host
# def test_invalid_ssl_dict(app):
# @app.get("/test")
# async def handler(request):
# return text("ssl test")
# ssl_dict = {"cert": None, "key": None}
# with pytest.raises(ValueError) as excinfo:
# request, response = app.asgi_client.get(
# "/test", server_kwargs={"ssl": ssl_dict}
# )
# assert str(excinfo.value) == "SSLContext or certificate and key required."
# def test_form_with_multiple_values(app):
# @app.route("/", methods=["POST"])
# async def handler(request):
# return text("OK")
# payload = "selectedItems=v1&selectedItems=v2&selectedItems=v3"
# headers = {"content-type": "application/x-www-form-urlencoded"}
# request, response = app.asgi_client.post(
# "/", data=payload, headers=headers
# )
# assert request.form.getlist("selectedItems") == ["v1", "v2", "v3"]
# def test_request_string_representation(app):
# @app.route("/", methods=["GET"])
# async def get(request):
# return text("OK")
# request, _ = app.asgi_client.get("/")
# assert repr(request) == "<Request: GET />"
# @pytest.mark.parametrize(
# "payload,filename",
# [
# (
# "------sanic\r\n"
# 'Content-Disposition: form-data; filename="filename"; name="test"\r\n'
# "\r\n"
# "OK\r\n"
# "------sanic--\r\n",
# "filename",
# ),
# (
# "------sanic\r\n"
# 'content-disposition: form-data; filename="filename"; name="test"\r\n'
# "\r\n"
# 'content-type: application/json; {"field": "value"}\r\n'
# "------sanic--\r\n",
# "filename",
# ),
# (
# "------sanic\r\n"
# 'Content-Disposition: form-data; filename=""; name="test"\r\n'
# "\r\n"
# "OK\r\n"
# "------sanic--\r\n",
# "",
# ),
# (
# "------sanic\r\n"
# 'content-disposition: form-data; filename=""; name="test"\r\n'
# "\r\n"
# 'content-type: application/json; {"field": "value"}\r\n'
# "------sanic--\r\n",
# "",
# ),
# (
# "------sanic\r\n"
# 'Content-Disposition: form-data; filename*="utf-8\'\'filename_%C2%A0_test"; name="test"\r\n'
# "\r\n"
# "OK\r\n"
# "------sanic--\r\n",
# "filename_\u00A0_test",
# ),
# (
# "------sanic\r\n"
# 'content-disposition: form-data; filename*="utf-8\'\'filename_%C2%A0_test"; name="test"\r\n'
# "\r\n"
# 'content-type: application/json; {"field": "value"}\r\n'
# "------sanic--\r\n",
# "filename_\u00A0_test",
# ),
# ],
# )
# def test_request_multipart_files(app, payload, filename):
# @app.route("/", methods=["POST"])
# async def post(request):
# return text("OK")
# headers = {"content-type": "multipart/form-data; boundary=----sanic"}
# request, _ = app.asgi_client.post(data=payload, headers=headers)
# assert request.files.get("test").name == filename
# def test_request_multipart_file_with_json_content_type(app):
# @app.route("/", methods=["POST"])
# async def post(request):
# return text("OK")
# payload = (
# "------sanic\r\n"
# 'Content-Disposition: form-data; name="file"; filename="test.json"\r\n'
# "Content-Type: application/json\r\n"
# "Content-Length: 0"
# "\r\n"
# "\r\n"
# "------sanic--"
# )
# headers = {"content-type": "multipart/form-data; boundary=------sanic"}
# request, _ = app.asgi_client.post(data=payload, headers=headers)
# assert request.files.get("file").type == "application/json"
# def test_request_multipart_file_without_field_name(app, caplog):
# @app.route("/", methods=["POST"])
# async def post(request):
# return text("OK")
# payload = (
# '------sanic\r\nContent-Disposition: form-data; filename="test.json"'
# "\r\nContent-Type: application/json\r\n\r\n\r\n------sanic--"
# )
# headers = {"content-type": "multipart/form-data; boundary=------sanic"}
# request, _ = app.asgi_client.post(
# data=payload, headers=headers, debug=True
# )
# with caplog.at_level(logging.DEBUG):
# request.form
# assert caplog.record_tuples[-1] == (
# "sanic.root",
# logging.DEBUG,
# "Form-data field does not have a 'name' parameter "
# "in the Content-Disposition header",
# )
# def test_request_multipart_file_duplicate_filed_name(app):
# @app.route("/", methods=["POST"])
# async def post(request):
# return text("OK")
# payload = (
# "--e73ffaa8b1b2472b8ec848de833cb05b\r\n"
# 'Content-Disposition: form-data; name="file"\r\n'
# "Content-Type: application/octet-stream\r\n"
# "Content-Length: 15\r\n"
# "\r\n"
# '{"test":"json"}\r\n'
# "--e73ffaa8b1b2472b8ec848de833cb05b\r\n"
# 'Content-Disposition: form-data; name="file"\r\n'
# "Content-Type: application/octet-stream\r\n"
# "Content-Length: 15\r\n"
# "\r\n"
# '{"test":"json2"}\r\n'
# "--e73ffaa8b1b2472b8ec848de833cb05b--\r\n"
# )
# headers = {
# "Content-Type": "multipart/form-data; boundary=e73ffaa8b1b2472b8ec848de833cb05b"
# }
# request, _ = app.asgi_client.post(
# data=payload, headers=headers, debug=True
# )
# assert request.form.getlist("file") == [
# '{"test":"json"}',
# '{"test":"json2"}',
# ]
# def test_request_multipart_with_multiple_files_and_type(app):
# @app.route("/", methods=["POST"])
# async def post(request):
# return text("OK")
# payload = (
# '------sanic\r\nContent-Disposition: form-data; name="file"; filename="test.json"'
# "\r\nContent-Type: application/json\r\n\r\n\r\n"
# '------sanic\r\nContent-Disposition: form-data; name="file"; filename="some_file.pdf"\r\n'
# "Content-Type: application/pdf\r\n\r\n\r\n------sanic--"
# )
# headers = {"content-type": "multipart/form-data; boundary=------sanic"}
# request, _ = app.asgi_client.post(data=payload, headers=headers)
# assert len(request.files.getlist("file")) == 2
# assert request.files.getlist("file")[0].type == "application/json"
# assert request.files.getlist("file")[1].type == "application/pdf"
# def test_request_repr(app):
# @app.get("/")
# def handler(request):
# return text("pass")
# request, response = app.asgi_client.get("/")
# assert repr(request) == "<Request: GET />"
# request.method = None
# assert repr(request) == "<Request: None />"
# def test_request_bool(app):
# @app.get("/")
# def handler(request):
# return text("pass")
# request, response = app.asgi_client.get("/")
# assert bool(request)
# request.transport = False
# assert not bool(request)
# def test_request_parsing_form_failed(app, caplog):
# @app.route("/", methods=["POST"])
# async def handler(request):
# return text("OK")
# payload = "test=OK"
# headers = {"content-type": "multipart/form-data"}
# request, response = app.asgi_client.post(
# "/", data=payload, headers=headers
# )
# with caplog.at_level(logging.ERROR):
# request.form
# assert caplog.record_tuples[-1] == (
# "sanic.error",
# logging.ERROR,
# "Failed when parsing form",
# )
# def test_request_args_no_query_string(app):
# @app.get("/")
# def handler(request):
# return text("pass")
# request, response = app.asgi_client.get("/")
# assert request.args == {}
# def test_request_raw_args(app):
# params = {"test": "OK"}
# @app.get("/")
# def handler(request):
# return text("pass")
# request, response = app.asgi_client.get("/", params=params)
# assert request.raw_args == params
# def test_request_query_args(app):
# # test multiple params with the same key
# params = [("test", "value1"), ("test", "value2")]
# @app.get("/")
# def handler(request):
# return text("pass")
# request, response = app.asgi_client.get("/", params=params)
# assert request.query_args == params
# # test cached value
# assert (
# request.parsed_not_grouped_args[(False, False, "utf-8", "replace")]
# == request.query_args
# )
# # test params directly in the url
# request, response = app.asgi_client.get("/?test=value1&test=value2")
# assert request.query_args == params
# # test unique params
# params = [("test1", "value1"), ("test2", "value2")]
# request, response = app.asgi_client.get("/", params=params)
# assert request.query_args == params
# # test no params
# request, response = app.asgi_client.get("/")
# assert not request.query_args
# def test_request_query_args_custom_parsing(app):
# @app.get("/")
# def handler(request):
# return text("pass")
# request, response = app.asgi_client.get(
# "/?test1=value1&test2=&test3=value3"
# )
# assert request.get_query_args(keep_blank_values=True) == [
# ("test1", "value1"),
# ("test2", ""),
# ("test3", "value3"),
# ]
# assert request.query_args == [("test1", "value1"), ("test3", "value3")]
# assert request.get_query_args(keep_blank_values=False) == [
# ("test1", "value1"),
# ("test3", "value3"),
# ]
# assert request.get_args(keep_blank_values=True) == RequestParameters(
# {"test1": ["value1"], "test2": [""], "test3": ["value3"]}
# )
# assert request.args == RequestParameters(
# {"test1": ["value1"], "test3": ["value3"]}
# )
# assert request.get_args(keep_blank_values=False) == RequestParameters(
# {"test1": ["value1"], "test3": ["value3"]}
# )
# def test_request_cookies(app):
# cookies = {"test": "OK"}
# @app.get("/")
# def handler(request):
# return text("OK")
# request, response = app.asgi_client.get("/", cookies=cookies)
# assert request.cookies == cookies
# assert request.cookies == cookies # For request._cookies
# def test_request_cookies_without_cookies(app):
# @app.get("/")
# def handler(request):
# return text("OK")
# request, response = app.asgi_client.get("/")
# assert request.cookies == {}
# def test_request_port(app):
# @app.get("/")
# def handler(request):
# return text("OK")
# request, response = app.asgi_client.get("/")
# port = request.port
# assert isinstance(port, int)
# delattr(request, "_socket")
# delattr(request, "_port")
# port = request.port
# assert isinstance(port, int)
# assert hasattr(request, "_socket")
# assert hasattr(request, "_port")
# def test_request_socket(app):
# @app.get("/")
# def handler(request):
# return text("OK")
# request, response = app.asgi_client.get("/")
# socket = request.socket
# assert isinstance(socket, tuple)
# ip = socket[0]
# port = socket[1]
# assert ip == request.ip
# assert port == request.port
# delattr(request, "_socket")
# socket = request.socket
# assert isinstance(socket, tuple)
# assert hasattr(request, "_socket")
# def test_request_form_invalid_content_type(app):
# @app.route("/", methods=["POST"])
# async def post(request):
# return text("OK")
# request, response = app.asgi_client.post("/", json={"test": "OK"})
# assert request.form == {}
# def test_endpoint_basic():
# app = Sanic()
# @app.route("/")
# def my_unique_handler(request):
# return text("Hello")
# request, response = app.asgi_client.get("/")
# assert request.endpoint == "test_requests.my_unique_handler"
# def test_endpoint_named_app():
# app = Sanic("named")
# @app.route("/")
# def my_unique_handler(request):
# return text("Hello")
# request, response = app.asgi_client.get("/")
# assert request.endpoint == "named.my_unique_handler"
# def test_endpoint_blueprint():
# bp = Blueprint("my_blueprint", url_prefix="/bp")
# @bp.route("/")
# async def bp_root(request):
# return text("Hello")
# app = Sanic("named")
# app.blueprint(bp)
# request, response = app.asgi_client.get("/bp")
# assert request.endpoint == "named.my_blueprint.bp_root"

View File

@ -19,10 +19,6 @@ from sanic.testing import HOST, PORT, SanicTestClient
# import traceback # import traceback
CONFIG_FOR_TESTS = {"KEEP_ALIVE_TIMEOUT": 2, "KEEP_ALIVE": True} CONFIG_FOR_TESTS = {"KEEP_ALIVE_TIMEOUT": 2, "KEEP_ALIVE": True}
old_conn = None old_conn = None

View File

@ -30,6 +30,17 @@ def test_sync(app):
assert response.text == "Hello" assert response.text == "Hello"
@pytest.mark.asyncio
async def test_sync_asgi(app):
@app.route("/")
def handler(request):
return text("Hello")
request, response = await app.asgi_client.get("/")
assert response.text == "Hello"
def test_ip(app): def test_ip(app):
@app.route("/") @app.route("/")
def handler(request): def handler(request):
@ -40,6 +51,17 @@ def test_ip(app):
assert response.text == "127.0.0.1" assert response.text == "127.0.0.1"
@pytest.mark.asyncio
async def test_ip_asgi(app):
@app.route("/")
def handler(request):
return text("{}".format(request.ip))
request, response = await app.asgi_client.get("/")
assert response.text == "mockserver"
def test_text(app): def test_text(app):
@app.route("/") @app.route("/")
async def handler(request): async def handler(request):

View File

@ -231,9 +231,7 @@ def test_chunked_streaming_returns_correct_content(streaming_app):
assert response.text == "foo,bar" assert response.text == "foo,bar"
def test_non_chunked_streaming_adds_correct_headers( def test_non_chunked_streaming_adds_correct_headers(non_chunked_streaming_app):
non_chunked_streaming_app
):
request, response = non_chunked_streaming_app.test_client.get("/") request, response = non_chunked_streaming_app.test_client.get("/")
assert "Transfer-Encoding" not in response.headers assert "Transfer-Encoding" not in response.headers
assert response.headers["Content-Type"] == "text/csv" assert response.headers["Content-Type"] == "text/csv"