Suppress task cancel traceback (#2812)

This commit is contained in:
Adam Hopkins 2023-08-30 09:05:37 +03:00 committed by GitHub
parent 4dde4572ec
commit 38ff9069f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -532,12 +532,11 @@ class WebsocketImplProtocol:
raise WebsocketClosed( raise WebsocketClosed(
"Cannot receive from websocket interface after it is closed." "Cannot receive from websocket interface after it is closed."
) )
assembler_get: Optional[asyncio.Task] = None
try: try:
self.recv_cancel = asyncio.Future() self.recv_cancel = asyncio.Future()
tasks = ( assembler_get = asyncio.create_task(self.assembler.get(timeout))
self.recv_cancel, tasks = (self.recv_cancel, assembler_get)
asyncio.ensure_future(self.assembler.get(timeout)),
)
done, pending = await asyncio.wait( done, pending = await asyncio.wait(
tasks, tasks,
return_when=asyncio.FIRST_COMPLETED, return_when=asyncio.FIRST_COMPLETED,
@ -551,6 +550,11 @@ class WebsocketImplProtocol:
else: else:
self.recv_cancel.cancel() self.recv_cancel.cancel()
return done_task.result() return done_task.result()
except asyncio.CancelledError:
# recv was cancelled
if assembler_get:
assembler_get.cancel()
raise
finally: finally:
self.recv_cancel = None self.recv_cancel = None
self.recv_lock.release() self.recv_lock.release()
@ -584,16 +588,15 @@ class WebsocketImplProtocol:
"Cannot receive from websocket interface after it is closed." "Cannot receive from websocket interface after it is closed."
) )
messages = [] messages = []
assembler_get: Optional[asyncio.Task] = None
try: try:
# Prevent pausing the transport when we're # Prevent pausing the transport when we're
# receiving a burst of messages # receiving a burst of messages
self.can_pause = False self.can_pause = False
self.recv_cancel = asyncio.Future() self.recv_cancel = asyncio.Future()
while True: while True:
tasks = ( assembler_get = asyncio.create_task(self.assembler.get(0))
self.recv_cancel, tasks = (self.recv_cancel, assembler_get)
asyncio.ensure_future(self.assembler.get(timeout=0)),
)
done, pending = await asyncio.wait( done, pending = await asyncio.wait(
tasks, tasks,
return_when=asyncio.FIRST_COMPLETED, return_when=asyncio.FIRST_COMPLETED,
@ -616,6 +619,11 @@ class WebsocketImplProtocol:
# next message to pass into the Assembler # next message to pass into the Assembler
await asyncio.sleep(0) await asyncio.sleep(0)
self.recv_cancel.cancel() self.recv_cancel.cancel()
except asyncio.CancelledError:
# recv_burst was cancelled
if assembler_get:
assembler_get.cancel()
raise
finally: finally:
self.recv_cancel = None self.recv_cancel = None
self.can_pause = True self.can_pause = True