diff --git a/lib/storage/file.js b/lib/storage/file.js
index 78004c31916..18695cbb7ef 100644
--- a/lib/storage/file.js
+++ b/lib/storage/file.js
@@ -211,6 +211,19 @@ File.prototype.copy = function(destination, callback) {
* piped to a writable stream or listened to for 'data' events to read a file's
* contents.
*
+ * In the unlikely event there is a mismatch between what you downloaded and the
+ * version in your Bucket, your error handler will receive an error with code
+ * "CONTENT_DOWNLOAD_MISMATCH". If you receive this error, the best recourse is
+ * to try downloading the file again.
+ *
+ * @param {object=} options - Configuration object.
+ * @param {string|boolean} options.validation - Possible values: `"md5"`,
+ * `"crc32c"`, or `false`. By default, data integrity is validated with an
+ * MD5 checksum for maximum reliability, falling back to CRC32c when an MD5
+ * hash wasn't returned from the API. CRC32c will provide better performance
+ * with less reliability. You may also choose to skip validation completely,
+ * however this is **not recommended**.
+ *
* @example
* //-
* //
Downloading a File
@@ -226,35 +239,133 @@ File.prototype.copy = function(destination, callback) {
* .pipe(fs.createWriteStream('/Users/stephen/Photos/image.png'))
* .on('error', function(err) {});
*/
-File.prototype.createReadStream = function() {
- var storage = this.bucket.storage;
- var dup = duplexify();
- function createAuthorizedReq(uri) {
- var reqOpts = { uri: uri };
- storage.makeAuthorizedRequest_(reqOpts, {
- onAuthorized: function(err, authorizedReqOpts) {
- if (err) {
- dup.emit('error', err);
- dup.end();
- return;
- }
- dup.setReadable(request(authorizedReqOpts));
- }
- });
+File.prototype.createReadStream = function(options) {
+ options = options || {};
+
+ var that = this;
+ var throughStream = through();
+
+ var validations = ['crc32c', 'md5'];
+ var validation;
+
+ if (util.is(options.validation, 'string')) {
+ options.validation = options.validation.toLowerCase();
+
+ if (validations.indexOf(options.validation) > -1) {
+ validation = options.validation;
+ } else {
+ validation = 'all';
+ }
+ }
+
+ if (util.is(options.validation, 'undefined')) {
+ validation = 'all';
}
+
+ var crc32c = validation === 'crc32c' || validation === 'all';
+ var md5 = validation === 'md5' || validation === 'all';
+
if (this.metadata.mediaLink) {
createAuthorizedReq(this.metadata.mediaLink);
} else {
this.getMetadata(function(err, metadata) {
if (err) {
- dup.emit('error', err);
- dup.end();
+ throughStream.emit('error', err);
+ throughStream.end();
return;
}
+
createAuthorizedReq(metadata.mediaLink);
});
}
- return dup;
+
+ return throughStream;
+
+ // Authenticate the request, then pipe the remote API request to the stream
+ // returned to the user.
+ function createAuthorizedReq(uri) {
+ var reqOpts = {
+ uri: uri
+ };
+
+ that.bucket.storage.makeAuthorizedRequest_(reqOpts, {
+ onAuthorized: function(err, authorizedReqOpts) {
+ if (err) {
+ throughStream.emit('error', err);
+ throughStream.end();
+ return;
+ }
+
+ // For data integrity, hash the contents of the stream as we receive it
+ // from the server.
+ var localCrc32cHash;
+ var localMd5Hash = crypto.createHash('md5');
+
+ request(authorizedReqOpts)
+ .on('error', function(err) {
+ throughStream.emit('error', err);
+ throughStream.end();
+ })
+
+ .on('data', function(chunk) {
+ if (crc32c) {
+ localCrc32cHash = crc.calculate(chunk, localCrc32cHash);
+ }
+
+ if (md5) {
+ localMd5Hash.update(chunk);
+ }
+ })
+
+ .on('complete', function(res) {
+ var failed = false;
+ var crcFail = true;
+ var md5Fail = true;
+
+ var hashes = {};
+ res.headers['x-goog-hash'].split(',').forEach(function(hash) {
+ var hashType = hash.split('=')[0];
+ hashes[hashType] = hash.substr(hash.indexOf('=') + 1);
+ });
+
+ var remoteMd5 = hashes.md5;
+ var remoteCrc = hashes.crc32c && hashes.crc32c.substr(4);
+
+ if (crc32c) {
+ crcFail =
+ new Buffer([localCrc32cHash]).toString('base64') !== remoteCrc;
+ failed = crcFail;
+ }
+
+ if (md5) {
+ md5Fail = localMd5Hash.digest('base64') !== remoteMd5;
+ failed = md5Fail;
+ }
+
+ if (validation === 'all') {
+ failed = remoteMd5 ? md5Fail : crcFail;
+ }
+
+ if (failed) {
+ var error = new Error([
+ 'The downloaded data did not match the data from the server.',
+ 'To be sure the content is the same, you should download the',
+ 'file again.'
+ ].join(' '));
+ error.code = 'CONTENT_DOWNLOAD_MISMATCH';
+
+ throughStream.emit('error', error);
+ } else {
+ throughStream.emit('complete');
+ }
+
+ throughStream.end();
+ })
+
+ .pipe(throughStream);
+ }
+ });
+ }
};
/**
@@ -688,7 +799,7 @@ File.prototype.startResumableUpload_ = function(stream, metadata) {
method: 'PUT',
uri: resumableUri
}, {
- onAuthorized: function (err, reqOpts) {
+ onAuthorized: function(err, reqOpts) {
if (err) {
handleError(err);
return;
diff --git a/regression/storage.js b/regression/storage.js
index c132ba5c3e1..43be0e8626c 100644
--- a/regression/storage.js
+++ b/regression/storage.js
@@ -322,12 +322,17 @@ describe('storage', function() {
writeStream.on('error', done);
writeStream.on('complete', function() {
+ var data = new Buffer('');
+
file.createReadStream()
+ .on('error', done)
.on('data', function(chunk) {
- assert.equal(String(chunk), contents);
+ data = Buffer.concat([data, chunk]);
})
- .on('error', done)
- .on('end', done);
+ .on('complete', function() {
+ assert.equal(data.toString(), contents);
+ done();
+ });
});
});
diff --git a/test/storage/file.js b/test/storage/file.js
index e81c34d7d08..f6711964b7e 100644
--- a/test/storage/file.js
+++ b/test/storage/file.js
@@ -263,13 +263,15 @@ describe('File', function() {
});
it('should create an authorized request', function(done) {
- request_Override = function(opts) {
+ file.bucket.storage.makeAuthorizedRequest_ = function(opts) {
assert.equal(opts.uri, metadata.mediaLink);
done();
};
+
file.getMetadata = function(callback) {
callback(null, metadata);
};
+
file.createReadStream();
});
@@ -292,33 +294,129 @@ describe('File', function() {
it('should get readable stream from request', function(done) {
var fakeRequest = { a: 'b', c: 'd' };
- file.getMetadata = function(callback) {
- callback(null, metadata);
- };
+
+ // Faking a stream implementation so we can simulate an actual Request
+ // request. The only thing we want to know is if the data passed to
+ // request was correct.
request_Override = function(req) {
+ if (!(this instanceof request_Override)) {
+ return new request_Override(req);
+ }
+
+ stream.Readable.call(this);
+ this._read = util.noop;
+
assert.deepEqual(req, fakeRequest);
done();
};
- file.bucket.storage.makeAuthorizedRequest_ = function(opts, callback) {
- (callback.onAuthorized || callback)(null, fakeRequest);
- };
- file.createReadStream();
- });
+ nodeutil.inherits(request_Override, stream.Readable);
- it('should set readable stream', function() {
- var dup = duplexify();
file.getMetadata = function(callback) {
callback(null, metadata);
};
- request_Override = function() {
- return dup;
- };
+
file.bucket.storage.makeAuthorizedRequest_ = function(opts, callback) {
- (callback.onAuthorized || callback)();
+ (callback.onAuthorized || callback)(null, fakeRequest);
};
+
file.createReadStream();
- assert.deepEqual(readableStream, dup);
- readableStream = null;
+ });
+
+ describe('validation', function() {
+ var data = 'test';
+
+ var crc32cBase64 = new Buffer([crc.calculate(data)]).toString('base64');
+
+ var md5HashBase64 = crypto.createHash('md5');
+ md5HashBase64.update(data);
+ md5HashBase64 = md5HashBase64.digest('base64');
+
+ var fakeResponse = {
+ crc32c: {
+ headers: { 'x-goog-hash': 'crc32c=####' + crc32cBase64 }
+ },
+ md5: {
+ headers: { 'x-goog-hash': 'md5=' + md5HashBase64 }
+ }
+ };
+
+ function getFakeRequest(data, fakeResponse) {
+ function FakeRequest(req) {
+ if (!(this instanceof FakeRequest)) {
+ return new FakeRequest(req);
+ }
+
+ var that = this;
+
+ stream.Readable.call(this);
+ this._read = function() {
+ this.push(data);
+ this.push(null);
+ };
+
+ setImmediate(function() {
+ that.emit('complete', fakeResponse);
+ });
+ }
+ nodeutil.inherits(FakeRequest, stream.Readable);
+ return FakeRequest;
+ }
+
+ beforeEach(function() {
+ file.metadata.mediaLink = 'http://uri';
+
+ file.bucket.storage.makeAuthorizedRequest_ = function(opts, callback) {
+ (callback.onAuthorized || callback)(null, {});
+ };
+ });
+
+ it('should validate with crc32c', function(done) {
+ request_Override = getFakeRequest(data, fakeResponse.crc32c);
+
+ file.createReadStream({ validation: 'crc32c' })
+ .on('error', done)
+ .on('complete', done);
+ });
+
+ it('should emit an error if crc32c validation fails', function(done) {
+ request_Override = getFakeRequest('bad-data', fakeResponse.crc32c);
+
+ file.createReadStream({ validation: 'crc32c' })
+ .on('error', function(err) {
+ assert.equal(err.code, 'CONTENT_DOWNLOAD_MISMATCH');
+ done();
+ });
+ });
+
+ it('should validate with md5', function(done) {
+ request_Override = getFakeRequest(data, fakeResponse.md5);
+
+ file.createReadStream({ validation: 'md5' })
+ .on('error', done)
+ .on('complete', done);
+ });
+
+ it('should emit an error if md5 validation fails', function(done) {
+ request_Override = getFakeRequest('bad-data', fakeResponse.crc32c);
+
+ file.createReadStream({ validation: 'md5' })
+ .on('error', function(err) {
+ assert.equal(err.code, 'CONTENT_DOWNLOAD_MISMATCH');
+ done();
+ });
+ });
+
+ it('should default to md5 validation', function(done) {
+ request_Override = getFakeRequest(data, {
+ headers: { 'x-goog-hash': 'md5=fakefakefake' }
+ });
+
+ file.createReadStream()
+ .on('error', function(err) {
+ assert.equal(err.code, 'CONTENT_DOWNLOAD_MISMATCH');
+ done();
+ });
+ });
});
});