diff --git a/api/src/main/java/io/grpc/ChannelConfigurator.java b/api/src/main/java/io/grpc/ChannelConfigurator.java
new file mode 100644
index 00000000000..35f521ac0d9
--- /dev/null
+++ b/api/src/main/java/io/grpc/ChannelConfigurator.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2026 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc;
+
+
+
+/**
+ * A configurator for child channels created by gRPC's internal infrastructure.
+ *
+ *
This interface allows users to inject configuration (such as credentials, interceptors,
+ * or flow control settings) into channels created automatically by gRPC for control plane
+ * operations. Common use cases include:
+ *
+ * xDS control plane connections
+ * Load Balancing helper channels (OOB channels)
+ *
+ *
+ * Usage Example:
+ *
{@code
+ * // 1. Define the configurator
+ * ChannelConfigurator configurator = builder -> {
+ * builder.maxInboundMessageSize(4 * 1024 * 1024);
+ * };
+ *
+ * // 2. Apply to parent channel - automatically used for ALL child channels
+ * ManagedChannel channel = ManagedChannelBuilder
+ * .forTarget("xds:///my-service")
+ * .childChannelConfigurator(configurator)
+ * .build();
+ * }
+ *
+ * Implementations must be thread-safe as the configure methods may be invoked concurrently
+ * by multiple internal components.
+ *
+ * @since 1.83.0
+ */
+@ExperimentalApi("https://github.com/grpc/grpc-java/issues/12574")
+@FunctionalInterface
+public interface ChannelConfigurator {
+
+ /**
+ * Configures a builder for a new child channel.
+ *
+ *
This method is invoked synchronously during the creation of the child channel,
+ * before {@link ManagedChannelBuilder#build()} is called.
+ *
+ *
Note: Implementations must only apply configurations to the
+ * provided builder and must NOT call {@code builder.build()} themselves.
+ *
+ *
Note: The provided {@code builder} is generic ({@code ?}). Implementations
+ * should use universal configuration methods (like {@code intercept()}, {@code userAgent()})
+ * on the builder rather than casting it to specific implementation types.
+ *
+ * @param builder the mutable channel builder for the new child channel
+ */
+ void configureChannelBuilder(ManagedChannelBuilder> builder);
+}
diff --git a/api/src/main/java/io/grpc/ForwardingChannelBuilder.java b/api/src/main/java/io/grpc/ForwardingChannelBuilder.java
index 1202582421a..d340ff8ef88 100644
--- a/api/src/main/java/io/grpc/ForwardingChannelBuilder.java
+++ b/api/src/main/java/io/grpc/ForwardingChannelBuilder.java
@@ -242,6 +242,13 @@ public T disableServiceConfigLookUp() {
return thisT();
}
+
+ @Override
+ public T childChannelConfigurator(ChannelConfigurator channelConfigurator) {
+ delegate().childChannelConfigurator(channelConfigurator);
+ return thisT();
+ }
+
/**
* Returns the correctly typed version of the builder.
*/
diff --git a/api/src/main/java/io/grpc/ForwardingChannelBuilder2.java b/api/src/main/java/io/grpc/ForwardingChannelBuilder2.java
index 78fe730d91a..4e67748da51 100644
--- a/api/src/main/java/io/grpc/ForwardingChannelBuilder2.java
+++ b/api/src/main/java/io/grpc/ForwardingChannelBuilder2.java
@@ -269,6 +269,13 @@ public T setNameResolverArg(NameResolver.Args.Key key, X value) {
return thisT();
}
+
+ @Override
+ public T childChannelConfigurator(ChannelConfigurator channelConfigurator) {
+ delegate().childChannelConfigurator(channelConfigurator);
+ return thisT();
+ }
+
/**
* Returns the {@link ManagedChannel} built by the delegate by default. Overriding method can
* return different value.
diff --git a/api/src/main/java/io/grpc/ManagedChannelBuilder.java b/api/src/main/java/io/grpc/ManagedChannelBuilder.java
index 3f370ab3003..943c8ba4dd7 100644
--- a/api/src/main/java/io/grpc/ManagedChannelBuilder.java
+++ b/api/src/main/java/io/grpc/ManagedChannelBuilder.java
@@ -661,6 +661,23 @@ public T setNameResolverArg(NameResolver.Args.Key key, X value) {
throw new UnsupportedOperationException();
}
+
+ /**
+ * Sets a configurator that will be applied to all internal child channels created by this
+ * channel.
+ *
+ * This allows injecting universal configuration (like interceptors)
+ * into auxiliary channels created by gRPC infrastructure, such as xDS control plane connections.
+ *
+ * @param channelConfigurator the configurator to apply.
+ * @return this
+ * @since 1.83.0
+ */
+ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/12574")
+ public T childChannelConfigurator(ChannelConfigurator channelConfigurator) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
/**
* Builds a channel using the given parameters.
*
diff --git a/api/src/main/java/io/grpc/NameResolver.java b/api/src/main/java/io/grpc/NameResolver.java
index 80bc338d86b..f7a2d627404 100644
--- a/api/src/main/java/io/grpc/NameResolver.java
+++ b/api/src/main/java/io/grpc/NameResolver.java
@@ -358,6 +358,7 @@ public static final class Args {
private final MetricRecorder metricRecorder;
@Nullable private final NameResolverRegistry nameResolverRegistry;
@Nullable private final IdentityHashMap, Object> customArgs;
+ private final ChannelConfigurator channelConfigurator;
private Args(Builder builder) {
this.defaultPort = checkNotNull(builder.defaultPort, "defaultPort not set");
@@ -373,6 +374,7 @@ private Args(Builder builder) {
: new MetricRecorder() {};
this.nameResolverRegistry = builder.nameResolverRegistry;
this.customArgs = cloneCustomArgs(builder.customArgs);
+ this.channelConfigurator = builder.channelConfigurator;
}
/**
@@ -471,6 +473,16 @@ public ChannelLogger getChannelLogger() {
return channelLogger;
}
+ /**
+ * Returns the configurator for child channels.
+ *
+ * @since 1.83.0
+ */
+ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/12574")
+ public ChannelConfigurator getChildChannelConfigurator() {
+ return channelConfigurator;
+ }
+
/**
* Returns the Executor on which this resolver should execute long-running or I/O bound work.
* Null if no Executor was set.
@@ -549,6 +561,7 @@ public Builder toBuilder() {
builder.setOverrideAuthority(overrideAuthority);
builder.setMetricRecorder(metricRecorder);
builder.setNameResolverRegistry(nameResolverRegistry);
+ builder.setChildChannelConfigurator(channelConfigurator);
builder.customArgs = cloneCustomArgs(customArgs);
return builder;
}
@@ -579,6 +592,7 @@ public static final class Builder {
private MetricRecorder metricRecorder;
private NameResolverRegistry nameResolverRegistry;
private IdentityHashMap, Object> customArgs;
+ private ChannelConfigurator channelConfigurator = builder -> { };
Builder() {
}
@@ -694,6 +708,17 @@ public Builder setNameResolverRegistry(NameResolverRegistry registry) {
return this;
}
+ /**
+ * See {@link Args#getChildChannelConfigurator()}. This is an optional field.
+ *
+ * @since 1.83.0
+ */
+ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/12574")
+ public Builder setChildChannelConfigurator(ChannelConfigurator channelConfigurator) {
+ this.channelConfigurator = checkNotNull(channelConfigurator, "channelConfigurator");
+ return this;
+ }
+
/**
* Builds an {@link Args}.
*
diff --git a/api/src/test/java/io/grpc/NameResolverTest.java b/api/src/test/java/io/grpc/NameResolverTest.java
index 82abe5c7505..e3c05274ce4 100644
--- a/api/src/test/java/io/grpc/NameResolverTest.java
+++ b/api/src/test/java/io/grpc/NameResolverTest.java
@@ -22,6 +22,7 @@
import static org.mockito.Mockito.verify;
import com.google.common.base.Objects;
+import io.grpc.ChannelConfigurator;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.NameResolver.Listener2;
import io.grpc.NameResolver.ResolutionResult;
@@ -72,6 +73,8 @@ public class NameResolverTest {
private final int customArgValue = 42;
@Mock NameResolver.Listener mockListener;
+ private final ChannelConfigurator channelConfigurator = builder -> { };
+
@Test
public void args() {
NameResolver.Args args = createArgs();
@@ -84,6 +87,7 @@ public void args() {
assertThat(args.getOffloadExecutor()).isSameInstanceAs(executor);
assertThat(args.getOverrideAuthority()).isSameInstanceAs(overrideAuthority);
assertThat(args.getMetricRecorder()).isSameInstanceAs(metricRecorder);
+ assertThat(args.getChildChannelConfigurator()).isSameInstanceAs(channelConfigurator);
assertThat(args.getArg(FOO_ARG_KEY)).isEqualTo(customArgValue);
assertThat(args.getArg(BAR_ARG_KEY)).isNull();
@@ -97,6 +101,7 @@ public void args() {
assertThat(args2.getOffloadExecutor()).isSameInstanceAs(executor);
assertThat(args2.getOverrideAuthority()).isSameInstanceAs(overrideAuthority);
assertThat(args.getMetricRecorder()).isSameInstanceAs(metricRecorder);
+ assertThat(args2.getChildChannelConfigurator()).isSameInstanceAs(channelConfigurator);
assertThat(args.getArg(FOO_ARG_KEY)).isEqualTo(customArgValue);
assertThat(args.getArg(BAR_ARG_KEY)).isNull();
@@ -115,10 +120,47 @@ private NameResolver.Args createArgs() {
.setOffloadExecutor(executor)
.setOverrideAuthority(overrideAuthority)
.setMetricRecorder(metricRecorder)
+ .setChildChannelConfigurator(channelConfigurator)
.setArg(FOO_ARG_KEY, customArgValue)
.build();
}
+ @Test
+ public void args_childChannelConfigurator() {
+ final ManagedChannelBuilder>[] capturedBuilder = new ManagedChannelBuilder>[1];
+ ChannelConfigurator channelConfigurator = new ChannelConfigurator() {
+ @Override
+ public void configureChannelBuilder(ManagedChannelBuilder> builder) {
+ capturedBuilder[0] = builder;
+ }
+ };
+
+ SynchronizationContext realSyncContext = new SynchronizationContext(
+ new Thread.UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ throw new AssertionError(e);
+ }
+ });
+
+ NameResolver.Args args = NameResolver.Args.newBuilder()
+ .setDefaultPort(8080)
+ .setProxyDetector(mock(ProxyDetector.class))
+ .setSynchronizationContext(realSyncContext)
+ .setServiceConfigParser(mock(NameResolver.ServiceConfigParser.class))
+ .setChannelLogger(mock(ChannelLogger.class))
+ .setChildChannelConfigurator(channelConfigurator)
+ .build();
+
+ ChannelConfigurator configurator = args.getChildChannelConfigurator();
+ assertThat(configurator).isSameInstanceAs(channelConfigurator);
+
+ // Validate configurator accepts builders
+ ManagedChannelBuilder> mockBuilder = mock(ManagedChannelBuilder.class);
+ configurator.configureChannelBuilder(mockBuilder);
+ assertThat(capturedBuilder[0]).isSameInstanceAs(mockBuilder);
+ }
+
@Test
@SuppressWarnings("deprecation")
public void startOnOldListener_wrapperListener2UsedToStart() {
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
index e423220e3ad..71afcb7e0c1 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
@@ -37,6 +37,7 @@
import io.grpc.CallCredentials;
import io.grpc.CallOptions;
import io.grpc.Channel;
+import io.grpc.ChannelConfigurator;
import io.grpc.ChannelCredentials;
import io.grpc.ChannelLogger;
import io.grpc.ChannelLogger.ChannelLogLevel;
@@ -155,6 +156,14 @@ public Result selectConfig(PickSubchannelArgs args) {
private static final LoadBalancer.PickDetailsConsumer NOOP_PICK_DETAILS_CONSUMER =
new LoadBalancer.PickDetailsConsumer() {};
+ /**
+ * Retrieves the user-provided configuration function for internal child channels.
+ *
+ * This is intended for use by gRPC internal components
+ * that are responsible for creating auxiliary {@code ManagedChannel} instances.
+ */
+ private final ChannelConfigurator channelConfigurator;
+
private final InternalLogId logId;
private final String target;
@Nullable
@@ -545,6 +554,8 @@ ClientStream newSubstream(
Supplier stopwatchSupplier,
List interceptors,
final TimeProvider timeProvider) {
+ this.channelConfigurator = checkNotNull(builder.channelConfigurator,
+ "channelConfigurator");
this.target = checkNotNull(builder.target, "target");
this.logId = InternalLogId.allocate("Channel", target);
this.timeProvider = checkNotNull(timeProvider, "timeProvider");
@@ -589,7 +600,8 @@ ClientStream newSubstream(
.setOffloadExecutor(this.offloadExecutorHolder)
.setOverrideAuthority(this.authorityOverride)
.setMetricRecorder(this.metricRecorder)
- .setNameResolverRegistry(builder.nameResolverRegistry);
+ .setNameResolverRegistry(builder.nameResolverRegistry)
+ .setChildChannelConfigurator(this.channelConfigurator);
builder.copyAllNameResolverCustomArgsTo(nameResolverArgsBuilder);
this.nameResolverArgs = nameResolverArgsBuilder.build();
this.nameResolver = getNameResolver(
@@ -1486,6 +1498,10 @@ protected ManagedChannelBuilder> delegate() {
ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder();
+ // Note that we follow the global configurator pattern and try to fuse the configurations as
+ // soon as the builder gets created
+ channelConfigurator.configureChannelBuilder(builder);
+
return builder
// TODO(zdapeng): executors should not outlive the parent channel.
.executor(executor)
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java
index 128c929ec0e..f0ec3c2ec09 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java
@@ -29,6 +29,7 @@
import io.grpc.CallCredentials;
import io.grpc.CallOptions;
import io.grpc.Channel;
+import io.grpc.ChannelConfigurator;
import io.grpc.ChannelCredentials;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
@@ -149,6 +150,8 @@ public static ManagedChannelBuilder> forTarget(String target) {
}
+ ChannelConfigurator channelConfigurator = builder -> { };
+
ObjectPool extends Executor> executorPool = DEFAULT_EXECUTOR_POOL;
ObjectPool extends Executor> offloadExecutorPool = DEFAULT_EXECUTOR_POOL;
@@ -717,6 +720,14 @@ protected ManagedChannelImplBuilder addMetricSink(MetricSink metricSink) {
return this;
}
+ @Override
+ public ManagedChannelImplBuilder childChannelConfigurator(
+ ChannelConfigurator channelConfigurator) {
+ this.channelConfigurator = checkNotNull(channelConfigurator,
+ "childChannelConfigurator");
+ return this;
+ }
+
@Override
public ManagedChannel build() {
ClientTransportFactory clientTransportFactory =
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplBuilderTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplBuilderTest.java
index b0939239477..b475e4c2cac 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplBuilderTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplBuilderTest.java
@@ -34,6 +34,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.CallOptions;
import io.grpc.Channel;
+import io.grpc.ChannelConfigurator;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.CompressorRegistry;
@@ -42,6 +43,7 @@
import io.grpc.InternalConfigurator;
import io.grpc.InternalConfiguratorRegistry;
import io.grpc.InternalFeatureFlags;
+import io.grpc.InternalManagedChannelBuilder;
import io.grpc.InternalManagedChannelBuilder.InternalInterceptorFactory;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
@@ -49,7 +51,9 @@
import io.grpc.MetricSink;
import io.grpc.NameResolver;
import io.grpc.NameResolverRegistry;
+import io.grpc.NoopMetricSink;
import io.grpc.StaticTestingClassLoader;
+import io.grpc.Uri;
import io.grpc.internal.ManagedChannelImplBuilder.ChannelBuilderDefaultPortProvider;
import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider;
@@ -780,6 +784,113 @@ public void setNameResolverExtArgs() {
assertThat(builder.nameResolverCustomArgs.get(testKey)).isEqualTo(42);
}
+ @Test
+ public void childChannelConfigurator_setsField() {
+ ChannelConfigurator configurator = builder -> { };
+ assertSame(builder, builder.childChannelConfigurator(configurator));
+ assertSame(configurator, builder.channelConfigurator);
+ }
+
+ @Test
+ public void childChannelConfigurator_propagatesMetricsAndInterceptors_xdsTarget() {
+ // Setup Mocks
+ when(mockClientTransportFactory.getScheduledExecutorService())
+ .thenReturn(clock.getScheduledExecutorService());
+ when(mockClientTransportFactoryBuilder.buildClientTransportFactory())
+ .thenReturn(mockClientTransportFactory);
+ when(mockClientTransportFactory.getSupportedSocketAddressTypes())
+ .thenReturn(Collections.singleton(InetSocketAddress.class));
+
+ MetricSink mockMetricSink = new NoopMetricSink();
+ ClientInterceptor mockInterceptor = new ClientInterceptor() {
+ @Override
+ public ClientCall interceptCall(
+ MethodDescriptor method, CallOptions callOptions, Channel next) {
+ return next.newCall(method, callOptions);
+ }
+ };
+
+ // Define the Configurator
+ ChannelConfigurator configurator = builder -> {
+ InternalManagedChannelBuilder.addMetricSink(builder, mockMetricSink);
+
+ InternalManagedChannelBuilder.interceptWithTarget(builder, target -> mockInterceptor);
+ };
+
+ // Use NameResolver.Factory to capture Args
+ final NameResolver.Args[] capturedArgs = new NameResolver.Args[1];
+ final boolean[] newNameResolverCalled = new boolean[1];
+
+ NameResolver realNameResolver = new NameResolver() {
+ @Override
+ public String getServiceAuthority() {
+ return "foo.authority";
+ }
+
+ @Override
+ public void start(Listener2 listener) {}
+
+ @Override
+ public void shutdown() {}
+ };
+
+ NameResolver.Factory realNameResolverFactory = new NameResolver.Factory() {
+ @Override
+ public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
+ newNameResolverCalled[0] = true;
+ capturedArgs[0] = args;
+ return realNameResolver;
+ }
+
+ @Override
+ public NameResolver newNameResolver(Uri targetUri, NameResolver.Args args) {
+ newNameResolverCalled[0] = true;
+ capturedArgs[0] = args;
+ return realNameResolver;
+ }
+
+ @Override
+ public String getDefaultScheme() {
+ return "xds";
+ }
+ };
+
+ // Use the configurator and the custom factory
+ NameResolverRegistry registry = new NameResolverRegistry();
+ registry.register(new NameResolverFactoryToProviderFacade(realNameResolverFactory));
+
+ ManagedChannelBuilder> parentBuilder = new ManagedChannelImplBuilder(
+ "xds:///my-service-target",
+ mockClientTransportFactoryBuilder,
+ new FixedPortProvider(DUMMY_PORT))
+ .childChannelConfigurator(configurator)
+ .nameResolverRegistry(registry);
+
+ ManagedChannel channel = parentBuilder.build();
+ grpcCleanupRule.register(channel);
+
+ // Verify that newNameResolver was called
+ assertThat(newNameResolverCalled[0]).isTrue();
+
+ // Extract the childChannelConfigurator from Args
+ NameResolver.Args args = capturedArgs[0];
+ ChannelConfigurator channelConfiguratorInArgs = args.getChildChannelConfigurator();
+ assertNotNull("Child channel configurator should be present in NameResolver.Args",
+ channelConfiguratorInArgs);
+
+ // Verify the configurator is the one we passed
+ assertThat(channelConfiguratorInArgs).isSameInstanceAs(configurator);
+
+ // Verify the configurator logically applies (by running it on a real builder)
+ ManagedChannelImplBuilder childBuilder = new ManagedChannelImplBuilder(
+ "xds:///child-service-target",
+ mockClientTransportFactoryBuilder,
+ new FixedPortProvider(DUMMY_PORT));
+
+ configurator.configureChannelBuilder(childBuilder);
+ assertThat(childBuilder.metricSinks).contains(mockMetricSink);
+ }
+
@Test
public void metricSinks() {
MetricSink mocksink = mock(MetricSink.class);
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
index ae224af27e1..4c57a2ded77 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
@@ -68,6 +68,7 @@
import io.grpc.CallCredentials.RequestInfo;
import io.grpc.CallOptions;
import io.grpc.Channel;
+import io.grpc.ChannelConfigurator;
import io.grpc.ChannelCredentials;
import io.grpc.ChannelLogger;
import io.grpc.ClientCall;
@@ -496,6 +497,40 @@ public void immediateDeadlineExceeded() {
assertSame(Status.DEADLINE_EXCEEDED.getCode(), status.getCode());
}
+ @Test
+ public void childChannelConfigurator_passedToNameResolverArgs() {
+ ChannelConfigurator configurator = builder -> { };
+ channelBuilder.childChannelConfigurator(configurator);
+ AtomicReference actualArgs = new AtomicReference<>();
+ channelBuilder.nameResolverRegistry.register(new NameResolverProvider() {
+ @Override
+ public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
+ actualArgs.set(args);
+ NameResolver resolver = mock(NameResolver.class);
+ when(resolver.getServiceAuthority()).thenReturn("test.example.com");
+ return resolver;
+ }
+
+ @Override
+ public String getDefaultScheme() {
+ return expectedUri.getScheme();
+ }
+
+ @Override
+ protected boolean isAvailable() {
+ return true;
+ }
+
+ @Override
+ protected int priority() {
+ return 10;
+ }
+ });
+ createChannel();
+ assertNotNull(actualArgs.get());
+ assertSame(configurator, actualArgs.get().getChildChannelConfigurator());
+ }
+
@Test
public void startCallBeforeNameResolution() throws Exception {
FakeNameResolverFactory nameResolverFactory =
diff --git a/googleapis/src/main/java/io/grpc/googleapis/GoogleCloudToProdNameResolver.java b/googleapis/src/main/java/io/grpc/googleapis/GoogleCloudToProdNameResolver.java
index 10ba586ab47..c45beeb3116 100644
--- a/googleapis/src/main/java/io/grpc/googleapis/GoogleCloudToProdNameResolver.java
+++ b/googleapis/src/main/java/io/grpc/googleapis/GoogleCloudToProdNameResolver.java
@@ -24,6 +24,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.io.CharStreams;
import com.google.errorprone.annotations.concurrent.GuardedBy;
+import io.grpc.ChannelConfigurator;
import io.grpc.MetricRecorder;
import io.grpc.NameResolver;
import io.grpc.NameResolverRegistry;
@@ -108,6 +109,7 @@ private static synchronized BootstrapInfo getBootstrapInfo(boolean isForcedXds)
private final Resource executorResource;
private final String target;
private final MetricRecorder metricRecorder;
+ private final ChannelConfigurator channelConfigurator;
private final NameResolver delegate;
private final boolean usingExecutorResource;
private final boolean forceXds;
@@ -160,6 +162,7 @@ private static synchronized BootstrapInfo getBootstrapInfo(boolean isForcedXds)
}
target = targetUri.toString();
metricRecorder = args.getMetricRecorder();
+ channelConfigurator = args.getChildChannelConfigurator();
delegate = checkNotNull(nameResolverFactory, "nameResolverFactory").newNameResolver(
targetUri, args);
executor = args.getOffloadExecutor();
@@ -211,6 +214,7 @@ private static synchronized BootstrapInfo getBootstrapInfo(boolean isForcedXds)
targetUri = modifiedTargetBuilder.build();
target = targetUri.toString();
metricRecorder = args.getMetricRecorder();
+ channelConfigurator = args.getChildChannelConfigurator();
delegate =
checkNotNull(nameResolverFactory, "nameResolverFactory").newNameResolver(targetUri, args);
executor = args.getOffloadExecutor();
@@ -278,7 +282,7 @@ public void run() {
public void run() {
if (!shutdown && finalBootstrapInfo != null) {
xdsClientPool = InternalSharedXdsClientPoolProvider.getOrCreate(
- target, finalBootstrapInfo, metricRecorder, null);
+ target, finalBootstrapInfo, metricRecorder, null, channelConfigurator);
xdsClient = xdsClientPool.getObject();
delegate.start(listener);
succeeded = true;
diff --git a/googleapis/src/test/java/io/grpc/googleapis/GoogleCloudToProdNameResolverTest.java b/googleapis/src/test/java/io/grpc/googleapis/GoogleCloudToProdNameResolverTest.java
index bbd3ba3ef05..1d4c7a64ce0 100644
--- a/googleapis/src/test/java/io/grpc/googleapis/GoogleCloudToProdNameResolverTest.java
+++ b/googleapis/src/test/java/io/grpc/googleapis/GoogleCloudToProdNameResolverTest.java
@@ -22,6 +22,7 @@
import static org.mockito.Mockito.when;
import com.google.common.collect.Iterables;
+import io.grpc.ChannelConfigurator;
import io.grpc.ChannelLogger;
import io.grpc.MetricRecorder;
import io.grpc.NameResolver;
@@ -102,6 +103,7 @@ public void close(Executor instance) {}
private final Map delegatedResolver = new HashMap<>();
private final Map delegatedUri = new HashMap<>();
private final Map delegatedRfcUri = new HashMap<>();
+ private final Map delegatedArgs = new HashMap<>();
@Mock
private NameResolver.Listener2 mockListener;
@@ -236,6 +238,22 @@ public void notOnGcpButForceXds_KeyValueTrue_DelegateToXds() {
}
+ @Test
+ public void childChannelConfigurator_passedToDelegatedResolver() {
+ GoogleCloudToProdNameResolver.isOnGcp = false;
+ ChannelConfigurator configurator = builder -> { };
+ Args customArgs = args.toBuilder().setChildChannelConfigurator(configurator).build();
+ resolver = enableRfc3986UrisParam
+ ? new GoogleCloudToProdNameResolver(
+ Uri.create(TARGET_URI), customArgs, fakeExecutorResource, nsRegistry.asFactory())
+ : new GoogleCloudToProdNameResolver(
+ URI.create(TARGET_URI), customArgs, fakeExecutorResource, nsRegistry.asFactory());
+ resolver.start(mockListener);
+ assertThat(delegatedArgs.keySet()).containsExactly("dns");
+ assertThat(delegatedArgs.get("dns").getChildChannelConfigurator())
+ .isSameInstanceAs(configurator);
+ }
+
@Test
public void notOnGcpButForceXds_WithMultipleParams_DelegateToXds() {
GoogleCloudToProdNameResolver.isOnGcp = false;
@@ -338,6 +356,7 @@ private FakeNsProvider(String scheme) {
public NameResolver newNameResolver(URI targetUri, Args args) {
if (scheme.equals(targetUri.getScheme())) {
delegatedUri.put(scheme, targetUri);
+ delegatedArgs.put(scheme, args);
NameResolver resolver = mock(NameResolver.class);
delegatedResolver.put(scheme, resolver);
return resolver;
@@ -349,6 +368,7 @@ public NameResolver newNameResolver(URI targetUri, Args args) {
public NameResolver newNameResolver(Uri targetUri, Args args) {
if (scheme.equals(targetUri.getScheme())) {
delegatedRfcUri.put(scheme, targetUri);
+ delegatedArgs.put(scheme, args);
NameResolver resolver = mock(NameResolver.class);
delegatedResolver.put(scheme, resolver);
return resolver;
diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java
index f0bd6f93098..77eadf9ebbb 100644
--- a/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java
+++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java
@@ -25,7 +25,9 @@
import com.google.common.collect.ImmutableList;
import io.grpc.ClientInterceptor;
+import io.grpc.ForwardingChannelBuilder2;
import io.grpc.ManagedChannelBuilder;
+import io.grpc.MetricSink;
import io.grpc.ServerBuilder;
import io.grpc.internal.GrpcUtil;
import io.grpc.opentelemetry.GrpcOpenTelemetry.TargetFilter;
@@ -168,6 +170,36 @@ public void disableAllMetrics() {
assertThat(module.getEnableMetrics()).isEmpty();
}
- // TODO(dnvindhya): Add tests for configurator
+ @Test
+ public void configureChannelBuilder_registersMetricSink() {
+ GrpcOpenTelemetry grpcOpenTelemetry = GrpcOpenTelemetry.newBuilder().build();
+ TestChannelBuilder testBuilder = new TestChannelBuilder();
+ grpcOpenTelemetry.configureChannelBuilder(testBuilder);
+ assertThat(testBuilder.metricSink).isSameInstanceAs(grpcOpenTelemetry.getSink());
+ assertThat(testBuilder.interceptorFactory).isNotNull();
+ }
+ private static class TestChannelBuilder extends ForwardingChannelBuilder2 {
+ Object interceptorFactory;
+ MetricSink metricSink;
+
+ @Override
+ protected ManagedChannelBuilder> delegate() {
+ return null;
+ }
+
+ @Override
+ protected TestChannelBuilder interceptWithTarget(InterceptorFactory factory) {
+ this.interceptorFactory = factory;
+ return this;
+ }
+
+ @Override
+ public TestChannelBuilder addMetricSink(MetricSink metricSink) {
+ this.metricSink = metricSink;
+ return this;
+ }
+ }
+
+ // TODO(dnvindhya): Add tests for configurator
}
diff --git a/xds/build.gradle b/xds/build.gradle
index 8036f8691ec..c6325f7fc2d 100644
--- a/xds/build.gradle
+++ b/xds/build.gradle
@@ -66,7 +66,10 @@ dependencies {
testImplementation project(':grpc-api')
testImplementation project(':grpc-rls')
+ testImplementation project(':grpc-opentelemetry')
testImplementation project(':grpc-inprocess')
+ testImplementation libraries.opentelemetry.api
+ testImplementation libraries.opentelemetry.sdk.testing
testImplementation libraries.cel.compiler
testImplementation testFixtures(project(':grpc-core')),
testFixtures(project(':grpc-api')),
diff --git a/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java b/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java
index 5100537aea2..3c146794537 100644
--- a/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java
+++ b/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java
@@ -21,11 +21,13 @@
import com.google.common.annotations.VisibleForTesting;
import io.grpc.CallCredentials;
import io.grpc.CallOptions;
+import io.grpc.ChannelConfigurator;
import io.grpc.ChannelCredentials;
import io.grpc.ClientCall;
import io.grpc.Context;
import io.grpc.Grpc;
import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
@@ -52,6 +54,8 @@
final class GrpcXdsTransportFactory implements XdsTransportFactory {
private final CallCredentials callCredentials;
+ private final ChannelConfigurator channelConfigurator;
+
// The map of xDS server info to its corresponding gRPC xDS transport.
// This enables reusing and sharing the same underlying gRPC channel.
//
@@ -61,8 +65,10 @@ final class GrpcXdsTransportFactory implements XdsTransportFactory {
private static final Map xdsServerInfoToTransportMap =
new ConcurrentHashMap<>();
- GrpcXdsTransportFactory(CallCredentials callCredentials) {
+ GrpcXdsTransportFactory(CallCredentials callCredentials,
+ ChannelConfigurator channelConfigurator) {
this.callCredentials = callCredentials;
+ this.channelConfigurator = channelConfigurator;
}
@Override
@@ -71,7 +77,7 @@ public XdsTransport create(Bootstrapper.ServerInfo serverInfo) {
serverInfo,
(info, transport) -> {
if (transport == null) {
- transport = new GrpcXdsTransport(serverInfo, callCredentials);
+ transport = new GrpcXdsTransport(serverInfo, callCredentials, channelConfigurator);
}
++transport.refCount;
return transport;
@@ -93,7 +99,7 @@ static class GrpcXdsTransport implements XdsTransport {
private int refCount = 0;
public GrpcXdsTransport(Bootstrapper.ServerInfo serverInfo) {
- this(serverInfo, null);
+ this(serverInfo, null, null);
}
@VisibleForTesting
@@ -102,11 +108,20 @@ public GrpcXdsTransport(ManagedChannel channel) {
}
public GrpcXdsTransport(Bootstrapper.ServerInfo serverInfo, CallCredentials callCredentials) {
+ this(serverInfo, callCredentials, null);
+ }
+
+ public GrpcXdsTransport(Bootstrapper.ServerInfo serverInfo,
+ CallCredentials callCredentials,
+ ChannelConfigurator channelConfigurator) {
String target = serverInfo.target();
ChannelCredentials channelCredentials = (ChannelCredentials) serverInfo.implSpecificConfig();
- this.channel = Grpc.newChannelBuilder(target, channelCredentials)
- .keepAliveTime(5, TimeUnit.MINUTES)
- .build();
+ ManagedChannelBuilder> channelBuilder = Grpc.newChannelBuilder(target, channelCredentials)
+ .keepAliveTime(5, TimeUnit.MINUTES);
+ if (channelConfigurator != null) {
+ channelConfigurator.configureChannelBuilder(channelBuilder);
+ }
+ this.channel = channelBuilder.build();
this.callCredentials = callCredentials;
this.serverInfo = serverInfo;
}
@@ -170,6 +185,7 @@ public XdsStreamingCall(
.setType(MethodDescriptor.MethodType.BIDI_STREAMING)
.setRequestMarshaller(reqMarshaller)
.setResponseMarshaller(respMarshaller)
+ .setSampledToLocalTracing(true)
.build(),
CallOptions.DEFAULT.withCallCredentials(
callCredentials)); // TODO(zivy): support waitForReady
diff --git a/xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java b/xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java
index cc5ff128274..472b7fd7fa9 100644
--- a/xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java
+++ b/xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java
@@ -17,6 +17,7 @@
package io.grpc.xds;
import io.grpc.CallCredentials;
+import io.grpc.ChannelConfigurator;
import io.grpc.Internal;
import io.grpc.MetricRecorder;
import io.grpc.internal.ObjectPool;
@@ -85,8 +86,15 @@ public static ObjectPool getOrCreate(
public static XdsClientResult getOrCreate(
String target, BootstrapInfo bootstrapInfo, MetricRecorder metricRecorder,
CallCredentials transportCallCredentials) {
+ return getOrCreate(target, bootstrapInfo, metricRecorder, transportCallCredentials, null);
+ }
+
+ public static XdsClientResult getOrCreate(
+ String target, BootstrapInfo bootstrapInfo, MetricRecorder metricRecorder,
+ CallCredentials transportCallCredentials, ChannelConfigurator channelConfigurator) {
return new XdsClientResult(SharedXdsClientPoolProvider.getDefaultProvider()
- .getOrCreate(target, bootstrapInfo, metricRecorder, transportCallCredentials));
+ .getOrCreate(target, bootstrapInfo, metricRecorder, transportCallCredentials,
+ channelConfigurator));
}
/**
diff --git a/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java b/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java
index 45c379244af..24500d6a7f6 100644
--- a/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java
+++ b/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java
@@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.CallCredentials;
+import io.grpc.ChannelConfigurator;
import io.grpc.MetricRecorder;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.GrpcUtil;
@@ -57,6 +58,10 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
@Nullable
private final Bootstrapper bootstrapper;
private final Object lock = new Object();
+ /*
+ The first one wins.
+ Anything with the same target string uses the client created for the first one.
+ */
private final Map> targetToXdsClientMap = new ConcurrentHashMap<>();
SharedXdsClientPoolProvider() {
@@ -88,20 +93,28 @@ public ObjectPool getOrCreate(
} else {
bootstrapInfo = GrpcBootstrapperImpl.defaultBootstrap();
}
- return getOrCreate(target, bootstrapInfo, metricRecorder, transportCallCredentials);
+ return getOrCreate(target, bootstrapInfo, metricRecorder, transportCallCredentials, null);
}
@Override
public ObjectPool getOrCreate(
String target, BootstrapInfo bootstrapInfo, MetricRecorder metricRecorder) {
- return getOrCreate(target, bootstrapInfo, metricRecorder, null);
+ return getOrCreate(target, bootstrapInfo, metricRecorder, null, null);
+ }
+
+ @Override
+ public ObjectPool getOrCreate(
+ String target, BootstrapInfo bootstrapInfo, MetricRecorder metricRecorder,
+ ChannelConfigurator channelConfigurator) {
+ return getOrCreate(target, bootstrapInfo, metricRecorder, null, channelConfigurator);
}
public ObjectPool getOrCreate(
String target,
BootstrapInfo bootstrapInfo,
MetricRecorder metricRecorder,
- CallCredentials transportCallCredentials) {
+ CallCredentials transportCallCredentials,
+ ChannelConfigurator channelConfigurator) {
ObjectPool ref = targetToXdsClientMap.get(target);
if (ref == null) {
synchronized (lock) {
@@ -109,7 +122,8 @@ public ObjectPool getOrCreate(
if (ref == null) {
ref =
new RefCountedXdsClientObjectPool(
- bootstrapInfo, target, metricRecorder, transportCallCredentials);
+ bootstrapInfo, target, metricRecorder, transportCallCredentials,
+ channelConfigurator);
targetToXdsClientMap.put(target, ref);
}
}
@@ -134,6 +148,7 @@ class RefCountedXdsClientObjectPool implements ObjectPool {
private final String target; // The target associated with the xDS client.
private final MetricRecorder metricRecorder;
private final CallCredentials transportCallCredentials;
+ private final ChannelConfigurator channelConfigurator;
private final Object lock = new Object();
@GuardedBy("lock")
private ScheduledExecutorService scheduler;
@@ -147,7 +162,7 @@ class RefCountedXdsClientObjectPool implements ObjectPool {
@VisibleForTesting
RefCountedXdsClientObjectPool(
BootstrapInfo bootstrapInfo, String target, MetricRecorder metricRecorder) {
- this(bootstrapInfo, target, metricRecorder, null);
+ this(bootstrapInfo, target, metricRecorder, null, null);
}
@VisibleForTesting
@@ -155,11 +170,13 @@ class RefCountedXdsClientObjectPool implements ObjectPool {
BootstrapInfo bootstrapInfo,
String target,
MetricRecorder metricRecorder,
- CallCredentials transportCallCredentials) {
+ CallCredentials transportCallCredentials,
+ ChannelConfigurator channelConfigurator) {
this.bootstrapInfo = checkNotNull(bootstrapInfo, "bootstrapInfo");
this.target = target;
this.metricRecorder = checkNotNull(metricRecorder, "metricRecorder");
this.transportCallCredentials = transportCallCredentials;
+ this.channelConfigurator = channelConfigurator;
}
@Override
@@ -172,7 +189,7 @@ public XdsClient getObject() {
scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
metricReporter = new XdsClientMetricReporterImpl(metricRecorder, target);
GrpcXdsTransportFactory xdsTransportFactory =
- new GrpcXdsTransportFactory(transportCallCredentials);
+ new GrpcXdsTransportFactory(transportCallCredentials, channelConfigurator);
xdsClient =
new XdsClientImpl(
xdsTransportFactory,
diff --git a/xds/src/main/java/io/grpc/xds/XdsClientPoolFactory.java b/xds/src/main/java/io/grpc/xds/XdsClientPoolFactory.java
index 6df8d566a7a..cdd198474bb 100644
--- a/xds/src/main/java/io/grpc/xds/XdsClientPoolFactory.java
+++ b/xds/src/main/java/io/grpc/xds/XdsClientPoolFactory.java
@@ -16,6 +16,7 @@
package io.grpc.xds;
+import io.grpc.ChannelConfigurator;
import io.grpc.MetricRecorder;
import io.grpc.internal.ObjectPool;
import io.grpc.xds.client.Bootstrapper.BootstrapInfo;
@@ -30,5 +31,9 @@ interface XdsClientPoolFactory {
ObjectPool getOrCreate(
String target, BootstrapInfo bootstrapInfo, MetricRecorder metricRecorder);
+ ObjectPool getOrCreate(
+ String target, BootstrapInfo bootstrapInfo, MetricRecorder metricRecorder,
+ ChannelConfigurator channelConfigurator);
+
List getTargets();
}
diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java
index 69b0b824433..f4accf3869d 100644
--- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java
+++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java
@@ -31,6 +31,7 @@
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Channel;
+import io.grpc.ChannelConfigurator;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
@@ -186,7 +187,8 @@ final class XdsNameResolver extends NameResolver {
} else {
checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
this.xdsClientPool = new BootstrappingXdsClientPool(
- xdsClientPoolFactory, target, bootstrapOverride, metricRecorder);
+ xdsClientPoolFactory, target, bootstrapOverride, metricRecorder,
+ nameResolverArgs.getChildChannelConfigurator());
}
this.random = checkNotNull(random, "random");
this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry");
@@ -1060,16 +1062,19 @@ private static final class BootstrappingXdsClientPool implements XdsClientPool {
private final @Nullable Map bootstrapOverride;
private final MetricRecorder metricRecorder;
private ObjectPool xdsClientPool;
+ private final ChannelConfigurator channelConfigurator;
BootstrappingXdsClientPool(
XdsClientPoolFactory xdsClientPoolFactory,
String target,
@Nullable Map bootstrapOverride,
- MetricRecorder metricRecorder) {
+ MetricRecorder metricRecorder,
+ ChannelConfigurator channelConfigurator) {
this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
this.target = checkNotNull(target, "target");
this.bootstrapOverride = bootstrapOverride;
- this.metricRecorder = checkNotNull(metricRecorder, "metricRecorder");
+ this.metricRecorder = metricRecorder;
+ this.channelConfigurator = checkNotNull(channelConfigurator, "channelConfigurator");
}
@Override
@@ -1082,7 +1087,8 @@ public XdsClient getObject() throws XdsInitializationException {
bootstrapInfo = new GrpcBootstrapperImpl().bootstrap(bootstrapOverride);
}
this.xdsClientPool =
- xdsClientPoolFactory.getOrCreate(target, bootstrapInfo, metricRecorder);
+ xdsClientPoolFactory.getOrCreate(
+ target, bootstrapInfo, metricRecorder, channelConfigurator);
}
return xdsClientPool.getObject();
}
diff --git a/xds/src/main/java/io/grpc/xds/XdsServerBuilder.java b/xds/src/main/java/io/grpc/xds/XdsServerBuilder.java
index 4a4fb71aa84..1c0eb3cd024 100644
--- a/xds/src/main/java/io/grpc/xds/XdsServerBuilder.java
+++ b/xds/src/main/java/io/grpc/xds/XdsServerBuilder.java
@@ -25,6 +25,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.DoNotCall;
import io.grpc.Attributes;
+import io.grpc.ChannelConfigurator;
import io.grpc.ExperimentalApi;
import io.grpc.ForwardingServerBuilder;
import io.grpc.Internal;
@@ -58,6 +59,8 @@ public final class XdsServerBuilder extends ForwardingServerBuilder bootstrapOverride;
private long drainGraceTime = 10;
private TimeUnit drainGraceTimeUnit = TimeUnit.MINUTES;
+ private ChannelConfigurator channelConfigurator = builder -> { };
+
private XdsServerBuilder(NettyServerBuilder nettyDelegate, int port) {
this.delegate = nettyDelegate;
@@ -100,6 +103,20 @@ public XdsServerBuilder drainGraceTime(long drainGraceTime, TimeUnit drainGraceT
return this;
}
+ /**
+ * Sets the configurator that will be stored in the server built by this builder.
+ *
+ * This configurator will subsequently be used to configure any child channels
+ * created by that server.
+ *
+ * @param channelConfigurator the configurator to store in the channel.
+ * @return this
+ */
+ public XdsServerBuilder childChannelConfigurator(ChannelConfigurator channelConfigurator) {
+ this.channelConfigurator = checkNotNull(channelConfigurator, "channelConfigurator");
+ return this;
+ }
+
@DoNotCall("Unsupported. Use forPort(int, ServerCredentials) instead")
public static ServerBuilder> forPort(int port) {
throw new UnsupportedOperationException(
@@ -128,7 +145,8 @@ public Server build() {
}
InternalNettyServerBuilder.eagAttributes(delegate, builder.build());
return new XdsServerWrapper("0.0.0.0:" + port, delegate, xdsServingStatusListener,
- filterChainSelectorManager, xdsClientPoolFactory, bootstrapOverride, filterRegistry);
+ filterChainSelectorManager, xdsClientPoolFactory, bootstrapOverride, filterRegistry,
+ this.channelConfigurator);
}
@VisibleForTesting
diff --git a/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java b/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java
index 5529f96c7a2..daf226236f5 100644
--- a/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java
+++ b/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java
@@ -29,6 +29,7 @@
import com.google.common.util.concurrent.SettableFuture;
import io.envoyproxy.envoy.config.core.v3.SocketAddress.Protocol;
import io.grpc.Attributes;
+import io.grpc.ChannelConfigurator;
import io.grpc.InternalServerInterceptors;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
@@ -128,6 +129,8 @@ public void uncaughtException(Thread t, Throwable e) {
// NamedFilterConfig.filterStateKey -> filter_instance.
private final HashMap activeFiltersDefaultChain = new HashMap<>();
+ private final ChannelConfigurator channelConfigurator;
+
XdsServerWrapper(
String listenerAddress,
ServerBuilder> delegateBuilder,
@@ -135,7 +138,8 @@ public void uncaughtException(Thread t, Throwable e) {
FilterChainSelectorManager filterChainSelectorManager,
XdsClientPoolFactory xdsClientPoolFactory,
@Nullable Map bootstrapOverride,
- FilterRegistry filterRegistry) {
+ FilterRegistry filterRegistry,
+ ChannelConfigurator channelConfigurator) {
this(
listenerAddress,
delegateBuilder,
@@ -144,10 +148,30 @@ public void uncaughtException(Thread t, Throwable e) {
xdsClientPoolFactory,
bootstrapOverride,
filterRegistry,
- SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE));
+ SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE),
+ channelConfigurator);
sharedTimeService = true;
}
+ XdsServerWrapper(
+ String listenerAddress,
+ ServerBuilder> delegateBuilder,
+ XdsServingStatusListener listener,
+ FilterChainSelectorManager filterChainSelectorManager,
+ XdsClientPoolFactory xdsClientPoolFactory,
+ @Nullable Map bootstrapOverride,
+ FilterRegistry filterRegistry) {
+ this(
+ listenerAddress,
+ delegateBuilder,
+ listener,
+ filterChainSelectorManager,
+ xdsClientPoolFactory,
+ bootstrapOverride,
+ filterRegistry,
+ builder -> { });
+ }
+
@VisibleForTesting
XdsServerWrapper(
String listenerAddress,
@@ -158,6 +182,29 @@ public void uncaughtException(Thread t, Throwable e) {
@Nullable Map bootstrapOverride,
FilterRegistry filterRegistry,
ScheduledExecutorService timeService) {
+ this(
+ listenerAddress,
+ delegateBuilder,
+ listener,
+ filterChainSelectorManager,
+ xdsClientPoolFactory,
+ bootstrapOverride,
+ filterRegistry,
+ timeService,
+ builder -> { });
+ }
+
+ @VisibleForTesting
+ XdsServerWrapper(
+ String listenerAddress,
+ ServerBuilder> delegateBuilder,
+ XdsServingStatusListener listener,
+ FilterChainSelectorManager filterChainSelectorManager,
+ XdsClientPoolFactory xdsClientPoolFactory,
+ @Nullable Map bootstrapOverride,
+ FilterRegistry filterRegistry,
+ ScheduledExecutorService timeService,
+ ChannelConfigurator channelConfigurator) {
this.listenerAddress = checkNotNull(listenerAddress, "listenerAddress");
this.delegateBuilder = checkNotNull(delegateBuilder, "delegateBuilder");
this.delegateBuilder.intercept(new ConfigApplyingInterceptor());
@@ -169,6 +216,7 @@ public void uncaughtException(Thread t, Throwable e) {
this.timeService = checkNotNull(timeService, "timeService");
this.filterRegistry = checkNotNull(filterRegistry,"filterRegistry");
this.delegate = delegateBuilder.build();
+ this.channelConfigurator = checkNotNull(channelConfigurator, "channelConfigurator");
}
@Override
@@ -202,7 +250,8 @@ private void internalStart() {
bootstrapInfo = new GrpcBootstrapperImpl().bootstrap(bootstrapOverride);
}
xdsClientPool = xdsClientPoolFactory.getOrCreate(
- "#server", bootstrapInfo, new MetricRecorder() {});
+ "#server", bootstrapInfo, new MetricRecorder() {},
+ channelConfigurator);
} catch (Exception e) {
StatusException statusException = Status.UNAVAILABLE.withDescription(
"Failed to initialize xDS").withCause(e).asException();
diff --git a/xds/src/main/java/io/grpc/xds/client/XdsTransportFactory.java b/xds/src/main/java/io/grpc/xds/client/XdsTransportFactory.java
index ec700bd6dc9..f8e7c07507f 100644
--- a/xds/src/main/java/io/grpc/xds/client/XdsTransportFactory.java
+++ b/xds/src/main/java/io/grpc/xds/client/XdsTransportFactory.java
@@ -31,6 +31,12 @@ public interface XdsTransportFactory {
* Represents transport for xDS communication (e.g., a gRPC channel).
*/
interface XdsTransport {
+ /**
+ * Creates a bidirectional streaming call.
+ *
+ * @param fullMethodName should be a constant/literal string so that metric recorders
+ * like OpenTelemetry can record the exact method name.
+ */
StreamingCall createStreamingCall(
String fullMethodName, MethodDescriptor.Marshaller reqMarshaller,
MethodDescriptor.Marshaller respMarshaller);
diff --git a/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java b/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java
index e8bd7461736..98853804421 100644
--- a/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java
+++ b/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java
@@ -37,6 +37,7 @@
import io.envoyproxy.envoy.service.status.v3.ClientStatusRequest;
import io.envoyproxy.envoy.service.status.v3.ClientStatusResponse;
import io.envoyproxy.envoy.type.matcher.v3.NodeMatcher;
+import io.grpc.ChannelConfigurator;
import io.grpc.Deadline;
import io.grpc.InsecureChannelCredentials;
import io.grpc.MetricRecorder;
@@ -517,5 +518,12 @@ public ObjectPool getOrCreate(
String target, BootstrapInfo bootstrapInfo, MetricRecorder metricRecorder) {
throw new UnsupportedOperationException("Should not be called");
}
+
+ @Override
+ public ObjectPool getOrCreate(
+ String target, BootstrapInfo bootstrapInfo, MetricRecorder metricRecorder,
+ ChannelConfigurator channelConfigurator) {
+ throw new UnsupportedOperationException("Should not be called");
+ }
}
}
diff --git a/xds/src/test/java/io/grpc/xds/FakeControlPlaneXdsIntegrationTest.java b/xds/src/test/java/io/grpc/xds/FakeControlPlaneXdsIntegrationTest.java
index a273c6f3ebf..d6361915940 100644
--- a/xds/src/test/java/io/grpc/xds/FakeControlPlaneXdsIntegrationTest.java
+++ b/xds/src/test/java/io/grpc/xds/FakeControlPlaneXdsIntegrationTest.java
@@ -47,22 +47,34 @@
import io.envoyproxy.envoy.extensions.load_balancing_policies.wrr_locality.v3.WrrLocality;
import io.grpc.CallOptions;
import io.grpc.Channel;
+import io.grpc.ChannelConfigurator;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientStreamTracer;
import io.grpc.FlagResetRule;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
+import io.grpc.Grpc;
+import io.grpc.InsecureChannelCredentials;
+import io.grpc.InsecureServerCredentials;
import io.grpc.InternalFeatureFlags;
+import io.grpc.InternalManagedChannelBuilder;
import io.grpc.LoadBalancerRegistry;
+import io.grpc.LongCounterMetricInstrument;
import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
+import io.grpc.NoopMetricSink;
+import io.grpc.Server;
import io.grpc.testing.protobuf.SimpleRequest;
import io.grpc.testing.protobuf.SimpleResponse;
import io.grpc.testing.protobuf.SimpleServiceGrpc;
import java.net.InetSocketAddress;
import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -360,4 +372,77 @@ public void pingPong_logicalDns_authorityOverride() {
System.clearProperty("GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE");
}
}
+
+ @Test
+ public void childChannelConfigurator_passesMetricSinkToChannel_E2E() throws Exception {
+ CountingMetricSink sink = new CountingMetricSink();
+ ChannelConfigurator configurator = new ChannelConfigurator() {
+ @Override
+ public void configureChannelBuilder(ManagedChannelBuilder> builder) {
+ InternalManagedChannelBuilder.addMetricSink(builder, sink);
+ }
+ };
+
+ ManagedChannel channel = Grpc.newChannelBuilder("test-xds:///test-server",
+ InsecureChannelCredentials.create())
+ .childChannelConfigurator(configurator)
+ .build();
+
+ try {
+ SimpleServiceGrpc.SimpleServiceBlockingStub blockingStub = SimpleServiceGrpc.newBlockingStub(
+ channel);
+ blockingStub.unaryRpc(SimpleRequest.getDefaultInstance());
+
+ // The xDS client inside the channel configurator will have created an ADS stream.
+ // The metric sink should have received attempt or connection metrics.
+ sink.awaitCall();
+ } finally {
+ channel.shutdownNow();
+ }
+ }
+
+ @Test
+ public void childChannelConfigurator_passesMetricSinkToServer_E2E() throws Exception {
+ CountingMetricSink sink = new CountingMetricSink();
+ ChannelConfigurator configurator = builder -> {
+ // Child channels (xDS client connections) created by this server get the sink.
+ InternalManagedChannelBuilder.addMetricSink(builder, sink);
+ };
+
+ // We start an XdsServer manually.
+ // XdsServer needs RDS, LDS, etc. from control plane.
+ XdsServerBuilder serverBuilder = XdsServerBuilder.forPort(
+ 0, InsecureServerCredentials.create())
+ .addService(new SimpleServiceGrpc.SimpleServiceImplBase() {})
+ .overrideBootstrapForTest(controlPlane.defaultBootstrapOverride())
+ .childChannelConfigurator(configurator);
+
+ Server childServer = serverBuilder.build().start();
+
+ try {
+ // The server xDS client will connect to control plane to get LDS.
+ sink.awaitCall();
+ } finally {
+ childServer.shutdownNow();
+ }
+ }
+
+ private static final class CountingMetricSink extends NoopMetricSink {
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+ @Override
+ public void addLongCounter(
+ LongCounterMetricInstrument metricInstrument,
+ long value,
+ List requiredLabelValues,
+ List optionalLabelValues) {
+ latch.countDown();
+ }
+
+ public void awaitCall() throws InterruptedException {
+ if (!latch.await(5, TimeUnit.SECONDS)) {
+ throw new AssertionError("Timed out waiting for metric sink call");
+ }
+ }
+ }
}
diff --git a/xds/src/test/java/io/grpc/xds/FakeControlPlaneXdsOtelIntegrationTest.java b/xds/src/test/java/io/grpc/xds/FakeControlPlaneXdsOtelIntegrationTest.java
new file mode 100644
index 00000000000..3f079881a45
--- /dev/null
+++ b/xds/src/test/java/io/grpc/xds/FakeControlPlaneXdsOtelIntegrationTest.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2026 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.xds;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import io.grpc.ChannelConfigurator;
+import io.grpc.FlagResetRule;
+import io.grpc.Grpc;
+import io.grpc.InsecureChannelCredentials;
+import io.grpc.InternalFeatureFlags;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.opentelemetry.GrpcOpenTelemetry;
+import io.grpc.testing.GrpcCleanupRule;
+import io.grpc.testing.protobuf.SimpleRequest;
+import io.grpc.testing.protobuf.SimpleServiceGrpc;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * xDS + OpenTelemetry E2E integration test using a fake control plane.
+ * This class is skipped from Bazel builds because Bazel doesn't compile the
+ * grpc-opentelemetry module.
+ */
+@RunWith(Parameterized.class)
+public class FakeControlPlaneXdsOtelIntegrationTest {
+
+ @Rule(order = 0)
+ public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule();
+ @Rule(order = 1)
+ public final ControlPlaneRule controlPlane = new ControlPlaneRule();
+ @Rule(order = 2)
+ public final DataPlaneRule dataPlane = new DataPlaneRule(controlPlane);
+ @Rule(order = 3)
+ public final FlagResetRule flagResetRule = new FlagResetRule();
+
+ @Parameters(name = "enableRfc3986UrisParam={0}")
+ public static Iterable data() {
+ return Arrays.asList(new Object[][] {{true}, {false}});
+ }
+
+ @Parameter public boolean enableRfc3986UrisParam;
+
+ @Test
+ public void testInMemoryMetricReader() {
+ InMemoryMetricReader metricReader = InMemoryMetricReader.create();
+ SdkMeterProvider meterProvider = SdkMeterProvider.builder()
+ .registerMetricReader(metricReader)
+ .build();
+ io.opentelemetry.api.metrics.LongCounter counter = meterProvider
+ .meterBuilder("test-scope")
+ .build()
+ .counterBuilder("test-counter")
+ .build();
+ counter.add(10);
+ assertThat(metricReader.collectAllMetrics()).isNotEmpty();
+ }
+
+ @Before
+ public void setupRfc3986UrisFeatureFlag() throws Exception {
+ flagResetRule.setFlagForTest(
+ InternalFeatureFlags::setRfc3986UrisEnabled, enableRfc3986UrisParam);
+ }
+
+ @Test
+ public void childChannelConfigurator_passesOtelSdkToChannel_E2E() throws Exception {
+ InMemoryMetricReader metricReader = InMemoryMetricReader.create();
+ SdkMeterProvider meterProvider = SdkMeterProvider.builder()
+ .registerMetricReader(metricReader)
+ .build();
+ OpenTelemetry openTelemetry = OpenTelemetrySdk.builder()
+ .setMeterProvider(meterProvider)
+ .build();
+ GrpcOpenTelemetry grpcOtel = GrpcOpenTelemetry.newBuilder()
+ .sdk(openTelemetry)
+ .build();
+
+ ChannelConfigurator configurator = new ChannelConfigurator() {
+ @Override
+ public void configureChannelBuilder(ManagedChannelBuilder> builder) {
+ grpcOtel.configureChannelBuilder(builder);
+ }
+ };
+
+ ManagedChannel channel = grpcCleanupRule.register(
+ Grpc.newChannelBuilder("test-xds:///test-server",
+ InsecureChannelCredentials.create())
+ .childChannelConfigurator(configurator)
+ .build());
+
+ SimpleServiceGrpc.SimpleServiceBlockingStub blockingStub =
+ SimpleServiceGrpc.newBlockingStub(channel);
+ blockingStub.unaryRpc(SimpleRequest.getDefaultInstance());
+
+ // Shut down the channel to complete the RPC cycle.
+ channel.shutdownNow();
+ channel.awaitTermination(5, TimeUnit.SECONDS);
+
+ // Verify that OpenTelemetry metrics specifically from the xDS Control Plane ADS stream
+ // successfully propagated, with the exact stream method name preserved.
+ boolean foundTargetMethod = false;
+ for (MetricData metric : metricReader.collectAllMetrics()) {
+ String name = metric.getName();
+ if ("grpc.client.attempt.started".equals(name) || "grpc.client.call.duration".equals(name)) {
+ if (metric.toString().contains("envoy.service.discovery.v3.AggregatedDiscoveryService")) {
+ foundTargetMethod = true;
+ break;
+ }
+ }
+ }
+ assertThat(foundTargetMethod).isTrue();
+ }
+}
diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java
index 4918c2af7a4..60bb9ab8da2 100644
--- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java
+++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java
@@ -5113,7 +5113,7 @@ public void serverFailureMetricReport_forRetryAndBackoff() {
private XdsClientImpl createXdsClient(String serverUri) {
BootstrapInfo bootstrapInfo = buildBootStrap(serverUri);
return new XdsClientImpl(
- new GrpcXdsTransportFactory(null),
+ new GrpcXdsTransportFactory(null, null),
bootstrapInfo,
fakeClock.getScheduledExecutorService(),
backoffPolicyProvider,
diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsTransportFactoryTest.java b/xds/src/test/java/io/grpc/xds/GrpcXdsTransportFactoryTest.java
index 9c606a962f6..48595504d55 100644
--- a/xds/src/test/java/io/grpc/xds/GrpcXdsTransportFactoryTest.java
+++ b/xds/src/test/java/io/grpc/xds/GrpcXdsTransportFactoryTest.java
@@ -17,20 +17,30 @@
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import com.google.common.util.concurrent.SettableFuture;
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.grpc.BindableService;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ChannelConfigurator;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.InsecureServerCredentials;
+import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
+import io.grpc.NoopClientCall;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
+import io.grpc.testing.TestMethodDescriptors;
import io.grpc.xds.client.Bootstrapper;
import io.grpc.xds.client.XdsTransportFactory;
import java.util.concurrent.BlockingQueue;
@@ -96,7 +106,7 @@ public void onCompleted() {
@Test
public void callApis() throws Exception {
XdsTransportFactory.XdsTransport xdsTransport =
- new GrpcXdsTransportFactory(null)
+ new GrpcXdsTransportFactory(null, null)
.create(
Bootstrapper.ServerInfo.create(
"localhost:" + server.getPort(), InsecureChannelCredentials.create()));
@@ -127,7 +137,7 @@ public void refCountedXdsTransport_sameXdsServerAddress_returnsExistingTransport
Bootstrapper.ServerInfo xdsServerInfo =
Bootstrapper.ServerInfo.create(
"localhost:" + server.getPort(), InsecureChannelCredentials.create());
- GrpcXdsTransportFactory xdsTransportFactory = new GrpcXdsTransportFactory(null);
+ GrpcXdsTransportFactory xdsTransportFactory = new GrpcXdsTransportFactory(null, null);
// Calling create() for the first time creates a new GrpcXdsTransport instance.
// The ref count was previously 0 and now is 1.
XdsTransportFactory.XdsTransport transport1 = xdsTransportFactory.create(xdsServerInfo);
@@ -159,7 +169,7 @@ public void refCountedXdsTransport_differentXdsServerAddress_returnsDifferentTra
Bootstrapper.ServerInfo xdsServerInfo2 =
Bootstrapper.ServerInfo.create(
"localhost:" + server2.getPort(), InsecureChannelCredentials.create());
- GrpcXdsTransportFactory xdsTransportFactory = new GrpcXdsTransportFactory(null);
+ GrpcXdsTransportFactory xdsTransportFactory = new GrpcXdsTransportFactory(null, null);
// Calling create() to the first xDS server creates a new GrpcXdsTransport instance.
// The ref count was previously 0 and now is 1.
XdsTransportFactory.XdsTransport transport1 = xdsTransportFactory.create(xdsServerInfo1);
@@ -196,5 +206,139 @@ public void onStatusReceived(Status status) {
endFuture.set(status);
}
}
+
+ @Test
+ public void verifyConfigApplied_interceptor() {
+ final boolean[] interceptorCalled = new boolean[1];
+ final ClientInterceptor interceptor = new ClientInterceptor() {
+ @Override
+ public ClientCall interceptCall(
+ MethodDescriptor method,
+ CallOptions callOptions,
+ Channel next) {
+ interceptorCalled[0] = true;
+ return new NoopClientCall<>();
+ }
+ };
+
+ // Create Configurer that adds the interceptor
+ ChannelConfigurator configurer = new ChannelConfigurator() {
+ @Override
+ public void configureChannelBuilder(ManagedChannelBuilder> builder) {
+ builder.intercept(interceptor);
+ }
+ };
+
+ // Create Factory
+ GrpcXdsTransportFactory factory = new GrpcXdsTransportFactory(
+ null,
+ configurer);
+
+ // Create Transport
+ XdsTransportFactory.XdsTransport transport = factory.create(
+ Bootstrapper.ServerInfo.create("localhost:8080", InsecureChannelCredentials.create()));
+
+ // Create a Call to trigger interceptors
+ MethodDescriptor method = MethodDescriptor.newBuilder()
+ .setType(MethodDescriptor.MethodType.UNARY)
+ .setFullMethodName("service/method")
+ .setRequestMarshaller(TestMethodDescriptors.voidMarshaller())
+ .setResponseMarshaller(TestMethodDescriptors.voidMarshaller())
+ .build();
+
+ transport.createStreamingCall(method.getFullMethodName(), method.getRequestMarshaller(),
+ method.getResponseMarshaller());
+
+ // Verify interceptor was invoked
+ assertThat(interceptorCalled[0]).isTrue();
+
+ transport.shutdown();
+ }
+
+ @Test
+ public void useChannelConfigurator() {
+ final boolean[] called = new boolean[1];
+ ChannelConfigurator configurer = new ChannelConfigurator() {
+ @Override
+ public void configureChannelBuilder(ManagedChannelBuilder> builder) {
+ called[0] = true;
+ }
+ };
+
+ // Create Factory
+ GrpcXdsTransportFactory factory = new GrpcXdsTransportFactory(
+ null, // CallCredentials
+ configurer);
+
+ // Create Transport (triggers channel creation)
+ XdsTransportFactory.XdsTransport transport = factory.create(
+ Bootstrapper.ServerInfo.create("localhost:8080", InsecureChannelCredentials.create()));
+
+ // Verify Configurer was accessed and applied
+ assertThat(called[0]).isTrue();
+
+ transport.shutdown();
+ }
+
+ @Test
+ public void useChannelConfigurator_throwsException_propagates() {
+ final RuntimeException testException = new RuntimeException("test exception");
+ ChannelConfigurator configurer = new ChannelConfigurator() {
+ @Override
+ public void configureChannelBuilder(ManagedChannelBuilder> builder) {
+ throw testException;
+ }
+ };
+
+ GrpcXdsTransportFactory factory = new GrpcXdsTransportFactory(null, configurer);
+
+ try {
+ factory.create(
+ Bootstrapper.ServerInfo.create("localhost:8080", InsecureChannelCredentials.create()));
+ org.junit.Assert.fail("Expected RuntimeException");
+ } catch (RuntimeException e) {
+ assertThat(e).isSameInstanceAs(testException);
+ }
+ }
+
+ @Test
+ public void verifyConfigApplied_maxInboundMessageSize() {
+ // Create a mock Builder
+ ManagedChannelBuilder> mockBuilder = mock(ManagedChannelBuilder.class);
+
+ // Create Configurer that modifies message size
+ ChannelConfigurator configurer = new ChannelConfigurator() {
+ @Override
+ public void configureChannelBuilder(ManagedChannelBuilder> builder) {
+ builder.maxInboundMessageSize(1024);
+ }
+ };
+
+ // Apply configurer to builder
+ configurer.configureChannelBuilder(mockBuilder);
+
+ // Verify builder was modified
+ verify(mockBuilder).maxInboundMessageSize(1024);
+ }
+
+ @Test
+ public void verifyConfigApplied_interceptors() {
+ ClientInterceptor interceptor1 = mock(ClientInterceptor.class);
+ ClientInterceptor interceptor2 = mock(ClientInterceptor.class);
+
+ ChannelConfigurator configurer = new ChannelConfigurator() {
+ @Override
+ public void configureChannelBuilder(ManagedChannelBuilder> builder) {
+ builder.intercept(interceptor1);
+ builder.intercept(interceptor2);
+ }
+ };
+
+ ManagedChannelBuilder> mockBuilder = mock(ManagedChannelBuilder.class);
+ configurer.configureChannelBuilder(mockBuilder);
+
+ verify(mockBuilder).intercept(interceptor1);
+ verify(mockBuilder).intercept(interceptor2);
+ }
}
diff --git a/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java
index 80eb5cc1f47..4d1be47ef19 100644
--- a/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java
+++ b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java
@@ -185,7 +185,7 @@ public void cancelled(Context context) {
lrsClient =
new LoadReportClient(
loadStatsManager,
- new GrpcXdsTransportFactory(null).createForTest(channel),
+ new GrpcXdsTransportFactory(null, null).createForTest(channel),
NODE,
syncContext,
fakeClock.getScheduledExecutorService(),
diff --git a/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java b/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java
index 29b149f166f..15887ff3d26 100644
--- a/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java
+++ b/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java
@@ -28,9 +28,12 @@
import com.google.auth.oauth2.OAuth2Credentials;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.CallCredentials;
+import io.grpc.ChannelConfigurator;
+import io.grpc.ClientInterceptor;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.InsecureServerCredentials;
+import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MetricRecorder;
import io.grpc.Server;
@@ -207,7 +210,8 @@ public void xdsClient_usesCallCredentials() throws Exception {
// Create xDS client that uses the CallCredentials on the transport
ObjectPool xdsClientPool =
- provider.getOrCreate("target", bootstrapInfo, metricRecorder, sampleCreds);
+ provider.getOrCreate("target", bootstrapInfo, metricRecorder, sampleCreds,
+ null);
XdsClient xdsClient = xdsClientPool.getObject();
xdsClient.watchXdsResource(
XdsListenerResource.getInstance(), "someLDSresource", ldsResourceWatcher);
@@ -220,4 +224,65 @@ public void xdsClient_usesCallCredentials() throws Exception {
xdsClientPool.returnObject(xdsClient);
xdsServer.shutdownNow();
}
+
+ @Test
+ public void xdsClient_usesChannelConfigurator() throws Exception {
+ // Set up fake xDS server
+ XdsTestControlPlaneService fakeXdsService = new XdsTestControlPlaneService();
+ CallCredsServerInterceptor callInterceptor = new CallCredsServerInterceptor();
+ Server xdsServer =
+ Grpc.newServerBuilderForPort(0, InsecureServerCredentials.create())
+ .addService(fakeXdsService)
+ .intercept(callInterceptor)
+ .build()
+ .start();
+ String xdsServerUri = "localhost:" + xdsServer.getPort();
+
+ // Set up bootstrap & xDS client pool provider
+ ServerInfo server = ServerInfo.create(xdsServerUri, InsecureChannelCredentials.create());
+ BootstrapInfo bootstrapInfo =
+ BootstrapInfo.builder().servers(Collections.singletonList(server)).node(node).build();
+ SharedXdsClientPoolProvider provider = new SharedXdsClientPoolProvider();
+
+ // Create a client interceptor that actually just injects a test token
+ ClientInterceptor testInterceptor = new ClientInterceptor() {
+ @Override
+ public io.grpc.ClientCall interceptCall(
+ io.grpc.MethodDescriptor method,
+ io.grpc.CallOptions callOptions,
+ io.grpc.Channel next) {
+ return new io.grpc.ForwardingClientCall.SimpleForwardingClientCall(
+ next.newCall(method, callOptions)) {
+ @Override
+ public void start(Listener responseListener, Metadata headers) {
+ headers.put(AUTHORIZATION_METADATA_KEY, "Bearer test-configurator-token");
+ super.start(responseListener, headers);
+ }
+ };
+ }
+ };
+
+ ChannelConfigurator configurator = new ChannelConfigurator() {
+ @Override
+ public void configureChannelBuilder(ManagedChannelBuilder> builder) {
+ builder.intercept(testInterceptor);
+ }
+ };
+
+ // Create xDS client that uses the ChannelConfigurator on the transport
+ ObjectPool xdsClientPool =
+ provider.getOrCreate("target", bootstrapInfo, metricRecorder, null, configurator);
+ XdsClient xdsClient = xdsClientPool.getObject();
+ xdsClient.watchXdsResource(
+ XdsListenerResource.getInstance(), "someLDSresource", ldsResourceWatcher);
+
+ // Wait for xDS server to get the request and verify that it received the token from
+ // configurator
+ assertThat(callInterceptor.getTokenWithTimeout(5, TimeUnit.SECONDS))
+ .isEqualTo("Bearer test-configurator-token");
+
+ // Clean up
+ xdsClientPool.returnObject(xdsClient);
+ xdsServer.shutdownNow();
+ }
}
diff --git a/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java b/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java
index 27ee8d22825..4d5e7d09ad4 100644
--- a/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java
+++ b/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java
@@ -484,7 +484,7 @@ public void fallbackFromBadUrlToGoodOne() {
XdsClientImpl client =
CommonBootstrapperTestUtils.createXdsClient(
Arrays.asList(garbageUri, validUri),
- new GrpcXdsTransportFactory(null),
+ new GrpcXdsTransportFactory(null, null),
fakeClock,
new ExponentialBackoffPolicy.Provider(),
MessagePrinter.INSTANCE,
@@ -509,7 +509,7 @@ public void testGoodUrlFollowedByBadUrl() {
XdsClientImpl client =
CommonBootstrapperTestUtils.createXdsClient(
Arrays.asList(validUri, garbageUri),
- new GrpcXdsTransportFactory(null),
+ new GrpcXdsTransportFactory(null, null),
fakeClock,
new ExponentialBackoffPolicy.Provider(),
MessagePrinter.INSTANCE,
@@ -536,7 +536,7 @@ public void testTwoBadUrl() {
XdsClientImpl client =
CommonBootstrapperTestUtils.createXdsClient(
Arrays.asList(garbageUri1, garbageUri2),
- new GrpcXdsTransportFactory(null),
+ new GrpcXdsTransportFactory(null, null),
fakeClock,
new ExponentialBackoffPolicy.Provider(),
MessagePrinter.INSTANCE,
diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java
index df3a0af5111..83a8ddfd7c8 100644
--- a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java
+++ b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java
@@ -47,6 +47,7 @@
import com.google.re2j.Pattern;
import io.grpc.CallOptions;
import io.grpc.Channel;
+import io.grpc.ChannelConfigurator;
import io.grpc.ChannelLogger;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
@@ -67,6 +68,7 @@
import io.grpc.NameResolver.ServiceConfigParser;
import io.grpc.NoopClientCall;
import io.grpc.NoopClientCall.NoopClientCallListener;
+import io.grpc.ProxyDetector;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusOr;
@@ -2495,6 +2497,7 @@ private PickSubchannelArgs newPickSubchannelArgs(
private final class FakeXdsClientPoolFactory implements XdsClientPoolFactory {
Set targets = new HashSet<>();
XdsClient xdsClient = new FakeXdsClient();
+ ChannelConfigurator savedChannelConfigurator;
@Override
@Nullable
@@ -2519,6 +2522,25 @@ public XdsClient returnObject(Object object) {
};
}
+ @Override
+ public ObjectPool getOrCreate(
+ String target, BootstrapInfo bootstrapInfo, MetricRecorder metricRecorder,
+ ChannelConfigurator channelConfigurator) {
+ targets.add(target);
+ this.savedChannelConfigurator = channelConfigurator;
+ return new ObjectPool() {
+ @Override
+ public XdsClient getObject() {
+ return xdsClient;
+ }
+
+ @Override
+ public XdsClient returnObject(Object object) {
+ return null;
+ }
+ };
+ }
+
@Override
public List getTargets() {
if (targets.isEmpty()) {
@@ -2957,4 +2979,41 @@ void deliverErrorStatus() {
listener.onClose(Status.UNAVAILABLE, new Metadata());
}
}
+
+ @Test
+ public void start_passesChannelConfiguratorToClientPoolFactory() {
+ ChannelConfigurator channelConfigurator = builder -> { };
+
+ // Build NameResolver.Args containing the channel configurator
+ NameResolver.Args args = NameResolver.Args.newBuilder()
+ .setDefaultPort(8080)
+ .setProxyDetector(mock(ProxyDetector.class))
+ .setSynchronizationContext(syncContext)
+ .setServiceConfigParser(serviceConfigParser)
+ .setChannelLogger(mock(ChannelLogger.class))
+ .setChildChannelConfigurator(channelConfigurator)
+ .build();
+
+ XdsNameResolver resolver = new XdsNameResolver(
+ targetUri,
+ null, // targetAuthority (nullable)
+ AUTHORITY, // name
+ null, // overrideAuthority (nullable)
+ serviceConfigParser,
+ syncContext,
+ scheduler,
+ xdsClientPoolFactory,
+ mockRandom,
+ FilterRegistry.getDefaultRegistry(),
+ rawBootstrap,
+ metricRecorder,
+ args);
+
+ // Start the resolver
+ resolver.start(mockListener);
+
+ assertThat(xdsClientPoolFactory.savedChannelConfigurator).isSameInstanceAs(channelConfigurator);
+
+ resolver.shutdown();
+ }
}
diff --git a/xds/src/test/java/io/grpc/xds/XdsServerBuilderTest.java b/xds/src/test/java/io/grpc/xds/XdsServerBuilderTest.java
index ac990226259..503f1b670c0 100644
--- a/xds/src/test/java/io/grpc/xds/XdsServerBuilderTest.java
+++ b/xds/src/test/java/io/grpc/xds/XdsServerBuilderTest.java
@@ -20,6 +20,7 @@
import static io.grpc.xds.XdsServerTestHelper.buildTestListener;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
@@ -30,15 +31,18 @@
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.BindableService;
+import io.grpc.ChannelConfigurator;
import io.grpc.InsecureServerCredentials;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusOr;
+import io.grpc.internal.ObjectPool;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.XdsListenerResource.LdsUpdate;
import io.grpc.xds.XdsServerTestHelper.FakeXdsClient;
import io.grpc.xds.XdsServerTestHelper.FakeXdsClientPoolFactory;
+import io.grpc.xds.client.XdsClient;
import io.grpc.xds.internal.security.CommonTlsContextTestsUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -321,8 +325,39 @@ public void testOverrideBootstrap() throws Exception {
buildBuilder(null);
builder.overrideBootstrapForTest(b);
xdsServer = cleanupRule.register((XdsServerWrapper) builder.build());
- Future unused = startServerAsync();
+ Future> unused = startServerAsync();
assertThat(xdsClientPoolFactory.savedBootstrapInfo.node().getId())
.isEqualTo(XdsServerTestHelper.BOOTSTRAP_INFO.node().getId());
}
+
+ @Test
+ public void start_passesChannelConfiguratorToClientPoolFactory() throws Exception {
+ ChannelConfigurator configurer = builder -> { };
+ XdsClientPoolFactory mockPoolFactory = mock(XdsClientPoolFactory.class);
+ @SuppressWarnings("unchecked")
+ ObjectPool mockPool = mock(ObjectPool.class);
+ when(mockPool.getObject()).thenReturn(xdsClient);
+ when(mockPoolFactory.getOrCreate(any(), any(), any(), any())).thenReturn(mockPool);
+
+ buildBuilder(null);
+ builder.childChannelConfigurator(configurer);
+ builder.xdsClientPoolFactory(mockPoolFactory);
+ xdsServer = cleanupRule.register((XdsServerWrapper) builder.build());
+
+ Future> unused = startServerAsync();
+
+ verify(mockPoolFactory).getOrCreate(
+ any(), any(), any(), eq(configurer));
+ }
+
+ @Test
+ public void childChannelConfigurator_nullThrows() throws IOException {
+ buildBuilder(null);
+ try {
+ builder.childChannelConfigurator(null);
+ fail("exception expected");
+ } catch (NullPointerException expected) {
+ assertThat(expected).hasMessageThat().contains("channelConfigurator");
+ }
+ }
}
diff --git a/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java b/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java
index 386793299d8..aa546a564f9 100644
--- a/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java
+++ b/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java
@@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.SettableFuture;
import io.envoyproxy.envoy.config.core.v3.SocketAddress.Protocol;
+import io.grpc.ChannelConfigurator;
import io.grpc.InsecureChannelCredentials;
import io.grpc.MetricRecorder;
import io.grpc.Status;
@@ -182,6 +183,13 @@ public XdsClient returnObject(Object object) {
};
}
+ @Override
+ public ObjectPool getOrCreate(
+ String target, BootstrapInfo bootstrapInfo, MetricRecorder metricRecorder,
+ ChannelConfigurator channelConfigurator) {
+ return getOrCreate(target, bootstrapInfo, metricRecorder);
+ }
+
@Override
public List getTargets() {
return Collections.singletonList("fake-target");
diff --git a/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java b/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java
index 99e3911307a..63dfcbdc703 100644
--- a/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java
+++ b/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java
@@ -39,6 +39,7 @@
import com.google.common.util.concurrent.SettableFuture;
import io.envoyproxy.envoy.config.core.v3.SocketAddress.Protocol;
import io.grpc.Attributes;
+import io.grpc.ChannelConfigurator;
import io.grpc.InsecureChannelCredentials;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
@@ -52,6 +53,7 @@
import io.grpc.StatusOr;
import io.grpc.SynchronizationContext;
import io.grpc.internal.FakeClock;
+import io.grpc.internal.ObjectPool;
import io.grpc.testing.TestMethodDescriptors;
import io.grpc.xds.EnvoyServerProtoData.CidrRange;
import io.grpc.xds.EnvoyServerProtoData.FilterChain;
@@ -2030,4 +2032,31 @@ private static MethodDescriptor createMethod(String path) {
static EnvoyServerProtoData.DownstreamTlsContext createTls() {
return CommonTlsContextTestsUtil.buildTestInternalDownstreamTlsContext("CERT1", "VA1");
}
+
+ @Test
+ public void childChannelConfigurator_passedToXdsClientPool() {
+ ChannelConfigurator configurator = builder -> { };
+ XdsClientPoolFactory mockPoolFactory = mock(XdsClientPoolFactory.class);
+ @SuppressWarnings("unchecked")
+ ObjectPool mockPool = mock(ObjectPool.class);
+ when(mockPool.getObject()).thenReturn(xdsClient);
+ when(mockPoolFactory.getOrCreate(any(), any(), any(), any())).thenReturn(mockPool);
+
+ XdsServerWrapper serverWrapper = new XdsServerWrapper(
+ "0.0.0.0:1", mockBuilder, listener, selectorManager, mockPoolFactory,
+ XdsServerTestHelper.RAW_BOOTSTRAP, filterRegistry,
+ executor.getScheduledExecutorService(), configurator);
+
+ Executors.newSingleThreadExecutor().execute(() -> {
+ try {
+ serverWrapper.start();
+ } catch (IOException ex) {
+ // ignore
+ }
+ });
+
+ verify(mockPoolFactory, timeout(5000)).getOrCreate(
+ any(), any(), any(), eq(configurator));
+ serverWrapper.shutdownNow();
+ }
}