Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: reduce duplicate code in streaming retries and add a test #1636

Merged
merged 2 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 12 additions & 14 deletions gax/src/streamingCalls/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
this.gaxServerStreamingRetries = gaxServerStreamingRetries;
}

private shouldRetryRequest(error: Error, retry: RetryOptions): boolean {
const e = GoogleError.parseGRPCStatusDetails(error);
let shouldRetry = this.defaultShouldRetry(e!, retry);
if (retry.shouldRetryFn) {
shouldRetry = retry.shouldRetryFn(e!);
}
return shouldRetry;
}

cancel() {
if (this.stream) {
this.stream.cancel();
Expand Down Expand Up @@ -228,13 +237,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
}

this.retries!++;
const e = GoogleError.parseGRPCStatusDetails(error);
let shouldRetry = this.defaultShouldRetry(e!, retry);
if (retry.shouldRetryFn) {
shouldRetry = retry.shouldRetryFn(e!);
}

if (shouldRetry) {
if (this.shouldRetryRequest(error, retry)) {
const toSleep = Math.random() * delay;
setTimeout(() => {
now = new Date();
Expand All @@ -246,6 +249,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
timeout = Math.min(timeoutCal, rpcTimeout, newDeadline);
}, toSleep);
} else {
const e = GoogleError.parseGRPCStatusDetails(error);
e.note =
'Exception occurred in retry method that was ' +
'not classified as transient';
Expand Down Expand Up @@ -377,13 +381,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
const timeout = retry.backoffSettings.totalTimeoutMillis;
const maxRetries = retry.backoffSettings.maxRetries!;
if ((maxRetries && maxRetries > 0) || (timeout && timeout > 0)) {
const e = GoogleError.parseGRPCStatusDetails(error);
let shouldRetry = this.defaultShouldRetry(e!, retry);
if (retry.shouldRetryFn) {
shouldRetry = retry.shouldRetryFn(e!);
}

if (shouldRetry) {
if (this.shouldRetryRequest(error, retry)) {
if (maxRetries && timeout!) {
const newError = new GoogleError(
'Cannot set both totalTimeoutMillis and maxRetries ' +
Expand Down
66 changes: 66 additions & 0 deletions gax/test/test-application/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ async function testShowcase() {
);

await testErrorMaxRetries0(grpcSequenceClientWithServerStreamingRetries);
await testServerStreamingRetriesImmediatelywithRetryOptions(
grpcSequenceClientWithServerStreamingRetries
);

// ensure legacy tests pass with streaming retries client
await testEcho(grpcClientWithServerStreamingRetries);
await testEchoError(grpcClientWithServerStreamingRetries);
Expand Down Expand Up @@ -1219,6 +1223,68 @@ async function testErrorMaxRetries0(client: SequenceServiceClient) {
});
});
}
// a streaming call that retries two times and finishes successfully
async function testServerStreamingRetriesImmediatelywithRetryOptions(
client: SequenceServiceClient
) {
const finalData: string[] = [];
const backoffSettings = createBackoffSettings(
100,
1.2,
1000,
null,
1.5,
3000,
10000
);

// allow the two codes we are going to send as errors
const retryOptions = new RetryOptions([14, 4], backoffSettings);

const settings = {
retry: retryOptions,
};

client.initialize();

// errors immediately, then again after sending "This is"
const request = createStreamingSequenceRequestFactory(
[Status.UNAVAILABLE, Status.DEADLINE_EXCEEDED, Status.OK],
[0.1, 0.1, 0.1],
[0, 2, 11],
'This is testing the brand new and shiny StreamingSequence server 3'
);

const response = await client.createStreamingSequence(request);
await new Promise<void>((resolve, reject) => {
const sequence = response[0];

const attemptRequest =
new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest();
attemptRequest.name = sequence.name!;

const attemptStream = client.attemptStreamingSequence(
attemptRequest,
settings
);
attemptStream.on('data', (response: {content: string}) => {
finalData.push(response.content);
});
attemptStream.on('error', error => {
reject(error);
});
attemptStream.on('end', () => {
attemptStream.end();

resolve();
});
}).then(() => {
assert.equal(
finalData.join(' '),
'This is This is testing the brand new and shiny StreamingSequence server 3'
);
});
}

async function main() {
const showcaseServer = new ShowcaseServer();
Expand Down
Loading