Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 32 additions & 14 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,17 @@
LOG_IGNORE_DURING_DELETE = False


async def _shield_await(fut: asyncio.Future[Any]) -> Any:
"""Await a future without cancelling it if the awaiting task is cancelled.

This behaves like ``asyncio.shield(fut)`` but avoids the spurious
"exception in shielded future" error log on Python 3.11+ when the
awaiting task is cancelled and the future eventually fails.
"""
await asyncio.wait([fut])
return fut.result()


class WorkflowRunner(ABC):
"""Abstract runner for workflows that creates workflow instances to run.

Expand Down Expand Up @@ -1900,9 +1911,10 @@ async def run_activity() -> Any:
# be marked as unstarted
handle._started = True
try:
# We have to shield because we don't want the underlying
# result future to be cancelled
return await asyncio.shield(handle._result_fut)
# We use _shield_await instead of asyncio.shield to prevent
# the underlying result future from being cancelled while avoiding
# a spurious error log on Python 3.11+ (see issue #1600).
return await _shield_await(handle._result_fut)
except _ActivityDoBackoffError as err:
# We have to sleep then reschedule. Note this sleep can be
# cancelled like any other timer.
Expand Down Expand Up @@ -2016,9 +2028,10 @@ def apply_child_cancel_error(err: asyncio.CancelledError) -> None:
async def run_child() -> Any:
while True:
try:
# We have to shield because we don't want the future itself
# to be cancelled
return await asyncio.shield(handle._result_fut)
# We use _shield_await instead of asyncio.shield to prevent
# the future itself from being cancelled while avoiding a
# spurious error log on Python 3.11+ (see issue #1600).
return await _shield_await(handle._result_fut)
except asyncio.CancelledError as err:
apply_child_cancel_error(err)
# Clear the cancellation counter on Python 3.11+ so the
Expand All @@ -2039,9 +2052,10 @@ async def run_child() -> Any:
# Wait on start before returning
while True:
try:
# We have to shield because we don't want the future itself
# to be cancelled
await asyncio.shield(handle._start_fut)
# We use _shield_await instead of asyncio.shield to prevent
# the future itself from being cancelled while avoiding a
# spurious error log on Python 3.11+ (see issue #1600).
await _shield_await(handle._start_fut)
return handle
except asyncio.CancelledError as err:
apply_child_cancel_error(err)
Expand Down Expand Up @@ -2076,7 +2090,7 @@ async def _outbound_start_nexus_operation(
async def operation_handle_fn() -> OutputT:
while True:
try:
return cast(OutputT, await asyncio.shield(handle._result_fut))
return cast(OutputT, await _shield_await(handle._result_fut))
except asyncio.CancelledError:
cancel_command = self._add_command()
handle._apply_cancel_command(cancel_command)
Expand Down Expand Up @@ -2105,7 +2119,10 @@ async def operation_handle_fn() -> OutputT:

while True:
try:
await asyncio.shield(handle._start_fut)
# We use _shield_await instead of asyncio.shield to prevent
# the future itself from being cancelled while avoiding a
# spurious error log on Python 3.11+ (see issue #1600).
await _shield_await(handle._start_fut)
return handle
except asyncio.CancelledError:
cancel_command = self._add_command()
Expand Down Expand Up @@ -2644,9 +2661,10 @@ async def _signal_external_workflow(
# Wait until completed or cancelled
while True:
try:
# We have to shield because we don't want the future itself
# to be cancelled
return await asyncio.shield(done_fut)
# We use _shield_await instead of asyncio.shield to prevent
# the future itself from being cancelled while avoiding a
# spurious error log on Python 3.11+ (see issue #1600).
return await _shield_await(done_fut)
except asyncio.CancelledError:
cancel_command = self._add_command()
cancel_command.cancel_signal_workflow.seq = seq
Expand Down
64 changes: 64 additions & 0 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -9162,3 +9162,67 @@ async def test_workflow_uncancel_shield_signal_external(client: Client):
assert shielded_err is None, (
f"Unexpected 'exception in shielded future' log: {shielded_err}"
)


class _SlowActivity:
def __init__(self) -> None:
self.started = asyncio.Event()

@activity.defn(name="slow_activity")
async def slow_activity(self) -> None:
self.started.set()
await asyncio.sleep(60)


@workflow.defn
class _CancelInFlightActivityWorkflow:
@workflow.run
async def run(self) -> None:
await asyncio.gather(
*(
workflow.execute_activity(
"slow_activity",
start_to_close_timeout=timedelta(minutes=2),
)
for _ in range(4)
)
)


@pytest.mark.asyncio
async def test_workflow_cancel_no_shielded_future_log(
client: Client, caplog: pytest.LogCaptureFixture
):
activity_inst = _SlowActivity()
async with new_worker(
client,
_CancelInFlightActivityWorkflow,
activities=[activity_inst.slow_activity],
) as worker:
handle = await client.start_workflow(
_CancelInFlightActivityWorkflow.run,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
execution_timeout=timedelta(minutes=5),
)

# Wait for activities to start
await asyncio.wait_for(activity_inst.started.wait(), timeout=10)

# Ignore worker startup logs
caplog.clear()

with caplog.at_level(logging.ERROR):
await handle.cancel()

try:
await handle.result()
except WorkflowFailureError as err:
assert isinstance(err.cause, CancelledError)

await asyncio.sleep(0.5)

assert not any(
"exception in shielded future" in record.message
for record in caplog.records
)