diff --git a/lib/_stream_wrap.js b/lib/_stream_wrap.js index 196ed3496970c8..e05a4f31332126 100644 --- a/lib/_stream_wrap.js +++ b/lib/_stream_wrap.js @@ -5,6 +5,7 @@ const util = require('util'); const Socket = require('net').Socket; const JSStream = process.binding('js_stream').JSStream; const uv = process.binding('uv'); +const debug = util.debuglog('stream_wrap'); function StreamWrap(stream) { const handle = new JSStream(); @@ -15,6 +16,7 @@ function StreamWrap(stream) { const self = this; handle.close = function(cb) { + debug('close'); self.doClose(cb); }; handle.isAlive = function() { @@ -40,18 +42,23 @@ function StreamWrap(stream) { this.stream.on('error', function(err) { self.emit('error', err); }); - - Socket.call(this, { - handle: handle - }); - this.stream.on('data', function(chunk) { - if (self._handle) - self._handle.readBuffer(chunk); + setImmediate(function() { + debug('data', chunk.length); + if (self._handle) + self._handle.readBuffer(chunk); + }); }); this.stream.once('end', function() { - if (self._handle) - self._handle.emitEOF(); + setImmediate(function() { + debug('end'); + if (self._handle) + self._handle.emitEOF(); + }); + }); + + Socket.call(this, { + handle: handle }); } util.inherits(StreamWrap, Socket); @@ -61,11 +68,11 @@ module.exports = StreamWrap; StreamWrap.StreamWrap = StreamWrap; StreamWrap.prototype.isAlive = function isAlive() { - return this.readable && this.writable; + return true; }; StreamWrap.prototype.isClosing = function isClosing() { - return !this.isAlive(); + return !this.readable || !this.writable; }; StreamWrap.prototype.readStart = function readStart() { @@ -79,11 +86,16 @@ StreamWrap.prototype.readStop = function readStop() { }; StreamWrap.prototype.doShutdown = function doShutdown(req) { + const self = this; const handle = this._handle; + const item = this._enqueue('shutdown', req); this.stream.end(function() { // Ensure that write was dispatched setImmediate(function() { + if (!self._dequeue(item)) + return; + handle.finishShutdown(req, 0); }); }); @@ -97,7 +109,7 @@ StreamWrap.prototype.doWrite = function doWrite(req, bufs) { var pending = bufs.length; // Queue the request to be able to cancel it - self._enqueue(req); + const item = self._enqueue('write', req); self.stream.cork(); bufs.forEach(function(buf) { @@ -115,7 +127,7 @@ StreamWrap.prototype.doWrite = function doWrite(req, bufs) { // Ensure that write was dispatched setImmediate(function() { // Do not invoke callback twice - if (!self._dequeue(req)) + if (!self._dequeue(item)) return; var errCode = 0; @@ -134,39 +146,47 @@ StreamWrap.prototype.doWrite = function doWrite(req, bufs) { return 0; }; -StreamWrap.prototype._enqueue = function enqueue(req) { +function QueueItem(type, req) { + this.type = type; + this.req = req; + this.prev = this; + this.next = this; +} + +StreamWrap.prototype._enqueue = function enqueue(type, req) { + const item = new QueueItem(type, req); if (this._queue === null) { - this._queue = req; - req._prev = req; - req._next = req; - return; + this._queue = item; + return item; } - req._next = this._queue._next; - req._prev = this._queue; - req._next._prev = req; - req._prev._next = req; + item.next = this._queue.next; + item.prev = this._queue; + item.next.prev = item; + item.prev.next = item; + + return item; }; -StreamWrap.prototype._dequeue = function dequeue(req) { - var next = req._next; - var prev = req._prev; +StreamWrap.prototype._dequeue = function dequeue(item) { + var next = item.next; + var prev = item.prev; if (next === null && prev === null) return false; - req._next = null; - req._prev = null; + item.next = null; + item.prev = null; - if (next === req) { + if (next === item) { prev = null; next = null; } else { - prev._next = next; - next._prev = prev; + prev.next = next; + next.prev = prev; } - if (this._queue === req) + if (this._queue === item) this._queue = next; return true; @@ -178,12 +198,17 @@ StreamWrap.prototype.doClose = function doClose(cb) { setImmediate(function() { while (self._queue !== null) { - const req = self._queue; - self._dequeue(req); + const item = self._queue; + const req = item.req; + self._dequeue(item); const errCode = uv.UV_ECANCELED; - handle.doAfterWrite(req); - handle.finishWrite(req, errCode); + if (item.type === 'write') { + handle.doAfterWrite(req); + handle.finishWrite(req, errCode); + } else if (item.type === 'shutdown') { + handle.finishShutdown(req, errCode); + } } // Should be already set by net.js diff --git a/test/parallel/test-stream-wrap.js b/test/parallel/test-stream-wrap.js index cc28a344eb1e35..e7a7ecddd2385d 100644 --- a/test/parallel/test-stream-wrap.js +++ b/test/parallel/test-stream-wrap.js @@ -6,41 +6,34 @@ const StreamWrap = require('_stream_wrap'); const Duplex = require('stream').Duplex; const ShutdownWrap = process.binding('stream_wrap').ShutdownWrap; +var done = false; + function testShutdown(callback) { var stream = new Duplex({ read: function() { }, - write: function(data, enc, callback) { - callback(null); + write: function() { } }); var wrap = new StreamWrap(stream); var req = new ShutdownWrap(); - req.oncomplete = function() {}; + req.oncomplete = function(code) { + assert(code < 0); + callback(); + }; req.handle = wrap._handle; - wrap._handle.shutdown(req); + // Close the handle to simulate wrap.destroy(); - - process.nextTick(callback); -} - -function testReadAfterClose(callback) { - var stream = new Duplex({ - read: function() { - }, - write: function(data, enc, callback) { - callback(null); - } - }); - stream.push('data'); - stream.push(null); - - var wrap = new StreamWrap(stream); + req.handle.shutdown(req); } testShutdown(function() { - testReadAfterClose(); + done = true; +}); + +process.on('exit', function() { + assert(done); });