Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: backport retry idle timeout to 2.9.x and rare race condition fix #4234

Merged
merged 3 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.api.core.BetaApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.common.annotations.VisibleForTesting;

/**
* Define {@link org.apache.hadoop.conf.Configuration} names for setting {@link
Expand Down Expand Up @@ -324,4 +325,8 @@ public class BigtableOptionsFactory {
*/
public static final String BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL =
"google.bigtable.enable.bulk.mutation.flow.control";

/** Override idle timeout, for testing only. */
@VisibleForTesting
public static final String BIGTABLE_TEST_IDLE_TIMEOUT_MS = "google.bigtable.idle.timeout.ms";
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_SERVICE_ACCOUNT_JSON_KEYFILE_LOCATION_KEY;
import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_SERVICE_ACCOUNT_JSON_VALUE_KEY;
import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_SERVICE_ACCOUNT_P12_KEYFILE_LOCATION_KEY;
import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_TEST_IDLE_TIMEOUT_MS;
import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_USE_BATCH;
import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_USE_CACHED_DATA_CHANNEL_POOL;
import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_USE_PLAINTEXT_NEGOTIATION;
Expand Down Expand Up @@ -718,6 +719,11 @@ private void configureReadRowsSettings(
.retrySettings()
.setTotalTimeout(operationTimeouts.getOperationTimeout().get());
}

String idleTimeout = configuration.get(BIGTABLE_TEST_IDLE_TIMEOUT_MS);
if (idleTimeout != null) {
readRowsSettings.setIdleTimeout(Duration.ofMillis(Long.parseLong(idleTimeout)));
}
}

