Skip to content

Commit

Permalink
datastore: runQuery(stream mode) - use limit(n) logically - fixes #296
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Nov 10, 2014
1 parent 6022e19 commit b091d03
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 15 deletions.
34 changes: 21 additions & 13 deletions lib/datastore/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ DatastoreRequest.prototype.delete = function(keys, callback) {
DatastoreRequest.prototype.runQuery = function(q, callback) {
var that = this;
var stream;
var resultsToSend = q.limitVal;

var req = {
read_options: {},
Expand All @@ -353,7 +354,6 @@ DatastoreRequest.prototype.runQuery = function(q, callback) {
stream.once('reading', runQuery);
return stream;
} else {
callback = callback || util.noop;
runQuery();
}

Expand All @@ -376,21 +376,29 @@ DatastoreRequest.prototype.runQuery = function(q, callback) {
cursor = resp.batch.end_cursor.toBase64();
}

if (stream) {
if (cursor && entities.length > 0) {
entities.forEach(function (entity) {
stream.push(entity);
});
if (!stream) {
callback(null, entities, cursor);
return;
}

req.query = entity.queryToQueryProto(q.start(cursor).offset(0));
if (!cursor || entities.length === 0) {
stream.end();
return;
}

runQuery();
} else {
stream.end();
}
} else {
callback(null, entities, cursor);
var result;
while ((result = entities.shift()) && resultsToSend !== 0) {
stream.push(result);
resultsToSend--;
}

if (resultsToSend === 0) {
stream.end();
return;
}

req.query = entity.queryToQueryProto(q.start(cursor).offset(0));
runQuery();
});
}
};
Expand Down
18 changes: 16 additions & 2 deletions regression/datastore.js
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,7 @@ describe('datastore', function() {
});

it('should run a query as a stream', function(done) {
var q = ds.createQuery('Character').hasAncestor(ancestor)
.limit(5);
var q = ds.createQuery('Character').hasAncestor(ancestor);

var resultsReturned = 0;

Expand All @@ -263,6 +262,21 @@ describe('datastore', function() {
});
});

it('should not go over a limit with a stream', function(done) {
var limit = 3;
var q = ds.createQuery('Character').hasAncestor(ancestor).limit(limit);

var resultsReturned = 0;

ds.runQuery(q)
.on('error', done)
.on('data', function() { resultsReturned++; })
.on('end', function() {
assert.equal(resultsReturned, limit);
done();
});
});

it('should filter queries with simple indexes', function(done) {
var q = ds.createQuery('Character').hasAncestor(ancestor)
.filter('appearances >=', 20);
Expand Down
19 changes: 19 additions & 0 deletions test/datastore/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,25 @@ describe('Request', function() {
});
});

it('should only emit the limited number of results', function(done) {
var limit = 2;

query.limitVal = limit;

request.makeReq_ = function(method, req, callback) {
callback(null, mockResponse.withResultsAndEndCursor);
};

var resultsReturned = 0;

request.runQuery(query)
.on('data', function() { resultsReturned++; })
.on('end', function() {
assert.equal(resultsReturned, limit);
done();
});
});

it('should emit an error', function(done) {
var error = new Error('Error.');

Expand Down

0 comments on commit b091d03

Please sign in to comment.