diff --git a/lib/pubsub/index.js b/lib/pubsub/index.js index 227bbdb7d7b..bac855e79dc 100644 --- a/lib/pubsub/index.js +++ b/lib/pubsub/index.js @@ -14,6 +14,9 @@ * limitations under the License. */ +var events = require('events'), + nodeutil = require('util'); + var conn = require('../common/connection.js'), util = require('../common/util.js'); @@ -31,15 +34,18 @@ var SCOPES = [ 'https://www.googleapis.com/auth/cloud-platform' ]; -// TODO(jbd): Emit message and error events if in polled-mode. -// sub.on('meessage', console.log) -// sub.on('error') - function Subscription(conn, name) { + var that = this; this.conn = conn; this.name = name; + + this.autoAck = false; + this.pullIntervalInMs = 10; + this.closed = false; } +nodeutil.inherits(Subscription, events.EventEmitter); + /** * Acknowledges the backend that message is retrieved. * @param {Array} ids A list of message IDs. @@ -56,44 +62,104 @@ Subscription.prototype.ack = function(ids, callback) { /** * Pulls from the subscribed topic. - * @param {Boolean} opts.returnImmediately If set, the system will respond immediately. - * Otherwise, wait until new messages are - * available. Returns if timeout is reached. - * @param {Boolean} opts.autoAck Automatically acknowledges the - * message once it's pulled. - * @param {Function} callback Callback function. + * @param {Boolean} opts.returnImmediately If set, the system will respond immediately. + * Otherwise, wait until new messages are + * available. Returns if timeout is reached. + * @param {Function} callback Callback. */ Subscription.prototype.pull = function(opts, callback) { - // TODO(jbd): Make opts optional. var that = this; - var autoAck = !!opts.autoAck; + // TODO(jbd): Should not be racing with other pull. + // TOOD(jbd): Make opts optional. var body = { subscription: this.name, returnImmediately: !!opts.returnImmediately }; this.conn.makeReq('POST', 'subscriptions/pull', null, body, function(err, message) { - if (err) { return callback(err); } - if (!autoAck) { - return callback(null, message); + // TODO(jbd): Fix API to return a list of messages. + if (err) { + callback(err); + return; + } + if (!that.autoAck) { + that.emitMessage_(message); + callback(); + return; } that.ack(message.ackId, function(err) { - if (err) { return callback(err); } - callback(null, message); + if (err) { + callback(err); + return; + } + that.emitMessage_(message); + callback(); }); }); }; /** - * Unsubscribes the current subscription. Pull requests from the current + * Polls the backend for new messages. + */ +Subscription.prototype.startPulling_ = function() { + var that = this; + var pullFn = function() { + if (that.closed) { + return; + } + that.pull({ returnImmediately: false }, function(err) { + // TODO(jbd): Fix API to return a more explicit error code or message. + if (err && err.message.indexOf('has no more messages') < 0) { + that.emitError_(err); + } + setTimeout(function() { + pullFn(); + }, that.pullIntervalInMs); + }); + }; + pullFn(); +} + +/** + * Deletes the current subscription. Pull requests from the current * subscription will be errored once unsubscription is done. * @param {Function} opt_callback Optional callback. */ -Subscription.prototype.unsubscribe = function(opt_callback) { +Subscription.prototype.del = function(opt_callback) { + var that = this; cb = opt_callback || util.noop; var path = util.format('subscriptions/{fullName}', { fullName: this.name }); - this.conn.makeReq('DELETE', path, null, true, cb); + this.conn.makeReq('DELETE', path, null, true, function(err) { + if (err) return cb(err); + that.closed = true; + cb(err); + }); +}; + +/** + * Closes the subscription. + */ +Subscription.prototype.close = function() { + this.closed = true; +}; + +/** + * Emits a 'message' event with the provided message. + */ +Subscription.prototype.emitMessage_ = function(msg) { + if (msg.pubsubEvent && msg.pubsubEvent.message) { + var data = msg.pubsubEvent.message.data; + msg.pubsubEvent.message.data = new Buffer(data, 'base64').toString('utf-8'); + } + this.emit('message', msg); +}; + +/** + * Emits an error with the provided error. + */ +Subscription.prototype.emitError_ = function(err) { + this.emit('error', err); }; /** @@ -112,7 +178,12 @@ function Topic(conn, name) { * @param {Function} opt_callback Optional callback. */ Topic.prototype.publish = function(data, opt_callback) { - this.publishMessage({ topic: this.name, data: data }, opt_callback); + this.publishMessage({ + topic: this.name, + message: { + data: new Buffer(data).toString('base64') + } + }, opt_callback); }; /** @@ -132,7 +203,7 @@ Topic.prototype.publishMessage = function(message, opt_callback) { */ Topic.prototype.del = function(opt_callback) { var path = 'topics/' + this.name; - this.conn.makeReq('DELETE', path, null, true, cb); + this.conn.makeReq('DELETE', path, null, true, opt_callback); }; /** @@ -192,25 +263,62 @@ Connection.prototype.listSubscriptions = function(query, callback) { }; /** - * Subscribe with the provided options. - * @param {string} opts.name Name of the subscription. - * @param {string} opts.topicName Name of the topic to subscribe. - * @param {Function} opt_callback Optional callback. + * Gets a subscription. + * @param {string} name Name of the subscription. + * @param {Function} callback Callback. */ -Connection.prototype.subscribe = function(opts, opt_callback) { +Connection.prototype.getSubscription = function(name, callback) { var that = this; - var cb = opt_callback || util.noop; - var body = { - topic:'/topics/' + this.id + '/' + opts.topicName, + var fullName = '/subscriptions/' + this.id + '/' + name; + this.makeReq('GET', 'subscriptions/' + fullName, null, true, function(err) { + if (err) { + callback(err); + return; + } + callback(null, new Subscription(that, fullName)); + }); +}; + +Connection.prototype.createSubscription = function(opts, callback) { + var that = this; + var subscription = { + topic:'/topics/' + this.id + '/' + opts.topic, name: '/subscriptions/' + this.id + '/' + opts.name, ackDeadlineSeconds: opts.ackDeadlineSeconds }; - this.makeReq('POST', 'subscriptions', null, body, function(err, item) { - // TODO(jbd): maybe init a subscription instance if http 200 or 409. - cb(err, new Subscription(that, body.name)); + this.makeReq('POST', 'subscriptions', null, subscription, function(err) { + if (err) { + callback(err); + return; + } + callback(null, new Subscription(that, subscription.name)); }); }; +/** + * Subscribe with the provided options. + * @param {string} name Name of the subscription. + * @param {Boolean} opts.autoAck Automatically acknowledges the + * message once it's pulled. + * @return {Subscription} + */ +Connection.prototype.subscribe = function(name, opts) { + opts = opts || {}; + + var fullName = '/subscriptions/' + this.id + '/' + name; + var sub = new Subscription(this, fullName); + sub.autoAck = !!opts.autoAck; + this.getSubscription(name, function(err) { + if (err) { + sub.emitError_(err); + return; + } + sub.emit('ready'); + sub.startPulling_(); + }); + return sub; +}; + /** * Lists topics. * @param {string} query.pageToken Page token. @@ -306,12 +414,12 @@ Connection.prototype.makeReq = function(method, path, q, body, callback) { } this.conn.req(reqOpts, function(err, res, body) { if (body && body.error) { - return callback(body.error); + callback(new util.ApiError(body.error)); return; } if (res && (res.statusCode < 200 || res.statusCode > 299)) { - return callback(new Error('error during request, statusCode: ' + res.statusCode)); + callback(new Error('error during request, statusCode: ' + res.statusCode)); return; } - callback(err, body); + callback(null, body); }); }; @@ -319,3 +427,13 @@ Connection.prototype.makeReq = function(method, path, q, body, callback) { * Exports Connection. */ module.exports.Connection = Connection; + +/** + * Exports Topic. + */ +module.exports.Topic = Topic; + +/** + * Exports Subscription. + */ +module.exports.Subscription = Subscription; diff --git a/package.json b/package.json index ddb8061dbef..275da23533e 100644 --- a/package.json +++ b/package.json @@ -37,8 +37,8 @@ }, "scripts": { "test": "node_modules/mocha/bin/_mocha --reporter spec", - "regression-test": "node_modules/mocha/bin/_mocha --reporter spec --timeout 4000 regression/*", - "cover": "node_modules/istanbul/lib/cli.js cover node_modules/mocha/bin/_mocha" + "regression-test": "node_modules/mocha/bin/_mocha --reporter spec --timeout 10000 regression/*", + "cover": "node_modules/istanbul/lib/cli.js cover node_modules/mocha/bin/_mocha -- --timeout 10000 test/* regression/*" }, "license": "Apache 2" } diff --git a/regression/datastore.js b/regression/datastore.js index 71add1494be..2ab73c9fdc0 100644 --- a/regression/datastore.js +++ b/regression/datastore.js @@ -14,17 +14,10 @@ * limitations under the License. */ -if (!process.env.GCLOUD_TESTS_PROJECT_ID && - !process.env.GCLOUD_TESTS_SERVICE_ACCOUNT && - !process.env.GCLOUD_TESTS_PEM_KEY) { - var error = ['To run the regression tests, you need to set the value of some environment variables.', - 'Please check the README for instructions.' - ].join('\n'); - throw error; -} -var projectId = process.env.GCLOUD_TESTS_PROJECT_ID, - email = process.env.GCLOUD_TESTS_SERVICE_ACCOUNT, - pemFilePath = process.env.GCLOUD_TESTS_PEM_KEY; +var env = require('./env.js'), + projectId = env.projectId, + email = env.serviceAccount, + pemFilePath = env.pemKey; var assert = require('assert'), datastore = require('../lib/datastore'), diff --git a/regression/env.js b/regression/env.js new file mode 100644 index 00000000000..c15ec8c3835 --- /dev/null +++ b/regression/env.js @@ -0,0 +1,30 @@ +/** + * Copyright 2014 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +if (!process.env.GCLOUD_TESTS_PROJECT_ID && + !process.env.GCLOUD_TESTS_SERVICE_ACCOUNT && + !process.env.GCLOUD_TESTS_PEM_KEY) { + var error = ['To run the regression tests, you need to set the value of some environment variables.', + 'Please check the README for instructions.' + ].join('\n'); + throw error; +} + +module.exports = { + projectId: process.env.GCLOUD_TESTS_PROJECT_ID, + serviceAccount: process.env.GCLOUD_TESTS_SERVICE_ACCOUNT, + pemKey: process.env.GCLOUD_TESTS_PEM_KEY +}; diff --git a/regression/pubsub.js b/regression/pubsub.js new file mode 100644 index 00000000000..cf0b4056ae1 --- /dev/null +++ b/regression/pubsub.js @@ -0,0 +1,181 @@ +/** + * Copyright 2014 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +var assert = require('assert'), + async = require('async'); + +var env = require('./env.js'), + gcloud = require('../lib'); + +var topicNames = ['topic1', 'topic2', 'topic3']; +var subscriptions = [{ + name: 'sub1', + ackDeadlineSeconds: 30 +}, { + name: 'sub2', + ackDeadlineSeconds: 60 +}]; + +var conn = new gcloud.pubsub.Connection({ + projectId: env.projectId, + email: env.serviceAccount, + pemFilePath: env.pemKey, +}); + +before(function(done) { + // TODO: Handle pagination. + var createFn = function(name, callback) { + conn.createTopic(name, callback); + }; + conn.listTopics(function(err, topics) { + if (err) { return done(err); } + var fns = topics.map(function(t) { + return function(cb) { + t.del(cb); + }; + }); + async.parallel(fns, function(err) { + if (err) { return done(err); } + async.map(topicNames, createFn, done); + }); + }) +}); + +describe('Topic', function() { + + it('should be listed', function(done) { + conn.listTopics(function(err, topics) { + assert(topics.length, 3); + done(err); + }); + }); + + it('should return a nextQuery if there are more results', function(done) { + conn.listTopics({ maxResults: 2 }, function(err, topics, next) { + assert(topics.length, 2); + assert(next.maxResults, 2); + assert(!!next.pageToken, true); + done(err); + }); + }) + + it('should be created', function(done) { + conn.createTopic('topic-new', done); + }); + + it('should be gettable', function(done) { + conn.getTopic('topic1', done); + }); + + it('should publish a message', function(done) { + conn.getTopic('topic1', function(err, topic) { + topic.publish('message from me', done); + }); + }); + + it('should be deleted', function(done) { + conn.getTopic('topic3', function(err, topic) { + topic.del(done); + }); + }); + +}); + +describe('Subscription', function() { + + before(function(done) { + var createFn = function(item, callback) { + conn.createSubscription({ + name: item.name, + topic: 'topic1', + ackDeadlineSeconds: item.ackDeadlineSeconds + }, callback); + }; + conn.listSubscriptions(function(err, subs) { + if (err) { + done(err); return; + } + var fns = subs.map(function(sub) { + return function(cb) { + sub.del(cb); + }; + }); + async.series(fns, function(err) { + if (err) { + done(err); return; + } + async.map(subscriptions, createFn, done); + }); + }) + }); + + it('should be listed', function(done) { + conn.listSubscriptions(function(err, subs) { + assert.strictEqual(subs.length, 2); + done(err); + }); + }); + + it('should be gettable', function(done) { + conn.getSubscription('sub1', function(err, sub) { + if (err) { + done(err); return; + } + assert.strictEqual(sub.name, '/subscriptions/' + env.projectId + '/sub1'); + done(); + }); + }); + + it('should error while getting a non-existent subscription', function(done){ + conn.getSubscription('sub-nothing-is-here', function(err, sub) { + assert.strictEqual(err.code, 404); + done(); + }); + }); + + it('should be created', function(done) { + conn.createSubscription({ + topic: 'topic1', + name: 'new-sub' + }, done); + }); + + it('should be able to pull and ack', function(done) { + conn.getTopic('topic1', function(err, topic) { + if (err) { + done(err); return; + } + topic.publish('hello', function(err) { + if(err) done(err); return; + }); + }); + conn.getSubscription('sub1', function(err, sub) { + if (err) { + done(err); return; + } + sub.on('message', function(msg) { + sub.ack(msg.ackId, done); + }); + sub.pull({}, function(err) { + if (err) { + done(err); + return; + } + }); + }); + }); + +}); diff --git a/test/pubsub.js b/test/pubsub.js new file mode 100644 index 00000000000..e397582eb9e --- /dev/null +++ b/test/pubsub.js @@ -0,0 +1,97 @@ +/** + * Copyright 2014 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +var assert = require('assert'), + pubsub = require('../lib/pubsub'); + +describe('Subscription', function() { + + it('should ack messages if autoAck is set', function(done) { + var sub = new pubsub.Subscription({}, 'sub1'); + sub.autoAck = true; + sub.conn.makeReq = function(method, path, qs, body, callback) { + if (path == 'subscriptions/pull') { + callback(null, { ackId: 'ackd-id' }); + return; + } + if (path === 'subscriptions/acknowledge') { + done(); + } + }; + sub.pull({}, function() {}); + }); + + it('should be closed', function(done) { + var sub = new pubsub.Subscription({}, 'sub1'); + sub.close(); + assert.strictEqual(sub.closed, true); + done(); + }); + + it('should pull messages', function(done) { + var conn = new pubsub.Connection({ + projectId: 'test-project' + }); + conn.makeReq = function(method, path, qs, body, callback) { + switch (path) { + case 'subscriptions//subscriptions/test-project/sub1': + callback(null, {}); + return; + case 'subscriptions/pull': + callback(null, { ackId: 123 }); + return; + } + }; + var sub = conn.subscribe('sub1', { autoAck: false }); + var doneCalled = false; + sub.on('message', function() { + if (!doneCalled) { + done(); + } + doneCalled = true; + }); + }); + + it('should pull and ack messages', function(done) { + var conn = new pubsub.Connection({ + projectId: 'test-project' + }); + conn.makeReq = function(method, path, qs, body, callback) { + switch (path) { + case 'subscriptions//subscriptions/test-project/sub1': + callback(null, {}); + return; + case 'subscriptions/pull': + setTimeout(function() { + callback(null, { ackId: 123 }); + }, 500); + return; + case 'subscriptions/acknowledge': + callback(null, true); + return; + } + }; + var sub = conn.subscribe('sub1', { autoAck: true }); + var doneCalled = false; + sub.on('message', function() { + if (!doneCalled) { + done(); + } + doneCalled = true; + }); + }); + +}); \ No newline at end of file