Files
sanic/sanic/touchup/schemes/ode.py
2022-01-16 09:03:04 +02:00

103 lines
3.3 KiB
Python

from ast import Attribute, Await, Dict, Expr, NodeTransformer, parse
from inspect import getsource
from textwrap import dedent
from typing import Any
from sanic.log import logger
from .base import BaseScheme
class OptionalDispatchEvent(BaseScheme):
ident = "ODE"
SYNC_SIGNAL_NAMESPACES = "http."
def __init__(self, app) -> None:
super().__init__(app)
self._sync_events()
self._registered_events = [
signal.name for signal in app.signal_router.routes
]
def run(self, method, module_globals):
raw_source = getsource(method)
src = dedent(raw_source)
tree = parse(src)
node = RemoveDispatch(
self._registered_events, self.app.state.verbosity
).visit(tree)
compiled_src = compile(node, method.__name__, "exec")
exec_locals: Dict[str, Any] = {}
exec(compiled_src, module_globals, exec_locals) # nosec
return exec_locals[method.__name__]
def _sync_events(self):
all_events = set()
app_events = {}
for app in self.app.__class__._app_registry.values():
if app.state.server_info:
app_events[app] = {
signal.name for signal in app.signal_router.routes
}
all_events.update(app_events[app])
for app, events in app_events.items():
missing = {
x
for x in all_events.difference(events)
if any(x.startswith(y) for y in self.SYNC_SIGNAL_NAMESPACES)
}
if missing:
was_finalized = app.signal_router.finalized
if was_finalized: # no cov
app.signal_router.reset()
for event in missing:
app.signal(event)(self.noop)
if was_finalized: # no cov
app.signal_router.finalize()
@staticmethod
async def noop(**_): # no cov
...
class RemoveDispatch(NodeTransformer):
def __init__(self, registered_events, verbosity: int = 0) -> None:
self._registered_events = registered_events
self._verbosity = verbosity
def visit_Expr(self, node: Expr) -> Any:
call = node.value
if isinstance(call, Await):
call = call.value
func = getattr(call, "func", None)
args = getattr(call, "args", None)
if not func or not args:
return node
if isinstance(func, Attribute) and func.attr == "dispatch":
event = args[0]
if hasattr(event, "s"):
event_name = getattr(event, "value", event.s)
if self._not_registered(event_name):
if self._verbosity >= 2:
logger.debug(f"Disabling event: {event_name}")
return None
return node
def _not_registered(self, event_name):
dynamic = []
for event in self._registered_events:
if event.endswith(">"):
namespace_concern, _ = event.rsplit(".", 1)
dynamic.append(namespace_concern)
namespace_concern, _ = event_name.rsplit(".", 1)
return (
event_name not in self._registered_events
and namespace_concern not in dynamic
)