Skip to content

Commit

Permalink
core: add retry logic to read streams
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Jul 9, 2015
1 parent 7f7c2b1 commit bad080e
Show file tree
Hide file tree
Showing 6 changed files with 432 additions and 396 deletions.
168 changes: 104 additions & 64 deletions lib/common/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ var request = require('request').defaults({
maxSockets: Infinity
}
});
var retryRequest = require('retry-request');
var streamForward = require('stream-forward');
var through = require('through2');
var uuid = require('node-uuid');

/** @const {object} gcloud-node's package.json file. */
Expand Down Expand Up @@ -178,34 +181,68 @@ nodeutil.inherits(ApiError, Error);
*/
function handleResp(err, resp, body, callback) {
callback = callback || noop;

if (err) {
callback(err);
return;
}
if (typeof body === 'string') {
try {
body = JSON.parse(body);
} catch(err) {}
}
if (body && body.error) {
// Error from JSON api.
callback(new ApiError(body.error));

var parsedApiResponse = util.parseApiResp(resp, body);

if (parsedApiResponse.err) {
callback(parsedApiResponse.err);
return;
}
if (resp && (resp.statusCode < 200 || resp.statusCode > 299)) {

callback(null, parsedApiResponse.body, parsedApiResponse.resp);
}

util.handleResp = handleResp;

/**
* From an HTTP response, generate an error if one occurred.
*
* @param {*} resp - Response value.
* @param {*=} body - Body value.
* @return {object} parsedResponse - The parsed response.
* @param {?error} parsedResponse.err - An error detected.
* @param {object} parsedResponse.resp - The original response object.
* @param {*} parsedREsponse.body - The original body value provided will try to
* be JSON.parse'd. If it's successful, the parsed value will be returned
* here, otherwise the original value.
*/
function parseApiResp(resp, body) {
var parsedResponse = {
err: null,
resp: resp,
body: body
};

if (resp.statusCode < 200 || resp.statusCode > 299) {
// Unknown error. Format according to ApiError standard.
callback(new ApiError({
parsedResponse.err = new ApiError({
errors: [],
code: resp.statusCode,
message: body || 'Error during request.',
message: 'Error during request.',
response: resp
}));
return;
});
}

if (util.is(body, 'string')) {
try {
parsedResponse.body = JSON.parse(body);
} catch(err) {}
}
callback(null, body, resp);

if (parsedResponse.body && parsedResponse.body.error) {
// Error from JSON API.
parsedResponse.err = new ApiError(parsedResponse.body.error);
}

return parsedResponse;
}

util.handleResp = handleResp;
util.parseApiResp = parseApiResp;

/**
* Get the type of a value.
Expand Down Expand Up @@ -418,20 +455,6 @@ function makeWritableStream(dup, options, onComplete) {

util.makeWritableStream = makeWritableStream;

/**
* Returns an exponential distributed time to wait given the number of retries
* that have been previously been attempted on the request.
*
* @param {number} retryNumber - The number of retries previously attempted.
* @return {number} An exponentially distributed time to wait E.g. for use with
* exponential backoff.
*/
function getNextRetryWait(retryNumber) {
return (Math.pow(2, retryNumber) * 1000) + Math.floor(Math.random() * 1000);
}

util.getNextRetryWait = getNextRetryWait;

/**
* Returns true if the API request should be retried, given the error that was
* given the first time the request was attempted. This is used for rate limit
Expand Down Expand Up @@ -600,30 +623,47 @@ function makeAuthorizedRequestFactory(config) {
* request options.
*/
function makeAuthorizedRequest(reqOpts, callback) {
if (config.customEndpoint) {
// Using a custom API override. Do not use `google-auth-library` for
// authentication. (ex: connecting to a local Datastore server)
if (callback.onAuthorized) {
callback.onAuthorized(null, reqOpts);
} else {
util.makeRequest(reqOpts, config, callback);
}
var streamMode = !callback;
var stream;
var reqConfig = extend({}, config);

return;
callback = callback || util.noop;

if (streamMode) {
stream = through();
reqConfig.stream = stream;
}

util.authorizeRequest(config, reqOpts, function(err, authorizedReqOpts) {
function onAuthorized(err, authorizedReqOpts) {
if (err) {
(callback.onAuthorized || callback)(err);
if (streamMode) {
stream.emit('error', err);
stream.end();
} else {
(callback.onAuthorized || callback)(err);
}

return;
}

if (callback.onAuthorized) {
callback.onAuthorized(null, authorizedReqOpts);
} else {
util.makeRequest(authorizedReqOpts, config, callback);
util.makeRequest(authorizedReqOpts, reqConfig, callback);
}
});
}

if (reqConfig.customEndpoint) {
// Using a custom API override. Do not use `google-auth-library` for
// authentication. (ex: connecting to a local Datastore server)
onAuthorized(null, reqOpts);
} else {
util.authorizeRequest(reqConfig, reqOpts, onAuthorized);
}

if (streamMode) {
return stream;
}
}

makeAuthorizedRequest.getCredentials = function(callback) {
Expand Down Expand Up @@ -653,8 +693,8 @@ function makeAuthorizedRequestFactory(config) {
util.makeAuthorizedRequestFactory = makeAuthorizedRequestFactory;

/**
* Make a request through the `request` module with built-in error handling and
* exponential back off.
* Make a request through the `retryRequest` module with built-in error handling
* and exponential back off.
*
* @param {object} reqOpts - Request options in the format `request` expects.
* @param {object=} config - Configuration object.
Expand All @@ -674,33 +714,33 @@ function makeRequest(reqOpts, config, callback) {

config = config || {};

var MAX_RETRIES = config.maxRetries || 3;
var autoRetry = config.autoRetry !== false ? true : false;
var attemptedRetries = 0;
var options = {
request: request,

retries: config.autoRetry !== false ? config.maxRetries || 3 : 0,

shouldRetryFn: function(resp) {
var err = util.parseApiResp(resp).err;
return err && util.shouldRetryRequest(err);
}
};

reqOpts.headers = reqOpts.headers || {};
reqOpts.headers['User-Agent'] = USER_AGENT;

function shouldRetry(err) {
return autoRetry &&
MAX_RETRIES > attemptedRetries &&
util.shouldRetryRequest(err);
}
if (config.stream) {
var requestStream = retryRequest(reqOpts, options);

function makeRateLimitedRequest() {
request(reqOpts, function(err, resp, body) {
util.handleResp(err, resp, body, function(err, body, resp) {
if (shouldRetry(err)) {
var delay = util.getNextRetryWait(attemptedRetries++);
setTimeout(makeRateLimitedRequest, delay);
} else {
callback(err || null, body, resp);
}
});
// `streamForward` is used to re-emit the events the request stream receives
// on to the stream the user is holding.
var streamEventsToForward = ['error', 'response', 'end', 'complete'];

streamForward(requestStream, streamEventsToForward).pipe(config.stream);
} else {
retryRequest(reqOpts, options, function(err, response, body) {
util.handleResp(err, response, body, callback);
});
}

makeRateLimitedRequest();
}

util.makeRequest = makeRequest;
Loading

0 comments on commit bad080e

Please sign in to comment.