Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

Commit

Permalink
fix: improve connection tracking and closing (#291)
Browse files Browse the repository at this point in the history
* chore: update deps

* fix: check we have a proper transport before filtering addresses

* fix: improve connection close on stop

* fix: improve stat stopping

* test: fix stats test

* fix: improve tracking of open connections

* chore: remove log

* fix: stats stop in browser

chore: fix linting and browser tests

* fix: remove uneeded set peer info

* fix: abort the base connection on close

* fix: catch edge cases of dialTimeout calling back twice

* fix: close all connections instead of checking peerbook peers

* test: update dial fsm test waits

* test: make parallel dial tests deterministic

fix: improve logic around disconnecting

fix: remove duplicate event handling logic

* chore: fix lint

* test: improve test reliability
  • Loading branch information
jacobheun authored Dec 14, 2018
1 parent f43084b commit aa86307
Show file tree
Hide file tree
Showing 18 changed files with 298 additions and 130 deletions.
22 changes: 11 additions & 11 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,20 @@
"npm": ">=3.0.0"
},
"devDependencies": {
"aegir": "^17.0.1",
"aegir": "^17.1.1",
"chai": "^4.2.0",
"chai-checkmark": "^1.0.1",
"dirty-chai": "^2.0.1",
"libp2p-mplex": "~0.8.4",
"libp2p-pnet": "~0.1.0",
"libp2p-secio": "~0.10.1",
"libp2p-spdy": "~0.13.0",
"libp2p-spdy": "~0.13.1",
"libp2p-tcp": "~0.13.0",
"libp2p-webrtc-star": "~0.15.5",
"libp2p-webrtc-star": "~0.15.6",
"libp2p-websockets": "~0.12.0",
"peer-book": "~0.8.0",
"portfinder": "^1.0.19",
"sinon": "^7.1.1",
"peer-book": "~0.9.0",
"portfinder": "^1.0.20",
"sinon": "^7.2.0",
"webrtcsupport": "^2.2.0"
},
"dependencies": {
Expand All @@ -63,18 +63,18 @@
"debug": "^4.1.0",
"err-code": "^1.1.2",
"fsm-event": "^2.1.0",
"hashlru": "^2.2.1",
"interface-connection": "~0.3.2",
"hashlru": "^2.3.0",
"interface-connection": "~0.3.3",
"ip-address": "^5.8.9",
"libp2p-circuit": "~0.3.0",
"libp2p-circuit": "~0.3.1",
"libp2p-identify": "~0.7.2",
"lodash.includes": "^4.3.0",
"moving-average": "^1.0.0",
"multiaddr": "^5.0.2",
"multiaddr": "^6.0.0",
"multistream-select": "~0.14.3",
"once": "^1.4.0",
"peer-id": "~0.12.0",
"peer-info": "~0.14.1",
"peer-info": "~0.15.0",
"pull-stream": "^3.6.9",
"retimer": "^2.0.0"
},
Expand Down
4 changes: 3 additions & 1 deletion src/connection/incoming.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ class IncomingConnectionFSM extends BaseConnection {
this.msListener = new multistream.Listener()

this._state = FSM('DIALED', {
DISCONNECTED: { },
DISCONNECTED: {
disconnect: 'DISCONNECTED'
},
DIALED: { // Base connection to peer established
privatize: 'PRIVATIZING',
encrypt: 'ENCRYPTING'
Expand Down
34 changes: 22 additions & 12 deletions src/connection/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
'use strict'

const FSM = require('fsm-event')
const setImmediate = require('async/setImmediate')
const Circuit = require('libp2p-circuit')
const multistream = require('multistream-select')
const withIs = require('class-is')
Expand All @@ -15,6 +14,8 @@ const Errors = require('../errors')
* @property {Switch} _switch Our switch instance
* @property {PeerInfo} peerInfo The PeerInfo of the peer to dial
* @property {Muxer} muxer Optional - A muxed connection
* @property {Connection} conn Optional - The base connection
* @property {string} type Optional - identify the connection as incoming or outgoing. Defaults to out.
*/

/**
Expand All @@ -29,16 +30,16 @@ class ConnectionFSM extends BaseConnection {
* @param {ConnectionOptions} param0
* @constructor
*/
constructor ({ _switch, peerInfo, muxer }) {
constructor ({ _switch, peerInfo, muxer, conn, type = 'out' }) {
super({
_switch,
name: `out:${_switch._peerInfo.id.toB58String().slice(0, 8)}`
name: `${type}:${_switch._peerInfo.id.toB58String().slice(0, 8)}`
})

this.theirPeerInfo = peerInfo
this.theirB58Id = this.theirPeerInfo.id.toB58String()

this.conn = null // The base connection
this.conn = conn // The base connection
this.muxer = muxer // The upgraded/muxed connection

let startState = 'DISCONNECTED'
Expand Down Expand Up @@ -114,6 +115,7 @@ class ConnectionFSM extends BaseConnection {
this._state.on('UPGRADING', () => this._onUpgrading())
this._state.on('MUXED', () => {
this.log(`successfully muxed connection to ${this.theirB58Id}`)
delete this.switch.conns[this.theirB58Id]
this.emit('muxed', this.muxer)
})
this._state.on('CONNECTED', () => {
Expand Down Expand Up @@ -166,7 +168,6 @@ class ConnectionFSM extends BaseConnection {
})
}

this.conn.setPeerInfo(this.theirPeerInfo)
this._protocolHandshake(protocol, this.conn, callback)
}

Expand Down Expand Up @@ -266,14 +267,22 @@ class ConnectionFSM extends BaseConnection {
this.muxer.end()
}

delete this.switch.muxedConns[this.theirB58Id]
this.switch.connection.remove(this)

delete this.switch.conns[this.theirB58Id]
delete this.muxer
delete this.conn

this._state('done')

setImmediate(() => this.switch.emit('peer-mux-closed', this.theirPeerInfo))
// If we have the base connection, abort it
if (this.conn) {
this.conn.source(true, () => {
this._state('done')
this.switch.emit('peer-mux-closed', this.theirPeerInfo)
delete this.conn
})
} else {
this._state('done')
this.switch.emit('peer-mux-closed', this.theirPeerInfo)
}
}

/**
Expand Down Expand Up @@ -352,7 +361,8 @@ class ConnectionFSM extends BaseConnection {
const conn = observeConnection(null, key, _conn, this.switch.observer)

this.muxer = this.switch.muxers[key].dialer(conn)
this.switch.muxedConns[this.theirB58Id] = this
// this.switch.muxedConns[this.theirB58Id] = this
this.switch.connection.add(this)

this.muxer.once('close', () => {
this.close()
Expand All @@ -365,7 +375,7 @@ class ConnectionFSM extends BaseConnection {
this.switch.protocolMuxer(null)(conn)
})

setImmediate(() => this.switch.emit('peer-mux-established', this.theirPeerInfo))
this.switch.emit('peer-mux-established', this.theirPeerInfo)

this._didUpgrade(null)
})
Expand Down
105 changes: 93 additions & 12 deletions src/connection/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ const waterfall = require('async/waterfall')
const debug = require('debug')
const log = debug('libp2p:switch:conn-manager')
const once = require('once')
const setImmediate = require('async/setImmediate')
const ConnectionFSM = require('../connection')

const Circuit = require('libp2p-circuit')
Expand All @@ -20,6 +19,92 @@ const plaintext = require('../plaintext')
class ConnectionManager {
constructor (_switch) {
this.switch = _switch
this.connections = {}
}

/**
* Adds the connection for tracking if it's not already added
* @private
* @param {ConnectionFSM} connection
* @returns {void}
*/
add (connection) {
this.connections[connection.theirB58Id] = this.connections[connection.theirB58Id] || []
// Only add it if it's not there
if (!this.get(connection)) {
this.connections[connection.theirB58Id].push(connection)
}
}

/**
* Gets the connection from the list if it exists
* @private
* @param {ConnectionFSM} connection
* @returns {ConnectionFSM|null} The found connection or null
*/
get (connection) {
if (!this.connections[connection.theirB58Id]) return null

for (let i = 0; i < this.connections[connection.theirB58Id].length; i++) {
if (this.connections[connection.theirB58Id][i] === connection) {
return this.connections[connection.theirB58Id][i]
}
}
return null
}

/**
* Gets a connection associated with the given peer
* @private
* @param {string} peerId The peers id
* @returns {ConnectionFSM|null} The found connection or null
*/
getOne (peerId) {
if (this.connections[peerId]) {
// TODO: Maybe select the best?
return this.connections[peerId][0]
}
return null
}

/**
* Removes the connection from tracking
* @private
* @param {ConnectionFSM} connection The connection to remove
* @returns {void}
*/
remove (connection) {
if (!this.connections[connection.theirB58Id]) return

for (let i = 0; i < this.connections[connection.theirB58Id].length; i++) {
if (this.connections[connection.theirB58Id][i] === connection) {
this.connections[connection.theirB58Id].splice(i, 1)
return
}
}
}

/**
* Returns all connections being tracked
* @private
* @returns {ConnectionFSM[]}
*/
getAll () {
let connections = []
for (const conns of Object.values(this.connections)) {
connections = [...connections, ...conns]
}
return connections
}

/**
* Returns all connections being tracked for a given peer id
* @private
* @param {string} peerId Stringified peer id
* @returns {ConnectionFSM[]}
*/
getAllById (peerId) {
return this.connections[peerId] || []
}

/**
Expand Down Expand Up @@ -70,9 +155,6 @@ class ConnectionManager {
], (err, peerInfo) => {
if (err) {
return muxedConn.end(() => {
if (peerInfo) {
setImmediate(() => this.switch.emit('peer-mux-closed', peerInfo))
}
callback(err, null)
})
}
Expand All @@ -91,11 +173,14 @@ class ConnectionManager {
}
const b58Str = peerInfo.id.toB58String()

this.switch.muxedConns[b58Str] = new ConnectionFSM({
const connection = new ConnectionFSM({
_switch: this.switch,
peerInfo,
muxer: muxedConn
muxer: muxedConn,
conn: conn,
type: 'inc'
})
this.switch.connection.add(connection)

if (peerInfo.multiaddrs.size > 0) {
// with incomming conn and through identify, going to pick one
Expand All @@ -111,14 +196,10 @@ class ConnectionManager {
peerInfo = this.switch._peerBook.put(peerInfo)

muxedConn.once('close', () => {
delete this.switch.muxedConns[b58Str]
peerInfo.disconnect()
peerInfo = this.switch._peerBook.put(peerInfo)
log(`closed connection to ${b58Str}`)
setImmediate(() => this.switch.emit('peer-mux-closed', peerInfo))
connection.close()
})

setImmediate(() => this.switch.emit('peer-mux-established', peerInfo))
this.switch.emit('peer-mux-established', peerInfo)
})
})
}
Expand Down
5 changes: 3 additions & 2 deletions src/dialer.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ function dial (_switch, returnFSM) {

log(`dialing to ${b58Id.slice(0, 8)} with protocol ${protocol || 'unknown'}`)

let connection = _switch.muxedConns[b58Id] || _switch.conns[b58Id]
let connection = _switch.connection.getOne(b58Id)

if (!ConnectionFSM.isConnectionFSM(connection)) {
connection = new ConnectionFSM({
_switch,
peerInfo,
muxer: _switch.muxedConns[b58Id] || null
muxer: null,
conn: null
})
connection.once('error', (err) => callback(err))
connection.once('connected', () => connection.protect())
Expand Down
Loading

0 comments on commit aa86307

Please sign in to comment.