Files
sanic/sanic/compat.py
L. Kärkkäinen 120f0262f7 Fix Ctrl+C and tests on Windows. (#1808)
* Fix Ctrl+C on Windows.

* Disable testing of a function N/A on Windows.

* Add test for coverage, avoid crash on missing _stopping.

* Initialise StreamingHTTPResponse.protocol = None

* Improved comments.

* Reduce amount of data in test_request_stream to avoid failures on Windows.

* The Windows test doesn't work on Windows :(

* Use port numbers more likely to be free than 8000.

* Disable the other signal tests on Windows as well.

* Windows doesn't properly support SO_REUSEADDR, so that's disabled in Python, and thus rebinding fails. For successful testing, reuse port instead.

* app.run argument handling: added server kwargs (alike create_server), added warning on extra kwargs, made auto_reload explicit argument. Another go at Windows tests

* Revert "app.run argument handling: added server kwargs (alike create_server), added warning on extra kwargs, made auto_reload explicit argument. Another go at Windows tests"

This reverts commit dc5d682448.

* Use random test server port on most tests. Should avoid port/addr reuse issues.

* Another test to random port instead of 8000.

* Fix deprecation warnings about missing name on Sanic() in tests.

* Linter and typing

* Increase test coverage

* Rewrite test for ctrlc_windows_workaround

* py36 compat

* py36 compat

* py36 compat

* Don't rely on loop internals but add a stopping flag to app.

* App may be restarted.

* py36 compat

* Linter

* Add a constant for OS checking.

Co-authored-by: L. Kärkkäinen <tronic@users.noreply.github.com>
2020-03-25 21:42:46 -07:00

53 lines
1.4 KiB
Python

import asyncio
import signal
from sys import argv
from multidict import CIMultiDict # type: ignore
class Header(CIMultiDict):
def get_all(self, key):
return self.getall(key, default=[])
use_trio = argv[0].endswith("hypercorn") and "trio" in argv
if use_trio:
from trio import open_file as open_async, Path # type: ignore
def stat_async(path):
return Path(path).stat()
else:
from aiofiles import open as aio_open # type: ignore
from aiofiles.os import stat as stat_async # type: ignore # noqa: F401
async def open_async(file, mode="r", **kwargs):
return aio_open(file, mode, **kwargs)
def ctrlc_workaround_for_windows(app):
async def stay_active(app):
"""Asyncio wakeups to allow receiving SIGINT in Python"""
while not die:
# If someone else stopped the app, just exit
if app.is_stopping:
return
# Windows Python blocks signal handlers while the event loop is
# waiting for I/O. Frequent wakeups keep interrupts flowing.
await asyncio.sleep(0.1)
# Can't be called from signal handler, so call it from here
app.stop()
def ctrlc_handler(sig, frame):
nonlocal die
if die:
raise KeyboardInterrupt("Non-graceful Ctrl+C")
die = True
die = False
signal.signal(signal.SIGINT, ctrlc_handler)
app.add_task(stay_active)