Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: handle retry info so client respect the delay server sets #2026

Merged
merged 14 commits into from
Dec 19, 2023
Prev Previous commit
Next Next commit
add a test for initial metadata
  • Loading branch information
mutianf committed Dec 18, 2023
commit 2b700b01645a0e8e5fd3024d5e65136b877b9695
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ Metadata injectCookiesInRequestHeaders(Metadata headers) {
* COOKIE_KEY_PREFIX to cookies. Values in trailers will override the value set in initial
* metadata for the same keys.
*/
void extractCookiesFromMetadata(@Nullable Metadata trailers) {
if (trailers == null) {
void extractCookiesFromMetadata(@Nullable Metadata metadata) {
if (metadata == null) {
return;
}
for (String key : trailers.keys()) {
for (String key : metadata.keys()) {
if (key.startsWith(COOKIE_KEY_PREFIX)) {
Metadata.Key<String> metadataKey = Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER);
String value = trailers.get(metadataKey);
String value = metadata.get(metadataKey);
cookies.put(metadataKey, value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1018,12 +1018,7 @@ public Map<String, String> extract(
new TracedServerStreamingCallable<>(
readChangeStreamUserCallable, clientContext.getTracerFactory(), span);

ServerStreamingCallable<ReadChangeStreamQuery, ChangeStreamRecordT> withCookie = traced;
if (settings.getEnableRoutingCookie()) {
withCookie = new CookiesServerStreamingCallable<>(traced);
}

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,14 @@ public class CookiesHolderTest {
Metadata.Key.of("x-goog-cbt-cookie-routing", Metadata.ASCII_STRING_MARSHALLER);
private static final Metadata.Key<String> ROUTING_COOKIE_2 =
Metadata.Key.of("x-goog-cbt-cookie-random", Metadata.ASCII_STRING_MARSHALLER);
private static final Metadata.Key<String> ROUTING_COOKIE_HEADER =
Metadata.Key.of("x-goog-cbt-cookie-header", Metadata.ASCII_STRING_MARSHALLER);
private static final Metadata.Key<String> BAD_KEY =
Metadata.Key.of("x-goog-cbt-not-cookie", Metadata.ASCII_STRING_MARSHALLER);

private static final String testHeaderCookie = "header-cookie";
private static final String testCookie = "test-routing-cookie";
private static final String routingCookie1Header = "should-be-overridden";

private Server server;
private final FakeService fakeService = new FakeService();
Expand All @@ -103,7 +108,16 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
if (metadata.containsKey(ROUTING_COOKIE_1)) {
methods.add(serverCall.getMethodDescriptor().getBareMethodName());
}
return serverCallHandler.startCall(serverCall, metadata);
return serverCallHandler.startCall(
new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(serverCall) {
@Override
public void sendHeaders(Metadata responseHeaders) {
responseHeaders.put(ROUTING_COOKIE_HEADER, testHeaderCookie);
responseHeaders.put(ROUTING_COOKIE_1, routingCookie1Header);
super.sendHeaders(responseHeaders);
}
},
metadata);
}
};

Expand Down Expand Up @@ -157,6 +171,7 @@ public void tearDown() throws Exception {

@Test
public void testReadRows() {
// Only server stream calls can return an initial metadata before the first response
client.readRows(Query.create("fake-table")).iterator().hasNext();

assertThat(fakeService.count.get()).isGreaterThan(1);
Expand All @@ -165,7 +180,13 @@ public void testReadRows() {
Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1);

assertThat(lastMetadata)
.containsAtLeast(ROUTING_COOKIE_1.name(), "readRows", ROUTING_COOKIE_2.name(), testCookie);
.containsAtLeast(
ROUTING_COOKIE_1.name(),
"readRows",
ROUTING_COOKIE_2.name(),
testCookie,
ROUTING_COOKIE_HEADER.name(),
testHeaderCookie);
assertThat(lastMetadata).doesNotContainKeys(BAD_KEY.name());

serverMetadata.clear();
Expand All @@ -181,7 +202,13 @@ public void testReadRow() {
Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1);

assertThat(lastMetadata)
.containsAtLeast(ROUTING_COOKIE_1.name(), "readRows", ROUTING_COOKIE_2.name(), testCookie);
.containsAtLeast(
ROUTING_COOKIE_1.name(),
"readRows",
ROUTING_COOKIE_2.name(),
testCookie,
ROUTING_COOKIE_HEADER.name(),
testHeaderCookie);
assertThat(lastMetadata).doesNotContainKeys(BAD_KEY.name());

serverMetadata.clear();
Expand Down Expand Up @@ -249,6 +276,9 @@ public void testReadChangeStream() {

Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1);

// TODO: read change stream should have ROUTING_COOKIE_HEADER but the sequence is different from
// ReadRows.
// Debug the test failure.
assertThat(lastMetadata)
.containsAtLeast(
ROUTING_COOKIE_1.name(), "readChangeStream", ROUTING_COOKIE_2.name(), testCookie);
Expand All @@ -266,6 +296,9 @@ public void testGenerateInitialChangeStreamParition() {

Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1);

// TODO: read change stream should have ROUTING_COOKIE_HEADER but the sequence is different from
// ReadRows.
// Debug the test failure.
assertThat(lastMetadata)
.containsAtLeast(
ROUTING_COOKIE_1.name(),
Expand All @@ -288,7 +321,9 @@ public void testNoCookieSucceedReadRows() {

Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1);

assertThat(lastMetadata).doesNotContainKeys(ROUTING_COOKIE_1.name(), ROUTING_COOKIE_2.name());
assertThat(lastMetadata).doesNotContainKeys(ROUTING_COOKIE_2.name());
// Should contain initial metadata
assertThat(lastMetadata).containsAtLeast(ROUTING_COOKIE_1.name(), routingCookie1Header);

serverMetadata.clear();
}
Expand All @@ -304,8 +339,8 @@ public void testNoCookieSucceedReadRow() {

Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1);

assertThat(lastMetadata)
.doesNotContainKeys(ROUTING_COOKIE_1.name(), ROUTING_COOKIE_2.name(), BAD_KEY.name());
assertThat(lastMetadata).doesNotContainKeys(ROUTING_COOKIE_2.name(), BAD_KEY.name());
assertThat(lastMetadata).containsAtLeast(ROUTING_COOKIE_1.name(), routingCookie1Header);

serverMetadata.clear();
}
Expand Down Expand Up @@ -374,8 +409,11 @@ public void testNoCookieSucceedReadChangeStream() {

Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1);

assertThat(lastMetadata)
.doesNotContainKeys(ROUTING_COOKIE_1.name(), ROUTING_COOKIE_2.name(), BAD_KEY.name());
assertThat(lastMetadata).doesNotContainKeys(ROUTING_COOKIE_2.name(), BAD_KEY.name());
// TODO: read change stream should have ROUTING_COOKIE_HEADER but the sequence is different from
// ReadRows.
// Debug the test failure.
// assertThat(lastMetadata).containsAtLeast(ROUTING_COOKIE_1.name(), routingCookie1Header);

serverMetadata.clear();

Expand All @@ -393,8 +431,11 @@ public void testNoCookieSucceedGenerateInitialChangeStreamParition() {

Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1);

assertThat(lastMetadata)
.doesNotContainKeys(ROUTING_COOKIE_1.name(), ROUTING_COOKIE_2.name(), BAD_KEY.name());
assertThat(lastMetadata).doesNotContainKeys(ROUTING_COOKIE_2.name(), BAD_KEY.name());
// TODO: read change stream should have ROUTING_COOKIE_HEADER but the sequence is different from
// ReadRows.
// Debug the test failure.
// assertThat(lastMetadata).containsAtLeast(ROUTING_COOKIE_1.name(), routingCookie1Header);

serverMetadata.clear();
}
Expand Down
Loading