From bc2b9869f2d56148bd8828a6368c9f3c9f6e65ed Mon Sep 17 00:00:00 2001 From: DABH Date: Tue, 23 Jun 2026 17:17:14 -0500 Subject: [PATCH] Fix flaky TMPRL1104 duration-log assertions in test_extstore The worker logs TMPRL1104 with one of three wordings depending on wall-clock task duration: "duration information" (<=5s), "exceeded 5 seconds" (>5s), or "exceeded 10 seconds" (>10s). The tests matched only the "duration information" wording, but the duration is wall-clock time over the whole activation, so on a loaded CI host (tests run under pytest-xdist) a trivial task can occasionally cross 5s and emit the "exceeded" wording instead, failing the re.match. The payload/duration fields under test are attached identically in all three branches, so the assertions now accept any duration-bucket wording. --- tests/worker/test_extstore.py | 42 ++++++++++++----------------------- 1 file changed, 14 insertions(+), 28 deletions(-) diff --git a/tests/worker/test_extstore.py b/tests/worker/test_extstore.py index e186f4e67..998a2bd27 100644 --- a/tests/worker/test_extstore.py +++ b/tests/worker/test_extstore.py @@ -624,6 +624,13 @@ def _tmprl1104_records(capturer: LogCapturer) -> list[logging.LogRecord]: return capturer.find_all(lambda r: r.getMessage().startswith("[TMPRL1104]")) +# Accept any duration-bucket wording: a loaded host can push a trivial task past 5s. +_TMPRL1104_DURATION_MESSAGE = re.compile( + r"\[TMPRL1104\] [^:]+:\d+:\d+ Workflow task " + r"(?:duration information|exceeded \d+ seconds) \(" +) + + async def _expected_payload_size( converter: temporalio.converter.DataConverter, value: object ) -> int: @@ -655,10 +662,7 @@ async def test_tmprl1104_no_extstore(env: WorkflowEnvironment) -> None: records = _tmprl1104_records(capturer) assert len(records) == 1 record = records[0] - assert re.match( - r"\[TMPRL1104\] [^:]+:\d+:\d+ Workflow task duration information \(", - record.getMessage(), - ) + assert _TMPRL1104_DURATION_MESSAGE.match(record.getMessage()) assert hasattr(record, "workflow_task_duration") assert hasattr(record, "event_id") # No external storage — download/upload fields must be absent @@ -710,20 +714,14 @@ async def test_tmprl1104_with_extstore_download(env: WorkflowEnvironment) -> Non assert len(records) == 2 # WFT 1: retrieves the externalized workflow input - assert re.match( - r"\[TMPRL1104\] [^:]+:\d+:\d+ Workflow task duration information \(", - records[0].getMessage(), - ) + assert _TMPRL1104_DURATION_MESSAGE.match(records[0].getMessage()) assert getattr(records[0], "payload_download_count") == 1 assert getattr(records[0], "payload_download_size") == expected_input_size assert getattr(records[0], "payload_download_duration") > timedelta(0) assert not hasattr(records[0], "payload_upload_count") # WFT 2: activity result is small — no external storage - assert re.match( - r"\[TMPRL1104\] [^:]+:\d+:\d+ Workflow task duration information \(", - records[1].getMessage(), - ) + assert _TMPRL1104_DURATION_MESSAGE.match(records[1].getMessage()) assert not hasattr(records[1], "payload_download_count") assert not hasattr(records[1], "payload_upload_count") @@ -768,18 +766,12 @@ async def test_tmprl1104_with_extstore_upload(env: WorkflowEnvironment) -> None: assert len(records) == 2 # WFT 1: small input — no external storage - assert re.match( - r"\[TMPRL1104\] [^:]+:\d+:\d+ Workflow task duration information \(", - records[0].getMessage(), - ) + assert _TMPRL1104_DURATION_MESSAGE.match(records[0].getMessage()) assert not hasattr(records[0], "payload_download_count") assert not hasattr(records[0], "payload_upload_count") # WFT 2: workflow returns large result → uploaded - assert re.match( - r"\[TMPRL1104\] [^:]+:\d+:\d+ Workflow task duration information \(", - records[1].getMessage(), - ) + assert _TMPRL1104_DURATION_MESSAGE.match(records[1].getMessage()) assert not hasattr(records[1], "payload_download_count") assert getattr(records[1], "payload_upload_count") == 1 assert getattr(records[1], "payload_upload_size") == expected_output_size @@ -830,20 +822,14 @@ async def test_tmprl1104_with_extstore_download_and_upload( assert len(records) == 2 # WFT 1: retrieves externalized workflow input - assert re.match( - r"\[TMPRL1104\] [^:]+:\d+:\d+ Workflow task duration information \(", - records[0].getMessage(), - ) + assert _TMPRL1104_DURATION_MESSAGE.match(records[0].getMessage()) assert getattr(records[0], "payload_download_count") == 1 assert getattr(records[0], "payload_download_size") == expected_input_size assert getattr(records[0], "payload_download_duration") > timedelta(0) assert not hasattr(records[0], "payload_upload_count") # WFT 2: uploads externalized workflow result - assert re.match( - r"\[TMPRL1104\] [^:]+:\d+:\d+ Workflow task duration information \(", - records[1].getMessage(), - ) + assert _TMPRL1104_DURATION_MESSAGE.match(records[1].getMessage()) assert not hasattr(records[1], "payload_download_count") assert getattr(records[1], "payload_upload_count") == 1 assert getattr(records[1], "payload_upload_size") == expected_output_size