Skip to content

Commit

Permalink
feat: use proto3 JSON serializer for REGAPIC workflows (#1074)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexander-fenster authored Aug 5, 2021
1 parent 6616d3b commit 6ef89f1
Show file tree
Hide file tree
Showing 14 changed files with 615 additions and 425 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"is-stream-ended": "^0.1.4",
"node-fetch": "^2.6.1",
"object-hash": "^2.1.1",
"proto3-json-serializer": "^0.1.0",
"protobufjs": "6.11.2",
"retry-request": "^4.0.0"
},
Expand Down
326 changes: 68 additions & 258 deletions src/fallback.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,9 @@
* limitations under the License.
*/

/* global window */
/* global AbortController */

import {
hasWindowFetch,
hasTextEncoder,
hasTextDecoder,
isNodeJS,
hasAbortController,
} from './featureDetection';

if (!hasTextEncoder() || !hasTextDecoder()) {
if (isNodeJS()) {
// Node.js 10 does not have global TextDecoder
// TODO(@alexander-fenster): remove this logic after Node.js 10 is EOL.
// eslint-disable-next-line @typescript-eslint/no-var-requires
const util = require('util');
Object.assign(global, {
TextDecoder: util.TextDecoder,
TextEncoder: util.TextEncoder,
});
} else {
require('fast-text-encoding');
}
}

