Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streamrouter: implement across library #692

Merged
265 changes: 151 additions & 114 deletions lib/bigquery/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
'use strict';

var extend = require('extend');
var streamEvents = require('stream-events');
var through = require('through2');

/**
* @type {module:bigquery/dataset}
Expand All @@ -36,6 +34,12 @@ var Dataset = require('./dataset.js');
*/
var Job = require('./job.js');

/**
* @type {module:common/streamrouter}
* @private
*/
var streamRouter = require('../common/stream-router.js');

/**
* @type {module:bigquery/table}
* @private
Expand Down Expand Up @@ -158,15 +162,55 @@ BigQuery.prototype.dataset = function(id) {
*
* @param {object=} query - Configuration object.
* @param {boolean} query.all - List all datasets, including hidden ones.
* @param {boolean} query.autoPaginate - Have pagination handled automatically.
* Default: true.
* @param {number} query.maxResults - Maximum number of results to return.
* @param {string} query.pageToken - Token returned from a previous call, to
* request the next page of results.
* @param {function} callback - The callback function.
*
* @example
* bigquery.getDatasets(function(err, datasets, nextQuery, apiResponse) {
* // If `nextQuery` is non-null, there are more results to fetch.
* bigquery.getDatasets(function(err, datasets) {
* if (!err) {
* // datasets is an array of Dataset objects.
* }
* });
*
* //-
* // To control how many API requests are made and page through the results
* // manually, set `autoPaginate` to `false`.
* //-
* var callback = function(err, datasets, nextQuery, apiResponse) {
* if (nextQuery) {
* // More results exist.
* bigquery.getDatasets(nextQuery, callback);
* }
* };
*
* bigquery.getDatasets({
* autoPaginate: false
* }, callback);
*
* //-
* // Get the datasets from your project as a readable object stream.
* //-
* bigquery.getDatasets()
* .on('error', console.error)
* .on('data', function(dataset) {
* // dataset is a Dataset object.
* })
* .on('end', function() {
* // All datasets retrieved.
* });
*
* //-
* // If you anticipate many results, you can end a stream early to prevent
* // unnecessary processing and API requests.
* //-
* bigquery.getDatasets()
* .on('data', function(dataset) {
* this.end();
* });
*/
BigQuery.prototype.getDatasets = function(query, callback) {
var that = this;
Expand Down Expand Up @@ -208,6 +252,8 @@ BigQuery.prototype.getDatasets = function(query, callback) {
* @param {object=} options - Configuration object.
* @param {boolean=} options.allUsers - Display jobs owned by all users in the
* project.
* @param {boolean} options.autoPaginate - Have pagination handled
* automatically. Default: true.
* @param {number=} options.maxResults - Maximum number of results to return.
* @param {string=} options.pageToken - Token returned from a previous call, to
* request the next page of results.
Expand All @@ -219,9 +265,47 @@ BigQuery.prototype.getDatasets = function(query, callback) {
* @param {function} callback - The callback function.
*
* @example
* bigquery.getJobs(function(err, jobs, nextQuery, apiResponse) {
* // If `nextQuery` is non-null, there are more results to fetch.
* bigquery.getJobs(function(err, jobs) {
* if (!err) {
* // jobs is an array of Job objects.
* }
* });
*
* //-
* // To control how many API requests are made and page through the results
* // manually, set `autoPaginate` to `false`.
* //-
* var callback = function(err, jobs, nextQuery, apiRespose) {
* if (nextQuery) {
* // More results exist.
* bigquery.getJobs(nextQuery, callback);
* }
* };
*
* bigquery.getJobs({
* autoPaginate: false
* }, callback);
*
* //-
* // Get the jobs from your project as a readable object stream.
* //-
* bigquery.getJobs()
* .on('error', console.error)
* .on('data', function(job) {
* // job is a Job object.
* })
* .on('end', function() {
* // All jobs retrieved.
* });
*
* //-
* // If you anticipate many results, you can end a stream early to prevent
* // unnecessary processing and API requests.
* //-
* bigquery.getJobs()
* .on('data', function(job) {
* this.end();
* });
*/
BigQuery.prototype.getJobs = function(options, callback) {
var that = this;
Expand Down Expand Up @@ -270,31 +354,6 @@ BigQuery.prototype.job = function(id) {
return new Job(this, id);
};

/*! Developer Documentation
*
* The `query` method is dual-purpose, like the use cases for a query.
* Sometimes, a user will want to fetch results from their table in a serial
* manner (get results -> more results exist? -> get more results, repeat.) --
* other times, a user may want to wave their hands at making repeated calls to
* get all of the rows, instead using a stream.
*
* A couple different libraries are used to cover the stream case:
*
* var stream = streamEvents(through2.obj());
*
* - streamEvents - https://github.com/stephenplusplus/stream-events
* This library enables us to wait until our stream is being asked for
* data, before making any API calls. It is possible a user will get a
* stream, then not end up running it - or, it will be run later, at a
* time when the token returned from the API call could have expired.
* Using this library ensures we wait until the last possible chance to
* get that token.
*
* - through2 - https://github.com/rvagg/through2
* This is a popular library for how simple it makes dealing with the
* complicated Node.js Streams API. We're creating an object stream, as
* the data we are receiving from the API are rows of JSON data.
*/
/**
* Run a query scoped to your project.
*
Expand All @@ -310,48 +369,56 @@ BigQuery.prototype.job = function(id) {
* queries for you, pushing each row to the stream.
*
* @param {string|object} options - A string SQL query or configuration object.
* @param {boolean} options.autoPaginate - Have pagination handled
* automatically. Default: true.
* @param {number} options.maxResults - Maximum number of results to read.
* @param {string} options.query - A query string, following the BigQuery query
* syntax, of the query to execute.
* @param {number} options.timeoutMs - How long to wait for the query to
* complete, in milliseconds, before returning. Default is to return
* immediately. If the timeout passes before the job completes, the request
* will fail with a `TIMEOUT` error.
* @param {function=} callback - The callback function. If you intend to
* continuously run this query until all results are in as part of a stream,
* do not pass a callback.
* @param {function=} callback - The callback function.
*
* @example
* var query = 'SELECT url FROM [publicdata:samples.github_nested] LIMIT 100';
*
* bigquery.query(query, function(err, rows) {
* if (!err) {
* // Handle results here.
* }
* });
*
* //-
* // You can run a query against your data in a serial manner.
* // To control how many API requests are made and page through the results
* // manually, set `autoPaginate` to `false`.
* //-
* bigquery.query(query, function(err, rows, nextQuery, apiResponse) {
* // Handle results here.
* var callback = function(err, rows, nextQuery, apiResponse) {
* if (nextQuery) {
* bigquery.query(nextQuery, function(err, rows, nextQuery, apiResponse) {
* // Handle more results here.
* });
* bigquery.query(nextQuery, callback);
* }
* });
* };
*
* bigquery.query({
* query: query,
* autoPaginate: false
* }, callback);
*
* //-
* // You can also use the `query` method as a readable object stream by
* // omitting the callback.
* //-
* var through2 = require('through2');
*
* bigquery.query(query)
* .pipe(through2.obj(function(row, enc, next) {
* this.push(row.url += '?trackingCode=AZ19b\n');
* next();
* }))
* .pipe(process.stdout);
* .on('error', console.error)
* .on('data', function(row) {
* // row is a result from your query.
* })
* .on('end', function() {
* // All rows retrieved.
* });
*/
BigQuery.prototype.query = function(options, callback) {
var that = this;
var stream;

if (util.is(options, 'string')) {
options = {
Expand All @@ -366,79 +433,42 @@ BigQuery.prototype.query = function(options, callback) {
var requestQuery = extend({}, options);
delete requestQuery.job;

if (!util.is(callback, 'function')) {
stream = streamEvents(through.obj());
stream.once('reading', runQuery);
return stream;
if (job) {
// Get results of the query.
var path = '/queries/' + job.id;
that.makeReq_('GET', path, requestQuery, null, responseHandler);
} else {
callback = callback || util.noop;
runQuery();
// Create a job.
that.makeReq_('POST', '/queries', null, options, responseHandler);
}

function runQuery() {
if (job) {
// Get results of the query.
var path = '/queries/' + job.id;
that.makeReq_('GET', path, requestQuery, null, responseHandler);
} else {
// Create a job.
that.makeReq_('POST', '/queries', null, options, responseHandler);
function responseHandler(err, resp) {
if (err) {
callback(err, null, null, resp);
return;
}

function responseHandler(err, resp) {
if (err) {
onComplete(err, null, null, resp);
return;
}

var rows = [];
if (resp.schema && resp.rows) {
rows = Table.mergeSchemaWithRows_(resp.schema, resp.rows);
}

var nextQuery = null;
if (resp.jobComplete === false) {
// Query is still running.
nextQuery = extend({}, options);
} else if (resp.pageToken) {
// More results exist.
nextQuery = extend({}, options, {
pageToken: resp.pageToken
});
}
if (nextQuery && !nextQuery.job && resp.jobReference.jobId) {
// Create a prepared Job to continue the query.
nextQuery.job = that.job(resp.jobReference.jobId);
}

onComplete(null, rows, nextQuery, resp);
var rows = [];
if (resp.schema && resp.rows) {
rows = Table.mergeSchemaWithRows_(resp.schema, resp.rows);
}

function onComplete(err, rows, nextQuery, resp) {
if (err) {
if (stream) {
stream.emit('error', err);
stream.end();
} else {
callback(err, null, null, resp);
}
return;
}

if (stream) {
rows.forEach(function(row) {
stream.push(row);
});

if (nextQuery) {
that.query(nextQuery, onComplete);
} else {
stream.end();
}
} else {
callback(null, rows, nextQuery, resp);
}
var nextQuery = null;
if (resp.jobComplete === false) {
// Query is still running.
nextQuery = extend({}, options);
} else if (resp.pageToken) {
// More results exist.
nextQuery = extend({}, options, {
pageToken: resp.pageToken
});
}
if (nextQuery && !nextQuery.job && resp.jobReference.jobId) {
// Create a prepared Job to continue the query.
nextQuery.job = that.job(resp.jobReference.jobId);
}

callback(null, rows, nextQuery, resp);
}
};

Expand Down Expand Up @@ -564,4 +594,11 @@ BigQuery.prototype.makeReq_ = function(method, path, query, body, callback) {
this.makeAuthorizedRequest_(reqOpts, callback);
};

/*! Developer Documentation
*
* These methods can be used with either a callback or as a readable object
* stream. `streamRouter` is used to add this dual behavior.
*/
streamRouter.extend(BigQuery, ['getDatasets', 'getJobs', 'query']);

module.exports = BigQuery;
Loading