From a7492c6824bf25cdfd74e6a1496abc51fca60f6c Mon Sep 17 00:00:00 2001 From: Stephen Sawchuk Date: Fri, 21 Apr 2017 14:23:39 -0400 Subject: [PATCH] pubsub: borrow request config from GAX (#2242) * pubsub: borrow request config from GAX [ci skip] * tests * rm .only --- packages/pubsub/src/index.js | 37 ++++++++- packages/pubsub/src/subscription.js | 14 ++-- packages/pubsub/src/topic.js | 11 ++- packages/pubsub/test/index.js | 114 ++++++++++++++++++++++++++- packages/pubsub/test/subscription.js | 84 ++++++++++---------- packages/pubsub/test/topic.js | 35 ++++---- 6 files changed, 216 insertions(+), 79 deletions(-) diff --git a/packages/pubsub/src/index.js b/packages/pubsub/src/index.js index f226d325106..73184308460 100644 --- a/packages/pubsub/src/index.js +++ b/packages/pubsub/src/index.js @@ -45,6 +45,16 @@ var Subscription = require('./subscription.js'); */ var Topic = require('./topic.js'); +/** + * @type {object} - GAX's default configuration. + */ +var GAX_CONFIG = { + Publisher: require('./v1/publisher_client_config.json'). + interfaces['google.pubsub.v1.Publisher'], + Subscriber: require('./v1/subscriber_client_config.json'). + interfaces['google.pubsub.v1.Subscriber'] +}; + /** * [Cloud Pub/Sub](https://developers.google.com/pubsub/overview) is a * reliable, many-to-many, asynchronous messaging service from Cloud @@ -751,6 +761,24 @@ PubSub.prototype.topic = function(name) { return new Topic(this, name); }; +/** + * Intercept the call to {module:common/grpc-service#request}, making sure the + * correct timeouts are set. + * + * @private + */ +PubSub.prototype.request = function(protoOpts) { + var method = protoOpts.method; + var camelCaseMethod = method[0].toUpperCase() + method.substr(1); + var config = GAX_CONFIG[protoOpts.service].methods[camelCaseMethod]; + + if (is.undefined(arguments[0].timeout)) { + arguments[0].timeout = config.timeout_millis; + } + + commonGrpc.Service.prototype.request.apply(this, arguments); +}; + /** * Determine the appropriate endpoint to use for API requests, first trying the * local Pub/Sub emulator environment variable (PUBSUB_EMULATOR_HOST), otherwise @@ -775,7 +803,7 @@ PubSub.prototype.determineBaseUrl_ = function() { /*! Developer Documentation * - * These methods can be auto-paginated. + * These methods can be agto-paginated. */ common.paginator.extend(PubSub, [ 'getSnapshots', @@ -789,7 +817,12 @@ common.paginator.extend(PubSub, [ * that a callback is omitted. */ common.util.promisifyAll(PubSub, { - exclude: ['snapshot', 'subscription', 'topic'] + exclude: [ + 'request', + 'snapshot', + 'subscription', + 'topic' + ] }); PubSub.Subscription = Subscription; diff --git a/packages/pubsub/src/subscription.js b/packages/pubsub/src/subscription.js index 69b564b726a..6610e7ef814 100644 --- a/packages/pubsub/src/subscription.js +++ b/packages/pubsub/src/subscription.js @@ -465,7 +465,7 @@ Subscription.prototype.ack = function(ackIds, options, callback) { ackIds: ackIds }; - this.request(protoOpts, reqOpts, function(err, resp) { + this.parent.request(protoOpts, reqOpts, function(err, resp) { if (!err) { ackIds.forEach(function(ackId) { delete self.inProgressAckIds[ackId]; @@ -523,7 +523,7 @@ Subscription.prototype.createSnapshot = function(name, callback) { subscription: this.name }; - this.request(protoOpts, reqOpts, function(err, resp) { + this.parent.request(protoOpts, reqOpts, function(err, resp) { if (err) { callback(err, null, resp); return; @@ -601,7 +601,7 @@ Subscription.prototype.delete = function(callback) { subscription: this.name }; - this.request(protoOpts, reqOpts, function(err, resp) { + this.parent.request(protoOpts, reqOpts, function(err, resp) { if (err) { callback(err, resp); return; @@ -696,9 +696,11 @@ Subscription.prototype.pull = function(options, callback) { maxMessages: options.maxResults }; - this.activeRequest_ = this.request(protoOpts, reqOpts, function(err, resp) { + this.activeRequest_ = this.parent.request(protoOpts, reqOpts, function(err) { self.activeRequest_ = null; + var resp = arguments[1]; + if (err) { if (err.code === 504) { // Simulate a server timeout where no messages were received. @@ -779,7 +781,7 @@ Subscription.prototype.seek = function(snapshot, callback) { throw new Error('Either a snapshot name or Date is needed to seek to.'); } - this.request(protoOpts, reqOpts, callback); + this.parent.request(protoOpts, reqOpts, callback); }; /** @@ -825,7 +827,7 @@ Subscription.prototype.setAckDeadline = function(options, callback) { ackDeadlineSeconds: options.seconds }; - this.request(protoOpts, reqOpts, function(err, resp) { + this.parent.request(protoOpts, reqOpts, function(err, resp) { callback(err, resp); }); }; diff --git a/packages/pubsub/src/topic.js b/packages/pubsub/src/topic.js index 8bf664a0696..1cd3dbed632 100644 --- a/packages/pubsub/src/topic.js +++ b/packages/pubsub/src/topic.js @@ -49,7 +49,6 @@ var IAM = require('./iam.js'); */ function Topic(pubsub, name) { this.name = Topic.formatName_(pubsub.projectId, name); - this.pubsub = pubsub; var methods = { /** @@ -319,7 +318,7 @@ Topic.prototype.getSubscriptions = function(options, callback) { options = options || {}; options.topic = this; - return this.pubsub.getSubscriptions(options, callback); + return this.parent.getSubscriptions(options, callback); }; /** @@ -353,7 +352,7 @@ Topic.prototype.getSubscriptionsStream = function(options) { options = options || {}; options.topic = this; - return this.pubsub.getSubscriptionsStream(options); + return this.parent.getSubscriptionsStream(options); }; /** @@ -475,7 +474,7 @@ Topic.prototype.publish = function(messages, options, callback) { .map(Topic.formatMessage_) }; - this.request(protoOpts, reqOpts, function(err, result) { + this.parent.request(protoOpts, reqOpts, function(err, result) { if (err) { callback(err, null, result); return; @@ -541,7 +540,7 @@ Topic.prototype.publish = function(messages, options, callback) { * }); */ Topic.prototype.subscribe = function(subName, options, callback) { - this.pubsub.subscribe(this, subName, options, callback); + this.parent.subscribe(this, subName, options, callback); }; /** @@ -574,7 +573,7 @@ Topic.prototype.subscription = function(name, options) { options = options || {}; options.topic = this; - return this.pubsub.subscription(name, options); + return this.parent.subscription(name, options); }; /*! Developer Documentation diff --git a/packages/pubsub/test/index.js b/packages/pubsub/test/index.js index 6eabd31dd1c..e307ab652c4 100644 --- a/packages/pubsub/test/index.js +++ b/packages/pubsub/test/index.js @@ -40,7 +40,12 @@ var fakeUtil = extend({}, util, { } promisified = true; - assert.deepEqual(options.exclude, ['snapshot', 'subscription', 'topic']); + assert.deepEqual(options.exclude, [ + 'request', + 'snapshot', + 'subscription', + 'topic' + ]); } }); @@ -48,6 +53,11 @@ function FakeGrpcService() { this.calledWith_ = arguments; } +var grpcServiceRequestOverride; +FakeGrpcService.prototype.request = function() { + return (grpcServiceRequestOverride || util.noop).apply(this, arguments); +}; + function FakeSnapshot() { this.calledWith_ = arguments; } @@ -74,6 +84,22 @@ var fakePaginator = { } }; +var GAX_CONFIG_PUBLISHER_OVERRIDE = {}; +var GAX_CONFIG_SUBSCRIBER_OVERRIDE = {}; + +var GAX_CONFIG = { + Publisher: { + interfaces: { + 'google.pubsub.v1.Publisher': GAX_CONFIG_PUBLISHER_OVERRIDE + } + }, + Subscriber: { + interfaces: { + 'google.pubsub.v1.Subscriber': GAX_CONFIG_SUBSCRIBER_OVERRIDE + } + } +}; + describe('PubSub', function() { var PubSub; var PROJECT_ID = 'test-project'; @@ -94,7 +120,10 @@ describe('PubSub', function() { }, './snapshot.js': FakeSnapshot, './subscription.js': Subscription, - './topic.js': Topic + './topic.js': Topic, + + './v1/publisher_client_config.json': GAX_CONFIG.Publisher, + './v1/subscriber_client_config.json': GAX_CONFIG.Subscriber }); }); @@ -105,6 +134,7 @@ describe('PubSub', function() { }); beforeEach(function() { + grpcServiceRequestOverride = null; SubscriptionOverride = null; pubsub = new PubSub(OPTIONS); pubsub.projectId = PROJECT_ID; @@ -897,6 +927,86 @@ describe('PubSub', function() { }); }); + describe('request', function() { + var TIMEOUT = Math.random(); + + beforeEach(function() { + GAX_CONFIG_PUBLISHER_OVERRIDE.methods = { + MethodName: { + timeout_millis: TIMEOUT + } + }; + }); + + after(function() { + GAX_CONFIG_PUBLISHER_OVERRIDE.methods = {}; + }); + + it('should pass through the request', function(done) { + var args = [ + { + service: 'Publisher', + method: 'MethodName' + }, + { + value: true + }, + { + anotherValue: true + } + ]; + + grpcServiceRequestOverride = function() { + assert.strictEqual(this, pubsub); + assert.strictEqual(args[0], arguments[0]); + assert.strictEqual(args[1], arguments[1]); + assert.strictEqual(args[2], arguments[2]); + done(); + }; + + pubsub.request.apply(pubsub, args); + }); + + it('should assign a timeout', function(done) { + grpcServiceRequestOverride = function(protoOpts) { + assert.strictEqual(protoOpts.timeout, TIMEOUT); + done(); + }; + + pubsub.request({ + service: 'Publisher', + method: 'MethodName' + }); + }); + + it('should not override a timeout if set', function(done) { + var timeout = 0; + + grpcServiceRequestOverride = function(protoOpts) { + assert.strictEqual(protoOpts.timeout, timeout); + done(); + }; + + pubsub.request({ + service: 'Publisher', + method: 'MethodName', + timeout: timeout + }); + }); + + it('should camel case the method name', function(done) { + grpcServiceRequestOverride = function(protoOpts) { + assert.strictEqual(protoOpts.timeout, TIMEOUT); + done(); + }; + + pubsub.request({ + service: 'Publisher', + method: 'methodName' + }); + }); + }); + describe('determineBaseUrl_', function() { function setHost(host) { process.env.PUBSUB_EMULATOR_HOST = host; diff --git a/packages/pubsub/test/subscription.js b/packages/pubsub/test/subscription.js index b635e6be055..bb0f65b9b14 100644 --- a/packages/pubsub/test/subscription.js +++ b/packages/pubsub/test/subscription.js @@ -18,8 +18,6 @@ var assert = require('assert'); var extend = require('extend'); -var GrpcServiceObject = require('@google-cloud/common-grpc').ServiceObject; -var nodeutil = require('util'); var proxyquire = require('proxyquire'); var util = require('@google-cloud/common').util; @@ -34,11 +32,8 @@ var fakeUtil = extend({}, util, { function FakeGrpcServiceObject() { this.calledWith_ = arguments; - GrpcServiceObject.apply(this, arguments); } -nodeutil.inherits(FakeGrpcServiceObject, GrpcServiceObject); - function FakeIAM() { this.calledWith_ = [].slice.call(arguments); } @@ -103,6 +98,8 @@ describe('Subscription', function() { beforeEach(function() { subscription = new Subscription(PUBSUB, { name: SUB_NAME }); + PUBSUB.request = util.noop; + subscription.parent = PUBSUB; }); afterEach(function() { @@ -231,7 +228,7 @@ describe('Subscription', function() { }); it('should inherit from GrpcServiceObject', function() { - assert(subscription instanceof GrpcServiceObject); + assert(subscription instanceof FakeGrpcServiceObject); var calledWith = subscription.calledWith_[0]; @@ -269,7 +266,6 @@ describe('Subscription', function() { name: SUB_NAME, topic: topicInstance }); - assert(subscription instanceof GrpcServiceObject); var calledWith = subscription.calledWith_[0]; assert.deepEqual(calledWith.methods.create, true); @@ -377,14 +373,12 @@ describe('Subscription', function() { it('should accept a single id', function() { assert.doesNotThrow(function() { - subscription.request = util.noop; subscription.ack(1, util.noop); }); }); it('should accept an array of ids', function() { assert.doesNotThrow(function() { - subscription.request = util.noop; subscription.ack([1], util.noop); }); }); @@ -392,7 +386,7 @@ describe('Subscription', function() { it('should make an array out of ids', function(done) { var ID = 'abc'; - subscription.request = function(protoOpts, reqOpts) { + subscription.parent.request = function(protoOpts, reqOpts) { assert.deepEqual(reqOpts.ackIds, [ID]); done(); }; @@ -403,7 +397,7 @@ describe('Subscription', function() { it('should make correct api request', function(done) { var IDS = [1, 2, 3]; - subscription.request = function(protoOpts, reqOpts) { + subscription.parent.request = function(protoOpts, reqOpts) { assert.strictEqual(protoOpts.service, 'Subscriber'); assert.strictEqual(protoOpts.method, 'acknowledge'); @@ -421,7 +415,7 @@ describe('Subscription', function() { timeout: 10 }; - subscription.request = function(protoOpts) { + subscription.parent.request = function(protoOpts) { assert.strictEqual(protoOpts.timeout, options.timeout); done(); }; @@ -439,7 +433,7 @@ describe('Subscription', function() { }); it('should unmark the ack ids as being in progress', function(done) { - subscription.request = function(protoOpts, reqOpts, callback) { + subscription.parent.request = function(protoOpts, reqOpts, callback) { callback(); }; @@ -458,7 +452,7 @@ describe('Subscription', function() { }); it('should not unmark if there was an error', function(done) { - subscription.request = function(protoOpts, reqOpts, callback) { + subscription.parent.request = function(protoOpts, reqOpts, callback) { callback(new Error('Error.')); }; @@ -475,7 +469,7 @@ describe('Subscription', function() { }); it('should refresh paused status', function(done) { - subscription.request = function(protoOpts, reqOpts, callback) { + subscription.parent.request = function(protoOpts, reqOpts, callback) { callback(); }; @@ -487,7 +481,7 @@ describe('Subscription', function() { it('should pass error to callback', function(done) { var error = new Error('Error.'); - subscription.request = function(protoOpts, reqOpts, callback) { + subscription.parent.request = function(protoOpts, reqOpts, callback) { callback(error); }; @@ -499,7 +493,7 @@ describe('Subscription', function() { it('should pass apiResponse to callback', function(done) { var resp = { success: true }; - subscription.request = function(protoOpts, reqOpts, callback) { + subscription.parent.request = function(protoOpts, reqOpts, callback) { callback(null, resp); }; subscription.ack(1, function(err, apiResponse) { @@ -527,7 +521,7 @@ describe('Subscription', function() { return FULL_SNAPSHOT_NAME; }; - subscription.request = function(protoOpts, reqOpts) { + subscription.parent.request = function(protoOpts, reqOpts) { assert.strictEqual(protoOpts.service, 'Subscriber'); assert.strictEqual(protoOpts.method, 'createSnapshot'); @@ -544,7 +538,7 @@ describe('Subscription', function() { var error = new Error('err'); var resp = {}; - subscription.request = function(protoOpts, reqOpts, callback) { + subscription.parent.request = function(protoOpts, reqOpts, callback) { callback(error, resp); }; @@ -562,7 +556,7 @@ describe('Subscription', function() { var fakeSnapshot = {}; var resp = {}; - subscription.request = function(protoOpts, reqOpts, callback) { + subscription.parent.request = function(protoOpts, reqOpts, callback) { callback(null, resp); }; @@ -585,7 +579,7 @@ describe('Subscription', function() { describe('delete', function() { it('should delete a subscription', function(done) { - subscription.request = function(protoOpts, reqOpts) { + subscription.parent.request = function(protoOpts, reqOpts) { assert.strictEqual(protoOpts.service, 'Subscriber'); assert.strictEqual(protoOpts.method, 'deleteSubscription'); @@ -598,7 +592,7 @@ describe('Subscription', function() { }); it('should close a subscription once deleted', function() { - subscription.request = function(protoOpts, reqOpts, callback) { + subscription.parent.request = function(protoOpts, reqOpts, callback) { callback(); }; subscription.closed = false; @@ -607,7 +601,7 @@ describe('Subscription', function() { }); it('should remove all listeners', function(done) { - subscription.request = function(protoOpts, reqOpts, callback) { + subscription.parent.request = function(protoOpts, reqOpts, callback) { callback(); }; subscription.removeAllListeners = function() { @@ -617,7 +611,7 @@ describe('Subscription', function() { }); it('should execute callback when deleted', function(done) { - subscription.request = function(protoOpts, reqOpts, callback) { + subscription.parent.request = function(protoOpts, reqOpts, callback) { callback(); }; subscription.delete(done); @@ -625,7 +619,7 @@ describe('Subscription', function() { it('should execute callback with an api error', function(done) { var error = new Error('Error.'); - subscription.request = function(protoOpts, reqOpts, callback) { + subscription.parent.request = function(protoOpts, reqOpts, callback) { callback(error); }; subscription.delete(function(err) { @@ -636,7 +630,7 @@ describe('Subscription', function() { it('should execute callback with apiResponse', function(done) { var resp = { success: true }; - subscription.request = function(protoOpts, reqOpts, callback) { + subscription.parent.request = function(protoOpts, reqOpts, callback) { callback(null, resp); }; subscription.delete(function(err, apiResponse) { @@ -649,7 +643,7 @@ describe('Subscription', function() { describe('pull', function() { beforeEach(function() { subscription.ack = util.noop; - subscription.request = function(protoOpts, reqOpts, callback) { + subscription.parent.request = function(protoOpts, reqOpts, callback) { callback(null, messageObj); }; }); @@ -659,7 +653,7 @@ describe('Subscription', function() { }); it('should default returnImmediately to false', function(done) { - subscription.request = function(protoOpts, reqOpts) { + subscription.parent.request = function(protoOpts, reqOpts) { assert.strictEqual(reqOpts.returnImmediately, false); done(); }; @@ -667,7 +661,7 @@ describe('Subscription', function() { }); it('should honor options', function(done) { - subscription.request = function(protoOpts, reqOpts) { + subscription.parent.request = function(protoOpts, reqOpts) { assert.strictEqual(reqOpts.returnImmediately, true); done(); }; @@ -675,7 +669,7 @@ describe('Subscription', function() { }); it('should make correct api request', function(done) { - subscription.request = function(protoOpts, reqOpts) { + subscription.parent.request = function(protoOpts, reqOpts) { assert.strictEqual(protoOpts.service, 'Subscriber'); assert.strictEqual(protoOpts.method, 'pull'); assert.strictEqual(protoOpts.timeout, 92000); @@ -698,9 +692,11 @@ describe('Subscription', function() { timeout: timeout }); - subscription.request = function(protoOpts) { - assert.strictEqual(protoOpts.timeout, 30000); - done(); + subscription.parent = { + request: function(protoOpts) { + assert.strictEqual(protoOpts.timeout, 30000); + done(); + } }; subscription.pull(assert.ifError); @@ -709,7 +705,7 @@ describe('Subscription', function() { it('should store the active request', function() { var requestInstance = {}; - subscription.request = function() { + subscription.parent.request = function() { return requestInstance; }; @@ -720,7 +716,7 @@ describe('Subscription', function() { it('should clear the active request', function(done) { var requestInstance = {}; - subscription.request = function(protoOpts, reqOpts, callback) { + subscription.parent.request = function(protoOpts, reqOpts, callback) { setImmediate(function() { callback(null, {}); assert.strictEqual(subscription.activeRequest_, null); @@ -735,7 +731,7 @@ describe('Subscription', function() { it('should pass error to callback', function(done) { var error = new Error('Error.'); - subscription.request = function(protoOpts, reqOpts, callback) { + subscription.parent.request = function(protoOpts, reqOpts, callback) { callback(error); }; subscription.pull(function(err) { @@ -745,7 +741,7 @@ describe('Subscription', function() { }); it('should not return messages if request timed out', function(done) { - subscription.request = function(protoOpts, reqOpts, callback) { + subscription.parent.request = function(protoOpts, reqOpts, callback) { callback({ code: 504 }); }; @@ -831,7 +827,7 @@ describe('Subscription', function() { }); it('should not autoAck if no messages returned', function(done) { - subscription.request = function(protoOpts, reqOpts, callback) { + subscription.parent.request = function(protoOpts, reqOpts, callback) { callback(null, { receivedMessages: [] }); }; subscription.ack = function() { @@ -884,7 +880,7 @@ describe('Subscription', function() { callback(null, { success: true }); }; - subscription.request = function(protoOpts, reqOpts, callback) { + subscription.parent.request = function(protoOpts, reqOpts, callback) { callback(null, resp); }; @@ -913,7 +909,7 @@ describe('Subscription', function() { return FAKE_FULL_SNAPSHOT_NAME; }; - subscription.request = function(protoOpts, reqOpts, callback) { + subscription.parent.request = function(protoOpts, reqOpts, callback) { assert.strictEqual(protoOpts.service, 'Subscriber'); assert.strictEqual(protoOpts.method, 'seek'); @@ -930,7 +926,7 @@ describe('Subscription', function() { it('should optionally accept a Date object', function(done) { var date = new Date(); - subscription.request = function(protoOpts, reqOpts, callback) { + subscription.parent.request = function(protoOpts, reqOpts, callback) { var seconds = Math.floor(date.getTime() / 1000); assert.strictEqual(reqOpts.time.seconds, seconds); @@ -947,7 +943,7 @@ describe('Subscription', function() { describe('setAckDeadline', function() { it('should set the ack deadline', function(done) { - subscription.request = function(protoOpts, reqOpts) { + subscription.parent.request = function(protoOpts, reqOpts) { assert.strictEqual(protoOpts.service, 'Subscriber'); assert.strictEqual(protoOpts.method, 'modifyAckDeadline'); @@ -964,7 +960,7 @@ describe('Subscription', function() { }); it('should execute the callback', function(done) { - subscription.request = function(protoOpts, reqOpts, callback) { + subscription.parent.request = function(protoOpts, reqOpts, callback) { callback(); }; subscription.setAckDeadline({}, done); @@ -972,7 +968,7 @@ describe('Subscription', function() { it('should execute the callback with apiResponse', function(done) { var resp = { success: true }; - subscription.request = function(protoOpts, reqOpts, callback) { + subscription.parent.request = function(protoOpts, reqOpts, callback) { callback(null, resp); }; subscription.setAckDeadline({}, function(err, apiResponse) { diff --git a/packages/pubsub/test/topic.js b/packages/pubsub/test/topic.js index 5e432f3a021..48985c6353a 100644 --- a/packages/pubsub/test/topic.js +++ b/packages/pubsub/test/topic.js @@ -72,6 +72,7 @@ describe('Topic', function() { beforeEach(function() { topic = new Topic(PUBSUB, TOPIC_NAME); + topic.parent = PUBSUB; }); describe('initialization', function() { @@ -128,10 +129,6 @@ describe('Topic', function() { }; new Topic(PUBSUB, TOPIC_NAME); }); - - it('should assign pubsub object to `this`', function() { - assert.deepEqual(topic.pubsub, PUBSUB); - }); }); describe('formatMessage_', function() { @@ -170,7 +167,7 @@ describe('Topic', function() { describe('getSubscriptions', function() { it('should accept just a callback', function(done) { - topic.pubsub.getSubscriptions = function(options, callback) { + topic.parent.getSubscriptions = function(options, callback) { assert.deepEqual(options, { topic: topic }); callback(); }; @@ -181,7 +178,7 @@ describe('Topic', function() { it('should pass correct args to pubsub#getSubscriptions', function(done) { var opts = { a: 'b', c: 'd' }; - topic.pubsub = { + topic.parent = { getSubscriptions: function(options, callback) { assert.deepEqual(options, opts); assert.deepEqual(options.topic, topic); @@ -197,7 +194,7 @@ describe('Topic', function() { it('should return a stream', function(done) { var fakeStream = {}; - topic.pubsub.getSubscriptionsStream = function(options) { + topic.parent.getSubscriptionsStream = function(options) { assert.deepEqual(options, { topic: topic }); setImmediate(done); return fakeStream; @@ -210,7 +207,7 @@ describe('Topic', function() { it('should pass correct args to getSubscriptionsStream', function(done) { var opts = { a: 'b', c: 'd' }; - topic.pubsub = { + topic.parent = { getSubscriptionsStream: function(options) { assert.deepEqual(options, opts); assert.deepEqual(options.topic, topic); @@ -239,7 +236,7 @@ describe('Topic', function() { }); it('should send correct api request', function(done) { - topic.request = function(protoOpts, reqOpts) { + topic.parent.request = function(protoOpts, reqOpts) { assert.strictEqual(protoOpts.service, 'Publisher'); assert.strictEqual(protoOpts.method, 'publish'); @@ -259,7 +256,7 @@ describe('Topic', function() { timeout: 10 }; - topic.request = function(protoOpts) { + topic.parent.request = function(protoOpts) { assert.strictEqual(protoOpts.timeout, options.timeout); done(); }; @@ -268,7 +265,7 @@ describe('Topic', function() { }); it('should send correct api request for raw message', function(done) { - topic.request = function(protoOpts, reqOpts) { + topic.parent.request = function(protoOpts, reqOpts) { assert.deepEqual(reqOpts.messages, [ { data: new Buffer(JSON.stringify(message)).toString('base64'), @@ -291,7 +288,7 @@ describe('Topic', function() { }; var originalMessage = extend({}, message); - topic.request = function() { + topic.parent.request = function() { assert.deepEqual(message, originalMessage); done(); }; @@ -300,7 +297,7 @@ describe('Topic', function() { }); it('should execute callback', function(done) { - topic.request = function(protoOpts, reqOpts, callback) { + topic.parent.request = function(protoOpts, reqOpts, callback) { callback(null, {}); }; @@ -311,7 +308,7 @@ describe('Topic', function() { var error = new Error('Error.'); var apiResponse = {}; - topic.request = function(protoOpts, reqOpts, callback) { + topic.parent.request = function(protoOpts, reqOpts, callback) { callback(error, apiResponse); }; @@ -327,7 +324,7 @@ describe('Topic', function() { it('should execute callback with apiResponse', function(done) { var resp = { success: true }; - topic.request = function(protoOpts, reqOpts, callback) { + topic.parent.request = function(protoOpts, reqOpts, callback) { callback(null, resp); }; @@ -343,7 +340,7 @@ describe('Topic', function() { var subscriptionName = 'subName'; var opts = {}; - topic.pubsub.subscribe = function(t, subName, options, callback) { + topic.parent.subscribe = function(t, subName, options, callback) { assert.deepEqual(t, topic); assert.equal(subName, subscriptionName); assert.deepEqual(options, opts); @@ -359,7 +356,7 @@ describe('Topic', function() { var subscriptionName = 'subName'; var opts = {}; - topic.pubsub.subscription = function(name, options) { + topic.parent.subscription = function(name, options) { assert.equal(name, subscriptionName); assert.deepEqual(options, opts); done(); @@ -369,7 +366,7 @@ describe('Topic', function() { }); it('should attach the topic instance to the options', function(done) { - topic.pubsub.subscription = function(name, options) { + topic.parent.subscription = function(name, options) { assert.strictEqual(options.topic, topic); done(); }; @@ -378,7 +375,7 @@ describe('Topic', function() { }); it('should return the result', function(done) { - topic.pubsub.subscription = function() { + topic.parent.subscription = function() { return done; };