Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: handle retry info so client respect the delay server sets #2026

Merged
merged 14 commits into from
Dec 19, 2023
Prev Previous commit
Next Next commit
refactor
  • Loading branch information
mutianf committed Dec 18, 2023
commit cd0996d5a8c74e7a85ec635e835988f58cc8fc7b
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.api.gax.rpc.RequestParamsExtractor;
import com.google.api.gax.rpc.ServerStreamingCallSettings;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.tracing.OpencensusTracerFactory;
import com.google.api.gax.tracing.SpanName;
Expand Down Expand Up @@ -373,14 +374,7 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
new TracedServerStreamingCallable<>(
readRowsUserCallable, clientContext.getTracerFactory(), span);

ServerStreamingCallable<Query, RowT> withCookie = traced;
if (settings.getEnableRoutingCookie()) {
// CookieHolder needs to be injected to the CallOptions outside of retries, otherwise retry
// attempts won't see a CookieHolder.
withCookie = new CookiesServerStreamingCallable<>(traced);
}

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -416,12 +410,7 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
new TracedUnaryCallable<>(
firstRow, clientContext.getTracerFactory(), getSpanName("ReadRow"));

UnaryCallable<Query, RowT> withCookie = traced;
if (settings.getEnableRoutingCookie()) {
withCookie = new CookiesUnaryCallable<>(traced);
}

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -493,7 +482,7 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
new ReadRowsRetryCompletedCallable<>(withBigtableTracer);

ServerStreamingCallable<ReadRowsRequest, RowT> retrying2 =
Callables.retrying(retrying1, innerSettings, clientContext);
withRetries(retrying1, innerSettings);

return new FilterMarkerRowsCallable<>(retrying2, rowAdapter);
}
Expand Down Expand Up @@ -576,7 +565,7 @@ public Map<String, String> extract(
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> retryable =
Callables.retrying(withBigtableTracer, settings.sampleRowKeysSettings(), clientContext);
withRetries(withBigtableTracer, settings.sampleRowKeysSettings());

return createUserFacingUnaryCallable(
methodName, new SampleRowKeysCallable(retryable, requestContext));
Expand Down Expand Up @@ -615,7 +604,7 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<MutateRowRequest, MutateRowResponse> retrying =
Callables.retrying(withBigtableTracer, settings.mutateRowSettings(), clientContext);
withRetries(withBigtableTracer, settings.mutateRowSettings());

return createUserFacingUnaryCallable(
methodName, new MutateRowCallable(retrying, requestContext));
Expand All @@ -639,19 +628,25 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
UnaryCallable<MutateRowsRequest, Void> baseCallable = createMutateRowsBaseCallable();

UnaryCallable<MutateRowsRequest, Void> withCookie = baseCallable;

if (settings.getEnableRoutingCookie()) {
withCookie = new CookiesUnaryCallable<>(baseCallable);
}

UnaryCallable<MutateRowsRequest, Void> flowControlCallable = null;
if (settings.bulkMutateRowsSettings().isLatencyBasedThrottlingEnabled()) {
flowControlCallable =
new DynamicFlowControlCallable(
baseCallable,
withCookie,
bulkMutationFlowController,
bulkMutationDynamicFlowControlStats,
settings.bulkMutateRowsSettings().getTargetRpcLatencyMs(),
FLOW_CONTROL_ADJUSTING_INTERVAL_MS);
}
UnaryCallable<BulkMutation, Void> userFacing =
new BulkMutateRowsUserFacingCallable(
flowControlCallable != null ? flowControlCallable : baseCallable, requestContext);
flowControlCallable != null ? flowControlCallable : withCookie, requestContext);

SpanName spanName = getSpanName("MutateRows");

Expand All @@ -662,12 +657,7 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
new TracedUnaryCallable<>(
tracedBatcherUnaryCallable, clientContext.getTracerFactory(), spanName);