import * as protobuf from 'protobufjs';
import * as gax from './gax';
import nodeFetch from 'node-fetch';
import {Response as NodeFetchResponse} from 'node-fetch';
import * as routingHeader from './routingHeader';
import {AbortController as NodeAbortController} from 'abort-controller';
import {Status} from './status';
import {OutgoingHttpHeaders} from 'http';
import {
Expand All @@ -63,8 +34,12 @@ import {GrpcClientOptions, ClientStubOptions} from './grpc';
import {GaxCall, GRPCCall} from './apitypes';
import {Descriptor} from './descriptor';
import {createApiCall as _createApiCall} from './createApiCall';
import {GoogleErrorDecoder, FallbackServiceError} from './googleError';
import {transcode} from './transcoding';
import {FallbackServiceError} from './googleError';
import * as fallbackProto from './fallbackProto';
import * as fallbackRest from './fallbackRest';
import {isNodeJS} from './featureDetection';
import {generateServiceStub} from './fallbackServiceStub';

export {FallbackServiceError};
export {PathTemplate} from './pathTemplate';
export {routingHeader};
Expand All @@ -80,24 +55,30 @@ export {

export {StreamType} from './streamingCalls/streaming';

interface NodeFetchType {
(url: RequestInfo, init?: RequestInit): Promise<Response>;
}
export const defaultToObjectOptions = {
keepCase: false,
longs: String,
enums: String,
defaults: true,
oneofs: true,
};

const CLIENT_VERSION_HEADER = 'x-goog-api-client';

interface FallbackServiceStub {
[method: string]: Function;
export interface ServiceMethods {
[name: string]: protobuf.Method;
}

export type AuthClient =
| OAuth2Client
| Compute
| JWT
| UserRefreshClient
| BaseExternalAccountClient;

export class GrpcClient {
auth?: OAuth2Client | GoogleAuth;
authClient?:
| OAuth2Client
| Compute
| JWT
| UserRefreshClient
| BaseExternalAccountClient;
authClient?: AuthClient;
fallback: boolean | 'rest' | 'proto';
grpcVersion: string;
private static protoCache = new Map<string, protobuf.Root>();
Expand Down Expand Up @@ -163,7 +144,7 @@ export class GrpcClient {
return root;
}

private static getServiceMethods(service: protobuf.Service) {
private static getServiceMethods(service: protobuf.Service): ServiceMethods {
const methods: {[name: string]: protobuf.Method} = {};
for (const [methodName, methodObject] of Object.entries(service.methods)) {
const methodNameLowerCamelCase =
Expand Down Expand Up @@ -276,23 +257,6 @@ export class GrpcClient {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
customServicePath?: boolean
) {
// an RPC function to be passed to protobufjs RPC API
function serviceClientImpl(
method:
| protobuf.Method
| protobuf.rpc.ServiceMethod<
protobuf.Message<{}>,
protobuf.Message<{}>
>,
requestData: Uint8Array,
callback: protobuf.RPCImplCallback
) {
return [requestData, callback];
}

// decoder for google.rpc.Status messages
const statusDecoder = new GoogleErrorDecoder();

if (!this.authClient) {
if (this.auth && 'getClient' in this.auth) {
this.authClient = await this.auth.getClient();
Expand All @@ -303,210 +267,56 @@ export class GrpcClient {
if (!this.authClient) {
throw new Error('No authentication was provided');
}
const authHeader = await this.authClient.getRequestHeaders();
const serviceStub = service.create(
serviceClientImpl,
false,
false
) as unknown as FallbackServiceStub;

service.resolveAll();
const methods = GrpcClient.getServiceMethods(service);

// grpcCompatibleServiceStub methods accept four parameters:
// request, options, metadata, and callback - similar to
// the stub returned by grpc.ts
const grpcCompatibleServiceStub = service.create(
serviceClientImpl,
false,
false
) as unknown as FallbackServiceStub;
for (const [methodName, methodObject] of Object.entries(methods)) {
grpcCompatibleServiceStub[methodName] = (
req: {},
options: {[name: string]: string},
metadata: {},
callback: Function
) => {
const [requestData, serviceCallback] = serviceStub[methodName].apply(
serviceStub,
[
methodObject.resolvedRequestType!.fromObject(req),
(err: Error | null, response: protobuf.Message<{}>) => {
if (!err) {
// converts a protobuf message instance to a plain JavaScript object
// with enum and long conversion options specified
const responseObject =
methodObject.resolvedResponseType!.toObject(response, {
enums: String,
longs: String,
});
callback(null, responseObject);
} else {
callback(err);
}
},
]
);

const cancelController = hasAbortController()
? new AbortController()
: new NodeAbortController();
const cancelSignal = cancelController.signal;
let cancelRequested = false;
const protocol = opts.protocol || 'https';

const headers = Object.assign({}, authHeader);
for (const key of Object.keys(options)) {
headers[key] = options[key][0];
}

const grpcFallbackProtocol = opts.protocol || 'https';
let servicePath = opts.servicePath;
if (
!servicePath &&
service.options &&
service.options['(google.api.default_host)']
) {
servicePath = service.options['(google.api.default_host)'];
}
if (!servicePath) {
serviceCallback(new Error('Service path is undefined'));
return;
}

let servicePort;
const match = servicePath!.match(/^(.*):(\d+)$/);
if (match) {
servicePath = match[1];
servicePort = match[2];
}
if (opts.port) {
servicePort = opts.port;
} else if (!servicePort) {
servicePort = 443;
}

const protoNamespaces: string[] = [];
let currNamespace = methodObject.parent!;
while (currNamespace.name !== '') {
protoNamespaces.unshift(currNamespace.name);
currNamespace = currNamespace.parent!;
}
const protoServiceName = protoNamespaces.join('.');
const rpcName = methodObject.name;

let url: string;
let data: string;
let httpMethod: string;
let servicePath = opts.servicePath;
if (
!servicePath &&
service.options &&
service.options['(google.api.default_host)']
) {
servicePath = service.options['(google.api.default_host)'];
}
if (!servicePath) {
throw new Error(
`Cannot determine service API path for service ${service.name}.`
);
}

// TODO(@alexander-fenster): refactor this into separate function that prepares
// request object for `fetch`.
if (this.fallback === 'rest') {
// REGAPIC: JSON over HTTP/1 with gRPC trancoding
headers['Content-Type'] = 'application/json';
const decodedRequest =
methodObject.resolvedRequestType!.decode(requestData);
const requestJSON = methodObject.resolvedRequestType!.toObject(
// TODO: use toJSON instead of toObject
decodedRequest,
{
enums: String,
longs: String,
}
);
const transcoded = transcode(
requestJSON,
methodObject.parsedOptions,
methodObject.resolvedRequestType!.fields
);
if (!transcoded) {
throw new Error(
`Cannot build HTTP request for ${JSON.stringify(
requestJSON
)}, method: ${methodObject.name}`
);
}
httpMethod = transcoded.httpMethod;
data = JSON.stringify(transcoded.data);
url = `${grpcFallbackProtocol}://${servicePath}:${servicePort}/${transcoded.url.replace(
/^\//,
''
)}?${transcoded.queryString}`;
} else {
// gRPC-fallback: proto over HTTP/1
headers['Content-Type'] = 'application/x-protobuf';
httpMethod = 'post';
data = requestData;
url = `${grpcFallbackProtocol}://${servicePath}:${servicePort}/$rpc/${protoServiceName}/${rpcName}`;
}
let servicePort;
const match = servicePath!.match(/^(.*):(\d+)$/);
if (match) {
servicePath = match[1];
servicePort = parseInt(match[2]);
}
if (opts.port) {
servicePort = opts.port;
} else if (!servicePort) {
servicePort = 443;
}

const fetch = hasWindowFetch()
? window.fetch
: (nodeFetch as unknown as NodeFetchType);
const fetchRequest = {
headers,
body: data as string | undefined,
method: httpMethod,
signal: cancelSignal,
};
if (
httpMethod === 'get' ||
httpMethod === 'delete' ||
httpMethod === 'head'
) {
delete fetchRequest['body'];
}
fetch(url, fetchRequest)
.then((response: Response | NodeFetchResponse) => {
return Promise.all([
Promise.resolve(response.ok),
response.arrayBuffer(),
]);
})
.then(([ok, buffer]: [boolean, Buffer | ArrayBuffer]) => {
// TODO(@alexander-fenster): response processing to be moved
// to a separate function.
if (this.fallback === 'rest') {
// REGAPIC: JSON over HTTP/1
// eslint-disable-next-line node/no-unsupported-features/node-builtins
const decodedString = new TextDecoder().decode(buffer);
const response = JSON.parse(decodedString);
if (!ok) {
const error = Object.assign(
new Error(response['error']['message']),
response.error
);
throw error;
}
const message =
methodObject.resolvedResponseType!.fromObject(response);
const encoded = methodObject
.resolvedResponseType!.encode(message)
.finish();
serviceCallback(null, encoded);
} else {
// gRPC-fallback: proto over HTTP/1
if (!ok) {
const error = statusDecoder.decodeErrorFromBuffer(buffer);
throw error;
}
serviceCallback(null, new Uint8Array(buffer));
}
})
.catch((err: Error) => {
if (!cancelRequested || err.name !== 'AbortError') {
serviceCallback(err);
}
});
const encoder =
this.fallback === 'rest'
? fallbackRest.encodeRequest
: fallbackProto.encodeRequest;
const decoder =
this.fallback === 'rest'
? fallbackRest.decodeResponse
: fallbackProto.decodeResponse;
const serviceStub = generateServiceStub(
methods,
protocol,
servicePath,
servicePort,
this.authClient,
encoder,
decoder
);

return {
cancel: () => {
cancelRequested = true;
cancelController.abort();
},
};
};
}
return grpcCompatibleServiceStub;
return serviceStub;
}
}

Expand Down
Loading

0 comments on commit 6ef89f1

Please sign in to comment.