Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: use streams. #118

Merged
merged 1 commit into from
Aug 19, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 80 additions & 131 deletions lib/storage/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@

'use strict';

var events = require('events');
var fs = require('fs');
var duplexify = require('duplexify');
var nodeutil = require('util');
var stream = require('stream');
var uuid = require('node-uuid');
Expand Down Expand Up @@ -72,85 +71,6 @@ BufferStream.prototype._read = function() {
this.push(null);
};

/**
* A readable stream that streams the contents of a file.
*
* @private
*
* @constructor
* @mixes {stream#Readable}
*
* @param {module:storage~Bucket} bucket - Bucket the source file belongs to.
* @param {string} name - Name of the file to read from.
*
* @example
* ```js
* var myBucket = new Bucket({
* bucketName: 'my-bucket'
* });
* var readStream = new ReadStream(myBucket, 'file/to/fetch.pdf');
* ```
*/
function ReadStream(bucket, name) {
events.EventEmitter.call(this);

this.bucket = bucket;
this.name = name;
this.remoteStream = null;

this.open();
}

nodeutil.inherits(ReadStream, events.EventEmitter);

/**
* Open a connection to retrieve a file.
*/
ReadStream.prototype.open = function() {
var that = this;
this.bucket.stat(this.name, function(err, metadata) {
if (err) {
that.emit('error', err);
return;
}
that.bucket.conn.createAuthorizedReq(
{ uri: metadata.mediaLink }, function(err, req) {
if (err) {
that.emit('error', err);
return;
}
that.remoteStream = that.bucket.conn.requester(req);
that.remoteStream.on('complete', that.emit.bind(that, 'complete'));
that.emit('readable');
});
});
};

/**
* Pipe the output to the destination stream with the provided options.
*
* @param {stream} dest - Destination stream to write to.
* @param {object} opts - Piping options.
* @return {stream}
*/
ReadStream.prototype.pipe = function(dest, opts) {
var that = this;
if (!that.remoteStream) {
return that.once('readable', function() {
that.pipe(dest, opts);
});
}
// Register an on-data listener instead of piping, so we can avoid writing if
// the request ends up with a non-200 response.
that.remoteStream.on('data', function(data) {
if (!that.errored) {
that.emit('data', data);
dest.write(data);
}
});
return dest;
};

/**
* Google Cloud Storage allows you to store data on Google infrastructure. See
* the guide on {@link https://developers.google.com/storage} to create a
Expand Down Expand Up @@ -312,87 +232,117 @@ Bucket.prototype.remove = function(name, callback) {

/**
* Create a readable stream to read contents of the provided remote file. It
* can be piped to a write stream, or listened to for 'data' and `complete`
* events to read a file's contents.
* can be piped to a write stream, or listened to for 'data' events to read a
* file's contents.
*
* @param {string} name - Name of the remote file.
* @return {ReadStream}
*
* @example
* ```js
* // Create a readable stream and write the file contents to "/path/to/file"
* bucket.createReadStream('filename')
* .pipe(fs.createWriteStream('/path/to/file'));
* var fs = require('fs');
*
* bucket.createReadStream('remote-file-name')
* .pipe(fs.createWriteStream('local-file-path'))
* .on('error', function(err) {});
* ```
*/
Bucket.prototype.createReadStream = function(name) {
return new ReadStream(this, name);
var that = this;
var dup = duplexify();
this.stat(name, function(err, metadata) {
if (err) {
dup.emit('error', err);
return;
}
that.conn.createAuthorizedReq(
{ uri: metadata.mediaLink }, function(err, req) {
if (err) {
dup.emit('error', err);
return;
}
dup.setReadable(that.conn.requester(req));
});
});
return dup;
};

