Skip to content

Commit

Permalink
fix: retry rst_stream (#3002)
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Jun 1, 2021
1 parent e24e09e commit ace17b7
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ protected void onError(Status status, Metadata trailers) {
String channelId = ChannelPool.extractIdentifier(trailers);
// Non retry scenario
if (!retryOptions.enableRetries()
|| !retryOptions.isRetryable(code)
|| !isStatusRetryable(status)
// Unauthenticated is special because the request never made it to
// to the server, so all requests are retryable
|| !(isRequestRetryable() || code == Code.UNAUTHENTICATED || code == Code.UNAVAILABLE)) {
Expand Down Expand Up @@ -259,6 +259,10 @@ public void run() {
};
}

protected boolean isStatusRetryable(Status status) {
return retryOptions.isRetryable(status.getCode());
}

protected boolean isRequestRetryable() {
return rpc.isRetryable(getRetryRequest());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.protobuf.ByteString;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import io.opencensus.trace.AttributeValue;
Expand Down Expand Up @@ -243,6 +244,24 @@ protected boolean isRequestRetryable() {
return true;
}

/** Read rows requests are retryable if the status is a rst stream error. */
@Override
protected boolean isStatusRetryable(Status status) {
return retryOptions.isRetryable(status.getCode()) || isRstStream(status);
}

private boolean isRstStream(Status status) {
if (status.getCode() == Code.INTERNAL) {
String description = status.getDescription();
if (description != null) {
return description.contains("Received Rst stream")
|| description.contains("RST_STREAM closed stream")
|| description.contains("Received RST_STREAM");
}
}
return false;
}

/** {@inheritDoc} */
@Override
protected boolean onOK(Metadata trailers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,26 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
Assert.assertTrue(underTest.getRowMerger().isComplete());
}

@Test
public void testRetryRstStream() throws Exception {
RetryingReadRowsOperation underTest = createOperation();
start(underTest);

ByteString key1 = ByteString.copyFrom("SomeKey1", "UTF-8");
ByteString key2 = ByteString.copyFrom("SomeKey2", "UTF-8");
underTest.onMessage(buildResponse(key1));
underTest.onClose(
Status.INTERNAL.withDescription("HTTP/2 error code: INTERNAL_ERROR\nReceived Rst stream"),
null);
Assert.assertFalse(underTest.getRowMerger().isComplete());
underTest.onMessage(buildResponse(key2));
verify(mockFlatRowObserver, times(2)).onNext(any(FlatRow.class));
checkRetryRequest(underTest, key2, 8);
verify(mockClientCall, times(4)).request(eq(1));

finishOK(underTest, 1);
}

protected void performTimeout(RetryingReadRowsOperation underTest) {
underTest.onClose(
Status.CANCELLED.withCause(
Expand Down

0 comments on commit ace17b7

Please sign in to comment.