-
Notifications
You must be signed in to change notification settings - Fork 217
Add GZIP compression defaulting to on #2911
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -161,6 +161,7 @@ private Channel applyHeadStandardInterceptors(Channel channel) { | |
|
|
||
| return ClientInterceptors.intercept( | ||
| channel, | ||
| new GrpcCompressionInterceptor(options.getGrpcCompression()), | ||
| MetadataUtils.newAttachHeadersInterceptor(headers), | ||
| new SystemInfoInterceptor(serverCapabilitiesFuture)); | ||
| } | ||
|
|
@@ -206,6 +207,8 @@ private ManagedChannel prepareChannel() { | |
| builder.useTransportSecurity(); | ||
| } | ||
|
|
||
| builder.decompressorRegistry(options.getGrpcCompression().getDecompressorRegistry()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Compression, decompression and advertising should be 3 independent settings. There are valid scenarios to support decompression without enabling compression, or to support decompression without advertising it (for compatibility with servers that ignore Message-Accept-Encoding). Disabling compression should not impact the others. Decompressor registry APIs are experimental in gRPC right now. I think we should not call them at all, and not expose the settings to control it. I could be convinced to allow disabling advertising as a separate boolean option marked experimental. But I don't think we should allow disabling decompression entirely until the relevant upstream APIs are stabilized.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, OK fair point about the API being experiemental. My preference then would be for the no-compression option to simply not set the compressor and otherwise not change anything, rather than having more knobs |
||
|
|
||
| // Disable built-in idleTimer until https://github.com/grpc/grpc-java/issues/8714 is resolved. | ||
| // jsdk force-idles channels often anyway, so this is not needed until we stop doing | ||
| // force-idling as a part of | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| package io.temporal.serviceclient; | ||
|
|
||
| import io.grpc.Codec; | ||
| import io.grpc.DecompressorRegistry; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| /** Selects transport-level gRPC compression for service calls. */ | ||
| public enum GrpcCompression { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using an enum locks us out of supporting custom compressors if gRPC ever decides to stabilize it. Instead, make it a regular class with private constructor and a single static member
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we're ever going to need to support a custom compressor. After all, the server has to support it. Maybe we'll add support for zstd or snappy or something, but they can easily be added as variants to this enum (with options). |
||
| /** Do not compress requests or advertise support for compressed responses. */ | ||
| NONE(null, DecompressorRegistry.emptyInstance().with(Codec.Identity.NONE, false)), | ||
|
|
||
| /** Gzip-compress outbound requests and accept gzip-compressed responses. */ | ||
| GZIP("gzip", DecompressorRegistry.getDefaultInstance()); | ||
|
|
||
| private final @Nullable String compressorName; | ||
| private final DecompressorRegistry decompressorRegistry; | ||
|
|
||
| GrpcCompression(@Nullable String compressorName, DecompressorRegistry decompressorRegistry) { | ||
| this.compressorName = compressorName; | ||
| this.decompressorRegistry = decompressorRegistry; | ||
| } | ||
|
|
||
| @Nullable | ||
| String getCompressorName() { | ||
| return compressorName; | ||
| } | ||
|
|
||
| DecompressorRegistry getDecompressorRegistry() { | ||
| return decompressorRegistry; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| package io.temporal.serviceclient; | ||
|
|
||
| import io.grpc.CallOptions; | ||
| import io.grpc.Channel; | ||
| import io.grpc.ClientCall; | ||
| import io.grpc.ClientInterceptor; | ||
| import io.grpc.MethodDescriptor; | ||
|
|
||
| final class GrpcCompressionInterceptor implements ClientInterceptor { | ||
| private final GrpcCompression compression; | ||
|
|
||
| GrpcCompressionInterceptor(GrpcCompression compression) { | ||
| this.compression = compression; | ||
| } | ||
|
|
||
| @Override | ||
| public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( | ||
| MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { | ||
| return next.newCall(method, callOptions.withCompression(compression.getCompressorName())); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -114,6 +114,9 @@ public class ServiceStubsOptions { | |
|
|
||
| protected final Scope metricsScope; | ||
|
|
||
| /** Transport-level gRPC compression. */ | ||
| protected final GrpcCompression grpcCompression; | ||
|
|
||
| ServiceStubsOptions(ServiceStubsOptions that) { | ||
| this.channel = that.channel; | ||
| this.target = that.target; | ||
|
|
@@ -135,6 +138,7 @@ public class ServiceStubsOptions { | |
| this.grpcMetadataProviders = that.grpcMetadataProviders; | ||
| this.grpcClientInterceptors = that.grpcClientInterceptors; | ||
| this.metricsScope = that.metricsScope; | ||
| this.grpcCompression = that.grpcCompression; | ||
| } | ||
|
|
||
| ServiceStubsOptions( | ||
|
|
@@ -157,7 +161,8 @@ public class ServiceStubsOptions { | |
| Metadata headers, | ||
| Collection<GrpcMetadataProvider> grpcMetadataProviders, | ||
| Collection<ClientInterceptor> grpcClientInterceptors, | ||
| Scope metricsScope) { | ||
| Scope metricsScope, | ||
| GrpcCompression grpcCompression) { | ||
| this.channel = channel; | ||
| this.target = target; | ||
| this.channelInitializer = channelInitializer; | ||
|
|
@@ -178,6 +183,7 @@ public class ServiceStubsOptions { | |
| this.grpcMetadataProviders = grpcMetadataProviders; | ||
| this.grpcClientInterceptors = grpcClientInterceptors; | ||
| this.metricsScope = metricsScope; | ||
| this.grpcCompression = grpcCompression; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -342,6 +348,15 @@ public Scope getMetricsScope() { | |
| return metricsScope; | ||
| } | ||
|
|
||
| /** | ||
| * @return transport-level gRPC compression used for requests and response negotiation. | ||
| * @see Builder#setGrpcCompression(GrpcCompression) | ||
| */ | ||
| @Nonnull | ||
| public GrpcCompression getGrpcCompression() { | ||
| return grpcCompression; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object o) { | ||
| if (this == o) return true; | ||
|
|
@@ -366,7 +381,8 @@ public boolean equals(Object o) { | |
| && Objects.equals(headers, that.headers) | ||
| && Objects.equals(grpcMetadataProviders, that.grpcMetadataProviders) | ||
| && Objects.equals(grpcClientInterceptors, that.grpcClientInterceptors) | ||
| && Objects.equals(metricsScope, that.metricsScope); | ||
| && Objects.equals(metricsScope, that.metricsScope) | ||
| && grpcCompression == that.grpcCompression; | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -391,7 +407,8 @@ public int hashCode() { | |
| headers, | ||
| grpcMetadataProviders, | ||
| grpcClientInterceptors, | ||
| metricsScope); | ||
| metricsScope, | ||
| grpcCompression); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -436,6 +453,8 @@ public String toString() { | |
| + grpcClientInterceptors | ||
| + ", metricsScope=" | ||
| + metricsScope | ||
| + ", grpcCompression=" | ||
| + grpcCompression | ||
| + '}'; | ||
| } | ||
|
|
||
|
|
@@ -460,6 +479,7 @@ public static class Builder<T extends Builder<T>> { | |
| private Collection<ClientInterceptor> grpcClientInterceptors; | ||
| private Scope metricsScope; | ||
| private boolean apiKeyProvided; | ||
| private GrpcCompression grpcCompression = GrpcCompression.GZIP; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should have one release cycle where compression is available but not enabled by default.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Haven't done that with the other SDKs. This change is I think quite safe, and if we have it off by default we basically won't get any usage. Turning it off I think is sufficient for anyone who happens to have some kind of broken proxy setup that messes things up. |
||
|
|
||
| protected Builder() {} | ||
|
|
||
|
|
@@ -491,6 +511,7 @@ protected Builder(ServiceStubsOptions options) { | |
| ? new ArrayList<>(options.grpcClientInterceptors) | ||
| : null; | ||
| this.metricsScope = options.metricsScope; | ||
| this.grpcCompression = options.grpcCompression; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -720,6 +741,22 @@ public T setMetricsScope(Scope metricsScope) { | |
| return self(); | ||
| } | ||
|
|
||
| /** | ||
| * Sets transport-level gRPC compression. Defaults to {@link GrpcCompression#GZIP}. Set to | ||
| * {@link GrpcCompression#NONE} to opt out. | ||
| * | ||
| * <p>For SDK-created channels, this controls both request compression and the {@code | ||
| * grpc-accept-encoding} response negotiation header. For user-supplied channels, the SDK still | ||
| * controls request compression, but response decompression negotiation is configured by the | ||
| * supplied channel. | ||
| * | ||
| * @return {@code this} | ||
| */ | ||
| public T setGrpcCompression(GrpcCompression grpcCompression) { | ||
| this.grpcCompression = Objects.requireNonNull(grpcCompression); | ||
| return self(); | ||
| } | ||
|
|
||
| /** | ||
| * Set the time to wait between service responses on each health check. | ||
| * | ||
|
|
@@ -853,7 +890,8 @@ public ServiceStubsOptions build() { | |
| this.headers, | ||
| this.grpcMetadataProviders, | ||
| this.grpcClientInterceptors, | ||
| this.metricsScope); | ||
| this.metricsScope, | ||
| this.grpcCompression); | ||
| } | ||
|
|
||
| public ServiceStubsOptions validateAndBuildWithDefaults() { | ||
|
|
@@ -916,7 +954,8 @@ public ServiceStubsOptions validateAndBuildWithDefaults() { | |
| headers, | ||
| grpcMetadataProviders, | ||
| grpcClientInterceptors, | ||
| metricsScope); | ||
| metricsScope, | ||
| this.grpcCompression); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,91 @@ | ||
| package io.temporal.serviceclient; | ||
|
|
||
| import static org.junit.Assert.*; | ||
|
|
||
| import io.grpc.Metadata; | ||
| import io.grpc.Server; | ||
| import io.grpc.ServerCall; | ||
| import io.grpc.ServerCallHandler; | ||
| import io.grpc.ServerInterceptor; | ||
| import io.grpc.ServerInterceptors; | ||
| import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; | ||
| import io.grpc.stub.StreamObserver; | ||
| import io.grpc.testing.GrpcCleanupRule; | ||
| import io.temporal.api.workflowservice.v1.GetSystemInfoRequest; | ||
| import io.temporal.api.workflowservice.v1.GetSystemInfoResponse; | ||
| import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
| import org.junit.Rule; | ||
| import org.junit.Test; | ||
|
|
||
| public class GrpcCompressionTest { | ||
| private static final Metadata.Key<String> GRPC_ENCODING = | ||
| Metadata.Key.of("grpc-encoding", Metadata.ASCII_STRING_MARSHALLER); | ||
| private static final Metadata.Key<String> GRPC_ACCEPT_ENCODING = | ||
| Metadata.Key.of("grpc-accept-encoding", Metadata.ASCII_STRING_MARSHALLER); | ||
|
|
||
| @Rule public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule(); | ||
|
|
||
| @Test | ||
| public void gzipCompressionSendsAndAcceptsGzip() throws Exception { | ||
| Metadata headers = callGetSystemInfo(GrpcCompression.GZIP); | ||
|
|
||
| assertEquals("gzip", headers.get(GRPC_ENCODING)); | ||
| assertTrue(headers.get(GRPC_ACCEPT_ENCODING).contains("gzip")); | ||
| } | ||
|
|
||
| @Test | ||
| public void noneCompressionDoesNotSendOrAcceptGzip() throws Exception { | ||
| Metadata headers = callGetSystemInfo(GrpcCompression.NONE); | ||
|
|
||
| assertNull(headers.get(GRPC_ENCODING)); | ||
| assertNull(headers.get(GRPC_ACCEPT_ENCODING)); | ||
| } | ||
|
|
||
| private Metadata callGetSystemInfo(GrpcCompression compression) throws Exception { | ||
| AtomicReference<Metadata> capturedHeaders = new AtomicReference<>(); | ||
| ServerInterceptor captureHeadersInterceptor = | ||
| new ServerInterceptor() { | ||
| @Override | ||
| public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall( | ||
| ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) { | ||
| capturedHeaders.set(headers); | ||
| return next.startCall(call, headers); | ||
| } | ||
| }; | ||
| Server server = | ||
| grpcCleanupRule.register( | ||
| NettyServerBuilder.forPort(0) | ||
| .addService( | ||
| ServerInterceptors.intercept( | ||
| new TestWorkflowService(), captureHeadersInterceptor)) | ||
| .build() | ||
| .start()); | ||
|
|
||
| WorkflowServiceStubs serviceStubs = | ||
| WorkflowServiceStubs.newServiceStubs( | ||
| WorkflowServiceStubsOptions.newBuilder() | ||
| .setTarget("127.0.0.1:" + server.getPort()) | ||
| .setEnableHttps(false) | ||
| .setGrpcCompression(compression) | ||
| .build()); | ||
| try { | ||
| serviceStubs.blockingStub().getSystemInfo(GetSystemInfoRequest.getDefaultInstance()); | ||
| } finally { | ||
| serviceStubs.shutdownNow(); | ||
| } | ||
|
|
||
| assertNotNull(capturedHeaders.get()); | ||
| return capturedHeaders.get(); | ||
| } | ||
|
|
||
| private static final class TestWorkflowService | ||
| extends WorkflowServiceGrpc.WorkflowServiceImplBase { | ||
| @Override | ||
| public void getSystemInfo( | ||
| GetSystemInfoRequest request, StreamObserver<GetSystemInfoResponse> responseObserver) { | ||
| responseObserver.onNext(GetSystemInfoResponse.getDefaultInstance()); | ||
| responseObserver.onCompleted(); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I would say
GrpcCompressionInterceptorshould only be added to the chain if compression is enabled, but it's not a big deal one way or the other.