Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use PingAndWarm request for channel priming #1179

Merged
merged 7 commits into from
Jul 19, 2022
Next Next commit
feat: use PingAndWarm request for channel priming
  • Loading branch information
mutianf committed Jul 19, 2022
commit 16d97c15435abced083d4e436205f8ba2489748a
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,11 @@ public boolean isRefreshingChannel() {
/**
* Gets the table ids that will be used to send warmup requests when {@link
* #isRefreshingChannel()} is enabled.
*
* @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up
* requests will be sent to all table ids of the instance.
*/
@BetaApi("Channel priming is not currently stable and may change in the future")
@Deprecated
public List<String> getPrimingTableIds() {
return stubSettings.getPrimedTableIds();
}
Expand Down Expand Up @@ -377,13 +380,10 @@ public boolean isRefreshingChannel() {
}

/**
* Configure the tables that can be used to prime a channel during a refresh.
*
* <p>These tables work in conjunction with {@link #setRefreshingChannel(boolean)}. When a
* channel is refreshed, it will send a request to each table to warm up the serverside caches
* before admitting the new channel into the channel pool.
* @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up
* requests will be sent to all table ids of the instance.
*/
@BetaApi("Channel priming is not currently stable and may change in the future")
@Deprecated
public Builder setPrimingTableIds(String... tableIds) {
stubSettings.setPrimedTableIds(tableIds);
return this;
Expand All @@ -392,8 +392,11 @@ public Builder setPrimingTableIds(String... tableIds) {
/**
* Gets the table ids that will be used to send warmup requests when {@link
* #setRefreshingChannel(boolean)} is enabled.
*
* @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up
* requests will be sent to all table ids of the instance.
*/
@BetaApi("Channel priming is not currently stable and may change in the future")
@Deprecated
public List<String> getPrimingTableIds() {
return stubSettings.getPrimedTableIds();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,20 @@
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.ApiFuture;
import com.google.api.core.BetaApi;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.gax.grpc.ChannelPrimer;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.auth.Credentials;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.RowFilter;
import com.google.bigtable.v2.RowSet;
import com.google.bigtable.v2.TableName;
import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.bigtable.v2.InstanceName;
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.threeten.bp.Duration;

Expand All @@ -58,14 +47,9 @@ class BigtableChannelPrimer implements ChannelPrimer {
private static Duration PRIME_REQUEST_TIMEOUT = Duration.ofSeconds(30);

private final EnhancedBigtableStubSettings settingsTemplate;
private final List<String> tableIds;

static BigtableChannelPrimer create(
Credentials credentials,
String projectId,
String instanceId,
String appProfileId,
List<String> tableIds) {
Credentials credentials, String projectId, String instanceId, String appProfileId) {
EnhancedBigtableStubSettings.Builder builder =
EnhancedBigtableStubSettings.newBuilder()
.setProjectId(projectId)
Expand All @@ -75,28 +59,12 @@ static BigtableChannelPrimer create(
.setExecutorProvider(
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build());

// Disable retries for priming request
builder
.readRowSettings()
.setRetrySettings(
builder
.readRowSettings()
.getRetrySettings()
.toBuilder()
.setMaxAttempts(1)
.setJittered(false)
.setInitialRpcTimeout(PRIME_REQUEST_TIMEOUT)
.setMaxRpcTimeout(PRIME_REQUEST_TIMEOUT)
.setTotalTimeout(PRIME_REQUEST_TIMEOUT)
.build());
return new BigtableChannelPrimer(builder.build(), tableIds);
return new BigtableChannelPrimer(builder.build());
}

private BigtableChannelPrimer(
EnhancedBigtableStubSettings settingsTemplate, List<String> tableIds) {
private BigtableChannelPrimer(EnhancedBigtableStubSettings settingsTemplate) {
Preconditions.checkNotNull(settingsTemplate, "settingsTemplate can't be null");
this.settingsTemplate = settingsTemplate;
this.tableIds = ImmutableList.copyOf(tableIds);
}

@Override
Expand All @@ -110,25 +78,7 @@ public void primeChannel(ManagedChannel managedChannel) {
}

private void primeChannelUnsafe(ManagedChannel managedChannel) throws IOException {
if (tableIds.isEmpty()) {
waitForChannelReady(managedChannel);
} else {
sendPrimeRequests(managedChannel);
}
}

private void waitForChannelReady(ManagedChannel managedChannel) {
for (int i = 0; i < 30; i++) {
ConnectivityState connectivityState = managedChannel.getState(true);
if (connectivityState == ConnectivityState.READY) {
break;
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
break;
}
}
sendPrimeRequests(managedChannel);
}

private void sendPrimeRequests(ManagedChannel managedChannel) throws IOException {
Expand All @@ -141,41 +91,21 @@ private void sendPrimeRequests(ManagedChannel managedChannel) throws IOException
.build();

try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(primingSettings)) {
Map<String, ApiFuture<?>> primeFutures = new HashMap<>();

// Prime all of the table ids in parallel
for (String tableId : tableIds) {
ApiFuture<Row> f =
stub.createReadRowsRawCallable(new DefaultRowAdapter())
.first()
.futureCall(
ReadRowsRequest.newBuilder()
.setTableName(
TableName.format(
primingSettings.getProjectId(),
primingSettings.getInstanceId(),
tableId))
.setAppProfileId(primingSettings.getAppProfileId())
.setRows(RowSet.newBuilder().addRowKeys(PRIMING_ROW_KEY).build())
.setFilter(RowFilter.newBuilder().setBlockAllFilter(true).build())
.setRowsLimit(1)
.build());
PingAndWarmRequest request =
PingAndWarmRequest.newBuilder()
.setName(
InstanceName.format(
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
primingSettings.getProjectId(), primingSettings.getInstanceId()))
.setAppProfileId(primingSettings.getAppProfileId())
.build();

primeFutures.put(tableId, f);
}

// Wait for all of the prime requests to complete.
for (Map.Entry<String, ApiFuture<?>> entry : primeFutures.entrySet()) {
try {
entry.getValue().get();
} catch (Throwable e) {
if (e instanceof ExecutionException) {
e = e.getCause();
}
LOG.warning(
String.format(
"Failed to prime channel for table: %s: %s", entry.getKey(), e.getMessage()));
try {
stub.pingAndWarmCallable().call(request);
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
} catch (Throwable e) {
mutianf marked this conversation as resolved.
Show resolved Hide resolved
if (e instanceof ExecutionException) {
e = e.getCause();
}
LOG.warning(String.format("Failed to prime channel: %s", e));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.bigtable.v2.PingAndWarmResponse;
import com.google.bigtable.v2.ReadModifyWriteRowRequest;
import com.google.bigtable.v2.ReadModifyWriteRowResponse;
import com.google.bigtable.v2.ReadRowsRequest;
Expand Down Expand Up @@ -104,6 +106,7 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -141,6 +144,7 @@ public class EnhancedBigtableStub implements AutoCloseable {
private final UnaryCallable<BulkMutation, Void> bulkMutateRowsCallable;
private final UnaryCallable<ConditionalRowMutation, Boolean> checkAndMutateRowCallable;
private final UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable;
private final UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallable;

public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
throws IOException {
Expand Down Expand Up @@ -181,8 +185,7 @@ public static EnhancedBigtableStubSettings finalizeSettings(
credentials,
settings.getProjectId(),
settings.getInstanceId(),
settings.getAppProfileId(),
settings.getPrimedTableIds()))
settings.getAppProfileId()))
.build());
}

Expand Down Expand Up @@ -284,6 +287,7 @@ public EnhancedBigtableStub(EnhancedBigtableStubSettings settings, ClientContext
bulkMutateRowsCallable = createBulkMutateRowsCallable();
checkAndMutateRowCallable = createCheckAndMutateRowCallable();
readModifyWriteRowCallable = createReadModifyWriteRowCallable();
pingAndWarmCallable = createPingAndWarmCallable();
}

// <editor-fold desc="Callable creators">
Expand Down Expand Up @@ -810,6 +814,23 @@ private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacin

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

private UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> createPingAndWarmCallable() {
UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarm =
GrpcRawCallableFactory.createUnaryCallable(
GrpcCallSettings.<PingAndWarmRequest, PingAndWarmResponse>newBuilder()
.setMethodDescriptor(BigtableGrpc.getPingAndWarmMethod())
.setParamsExtractor(
new RequestParamsExtractor<PingAndWarmRequest>() {
@Override
public Map<String, String> extract(PingAndWarmRequest request) {
return ImmutableMap.of("app_profile_id", request.getAppProfileId());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please make sure to embed the resource name as well. In this case it will be "name", request.getName()

}
})
.build(),
Collections.emptySet());
return pingAndWarm.withDefaultCallContext(clientContext.getDefaultCallContext());
}
// </editor-fold>

// <editor-fold desc="Callable accessors">
Expand Down Expand Up @@ -854,6 +875,10 @@ public UnaryCallable<ConditionalRowMutation, Boolean> checkAndMutateRowCallable(
public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
return readModifyWriteRowCallable;
}

UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallable() {
return pingAndWarmCallable;
}
// </editor-fold>

private SpanName getSpanName(String methodName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -752,8 +752,13 @@ public boolean isRefreshingChannel() {
return isRefreshingChannel;
}

/** Gets the tables that will be primed during a channel refresh. */
@BetaApi("Channel priming is not currently stable and might change in the future")
/**
* Gets the tables that will be primed during a channel refresh.
*
* @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up
* requests will be sent to all table ids of the instance.
*/
@Deprecated
public List<String> getPrimedTableIds() {
return primedTableIds;
}
Expand Down Expand Up @@ -831,8 +836,7 @@ public EnhancedBigtableStubSettings build() {
// Use shared credentials
this.setCredentialsProvider(FixedCredentialsProvider.create(credentials));
channelProviderBuilder.setChannelPrimer(
BigtableChannelPrimer.create(
credentials, projectId, instanceId, appProfileId, primedTableIds));
BigtableChannelPrimer.create(credentials, projectId, instanceId, appProfileId));
this.setTransportChannelProvider(channelProviderBuilder.build());
}
return new EnhancedBigtableStubSettings(this);
Expand Down
Loading