Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions contrib/temporal-payload-storage-s3driver-awssdkv2/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
description = '''Temporal Java SDK External Storage S3 Driver - AWS SDK v2 Client'''

ext {
awsSdkVersion = '2.31.0'
}

dependencies {
api project(':temporal-payload-storage-s3driver')
api platform("software.amazon.awssdk:bom:$awsSdkVersion")
api "software.amazon.awssdk:s3"

// For the @Experimental annotation only.
compileOnly project(':temporal-sdk')

testImplementation project(':temporal-payload-storage-s3driver')
testImplementation "junit:junit:${junitVersion}"
testImplementation "org.mockito:mockito-core:${mockitoVersion}"
testRuntimeOnly group: 'ch.qos.logback', name: 'logback-classic', version: "${logbackVersion}"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package io.temporal.payload.storage.s3driver.awssdkv2;

import io.temporal.common.Experimental;
import io.temporal.payload.storage.s3driver.S3StorageDriverClient;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import javax.annotation.Nonnull;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;

/**
* {@link S3StorageDriverClient} backed by the AWS SDK for Java v2 {@link S3AsyncClient}. The
* wrapped client must be configured with credentials and a region by the caller.
*/
@Experimental
public final class S3AsyncClientAdapter implements S3StorageDriverClient {
private final S3AsyncClient client;

public S3AsyncClientAdapter(@Nonnull S3AsyncClient client) {
this.client = Objects.requireNonNull(client, "client");
}

@Nonnull
@Override
public CompletableFuture<Void> putObject(
@Nonnull String bucket, @Nonnull String key, @Nonnull byte[] data) {
CompletableFuture<PutObjectResponse> request =
client.putObject(
PutObjectRequest.builder().bucket(bucket).key(key).build(),
AsyncRequestBody.fromBytesUnsafe(data)); // avoids a defensive copy
return abortRequestOnCancel(request, request.thenApply(response -> (Void) null));
}

@Nonnull
@Override
public CompletableFuture<Boolean> objectExists(@Nonnull String bucket, @Nonnull String key) {
CompletableFuture<HeadObjectResponse> request =
client.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).build());
return abortRequestOnCancel(
request,
request.handle(
(response, ex) -> {
if (ex == null) {
return true;
}
Throwable cause =
(ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
if (cause instanceof NoSuchKeyException) {
return false;
}
if (cause instanceof S3Exception && ((S3Exception) cause).statusCode() == 404) {
return false;
}
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
throw new RuntimeException(cause);
}));
}

@Nonnull
@Override
public CompletableFuture<byte[]> getObject(@Nonnull String bucket, @Nonnull String key) {
CompletableFuture<ResponseBytes<GetObjectResponse>> request =
client.getObject(
GetObjectRequest.builder().bucket(bucket).key(key).build(),
AsyncResponseTransformer.toBytes());
return abortRequestOnCancel(request, request.thenApply(ResponseBytes::asByteArrayUnsafe));
}

/**
* Returns {@code result}, wired so that cancelling it cancels the underlying {@code request}. The
* AWS SDK aborts an async request when the future it returns is cancelled. Cancellation does not
* otherwise propagate across the {@code thenApply}/{@code handle} boundary.
*/
private static <T> CompletableFuture<T> abortRequestOnCancel(
CompletableFuture<?> request, CompletableFuture<T> result) {
result.whenComplete(
(value, ex) -> {
if (result.isCancelled()) {
request.cancel(true);
}
});
return result;
}

@Nonnull
@Override
public Map<String, String> describe() {
Region region = client.serviceClientConfiguration().region();
if (region == null) {
return Collections.emptyMap();
}
return Collections.singletonMap("client_region", region.id());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.temporal.payload.storage.s3driver.awssdkv2;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.concurrent.CompletableFuture;
import org.junit.Test;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;

public class S3AsyncClientAdapterTest {

/**
* Cancelling the future the adapter returns must abort the underlying AWS request. The adapter
* wraps the AWS future with {@code thenApply}, which does not propagate cancellation upstream, so
* this verifies the explicit forwarding does its job.
*/
@Test
public void cancellingReturnedFutureAbortsTheUnderlyingRequest() {
S3AsyncClient s3 = mock(S3AsyncClient.class);
CompletableFuture<PutObjectResponse> awsRequest = new CompletableFuture<>();
when(s3.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class)))
.thenReturn(awsRequest);

CompletableFuture<Void> result =
new S3AsyncClientAdapter(s3).putObject("bucket", "key", new byte[] {1, 2, 3});

assertFalse(awsRequest.isCancelled());
result.cancel(true);
assertTrue(
"cancelling the adapter's future should abort the AWS request", awsRequest.isCancelled());
}
}
13 changes: 13 additions & 0 deletions contrib/temporal-payload-storage-s3driver/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
description = '''Temporal Java SDK External Storage S3 Driver'''

