Skip to content

Commit

Permalink
fix: revert gRPC stream change in #1226 (#1257)
Browse files Browse the repository at this point in the history
  • Loading branch information
summer-ji-eng committed May 9, 2022
1 parent a50542e commit ed0844e
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 4 deletions.
9 changes: 7 additions & 2 deletions src/streamingCalls/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,9 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
retryRequestOptions: RetryRequestOptions = {}
) {
if (this.type === StreamType.SERVER_STREAMING) {
const stream = apiCall(argument, this._callback) as CancellableStream;
this.stream = stream;
if (this.rest) {
const stream = apiCall(argument, this._callback) as CancellableStream;
this.stream = stream;
this.setReadable(stream);
} else {
const retryStream = retryRequest(null, {
Expand All @@ -187,6 +187,11 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
}
return;
}
const stream = apiCall(
argument,
this._callback
) as CancellableStream;
this.stream = stream;
this.forwardEvents(stream);
return stream;
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import * as gax from 'google-gax';
import {Callback, CallOptions, ClientOptions, Descriptors, GaxCall, LROperation, PaginationCallback,} from 'google-gax';
// @ts-ignore
import {RequestType} from 'google-gax/build/src/apitypes';
import {Transform} from 'stream';
import {PassThrough, Transform} from 'stream';

import * as protos from '../../protos/protos';

Expand All @@ -36,7 +36,7 @@ import jsonProtos = require('../../protos/protos.json');
*/
import * as gapicConfig from './echo_client_config.json';
//@ts-ignore
import {operationsProtos} from 'google-gax';
import {operationsProtos, GoogleError} from 'google-gax';
const version = require('../../package.json').version;

/**
Expand Down Expand Up @@ -290,6 +290,16 @@ export class EchoClient {
const callPromise = this.echoStub.then(
stub => (...args: Array<{}>) => {
if (this._terminated) {
if (methodName in this.descriptors.stream) {
const stream = new PassThrough();
setImmediate(() => {
stream.emit(
'error',
new GoogleError('The client has already been closed.')
);
});
return stream;
}
return Promise.reject('The client has already been closed.');
}
const func = stub[methodName];
Expand Down
26 changes: 26 additions & 0 deletions test/fixtures/google-gax-packaging-test-app/test/gapic-v1beta1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,32 @@ describe('v1beta1.EchoClient', () => {
.calledWith(request, expectedOptions)
);
});

it('invokes expand with closed client', async () => {
const client = new echoModule.v1beta1.EchoClient({
credentials: {client_email: 'bogus', private_key: 'bogus'},
projectId: 'bogus',
});
client.initialize();
const request = generateSampleMessage(
new protos.google.showcase.v1beta1.ExpandRequest()
);
const expectedError = new Error('The client has already been closed.');
client.close();
const stream = client.expand(request);
const promise = new Promise((resolve, reject) => {
stream.on(
'data',
(response: protos.google.showcase.v1beta1.EchoResponse) => {
resolve(response);
}
);
stream.on('error', (err: Error) => {
reject(err);
});
});
await assert.rejects(promise, expectedError);
});
});

describe('chat', () => {
Expand Down

0 comments on commit ed0844e

Please sign in to comment.