Skip to content

Commit

Permalink
Support backpressure
Browse files Browse the repository at this point in the history
Fixes #18
  • Loading branch information
feross committed Apr 24, 2015
1 parent 842cf42 commit 43583fb
Showing 1 changed file with 25 additions and 7 deletions.
32 changes: 25 additions & 7 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,19 @@ function Peer (opts) {
if (!(self instanceof Peer)) return new Peer(opts)
if (!opts) opts = {}

opts.allowHalfOpen = false
stream.Duplex.call(self, opts)

extend(self, {
initiator: false,
stream: false,
config: Peer.config,
constraints: Peer.constraints,
channelName: (opts && opts.initiator) ? hat(160) : null,
trickle: true
trickle: true,
allowHalfOpen: false, // duplex stream option
highWaterMark: 1024 * 1024 // duplex stream option
}, opts)

stream.Duplex.call(self, opts)

var wrtc = opts.wrtc || getBrowserRTC()
if (!wrtc && typeof window === 'undefined') {
throw new Error('Missing WebRTC support - You must supply ' +
Expand Down Expand Up @@ -94,7 +95,7 @@ function Peer (opts) {
// When local peer is finished writing, close connection to remote peer.
// Half open connections are currently not supported.
// Wait a bit before destroying so the datachannel flushes.
// TODO: does datachannels have a .end() that flushes and closes?
// TODO: is there a more reliable way to accomplish this?
setTimeout(function () {
self._destroy()
}, 100)
Expand All @@ -109,6 +110,15 @@ function Peer (opts) {
})
}
})

self._interval = setInterval(function () {
if (!self._cb || !self._channel || self._channel.bufferedAmount) return
self._debug('removing backpressure')
var cb = self._cb
self._cb = null
cb()
}, 150)
if (self._interval.unref) self._interval.unref()
}

/**
Expand Down Expand Up @@ -178,6 +188,9 @@ Peer.prototype._destroy = function (err, onclose) {
self._pcReady = false
self._channelReady = false

clearInterval(self._interval)
self._interval = null

if (self._pc) {
try {
self._pc.close()
Expand All @@ -200,7 +213,7 @@ Peer.prototype._destroy = function (err, onclose) {
self._pc = null
self._channel = null

this.readable = this.writable = false
self.readable = self.writable = false

if (!self._readableState.ended) self.push(null)
if (!self._writableState.finished) self.end()
Expand Down Expand Up @@ -251,7 +264,12 @@ Peer.prototype._write = function (chunk, encoding, cb) {
self._channel.send(JSON.stringify(chunk))
}

cb(null)
if (self._channel.bufferedAmount) {
self._debug('applying backpressure (bufferedAmount $s)', self._channel.bufferedAmount)
self._cb = cb
} else {
cb(null)
}
}

Peer.prototype._createOffer = function () {
Expand Down

0 comments on commit 43583fb

Please sign in to comment.