Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
follow new transport and connection spec
Browse files Browse the repository at this point in the history
  • Loading branch information
daviddias committed Jun 20, 2016
1 parent 0ee73f0 commit 2dab331
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 53 deletions.
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
"mafmt": "^2.1.0",
"multiaddr": "^2.0.2",
"run-parallel": "^1.1.6",
"simple-websocket": "github:diasdavid/simple-websocket#ec31437"
"simple-websocket": "^4.1.0",
"simple-websocket-server": "^0.1.1"
},
"devDependencies": {
"chai": "^3.5.0",
Expand All @@ -53,4 +54,4 @@
"Francisco Baio Dias <xicombd@gmail.com>",
"Friedel Ziegelmayer <dignifiedquire@gmail.com>"
]
}
}
107 changes: 68 additions & 39 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@

const debug = require('debug')
const log = debug('libp2p:websockets')
const SWS = require('simple-websocket')
const SW = require('simple-websocket')
const SWS = require('simple-websocket-server')
const mafmt = require('mafmt')
const parallel = require('run-parallel')
const contains = require('lodash.contains')
const multiaddr = require('multiaddr')

const CLOSE_TIMEOUT = 2000
// const IPFS_CODE = 421

exports = module.exports = WebSockets

Expand All @@ -14,66 +18,91 @@ function WebSockets () {
return new WebSockets()
}

const listeners = []

