From e0dbd9c8417e54660ce5ea4c8ecac7cd98345bd8 Mon Sep 17 00:00:00 2001 From: Burcu Dogan Date: Thu, 24 Jul 2014 16:27:15 -0700 Subject: [PATCH 01/14] pubsub: Poll message and emit events. --- lib/pubsub/index.js | 118 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 93 insertions(+), 25 deletions(-) diff --git a/lib/pubsub/index.js b/lib/pubsub/index.js index 227bbdb7d7b..411612d7d9b 100644 --- a/lib/pubsub/index.js +++ b/lib/pubsub/index.js @@ -14,6 +14,9 @@ * limitations under the License. */ +var events = require('events'), + nodeutil = require('util'); + var conn = require('../common/connection.js'), util = require('../common/util.js'); @@ -31,15 +34,18 @@ var SCOPES = [ 'https://www.googleapis.com/auth/cloud-platform' ]; -// TODO(jbd): Emit message and error events if in polled-mode. -// sub.on('meessage', console.log) -// sub.on('error') - function Subscription(conn, name) { + var that = this; this.conn = conn; this.name = name; + + this.autoAck = false; + this.pullIntervalInMs = 10; + this.closed = false; } +nodeutil.inherits(Subscription, events.EventEmitter); + /** * Acknowledges the backend that message is retrieved. * @param {Array} ids A list of message IDs. @@ -56,44 +62,92 @@ Subscription.prototype.ack = function(ids, callback) { /** * Pulls from the subscribed topic. - * @param {Boolean} opts.returnImmediately If set, the system will respond immediately. - * Otherwise, wait until new messages are - * available. Returns if timeout is reached. - * @param {Boolean} opts.autoAck Automatically acknowledges the - * message once it's pulled. - * @param {Function} callback Callback function. + * @param {Boolean} opts.returnImmediately If set, the system will respond immediately. + * Otherwise, wait until new messages are + * available. Returns if timeout is reached. + * @param {Function} callback Callback. */ Subscription.prototype.pull = function(opts, callback) { - // TODO(jbd): Make opts optional. var that = this; - var autoAck = !!opts.autoAck; + // TODO(jbd): Should not be racing with other pull. + // TOOD(jbd): Make opts optional. var body = { subscription: this.name, returnImmediately: !!opts.returnImmediately }; this.conn.makeReq('POST', 'subscriptions/pull', null, body, function(err, message) { - if (err) { return callback(err); } - if (!autoAck) { - return callback(null, message); + if (err) { + callback(err); + return; + } + if (!that.autoAck) { + that.emitMessage_(message); + callback(); + return; } that.ack(message.ackId, function(err) { - if (err) { return callback(err); } - callback(null, message); + if (err) { + callback(err); + return; + } + that.emitMessage_(message); + callback(); }); }); }; +/** + * Polls the backend for new messages. + */ +Subscription.prototype.startPulling_ = function() { + var that = this; + var pullFn = function() { + if (that.closed) { + return; + } + that.pull({ returnImmediately: false }, function(err) { + // TODO(jbd): Fix API to return a more explicit error code or message. + if (err && err.message.indexOf('has no more messages') < 0) { + that.emitError_(err); + } + setTimeout(function() { + pullFn(); + }, that.pullIntervalInMs); + }); + }; + pullFn(); +} + /** * Unsubscribes the current subscription. Pull requests from the current * subscription will be errored once unsubscription is done. * @param {Function} opt_callback Optional callback. */ Subscription.prototype.unsubscribe = function(opt_callback) { + var that = this; cb = opt_callback || util.noop; var path = util.format('subscriptions/{fullName}', { fullName: this.name }); - this.conn.makeReq('DELETE', path, null, true, cb); + this.conn.makeReq('DELETE', path, null, true, function(err) { + if (err) return cb(err); + that.closed = true; + cb(err); + }); +}; + +/** + * Emits a 'message' event with the provided message. + */ +Subscription.prototype.emitMessage_ = function(msg) { + this.emit('message', msg); +}; + +/** + * Emits an error with the provided error. + */ +Subscription.prototype.emitError_ = function(err) { + this.emit('error', err); }; /** @@ -112,7 +166,12 @@ function Topic(conn, name) { * @param {Function} opt_callback Optional callback. */ Topic.prototype.publish = function(data, opt_callback) { - this.publishMessage({ topic: this.name, data: data }, opt_callback); + this.publishMessage({ + topic: this.name, + message: { + data: data + } + }, opt_callback); }; /** @@ -195,20 +254,29 @@ Connection.prototype.listSubscriptions = function(query, callback) { * Subscribe with the provided options. * @param {string} opts.name Name of the subscription. * @param {string} opts.topicName Name of the topic to subscribe. - * @param {Function} opt_callback Optional callback. + * @param {Boolean} opts.autoAck Automatically acknowledges the + * message once it's pulled. + * @return {Subscription} */ -Connection.prototype.subscribe = function(opts, opt_callback) { +Connection.prototype.subscribe = function(opts) { var that = this; - var cb = opt_callback || util.noop; - var body = { + var subscription = { topic:'/topics/' + this.id + '/' + opts.topicName, name: '/subscriptions/' + this.id + '/' + opts.name, ackDeadlineSeconds: opts.ackDeadlineSeconds }; - this.makeReq('POST', 'subscriptions', null, body, function(err, item) { + var sub = new Subscription(that, subscription.name); + sub.autoAck = !!opts.autoAck; + this.makeReq('POST', 'subscriptions', null, subscription, function(err, item) { // TODO(jbd): maybe init a subscription instance if http 200 or 409. - cb(err, new Subscription(that, body.name)); + if (err && err.code != 409) { + sub.emitError_(err); + return; + } + sub.emit('ready'); + sub.startPulling_(); }); + return sub; }; /** From 673e438123a34cafc2e5a763741fb63d73b6c7c5 Mon Sep 17 00:00:00 2001 From: Burcu Dogan Date: Thu, 24 Jul 2014 16:29:09 -0700 Subject: [PATCH 02/14] Add TODO. --- lib/pubsub/index.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/pubsub/index.js b/lib/pubsub/index.js index 411612d7d9b..e21c5db2d69 100644 --- a/lib/pubsub/index.js +++ b/lib/pubsub/index.js @@ -76,6 +76,7 @@ Subscription.prototype.pull = function(opts, callback) { returnImmediately: !!opts.returnImmediately }; this.conn.makeReq('POST', 'subscriptions/pull', null, body, function(err, message) { + // TODO(jbd): Fix API to return a list of messages. if (err) { callback(err); return; From 03c84f0fca0254e798939dd2cb5309abfa00fe3e Mon Sep 17 00:00:00 2001 From: Burcu Dogan Date: Thu, 24 Jul 2014 16:50:10 -0700 Subject: [PATCH 03/14] Adding TODO about the immutability of the subscriptions. --- lib/pubsub/index.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/pubsub/index.js b/lib/pubsub/index.js index e21c5db2d69..99162cacec5 100644 --- a/lib/pubsub/index.js +++ b/lib/pubsub/index.js @@ -270,6 +270,8 @@ Connection.prototype.subscribe = function(opts) { sub.autoAck = !!opts.autoAck; this.makeReq('POST', 'subscriptions', null, subscription, function(err, item) { // TODO(jbd): maybe init a subscription instance if http 200 or 409. + // TODO(jbd): subscriptions are not mutable, return error and provide + // a way to create and get a subscription. (or upsert a subscription). if (err && err.code != 409) { sub.emitError_(err); return; From 41fde886cace38cc46cadb374253d1c28cd2490e Mon Sep 17 00:00:00 2001 From: Burcu Dogan Date: Thu, 24 Jul 2014 16:53:02 -0700 Subject: [PATCH 04/14] Convert message data to base64 and decode on pull. --- lib/pubsub/index.js | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/pubsub/index.js b/lib/pubsub/index.js index 99162cacec5..f79b110c2e2 100644 --- a/lib/pubsub/index.js +++ b/lib/pubsub/index.js @@ -141,6 +141,10 @@ Subscription.prototype.unsubscribe = function(opt_callback) { * Emits a 'message' event with the provided message. */ Subscription.prototype.emitMessage_ = function(msg) { + if (msg.pubsubEvent && msg.pubsubEvent.message) { + var data = msg.pubsubEvent.message.data; + msg.pubsubEvent.message.data = new Buffer(data, 'base64').toString('utf-8'); + } this.emit('message', msg); }; @@ -170,7 +174,7 @@ Topic.prototype.publish = function(data, opt_callback) { this.publishMessage({ topic: this.name, message: { - data: data + data: new Buffer(data).toString('base64') } }, opt_callback); }; From 95802bff9278e88cc681958f6001895351c87d9b Mon Sep 17 00:00:00 2001 From: Burcu Dogan Date: Fri, 25 Jul 2014 15:47:55 -0700 Subject: [PATCH 05/14] pubsub: Subscriptions are designed to be persistent. Allow to close without deletion. --- lib/pubsub/index.js | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/lib/pubsub/index.js b/lib/pubsub/index.js index f79b110c2e2..d19cc178d88 100644 --- a/lib/pubsub/index.js +++ b/lib/pubsub/index.js @@ -120,11 +120,11 @@ Subscription.prototype.startPulling_ = function() { } /** - * Unsubscribes the current subscription. Pull requests from the current + * Deletes the current subscription. Pull requests from the current * subscription will be errored once unsubscription is done. * @param {Function} opt_callback Optional callback. */ -Subscription.prototype.unsubscribe = function(opt_callback) { +Subscription.prototype.del = function(opt_callback) { var that = this; cb = opt_callback || util.noop; var path = util.format('subscriptions/{fullName}', { @@ -137,6 +137,13 @@ Subscription.prototype.unsubscribe = function(opt_callback) { }); }; +/** + * Closes the subscription. + */ +Subscription.prototype.close = function() { + this.closed = true; +}; + /** * Emits a 'message' event with the provided message. */ From 9db344ae39b0f6e78107d74b2146e7f5f691ac44 Mon Sep 17 00:00:00 2001 From: Burcu Dogan Date: Fri, 25 Jul 2014 15:48:33 -0700 Subject: [PATCH 06/14] pubsub: Subscribe should not involved in persistent subscription creation. --- lib/pubsub/index.js | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/lib/pubsub/index.js b/lib/pubsub/index.js index d19cc178d88..594d5d0a15d 100644 --- a/lib/pubsub/index.js +++ b/lib/pubsub/index.js @@ -262,28 +262,22 @@ Connection.prototype.listSubscriptions = function(query, callback) { }); }; +// TODO(jbd): Create subscription + /** * Subscribe with the provided options. - * @param {string} opts.name Name of the subscription. - * @param {string} opts.topicName Name of the topic to subscribe. + * @param {string} name Name of the subscription. * @param {Boolean} opts.autoAck Automatically acknowledges the * message once it's pulled. * @return {Subscription} */ -Connection.prototype.subscribe = function(opts) { +Connection.prototype.subscribe = function(name, opts) { var that = this; - var subscription = { - topic:'/topics/' + this.id + '/' + opts.topicName, - name: '/subscriptions/' + this.id + '/' + opts.name, - ackDeadlineSeconds: opts.ackDeadlineSeconds - }; - var sub = new Subscription(that, subscription.name); + var fullName = '/subscriptions/' + this.id + '/' + name; + var sub = new Subscription(that, fullName); sub.autoAck = !!opts.autoAck; - this.makeReq('POST', 'subscriptions', null, subscription, function(err, item) { - // TODO(jbd): maybe init a subscription instance if http 200 or 409. - // TODO(jbd): subscriptions are not mutable, return error and provide - // a way to create and get a subscription. (or upsert a subscription). - if (err && err.code != 409) { + this.makeReq('GET', 'subscriptions/' + fullName, null, true, function(err) { + if (err) { sub.emitError_(err); return; } From 7ee2d876b68369783cc6b9e5e27c24f9b961e139 Mon Sep 17 00:00:00 2001 From: Burcu Dogan Date: Fri, 25 Jul 2014 16:18:44 -0700 Subject: [PATCH 07/14] Allow options to be optional. --- lib/pubsub/index.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/pubsub/index.js b/lib/pubsub/index.js index 594d5d0a15d..0708ab33f77 100644 --- a/lib/pubsub/index.js +++ b/lib/pubsub/index.js @@ -273,6 +273,7 @@ Connection.prototype.listSubscriptions = function(query, callback) { */ Connection.prototype.subscribe = function(name, opts) { var that = this; + opts = opts || {}; var fullName = '/subscriptions/' + this.id + '/' + name; var sub = new Subscription(that, fullName); sub.autoAck = !!opts.autoAck; From aab8eb94d797603c2538843e2f7b0ad6cfb6e91c Mon Sep 17 00:00:00 2001 From: Burcu Dogan Date: Fri, 25 Jul 2014 16:22:02 -0700 Subject: [PATCH 08/14] pubsub: Allow to get a subscription without listening. --- lib/pubsub/index.js | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/lib/pubsub/index.js b/lib/pubsub/index.js index 0708ab33f77..6861e7196b0 100644 --- a/lib/pubsub/index.js +++ b/lib/pubsub/index.js @@ -264,6 +264,23 @@ Connection.prototype.listSubscriptions = function(query, callback) { // TODO(jbd): Create subscription +/** + * Gets a subscription. + * @param {string} name Name of the subscription. + * @param {Function} callback Callback. + */ +Connection.prototype.getSubscription = function(name, callback) { + var that = this; + var fullName = '/subscriptions/' + this.id + '/' + name; + this.makeReq('GET', 'subscriptions/' + fullName, null, true, function(err) { + if (err) { + callback(err); + return; + } + callback(null, new Subscription(that, fullName)); + }); +}; + /** * Subscribe with the provided options. * @param {string} name Name of the subscription. @@ -272,12 +289,12 @@ Connection.prototype.listSubscriptions = function(query, callback) { * @return {Subscription} */ Connection.prototype.subscribe = function(name, opts) { - var that = this; opts = opts || {}; + var fullName = '/subscriptions/' + this.id + '/' + name; - var sub = new Subscription(that, fullName); + var sub = new Subscription(this, fullName); sub.autoAck = !!opts.autoAck; - this.makeReq('GET', 'subscriptions/' + fullName, null, true, function(err) { + this.getSubscription(name, function(err) { if (err) { sub.emitError_(err); return; From 50328b100cf42548d2905dc42cacf316e1f6b445 Mon Sep 17 00:00:00 2001 From: Burcu Dogan Date: Sat, 26 Jul 2014 16:44:25 -0700 Subject: [PATCH 09/14] Include regression tests to coverage. --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index ddb8061dbef..537ff186f46 100644 --- a/package.json +++ b/package.json @@ -38,7 +38,7 @@ "scripts": { "test": "node_modules/mocha/bin/_mocha --reporter spec", "regression-test": "node_modules/mocha/bin/_mocha --reporter spec --timeout 4000 regression/*", - "cover": "node_modules/istanbul/lib/cli.js cover node_modules/mocha/bin/_mocha" + "cover": "node_modules/istanbul/lib/cli.js cover node_modules/mocha/bin/_mocha -- --timeout 4000 test/* regression/*" }, "license": "Apache 2" } From cc3c0c1d6ced39616480b9cd84212da85c214d41 Mon Sep 17 00:00:00 2001 From: Burcu Dogan Date: Sat, 26 Jul 2014 16:45:06 -0700 Subject: [PATCH 10/14] pubsub: Adding subscription creation. --- lib/pubsub/index.js | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/lib/pubsub/index.js b/lib/pubsub/index.js index 6861e7196b0..8d6e6ae3ab4 100644 --- a/lib/pubsub/index.js +++ b/lib/pubsub/index.js @@ -262,8 +262,6 @@ Connection.prototype.listSubscriptions = function(query, callback) { }); }; -// TODO(jbd): Create subscription - /** * Gets a subscription. * @param {string} name Name of the subscription. @@ -281,6 +279,22 @@ Connection.prototype.getSubscription = function(name, callback) { }); }; +Connection.prototype.createSubscription = function(opts, callback) { + var that = this; + var subscription = { + topic:'/topics/' + this.id + '/' + opts.topic, + name: '/subscriptions/' + this.id + '/' + opts.name, + ackDeadlineSeconds: opts.ackDeadlineSeconds + }; + this.makeReq('POST', 'subscriptions', null, subscription, function(err) { + if (err) { + callback(err); + return; + } + callback(null, new Subscription(that, subscription.name)); + }); +}; + /** * Subscribe with the provided options. * @param {string} name Name of the subscription. From f00eaada16d4c551d2b3245735d8b64dfd19d9e2 Mon Sep 17 00:00:00 2001 From: Burcu Dogan Date: Sat, 26 Jul 2014 17:29:28 -0700 Subject: [PATCH 11/14] pubsub: Fixing undefined callback. --- lib/pubsub/index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pubsub/index.js b/lib/pubsub/index.js index 8d6e6ae3ab4..a31c1b2de2f 100644 --- a/lib/pubsub/index.js +++ b/lib/pubsub/index.js @@ -203,7 +203,7 @@ Topic.prototype.publishMessage = function(message, opt_callback) { */ Topic.prototype.del = function(opt_callback) { var path = 'topics/' + this.name; - this.conn.makeReq('DELETE', path, null, true, cb); + this.conn.makeReq('DELETE', path, null, true, opt_callback); }; /** From f054959611741944e294f3dbcc7dd749891cdf73 Mon Sep 17 00:00:00 2001 From: Burcu Dogan Date: Sat, 26 Jul 2014 17:39:39 -0700 Subject: [PATCH 12/14] pubsub: Add regression tests. --- package.json | 4 +- regression/datastore.js | 15 ++---- regression/env.js | 30 ++++++++++++ regression/pubsub.js | 101 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 137 insertions(+), 13 deletions(-) create mode 100644 regression/env.js create mode 100644 regression/pubsub.js diff --git a/package.json b/package.json index 537ff186f46..275da23533e 100644 --- a/package.json +++ b/package.json @@ -37,8 +37,8 @@ }, "scripts": { "test": "node_modules/mocha/bin/_mocha --reporter spec", - "regression-test": "node_modules/mocha/bin/_mocha --reporter spec --timeout 4000 regression/*", - "cover": "node_modules/istanbul/lib/cli.js cover node_modules/mocha/bin/_mocha -- --timeout 4000 test/* regression/*" + "regression-test": "node_modules/mocha/bin/_mocha --reporter spec --timeout 10000 regression/*", + "cover": "node_modules/istanbul/lib/cli.js cover node_modules/mocha/bin/_mocha -- --timeout 10000 test/* regression/*" }, "license": "Apache 2" } diff --git a/regression/datastore.js b/regression/datastore.js index 71add1494be..2ab73c9fdc0 100644 --- a/regression/datastore.js +++ b/regression/datastore.js @@ -14,17 +14,10 @@ * limitations under the License. */ -if (!process.env.GCLOUD_TESTS_PROJECT_ID && - !process.env.GCLOUD_TESTS_SERVICE_ACCOUNT && - !process.env.GCLOUD_TESTS_PEM_KEY) { - var error = ['To run the regression tests, you need to set the value of some environment variables.', - 'Please check the README for instructions.' - ].join('\n'); - throw error; -} -var projectId = process.env.GCLOUD_TESTS_PROJECT_ID, - email = process.env.GCLOUD_TESTS_SERVICE_ACCOUNT, - pemFilePath = process.env.GCLOUD_TESTS_PEM_KEY; +var env = require('./env.js'), + projectId = env.projectId, + email = env.serviceAccount, + pemFilePath = env.pemKey; var assert = require('assert'), datastore = require('../lib/datastore'), diff --git a/regression/env.js b/regression/env.js new file mode 100644 index 00000000000..c15ec8c3835 --- /dev/null +++ b/regression/env.js @@ -0,0 +1,30 @@ +/** + * Copyright 2014 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +if (!process.env.GCLOUD_TESTS_PROJECT_ID && + !process.env.GCLOUD_TESTS_SERVICE_ACCOUNT && + !process.env.GCLOUD_TESTS_PEM_KEY) { + var error = ['To run the regression tests, you need to set the value of some environment variables.', + 'Please check the README for instructions.' + ].join('\n'); + throw error; +} + +module.exports = { + projectId: process.env.GCLOUD_TESTS_PROJECT_ID, + serviceAccount: process.env.GCLOUD_TESTS_SERVICE_ACCOUNT, + pemKey: process.env.GCLOUD_TESTS_PEM_KEY +}; diff --git a/regression/pubsub.js b/regression/pubsub.js new file mode 100644 index 00000000000..3126042aca8 --- /dev/null +++ b/regression/pubsub.js @@ -0,0 +1,101 @@ +/** + * Copyright 2014 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +var assert = require('assert'), + async = require('async'); + +var env = require('./env.js'), + gcloud = require('../lib'); + +var topicNames = ['topic1', 'topic2', 'topic3']; +var subscriptions = []; + +var conn = new gcloud.pubsub.Connection({ + projectId: env.projectId, + email: env.serviceAccount, + pemFilePath: env.pemKey, +}); + + +describe('Topic', function() { + + before(function(done) { + // TODO: Handle pagination. + var createFn = function(name, callback) { + conn.createTopic(name, callback); + }; + conn.listTopics(function(err, topics) { + if (err) { done(err); } + var fns = []; + for (var i=0; i Date: Sun, 27 Jul 2014 16:02:41 -0700 Subject: [PATCH 13/14] pubsub: Adding regression tests for subscriptions --- lib/pubsub/index.js | 6 +-- regression/pubsub.js | 108 +++++++++++++++++++++++++++++++------------ 2 files changed, 81 insertions(+), 33 deletions(-) diff --git a/lib/pubsub/index.js b/lib/pubsub/index.js index a31c1b2de2f..b1a7ee755ac 100644 --- a/lib/pubsub/index.js +++ b/lib/pubsub/index.js @@ -414,12 +414,12 @@ Connection.prototype.makeReq = function(method, path, q, body, callback) { } this.conn.req(reqOpts, function(err, res, body) { if (body && body.error) { - return callback(body.error); + callback(new util.ApiError(body.error)); return; } if (res && (res.statusCode < 200 || res.statusCode > 299)) { - return callback(new Error('error during request, statusCode: ' + res.statusCode)); + callback(new Error('error during request, statusCode: ' + res.statusCode)); return; } - callback(err, body); + callback(null, body); }); }; diff --git a/regression/pubsub.js b/regression/pubsub.js index 3126042aca8..6b09bef1a45 100644 --- a/regression/pubsub.js +++ b/regression/pubsub.js @@ -21,7 +21,13 @@ var env = require('./env.js'), gcloud = require('../lib'); var topicNames = ['topic1', 'topic2', 'topic3']; -var subscriptions = []; +var subscriptions = [{ + name: 'sub1', + ackDeadlineSeconds: 30 +}, { + name: 'sub2', + ackDeadlineSeconds: 60 +}]; var conn = new gcloud.pubsub.Connection({ projectId: env.projectId, @@ -29,31 +35,27 @@ var conn = new gcloud.pubsub.Connection({ pemFilePath: env.pemKey, }); +before(function(done) { + // TODO: Handle pagination. + var createFn = function(name, callback) { + conn.createTopic(name, callback); + }; + conn.listTopics(function(err, topics) { + if (err) { return done(err); } + var fns = topics.map(function(t) { + return function(cb) { + t.del(cb); + }; + }); + async.parallel(fns, function(err) { + if (err) { return done(err); } + async.map(topicNames, createFn, done); + }); + }) +}); describe('Topic', function() { - before(function(done) { - // TODO: Handle pagination. - var createFn = function(name, callback) { - conn.createTopic(name, callback); - }; - conn.listTopics(function(err, topics) { - if (err) { done(err); } - var fns = []; - for (var i=0; i Date: Mon, 28 Jul 2014 12:56:57 -0700 Subject: [PATCH 14/14] pubsub: Improving test coverage. --- lib/pubsub/index.js | 10 +++++ regression/pubsub.js | 40 ++++++++++++++++-- test/pubsub.js | 97 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 143 insertions(+), 4 deletions(-) create mode 100644 test/pubsub.js diff --git a/lib/pubsub/index.js b/lib/pubsub/index.js index b1a7ee755ac..bac855e79dc 100644 --- a/lib/pubsub/index.js +++ b/lib/pubsub/index.js @@ -427,3 +427,13 @@ Connection.prototype.makeReq = function(method, path, q, body, callback) { * Exports Connection. */ module.exports.Connection = Connection; + +/** + * Exports Topic. + */ +module.exports.Topic = Topic; + +/** + * Exports Subscription. + */ +module.exports.Subscription = Subscription; diff --git a/regression/pubsub.js b/regression/pubsub.js index 6b09bef1a45..cf0b4056ae1 100644 --- a/regression/pubsub.js +++ b/regression/pubsub.js @@ -57,13 +57,21 @@ before(function(done) { describe('Topic', function() { it('should be listed', function(done) { - // TODO(jbd): Add pagination. conn.listTopics(function(err, topics) { assert(topics.length, 3); done(err); }); }); + it('should return a nextQuery if there are more results', function(done) { + conn.listTopics({ maxResults: 2 }, function(err, topics, next) { + assert(topics.length, 2); + assert(next.maxResults, 2); + assert(!!next.pageToken, true); + done(err); + }); + }) + it('should be created', function(done) { conn.createTopic('topic-new', done); }); @@ -121,8 +129,7 @@ describe('Subscription', function() { }); }); - // TODO(jbd): Add assertions. - it('should get a subscription', function(done) { + it('should be gettable', function(done) { conn.getSubscription('sub1', function(err, sub) { if (err) { done(err); return; @@ -139,11 +146,36 @@ describe('Subscription', function() { }); }); - it('should create a subscription', function(done) { + it('should be created', function(done) { conn.createSubscription({ topic: 'topic1', name: 'new-sub' }, done); }); + it('should be able to pull and ack', function(done) { + conn.getTopic('topic1', function(err, topic) { + if (err) { + done(err); return; + } + topic.publish('hello', function(err) { + if(err) done(err); return; + }); + }); + conn.getSubscription('sub1', function(err, sub) { + if (err) { + done(err); return; + } + sub.on('message', function(msg) { + sub.ack(msg.ackId, done); + }); + sub.pull({}, function(err) { + if (err) { + done(err); + return; + } + }); + }); + }); + }); diff --git a/test/pubsub.js b/test/pubsub.js new file mode 100644 index 00000000000..e397582eb9e --- /dev/null +++ b/test/pubsub.js @@ -0,0 +1,97 @@ +/** + * Copyright 2014 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +var assert = require('assert'), + pubsub = require('../lib/pubsub'); + +describe('Subscription', function() { + + it('should ack messages if autoAck is set', function(done) { + var sub = new pubsub.Subscription({}, 'sub1'); + sub.autoAck = true; + sub.conn.makeReq = function(method, path, qs, body, callback) { + if (path == 'subscriptions/pull') { + callback(null, { ackId: 'ackd-id' }); + return; + } + if (path === 'subscriptions/acknowledge') { + done(); + } + }; + sub.pull({}, function() {}); + }); + + it('should be closed', function(done) { + var sub = new pubsub.Subscription({}, 'sub1'); + sub.close(); + assert.strictEqual(sub.closed, true); + done(); + }); + + it('should pull messages', function(done) { + var conn = new pubsub.Connection({ + projectId: 'test-project' + }); + conn.makeReq = function(method, path, qs, body, callback) { + switch (path) { + case 'subscriptions//subscriptions/test-project/sub1': + callback(null, {}); + return; + case 'subscriptions/pull': + callback(null, { ackId: 123 }); + return; + } + }; + var sub = conn.subscribe('sub1', { autoAck: false }); + var doneCalled = false; + sub.on('message', function() { + if (!doneCalled) { + done(); + } + doneCalled = true; + }); + }); + + it('should pull and ack messages', function(done) { + var conn = new pubsub.Connection({ + projectId: 'test-project' + }); + conn.makeReq = function(method, path, qs, body, callback) { + switch (path) { + case 'subscriptions//subscriptions/test-project/sub1': + callback(null, {}); + return; + case 'subscriptions/pull': + setTimeout(function() { + callback(null, { ackId: 123 }); + }, 500); + return; + case 'subscriptions/acknowledge': + callback(null, true); + return; + } + }; + var sub = conn.subscribe('sub1', { autoAck: true }); + var doneCalled = false; + sub.on('message', function() { + if (!doneCalled) { + done(); + } + doneCalled = true; + }); + }); + +}); \ No newline at end of file