Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

Commit

Permalink
feat: support a priority queue for dials (#325)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobheun authored Apr 10, 2019
1 parent 4862c48 commit 0357bf2
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 29 deletions.
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,16 @@ works like dial, but calls back with a [Connection State Machine](#connection-st
Connection state machines emit a number of events that can be used to determine the current state of the connection
and to received the underlying connection that can be used to transfer data.

### `switch.dialer.connect(peer, options, callback)`

a low priority dial to the provided peer. Calls to `dial` and `dialFSM` will take priority. This should be used when an application only wishes to establish connections to new peers, such as during peer discovery when there is a low peer count. Currently, anything greater than the HIGH_PRIORITY (10) will be placed into the cold call queue, and anything less than or equal to the HIGH_PRIORITY will be added to the normal queue.

- `peer`: can be an instance of [PeerInfo][], [PeerId][] or [multiaddr][]
- `options`: Optional
- `options.priority`: Number of the priority of the dial, defaults to 20.
- `options.useFSM`: Boolean of whether or not to callback with a [Connection State Machine](#connection-state-machine)
- `callback`: Function with signature `function (err, connFSM) {}` where `connFSM` is a [Connection State Machine](#connection-state-machine)

##### Events
- `error`: emitted whenever a fatal error occurs with the connection; the error will be emitted.
- `error:upgrade_failed`: emitted whenever the connection fails to upgrade with a muxer, this is not fatal.
Expand Down Expand Up @@ -187,7 +197,17 @@ Emitted when the switch encounters an error.

### `switch.on('peer-mux-closed', (peer) => {})`

- `peer`: is instance of [PeerInfo][] that has info of the peer we have just closed a muxed connection.
- `peer`: is instance of [PeerInfo][] that has info of the peer we have just closed a muxed connection with.

### `switch.on('connection:start', (peer) => {})`
This will be triggered anytime a new connection is created.

- `peer`: is instance of [PeerInfo][] that has info of the peer we have just started a connection with.

### `switch.on('connection:end', (peer) => {})`
This will be triggered anytime an existing connection, regardless of state, is removed from the switch's internal connection tracking.

- `peer`: is instance of [PeerInfo][] that has info of the peer we have just closed a connection with.

### `switch.on('start', () => {})`

Expand Down
1 change: 1 addition & 0 deletions src/connection/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class BaseConnection extends EventEmitter {
* @returns {void}
*/
_onDisconnected () {
this.switch.connection.remove(this)
this.log('disconnected from %s', this.theirB58Id)
this.emit('close')
this.removeAllListeners()
Expand Down
2 changes: 0 additions & 2 deletions src/connection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,6 @@ class ConnectionFSM extends BaseConnection {
_onDisconnecting () {
this.log('disconnecting from %s', this.theirB58Id, Boolean(this.muxer))

this.switch.connection.remove(this)

delete this.switch.conns[this.theirB58Id]

let tasks = []
Expand Down
16 changes: 13 additions & 3 deletions src/connection/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ class ConnectionManager {
// Only add it if it's not there
if (!this.get(connection)) {
this.connections[connection.theirB58Id].push(connection)
this.switch.emit('peer-mux-established', connection.theirPeerInfo)
this.switch.emit('connection:start', connection.theirPeerInfo)
if (connection.getState() === 'MUXED') {
this.switch.emit('peer-mux-established', connection.theirPeerInfo)
} else {
connection.once('muxed', () => this.switch.emit('peer-mux-established', connection.theirPeerInfo))
}
}
}

Expand Down Expand Up @@ -81,8 +86,10 @@ class ConnectionManager {
remove (connection) {
// No record of the peer, disconnect it
if (!this.connections[connection.theirB58Id]) {
connection.theirPeerInfo.disconnect()
this.switch.emit('peer-mux-closed', connection.theirPeerInfo)
if (connection.theirPeerInfo) {
connection.theirPeerInfo.disconnect()
this.switch.emit('peer-mux-closed', connection.theirPeerInfo)
}
return
}

Expand All @@ -99,6 +106,9 @@ class ConnectionManager {
connection.theirPeerInfo.disconnect()
this.switch.emit('peer-mux-closed', connection.theirPeerInfo)
}

// A tracked connection was closed, let the world know
this.switch.emit('connection:end', connection.theirPeerInfo)
}

/**
Expand Down
4 changes: 3 additions & 1 deletion src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,7 @@ module.exports = {
DIAL_TIMEOUT: 30e3, // How long in ms a dial attempt is allowed to take
MAX_COLD_CALLS: 50, // How many dials w/o protocols that can be queued
MAX_PARALLEL_DIALS: 100, // Maximum allowed concurrent dials
QUARTER_HOUR: 15 * 60e3
QUARTER_HOUR: 15 * 60e3,
PRIORITY_HIGH: 10,
PRIORITY_LOW: 20
}
32 changes: 27 additions & 5 deletions src/dialer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ const {
BLACK_LIST_ATTEMPTS,
BLACK_LIST_TTL,
MAX_COLD_CALLS,
MAX_PARALLEL_DIALS
MAX_PARALLEL_DIALS,
PRIORITY_HIGH,
PRIORITY_LOW
} = require('../constants')

module.exports = function (_switch) {
Expand All @@ -19,7 +21,7 @@ module.exports = function (_switch) {
* @param {DialRequest} dialRequest
* @returns {void}
*/
function _dial ({ peerInfo, protocol, useFSM, callback }) {
function _dial ({ peerInfo, protocol, options, callback }) {
if (typeof protocol === 'function') {
callback = protocol
protocol = null
Expand All @@ -32,7 +34,7 @@ module.exports = function (_switch) {
}

// Add it to the queue, it will automatically get executed
dialQueueManager.add({ peerInfo, protocol, useFSM, callback })
dialQueueManager.add({ peerInfo, protocol, options, callback })
}

/**
Expand Down Expand Up @@ -64,14 +66,33 @@ module.exports = function (_switch) {
dialQueueManager.clearBlacklist(peerInfo)
}

/**
* Attempts to establish a connection to the given `peerInfo` at
* a lower priority than a standard dial.
* @param {PeerInfo} peerInfo
* @param {object} options
* @param {boolean} options.useFSM Whether or not to return a `ConnectionFSM`. Defaults to false.
* @param {number} options.priority Lowest priority goes first. Defaults to 20.
* @param {function(Error, Connection)} callback
*/
function connect (peerInfo, options, callback) {
if (typeof options === 'function') {
callback = options
options = null
}
options = { useFSM: false, priority: PRIORITY_LOW, ...options }
_dial({ peerInfo, protocol: null, options, callback })
}

/**
* Adds the dial request to the queue for the given `peerInfo`
* The request will be added with a high priority (10).
* @param {PeerInfo} peerInfo
* @param {string} protocol
* @param {function(Error, Connection)} callback
*/
function dial (peerInfo, protocol, callback) {
_dial({ peerInfo, protocol, useFSM: false, callback })
_dial({ peerInfo, protocol, options: { useFSM: false, priority: PRIORITY_HIGH }, callback })
}

/**
Expand All @@ -82,10 +103,11 @@ module.exports = function (_switch) {
* @param {function(Error, ConnectionFSM)} callback
*/
function dialFSM (peerInfo, protocol, callback) {
_dial({ peerInfo, protocol, useFSM: true, callback })
_dial({ peerInfo, protocol, options: { useFSM: true, priority: PRIORITY_HIGH }, callback })
}

return {
connect,
dial,
dialFSM,
clearBlacklist,
Expand Down
4 changes: 3 additions & 1 deletion src/dialer/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ log.error = debug('libp2p:switch:dial:error')
* @typedef {Object} DialRequest
* @property {PeerInfo} peerInfo - The peer to dial to
* @property {string} [protocol] - The protocol to create a stream for
* @property {boolean} useFSM - If `callback` should return a ConnectionFSM
* @property {object} options
* @property {boolean} options.useFSM - If `callback` should return a ConnectionFSM
* @property {number} options.priority - The priority of the dial
* @property {function(Error, Connection|ConnectionFSM)} callback
*/

Expand Down
27 changes: 16 additions & 11 deletions src/dialer/queueManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const Queue = require('./queue')
const { DIAL_ABORTED } = require('../errors')
const nextTick = require('async/nextTick')
const retimer = require('retimer')
const { QUARTER_HOUR } = require('../constants')
const { QUARTER_HOUR, PRIORITY_HIGH } = require('../constants')
const debug = require('debug')
const log = debug('libp2p:switch:dial:manager')
const noop = () => {}
Expand Down Expand Up @@ -103,17 +103,25 @@ class DialQueueManager {
* @param {DialRequest} dialRequest
* @returns {void}
*/
add ({ peerInfo, protocol, useFSM, callback }) {
add ({ peerInfo, protocol, options, callback }) {
callback = callback ? once(callback) : noop

// Add the dial to its respective queue
const targetQueue = this.getQueue(peerInfo)
// If we have too many cold calls, abort the dial immediately
if (this._coldCallQueue.size >= this.switch.dialer.MAX_COLD_CALLS && !protocol) {
return nextTick(callback, DIAL_ABORTED())

// Cold Call
if (options.priority > PRIORITY_HIGH) {
// If we have too many cold calls, abort the dial immediately
if (this._coldCallQueue.size >= this.switch.dialer.MAX_COLD_CALLS) {
return nextTick(callback, DIAL_ABORTED())
}

if (this._queue.has(targetQueue.id)) {
return nextTick(callback, DIAL_ABORTED())
}
}

targetQueue.add(protocol, useFSM, callback)
targetQueue.add(protocol, options.useFSM, callback)

// If we're already connected to the peer, start the queue now
// While it might cause queues to go over the max parallel amount,
Expand All @@ -130,15 +138,12 @@ class DialQueueManager {

// Add the id to its respective queue set if the queue isn't running
if (!targetQueue.isRunning) {
if (protocol) {
if (options.priority <= PRIORITY_HIGH) {
this._queue.add(targetQueue.id)
this._coldCallQueue.delete(targetQueue.id)
// Only add it to the cold queue if it's not in the normal queue
} else if (!this._queue.has(targetQueue.id)) {
this._coldCallQueue.add(targetQueue.id)
// The peer is already in the normal queue, abort the cold call
} else {
return nextTick(callback, DIAL_ABORTED())
this._coldCallQueue.add(targetQueue.id)
}
}

Expand Down
33 changes: 28 additions & 5 deletions test/dialer.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,20 @@ const PeerBook = require('peer-book')
const Queue = require('../src/dialer/queue')
const QueueManager = require('../src/dialer/queueManager')
const Switch = require('../src')
const { PRIORITY_HIGH, PRIORITY_LOW } = require('../src/constants')

const utils = require('./utils')
const createInfos = utils.createInfos

describe('dialer', () => {
let switchA
let switchB

before((done) => createInfos(1, (err, infos) => {
before((done) => createInfos(2, (err, infos) => {
expect(err).to.not.exist()

switchA = new Switch(infos[0], new PeerBook())
switchB = new Switch(infos[1], new PeerBook())

done()
}))
Expand All @@ -31,6 +34,26 @@ describe('dialer', () => {
sinon.restore()
})

describe('connect', () => {
afterEach(() => {
switchA.dialer.clearBlacklist(switchB._peerInfo)
})

it('should use default options', (done) => {
switchA.dialer.connect(switchB._peerInfo, (err) => {
expect(err).to.exist()
done()
})
})

it('should be able to use custom options', (done) => {
switchA.dialer.connect(switchB._peerInfo, { useFSM: true, priority: PRIORITY_HIGH }, (err) => {
expect(err).to.exist()
done()
})
})
})

describe('queue', () => {
it('should blacklist forever after 5 blacklists', () => {
const queue = new Queue('QM', switchA)
Expand Down Expand Up @@ -58,7 +81,7 @@ describe('dialer', () => {
id: { toB58String: () => 'QmA' }
},
protocol: null,
useFSM: true,
options: { useFSM: true, priority: PRIORITY_LOW },
callback: (err) => {
expect(err.code).to.eql('DIAL_ABORTED')
done()
Expand All @@ -75,7 +98,7 @@ describe('dialer', () => {
isConnected: () => null
},
protocol: '/echo/1.0.0',
useFSM: true,
options: { useFSM: true, priority: PRIORITY_HIGH },
callback: () => {}
}

Expand All @@ -99,7 +122,7 @@ describe('dialer', () => {
isConnected: () => null
},
protocol: null,
useFSM: true,
options: { useFSM: true, priority: PRIORITY_LOW },
callback: () => {}
}

Expand All @@ -120,7 +143,7 @@ describe('dialer', () => {
isConnected: () => null
},
protocol: null,
useFSM: true,
options: { useFSM: true, priority: PRIORITY_LOW },
callback: (err) => {
expect(runSpy.called).to.eql(false)
expect(hasSpy.called).to.eql(true)
Expand Down

0 comments on commit 0357bf2

Please sign in to comment.