Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: handle retry info so client respect the delay server sets #2026

Merged
merged 14 commits into from
Dec 19, 2023
Next Next commit
feat: add a flag to add / remove routing cookie from callable chain
  • Loading branch information
mutianf committed Dec 16, 2023
commit 11c5a2435bda554cf73389ee55a080a65d4d9922
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,13 @@ public static EnhancedBigtableStubSettings finalizeSettings(
// workaround JWT audience issues
patchCredentials(builder);

// patch cookies interceptor
InstantiatingGrpcChannelProvider.Builder transportProvider = null;
if (builder.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider) {
transportProvider =
((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()).toBuilder();
InstantiatingGrpcChannelProvider.Builder transportProvider =
builder.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider
? ((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()).toBuilder()
: null;

if (builder.getEnableRoutingCookie() && transportProvider != null) {
// patch cookies interceptor
transportProvider.setInterceptorProvider(() -> ImmutableList.of(new CookiesInterceptor()));
}

Expand Down Expand Up @@ -371,9 +373,12 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
new TracedServerStreamingCallable<>(
readRowsUserCallable, clientContext.getTracerFactory(), span);

// CookieHolder needs to be injected to the CallOptions outside of retries, otherwise retry
// attempts won't see a CookieHolder.
ServerStreamingCallable<Query, RowT> withCookie = new CookiesServerStreamingCallable<>(traced);
ServerStreamingCallable<Query, RowT> withCookie = traced;
if (settings.getEnableRoutingCookie()) {
// CookieHolder needs to be injected to the CallOptions outside of retries, otherwise retry
// attempts won't see a CookieHolder.
withCookie = new CookiesServerStreamingCallable<>(traced);
}

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
}
Expand Down Expand Up @@ -411,7 +416,10 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
new TracedUnaryCallable<>(
firstRow, clientContext.getTracerFactory(), getSpanName("ReadRow"));

UnaryCallable<Query, RowT> withCookie = new CookiesUnaryCallable<>(traced);
UnaryCallable<Query, RowT> withCookie = traced;
if (settings.getEnableRoutingCookie()) {
withCookie = new CookiesUnaryCallable<>(traced);
}

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
}
Expand Down Expand Up @@ -654,7 +662,10 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
new TracedUnaryCallable<>(
tracedBatcherUnaryCallable, clientContext.getTracerFactory(), spanName);

UnaryCallable<BulkMutation, Void> withCookie = new CookiesUnaryCallable<>(traced);
UnaryCallable<BulkMutation, Void> withCookie = traced;
if (settings.getEnableRoutingCookie()) {
withCookie = new CookiesUnaryCallable<>(traced);
}

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
}
Expand Down Expand Up @@ -938,8 +949,10 @@ public Map<String, String> extract(
ServerStreamingCallable<String, ByteStringRange> traced =
new TracedServerStreamingCallable<>(retrying, clientContext.getTracerFactory(), span);

ServerStreamingCallable<String, ByteStringRange> withCookie =
new CookiesServerStreamingCallable<>(traced);
ServerStreamingCallable<String, ByteStringRange> withCookie = traced;
if (settings.getEnableRoutingCookie()) {
withCookie = new CookiesServerStreamingCallable<>(traced);
}

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
}
Expand Down Expand Up @@ -1021,8 +1034,10 @@ public Map<String, String> extract(
new TracedServerStreamingCallable<>(
readChangeStreamUserCallable, clientContext.getTracerFactory(), span);

ServerStreamingCallable<ReadChangeStreamQuery, ChangeStreamRecordT> withCookie =
new CookiesServerStreamingCallable<>(traced);
ServerStreamingCallable<ReadChangeStreamQuery, ChangeStreamRecordT> withCookie = traced;
if (settings.getEnableRoutingCookie()) {
withCookie = new CookiesServerStreamingCallable<>(traced);
}

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
}
Expand All @@ -1037,9 +1052,12 @@ private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacin
UnaryCallable<RequestT, ResponseT> traced =
new TracedUnaryCallable<>(inner, clientContext.getTracerFactory(), getSpanName(methodName));

// CookieHolder needs to be injected to the CallOptions outside of retries, otherwise retry
// attempts won't see a CookieHolder.
UnaryCallable<RequestT, ResponseT> withCookie = new CookiesUnaryCallable<>(traced);
UnaryCallable<RequestT, ResponseT> withCookie = traced;
if (settings.getEnableRoutingCookie()) {
// CookieHolder needs to be injected to the CallOptions outside of retries, otherwise retry
// attempts won't see a CookieHolder.
withCookie = new CookiesUnaryCallable<>(traced);
}

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private final boolean isRefreshingChannel;
private ImmutableList<String> primedTableIds;
private final Map<String, String> jwtAudienceMapping;
private final boolean enableRoutingCookie;

private final ServerStreamingCallSettings<Query, Row> readRowsSettings;
private final UnaryCallSettings<Query, Row> readRowSettings;
Expand Down Expand Up @@ -252,6 +253,7 @@ private EnhancedBigtableStubSettings(Builder builder) {
isRefreshingChannel = builder.isRefreshingChannel;
primedTableIds = builder.primedTableIds;
jwtAudienceMapping = builder.jwtAudienceMapping;
enableRoutingCookie = builder.enableRoutingCookie;

// Per method settings.
readRowsSettings = builder.readRowsSettings.build();
Expand Down Expand Up @@ -313,6 +315,14 @@ public Map<String, String> getJwtAudienceMapping() {
return jwtAudienceMapping;
}

/**
* Gets if routing cookie is enabled. If true, client will retry a request with extra metadata
* server sent back.
*/
public boolean getEnableRoutingCookie() {
return enableRoutingCookie;
}

/** Returns a builder for the default ChannelProvider for this service. */
public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() {
return BigtableStubSettings.defaultGrpcTransportProviderBuilder()
Expand Down Expand Up @@ -595,6 +605,7 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
private boolean isRefreshingChannel;
private ImmutableList<String> primedTableIds;
private Map<String, String> jwtAudienceMapping;
private boolean enableRoutingCookie;

private final ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings;
private final UnaryCallSettings.Builder<Query, Row> readRowSettings;
Expand Down Expand Up @@ -627,6 +638,7 @@ private Builder() {
primedTableIds = ImmutableList.of();
jwtAudienceMapping = DEFAULT_JWT_AUDIENCE_MAPPING;
setCredentialsProvider(defaultCredentialsProviderBuilder().build());
this.enableRoutingCookie = true;

// Defaults provider
BigtableStubSettings.Builder baseDefaults = BigtableStubSettings.newBuilder();
Expand Down Expand Up @@ -745,6 +757,7 @@ private Builder(EnhancedBigtableStubSettings settings) {
isRefreshingChannel = settings.isRefreshingChannel;
primedTableIds = settings.primedTableIds;
jwtAudienceMapping = settings.jwtAudienceMapping;
enableRoutingCookie = settings.enableRoutingCookie;

// Per method settings.
readRowsSettings = settings.readRowsSettings.toBuilder();
Expand Down Expand Up @@ -893,6 +906,23 @@ public Map<String, String> getJwtAudienceMapping() {
return jwtAudienceMapping;
}

/**
* Sets if routing cookie is enabled. If true, client will retry a request with extra metadata
* server sent back.
*/
public Builder setEnableRoutingCookie(boolean enableRoutingCookie) {
this.enableRoutingCookie = enableRoutingCookie;
return this;
}

/**
* Gets if routing cookie is enabled. If true, client will retry a request with extra metadata
* server sent back.
*/
public boolean getEnableRoutingCookie() {
return enableRoutingCookie;
}

/** Returns the builder for the settings used for calls to readRows. */
public ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings() {
return readRowsSettings;
Expand Down Expand Up @@ -1019,6 +1049,7 @@ public String toString() {
.add("isRefreshingChannel", isRefreshingChannel)
.add("primedTableIds", primedTableIds)
.add("jwtAudienceMapping", jwtAudienceMapping)
.add("enableRoutingCookie", enableRoutingCookie)
.add("readRowsSettings", readRowsSettings)
.add("readRowSettings", readRowSettings)
.add("sampleRowKeysSettings", sampleRowKeysSettings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
Expand All @@ -83,6 +84,7 @@ public class CookiesHolderTest {

private Server server;
private final FakeService fakeService = new FakeService();
private BigtableDataSettings.Builder settings;
private BigtableDataClient client;
private final List<Metadata> serverMetadata = new ArrayList<>();

Expand Down Expand Up @@ -138,6 +140,8 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
.build())
.setRetryableCodes(StatusCode.Code.UNAVAILABLE);

this.settings = settings;

client = BigtableDataClient.create(settings.build());
}

Expand Down Expand Up @@ -379,7 +383,7 @@ public void sendHeaders(Metadata headers) {
}

@Test
public void testAllMethodsAreCalled() throws InterruptedException {
public void testAllMethodsAreCalled() {
// This test ensures that all methods respect the retry cookie except for the ones that are
// explicitly added to the methods list. It requires that any newly method is exercised in this
// test. This is enforced by introspecting grpc method descriptors.
Expand Down Expand Up @@ -422,6 +426,53 @@ public void testAllMethodsAreCalled() throws InterruptedException {
assertThat(methods).containsExactlyElementsIn(expected);
}

@Test
public void testDisableRoutingCookie() throws IOException {
// This test disables routing cookie in the client settings and ensures that none of the routing
// cookie
// is added.
settings.stubSettings().setEnableRoutingCookie(false);
try (BigtableDataClient client = BigtableDataClient.create(settings.build())) {
client.readRows(Query.create("fake-table")).iterator().hasNext();
assertThat(fakeService.count.get()).isEqualTo(2);
fakeService.count.set(0);

client.mutateRow(RowMutation.create("fake-table", "key").setCell("cf", "q", "v"));
assertThat(fakeService.count.get()).isEqualTo(2);
fakeService.count.set(0);

client.bulkMutateRows(
BulkMutation.create("fake-table")
.add(RowMutationEntry.create("key").setCell("cf", "q", "v")));
assertThat(fakeService.count.get()).isEqualTo(2);
fakeService.count.set(0);

client.sampleRowKeys("fake-table");
assertThat(fakeService.count.get()).isEqualTo(2);
fakeService.count.set(0);

client.checkAndMutateRow(
ConditionalRowMutation.create("fake-table", "key")
.then(Mutation.create().setCell("cf", "q", "v")));
assertThat(fakeService.count.get()).isEqualTo(2);
fakeService.count.set(0);

client.readModifyWriteRow(
ReadModifyWriteRow.create("fake-table", "key").append("cf", "q", "v"));
assertThat(fakeService.count.get()).isEqualTo(2);
fakeService.count.set(0);

client.generateInitialChangeStreamPartitions("fake-table").iterator().hasNext();
assertThat(fakeService.count.get()).isEqualTo(2);
fakeService.count.set(0);

client.readChangeStream(ReadChangeStreamQuery.create("fake-table")).iterator().hasNext();
assertThat(fakeService.count.get()).isEqualTo(2);

assertThat(methods).isEmpty();
}
}

static class FakeService extends BigtableGrpc.BigtableImplBase {

private boolean returnCookie = true;
Expand Down
Loading
Loading