Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Reset the retry counter to 0 when receiving data #1604

Merged
merged 22 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion gax/src/streamingCalls/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
*/
streamHandoffHelper(stream: CancellableStream, retry: RetryOptions): void {
let enteredError = false;
const eventsToForward = ['metadata', 'response', 'status', 'data'];
const eventsToForward = ['metadata', 'response', 'status'];

eventsToForward.forEach(event => {
stream.on(event, this.emit.bind(this, event));
Expand All @@ -282,6 +282,11 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
this.streamHandoffErrorHandler(stream, retry, error);
});

stream.on('data', (data: ResponseType) => {
this.retries = 0;
this.emit.bind(this, 'data')(data);
});

stream.on('end', () => {
if (!enteredError) {
enteredError = true;
Expand Down
79 changes: 79 additions & 0 deletions gax/test/test-application/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ async function testShowcase() {
const restClient = new EchoClient(restClientOpts);
const restClientCompat = new EchoClient(restClientOptsCompat);

await testResetRetriesToZero(grpcSequenceClientWithServerStreamingRetries);

// assuming gRPC server is started locally
await testEcho(grpcClient);
await testEchoError(grpcClient);
Expand Down Expand Up @@ -714,6 +716,83 @@ async function testServerStreamingRetrieswithRetryRequestOptions(
});
}

// When the stream recieves data then the retry count should be set to 0
async function testResetRetriesToZero(client: SequenceServiceClient) {
const finalData: string[] = [];
const shouldRetryFn = (error: GoogleError) => {
return [4, 5, 6, 7].includes(error!.code!);
};
const backoffSettings = createBackoffSettings(
10000,
2.5,
1000,
null,
1.5,
3000,
null
);
// intentionally set maxRetries to a value less than
// the number of errors in the sequence
backoffSettings.maxRetries = 2;
danieljbruce marked this conversation as resolved.
Show resolved Hide resolved
const getResumptionRequestFn = (request: RequestType) => {
return request;
};

const retryOptions = new RetryOptions(
[],
backoffSettings,
shouldRetryFn,
getResumptionRequestFn
);

const settings = {
retry: retryOptions,
};

client.initialize();

const request = createStreamingSequenceRequestFactory(
[
Status.DEADLINE_EXCEEDED,
Status.NOT_FOUND,
Status.ALREADY_EXISTS,
Status.PERMISSION_DENIED,
Status.OK,
],
[0.1, 0.1, 0.1, 0.1, 0.1],
[1, 2, 3, 4, 5],
'This is testing the brand new and shiny StreamingSequence server 3'
leahecole marked this conversation as resolved.
Show resolved Hide resolved
);
const response = await client.createStreamingSequence(request);
await new Promise<void>((resolve, reject) => {
const sequence = response[0];

const attemptRequest =
new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest();
attemptRequest.name = sequence.name!;

const attemptStream = client.attemptStreamingSequence(
attemptRequest,
settings
);
attemptStream.on('data', (response: {content: string}) => {
finalData.push(response.content);
});
attemptStream.on('error', error => {
reject(error);
});
attemptStream.on('end', () => {
attemptStream.end();
resolve();
});
}).then(() => {
assert.deepStrictEqual(
finalData.join(' '),
'This This is This is testing This is testing the This is testing the brand'
);
});
}

// When maxRetries are set to 2 then on the third error from the server gax
// should throw an error that says the retry count has been exceeded.
async function testShouldFailOnThirdError(client: SequenceServiceClient) {
Expand Down
1 change: 0 additions & 1 deletion gax/test/unit/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,6 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en
const s = new PassThrough({
objectMode: true,
});
s.push('hello');
setImmediate(() => {
s.emit('metadata');
});
Expand Down
Loading