Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: use util.makeWritableStream #276

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions lib/common/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@ module.exports.toArray = toArray;
function makeWritableStream(dup, options, onComplete) {
onComplete = onComplete || noop;

options = options || {};
options.metadata = options.metadata || {};
onComplete = onComplete || noop;

var boundary = uuid.v4();

var defaults = {
Expand Down Expand Up @@ -276,7 +280,7 @@ function makeWritableStream(dup, options, onComplete) {
return;
}

var streamType = options.streamContentType || 'application/octet-stream';
var streamType = options.metadata.contentType || 'application/octet-stream';

var stream = options.connection.requester(req);
stream.callback = noop;
Expand Down Expand Up @@ -314,7 +318,7 @@ function makeWritableStream(dup, options, onComplete) {
// processing incoming data.
dup.setWritable(stream);

// Keep part of the stream open to keep Request from closing the conneciton.
// Keep part of the stream open to keep Request from closing the connection.
// Reference: http://goo.gl/zZVSif.
dup.pipe(stream);
});
Expand Down
107 changes: 22 additions & 85 deletions lib/storage/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@

var crypto = require('crypto');
var duplexify = require('duplexify');
var extend = require('extend');
var uuid = require('node-uuid');
var streamEvents = require('stream-events');

/**
* @type module:common/util
Expand Down Expand Up @@ -259,25 +258,29 @@ File.prototype.createReadStream = function() {
*/
File.prototype.createWriteStream = function(metadata) {
var that = this;
var dup = duplexify();
this.getWritableStream_(metadata, function(err, writable) {
if (err) {
dup.emit('error', err);
return;
}
writable.on('complete', function(res) {
util.handleResp(null, res, res.body, function(err, data) {
if (err) {
dup.emit('error', err);
return;
}
that.metadata = data;
dup.emit('complete', data);
});
var dup = streamEvents(duplexify());

dup.once('writing', function() {
util.makeWritableStream(dup, {
connection: that.bucket.connection_,
metadata: metadata,
request: {
qs: {
name: that.name,
},
uri: util.format('{base}/{bucket}/o', {
base: STORAGE_UPLOAD_BASE_URL,
bucket: that.bucket.name
})
}
}, function(data) {
that.metadata = data;

dup.emit('complete', data);
dup.end();
});
dup.setWritable(writable);
dup.pipe(writable);
});

return dup;
};

Expand Down Expand Up @@ -411,70 +414,4 @@ File.prototype.setMetadata = function(metadata, callback) {
}.bind(this));
};

/*! Developer Documentation
*
* Private Methods
*
* These methods deal with creating and maintaining the lifecycle of a stream.
* All File objects are Duplex streams, which will allow a reader to pipe data
* to the remote endpoint. Likewise, you can pipe data from a remote endpoint to
* a writer.
*
* Duplexify is used to allow us to asynchronously set the readable and writable
* portions of this stream. We can't accept data for buffering until we have
* made an authorized connection. Once we have such a connection, we call
* `setReadable` and/or `setWritable` on the File instance (which is also a
* Duplexify instance), which then opens the pipe for more data to come in or go
* out.
*/

/**
* Get a remote stream to begin piping a readable stream to.
*
* @private
*/
File.prototype.getWritableStream_ = function(metadata, callback) {
if (!callback) {
callback = metadata;
metadata = {};
}
var that = this;
var boundary = uuid.v4();
metadata = extend({ contentType: 'text/plain' }, metadata);
this.bucket.connection_.createAuthorizedReq({
method: 'POST',
uri: util.format('{base}/{bucket}/o', {
base: STORAGE_UPLOAD_BASE_URL,
bucket: that.bucket.name
}),
qs: {
name: this.name,
uploadType: 'multipart'
},
headers: {
'Content-Type': 'multipart/related; boundary="' + boundary + '"'
}
}, function(err, req) {
if (err) {
callback(err);
return;
}
var remoteStream = that.bucket.connection_.requester(req);
remoteStream.callback = util.noop;
remoteStream.write('--' + boundary + '\n');
remoteStream.write('Content-Type: application/json\n\n');
remoteStream.write(JSON.stringify(metadata));
remoteStream.write('\n\n');
remoteStream.write('--' + boundary + '\n');
remoteStream.write('Content-Type: ' + metadata.contentType + '\n\n');
var oldEndFn = remoteStream.end;
remoteStream.end = function(data, encoding, callback) {
data = (data || '') + '\n--' + boundary + '--\n';
remoteStream.write(data, encoding, callback);
oldEndFn.apply(this);
};
callback(null, remoteStream);
});
};

module.exports = File;
2 changes: 1 addition & 1 deletion test/bigquery/table.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ function FakeFile(a, b) {
}

var makeWritableStream_Override;
var fakeUtil = extend(util, {
var fakeUtil = extend({}, util, {
makeWritableStream: function() {
var args = [].slice.call(arguments);
(makeWritableStream_Override || util.makeWritableStream).apply(null, args);
Expand Down
Loading