Skip to content

Commit

Permalink
Merge pull request #746 from stephenplusplus/spp--storage-readstream-…
Browse files Browse the repository at this point in the history
…response-handling

storage: handle errors in read stream
  • Loading branch information
callmehiphop committed Jul 27, 2015
2 parents 572c9d5 + 19d7b87 commit 7efd2a8
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 46 deletions.
15 changes: 10 additions & 5 deletions lib/common/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,14 @@ function ApiError(errorBody) {
Error.captureStackTrace(this, arguments.callee);
this.errors = errorBody.errors;
this.code = errorBody.code;
this.message = errorBody.message;
this.message = errorBody.message || 'Error during request.';
this.response = errorBody.response;
}

nodeutil.inherits(ApiError, Error);

util.ApiError = ApiError;

/**
* Uniformly process an API response.
*
Expand Down Expand Up @@ -211,10 +213,10 @@ function parseApiResp(err, resp, body) {

if (resp.statusCode < 200 || resp.statusCode > 299) {
// Unknown error. Format according to ApiError standard.
parsedResp.err = new ApiError({
parsedResp.err = new util.ApiError({
errors: [],
code: resp.statusCode,
message: 'Error during request.',
message: resp.statusMessage,
response: resp
});
}
Expand All @@ -227,7 +229,7 @@ function parseApiResp(err, resp, body) {

if (parsedResp.body && parsedResp.body.error) {
// Error from JSON API.
parsedResp.err = new ApiError(parsedResp.body.error);
parsedResp.err = new util.ApiError(parsedResp.body.error);
}

return parsedResp;
Expand Down Expand Up @@ -716,9 +718,12 @@ function makeRequest(reqOpts, config, callback) {
};

if (config.stream) {
var requestStream = retryRequest(reqOpts, options);
config.stream.abort = requestStream.abort;

// `streamForward` is used to re-emit the events the request stream receives
// on to the stream the user is holding (config.stream).
streamForward(retryRequest(reqOpts, options)).pipe(config.stream);
streamForward(requestStream).pipe(config.stream);
} else {
retryRequest(reqOpts, options, function(err, response, body) {
util.handleResp(err, response, body, callback);
Expand Down
81 changes: 44 additions & 37 deletions lib/storage/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -538,51 +538,58 @@ File.prototype.createReadStream = function(options) {
.on('response', throughStream.emit.bind(throughStream, 'response'))

.on('complete', function(res) {
if (rangeRequest) {
// Range requests can't receive data integrity checks.
endThroughStream(null, res);
return;
}
util.handleResp(null, res, null, function(err) {
if (err) {
endThroughStream(err);
return;
}

var failed = false;
var crcFail = true;
var md5Fail = true;
if (rangeRequest) {
// Range requests can't receive data integrity checks.
endThroughStream(null, res);
return;
}

var hashes = {};
res.headers['x-goog-hash'].split(',').forEach(function(hash) {
var hashType = hash.split('=')[0].trim();
hashes[hashType] = hash.substr(hash.indexOf('=') + 1);
});
var failed = false;
var crcFail = true;
var md5Fail = true;

var remoteMd5 = hashes.md5;
var remoteCrc = hashes.crc32c && hashes.crc32c.substr(4);
var hashes = {};
res.headers['x-goog-hash'].split(',').forEach(function(hash) {
var hashType = hash.split('=')[0].trim();
hashes[hashType] = hash.substr(hash.indexOf('=') + 1);
});

if (crc32c) {
crcFail = localCrcHash !== remoteCrc;
failed = crcFail;
}
var remoteMd5 = hashes.md5;
var remoteCrc = hashes.crc32c && hashes.crc32c.substr(4);

if (md5) {
md5Fail = localMd5Hash !== remoteMd5;
failed = md5Fail;
}
if (crc32c) {
crcFail = localCrcHash !== remoteCrc;
failed = crcFail;
}

if (validation === 'all') {
failed = remoteMd5 ? md5Fail : crcFail;
}
if (md5) {
md5Fail = localMd5Hash !== remoteMd5;
failed = md5Fail;
}

if (failed) {
var mismatchError = new Error([
'The downloaded data did not match the data from the server.',
'To be sure the content is the same, you should download the',
'file again.'
].join(' '));
mismatchError.code = 'CONTENT_DOWNLOAD_MISMATCH';
if (validation === 'all') {
failed = remoteMd5 ? md5Fail : crcFail;
}

endThroughStream(mismatchError, res);
} else {
endThroughStream(null, res);
}
if (failed) {
var mismatchError = new Error([
'The downloaded data did not match the data from the server.',
'To be sure the content is the same, you should download the',
'file again.'
].join(' '));
mismatchError.code = 'CONTENT_DOWNLOAD_MISMATCH';

endThroughStream(mismatchError, res);
} else {
endThroughStream(null, res);
}
});
})

.pipe(throughStream)
Expand Down
9 changes: 9 additions & 0 deletions system-test/storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,15 @@ describe('storage', function() {
});
});

it('should handle non-network errors', function(done) {
var file = bucket.file('hi.jpg');
file.download(function(err) {
assert.strictEqual(err.code, 404);
assert.strictEqual(err.message, 'Not Found');
done();
});
});

it('should upload a gzipped file and download it', function(done) {
var options = {
metadata: {
Expand Down
60 changes: 56 additions & 4 deletions test/common/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,36 @@ describe('common/util', function() {
});
});

describe('ApiError', function() {
it('should build correct ApiError', function() {
var error = {
errors: [ new Error(), new Error() ],
code: 100,
message: 'Uh oh',
response: { a: 'b', c: 'd' }
};

var apiError = new util.ApiError(error);

assert.strictEqual(apiError.errors, error.errors);
assert.strictEqual(apiError.code, error.code);
assert.strictEqual(apiError.message, error.message);
assert.strictEqual(apiError.response, error.response);
});

it('should build ApiError with default status message', function() {
var error = {
errors: [],
code: 100,
response: { a: 'b', c: 'd' }
};

var apiError = new util.ApiError(error);

assert.strictEqual(apiError.message, 'Error during request.');
});
});

describe('extendGlobalConfig', function() {
it('should favor `keyFilename` when `credentials` is global', function() {
var globalConfig = { credentials: {} };
Expand Down Expand Up @@ -208,11 +238,20 @@ describe('common/util', function() {
});

describe('parseApiResp', function() {
it('should return err code if there are not other errors', function() {
var parsedApiResp = util.parseApiResp(null, { statusCode: 400 });
describe('non-200s response status', function() {
it('should build ApiError with status and message', function(done) {
var error = { statusCode: 400, statusMessage: 'Not Good' };

utilOverrides.ApiError = function(error_) {
assert.strictEqual(error_.code, error.statusCode);
assert.strictEqual(error_.message, error.statusMessage);
assert.strictEqual(error_.response, error);

assert.strictEqual(parsedApiResp.err.code, 400);
assert.strictEqual(parsedApiResp.err.message, 'Error during request.');
done();
};

util.parseApiResp(null, error);
});
});

it('should detect body errors', function() {
Expand Down Expand Up @@ -1004,6 +1043,19 @@ describe('common/util', function() {
util.makeRequest(reqOpts, {});
});

it('should expose the abort method from retryRequest', function(done) {
var userStream = new stream.Stream();

retryRequestOverride = function() {
var requestStream = new stream.Stream();
requestStream.abort = done;
return requestStream;
};

util.makeRequest(reqOpts, { stream: userStream });
userStream.abort();
});

it('should allow turning off retries to retryRequest', function(done) {
retryRequestOverride = testNoRetryRequestConfig(done);
util.makeRequest(reqOpts, noRetryRequestConfig);
Expand Down
21 changes: 21 additions & 0 deletions test/storage/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,27 @@ describe('File', function() {
});
});

it('should let util.handleResp handle the response', function(done) {
var response = { a: 'b', c: 'd' };

handleRespOverride = function(err, response_, body) {
assert.strictEqual(err, null);
assert.strictEqual(response_, response);
assert.strictEqual(body, null);
done();
};

file.bucket.storage.makeAuthorizedRequest_ = function() {
var stream = through();
setImmediate(function() {
stream.emit('complete', response);
});
return stream;
};

file.createReadStream();
});

describe('errors', function() {
var ERROR = new Error('Error.');

Expand Down

0 comments on commit 7efd2a8

Please sign in to comment.