/**
* Write the provided data to the destination with optional metadata.
* Create a Duplex to handle the upload of a file.
*
* @param {string} name - Name of the remote file.
* @param {object} options - Configuration object.
* @param {String|Buffer|ReadableStream=} options.data - Data.
* @param {string=} options.filename - Path of the source file.
* @param {object=} options.metadata - Optional metadata.
* @param {function} callback - The callback function.
* @param {string} name - Name of the remote file to create.
* @param {object=} metadata - Optional metadata.
* @return {stream}
*
* @example
* ```js
* // Upload file.pdf
* bucket.write('filename', {
* filename: '/path/to/file.pdf',
* metadata: {
* // optional metadata
* }
* }, function(err) {});
* // Read from a local file and pipe to your bucket.
* var fs = require('fs');
*
* // Upload a readable stream
* bucket.write('filename', {
* data: fs.createReadStream('/path/to/file.pdf')
* }, function(err) {});
* fs.createReadStream('local-file-path')
* .pipe(bucket.createWriteStream('remote-file-name'))
* .on('error', function(err) {})
* .on('complete', function(fileObject) {});
* ```
*/
Bucket.prototype.createWriteStream = function(name, metadata) {
var dup = duplexify();
this.getWritableStream_(name, (metadata || {}), function(err, writable) {
if (err) {
dup.emit('error', err);
return;
}
writable.on('complete', function(res) {
util.handleResp(null, res, res.body, function(err, data) {
if (err) {
dup.emit('error', err);
return;
}
dup.emit('complete', data);
});
});
dup.setWritable(writable);
dup.pipe(writable);
});
return dup;
};

/**
* Write the provided data to the destination with optional metadata.
*
* @param {string} name - Name of the remote file toc reate.
* @param {object|string|buffer} options - Configuration object or data.
* @param {object=} options.metadata - Optional metadata.
* @param {function=} callback - The callback function.
*
* // Upload "Hello World" as file contents. `data` can be any string or buffer
* @example
* ```js
* // Upload "Hello World" as file contents. `data` can be any string or buffer.
* bucket.write('filename', {
* data: 'Hello World'
* }, function(err) {});
*
* // A shorthand for the above.
* bucket.write('filename', 'Hello World', function(err) {});
* ```
*/
Bucket.prototype.write = function(name, options, callback) {
callback = callback || util.noop;
var data = typeof options === 'object' ? options.data : options;
var metadata = options.metadata || {};
var readStream = options.data;
var isStringOrBuffer =
typeof readStream === 'string' || readStream instanceof Buffer;

if (options.filename) {
readStream = fs.createReadStream(options.filename);
} else if (readStream && isStringOrBuffer) {
readStream = new BufferStream(readStream);
}

if (!readStream) {
if (typeof data === 'undefined') {
// metadata only write
this.makeReq('PATCH', 'o/' + name, null, metadata, callback);
return;
}

this.getRemoteStream_(name, metadata, function(err, remoteStream) {
if (err) {
callback(err);
return;
}
// TODO(jbd): High potential of multiple callback invokes.
readStream.pipe(remoteStream)
.on('error', callback);
remoteStream
if (typeof data === 'string' || data instanceof Buffer) {
new BufferStream(data).pipe(this.createWriteStream(name, metadata))
.on('error', callback)
.on('complete', function(resp) {
util.handleResp(null, resp, resp.body, callback);
});
});
.on('complete', callback.bind(null, null));

This comment was marked as spam.

This comment was marked as spam.

}
};

/**
Expand All @@ -402,7 +352,7 @@ Bucket.prototype.write = function(name, options, callback) {
* @param {object} metadata - File descriptive metadata.
* @param {function} callback - The callback function.
*/
Bucket.prototype.getRemoteStream_ = function(name, metadata, callback) {
Bucket.prototype.getWritableStream_ = function(name, metadata, callback) {
var boundary = uuid.v4();
var that = this;
metadata.contentType = metadata.contentType || 'text/plain';
Expand Down Expand Up @@ -434,8 +384,7 @@ Bucket.prototype.getRemoteStream_ = function(name, metadata, callback) {
remoteStream.write('Content-Type: ' + metadata.contentType + '\n\n');
var oldEndFn = remoteStream.end;
remoteStream.end = function(data, encoding, callback) {
data = data || '';
data += '\n--' + boundary + '--\n';
data = (data || '') + '\n--' + boundary + '--\n';
remoteStream.write(data, encoding, callback);
oldEndFn.apply(this);
};
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"dependencies": {
"async": "^0.9.0",
"bytebuffer": "^3.2.0",
"duplexify": "^3.1.2",
"gapitoken": "^0.1.3",
"node-uuid": "^1.4.1",
"protobufjs": "^3.4.0",
Expand All @@ -51,7 +52,7 @@
"scripts": {
"lint": "jshint lib/ regression/ test/",
"test": "mocha --recursive --reporter spec",
"regression-test": "mocha regression/ --reporter spec --timeout 10000",
"regression-test": "mocha regression/ --reporter spec --timeout 15000",
"cover": "istanbul cover -x 'regression/*' _mocha -- --timeout 10000 test/* regression/*"
},
"license": "Apache 2"
Expand Down
Binary file added regression/data/five-mb-file.zip
Binary file not shown.
Loading