Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: handle retry info so client respect the delay server sets #2026

Merged
merged 14 commits into from
Dec 19, 2023
Prev Previous commit
Next Next commit
address comments
  • Loading branch information
mutianf committed Dec 18, 2023
commit 5f6d5833f2a9e875148d9d635616f5e6fb153d93
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.cloud.bigtable.gaxx.retrying.RetryInfoRetryAlgorithm;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -763,7 +763,7 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {

RetryAlgorithm<Void> retryAlgorithm =
new RetryAlgorithm<>(
new ApiResultRetryAlgorithm<Void>(),
new RetryInfoRetryAlgorithm<>(),
new ExponentialRetryAlgorithm(
settings.bulkMutateRowsSettings().getRetrySettings(), clientContext.getClock()));
RetryingExecutorWithContext<Void> retryingExecutor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import com.google.bigtable.v2.MutateRowsResponse.Entry;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException.FailedMutation;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.cloud.bigtable.gaxx.retrying.NonCancellableFuture;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -236,7 +235,8 @@ private void handleAttemptError(Throwable rpcError) {
FailedMutation failedMutation = FailedMutation.create(origIndex, entryError);
allFailures.add(failedMutation);

if (ApiResultRetryAlgorithm.extractRetryDelay(failedMutation.getError()) == null
if (!com.google.cloud.bigtable.gaxx.retrying.ApiException.isRetryable2(
failedMutation.getError())
&& !failedMutation.getError().isRetryable()) {
permanentFailures.add(failedMutation);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.gaxx.retrying;

import com.google.api.core.InternalApi;

// TODO: move this to gax later
@InternalApi
public class ApiException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be ApiExceptions. Also please add a private ctor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general the pattern is that the instance class is singular and the helper utility with static methods is plural.
In this case you are adding latter. Eventually when you upstream, you will make it an instance method and it will be in ApiException. But in the transitional state I think plural makes sense


// TODO: this should replace the existing ApiException#isRetryable() method,
// but that cant be done in bigtable, so this lives here for now.
public static boolean isRetryable2(Throwable e) {
if (RetryInfoRetryAlgorithm.extractRetryDelay(e) != null) {
return true;
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,46 +18,17 @@
import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
import com.google.api.gax.retrying.RetryingContext;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.protobuf.util.Durations;
import com.google.rpc.RetryInfo;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.protobuf.ProtoUtils;
import org.threeten.bp.Duration;

/**
* For internal use, public for technical reasons. This retry algorithm checks the metadata of an
* exception for additional error details. If the metadata has a RetryInfo field, use the retry
* delay to set the wait time between attempts.
*/
/** For internal use, public for technical reasons. */
@InternalApi
public class ApiResultRetryAlgorithm<ResponseT> extends BasicResultRetryAlgorithm<ResponseT> {

private static final Metadata.Key<RetryInfo> KEY_RETRY_INFO =
ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());

@Override
public TimedAttemptSettings createNextAttempt(
Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) {
Duration retryDelay = extractRetryDelay(prevThrowable);
if (retryDelay != null) {
return prevSettings
.toBuilder()
.setRandomizedRetryDelay(retryDelay)
.setAttemptCount(prevSettings.getAttemptCount() + 1)
.build();
}
return null;
}

/** Returns true if previousThrowable is an {@link ApiException} that is retryable. */
@Override
public boolean shouldRetry(Throwable previousThrowable, ResponseT previousResponse) {
return (extractRetryDelay(previousThrowable) != null)
|| (previousThrowable instanceof ApiException
&& ((ApiException) previousThrowable).isRetryable());
return (previousThrowable instanceof ApiException)
&& ((ApiException) previousThrowable).isRetryable();
}

/**
Expand All @@ -72,30 +43,11 @@ public boolean shouldRetry(
if (context.getRetryableCodes() != null) {
// Ignore the isRetryable() value of the throwable if the RetryingContext has a specific list
// of codes that should be retried.
return extractRetryDelay(previousThrowable) != null
|| ((previousThrowable instanceof ApiException)
&& context
.getRetryableCodes()
.contains(((ApiException) previousThrowable).getStatusCode().getCode()));
return (previousThrowable instanceof ApiException)
&& context
.getRetryableCodes()
.contains(((ApiException) previousThrowable).getStatusCode().getCode());
}
return shouldRetry(previousThrowable, previousResponse);
}

public static Duration extractRetryDelay(Throwable throwable) {
if (throwable == null) {
return null;
}
Metadata trailers = Status.trailersFromThrowable(throwable);
if (trailers == null) {
return null;
}
RetryInfo retryInfo = trailers.get(KEY_RETRY_INFO);
if (retryInfo == null) {
return null;
}
if (!retryInfo.hasRetryDelay()) {
return null;
}
return Duration.ofMillis(Durations.toMillis(retryInfo.getRetryDelay()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> retrying(

RetryAlgorithm<ResponseT> retryAlgorithm =
new RetryAlgorithm<>(
new ApiResultRetryAlgorithm<>(),
new RetryInfoRetryAlgorithm<>(),
new ExponentialRetryAlgorithm(settings.getRetrySettings(), clientContext.getClock()));
ScheduledRetryingExecutor<ResponseT> executor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());
Expand All @@ -64,7 +64,7 @@ public static <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT>

StreamingRetryAlgorithm<Void> retryAlgorithm =
new StreamingRetryAlgorithm<>(
new ApiResultRetryAlgorithm<Void>(),
new RetryInfoRetryAlgorithm<>(),
new ExponentialRetryAlgorithm(settings.getRetrySettings(), clientContext.getClock()));

ScheduledRetryingExecutor<Void> retryingExecutor =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.google.cloud.bigtable.gaxx.retrying;

import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
import com.google.api.gax.retrying.RetryingContext;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.util.Durations;
import com.google.rpc.RetryInfo;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.protobuf.ProtoUtils;
import org.threeten.bp.Duration;

// TODO move this algorithm to gax
/**
* This retry algorithm checks the metadata of an exception for additional error details. If the
* metadata has a RetryInfo field, use the retry delay to set the wait time between attempts.
*/
@InternalApi
public class RetryInfoRetryAlgorithm<ResponseT> extends BasicResultRetryAlgorithm<ResponseT> {

@VisibleForTesting
public static final Metadata.Key<RetryInfo> RETRY_INFO_KEY =
ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());

@Override
public TimedAttemptSettings createNextAttempt(
Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) {
Duration retryDelay = extractRetryDelay(prevThrowable);
if (retryDelay != null) {
return prevSettings
.toBuilder()
.setRandomizedRetryDelay(retryDelay)
.setAttemptCount(prevSettings.getAttemptCount() + 1)
.build();
}
return null;
}

/** Returns true if previousThrowable is an {@link ApiException} that is retryable. */
@Override
public boolean shouldRetry(Throwable previousThrowable, ResponseT previousResponse) {
if (extractRetryDelay(previousThrowable) != null) {
// First check if server wants us to retry
return true;
}
// Server didn't have retry information, use the local status code config.
return (previousThrowable instanceof ApiException
&& ((ApiException) previousThrowable).isRetryable());
}

/**
* If {@link RetryingContext#getRetryableCodes()} is not null: Returns true if the status code of
* previousThrowable is in the list of retryable code of the {@link RetryingContext}.
*
* <p>Otherwise it returns the result of {@link #shouldRetry(Throwable, Object)}.
*/
@Override
public boolean shouldRetry(
RetryingContext context, Throwable previousThrowable, ResponseT previousResponse) {
if (extractRetryDelay(previousThrowable) != null) {
// First check if server wants us to retry
return true;
}
if (context.getRetryableCodes() != null) {
// Ignore the isRetryable() value of the throwable if the RetryingContext has a specific list
// of codes that should be retried.
return ((previousThrowable instanceof ApiException)
&& context
.getRetryableCodes()
.contains(((ApiException) previousThrowable).getStatusCode().getCode()));
}
return shouldRetry(previousThrowable, previousResponse);
}

static Duration extractRetryDelay(Throwable throwable) {
mutianf marked this conversation as resolved.
Show resolved Hide resolved
if (throwable == null) {
return null;
}
Metadata trailers = Status.trailersFromThrowable(throwable);
if (trailers == null) {
return null;
}
RetryInfo retryInfo = trailers.get(RETRY_INFO_KEY);
if (retryInfo == null) {
return null;
}
if (!retryInfo.hasRetryDelay()) {
return null;
}
return Duration.ofMillis(Durations.toMillis(retryInfo.getRetryDelay()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigtable.data.v2.stub;

import static com.google.cloud.bigtable.gaxx.retrying.RetryInfoRetryAlgorithm.RETRY_INFO_KEY;
import static com.google.common.truth.Truth.assertThat;

import com.google.api.gax.core.NoCredentialsProvider;
Expand Down Expand Up @@ -54,7 +55,6 @@
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.ProtoUtils;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcServerRule;
import java.io.IOException;
Expand All @@ -71,9 +71,6 @@ public class RetryInfoTest {

@Rule public GrpcServerRule serverRule = new GrpcServerRule();

private static final Metadata.Key<RetryInfo> RETRY_INFO_KEY =
ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());

private FakeBigtableService service;
private BigtableDataClient client;

Expand Down