Skip to content

Commit

Permalink
pubsub: borrow request config from GAX (#2242)
Browse files Browse the repository at this point in the history
* pubsub: borrow request config from GAX [ci skip]

* tests

* rm .only
  • Loading branch information
stephenplusplus authored Apr 21, 2017
1 parent 2e5e321 commit a7492c6
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 79 deletions.
37 changes: 35 additions & 2 deletions packages/pubsub/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ var Subscription = require('./subscription.js');
*/
var Topic = require('./topic.js');

/**
* @type {object} - GAX's default configuration.
*/
var GAX_CONFIG = {
Publisher: require('./v1/publisher_client_config.json').
interfaces['google.pubsub.v1.Publisher'],
Subscriber: require('./v1/subscriber_client_config.json').
interfaces['google.pubsub.v1.Subscriber']
};

/**
* [Cloud Pub/Sub](https://developers.google.com/pubsub/overview) is a
* reliable, many-to-many, asynchronous messaging service from Cloud
Expand Down Expand Up @@ -751,6 +761,24 @@ PubSub.prototype.topic = function(name) {
return new Topic(this, name);
};

/**
* Intercept the call to {module:common/grpc-service#request}, making sure the
* correct timeouts are set.
*
* @private
*/
PubSub.prototype.request = function(protoOpts) {
var method = protoOpts.method;
var camelCaseMethod = method[0].toUpperCase() + method.substr(1);
var config = GAX_CONFIG[protoOpts.service].methods[camelCaseMethod];

if (is.undefined(arguments[0].timeout)) {
arguments[0].timeout = config.timeout_millis;
}

commonGrpc.Service.prototype.request.apply(this, arguments);
};

/**
* Determine the appropriate endpoint to use for API requests, first trying the
* local Pub/Sub emulator environment variable (PUBSUB_EMULATOR_HOST), otherwise
Expand All @@ -775,7 +803,7 @@ PubSub.prototype.determineBaseUrl_ = function() {

/*! Developer Documentation
*
* These methods can be auto-paginated.
* These methods can be agto-paginated.
*/
common.paginator.extend(PubSub, [
'getSnapshots',
Expand All @@ -789,7 +817,12 @@ common.paginator.extend(PubSub, [
* that a callback is omitted.
*/
common.util.promisifyAll(PubSub, {
exclude: ['snapshot', 'subscription', 'topic']
exclude: [
'request',
'snapshot',
'subscription',
'topic'
]
});

PubSub.Subscription = Subscription;
Expand Down
14 changes: 8 additions & 6 deletions packages/pubsub/src/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ Subscription.prototype.ack = function(ackIds, options, callback) {
ackIds: ackIds
};

this.request(protoOpts, reqOpts, function(err, resp) {
this.parent.request(protoOpts, reqOpts, function(err, resp) {
if (!err) {
ackIds.forEach(function(ackId) {
delete self.inProgressAckIds[ackId];
Expand Down Expand Up @@ -523,7 +523,7 @@ Subscription.prototype.createSnapshot = function(name, callback) {
subscription: this.name
};

this.request(protoOpts, reqOpts, function(err, resp) {
this.parent.request(protoOpts, reqOpts, function(err, resp) {
if (err) {
callback(err, null, resp);
return;
Expand Down Expand Up @@ -601,7 +601,7 @@ Subscription.prototype.delete = function(callback) {
subscription: this.name
};

this.request(protoOpts, reqOpts, function(err, resp) {
this.parent.request(protoOpts, reqOpts, function(err, resp) {
if (err) {
callback(err, resp);
return;
Expand Down Expand Up @@ -696,9 +696,11 @@ Subscription.prototype.pull = function(options, callback) {
maxMessages: options.maxResults
};

this.activeRequest_ = this.request(protoOpts, reqOpts, function(err, resp) {
this.activeRequest_ = this.parent.request(protoOpts, reqOpts, function(err) {
self.activeRequest_ = null;

var resp = arguments[1];

if (err) {
if (err.code === 504) {
// Simulate a server timeout where no messages were received.
Expand Down Expand Up @@ -779,7 +781,7 @@ Subscription.prototype.seek = function(snapshot, callback) {
throw new Error('Either a snapshot name or Date is needed to seek to.');
}

this.request(protoOpts, reqOpts, callback);
this.parent.request(protoOpts, reqOpts, callback);
};

/**
Expand Down Expand Up @@ -825,7 +827,7 @@ Subscription.prototype.setAckDeadline = function(options, callback) {
ackDeadlineSeconds: options.seconds
};

this.request(protoOpts, reqOpts, function(err, resp) {
this.parent.request(protoOpts, reqOpts, function(err, resp) {
callback(err, resp);
});
};
Expand Down
11 changes: 5 additions & 6 deletions packages/pubsub/src/topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ var IAM = require('./iam.js');
*/
function Topic(pubsub, name) {
this.name = Topic.formatName_(pubsub.projectId, name);
this.pubsub = pubsub;

var methods = {
/**
Expand Down Expand Up @@ -319,7 +318,7 @@ Topic.prototype.getSubscriptions = function(options, callback) {
options = options || {};
options.topic = this;

return this.pubsub.getSubscriptions(options, callback);
return this.parent.getSubscriptions(options, callback);
};

/**
Expand Down Expand Up @@ -353,7 +352,7 @@ Topic.prototype.getSubscriptionsStream = function(options) {
options = options || {};
options.topic = this;

return this.pubsub.getSubscriptionsStream(options);
return this.parent.getSubscriptionsStream(options);
};

/**
Expand Down Expand Up @@ -475,7 +474,7 @@ Topic.prototype.publish = function(messages, options, callback) {
.map(Topic.formatMessage_)
};

this.request(protoOpts, reqOpts, function(err, result) {
this.parent.request(protoOpts, reqOpts, function(err, result) {
if (err) {
callback(err, null, result);
return;
Expand Down Expand Up @@ -541,7 +540,7 @@ Topic.prototype.publish = function(messages, options, callback) {
* });
*/
Topic.prototype.subscribe = function(subName, options, callback) {
this.pubsub.subscribe(this, subName, options, callback);
this.parent.subscribe(this, subName, options, callback);
};

/**
Expand Down Expand Up @@ -574,7 +573,7 @@ Topic.prototype.subscription = function(name, options) {
options = options || {};
options.topic = this;

return this.pubsub.subscription(name, options);
return this.parent.subscription(name, options);
};

/*! Developer Documentation
Expand Down
114 changes: 112 additions & 2 deletions packages/pubsub/test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,24 @@ var fakeUtil = extend({}, util, {
}

promisified = true;
assert.deepEqual(options.exclude, ['snapshot', 'subscription', 'topic']);
assert.deepEqual(options.exclude, [
'request',
'snapshot',
'subscription',
'topic'
]);
}
});

function FakeGrpcService() {
this.calledWith_ = arguments;
}

var grpcServiceRequestOverride;
FakeGrpcService.prototype.request = function() {
return (grpcServiceRequestOverride || util.noop).apply(this, arguments);
};

function FakeSnapshot() {
this.calledWith_ = arguments;
}
Expand All @@ -74,6 +84,22 @@ var fakePaginator = {
}
};

var GAX_CONFIG_PUBLISHER_OVERRIDE = {};
var GAX_CONFIG_SUBSCRIBER_OVERRIDE = {};

var GAX_CONFIG = {
Publisher: {
interfaces: {
'google.pubsub.v1.Publisher': GAX_CONFIG_PUBLISHER_OVERRIDE
}
},
Subscriber: {
interfaces: {
'google.pubsub.v1.Subscriber': GAX_CONFIG_SUBSCRIBER_OVERRIDE
}
}
};

describe('PubSub', function() {
var PubSub;
var PROJECT_ID = 'test-project';
Expand All @@ -94,7 +120,10 @@ describe('PubSub', function() {
},
'./snapshot.js': FakeSnapshot,
'./subscription.js': Subscription,
'./topic.js': Topic
'./topic.js': Topic,

'./v1/publisher_client_config.json': GAX_CONFIG.Publisher,
'./v1/subscriber_client_config.json': GAX_CONFIG.Subscriber
});
});

Expand All @@ -105,6 +134,7 @@ describe('PubSub', function() {
});

beforeEach(function() {
grpcServiceRequestOverride = null;
SubscriptionOverride = null;
pubsub = new PubSub(OPTIONS);
pubsub.projectId = PROJECT_ID;
Expand Down Expand Up @@ -897,6 +927,86 @@ describe('PubSub', function() {
});
});

describe('request', function() {
var TIMEOUT = Math.random();

beforeEach(function() {
GAX_CONFIG_PUBLISHER_OVERRIDE.methods = {
MethodName: {
timeout_millis: TIMEOUT
}
};
});

after(function() {
GAX_CONFIG_PUBLISHER_OVERRIDE.methods = {};
});

it('should pass through the request', function(done) {
var args = [
{
service: 'Publisher',
method: 'MethodName'
},
{
value: true
},
{
anotherValue: true
}
];

grpcServiceRequestOverride = function() {
assert.strictEqual(this, pubsub);
assert.strictEqual(args[0], arguments[0]);
assert.strictEqual(args[1], arguments[1]);
assert.strictEqual(args[2], arguments[2]);
done();
};

pubsub.request.apply(pubsub, args);
});

it('should assign a timeout', function(done) {
grpcServiceRequestOverride = function(protoOpts) {
assert.strictEqual(protoOpts.timeout, TIMEOUT);
done();
};

pubsub.request({
service: 'Publisher',
method: 'MethodName'
});
});

it('should not override a timeout if set', function(done) {
var timeout = 0;

grpcServiceRequestOverride = function(protoOpts) {
assert.strictEqual(protoOpts.timeout, timeout);
done();
};

pubsub.request({
service: 'Publisher',
method: 'MethodName',
timeout: timeout
});
});

it('should camel case the method name', function(done) {
grpcServiceRequestOverride = function(protoOpts) {
assert.strictEqual(protoOpts.timeout, TIMEOUT);
done();
};

pubsub.request({
service: 'Publisher',
method: 'methodName'
});
});
});

describe('determineBaseUrl_', function() {
function setHost(host) {
process.env.PUBSUB_EMULATOR_HOST = host;
Expand Down
Loading

0 comments on commit a7492c6

Please sign in to comment.