private void configureRetryableCallSettings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,15 @@ public static CloudBigtableScanConfiguration buildExportConfig(ExportOptions opt
.withAppProfileId(options.getBigtableAppProfileId())
.withConfiguration(
BigtableOptionsFactory.CUSTOM_USER_AGENT_KEY, "SequenceFileExportJob")
.withConfiguration(
CloudBigtableIO.Reader.RETRY_IDLE_TIMEOUT,
String.valueOf(options.getRetryIdleTimeout()))
.withScan(
new ScanValueProvider(
options.getBigtableStartRow(),
options.getBigtableStopRow(),
options.getBigtableMaxVersions(),
options.getBigtableFilter()));

return configBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,13 @@ public interface ExportOptions extends GcpOptions {

@SuppressWarnings("unused")
void setWait(boolean wait);

@Description("Get if idle timeout is retried.")
@Default.Boolean(true)
boolean getRetryIdleTimeout();

@SuppressWarnings("unused")
void setRetryIdleTimeout(boolean retryIdleTimeout);
}

public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.bigtable.repackaged.com.google.api.core.InternalApi;
import com.google.bigtable.repackaged.com.google.api.core.InternalExtensionOnly;
import com.google.bigtable.repackaged.com.google.api.gax.rpc.WatchdogTimeoutException;
import com.google.bigtable.repackaged.com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.repackaged.com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.bigtable.repackaged.com.google.common.annotations.VisibleForTesting;
Expand All @@ -32,11 +33,13 @@
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
Expand Down Expand Up @@ -647,6 +650,8 @@ Object readResolve() {
/** Reads rows for a specific {@link Table}, usually filtered by a {@link Scan}. */
@VisibleForTesting
static class Reader extends BoundedReader<Result> {
static final String RETRY_IDLE_TIMEOUT = "google.cloud.bigtable.retry.idle.timeout";

private static final Logger READER_LOG = LoggerFactory.getLogger(Reader.class);

private CloudBigtableIO.AbstractSource source;
Expand All @@ -657,6 +662,9 @@ static class Reader extends BoundedReader<Result> {
protected long workStart;
private final AtomicLong rowsRead = new AtomicLong();
private final ByteKeyRangeTracker rangeTracker;
private transient Result lastScannedRow;

private final AtomicInteger attempt = new AtomicInteger(3);

@VisibleForTesting
Reader(CloudBigtableIO.AbstractSource source) {
Expand Down Expand Up @@ -690,7 +698,37 @@ void initializeScanner() throws IOException {
/**
 * Advances to the next row by delegating to {@link ResultScanner#next()} (via {@code
 * tryAdvance()}), transparently retrying reads that fail with a gRPC Watchdog idle-timeout by
 * re-creating the scanner from the last successfully read row.
 *
 * <p>Retry behavior can be disabled with the {@code RETRY_IDLE_TIMEOUT} configuration key.
 *
 * @return {@code true} if a row was read and is available via {@code getCurrent()}
 * @throws IOException if the underlying scanner fails for a non-retriable reason
 */
@Override
public boolean advance() throws IOException {
  try {
    boolean hasMore = tryAdvance();
    // Reset the retry budget after a successful read.
    attempt.set(3);
    return hasMore;
  } catch (Throwable e) {
    // If idle-timeout retrying is disabled via configuration, surface the error as-is.
    // (Rethrowing the caught Throwable compiles here thanks to Java 7+ precise rethrow.)
    if (!source.getConfiguration().toHBaseConfig().getBoolean(RETRY_IDLE_TIMEOUT, true)) {
      throw e;
    }
    // Only a WatchdogTimeoutException somewhere in the cause chain qualifies for retry.
    Throwable exception = findCause(e, WatchdogTimeoutException.class);
    if (exception == null) {
      throw e;
    }
    // Further narrow to idle timeouts specifically (as opposed to wait timeouts).
    if (exception.getMessage() == null || !exception.getMessage().contains("idle")) {
      throw e;
    }
    // Out of retry attempts: give up and propagate the original failure.
    if (attempt.decrementAndGet() <= 0) {
      throw e;
    }
    READER_LOG.warn("got idle timeout exception, will try to reset the scanner and retry", e);
    resetScanner();
    // NOTE(review): this retried call is not itself retried on a second consecutive idle
    // timeout even when attempts remain (it bypasses the catch above) — confirm intentional.
    return tryAdvance();
  }
}

private boolean tryAdvance() throws IOException {
Result row = scanner.next();
lastScannedRow = row;
if (row != null && rangeTracker.tryReturnRecordAt(true, ByteKey.copyFrom(row.getRow()))) {
current = row;
rowsRead.addAndGet(1l);
Expand All @@ -702,6 +740,40 @@ public boolean advance() throws IOException {
}
}

/**
 * Re-creates {@code scanner} so the read can resume after an idle-timeout failure. If at least
 * one row was already returned, the new scan starts just past that row; otherwise the original
 * configured scan is replayed from the beginning.
 *
 * @throws IOException if the table or scanner cannot be (re)opened
 */
private void resetScanner() throws IOException {
  CloudBigtableScanConfiguration config = source.getConfiguration();
  final Scan resumeScan;
  if (lastScannedRow == null) {
    READER_LOG.info("last scanned row key is null, haven't read any row yet");
    resumeScan = config.getScanValueProvider().get();
  } else {
    byte[] lastKey = lastScannedRow.getRow();
    // The configuration's RowRange treats the start key as inclusive and the stop key as
    // exclusive. Appending a trailing 0x00 byte produces the smallest key strictly greater
    // than lastKey, so the row we already emitted is not read a second time.
    byte[] resumeStart = Arrays.copyOf(lastKey, lastKey.length + 1);
    resumeScan =
        config
            .toBuilder()
            .withKeys(resumeStart, config.getStopRow())
            .build()
            .getScanValueProvider()
            .get();
  }
  scanner =
      connection.getTable(TableName.valueOf(config.getTableId())).getScanner(resumeScan);
}

/**
 * Walks the cause chain of {@code e} and returns the first throwable that is an instance of
 * {@code cls} (the top-level throwable itself may match), or {@code null} if no element of the
 * chain matches.
 *
 * @param e the throwable whose cause chain is searched
 * @param cls the throwable type to look for
 * @return the first matching throwable in the chain, or {@code null} if none matches
 */
static Throwable findCause(Throwable e, Class<? extends Throwable> cls) {
  Throwable throwable = e;
  // Bug fix: the check must inspect the CURRENT element of the chain. The previous code
  // tested e.getClass() on every iteration, so a matching nested cause was never detected —
  // the walk either stopped immediately (e itself matched) or ran off the end to null.
  while (throwable != null && !cls.isAssignableFrom(throwable.getClass())) {
    throwable = throwable.getCause();
  }
  return throwable;
}

@Override
public final Double getFractionConsumed() {
if (rangeTracker.isDone()) {
Expand Down
Loading