Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: don't emit error event during stream handoff #1592

Merged
merged 3 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions gax/src/streamingRetryRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,11 @@ export function streamingRetryRequest(opts: streamingRetryRequestOptions) {
// No more attempts need to be made, just continue on.
retryStream.emit('response', response);
delayStream.pipe(retryStream);
requestStream.on('error', (err: GoogleError) => {
retryStream.destroy(err);
requestStream.on('error', () => {
// retryStream must be destroyed here for the stream handoff part of retries to function properly
// but the error event should not be passed - if it emits as part of .destroy()
// it will bubble up early to the caller
retryStream.destroy();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Future idea, this will be a great place to add some tracing whenever we add that.

});
}
}
116 changes: 47 additions & 69 deletions gax/test/test-application/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ async function testWait(client: EchoClient) {
assert.deepStrictEqual(response.content, request.success.content);
}

// a successful streaming call that has retry options passed but does not retry
async function testServerStreamingRetryOptions(client: SequenceServiceClient) {
const finalData: string[] = [];
const backoffSettings = createBackoffSettings(
Expand Down Expand Up @@ -495,7 +496,7 @@ async function testServerStreamingRetryOptions(client: SequenceServiceClient) {
);

const response = await client.createStreamingSequence(request);
await new Promise<void>(resolve => {
await new Promise<void>((resolve, reject) => {
const sequence = response[0];

const attemptRequest =
Expand All @@ -509,8 +510,9 @@ async function testServerStreamingRetryOptions(client: SequenceServiceClient) {
attemptStream.on('data', (response: {content: string}) => {
finalData.push(response.content);
});
attemptStream.on('error', () => {
//Do Nothing
attemptStream.on('error', (error: GoogleError) => {
// should not reach this
reject(error);
});
attemptStream.on('end', () => {
attemptStream.end();
Expand All @@ -524,6 +526,7 @@ async function testServerStreamingRetryOptions(client: SequenceServiceClient) {
});
}

// a streaming call that retries two times and finishes successfully
async function testServerStreamingRetrieswithRetryOptions(
client: SequenceServiceClient
) {
Expand Down Expand Up @@ -554,7 +557,7 @@ async function testServerStreamingRetrieswithRetryOptions(
);

const response = await client.createStreamingSequence(request);
await new Promise<void>(resolve => {
await new Promise<void>((resolve, reject) => {
const sequence = response[0];

const attemptRequest =
Expand All @@ -568,8 +571,8 @@ async function testServerStreamingRetrieswithRetryOptions(
attemptStream.on('data', (response: {content: string}) => {
finalData.push(response.content);
});
attemptStream.on('error', () => {
//Do Nothing
attemptStream.on('error', error => {
reject(error);
});
attemptStream.on('end', () => {
attemptStream.end();
Expand All @@ -583,6 +586,7 @@ async function testServerStreamingRetrieswithRetryOptions(
});
}

// a streaming call that retries twice using shouldRetryFn and finally succeeds
async function testServerStreamingRetriesWithShouldRetryFn(
client: SequenceServiceClient
) {
Expand Down Expand Up @@ -617,7 +621,7 @@ async function testServerStreamingRetriesWithShouldRetryFn(
);

const response = await client.createStreamingSequence(request);
await new Promise<void>(resolve => {
await new Promise<void>((resolve, reject) => {
const sequence = response[0];
const attemptRequest =
new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest();
Expand All @@ -630,8 +634,9 @@ async function testServerStreamingRetriesWithShouldRetryFn(
attemptStream.on('data', (response: {content: string}) => {
finalData.push(response.content);
});
attemptStream.on('error', () => {
// Do nothing
attemptStream.on('error', error => {
// should not reach this
reject(error);
});
attemptStream.on('end', () => {
attemptStream.end();
Expand All @@ -645,6 +650,7 @@ async function testServerStreamingRetriesWithShouldRetryFn(
});
}

// streaming call that retries twice using RetryRequestOptions instead of RetryOptions
async function testServerStreamingRetrieswithRetryRequestOptions(
client: SequenceServiceClient
) {
Expand Down Expand Up @@ -676,7 +682,7 @@ async function testServerStreamingRetrieswithRetryRequestOptions(
);

const response = await client.createStreamingSequence(request);
await new Promise<void>(resolve => {
await new Promise<void>((resolve, reject) => {
const sequence = response[0];

const attemptRequest =
Expand All @@ -690,8 +696,8 @@ async function testServerStreamingRetrieswithRetryRequestOptions(
attemptStream.on('data', (response: {content: string}) => {
finalData.push(response.content);
});
attemptStream.on('error', () => {
// Do Nothing
attemptStream.on('error', error => {
reject(error);
});
attemptStream.on('end', () => {
attemptStream.end();
Expand All @@ -704,6 +710,8 @@ async function testServerStreamingRetrieswithRetryRequestOptions(
);
});
}

// streaming call that retries twice with RetryRequestOpsions and resumes from where it left off
async function testServerStreamingRetrieswithRetryRequestOptionsResumptionStrategy(
client: SequenceServiceClient
) {
Expand Down Expand Up @@ -748,7 +756,7 @@ async function testServerStreamingRetrieswithRetryRequestOptionsResumptionStrate
'This is testing the brand new and shiny StreamingSequence server 3'
);
const response = await client.createStreamingSequence(request);
await new Promise<void>(resolve => {
await new Promise<void>((resolve, reject) => {
const sequence = response[0];

const attemptRequest =
Expand All @@ -762,8 +770,8 @@ async function testServerStreamingRetrieswithRetryRequestOptionsResumptionStrate
attemptStream.on('data', (response: {content: string}) => {
finalData.push(response.content);
});
attemptStream.on('error', () => {
// do nothing
attemptStream.on('error', error => {
reject(error);
});
attemptStream.on('end', () => {
attemptStream.end();
Expand All @@ -777,6 +785,7 @@ async function testServerStreamingRetrieswithRetryRequestOptionsResumptionStrate
});
}

// retries twice but fails with an error not from the streaming sequence
async function testServerStreamingRetrieswithRetryRequestOptionsErrorsOnBadResumptionStrategy(
client: SequenceServiceClient
) {
Expand Down Expand Up @@ -816,9 +825,8 @@ async function testServerStreamingRetrieswithRetryRequestOptionsErrorsOnBadResum
[1, 2, 11],
'This is testing the brand new and shiny StreamingSequence server 3'
);
const allowedCodes = [4, 14];
const response = await client.createStreamingSequence(request);
await new Promise<void>((_, reject) => {
await new Promise<void>(resolve => {
const sequence = response[0];

const attemptRequest =
Expand All @@ -831,21 +839,14 @@ async function testServerStreamingRetrieswithRetryRequestOptionsErrorsOnBadResum
);

attemptStream.on('error', (e: GoogleError) => {
if (!allowedCodes.includes(e.code!)) {
reject(e);
}
assert.strictEqual(e.code, 3);
assert.match(e.note!, /not classified as transient/);
resolve();
});
}).then(
() => {
assert(false);
},
(err: GoogleError) => {
assert.strictEqual(err.code, 3);
assert.match(err.note!, /not classified as transient/);
}
);
});
}

// fails on the first error in the sequence
async function testServerStreamingThrowsClassifiedTransientErrorNote(
client: SequenceServiceClient
) {
Expand Down Expand Up @@ -875,7 +876,7 @@ async function testServerStreamingThrowsClassifiedTransientErrorNote(
);

const response = await client.createStreamingSequence(request);
await new Promise<void>((_, reject) => {
await new Promise<void>(resolve => {
const sequence = response[0];

const attemptRequest =
Expand All @@ -887,21 +888,14 @@ async function testServerStreamingThrowsClassifiedTransientErrorNote(
settings
);
attemptStream.on('error', (e: GoogleError) => {
if (!allowedCodes.includes(e.code!)) {
reject(e);
}
assert.strictEqual(e.code, 14);
assert.match(e.note!, /not classified as transient/);
resolve();
});
}).then(
() => {
assert(false);
},
(err: GoogleError) => {
assert.strictEqual(err.code, 14);
assert.match(err.note!, /not classified as transient/);
}
);
});
}

// retries once and fails on the second error in the sequence
async function testServerStreamingRetriesAndThrowsClassifiedTransientErrorNote(
client: SequenceServiceClient
) {
Expand Down Expand Up @@ -931,7 +925,7 @@ async function testServerStreamingRetriesAndThrowsClassifiedTransientErrorNote(
);

const response = await client.createStreamingSequence(request);
await new Promise<void>((_, reject) => {
await new Promise<void>(resolve => {
const sequence = response[0];

const attemptRequest =
Expand All @@ -943,19 +937,11 @@ async function testServerStreamingRetriesAndThrowsClassifiedTransientErrorNote(
settings
);
attemptStream.on('error', (e: GoogleError) => {
if (!allowedCodes.includes(e.code!)) {
reject(e);
}
assert.strictEqual(e.code, 4);
assert.match(e.note!, /not classified as transient/);
resolve();
});
}).then(
() => {
assert(false);
},
(err: GoogleError) => {
assert.strictEqual(err.code, 4);
assert.match(err.note!, /not classified as transient/);
}
);
});
}

async function testServerStreamingThrowsCannotSetTotalTimeoutMillisMaxRetries(
Expand Down Expand Up @@ -988,7 +974,7 @@ async function testServerStreamingThrowsCannotSetTotalTimeoutMillisMaxRetries(
);

const response = await client.createStreamingSequence(request);
await new Promise<void>((_, reject) => {
await new Promise<void>(resolve => {
const sequence = response[0];

const attemptRequest =
Expand All @@ -1000,22 +986,14 @@ async function testServerStreamingThrowsCannotSetTotalTimeoutMillisMaxRetries(
settings
);
attemptStream.on('error', (e: GoogleError) => {
if (!allowedCodes.includes(e.code!)) {
reject(e);
}
});
}).then(
() => {
assert(false);
},
(err: GoogleError) => {
assert.strictEqual(err.code, 3);
assert.strictEqual(e.code, 3);
assert.match(
err.message,
e.message,
/Cannot set both totalTimeoutMillis and maxRetries/
);
}
);
resolve();
});
});
}

async function main() {
Expand Down
Loading