Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 14 additions & 28 deletions tests/worker/test_extstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,13 @@ def _tmprl1104_records(capturer: LogCapturer) -> list[logging.LogRecord]:
return capturer.find_all(lambda r: r.getMessage().startswith("[TMPRL1104]"))


# Accept any duration-bucket wording: a loaded host can push a trivial task past 5s.
_TMPRL1104_DURATION_MESSAGE = re.compile(
r"\[TMPRL1104\] [^:]+:\d+:\d+ Workflow task "
r"(?:duration information|exceeded \d+ seconds) \("
)


async def _expected_payload_size(
converter: temporalio.converter.DataConverter, value: object
) -> int:
Expand Down Expand Up @@ -655,10 +662,7 @@ async def test_tmprl1104_no_extstore(env: WorkflowEnvironment) -> None:
records = _tmprl1104_records(capturer)
assert len(records) == 1
record = records[0]
assert re.match(
r"\[TMPRL1104\] [^:]+:\d+:\d+ Workflow task duration information \(",
record.getMessage(),
)
assert _TMPRL1104_DURATION_MESSAGE.match(record.getMessage())
assert hasattr(record, "workflow_task_duration")
assert hasattr(record, "event_id")
# No external storage — download/upload fields must be absent
Expand Down Expand Up @@ -710,20 +714,14 @@ async def test_tmprl1104_with_extstore_download(env: WorkflowEnvironment) -> Non
assert len(records) == 2

# WFT 1: retrieves the externalized workflow input
assert re.match(
r"\[TMPRL1104\] [^:]+:\d+:\d+ Workflow task duration information \(",
records[0].getMessage(),
)
assert _TMPRL1104_DURATION_MESSAGE.match(records[0].getMessage())
assert getattr(records[0], "payload_download_count") == 1
assert getattr(records[0], "payload_download_size") == expected_input_size
assert getattr(records[0], "payload_download_duration") > timedelta(0)
assert not hasattr(records[0], "payload_upload_count")

# WFT 2: activity result is small — no external storage
assert re.match(
r"\[TMPRL1104\] [^:]+:\d+:\d+ Workflow task duration information \(",
records[1].getMessage(),
)
assert _TMPRL1104_DURATION_MESSAGE.match(records[1].getMessage())
assert not hasattr(records[1], "payload_download_count")
assert not hasattr(records[1], "payload_upload_count")

Expand Down Expand Up @@ -768,18 +766,12 @@ async def test_tmprl1104_with_extstore_upload(env: WorkflowEnvironment) -> None:
assert len(records) == 2

# WFT 1: small input — no external storage
assert re.match(
r"\[TMPRL1104\] [^:]+:\d+:\d+ Workflow task duration information \(",
records[0].getMessage(),
)
assert _TMPRL1104_DURATION_MESSAGE.match(records[0].getMessage())
assert not hasattr(records[0], "payload_download_count")
assert not hasattr(records[0], "payload_upload_count")

# WFT 2: workflow returns large result → uploaded
assert re.match(
r"\[TMPRL1104\] [^:]+:\d+:\d+ Workflow task duration information \(",
records[1].getMessage(),
)
assert _TMPRL1104_DURATION_MESSAGE.match(records[1].getMessage())
assert not hasattr(records[1], "payload_download_count")
assert getattr(records[1], "payload_upload_count") == 1
assert getattr(records[1], "payload_upload_size") == expected_output_size
Expand Down Expand Up @@ -830,20 +822,14 @@ async def test_tmprl1104_with_extstore_download_and_upload(
assert len(records) == 2

# WFT 1: retrieves externalized workflow input
assert re.match(
r"\[TMPRL1104\] [^:]+:\d+:\d+ Workflow task duration information \(",
records[0].getMessage(),
)
assert _TMPRL1104_DURATION_MESSAGE.match(records[0].getMessage())
assert getattr(records[0], "payload_download_count") == 1
assert getattr(records[0], "payload_download_size") == expected_input_size
assert getattr(records[0], "payload_download_duration") > timedelta(0)
assert not hasattr(records[0], "payload_upload_count")

# WFT 2: uploads externalized workflow result
assert re.match(
r"\[TMPRL1104\] [^:]+:\d+:\d+ Workflow task duration information \(",
records[1].getMessage(),
)
assert _TMPRL1104_DURATION_MESSAGE.match(records[1].getMessage())
assert not hasattr(records[1], "payload_download_count")
assert getattr(records[1], "payload_upload_count") == 1
assert getattr(records[1], "payload_upload_size") == expected_output_size
Expand Down
Loading