-
Notifications
You must be signed in to change notification settings - Fork 567
[fs] Support AWS S3 credentials provider mode #3540
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,9 @@ | |
| import org.apache.fluss.fs.token.CredentialsJsonSerde; | ||
| import org.apache.fluss.fs.token.ObtainedSecurityToken; | ||
|
|
||
| import com.amazonaws.auth.AWSCredentials; | ||
| import com.amazonaws.auth.AWSCredentialsProvider; | ||
| import com.amazonaws.auth.AWSSessionCredentials; | ||
| import com.amazonaws.auth.AWSStaticCredentialsProvider; | ||
| import com.amazonaws.auth.BasicAWSCredentials; | ||
| import com.amazonaws.client.builder.AwsClientBuilder; | ||
|
|
@@ -29,12 +32,16 @@ | |
| import com.amazonaws.services.securitytoken.model.AssumeRoleResult; | ||
| import com.amazonaws.services.securitytoken.model.Credentials; | ||
| import com.amazonaws.services.securitytoken.model.GetSessionTokenResult; | ||
| import org.apache.commons.lang3.StringUtils; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. commons-lang3 is only transitive here, shall we use org.apache.commons.lang3.StringUtils? |
||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.hadoop.fs.s3a.AWSCredentialProviderList; | ||
| import org.apache.hadoop.fs.s3a.S3AUtils; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import javax.annotation.Nullable; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.Arrays; | ||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
|
|
@@ -49,6 +56,9 @@ public class S3DelegationTokenProvider { | |
|
|
||
| private static final String ACCESS_KEY_ID = "fs.s3a.access.key"; | ||
| private static final String ACCESS_KEY_SECRET = "fs.s3a.secret.key"; | ||
| private static final String AWS_CREDENTIALS_PROVIDER = "fs.s3a.aws.credentials.provider"; | ||
| public static final String CREDENTIAL_PROVIDER_EXPLICITLY_CONFIGURED = | ||
| "fluss.fs.s3.aws.credentials.provider.explicitly.configured"; | ||
|
|
||
| private static final String REGION_KEY = "fs.s3a.region"; | ||
| private static final String ENDPOINT_KEY = "fs.s3a.endpoint"; | ||
|
|
@@ -63,21 +73,31 @@ public class S3DelegationTokenProvider { | |
| @Nullable private final String secretKey; | ||
| @Nullable private final String roleArn; | ||
| @Nullable private final String stsEndpoint; | ||
| @Nullable private final AWSCredentialProviderList credentialProviderList; | ||
| private final Map<String, String> additionInfos; | ||
|
|
||
| public S3DelegationTokenProvider(String scheme, Configuration conf) { | ||
| public S3DelegationTokenProvider(String scheme, Configuration conf) throws IOException { | ||
| this.scheme = scheme; | ||
| this.region = conf.get(REGION_KEY); | ||
| checkArgument(region != null, "Region is not set."); | ||
| this.accessKey = conf.get(ACCESS_KEY_ID); | ||
| this.secretKey = conf.get(ACCESS_KEY_SECRET); | ||
| this.roleArn = conf.get(ROLE_ARN_KEY); | ||
| this.stsEndpoint = conf.get(STS_ENDPOINT_KEY); | ||
| boolean hasCredentialProvider = | ||
| conf.getBoolean(CREDENTIAL_PROVIDER_EXPLICITLY_CONFIGURED, false) | ||
| && StringUtils.isNotBlank(conf.getTrimmed(AWS_CREDENTIALS_PROVIDER)); | ||
|
|
||
| checkArgument( | ||
| (accessKey == null) == (secretKey == null), | ||
| "S3 access key and secret key must both be set or both be unset."); | ||
| if (accessKey == null) { | ||
| if (hasCredentialProvider && roleArn != null) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we're adding all these checks in this PR - this one fails fast, but the session/empty-cred cases only fail on the first token request. Could we check those at construction too, so they're consistent? Not blocking though |
||
| throw new IllegalArgumentException( | ||
| "AssumeRole and a custom AWS credentials provider cannot be configured together."); | ||
| } | ||
| this.credentialProviderList = | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The description says configuring DynamicTemporaryAWSCredentialsProvider for server mode is "rejected", but there's no guard, so it just gets instantiated here and throws NoAwsCredentialsException at the first token request. Can you clarify? |
||
| hasCredentialProvider ? S3AUtils.createAWSCredentialProviderSet(null, conf) : null; | ||
| if (accessKey == null && credentialProviderList == null) { | ||
| checkArgument( | ||
| roleArn != null, | ||
| "Role ARN must be set when static credentials are not provided."); | ||
|
|
@@ -106,8 +126,10 @@ public ObtainedSecurityToken obtainSecurityToken() { | |
| credentials = result.getCredentials(); | ||
| } else { | ||
| LOG.info( | ||
| "Obtaining session credentials via GetSessionToken with access key: {}", | ||
| S3TokenLogUtils.maskAccessKey(accessKey)); | ||
| "Obtaining session credentials via GetSessionToken{}.", | ||
| credentialProviderList != null | ||
| ? " with configured AWS credentials provider" | ||
| : " with access key: " + S3TokenLogUtils.maskAccessKey(accessKey)); | ||
| GetSessionTokenResult result = stsClient.getSessionToken(); | ||
| credentials = result.getCredentials(); | ||
| } | ||
|
|
@@ -131,10 +153,9 @@ private AWSSecurityTokenService buildStsClient() { | |
| AWSSecurityTokenServiceClientBuilder builder = | ||
| AWSSecurityTokenServiceClientBuilder.standard(); | ||
|
|
||
| if (accessKey != null && secretKey != null) { | ||
| builder.withCredentials( | ||
| new AWSStaticCredentialsProvider( | ||
| new BasicAWSCredentials(accessKey, secretKey))); | ||
| AWSCredentialsProvider stsCredentialsProvider = createStsCredentialsProvider(); | ||
| if (stsCredentialsProvider != null) { | ||
| builder.withCredentials(stsCredentialsProvider); | ||
| } | ||
|
|
||
| if (stsEndpoint != null) { | ||
|
|
@@ -147,6 +168,33 @@ private AWSSecurityTokenService buildStsClient() { | |
| return builder.build(); | ||
| } | ||
|
|
||
| @Nullable | ||
| AWSCredentialsProvider createStsCredentialsProvider() { | ||
| if (credentialProviderList != null) { | ||
| AWSCredentials credentials = credentialProviderList.getCredentials(); | ||
| checkArgument( | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is effectively the "long-term creds only" gate, and it's the thing people will trip on instance profiles/IRSA roles all return session creds and land here at token time (lazily). Given s3.md already has an IRSA section, can we add a short note there for this new mode: long-term creds only, not compatible with AssumeRole? Otherwise it's a bit hidden for operational usage |
||
| !(credentials instanceof AWSSessionCredentials), | ||
| "Session credentials from the configured AWS credentials provider are not supported " | ||
| + "for Fluss S3 client-token generation."); | ||
| checkArgument( | ||
| credentials.getAWSAccessKeyId() != null | ||
| && credentials.getAWSSecretKey() != null, | ||
| "The configured AWS credentials provider must return an access key and secret key."); | ||
| LOG.info( | ||
| "Using configured AWS credentials provider for STS GetSessionToken with access key: {}", | ||
| S3TokenLogUtils.maskAccessKey(credentials.getAWSAccessKeyId())); | ||
| return new AWSStaticCredentialsProvider( | ||
| new BasicAWSCredentials( | ||
| credentials.getAWSAccessKeyId(), credentials.getAWSSecretKey())); | ||
| } | ||
|
|
||
| if (accessKey != null && secretKey != null) { | ||
| return new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)); | ||
| } | ||
|
|
||
| return null; | ||
| } | ||
|
|
||
| private byte[] toJson(Credentials credentials) { | ||
| org.apache.fluss.fs.token.Credentials flussCredentials = | ||
| new org.apache.fluss.fs.token.Credentials( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto