Merge remote-tracking branch 'upstream/master' into bodybytes
This commit is contained in:
@@ -8,27 +8,33 @@ from sanic import headers
|
||||
[
|
||||
("text/plain", ("text/plain", {})),
|
||||
("text/vnd.just.made.this.up ; ", ("text/vnd.just.made.this.up", {})),
|
||||
("text/plain;charset=us-ascii", ("text/plain", {"charset": "us-ascii"})),
|
||||
('text/plain ; charset="us-ascii"', ("text/plain", {"charset": "us-ascii"})),
|
||||
(
|
||||
"text/plain;charset=us-ascii",
|
||||
("text/plain", {"charset": "us-ascii"}),
|
||||
),
|
||||
(
|
||||
'text/plain ; charset="us-ascii"',
|
||||
("text/plain", {"charset": "us-ascii"}),
|
||||
),
|
||||
(
|
||||
'text/plain ; charset="us-ascii"; another=opt',
|
||||
("text/plain", {"charset": "us-ascii", "another": "opt"})
|
||||
("text/plain", {"charset": "us-ascii", "another": "opt"}),
|
||||
),
|
||||
(
|
||||
'attachment; filename="silly.txt"',
|
||||
("attachment", {"filename": "silly.txt"})
|
||||
("attachment", {"filename": "silly.txt"}),
|
||||
),
|
||||
(
|
||||
'attachment; filename="strange;name"',
|
||||
("attachment", {"filename": "strange;name"})
|
||||
("attachment", {"filename": "strange;name"}),
|
||||
),
|
||||
(
|
||||
'attachment; filename="strange;name";size=123;',
|
||||
("attachment", {"filename": "strange;name", "size": "123"})
|
||||
("attachment", {"filename": "strange;name", "size": "123"}),
|
||||
),
|
||||
(
|
||||
'form-data; name="files"; filename="fo\\"o;bar\\"',
|
||||
('form-data', {'name': 'files', 'filename': 'fo"o;bar\\'})
|
||||
("form-data", {"name": "files", "filename": 'fo"o;bar\\'})
|
||||
# cgi.parse_header:
|
||||
# ('form-data', {'name': 'files', 'filename': 'fo"o;bar\\'})
|
||||
# werkzeug.parse_options_header:
|
||||
@@ -39,7 +45,7 @@ from sanic import headers
|
||||
# Chrome:
|
||||
# Content-Disposition: form-data; name="foo%22;bar\"; filename="😀"
|
||||
'form-data; name="foo%22;bar\\"; filename="😀"',
|
||||
('form-data', {'name': 'foo";bar\\', 'filename': '😀'})
|
||||
("form-data", {"name": 'foo";bar\\', "filename": "😀"})
|
||||
# cgi: ('form-data', {'name': 'foo%22;bar"; filename="😀'})
|
||||
# werkzeug: ('form-data', {'name': 'foo%22;bar"; filename='})
|
||||
),
|
||||
@@ -47,11 +53,11 @@ from sanic import headers
|
||||
# Firefox:
|
||||
# Content-Disposition: form-data; name="foo\";bar\"; filename="😀"
|
||||
'form-data; name="foo\\";bar\\"; filename="😀"',
|
||||
('form-data', {'name': 'foo";bar\\', 'filename': '😀'})
|
||||
("form-data", {"name": 'foo";bar\\', "filename": "😀"})
|
||||
# cgi: ('form-data', {'name': 'foo";bar"; filename="😀'})
|
||||
# werkzeug: ('form-data', {'name': 'foo";bar"; filename='})
|
||||
),
|
||||
]
|
||||
],
|
||||
)
|
||||
def test_parse_headers(input, expected):
|
||||
assert headers.parse_content_header(input) == expected
|
||||
|
||||
@@ -5,6 +5,7 @@ import signal
|
||||
|
||||
import pytest
|
||||
|
||||
from sanic import Blueprint
|
||||
from sanic.response import text
|
||||
from sanic.testing import HOST, PORT
|
||||
|
||||
@@ -37,8 +38,6 @@ def test_multiprocessing(app):
|
||||
reason="SIGALRM is not implemented for this platform",
|
||||
)
|
||||
def test_multiprocessing_with_blueprint(app):
|
||||
from sanic import Blueprint
|
||||
|
||||
# Selects a number at random so we can spot check
|
||||
num_workers = random.choice(range(2, multiprocessing.cpu_count() * 2 + 1))
|
||||
process_list = set()
|
||||
@@ -64,27 +63,27 @@ def handler(request):
|
||||
return text("Hello")
|
||||
|
||||
|
||||
# Muliprocessing on Windows requires app to be able to be pickled
|
||||
# Multiprocessing on Windows requires app to be able to be pickled
|
||||
@pytest.mark.parametrize("protocol", [3, 4])
|
||||
def test_pickle_app(app, protocol):
|
||||
app.route("/")(handler)
|
||||
p_app = pickle.dumps(app, protocol=protocol)
|
||||
del app
|
||||
up_p_app = pickle.loads(p_app)
|
||||
assert up_p_app
|
||||
request, response = app.test_client.get("/")
|
||||
request, response = up_p_app.test_client.get("/")
|
||||
assert response.text == "Hello"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("protocol", [3, 4])
|
||||
def test_pickle_app_with_bp(app, protocol):
|
||||
from sanic import Blueprint
|
||||
|
||||
bp = Blueprint("test_text")
|
||||
bp.route("/")(handler)
|
||||
app.blueprint(bp)
|
||||
p_app = pickle.dumps(app, protocol=protocol)
|
||||
del app
|
||||
up_p_app = pickle.loads(p_app)
|
||||
assert up_p_app
|
||||
request, response = app.test_client.get("/")
|
||||
assert app.is_request_stream is False
|
||||
request, response = up_p_app.test_client.get("/")
|
||||
assert up_p_app.is_request_stream is False
|
||||
assert response.text == "Hello"
|
||||
|
||||
36
tests/test_request_buffer_queue_size.py
Normal file
36
tests/test_request_buffer_queue_size.py
Normal file
@@ -0,0 +1,36 @@
|
||||
import io
|
||||
|
||||
from sanic.response import text
|
||||
|
||||
data = "abc" * 10_000_000
|
||||
|
||||
|
||||
def test_request_buffer_queue_size(app):
|
||||
default_buf_qsz = app.config.get("REQUEST_BUFFER_QUEUE_SIZE")
|
||||
qsz = 1
|
||||
while qsz == default_buf_qsz:
|
||||
qsz += 1
|
||||
app.config.REQUEST_BUFFER_QUEUE_SIZE = qsz
|
||||
|
||||
@app.post("/post", stream=True)
|
||||
async def post(request):
|
||||
assert request.stream.buffer_size == qsz
|
||||
print("request.stream.buffer_size =", request.stream.buffer_size)
|
||||
|
||||
bio = io.BytesIO()
|
||||
while True:
|
||||
bdata = await request.stream.read()
|
||||
if not bdata:
|
||||
break
|
||||
bio.write(bdata)
|
||||
|
||||
head = bdata[:3].decode("utf-8")
|
||||
tail = bdata[3:][-3:].decode("utf-8")
|
||||
print(head, "...", tail)
|
||||
|
||||
bio.seek(0)
|
||||
return text(bio.read().decode("utf-8"))
|
||||
|
||||
request, response = app.test_client.post("/post", data=data)
|
||||
assert response.status == 200
|
||||
assert response.text == data
|
||||
@@ -9,21 +9,74 @@ except ImportError:
|
||||
from json import loads
|
||||
|
||||
|
||||
def test_storage(app):
|
||||
def test_custom_context(app):
|
||||
@app.middleware("request")
|
||||
def store(request):
|
||||
request.ctx.user = "sanic"
|
||||
request.ctx.session = None
|
||||
|
||||
@app.route("/")
|
||||
def handler(request):
|
||||
# Accessing non-existant key should fail with AttributeError
|
||||
try:
|
||||
invalid = request.ctx.missing
|
||||
except AttributeError as e:
|
||||
invalid = str(e)
|
||||
return json(
|
||||
{
|
||||
"user": request.ctx.user,
|
||||
"session": request.ctx.session,
|
||||
"has_user": hasattr(request.ctx, "user"),
|
||||
"has_session": hasattr(request.ctx, "session"),
|
||||
"has_missing": hasattr(request.ctx, "missing"),
|
||||
"invalid": invalid,
|
||||
}
|
||||
)
|
||||
|
||||
request, response = app.test_client.get("/")
|
||||
assert response.json == {
|
||||
"user": "sanic",
|
||||
"session": None,
|
||||
"has_user": True,
|
||||
"has_session": True,
|
||||
"has_missing": False,
|
||||
"invalid": "'types.SimpleNamespace' object has no attribute 'missing'",
|
||||
}
|
||||
|
||||
|
||||
# Remove this once the deprecated API is abolished.
|
||||
def test_custom_context_old(app):
|
||||
@app.middleware("request")
|
||||
def store(request):
|
||||
try:
|
||||
request["foo"]
|
||||
except KeyError:
|
||||
pass
|
||||
request["user"] = "sanic"
|
||||
request["sidekick"] = "tails"
|
||||
sidekick = request.get("sidekick", "tails") # Item missing -> default
|
||||
request["sidekick"] = sidekick
|
||||
request["bar"] = request["sidekick"]
|
||||
del request["sidekick"]
|
||||
|
||||
@app.route("/")
|
||||
def handler(request):
|
||||
return json(
|
||||
{"user": request.get("user"), "sidekick": request.get("sidekick")}
|
||||
{
|
||||
"user": request.get("user"),
|
||||
"sidekick": request.get("sidekick"),
|
||||
"has_bar": "bar" in request,
|
||||
"has_sidekick": "sidekick" in request,
|
||||
}
|
||||
)
|
||||
|
||||
request, response = app.test_client.get("/")
|
||||
|
||||
assert response.json == {
|
||||
"user": "sanic",
|
||||
"sidekick": None,
|
||||
"has_bar": True,
|
||||
"has_sidekick": False,
|
||||
}
|
||||
response_json = loads(response.text)
|
||||
assert response_json["user"] == "sanic"
|
||||
assert response_json.get("sidekick") is None
|
||||
|
||||
@@ -413,15 +413,15 @@ def test_standard_forwarded(app):
|
||||
"Forwarded": (
|
||||
'for=1.1.1.1, for=injected;host="'
|
||||
', for="[::2]";proto=https;host=me.tld;path="/app/";secret=mySecret'
|
||||
',for=broken;;secret=b0rked'
|
||||
', for=127.0.0.3;scheme=http;port=1234'
|
||||
",for=broken;;secret=b0rked"
|
||||
", for=127.0.0.3;scheme=http;port=1234"
|
||||
),
|
||||
"X-Real-IP": "127.0.0.2",
|
||||
"X-Forwarded-For": "127.0.1.1",
|
||||
"X-Scheme": "ws",
|
||||
}
|
||||
request, response = app.test_client.get("/", headers=headers)
|
||||
assert response.json == { "for": "127.0.0.2", "proto": "ws" }
|
||||
assert response.json == {"for": "127.0.0.2", "proto": "ws"}
|
||||
assert request.remote_addr == "127.0.0.2"
|
||||
assert request.scheme == "ws"
|
||||
assert request.server_port == 80
|
||||
@@ -433,7 +433,7 @@ def test_standard_forwarded(app):
|
||||
"proto": "https",
|
||||
"host": "me.tld",
|
||||
"path": "/app/",
|
||||
"secret": "mySecret"
|
||||
"secret": "mySecret",
|
||||
}
|
||||
assert request.remote_addr == "[::2]"
|
||||
assert request.server_name == "me.tld"
|
||||
@@ -443,7 +443,7 @@ def test_standard_forwarded(app):
|
||||
# Empty Forwarded header -> use X-headers
|
||||
headers["Forwarded"] = ""
|
||||
request, response = app.test_client.get("/", headers=headers)
|
||||
assert response.json == { "for": "127.0.0.2", "proto": "ws" }
|
||||
assert response.json == {"for": "127.0.0.2", "proto": "ws"}
|
||||
|
||||
# Header present but not matching anything
|
||||
request, response = app.test_client.get("/", headers={"Forwarded": "."})
|
||||
@@ -451,8 +451,8 @@ def test_standard_forwarded(app):
|
||||
|
||||
# Forwarded header present but no matching secret -> use X-headers
|
||||
headers = {
|
||||
"Forwarded": 'for=1.1.1.1;secret=x, for=127.0.0.1',
|
||||
"X-Real-IP": "127.0.0.2"
|
||||
"Forwarded": "for=1.1.1.1;secret=x, for=127.0.0.1",
|
||||
"X-Real-IP": "127.0.0.2",
|
||||
}
|
||||
request, response = app.test_client.get("/", headers=headers)
|
||||
assert response.json == {"for": "127.0.0.2"}
|
||||
@@ -464,7 +464,7 @@ def test_standard_forwarded(app):
|
||||
assert response.json == {
|
||||
"for": "127.0.0.4",
|
||||
"port": 1234,
|
||||
"secret": "mySecret"
|
||||
"secret": "mySecret",
|
||||
}
|
||||
|
||||
# Test escapes (modify this if you see anyone implementing quoted-pairs)
|
||||
@@ -472,29 +472,29 @@ def test_standard_forwarded(app):
|
||||
request, response = app.test_client.get("/", headers=headers)
|
||||
assert response.json == {
|
||||
"for": "test",
|
||||
"quoted": '\\,x=x;y=\\',
|
||||
"secret": "mySecret"
|
||||
"quoted": "\\,x=x;y=\\",
|
||||
"secret": "mySecret",
|
||||
}
|
||||
|
||||
# Secret insulated by malformed field #1
|
||||
headers = {"Forwarded": 'for=test;secret=mySecret;b0rked;proto=wss;'}
|
||||
headers = {"Forwarded": "for=test;secret=mySecret;b0rked;proto=wss;"}
|
||||
request, response = app.test_client.get("/", headers=headers)
|
||||
assert response.json == {"for": "test", "secret": "mySecret"}
|
||||
|
||||
# Secret insulated by malformed field #2
|
||||
headers = {"Forwarded": 'for=test;b0rked;secret=mySecret;proto=wss'}
|
||||
headers = {"Forwarded": "for=test;b0rked;secret=mySecret;proto=wss"}
|
||||
request, response = app.test_client.get("/", headers=headers)
|
||||
assert response.json == {"proto": "wss", "secret": "mySecret"}
|
||||
|
||||
# Unexpected termination should not lose existing acceptable values
|
||||
headers = {"Forwarded": 'b0rked;secret=mySecret;proto=wss'}
|
||||
headers = {"Forwarded": "b0rked;secret=mySecret;proto=wss"}
|
||||
request, response = app.test_client.get("/", headers=headers)
|
||||
assert response.json == {"proto": "wss", "secret": "mySecret"}
|
||||
|
||||
# Field normalization
|
||||
headers = {
|
||||
"Forwarded": 'PROTO=WSS;BY="CAFE::8000";FOR=unknown;PORT=X;HOST="A:2";'
|
||||
'PATH="/With%20Spaces%22Quoted%22/sanicApp?key=val";SECRET=mySecret'
|
||||
'PATH="/With%20Spaces%22Quoted%22/sanicApp?key=val";SECRET=mySecret'
|
||||
}
|
||||
request, response = app.test_client.get("/", headers=headers)
|
||||
assert response.json == {
|
||||
@@ -507,7 +507,7 @@ def test_standard_forwarded(app):
|
||||
|
||||
# Using "by" field as secret
|
||||
app.config.FORWARDED_SECRET = "_proxySecret"
|
||||
headers = {"Forwarded": 'for=1.2.3.4; by=_proxySecret'}
|
||||
headers = {"Forwarded": "for=1.2.3.4; by=_proxySecret"}
|
||||
request, response = app.test_client.get("/", headers=headers)
|
||||
assert response.json == {"for": "1.2.3.4", "by": "_proxySecret"}
|
||||
|
||||
@@ -525,15 +525,15 @@ async def test_standard_forwarded_asgi(app):
|
||||
"Forwarded": (
|
||||
'for=1.1.1.1, for=injected;host="'
|
||||
', for="[::2]";proto=https;host=me.tld;path="/app/";secret=mySecret'
|
||||
',for=broken;;secret=b0rked'
|
||||
', for=127.0.0.3;scheme=http;port=1234'
|
||||
",for=broken;;secret=b0rked"
|
||||
", for=127.0.0.3;scheme=http;port=1234"
|
||||
),
|
||||
"X-Real-IP": "127.0.0.2",
|
||||
"X-Forwarded-For": "127.0.1.1",
|
||||
"X-Scheme": "ws",
|
||||
}
|
||||
request, response = await app.asgi_client.get("/", headers=headers)
|
||||
assert response.json() == { "for": "127.0.0.2", "proto": "ws" }
|
||||
assert response.json() == {"for": "127.0.0.2", "proto": "ws"}
|
||||
assert request.remote_addr == "127.0.0.2"
|
||||
assert request.scheme == "ws"
|
||||
assert request.server_port == 80
|
||||
@@ -545,7 +545,7 @@ async def test_standard_forwarded_asgi(app):
|
||||
"proto": "https",
|
||||
"host": "me.tld",
|
||||
"path": "/app/",
|
||||
"secret": "mySecret"
|
||||
"secret": "mySecret",
|
||||
}
|
||||
assert request.remote_addr == "[::2]"
|
||||
assert request.server_name == "me.tld"
|
||||
@@ -555,16 +555,18 @@ async def test_standard_forwarded_asgi(app):
|
||||
# Empty Forwarded header -> use X-headers
|
||||
headers["Forwarded"] = ""
|
||||
request, response = await app.asgi_client.get("/", headers=headers)
|
||||
assert response.json() == { "for": "127.0.0.2", "proto": "ws" }
|
||||
assert response.json() == {"for": "127.0.0.2", "proto": "ws"}
|
||||
|
||||
# Header present but not matching anything
|
||||
request, response = await app.asgi_client.get("/", headers={"Forwarded": "."})
|
||||
request, response = await app.asgi_client.get(
|
||||
"/", headers={"Forwarded": "."}
|
||||
)
|
||||
assert response.json() == {}
|
||||
|
||||
# Forwarded header present but no matching secret -> use X-headers
|
||||
headers = {
|
||||
"Forwarded": 'for=1.1.1.1;secret=x, for=127.0.0.1',
|
||||
"X-Real-IP": "127.0.0.2"
|
||||
"Forwarded": "for=1.1.1.1;secret=x, for=127.0.0.1",
|
||||
"X-Real-IP": "127.0.0.2",
|
||||
}
|
||||
request, response = await app.asgi_client.get("/", headers=headers)
|
||||
assert response.json() == {"for": "127.0.0.2"}
|
||||
@@ -576,7 +578,7 @@ async def test_standard_forwarded_asgi(app):
|
||||
assert response.json() == {
|
||||
"for": "127.0.0.4",
|
||||
"port": 1234,
|
||||
"secret": "mySecret"
|
||||
"secret": "mySecret",
|
||||
}
|
||||
|
||||
# Test escapes (modify this if you see anyone implementing quoted-pairs)
|
||||
@@ -584,29 +586,29 @@ async def test_standard_forwarded_asgi(app):
|
||||
request, response = await app.asgi_client.get("/", headers=headers)
|
||||
assert response.json() == {
|
||||
"for": "test",
|
||||
"quoted": '\\,x=x;y=\\',
|
||||
"secret": "mySecret"
|
||||
"quoted": "\\,x=x;y=\\",
|
||||
"secret": "mySecret",
|
||||
}
|
||||
|
||||
# Secret insulated by malformed field #1
|
||||
headers = {"Forwarded": 'for=test;secret=mySecret;b0rked;proto=wss;'}
|
||||
headers = {"Forwarded": "for=test;secret=mySecret;b0rked;proto=wss;"}
|
||||
request, response = await app.asgi_client.get("/", headers=headers)
|
||||
assert response.json() == {"for": "test", "secret": "mySecret"}
|
||||
|
||||
# Secret insulated by malformed field #2
|
||||
headers = {"Forwarded": 'for=test;b0rked;secret=mySecret;proto=wss'}
|
||||
headers = {"Forwarded": "for=test;b0rked;secret=mySecret;proto=wss"}
|
||||
request, response = await app.asgi_client.get("/", headers=headers)
|
||||
assert response.json() == {"proto": "wss", "secret": "mySecret"}
|
||||
|
||||
# Unexpected termination should not lose existing acceptable values
|
||||
headers = {"Forwarded": 'b0rked;secret=mySecret;proto=wss'}
|
||||
headers = {"Forwarded": "b0rked;secret=mySecret;proto=wss"}
|
||||
request, response = await app.asgi_client.get("/", headers=headers)
|
||||
assert response.json() == {"proto": "wss", "secret": "mySecret"}
|
||||
|
||||
# Field normalization
|
||||
headers = {
|
||||
"Forwarded": 'PROTO=WSS;BY="CAFE::8000";FOR=unknown;PORT=X;HOST="A:2";'
|
||||
'PATH="/With%20Spaces%22Quoted%22/sanicApp?key=val";SECRET=mySecret'
|
||||
'PATH="/With%20Spaces%22Quoted%22/sanicApp?key=val";SECRET=mySecret'
|
||||
}
|
||||
request, response = await app.asgi_client.get("/", headers=headers)
|
||||
assert response.json() == {
|
||||
@@ -619,7 +621,7 @@ async def test_standard_forwarded_asgi(app):
|
||||
|
||||
# Using "by" field as secret
|
||||
app.config.FORWARDED_SECRET = "_proxySecret"
|
||||
headers = {"Forwarded": 'for=1.2.3.4; by=_proxySecret'}
|
||||
headers = {"Forwarded": "for=1.2.3.4; by=_proxySecret"}
|
||||
request, response = await app.asgi_client.get("/", headers=headers)
|
||||
assert response.json() == {"for": "1.2.3.4", "by": "_proxySecret"}
|
||||
|
||||
@@ -813,11 +815,14 @@ def test_forwarded_scheme(app):
|
||||
assert request.scheme == "http"
|
||||
|
||||
request, response = app.test_client.get(
|
||||
"/", headers={"X-Forwarded-For": "127.1.2.3", "X-Forwarded-Proto": "https"}
|
||||
"/",
|
||||
headers={"X-Forwarded-For": "127.1.2.3", "X-Forwarded-Proto": "https"},
|
||||
)
|
||||
assert request.scheme == "https"
|
||||
|
||||
request, response = app.test_client.get("/", headers={"X-Forwarded-For": "127.1.2.3", "X-Scheme": "https"})
|
||||
request, response = app.test_client.get(
|
||||
"/", headers={"X-Forwarded-For": "127.1.2.3", "X-Scheme": "https"}
|
||||
)
|
||||
assert request.scheme == "https"
|
||||
|
||||
|
||||
@@ -1499,9 +1504,6 @@ def test_request_bool(app):
|
||||
request, response = app.test_client.get("/")
|
||||
assert bool(request)
|
||||
|
||||
request.transport = False
|
||||
assert not bool(request)
|
||||
|
||||
|
||||
def test_request_parsing_form_failed(app, caplog):
|
||||
@app.route("/", methods=["POST"])
|
||||
@@ -1875,7 +1877,7 @@ def test_request_server_name_in_host_header(app):
|
||||
request, response = app.test_client.get(
|
||||
"/", headers={"Host": "mal_formed"}
|
||||
)
|
||||
assert request.server_name == None # For now (later maybe 127.0.0.1)
|
||||
assert request.server_name == None # For now (later maybe 127.0.0.1)
|
||||
|
||||
|
||||
def test_request_server_name_forwarded(app):
|
||||
@@ -1886,7 +1888,11 @@ def test_request_server_name_forwarded(app):
|
||||
app.config.PROXIES_COUNT = 1
|
||||
request, response = app.test_client.get(
|
||||
"/",
|
||||
headers={"Host": "my-server:5555", "X-Forwarded-For": "127.1.2.3", "X-Forwarded-Host": "your-server"},
|
||||
headers={
|
||||
"Host": "my-server:5555",
|
||||
"X-Forwarded-For": "127.1.2.3",
|
||||
"X-Forwarded-Host": "your-server",
|
||||
},
|
||||
)
|
||||
assert request.server_name == "your-server"
|
||||
|
||||
@@ -1928,7 +1934,12 @@ def test_request_server_port_forwarded(app):
|
||||
|
||||
app.config.PROXIES_COUNT = 1
|
||||
request, response = app.test_client.get(
|
||||
"/", headers={"Host": "my-server:5555", "X-Forwarded-For": "127.1.2.3", "X-Forwarded-Port": "4444"}
|
||||
"/",
|
||||
headers={
|
||||
"Host": "my-server:5555",
|
||||
"X-Forwarded-For": "127.1.2.3",
|
||||
"X-Forwarded-Port": "4444",
|
||||
},
|
||||
)
|
||||
assert request.server_port == 4444
|
||||
|
||||
@@ -1951,7 +1962,10 @@ def test_server_name_and_url_for(app):
|
||||
app.config.SERVER_NAME = "my-server"
|
||||
assert app.url_for("handler", _external=True) == "http://my-server/foo"
|
||||
request, response = app.test_client.get("/foo")
|
||||
assert request.url_for("handler") == f"http://my-server:{app.test_client.port}/foo"
|
||||
assert (
|
||||
request.url_for("handler")
|
||||
== f"http://my-server:{app.test_client.port}/foo"
|
||||
)
|
||||
|
||||
app.config.SERVER_NAME = "https://my-server/path"
|
||||
request, response = app.test_client.get("/foo")
|
||||
@@ -1972,7 +1986,12 @@ def test_url_for_with_forwarded_request(app):
|
||||
app.config.SERVER_NAME = "my-server"
|
||||
app.config.PROXIES_COUNT = 1
|
||||
request, response = app.test_client.get(
|
||||
"/", headers={"X-Forwarded-For": "127.1.2.3", "X-Forwarded-Proto": "https", "X-Forwarded-Port": "6789"}
|
||||
"/",
|
||||
headers={
|
||||
"X-Forwarded-For": "127.1.2.3",
|
||||
"X-Forwarded-Proto": "https",
|
||||
"X-Forwarded-Port": "6789",
|
||||
},
|
||||
)
|
||||
assert app.url_for("view_name") == "/another_view"
|
||||
assert (
|
||||
@@ -1984,7 +2003,12 @@ def test_url_for_with_forwarded_request(app):
|
||||
)
|
||||
|
||||
request, response = app.test_client.get(
|
||||
"/", headers={"X-Forwarded-For": "127.1.2.3", "X-Forwarded-Proto": "https", "X-Forwarded-Port": "443"}
|
||||
"/",
|
||||
headers={
|
||||
"X-Forwarded-For": "127.1.2.3",
|
||||
"X-Forwarded-Proto": "https",
|
||||
"X-Forwarded-Port": "443",
|
||||
},
|
||||
)
|
||||
assert request.url_for("view_name") == "https://my-server/another_view"
|
||||
|
||||
@@ -2079,3 +2103,19 @@ async def test_endpoint_blueprint_asgi():
|
||||
request, response = await app.asgi_client.get("/bp")
|
||||
|
||||
assert request.endpoint == "named.my_blueprint.bp_root"
|
||||
|
||||
|
||||
def test_url_for_without_server_name(app):
|
||||
@app.route("/sample")
|
||||
def sample(request):
|
||||
return json({"url": request.url_for("url_for")})
|
||||
|
||||
@app.route("/url-for")
|
||||
def url_for(request):
|
||||
return text("url-for")
|
||||
|
||||
request, response = app.test_client.get("/sample")
|
||||
assert (
|
||||
response.json["url"]
|
||||
== f"http://127.0.0.1:{app.test_client.port}/url-for"
|
||||
)
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import asyncio
|
||||
import signal
|
||||
|
||||
import pytest
|
||||
@@ -89,3 +90,53 @@ async def test_trigger_before_events_create_server(app):
|
||||
|
||||
assert hasattr(app, "db")
|
||||
assert isinstance(app.db, MySanicDb)
|
||||
|
||||
|
||||
def test_create_server_trigger_events(app):
|
||||
"""Test if create_server can trigger server events"""
|
||||
|
||||
flag1 = False
|
||||
flag2 = False
|
||||
flag3 = False
|
||||
|
||||
async def stop(app, loop):
|
||||
nonlocal flag1
|
||||
flag1 = True
|
||||
await asyncio.sleep(0.1)
|
||||
app.stop()
|
||||
|
||||
async def before_stop(app, loop):
|
||||
nonlocal flag2
|
||||
flag2 = True
|
||||
|
||||
async def after_stop(app, loop):
|
||||
nonlocal flag3
|
||||
flag3 = True
|
||||
|
||||
app.listener("after_server_start")(stop)
|
||||
app.listener("before_server_stop")(before_stop)
|
||||
app.listener("after_server_stop")(after_stop)
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
serv_coro = app.create_server(return_asyncio_server=True)
|
||||
serv_task = asyncio.ensure_future(serv_coro, loop=loop)
|
||||
server = loop.run_until_complete(serv_task)
|
||||
server.after_start()
|
||||
try:
|
||||
loop.run_forever()
|
||||
except KeyboardInterrupt as e:
|
||||
loop.stop()
|
||||
finally:
|
||||
# Run the on_stop function if provided
|
||||
server.before_stop()
|
||||
|
||||
# Wait for server to close
|
||||
close_task = server.close()
|
||||
loop.run_until_complete(close_task)
|
||||
|
||||
# Complete all tasks on the loop
|
||||
signal.stopped = True
|
||||
for connection in server.connections:
|
||||
connection.close_if_idle()
|
||||
server.after_stop()
|
||||
assert flag1 and flag2 and flag3
|
||||
|
||||
Reference in New Issue
Block a user