dependencies {
// No cloud SDK dependency: the driver works against any S3StorageDriverClient implementation.
// The AWS SDK v2 client lives in the separate temporal-payload-storage-s3driver-awssdkv2 module.
compileOnly project(':temporal-serviceclient')
compileOnly project(':temporal-sdk')

testImplementation project(':temporal-serviceclient')
testImplementation project(':temporal-sdk')
testImplementation "junit:junit:${junitVersion}"
testRuntimeOnly group: 'ch.qos.logback', name: 'logback-classic', version: "${logbackVersion}"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.temporal.payload.storage.s3driver;

import io.temporal.api.common.v1.Payload;
import io.temporal.common.Experimental;
import io.temporal.payload.storage.StorageDriverStoreContext;
import javax.annotation.Nonnull;

/**
* Resolves the target S3 bucket for a payload. Use {@link
* S3StorageDriver.Builder#setBucket(String)} for a fixed bucket, or supply a resolver via {@link
* S3StorageDriver.Builder#setBucketResolver(BucketResolver)} to choose a bucket per payload.
*/
@Experimental
@FunctionalInterface
public interface BucketResolver {
@Nonnull
String resolveBucket(@Nonnull StorageDriverStoreContext context, @Nonnull Payload payload);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.temporal.payload.storage.s3driver;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

final class CompletableFutures {
private CompletableFutures() {}

/**
* Completes with the results in input order once every future succeeds. Fails fast with the first
* failure's cause as soon as any future fails, without waiting for the rest.
*/
static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
CompletableFuture<List<T>> result = new CompletableFuture<>();
if (futures.isEmpty()) {
result.complete(new ArrayList<>());
return result;
}
AtomicInteger remaining = new AtomicInteger(futures.size());
for (CompletableFuture<T> future : futures) {
future.whenComplete(
(value, ex) -> {
if (ex != null) {
result.completeExceptionally(unwrap(ex));
} else if (remaining.decrementAndGet() == 0) {
List<T> results = new ArrayList<>(futures.size());
for (CompletableFuture<T> completed : futures) {
results.add(completed.join());
}
result.complete(results);
}
});
}
result.whenComplete(
(value, ex) -> {
if (ex != null) {
for (CompletableFuture<T> future : futures) {
future.cancel(true);
}
}
});
return result;
}

static Throwable unwrap(Throwable t) {
while ((t instanceof CompletionException || t instanceof ExecutionException)
&& t.getCause() != null) {
t = t.getCause();
}
return t;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.temporal.payload.storage.s3driver;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class PayloadHasher {
private static final char[] HEX = "0123456789abcdef".toCharArray();

private PayloadHasher() {}

/** Returns the lower-case SHA-256 hex digest of {@code data}. */
static String sha256Hex(byte[] data) {
byte[] digest;
try {
// If we ever move to Java 17+ we can use HexFormat.of().formatHex() instead.
digest = MessageDigest.getInstance("SHA-256").digest(data);
} catch (NoSuchAlgorithmException e) {
throw new AssertionError("SHA-256 MessageDigest cannot be found", e);
}
StringBuilder sb = new StringBuilder(digest.length * 2);
for (byte b : digest) {
sb.append(HEX[(b >> 4) & 0xF]).append(HEX[b & 0xF]);
}
return sb.toString();
}
}
Loading
Loading