Skip to content

Commit

Permalink
Merge pull request #35 from rakyll/eventemitter-subs
Browse files Browse the repository at this point in the history
WIP: EventEmitter Subscription
  • Loading branch information
Burcu Dogan committed Jul 28, 2014
2 parents 61db807 + 2461ac4 commit 58d06e7
Show file tree
Hide file tree
Showing 6 changed files with 468 additions and 49 deletions.
190 changes: 154 additions & 36 deletions lib/pubsub/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
* limitations under the License.
*/

var events = require('events'),
nodeutil = require('util');

var conn = require('../common/connection.js'),
util = require('../common/util.js');

Expand All @@ -31,15 +34,18 @@ var SCOPES = [
'https://www.googleapis.com/auth/cloud-platform'
];

// TODO(jbd): Emit message and error events if in polled-mode.
// sub.on('meessage', console.log)
// sub.on('error')

function Subscription(conn, name) {
var that = this;
this.conn = conn;
this.name = name;

this.autoAck = false;
this.pullIntervalInMs = 10;
this.closed = false;
}

nodeutil.inherits(Subscription, events.EventEmitter);

/**
* Acknowledges the backend that message is retrieved.
* @param {Array<String>} ids A list of message IDs.
Expand All @@ -56,44 +62,104 @@ Subscription.prototype.ack = function(ids, callback) {

/**
* Pulls from the subscribed topic.
* @param {Boolean} opts.returnImmediately If set, the system will respond immediately.
* Otherwise, wait until new messages are
* available. Returns if timeout is reached.
* @param {Boolean} opts.autoAck Automatically acknowledges the
* message once it's pulled.
* @param {Function} callback Callback function.
* @param {Boolean} opts.returnImmediately If set, the system will respond immediately.
* Otherwise, wait until new messages are
* available. Returns if timeout is reached.
* @param {Function} callback Callback.
*/
Subscription.prototype.pull = function(opts, callback) {
// TODO(jbd): Make opts optional.
var that = this;
var autoAck = !!opts.autoAck;
// TODO(jbd): Should not be racing with other pull.
// TOOD(jbd): Make opts optional.
var body = {
subscription: this.name,
returnImmediately: !!opts.returnImmediately
};
this.conn.makeReq('POST', 'subscriptions/pull', null, body, function(err, message) {
if (err) { return callback(err); }
if (!autoAck) {
return callback(null, message);
// TODO(jbd): Fix API to return a list of messages.
if (err) {
callback(err);
return;
}
if (!that.autoAck) {
that.emitMessage_(message);
callback();
return;
}
that.ack(message.ackId, function(err) {
if (err) { return callback(err); }
callback(null, message);
if (err) {
callback(err);
return;
}
that.emitMessage_(message);
callback();
});
});
};

/**
* Unsubscribes the current subscription. Pull requests from the current
* Polls the backend for new messages.
*/
Subscription.prototype.startPulling_ = function() {
var that = this;
var pullFn = function() {
if (that.closed) {
return;
}
that.pull({ returnImmediately: false }, function(err) {
// TODO(jbd): Fix API to return a more explicit error code or message.
if (err && err.message.indexOf('has no more messages') < 0) {
that.emitError_(err);
}
setTimeout(function() {
pullFn();
}, that.pullIntervalInMs);
});
};
pullFn();
}

/**
* Deletes the current subscription. Pull requests from the current
* subscription will be errored once unsubscription is done.
* @param {Function} opt_callback Optional callback.
*/
Subscription.prototype.unsubscribe = function(opt_callback) {
Subscription.prototype.del = function(opt_callback) {
var that = this;
cb = opt_callback || util.noop;
var path = util.format('subscriptions/{fullName}', {
fullName: this.name
});
this.conn.makeReq('DELETE', path, null, true, cb);
this.conn.makeReq('DELETE', path, null, true, function(err) {
if (err) return cb(err);
that.closed = true;
cb(err);
});
};

/**
* Closes the subscription.
*/
Subscription.prototype.close = function() {
this.closed = true;
};

/**
* Emits a 'message' event with the provided message.
*/
Subscription.prototype.emitMessage_ = function(msg) {
if (msg.pubsubEvent && msg.pubsubEvent.message) {
var data = msg.pubsubEvent.message.data;
msg.pubsubEvent.message.data = new Buffer(data, 'base64').toString('utf-8');
}
this.emit('message', msg);
};

/**
* Emits an error with the provided error.
*/
Subscription.prototype.emitError_ = function(err) {
this.emit('error', err);
};

/**
Expand All @@ -112,7 +178,12 @@ function Topic(conn, name) {
* @param {Function} opt_callback Optional callback.
*/
Topic.prototype.publish = function(data, opt_callback) {
this.publishMessage({ topic: this.name, data: data }, opt_callback);
this.publishMessage({
topic: this.name,
message: {
data: new Buffer(data).toString('base64')
}
}, opt_callback);
};

/**
Expand All @@ -132,7 +203,7 @@ Topic.prototype.publishMessage = function(message, opt_callback) {
*/
Topic.prototype.del = function(opt_callback) {
var path = 'topics/' + this.name;
this.conn.makeReq('DELETE', path, null, true, cb);
this.conn.makeReq('DELETE', path, null, true, opt_callback);
};

/**
Expand Down Expand Up @@ -192,25 +263,62 @@ Connection.prototype.listSubscriptions = function(query, callback) {
};

/**
* Subscribe with the provided options.
* @param {string} opts.name Name of the subscription.
* @param {string} opts.topicName Name of the topic to subscribe.
* @param {Function} opt_callback Optional callback.
* Gets a subscription.
* @param {string} name Name of the subscription.
* @param {Function} callback Callback.
*/
Connection.prototype.subscribe = function(opts, opt_callback) {
Connection.prototype.getSubscription = function(name, callback) {
var that = this;
var cb = opt_callback || util.noop;
var body = {
topic:'/topics/' + this.id + '/' + opts.topicName,
var fullName = '/subscriptions/' + this.id + '/' + name;
this.makeReq('GET', 'subscriptions/' + fullName, null, true, function(err) {
if (err) {
callback(err);
return;
}
callback(null, new Subscription(that, fullName));
});
};

Connection.prototype.createSubscription = function(opts, callback) {
var that = this;
var subscription = {
topic:'/topics/' + this.id + '/' + opts.topic,
name: '/subscriptions/' + this.id + '/' + opts.name,
ackDeadlineSeconds: opts.ackDeadlineSeconds
};
this.makeReq('POST', 'subscriptions', null, body, function(err, item) {
// TODO(jbd): maybe init a subscription instance if http 200 or 409.
cb(err, new Subscription(that, body.name));
this.makeReq('POST', 'subscriptions', null, subscription, function(err) {
if (err) {
callback(err);
return;
}
callback(null, new Subscription(that, subscription.name));
});
};

/**
* Subscribe with the provided options.
* @param {string} name Name of the subscription.
* @param {Boolean} opts.autoAck Automatically acknowledges the
* message once it's pulled.
* @return {Subscription}
*/
Connection.prototype.subscribe = function(name, opts) {
opts = opts || {};

var fullName = '/subscriptions/' + this.id + '/' + name;
var sub = new Subscription(this, fullName);
sub.autoAck = !!opts.autoAck;
this.getSubscription(name, function(err) {
if (err) {
sub.emitError_(err);
return;
}
sub.emit('ready');
sub.startPulling_();
});
return sub;
};

/**
* Lists topics.
* @param {string} query.pageToken Page token.
Expand Down Expand Up @@ -306,16 +414,26 @@ Connection.prototype.makeReq = function(method, path, q, body, callback) {
}
this.conn.req(reqOpts, function(err, res, body) {
if (body && body.error) {
return callback(body.error);
callback(new util.ApiError(body.error)); return;
}
if (res && (res.statusCode < 200 || res.statusCode > 299)) {
return callback(new Error('error during request, statusCode: ' + res.statusCode));
callback(new Error('error during request, statusCode: ' + res.statusCode)); return;
}
callback(err, body);
callback(null, body);
});
};

/**
* Exports Connection.
*/
module.exports.Connection = Connection;

/**
* Exports Topic.
*/
module.exports.Topic = Topic;

/**
* Exports Subscription.
*/
module.exports.Subscription = Subscription;
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
},
"scripts": {
"test": "node_modules/mocha/bin/_mocha --reporter spec",
"regression-test": "node_modules/mocha/bin/_mocha --reporter spec --timeout 4000 regression/*",
"cover": "node_modules/istanbul/lib/cli.js cover node_modules/mocha/bin/_mocha"
"regression-test": "node_modules/mocha/bin/_mocha --reporter spec --timeout 10000 regression/*",
"cover": "node_modules/istanbul/lib/cli.js cover node_modules/mocha/bin/_mocha -- --timeout 10000 test/* regression/*"
},
"license": "Apache 2"
}
15 changes: 4 additions & 11 deletions regression/datastore.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,10 @@
* limitations under the License.
*/

if (!process.env.GCLOUD_TESTS_PROJECT_ID &&
!process.env.GCLOUD_TESTS_SERVICE_ACCOUNT &&
!process.env.GCLOUD_TESTS_PEM_KEY) {
var error = ['To run the regression tests, you need to set the value of some environment variables.',
'Please check the README for instructions.'
].join('\n');
throw error;
}
var projectId = process.env.GCLOUD_TESTS_PROJECT_ID,
email = process.env.GCLOUD_TESTS_SERVICE_ACCOUNT,
pemFilePath = process.env.GCLOUD_TESTS_PEM_KEY;
var env = require('./env.js'),
projectId = env.projectId,
email = env.serviceAccount,
pemFilePath = env.pemKey;

var assert = require('assert'),
datastore = require('../lib/datastore'),
Expand Down
30 changes: 30 additions & 0 deletions regression/env.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Copyright 2014 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

if (!process.env.GCLOUD_TESTS_PROJECT_ID &&
!process.env.GCLOUD_TESTS_SERVICE_ACCOUNT &&
!process.env.GCLOUD_TESTS_PEM_KEY) {
var error = ['To run the regression tests, you need to set the value of some environment variables.',
'Please check the README for instructions.'
].join('\n');
throw error;
}

module.exports = {
projectId: process.env.GCLOUD_TESTS_PROJECT_ID,
serviceAccount: process.env.GCLOUD_TESTS_SERVICE_ACCOUNT,
pemKey: process.env.GCLOUD_TESTS_PEM_KEY
};
Loading

0 comments on commit 58d06e7

Please sign in to comment.