this.dial = function (multiaddr, options) {
if (!options) {
this.dial = function (ma, options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
}

options.ready = options.ready || function noop () {}
const maOpts = multiaddr.toOptions()
const conn = new SWS('ws://' + maOpts.host + ':' + maOpts.port)
conn.on('connect', options.ready)
if (!callback) {
callback = function noop () {}
}

const maOpts = ma.toOptions()

// TODO make it a real Connection
const conn = new SW('ws://' + maOpts.host + ':' + maOpts.port)
conn.on('connect', callback)

conn.getObservedAddrs = () => {
return [multiaddr]
}

return conn
}

this.createListener = (multiaddrs, options, handler, callback) => {
this.createListener = (options, handler) => {
if (typeof options === 'function') {
callback = handler
handler = options
options = {}
}

if (!Array.isArray(multiaddrs)) {
multiaddrs = [multiaddrs]
}
const listener = SWS.createServer((conn) => {
// TODO make it a real Connection
conn.getObservedAddrs = () => {
return [] // TODO think if it makes sense for WebSockets
}
handler(conn)
})

var count = 0
let listeningMultiaddr

multiaddrs.forEach((m) => {
if (contains(m.protoNames(), 'ipfs')) {
m = m.decapsulate('ipfs')
listener._listen = listener.listen
listener.listen = (ma, callback) => {
if (!callback) {
callback = function noop () {}
}

const listener = SWS.createServer((conn) => {
conn.getObservedAddrs = () => {
return [] // TODO think if it makes sense for WebSockets
}
handler(conn)
})
listeningMultiaddr = ma

listener.listen(m.toOptions().port, () => {
if (++count === multiaddrs.length) {
callback()
}
if (contains(ma.protoNames(), 'ipfs')) {
ma = ma.decapsulate('ipfs')
}

listener._listen(ma.toOptions(), callback)
}

listener._close = listener.close
listener.close = (options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
}
if (!callback) { callback = function noop () {} }
if (!options) { options = {} }

let closed = false
listener._close(callback)
listener.once('close', () => {
closed = true
})
listeners.push(listener)
})
}
setTimeout(() => {
if (closed) {
return
}
log('unable to close graciously, destroying conns')
Object.keys(listener.__connections).forEach((key) => {
log('destroying %s', key)
listener.__connections[key].destroy()
})
}, options.timeout || CLOSE_TIMEOUT)
}

this.close = (callback) => {
if (listeners.length === 0) {
log('Called close with no active listeners')
return callback()
listener.getAddrs = (callback) => {
callback(null, listeningMultiaddr)
}

parallel(listeners.map((listener) => {
return (cb) => listener.close(cb)
}), callback)
return listener
}

this.filter = (multiaddrs) => {
Expand Down
148 changes: 136 additions & 12 deletions test/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,142 @@

const expect = require('chai').expect
const multiaddr = require('multiaddr')
const WSlibp2p = require('../src')
const WS = require('../src')

describe('instantiate the transport', () => {
it('create', (done) => {
const ws = new WS()
expect(ws).to.exist
done()
})

it('create without new', (done) => {
const ws = WS()
expect(ws).to.exist
done()
})
})

describe('listen', () => {
let ws
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')

beforeEach(() => {
ws = new WS()
})

it('listen, check for callback', (done) => {
const listener = ws.createListener((conn) => {})
listener.listen(ma, () => {
listener.close(done)
})
})

it('listen, check for listening event', (done) => {
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090')
const listener = ws.createListener((conn) => {})
listener.on('listening', () => {
listener.close(done)
})
listener.listen(ma)
})

it('listen, check for the close event', (done) => {
done()
})

it('listen on addr with /ipfs/QmHASH', (done) => {
done()
})

it('close listener with connections, through timeout', (done) => {
done()
})

it.skip('listen on port 0', (done) => {
// TODO port 0 not supported yet
})
it.skip('listen on IPv6 addr', (done) => {
// TODO IPv6 not supported yet
})

it.skip('listen on any Interface', (done) => {
// TODO 0.0.0.0 not supported yet
})

it('getAddrs', (done) => {
done()
})

it.skip('getAddrs on port 0 listen', (done) => {
// TODO port 0 not supported yet
})

it.skip('getAddrs from listening on 0.0.0.0', (done) => {
// TODO 0.0.0.0 not supported yet
})

it.skip('getAddrs from listening on 0.0.0.0 and port 0', (done) => {
// TODO 0.0.0.0 or port 0 not supported yet
})

it('getAddrs preserves IPFS Id', (done) => {
done()
})
})

describe.skip('dial', () => {
it('dial on IPv4', (done) => {})

it.skip('dial on IPv6', (done) => {
// TODO IPv6 not supported yet
})

it('dial on IPv4 with IPFS Id', (done) => {})
})

describe('filter addrs', () => {
let ws

before(() => {
ws = new WS()
})

it('filter valid addrs for this transport', (done) => {
const mh1 = multiaddr('/ip4/127.0.0.1/tcp/9090')
const mh2 = multiaddr('/ip4/127.0.0.1/udp/9090')
const mh3 = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
const mh4 = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw')

const valid = ws.filter([mh1, mh2, mh3, mh4])
expect(valid.length).to.equal(2)
expect(valid[0]).to.deep.equal(mh3)
expect(valid[1]).to.deep.equal(mh4)
done()
})

it('filter a single addr for this transport', (done) => {
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw')

const valid = ws.filter(ma)
expect(valid.length).to.equal(1)
expect(valid[0]).to.deep.equal(ma)
done()
})
})

describe.skip('valid Connection', () => {
it('get observed addrs', (done) => {})
it('get Peer Info', (done) => {})
it('set Peer Info', (done) => {})
})

describe.skip('turbolence', () => {
it('dialer - emits error on the other end is terminated abruptly', (done) => {})
it('listener - emits error on the other end is terminated abruptly', (done) => {})
})

/*
describe('libp2p-websockets', function () {
this.timeout(10000)
var ws
Expand Down Expand Up @@ -50,17 +184,6 @@ describe('libp2p-websockets', function () {
})
})
it('filter', (done) => {
const mh1 = multiaddr('/ip4/127.0.0.1/tcp/9090')
const mh2 = multiaddr('/ip4/127.0.0.1/udp/9090')
const mh3 = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')

const valid = ws.filter([mh1, mh2, mh3])
expect(valid.length).to.equal(1)
expect(valid[0]).to.deep.equal(mh3)
done()
})

it('echo', (done) => {
const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
ws.createListener(mh, (conn) => {
Expand Down Expand Up @@ -98,3 +221,4 @@ describe('libp2p-websockets', function () {
})
})
})
*/

0 comments on commit 2dab331

Please sign in to comment.