Skip to content

Commit

Permalink
pubsub: convert to grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Feb 4, 2016
1 parent f69d6ec commit 3a5a790
Show file tree
Hide file tree
Showing 12 changed files with 871 additions and 368 deletions.
100 changes: 100 additions & 0 deletions lib/common/grpc-service-object.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*!
* Copyright 2015 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*!
* @module common/grpcServiceObject
*/

'use strict';

var extend = require('extend');
var nodeutil = require('util');

/**
* @type {module:common/serviceObject}
* @private
*/
var ServiceObject = require('./service-object.js');

/**
* @type {module:common/util}
* @private
*/
var util = require('./util.js');

/**
* GrpcServiceObject is a base class, meant to be inherited from by a service
* object that uses the gRPC protobuf API.
*
* @private
*
* @param {object} config - Configuration object.
*/
function GrpcServiceObject(config) {
ServiceObject.call(this, config);
}

nodeutil.inherits(GrpcServiceObject, ServiceObject);

/**
* Delete the object.
*
* @param {function=} callback - The callback function.
* @param {?error} callback.err - An error returned while making this request.
*/
GrpcServiceObject.prototype.delete = function(callback) {
var protoOpts = this.methods.delete.protoOpts;
var reqOpts = this.methods.delete.reqOpts;

this.request(protoOpts, reqOpts, callback || util.noop);
};

/**
* Get the metadata of this object.
*
* @param {function} callback - The callback function.
* @param {?error} callback.err - An error returned while making this request.
* @param {object} callback.metadata - The metadata for this object.
*/
GrpcServiceObject.prototype.getMetadata = function(callback) {
var protoOpts = this.methods.getMetadata.protoOpts;
var reqOpts = this.methods.getMetadata.reqOpts;

this.request(protoOpts, reqOpts, callback);
};

/**
* Set the metadata for this object.
*
* @param {object} metadata - The metadata to set on this object.
* @param {function=} callback - The callback function.
* @param {?error} callback.err - An error returned while making this request.
*/
GrpcServiceObject.prototype.setMetadata = function(metadata, callback) {
var protoOpts = this.methods.setMetadata.protoOpts;
var reqOpts = extend(true, {}, this.methods.setMetadata.reqOpts, metadata);

this.request(protoOpts, reqOpts, callback || util.noop);
};

/**
* Patch a request to the GrpcService object.
*/
GrpcServiceObject.prototype.request = function(protoOpts, reqOpts, callback) {
this.parent.request(protoOpts, reqOpts, callback);
};

module.exports = GrpcServiceObject;
244 changes: 244 additions & 0 deletions lib/common/grpc-service.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
/*!
* Copyright 2015 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*!
* @module common/grpcService
*/

'use strict';

var camelize = require('camelize');
var googleProtoFiles = require('google-proto-files');
var grpc = require('grpc');
var is = require('is');
var nodeutil = require('util');
var path = require('path');
var snakeize = require('snakeize');

/**
* @type {module:common/service}
* @private
*/
var Service = require('./service.js');

/**
* @const {object} - A map of protobuf codes to HTTP status codes.
* @private
*/
var HTTP_ERROR_CODE_MAP = {
0: {
code: 200,
message: 'OK'
},

1: {
code: 499,
message: 'Client Closed Request'
},

2: {
code: 500,
message: 'Internal Server Error'
},

3: {
code: 400,
message: 'Bad Request'
},

4: {
code: 504,
message: 'Gateway Timeout'
},

5: {
code: 404,
message: 'Not Found'
},

6: {
code: 409,
message: 'Conflict'
},

7: {
code: 403,
message: 'Forbidden'
},

8: {
code: 429,
message: 'Too Many Requests'
},

9: {
code: 412,
message: 'Precondition Failed'
},

10: {
code: 409,
message: 'Conflict'
},

11: {
code: 400,
message: 'Bad Request'
},

12: {
code: 501,
message: 'Not Implemented'
},

13: {
code: 500,
message: 'Internal Server Error'
},

14: {
code: 503,
message: 'Service Unavailable'
},

15: {
code: 500,
message: 'Internal Server Error'
},

16: {
code: 401,
message: 'Unauthorized'
}
};

