diff --git a/lib/storage/index.js b/lib/storage/index.js index f36c5cd2a42..1b52b899a21 100644 --- a/lib/storage/index.js +++ b/lib/storage/index.js @@ -20,8 +20,7 @@ 'use strict'; -var events = require('events'); -var fs = require('fs'); +var duplexify = require('duplexify'); var nodeutil = require('util'); var stream = require('stream'); var uuid = require('node-uuid'); @@ -72,85 +71,6 @@ BufferStream.prototype._read = function() { this.push(null); }; -/** - * A readable stream that streams the contents of a file. - * - * @private - * - * @constructor - * @mixes {stream#Readable} - * - * @param {module:storage~Bucket} bucket - Bucket the source file belongs to. - * @param {string} name - Name of the file to read from. - * - * @example - * ```js - * var myBucket = new Bucket({ - * bucketName: 'my-bucket' - * }); - * var readStream = new ReadStream(myBucket, 'file/to/fetch.pdf'); - * ``` - */ -function ReadStream(bucket, name) { - events.EventEmitter.call(this); - - this.bucket = bucket; - this.name = name; - this.remoteStream = null; - - this.open(); -} - -nodeutil.inherits(ReadStream, events.EventEmitter); - -/** - * Open a connection to retrieve a file. - */ -ReadStream.prototype.open = function() { - var that = this; - this.bucket.stat(this.name, function(err, metadata) { - if (err) { - that.emit('error', err); - return; - } - that.bucket.conn.createAuthorizedReq( - { uri: metadata.mediaLink }, function(err, req) { - if (err) { - that.emit('error', err); - return; - } - that.remoteStream = that.bucket.conn.requester(req); - that.remoteStream.on('complete', that.emit.bind(that, 'complete')); - that.emit('readable'); - }); - }); -}; - -/** - * Pipe the output to the destination stream with the provided options. - * - * @param {stream} dest - Destination stream to write to. - * @param {object} opts - Piping options. 
- * @return {stream} - */ -ReadStream.prototype.pipe = function(dest, opts) { - var that = this; - if (!that.remoteStream) { - return that.once('readable', function() { - that.pipe(dest, opts); - }); - } - // Register an on-data listener instead of piping, so we can avoid writing if - // the request ends up with a non-200 response. - that.remoteStream.on('data', function(data) { - if (!that.errored) { - that.emit('data', data); - dest.write(data); - } - }); - return dest; -}; - /** * Google Cloud Storage allows you to store data on Google infrastructure. See * the guide on {@link https://developers.google.com/storage} to create a @@ -312,8 +232,8 @@ Bucket.prototype.remove = function(name, callback) { /** * Create a readable stream to read contents of the provided remote file. It - * can be piped to a write stream, or listened to for 'data' and `complete` - * events to read a file's contents. + * can be piped to a write stream, or listened to for 'data' events to read a + * file's contents. * * @param {string} name - Name of the remote file. 
* @return {ReadStream} @@ -321,78 +241,108 @@ Bucket.prototype.remove = function(name, callback) { * @example * ```js * // Create a readable stream and write the file contents to "/path/to/file" - * bucket.createReadStream('filename') - * .pipe(fs.createWriteStream('/path/to/file')); + * var fs = require('fs'); + * + * bucket.createReadStream('remote-file-name') + * .pipe(fs.createWriteStream('local-file-path')) + * .on('error', function(err) {}); * ``` */ Bucket.prototype.createReadStream = function(name) { - return new ReadStream(this, name); + var that = this; + var dup = duplexify(); + this.stat(name, function(err, metadata) { + if (err) { + dup.emit('error', err); + return; + } + that.conn.createAuthorizedReq( + { uri: metadata.mediaLink }, function(err, req) { + if (err) { + dup.emit('error', err); + return; + } + dup.setReadable(that.conn.requester(req)); + }); + }); + return dup; }; /** - * Write the provided data to the destination with optional metadata. + * Create a Duplex to handle the upload of a file. * - * @param {string} name - Name of the remote file. - * @param {object} options - Configuration object. - * @param {String|Buffer|ReadableStream=} options.data - Data. - * @param {string=} options.filename - Path of the source file. - * @param {object=} options.metadata - Optional metadata. - * @param {function} callback - The callback function. + * @param {string} name - Name of the remote file to create. + * @param {object=} metadata - Optional metadata. + * @return {stream} * * @example * ```js - * // Upload file.pdf - * bucket.write('filename', { - * filename: '/path/to/file.pdf', - * metadata: { - * // optional metadata - * } - * }, function(err) {}); + * // Read from a local file and pipe to your bucket. 
+ * var fs = require('fs'); * - * // Upload a readable stream - * bucket.write('filename', { - * data: fs.createReadStream('/path/to/file.pdf') - * }, function(err) {}); + * fs.createReadStream('local-file-path') + * .pipe(bucket.createWriteStream('remote-file-name')) + * .on('error', function(err) {}) + * .on('complete', function(fileObject) {}); + * ``` + */ +Bucket.prototype.createWriteStream = function(name, metadata) { + var dup = duplexify(); + this.getWritableStream_(name, (metadata || {}), function(err, writable) { + if (err) { + dup.emit('error', err); + return; + } + writable.on('complete', function(res) { + util.handleResp(null, res, res.body, function(err, data) { + if (err) { + dup.emit('error', err); + return; + } + dup.emit('complete', data); + }); + }); + dup.setWritable(writable); + dup.pipe(writable); + }); + return dup; +}; + +/** + * Write the provided data to the destination with optional metadata. + * + * @param {string} name - Name of the remote file to create. + * @param {object|string|buffer} options - Configuration object or data. + * @param {object=} options.metadata - Optional metadata. + * @param {function=} callback - The callback function. - * - * // Upload "Hello World" as file contents. `data` can be any string or buffer + * @example + * ```js + * // Upload "Hello World" as file contents. `data` can be any string or buffer. * bucket.write('filename', { * data: 'Hello World' * }, function(err) {}); + * + * // A shorthand for the above. + * bucket.write('filename', 'Hello World', function(err) {}); * ``` */ Bucket.prototype.write = function(name, options, callback) { callback = callback || util.noop; + var data = typeof options === 'object' ? 
options.data : options; var metadata = options.metadata || {}; - var readStream = options.data; - var isStringOrBuffer = - typeof readStream === 'string' || readStream instanceof Buffer; - if (options.filename) { - readStream = fs.createReadStream(options.filename); - } else if (readStream && isStringOrBuffer) { - readStream = new BufferStream(readStream); - } - - if (!readStream) { + if (typeof data === 'undefined') { // metadata only write this.makeReq('PATCH', 'o/' + name, null, metadata, callback); return; } - this.getRemoteStream_(name, metadata, function(err, remoteStream) { - if (err) { - callback(err); - return; - } - // TODO(jbd): High potential of multiple callback invokes. - readStream.pipe(remoteStream) - .on('error', callback); - remoteStream + if (typeof data === 'string' || data instanceof Buffer) { + new BufferStream(data).pipe(this.createWriteStream(name, metadata)) .on('error', callback) - .on('complete', function(resp) { - util.handleResp(null, resp, resp.body, callback); - }); - }); + .on('complete', callback.bind(null, null)); + } }; /** @@ -402,7 +352,7 @@ Bucket.prototype.write = function(name, options, callback) { * @param {object} metadata - File descriptive metadata. * @param {function} callback - The callback function. 
*/ -Bucket.prototype.getRemoteStream_ = function(name, metadata, callback) { +Bucket.prototype.getWritableStream_ = function(name, metadata, callback) { var boundary = uuid.v4(); var that = this; metadata.contentType = metadata.contentType || 'text/plain'; @@ -434,8 +384,7 @@ Bucket.prototype.getRemoteStream_ = function(name, metadata, callback) { remoteStream.write('Content-Type: ' + metadata.contentType + '\n\n'); var oldEndFn = remoteStream.end; remoteStream.end = function(data, encoding, callback) { - data = data || ''; - data += '\n--' + boundary + '--\n'; + data = (data || '') + '\n--' + boundary + '--\n'; remoteStream.write(data, encoding, callback); oldEndFn.apply(this); }; diff --git a/package.json b/package.json index e0ae8ef57c1..e5e7099a03e 100644 --- a/package.json +++ b/package.json @@ -36,6 +36,7 @@ "dependencies": { "async": "^0.9.0", "bytebuffer": "^3.2.0", + "duplexify": "^3.1.2", "gapitoken": "^0.1.3", "node-uuid": "^1.4.1", "protobufjs": "^3.4.0", @@ -51,7 +52,7 @@ "scripts": { "lint": "jshint lib/ regression/ test/", "test": "mocha --recursive --reporter spec", - "regression-test": "mocha regression/ --reporter spec --timeout 10000", + "regression-test": "mocha regression/ --reporter spec --timeout 15000", "cover": "istanbul cover -x 'regression/*' _mocha -- --timeout 10000 test/* regression/*" }, "license": "Apache 2" diff --git a/regression/data/five-mb-file.zip b/regression/data/five-mb-file.zip new file mode 100644 index 00000000000..38da09a4f79 Binary files /dev/null and b/regression/data/five-mb-file.zip differ diff --git a/regression/storage.js b/regression/storage.js index b877624b362..a5d9871aa13 100644 --- a/regression/storage.js +++ b/regression/storage.js @@ -29,85 +29,93 @@ var gcloud = require('../lib'); var bucket = new gcloud.storage.Bucket(env); -var pathToLogoFile = 'regression/data/CloudPlatform_128px_Retina.png'; -var logoFileMd5Hash; - -describe('storage', function() { - describe('write, read and remove files', function() { 
- before(function(done) { - var md5sum = crypto.createHash('md5'); - var s = fs.ReadStream(pathToLogoFile); - s.on('data', function(d) { +var files = { + logo: { + path: 'regression/data/CloudPlatform_128px_Retina.png' + }, + big: { + path: 'regression/data/five-mb-file.zip' + } +}; + +function setHash(obj, file, done) { + var md5sum = crypto.createHash('md5'); + fs.createReadStream(obj[file].path) + .on('data', function(d) { md5sum.update(d); - }); - s.on('error', done); - s.on('end', function() { - logoFileMd5Hash = md5sum.digest('base64'); + }) + .on('end', function() { + obj[file].hash = md5sum.digest('base64'); done(); }); - }); +} - it('should write/remove from file', function(done) { - var fileName = 'CloudLogo'; - var fileConfig = { filename: pathToLogoFile }; - bucket.write(fileName, fileConfig, function(err, fileObject) { - assert.ifError(err); - assert.equal(fileObject.md5Hash, logoFileMd5Hash); - bucket.remove(fileName, done); +describe('storage', function() { + describe('write, read, and remove files', function() { + before(function(done) { + var doneCalled = 0; + Object.keys(files).forEach(function(file) { + setHash(files, file, function() { + if (++doneCalled === 2) { + done(); + } + }); }); }); - it('should write/remove from stream', function(done) { - var fileName = 'CloudLogo'; - var fileConfig = { data: fs.createReadStream(pathToLogoFile) }; - bucket.write(fileName, fileConfig, function(err, fileObject) { - assert.ifError(err); - assert.equal(fileObject.md5Hash, logoFileMd5Hash); - bucket.remove(fileName, function(err) { - assert.ifError(err); - done(); - }); + describe('stream write', function() { + it('should stream write, then remove large file (5mb)', function(done) { + var fileName = 'LargeFile'; + + fs.createReadStream(files.big.path) + .pipe(bucket.createWriteStream(fileName)) + .on('error', done) + .on('complete', function(fileObject) { + assert.equal(fileObject.md5Hash, files.big.hash); + bucket.remove(fileName, done); + }); }); - }); - 
it('should write/read/remove from a buffer', function(done) { - var fileName = 'MyBuffer'; - var fileContent = 'Hello World'; - tmp.setGracefulCleanup(); - tmp.file(function _tempFileCreated(err, path) { - assert.ifError(err); - bucket.write( - fileName, { data: fileContent }, function(err, fileObject) { - assert.ifError(err); - assert(fileObject); - bucket.createReadStream(fileName) - .pipe(fs.createWriteStream(path)) - .on('error', done) - .on('complete', function() { + it('should write and read metadata', function(done) { + var fileName = 'CloudLogo'; + var myMetadata = { contentType: 'image/png' }; + + fs.createReadStream(files.logo.path) + .pipe(bucket.createWriteStream(fileName, myMetadata)) + .on('error', done) + .on('complete', function() { + bucket.stat(fileName, function(err, metadata) { + assert.ifError(err); + assert.equal(metadata.contentType, myMetadata.contentType); bucket.remove(fileName, function(err) { assert.ifError(err); - fs.readFile(path, function(err, data) { - assert.equal(data, fileContent); - done(); - }); + done(); }); }); - }); + }); }); - }); - it('should write and read metadata', function(done) { - var fileName = 'CloudLogo'; - var myMetadata = { contentType: 'image/png' }; - var fileConfig = { filename: pathToLogoFile, metadata: myMetadata }; - bucket.write(fileName, fileConfig, function(err) { - assert.ifError(err); - bucket.stat(fileName, function(err, metadata) { + it('should write/read/remove from a buffer', function(done) { + var fileName = 'MyBuffer'; + var fileContent = 'Hello World'; + tmp.setGracefulCleanup(); + tmp.file(function _tempFileCreated(err, path) { assert.ifError(err); - assert.equal(metadata.contentType, myMetadata.contentType); - bucket.remove(fileName, function(err) { + bucket.write(fileName, fileContent, function(err, fileObject) { assert.ifError(err); - done(); + assert(fileObject); + bucket.createReadStream(fileName) + .pipe(fs.createWriteStream(path)) + .on('error', done) + .on('finish', function() { + 
bucket.remove(fileName, function(err) { + assert.ifError(err); + fs.readFile(path, function(err, data) { + assert.equal(data, fileContent); + done(); + }); + }); + }); }); }); }); @@ -115,41 +123,39 @@ describe('storage', function() { it('should copy an existing file', function(done) { var fileName = 'CloudLogo'; - var fileConfig = { filename: pathToLogoFile }; var copyName = 'CloudLogoCopy'; - bucket.write(fileName, fileConfig, function(err) { - assert.ifError(err); - bucket.copy(fileName, { name: copyName }, function() { - assert.ifError(err); - async.parallel([ - function(callback) { - bucket.remove(fileName, callback); - }, - function(callback) { - bucket.remove(copyName, callback); - }, - ], done); - }); - }); + fs.createReadStream(files.logo.path) + .pipe(bucket.createWriteStream(fileName)) + .on('error', done) + .on('complete', function() { + bucket.copy(fileName, { name: copyName }, function(err) { + assert.ifError(err); + async.parallel([ + bucket.remove.bind(bucket, fileName), + bucket.remove.bind(bucket, copyName) + ], done); + }); + }); }); }); describe('list files', function() { var filenames = ['CloudLogo1', 'CloudLogo2', 'CloudLogo3']; - var fileConfig = { filename: pathToLogoFile }; before(function(done) { - bucket.write(filenames[0], fileConfig, function(err) { - assert.ifError(err); - bucket.copy(filenames[0], { name: filenames[1] }, function() { - assert.ifError(err); - bucket.copy(filenames[0], { name: filenames[2] }, function() { - assert.ifError(err); - done(); + fs.createReadStream(files.logo.path) + .pipe(bucket.createWriteStream(filenames[0])) + .on('error', done) + .on('complete', function() { + bucket.copy(filenames[0], { name: filenames[1] }, function(err) { + assert.ifError(err); + bucket.copy(filenames[0], { name: filenames[2] }, function(err) { + assert.ifError(err); + done(); + }); + }); }); - }); - }); }); it('should list files', function(done) {