Add extstore s3 driver#2907
Conversation
| - Missing values (including a missing `run-id`) are encoded as `null`. | ||
| - `hex-digest` is lower-case SHA-256 hex (64 characters). | ||
|
|
||
| ### Percent-encoding rules |
There was a problem hiding this comment.
These are encoding rules that all temporal s3 drivers should conform to. If we have a better place to put information like this I will move it, otherwise I'll put something similar in the READMEs of the other SDKs.
|
|
||
| /** Computes payload hashes shared by external storage drivers. */ | ||
| @Experimental | ||
| public final class PayloadHasher { |
There was a problem hiding this comment.
Any driver we add that creates content-addressable keys will need to hash the content somehow. I opted to put this in a shared place. Happy to rename or move if needed.
There was a problem hiding this comment.
I'd move this to the s3 package for now. As it is, it's publicly available to anyone using the SDK, and I think this should be an implementation detail.
|
The actual changes here are only a few hundred lines. The vast majority of the diff is tests, comments, and java ceremony. |
|
|
||
| /** Interface for S3 {@link S3StorageDriver} operations: upload, existence check, and download. */ | ||
| @Experimental | ||
| public interface S3Client { |
| @@ -0,0 +1,18 @@ | |||
| package io.temporal.payload.storage.s3; | |||
There was a problem hiding this comment.
package io.temporal.payload.storage.s3driver to avoid collisions with the AWS S3 SDK package naming?
| * be configured with credentials and a region by the caller. | ||
| */ | ||
| @Experimental | ||
| public final class S3AsyncClientAdapter implements S3Client { |
There was a problem hiding this comment.
This makes the io.temporal.payload.storage.s3 package have a hard dependency on the AWS S3 SDK libraries, which is different than how we did it for Go and Python, which use a separate module for the client implementation.
There was a problem hiding this comment.
I'll split out to a temporal.payload.storage.s3driver.awssdkv2
|
|
||
| /** Computes payload hashes shared by external storage drivers. */ | ||
| @Experimental | ||
| public final class PayloadHasher { |
There was a problem hiding this comment.
I'd move this to the s3 package for now. As it is, it's publicly available to anyone using the SDK, and I think this should be an implementation detail.
| */ | ||
| final class S3StorageKey { | ||
| private static final String KEY_VERSION = "v0"; | ||
| private static final String PATH_SEGMENT_UNRESERVED = "-_.~$&+:=@"; |
There was a problem hiding this comment.
Did you validate that these are acceptable characters in S3 object keys?
There was a problem hiding this comment.
Good callout. I followed Go's implementation for this but I did some reading and interestingly both Go and Python allow at least one character the S3 docs say to avoid (~) https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-keys.html
I'm going to update the Java SDK to diverge from both the existing implementations, update the key encoding "spec" in the README, and then patch the other two SDKs to follow AWS's recommendations. That seems like the correct thing to do here.
| * larger payload fails the {@code store} call. | ||
| */ | ||
| public Builder setMaxPayloadSize(int maxPayloadSize) { | ||
| this.maxPayloadSize = maxPayloadSize; |
There was a problem hiding this comment.
range check here instead of in builder? not sure what's idiomatic in Java
There was a problem hiding this comment.
I'm not either. I looked it up and it seems like putting it in here is more idiomatic. Updating.
| * setting both before {@link #build()} is an error. | ||
| */ | ||
| public Builder setBucket(@Nonnull String bucket) { | ||
| this.staticBucket = Objects.requireNonNull(bucket, "bucket"); |
There was a problem hiding this comment.
Can you remove the staticBucket field and provide an annonymous implementation for the bucket resolver? Eliminates the field and the XOR check.
…d-storage-s3driver and split out awsskdv2 dep into new package. Address some minor PR comments.
| return withFailureContext(uploadRequest, "upload failed " + location); | ||
| }) | ||
| .thenApply(ignored -> claimFor(bucket, key, hexDigest)); | ||
| cancelRequestWhenCancelled(claimFuture, inFlightRequest); |
There was a problem hiding this comment.
What I had here previously did not properly cancel the upstream client request. Now cancellation should propagate correctly:
client.objectExists(bucket, key);creates a future that eagerly executes.- that is wrapped in a future that provides failure context via
withFailureContext thenComposechains theclient.putObjectrequestcancelRequestWhenCancelled(claimFuture, inFlightRequest);listens for a cancel onclaimFutureand then reaches upstream and cancels the current inFlightRequest.
If there is a more elegant way to do this in Java please let me know. An alternative solution is returning an opaque cancellation token/context but now you've got two things to cancel and that might be confusing.
What changed?
contrib/temporal-payload-storage-s3/src/main/java/io/temporal/payload/storage/s3/PayloadHasherfor drivers to consistently hash payload content.Why?
Checklist