Skip to content

Commit

Permalink
standardize backpressure mechanism
Browse files Browse the repository at this point in the history
replace custom backpressure mechanism that involved queueing write
requests with pausing/resuming source stream

that seems to solve incomplete/unfinished part stream issues that
started appearing in node 4.4.5 and later

probably related to nodejs/node#7278
although not fixed in 4.4.7 as this issue is supposed to be
  • Loading branch information
pirxpilot committed Dec 1, 2016
1 parent e42a415 commit e6e93d9
Showing 1 changed file with 21 additions and 18 deletions.
39 changes: 21 additions & 18 deletions lib/Dicer.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,23 @@ function Dicer(cfg) {
this._firstWrite = true;
this._inHeader = true;
this._part = undefined;
this._cb = undefined;
this._ignoreData = false;
this._partOpts = (typeof cfg.partHwm === 'number'
? { highWaterMark: cfg.partHwm }
: {});
this._pause = false;

this._hparser = new HeaderParser(cfg);
this._hparser.on('header', function(header) {
self._inHeader = false;
self._part.emit('header', header);
});

this.on('pipe', function(source) {
this._source = source;
});

this.on('unpipe', function() {
this._source = undefined;
});
}
inherits(Dicer, WritableStream);

Expand Down Expand Up @@ -114,10 +118,7 @@ Dicer.prototype._write = function(data, encoding, cb) {

this._bparser.push(data);

if (this._pause)
this._cb = cb;
else
cb();
cb();
};

Dicer.prototype.reset = function() {
Expand Down Expand Up @@ -179,7 +180,7 @@ Dicer.prototype._oninfo = function(isMatch, data, start, end) {
this._justMatched = false;
if (!this._part) {
this._part = partStream(this._partOpts, function() {
self._unpause();
self._sourceResume();
});
ev = this._isPreamble ? 'preamble' : 'part';
if (this._events[ev])
Expand All @@ -195,7 +196,7 @@ Dicer.prototype._oninfo = function(isMatch, data, start, end) {
shouldWriteMore = this._part.push(buf);
shouldWriteMore = this._part.push(data.slice(start, end));
if (!shouldWriteMore)
this._pause = true;
this._sourcePause();
} else if (!this._isPreamble && this._inHeader) {
if (buf)
this._hparser.push(buf);
Expand All @@ -217,7 +218,9 @@ Dicer.prototype._oninfo = function(isMatch, data, start, end) {
self.emit('finish');
self._realFinish = false;
} else {
self._unpause();
if (self._source) {
self._sourceResume();
}
}
}
});
Expand All @@ -230,15 +233,15 @@ Dicer.prototype._oninfo = function(isMatch, data, start, end) {
}
};

Dicer.prototype._unpause = function() {
if (!this._pause)
return;
Dicer.prototype._sourcePause = function() {
if (this._source) {
this._source.pause();
}
};

this._pause = false;
if (this._cb) {
var cb = this._cb;
this._cb = undefined;
cb();
Dicer.prototype._sourceResume = function() {
if (this._source) {
this._source.resume();
}
};

Expand Down

0 comments on commit e6e93d9

Please sign in to comment.