Skip to content

Commit

Permalink
Tests for streaming support #18:
Browse files Browse the repository at this point in the history
* Read streaming column.
* New unit test for delayed reading of the stream.
  • Loading branch information
jorgebay committed Sep 13, 2013
1 parent e998c9c commit 0a32ec3
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 11 deletions.
11 changes: 6 additions & 5 deletions lib/readers.js
Original file line number Diff line number Diff line change
Expand Up @@ -615,19 +615,19 @@ FrameStreamingParser.prototype.streamRows = function (frameInfo, reader) {
frameInfo.rowIndex = frameInfo.rowIndex ? frameInfo.rowIndex : 0;
var stopReading = false;
for (var i = frameInfo.rowIndex; i < frameInfo.rowLength && !stopReading; i++) {
this.emit('log', 'info', 'reading row ' + i);
if (frameInfo.fieldStream) {
this.streamField(frameInfo, reader, null, i);
stopReading = this.remainingLength() === 0;
stopReading = reader.remainingLength() === 0;
continue;
}
//console.log('--reading row');
var row = [];
row.columns = meta.columns;
row.get = getCellValueByName.bind(row);
var rowOffset = reader.offset;
for(var j = 0; j < meta.columns.length; j++ ) {
var col = meta.columns[j];
//console.log('--reading column');
this.emit('log', 'info', 'reading column ' + col.name);
if (col.name !== frameInfo.streamingColumn) {
var bytes = null;
try {
Expand Down Expand Up @@ -679,7 +679,7 @@ FrameStreamingParser.prototype.bufferForLater = function (frameInfo, reader, ori
* Returns true if read from the reader
*/
FrameStreamingParser.prototype.streamField = function (frameInfo, reader, row, rowIndex) {
//console.log('--streaming field');
this.emit('log', 'info', 'streaming field');
var fieldStream = frameInfo.fieldStream;
if (!fieldStream) {
try {
Expand All @@ -698,8 +698,9 @@ FrameStreamingParser.prototype.streamField = function (frameInfo, reader, row, r
this.emit('rowStartedStreaming', frameInfo.header, row, fieldStream);
}
var availableChunk = reader.read(frameInfo.fieldLength - frameInfo.streamedSoFar);

//push into the stream
fieldStream.push(availableChunk);
fieldStream.add(availableChunk);
frameInfo.streamedSoFar += availableChunk.length;
//check if finishing
if (frameInfo.streamedSoFar === frameInfo.fieldLength) {
Expand Down
57 changes: 51 additions & 6 deletions test/connectionTests.js
Original file line number Diff line number Diff line change
Expand Up @@ -410,17 +410,35 @@ module.exports = {
closeAndEnd(test, localCon);
}
},
'streaming column test': function (test) {
con.execute('INSERT INTO sampletable1 (id, blob_sample) VALUES (?, ?)', [400, new Buffer(80*1)], function (err, result) {
'streaming column': function (test) {
var blob = new Buffer(1024*1024);
var id = 400;
con.execute('INSERT INTO sampletable1 (id, blob_sample) VALUES (?, ?)', [id, blob], function (err, result) {
if (err) return fail(test, err);
con.executeToStream('SELECT id, blob_sample FROM sampletable1 WHERE id = ?', [400], types.consistencies.one, function (err, row, stream) {
con.executeToStream('SELECT id, blob_sample FROM sampletable1 WHERE id = ?', [id], types.consistencies.one, function (err, row, stream) {
if (err) return fail(test, err);
test.equal(row.get('id'), 400);
//TODO: test that stream is readable
test.done();
test.equal(row.get('id'), id);
//test that stream is readable
testStreamReadable(test, stream, blob);
});
});
},
'streaming delayed read': function (test) {
var blob = new Buffer(2048);
blob[2047] = 0xFA;
var id = 401;
con.execute('INSERT INTO sampletable1 (id, blob_sample) VALUES (?, ?)', [id, blob], function (err, result) {
if (err) return fail(test, err);
con.executeToStream('SELECT id, blob_sample FROM sampletable1 WHERE id = ?', [id], types.consistencies.one, function (err, row, stream) {
if (err) return fail(test, err);
test.equal(row.get('id'), id);
setTimeout(function () {
testStreamReadable(test, stream, blob);
}, 700);
});
});
},
//TODO: streaming test field null
/**
* Executes last, closes the connection
*/
Expand Down Expand Up @@ -457,4 +475,31 @@ function fail(test, err, con) {
else {
test.done();
}
}

function testStreamReadable(test, stream, originalBlob, callback) {
var length = 0;
var firstByte = null;
var lastByte = null;
stream.on('readable', function () {
var chunk = null;
while (chunk = stream.read()) {
length += chunk.length;
if (firstByte === null) {
firstByte = chunk[0];
}
if (length === originalBlob.length) {
lastByte = chunk[chunk.length-1];
}
}
});
stream.on('end', function () {
test.equal(length, originalBlob.length, 'The blob returned should be the same size');
test.equal(firstByte, originalBlob[0], 'The first byte of the stream and the blob dont match');
test.equal(lastByte, originalBlob[originalBlob.length-1], 'The last byte of the stream and the blob dont match');
if (!callback) {
callback = test.done;
}
callback();
});
}

0 comments on commit 0a32ec3

Please sign in to comment.