Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

fix: identify on dial #313

Merged
merged 4 commits into from
Mar 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
"libp2p-mplex": "~0.8.4",
"libp2p-pnet": "~0.1.0",
"libp2p-secio": "~0.11.1",
"libp2p-spdy": "~0.13.1",
"libp2p-spdy": "~0.13.3",
"libp2p-tcp": "~0.13.0",
"libp2p-webrtc-star": "~0.15.8",
"libp2p-websockets": "~0.12.2",
Expand Down
1 change: 1 addition & 0 deletions src/connection/incoming.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class IncomingConnectionFSM extends BaseConnection {
}
})

this._state.on('DISCONNECTED', () => this._onDisconnected())
this._state.on('PRIVATIZING', () => this._onPrivatizing())
this._state.on('PRIVATIZED', () => this._onPrivatized())
this._state.on('ENCRYPTING', () => this._onEncrypting())
Expand Down
39 changes: 39 additions & 0 deletions src/connection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ const multistream = require('multistream-select')
const withIs = require('class-is')
const BaseConnection = require('./base')
const parallel = require('async/parallel')
const nextTick = require('async/nextTick')
const identify = require('libp2p-identify')
const errCode = require('err-code')
const { msHandle, msSelect, identifyDialer } = require('../utils')

const observeConnection = require('../observe-connection')
const {
Expand Down Expand Up @@ -390,13 +394,48 @@ class ConnectionFSM extends BaseConnection {

this.switch.emit('peer-mux-established', this.theirPeerInfo)
this._didUpgrade(null)

// Run identify on the connection
if (this.switch.identify) {
this._identify((err, results) => {
if (err) {
return this.close(err)
}
this.theirPeerInfo = this.switch._peerBook.put(results.peerInfo)
})
}
})
}

nextMuxer(muxers.shift())
})
}

/**
* Runs the identify protocol on the connection
* @private
* @param {function(error, { PeerInfo })} callback
* @returns {void}
*/
_identify (callback) {
if (!this.muxer) {
return nextTick(callback, errCode('The connection was already closed', 'ERR_CONNECTION_CLOSED'))
}
this.muxer.newStream(async (err, conn) => {
if (err) return callback(err)
const ms = new multistream.Dialer()
let results
try {
await msHandle(ms, conn)
const msConn = await msSelect(ms, identify.multicodec)
results = await identifyDialer(msConn, this.theirPeerInfo)
} catch (err) {
return callback(err)
}
callback(null, results)
})
}

/**
* Analyses the given error, if it exists, to determine where the state machine
* needs to go.
Expand Down
33 changes: 21 additions & 12 deletions test/dial-fsm.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,27 @@ describe('dialFSM', () => {

expect(switchA.connection.getAllById(peerBId)).to.have.length(0)

// 4 close checks (1 inc and 1 out for each node) and 1 hangup check
expect(5).checks(() => {
switchA.removeAllListeners('peer-mux-closed')
switchB.removeAllListeners('peer-mux-closed')
done()
// Expect 4 `peer-mux-established` events
expect(4).checks(() => {
// Expect 4 `peer-mux-closed`, plus 1 hangup
expect(5).checks(() => {
switchA.removeAllListeners('peer-mux-closed')
switchB.removeAllListeners('peer-mux-closed')
switchA.removeAllListeners('peer-mux-established')
switchB.removeAllListeners('peer-mux-established')
done()
})

switchA.hangUp(switchB._peerInfo, (err) => {
expect(err).to.not.exist().mark()
})
})

switchA.on('peer-mux-established', (peerInfo) => {
expect(peerInfo.id.toB58String()).to.eql(peerBId).mark()
})
switchB.on('peer-mux-established', (peerInfo) => {
expect(peerInfo.id.toB58String()).to.eql(peerAId).mark()
})

switchA.on('peer-mux-closed', (peerInfo) => {
Expand All @@ -243,13 +259,6 @@ describe('dialFSM', () => {
connB.on('muxed', cb)
})
})

connFSM.on('connection', () => {
// Hangup and verify the connections are closed
switchA.hangUp(switchB._peerInfo, (err) => {
expect(err).to.not.exist().mark()
})
})
})
})

Expand Down
41 changes: 27 additions & 14 deletions test/identify.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const TCP = require('libp2p-tcp')
const multiplex = require('libp2p-mplex')
const pull = require('pull-stream')
const secio = require('libp2p-secio')
const PeerInfo = require('peer-info')
const PeerBook = require('peer-book')
const identify = require('libp2p-identify')
const lp = require('pull-length-prefixed')
Expand Down Expand Up @@ -102,23 +103,35 @@ describe('Identify', () => {
})

it('should get protocols for one another', (done) => {
// We need to reset the PeerInfo objects we use,
// since we share memory we can receive a false positive if not
let peerA = new PeerInfo(switchA._peerInfo.id)
switchA._peerInfo.multiaddrs.toArray().forEach((m) => {
peerA.multiaddrs.add(m)
})
switchB._peerBook.remove(switchA._peerInfo.id.toB58String())
switchA._peerBook.remove(switchB._peerInfo.id.toB58String())

switchA.handle('/id-test/1.0.0', (protocol, conn) => pull(conn, conn))
switchB.dial(switchA._peerInfo, '/id-test/1.0.0', (err, conn) => {
switchB.dial(peerA, '/id-test/1.0.0', (err) => {
expect(err).to.not.exist()

const peerB = switchA._peerBook.get(switchB._peerInfo.id.toB58String())
const peerA = switchB._peerBook.get(switchA._peerInfo.id.toB58String())
expect(Array.from(peerB.protocols)).to.eql([
multiplex.multicodec,
identify.multicodec
])
expect(Array.from(peerA.protocols)).to.eql([
multiplex.multicodec,
identify.multicodec,
'/id-test/1.0.0'
])

done()
// Give identify a moment to run
setTimeout(() => {
const peerB = switchA._peerBook.get(switchB._peerInfo.id.toB58String())
const peerA = switchB._peerBook.get(switchA._peerInfo.id.toB58String())
expect(Array.from(peerB.protocols)).to.eql([
multiplex.multicodec,
identify.multicodec
])
expect(Array.from(peerA.protocols)).to.eql([
multiplex.multicodec,
identify.multicodec,
'/id-test/1.0.0'
])

done()
}, 500)
})
})

Expand Down