/**
* Service is a base class, meant to be inherited from by a "service," like
* BigQuery or Storage.
*
* This handles making authenticated requests by exposing a `makeReq_` function.
*
* @param {object} config - Configuration object.
* @param {string} config.baseUrl - The base URL to make API requests to.
* @param {string[]} config.scopes - The scopes required for the request.
* @param {object} options - [Configuration object](#/docs/?method=gcloud).
*/
function GrpcService(config, options) {
Service.call(this, config, options);

var service = config.service;
var apiVersion = config.apiVersion;
var rootDir = googleProtoFiles('..');

this.protoOpts = config.proto;
this.proto = grpc.load({
root: rootDir,
file: path.relative(rootDir, googleProtoFiles[service][apiVersion])
}).google[service][apiVersion];
}

nodeutil.inherits(GrpcService, Service);

/**
* Make an authenticated request with gRPC.
*
* @param {object} protoOpts - The proto options.
* @param {string} protoOpts.service - The service name.
* @param {string} protoOpts.method - The method name.
* @param {number=} protoOpts.timeout - After how many milliseconds should the
* request cancel.
* @param {object} reqOpts - The request options.
* @param {function=} callback - The callback function.
*/
GrpcService.prototype.request = function(protoOpts, reqOpts, callback) {
var self = this;
var proto = this.proto;

if (!this.grpcCredentials) {
// We must establish an authClient to give to grpc.
this.authClient.getCredentials(function(err) {
if (err) {
callback(err);
return;
}

var authClient = self.authClient.authClient;

self.grpcCredentials = grpc.credentials.combineChannelCredentials(
grpc.credentials.createSsl(),
grpc.credentials.createFromGoogleCredential(authClient)
);

self.request.call(self, protoOpts, reqOpts, callback);
});
return;
}

var grpcOpts = {};

if (is.number(protoOpts.timeout)) {
grpcOpts.deadline = new Date(Date.now() + protoOpts.timeout);
}

var service = new proto[protoOpts.service](
this.baseUrl,
this.grpcCredentials
);

// snakeize and camelize are used to transform camelCase request options to
// snake_case. This is what ProtoBuf.js (via gRPC) expects. Similarly, the
// response is in snake_case, which is why we use camelize to return it to
// camelCase. An option will be added to gRPC to allow us to skip this step:
// https://github.com/GoogleCloudPlatform/gcloud-node/pull/1070#discussion_r51285492
service[protoOpts.method](snakeize(reqOpts), function(err, resp) {
if (err) {
if (HTTP_ERROR_CODE_MAP[err.code]) {
var httpError = HTTP_ERROR_CODE_MAP[err.code];
err.code = httpError.code;
}

callback(err);
return;
}

function convertBuffers(data) {
if (is.array(data)) {
return data.map(convertBuffers);
}

if (is.object(data)) {
for (var prop in data) {
var value = data[prop];

// An option will be added to gRPC to expose a setting which will
// replace this function (convertBuffers).
// https://github.com/GoogleCloudPlatform/gcloud-node/pull/1070#discussion_r51285492
if (is.object(value) && value.length) {
data[prop] = new Buffer(value).toString('base64');
} else if (is.object(value)) {
data[prop] = convertBuffers(value);
}
}
}

return data;
}

callback(null, convertBuffers(camelize(resp)));
}, null, grpcOpts);
};

module.exports = GrpcService;
3 changes: 1 addition & 2 deletions lib/common/service-object.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,13 @@ ServiceObject.prototype.create = function(options, callback) {
*/
ServiceObject.prototype.delete = function(callback) {
var methodConfig = this.methods.delete || {};
callback = callback || util.noop;

var reqOpts = extend({
method: 'DELETE',
uri: ''
}, methodConfig.reqOpts);

callback = callback || util.noop;

// The `request` method may have been overridden to hold any special behavior.
// Ensure we call the original `request` method.
ServiceObject.prototype.request.call(this, reqOpts, function(err, resp) {
Expand Down
Loading

0 comments on commit 3a5a790

Please sign in to comment.