You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Platform: (in fact whatever) Darwin hacker-machine 18.5.0 Darwin Kernel Version 18.5.0: Mon Mar 11 20:40:32 PDT 2019; root:xnu-4903.251.3~3/RELEASE_X86_64 x86_64
Subsystem: stream
I was working on a streaming protocol encoder/decoder, when I write test for it, I found something not right.
To make things clearly, I simplify the logic, these code can recurrent the problem:
send <Buffer 01>
get data <Buffer 01>
send <Buffer 02>
get data <Buffer 02>
send <Buffer 03>
send <Buffer 04>
get data <Buffer 03>
but what I expect is:
send <Buffer 01>
get data <Buffer 01>
send <Buffer 02>
get data <Buffer 02>
send <Buffer 03>
send <Buffer 04>
get data <Buffer 03>
get data <Buffer 04>
I guess it may be some problem related to eventloop tick. So I add process.nextTick:
we can notice that once the awaitDrain got greater than 1 on single pipe target, it's incorrect.
I then dig into the source code, and I know why.
In the first code block above, after first encoder.write() finished, encoder's readable side has been paused because the decoder's writable side write() return false. and next encoder.write() buffer the data in encoder's readable side and wait for a "drain" event in next tick from decoder's writable side.
When the first "drain" event emitted, encoder's readable side become flowing again, and "data" listener set by pipe() was called to dest.write().
From the output debug log and the source code, we can see the .write() call is synchronous, so before the write finished, in decoder's "data" listener, add additional 2 encoder.write() call, but at this time, encoder's writeable side's writing property has become false (because after data buffer in encoder's readable side, onwrite() was called to set writing to false), and so, the third encoder.write() write successfully to encoder's readable side, and
run into encoder's "data" event listener again, here because dest.write() return false, so it make the encoder's readable side paused, and set state.awaitDrain++, and when the third encoder.write() finished, return to the previous encoder's "data" listener, and it found dest.write() (which just finished) return false, so it increase the state.awaitDrain again! Now that we have a wrong counter. After that, we return to the last encoder.write(), and buffered in encoder's readable side again wait for decoder draining. next tick decoder drained, and encoder's state.awaitDrain--, but it was > 1 before, so it never could be 0, and the encoder's readable side was blocked forever.
The problem code is simplified as below:
src.on("data",(d)=>{// in second `encoder.write()`constret=dest.write(d)// inside the above dest.write() in third `encoder.write()`{constret=dest.write(d)if(!ret){state.awaitDrain++;state.pause()}}
...
if(!ret){state.awaitDrain++;state.pause()}})
It seems the stream API document doesn't mention about this behavior.
There'r dozen of solution to fix this, But in fact, except puting .write() out of current tick, a better solution I thought is to keep the sycn behavior and prevent awaitDrain to increase by accident.
The text was updated successfully, but these errors were encountered:
I was working on a streaming protocol encoder/decoder, when I write test for it, I found something not right.
To make things clearly, I simplify the logic, these code can recurrent the problem:
the output is:
but what I expect is:
I guess it may be some problem related to eventloop tick. So I add
process.nextTick
:And this time it output what I expect.
Use
NODE_DEBUG=stream
to see what happened:we can notice that once the
awaitDrain
got greater than 1 on single pipe target, it's incorrect.I then dig into the source code, and I know why.
In the first code block above, after first
encoder.write()
finished, encoder's readable side has been paused because the decoder's writable sidewrite()
return false. and nextencoder.write()
buffer the data in encoder's readable side and wait for a"drain"
event in next tick from decoder's writable side.When the first
"drain"
event emitted, encoder's readable side become flowing again, and"data"
listener set bypipe()
was called todest.write()
.From the output debug log and the source code, we can see the
.write()
call is synchronous, so before the write finished, in decoder's"data"
listener, add additional 2encoder.write()
call, but at this time, encoder's writeable side'swriting
property has becomefalse
(because after data buffer in encoder's readable side,onwrite()
was called to setwriting
tofalse
), and so, the thirdencoder.write()
write successfully to encoder's readable side, andrun into encoder's "data" event listener again, here because
dest.write()
return false, so it make the encoder's readable side paused, and setstate.awaitDrain++
, and when the thirdencoder.write()
finished, return to the previous encoder's "data" listener, and it founddest.write()
(which just finished) return false, so it increase thestate.awaitDrain
again! Now that we have a wrong counter. After that, we return to the lastencoder.write()
, and buffered in encoder's readable side again wait for decoder draining. next tick decoder drained, and encoder'sstate.awaitDrain--
, but it was > 1 before, so it never could be 0, and the encoder's readable side was blocked forever.The problem code is simplified as below:
It seems the stream API document doesn't mention about this behavior.
There'r dozen of solution to fix this, But in fact, except puting
.write()
out of current tick, a better solution I thought is to keep the sycn behavior and preventawaitDrain
to increase by accident.The text was updated successfully, but these errors were encountered: