Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions lambda-durable-ddb-streams-python-sam/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Amazon DynamoDB Streams to AWS Lambda Durable Function

This pattern implements a change data capture (CDC) pipeline using DynamoDB Streams and Lambda durable functions. When items are written to the source table, the stream triggers a multi-step processing pipeline with automatic checkpointing at each step.

Learn more about this pattern at Serverless Land Patterns: [https://serverlessland.com/patterns/lambda-durable-ddb-streams-python-sam](https://serverlessland.com/patterns/lambda-durable-ddb-streams-python-sam)

Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.

## Requirements

* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed
* [Python 3.13](https://www.python.org/downloads/) installed and available in your PATH

## Deployment Instructions

1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:
```
git clone https://github.com/aws-samples/serverless-patterns
```
1. Change directory to the pattern directory:
```
cd lambda-durable-ddb-streams-python-sam
```
1. From the command line, use AWS SAM to build and deploy the AWS resources for the pattern as specified in the template.yaml file:
```
sam build
sam deploy --guided
```
1. During the prompts:
* Enter a stack name
* Enter the desired AWS Region
* Allow SAM CLI to create IAM roles with the required permissions.

Once you have run `sam deploy --guided` mode once and saved arguments to a configuration file (samconfig.toml), you can use `sam deploy` in future to use these defaults.

1. Note the outputs from the SAM deployment process. These contain the resource names and/or ARNs which are used for testing.

## How it works

This pattern creates:

1. **Durable Lambda Function (Stream Processor)**: A Python 3.13 Lambda function that processes DynamoDB stream events through three checkpointed steps: validate, enrich, and notify.

2. **Source Table (DynamoDB with Streams)**: The table where items are written. DynamoDB Streams captures INSERT, MODIFY, and REMOVE events with NEW_AND_OLD_IMAGES.

3. **Processed Table (DynamoDB)**: Stores enriched records with computed metadata (event type, timestamps, flags).

4. **Notifications Table (DynamoDB)**: Audit trail of notification records for each processed event.

### Durable Execution Flow

1. An item is written to the source table, generating a stream event.
2. The durable function receives a batch of stream records.
3. For each record:
- **Step 1 (Validate)**: Checks record integrity and required fields. REMOVE events are validated but skip further processing.
- **Step 2 (Enrich)**: Adds computed fields (category, timestamps, name length, description flag) and writes to the processed table.
- **Step 3 (Notify)**: Creates an audit notification record in the notifications table.
4. Each step is checkpointed. If the function replays, completed steps return cached results without re-executing.
5. Uses `ReportBatchItemFailures` so only failed records are retried — successfully processed records are not re-sent.

**Important**: This pattern uses `AutoPublishAlias: live` because DynamoDB Streams event source mappings with durable functions require a qualified ARN (published version or alias). The `ExecutionTimeout` is set to 900 seconds (maximum allowed for event source mappings).

## Testing

1. Write an item to the source table (INSERT event):
```bash
aws dynamodb put-item \
--table-name lambda-durable-ddb-streams-source \
--item '{
"pk": {"S": "PRODUCT-001"},
"name": {"S": "Wireless Headphones"},
"category": {"S": "Electronics"},
"description": {"S": "Premium noise-cancelling headphones"}
}'
```

2. After 10-30 seconds, verify the processed record:
```bash
aws dynamodb scan --table-name lambda-durable-ddb-streams-processed
```

Expected: enriched record with `event_type: INSERT`, `processed_at`, `name_length`, `has_description: true`.

3. Verify the notification:
```bash
aws dynamodb scan --table-name lambda-durable-ddb-streams-notifications
```

Expected: notification record with `event_type`, `message`, `created_at`.

4. Test MODIFY event:
```bash
aws dynamodb put-item \
--table-name lambda-durable-ddb-streams-source \
--item '{
"pk": {"S": "PRODUCT-001"},
"name": {"S": "Wireless Headphones Pro"},
"category": {"S": "Electronics"}
}'
```

## Cleanup

1. Delete the stack:
```bash
sam delete
```

----
Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved.

SPDX-License-Identifier: MIT-0
69 changes: 69 additions & 0 deletions lambda-durable-ddb-streams-python-sam/example-pattern.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
{
"title": "Amazon DynamoDB Streams to AWS Lambda Durable Function",
"description": "Change data capture pipeline using DynamoDB Streams and Lambda durable functions with checkpointed validate, enrich, and notify steps.",
"language": "Python",
"level": "300",
"framework": "AWS SAM",
"introBox": {
"headline": "How it works",
"text": [
"This pattern processes DynamoDB stream events through a multi-step pipeline using Lambda durable functions.",
"When items are written to the source DynamoDB table, streams trigger a durable function that validates, enriches, and sends notifications for each record.",
"Each step is checkpointed. If the function is interrupted during processing, it resumes from the last checkpoint without re-executing completed steps.",
"Uses ReportBatchItemFailures for partial batch error handling so successfully processed records are not retried.",
"The function is deployed with AutoPublishAlias to meet the qualified ARN requirement for event source mappings with durable functions."
]
},
"gitHub": {
"template": {
"repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/lambda-durable-ddb-streams-python-sam",
"templateURL": "serverless-patterns/lambda-durable-ddb-streams-python-sam",
"projectFolder": "lambda-durable-ddb-streams-python-sam",
"templateFile": "template.yaml"
}
},
"resources": {
"bullets": [
{
"text": "AWS Lambda durable functions",
"link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html"
},
{
"text": "Using Lambda with DynamoDB Streams",
"link": "https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html"
},
{
"text": "Durable Execution SDK for Python",
"link": "https://github.com/aws/aws-durable-execution-sdk-python"
},
{
"text": "Reporting batch item failures for DynamoDB Streams",
"link": "https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting"
}
]
},
"deploy": {
"text": [
"sam build",
"sam deploy --guided"
]
},
"testing": {
"text": [
"See the GitHub repo for detailed testing instructions."
]
},
"cleanup": {
"text": [
"Delete the stack: <code>sam delete</code>."
]
},
"authors": [
{
"name": "Ajaya Shrestha",
"image": "",
"bio": "Cloud Support Engineer at AWS specializing in Lambda and serverless architectures.",
"linkedin": ""
}
]
}
172 changes: 172 additions & 0 deletions lambda-durable-ddb-streams-python-sam/src/processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
"""
DynamoDB Streams → Lambda Durable Function.

Processes stream records through a multi-step pipeline:
1. Validate — check record integrity
2. Enrich — add computed fields and metadata
3. Notify — write an audit notification record

Each step is checkpointed. If the function is interrupted during processing,
it resumes from the last checkpoint without re-executing completed steps.

Uses ReportBatchItemFailures for partial batch error handling.
"""

import os
import uuid
import logging
from datetime import datetime, timezone
from decimal import Decimal

import boto3
from aws_durable_execution_sdk_python.context import DurableContext, StepContext, durable_step
from aws_durable_execution_sdk_python.execution import durable_execution

logger = logging.getLogger()
logger.setLevel(logging.INFO)

dynamodb = boto3.resource("dynamodb")
processed_table = dynamodb.Table(os.environ["PROCESSED_TABLE"])
notifications_table = dynamodb.Table(os.environ["NOTIFICATIONS_TABLE"])


@durable_step
def validate_record(step_context: StepContext, record: dict) -> dict:
"""Step 1: Validate the stream record has required fields."""
step_context.logger.info("Validating record: %s", record.get("pk"))

event_name = record.get("event_name")
new_image = record.get("new_image", {})

if event_name == "REMOVE":
# Deletions are valid but we skip enrichment
return {"valid": True, "skip_processing": True, "event_name": event_name}

# Validate required fields for INSERT/MODIFY
required_fields = ["pk", "name"]
missing = [f for f in required_fields if f not in new_image]

if missing:
raise UnrecoverableInvocationError(
f"Record missing required fields: {missing}"
)

return {
"valid": True,
"skip_processing": False,
"event_name": event_name,
"data": new_image,
}


@durable_step
def enrich_record(step_context: StepContext, validation: dict) -> dict:
"""Step 2: Enrich the record with computed fields."""
step_context.logger.info("Enriching record: %s", validation["data"].get("pk"))

data = validation["data"]
now = datetime.now(timezone.utc).isoformat()

# Add enrichment fields
enriched = {
"pk": data["pk"],
"name": data["name"],
"category": data.get("category", "UNCATEGORIZED"),
"event_type": validation["event_name"],
"processed_at": now,
"name_length": len(data.get("name", "")),
"has_description": "description" in data,
}

# Write enriched record to processed table
processed_table.put_item(Item=enriched)

return enriched


@durable_step
def send_notification(step_context: StepContext, enriched: dict) -> dict:
"""Step 3: Create an audit notification for the processed record."""
step_context.logger.info("Sending notification for: %s", enriched["pk"])

notification_id = str(uuid.uuid4())
now = datetime.now(timezone.utc).isoformat()

notification = {
"notification_id": notification_id,
"record_pk": enriched["pk"],
"event_type": enriched["event_type"],
"message": f"Record '{enriched['name']}' was processed (event: {enriched['event_type']})",
"created_at": now,
}

notifications_table.put_item(Item=notification)

return {"notification_id": notification_id, "status": "SENT"}


@durable_execution
def lambda_handler(event, context: DurableContext) -> dict:
"""
Process DynamoDB stream records through a checkpointed pipeline.
Returns batch item failures for records that could not be processed.
"""
batch_item_failures = []

for record in event.get("Records", []):
event_id = record.get("eventID", "unknown")

try:
# Extract stream record data
stream_record = {
"pk": record.get("dynamodb", {}).get("Keys", {}).get("pk", {}).get("S", ""),
"event_name": record.get("eventName", ""),
"new_image": _deserialize_image(
record.get("dynamodb", {}).get("NewImage", {})
),
"old_image": _deserialize_image(
record.get("dynamodb", {}).get("OldImage", {})
),
}

# Step 1: Validate
validation = context.step(validate_record(stream_record))

if validation.get("skip_processing"):
context.logger.info("Skipping REMOVE event for %s", stream_record["pk"])
continue

# Step 2: Enrich
enriched = context.step(enrich_record(validation))

# Step 3: Notify
notification = context.step(send_notification(enriched))

context.logger.info(
"Processed %s → notification %s",
stream_record["pk"],
notification["notification_id"],
)

except Exception as err:
context.logger.error("Failed to process record %s: %s", event_id, str(err))
batch_item_failures.append({"itemIdentifier": event_id})

return {"batchItemFailures": batch_item_failures}


def _deserialize_image(image: dict) -> dict:
"""Simple DynamoDB image deserializer for common types."""
result = {}
for key, value in image.items():
if "S" in value:
result[key] = value["S"]
elif "N" in value:
result[key] = value["N"]
elif "BOOL" in value:
result[key] = value["BOOL"]
elif "NULL" in value:
result[key] = None
else:
result[key] = str(value)
return result
2 changes: 2 additions & 0 deletions lambda-durable-ddb-streams-python-sam/src/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
aws-durable-execution-sdk-python>=1.0.0
boto3>=1.34.0
Loading