diff --git a/README.md b/README.md index edc1dd63..34639bbf 100644 --- a/README.md +++ b/README.md @@ -29,20 +29,29 @@ Gossipsub is an implementation of pubsub based on meshsub and floodsub. You can ```javascript const Gossipsub = require('libp2p-gossipsub') -const gsub = new Gossipsub(node) +const registrar = { + handle: (multicodecs, handle) => { + // register multicodec to libp2p + // handle function is called everytime a remote peer opens a stream to the peer. + }, + register: (multicodecs, handlers) => { + // handlers will be used to notify pubsub of peer connection establishment or closing + }, + unregister: (id) => { -gsub.start((err) => { - if (err) { - console.log('Upsy', err) } - gsub.on('fruit', (data) => { - console.log(data) - }) - gsub.subscribe('fruit') +} - gsub.publish('fruit', new Buffer('banana')) +const gsub = new Gossipsub(peerInfo, registrar, options) + +await gsub.start() + +gsub.on('fruit', (data) => { + console.log(data) }) +gsub.subscribe('fruit') +gsub.publish('fruit', new Buffer('banana')) ``` ## API @@ -51,7 +60,7 @@ gsub.start((err) => { ```js const options = {…} -const gossipsub = new Gossipsub(libp2pNode, options) +const gossipsub = new Gossipsub(peerInfo, registrar, options) ``` Options is an optional object with the following key-value pairs: @@ -59,6 +68,8 @@ Options is an optional object with the following key-value pairs: * **`fallbackToFloodsub`**: boolean identifying whether the node should fallback to the floodsub protocol, if another connecting peer does not support gossipsub (defaults to **true**). * **`emitSelf`**: boolean identifying whether the node should emit to self on publish, in the event of the topic being subscribed (defaults to **false**). +For the remaining API, see https://github.com/libp2p/js-libp2p-pubsub + ## Contribute This module is actively under development. Please check out the issues and submit PRs! diff --git a/package.json b/package.json index 1d058b65..2ac8f52d 100644 --- a/package.json +++ b/package.json @@ -32,35 +32,33 @@ "lint" ], "dependencies": { - "async": "^2.6.2", - "err-code": "^1.1.2", - "libp2p-floodsub": "~0.17.1", - "libp2p-pubsub": "~0.2.0", - "multistream-select": "~0.14.6", - "peer-id": "~0.12.2", - "peer-info": "~0.15.1", + "debug": "^4.1.1", + "err-code": "^2.0.0", + "it-length-prefixed": "^2.0.0", + "it-pipe": "^1.0.1", + "libp2p-floodsub": "^0.19.0", + "libp2p-pubsub": "~0.3.1", + "p-map": "^3.0.0", + "peer-id": "~0.13.3", + "peer-info": "~0.17.0", "protons": "^1.0.1", - "pull-length-prefixed": "^1.3.3", - "pull-stream": "^3.6.13" + "time-cache": "^0.3.0" }, "devDependencies": { - "@types/chai": "^4.1.7", + "@types/chai": "^4.2.3", "@types/mocha": "^5.2.7", - "aegir": "^20.0.0", + "aegir": "^20.4.1", "benchmark": "^2.1.4", "chai": "^4.2.0", "chai-spies": "^1.0.0", "detect-node": "^2.0.4", "dirty-chai": "^2.0.1", - "libp2p": "~0.25.5", - "libp2p-secio": "~0.11.1", - "libp2p-spdy": "~0.13.3", - "libp2p-tcp": "~0.13.0", - "libp2p-websockets": "~0.12.2", + "it-pair": "^1.0.0", "lodash": "^4.17.15", - "mocha": "^5.2.0", + "mocha": "^6.2.1", + "p-times": "^2.1.0", "promisify-es6": "^1.0.3", - "sinon": "^7.3.2" + "sinon": "^7.5.0" }, "contributors": [ "Cayman ", diff --git a/src/heartbeat.js b/src/heartbeat.js index 159a4941..3d8c230f 100644 --- a/src/heartbeat.js +++ b/src/heartbeat.js @@ -12,11 +12,11 @@ class Heartbeat { this.gossipsub = gossipsub } - start (callback) { + start () { if (this._heartbeatTimer) { const errMsg = 'Heartbeat timer is already running' this.gossipsub.log(errMsg) - return callback(errcode(new Error(errMsg), 'ERR_HEARTBEAT_ALREADY_RUNNING')) + throw errcode(new Error(errMsg), 'ERR_HEARTBEAT_ALREADY_RUNNING') } const heartbeatTimer = { @@ -25,39 +25,35 @@ class Heartbeat { runPeriodically: (fn, period) => { heartbeatTimer._timeoutId = setInterval(fn, period) }, - cancel: (cb) => { + cancel: () => { clearTimeout(heartbeatTimer._timeoutId) - cb() } } const heartbeat = this._heartbeat.bind(this) + setTimeout(() => { heartbeat() heartbeatTimer.runPeriodically(heartbeat, constants.GossipSubHeartbeatInterval) }, constants.GossipSubHeartbeatInitialDelay) this._heartbeatTimer = heartbeatTimer - callback() } /** * Unmounts the gossipsub protocol and shuts down every connection - * * @override - * @param {Function} callback * @returns {void} */ - stop (callback) { + stop () { if (!this._heartbeatTimer) { const errMsg = 'Heartbeat timer is not running' this.gossipsub.log(errMsg) - return callback(errcode(new Error(errMsg), 'ERR_HEARTBEAT_NO_RUNNING')) + throw errcode(new Error(errMsg), 'ERR_HEARTBEAT_NO_RUNNING') } - this._heartbeatTimer.cancel(() => { - this._heartbeatTimer = null - callback() - }) + + this._heartbeatTimer.cancel() + this._heartbeatTimer = null } /** diff --git a/src/index.js b/src/index.js index 65bdcf3e..148108ef 100644 --- a/src/index.js +++ b/src/index.js @@ -3,6 +3,8 @@ const assert = require('assert') const { utils } = require('libp2p-pubsub') +const PeerInfo = require('peer-info') + const BasicPubsub = require('./pubsub') const { MessageCache } = require('./messageCache') @@ -12,15 +14,27 @@ const Heartbeat = require('./heartbeat') class GossipSub extends BasicPubsub { /** - * @param {Object} libp2p an instance of Libp2p - * @param {Object} options - * @param {bool} options.emitSelf if publish should emit to self, if subscribed, defaults to false - * @param {bool} options.gossipIncoming if incoming messages on a subscribed topic should be automatically gossiped, defaults to true - * @param {bool} options.fallbackToFloodsub if dial should fallback to floodsub, defaults to true + * @param {PeerInfo} peerInfo instance of the peer's PeerInfo + * @param {Object} registrar + * @param {function} registrar.handle + * @param {function} registrar.register + * @param {function} registrar.unregister + * @param {Object} [options] + * @param {bool} [options.emitSelf] if publish should emit to self, if subscribed, defaults to false + * @param {bool} [options.gossipIncoming] if incoming messages on a subscribed topic should be automatically gossiped, defaults to true + * @param {bool} [options.fallbackToFloodsub] if dial should fallback to floodsub, defaults to true * @constructor */ - constructor (libp2p, options) { - super('libp2p:gossipsub', constants.GossipSubID, libp2p, options) + constructor (peerInfo, registrar, options = {}) { + assert(PeerInfo.isPeerInfo(peerInfo), 'peer info must be an instance of `peer-info`') + + super({ + debugName: 'libp2p:gossipsub', + multicodec: constants.GossipSubID, + peerInfo, + registrar, + options + }) /** * Map of topic meshes @@ -71,37 +85,35 @@ class GossipSub extends BasicPubsub { /** * Removes a peer from the router - * * @override * @param {Peer} peer * @returns {PeerInfo} */ _removePeer (peer) { super._removePeer(peer) - // Only delete when no one else if referencing this peer. - if (peer._references === 0) { - // Remove this peer from the mesh - // eslint-disable-next-line no-unused-vars - for (const [_, peers] of this.mesh.entries()) { - peers.delete(peer) - } - // Remove this peer from the fanout - // eslint-disable-next-line no-unused-vars - for (const [_, peers] of this.fanout.entries()) { - peers.delete(peer) - } - // Remove from gossip mapping - this.gossip.delete(peer) - // Remove from control mapping - this.control.delete(peer) + // Remove this peer from the mesh + // eslint-disable-next-line no-unused-vars + for (const [_, peers] of this.mesh.entries()) { + peers.delete(peer) + } + + // Remove this peer from the fanout + // eslint-disable-next-line no-unused-vars + for (const [_, peers] of this.fanout.entries()) { + peers.delete(peer) } + + // Remove from gossip mapping + this.gossip.delete(peer) + // Remove from control mapping + this.control.delete(peer) + return peer } /** * Handles an rpc control message from a peer - * * @param {Peer} peer * @param {rpc.RPC} rpc * @returns {void} @@ -139,6 +151,7 @@ class GossipSub extends BasicPubsub { if (!this._options.gossipIncoming) { return } + // Emit to floodsub peers this.peers.forEach((peer) => { if (peer.info.protocols.has(constants.FloodSubID) && @@ -168,10 +181,8 @@ class GossipSub extends BasicPubsub { /** * Handles IHAVE messages - * * @param {Peer} peer * @param {Array} ihave - * * @returns {rpc.RPC.ControlIWant} */ _handleIHave (peer, ihave) { @@ -204,10 +215,8 @@ class GossipSub extends BasicPubsub { /** * Handles IWANT messages * Returns messages to send back to peer - * * @param {Peer} peer * @param {Array} iwant - * * @returns {Array} */ _handleIWant (peer, iwant) { @@ -234,12 +243,9 @@ class GossipSub extends BasicPubsub { /** * Handles Graft messages - * * @param {Peer} peer * @param {Array} graft - * * @return {Array} - * */ _handleGraft (peer, graft) { const prune = [] @@ -271,12 +277,9 @@ class GossipSub extends BasicPubsub { /** * Handles Prune messages - * * @param {Peer} peer * @param {Array} prune - * * @returns {void} - * */ _handlePrune (peer, prune) { prune.forEach(({ topicID }) => { @@ -292,36 +295,28 @@ class GossipSub extends BasicPubsub { /** * Mounts the gossipsub protocol onto the libp2p node and sends our * our subscriptions to every peer connected - * * @override - * @param {Function} callback - * @returns {void} - * + * @returns {Promise} */ - start (callback) { - super.start((err) => { - if (err) return callback(err) - this.heartbeat.start(callback) - }) + async start () { + await super.start() + this.heartbeat.start() } /** * Unmounts the gossipsub protocol and shuts down every connection - * * @override - * @param {Function} callback - * @returns {void} + * @returns {Promise} */ - stop (callback) { - super.stop((err) => { - if (err) return callback(err) - this.mesh = new Map() - this.fanout = new Map() - this.lastpub = new Map() - this.gossip = new Map() - this.control = new Map() - this.heartbeat.stop(callback) - }) + async stop () { + await super.stop() + this.heartbeat.stop() + + this.mesh = new Map() + this.fanout = new Map() + this.lastpub = new Map() + this.gossip = new Map() + this.control = new Map() } /** @@ -355,7 +350,6 @@ class GossipSub extends BasicPubsub { /** * Leave topics - * * @param {Array|string} topics * @returns {void} */ @@ -431,7 +425,6 @@ class GossipSub extends BasicPubsub { /** * Sends a GRAFT message to a peer - * * @param {Peer} peer * @param {String} topic * @returns {void} @@ -447,7 +440,6 @@ class GossipSub extends BasicPubsub { /** * Sends a PRUNE message to a peer - * * @param {Peer} peer * @param {String} topic * @returns {void} @@ -505,7 +497,6 @@ class GossipSub extends BasicPubsub { /** * Send graft and prune messages - * * @param {Map>} tograft * @param {Map>} toprune */ @@ -532,7 +523,6 @@ class GossipSub extends BasicPubsub { /** * Emits gossip to peers in a particular topic - * * @param {String} topic * @param {Set} peers - peers to exclude * @returns {void} @@ -575,7 +565,6 @@ class GossipSub extends BasicPubsub { /** * Adds new IHAVE messages to pending gossip - * * @param {Peer} peer * @param {Array} controlIHaveMsgs * @returns {void} @@ -588,7 +577,6 @@ class GossipSub extends BasicPubsub { /** * Returns the current time in milliseconds - * * @returns {number} */ _now () { @@ -597,3 +585,4 @@ class GossipSub extends BasicPubsub { } module.exports = GossipSub +module.exports.multicodec = constants.GossipSubID diff --git a/src/pubsub.js b/src/pubsub.js index e84d7271..d10f63b6 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -1,162 +1,125 @@ 'use strict' -const { multicodec: floodsubMulticodec } = require('libp2p-floodsub') -const Pubsub = require('libp2p-pubsub') -const pull = require('pull-stream') -const lp = require('pull-length-prefixed') -const nextTick = require('async/nextTick') -const { constants: multistreamConstants } = require('multistream-select') -const { utils } = require('libp2p-pubsub') -const asyncMap = require('async/map') +const assert = require('assert') const errcode = require('err-code') -const assert = require('assert') +const TimeCache = require('time-cache') + +const pipe = require('it-pipe') +const lp = require('it-length-prefixed') +const pMap = require('p-map') +const { GossipSubID } = require('../src/constants') +const { multicodec: floodsubMulticodec } = require('libp2p-floodsub') +const Pubsub = require('libp2p-pubsub') + +const { utils } = require('libp2p-pubsub') const { rpc } = require('./message') class BasicPubSub extends Pubsub { /** - * @param {String} debugName - * @param {String} multicodec - * @param {Object} libp2p libp2p implementation - * @param {Object} options - * @param {bool} options.emitSelf if publish should emit to self, if subscribed, defaults to false - * @param {bool} options.gossipIncoming if incoming messages on a subscribed topic should be automatically gossiped, defaults to true - * @param {bool} options.fallbackToFloodsub if dial should fallback to floodsub, defaults to true + * @param {Object} props + * @param {String} props.debugName log namespace + * @param {string} props.multicodec protocol identificer to connect + * @param {PeerInfo} props.peerInfo peer's peerInfo + * @param {Object} props.registrar registrar for libp2p protocols + * @param {function} props.registrar.handle + * @param {function} props.registrar.register + * @param {function} props.registrar.unregister + * @param {Object} [props.options] + * @param {bool} [props.options.emitSelf] if publish should emit to self, if subscribed, defaults to false + * @param {bool} [props.options.gossipIncoming] if incoming messages on a subscribed topic should be automatically gossiped, defaults to true + * @param {bool} [props.options.fallbackToFloodsub] if dial should fallback to floodsub, defaults to true * @constructor */ - constructor (debugName, multicodec, libp2p, options) { - super(debugName, multicodec, libp2p, options) + constructor ({ debugName, multicodec, peerInfo, registrar, options = {} }) { + const multicodecs = [multicodec] + const _options = { + emitSelf: false, + gossipIncoming: true, + fallbackToFloodsub: true, + ...options + } + + // Also wants to get notified of peers connected using floodsub + if (_options.fallbackToFloodsub) { + multicodecs.push(floodsubMulticodec) + } + + super({ + debugName, + multicodecs, + peerInfo, + registrar, + ..._options + }) + /** * A set of subscriptions */ this.subscriptions = new Set() /** - * Pubsub options + * Cache of seen messages + * + * @type {TimeCache} */ - this._options = { - emitSelf: false, - gossipIncoming: true, - fallbackToFloodsub: true, - ...options - } - } + this.seenCache = new TimeCache() - /** - * When a peer has dialed into another peer, it sends its subscriptions to it. - * @override - * @param {PeerInfo} peerInfo The peer dialed - * @param {Connection} conn The connection with the peer - * @param {Function} callback - * - * @returns {void} - */ - _onDial (peerInfo, conn, callback) { - const idB58Str = peerInfo.id.toB58String() - - super._onDial(peerInfo, conn, (err) => { - if (err) return callback(err) + /** + * Pubsub options + */ + this._options = _options - const peer = this.peers.get(idB58Str) - if (peer && peer.isWritable) { - // Immediately send my own subscription to the newly established conn - peer.sendSubscriptions(this.subscriptions) - } - nextTick(() => callback()) - }) + this._onRpc = this._onRpc.bind(this) + this.log = this.log.bind(this) } /** - * Dial a received peer. + * Peer connected successfully with pubsub protocol. * @override - * @param {PeerInfo} peerInfo The peer being dialed - * @param {function} callback - * - * @returns {void} + * @param {PeerInfo} peerInfo peer info + * @param {Connection} conn connection to the peer + * @returns {Promise} */ - _dialPeer (peerInfo, callback) { - callback = callback || function noop () { } + async _onPeerConnected (peerInfo, conn) { + await super._onPeerConnected(peerInfo, conn) const idB58Str = peerInfo.id.toB58String() - - // If already have a PubSub conn, ignore const peer = this.peers.get(idB58Str) - if (peer && peer.isConnected) { - return nextTick(() => callback()) - } - - // If already dialing this peer, ignore - if (this._dials.has(idB58Str)) { - this.log('already dialing %s, ignoring dial attempt', idB58Str) - return nextTick(() => callback()) - } - - // Verify if is known that the peer does not support Gossipsub - const onlySupportsFloodsub = peerInfo.protocols.has(floodsubMulticodec) && !peerInfo.protocols.has(this.multicodec) - - // Define multicodec to use - // Should fallback to floodsub if fallback is enabled, protocols were negotiated, and no Gossipsub available - let multicodec = this.multicodec - if (this._options.fallbackToFloodsub && onlySupportsFloodsub) { - multicodec = floodsubMulticodec + if (peer && peer.isWritable) { + // Immediately send my own subscriptions to the newly established conn + peer.sendSubscriptions(this.subscriptions) } - - this._dials.add(idB58Str) - this.log('dialing %s %s', multicodec, idB58Str) - - this.libp2p.dialProtocol(peerInfo, multicodec, (err, conn) => { - this.log('dial to %s complete', idB58Str) - this._dials.delete(idB58Str) - - if (err) { - // If previously dialed gossipsub and not supported, try floodsub if enabled fallback - if (this._options.fallbackToFloodsub && - multicodec === this.multicodec && - err.code === multistreamConstants.errors.MULTICODEC_NOT_SUPPORTED) { - this._dials.add(idB58Str) - this.log('dialing %s %s', floodsubMulticodec, idB58Str) - - this.libp2p.dialProtocol(peerInfo, floodsubMulticodec, (err, conn) => { - this.log('dial to %s complete', idB58Str) - this._dials.delete(idB58Str) - - if (err) { - this.log.err(err) - return callback() - } - this._onDial(peerInfo, conn, callback) - }) - } else { - this.log.err(err) - return callback() - } - } else { - this._onDial(peerInfo, conn, callback) - } - }) } /** - * Processes a peer's connection to another peer. - * - * @param {String} idB58Str - * @param {Connection} conn - * @param {Peer} peer - * + * Overriding the implementation of _processConnection should keep the connection and is + * responsible for processing each RPC message received by other peers. + * @override + * @param {string} idB58Str peer id string in base58 + * @param {Connection} conn connection + * @param {PeerInfo} peer peer info * @returns {void} * */ - _processConnection (idB58Str, conn, peer) { - pull( - conn, - lp.decode(), - pull.map((data) => rpc.RPC.decode(data)), - pull.drain( - (rpc) => this._onRpc(idB58Str, rpc), - (err) => this._onConnectionEnd(idB58Str, peer, err) + async _processMessages (idB58Str, conn, peer) { + const onRpcFunc = this._onRpc + try { + await pipe( + conn, + lp.decode(), + async function (source) { + for await (const data of source) { + const rpcMsg = Buffer.isBuffer(data) ? data : data.slice() + + onRpcFunc(idB58Str, rpc.RPC.decode(rpcMsg)) + } + } ) - ) + } catch (err) { + this._onPeerDisconnected(peer, err) + } } /** @@ -203,7 +166,7 @@ class BasicPubSub extends Pubsub { } if (msgs.length) { - msgs.forEach(message => { + msgs.forEach(async message => { const msg = utils.normalizeInRpcMessage(message) const seqno = utils.msgId(msg.from, msg.seqno) @@ -215,14 +178,21 @@ class BasicPubSub extends Pubsub { this.seenCache.put(seqno) // Ensure the message is valid before processing it - this.validate(message, (err, isValid) => { - if (err || !isValid) { - this.log('Message could not be validated, dropping it. isValid=%s', isValid, err) - return - } + let isValid + let error - this._processRpcMessage(msg) - }) + try { + isValid = await this.validate(message) + } catch (err) { + error = err + } + + if (error || !isValid) { + this.log('Message could not be validated, dropping it. isValid=%s', isValid, error) + return + } + + this._processRpcMessage(msg) }) } this._handleRpcControl(peer, rpc) @@ -245,73 +215,45 @@ class BasicPubSub extends Pubsub { } _handleRpcControl (peer, rpc) { - throw errcode('_handleRpcControl must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED') + throw errcode(new Error('_handleRpcControl must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED') } /** * Returns a buffer of a RPC message that contains a control message - * * @param {Array} msgs * @param {Array} ihave * @param {Array} iwant * @param {Array} graft * @param {Array} prune - * * @returns {rpc.RPC} - * */ - _rpcWithControl (msgs, ihave, iwant, graft, prune) { + _rpcWithControl (msgs = [], ihave = [], iwant = [], graft = [], prune = []) { return { subscriptions: [], - msgs: msgs || [], + msgs: msgs, control: { - ihave: ihave || [], - iwant: iwant || [], - graft: graft || [], - prune: prune || [] + ihave: ihave, + iwant: iwant, + graft: graft, + prune: prune } } } - /** - * Mounts the protocol onto the libp2p node and sends our - * our subscriptions to every peer connected - * - * @override - * @param {Function} callback - * @returns {void} - * - */ - start (callback) { - super.start((err) => { - if (err) { - return callback(err) - } - // if fallback to floodsub enabled, we need to listen to its protocol - if (this._options.fallbackToFloodsub) { - this.libp2p.handle(floodsubMulticodec, this._onConnection) - } - callback() - }) - } - /** * Unmounts the protocol and shuts down every connection - * * @override - * @param {Function} callback * @returns {void} */ - stop (callback) { - super.stop((err) => { - if (err) return callback(err) - this.subscriptions = new Set() - callback() - }) + async stop () { + await super.stop() + + this.subscriptions = new Set() } /** * Subscribes to topics + * @override * @param {Array|string} topics * @returns {void} */ @@ -332,6 +274,7 @@ class BasicPubSub extends Pubsub { // Broadcast SUBSCRIBE to all peers this.peers.forEach((peer) => sendSubscriptionsOnceReady(peer)) + // make sure that Gossipsub is already mounted function sendSubscriptionsOnceReady (peer) { if (peer && peer.isWritable) { @@ -349,16 +292,18 @@ class BasicPubSub extends Pubsub { } join (topics) { - throw errcode('join must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED') + throw errcode(new Error('join must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED') } /** * Leaves a topic - * + * @override * @param {Array|string} topics * @returns {void} */ unsubscribe (topics) { + assert(this.started, 'Pubsub has not started') + topics = utils.ensureArray(topics) const unTopics = topics.filter((topic) => this.subscriptions.has(topic)) @@ -373,6 +318,7 @@ class BasicPubSub extends Pubsub { // Broadcast UNSUBSCRIBE to all peers ready this.peers.forEach((peer) => sendUnsubscriptionsOnceReady(peer)) + // make sure that Gossipsub is already mounted function sendUnsubscriptionsOnceReady (peer) { if (peer && peer.isWritable) { @@ -390,25 +336,25 @@ class BasicPubSub extends Pubsub { } leave (topics) { - throw errcode('leave must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED') + throw errcode(new Error('leave must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED') } /** * Publishes messages to all subscribed peers - * + * @override * @param {Array|string} topics * @param {Array|any} messages - * @param {Function|null} callback * @returns {void} */ - publish (topics, messages, callback) { + async publish (topics, messages) { assert(this.started, 'Pubsub has not started') + this.log('publish', topics, messages) + topics = utils.ensureArray(topics) messages = utils.ensureArray(messages) - callback = callback || (() => {}) - const from = this.libp2p.peerInfo.id.toB58String() + const from = this.peerInfo.id.toB58String() const buildMessage = (msg, cb) => { const seqno = utils.randomSeqno() @@ -424,15 +370,23 @@ class BasicPubSub extends Pubsub { // Emit to self if I'm interested and emitSelf enabled this._options.emitSelf && this._emitMessages(topics, [msgObj]) - this._buildMessage(msgObj, cb) + return this._buildMessage(msgObj) } + const msgObjects = await pMap(messages, buildMessage) - asyncMap(messages, buildMessage, (err, msgObjects) => { - if (err) callback(err) - this._publish(utils.normalizeOutRpcMessages(msgObjects)) + // send to all the other peers + this._publish(utils.normalizeOutRpcMessages(msgObjects)) + } - callback() - }) + /** + * Get the list of topics which the peer is subscribed to. + * @override + * @returns {Array} + */ + getTopics () { + assert(this.started, 'Pubsub is not started') + + return Array.from(this.subscriptions) } _emitMessages (topics, messages) { @@ -448,7 +402,7 @@ class BasicPubSub extends Pubsub { } _publish (rpcs) { - throw errcode('_publish must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED') + throw errcode(new Error('_publish must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED') } /** @@ -468,7 +422,7 @@ class BasicPubSub extends Pubsub { // Adds all peers using our protocol let peers = [] peersInTopic.forEach((peer) => { - if (peer.info.protocols.has(this.multicodec)) { + if (peer.info.protocols.has(GossipSubID)) { peers.push(peer) } }) diff --git a/test/2-nodes.js b/test/2-nodes.js deleted file mode 100644 index 7cc0c94d..00000000 --- a/test/2-nodes.js +++ /dev/null @@ -1,608 +0,0 @@ -/* eslint-env mocha */ -/* eslint max-nested-callbacks: ["error", 5] */ -'use strict' - -const chai = require('chai') -chai.use(require('dirty-chai')) -chai.use(require('chai-spies')) -const expect = chai.expect -const times = require('lodash/times') - -const { - createNode, - expectSet, - first, - dialNode, - startNode, - stopNode -} = require('./utils') - -const shouldNotHappen = (msg) => expect.fail() - -describe('1 node', () => { - describe('basics', () => { - let nodeA - - beforeEach(async () => { - nodeA = await createNode('/ip4/127.0.0.1/tcp/0') - await startNode(nodeA) - }) - - afterEach(async function () { - if (nodeA.gs.started) { - await stopNode(nodeA.gs) - } - await stopNode(nodeA) - }) - - it('should mount the pubsub protocol', () => { - expect(nodeA.gs.peers.size).to.be.eql(0) - expect(nodeA.gs.mesh.size).to.eql(0) - expect(nodeA.gs.fanout.size).to.eql(0) - expect(nodeA.gs.lastpub.size).to.eql(0) - expect(nodeA.gs.gossip.size).to.eql(0) - expect(nodeA.gs.control.size).to.eql(0) - expect(nodeA.gs.subscriptions.size).to.eql(0) - expect(nodeA._switch.protocols[nodeA.gs.multicodec]).to.not.be.null() - }) - - it('should start a gossipsub successfully', async () => { - await startNode(nodeA.gs) - expect(nodeA.gs.started).to.equal(true) - }) - }) -}) - -describe('2 nodes', () => { - describe('basics', () => { - let nodeA - let nodeB - - beforeEach(async () => { - nodeA = await createNode('/ip4/127.0.0.1/tcp/0') - nodeB = await createNode('/ip4/127.0.0.1/tcp/0') - await startNode(nodeA) - await startNode(nodeB) - - await Promise.all([ - startNode(nodeA.gs), - startNode(nodeB.gs) - ]) - }) - - afterEach(async function () { - this.timeout(4000) - await Promise.all([ - stopNode(nodeA.gs), - stopNode(nodeB.gs) - ]) - await Promise.all([ - stopNode(nodeA), - stopNode(nodeB) - ]) - }) - - it('Dial from nodeA to nodeB', async () => { - await dialNode(nodeA, nodeB.peerInfo) - await new Promise((resolve) => setTimeout(resolve, 1000)) - expect(nodeA.gs.peers.size).to.equal(1) - expect(nodeB.gs.peers.size).to.equal(1) - }) - }) - - describe('subscription functionality', () => { - let nodeA - let nodeB - - beforeEach(async function () { - this.timeout(4000) - nodeA = await createNode('/ip4/127.0.0.1/tcp/0') - nodeB = await createNode('/ip4/127.0.0.1/tcp/0') - await startNode(nodeA) - await startNode(nodeB) - - await Promise.all([ - startNode(nodeA.gs), - startNode(nodeB.gs) - ]) - await dialNode(nodeA, nodeB.peerInfo) - }) - - afterEach(async function () { - this.timeout(4000) - await Promise.all([ - stopNode(nodeA.gs), - stopNode(nodeB.gs) - ]) - await Promise.all([ - stopNode(nodeA), - stopNode(nodeB) - ]) - }) - - it('Subscribe to a topic', async () => { - const topic = 'Z' - nodeA.gs.subscribe(topic) - nodeB.gs.subscribe(topic) - - // await subscription change - const [changedPeerInfo, changedTopics, changedSubs] = await new Promise((resolve) => { - nodeA.gs.once('pubsub:subscription-change', (...args) => resolve(args)) - }) - - expectSet(nodeA.gs.subscriptions, [topic]) - expectSet(nodeB.gs.subscriptions, [topic]) - expect(nodeA.gs.peers.size).to.equal(1) - expect(nodeB.gs.peers.size).to.equal(1) - expectSet(first(nodeA.gs.peers).topics, [topic]) - expectSet(first(nodeB.gs.peers).topics, [topic]) - - expect(changedPeerInfo.id.toB58String()).to.equal(first(nodeA.gs.peers).info.id.toB58String()) - expectSet(changedTopics, [topic]) - expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: true }]) - - // await heartbeats - await Promise.all([ - new Promise((resolve) => nodeA.gs.once('gossipsub:heartbeat', resolve)), - new Promise((resolve) => nodeB.gs.once('gossipsub:heartbeat', resolve)) - ]) - - expect(first(nodeA.gs.mesh.get(topic)).info.id.toB58String()).to.equal(first(nodeA.gs.peers).info.id.toB58String()) - expect(first(nodeB.gs.mesh.get(topic)).info.id.toB58String()).to.equal(first(nodeB.gs.peers).info.id.toB58String()) - }) - }) - - describe('publish functionality', () => { - let nodeA - let nodeB - const topic = 'Z' - - beforeEach(async function () { - this.timeout(4000) - nodeA = await createNode('/ip4/127.0.0.1/tcp/0') - nodeB = await createNode('/ip4/127.0.0.1/tcp/0') - await startNode(nodeA) - await startNode(nodeB) - - await Promise.all([ - startNode(nodeA.gs), - startNode(nodeB.gs) - ]) - await dialNode(nodeA, nodeB.peerInfo) - - nodeA.gs.subscribe(topic) - nodeB.gs.subscribe(topic) - - // await subscription change and heartbeat - await new Promise((resolve) => nodeA.gs.once('pubsub:subscription-change', resolve)) - await Promise.all([ - new Promise((resolve) => nodeA.gs.once('gossipsub:heartbeat', resolve)), - new Promise((resolve) => nodeB.gs.once('gossipsub:heartbeat', resolve)) - ]) - }) - - afterEach(async function () { - this.timeout(4000) - await Promise.all([ - stopNode(nodeA.gs), - stopNode(nodeB.gs) - ]) - await Promise.all([ - stopNode(nodeA), - stopNode(nodeB) - ]) - }) - - it('Publish to a topic - nodeA', async () => { - const promise = new Promise((resolve) => nodeB.gs.once(topic, resolve)) - nodeA.gs.once(topic, (m) => shouldNotHappen) - - nodeA.gs.publish(topic, Buffer.from('hey')) - - const msg = await promise - - expect(msg.data.toString()).to.equal('hey') - expect(msg.from).to.be.eql(nodeA.gs.libp2p.peerInfo.id.toB58String()) - - nodeA.gs.removeListener(topic, shouldNotHappen) - }) - - it('Publish to a topic - nodeB', async () => { - const promise = new Promise((resolve) => nodeA.gs.once(topic, resolve)) - nodeB.gs.once(topic, shouldNotHappen) - - nodeB.gs.publish(topic, Buffer.from('banana')) - - const msg = await promise - - expect(msg.data.toString()).to.equal('banana') - expect(msg.from).to.be.eql(nodeB.gs.libp2p.peerInfo.id.toB58String()) - - nodeB.gs.removeListener(topic, shouldNotHappen) - }) - - it('Publish 10 msg to a topic', (done) => { - let counter = 0 - - nodeB.gs.once(topic, shouldNotHappen) - - nodeA.gs.on(topic, receivedMsg) - - function receivedMsg (msg) { - expect(msg.data.toString()).to.equal('banana') - expect(msg.from).to.be.eql(nodeB.gs.libp2p.peerInfo.id.toB58String()) - expect(Buffer.isBuffer(msg.seqno)).to.be.true() - expect(msg.topicIDs).to.be.eql([topic]) - - if (++counter === 10) { - nodeA.gs.removeListener(topic, receivedMsg) - nodeB.gs.removeListener(topic, shouldNotHappen) - done() - } - } - - times(10, () => nodeB.gs.publish(topic, Buffer.from('banana'))) - }) - - it('Publish 10 msg to a topic as array', (done) => { - let counter = 0 - - nodeB.gs.once(topic, shouldNotHappen) - - nodeA.gs.on(topic, receivedMsg) - - function receivedMsg (msg) { - expect(msg.data.toString()).to.equal('banana') - expect(msg.from).to.be.eql(nodeB.gs.libp2p.peerInfo.id.toB58String()) - expect(Buffer.isBuffer(msg.seqno)).to.be.true() - expect(msg.topicIDs).to.be.eql([topic]) - - if (++counter === 10) { - nodeA.gs.removeListener(topic, receivedMsg) - nodeB.gs.removeListener(topic, shouldNotHappen) - done() - } - } - - const msgs = [] - times(10, () => msgs.push(Buffer.from('banana'))) - nodeB.gs.publish(topic, msgs) - }) - }) - - describe('publish after unsubscribe', () => { - let nodeA - let nodeB - const topic = 'Z' - - beforeEach(async function () { - this.timeout(4000) - nodeA = await createNode('/ip4/127.0.0.1/tcp/0') - nodeB = await createNode('/ip4/127.0.0.1/tcp/0') - await startNode(nodeA) - await startNode(nodeB) - - await Promise.all([ - startNode(nodeA.gs), - startNode(nodeB.gs) - ]) - await dialNode(nodeA, nodeB.peerInfo) - - nodeA.gs.subscribe(topic) - nodeB.gs.subscribe(topic) - - // await subscription change and heartbeat - await new Promise((resolve) => nodeA.gs.once('pubsub:subscription-change', resolve)) - await Promise.all([ - new Promise((resolve) => nodeA.gs.once('gossipsub:heartbeat', resolve)), - new Promise((resolve) => nodeB.gs.once('gossipsub:heartbeat', resolve)) - ]) - }) - - afterEach(async function () { - this.timeout(4000) - await Promise.all([ - stopNode(nodeA.gs), - stopNode(nodeB.gs) - ]) - await Promise.all([ - stopNode(nodeA), - stopNode(nodeB) - ]) - }) - - it('Unsubscribe from a topic', async () => { - nodeA.gs.unsubscribe(topic) - expect(nodeA.gs.subscriptions.size).to.equal(0) - - const [changedPeerInfo, changedTopics, changedSubs] = await new Promise((resolve) => { - nodeB.gs.once('pubsub:subscription-change', (...args) => resolve(args)) - }) - await new Promise((resolve) => nodeB.gs.once('gossipsub:heartbeat', resolve)) - - expect(nodeB.gs.peers.size).to.equal(1) - expectSet(first(nodeB.gs.peers).topics, []) - expect(changedPeerInfo.id.toB58String()).to.equal(first(nodeB.gs.peers).info.id.toB58String()) - expectSet(changedTopics, []) - expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: false }]) - }) - - it('Publish to a topic after unsubscribe', async () => { - nodeA.gs.unsubscribe(topic) - await new Promise((resolve) => nodeB.gs.once('pubsub:subscription-change', resolve)) - await new Promise((resolve) => nodeB.gs.once('gossipsub:heartbeat', resolve)) - - const promise = new Promise((resolve, reject) => { - nodeA.gs.once(topic, reject) - setTimeout(() => { - nodeA.gs.removeListener(topic, reject) - resolve() - }, 100) - }) - - nodeB.gs.publish('Z', Buffer.from('banana')) - nodeA.gs.publish('Z', Buffer.from('banana')) - - try { - await promise - } catch (e) { - expect.fail('message should not be received') - } - }) - }) - - describe('nodes send state on connection', () => { - let nodeA - let nodeB - - before(async () => { - nodeA = await createNode('/ip4/127.0.0.1/tcp/0') - nodeB = await createNode('/ip4/127.0.0.1/tcp/0') - await startNode(nodeA) - await startNode(nodeB) - - await Promise.all([ - startNode(nodeA.gs), - startNode(nodeB.gs) - ]) - }) - - after(async function () { - this.timeout(4000) - await Promise.all([ - stopNode(nodeA.gs), - stopNode(nodeB.gs) - ]) - await Promise.all([ - stopNode(nodeA), - stopNode(nodeB) - ]) - }) - - it('existing subscriptions are sent upon peer connection', async function () { - this.timeout(5000) - nodeA.gs.subscribe('Za') - nodeB.gs.subscribe('Zb') - - expect(nodeA.gs.peers.size).to.equal(0) - expectSet(nodeA.gs.subscriptions, ['Za']) - expect(nodeB.gs.peers.size).to.equal(0) - expectSet(nodeB.gs.subscriptions, ['Zb']) - - await dialNode(nodeA, nodeB.peerInfo) - - await Promise.all([ - new Promise((resolve) => nodeA.gs.once('pubsub:subscription-change', resolve)), - new Promise((resolve) => nodeB.gs.once('pubsub:subscription-change', resolve)) - ]) - expect(nodeA.gs.peers.size).to.equal(1) - expect(nodeB.gs.peers.size).to.equal(1) - - expectSet(nodeA.gs.subscriptions, ['Za']) - expect(nodeB.gs.peers.size).to.equal(1) - expectSet(first(nodeB.gs.peers).topics, ['Za']) - - expectSet(nodeB.gs.subscriptions, ['Zb']) - expect(nodeA.gs.peers.size).to.equal(1) - expectSet(first(nodeA.gs.peers).topics, ['Zb']) - }) - }) - - describe('nodes handle stopping', () => { - let nodeA - let nodeB - - before(async () => { - nodeA = await createNode('/ip4/127.0.0.1/tcp/0') - nodeB = await createNode('/ip4/127.0.0.1/tcp/0') - await startNode(nodeA) - await startNode(nodeB) - - await Promise.all([ - startNode(nodeA.gs), - startNode(nodeB.gs) - ]) - - await dialNode(nodeA, nodeB.peerInfo) - }) - - after(async function () { - this.timeout(4000) - await Promise.all([ - stopNode(nodeA), - stopNode(nodeB) - ]) - }) - - it('nodes don\'t have peers after stopped', async () => { - await Promise.all([ - stopNode(nodeA.gs), - stopNode(nodeB.gs) - ]) - expect(nodeA.gs.peers.size).to.equal(0) - expect(nodeB.gs.peers.size).to.equal(0) - }) - }) - - describe('prevent concurrent dials', () => { - let sandbox - let nodeA - let nodeB - - before(async () => { - sandbox = chai.spy.sandbox() - nodeA = await createNode('/ip4/127.0.0.1/tcp/0') - nodeB = await createNode('/ip4/127.0.0.1/tcp/0') - await startNode(nodeA) - await startNode(nodeB) - - // Put node B in node A's peer book - nodeA.peerBook.put(nodeB.peerInfo) - - await startNode(nodeB.gs) - }) - - after(async function () { - this.timeout(4000) - sandbox.restore() - await Promise.all([ - stopNode(nodeA.gs), - stopNode(nodeB.gs) - ]) - await Promise.all([ - stopNode(nodeA), - stopNode(nodeB) - ]) - }) - - it('does not dial twice to same peer', async () => { - sandbox.on(nodeA.gs, ['_onDial']) - - // When node A starts, it will dial all peers in its peer book, which - // is just peer B - await startNode(nodeA.gs) - - // Simulate a connection coming in from peer B at the same time. This - // causes gossipsub to dial peer B - nodeA.emit('peer:connect', nodeB.peerInfo) - - await new Promise((resolve) => setTimeout(resolve, 1000)) - // Check that only one dial was made - expect(nodeA.gs._onDial).to.have.been.called.once() - }) - }) - - describe('allow dials even after error', () => { - let sandbox - let nodeA - let nodeB - - before(async () => { - sandbox = chai.spy.sandbox() - - nodeA = await createNode('/ip4/127.0.0.1/tcp/0') - nodeB = await createNode('/ip4/127.0.0.1/tcp/0') - await startNode(nodeA) - await startNode(nodeB) - - // Put node B in node A's peer book - nodeA.peerBook.put(nodeB.peerInfo) - - await startNode(nodeB.gs) - }) - - after(async function () { - this.timeout(4000) - sandbox.restore() - await Promise.all([ - stopNode(nodeA.gs), - stopNode(nodeB.gs) - ]) - await Promise.all([ - stopNode(nodeA), - stopNode(nodeB) - ]) - }) - - it('can dial again after error', (done) => { - let firstTime = true - const dialProtocol = nodeA.gs.libp2p.dialProtocol.bind(nodeA.gs.libp2p) - sandbox.on(nodeA.gs.libp2p, 'dialProtocol', (peerInfo, multicodec, cb) => { - // Return an error for the first dial - if (firstTime) { - firstTime = false - return cb(new Error('dial error')) - } - - // Subsequent dials proceed as normal - dialProtocol(peerInfo, multicodec, cb) - }) - - // When node A starts, it will dial all peers in its peer book, which - // is just peer B - nodeA.gs.start(startComplete) - - function startComplete () { - // Simulate a connection coming in from peer B. This causes gossipsub - // to dial peer B - nodeA.emit('peer:connect', nodeB.peerInfo) - - // Check that both dials were made - setTimeout(() => { - expect(nodeA.gs.libp2p.dialProtocol).to.have.been.called.twice() - done() - }, 1000) - } - }) - }) - - describe('prevent processing dial after stop', () => { - let sandbox - let nodeA - let nodeB - - before(async () => { - sandbox = chai.spy.sandbox() - - nodeA = await createNode('/ip4/127.0.0.1/tcp/0') - nodeB = await createNode('/ip4/127.0.0.1/tcp/0') - await startNode(nodeA) - await startNode(nodeB) - - // Put node B in node A's peer book - nodeA.peerBook.put(nodeB.peerInfo) - - await Promise.all([ - startNode(nodeA.gs), - startNode(nodeB.gs) - ]) - }) - - after(async function () { - this.timeout(4000) - sandbox.restore() - await stopNode(nodeB.gs) - await Promise.all([ - stopNode(nodeA), - stopNode(nodeB) - ]) - }) - - it('does not process dial after stop', (done) => { - sandbox.on(nodeA.gs, ['_onDial']) - - // Simulate a connection coming in from peer B at the same time. This - // causes gossipsub to dial peer B - nodeA.emit('peer:connect', nodeB.peerInfo) - - // Stop gossipsub before the dial can complete - nodeA.gs.stop(() => { - // Check that the dial was not processed - setTimeout(() => { - expect(nodeA.gs._onDial).to.not.have.been.called() - done() - }, 1000) - }) - }) - }) -}) diff --git a/test/2-nodes.spec.js b/test/2-nodes.spec.js new file mode 100644 index 00000000..94310eb8 --- /dev/null +++ b/test/2-nodes.spec.js @@ -0,0 +1,353 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +chai.use(require('chai-spies')) +const expect = chai.expect + +const { GossipSubID: multicodec } = require('../src/constants') + +const { + createGossipsub, + createGossipsubNodes, + createGossipsubConnectedNodes, + mockRegistrar, + expectSet, + ConnectionPair, + first +} = require('./utils') + +const shouldNotHappen = (msg) => expect.fail() + +describe('1 node', () => { + describe('basics', () => { + let gossipsub + + before(async () => { + gossipsub = await createGossipsub(mockRegistrar) + }) + + after(() => gossipsub.stop()) + + it('should mount the pubsub protocol', () => { + expect(gossipsub.peers.size).to.be.eql(0) + expect(gossipsub.mesh.size).to.eql(0) + expect(gossipsub.fanout.size).to.eql(0) + expect(gossipsub.lastpub.size).to.eql(0) + expect(gossipsub.gossip.size).to.eql(0) + expect(gossipsub.control.size).to.eql(0) + expect(gossipsub.subscriptions.size).to.eql(0) + }) + + it('should start a gossipsub successfully', async () => { + await gossipsub.start() + expect(gossipsub.started).to.equal(true) + }) + }) +}) + +describe('2 nodes', () => { + describe('basics', () => { + let nodes, registrarRecords + + // Create pubsub nodes + before(async () => { + ({ + nodes, + registrarRecords + } = await createGossipsubNodes(2, true)) + }) + + after(() => Promise.all(nodes.map((n) => n.stop()))) + + it('Dial from nodeA to nodeB happened with pubsub', () => { + const onConnect0 = registrarRecords[0][multicodec].onConnect + const onConnect1 = registrarRecords[1][multicodec].onConnect + + // Notice peers of connection + const [d0, d1] = ConnectionPair() + onConnect0(nodes[1].peerInfo, d0) + onConnect1(nodes[0].peerInfo, d1) + + expect(nodes[0].peers.size).to.be.eql(1) + expect(nodes[1].peers.size).to.be.eql(1) + }) + }) + + describe('subscription functionality', () => { + let nodes + + // Create pubsub nodes + before(async () => { + nodes = await createGossipsubConnectedNodes(2, multicodec) + }) + + after(() => Promise.all(nodes.map((n) => n.stop()))) + + it('Subscribe to a topic', async () => { + const topic = 'Z' + await new Promise((resolve) => setTimeout(resolve, 2000)) + nodes[0].subscribe(topic) + nodes[1].subscribe(topic) + + // await subscription change + const [changedPeerInfo, changedTopics, changedSubs] = await new Promise((resolve) => { + nodes[0].once('pubsub:subscription-change', (...args) => resolve(args)) + }) + + expectSet(nodes[0].subscriptions, [topic]) + expectSet(nodes[1].subscriptions, [topic]) + expect(nodes[0].peers.size).to.equal(1) + expect(nodes[1].peers.size).to.equal(1) + expectSet(first(nodes[0].peers).topics, [topic]) + expectSet(first(nodes[1].peers).topics, [topic]) + + expect(changedPeerInfo.id.toB58String()).to.equal(first(nodes[0].peers).info.id.toB58String()) + expectSet(changedTopics, [topic]) + expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: true }]) + + // await heartbeats + await Promise.all([ + new Promise((resolve) => nodes[0].once('gossipsub:heartbeat', resolve)), + new Promise((resolve) => nodes[1].once('gossipsub:heartbeat', resolve)) + ]) + + expect(first(nodes[0].mesh.get(topic)).info.id.toB58String()).to.equal(first(nodes[0].peers).info.id.toB58String()) + expect(first(nodes[1].mesh.get(topic)).info.id.toB58String()).to.equal(first(nodes[1].peers).info.id.toB58String()) + }) + }) + + describe('publish functionality', () => { + const topic = 'Z' + let nodes + + // Create pubsub nodes + beforeEach(async () => { + nodes = await createGossipsubConnectedNodes(2, multicodec) + }) + + // Create subscriptions + beforeEach(async () => { + nodes[0].subscribe(topic) + nodes[1].subscribe(topic) + + // await subscription change and heartbeat + await new Promise((resolve) => nodes[0].once('pubsub:subscription-change', resolve)) + await Promise.all([ + new Promise((resolve) => nodes[0].once('gossipsub:heartbeat', resolve)), + new Promise((resolve) => nodes[1].once('gossipsub:heartbeat', resolve)) + ]) + }) + + afterEach(() => Promise.all(nodes.map((n) => n.stop()))) + + it('Publish to a topic - nodeA', async () => { + const promise = new Promise((resolve) => nodes[1].once(topic, resolve)) + nodes[0].once(topic, (m) => shouldNotHappen) + + nodes[0].publish(topic, Buffer.from('hey')) + + const msg = await promise + + expect(msg.data.toString()).to.equal('hey') + expect(msg.from).to.be.eql(nodes[0].peerInfo.id.toB58String()) + + nodes[0].removeListener(topic, shouldNotHappen) + }) + + it('Publish to a topic - nodeB', async () => { + const promise = new Promise((resolve) => nodes[0].once(topic, resolve)) + nodes[1].once(topic, shouldNotHappen) + + nodes[1].publish(topic, Buffer.from('banana')) + + const msg = await promise + + expect(msg.data.toString()).to.equal('banana') + expect(msg.from).to.be.eql(nodes[1].peerInfo.id.toB58String()) + + nodes[1].removeListener(topic, shouldNotHappen) + }) + + it('Publish 10 msg to a topic', (done) => { + let counter = 0 + + nodes[1].once(topic, shouldNotHappen) + + nodes[0].on(topic, receivedMsg) + + function receivedMsg (msg) { + expect(msg.data.toString()).to.equal('banana') + expect(msg.from).to.be.eql(nodes[1].peerInfo.id.toB58String()) + expect(Buffer.isBuffer(msg.seqno)).to.be.true() + expect(msg.topicIDs).to.be.eql([topic]) + + if (++counter === 10) { + nodes[0].removeListener(topic, receivedMsg) + nodes[1].removeListener(topic, shouldNotHappen) + done() + } + } + + Array.from({ length: 10 }).forEach(() => { + nodes[1].publish(topic, Buffer.from('banana')) + }) + }) + + it('Publish 10 msg to a topic as array', (done) => { + let counter = 0 + + nodes[1].once(topic, shouldNotHappen) + + nodes[0].on(topic, receivedMsg) + + function receivedMsg (msg) { + expect(msg.data.toString()).to.equal('banana') + expect(msg.from).to.be.eql(nodes[1].peerInfo.id.toB58String()) + expect(Buffer.isBuffer(msg.seqno)).to.be.true() + expect(msg.topicIDs).to.be.eql([topic]) + + if (++counter === 10) { + nodes[0].removeListener(topic, receivedMsg) + nodes[1].removeListener(topic, shouldNotHappen) + done() + } + } + + const msgs = [] + Array.from({ length: 10 }).forEach(() => { + msgs.push(Buffer.from('banana')) + }) + nodes[1].publish(topic, msgs) + }) + }) + + describe('publish after unsubscribe', () => { + const topic = 'Z' + let nodes + + // Create pubsub nodes + beforeEach(async () => { + nodes = await createGossipsubConnectedNodes(2, multicodec) + }) + + // Create subscriptions + beforeEach(async () => { + nodes[0].subscribe(topic) + nodes[1].subscribe(topic) + + // await subscription change and heartbeat + await new Promise((resolve) => nodes[0].once('pubsub:subscription-change', resolve)) + await Promise.all([ + new Promise((resolve) => nodes[0].once('gossipsub:heartbeat', resolve)), + new Promise((resolve) => nodes[1].once('gossipsub:heartbeat', resolve)) + ]) + }) + + afterEach(() => Promise.all(nodes.map((n) => n.stop()))) + + it('Unsubscribe from a topic', async () => { + nodes[0].unsubscribe(topic) + expect(nodes[0].subscriptions.size).to.equal(0) + + const [changedPeerInfo, changedTopics, changedSubs] = await new Promise((resolve) => { + nodes[1].once('pubsub:subscription-change', (...args) => resolve(args)) + }) + await new Promise((resolve) => nodes[1].once('gossipsub:heartbeat', resolve)) + + expect(nodes[1].peers.size).to.equal(1) + expectSet(first(nodes[1].peers).topics, []) + expect(changedPeerInfo.id.toB58String()).to.equal(first(nodes[1].peers).info.id.toB58String()) + expectSet(changedTopics, []) + expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: false }]) + }) + + it('Publish to a topic after unsubscribe', async () => { + nodes[0].unsubscribe(topic) + await new Promise((resolve) => nodes[1].once('pubsub:subscription-change', resolve)) + await new Promise((resolve) => nodes[1].once('gossipsub:heartbeat', resolve)) + + const promise = new Promise((resolve, reject) => { + nodes[0].once(topic, reject) + setTimeout(() => { + nodes[0].removeListener(topic, reject) + resolve() + }, 100) + }) + + nodes[1].publish('Z', Buffer.from('banana')) + nodes[0].publish('Z', Buffer.from('banana')) + + try { + await promise + } catch (e) { + expect.fail('message should not be received') + } + }) + }) + + describe('nodes send state on connection', () => { + let nodes, registrarRecords + + // Create pubsub nodes + before(async () => { + ({ + nodes, + registrarRecords + } = await createGossipsubNodes(2, true)) + }) + + after(() => Promise.all(nodes.map((n) => n.stop()))) + + it('existing subscriptions are sent upon peer connection', async function () { + this.timeout(5000) + nodes[0].subscribe('Za') + nodes[1].subscribe('Zb') + + expect(nodes[0].peers.size).to.equal(0) + expectSet(nodes[0].subscriptions, ['Za']) + expect(nodes[1].peers.size).to.equal(0) + expectSet(nodes[1].subscriptions, ['Zb']) + + // Connect nodes + const onConnect0 = registrarRecords[0][multicodec].onConnect + const onConnect1 = registrarRecords[1][multicodec].onConnect + + // Notice peers of connection + const [d0, d1] = ConnectionPair() + onConnect0(nodes[1].peerInfo, d0) + onConnect1(nodes[0].peerInfo, d1) + + await Promise.all([ + new Promise((resolve) => nodes[0].once('pubsub:subscription-change', resolve)), + new Promise((resolve) => nodes[1].once('pubsub:subscription-change', resolve)) + ]) + expect(nodes[0].peers.size).to.equal(1) + expect(nodes[1].peers.size).to.equal(1) + + expectSet(nodes[0].subscriptions, ['Za']) + expect(nodes[1].peers.size).to.equal(1) + expectSet(first(nodes[1].peers).topics, ['Za']) + + expectSet(nodes[1].subscriptions, ['Zb']) + expect(nodes[0].peers.size).to.equal(1) + expectSet(first(nodes[0].peers).topics, ['Zb']) + }) + }) + + describe('nodes handle stopping', () => { + let nodes + + // Create pubsub nodes + before(async () => { + nodes = await createGossipsubConnectedNodes(2, multicodec) + }) + + it('nodes don\'t have peers after stopped', async () => { + await Promise.all(nodes.map((n) => n.stop())) + expect(nodes[0].peers.size).to.equal(0) + expect(nodes[1].peers.size).to.equal(0) + }) + }) +}) diff --git a/test/emit-self.js b/test/emit-self.js deleted file mode 100644 index bb307570..00000000 --- a/test/emit-self.js +++ /dev/null @@ -1,76 +0,0 @@ -/* eslint-env mocha */ -/* eslint max-nested-callbacks: ["error", 5] */ -'use strict' - -const chai = require('chai') -chai.use(require('dirty-chai')) -chai.use(require('chai-spies')) -const expect = chai.expect - -const { - createNode, - startNode, - stopNode -} = require('./utils') - -const shouldNotHappen = (_) => expect.fail() - -// Emit to the node itself -describe('emit self', () => { - const topic = 'Z' - - describe('enabled', () => { - let nodeA - - before(async () => { - nodeA = await createNode('/ip4/127.0.0.1/tcp/0', { emitSelf: true }) - await startNode(nodeA) - await startNode(nodeA.gs) - - nodeA.gs.subscribe(topic) - }) - - after(async function () { - if (nodeA.gs.started) { - await stopNode(nodeA.gs) - } - await stopNode(nodeA) - }) - - it('should emit to self on publish', async () => { - const promise = new Promise((resolve) => nodeA.gs.once(topic, resolve)) - - nodeA.gs.publish(topic, Buffer.from('hey')) - - await promise - }) - }) - - describe('disabled', () => { - let nodeA - - before(async () => { - nodeA = await createNode('/ip4/127.0.0.1/tcp/0', { emitSelf: false }) - await startNode(nodeA) - await startNode(nodeA.gs) - - nodeA.gs.subscribe(topic) - }) - - after(async function () { - if (nodeA.gs.started) { - await stopNode(nodeA.gs) - } - await stopNode(nodeA) - }) - - it('should emit to self on publish', async () => { - nodeA.gs.once(topic, (m) => shouldNotHappen) - - nodeA.gs.publish(topic, Buffer.from('hey')) - - // Wait 1 second to guarantee that self is not noticed - await new Promise((resolve) => setTimeout(() => resolve(), 1000)) - }) - }) -}) diff --git a/test/emit-self.spec.js b/test/emit-self.spec.js new file mode 100644 index 00000000..6e2df10c --- /dev/null +++ b/test/emit-self.spec.js @@ -0,0 +1,55 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +chai.use(require('chai-spies')) +const expect = chai.expect + +const { + createGossipsub, + mockRegistrar +} = require('./utils') + +const shouldNotHappen = (_) => expect.fail() + +// Emit to the node itself +describe('emit self', () => { + let gossipsub + const topic = 'Z' + + describe('enabled', () => { + before(async () => { + gossipsub = await createGossipsub(mockRegistrar, true, { emitSelf: true }) + gossipsub.subscribe(topic) + }) + + after(() => gossipsub.stop()) + + it('should emit to self on publish', async () => { + const promise = new Promise((resolve) => gossipsub.once(topic, resolve)) + + gossipsub.publish(topic, Buffer.from('hey')) + + await promise + }) + }) + + describe('disabled', () => { + before(async () => { + gossipsub = await createGossipsub(mockRegistrar, true, { emitSelf: false }) + gossipsub.subscribe(topic) + }) + + after(() => gossipsub.stop()) + + it('should emit to self on publish', async () => { + gossipsub.once(topic, (m) => shouldNotHappen) + + gossipsub.publish(topic, Buffer.from('hey')) + + // Wait 1 second to guarantee that self is not noticed + await new Promise((resolve) => setTimeout(() => resolve(), 1000)) + }) + }) +}) diff --git a/test/floodsub.js b/test/floodsub.js deleted file mode 100644 index ac417c72..00000000 --- a/test/floodsub.js +++ /dev/null @@ -1,342 +0,0 @@ -/* eslint-env mocha */ -/* eslint max-nested-callbacks: ["error", 5] */ -'use strict' - -const chai = require('chai') -chai.use(require('dirty-chai')) - -const expect = chai.expect -const times = require('lodash/times') - -const { - createNode, - createFloodsubNode, - expectSet, - first, - dialNode, - startNode, - stopNode -} = require('./utils') - -const shouldNotHappen = () => expect.fail() - -describe('gossipsub fallbacks to floodsub', () => { - describe('basics', () => { - let nodeGs - let nodeFs - - beforeEach(async () => { - nodeGs = await createNode('/ip4/127.0.0.1/tcp/0') - nodeFs = await createFloodsubNode('/ip4/127.0.0.1/tcp/0') - await startNode(nodeGs) - await startNode(nodeFs) - - await Promise.all([ - startNode(nodeGs.gs), - startNode(nodeFs.fs) - ]) - }) - - afterEach(async function () { - this.timeout(4000) - await Promise.all([ - stopNode(nodeGs.gs), - stopNode(nodeFs.fs) - ]) - await Promise.all([ - stopNode(nodeGs), - stopNode(nodeFs) - ]) - }) - - it('Dial from nodeGs to nodeFs', async () => { - await dialNode(nodeGs, nodeFs.peerInfo) - await new Promise((resolve) => setTimeout(resolve, 1000)) - expect(nodeGs.gs.peers.size).to.equal(1) - expect(nodeFs.fs.peers.size).to.equal(1) - }) - }) - - describe('should not be added if fallback disabled', () => { - let nodeGs - let nodeFs - - beforeEach(async () => { - nodeGs = await createNode('/ip4/127.0.0.1/tcp/0', { fallbackToFloodsub: false }) - nodeFs = await createFloodsubNode('/ip4/127.0.0.1/tcp/0') - await startNode(nodeGs) - await startNode(nodeFs) - - await Promise.all([ - startNode(nodeGs.gs), - startNode(nodeFs.fs) - ]) - }) - - afterEach(async function () { - this.timeout(4000) - await Promise.all([ - stopNode(nodeGs.gs), - stopNode(nodeFs.fs) - ]) - await Promise.all([ - stopNode(nodeGs), - stopNode(nodeFs) - ]) - }) - - it('Dial from nodeGs to nodeFs', async () => { - await dialNode(nodeGs, nodeFs.peerInfo) - await new Promise((resolve) => setTimeout(resolve, 1000)) - // Peers not added to the pubsub set - expect(nodeGs.gs.peers.size).to.equal(0) - expect(nodeFs.fs.peers.size).to.equal(0) - }) - }) - - describe('subscription functionality', () => { - let nodeGs - let nodeFs - - beforeEach(async () => { - nodeGs = await createNode('/ip4/127.0.0.1/tcp/0') - nodeFs = await createFloodsubNode('/ip4/127.0.0.1/tcp/0') - await startNode(nodeGs) - await startNode(nodeFs) - - await Promise.all([ - startNode(nodeGs.gs), - startNode(nodeFs.fs) - ]) - - await dialNode(nodeGs, nodeFs.peerInfo) - await new Promise((resolve) => setTimeout(resolve, 1000)) - }) - - afterEach(async function () { - this.timeout(4000) - await Promise.all([ - stopNode(nodeGs.gs), - stopNode(nodeFs.fs) - ]) - await Promise.all([ - stopNode(nodeGs), - stopNode(nodeFs) - ]) - }) - - it('Subscribe to a topic', async function () { - this.timeout(10000) - const topic = 'Z' - nodeGs.gs.subscribe(topic) - nodeFs.fs.subscribe(topic) - - // await subscription change - const [changedPeerInfo, changedTopics, changedSubs] = await new Promise((resolve) => { - nodeGs.gs.once('pubsub:subscription-change', (...args) => resolve(args)) - }) - await new Promise((resolve) => setTimeout(resolve, 1000)) - - expectSet(nodeGs.gs.subscriptions, [topic]) - expectSet(nodeFs.fs.subscriptions, [topic]) - expect(nodeGs.gs.peers.size).to.equal(1) - expect(nodeFs.fs.peers.size).to.equal(1) - expectSet(first(nodeGs.gs.peers).topics, [topic]) - expectSet(first(nodeFs.fs.peers).topics, [topic]) - - expect(changedPeerInfo.id.toB58String()).to.equal(first(nodeGs.gs.peers).info.id.toB58String()) - expectSet(changedTopics, [topic]) - expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: true }]) - }) - }) - - describe('publish functionality', () => { - let nodeGs - let nodeFs - const topic = 'Z' - - beforeEach(async function () { - this.timeout(4000) - nodeGs = await createNode('/ip4/127.0.0.1/tcp/0') - nodeFs = await createFloodsubNode('/ip4/127.0.0.1/tcp/0') - await startNode(nodeGs) - await startNode(nodeFs) - - await Promise.all([ - startNode(nodeGs.gs), - startNode(nodeFs.fs) - ]) - await dialNode(nodeGs, nodeFs.peerInfo) - await new Promise((resolve) => setTimeout(resolve, 1000)) - - nodeGs.gs.subscribe(topic) - nodeFs.fs.subscribe(topic) - - // await subscription change - await new Promise((resolve) => nodeGs.gs.once('pubsub:subscription-change', resolve)) - }) - - afterEach(async function () { - this.timeout(4000) - await Promise.all([ - stopNode(nodeGs.gs), - stopNode(nodeFs.fs) - ]) - await Promise.all([ - stopNode(nodeGs), - stopNode(nodeFs) - ]) - }) - - it('Publish to a topic - nodeGs', async () => { - const promise = new Promise((resolve) => nodeFs.fs.once(topic, resolve)) - nodeGs.gs.once(topic, (m) => shouldNotHappen) - - nodeGs.gs.publish(topic, Buffer.from('hey')) - - const msg = await promise - - expect(msg.data.toString()).to.equal('hey') - expect(msg.from).to.be.eql(nodeGs.gs.libp2p.peerInfo.id.toB58String()) - - nodeGs.gs.removeListener(topic, shouldNotHappen) - }) - - it('Publish to a topic - nodeFs', async () => { - const promise = new Promise((resolve) => nodeGs.gs.once(topic, resolve)) - - nodeFs.fs.publish(topic, Buffer.from('banana')) - - const msg = await promise - - expect(msg.data.toString()).to.equal('banana') - expect(msg.from).to.be.eql(nodeFs.fs.libp2p.peerInfo.id.toB58String()) - - nodeFs.fs.removeListener(topic, shouldNotHappen) - }) - - it('Publish 10 msg to a topic', (done) => { - let counter = 0 - - nodeGs.gs.once(topic, shouldNotHappen) - - nodeFs.fs.on(topic, receivedMsg) - - function receivedMsg (msg) { - expect(msg.data.toString()).to.equal('banana') - expect(msg.from).to.be.eql(nodeGs.gs.libp2p.peerInfo.id.toB58String()) - expect(Buffer.isBuffer(msg.seqno)).to.be.true() - expect(msg.topicIDs).to.be.eql([topic]) - - if (++counter === 10) { - nodeFs.fs.removeListener(topic, receivedMsg) - nodeGs.gs.removeListener(topic, shouldNotHappen) - done() - } - } - - times(10, () => nodeGs.gs.publish(topic, Buffer.from('banana'))) - }) - - it('Publish 10 msg to a topic as array', (done) => { - let counter = 0 - - nodeGs.gs.once(topic, shouldNotHappen) - - nodeFs.fs.on(topic, receivedMsg) - - function receivedMsg (msg) { - expect(msg.data.toString()).to.equal('banana') - expect(msg.from).to.be.eql(nodeGs.gs.libp2p.peerInfo.id.toB58String()) - expect(Buffer.isBuffer(msg.seqno)).to.be.true() - expect(msg.topicIDs).to.be.eql([topic]) - - if (++counter === 10) { - nodeFs.fs.removeListener(topic, receivedMsg) - nodeGs.gs.removeListener(topic, shouldNotHappen) - done() - } - } - - const msgs = [] - times(10, () => msgs.push(Buffer.from('banana'))) - nodeGs.gs.publish(topic, msgs) - }) - }) - - describe('publish after unsubscribe', () => { - let nodeGs - let nodeFs - const topic = 'Z' - - beforeEach(async function () { - this.timeout(4000) - nodeGs = await createNode('/ip4/127.0.0.1/tcp/0') - nodeFs = await createFloodsubNode('/ip4/127.0.0.1/tcp/0') - await startNode(nodeGs) - await startNode(nodeFs) - - await Promise.all([ - startNode(nodeGs.gs), - startNode(nodeFs.fs) - ]) - await dialNode(nodeGs, nodeFs.peerInfo) - await new Promise((resolve) => setTimeout(resolve, 1000)) - - nodeGs.gs.subscribe(topic) - nodeFs.fs.subscribe(topic) - - // await subscription change - await new Promise((resolve) => nodeGs.gs.once('pubsub:subscription-change', resolve)) - }) - - afterEach(async function () { - this.timeout(4000) - await Promise.all([ - stopNode(nodeGs.gs), - stopNode(nodeFs.fs) - ]) - await Promise.all([ - stopNode(nodeGs), - stopNode(nodeFs) - ]) - }) - - it('Unsubscribe from a topic', async () => { - nodeGs.gs.unsubscribe(topic) - expect(nodeGs.gs.subscriptions.size).to.equal(0) - - const [changedPeerInfo, changedTopics, changedSubs] = await new Promise((resolve) => { - nodeFs.fs.once('floodsub:subscription-change', (...args) => resolve(args)) - }) - - expect(nodeFs.fs.peers.size).to.equal(1) - expectSet(first(nodeFs.fs.peers).topics, []) - expect(changedPeerInfo.id.toB58String()).to.equal(first(nodeFs.fs.peers).info.id.toB58String()) - expectSet(changedTopics, []) - expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: false }]) - }) - - it('Publish to a topic after unsubscribe', async () => { - nodeGs.gs.unsubscribe(topic) - await new Promise((resolve) => nodeFs.fs.once('floodsub:subscription-change', resolve)) - - const promise = new Promise((resolve, reject) => { - nodeGs.gs.once(topic, reject) - setTimeout(() => { - nodeGs.gs.removeListener(topic, reject) - resolve() - }, 100) - }) - - nodeFs.fs.publish('Z', Buffer.from('banana')) - nodeGs.gs.publish('Z', Buffer.from('banana')) - - try { - await promise - } catch (e) { - expect.fail('message should not be received') - } - }) - }) -}) diff --git a/test/floodsub.spec.js b/test/floodsub.spec.js new file mode 100644 index 00000000..8cb52b55 --- /dev/null +++ b/test/floodsub.spec.js @@ -0,0 +1,340 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) + +const expect = chai.expect +const times = require('lodash/times') + +const { multicodec: floodsubMulticodec } = require('libp2p-floodsub') + +const { + createGossipsub, + createFloodsubNode, + expectSet, + createMockRegistrar, + ConnectionPair, + first +} = require('./utils') + +const shouldNotHappen = () => expect.fail() + +describe('gossipsub fallbacks to floodsub', () => { + let registrarRecords = Array.from({ length: 2 }) + + describe('basics', () => { + let nodeGs + let nodeFs + + beforeEach(async () => { + registrarRecords[0] = {} + registrarRecords[1] = {} + + nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), true) + nodeFs = await createFloodsubNode(createMockRegistrar(registrarRecords[1]), true) + }) + + afterEach(async function () { + this.timeout(4000) + await Promise.all([ + nodeGs.stop(), + nodeFs.stop() + ]) + }) + + it('Dial event happened from nodeGs to nodeFs', () => { + const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect + const onConnectFs = registrarRecords[1][floodsubMulticodec].onConnect + + expect(onConnectGs).to.exist() + expect(onConnectFs).to.exist() + + // Notice peers of connection + const [d0, d1] = ConnectionPair() + onConnectGs(nodeFs.peerInfo, d0) + onConnectFs(nodeGs.peerInfo, d1) + + expect(nodeGs.peers.size).to.equal(1) + expect(nodeFs.peers.size).to.equal(1) + }) + }) + + describe('should not be added if fallback disabled', () => { + let nodeGs + let nodeFs + + before(async () => { + registrarRecords[0] = {} + registrarRecords[1] = {} + + nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), true, { fallbackToFloodsub: false }) + nodeFs = await createFloodsubNode(createMockRegistrar(registrarRecords[1]), true) + }) + + after(async function () { + this.timeout(4000) + await Promise.all([ + nodeGs.stop(), + nodeFs.stop() + ]) + }) + + it('Dial event happened from nodeGs to nodeFs, but NodeGs does not support floodsub', () => { + let onConnectGs + let onConnectFs + + try { + onConnectFs = registrarRecords[1][floodsubMulticodec].onConnect + onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect + } catch (err) { + expect(err).to.exist() + expect(onConnectFs).to.exist() + expect(onConnectGs).to.not.exist() + + expect(nodeGs.peers.size).to.equal(0) + expect(nodeFs.peers.size).to.equal(0) + return + } + throw new Error('should not have floodsub handler') + }) + }) + + describe('subscription functionality', () => { + let nodeGs + let nodeFs + + before(async () => { + registrarRecords[0] = {} + registrarRecords[1] = {} + + nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), true) + nodeFs = await createFloodsubNode(createMockRegistrar(registrarRecords[1]), true) + + const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect + const onConnectFs = registrarRecords[1][floodsubMulticodec].onConnect + + // Notice peers of connection + const [d0, d1] = ConnectionPair() + onConnectGs(nodeFs.peerInfo, d0) + onConnectFs(nodeGs.peerInfo, d1) + }) + + after(async function () { + this.timeout(4000) + await Promise.all([ + nodeGs.stop(), + nodeFs.stop() + ]) + }) + + it('Subscribe to a topic', async function () { + this.timeout(10000) + const topic = 'Z' + nodeGs.subscribe(topic) + nodeFs.subscribe(topic) + + // await subscription change + const [changedPeerInfo, changedTopics, changedSubs] = await new Promise((resolve) => { + nodeGs.once('pubsub:subscription-change', (...args) => resolve(args)) + }) + await new Promise((resolve) => setTimeout(resolve, 1000)) + + expectSet(nodeGs.subscriptions, [topic]) + expectSet(nodeFs.subscriptions, [topic]) + expect(nodeGs.peers.size).to.equal(1) + expect(nodeFs.peers.size).to.equal(1) + expectSet(first(nodeGs.peers).topics, [topic]) + expectSet(first(nodeFs.peers).topics, [topic]) + + expect(changedPeerInfo.id.toB58String()).to.equal(first(nodeGs.peers).info.id.toB58String()) + expectSet(changedTopics, [topic]) + expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: true }]) + }) + }) + + describe('publish functionality', () => { + let nodeGs + let nodeFs + const topic = 'Z' + + beforeEach(async () => { + registrarRecords = Array.from({ length: 2 }) + registrarRecords[0] = {} + registrarRecords[1] = {} + + nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), true) + nodeFs = await createFloodsubNode(createMockRegistrar(registrarRecords[1]), true) + + const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect + const onConnectFs = registrarRecords[1][floodsubMulticodec].onConnect + + // Notice peers of connection + const [d0, d1] = ConnectionPair() + onConnectGs(nodeFs.peerInfo, d0) + onConnectFs(nodeGs.peerInfo, d1) + + nodeGs.subscribe(topic) + nodeFs.subscribe(topic) + + // await subscription change + await new Promise((resolve) => nodeGs.once('pubsub:subscription-change', resolve)) + }) + + afterEach(async function () { + this.timeout(4000) + await Promise.all([ + nodeGs.stop(), + nodeFs.stop() + ]) + }) + + it('Publish to a topic - nodeGs', async () => { + const promise = new Promise((resolve) => nodeFs.once(topic, resolve)) + nodeGs.once(topic, (m) => shouldNotHappen) + + nodeGs.publish(topic, Buffer.from('hey')) + + const msg = await promise + + expect(msg.data.toString()).to.equal('hey') + expect(msg.from).to.be.eql(nodeGs.peerInfo.id.toB58String()) + + nodeGs.removeListener(topic, shouldNotHappen) + }) + + it('Publish to a topic - nodeFs', async () => { + const promise = new Promise((resolve) => nodeGs.once(topic, resolve)) + + nodeFs.publish(topic, Buffer.from('banana')) + + const msg = await promise + + expect(msg.data.toString()).to.equal('banana') + expect(msg.from).to.be.eql(nodeFs.peerInfo.id.toB58String()) + + nodeFs.removeListener(topic, shouldNotHappen) + }) + + it('Publish 10 msg to a topic', (done) => { + let counter = 0 + + nodeGs.once(topic, shouldNotHappen) + + nodeFs.on(topic, receivedMsg) + + function receivedMsg (msg) { + expect(msg.data.toString()).to.equal('banana') + expect(msg.from).to.be.eql(nodeGs.peerInfo.id.toB58String()) + expect(Buffer.isBuffer(msg.seqno)).to.be.true() + expect(msg.topicIDs).to.be.eql([topic]) + + if (++counter === 10) { + nodeFs.removeListener(topic, receivedMsg) + nodeGs.removeListener(topic, shouldNotHappen) + done() + } + } + + times(10, () => nodeGs.publish(topic, Buffer.from('banana'))) + }) + + it('Publish 10 msg to a topic as array', (done) => { + let counter = 0 + + nodeGs.once(topic, shouldNotHappen) + + nodeFs.on(topic, receivedMsg) + + function receivedMsg (msg) { + expect(msg.data.toString()).to.equal('banana') + expect(msg.from).to.be.eql(nodeGs.peerInfo.id.toB58String()) + expect(Buffer.isBuffer(msg.seqno)).to.be.true() + expect(msg.topicIDs).to.be.eql([topic]) + + if (++counter === 10) { + nodeFs.removeListener(topic, receivedMsg) + nodeGs.removeListener(topic, shouldNotHappen) + done() + } + } + + const msgs = [] + times(10, () => msgs.push(Buffer.from('banana'))) + nodeGs.publish(topic, msgs) + }) + }) + + describe('publish after unsubscribe', () => { + let nodeGs + let nodeFs + const topic = 'Z' + + beforeEach(async () => { + registrarRecords[0] = {} + registrarRecords[1] = {} + + nodeGs = await createGossipsub(createMockRegistrar(registrarRecords[0]), true) + nodeFs = await createFloodsubNode(createMockRegistrar(registrarRecords[1]), true) + + const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect + const onConnectFs = registrarRecords[1][floodsubMulticodec].onConnect + + // Notice peers of connection + const [d0, d1] = ConnectionPair() + await onConnectGs(nodeFs.peerInfo, d0) + await onConnectFs(nodeGs.peerInfo, d1) + + nodeGs.subscribe(topic) + nodeFs.subscribe(topic) + + // await subscription change + await new Promise((resolve) => nodeGs.once('pubsub:subscription-change', resolve)) + }) + + afterEach(async function () { + this.timeout(4000) + await Promise.all([ + nodeGs.stop(), + nodeFs.stop() + ]) + }) + + it('Unsubscribe from a topic', async () => { + nodeGs.unsubscribe(topic) + expect(nodeGs.subscriptions.size).to.equal(0) + + const [changedPeerInfo, changedTopics, changedSubs] = await new Promise((resolve) => { + nodeFs.once('floodsub:subscription-change', (...args) => resolve(args)) + }) + + expect(nodeFs.peers.size).to.equal(1) + expectSet(first(nodeFs.peers).topics, []) + expect(changedPeerInfo.id.toB58String()).to.equal(first(nodeFs.peers).info.id.toB58String()) + expectSet(changedTopics, []) + expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: false }]) + }) + + it('Publish to a topic after unsubscribe', async () => { + nodeGs.unsubscribe(topic) + await new Promise((resolve) => nodeFs.once('floodsub:subscription-change', resolve)) + + const promise = new Promise((resolve, reject) => { + nodeGs.once(topic, reject) + setTimeout(() => { + nodeGs.removeListener(topic, reject) + resolve() + }, 100) + }) + + nodeFs.publish('Z', Buffer.from('banana')) + nodeGs.publish('Z', Buffer.from('banana')) + + try { + await promise + } catch (e) { + expect.fail('message should not be received') + } + }) + }) +}) diff --git a/test/gossip-incoming.js b/test/gossip-incoming.js deleted file mode 100644 index 44772c32..00000000 --- a/test/gossip-incoming.js +++ /dev/null @@ -1,140 +0,0 @@ -/* eslint-env mocha */ -/* eslint max-nested-callbacks: ["error", 5] */ -'use strict' - -const chai = require('chai') -chai.use(require('dirty-chai')) -chai.use(require('chai-spies')) -const expect = chai.expect - -const { - createNode, - dialNode, - startNode, - stopNode -} = require('./utils') - -const shouldNotHappen = (msg) => expect.fail() - -describe('gossip incoming', () => { - const topic = 'Z' - let nodeA - let nodeB - let nodeC - - describe('gossipIncoming == true', () => { - beforeEach(async function () { - this.timeout(4000) - nodeA = await createNode('/ip4/127.0.0.1/tcp/0') - nodeB = await createNode('/ip4/127.0.0.1/tcp/0') - nodeC = await createNode('/ip4/127.0.0.1/tcp/0') - await startNode(nodeA) - await startNode(nodeB) - await startNode(nodeC) - - await Promise.all([ - startNode(nodeA.gs), - startNode(nodeB.gs), - startNode(nodeC.gs) - ]) - await dialNode(nodeA, nodeB.peerInfo) - await dialNode(nodeB, nodeC.peerInfo) - - nodeA.gs.subscribe(topic) - nodeB.gs.subscribe(topic) - nodeC.gs.subscribe(topic) - - // await subscription change and heartbeat - await new Promise((resolve) => nodeA.gs.once('pubsub:subscription-change', resolve)) - await Promise.all([ - new Promise((resolve) => nodeA.gs.once('gossipsub:heartbeat', resolve)), - new Promise((resolve) => nodeB.gs.once('gossipsub:heartbeat', resolve)), - new Promise((resolve) => nodeC.gs.once('gossipsub:heartbeat', resolve)) - ]) - }) - - afterEach(async function () { - this.timeout(4000) - await Promise.all([ - stopNode(nodeA.gs), - stopNode(nodeB.gs), - stopNode(nodeC.gs) - ]) - await Promise.all([ - stopNode(nodeA), - stopNode(nodeB), - stopNode(nodeC) - ]) - }) - - it('should gossip incoming messages', async () => { - const promise = new Promise((resolve) => nodeC.gs.once(topic, resolve)) - nodeA.gs.once(topic, (m) => shouldNotHappen) - - nodeA.gs.publish(topic, Buffer.from('hey')) - - const msg = await promise - - expect(msg.data.toString()).to.equal('hey') - expect(msg.from).to.be.eql(nodeA.gs.libp2p.peerInfo.id.toB58String()) - - nodeA.gs.removeListener(topic, shouldNotHappen) - }) - }) - - describe('gossipIncoming == false', () => { - beforeEach(async function () { - this.timeout(4000) - nodeA = await createNode('/ip4/127.0.0.1/tcp/0', { gossipIncoming: false }) - nodeB = await createNode('/ip4/127.0.0.1/tcp/0', { gossipIncoming: false }) - nodeC = await createNode('/ip4/127.0.0.1/tcp/0', { gossipIncoming: false }) - await startNode(nodeA) - await startNode(nodeB) - await startNode(nodeC) - - await Promise.all([ - startNode(nodeA.gs), - startNode(nodeB.gs), - startNode(nodeC.gs) - ]) - await dialNode(nodeA, nodeB.peerInfo) - await dialNode(nodeB, nodeC.peerInfo) - - nodeA.gs.subscribe(topic) - nodeB.gs.subscribe(topic) - nodeC.gs.subscribe(topic) - - // await subscription change and heartbeat - await new Promise((resolve) => nodeA.gs.once('pubsub:subscription-change', resolve)) - await Promise.all([ - new Promise((resolve) => nodeA.gs.once('gossipsub:heartbeat', resolve)), - new Promise((resolve) => nodeB.gs.once('gossipsub:heartbeat', resolve)), - new Promise((resolve) => nodeC.gs.once('gossipsub:heartbeat', resolve)) - ]) - }) - - afterEach(async function () { - this.timeout(4000) - await Promise.all([ - stopNode(nodeA.gs), - stopNode(nodeB.gs), - stopNode(nodeC.gs) - ]) - await Promise.all([ - stopNode(nodeA), - stopNode(nodeB), - stopNode(nodeC) - ]) - }) - - it('should not gossip incoming messages', async () => { - nodeC.gs.once(topic, (m) => shouldNotHappen) - - nodeA.gs.publish(topic, Buffer.from('hey')) - - await new Promise((resolve) => setTimeout(resolve, 1000)) - - nodeC.gs.removeListener(topic, shouldNotHappen) - }) - }) -}) diff --git a/test/gossip-incoming.spec.js b/test/gossip-incoming.spec.js new file mode 100644 index 00000000..f401c38f --- /dev/null +++ b/test/gossip-incoming.spec.js @@ -0,0 +1,89 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +chai.use(require('chai-spies')) +const expect = chai.expect + +const { GossipSubID: multicodec } = require('../src/constants') +const { createGossipsubConnectedNodes } = require('./utils') + +const shouldNotHappen = (msg) => expect.fail() + +describe('gossip incoming', () => { + const topic = 'Z' + let nodes + + describe('gossipIncoming == true', () => { + // Create pubsub nodes + before(async () => { + nodes = await createGossipsubConnectedNodes(3, multicodec) + }) + + // Create subscriptions + before(async () => { + nodes[0].subscribe(topic) + nodes[1].subscribe(topic) + nodes[2].subscribe(topic) + + // await subscription change and heartbeat + await new Promise((resolve) => nodes[0].once('pubsub:subscription-change', resolve)) + await Promise.all([ + new Promise((resolve) => nodes[0].once('gossipsub:heartbeat', resolve)), + new Promise((resolve) => nodes[1].once('gossipsub:heartbeat', resolve)), + new Promise((resolve) => nodes[2].once('gossipsub:heartbeat', resolve)) + ]) + }) + + after(() => Promise.all(nodes.map((n) => n.stop()))) + + it('should gossip incoming messages', async () => { + const promise = new Promise((resolve) => nodes[2].once(topic, resolve)) + nodes[0].once(topic, (m) => shouldNotHappen) + + nodes[0].publish(topic, Buffer.from('hey')) + + const msg = await promise + + expect(msg.data.toString()).to.equal('hey') + expect(msg.from).to.be.eql(nodes[0].peerInfo.id.toB58String()) + + nodes[0].removeListener(topic, shouldNotHappen) + }) + }) + + describe('gossipIncoming == false', () => { + // Create pubsub nodes + before(async () => { + nodes = await createGossipsubConnectedNodes(3, multicodec, { gossipIncoming: false }) + }) + + // Create subscriptions + before(async () => { + nodes[0].subscribe(topic) + nodes[1].subscribe(topic) + nodes[2].subscribe(topic) + + // await subscription change and heartbeat + await new Promise((resolve) => nodes[0].once('pubsub:subscription-change', resolve)) + await Promise.all([ + new Promise((resolve) => nodes[0].once('gossipsub:heartbeat', resolve)), + new Promise((resolve) => nodes[1].once('gossipsub:heartbeat', resolve)), + new Promise((resolve) => nodes[2].once('gossipsub:heartbeat', resolve)) + ]) + }) + + after(() => Promise.all(nodes.map((n) => n.stop()))) + + it('should not gossip incoming messages', async () => { + nodes[2].once(topic, (m) => shouldNotHappen) + + nodes[0].publish(topic, Buffer.from('hey')) + + await new Promise((resolve) => setTimeout(resolve, 1000)) + + nodes[2].removeListener(topic, shouldNotHappen) + }) + }) +}) diff --git a/test/gossip.js b/test/gossip.js index 6f4bbd87..a032a8a9 100644 --- a/test/gossip.js +++ b/test/gossip.js @@ -3,101 +3,96 @@ const { expect } = require('chai') const sinon = require('sinon') -const promisify = require('promisify-es6') -const { GossipSubDhi } = require('../src/constants') +const { GossipSubID: multicodec, GossipSubDhi } = require('../src/constants') const { first, - createNode, - dialNode, - startNode, - stopNode + createGossipsubNodes, + connectGossipsubNodes } = require('./utils') describe('gossip', () => { - const nodes = Array.from({ length: GossipSubDhi + 2 }) // enough nodes to trigger high threshold + let nodes, registrarRecords + // Create pubsub nodes beforeEach(async () => { - for (let i = 0; i < nodes.length; i++) { - nodes[i] = await createNode('/ip4/127.0.0.1/tcp/0') - await startNode(nodes[i]) - await startNode(nodes[i].gs) - } - }) - afterEach(async function () { - this.timeout(10000) - await Promise.all(nodes.map((n) => stopNode(n.gs))) - await Promise.all(nodes.map((n) => stopNode(n))) + ({ + nodes, + registrarRecords + } = await createGossipsubNodes(GossipSubDhi + 2, true)) }) + afterEach(() => Promise.all(nodes.map((n) => n.stop()))) + it('should send gossip to non-mesh peers in topic', async function () { this.timeout(10000) const nodeA = nodes[0] const topic = 'Z' // add subscriptions to each node - nodes.forEach((n) => n.gs.subscribe(topic)) - // every node connected to every other - for (let i = 0; i < nodes.length - 1; i++) { - for (let j = i + 1; j < nodes.length; j++) { - await dialNode(nodes[i], nodes[j].peerInfo) - } - } - await new Promise((resolve) => setTimeout(resolve, 500)) + nodes.forEach((n) => n.subscribe(topic)) + + connectGossipsubNodes(nodes, registrarRecords, multicodec) + + await new Promise((resolve) => setTimeout(resolve, 1000)) + // await mesh rebalancing - await Promise.all(nodes.map((n) => new Promise((resolve) => n.gs.once('gossipsub:heartbeat', resolve)))) + await Promise.all(nodes.map((n) => new Promise((resolve) => n.once('gossipsub:heartbeat', resolve)))) await new Promise((resolve) => setTimeout(resolve, 500)) // set spy - sinon.spy(nodeA.gs, 'log') + sinon.spy(nodeA, 'log') - await promisify(nodeA.gs.publish, { context: nodeA.gs })(topic, Buffer.from('hey')) - await new Promise((resolve) => nodeA.gs.once('gossipsub:heartbeat', resolve)) - expect(nodeA.gs.log.callCount).to.be.gt(1) - nodeA.gs.log.getCalls() + await nodeA.publish(topic, Buffer.from('hey')) + + await new Promise((resolve) => nodeA.once('gossipsub:heartbeat', resolve)) + + expect(nodeA.log.callCount).to.be.gt(1) + nodeA.log.getCalls() .filter((call) => call.args[0] === 'Add gossip to %s') .map((call) => call.args[1]) .forEach((peerId) => { - nodeA.gs.mesh.get(topic).forEach((meshPeer) => { + nodeA.mesh.get(topic).forEach((meshPeer) => { expect(meshPeer.info.id.toB58String()).to.not.equal(peerId) }) }) // unset spy - nodeA.gs.log.restore() + nodeA.log.restore() }) it('should send piggyback gossip into other sent messages', async function () { this.timeout(10000) const nodeA = nodes[0] const topic = 'Z' + // add subscriptions to each node - nodes.forEach((n) => n.gs.subscribe(topic)) + nodes.forEach((n) => n.subscribe(topic)) + // every node connected to every other - for (let i = 0; i < nodes.length - 1; i++) { - for (let j = i + 1; j < nodes.length; j++) { - await dialNode(nodes[i], nodes[j].peerInfo) - } - } + connectGossipsubNodes(nodes, registrarRecords, multicodec) await new Promise((resolve) => setTimeout(resolve, 500)) // await mesh rebalancing - await Promise.all(nodes.map((n) => new Promise((resolve) => n.gs.once('gossipsub:heartbeat', resolve)))) + await Promise.all(nodes.map((n) => new Promise((resolve) => n.once('gossipsub:heartbeat', resolve)))) await new Promise((resolve) => setTimeout(resolve, 500)) - const peerB = first(nodeA.gs.mesh.get(topic)) + const peerB = first(nodeA.mesh.get(topic)) const nodeB = nodes.find((n) => n.peerInfo.id.toB58String() === peerB.info.id.toB58String()) + // set spy - sinon.spy(nodeB.gs, 'log') + sinon.spy(nodeB, 'log') // manually add control message to be sent to peerB - nodeA.gs.control.set(peerB, { graft: [{ topicID: topic }] }) - await promisify(nodeA.gs.publish, { context: nodeA.gs })(topic, Buffer.from('hey')) - await new Promise((resolve) => nodeA.gs.once('gossipsub:heartbeat', resolve)) - expect(nodeB.gs.log.callCount).to.be.gt(1) + nodeA.control.set(peerB, { graft: [{ topicID: topic }] }) + + await nodeA.publish(topic, Buffer.from('hey')) + + await new Promise((resolve) => nodeA.once('gossipsub:heartbeat', resolve)) + expect(nodeB.log.callCount).to.be.gt(1) // expect control message to be sent alongside published message - const call = nodeB.gs.log.getCalls().find((call) => call.args[0] === 'GRAFT: Add mesh link from %s in %s') + const call = nodeB.log.getCalls().find((call) => call.args[0] === 'GRAFT: Add mesh link from %s in %s') expect(call).to.not.equal(undefined) expect(call.args[1]).to.equal(nodeA.peerInfo.id.toB58String()) // unset spy - nodeB.gs.log.restore() + nodeB.log.restore() }) }) diff --git a/test/heartbeat.spec.js b/test/heartbeat.spec.js index f1bf82c8..0d58602b 100644 --- a/test/heartbeat.spec.js +++ b/test/heartbeat.spec.js @@ -3,30 +3,26 @@ const { expect } = require('chai') +const Gossipsub = require('../src') const { GossipSubHeartbeatInterval } = require('../src/constants') -const { - createNode, - startNode, - stopNode -} = require('./utils') +const { createPeerInfo, mockRegistrar } = require('./utils') describe('heartbeat', () => { - let nodeA + let gossipsub + before(async () => { - nodeA = await createNode('/ip4/127.0.0.1/tcp/0') - await startNode(nodeA) - await startNode(nodeA.gs) - }) - after(async () => { - await stopNode(nodeA.gs) - await stopNode(nodeA) + const peerInfo = await createPeerInfo() + gossipsub = new Gossipsub(peerInfo, mockRegistrar, { emitSelf: true }) + await gossipsub.start() }) + after(() => gossipsub.stop()) + it('should occur with regularity defined by a constant', async function () { this.timeout(3000) - await new Promise((resolve) => nodeA.gs.once('gossipsub:heartbeat', resolve)) + await new Promise((resolve) => gossipsub.once('gossipsub:heartbeat', resolve)) const t1 = Date.now() - await new Promise((resolve) => nodeA.gs.once('gossipsub:heartbeat', resolve)) + await new Promise((resolve) => gossipsub.once('gossipsub:heartbeat', resolve)) const t2 = Date.now() const safeDelta = 100 // ms expect(t2 - t1).to.be.lt(GossipSubHeartbeatInterval + safeDelta) diff --git a/test/mesh.js b/test/mesh.js deleted file mode 100644 index cde12dfc..00000000 --- a/test/mesh.js +++ /dev/null @@ -1,63 +0,0 @@ -'use strict' -/* eslint-env mocha */ - -const { expect } = require('chai') - -const { GossipSubDhi } = require('../src/constants') -const { - createNode, - dialNode, - startNode, - stopNode -} = require('./utils') - -describe('mesh overlay', () => { - const nodes = Array.from({ length: GossipSubDhi + 2 }) // enough nodes to trigger high threshold - - beforeEach(async () => { - for (let i = 0; i < nodes.length; i++) { - nodes[i] = await createNode('/ip4/127.0.0.1/tcp/0') - await startNode(nodes[i]) - await startNode(nodes[i].gs) - } - }) - afterEach(async function () { - this.timeout(10000) - await Promise.all(nodes.map((n) => stopNode(n.gs))) - await Promise.all(nodes.map((n) => stopNode(n))) - }) - - it('should add mesh peers below threshold', async function () { - this.timeout(3000) - // test against node0 - const node0 = nodes[0] - const topic = 'Z' - // add subscriptions to each node - nodes.forEach((n) => n.gs.subscribe(topic)) - // connect N (< GossipsubD) nodes to node0 - const N = 4 - await Promise.all(nodes.slice(nodes.length - N).map((n) => dialNode(n, node0.peerInfo))) - await new Promise((resolve) => setTimeout(resolve, 500)) - // await mesh rebalancing - await new Promise((resolve) => node0.gs.once('gossipsub:heartbeat', resolve)) - expect(node0.gs.mesh.get(topic).size).to.equal(N) - }) - it('should remove mesh peers once above threshold', async function () { - this.timeout(10000) - // test against node0 - const node0 = nodes[0] - const topic = 'Z' - // add subscriptions to each node - nodes.forEach((n) => n.gs.subscribe(topic)) - // connect all nodes to node0 - for (let i = 0; i < nodes.length - 1; i++) { - for (let j = i + 1; j < nodes.length; j++) { - await dialNode(nodes[i], nodes[j].peerInfo) - } - } - await new Promise((resolve) => setTimeout(resolve, 500)) - // await mesh rebalancing - await new Promise((resolve) => node0.gs.once('gossipsub:heartbeat', resolve)) - expect(node0.gs.mesh.get(topic).size).to.be.lte(GossipSubDhi) - }) -}) diff --git a/test/mesh.spec.js b/test/mesh.spec.js new file mode 100644 index 00000000..f6c40440 --- /dev/null +++ b/test/mesh.spec.js @@ -0,0 +1,81 @@ +'use strict' +/* eslint-env mocha */ + +const { expect } = require('chai') + +const { GossipSubDhi, GossipSubID: multicodec } = require('../src/constants') +const { + createGossipsubNodes, + ConnectionPair +} = require('./utils') + +describe('mesh overlay', () => { + let nodes, registrarRecords + + // Create pubsub nodes + beforeEach(async () => { + ({ + nodes, + registrarRecords + } = await createGossipsubNodes(GossipSubDhi + 2, true)) + }) + + afterEach(() => Promise.all(nodes.map((n) => n.stop()))) + + it('should add mesh peers below threshold', async function () { + this.timeout(10e3) + + // test against node0 + const node0 = nodes[0] + const topic = 'Z' + + // add subscriptions to each node + nodes.forEach((node) => node.subscribe(topic)) + + // connect N (< GossipsubD) nodes to node0 + const N = 4 + const onConnect0 = registrarRecords[0][multicodec].onConnect + + for (let i = nodes.length; i > nodes.length - N; i--) { + const n = i - 1 + const onConnectN = registrarRecords[n][multicodec].onConnect + + // Notice peers of connection + const [d0, d1] = ConnectionPair() + onConnect0(nodes[n].peerInfo, d0) + onConnectN(nodes[0].peerInfo, d1) + } + + // await mesh rebalancing + await new Promise((resolve) => node0.once('gossipsub:heartbeat', resolve)) + + expect(node0.mesh.get(topic).size).to.equal(N) + }) + + it('should remove mesh peers once above threshold', async function () { + this.timeout(10e3) + // test against node0 + const node0 = nodes[0] + const topic = 'Z' + + // add subscriptions to each node + nodes.forEach((node) => node.subscribe(topic)) + + const onConnect0 = registrarRecords[0][multicodec].onConnect + + // connect all nodes to node0 + for (let i = 1; i < nodes.length; i++) { + const onConnectN = registrarRecords[i][multicodec].onConnect + + // Notice peers of connection + const [d0, d1] = ConnectionPair() + onConnect0(nodes[i].peerInfo, d0) + onConnectN(nodes[0].peerInfo, d1) + } + + await new Promise((resolve) => setTimeout(resolve, 500)) + // await mesh rebalancing + await new Promise((resolve) => node0.once('gossipsub:heartbeat', resolve)) + expect(node0.mesh.get(topic).size).to.be.lte(GossipSubDhi) + }) +}) diff --git a/test/messageCache.spec.js b/test/messageCache.spec.js index e16dd80c..fe7ff030 100644 --- a/test/messageCache.spec.js +++ b/test/messageCache.spec.js @@ -1,5 +1,4 @@ /* eslint-env mocha */ -/* eslint max-nested-callbacks: ["error", 5] */ /* eslint-disable no-unused-expressions */ 'use strict' @@ -12,7 +11,7 @@ const expect = chai.expect const { MessageCache } = require('../src/messageCache') const { utils } = require('libp2p-pubsub') -const Buffer = require('buffer').Buffer +const { Buffer } = require('buffer') const getMsgID = (msg) => { return utils.msgId(msg.from, msg.seqno) diff --git a/test/multiple-nodes.js b/test/multiple-nodes.js deleted file mode 100644 index b003e6e6..00000000 --- a/test/multiple-nodes.js +++ /dev/null @@ -1,356 +0,0 @@ -/* eslint-env mocha */ -/* eslint max-nested-callbacks: ["error", 8] */ -'use strict' - -const chai = require('chai') -chai.use(require('dirty-chai')) -const expect = chai.expect -const promisify = require('promisify-es6') - -const { - createNode, - expectSet, - dialNode, - startNode, - stopNode -} = require('./utils') - -describe('multiple nodes (more than 2)', () => { - describe('every peer subscribes to the topic', () => { - describe('line', () => { - // line - // ◉────◉────◉ - // a b c - describe('subscribe', () => { - let a - let b - let c - const topic = 'Z' - - beforeEach(async () => { - a = await createNode('/ip4/127.0.0.1/tcp/0') - b = await createNode('/ip4/127.0.0.1/tcp/0') - c = await createNode('/ip4/127.0.0.1/tcp/0') - await Promise.all([ - startNode(a), - startNode(b), - startNode(c) - ]) - await Promise.all([ - startNode(a.gs), - startNode(b.gs), - startNode(c.gs) - ]) - await dialNode(a, b.peerInfo) - await dialNode(b, c.peerInfo) - }) - - afterEach(async function () { - this.timeout(4000) - await Promise.all([ - stopNode(a.gs), - stopNode(b.gs), - stopNode(c.gs) - ]) - await Promise.all([ - stopNode(a), - stopNode(b), - stopNode(c) - ]) - }) - - it('subscribe to the topic on all nodes', async () => { - a.gs.subscribe(topic) - b.gs.subscribe(topic) - c.gs.subscribe(topic) - - expectSet(a.gs.subscriptions, [topic]) - expectSet(b.gs.subscriptions, [topic]) - expectSet(c.gs.subscriptions, [topic]) - - await Promise.all([ - promisify(a.gs.once.bind(a.gs))('gossipsub:heartbeat'), - promisify(b.gs.once.bind(b.gs))('gossipsub:heartbeat'), - promisify(c.gs.once.bind(c.gs))('gossipsub:heartbeat') - ]) - - expect(a.gs.peers.size).to.equal(1) - expect(b.gs.peers.size).to.equal(2) - expect(c.gs.peers.size).to.equal(1) - - const aPeerId = a.peerInfo.id.toB58String() - const bPeerId = b.peerInfo.id.toB58String() - const cPeerId = c.peerInfo.id.toB58String() - - expectSet(a.gs.peers.get(bPeerId).topics, [topic]) - expectSet(b.gs.peers.get(aPeerId).topics, [topic]) - expectSet(b.gs.peers.get(cPeerId).topics, [topic]) - expectSet(c.gs.peers.get(bPeerId).topics, [topic]) - - expect(a.gs.mesh.get(topic).size).to.equal(1) - expect(b.gs.mesh.get(topic).size).to.equal(2) - expect(c.gs.mesh.get(topic).size).to.equal(1) - }) - }) - - describe('publish', () => { - let a - let b - let c - const topic = 'Z' - - beforeEach(async () => { - a = await createNode('/ip4/127.0.0.1/tcp/0') - b = await createNode('/ip4/127.0.0.1/tcp/0') - c = await createNode('/ip4/127.0.0.1/tcp/0') - await Promise.all([ - startNode(a), - startNode(b), - startNode(c) - ]) - await Promise.all([ - startNode(a.gs), - startNode(b.gs), - startNode(c.gs) - ]) - await dialNode(a, b.peerInfo) - await dialNode(b, c.peerInfo) - - a.gs.subscribe(topic) - b.gs.subscribe(topic) - c.gs.subscribe(topic) - - await Promise.all([ - promisify(a.gs.once.bind(a.gs))('gossipsub:heartbeat'), - promisify(b.gs.once.bind(b.gs))('gossipsub:heartbeat'), - promisify(c.gs.once.bind(c.gs))('gossipsub:heartbeat') - ]) - }) - - afterEach(async function () { - this.timeout(4000) - await Promise.all([ - stopNode(a.gs), - stopNode(b.gs), - stopNode(c.gs) - ]) - await Promise.all([ - stopNode(a), - stopNode(b), - stopNode(c) - ]) - }) - - it('publish on node a', async () => { - let msgB = new Promise((resolve) => b.gs.once('Z', resolve)) - let msgC = new Promise((resolve) => c.gs.once('Z', resolve)) - - a.gs.publish('Z', Buffer.from('hey')) - msgB = await msgB - msgC = await msgC - - expect(msgB.data.toString()).to.equal('hey') - expect(msgC.data.toString()).to.equal('hey') - }) - - it('publish array on node a', async () => { - let msgB = new Promise((resolve) => { - const output = [] - b.gs.on('Z', (msg) => { - output.push(msg) - if (output.length === 2) { - b.gs.removeAllListeners('Z') - resolve(output) - } - }) - }) - let msgC = new Promise((resolve) => { - const output = [] - c.gs.on('Z', (msg) => { - output.push(msg) - if (output.length === 2) { - c.gs.removeAllListeners('Z') - resolve(output) - } - }) - }) - - a.gs.publish('Z', [Buffer.from('hey'), Buffer.from('hey')]) - msgB = await msgB - msgC = await msgC - - expect(msgB.length).to.equal(2) - expect(msgB[0].data.toString()).to.equal('hey') - expect(msgB[1].data.toString()).to.equal('hey') - expect(msgC.length).to.equal(2) - expect(msgC[0].data.toString()).to.equal('hey') - expect(msgC[1].data.toString()).to.equal('hey') - }) - }) - }) - - describe('1 level tree', () => { - // 1 level tree - // ┌◉┐ - // │b│ - // ◉─┘ └─◉ - // a c - - let a - let b - let c - const topic = 'Z' - - beforeEach(async () => { - a = await createNode('/ip4/127.0.0.1/tcp/0') - b = await createNode('/ip4/127.0.0.1/tcp/0') - c = await createNode('/ip4/127.0.0.1/tcp/0') - await Promise.all([ - startNode(a), - startNode(b), - startNode(c) - ]) - await Promise.all([ - startNode(a.gs), - startNode(b.gs), - startNode(c.gs) - ]) - await dialNode(a, b.peerInfo) - await dialNode(b, c.peerInfo) - - a.gs.subscribe(topic) - b.gs.subscribe(topic) - c.gs.subscribe(topic) - - await Promise.all([ - promisify(a.gs.once.bind(a.gs))('gossipsub:heartbeat'), - promisify(b.gs.once.bind(b.gs))('gossipsub:heartbeat'), - promisify(c.gs.once.bind(c.gs))('gossipsub:heartbeat') - ]) - }) - - afterEach(async function () { - this.timeout(4000) - await Promise.all([ - stopNode(a.gs), - stopNode(b.gs), - stopNode(c.gs) - ]) - await Promise.all([ - stopNode(a), - stopNode(b), - stopNode(c) - ]) - }) - - it('publish on node b', async () => { - let msgA = new Promise((resolve) => a.gs.once('Z', resolve)) - let msgC = new Promise((resolve) => c.gs.once('Z', resolve)) - - b.gs.publish('Z', Buffer.from('hey')) - msgA = await msgA - msgC = await msgC - - expect(msgA.data.toString()).to.equal('hey') - expect(msgC.data.toString()).to.equal('hey') - }) - }) - - describe('2 level tree', () => { - // 2 levels tree - // ┌◉┐ - // │c│ - // ┌◉─┘ └─◉┐ - // │b d│ - // ◉─┘ └─◉ - // a e - let a - let b - let c - let d - let e - const topic = 'Z' - - beforeEach(async function () { - this.timeout(5000) - a = await createNode('/ip4/127.0.0.1/tcp/0') - b = await createNode('/ip4/127.0.0.1/tcp/0') - c = await createNode('/ip4/127.0.0.1/tcp/0') - d = await createNode('/ip4/127.0.0.1/tcp/0') - e = await createNode('/ip4/127.0.0.1/tcp/0') - await Promise.all([ - startNode(a), - startNode(b), - startNode(c), - startNode(d), - startNode(e) - ]) - await Promise.all([ - startNode(a.gs), - startNode(b.gs), - startNode(c.gs), - startNode(d.gs), - startNode(e.gs) - ]) - await dialNode(a, b.peerInfo) - await dialNode(b, c.peerInfo) - await dialNode(c, d.peerInfo) - await dialNode(d, e.peerInfo) - - await new Promise((resolve) => setTimeout(resolve, 500)) - - a.gs.subscribe(topic) - b.gs.subscribe(topic) - c.gs.subscribe(topic) - d.gs.subscribe(topic) - e.gs.subscribe(topic) - - await Promise.all([ - promisify(a.gs.once.bind(a.gs))('gossipsub:heartbeat'), - promisify(b.gs.once.bind(b.gs))('gossipsub:heartbeat'), - promisify(c.gs.once.bind(c.gs))('gossipsub:heartbeat'), - promisify(d.gs.once.bind(c.gs))('gossipsub:heartbeat'), - promisify(e.gs.once.bind(c.gs))('gossipsub:heartbeat') - ]) - }) - - afterEach(async function () { - this.timeout(4000) - await Promise.all([ - stopNode(a.gs), - stopNode(b.gs), - stopNode(c.gs), - stopNode(d.gs), - stopNode(e.gs) - ]) - await Promise.all([ - stopNode(a), - stopNode(b), - stopNode(c), - stopNode(d), - stopNode(e) - ]) - }) - - it('publishes from c', async () => { - let msgA = new Promise((resolve) => a.gs.once('Z', resolve)) - let msgB = new Promise((resolve) => b.gs.once('Z', resolve)) - let msgD = new Promise((resolve) => d.gs.once('Z', resolve)) - let msgE = new Promise((resolve) => e.gs.once('Z', resolve)) - - const msg = 'hey from c' - c.gs.publish('Z', Buffer.from(msg)) - - msgA = await msgA - msgB = await msgB - msgD = await msgD - msgE = await msgE - - expect(msgA.data.toString()).to.equal(msg) - expect(msgB.data.toString()).to.equal(msg) - expect(msgD.data.toString()).to.equal(msg) - expect(msgE.data.toString()).to.equal(msg) - }) - }) - }) -}) diff --git a/test/multiple-nodes.spec.js b/test/multiple-nodes.spec.js new file mode 100644 index 00000000..50b4ffcb --- /dev/null +++ b/test/multiple-nodes.spec.js @@ -0,0 +1,322 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 8] */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const promisify = require('promisify-es6') + +const { GossipSubID: multicodec } = require('../src/constants') +const { + createGossipsubNodes, + expectSet, + ConnectionPair +} = require('./utils') + +describe('multiple nodes (more than 2)', () => { + describe('every peer subscribes to the topic', () => { + describe('line', () => { + // line + // ◉────◉────◉ + // a b c + describe('subscribe', () => { + let a, b, c, nodes, registrarRecords + const topic = 'Z' + + // Create pubsub nodes + before(async () => { + ({ + nodes, + registrarRecords + } = await createGossipsubNodes(3, true)) + + a = nodes[0] + b = nodes[1] + c = nodes[2] + + const onConnectA = registrarRecords[0][multicodec].onConnect + const onConnectB = registrarRecords[1][multicodec].onConnect + const onConnectC = registrarRecords[2][multicodec].onConnect + + // Notice peers of connection + const [d0, d1] = ConnectionPair() + onConnectA(b.peerInfo, d0) + onConnectB(a.peerInfo, d1) + + const [d2, d3] = ConnectionPair() + onConnectB(c.peerInfo, d2) + onConnectC(b.peerInfo, d3) + }) + + after(() => Promise.all(nodes.map((n) => n.stop()))) + + it('subscribe to the topic on all nodes', async () => { + a.subscribe(topic) + b.subscribe(topic) + c.subscribe(topic) + + expectSet(a.subscriptions, [topic]) + expectSet(b.subscriptions, [topic]) + expectSet(c.subscriptions, [topic]) + + await Promise.all([ + promisify(a.once.bind(a))('gossipsub:heartbeat'), + promisify(b.once.bind(b))('gossipsub:heartbeat'), + promisify(c.once.bind(c))('gossipsub:heartbeat') + ]) + + expect(a.peers.size).to.equal(1) + expect(b.peers.size).to.equal(2) + expect(c.peers.size).to.equal(1) + + const aPeerId = a.peerInfo.id.toB58String() + const bPeerId = b.peerInfo.id.toB58String() + const cPeerId = c.peerInfo.id.toB58String() + + expectSet(a.peers.get(bPeerId).topics, [topic]) + expectSet(b.peers.get(aPeerId).topics, [topic]) + expectSet(b.peers.get(cPeerId).topics, [topic]) + expectSet(c.peers.get(bPeerId).topics, [topic]) + + expect(a.mesh.get(topic).size).to.equal(1) + expect(b.mesh.get(topic).size).to.equal(2) + expect(c.mesh.get(topic).size).to.equal(1) + }) + }) + + describe('publish', () => { + let a, b, c, nodes, registrarRecords + const topic = 'Z' + + // Create pubsub nodes + before(async () => { + ({ + nodes, + registrarRecords + } = await createGossipsubNodes(3, true)) + + a = nodes[0] + b = nodes[1] + c = nodes[2] + + const onConnectA = registrarRecords[0][multicodec].onConnect + const onConnectB = registrarRecords[1][multicodec].onConnect + const onConnectC = registrarRecords[2][multicodec].onConnect + + // Notice peers of connection + const [d0, d1] = ConnectionPair() + onConnectA(b.peerInfo, d0) + onConnectB(a.peerInfo, d1) + + const [d2, d3] = ConnectionPair() + onConnectB(c.peerInfo, d2) + onConnectC(b.peerInfo, d3) + + a.subscribe(topic) + b.subscribe(topic) + c.subscribe(topic) + + await Promise.all([ + promisify(a.once.bind(a))('gossipsub:heartbeat'), + promisify(b.once.bind(b))('gossipsub:heartbeat'), + promisify(c.once.bind(c))('gossipsub:heartbeat') + ]) + }) + + after(() => Promise.all(nodes.map((n) => n.stop()))) + + it('publish on node a', async () => { + let msgB = new Promise((resolve) => b.once('Z', resolve)) + let msgC = new Promise((resolve) => c.once('Z', resolve)) + + a.publish('Z', Buffer.from('hey')) + msgB = await msgB + msgC = await msgC + + expect(msgB.data.toString()).to.equal('hey') + expect(msgC.data.toString()).to.equal('hey') + }) + + it('publish array on node a', async () => { + let msgB = new Promise((resolve) => { + const output = [] + b.on('Z', (msg) => { + output.push(msg) + if (output.length === 2) { + b.removeAllListeners('Z') + resolve(output) + } + }) + }) + let msgC = new Promise((resolve) => { + const output = [] + c.on('Z', (msg) => { + output.push(msg) + if (output.length === 2) { + c.removeAllListeners('Z') + resolve(output) + } + }) + }) + + a.publish('Z', [Buffer.from('hey'), Buffer.from('hey')]) + msgB = await msgB + msgC = await msgC + + expect(msgB.length).to.equal(2) + expect(msgB[0].data.toString()).to.equal('hey') + expect(msgB[1].data.toString()).to.equal('hey') + expect(msgC.length).to.equal(2) + expect(msgC[0].data.toString()).to.equal('hey') + expect(msgC[1].data.toString()).to.equal('hey') + }) + }) + }) + + describe('1 level tree', () => { + // 1 level tree + // ┌◉┐ + // │b│ + // ◉─┘ └─◉ + // a c + + let a, b, c, nodes, registrarRecords + const topic = 'Z' + + // Create pubsub nodes + before(async () => { + ({ + nodes, + registrarRecords + } = await createGossipsubNodes(3, true)) + + a = nodes[0] + b = nodes[1] + c = nodes[2] + + const onConnectA = registrarRecords[0][multicodec].onConnect + const onConnectB = registrarRecords[1][multicodec].onConnect + const onConnectC = registrarRecords[2][multicodec].onConnect + + // Notice peers of connection + const [d0, d1] = ConnectionPair() + onConnectA(b.peerInfo, d0) + onConnectB(a.peerInfo, d1) + + const [d2, d3] = ConnectionPair() + onConnectB(c.peerInfo, d2) + onConnectC(b.peerInfo, d3) + + a.subscribe(topic) + b.subscribe(topic) + c.subscribe(topic) + + await Promise.all([ + promisify(a.once.bind(a))('gossipsub:heartbeat'), + promisify(b.once.bind(b))('gossipsub:heartbeat'), + promisify(c.once.bind(c))('gossipsub:heartbeat') + ]) + }) + + after(() => Promise.all(nodes.map((n) => n.stop()))) + + it('publish on node b', async () => { + let msgA = new Promise((resolve) => a.once('Z', resolve)) + let msgC = new Promise((resolve) => c.once('Z', resolve)) + + b.publish('Z', Buffer.from('hey')) + msgA = await msgA + msgC = await msgC + + expect(msgA.data.toString()).to.equal('hey') + expect(msgC.data.toString()).to.equal('hey') + }) + }) + + describe('2 level tree', () => { + // 2 levels tree + // ┌◉┐ + // │c│ + // ┌◉─┘ └─◉┐ + // │b d│ + // ◉─┘ └─◉ + // a e + let a, b, c, d, e, nodes, registrarRecords + const topic = 'Z' + + // Create pubsub nodes + before(async () => { + ({ + nodes, + registrarRecords + } = await createGossipsubNodes(5, true)) + + a = nodes[0] + b = nodes[1] + c = nodes[2] + d = nodes[3] + e = nodes[4] + + const onConnectA = registrarRecords[0][multicodec].onConnect + const onConnectB = registrarRecords[1][multicodec].onConnect + const onConnectC = registrarRecords[2][multicodec].onConnect + const onConnectD = registrarRecords[3][multicodec].onConnect + const onConnectE = registrarRecords[4][multicodec].onConnect + + // Notice peers of connection + const [d0, d1] = ConnectionPair() + onConnectA(b.peerInfo, d0) + onConnectB(a.peerInfo, d1) + + const [d2, d3] = ConnectionPair() + onConnectB(c.peerInfo, d2) + onConnectC(b.peerInfo, d3) + + const [d4, d5] = ConnectionPair() + onConnectC(d.peerInfo, d4) + onConnectD(c.peerInfo, d5) + + const [d6, d7] = ConnectionPair() + onConnectD(e.peerInfo, d6) + onConnectE(d.peerInfo, d7) + + a.subscribe(topic) + b.subscribe(topic) + c.subscribe(topic) + d.subscribe(topic) + e.subscribe(topic) + + await Promise.all([ + promisify(a.once.bind(a))('gossipsub:heartbeat'), + promisify(b.once.bind(b))('gossipsub:heartbeat'), + promisify(c.once.bind(c))('gossipsub:heartbeat'), + promisify(d.once.bind(d))('gossipsub:heartbeat'), + promisify(e.once.bind(e))('gossipsub:heartbeat') + ]) + }) + + after(() => Promise.all(nodes.map((n) => n.stop()))) + + it('publishes from c', async () => { + let msgA = new Promise((resolve) => a.once('Z', resolve)) + let msgB = new Promise((resolve) => b.once('Z', resolve)) + let msgD = new Promise((resolve) => d.once('Z', resolve)) + let msgE = new Promise((resolve) => e.once('Z', resolve)) + + const msg = 'hey from c' + c.publish('Z', Buffer.from(msg)) + + msgA = await msgA + msgB = await msgB + msgD = await msgD + msgE = await msgE + + expect(msgA.data.toString()).to.equal(msg) + expect(msgB.data.toString()).to.equal(msg) + expect(msgD.data.toString()).to.equal(msg) + expect(msgE.data.toString()).to.equal(msg) + }) + }) + }) +}) diff --git a/test/node.js b/test/node.js index b80be0aa..9f48df45 100644 --- a/test/node.js +++ b/test/node.js @@ -1,11 +1,3 @@ 'use strict' -require('./messageCache.spec') -require('./2-nodes') -require('./multiple-nodes') -require('./heartbeat.spec') -require('./mesh') require('./gossip') -require('./floodsub') -require('./emit-self') -require('./gossip-incoming') diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js index 127c1aac..50db26a8 100644 --- a/test/pubsub.spec.js +++ b/test/pubsub.spec.js @@ -6,46 +6,37 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect const sinon = require('sinon') + const { utils } = require('libp2p-pubsub') -const promisify = require('promisify-es6') const { - createNode, - startNode, - stopNode + createGossipsub, + mockRegistrar } = require('./utils') describe('Pubsub', () => { - let nodeA let gossipsub + before(async () => { - nodeA = await createNode('/ip4/127.0.0.1/tcp/0') - gossipsub = nodeA.gs - await startNode(nodeA) - await startNode(gossipsub) - }) - after(async () => { - await stopNode(gossipsub) - await stopNode(nodeA) + gossipsub = await createGossipsub(mockRegistrar, true) }) + + after(() => gossipsub.stop()) + afterEach(() => { sinon.restore() }) describe('publish', () => { - it('should sign messages on publish', (done) => { - sinon.spy(nodeA.gs, '_publish') - - gossipsub.publish('signing-topic', Buffer.from('hello'), (err) => { - expect(err).to.not.exist() - - // Get the first message sent to _publish, and validate it - const signedMessage = gossipsub._publish.getCall(0).lastArg[0] - gossipsub.validate(signedMessage, (err, isValid) => { - expect(err).to.not.exist() - expect(isValid).to.eql(true) - done() - }) - }) + it('should sign messages on publish', async () => { + sinon.spy(gossipsub, '_publish') + + await gossipsub.publish('signing-topic', Buffer.from('hello')) + + // Get the first message sent to _publish, and validate it + const signedMessage = gossipsub._publish.getCall(0).lastArg[0] + const isValid = await gossipsub.validate(signedMessage) + + expect(isValid).to.eql(true) }) }) @@ -53,7 +44,6 @@ describe('Pubsub', () => { it('should drop unsigned messages', () => { sinon.spy(gossipsub, '_processRpcMessage') sinon.spy(gossipsub, 'validate') - sinon.spy(gossipsub, 'log') sinon.stub(gossipsub.peers, 'get').returns({}) const topic = 'my-topic' @@ -70,10 +60,8 @@ describe('Pubsub', () => { gossipsub._onRpc('QmAnotherPeer', rpc) return new Promise(resolve => setTimeout(() => { - const dropLogs = gossipsub.log.getCalls().filter((call) => call.args[0].match(/dropping it/gi)) expect(gossipsub.validate.callCount).to.eql(1) expect(gossipsub._processRpcMessage.called).to.eql(false) - expect(dropLogs).to.have.length(1) resolve() }, 500)) }) @@ -81,13 +69,10 @@ describe('Pubsub', () => { it('should not drop signed messages', async () => { sinon.spy(gossipsub, '_processRpcMessage') sinon.spy(gossipsub, 'validate') - sinon.spy(gossipsub, 'log') sinon.stub(gossipsub.peers, 'get').returns({}) const topic = 'my-topic' - const signedMessage = await promisify(gossipsub._buildMessage, { - context: gossipsub - })({ + const signedMessage = await gossipsub._buildMessage({ from: gossipsub.peerId.id, data: Buffer.from('an unsigned message'), seqno: utils.randomSeqno(), @@ -102,10 +87,8 @@ describe('Pubsub', () => { gossipsub._onRpc('QmAnotherPeer', rpc) return new Promise(resolve => setTimeout(() => { - const dropLogs = gossipsub.log.getCalls().filter((call) => call.args[0].match(/dropping it/gi)) expect(gossipsub.validate.callCount).to.eql(1) expect(gossipsub._processRpcMessage.callCount).to.eql(1) - expect(dropLogs).to.be.empty() resolve() }, 500)) }) @@ -113,7 +96,6 @@ describe('Pubsub', () => { it('should not drop unsigned messages if strict signing is disabled', () => { sinon.spy(gossipsub, '_processRpcMessage') sinon.spy(gossipsub, 'validate') - sinon.spy(gossipsub, 'log') sinon.stub(gossipsub.peers, 'get').returns({}) // Disable strict signing sinon.stub(gossipsub, 'strictSigning').value(false) @@ -132,10 +114,8 @@ describe('Pubsub', () => { gossipsub._onRpc('QmAnotherPeer', rpc) return new Promise(resolve => setTimeout(() => { - const dropLogs = gossipsub.log.getCalls().filter((call) => call.args[0].match(/dropping it/gi)) expect(gossipsub.validate.callCount).to.eql(1) expect(gossipsub._processRpcMessage.callCount).to.eql(1) - expect(dropLogs).to.be.empty() resolve() }, 500)) }) diff --git a/test/utils/browser-bundle.js b/test/utils/browser-bundle.js deleted file mode 100644 index 7a63c252..00000000 --- a/test/utils/browser-bundle.js +++ /dev/null @@ -1,24 +0,0 @@ -'use strict' - -const WS = require('libp2p-websockets') -const spdy = require('libp2p-spdy') -const secio = require('libp2p-secio') -const libp2p = require('libp2p') - -class Node extends libp2p { - constructor ({ peerInfo, peerBook }) { - const modules = { - transport: [WS], - streamMuxer: [spdy], - connEncryption: [secio] - } - - super({ - modules, - peerInfo, - peerBook - }) - } -} - -module.exports = Node diff --git a/test/utils/index.js b/test/utils/index.js index 1bc288aa..489bdeb5 100644 --- a/test/utils/index.js +++ b/test/utils/index.js @@ -1,15 +1,17 @@ 'use strict' -const PeerId = require('peer-id') -const PeerInfo = require('peer-info') const { expect } = require('chai') -const promisify = require('promisify-es6') -const isNode = require('detect-node') -const Node = isNode ? require('./nodejs-bundle') : require('./browser-bundle') +const DuplexPair = require('it-pair/duplex') +const pTimes = require('p-times') -const GossipSub = require('../../src') const FloodSub = require('libp2p-floodsub') +const { multicodec: floodsubMulticodec } = require('libp2p-floodsub') +const PeerId = require('peer-id') +const PeerInfo = require('peer-info') + +const GossipSub = require('../../src') +const { GossipSubID } = require('../../src/constants') exports.first = (map) => map.values().next().value @@ -17,24 +19,136 @@ exports.expectSet = (set, subs) => { expect(Array.from(set.values())).to.eql(subs) } -exports.createNode = async (maddr, options = {}) => { - const id = await promisify(PeerId.create)({ bits: 1024 }) - const peerInfo = await promisify(PeerInfo.create)(id) - peerInfo.multiaddrs.add(maddr) - const node = new Node({ peerInfo }) - node.gs = new GossipSub(node, options) - return node +const createPeerInfo = async (protocol = GossipSubID) => { + const peerId = await PeerId.create({ bits: 1024 }) + const peerInfo = await PeerInfo.create(peerId) + peerInfo.protocols.add(protocol) + + return peerInfo } -exports.createFloodsubNode = async (maddr) => { - const id = await promisify(PeerId.create)({ bits: 1024 }) - const peerInfo = await promisify(PeerInfo.create)(id) - peerInfo.multiaddrs.add(maddr) - const node = new Node({ peerInfo }) - node.fs = new FloodSub(node) - return node +exports.createPeerInfo = createPeerInfo + +const createGossipsub = async (registrar, shouldStart = false, options) => { + const peerInfo = await createPeerInfo() + const gs = new GossipSub(peerInfo, registrar, options) + + if (shouldStart) { + await gs.start() + } + + return gs +} + +exports.createGossipsub = createGossipsub + +const createGossipsubNodes = async (n, shouldStart, options) => { + const registrarRecords = Array.from({ length: n }) + + const nodes = await pTimes(n, (index) => { + registrarRecords[index] = {} + + return createGossipsub(createMockRegistrar(registrarRecords[index]), shouldStart, options) + }) + + return { + nodes, + registrarRecords + } +} + +exports.createGossipsubNodes = createGossipsubNodes + +const connectGossipsubNodes = (nodes, registrarRecords, multicodec) => { + // connect all nodes + for (let i = 0; i < nodes.length; i++) { + for (let j = i + 1; j < nodes.length; j++) { + const onConnectI = registrarRecords[i][multicodec].onConnect + const onConnectJ = registrarRecords[j][multicodec].onConnect + + // Notice peers of connection + const [d0, d1] = ConnectionPair() + onConnectI(nodes[j].peerInfo, d0) + onConnectJ(nodes[i].peerInfo, d1) + } + } + + return nodes +} + +exports.connectGossipsubNodes = connectGossipsubNodes + +const createGossipsubConnectedNodes = async (n, multicodec, options) => { + const { nodes, registrarRecords } = await createGossipsubNodes(n, true, options) + + // connect all nodes + return connectGossipsubNodes(nodes, registrarRecords, multicodec) +} + +exports.createGossipsubConnectedNodes = createGossipsubConnectedNodes + +const createFloodsubNode = async (registrar, shouldStart = false, options) => { + const peerInfo = await createPeerInfo(floodsubMulticodec) + const fs = new FloodSub(peerInfo, registrar, options) + + if (shouldStart) { + await fs.start() + } + + return fs +} + +exports.createFloodsubNode = createFloodsubNode + +exports.mockRegistrar = { + handle: () => { }, + register: () => { }, + unregister: () => { } +} + +const createMockRegistrar = (registrarRecord) => ({ + handle: (multicodecs, handler) => { + multicodecs.forEach((multicodec) => { + const rec = registrarRecord[multicodec] || {} + + registrarRecord[multicodec] = { + ...rec, + handler + } + }) + }, + register: ({ multicodecs, _onConnect, _onDisconnect }) => { + multicodecs.forEach((multicodec) => { + const rec = registrarRecord[multicodec] || {} + + registrarRecord[multicodec] = { + ...rec, + onConnect: _onConnect, + onDisconnect: _onDisconnect + } + }) + return multicodecs[0] + }, + unregister: (id) => { + delete registrarRecord[id] + } +}) + +exports.createMockRegistrar = createMockRegistrar + +const ConnectionPair = () => { + const [d0, d1] = DuplexPair() + + return [ + { + stream: d0, + newStream: () => Promise.resolve({ stream: d0 }) + }, + { + stream: d1, + newStream: () => Promise.resolve({ stream: d1 }) + } + ] } -exports.startNode = (node) => promisify(node.start.bind(node))() -exports.stopNode = (node) => promisify(node.stop.bind(node))() -exports.dialNode = (node, peerInfo) => promisify(node.dial.bind(node))(peerInfo) +exports.ConnectionPair = ConnectionPair diff --git a/test/utils/nodejs-bundle.js b/test/utils/nodejs-bundle.js deleted file mode 100644 index c5e840a8..00000000 --- a/test/utils/nodejs-bundle.js +++ /dev/null @@ -1,24 +0,0 @@ -'use strict' - -const TCP = require('libp2p-tcp') -const spdy = require('libp2p-spdy') -const secio = require('libp2p-secio') -const libp2p = require('libp2p') - -class Node extends libp2p { - constructor ({ peerInfo, peerBook }) { - const modules = { - transport: [TCP], - streamMuxer: [spdy], - connEncryption: [secio] - } - - super({ - modules, - peerInfo, - peerBook - }) - } -} - -module.exports = Node