Skip to content

Commit

Permalink
adds grpc retry decorator (#272)
Browse files Browse the repository at this point in the history
* adds grpc retry decorator

adds logging

* better var names :)

ensures function type before binding

wrong error code

* move comment to the right place
  • Loading branch information
hlminh2000 authored Jul 16, 2019
1 parent fd1cce0 commit ef61041
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 16 deletions.
41 changes: 30 additions & 11 deletions server/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"grpc": "^1.20.3",
"lodash": "^4.17.14",
"node-fetch": "^2.3.0",
"retry": "^0.12.0",
"url-join": "^4.0.0"
},
"devDependencies": {
Expand Down
5 changes: 4 additions & 1 deletion server/services/ego/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import grpc from 'grpc';
import * as loader from '@grpc/proto-loader';
import { EGO_ROOT_GRPC } from '../../config';
import { withRetries } from '../../utils/grpcUtils';

const PROTO_PATH = __dirname + '/Ego.proto';
const packageDefinition = loader.loadSync(PROTO_PATH, {
Expand All @@ -17,7 +18,9 @@ const packageDefinition = loader.loadSync(PROTO_PATH, {

const proto = grpc.loadPackageDefinition(packageDefinition).bio.overture.ego.grpc;

const userService = new proto.UserService(EGO_ROOT_GRPC, grpc.credentials.createInsecure());
const userService = withRetries(
new proto.UserService(EGO_ROOT_GRPC, grpc.credentials.createInsecure()),
);

const getUser = async (id, jwt = null) => {
return await new Promise((resolve, reject) => {
Expand Down
8 changes: 4 additions & 4 deletions server/services/programService/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import * as loader from '@grpc/proto-loader';
import protoPath from '@icgc-argo/program-service-proto';

import { PROGRAM_SERVICE_ROOT } from '../../config';
import { getAuthMeta, wrapValue } from '../../utils/grpcUtils';
import { getAuthMeta, wrapValue, withRetries } from '../../utils/grpcUtils';
import retry from 'retry';

const packageDefinition = loader.loadSync(protoPath, {
keepCase: true,
Expand All @@ -20,9 +21,8 @@ const packageDefinition = loader.loadSync(protoPath, {

const proto = grpc.loadPackageDefinition(packageDefinition).program_service;

const programService = new proto.ProgramService(
PROGRAM_SERVICE_ROOT,
grpc.credentials.createInsecure(),
const programService = withRetries(
new proto.ProgramService(PROGRAM_SERVICE_ROOT, grpc.credentials.createInsecure()),
);

const defaultPromiseCallback = (resolve, reject) => (err, response) =>
Expand Down
59 changes: 59 additions & 0 deletions server/utils/grpcUtils.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import grpc from 'grpc';
import retry from 'retry';

/* When building gRPC requests we frequently need to provide a value as:
* { value: "asdf" }
Expand All @@ -16,3 +17,61 @@ export const getAuthMeta = jwt => {

return meta;
};

export const getGrpcMethodsNames = grpcService =>
Object.getOwnPropertyNames(grpcService.__proto__).filter(
name => !(name.search(/^\$/) > -1 || name === 'constructor'),
);

export const withRetries = (
grpcClient,
retryConfig = {
retries: 5,
factor: 3,
minTimeout: 1 * 1000,
maxTimeout: 60 * 1000,
randomize: true,
},
errorCodes = [],
) => {
const STREAM_REMOVED_ERROR = 2;
const methodNames = getGrpcMethodsNames(grpcClient).reduce(
//converts to a hasmap for run-time performance
(acc, methodName) => ({
...acc,
[methodName]: methodName,
}),
{},
);
const methodWithRetry = (methodName, originalMethod) => (payload, metadata, cb) => {
const operation = retry.operation(retryConfig);
operation.attempt(currentAttempt => {
originalMethod(payload, metadata, (err, response) => {
if (operation.retry(err) && [...errorCodes, STREAM_REMOVED_ERROR].includes(err.code)) {
console.warn(
`grpc method ${methodName} failed with errorCode ${
err.code
}, retrying after ${currentAttempt} attempt(s)`,
);
return;
}
cb(err, response);
});
});
};
return new Proxy(grpcClient, {
get: (client, methodName) => {
const originalValue = client[methodName];
if (typeof originalValue === 'function') {
const originalMethod = originalValue.bind(client);
if (methodNames[methodName]) {
return methodWithRetry(methodName, originalMethod);
} else {
return originalMethod;
}
} else {
return originalValue;
}
},
});
};

0 comments on commit ef61041

Please sign in to comment.