UnaryCallable<BulkMutation, Void> withCookie = traced;
if (settings.getEnableRoutingCookie()) {
withCookie = new CookiesUnaryCallable<>(traced);
}

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -821,7 +811,7 @@ public Map<String, String> extract(
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> retrying =
Callables.retrying(withBigtableTracer, settings.checkAndMutateRowSettings(), clientContext);
withRetries(withBigtableTracer, settings.checkAndMutateRowSettings());

return createUserFacingUnaryCallable(
methodName, new CheckAndMutateRowCallable(retrying, requestContext));
Expand Down Expand Up @@ -862,8 +852,7 @@ public Map<String, String> extract(ReadModifyWriteRowRequest request) {
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> retrying =
Callables.retrying(
withBigtableTracer, settings.readModifyWriteRowSettings(), clientContext);
withRetries(withBigtableTracer, settings.readModifyWriteRowSettings());

return createUserFacingUnaryCallable(
methodName, new ReadModifyWriteRowCallable(retrying, requestContext));
Expand Down Expand Up @@ -943,18 +932,13 @@ public Map<String, String> extract(
new BigtableTracerStreamingCallable<>(watched);

ServerStreamingCallable<String, ByteStringRange> retrying =
Callables.retrying(withBigtableTracer, innerSettings, clientContext);
withRetries(withBigtableTracer, innerSettings);

SpanName span = getSpanName("GenerateInitialChangeStreamPartitions");
ServerStreamingCallable<String, ByteStringRange> traced =
new TracedServerStreamingCallable<>(retrying, clientContext.getTracerFactory(), span);

ServerStreamingCallable<String, ByteStringRange> withCookie = traced;
if (settings.getEnableRoutingCookie()) {
withCookie = new CookiesServerStreamingCallable<>(traced);
}

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -1023,7 +1007,7 @@ public Map<String, String> extract(
new BigtableTracerStreamingCallable<>(watched);

ServerStreamingCallable<ReadChangeStreamRequest, ChangeStreamRecordT> readChangeStreamCallable =
Callables.retrying(withBigtableTracer, innerSettings, clientContext);
withRetries(withBigtableTracer, innerSettings);

ServerStreamingCallable<ReadChangeStreamQuery, ChangeStreamRecordT>
readChangeStreamUserCallable =
Expand Down Expand Up @@ -1052,14 +1036,7 @@ private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacin
UnaryCallable<RequestT, ResponseT> traced =
new TracedUnaryCallable<>(inner, clientContext.getTracerFactory(), getSpanName(methodName));

UnaryCallable<RequestT, ResponseT> withCookie = traced;
if (settings.getEnableRoutingCookie()) {
// CookieHolder needs to be injected to the CallOptions outside of retries, otherwise retry
// attempts won't see a CookieHolder.
withCookie = new CookiesUnaryCallable<>(traced);
}

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

private UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> createPingAndWarmCallable() {
Expand All @@ -1080,6 +1057,27 @@ public Map<String, String> extract(PingAndWarmRequest request) {
Collections.emptySet());
return pingAndWarm.withDefaultCallContext(clientContext.getDefaultCallContext());
}

private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> withRetries(
UnaryCallable<RequestT, ResponseT> innerCallable, UnaryCallSettings<?, ?> unaryCallSettings) {
UnaryCallable<RequestT, ResponseT> retrying =
Callables.retrying(innerCallable, unaryCallSettings, clientContext);
if (settings.getEnableRoutingCookie()) {
return new CookiesUnaryCallable<>(retrying);
}
return retrying;
}

private <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT> withRetries(
ServerStreamingCallable<RequestT, ResponseT> innerCallable,
ServerStreamingCallSettings<RequestT, ResponseT> serverStreamingCallSettings) {
ServerStreamingCallable<RequestT, ResponseT> retrying =
Callables.retrying(innerCallable, serverStreamingCallSettings, clientContext);
if (settings.getEnableRoutingCookie()) {
return new CookiesServerStreamingCallable<>(retrying);
}
return retrying;
}
// </editor-fold>

// <editor-fold desc="Callable accessors">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,43 @@ public void testSampleRowKeys() {
serverMetadata.clear();
}

@Test
public void testReadChangeStream() {
client.readChangeStream(ReadChangeStreamQuery.create("table")).iterator().hasNext();

assertThat(fakeService.count.get()).isGreaterThan(1);
assertThat(serverMetadata).hasSize(fakeService.count.get());

Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1);

assertThat(lastMetadata)
.containsAtLeast(
ROUTING_COOKIE_1.name(), "readChangeStream", ROUTING_COOKIE_2.name(), testCookie);
assertThat(lastMetadata).doesNotContainKeys(BAD_KEY.name());

serverMetadata.clear();
}

@Test
public void testGenerateInitialChangeStreamParition() {
client.generateInitialChangeStreamPartitions("table").iterator().hasNext();

assertThat(fakeService.count.get()).isGreaterThan(1);
assertThat(serverMetadata).hasSize(fakeService.count.get());

Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1);

assertThat(lastMetadata)
.containsAtLeast(
ROUTING_COOKIE_1.name(),
"generateInitialChangeStreamPartitions",
ROUTING_COOKIE_2.name(),
testCookie);
assertThat(lastMetadata).doesNotContainKeys(BAD_KEY.name());

serverMetadata.clear();
}

@Test
public void testNoCookieSucceedReadRows() {
fakeService.returnCookie = false;
Expand Down Expand Up @@ -326,6 +363,42 @@ public void testNoCookieSucceedSampleRowKeys() {
serverMetadata.clear();
}

@Test
public void testNoCookieSucceedReadChangeStream() {
fakeService.returnCookie = false;

client.readChangeStream(ReadChangeStreamQuery.create("table")).iterator().hasNext();

assertThat(fakeService.count.get()).isGreaterThan(1);
assertThat(serverMetadata).hasSize(fakeService.count.get());

Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1);

assertThat(lastMetadata)
.doesNotContainKeys(ROUTING_COOKIE_1.name(), ROUTING_COOKIE_2.name(), BAD_KEY.name());

serverMetadata.clear();

serverMetadata.clear();
}

@Test
public void testNoCookieSucceedGenerateInitialChangeStreamParition() {
fakeService.returnCookie = false;

client.generateInitialChangeStreamPartitions("table").iterator().hasNext();

assertThat(fakeService.count.get()).isGreaterThan(1);
assertThat(serverMetadata).hasSize(fakeService.count.get());

Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1);

assertThat(lastMetadata)
.doesNotContainKeys(ROUTING_COOKIE_1.name(), ROUTING_COOKIE_2.name(), BAD_KEY.name());

serverMetadata.clear();
}

@Test
public void testCookiesInHeaders() throws Exception {
// Send 2 cookies in the headers, with routingCookieKey and ROUTING_COOKIE_2. ROUTING_COOKIE_2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ public void testBulkMutationFlowControlFeatureFlagIsNotSet() throws Exception {
FeatureFlags featureFlags = FeatureFlags.parseFrom(decodedFlags);
assertThat(featureFlags.getMutateRowsRateLimit()).isFalse();
assertThat(featureFlags.getMutateRowsRateLimit2()).isFalse();
stub.close();
}

@Test
Expand Down
Loading