Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,21 @@
},
"path": "./src/logger_example/logger_example.py"
},
{
"name": "Replay Logging",
"description": "Demonstrating replay-aware logger de-duplication across a wait/replay boundary",
"handler": "replay_logging.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"loggingConfig": {
"ApplicationLogLevel": "INFO",
"LogFormat": "JSON"
},
"path": "./src/logger_example/replay_logging.py"
},
{
"name": "Steps with Retry",
"description": "Multiple steps with retry logic in a polling pattern",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""Example demonstrating replay-aware logging across a wait boundary."""

from typing import Any

from aws_durable_execution_sdk_python.config import Duration
from aws_durable_execution_sdk_python.context import (
DurableContext,
StepContext,
durable_step,
durable_with_child_context,
)
from aws_durable_execution_sdk_python.execution import durable_execution


@durable_step
def prepare(step_context: StepContext, item: str) -> str:
"""A step that runs before the wait.

Its log is emitted on the first invocation. On replay this step is not
re-executed (it returns its checkpointed result), so this log does not
repeat.
"""
step_context.logger.info("Preparing item", extra={"item": item})
return f"prepared:{item}"


@durable_step
def finalize(step_context: StepContext, prepared: str) -> str:
"""A step that runs after the wait (new work on the replay invocation)."""
step_context.logger.info("Finalizing item", extra={"prepared": prepared})
return f"done:{prepared}"


@durable_with_child_context
def audit(child_ctx: DurableContext, prepared: str) -> str:
"""Child context with its own logger and its own replay status."""
child_ctx.logger.info(
"Auditing in child context (before child wait)",
extra={"prepared": prepared, "child_is_replaying": child_ctx.is_replaying()},
)

# The child's own replay boundary.
child_ctx.wait(duration=Duration.from_seconds(5), name="audit_cooldown")

# After the child's wait: emitted as new work on the child's replay.
child_ctx.logger.info(
"Resumed in child context (after child wait)",
extra={"child_is_replaying": child_ctx.is_replaying()},
)

return child_ctx.step(lambda _: f"audited:{prepared}", name="record_audit")


@durable_execution
def handler(event: Any, context: DurableContext) -> dict[str, Any]:
"""Handler demonstrating replay-aware logging across a wait."""
item: str = event.get("item", "widget") if isinstance(event, dict) else "widget"

# --- Before the wait ---
# On the replay invocation these lines are de-duplicated by the replay-aware
# logger because the context is still replaying when it reaches them.
context.logger.info(
"Workflow started (before wait)",
extra={"item": item, "is_replaying": context.is_replaying()},
)

prepared: str = context.step(prepare(item), name="prepare")

context.logger.info(
"Prepared, about to wait",
extra={"prepared": prepared, "is_replaying": context.is_replaying()},
)

# --- The replay boundary ---
# The wait suspends the execution. When it resumes, the handler replays from
# the top; everything above is de-duplicated, and everything below is new.
context.wait(duration=Duration.from_seconds(5), name="cooldown")

# --- After the wait ---
# These logs are emitted on the replay invocation because the context has
# crossed its replay boundary and is no longer replaying.
context.logger.info(
"Resumed after wait",
extra={"is_replaying": context.is_replaying()},
)

audited: str = context.run_in_child_context(audit(prepared), name="audit")

result: str = context.step(finalize(audited), name="finalize")

context.logger.info("Workflow completed", extra={"result": result})

return {"result": result, "item": item}
18 changes: 18 additions & 0 deletions packages/aws-durable-execution-sdk-python-examples/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,24 @@
}
}
},
"ReplayLogging": {
"Type": "AWS::Serverless::Function",
"Properties": {
"CodeUri": "build/",
"Handler": "replay_logging.handler",
"Description": "Demonstrating replay-aware logger de-duplication across a wait/replay boundary",
"Role": {
"Fn::GetAtt": [
"DurableFunctionRole",
"Arn"
]
},
"DurableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
}
}
},
"StepsWithRetry": {
"Type": "AWS::Serverless::Function",
"Properties": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""Tests for the replay_logging example.

These tests do not assert on emitted log lines (the replay-aware
de-duplication is best observed in CloudWatch after deploying). They verify the
workflow runs end-to-end across the wait/replay boundary and produces the
expected operations and result.
"""

import pytest

from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationType
from src.logger_example import replay_logging
from test.conftest import deserialize_operation_payload


@pytest.mark.example
@pytest.mark.durable_execution(
handler=replay_logging.handler,
lambda_function_name="Replay Logging",
)
def test_replay_logging(durable_runner):
"""Test the replay-aware logging example runs across the wait boundary."""
with durable_runner:
result = durable_runner.run(input={"item": "widget"}, timeout=30)

assert result.status is InvocationStatus.SUCCEEDED
assert deserialize_operation_payload(result.result) == {
"result": "done:audited:prepared:widget",
"item": "widget",
}

# Two wait operations force suspend/replay cycles: one in the parent context
# and one inside the child (audit) context. This exercises per-context replay
# status in different contexts.
wait_ops = [
op for op in result.operations if op.operation_type == OperationType.WAIT
]
assert len(wait_ops) >= 1

# Steps before (prepare) and after (finalize) the wait both ran. The child
# context's record_audit step is nested inside the CONTEXT operation.
step_ops = [
op for op in result.operations if op.operation_type == OperationType.STEP
]
assert len(step_ops) >= 2

# The audit child context produces a CONTEXT operation.
context_ops = [
op for op in result.operations if op.operation_type.value == "CONTEXT"
]
assert len(context_ops) >= 1
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,6 @@ def run_in_child_handler() -> ResultType:
is_virtual=is_virtual,
),
)
child_context.state.track_replay(operation_id=operation_id)
return result

def replay(self, execution_state: ExecutionState, executor_context: DurableContext):
Expand Down
Loading