From 38ff9069f3d1a96734ab14c05652806edc6b012a Mon Sep 17 00:00:00 2001 From: Adam Hopkins Date: Wed, 30 Aug 2023 09:05:37 +0300 Subject: [PATCH] Suppress task cancel traceback (#2812) --- sanic/server/websockets/impl.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/sanic/server/websockets/impl.py b/sanic/server/websockets/impl.py index 71439cfd..e08a41b7 100644 --- a/sanic/server/websockets/impl.py +++ b/sanic/server/websockets/impl.py @@ -532,12 +532,11 @@ class WebsocketImplProtocol: raise WebsocketClosed( "Cannot receive from websocket interface after it is closed." ) + assembler_get: Optional[asyncio.Task] = None try: self.recv_cancel = asyncio.Future() - tasks = ( - self.recv_cancel, - asyncio.ensure_future(self.assembler.get(timeout)), - ) + assembler_get = asyncio.create_task(self.assembler.get(timeout)) + tasks = (self.recv_cancel, assembler_get) done, pending = await asyncio.wait( tasks, return_when=asyncio.FIRST_COMPLETED, @@ -551,6 +550,11 @@ class WebsocketImplProtocol: else: self.recv_cancel.cancel() return done_task.result() + except asyncio.CancelledError: + # recv was cancelled + if assembler_get: + assembler_get.cancel() + raise finally: self.recv_cancel = None self.recv_lock.release() @@ -584,16 +588,15 @@ class WebsocketImplProtocol: "Cannot receive from websocket interface after it is closed." ) messages = [] + assembler_get: Optional[asyncio.Task] = None try: # Prevent pausing the transport when we're # receiving a burst of messages self.can_pause = False self.recv_cancel = asyncio.Future() while True: - tasks = ( - self.recv_cancel, - asyncio.ensure_future(self.assembler.get(timeout=0)), - ) + assembler_get = asyncio.create_task(self.assembler.get(0)) + tasks = (self.recv_cancel, assembler_get) done, pending = await asyncio.wait( tasks, return_when=asyncio.FIRST_COMPLETED, @@ -616,6 +619,11 @@ class WebsocketImplProtocol: # next message to pass into the Assembler await asyncio.sleep(0) self.recv_cancel.cancel() + except asyncio.CancelledError: + # recv_burst was cancelled + if assembler_get: + assembler_get.cancel() + raise finally: self.recv_cancel = None self.can_pause = True