Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.junit.Test;

public class GrpcMessageTooLargeTest {
private static final String QUERY_ERROR_MESSAGE =
"Failed to send query response: RESOURCE_EXHAUSTED: grpc: received message larger than max";
// This string is kept intentionally short to match multiple possible too-large error messages
private static final String TOO_BIG_ERR_MESSAGE = "larger than max";
private static final String VERY_LARGE_DATA;

static {
Expand Down Expand Up @@ -120,7 +120,7 @@ public void queryResultTooLarge() {
assertNotNull(e.getCause());
// The exception will not contain the original failure object, so instead of type check we're
// checking the message to ensure the correct error is being sent.
assertTrue(e.getCause().getMessage().contains(QUERY_ERROR_MESSAGE));
assertTrue(e.getCause().getMessage().contains(TOO_BIG_ERR_MESSAGE));
}

@Test
Expand All @@ -132,7 +132,7 @@ public void queryErrorTooLarge() {
WorkflowQueryException e = assertThrows(WorkflowQueryException.class, workflow::query);

assertNotNull(e.getCause());
assertTrue(e.getCause().getMessage().contains(QUERY_ERROR_MESSAGE));
assertTrue(e.getCause().getMessage().contains(TOO_BIG_ERR_MESSAGE));
}

private static <T> T createWorkflowStub(Class<T> clazz, SDKTestWorkflowRule workflowRule) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ private Channel applyHeadStandardInterceptors(Channel channel) {

return ClientInterceptors.intercept(
channel,
new GrpcCompressionInterceptor(options.getGrpcCompression()),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I would say GrpcCompressionInterceptor should only be added to the chain if compression is enabled, but it's not a big deal one way or the other.

MetadataUtils.newAttachHeadersInterceptor(headers),
new SystemInfoInterceptor(serverCapabilitiesFuture));
}
Expand Down Expand Up @@ -206,6 +207,8 @@ private ManagedChannel prepareChannel() {
builder.useTransportSecurity();
}

builder.decompressorRegistry(options.getGrpcCompression().getDecompressorRegistry());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compression, decompression and advertising should be 3 independent settings. There are valid scenarios to support decompression without enabling compression, or to support decompression without advertising it (for compatibility with servers that ignore Message-Accept-Encoding). Disabling compression should not impact the others.

Decompressor registry APIs are experimental in gRPC right now. I think we should not call them at all, and not expose the settings to control it. I could be convinced to allow disabling advertising as a separate boolean option marked experimental. But I don't think we should allow disabling decompression entirely until the relevant upstream APIs are stabilized.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, OK fair point about the API being experiemental. My preference then would be for the no-compression option to simply not set the compressor and otherwise not change anything, rather than having more knobs


// Disable built-in idleTimer until https://github.com/grpc/grpc-java/issues/8714 is resolved.
// jsdk force-idles channels often anyway, so this is not needed until we stop doing
// force-idling as a part of
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.temporal.serviceclient;

import io.grpc.Codec;
import io.grpc.DecompressorRegistry;
import javax.annotation.Nullable;

/** Selects transport-level gRPC compression for service calls. */
public enum GrpcCompression {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using an enum locks us out of supporting custom compressors if gRPC ever decides to stabilize it. Instead, make it a regular class with private constructor and a single static member GZIP (no compression should be represented by null). This is functionally equivalent to current design, and we still don't expose custom compressors at this point, but it will make future changes easier.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we're ever going to need to support a custom compressor. After all, the server has to support it. Maybe we'll add support for zstd or snappy or something, but they can easily be added as variants to this enum (with options).

/** Do not compress requests or advertise support for compressed responses. */
NONE(null, DecompressorRegistry.emptyInstance().with(Codec.Identity.NONE, false)),

/** Gzip-compress outbound requests and accept gzip-compressed responses. */
GZIP("gzip", DecompressorRegistry.getDefaultInstance());

private final @Nullable String compressorName;
private final DecompressorRegistry decompressorRegistry;

GrpcCompression(@Nullable String compressorName, DecompressorRegistry decompressorRegistry) {
this.compressorName = compressorName;
this.decompressorRegistry = decompressorRegistry;
}

@Nullable
String getCompressorName() {
return compressorName;
}

DecompressorRegistry getDecompressorRegistry() {
return decompressorRegistry;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.temporal.serviceclient;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.MethodDescriptor;

final class GrpcCompressionInterceptor implements ClientInterceptor {
private final GrpcCompression compression;

GrpcCompressionInterceptor(GrpcCompression compression) {
this.compression = compression;
}

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return next.newCall(method, callOptions.withCompression(compression.getCompressorName()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ public class ServiceStubsOptions {

protected final Scope metricsScope;

/** Transport-level gRPC compression. */
protected final GrpcCompression grpcCompression;

ServiceStubsOptions(ServiceStubsOptions that) {
this.channel = that.channel;
this.target = that.target;
Expand All @@ -135,6 +138,7 @@ public class ServiceStubsOptions {
this.grpcMetadataProviders = that.grpcMetadataProviders;
this.grpcClientInterceptors = that.grpcClientInterceptors;
this.metricsScope = that.metricsScope;
this.grpcCompression = that.grpcCompression;
}

ServiceStubsOptions(
Expand All @@ -157,7 +161,8 @@ public class ServiceStubsOptions {
Metadata headers,
Collection<GrpcMetadataProvider> grpcMetadataProviders,
Collection<ClientInterceptor> grpcClientInterceptors,
Scope metricsScope) {
Scope metricsScope,
GrpcCompression grpcCompression) {
this.channel = channel;
this.target = target;
this.channelInitializer = channelInitializer;
Expand All @@ -178,6 +183,7 @@ public class ServiceStubsOptions {
this.grpcMetadataProviders = grpcMetadataProviders;
this.grpcClientInterceptors = grpcClientInterceptors;
this.metricsScope = metricsScope;
this.grpcCompression = grpcCompression;
}

/**
Expand Down Expand Up @@ -342,6 +348,15 @@ public Scope getMetricsScope() {
return metricsScope;
}

/**
* @return transport-level gRPC compression used for requests and response negotiation.
* @see Builder#setGrpcCompression(GrpcCompression)
*/
@Nonnull
public GrpcCompression getGrpcCompression() {
return grpcCompression;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -366,7 +381,8 @@ public boolean equals(Object o) {
&& Objects.equals(headers, that.headers)
&& Objects.equals(grpcMetadataProviders, that.grpcMetadataProviders)
&& Objects.equals(grpcClientInterceptors, that.grpcClientInterceptors)
&& Objects.equals(metricsScope, that.metricsScope);
&& Objects.equals(metricsScope, that.metricsScope)
&& grpcCompression == that.grpcCompression;
}

@Override
Expand All @@ -391,7 +407,8 @@ public int hashCode() {
headers,
grpcMetadataProviders,
grpcClientInterceptors,
metricsScope);
metricsScope,
grpcCompression);
}

@Override
Expand Down Expand Up @@ -436,6 +453,8 @@ public String toString() {
+ grpcClientInterceptors
+ ", metricsScope="
+ metricsScope
+ ", grpcCompression="
+ grpcCompression
+ '}';
}

Expand All @@ -460,6 +479,7 @@ public static class Builder<T extends Builder<T>> {
private Collection<ClientInterceptor> grpcClientInterceptors;
private Scope metricsScope;
private boolean apiKeyProvided;
private GrpcCompression grpcCompression = GrpcCompression.GZIP;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have one release cycle where compression is available but not enabled by default.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haven't done that with the other SDKs. This change is I think quite safe, and if we have it off by default we basically won't get any usage. Turning it off I think is sufficient for anyone who happens to have some kind of broken proxy setup that messes things up.


protected Builder() {}

Expand Down Expand Up @@ -491,6 +511,7 @@ protected Builder(ServiceStubsOptions options) {
? new ArrayList<>(options.grpcClientInterceptors)
: null;
this.metricsScope = options.metricsScope;
this.grpcCompression = options.grpcCompression;
}

/**
Expand Down Expand Up @@ -720,6 +741,22 @@ public T setMetricsScope(Scope metricsScope) {
return self();
}

/**
* Sets transport-level gRPC compression. Defaults to {@link GrpcCompression#GZIP}. Set to
* {@link GrpcCompression#NONE} to opt out.
*
* <p>For SDK-created channels, this controls both request compression and the {@code
* grpc-accept-encoding} response negotiation header. For user-supplied channels, the SDK still
* controls request compression, but response decompression negotiation is configured by the
* supplied channel.
*
* @return {@code this}
*/
public T setGrpcCompression(GrpcCompression grpcCompression) {
this.grpcCompression = Objects.requireNonNull(grpcCompression);
return self();
}

/**
* Set the time to wait between service responses on each health check.
*
Expand Down Expand Up @@ -853,7 +890,8 @@ public ServiceStubsOptions build() {
this.headers,
this.grpcMetadataProviders,
this.grpcClientInterceptors,
this.metricsScope);
this.metricsScope,
this.grpcCompression);
}

public ServiceStubsOptions validateAndBuildWithDefaults() {
Expand Down Expand Up @@ -916,7 +954,8 @@ public ServiceStubsOptions validateAndBuildWithDefaults() {
headers,
grpcMetadataProviders,
grpcClientInterceptors,
metricsScope);
metricsScope,
this.grpcCompression);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package io.temporal.serviceclient;

import static org.junit.Assert.*;

import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.temporal.api.workflowservice.v1.GetSystemInfoRequest;
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Rule;
import org.junit.Test;

public class GrpcCompressionTest {
private static final Metadata.Key<String> GRPC_ENCODING =
Metadata.Key.of("grpc-encoding", Metadata.ASCII_STRING_MARSHALLER);
private static final Metadata.Key<String> GRPC_ACCEPT_ENCODING =
Metadata.Key.of("grpc-accept-encoding", Metadata.ASCII_STRING_MARSHALLER);

@Rule public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule();

@Test
public void gzipCompressionSendsAndAcceptsGzip() throws Exception {
Metadata headers = callGetSystemInfo(GrpcCompression.GZIP);

assertEquals("gzip", headers.get(GRPC_ENCODING));
assertTrue(headers.get(GRPC_ACCEPT_ENCODING).contains("gzip"));
}

@Test
public void noneCompressionDoesNotSendOrAcceptGzip() throws Exception {
Metadata headers = callGetSystemInfo(GrpcCompression.NONE);

assertNull(headers.get(GRPC_ENCODING));
assertNull(headers.get(GRPC_ACCEPT_ENCODING));
}

private Metadata callGetSystemInfo(GrpcCompression compression) throws Exception {
AtomicReference<Metadata> capturedHeaders = new AtomicReference<>();
ServerInterceptor captureHeadersInterceptor =
new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
capturedHeaders.set(headers);
return next.startCall(call, headers);
}
};
Server server =
grpcCleanupRule.register(
NettyServerBuilder.forPort(0)
.addService(
ServerInterceptors.intercept(
new TestWorkflowService(), captureHeadersInterceptor))
.build()
.start());

WorkflowServiceStubs serviceStubs =
WorkflowServiceStubs.newServiceStubs(
WorkflowServiceStubsOptions.newBuilder()
.setTarget("127.0.0.1:" + server.getPort())
.setEnableHttps(false)
.setGrpcCompression(compression)
.build());
try {
serviceStubs.blockingStub().getSystemInfo(GetSystemInfoRequest.getDefaultInstance());
} finally {
serviceStubs.shutdownNow();
}

assertNotNull(capturedHeaders.get());
return capturedHeaders.get();
}

private static final class TestWorkflowService
extends WorkflowServiceGrpc.WorkflowServiceImplBase {
@Override
public void getSystemInfo(
GetSystemInfoRequest request, StreamObserver<GetSystemInfoResponse> responseObserver) {
responseObserver.onNext(GetSystemInfoResponse.getDefaultInstance());
responseObserver.onCompleted();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,30 @@ public void testSpringBootStyleAutoTLSWithApiKey() {
"TLS should be disabled when no API key and no explicit TLS setting",
options3.getEnableHttps());
}

@Test
public void testGrpcCompressionDefaultsToGzip() {
ServiceStubsOptions options =
WorkflowServiceStubsOptions.newBuilder()
.setTarget("localhost:7233")
.validateAndBuildWithDefaults();

assertEquals(GrpcCompression.GZIP, options.getGrpcCompression());
}

@Test
public void testGrpcCompressionNonePassesThroughBuilderCopy() {
ServiceStubsOptions options =
WorkflowServiceStubsOptions.newBuilder()
.setTarget("localhost:7233")
.setGrpcCompression(GrpcCompression.NONE)
.validateAndBuildWithDefaults();

assertEquals(GrpcCompression.NONE, options.getGrpcCompression());

ServiceStubsOptions copied =
WorkflowServiceStubsOptions.newBuilder(options).validateAndBuildWithDefaults();

assertEquals(GrpcCompression.NONE, copied.getGrpcCompression());
}
}
Loading