Skip to content

Commit

Permalink
feat: correct encoding and channel closing
Browse files Browse the repository at this point in the history
  • Loading branch information
dryajov authored and jacobheun committed Feb 7, 2019
1 parent 2e608fd commit 19f0962
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 83 deletions.
6 changes: 2 additions & 4 deletions src/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,7 @@ class Channel extends EE {
this._id,
this._initiator
? consts.type.OUT_CLOSE
: consts.type.IN_CLOSE,
''
: consts.type.IN_CLOSE
])
}

Expand All @@ -172,8 +171,7 @@ class Channel extends EE {
this._id,
this._initiator
? consts.type.OUT_RESET
: consts.type.IN_RESET,
''
: consts.type.IN_RESET
])
}
}
Expand Down
17 changes: 7 additions & 10 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class Plex extends EE {

this._chandata = pushable((err) => {
if (this._destroyed) { return }
this.destroy(err || new Error('Underlying stream has been closed'))
this.destroy()
})

if (onChan) {
Expand Down Expand Up @@ -67,11 +67,11 @@ class Plex extends EE {
return this._initiator
}

get initiator () {
return this._initiator
}

destroy (err) {
this._destroyed = true
err = err || new Error('Underlying stream has been closed')
this._chandata.end(err)

// propagate close to channels
Object
.keys(this._channels)
Expand All @@ -85,14 +85,11 @@ class Plex extends EE {
delete this._channels[id]
})

this._destroyed = true
this._chandata.end(err) // close source

if (err) {
return setImmediate(() => this.emit('error', err))
setImmediate(() => this.emit('error', err))
}

this.emit('close', err)
this.emit('close')
}

push (data) {
Expand Down
28 changes: 19 additions & 9 deletions src/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@ const through = require('pull-through')
exports.encode = () => {
return pull(
through(function (msg) {
const data = Buffer.concat([
Buffer.from(varint.encode(msg[0] << 3 | msg[1])),
Buffer.from(varint.encode(Buffer.byteLength(msg[2]))),
Buffer.from(msg[2])
])
this.queue(data)
const seq = [Buffer.from(varint.encode(msg[0] << 3 | msg[1]))]

if (msg[2]) {
seq.push(Buffer.from(varint.encode(Buffer.byteLength(msg[2]))))
seq.push(Buffer.from(msg[2]))
} else {
seq.push(Buffer.from(varint.encode(0)))
}

this.queue(Buffer.concat(seq))
})
)
}
Expand All @@ -24,12 +28,18 @@ exports.decode = () => {
let offset = 0
const h = varint.decode(msg)
offset += varint.decode.bytes
const length = varint.decode(msg, offset)
offset += varint.decode.bytes
let length
let data
try {
length = varint.decode(msg, offset)
offset += varint.decode.bytes
data = msg.slice(offset, offset + length)
} catch (err) {} // ignore if data is empty

const decoded = {
id: h >> 3,
type: h & 7,
data: msg.slice(offset, offset + length)
data
}

return [msg.slice(offset + length), decoded]
Expand Down
88 changes: 28 additions & 60 deletions test/plex.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
const chai = require('chai')
const dirtyChai = require('dirty-chai')
const expect = chai.expect
chai.use(require('chai-checkmark'))
chai.use(dirtyChai)

const pull = require('pull-stream')
Expand All @@ -19,66 +20,33 @@ const consts = require('../src/consts')
const series = require('async/series')

describe('plex', () => {
// it('should be writable', (done) => {
// const plex = new Plex(false)
//
// plex.on('stream', (stream) => {
// pull(pull.values([Buffer.from('hellooooooooooooo')]), stream)
// })
//
// utils.encodeMsg(3,
// consts.type.NEW,
// Buffer.from('chan1'),
// (err, msg) => {
// expect(err).to.not.exist()
// pull(
// pull.values([msg]),
// plex,
// pull.drain((_data) => {
// expect(err).to.not.exist()
// utils.decodeMsg(_data, (err, data) => {
// expect(err).to.not.exist()
// const { id, type } = data[0]
// expect(id).to.eql(3)
// expect(type).to.eql(consts.type.IN_MESSAGE)
// expect(data[1]).to.deep.eql(Buffer.from('hellooooooooooooo'))
// done()
// })
// })
// )
// })
// })
//
// it('should be readable', (done) => {
// const plex = new Plex(true)
//
// plex.on('stream', (stream) => {
// pull(
// stream,
// // drain, because otherwise we have to send an explicit close
// pull.drain((data) => {
// expect(data).to.deep.eql(Buffer.from('hellooooooooooooo'))
// done()
// })
// )
// })
//
// series([
// (cb) => utils.encodeMsg(3,
// consts.type.NEW,
// Buffer.from('chan1'), cb),
// (cb) => utils.encodeMsg(3,
// consts.type.IN_MESSAGE,
// Buffer.from('hellooooooooooooo'),
// cb)
// ], (err, msgs) => {
// expect(err).to.not.exist()
// pull(
// pull.values(msgs),
// plex
// )
// })
// })
it.only(`destroy should close both ends`, (done) => {
const p = pair()

const plex1 = new Plex(true)
const plex2 = new Plex(false)

pull(plex1, p[0], plex1)
pull(plex2, p[1], plex2)

expect(4).check(done)

const errHandler = (err) => {
expect(err.message).to.be.eql('Underlying stream has been closed').mark()
}
plex1.on('error', errHandler)
plex2.on('error', errHandler)

plex2.on('close', () => {
expect().mark()
})

plex2.on('close', () => {
expect().mark()
})

plex1.destroy()
})

it(`channel id should be correct`, () => [1, 0].forEach((type) => {
const initiator = Boolean(type)
Expand Down
21 changes: 21 additions & 0 deletions test/utils.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,25 @@ describe('utils', () => {
)
})

it('encodes zero length body msg', () => {
pull(
pull.values([[17, 0]]),
utils.encode(),
pull.collect((err, data) => {
expect(err).to.not.exist()
expect(data[0]).to.be.eql(Buffer.from('880100', 'hex'))
})
)
})

it('decodes zero length body msg', () => {
pull(
pull.values([Buffer.from('880100', 'hex')]),
utils.decode(),
pull.collect((err, data) => {
expect(err).to.not.exist()
expect(data[0]).to.be.eql({ id: 17, type: 0, data: Buffer.alloc(0) })
})
)
})
})

0 comments on commit 19f0962

Please sign in to comment.