diff --git a/.travis.yml b/.travis.yml index ee19ca451c..638acf3122 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,9 +1,38 @@ language: node_js before_install: - npm install -g npm -node_js: - - "0.8" - - "0.10" - - "0.11" - - "0.12" - - "iojs" +notifications: + email: false +matrix: + include: + - node_js: '0.8' + env: TASK=test + - node_js: '0.10' + env: TASK=test + - node_js: '0.11' + env: TASK=test + - node_js: '0.12' + env: TASK=test + - node_js: 'iojs' + env: TASK=test + - node_js: 'iojs' + env: TASK=browser BROWSER_NAME=opera BROWSER_VERSION="11..latest" + - node_js: 'iojs' + env: TASK=browser BROWSER_NAME=ie BROWSER_VERSION="9..latest" + - node_js: 'iojs' + env: TASK=browser BROWSER_NAME=chrome BROWSER_VERSION="39..beta" + - node_js: 'iojs' + env: TASK=browser BROWSER_NAME=firefox BROWSER_VERSION="34..beta" + - node_js: 'iojs' + env: TASK=browser BROWSER_NAME=ipad BROWSER_VERSION="6.0..latest" + - node_js: 'iojs' + env: TASK=browser BROWSER_NAME=iphone BROWSER_VERSION="6.0..latest" + - node_js: 'iojs' + env: TASK=browser BROWSER_NAME=safari BROWSER_VERSION="5..latest" + - node_js: 'iojs' + env: TASK=browser BROWSER_NAME=android BROWSER_VERSION="4.0..latest" +script: "npm run $TASK" +env: + global: + - secure: rE2Vvo7vnjabYNULNyLFxOyt98BoJexDqsiOnfiD6kLYYsiQGfr/sbZkPMOFm9qfQG7pjqx+zZWZjGSswhTt+626C0t/njXqug7Yps4c3dFblzGfreQHp7wNX5TFsvrxd6dAowVasMp61sJcRnB2w8cUzoe3RAYUDHyiHktwqMc= + - secure: g9YINaKAdMatsJ28G9jCGbSaguXCyxSTy+pBO6Ch0Cf57ZLOTka3HqDj8p3nV28LUIHZ3ut5WO43CeYKwt4AUtLpBS3a0dndHdY6D83uY6b2qh5hXlrcbeQTq2cvw2y95F7hm4D1kwrgZ7ViqaKggRcEupAL69YbJnxeUDKWEdI= diff --git a/.zuul.yml b/.zuul.yml new file mode 100644 index 0000000000..96d9cfbd38 --- /dev/null +++ b/.zuul.yml @@ -0,0 +1 @@ +ui: tape diff --git a/README.md b/README.md index 5cebe16e62..f9fb520598 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,9 @@ [![NPM](https://nodei.co/npm/readable-stream.png?downloads=true&downloadRank=true)](https://nodei.co/npm/readable-stream/) [![NPM](https://nodei.co/npm-dl/readable-stream.png?&months=6&height=3)](https://nodei.co/npm/readable-stream/) + +[![Sauce Test Status](https://saucelabs.com/browser-matrix/readable-stream.svg)](https://saucelabs.com/u/readable-stream) + ```bash npm install --save readable-stream ``` diff --git a/build/common-replacements.js b/build/common-replacements.js index 820ebbf89f..fe5badf6af 100644 --- a/build/common-replacements.js +++ b/build/common-replacements.js @@ -26,3 +26,16 @@ module.exports.altIndexOfUseReplacement = [ /(\W)([\w\.\(\),\[\]]+)(\.indexOf\()/gm , '$1indexOf($2, ' ] +module.exports.objectKeysDefine = [ + /^('use strict';)$/m + , '$1\n\n/**/\nvar objectKeys = Object.keys || function (obj) {\n' + + ' var keys = [];\n' + + ' for (var key in obj) keys.push(key);\n' + + ' return keys;\n' + + '}\n/**/\n' +] + +module.exports.objectKeysReplacement = [ + /Object\.keys/g + , 'objectKeys' + ] diff --git a/build/files.js b/build/files.js index 78182d2401..20410c3b25 100644 --- a/build/files.js +++ b/build/files.js @@ -63,10 +63,15 @@ const headRegexp = /(^module.exports = \w+;?)/m ] , objectDefinePropertyReplacement = [ - /Object.defineProperties/ + /(Object\.defineProperties)/ , 'if (Object.defineProperties) $1' ] - + , objectDefinePropertySingReplacement = [ + /Object\.defineProperty\(([\w\W]+?)\}\);/ + , '(function (){try {\n' + + 'Object.defineProperty\($1});\n' + + '}catch(_){}}());\n' + ] , isArrayDefine = [ headRegexp @@ -78,19 +83,9 @@ const headRegexp = /(^module.exports = \w+;?)/m , 'isArray' ] - , objectKeysDefine = [ - headRegexp - , '$1\n\n/**/\nvar objectKeys = Object.keys || function (obj) {\n' - + ' var keys = [];\n' - + ' for (var key in obj) keys.push(key);\n' - + ' return keys;\n' - + '}\n/**/\n' - ] + , objectKeysDefine = require('./common-replacements').objectKeysDefine - , objectKeysReplacement = [ - /Object\.keys/g - , 'objectKeys' - ] + , objectKeysReplacement = require('./common-replacements').objectKeysReplacement , eventEmittterReplacement = [ /(require\('events'\)\.EventEmitter;)/ @@ -122,7 +117,7 @@ const headRegexp = /(^module.exports = \w+;?)/m ] , isBufferReplacement = [ - /(\w+) instanceof Buffer/ + /(\w+) instanceof Buffer/g , 'Buffer.isBuffer($1)' ] @@ -202,6 +197,7 @@ module.exports['_stream_writable.js'] = [ , debugLogReplacement , deprecateReplacement , objectDefinePropertyReplacement + , objectDefinePropertySingReplacement , bufferIsEncodingReplacement , [ /^var assert = require\('assert'\);$/m, '' ] , requireStreamReplacement diff --git a/build/test-replacements.js b/build/test-replacements.js index 4b2ace251b..f98f05f13a 100644 --- a/build/test-replacements.js +++ b/build/test-replacements.js @@ -2,6 +2,10 @@ const altForEachImplReplacement = require('./common-replacements').altForEachImp , altForEachUseReplacement = require('./common-replacements').altForEachUseReplacement , altIndexOfImplReplacement = require('./common-replacements').altIndexOfImplReplacement , altIndexOfUseReplacement = require('./common-replacements').altIndexOfUseReplacement + , objectKeysDefine = + require('./common-replacements').objectKeysDefine + , objectKeysReplacement = + require('./common-replacements').objectKeysReplacement module.exports.all = [ [ @@ -55,7 +59,9 @@ module.exports['test-stream-big-packet.js'] = [ ] module.exports['common.js'] = [ - altForEachImplReplacement + objectKeysDefine + , objectKeysReplacement + , altForEachImplReplacement , altForEachUseReplacement , [ @@ -106,6 +112,18 @@ module.exports['common.js'] = [ + '}\n' + '/**/\n' ] + , [ + /^if \(global\.ArrayBuffer\) \{([^\}]+)\}$/m + , '/**/if (!process.browser) {' + + '\nif \(global\.ArrayBuffer\) {$1}\n' + + '}/**/\n' + ] + , [ + /^Object\.defineProperty\(([\w\W]+?)\}\)\;/mg + , '/**/if (!process.browser) {' + + '\nObject\.defineProperty($1});\n' + + '}/**/\n' + ] ] // this test has some trouble with the nextTick depth when run diff --git a/lib/_stream_duplex.js b/lib/_stream_duplex.js index 5ff12c0df7..69558af037 100644 --- a/lib/_stream_duplex.js +++ b/lib/_stream_duplex.js @@ -5,13 +5,6 @@ 'use strict'; -module.exports = Duplex; - -/**/ -var processNextTick = require('process-nextick-args'); -/**/ - - /**/ var objectKeys = Object.keys || function (obj) { var keys = []; @@ -21,6 +14,13 @@ var objectKeys = Object.keys || function (obj) { /**/ +module.exports = Duplex; + +/**/ +var processNextTick = require('process-nextick-args'); +/**/ + + /**/ var util = require('core-util-is'); diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 44f698168e..ffccdb0083 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -143,12 +143,15 @@ WritableState.prototype.getBuffer = function writableStateGetBuffer() { return out; }; +(function (){try { Object.defineProperty(WritableState.prototype, 'buffer', { get: require('util-deprecate')(function() { return this.getBuffer(); }, '_writableState.buffer is deprecated. Use ' + '_writableState.getBuffer() instead.') }); +}catch(_){}}()); + function Writable(options) { var Duplex = require('./_stream_duplex'); @@ -217,7 +220,7 @@ Writable.prototype.write = function(chunk, encoding, cb) { encoding = null; } - if (chunk instanceof Buffer) + if (Buffer.isBuffer(chunk)) encoding = 'buffer'; else if (!encoding) encoding = state.defaultEncoding; @@ -282,7 +285,7 @@ function decodeChunk(state, chunk, encoding) { function writeOrBuffer(stream, state, chunk, encoding, cb) { chunk = decodeChunk(state, chunk, encoding); - if (chunk instanceof Buffer) + if (Buffer.isBuffer(chunk)) encoding = 'buffer'; var len = state.objectMode ? 1 : chunk.length; diff --git a/package.json b/package.json index 595f2aaeda..50929ba884 100644 --- a/package.json +++ b/package.json @@ -5,17 +5,20 @@ "main": "readable.js", "dependencies": { "core-util-is": "~1.0.0", - "process-nextick-args": "~1.0.0", "inherits": "~2.0.1", "isarray": "0.0.1", + "process-nextick-args": "~1.0.0", "string_decoder": "~0.10.x", "util-deprecate": "~1.0.1" }, "devDependencies": { - "tap": "~0.2.6" + "tap": "~0.2.6", + "tape": "~4.0.0", + "zuul": "~3.0.0" }, "scripts": { - "test": "tap test/parallel/*.js" + "test": "tap test/parallel/*.js", + "browser": "zuul --browser-name $BROWSER_NAME --browser-version $BROWSER_VERSION -- test/browser.js" }, "repository": { "type": "git", diff --git a/test/browser.js b/test/browser.js new file mode 100644 index 0000000000..d354b5c72f --- /dev/null +++ b/test/browser.js @@ -0,0 +1,61 @@ +if (!global.console) { + global.console = {}; +} +if (!global.console.log) { + global.console.log = function () {}; +} +if (!global.console.error) { + global.console.error = global.console.log; +} +if (!global.console.info) { + global.console.info = global.console.log; +} +var test = require('tape'); + +test('streams', function (t) { + require('./browser/test-stream-big-packet')(t); + require('./browser/test-stream-big-push')(t); + require('./browser/test-stream-duplex')(t); + require('./browser/test-stream-end-paused')(t); + require('./browser/test-stream-ispaused')(t); + require('./browser/test-stream-pipe-after-end')(t); + require('./browser/test-stream-pipe-cleanup')(t); + require('./browser/test-stream-pipe-error-handling')(t); + require('./browser/test-stream-pipe-event')(t); + require('./browser/test-stream-push-order')(t); + require('./browser/test-stream-push-strings')(t); + require('./browser/test-stream-readable-constructor-set-methods')(t); + require('./browser/test-stream-readable-event')(t); + require('./browser/test-stream-transform-constructor-set-methods')(t); + require('./browser/test-stream-transform-objectmode-falsey-value')(t); + require('./browser/test-stream-transform-split-objectmode')(t); + require('./browser/test-stream-unshift-empty-chunk')(t); + require('./browser/test-stream-unshift-read-race')(t); + require('./browser/test-stream-writable-change-default-encoding')(t); + require('./browser/test-stream-writable-constructor-set-methods')(t); + require('./browser/test-stream-writable-decoded-encoding')(t); + require('./browser/test-stream-writev')(t); +}); + +test('streams 2', function (t) { + require('./browser/test-stream2-base64-single-char-read-end')(t); + require('./browser/test-stream2-compatibility')(t); + require('./browser/test-stream2-large-read-stall')(t); + require('./browser/test-stream2-objects')(t); + require('./browser/test-stream2-pipe-error-handling')(t); + require('./browser/test-stream2-pipe-error-once-listener')(t); + require('./browser/test-stream2-push')(t); + require('./browser/test-stream2-readable-empty-buffer-no-eof')(t); + require('./browser/test-stream2-readable-from-list')(t); + require('./browser/test-stream2-transform')(t); + require('./browser/test-stream2-set-encoding')(t); + require('./browser/test-stream2-readable-legacy-drain')(t); + require('./browser/test-stream2-readable-wrap-empty')(t); + require('./browser/test-stream2-readable-non-empty-end')(t); + require('./browser/test-stream2-readable-wrap')(t); + require('./browser/test-stream2-unpipe-drain')(t); + require('./browser/test-stream2-writable')(t); +}); +test('streams 3', function (t) { + require('./browser/test-stream3-pause-then-read')(t); +}); diff --git a/test/browser/test-stream-big-packet.js b/test/browser/test-stream-big-packet.js new file mode 100644 index 0000000000..8670e02301 --- /dev/null +++ b/test/browser/test-stream-big-packet.js @@ -0,0 +1,62 @@ +'use strict'; +var common = require('../common'); +var inherits = require('inherits'); +var stream = require('../../'); + +module.exports = function (t) { + t.test('big packet', function (t) { + t.plan(3); + var passed = false; + + function PassThrough() { + stream.Transform.call(this); + }; + inherits(PassThrough, stream.Transform); + PassThrough.prototype._transform = function(chunk, encoding, done) { + this.push(chunk); + done(); + }; + + function TestStream() { + stream.Transform.call(this); + }; + inherits(TestStream, stream.Transform); + TestStream.prototype._transform = function(chunk, encoding, done) { + if (!passed) { + // Char 'a' only exists in the last write + passed = indexOf(chunk.toString(), 'a') >= 0; + } + if (passed) { + t.ok(passed); + } + done(); + }; + + var s1 = new PassThrough(); + var s2 = new PassThrough(); + var s3 = new TestStream(); + s1.pipe(s3); + // Don't let s2 auto close which may close s3 + s2.pipe(s3, {end: false}); + + // We must write a buffer larger than highWaterMark + var big = new Buffer(s1._writableState.highWaterMark + 1); + big.fill('x'); + + // Since big is larger than highWaterMark, it will be buffered internally. + t.ok(!s1.write(big)); + // 'tiny' is small enough to pass through internal buffer. + t.ok(s2.write('tiny')); + + // Write some small data in next IO loop, which will never be written to s3 + // Because 'drain' event is not emitted from s1 and s1 is still paused + setImmediate(s1.write.bind(s1), 'later'); + + function indexOf (xs, x) { + for (var i = 0, l = xs.length; i < l; i++) { + if (xs[i] === x) return i; + } + return -1; + } + }); +} diff --git a/test/browser/test-stream-big-push.js b/test/browser/test-stream-big-push.js new file mode 100644 index 0000000000..7403e16937 --- /dev/null +++ b/test/browser/test-stream-big-push.js @@ -0,0 +1,68 @@ +'use strict'; +var common = require('../common'); +var stream = require('../../'); +module.exports = function (t) { + t.test('big push', function (t) { + + var str = 'asdfasdfasdfasdfasdf'; + + var r = new stream.Readable({ + highWaterMark: 5, + encoding: 'utf8' + }); + + var reads = 0; + var eofed = false; + var ended = false; + + r._read = function(n) { + if (reads === 0) { + setTimeout(function() { + r.push(str); + }); + reads++; + } else if (reads === 1) { + var ret = r.push(str); + t.equal(ret, false); + reads++; + } else { + t.notOk(eofed); + eofed = true; + r.push(null); + } + }; + + r.on('end', function() { + ended = true; + }); + + // push some data in to start. + // we've never gotten any read event at this point. + var ret = r.push(str); + // should be false. > hwm + t.notOk(ret); + var chunk = r.read(); + t.equal(chunk, str); + chunk = r.read(); + t.equal(chunk, null); + + r.once('readable', function() { + // this time, we'll get *all* the remaining data, because + // it's been added synchronously, as the read WOULD take + // us below the hwm, and so it triggered a _read() again, + // which synchronously added more, which we then return. + chunk = r.read(); + t.equal(chunk, str + str); + + chunk = r.read(); + t.equal(chunk, null); + }); + + r.on('end', function() { + t.ok(eofed); + t.ok(ended); + t.equal(reads, 2); + t.end(); + }); + }); +} diff --git a/test/browser/test-stream-duplex.js b/test/browser/test-stream-duplex.js new file mode 100644 index 0000000000..9bfd6af145 --- /dev/null +++ b/test/browser/test-stream-duplex.js @@ -0,0 +1,35 @@ +'use strict'; +var common = require('../common'); + +var Duplex = require('../../').Transform; + +var stream = new Duplex({ objectMode: true }); +module.exports = function (t) { + t.test('duplex', function (t) { + t.plan(4); + t.ok(stream._readableState.objectMode); + t.ok(stream._writableState.objectMode); + + var written; + var read; + + stream._write = function(obj, _, cb) { + written = obj; + cb(); + }; + + stream._read = function() {}; + + stream.on('data', function(obj) { + read = obj; + }); + + stream.push({ val: 1 }); + stream.end({ val: 2 }); + + stream.on('end', function() { + t.equal(read.val, 1); + t.equal(written.val, 2); + }); + }); +} diff --git a/test/browser/test-stream-end-paused.js b/test/browser/test-stream-end-paused.js new file mode 100644 index 0000000000..ff56dd8127 --- /dev/null +++ b/test/browser/test-stream-end-paused.js @@ -0,0 +1,32 @@ +'use strict'; +var common = require('../common'); + + +// Make sure we don't miss the end event for paused 0-length streams + +var Readable = require('../../').Readable; +var stream = new Readable(); +module.exports = function (t) { + t.test('end pause', function (t) { + t.plan(2); + var calledRead = false; + stream._read = function() { + t.notOk(calledRead); + calledRead = true; + this.push(null); + }; + + stream.on('data', function() { + throw new Error('should not ever get data'); + }); + stream.pause(); + + setTimeout(function() { + stream.on('end', function() { + t.ok(calledRead); + }); + stream.resume(); + }); + + }); +} diff --git a/test/browser/test-stream-ispaused.js b/test/browser/test-stream-ispaused.js new file mode 100644 index 0000000000..d080f41ba4 --- /dev/null +++ b/test/browser/test-stream-ispaused.js @@ -0,0 +1,27 @@ +'use strict'; +var common = require('../common'); + +var stream = require('../../'); +module.exports = function (t) { + t.test('is paused', function (t) { + var readable = new stream.Readable(); + + // _read is a noop, here. + readable._read = Function(); + + // default state of a stream is not "paused" + t.notOk(readable.isPaused()); + + // make the stream start flowing... + readable.on('data', Function()); + + // still not paused. + t.notOk(readable.isPaused()); + + readable.pause(); + t.ok(readable.isPaused()); + readable.resume(); + t.notOk(readable.isPaused()); + t.end(); + }); +} diff --git a/test/browser/test-stream-pipe-after-end.js b/test/browser/test-stream-pipe-after-end.js new file mode 100644 index 0000000000..0ca97b3d70 --- /dev/null +++ b/test/browser/test-stream-pipe-after-end.js @@ -0,0 +1,64 @@ +'use strict'; +var common = require('../common'); + +var Readable = require('../../lib/_stream_readable'); +var Writable = require('../../lib/_stream_writable'); +var inherits = require('inherits'); +module.exports = function (t) { + t.test('pipe after end', function (t) { + t.plan(4); + inherits(TestReadable, Readable); + function TestReadable(opt) { + if (!(this instanceof TestReadable)) + return new TestReadable(opt); + Readable.call(this, opt); + this._ended = false; + } + + TestReadable.prototype._read = function(n) { + if (this._ended) + this.emit('error', new Error('_read called twice')); + this._ended = true; + this.push(null); + }; + + inherits(TestWritable, Writable); + function TestWritable(opt) { + if (!(this instanceof TestWritable)) + return new TestWritable(opt); + Writable.call(this, opt); + this._written = []; + } + + TestWritable.prototype._write = function(chunk, encoding, cb) { + this._written.push(chunk); + cb(); + }; + + // this one should not emit 'end' until we read() from it later. + var ender = new TestReadable(); + var enderEnded = false; + + // what happens when you pipe() a Readable that's already ended? + var piper = new TestReadable(); + // pushes EOF null, and length=0, so this will trigger 'end' + piper.read(); + + setTimeout(function() { + ender.on('end', function() { + enderEnded = true; + t.ok(true, 'enderEnded'); + }); + t.notOk(enderEnded); + var c = ender.read(); + t.equal(c, null); + + var w = new TestWritable(); + w.on('finish', function() { + t.ok(true, 'writableFinished'); + }); + piper.pipe(w); + + }); + }); +} diff --git a/test/browser/test-stream-pipe-cleanup.js b/test/browser/test-stream-pipe-cleanup.js new file mode 100644 index 0000000000..dd2b6d5269 --- /dev/null +++ b/test/browser/test-stream-pipe-cleanup.js @@ -0,0 +1,108 @@ +'use strict'; +// This test asserts that Stream.prototype.pipe does not leave listeners +// hanging on the source or dest. + +var common = require('../common'); +var stream = require('../../'); +var inherits = require('inherits'); +module.exports = function (t) { + t.test('pipe cleanup', function (t) { + if (/^v0\.8\./.test(process.version)) + return t.end(); + + function Writable() { + this.writable = true; + this.endCalls = 0; + require('stream').Stream.call(this); + } + inherits(Writable, require('stream').Stream); + Writable.prototype.end = function() { + this.endCalls++; + }; + + Writable.prototype.destroy = function() { + this.endCalls++; + }; + + function Readable() { + this.readable = true; + require('stream').Stream.call(this); + } + inherits(Readable, require('stream').Stream); + + function Duplex() { + this.readable = true; + Writable.call(this); + } + inherits(Duplex, Writable); + + var i = 0; + var limit = 100; + + var w = new Writable(); + + var r; + + for (i = 0; i < limit; i++) { + r = new Readable(); + r.pipe(w); + r.emit('end'); + } + t.equal(0, r.listeners('end').length); + t.equal(limit, w.endCalls); + + w.endCalls = 0; + + for (i = 0; i < limit; i++) { + r = new Readable(); + r.pipe(w); + r.emit('close'); + } + t.equal(0, r.listeners('close').length); + t.equal(limit, w.endCalls); + + w.endCalls = 0; + + r = new Readable(); + + for (i = 0; i < limit; i++) { + w = new Writable(); + r.pipe(w); + w.emit('close'); + } + t.equal(0, w.listeners('close').length); + + r = new Readable(); + w = new Writable(); + var d = new Duplex(); + r.pipe(d); // pipeline A + d.pipe(w); // pipeline B + t.equal(r.listeners('end').length, 2); // A.onend, A.cleanup + t.equal(r.listeners('close').length, 2); // A.onclose, A.cleanup + t.equal(d.listeners('end').length, 2); // B.onend, B.cleanup + t.equal(d.listeners('close').length, 3); // A.cleanup, B.onclose, B.cleanup + t.equal(w.listeners('end').length, 0); + t.equal(w.listeners('close').length, 1); // B.cleanup + + r.emit('end'); + t.equal(d.endCalls, 1); + t.equal(w.endCalls, 0); + t.equal(r.listeners('end').length, 0); + t.equal(r.listeners('close').length, 0); + t.equal(d.listeners('end').length, 2); // B.onend, B.cleanup + t.equal(d.listeners('close').length, 2); // B.onclose, B.cleanup + t.equal(w.listeners('end').length, 0); + t.equal(w.listeners('close').length, 1); // B.cleanup + + d.emit('end'); + t.equal(d.endCalls, 1); + t.equal(w.endCalls, 1); + t.equal(r.listeners('end').length, 0); + t.equal(r.listeners('close').length, 0); + t.equal(d.listeners('end').length, 0); + t.equal(d.listeners('close').length, 0); + t.equal(w.listeners('end').length, 0); + t.equal(w.listeners('close').length, 0); + t.end(); + }); +} diff --git a/test/browser/test-stream-pipe-error-handling.js b/test/browser/test-stream-pipe-error-handling.js new file mode 100644 index 0000000000..48a8bb375d --- /dev/null +++ b/test/browser/test-stream-pipe-error-handling.js @@ -0,0 +1,102 @@ +'use strict'; +var common = require('../common'); +var Stream = require('stream').Stream; + +module.exports = function (t) { + t.test('Error Listener Catches', function (t) { + t.plan(1); + var source = new Stream(); + var dest = new Stream(); + + source.pipe(dest); + + var gotErr = null; + source.on('error', function(err) { + gotErr = err; + }); + + var err = new Error('This stream turned into bacon.'); + source.emit('error', err); + t.strictEqual(gotErr, err); + }); + + t.test('Error WithoutListener Throws', function (t) { + t.plan(1); + var source = new Stream(); + var dest = new Stream(); + + source.pipe(dest); + + var err = new Error('This stream turned into bacon.'); + + var gotErr = null; + try { + source.emit('error', err); + } catch (e) { + gotErr = e; + } + + t.strictEqual(gotErr, err); + }); + + t.test('Error With Removed Listener Throws', function (t) { + t.plan(2); + var EE = require('events').EventEmitter; + var R = require('../../').Readable; + var W = require('../../').Writable; + + var r = new R(); + var w = new W(); + var removed = false; + + r._read = function() { + setTimeout(function() { + t.ok(removed); + t.throws(function() { + w.emit('error', new Error('fail')); + }); + }); + }; + + w.on('error', myOnError); + r.pipe(w); + w.removeListener('error', myOnError); + removed = true; + + function myOnError(er) { + throw new Error('this should not happen'); + } + }); + + t.test('Error With Removed Listener Throws', function (t) { + t.plan(2); + var EE = require('events').EventEmitter; + var R = require('../../').Readable; + var W = require('../../').Writable; + + var r = new R(); + var w = new W(); + var removed = false; + var caught = false; + + r._read = function() { + setTimeout(function() { + t.ok(removed); + w.emit('error', new Error('fail')); + }); + }; + + w.on('error', myOnError); + w._write = function() {}; + + r.pipe(w); + // Removing some OTHER random listener should not do anything + w.removeListener('error', function() {}); + removed = true; + + function myOnError(er) { + t.notOk(caught); + caught = true; + } + }); +} diff --git a/test/browser/test-stream-pipe-event.js b/test/browser/test-stream-pipe-event.js new file mode 100644 index 0000000000..c0d7a606c2 --- /dev/null +++ b/test/browser/test-stream-pipe-event.js @@ -0,0 +1,32 @@ +'use strict'; +var common = require('../common'); +var stream = require('../../'); +var inherits = require('inherits'); +module.exports = function (t) { + t.test('pipe event', function (t) { + t.plan(1); + function Writable() { + this.writable = true; + require('stream').Stream.call(this); + } + inherits(Writable, require('stream').Stream); + + function Readable() { + this.readable = true; + require('stream').Stream.call(this); + } + inherits(Readable, require('stream').Stream); + + var passed = false; + + var w = new Writable(); + w.on('pipe', function(src) { + passed = true; + }); + + var r = new Readable(); + r.pipe(w); + + t.ok(passed); + }); +} diff --git a/test/browser/test-stream-push-order.js b/test/browser/test-stream-push-order.js new file mode 100644 index 0000000000..acffd41ba1 --- /dev/null +++ b/test/browser/test-stream-push-order.js @@ -0,0 +1,34 @@ +'use strict'; +var common = require('../common'); +var Readable = require('../../').Readable; +module.exports = function (t) { + t.test('push order', function (t) { + t.plan(1); + var s = new Readable({ + highWaterMark: 20, + encoding: 'ascii' + }); + + var list = ['1', '2', '3', '4', '5', '6']; + + s._read = function(n) { + var one = list.shift(); + if (!one) { + s.push(null); + } else { + var two = list.shift(); + s.push(one); + s.push(two); + } + }; + + var v = s.read(0); + + // ACTUALLY [1, 3, 5, 6, 4, 2] + + setTimeout(function() { + t.deepEqual(s._readableState.buffer, + ['1', '2', '3', '4', '5', '6']); + }); + }); +} diff --git a/test/browser/test-stream-push-strings.js b/test/browser/test-stream-push-strings.js new file mode 100644 index 0000000000..1de240efd8 --- /dev/null +++ b/test/browser/test-stream-push-strings.js @@ -0,0 +1,49 @@ +'use strict'; +var common = require('../common'); + +var Readable = require('../../').Readable; +var inherits = require('inherits'); + +module.exports = function (t) { + t.test('push strings', function (t) { + t.plan(2); + inherits(MyStream, Readable); + function MyStream(options) { + Readable.call(this, options); + this._chunks = 3; + } + + MyStream.prototype._read = function(n) { + switch (this._chunks--) { + case 0: + return this.push(null); + case 1: + return setTimeout(function() { + this.push('last chunk'); + }.bind(this), 100); + case 2: + return this.push('second to last chunk'); + case 3: + return process.nextTick(function() { + this.push('first chunk'); + }.bind(this)); + default: + throw new Error('?'); + } + }; + var expect = [ 'first chunksecond to last chunk', 'last chunk' ]; + + var ms = new MyStream(); + var results = []; + ms.on('readable', function() { + var chunk; + while (null !== (chunk = ms.read())) + results.push(chunk + ''); + }); + + ms.on('end', function() { + t.equal(ms._chunks, -1); + t.deepEqual(results, expect); + }); + }); +} diff --git a/test/browser/test-stream-readable-constructor-set-methods.js b/test/browser/test-stream-readable-constructor-set-methods.js new file mode 100644 index 0000000000..fa0d59ba28 --- /dev/null +++ b/test/browser/test-stream-readable-constructor-set-methods.js @@ -0,0 +1,22 @@ +'use strict'; +var common = require('../common'); + +var Readable = require('../../').Readable; +module.exports = function (t) { + t.test('readable constructor set methods', function (t) { + t.plan(2); + var _readCalled = false; + function _read(n) { + _readCalled = true; + this.push(null); + } + + var r = new Readable({ read: _read }); + r.resume(); + + setTimeout(function() { + t.equal(r._read, _read); + t.ok(_readCalled); + }); + }); +} diff --git a/test/browser/test-stream-readable-event.js b/test/browser/test-stream-readable-event.js new file mode 100644 index 0000000000..6afabc369d --- /dev/null +++ b/test/browser/test-stream-readable-event.js @@ -0,0 +1,114 @@ +'use strict'; +var common = require('../common'); + +var Readable = require('../../').Readable; + +function first(t) { + // First test, not reading when the readable is added. + // make sure that on('readable', ...) triggers a readable event. + var r = new Readable({ + highWaterMark: 3 + }); + + var _readCalled = false; + r._read = function(n) { + _readCalled = true; + }; + + // This triggers a 'readable' event, which is lost. + r.push(new Buffer('blerg')); + + var caughtReadable = false; + setTimeout(function() { + // we're testing what we think we are + t.notOk(r._readableState.reading); + r.on('readable', function() { + caughtReadable = true; + setTimeout(function() { + // we're testing what we think we are + t.notOk(_readCalled); + + t.ok(caughtReadable); + t.end(); + }); + }); + }); + + +} + +function second(t) { + // second test, make sure that readable is re-emitted if there's + // already a length, while it IS reading. + + var r = new Readable({ + highWaterMark: 3 + }); + + var _readCalled = false; + r._read = function(n) { + _readCalled = true; + }; + + // This triggers a 'readable' event, which is lost. + r.push(new Buffer('bl')); + + var caughtReadable = false; + setTimeout(function() { + // assert we're testing what we think we are + t.ok(r._readableState.reading); + r.on('readable', function() { + caughtReadable = true; + setTimeout(function() { + // we're testing what we think we are + t.ok(_readCalled); + + t.ok(caughtReadable); + t.end(); + }); + }); + }); + +} + +function third(t) { + // Third test, not reading when the stream has not passed + // the highWaterMark but *has* reached EOF. + var r = new Readable({ + highWaterMark: 30 + }); + + var _readCalled = false; + r._read = function(n) { + _readCalled = true; + }; + + // This triggers a 'readable' event, which is lost. + r.push(new Buffer('blerg')); + r.push(null); + + var caughtReadable = false; + setTimeout(function() { + // assert we're testing what we think we are + t.notOk(r._readableState.reading); + r.on('readable', function() { + caughtReadable = true; + setTimeout(function() { + // we're testing what we think we are + t.notOk(_readCalled); + + t.ok(caughtReadable); + t.end(); + }); + }); + }); + +}; + +module.exports = function (t) { + t.test('readable events', function (t) { + t.test('first', first); + t.test('second', second); + t.test('third', third); + }); +} diff --git a/test/browser/test-stream-transform-constructor-set-methods.js b/test/browser/test-stream-transform-constructor-set-methods.js new file mode 100644 index 0000000000..de89057c12 --- /dev/null +++ b/test/browser/test-stream-transform-constructor-set-methods.js @@ -0,0 +1,35 @@ +'use strict'; +var common = require('../common'); + +var Transform = require('../../').Transform; +module.exports = function (t) { + t.test('transform constructor set methods', function (t) { + var _transformCalled = false; + function _transform(d, e, n) { + _transformCalled = true; + n(); + } + + var _flushCalled = false; + function _flush(n) { + _flushCalled = true; + n(); + } + + var tr = new Transform({ + transform: _transform, + flush: _flush + }); + + tr.end(new Buffer('blerg')); + tr.resume(); + + tr.on('end', function() { + t.equal(tr._transform, _transform); + t.equal(tr._flush, _flush); + t.ok(_transformCalled); + t.ok(_flushCalled); + t.end(); + }); + }); +} diff --git a/test/browser/test-stream-transform-objectmode-falsey-value.js b/test/browser/test-stream-transform-objectmode-falsey-value.js new file mode 100644 index 0000000000..3b226a7c26 --- /dev/null +++ b/test/browser/test-stream-transform-objectmode-falsey-value.js @@ -0,0 +1,36 @@ +'use strict'; +var common = require('../common'); + +var stream = require('../../'); +var PassThrough = stream.PassThrough; +module.exports = function (t) { + t.test('transform objectmode falsey value', function (t) { + var src = new PassThrough({ objectMode: true }); + var tx = new PassThrough({ objectMode: true }); + var dest = new PassThrough({ objectMode: true }); + + var expect = [ -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ]; + var results = []; + dest.on('end', function() { + t.deepEqual(results, expect); + t.end(); + }); + + dest.on('data', function(x) { + results.push(x); + }); + + src.pipe(tx).pipe(dest); + + var i = -1; + var int = setInterval(function() { + if (i > 10) { + src.end(); + clearInterval(int); + } else { + t.ok(true); + src.write(i++); + } + }, 10); + }); +} diff --git a/test/browser/test-stream-transform-split-objectmode.js b/test/browser/test-stream-transform-split-objectmode.js new file mode 100644 index 0000000000..3813499948 --- /dev/null +++ b/test/browser/test-stream-transform-split-objectmode.js @@ -0,0 +1,58 @@ +'use strict'; +var common = require('../common'); + +var Transform = require('../../').Transform; +module.exports = function (t) { + t.test('transform split objectmode', function (t) { + t.plan(10); + var parser = new Transform({ readableObjectMode : true }); + + t.ok(parser._readableState.objectMode, 'parser 1'); + t.notOk(parser._writableState.objectMode, 'parser 2'); + t.equals(parser._readableState.highWaterMark, 16, 'parser 3'); + t.equals(parser._writableState.highWaterMark, (16 * 1024), 'parser 4'); + + parser._transform = function(chunk, enc, callback) { + callback(null, { val : chunk[0] }); + }; + + var parsed; + + parser.on('data', function(obj) { + parsed = obj; + }); + + parser.end(new Buffer([42])); + + parser.on('end', function() { + t.equals(parsed.val, 42, 'parser ended'); + }); + + + var serializer = new Transform({ writableObjectMode : true }); + + t.notOk(serializer._readableState.objectMode, 'serializer 1'); + t.ok(serializer._writableState.objectMode, 'serializer 2'); + t.equals(serializer._readableState.highWaterMark, (16 * 1024), 'serializer 3'); + t.equals(serializer._writableState.highWaterMark, 16, 'serializer 4'); + + serializer._transform = function(obj, _, callback) { + callback(null, new Buffer([obj.val])); + }; + + var serialized; + + serializer.on('data', function(chunk) { + serialized = chunk; + }); + + serializer.write({ val : 42 }); + + serializer.on('end', function() { + t.equals(serialized[0], 42, 'searlizer ended'); + }); + setImmediate(function () { + serializer.end(); + }); + }); +} diff --git a/test/browser/test-stream-unshift-empty-chunk.js b/test/browser/test-stream-unshift-empty-chunk.js new file mode 100644 index 0000000000..ddeb17016a --- /dev/null +++ b/test/browser/test-stream-unshift-empty-chunk.js @@ -0,0 +1,63 @@ +'use strict'; +var common = require('../common'); + +// This test verifies that stream.unshift(Buffer(0)) or +// stream.unshift('') does not set state.reading=false. +var Readable = require('../../').Readable; +module.exports = function (t) { + t.test('unshift empty chunk', function (t) { + t.plan(1); + var r = new Readable(); + var nChunks = 10; + var chunk = new Buffer(10); + chunk.fill('x'); + + r._read = function(n) { + setTimeout(function() { + r.push(--nChunks === 0 ? null : chunk); + }); + }; + + var readAll = false; + var seen = []; + r.on('readable', function() { + var chunk; + while (chunk = r.read()) { + seen.push(chunk.toString()); + // simulate only reading a certain amount of the data, + // and then putting the rest of the chunk back into the + // stream, like a parser might do. We just fill it with + // 'y' so that it's easy to see which bits were touched, + // and which were not. + var putBack = new Buffer(readAll ? 0 : 5); + putBack.fill('y'); + readAll = !readAll; + r.unshift(putBack); + } + }); + + var expect = + [ 'xxxxxxxxxx', + 'yyyyy', + 'xxxxxxxxxx', + 'yyyyy', + 'xxxxxxxxxx', + 'yyyyy', + 'xxxxxxxxxx', + 'yyyyy', + 'xxxxxxxxxx', + 'yyyyy', + 'xxxxxxxxxx', + 'yyyyy', + 'xxxxxxxxxx', + 'yyyyy', + 'xxxxxxxxxx', + 'yyyyy', + 'xxxxxxxxxx', + 'yyyyy' ]; + + r.on('end', function() { + t.deepEqual(seen, expect); + }); + }); +} diff --git a/test/browser/test-stream-unshift-read-race.js b/test/browser/test-stream-unshift-read-race.js new file mode 100644 index 0000000000..b2362a7c60 --- /dev/null +++ b/test/browser/test-stream-unshift-read-race.js @@ -0,0 +1,110 @@ +'use strict'; +var common = require('../common'); + +// This test verifies that: +// 1. unshift() does not cause colliding _read() calls. +// 2. unshift() after the 'end' event is an error, but after the EOF +// signalling null, it is ok, and just creates a new readable chunk. +// 3. push() after the EOF signaling null is an error. +// 4. _read() is not called after pushing the EOF null chunk. + +var stream = require('../../'); +module.exports = function (t) { + t.test('unshift read race', function (tape) { + var hwm = 10; + var r = stream.Readable({ highWaterMark: hwm }); + var chunks = 10; + var t = (chunks * 5); + + var data = new Buffer(chunks * hwm + Math.ceil(hwm / 2)); + for (var i = 0; i < data.length; i++) { + var c = 'asdf'.charCodeAt(i % 4); + data[i] = c; + } + + var pos = 0; + var pushedNull = false; + r._read = function(n) { + tape.notOk(pushedNull, '_read after null push'); + + // every third chunk is fast + push(!(chunks % 3)); + + function push(fast) { + tape.notOk(pushedNull, 'push() after null push'); + var c = pos >= data.length ? null : data.slice(pos, Math.min(pos + n, data.length)); + pushedNull = c === null; + if (fast) { + pos += n; + r.push(c); + if (c === null) pushError(); + } else { + setTimeout(function() { + pos += n; + r.push(c); + if (c === null) pushError(); + }); + } + } + }; + + function pushError() { + tape.throws(function() { + r.push(new Buffer(1)); + }); + } + + + var w = stream.Writable(); + var written = []; + w._write = function(chunk, encoding, cb) { + written.push(chunk.toString()); + cb(); + }; + + var ended = false; + r.on('end', function() { + tape.notOk(ended, 'end emitted more than once'); + tape.throws(function() { + r.unshift(new Buffer(1)); + }); + ended = true; + w.end(); + }); + + r.on('readable', function() { + var chunk; + while (null !== (chunk = r.read(10))) { + w.write(chunk); + if (chunk.length > 4) + r.unshift(new Buffer('1234')); + } + }); + + w.on('finish', function() { + // each chunk should start with 1234, and then be asfdasdfasdf... + // The first got pulled out before the first unshift('1234'), so it's + // lacking that piece. + tape.equal(written[0], 'asdfasdfas'); + var asdf = 'd'; + //console.error('0: %s', written[0]); + for (var i = 1; i < written.length; i++) { + //console.error('%s: %s', i.toString(32), written[i]); + tape.equal(written[i].slice(0, 4), '1234'); + for (var j = 4; j < written[i].length; j++) { + var c = written[i].charAt(j); + tape.equal(c, asdf); + switch (asdf) { + case 'a': asdf = 's'; break; + case 's': asdf = 'd'; break; + case 'd': asdf = 'f'; break; + case 'f': asdf = 'a'; break; + } + } + } + tape.equal(written.length, 18); + tape.end(); + }); + + }); +} diff --git a/test/browser/test-stream-writable-change-default-encoding.js b/test/browser/test-stream-writable-change-default-encoding.js new file mode 100644 index 0000000000..de657152af --- /dev/null +++ b/test/browser/test-stream-writable-change-default-encoding.js @@ -0,0 +1,64 @@ +'use strict'; +var common = require('../common'); + +var stream = require('../../'); +var inherits = require('inherits'); + +function MyWritable(fn, options) { + stream.Writable.call(this, options); + this.fn = fn; +}; + +inherits(MyWritable, stream.Writable); + +MyWritable.prototype._write = function(chunk, encoding, callback) { + this.fn(Buffer.isBuffer(chunk), typeof chunk, encoding); + callback(); +}; + +function defaultCondingIsUtf8(t) { + t.plan(1); + var m = new MyWritable(function(isBuffer, type, enc) { + t.equal(enc, 'utf8'); + }, { decodeStrings: false }); + m.write('foo'); + m.end(); +} + +function changeDefaultEncodingToAscii(t) { + t.plan(1); + var m = new MyWritable(function(isBuffer, type, enc) { + t.equal(enc, 'ascii'); + }, { decodeStrings: false }); + m.setDefaultEncoding('ascii'); + m.write('bar'); + m.end(); +} + +function changeDefaultEncodingToInvalidValue(t) { + t.plan(1); + t.throws(function () { + var m = new MyWritable(function(isBuffer, type, enc) { + }, { decodeStrings: false }); + m.setDefaultEncoding({}); + m.write('bar'); + m.end(); + }, TypeError); +} +function checkVairableCaseEncoding(t) { + t.plan(1); + var m = new MyWritable(function(isBuffer, type, enc) { + t.equal(enc, 'ascii'); + }, { decodeStrings: false }); + m.setDefaultEncoding('AsCii'); + m.write('bar'); + m.end(); +} +module.exports = function (t) { + t.test('writable change default encoding', function (t) { + t.test('defaultCondingIsUtf8', defaultCondingIsUtf8); + t.test('changeDefaultEncodingToAscii', changeDefaultEncodingToAscii); + t.test('changeDefaultEncodingToInvalidValue', changeDefaultEncodingToInvalidValue); + t.test('checkVairableCaseEncoding', checkVairableCaseEncoding); + }); +} diff --git a/test/browser/test-stream-writable-constructor-set-methods.js b/test/browser/test-stream-writable-constructor-set-methods.js new file mode 100644 index 0000000000..25a657e0d3 --- /dev/null +++ b/test/browser/test-stream-writable-constructor-set-methods.js @@ -0,0 +1,40 @@ +'use strict'; +var common = require('../common'); +var Writable = require('../../').Writable; + +module.exports = function (t) { + t.test('writable constructor set methods', function (t){ + + + var _writeCalled = false; + function _write(d, e, n) { + _writeCalled = true; + } + + var w = new Writable({ write: _write }); + w.end(new Buffer('blerg')); + + var _writevCalled = false; + var dLength = 0; + function _writev(d, n) { + dLength = d.length; + _writevCalled = true; + } + + var w2 = new Writable({ writev: _writev }); + w2.cork(); + + w2.write(new Buffer('blerg')); + w2.write(new Buffer('blerg')); + w2.end(); + + setImmediate(function() { + t.equal(w._write, _write); + t.ok(_writeCalled); + t.equal(w2._writev, _writev); + t.equal(dLength, 2); + t.ok(_writevCalled); + t.end(); + }); + }); +} diff --git a/test/browser/test-stream-writable-decoded-encoding.js b/test/browser/test-stream-writable-decoded-encoding.js new file mode 100644 index 0000000000..f32dd7ef63 --- /dev/null +++ b/test/browser/test-stream-writable-decoded-encoding.js @@ -0,0 +1,45 @@ +'use strict'; +var common = require('../common'); + +var stream = require('../../'); +var inherits = require('inherits'); + +function MyWritable(fn, options) { + stream.Writable.call(this, options); + this.fn = fn; +}; + +inherits(MyWritable, stream.Writable); + +MyWritable.prototype._write = function(chunk, encoding, callback) { + this.fn(Buffer.isBuffer(chunk), typeof chunk, encoding); + callback(); +}; + +function decodeStringsTrue(t) { + t.plan(3); + var m = new MyWritable(function(isBuffer, type, enc) { + t.ok(isBuffer); + t.equal(type, 'object'); + t.equal(enc, 'buffer'); + //console.log('ok - decoded string is decoded'); + }, { decodeStrings: true }); + m.write('some-text', 'utf8'); + m.end(); +} + +function decodeStringsFalse(t) { + t.plan(3); + var m = new MyWritable(function(isBuffer, type, enc) { + t.notOk(isBuffer); + t.equal(type, 'string'); + t.equal(enc, 'utf8'); + //console.log('ok - un-decoded string is not decoded'); + }, { decodeStrings: false }); + m.write('some-text', 'utf8'); + m.end(); +} +module.exports = function (t) { + t.test('decodeStringsTrue', decodeStringsTrue); + t.test('decodeStringsFalse', decodeStringsFalse); +} diff --git a/test/browser/test-stream-writev.js b/test/browser/test-stream-writev.js new file mode 100644 index 0000000000..f676f204d5 --- /dev/null +++ b/test/browser/test-stream-writev.js @@ -0,0 +1,105 @@ +'use strict'; +var common = require('../common'); + +var stream = require('../../'); + +var queue = []; +for (var decode = 0; decode < 2; decode++) { + for (var uncork = 0; uncork < 2; uncork++) { + for (var multi = 0; multi < 2; multi++) { + queue.push([!!decode, !!uncork, !!multi]); + } + } +} + +module.exports = function (t) { + t.test('writev', function (t) { + queue.forEach(function (tr, i){ + t.test('round ' + i, test(tr[0], tr[1], tr[2])); + }); + }); +} + +function test(decode, uncork, multi) { + return function (t) { + //console.log('# decode=%j uncork=%j multi=%j', decode, uncork, multi); + var counter = 0; + var expectCount = 0; + function cnt(msg) { + expectCount++; + var expect = expectCount; + var called = false; + return function(er) { + if (er) + throw er; + called = true; + counter++; + t.equal(counter, expect); + }; + } + + var w = new stream.Writable({ decodeStrings: decode }); + w._write = function(chunk, e, cb) { + t.ok(false, 'Should not call _write'); + }; + + var expectChunks = decode ? + [ + { encoding: 'buffer', + chunk: [104, 101, 108, 108, 111, 44, 32] }, + { encoding: 'buffer', + chunk: [119, 111, 114, 108, 100] }, + { encoding: 'buffer', + chunk: [33] }, + { encoding: 'buffer', + chunk: [10, 97, 110, 100, 32, 116, 104, 101, 110, 46, 46, 46] }, + { encoding: 'buffer', + chunk: [250, 206, 190, 167, 222, 173, 190, 239, 222, 202, 251, 173]} + ] : [ + { encoding: 'ascii', chunk: 'hello, ' }, + { encoding: 'utf8', chunk: 'world' }, + { encoding: 'buffer', chunk: [33] }, + { encoding: 'binary', chunk: '\nand then...' }, + { encoding: 'hex', chunk: 'facebea7deadbeefdecafbad' } + ]; + + var actualChunks; + w._writev = function(chunks, cb) { + actualChunks = chunks.map(function(chunk) { + return { + encoding: chunk.encoding, + chunk: Buffer.isBuffer(chunk.chunk) ? + Array.prototype.slice.call(chunk.chunk) : chunk.chunk + }; + }); + cb(); + }; + + w.cork(); + w.write('hello, ', 'ascii', cnt('hello')); + w.write('world', 'utf8', cnt('world')); + + if (multi) + w.cork(); + + w.write(new Buffer('!'), 'buffer', cnt('!')); + w.write('\nand then...', 'binary', cnt('and then')); + + if (multi) + w.uncork(); + + w.write('facebea7deadbeefdecafbad', 'hex', cnt('hex')); + + if (uncork) + w.uncork(); + + w.end(cnt('end')); + + w.on('finish', function() { + // make sure finish comes after all the write cb + cnt('finish')(); + t.deepEqual(expectChunks, actualChunks); + t.end(); + }); + } +} diff --git a/test/browser/test-stream2-base64-single-char-read-end.js b/test/browser/test-stream2-base64-single-char-read-end.js new file mode 100644 index 0000000000..c68e66dba1 --- /dev/null +++ b/test/browser/test-stream2-base64-single-char-read-end.js @@ -0,0 +1,41 @@ +'use strict'; +var common = require('../common'); +var R = require('../../lib/_stream_readable'); +var W = require('../../lib/_stream_writable'); +module.exports = function (t) { + t.test('base64 single char read end', function (t) { + t.plan(1); + var src = new R({encoding: 'base64'}); + var dst = new W(); + var hasRead = false; + var accum = []; + var timeout; + + src._read = function(n) { + if(!hasRead) { + hasRead = true; + process.nextTick(function() { + src.push(new Buffer('1')); + src.push(null); + }); + }; + }; + + dst._write = function(chunk, enc, cb) { + accum.push(chunk); + cb(); + }; + + src.on('end', function() { + t.equal(Buffer.concat(accum) + '', 'MQ=='); + clearTimeout(timeout); + }); + + src.pipe(dst); + + timeout = setTimeout(function() { + assert.fail('timed out waiting for _write'); + }, 100); + +}) +} diff --git a/test/browser/test-stream2-compatibility.js b/test/browser/test-stream2-compatibility.js new file mode 100644 index 0000000000..34961a30a2 --- /dev/null +++ b/test/browser/test-stream2-compatibility.js @@ -0,0 +1,33 @@ +'use strict'; +var R = require('../../lib/_stream_readable'); +var inherits = require('inherits'); +var EE = require('events').EventEmitter; +module.exports = function (t) { + t.test('compatibility', function (t) { + t.plan(1); + + var ondataCalled = 0; + + function TestReader() { + R.apply(this); + this._buffer = new Buffer(100); + this._buffer.fill('x'); + + this.on('data', function() { + ondataCalled++; + }); + } + + inherits(TestReader, R); + + TestReader.prototype._read = function(n) { + this.push(this._buffer); + this._buffer = new Buffer(0); + }; + + var reader = new TestReader(); + setTimeout(function() { + t.equal(ondataCalled, 1); + }); + }); +} diff --git a/test/browser/test-stream2-large-read-stall.js b/test/browser/test-stream2-large-read-stall.js new file mode 100644 index 0000000000..74be495ed1 --- /dev/null +++ b/test/browser/test-stream2-large-read-stall.js @@ -0,0 +1,62 @@ +'use strict'; +var common = require('../common'); +module.exports = function (t) { + t.test('large object read stall', function (t) { + +// If everything aligns so that you do a read(n) of exactly the +// remaining buffer, then make sure that 'end' still emits. + + var READSIZE = 100; + var PUSHSIZE = 20; + var PUSHCOUNT = 1000; + var HWM = 50; + + var Readable = require('../../').Readable; + var r = new Readable({ + highWaterMark: HWM + }); + var rs = r._readableState; + + r._read = push; + + r.on('readable', function() { + ;false && console.error('>> readable'); + do { + ;false && console.error(' > read(%d)', READSIZE); + var ret = r.read(READSIZE); + ;false && console.error(' < %j (%d remain)', ret && ret.length, rs.length); + } while (ret && ret.length === READSIZE); + + ;false && console.error('<< after read()', + ret && ret.length, + rs.needReadable, + rs.length); + }); + + var endEmitted = false; + r.on('end', function() { + t.equal(pushes, PUSHCOUNT + 1); + t.end(); + ;false && console.error('end'); + }); + + var pushes = 0; + function push() { + if (pushes > PUSHCOUNT) + return; + + if (pushes++ === PUSHCOUNT) { + ;false && console.error(' push(EOF)'); + return r.push(null); + } + + ;false && console.error(' push #%d', pushes); + if (r.push(new Buffer(PUSHSIZE))) + setTimeout(push); + } + + // start the flow + var ret = r.read(0); + + }); +} diff --git a/test/browser/test-stream2-objects.js b/test/browser/test-stream2-objects.js new file mode 100644 index 0000000000..26a038b599 --- /dev/null +++ b/test/browser/test-stream2-objects.js @@ -0,0 +1,306 @@ +'use strict'; +var common = require('../common'); +var Readable = require('../../lib/_stream_readable'); +var Writable = require('../../lib/_stream_writable'); + +module.exports = function (t) { + + + + function toArray(callback) { + var stream = new Writable({ objectMode: true }); + var list = []; + stream.write = function(chunk) { + list.push(chunk); + }; + + stream.end = function() { + callback(list); + }; + + return stream; + } + + function fromArray(list) { + var r = new Readable({ objectMode: true }); + r._read = noop; + forEach(list, function(chunk) { + r.push(chunk); + }); + r.push(null); + + return r; + } + + function noop() {} + + t.test('can read objects from stream', function(t) { + var r = fromArray([{ one: '1'}, { two: '2' }]); + + var v1 = r.read(); + var v2 = r.read(); + var v3 = r.read(); + + t.deepEqual(v1, { one: '1' }); + t.deepEqual(v2, { two: '2' }); + t.deepEqual(v3, null); + + t.end(); + }); + + t.test('can pipe objects into stream', function(t) { + var r = fromArray([{ one: '1'}, { two: '2' }]); + + r.pipe(toArray(function(list) { + t.deepEqual(list, [ + { one: '1' }, + { two: '2' } + ]); + + t.end(); + })); + }); + + t.test('read(n) is ignored', function(t) { + var r = fromArray([{ one: '1'}, { two: '2' }]); + + var value = r.read(2); + + t.deepEqual(value, { one: '1' }); + + t.end(); + }); + + t.test('can read objects from _read (sync)', function(t) { + var r = new Readable({ objectMode: true }); + var list = [{ one: '1'}, { two: '2' }]; + r._read = function(n) { + var item = list.shift(); + r.push(item || null); + }; + + r.pipe(toArray(function(list) { + t.deepEqual(list, [ + { one: '1' }, + { two: '2' } + ]); + + t.end(); + })); + }); + + t.test('can read objects from _read (async)', function(t) { + var r = new Readable({ objectMode: true }); + var list = [{ one: '1'}, { two: '2' }]; + r._read = function(n) { + var item = list.shift(); + process.nextTick(function() { + r.push(item || null); + }); + }; + + r.pipe(toArray(function(list) { + t.deepEqual(list, [ + { one: '1' }, + { two: '2' } + ]); + + t.end(); + })); + }); + + t.test('can read strings as objects', function(t) { + var r = new Readable({ + objectMode: true + }); + r._read = noop; + var list = ['one', 'two', 'three']; + forEach(list, function(str) { + r.push(str); + }); + r.push(null); + + r.pipe(toArray(function(array) { + t.deepEqual(array, list); + + t.end(); + })); + }); + + t.test('read(0) for object streams', function(t) { + var r = new Readable({ + objectMode: true + }); + r._read = noop; + + r.push('foobar'); + r.push(null); + + var v = r.read(0); + + r.pipe(toArray(function(array) { + t.deepEqual(array, ['foobar']); + + t.end(); + })); + }); + + t.test('falsey values', function(t) { + var r = new Readable({ + objectMode: true + }); + r._read = noop; + + r.push(false); + r.push(0); + r.push(''); + r.push(null); + + r.pipe(toArray(function(array) { + t.deepEqual(array, [false, 0, '']); + + t.end(); + })); + }); + + t.test('high watermark _read', function(t) { + var r = new Readable({ + highWaterMark: 6, + objectMode: true + }); + var calls = 0; + var list = ['1', '2', '3', '4', '5', '6', '7', '8']; + + r._read = function(n) { + calls++; + }; + + forEach(list, function(c) { + r.push(c); + }); + + var v = r.read(); + + t.equal(calls, 0); + t.equal(v, '1'); + + var v2 = r.read(); + t.equal(v2, '2'); + + var v3 = r.read(); + t.equal(v3, '3'); + + t.equal(calls, 1); + + t.end(); + }); + + t.test('high watermark push', function(t) { + var r = new Readable({ + highWaterMark: 6, + objectMode: true + }); + r._read = function(n) {}; + for (var i = 0; i < 6; i++) { + var bool = r.push(i); + t.equal(bool, i === 5 ? false : true); + } + + t.end(); + }); + + t.test('can write objects to stream', function(t) { + var w = new Writable({ objectMode: true }); + + w._write = function(chunk, encoding, cb) { + t.deepEqual(chunk, { foo: 'bar' }); + cb(); + }; + + w.on('finish', function() { + t.end(); + }); + + w.write({ foo: 'bar' }); + w.end(); + }); + + t.test('can write multiple objects to stream', function(t) { + var w = new Writable({ objectMode: true }); + var list = []; + + w._write = function(chunk, encoding, cb) { + list.push(chunk); + cb(); + }; + + w.on('finish', function() { + t.deepEqual(list, [0, 1, 2, 3, 4]); + + t.end(); + }); + + w.write(0); + w.write(1); + w.write(2); + w.write(3); + w.write(4); + w.end(); + }); + + t.test('can write strings as objects', function(t) { + var w = new Writable({ + objectMode: true + }); + var list = []; + + w._write = function(chunk, encoding, cb) { + list.push(chunk); + process.nextTick(cb); + }; + + w.on('finish', function() { + t.deepEqual(list, ['0', '1', '2', '3', '4']); + + t.end(); + }); + + w.write('0'); + w.write('1'); + w.write('2'); + w.write('3'); + w.write('4'); + w.end(); + }); + + t.test('buffers finish until cb is called', function(t) { + var w = new Writable({ + objectMode: true + }); + var called = false; + + w._write = function(chunk, encoding, cb) { + t.equal(chunk, 'foo'); + + process.nextTick(function() { + called = true; + cb(); + }); + }; + + w.on('finish', function() { + t.equal(called, true); + + t.end(); + }); + + w.write('foo'); + w.end(); + }); + + function forEach (xs, f) { + for (var i = 0, l = xs.length; i < l; i++) { + f(xs[i], i); + } + } +}; diff --git a/test/browser/test-stream2-pipe-error-handling.js b/test/browser/test-stream2-pipe-error-handling.js new file mode 100644 index 0000000000..dc91cc3f33 --- /dev/null +++ b/test/browser/test-stream2-pipe-error-handling.js @@ -0,0 +1,88 @@ +'use strict'; +var common = require('../common'); +var assert = require('assert'); +var stream = require('../../'); +module.exports = function (t) { + t.test('Error Listener Catches', function (t) { + var count = 1000; + + var source = new stream.Readable(); + source._read = function(n) { + n = Math.min(count, n); + count -= n; + source.push(new Buffer(n)); + }; + + var unpipedDest; + source.unpipe = function(dest) { + unpipedDest = dest; + stream.Readable.prototype.unpipe.call(this, dest); + }; + + var dest = new stream.Writable(); + dest._write = function(chunk, encoding, cb) { + cb(); + }; + + source.pipe(dest); + + var gotErr = null; + dest.on('error', function(err) { + gotErr = err; + }); + + var unpipedSource; + dest.on('unpipe', function(src) { + unpipedSource = src; + }); + + var err = new Error('This stream turned into bacon.'); + dest.emit('error', err); + t.strictEqual(gotErr, err); + t.strictEqual(unpipedSource, source); + t.strictEqual(unpipedDest, dest); + t.end(); + }); + + t.test('Error Without Listener Throws', function testErrorWithoutListenerThrows(t) { + var count = 1000; + + var source = new stream.Readable(); + source._read = function(n) { + n = Math.min(count, n); + count -= n; + source.push(new Buffer(n)); + }; + + var unpipedDest; + source.unpipe = function(dest) { + unpipedDest = dest; + stream.Readable.prototype.unpipe.call(this, dest); + }; + + var dest = new stream.Writable(); + dest._write = function(chunk, encoding, cb) { + cb(); + }; + + source.pipe(dest); + + var unpipedSource; + dest.on('unpipe', function(src) { + unpipedSource = src; + }); + + var err = new Error('This stream turned into bacon.'); + + var gotErr = null; + try { + dest.emit('error', err); + } catch (e) { + gotErr = e; + } + t.strictEqual(gotErr, err); + t.strictEqual(unpipedSource, source); + t.strictEqual(unpipedDest, dest); + t.end(); + }); +} diff --git a/test/browser/test-stream2-pipe-error-once-listener.js b/test/browser/test-stream2-pipe-error-once-listener.js new file mode 100644 index 0000000000..5f4a4e2686 --- /dev/null +++ b/test/browser/test-stream2-pipe-error-once-listener.js @@ -0,0 +1,41 @@ +'use strict'; +var common = require('../common'); + +var inherits = require('inherits'); +var stream = require('../../'); + +module.exports = function (t) { + t.test('pipe error once listener', function (t){ + t.plan(1); + var Read = function() { + stream.Readable.call(this); + }; + inherits(Read, stream.Readable); + + Read.prototype._read = function(size) { + this.push('x'); + this.push(null); + }; + + + var Write = function() { + stream.Writable.call(this); + }; + inherits(Write, stream.Writable); + + Write.prototype._write = function(buffer, encoding, cb) { + this.emit('error', new Error('boom')); + this.emit('alldone'); + }; + + var read = new Read(); + var write = new Write(); + + write.once('error', function(err) {}); + write.once('alldone', function(err) { + t.ok(true); + }); + + read.pipe(write); + }); +} diff --git a/test/browser/test-stream2-push.js b/test/browser/test-stream2-push.js new file mode 100644 index 0000000000..7ca5f39ef3 --- /dev/null +++ b/test/browser/test-stream2-push.js @@ -0,0 +1,120 @@ +'use strict'; +var common = require('../common'); +var stream = require('../../'); +var Readable = stream.Readable; +var Writable = stream.Writable; + + +var inherits = require('inherits'); +var EE = require('events').EventEmitter; +module.exports = function (t) { + +// a mock thing a bit like the net.Socket/tcp_wrap.handle interaction + t.test('push', function (t) { + var stream = new Readable({ + highWaterMark: 16, + encoding: 'utf8' + }); + + var source = new EE(); + + stream._read = function() { + //console.error('stream._read'); + readStart(); + }; + + var ended = false; + stream.on('end', function() { + ended = true; + }); + + source.on('data', function(chunk) { + var ret = stream.push(chunk); + //console.error('data', stream._readableState.length); + if (!ret) + readStop(); + }); + + source.on('end', function() { + stream.push(null); + }); + + var reading = false; + + function readStart() { + //console.error('readStart'); + reading = true; + } + + function readStop() { + //console.error('readStop'); + reading = false; + process.nextTick(function() { + var r = stream.read(); + if (r !== null) + writer.write(r); + }); + } + + var writer = new Writable({ + decodeStrings: false + }); + + var written = []; + + var expectWritten = + [ 'asdfgasdfgasdfgasdfg', + 'asdfgasdfgasdfgasdfg', + 'asdfgasdfgasdfgasdfg', + 'asdfgasdfgasdfgasdfg', + 'asdfgasdfgasdfgasdfg', + 'asdfgasdfgasdfgasdfg' ]; + + writer._write = function(chunk, encoding, cb) { + //console.error('WRITE %s', chunk); + written.push(chunk); + process.nextTick(cb); + }; + + writer.on('finish', finish); + + + // now emit some chunks. + + var chunk = 'asdfg'; + + var set = 0; + readStart(); + data(); + function data() { + t.ok(reading); + source.emit('data', chunk); + t.ok(reading); + source.emit('data', chunk); + t.ok(reading); + source.emit('data', chunk); + t.ok(reading); + source.emit('data', chunk); + t.notOk(reading); + if (set++ < 5) + setTimeout(data, 10); + else + end(); + } + + function finish() { + //console.error('finish'); + t.deepEqual(written, expectWritten); + t.end(); + } + + function end() { + source.emit('end'); + t.notOk(reading); + writer.end(stream.read()); + setTimeout(function() { + t.ok(ended); + }); + } + }); +}; diff --git a/test/browser/test-stream2-readable-empty-buffer-no-eof.js b/test/browser/test-stream2-readable-empty-buffer-no-eof.js new file mode 100644 index 0000000000..04c622ef93 --- /dev/null +++ b/test/browser/test-stream2-readable-empty-buffer-no-eof.js @@ -0,0 +1,91 @@ +'use strict'; +var common = require('../common'); + +var Readable = require('../../').Readable; + +module.exports = function (t) { + t.test('readable empty buffer no eof 1', function (t) { + t.plan(1); + var r = new Readable(); + + // should not end when we get a Buffer(0) or '' as the _read result + // that just means that there is *temporarily* no data, but to go + // ahead and try again later. + // + // note that this is very unusual. it only works for crypto streams + // because the other side of the stream will call read(0) to cycle + // data through openssl. that's why we set the timeouts to call + // r.read(0) again later, otherwise there is no more work being done + // and the process just exits. + + var buf = new Buffer(5); + buf.fill('x'); + var reads = 5; + r._read = function(n) { + switch (reads--) { + case 0: + return r.push(null); // EOF + case 1: + return r.push(buf); + case 2: + setTimeout(r.read.bind(r, 0), 50); + return r.push(new Buffer(0)); // Not-EOF! + case 3: + setTimeout(r.read.bind(r, 0), 50); + return process.nextTick(function() { + return r.push(new Buffer(0)); + }); + case 4: + setTimeout(r.read.bind(r, 0), 50); + return setTimeout(function() { + return r.push(new Buffer(0)); + }); + case 5: + return setTimeout(function() { + return r.push(buf); + }); + default: + throw new Error('unreachable'); + } + }; + + var results = []; + function flow() { + var chunk; + while (null !== (chunk = r.read())) + results.push(chunk + ''); + } + r.on('readable', flow); + r.on('end', function() { + results.push('EOF'); + t.deepEqual(results, [ 'xxxxx', 'xxxxx', 'EOF' ]); + }); + flow(); + + }); + + t.test('readable empty buffer no eof 2', function (t) { + t.plan(1); + var r = new Readable({ encoding: 'base64' }); + var reads = 5; + r._read = function(n) { + if (!reads--) + return r.push(null); // EOF + else + return r.push(new Buffer('x')); + }; + + var results = []; + function flow() { + var chunk; + while (null !== (chunk = r.read())) + results.push(chunk + ''); + } + r.on('readable', flow); + r.on('end', function() { + results.push('EOF'); + t.deepEqual(results, [ 'eHh4', 'eHg=', 'EOF' ]); + }); + flow(); + }); +} diff --git a/test/browser/test-stream2-readable-from-list.js b/test/browser/test-stream2-readable-from-list.js new file mode 100644 index 0000000000..62492668a0 --- /dev/null +++ b/test/browser/test-stream2-readable-from-list.js @@ -0,0 +1,66 @@ +'use strict'; +var common = require('../common'); +var fromList = require('../../lib/_stream_readable')._fromList; + + +module.exports = function (t) { + t.test('buffers', function(t) { + // have a length + var len = 16; + var list = [ new Buffer('foog'), + new Buffer('bark'), + new Buffer('bazy'), + new Buffer('kuel') ]; + + // read more than the first element. + var ret = fromList(6, { buffer: list, length: 16 }); + t.equal(ret.toString(), 'foogba'); + + // read exactly the first element. + ret = fromList(2, { buffer: list, length: 10 }); + t.equal(ret.toString(), 'rk'); + + // read less than the first element. + ret = fromList(2, { buffer: list, length: 8 }); + t.equal(ret.toString(), 'ba'); + + // read more than we have. + ret = fromList(100, { buffer: list, length: 6 }); + t.equal(ret.toString(), 'zykuel'); + + // all consumed. + t.same(list, []); + + t.end(); + }); + + t.test('strings', function(t) { + // have a length + var len = 16; + var list = [ 'foog', + 'bark', + 'bazy', + 'kuel' ]; + + // read more than the first element. + var ret = fromList(6, { buffer: list, length: 16, decoder: true }); + t.equal(ret, 'foogba'); + + // read exactly the first element. + ret = fromList(2, { buffer: list, length: 10, decoder: true }); + t.equal(ret, 'rk'); + + // read less than the first element. + ret = fromList(2, { buffer: list, length: 8, decoder: true }); + t.equal(ret, 'ba'); + + // read more than we have. + ret = fromList(100, { buffer: list, length: 6, decoder: true }); + t.equal(ret, 'zykuel'); + + // all consumed. + t.same(list, []); + + t.end(); + }); +} diff --git a/test/browser/test-stream2-readable-legacy-drain.js b/test/browser/test-stream2-readable-legacy-drain.js new file mode 100644 index 0000000000..7abfbc03fa --- /dev/null +++ b/test/browser/test-stream2-readable-legacy-drain.js @@ -0,0 +1,52 @@ +'use strict'; +var common = require('../common'); + +var Stream = require('../../'); +var Readable = require('../../').Readable; +module.exports = function (t) { + t.test('readable legacy drain', function (t) { + var r = new Readable(); + var N = 256; + var reads = 0; + r._read = function(n) { + return r.push(++reads === N ? null : new Buffer(1)); + }; + t.plan(2); + r.on('end', function() { + t.ok(true, 'rended'); + }); + + var w = new Stream(); + w.writable = true; + var writes = 0; + var buffered = 0; + w.write = function(c) { + writes += c.length; + buffered += c.length; + process.nextTick(drain); + return false; + }; + + function drain() { + if(buffered > 3) { + t.ok(false, 'to much buffer'); + } + buffered = 0; + w.emit('drain'); + } + + + w.end = function() { + t.ok(true, 'wended'); + }; + + // Just for kicks, let's mess with the drain count. + // This verifies that even if it gets negative in the + // pipe() cleanup function, we'll still function properly. + r.on('readable', function() { + w.emit('drain'); + }); + + r.pipe(w); +}); +} diff --git a/test/browser/test-stream2-readable-non-empty-end.js b/test/browser/test-stream2-readable-non-empty-end.js new file mode 100644 index 0000000000..14cf6bbd27 --- /dev/null +++ b/test/browser/test-stream2-readable-non-empty-end.js @@ -0,0 +1,57 @@ +'use strict'; +var common = require('../common'); +var Readable = require('../../lib/_stream_readable'); +module.exports = function (t) { + t.test('non empty end', function (t) { + t.plan(4); + var len = 0; + var chunks = new Array(10); + for (var i = 1; i <= 10; i++) { + chunks[i - 1] = new Buffer(i); + len += i; + } + + var test = new Readable(); + var n = 0; + test._read = function(size) { + var chunk = chunks[n++]; + setTimeout(function() { + test.push(chunk === undefined ? null : chunk); + }); + }; + + test.on('end', thrower); + function thrower() { + throw new Error('this should not happen!'); + } + + var bytesread = 0; + test.on('readable', function() { + var b = len - bytesread - 1; + var res = test.read(b); + if (res) { + bytesread += res.length; + //console.error('br=%d len=%d', bytesread, len); + setTimeout(next); + } + test.read(0); + }); + test.read(0); + + function next() { + // now let's make 'end' happen + test.removeListener('end', thrower); + + test.on('end', function() { + t.ok(true, 'end emitted'); + }); + + // one to get the last byte + var r = test.read(); + t.ok(r); + t.equal(r.length, 1); + r = test.read(); + t.equal(r, null); + } + }); +} diff --git a/test/browser/test-stream2-readable-wrap-empty.js b/test/browser/test-stream2-readable-wrap-empty.js new file mode 100644 index 0000000000..d13bbbadbe --- /dev/null +++ b/test/browser/test-stream2-readable-wrap-empty.js @@ -0,0 +1,24 @@ +'use strict'; +var common = require('../common'); + +var Readable = require('../../lib/_stream_readable'); +var EE = require('events').EventEmitter; +module.exports = function (t) { + t.test('wrap empty', function (t) { + t.plan(1); + var oldStream = new EE(); + oldStream.pause = function() {}; + oldStream.resume = function() {}; + + var newStream = new Readable().wrap(oldStream); + + newStream + .on('readable', function() {}) + .on('end', function() { + t.ok(true, 'ended'); + }); + + oldStream.emit('end'); + + }) +} diff --git a/test/browser/test-stream2-readable-wrap.js b/test/browser/test-stream2-readable-wrap.js new file mode 100644 index 0000000000..04f12b886c --- /dev/null +++ b/test/browser/test-stream2-readable-wrap.js @@ -0,0 +1,86 @@ +'use strict'; +var common = require('../common'); + +var Readable = require('../../lib/_stream_readable'); +var Writable = require('../../lib/_stream_writable'); +var EE = require('events').EventEmitter; +var run = 0; +function runTest(t, highWaterMark, objectMode, produce) { + t.test('run #' + (++run), function (t) { + var old = new EE(); + var r = new Readable({ highWaterMark: highWaterMark, + objectMode: objectMode }); + t.equal(r, r.wrap(old)); + + var ended = false; + r.on('end', function() { + ended = true; + }); + + old.pause = function() { + //console.error('old.pause()'); + old.emit('pause'); + flowing = false; + }; + + old.resume = function() { + //console.error('old.resume()'); + old.emit('resume'); + flow(); + }; + + var flowing; + var chunks = 10; + var oldEnded = false; + var expected = []; + function flow() { + flowing = true; + while (flowing && chunks-- > 0) { + var item = produce(); + expected.push(item); + //console.log('old.emit', chunks, flowing); + old.emit('data', item); + //console.log('after emit', chunks, flowing); + } + if (chunks <= 0) { + oldEnded = true; + //console.log('old end', chunks, flowing); + old.emit('end'); + } + } + + var w = new Writable({ highWaterMark: highWaterMark * 2, + objectMode: objectMode }); + var written = []; + w._write = function(chunk, encoding, cb) { + //console.log('_write', chunk); + written.push(chunk); + setTimeout(cb); + }; + + w.on('finish', function() { + performAsserts(); + }); + + r.pipe(w); + + flow(); + + function performAsserts() { + t.ok(ended); + t.ok(oldEnded); + t.deepEqual(written, expected); + t.end(); + } + }); +} +module.exports = function (t) { + t.test('readable wrap', function (t) { + runTest(t, 100, false, function() { return new Buffer(100); }); + runTest(t, 10, false, function() { return new Buffer('xxxxxxxxxx'); }); + runTest(t, 1, true, function() { return { foo: 'bar' }; }); + + var objectChunks = [ 5, 'a', false, 0, '', 'xyz', { x: 4 }, 7, [], 555 ]; + runTest(t, 1, true, function() { return objectChunks.shift(); }); + }); +} diff --git a/test/browser/test-stream2-set-encoding.js b/test/browser/test-stream2-set-encoding.js new file mode 100644 index 0000000000..b174f307f0 --- /dev/null +++ b/test/browser/test-stream2-set-encoding.js @@ -0,0 +1,317 @@ +'use strict'; +var common = require('../common'); +var R = require('../../lib/_stream_readable'); +var util = { + inherits: require('inherits') +}; + +// tiny node-tap lookalike. +module.exports = function (t) { + var test = t.test; + ///// + + util.inherits(TestReader, R); + + function TestReader(n, opts) { + R.call(this, opts); + + this.pos = 0; + this.len = n || 100; + } + + TestReader.prototype._read = function(n) { + setTimeout(function() { + + if (this.pos >= this.len) { + // double push(null) to test eos handling + this.push(null); + return this.push(null); + } + + n = Math.min(n, this.len - this.pos); + if (n <= 0) { + // double push(null) to test eos handling + this.push(null); + return this.push(null); + } + + this.pos += n; + var ret = new Buffer(n); + ret.fill('a'); + + //console.log('this.push(ret)', ret); + + return this.push(ret); + }.bind(this), 1); + }; + + test('setEncoding utf8', function(t) { + var tr = new TestReader(100); + tr.setEncoding('utf8'); + var out = []; + var expect = + [ 'aaaaaaaaaa', + 'aaaaaaaaaa', + 'aaaaaaaaaa', + 'aaaaaaaaaa', + 'aaaaaaaaaa', + 'aaaaaaaaaa', + 'aaaaaaaaaa', + 'aaaaaaaaaa', + 'aaaaaaaaaa', + 'aaaaaaaaaa' ]; + + tr.on('readable', function flow() { + var chunk; + while (null !== (chunk = tr.read(10))) + out.push(chunk); + }); + + tr.on('end', function() { + t.same(out, expect); + t.end(); + }); + }); + + + test('setEncoding hex', function(t) { + var tr = new TestReader(100); + tr.setEncoding('hex'); + var out = []; + var expect = + [ '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161' ]; + + tr.on('readable', function flow() { + var chunk; + while (null !== (chunk = tr.read(10))) + out.push(chunk); + }); + + tr.on('end', function() { + t.same(out, expect); + t.end(); + }); + }); + + test('setEncoding hex with read(13)', function(t) { + var tr = new TestReader(100); + tr.setEncoding('hex'); + var out = []; + var expect = + [ '6161616161616', + '1616161616161', + '6161616161616', + '1616161616161', + '6161616161616', + '1616161616161', + '6161616161616', + '1616161616161', + '6161616161616', + '1616161616161', + '6161616161616', + '1616161616161', + '6161616161616', + '1616161616161', + '6161616161616', + '16161' ]; + + tr.on('readable', function flow() { + //console.log('readable once'); + var chunk; + while (null !== (chunk = tr.read(13))) + out.push(chunk); + }); + + tr.on('end', function() { + //console.log('END'); + t.same(out, expect); + t.end(); + }); + }); + + test('setEncoding base64', function(t) { + var tr = new TestReader(100); + tr.setEncoding('base64'); + var out = []; + var expect = + [ 'YWFhYWFhYW', + 'FhYWFhYWFh', + 'YWFhYWFhYW', + 'FhYWFhYWFh', + 'YWFhYWFhYW', + 'FhYWFhYWFh', + 'YWFhYWFhYW', + 'FhYWFhYWFh', + 'YWFhYWFhYW', + 'FhYWFhYWFh', + 'YWFhYWFhYW', + 'FhYWFhYWFh', + 'YWFhYWFhYW', + 'FhYQ==' ]; + + tr.on('readable', function flow() { + var chunk; + while (null !== (chunk = tr.read(10))) + out.push(chunk); + }); + + tr.on('end', function() { + t.same(out, expect); + t.end(); + }); + }); + + test('encoding: utf8', function(t) { + var tr = new TestReader(100, { encoding: 'utf8' }); + var out = []; + var expect = + [ 'aaaaaaaaaa', + 'aaaaaaaaaa', + 'aaaaaaaaaa', + 'aaaaaaaaaa', + 'aaaaaaaaaa', + 'aaaaaaaaaa', + 'aaaaaaaaaa', + 'aaaaaaaaaa', + 'aaaaaaaaaa', + 'aaaaaaaaaa' ]; + + tr.on('readable', function flow() { + var chunk; + while (null !== (chunk = tr.read(10))) + out.push(chunk); + }); + + tr.on('end', function() { + t.same(out, expect); + t.end(); + }); + }); + + + test('encoding: hex', function(t) { + var tr = new TestReader(100, { encoding: 'hex' }); + var out = []; + var expect = + [ '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161', + '6161616161' ]; + + tr.on('readable', function flow() { + var chunk; + while (null !== (chunk = tr.read(10))) + out.push(chunk); + }); + + tr.on('end', function() { + t.same(out, expect); + t.end(); + }); + }); + + test('encoding: hex with read(13)', function(t) { + var tr = new TestReader(100, { encoding: 'hex' }); + var out = []; + var expect = + [ '6161616161616', + '1616161616161', + '6161616161616', + '1616161616161', + '6161616161616', + '1616161616161', + '6161616161616', + '1616161616161', + '6161616161616', + '1616161616161', + '6161616161616', + '1616161616161', + '6161616161616', + '1616161616161', + '6161616161616', + '16161' ]; + + tr.on('readable', function flow() { + var chunk; + while (null !== (chunk = tr.read(13))) + out.push(chunk); + }); + + tr.on('end', function() { + t.same(out, expect); + t.end(); + }); + }); + + test('encoding: base64', function(t) { + var tr = new TestReader(100, { encoding: 'base64' }); + var out = []; + var expect = + [ 'YWFhYWFhYW', + 'FhYWFhYWFh', + 'YWFhYWFhYW', + 'FhYWFhYWFh', + 'YWFhYWFhYW', + 'FhYWFhYWFh', + 'YWFhYWFhYW', + 'FhYWFhYWFh', + 'YWFhYWFhYW', + 'FhYWFhYWFh', + 'YWFhYWFhYW', + 'FhYWFhYWFh', + 'YWFhYWFhYW', + 'FhYQ==' ]; + + tr.on('readable', function flow() { + var chunk; + while (null !== (chunk = tr.read(10))) + out.push(chunk); + }); + + tr.on('end', function() { + t.same(out, expect); + t.end(); + }); + }); + + test('chainable', function(t) { + var tr = new TestReader(100); + t.equal(tr.setEncoding('utf8'), tr); + t.end(); + }); +} diff --git a/test/browser/test-stream2-transform.js b/test/browser/test-stream2-transform.js new file mode 100644 index 0000000000..921c951650 --- /dev/null +++ b/test/browser/test-stream2-transform.js @@ -0,0 +1,473 @@ +'use strict'; +var common = require('../common'); +var PassThrough = require('../../lib/_stream_passthrough'); +var Transform = require('../../lib/_stream_transform'); + +///// +module.exports = function (t) { + t.test('writable side consumption', function(t) { + var tx = new Transform({ + highWaterMark: 10 + }); + + var transformed = 0; + tx._transform = function(chunk, encoding, cb) { + transformed += chunk.length; + tx.push(chunk); + cb(); + }; + + for (var i = 1; i <= 10; i++) { + tx.write(new Buffer(i)); + } + tx.end(); + + t.equal(tx._readableState.length, 10); + t.equal(transformed, 10); + t.equal(tx._transformState.writechunk.length, 5); + t.same(tx._writableState.getBuffer().map(function(c) { + return c.chunk.length; + }), [6, 7, 8, 9, 10]); + + t.end(); + }); + + t.test('passthrough', function(t) { + var pt = new PassThrough(); + + pt.write(new Buffer('foog')); + pt.write(new Buffer('bark')); + pt.write(new Buffer('bazy')); + pt.write(new Buffer('kuel')); + pt.end(); + + t.equal(pt.read(5).toString(), 'foogb'); + t.equal(pt.read(5).toString(), 'arkba'); + t.equal(pt.read(5).toString(), 'zykue'); + t.equal(pt.read(5).toString(), 'l'); + t.end(); + }); + + t.test('object passthrough', function(t) { + var pt = new PassThrough({ objectMode: true }); + + pt.write(1); + pt.write(true); + pt.write(false); + pt.write(0); + pt.write('foo'); + pt.write(''); + pt.write({ a: 'b'}); + pt.end(); + + t.equal(pt.read(), 1); + t.equal(pt.read(), true); + t.equal(pt.read(), false); + t.equal(pt.read(), 0); + t.equal(pt.read(), 'foo'); + t.equal(pt.read(), ''); + t.same(pt.read(), { a: 'b'}); + t.end(); + }); + + t.test('simple transform', function(t) { + var pt = new Transform(); + pt._transform = function(c, e, cb) { + var ret = new Buffer(c.length); + ret.fill('x'); + pt.push(ret); + cb(); + }; + + pt.write(new Buffer('foog')); + pt.write(new Buffer('bark')); + pt.write(new Buffer('bazy')); + pt.write(new Buffer('kuel')); + pt.end(); + + t.equal(pt.read(5).toString(), 'xxxxx'); + t.equal(pt.read(5).toString(), 'xxxxx'); + t.equal(pt.read(5).toString(), 'xxxxx'); + t.equal(pt.read(5).toString(), 'x'); + t.end(); + }); + + t.test('simple object transform', function(t) { + var pt = new Transform({ objectMode: true }); + pt._transform = function(c, e, cb) { + pt.push(JSON.stringify(c)); + cb(); + }; + + pt.write(1); + pt.write(true); + pt.write(false); + pt.write(0); + pt.write('foo'); + pt.write(''); + pt.write({ a: 'b'}); + pt.end(); + + t.equal(pt.read(), '1'); + t.equal(pt.read(), 'true'); + t.equal(pt.read(), 'false'); + t.equal(pt.read(), '0'); + t.equal(pt.read(), '"foo"'); + t.equal(pt.read(), '""'); + t.equal(pt.read(), '{"a":"b"}'); + t.end(); + }); + + t.test('async passthrough', function(t) { + var pt = new Transform(); + pt._transform = function(chunk, encoding, cb) { + setTimeout(function() { + pt.push(chunk); + cb(); + }, 10); + }; + + pt.write(new Buffer('foog')); + pt.write(new Buffer('bark')); + pt.write(new Buffer('bazy')); + pt.write(new Buffer('kuel')); + pt.end(); + + pt.on('finish', function() { + t.equal(pt.read(5).toString(), 'foogb'); + t.equal(pt.read(5).toString(), 'arkba'); + t.equal(pt.read(5).toString(), 'zykue'); + t.equal(pt.read(5).toString(), 'l'); + t.end(); + }); + }); + + t.test('assymetric transform (expand)', function(t) { + var pt = new Transform(); + + // emit each chunk 2 times. + pt._transform = function(chunk, encoding, cb) { + setTimeout(function() { + pt.push(chunk); + setTimeout(function() { + pt.push(chunk); + cb(); + }, 10); + }, 10); + }; + + pt.write(new Buffer('foog')); + pt.write(new Buffer('bark')); + pt.write(new Buffer('bazy')); + pt.write(new Buffer('kuel')); + pt.end(); + + pt.on('finish', function() { + t.equal(pt.read(5).toString(), 'foogf'); + t.equal(pt.read(5).toString(), 'oogba'); + t.equal(pt.read(5).toString(), 'rkbar'); + t.equal(pt.read(5).toString(), 'kbazy'); + t.equal(pt.read(5).toString(), 'bazyk'); + t.equal(pt.read(5).toString(), 'uelku'); + t.equal(pt.read(5).toString(), 'el'); + t.end(); + }); + }); + + t.test('assymetric transform (compress)', function(t) { + var pt = new Transform(); + + // each output is the first char of 3 consecutive chunks, + // or whatever's left. + pt.state = ''; + + pt._transform = function(chunk, encoding, cb) { + if (!chunk) + chunk = ''; + var s = chunk.toString(); + setTimeout(function() { + this.state += s.charAt(0); + if (this.state.length === 3) { + pt.push(new Buffer(this.state)); + this.state = ''; + } + cb(); + }.bind(this), 10); + }; + + pt._flush = function(cb) { + // just output whatever we have. + pt.push(new Buffer(this.state)); + this.state = ''; + cb(); + }; + + pt.write(new Buffer('aaaa')); + pt.write(new Buffer('bbbb')); + pt.write(new Buffer('cccc')); + pt.write(new Buffer('dddd')); + pt.write(new Buffer('eeee')); + pt.write(new Buffer('aaaa')); + pt.write(new Buffer('bbbb')); + pt.write(new Buffer('cccc')); + pt.write(new Buffer('dddd')); + pt.write(new Buffer('eeee')); + pt.write(new Buffer('aaaa')); + pt.write(new Buffer('bbbb')); + pt.write(new Buffer('cccc')); + pt.write(new Buffer('dddd')); + pt.end(); + + // 'abcdeabcdeabcd' + pt.on('finish', function() { + t.equal(pt.read(5).toString(), 'abcde'); + t.equal(pt.read(5).toString(), 'abcde'); + t.equal(pt.read(5).toString(), 'abcd'); + t.end(); + }); + }); + + // this tests for a stall when data is written to a full stream + // that has empty transforms. + t.test('complex transform', function(t) { + var count = 0; + var saved = null; + var pt = new Transform({highWaterMark:3}); + pt._transform = function(c, e, cb) { + if (count++ === 1) + saved = c; + else { + if (saved) { + pt.push(saved); + saved = null; + } + pt.push(c); + } + + cb(); + }; + + pt.once('readable', function() { + process.nextTick(function() { + pt.write(new Buffer('d')); + pt.write(new Buffer('ef'), function() { + pt.end(); + t.end(); + }); + t.equal(pt.read().toString(), 'abcdef'); + t.equal(pt.read(), null); + }); + }); + + pt.write(new Buffer('abc')); + }); + + + t.test('passthrough event emission', function(t) { + var pt = new PassThrough(); + var emits = 0; + pt.on('readable', function() { + var state = pt._readableState; + //console.error('>>> emit readable %d', emits); + emits++; + }); + + var i = 0; + + pt.write(new Buffer('foog')); + + //console.error('need emit 0'); + pt.write(new Buffer('bark')); + + //console.error('should have emitted readable now 1 === %d', emits); + t.equal(emits, 1); + + t.equal(pt.read(5).toString(), 'foogb'); + t.equal(pt.read(5) + '', 'null'); + + //console.error('need emit 1'); + + pt.write(new Buffer('bazy')); + //console.error('should have emitted, but not again'); + pt.write(new Buffer('kuel')); + + //console.error('should have emitted readable now 2 === %d', emits); + t.equal(emits, 2); + + t.equal(pt.read(5).toString(), 'arkba'); + t.equal(pt.read(5).toString(), 'zykue'); + t.equal(pt.read(5), null); + + //console.error('need emit 2'); + + pt.end(); + + t.equal(emits, 3); + + t.equal(pt.read(5).toString(), 'l'); + t.equal(pt.read(5), null); + + //console.error('should not have emitted again'); + t.equal(emits, 3); + t.end(); + }); + + t.test('passthrough event emission reordered', function(t) { + var pt = new PassThrough(); + var emits = 0; + pt.on('readable', function() { + //console.error('emit readable', emits); + emits++; + }); + + pt.write(new Buffer('foog')); + //console.error('need emit 0'); + pt.write(new Buffer('bark')); + //console.error('should have emitted readable now 1 === %d', emits); + t.equal(emits, 1); + + t.equal(pt.read(5).toString(), 'foogb'); + t.equal(pt.read(5), null); + + //console.error('need emit 1'); + pt.once('readable', function() { + t.equal(pt.read(5).toString(), 'arkba'); + + t.equal(pt.read(5), null); + + //console.error('need emit 2'); + pt.once('readable', function() { + t.equal(pt.read(5).toString(), 'zykue'); + t.equal(pt.read(5), null); + pt.once('readable', function() { + t.equal(pt.read(5).toString(), 'l'); + t.equal(pt.read(5), null); + t.equal(emits, 4); + t.end(); + }); + pt.end(); + }); + pt.write(new Buffer('kuel')); + }); + + pt.write(new Buffer('bazy')); + }); + + t.test('passthrough facaded', function(t) { + //console.error('passthrough facaded'); + var pt = new PassThrough(); + var datas = []; + pt.on('data', function(chunk) { + datas.push(chunk.toString()); + }); + + pt.on('end', function() { + t.same(datas, ['foog', 'bark', 'bazy', 'kuel']); + t.end(); + }); + + pt.write(new Buffer('foog')); + setTimeout(function() { + pt.write(new Buffer('bark')); + setTimeout(function() { + pt.write(new Buffer('bazy')); + setTimeout(function() { + pt.write(new Buffer('kuel')); + setTimeout(function() { + pt.end(); + }, 10); + }, 10); + }, 10); + }, 10); + }); + + t.test('object transform (json parse)', function(t) { + //console.error('json parse stream'); + var jp = new Transform({ objectMode: true }); + jp._transform = function(data, encoding, cb) { + try { + jp.push(JSON.parse(data)); + cb(); + } catch (er) { + cb(er); + } + }; + + // anything except null/undefined is fine. + // those are "magic" in the stream API, because they signal EOF. + var objects = [ + { foo: 'bar' }, + 100, + 'string', + { nested: { things: [ { foo: 'bar' }, 100, 'string' ] } } + ]; + + var ended = false; + jp.on('end', function() { + ended = true; + }); + + forEach(objects, function(obj) { + jp.write(JSON.stringify(obj)); + var res = jp.read(); + t.same(res, obj); + }); + + jp.end(); + // read one more time to get the 'end' event + jp.read(); + + process.nextTick(function() { + t.ok(ended); + t.end(); + }); + }); + + t.test('object transform (json stringify)', function(t) { + //console.error('json parse stream'); + var js = new Transform({ objectMode: true }); + js._transform = function(data, encoding, cb) { + try { + js.push(JSON.stringify(data)); + cb(); + } catch (er) { + cb(er); + } + }; + + // anything except null/undefined is fine. + // those are "magic" in the stream API, because they signal EOF. + var objects = [ + { foo: 'bar' }, + 100, + 'string', + { nested: { things: [ { foo: 'bar' }, 100, 'string' ] } } + ]; + + var ended = false; + js.on('end', function() { + ended = true; + }); + + forEach(objects, function(obj) { + js.write(obj); + var res = js.read(); + t.equal(res, JSON.stringify(obj)); + }); + + js.end(); + // read one more time to get the 'end' event + js.read(); + + process.nextTick(function() { + t.ok(ended); + t.end(); + }); + }); + + function forEach (xs, f) { + for (var i = 0, l = xs.length; i < l; i++) { + f(xs[i], i); + } + } +}; diff --git a/test/browser/test-stream2-unpipe-drain.js b/test/browser/test-stream2-unpipe-drain.js new file mode 100644 index 0000000000..5fff9539cb --- /dev/null +++ b/test/browser/test-stream2-unpipe-drain.js @@ -0,0 +1,65 @@ +'use strict'; +var common = require('../common'); +var stream = require('../../'); + +var crypto = require('crypto'); + +var inherits = require('inherits'); +module.exports = function (t) { + t.test('unpipe drain', function (t) { + try { + crypto.randomBytes(9); + } catch(_) { + t.ok(true, 'does not suport random, skipping'); + return t.end(); + } + function TestWriter() { + stream.Writable.call(this); + } + inherits(TestWriter, stream.Writable); + + TestWriter.prototype._write = function(buffer, encoding, callback) { + //console.log('write called'); + // super slow write stream (callback never called) + }; + + var dest = new TestWriter(); + + function TestReader(id) { + stream.Readable.call(this); + this.reads = 0; + } + inherits(TestReader, stream.Readable); + + TestReader.prototype._read = function(size) { + this.reads += 1; + this.push(crypto.randomBytes(size)); + }; + + var src1 = new TestReader(); + var src2 = new TestReader(); + + src1.pipe(dest); + + src1.once('readable', function() { + process.nextTick(function() { + + src2.pipe(dest); + + src2.once('readable', function() { + process.nextTick(function() { + + src1.unpipe(dest); + }); + }); + }); + }); + + + dest.on('unpipe', function() { + t.equal(src1.reads, 2); + t.equal(src2.reads, 2); + t.end(); + }); + }); +} diff --git a/test/browser/test-stream2-writable.js b/test/browser/test-stream2-writable.js new file mode 100644 index 0000000000..6b43aef9d2 --- /dev/null +++ b/test/browser/test-stream2-writable.js @@ -0,0 +1,375 @@ +'use strict'; +var common = require('../common'); +var W = require('../../lib/_stream_writable'); +var D = require('../../lib/_stream_duplex'); + +var inherits = require('inherits'); +inherits(TestWriter, W); + +function TestWriter() { + W.apply(this, arguments); + this.buffer = []; + this.written = 0; +} + +TestWriter.prototype._write = function(chunk, encoding, cb) { + // simulate a small unpredictable latency + setTimeout(function() { + this.buffer.push(chunk.toString()); + this.written += chunk.length; + cb(); + }.bind(this), Math.floor(Math.random() * 10)); +}; +inherits(Processstdout, W); + +function Processstdout() { + W.apply(this, arguments); + this.buffer = []; + this.written = 0; +} + +Processstdout.prototype._write = function(chunk, encoding, cb) { + //console.log(chunk.toString()); + cb(); +}; +var chunks = new Array(50); +for (var i = 0; i < chunks.length; i++) { + chunks[i] = new Array(i + 1).join('x'); +} + +module.exports = function (t) { + var test = t.test; + + if (!process.stdout) { + process.stdout = new Processstdout(); + } + + test('write fast', function(t) { + var tw = new TestWriter({ + highWaterMark: 100 + }); + + tw.on('finish', function() { + t.same(tw.buffer, chunks, 'got chunks in the right order'); + t.end(); + }); + + forEach(chunks, function(chunk) { + // screw backpressure. Just buffer it all up. + tw.write(chunk); + }); + tw.end(); + }); + + test('write slow', function(t) { + var tw = new TestWriter({ + highWaterMark: 100 + }); + + tw.on('finish', function() { + t.same(tw.buffer, chunks, 'got chunks in the right order'); + t.end(); + }); + + var i = 0; + (function W() { + tw.write(chunks[i++]); + if (i < chunks.length) + setTimeout(W, 10); + else + tw.end(); + })(); + }); + + test('write backpressure', function(t) { + var tw = new TestWriter({ + highWaterMark: 50 + }); + + var drains = 0; + + tw.on('finish', function() { + t.same(tw.buffer, chunks, 'got chunks in the right order'); + t.equal(drains, 17); + t.end(); + }); + + tw.on('drain', function() { + drains++; + }); + + var i = 0; + (function W() { + do { + var ret = tw.write(chunks[i++]); + } while (ret !== false && i < chunks.length); + + if (i < chunks.length) { + t.ok(tw._writableState.length >= 50); + tw.once('drain', W); + } else { + tw.end(); + } + })(); + }); + + test('write bufferize', function(t) { + var tw = new TestWriter({ + highWaterMark: 100 + }); + + var encodings = + [ 'hex', + 'utf8', + 'utf-8', + 'ascii', + 'binary', + 'base64', + 'ucs2', + 'ucs-2', + 'utf16le', + 'utf-16le', + undefined ]; + + tw.on('finish', function() { + t.same(tw.buffer, chunks, 'got the expected chunks'); + }); + + forEach(chunks, function(chunk, i) { + var enc = encodings[ i % encodings.length ]; + chunk = new Buffer(chunk); + tw.write(chunk.toString(enc), enc); + }); + t.end(); + }); + + test('write no bufferize', function(t) { + var tw = new TestWriter({ + highWaterMark: 100, + decodeStrings: false + }); + + tw._write = function(chunk, encoding, cb) { + t.equals(typeof chunk, 'string'); + chunk = new Buffer(chunk, encoding); + return TestWriter.prototype._write.call(this, chunk, encoding, cb); + }; + + var encodings = + [ 'hex', + 'utf8', + 'utf-8', + 'ascii', + 'binary', + 'base64', + 'ucs2', + 'ucs-2', + 'utf16le', + 'utf-16le', + undefined ]; + + tw.on('finish', function() { + t.same(tw.buffer, chunks, 'got the expected chunks'); + }); + + forEach(chunks, function(chunk, i) { + var enc = encodings[ i % encodings.length ]; + chunk = new Buffer(chunk); + tw.write(chunk.toString(enc), enc); + }); + t.end(); + }); + + test('write callbacks', function(t) { + var callbacks = chunks.map(function(chunk, i) { + return [i, function(er) { + callbacks._called[i] = chunk; + }]; + }).reduce(function(set, x) { + set['callback-' + x[0]] = x[1]; + return set; + }, {}); + callbacks._called = []; + + var tw = new TestWriter({ + highWaterMark: 100 + }); + + tw.on('finish', function() { + process.nextTick(function() { + t.same(tw.buffer, chunks, 'got chunks in the right order'); + t.same(callbacks._called, chunks, 'called all callbacks'); + t.end(); + }); + }); + + forEach(chunks, function(chunk, i) { + tw.write(chunk, callbacks['callback-' + i]); + }); + tw.end(); + }); + + test('end callback', function(t) { + var tw = new TestWriter(); + tw.end(function() { + t.end(); + }); + }); + + test('end callback with chunk', function(t) { + var tw = new TestWriter(); + tw.end(new Buffer('hello world'), function() { + t.end(); + }); + }); + + test('end callback with chunk and encoding', function(t) { + var tw = new TestWriter(); + tw.end('hello world', 'ascii', function() { + t.end(); + }); + }); + + test('end callback after .write() call', function(t) { + var tw = new TestWriter(); + tw.write(new Buffer('hello world')); + tw.end(function() { + t.end(); + }); + }); + + test('end callback called after write callback', function(t) { + var tw = new TestWriter(); + var writeCalledback = false; + tw.write(new Buffer('hello world'), function() { + writeCalledback = true; + }); + tw.end(function() { + t.equal(writeCalledback, true); + t.end(); + }); + }); + + test('encoding should be ignored for buffers', function(t) { + var tw = new W(); + var hex = '018b5e9a8f6236ffe30e31baf80d2cf6eb'; + tw._write = function(chunk, encoding, cb) { + t.equal(chunk.toString('hex'), hex); + t.end(); + }; + var buf = new Buffer(hex, 'hex'); + tw.write(buf, 'binary'); + }); + + test('writables are not pipable', function(t) { + var w = new W(); + w._write = function() {}; + var gotError = false; + w.on('error', function(er) { + gotError = true; + }); + w.pipe(process.stdout); + t.ok(gotError); + t.end(); + }); + + test('duplexes are pipable', function(t) { + var d = new D(); + d._read = function() {}; + d._write = function() {}; + var gotError = false; + d.on('error', function(er) { + gotError = true; + }); + d.pipe(process.stdout); + t.ok(!gotError); + t.end(); + }); + + test('end(chunk) two times is an error', function(t) { + var w = new W(); + w._write = function() {}; + var gotError = false; + w.on('error', function(er) { + gotError = true; + t.equal(er.message, 'write after end'); + }); + w.end('this is the end'); + w.end('and so is this'); + process.nextTick(function() { + t.ok(gotError); + t.end(); + }); + }); + + test('dont end while writing', function(t) { + var w = new W(); + var wrote = false; + w._write = function(chunk, e, cb) { + t.ok(!this.writing); + wrote = true; + this.writing = true; + setTimeout(function() { + this.writing = false; + cb(); + }); + }; + w.on('finish', function() { + t.ok(wrote); + t.end(); + }); + w.write(Buffer(0)); + w.end(); + }); + + test('finish does not come before write cb', function(t) { + var w = new W(); + var writeCb = false; + w._write = function(chunk, e, cb) { + setTimeout(function() { + writeCb = true; + cb(); + }, 10); + }; + w.on('finish', function() { + t.ok(writeCb); + t.end(); + }); + w.write(Buffer(0)); + w.end(); + }); + + test('finish does not come before sync _write cb', function(t) { + var w = new W(); + var writeCb = false; + w._write = function(chunk, e, cb) { + cb(); + }; + w.on('finish', function() { + t.ok(writeCb); + t.end(); + }); + w.write(Buffer(0), function(er) { + writeCb = true; + }); + w.end(); + }); + + test('finish is emitted if last chunk is empty', function(t) { + var w = new W(); + w._write = function(chunk, e, cb) { + process.nextTick(cb); + }; + w.on('finish', function() { + t.end(); + }); + w.write(Buffer(1)); + w.end(Buffer(0)); + }); + + function forEach (xs, f) { + for (var i = 0, l = xs.length; i < l; i++) { + f(xs[i], i); + } + } +} diff --git a/test/browser/test-stream3-pause-then-read.js b/test/browser/test-stream3-pause-then-read.js new file mode 100644 index 0000000000..c81e762263 --- /dev/null +++ b/test/browser/test-stream3-pause-then-read.js @@ -0,0 +1,150 @@ +'use strict'; +var common = require('../common'); + +var stream = require('../../'); +var Readable = stream.Readable; +var Writable = stream.Writable; + +module.exports = function (t){ + t.test('pause then read', function (t) { + var totalChunks = 100; + var chunkSize = 99; + var expectTotalData = totalChunks * chunkSize; + var expectEndingData = expectTotalData; + + var r = new Readable({ highWaterMark: 1000 }); + var chunks = totalChunks; + r._read = function(n) { + if (!(chunks % 2)) + setImmediate(push); + else if (!(chunks % 3)) + process.nextTick(push); + else + push(); + }; + + var totalPushed = 0; + function push() { + var chunk = chunks-- > 0 ? new Buffer(chunkSize) : null; + if (chunk) { + totalPushed += chunk.length; + chunk.fill('x'); + } + r.push(chunk); + } + + read100(); + + // first we read 100 bytes + function read100() { + readn(100, onData); + } + + function readn(n, then) { + //console.error('read %d', n); + expectEndingData -= n; + ;(function read() { + var c = r.read(n); + if (!c) + r.once('readable', read); + else { + t.equal(c.length, n); + t.notOk(r._readableState.flowing); + then(); + } + })(); + } + + // then we listen to some data events + function onData() { + expectEndingData -= 100; + //console.error('onData'); + var seen = 0; + r.on('data', function od(c) { + seen += c.length; + if (seen >= 100) { + // seen enough + r.removeListener('data', od); + r.pause(); + if (seen > 100) { + // oh no, seen too much! + // put the extra back. + var diff = seen - 100; + r.unshift(c.slice(c.length - diff)); + console.error('seen too much', seen, diff); + } + + // Nothing should be lost in between + setImmediate(pipeLittle); + } + }); + } + + // Just pipe 200 bytes, then unshift the extra and unpipe + function pipeLittle() { + expectEndingData -= 200; + //console.error('pipe a little'); + var w = new Writable(); + var written = 0; + w.on('finish', function() { + t.equal(written, 200); + setImmediate(read1234); + }); + w._write = function(chunk, encoding, cb) { + written += chunk.length; + if (written >= 200) { + r.unpipe(w); + w.end(); + cb(); + if (written > 200) { + var diff = written - 200; + written -= diff; + r.unshift(chunk.slice(chunk.length - diff)); + } + } else { + setImmediate(cb); + } + }; + r.pipe(w); + } + + // now read 1234 more bytes + function read1234() { + readn(1234, resumePause); + } + + function resumePause() { + //console.error('resumePause'); + // don't read anything, just resume and re-pause a whole bunch + r.resume(); + r.pause(); + r.resume(); + r.pause(); + r.resume(); + r.pause(); + r.resume(); + r.pause(); + r.resume(); + r.pause(); + setImmediate(pipe); + } + + + function pipe() { + //console.error('pipe the rest'); + var w = new Writable(); + var written = 0; + w._write = function(chunk, encoding, cb) { + written += chunk.length; + cb(); + }; + w.on('finish', function() { + //console.error('written', written, totalPushed); + t.equal(written, expectEndingData); + t.equal(totalPushed, expectTotalData); + t.end(); + }); + r.pipe(w); + } + }); +} diff --git a/test/common.js b/test/common.js index e2ef6257bb..c540b23de6 100644 --- a/test/common.js +++ b/test/common.js @@ -11,6 +11,15 @@ if (!global.clearImmediate) { } /**/ 'use strict'; + +/**/ +var objectKeys = objectKeys || function (obj) { + var keys = []; + for (var key in obj) keys.push(key); + return keys; +} +/**/ + var path = require('path'); var fs = require('fs'); var assert = require('assert'); @@ -36,6 +45,7 @@ var opensslCli = null; var inFreeBSDJail = null; var localhostIPv4 = null; +/**/if (!process.browser) { Object.defineProperty(exports, 'inFreeBSDJail', { get: function() { if (inFreeBSDJail !== null) return inFreeBSDJail; @@ -50,7 +60,10 @@ Object.defineProperty(exports, 'inFreeBSDJail', { return inFreeBSDJail; } }); +}/**/ + +/**/if (!process.browser) { Object.defineProperty(exports, 'localhostIPv4', { get: function() { if (localhostIPv4 !== null) return localhostIPv4; @@ -73,8 +86,11 @@ Object.defineProperty(exports, 'localhostIPv4', { return localhostIPv4; } }); +}/**/ + // opensslCli defined lazily to reduce overhead of spawnSync +/**/if (!process.browser) { Object.defineProperty(exports, 'opensslCli', {get: function() { if (opensslCli !== null) return opensslCli; @@ -95,10 +111,15 @@ Object.defineProperty(exports, 'opensslCli', {get: function() { } return opensslCli; }, enumerable: true }); +}/**/ + +/**/if (!process.browser) { Object.defineProperty(exports, 'hasCrypto', {get: function() { return process.versions.openssl ? true : false; }}); +}/**/ + if (process.platform === 'win32') { exports.PIPE = '\\\\.\\pipe\\libuv-test'; @@ -125,7 +146,7 @@ if (process.platform === 'win32') { } var ifaces = os.networkInterfaces(); -exports.hasIPv6 = Object.keys(ifaces).some(function(name) { +exports.hasIPv6 = objectKeys(ifaces).some(function(name) { return /lo/.test(name) && ifaces[name].some(function(info) { return info.family === 'IPv6'; }); @@ -245,6 +266,7 @@ if (global.LTTNG_HTTP_SERVER_RESPONSE) { knownGlobals.push(LTTNG_NET_SERVER_CONNECTION); } +/**/if (!process.browser) { if (global.ArrayBuffer) { knownGlobals.push(ArrayBuffer); knownGlobals.push(Int8Array); @@ -258,6 +280,8 @@ if (global.ArrayBuffer) { knownGlobals.push(Float64Array); knownGlobals.push(DataView); } +}/**/ + // Harmony features. if (global.Proxy) {