diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 74edc66b7..5c4ddcffd 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -86,6 +86,17 @@ LOG_IGNORE_DURING_DELETE = False +async def _shield_await(fut: asyncio.Future[Any]) -> Any: + """Await a future without cancelling it if the awaiting task is cancelled. + + This behaves like ``asyncio.shield(fut)`` but avoids the spurious + "exception in shielded future" error log on Python 3.11+ when the + awaiting task is cancelled and the future eventually fails. + """ + await asyncio.wait([fut]) + return fut.result() + + class WorkflowRunner(ABC): """Abstract runner for workflows that creates workflow instances to run. @@ -1900,9 +1911,10 @@ async def run_activity() -> Any: # be marked as unstarted handle._started = True try: - # We have to shield because we don't want the underlying - # result future to be cancelled - return await asyncio.shield(handle._result_fut) + # We use _shield_await instead of asyncio.shield to prevent + # the underlying result future from being cancelled while avoiding + # a spurious error log on Python 3.11+ (see issue #1600). + return await _shield_await(handle._result_fut) except _ActivityDoBackoffError as err: # We have to sleep then reschedule. Note this sleep can be # cancelled like any other timer. @@ -2016,9 +2028,10 @@ def apply_child_cancel_error(err: asyncio.CancelledError) -> None: async def run_child() -> Any: while True: try: - # We have to shield because we don't want the future itself - # to be cancelled - return await asyncio.shield(handle._result_fut) + # We use _shield_await instead of asyncio.shield to prevent + # the future itself from being cancelled while avoiding a + # spurious error log on Python 3.11+ (see issue #1600). + return await _shield_await(handle._result_fut) except asyncio.CancelledError as err: apply_child_cancel_error(err) # Clear the cancellation counter on Python 3.11+ so the @@ -2039,9 +2052,10 @@ async def run_child() -> Any: # Wait on start before returning while True: try: - # We have to shield because we don't want the future itself - # to be cancelled - await asyncio.shield(handle._start_fut) + # We use _shield_await instead of asyncio.shield to prevent + # the future itself from being cancelled while avoiding a + # spurious error log on Python 3.11+ (see issue #1600). + await _shield_await(handle._start_fut) return handle except asyncio.CancelledError as err: apply_child_cancel_error(err) @@ -2076,7 +2090,7 @@ async def _outbound_start_nexus_operation( async def operation_handle_fn() -> OutputT: while True: try: - return cast(OutputT, await asyncio.shield(handle._result_fut)) + return cast(OutputT, await _shield_await(handle._result_fut)) except asyncio.CancelledError: cancel_command = self._add_command() handle._apply_cancel_command(cancel_command) @@ -2105,7 +2119,10 @@ async def operation_handle_fn() -> OutputT: while True: try: - await asyncio.shield(handle._start_fut) + # We use _shield_await instead of asyncio.shield to prevent + # the future itself from being cancelled while avoiding a + # spurious error log on Python 3.11+ (see issue #1600). + await _shield_await(handle._start_fut) return handle except asyncio.CancelledError: cancel_command = self._add_command() @@ -2644,9 +2661,10 @@ async def _signal_external_workflow( # Wait until completed or cancelled while True: try: - # We have to shield because we don't want the future itself - # to be cancelled - return await asyncio.shield(done_fut) + # We use _shield_await instead of asyncio.shield to prevent + # the future itself from being cancelled while avoiding a + # spurious error log on Python 3.11+ (see issue #1600). + return await _shield_await(done_fut) except asyncio.CancelledError: cancel_command = self._add_command() cancel_command.cancel_signal_workflow.seq = seq diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index d20077cf5..5d927c2d0 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -9162,3 +9162,67 @@ async def test_workflow_uncancel_shield_signal_external(client: Client): assert shielded_err is None, ( f"Unexpected 'exception in shielded future' log: {shielded_err}" ) + + +class _SlowActivity: + def __init__(self) -> None: + self.started = asyncio.Event() + + @activity.defn(name="slow_activity") + async def slow_activity(self) -> None: + self.started.set() + await asyncio.sleep(60) + + +@workflow.defn +class _CancelInFlightActivityWorkflow: + @workflow.run + async def run(self) -> None: + await asyncio.gather( + *( + workflow.execute_activity( + "slow_activity", + start_to_close_timeout=timedelta(minutes=2), + ) + for _ in range(4) + ) + ) + + +@pytest.mark.asyncio +async def test_workflow_cancel_no_shielded_future_log( + client: Client, caplog: pytest.LogCaptureFixture +): + activity_inst = _SlowActivity() + async with new_worker( + client, + _CancelInFlightActivityWorkflow, + activities=[activity_inst.slow_activity], + ) as worker: + handle = await client.start_workflow( + _CancelInFlightActivityWorkflow.run, + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + execution_timeout=timedelta(minutes=5), + ) + + # Wait for activities to start + await asyncio.wait_for(activity_inst.started.wait(), timeout=10) + + # Ignore worker startup logs + caplog.clear() + + with caplog.at_level(logging.ERROR): + await handle.cancel() + + try: + await handle.result() + except WorkflowFailureError as err: + assert isinstance(err.cause, CancelledError) + + await asyncio.sleep(0.5) + + assert not any( + "exception in shielded future" in record.message + for record in caplog.records + )