From 0f9988a5752a74e6c4640937abc49db84418435d Mon Sep 17 00:00:00 2001 From: Leah Cole Date: Thu, 1 Aug 2024 16:59:42 -0400 Subject: [PATCH 1/2] fix: reduce duplicate code in streaming retries and add a test --- gax/src/streamingCalls/streaming.ts | 26 +++++------ gax/test/test-application/src/index.ts | 64 ++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 14 deletions(-) diff --git a/gax/src/streamingCalls/streaming.ts b/gax/src/streamingCalls/streaming.ts index c74facaa3..583425fc9 100644 --- a/gax/src/streamingCalls/streaming.ts +++ b/gax/src/streamingCalls/streaming.ts @@ -119,6 +119,15 @@ export class StreamProxy extends duplexify implements GRPCCallResult { this.gaxServerStreamingRetries = gaxServerStreamingRetries; } + private shouldRetryRequest(error: Error, retry: RetryOptions): boolean { + const e = GoogleError.parseGRPCStatusDetails(error); + let shouldRetry = this.defaultShouldRetry(e!, retry); + if (retry.shouldRetryFn) { + shouldRetry = retry.shouldRetryFn(e!); + } + return shouldRetry; + } + cancel() { if (this.stream) { this.stream.cancel(); @@ -228,13 +237,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult { } this.retries!++; - const e = GoogleError.parseGRPCStatusDetails(error); - let shouldRetry = this.defaultShouldRetry(e!, retry); - if (retry.shouldRetryFn) { - shouldRetry = retry.shouldRetryFn(e!); - } - - if (shouldRetry) { + if (this.shouldRetryRequest(error, retry)) { const toSleep = Math.random() * delay; setTimeout(() => { now = new Date(); @@ -246,6 +249,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult { timeout = Math.min(timeoutCal, rpcTimeout, newDeadline); }, toSleep); } else { + const e = GoogleError.parseGRPCStatusDetails(error); e.note = 'Exception occurred in retry method that was ' + 'not classified as transient'; @@ -377,13 +381,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult { const timeout = retry.backoffSettings.totalTimeoutMillis; const maxRetries = retry.backoffSettings.maxRetries!; if ((maxRetries && maxRetries > 0) || (timeout && timeout > 0)) { - const e = GoogleError.parseGRPCStatusDetails(error); - let shouldRetry = this.defaultShouldRetry(e!, retry); - if (retry.shouldRetryFn) { - shouldRetry = retry.shouldRetryFn(e!); - } - - if (shouldRetry) { + if (this.shouldRetryRequest(error, retry)) { if (maxRetries && timeout!) { const newError = new GoogleError( 'Cannot set both totalTimeoutMillis and maxRetries ' + diff --git a/gax/test/test-application/src/index.ts b/gax/test/test-application/src/index.ts index c83a733f3..d7ce01a4c 100644 --- a/gax/test/test-application/src/index.ts +++ b/gax/test/test-application/src/index.ts @@ -151,6 +151,10 @@ async function testShowcase() { ); await testErrorMaxRetries0(grpcSequenceClientWithServerStreamingRetries); + await testServerStreamingRetriesImmediatelywithRetryOptions( + grpcSequenceClientWithServerStreamingRetries + ); + // ensure legacy tests pass with streaming retries client await testEcho(grpcClientWithServerStreamingRetries); await testEchoError(grpcClientWithServerStreamingRetries); @@ -1219,6 +1223,66 @@ async function testErrorMaxRetries0(client: SequenceServiceClient) { }); }); } +// a streaming call that retries two times and finishes successfully +async function testServerStreamingRetriesImmediatelywithRetryOptions( + client: SequenceServiceClient +) { + const finalData: string[] = []; + const backoffSettings = createBackoffSettings( + 100, + 1.2, + 1000, + null, + 1.5, + 3000, + 10000 + ); + + const retryOptions = new RetryOptions([14, 4], backoffSettings); + + const settings = { + retry: retryOptions, + }; + + client.initialize(); + + const request = createStreamingSequenceRequestFactory( + [Status.UNAVAILABLE, Status.DEADLINE_EXCEEDED, Status.OK], + [0.1, 0.1, 0.1], + [0, 2, 11], + 'This is testing the brand new and shiny StreamingSequence server 3' + ); + + const response = await client.createStreamingSequence(request); + await new Promise((resolve, reject) => { + const sequence = response[0]; + + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; + + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings + ); + attemptStream.on('data', (response: {content: string}) => { + finalData.push(response.content); + }); + attemptStream.on('error', error => { + reject(error); + }); + attemptStream.on('end', () => { + attemptStream.end(); + + resolve(); + }); + }).then(() => { + assert.equal( + finalData.join(' '), + 'This is This is testing the brand new and shiny StreamingSequence server 3' + ); + }); +} async function main() { const showcaseServer = new ShowcaseServer(); From 6d1841f1eb35651d7798aa89e1f3168b2bb50c30 Mon Sep 17 00:00:00 2001 From: Leah Cole Date: Fri, 2 Aug 2024 13:06:12 -0400 Subject: [PATCH 2/2] add comments --- gax/test/test-application/src/index.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gax/test/test-application/src/index.ts b/gax/test/test-application/src/index.ts index d7ce01a4c..f4500170f 100644 --- a/gax/test/test-application/src/index.ts +++ b/gax/test/test-application/src/index.ts @@ -1238,6 +1238,7 @@ async function testServerStreamingRetriesImmediatelywithRetryOptions( 10000 ); + // allow the two codes we are going to send as errors const retryOptions = new RetryOptions([14, 4], backoffSettings); const settings = { @@ -1246,6 +1247,7 @@ async function testServerStreamingRetriesImmediatelywithRetryOptions( client.initialize(); + // errors immediately, then again after sending "This is" const request = createStreamingSequenceRequestFactory( [Status.UNAVAILABLE, Status.DEADLINE_EXCEEDED, Status.OK], [0.1, 0.1, 0.1],