Skip to content

Commit

Permalink
feat: track the latency a request is queued on the grpc channel (#1604)
Browse files Browse the repository at this point in the history
For all operations, it tracks the latency of the request getting queued on a grpc channel. For batch operations, it's the aggregated value of grpc channel queued latency and batcher flow control latencies.
  • Loading branch information
mutianf committed Apr 19, 2023
1 parent 9ca7b08 commit bf3e7dd
Show file tree
Hide file tree
Showing 14 changed files with 287 additions and 17 deletions.
6 changes: 6 additions & 0 deletions google-cloud-bigtable-stats/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,10 @@
<className>com/google/cloud/bigtable/stats/StatsRecorderWrapper</className>
<method>void record(java.lang.String, java.lang.String, java.lang.String, java.lang.String)</method>
</difference>
<!-- Internal API is updated -->
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/bigtable/stats/StatsRecorderWrapper</className>
<method>void putBatchRequestThrottled(long)</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ public void putGfeMissingHeaders(long connectivityErrors) {
attemptMeasureMap.put(BuiltinMeasureConstants.CONNECTIVITY_ERROR_COUNT, connectivityErrors);
}

public void putBatchRequestThrottled(long throttledTimeMs) {
operationMeasureMap.put(BuiltinMeasureConstants.THROTTLING_LATENCIES, throttledTimeMs);
public void putClientBlockingLatencies(long clientBlockingLatency) {
operationMeasureMap.put(BuiltinMeasureConstants.THROTTLING_LATENCIES, clientBlockingLatency);
}

private TagContextBuilder newTagContextBuilder(String tableId, String zone, String cluster) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void testStreamingOperation() throws InterruptedException {
recorderWrapper.putGfeLatencies(serverLatency);
recorderWrapper.putGfeMissingHeaders(connectivityErrorCount);
recorderWrapper.putFirstResponseLatencies(firstResponseLatency);
recorderWrapper.putBatchRequestThrottled(throttlingLatency);
recorderWrapper.putClientBlockingLatencies(throttlingLatency);

recorderWrapper.recordOperation("OK", TABLE_ID, ZONE, CLUSTER);
recorderWrapper.recordAttempt("OK", TABLE_ID, ZONE, CLUSTER);
Expand Down Expand Up @@ -290,7 +290,7 @@ public void testUnaryOperations() throws InterruptedException {
recorderWrapper.putGfeLatencies(serverLatency);
recorderWrapper.putGfeMissingHeaders(connectivityErrorCount);
recorderWrapper.putFirstResponseLatencies(firstResponseLatency);
recorderWrapper.putBatchRequestThrottled(throttlingLatency);
recorderWrapper.putClientBlockingLatencies(throttlingLatency);

recorderWrapper.recordOperation("UNAVAILABLE", TABLE_ID, ZONE, CLUSTER);
recorderWrapper.recordAttempt("UNAVAILABLE", TABLE_ID, ZONE, CLUSTER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import com.google.cloud.bigtable.data.v2.stub.changestream.GenerateInitialChangeStreamPartitionsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.changestream.ReadChangeStreamResumptionStrategy;
import com.google.cloud.bigtable.data.v2.stub.changestream.ReadChangeStreamUserCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerBatchedUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory;
Expand Down Expand Up @@ -509,7 +510,7 @@ private <RowT> UnaryCallable<Query, List<RowT>> createBulkReadRowsCallable(
new TracedBatcherUnaryCallable<>(readRowsUserCallable.all());

UnaryCallable<Query, List<RowT>> withBigtableTracer =
new BigtableTracerUnaryCallable<>(tracedBatcher);
new BigtableTracerBatchedUnaryCallable<>(tracedBatcher);

UnaryCallable<Query, List<RowT>> traced =
new TracedUnaryCallable<>(withBigtableTracer, clientContext.getTracerFactory(), span);
Expand Down Expand Up @@ -641,7 +642,7 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
new TracedBatcherUnaryCallable<>(userFacing);

UnaryCallable<BulkMutation, Void> withBigtableTracer =
new BigtableTracerUnaryCallable<>(tracedBatcherUnaryCallable);
new BigtableTracerBatchedUnaryCallable<>(tracedBatcherUnaryCallable);
UnaryCallable<BulkMutation, Void> traced =
new TracedUnaryCallable<>(withBigtableTracer, clientContext.getTracerFactory(), spanName);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.common.base.Stopwatch;
import io.grpc.Attributes;
import io.grpc.ClientStreamTracer;
import io.grpc.Metadata;
import java.util.concurrent.TimeUnit;

/**
* Records the time a request is enqueued in a grpc channel queue. This a bridge between gRPC stream
* tracing and Bigtable tracing. Its primary purpose is to measure the transition time between
* asking gRPC to start an RPC and gRPC actually serializing that RPC.
*/
class BigtableGrpcStreamTracer extends ClientStreamTracer {

private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private final BigtableTracer tracer;

public BigtableGrpcStreamTracer(BigtableTracer tracer) {
this.tracer = tracer;
}

@Override
public void streamCreated(Attributes transportAttrs, Metadata headers) {
stopwatch.start();
}

@Override
public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) {
tracer.grpcChannelQueuedLatencies(stopwatch.elapsed(TimeUnit.MILLISECONDS));
}

static class Factory extends ClientStreamTracer.Factory {

private final BigtableTracer tracer;

Factory(BigtableTracer tracer) {
this.tracer = tracer;
}

@Override
public ClientStreamTracer newClientStreamTracer(
ClientStreamTracer.StreamInfo info, Metadata headers) {
return new BigtableGrpcStreamTracer(tracer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,8 @@ public void batchRequestThrottled(long throttledTimeMs) {
public void setLocations(String zone, String cluster) {
// noop
}

public void grpcChannelQueuedLatencies(long queuedTimeMs) {
// noop
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.GrpcResponseMetadata;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.common.util.concurrent.MoreExecutors;
import javax.annotation.Nonnull;

/**
* This callable will do everything described in {@link BigtableTracerUnaryCallable} except that it
* won't inject a {@link BigtableGrpcStreamTracer}. For batching calls, we only want to calculate
* the total time client is blocked because of flow control.
*/
@InternalApi
public class BigtableTracerBatchedUnaryCallable<RequestT, ResponseT>
extends BigtableTracerUnaryCallable<RequestT, ResponseT> {

private UnaryCallable<RequestT, ResponseT> innerCallable;

public BigtableTracerBatchedUnaryCallable(
@Nonnull UnaryCallable<RequestT, ResponseT> innerCallable) {
super(innerCallable);
this.innerCallable = innerCallable;
}

@Override
public ApiFuture futureCall(RequestT request, ApiCallContext context) {
final GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata();
BigtableTracerUnaryCallback<ResponseT> callback =
new BigtableTracerUnaryCallback<ResponseT>(
(BigtableTracer) context.getTracer(), responseMetadata);
ApiFuture<ResponseT> future =
innerCallable.futureCall(request, responseMetadata.addHandlers(context));
ApiFutures.addCallback(future, callback, MoreExecutors.directExecutor());
return future;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
* <li>-This class will also access trailers from {@link GrpcResponseMetadata} to record zone and
* cluster ids.
* <li>-Call {@link BigtableTracer#onRequest(int)} to record the request events in a stream.
* <li>-This class will also inject a {@link BigtableGrpcStreamTracer} that'll record the time an
* RPC spent in a grpc channel queue.
* <li>This class is considered an internal implementation detail and not meant to be used by
* applications.
*/
Expand All @@ -60,7 +62,11 @@ public void call(
BigtableTracerResponseObserver<ResponseT> innerObserver =
new BigtableTracerResponseObserver<>(
responseObserver, (BigtableTracer) context.getTracer(), responseMetadata);
innerCallable.call(request, innerObserver, responseMetadata.addHandlers(context));
innerCallable.call(
request,
innerObserver,
Util.injectBigtableStreamTracer(
context, responseMetadata, (BigtableTracer) context.getTracer()));
} else {
innerCallable.call(request, responseObserver, context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
* the gfe_header_missing_counter in this case.
* <li>-This class will also access trailers from {@link GrpcResponseMetadata} to record zone and
* cluster ids.
* <li>-This class will also inject a {@link BigtableGrpcStreamTracer} that'll record the time an
* RPC spent in a grpc channel queue.
* <li>This class is considered an internal implementation detail and not meant to be used by
* applications.
*/
Expand All @@ -49,14 +51,18 @@ public BigtableTracerUnaryCallable(@Nonnull UnaryCallable<RequestT, ResponseT> i
}

@Override
public ApiFuture futureCall(RequestT request, ApiCallContext context) {
public ApiFuture<ResponseT> futureCall(RequestT request, ApiCallContext context) {
// tracer should always be an instance of BigtableTracer
if (context.getTracer() instanceof BigtableTracer) {
final GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata();
final ApiCallContext contextWithResponseMetadata = responseMetadata.addHandlers(context);
BigtableTracerUnaryCallback callback =
new BigtableTracerUnaryCallback((BigtableTracer) context.getTracer(), responseMetadata);
ApiFuture<ResponseT> future = innerCallable.futureCall(request, contextWithResponseMetadata);
BigtableTracerUnaryCallback<ResponseT> callback =
new BigtableTracerUnaryCallback<ResponseT>(
(BigtableTracer) context.getTracer(), responseMetadata);
ApiFuture<ResponseT> future =
innerCallable.futureCall(
request,
Util.injectBigtableStreamTracer(
context, responseMetadata, (BigtableTracer) context.getTracer()));
ApiFutures.addCallback(future, callback, MoreExecutors.directExecutor());
return future;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class BuiltinMetricsTracer extends BigtableTracer {
private String zone = "global";
private String cluster = "unspecified";

private AtomicLong totalClientBlockingTime = new AtomicLong(0);

@VisibleForTesting
BuiltinMetricsTracer(
OperationType operationType, SpanName spanName, StatsRecorderWrapper recorder) {
Expand Down Expand Up @@ -219,7 +221,12 @@ public void setLocations(String zone, String cluster) {

@Override
public void batchRequestThrottled(long throttledTimeMs) {
recorder.putBatchRequestThrottled(throttledTimeMs);
totalClientBlockingTime.addAndGet(throttledTimeMs);
}

@Override
public void grpcChannelQueuedLatencies(long queuedTimeMs) {
totalClientBlockingTime.addAndGet(queuedTimeMs);
}

@Override
Expand Down Expand Up @@ -266,6 +273,8 @@ private void recordAttemptCompletion(@Nullable Throwable status) {
}
}

recorder.putClientBlockingLatencies(totalClientBlockingTime.get());

// Patch the status until it's fixed in gax. When an attempt failed,
// it'll throw a ServerStreamingAttemptException. Unwrap the exception
// so it could get processed by extractStatus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,4 +218,11 @@ public void afterResponse(long applicationLatency) {
tracer.afterResponse(applicationLatency);
}
}

@Override
public void grpcChannelQueuedLatencies(long queuedTimeMs) {
for (BigtableTracer tracer : bigtableTracers) {
tracer.grpcChannelQueuedLatencies(queuedTimeMs);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcResponseMetadata;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
Expand All @@ -32,6 +33,7 @@
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.CallOptions;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusException;
Expand Down Expand Up @@ -197,4 +199,23 @@ static void recordMetricsFromMetadata(
// Record gfe metrics
tracer.recordGfeMetadata(latency, throwable);
}

/**
* This method bridges gRPC stream tracing to bigtable tracing by adding a {@link
* io.grpc.ClientStreamTracer} to the callContext.
*/
static GrpcCallContext injectBigtableStreamTracer(
ApiCallContext context, GrpcResponseMetadata responseMetadata, BigtableTracer tracer) {
if (context instanceof GrpcCallContext) {
GrpcCallContext callContext = (GrpcCallContext) context;
CallOptions callOptions = callContext.getCallOptions();
return responseMetadata.addHandlers(
callContext.withCallOptions(
callOptions.withStreamTracerFactory(new BigtableGrpcStreamTracer.Factory(tracer))));
} else {
// context should always be an instance of GrpcCallContext. If not throw an exception
// so we can see what class context is.
throw new RuntimeException("Unexpected context class: " + context.getClass().getName());
}
}
}
Loading

0 comments on commit bf3e7dd

Please sign in to comment.