Skip to content

Commit

Permalink
Fix #6 - Allow early ending table/createReadStream (#18)
Browse files Browse the repository at this point in the history
* Fix #6 - Allow early ending table/createReadStream

Fixed data transformation stream to stop processing data on `end` event
Still, need to fix GRPC stream closing.
  • Loading branch information
jiren authored and stephenplusplus committed Dec 13, 2017
1 parent e4770e6 commit bfe42dd
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 1 deletion.
8 changes: 7 additions & 1 deletion src/table.js
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ Table.prototype.createReadStream = function(options) {
reqOpts.rowsLimit = options.limit;
}

return pumpify.obj([
var stream = pumpify.obj([
this.requestStream(grpcOpts, reqOpts),
through.obj(function(data, enc, next) {
var throughStream = this;
Expand All @@ -498,6 +498,10 @@ Table.prototype.createReadStream = function(options) {
});

rows.forEach(function(rowData) {
if (stream._ended) {
return;
}

var row = self.row(rowData.key);

row.data = rowData.data;
Expand All @@ -507,6 +511,8 @@ Table.prototype.createReadStream = function(options) {
next();
}),
]);

return stream;
};

/**
Expand Down
48 changes: 48 additions & 0 deletions system-test/bigtable.js
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,54 @@ describe('Bigtable', function() {
});
});

it('should end stream early', function(done) {
var entries = [
{
key: 'gwashington',
data: {
follows: {
jadams: 1,
},
},
},
{
key: 'tjefferson',
data: {
follows: {
gwashington: 1,
jadams: 1,
},
},
},
{
key: 'jadams',
data: {
follows: {
gwashington: 1,
tjefferson: 1,
},
},
},
];

TABLE.insert(entries, function(err) {
assert.ifError(err);

var rows = [];

TABLE.createReadStream()
.on('error', done)
.on('data', function(row) {
rows.push(row);
this.end();
})
.on('end', function() {
assert.strictEqual(rows.length, 1);
done();
});
});
});

describe('filters', function() {
it('should get rows via column data', function(done) {
var filter = {
Expand Down
16 changes: 16 additions & 0 deletions test/table.js
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,22 @@ describe('Bigtable/Table', function() {
done();
});
});

it('should allow a stream to end early', function(done) {
var rows = [];

table
.createReadStream()
.on('error', done)
.on('data', function(row) {
rows.push(row);
this.end();
})
.on('end', function() {
assert.strictEqual(rows.length, 1);
done();
});
});
});

describe('error', function() {
Expand Down

0 comments on commit bfe42dd

Please sign in to comment.