diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1dd9938 --- /dev/null +++ b/.gitignore @@ -0,0 +1,41 @@ +# Logs +logs +*.log +npm-debug.log* + +# Runtime data +pids +*.pid +*.seed + +# Directory for instrumented libs generated by jscoverage/JSCover +lib-cov + +# Coverage directory used by tools like istanbul +coverage + +# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files) +.grunt + +# node-waf configuration +.lock-wscript + +# Compiled binary addons (http://nodejs.org/api/addons.html) +build/Release + +# Dependency directory +node_modules + +# Optional npm cache directory +.npm + +# Optional REPL history +.node_repl_history + +# Vim editor swap files +*.swp + +dist + +.history +.vscode diff --git a/.npmignore b/.npmignore new file mode 100644 index 0000000..16c0a9b --- /dev/null +++ b/.npmignore @@ -0,0 +1,35 @@ +test + +# Logs +logs +*.log +npm-debug.log* + +# Runtime data +pids +*.pid +*.seed + +# Directory for instrumented libs generated by jscoverage/JSCover +lib-cov + +# Coverage directory used by tools like istanbul +coverage + +# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files) +.grunt + +# node-waf configuration +.lock-wscript + +# Compiled binary addons (http://nodejs.org/api/addons.html) +build/Release + +# Dependency directory +node_modules + +# Optional npm cache directory +.npm + +# Optional REPL history +.node_repl_history diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..74f58e8 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,31 @@ +sudo: false +language: node_js + +matrix: + include: + - node_js: 6 + env: CXX=g++-4.8 + - node_js: 8 + env: CXX=g++-4.8 + # - node_js: stable + # env: CXX=g++-4.8 + +script: + - npm run lint + - npm run test + - npm run coverage + +before_script: + - export DISPLAY=:99.0 + - sh -e /etc/init.d/xvfb start + +after_success: + - npm run coverage-publish + +addons: + firefox: 'latest' + apt: + sources: + - ubuntu-toolchain-r-test + packages: + - g++-4.8 diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..f94290a --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2017 Protocol Labs Inc + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md index fcf2f47..5296771 100644 --- a/README.md +++ b/README.md @@ -1 +1,197 @@ # js-libp2p-circuit + +[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) +[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs) +[![Build Status](https://travis-ci.org/libp2p/js-libp2p-circuit.svg?style=flat-square)](https://travis-ci.org/libp2p/js-libp2p-circuit) +[![Coverage Status](https://coveralls.io/repos/github/libp2p/js-libp2p-circuit/badge.svg?branch=master)](https://coveralls.io/github/libp2p/js-libp2p-circuit?branch=master) +[![Dependency Status](https://david-dm.org/libp2p/js-libp2p-circuit.svg?style=flat-square)](https://david-dm.org/libp2p/js-libp2p-circuit) +[![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/feross/standard) +![](https://img.shields.io/badge/npm-%3E%3D3.0.0-orange.svg?style=flat-square) +![](https://img.shields.io/badge/Node.js-%3E%3D4.0.0-orange.svg?style=flat-square) + +![](https://raw.githubusercontent.com/libp2p/interface-connection/master/img/badge.png) +![](https://raw.githubusercontent.com/libp2p/interface-transport/master/img/badge.png) + +> Node.js implementation of the Circuit module that libp2p uses, which implements the [interface-connection](https://github.com/libp2p/interface-connection) interface for dial/listen. + +`libp2p-circuit` implements the circuit-relay mechanism that allows nodes that don't speak the same protocol to communicate using a third _relay_ node. + +**Note:** This module uses [pull-streams](https://pull-stream.github.io) for all stream based interfaces. + +### Why? + +`circuit-relaying` uses additional nodes in order to transfer traffic between two otherwise unreachable nodes. This allows nodes that don't speak the same protocols or are running in limited environments, e.g. browsers and IoT devices, to communicate, which would otherwise be impossible given the fact that for example browsers don't have any socket support and as such cannot be directly dialed. + +The applicability of circuit-relaying is not limited to routing traffic between browser nodes, other uses include: + - routing traffic between private nets + - circumventing NAT layers + - route mangling for better privacy (matreshka/shallot dialing). + + It's also possible to use it for clients that implement exotic transports such as devices that only have bluetooth radios to be reachable over bluethoot enabled relays and become full p2p nodes. + +### libp2p-circuit and IPFS + +Prior to `libp2p-circuit` there was a rift in the IPFS network, were IPFS nodes could only access content from nodes that speak the same protocol, for example TCP only nodes could only dial to other TCP only nodes, same for any other protocol combination. In practice, this limitation was most visible in JS-IPFS browser nodes, since they can only dial out but not be dialed in over WebRTC or WebSockets, hence any content that the browser node held was not reachable by the rest of the network even through it was announced on the DHT. Non browser IPFS nodes would usually speak more than one protocol such as TCP, WebSockets and/or WebRTC, this made the problem less severe outside of the browser. `libp2p-circuit` solves this problem completely, as long as there are `relay nodes` capable of routing traffic between those nodes their content should be available to the rest of the IPFS network. + +## Table of Contents + +- [Install](#install) + - [npm](#npm) +- [Usage](#usage) + - [Example](#example) + - [This module uses `pull-streams`](#this-module-uses-pull-streams) + - [Converting `pull-streams` to Node.js Streams](#converting-pull-streams-to-nodejs-streams) +- [API](#api) +- [Contribute](#contribute) +- [License](#license) + +## Install + +### npm + +```sh +> npm i libp2p-circuit +``` + +## Usage + +### Example + +#### Create dialer/listener + +```js +const Circuit = require('libp2p-circuit') +const multiaddr = require('multiaddr') +const pull = require('pull-stream') + +const mh1 = multiaddr('/p2p-circuit/ipfs/QmHash') // dial /ipfs/QmHash over any circuit + +const circuit = new Circuit(swarmInstance, options) // pass swarm instance and options + +const listener = circuit.createListener(mh1, (connection) => { + console.log('new connection opened') + pull( + pull.values(['hello']), + socket + ) +}) + +listener.listen(() => { + console.log('listening') + + pull( + circuit.dial(mh1), + pull.log, + pull.onEnd(() => { + circuit.close() + }) + ) +}) +``` + +Outputs: + +```sh +listening +new connection opened +hello +``` + +#### Create `relay` + +```js +const Relay = require('libp2p-circuit').Realy + +const relay = new Relay(options) + +relay.mount(swarmInstance) // start relaying traffic +``` + +### This module uses `pull-streams` + +We expose a streaming interface based on `pull-streams`, rather then on the Node.js core streams implementation (aka Node.js streams). `pull-streams` offers us a better mechanism for error handling and flow control guarantees. If you would like to know more about why we did this, see the discussion at this [issue](https://github.com/ipfs/js-ipfs/issues/362). + +You can learn more about pull-streams at: + +- [The history of Node.js streams, nodebp April 2014](https://www.youtube.com/watch?v=g5ewQEuXjsQ) +- [The history of streams, 2016](http://dominictarr.com/post/145135293917/history-of-streams) +- [pull-streams, the simple streaming primitive](http://dominictarr.com/post/149248845122/pull-streams-pull-streams-are-a-very-simple) +- [pull-streams documentation](https://pull-stream.github.io/) + +#### Converting `pull-streams` to Node.js Streams + +If you are a Node.js streams user, you can convert a pull-stream to a Node.js stream using the module [`pull-stream-to-stream`](https://github.com/dominictarr/pull-stream-to-stream), giving you an instance of a Node.js stream that is linked to the pull-stream. For example: + +```js +const pullToStream = require('pull-stream-to-stream') + +const nodeStreamInstance = pullToStream(pullStreamInstance) +// nodeStreamInstance is an instance of a Node.js Stream +``` + +To learn more about this utility, visit https://pull-stream.github.io/#pull-stream-to-stream. + +## API + +[![](https://raw.githubusercontent.com/libp2p/interface-transport/master/img/badge.png)](https://github.com/libp2p/interface-transport) + +`libp2p-circuit` accepts Circuit addresses for both IPFS and non IPFS encapsulated addresses, i.e: + +`/p2p-circuit/ip4/127.0.0.1/tcp/4001/ipfs/QmHash` + +Both for dialing and listening. + +### Implementation rational + +This module is not a transport, however it implements `interface-transport` interface in order to allow circuit to be plugged with `libp2p-swarm`. The rational behind it is that, `libp2p-circuit` has a dial and listen flow, which fits nicely with other transports, moreover, it requires the _raw_ connection to be encrypted and muxed just as a regular transport's connection does. All in all, `interface-transport` ended up being the correct level of abstraction for circuit, as well as allowed us to reuse existing integration points in `libp2p-swarm` and `libp2p` without adding any ad-hoc logic. All parts of `interface-transport` are used, including `.getAddr` which returns a list of `/p2p-circuit` addresses that circuit is currently listening. + +``` + + libp2p libp2p-circuit ++-------------------------------------------------+ +---------------------------------------------------------+ +| | | | +| +---------------------------------+ | | | +| | | | | +------------------------+ | +| | | | circuit-relay registers the /hop | | | | +| | libp2p-swarm |<----------------------------------------------------->| circuit-relay | | +| | | | multistream handler with the swarm | | | | +| | | | to handle incomming dial requests | +------------------------+ | +| +---------------------------------+ | from other nodes | transport | +| ^ ^ ^ ^ ^ ^ | | +-------------------------------------------+ | +| | | | | | | | | | +----------------------------------+ | | +| | | | | | | | dialer uses the swarm to dial to a | | | | | | +| | | | +------------------------------------------------------------------------------>| dialer | | | +| | |transports | | relay node listening on the /hop | | | | | | +| | | | | | | multistream endpoint | | +----------------------------------+ | | +| | | | | | | | | | | +| v v | v v | | | | | +|+------------------|----------------------------+| | | +----------------------------------+ | | +|| | | | | || | | | | | | +||libp2p-tcp |libp2p-ws | .... |libp2p-circuit ||listener registers a /stop multistream | | listener | | | +|| | +-------------------------------------------------------------------------------->| | | | +|| | | |pluggs in just ||handler with the swarm to handle | | +----------------------------------+ | | +|| | | |as any other ||incomming relay connections | | | | +|| | | |transport || | +-------------------------------------------+ | +|+-----------------------------------------------+| | | +| | | | +| | | | ++-------------------------------------------------+ +---------------------------------------------------------+ +``` + + + +## Contribute + +Contributions are welcome! The libp2p implementation in JavaScript is a work in progress. As such, there's a few things you can do right now to help out: + +- [Check out the existing issues](//github.com/libp2p/js-libp2p-circuit/issues). +- **Perform code reviews**. +- **Add tests**. There can never be enough tests. + +Please be aware that all interactions related to libp2p are subject to the IPFS [Code of Conduct](https://github.com/ipfs/community/blob/master/code-of-conduct.md). + +Small note: If editing the README, please conform to the [standard-readme](https://github.com/RichardLitt/standard-readme) specification. + +## License + +[MIT](LICENSE) © 2017 Protocol Labs diff --git a/circle.yml b/circle.yml new file mode 100644 index 0000000..d67b6ae --- /dev/null +++ b/circle.yml @@ -0,0 +1,18 @@ +machine: + node: + version: stable + +test: + post: + - npm run coverage -- --upload + +dependencies: + pre: + - google-chrome --version + - curl -L -o google-chrome.deb https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb + - sudo dpkg -i google-chrome.deb || true + - sudo apt-get update + - sudo apt-get install -f + - sudo apt-get install --only-upgrade lsb-base + - sudo dpkg -i google-chrome.deb + - google-chrome --version diff --git a/package.json b/package.json new file mode 100644 index 0000000..3a392b5 --- /dev/null +++ b/package.json @@ -0,0 +1,61 @@ +{ + "name": "libp2p-circuit", + "version": "0.0.3", + "description": "JavaScript implementation of circuit/switch relaying", + "main": "src/index.js", + "scripts": { + "lint": "aegir lint", + "build": "aegir build", + "test": "aegir test --target node --target browser", + "test:node": "aegir test --target node", + "test:browser": "aegir test --target browser", + "release": "aegir test release --target node --target browser", + "release-minor": "aegir release --type minor --target node --target browser", + "release-major": "aegir release --type major --target node --target browser", + "coverage": "aegir coverage", + "coverage-publish": "aegir coverage --provider coveralls" + }, + "pre-commit": [ + "lint", + "test" + ], + "repository": { + "type": "git", + "url": "git+https://github.com/libp2p/js-libp2p-circuit.git" + }, + "keywords": [ + "IPFS" + ], + "author": "Dmitriy Ryajov ", + "license": "MIT", + "bugs": { + "url": "https://github.com/libp2p/js-libp2p-circuit/issues" + }, + "homepage": "https://github.com/libp2p/js-libp2p-circuit#readme", + "devDependencies": { + "aegir": "^12.0.8", + "chai": "^3.5.0", + "pre-commit": "^1.2.2", + "sinon": "^2.1.0", + "dirty-chai": "^2.0.1" + }, + "contributors": [], + "dependencies": { + "assert": "^1.4.1", + "async": "^2.1.5", + "debug": "^2.6.1", + "interface-connection": "^0.3.1", + "lodash": "^4.17.4", + "mafmt": "^2.1.8", + "multiaddr": "^2.2.1", + "multistream-select": "^0.13.4", + "peer-id": "^0.10.1", + "peer-info": "^0.11.0", + "protocol-buffers": "^3.2.1", + "pull-abortable": "^4.1.0", + "pull-handshake": "^1.1.4", + "pull-stream": "^3.5.1", + "safe-buffer": "^5.0.1", + "setimmediate": "^1.0.5" + } +} diff --git a/src/circuit.js b/src/circuit.js new file mode 100644 index 0000000..df3987b --- /dev/null +++ b/src/circuit.js @@ -0,0 +1,123 @@ +'use strict' + +const mafmt = require('mafmt') +const multiaddr = require('multiaddr') + +const CircuitDialer = require('./circuit/dialer') +const utilsFactory = require('./circuit/utils') + +const debug = require('debug') +const log = debug('libp2p:circuit:transportdialer') +log.err = debug('libp2p:circuit:error:transportdialer') + +const createListener = require('./listener') + +class Circuit { + static get tag () { + return 'Circuit' + } + + /** + * Creates an instance of Dialer. + * + * @param {Swarm} swarm - the swarm + * @param {any} options - config options + * + * @memberOf Dialer + */ + constructor (swarm, options) { + this.options = options || {} + + this.swarm = swarm + this.dialer = null + this.utils = utilsFactory(swarm) + this.peerInfo = this.swarm._peerInfo + this.relays = this.filter(this.peerInfo.multiaddrs.toArray()) + + // if no explicit relays, add a default relay addr + if (this.relays.length === 0) { + this.peerInfo + .multiaddrs + .add(`/p2p-circuit/ipfs/${this.peerInfo.id.toB58String()}`) + } + + this.dialer = new CircuitDialer(swarm, options) + + this.swarm.on('peer-mux-established', this.dialer.canHop.bind(this.dialer)) + this.swarm.on('peer-mux-closed', (peerInfo) => { + this.dialer.relayPeers.delete(peerInfo.id.toB58String()) + }) + } + + /** + * Dial the relays in the Addresses.Swarm config + * + * @param {Array} relays + * @return {void} + */ + _dialSwarmRelays () { + // if we have relay addresses in swarm config, then dial those relays + this.relays.forEach((relay) => { + let relaySegments = relay + .toString() + .split('/p2p-circuit') + .filter(segment => segment.length) + + relaySegments.forEach((relaySegment) => { + this.dialer._dialRelay(this.utils.peerInfoFromMa(multiaddr(relaySegment))) + }) + }) + } + + /** + * Dial a peer over a relay + * + * @param {multiaddr} ma - the multiaddr of the peer to dial + * @param {Object} options - dial options + * @param {Function} cb - a callback called once dialed + * @returns {Connection} - the connection + * + * @memberOf Dialer + */ + dial (ma, options, cb) { + return this.dialer.dial(ma, options, cb) + } + + /** + * Create a listener + * + * @param {any} options + * @param {Function} handler + * @return {listener} + */ + createListener (options, handler) { + if (typeof options === 'function') { + handler = options + options = this.options || {} + } + + const listener = createListener(this.swarm, options, handler) + listener.on('listen', this._dialSwarmRelays.bind(this)) + return listener + } + + /** + * Filter check for all multiaddresses + * that this transport can dial on + * + * @param {any} multiaddrs + * @returns {Array} + * + * @memberOf Dialer + */ + filter (multiaddrs) { + if (!Array.isArray(multiaddrs)) { + multiaddrs = [multiaddrs] + } + return multiaddrs.filter((ma) => { + return mafmt.Circuit.matches(ma) + }) + } +} + +module.exports = Circuit diff --git a/src/circuit/dialer.js b/src/circuit/dialer.js new file mode 100644 index 0000000..cb5d6cd --- /dev/null +++ b/src/circuit/dialer.js @@ -0,0 +1,239 @@ +'use strict' + +const Connection = require('interface-connection').Connection +const isFunction = require('lodash.isfunction') +const multiaddr = require('multiaddr') +const once = require('once') +const waterfall = require('async/waterfall') +const utilsFactory = require('./utils') +const StreamHandler = require('./stream-handler') +const PeerId = require('peer-id') + +const debug = require('debug') +const log = debug('libp2p:circuit:dialer') +log.err = debug('libp2p:circuit:error:dialer') + +const multicodec = require('../multicodec') +const proto = require('../protocol') + +class Dialer { + /** + * Creates an instance of Dialer. + * @param {Swarm} swarm - the swarm + * @param {any} options - config options + * + * @memberOf Dialer + */ + constructor (swarm, options) { + this.swarm = swarm + this.relayPeers = new Map() + this.options = options + this.utils = utilsFactory(swarm) + } + + /** + * Dial a peer over a relay + * + * @param {multiaddr} ma - the multiaddr of the peer to dial + * @param {Function} cb - a callback called once dialed + * @returns {Connection} - the connection + * + * @memberOf Dialer + */ + dial (ma, cb) { + cb = cb || (() => {}) + const strMa = ma.toString() + if (!strMa.includes('/p2p-circuit')) { + log.err('invalid circuit address') + return cb(new Error('invalid circuit address')) + } + + const addr = strMa.split('p2p-circuit') // extract relay address if any + const relay = addr[0] === '/' ? null : multiaddr(addr[0]) + const peer = multiaddr(addr[1] || addr[0]) + + const dstConn = new Connection() + setImmediate(this._dialPeer.bind(this), peer, relay, (err, conn) => { + if (err) { + log.err(err) + return cb(err) + } + + dstConn.setInnerConn(conn) + cb(null, dstConn) + }) + + return dstConn + } + + /** + * Does the peer support the HOP protocol + * + * @param {PeerInfo} peer + * @param {Function} cb + * @returns {*} + */ + canHop (peer, cb) { + cb = once(cb || (() => {})) + + if (!this.relayPeers.get(this.utils.getB58String(peer))) { + let streamHandler + waterfall([ + (wCb) => this._dialRelay(peer, wCb), + (sh, wCb) => { + streamHandler = sh + wCb() + }, + (wCb) => streamHandler.write(proto.CircuitRelay.encode({ + type: proto.CircuitRelay.Type.CAN_HOP + }), wCb), + (wCb) => streamHandler.read(wCb), + (msg, wCb) => { + const response = proto.CircuitRelay.decode(msg) + + if (response.code !== proto.CircuitRelay.Status.SUCCESS) { + return log(`HOP not supported, skipping - ${this.utils.getB58String(peer)}`) + } + + log(`HOP supported adding as relay - ${this.utils.getB58String(peer)}`) + this.relayPeers.set(this.utils.getB58String(peer), peer) + wCb(null) + } + ], cb) + } + + return cb(null) + } + + /** + * Dial the destination peer over a relay + * + * @param {multiaddr} dstMa + * @param {Connection|PeerInfo} relay + * @param {Function} cb + * @return {Function|void} + * @private + */ + _dialPeer (dstMa, relay, cb) { + if (isFunction(relay)) { + cb = relay + relay = null + } + + if (!cb) { + cb = () => {} + } + + dstMa = multiaddr(dstMa) + // if no relay provided, dial on all available relays until one succeeds + if (!relay) { + const relays = Array.from(this.relayPeers.values()) + let next = (nextRelay) => { + if (!nextRelay) { + let err = `no relay peers were found or all relays failed to dial` + log.err(err) + return cb(err) + } + + return this._negotiateRelay(nextRelay, dstMa, (err, conn) => { + if (err) { + log.err(err) + return next(relays.shift()) + } + cb(null, conn) + }) + } + next(relays.shift()) + } else { + return this._negotiateRelay(relay, dstMa, (err, conn) => { + if (err) { + log.err(`An error has occurred negotiating the relay connection`, err) + return cb(err) + } + + return cb(null, conn) + }) + } + } + + /** + * Negotiate the relay connection + * + * @param {Multiaddr|PeerInfo|Connection} relay - the Connection or PeerInfo of the relay + * @param {multiaddr} dstMa - the multiaddr of the peer to relay the connection for + * @param {Function} callback - a callback which gets the negotiated relay connection + * @returns {void} + * @private + * + * @memberOf Dialer + */ + _negotiateRelay (relay, dstMa, callback) { + dstMa = multiaddr(dstMa) + + const srcMas = this.swarm._peerInfo.multiaddrs.toArray() + let streamHandler + waterfall([ + (cb) => { + if (relay instanceof Connection) { + return cb(null, new StreamHandler(relay)) + } + return this._dialRelay(this.utils.peerInfoFromMa(relay), cb) + }, + (sh, cb) => { + streamHandler = sh + cb(null) + }, + (cb) => { + log(`negotiating relay for peer ${dstMa.getPeerId()}`) + streamHandler.write( + proto.CircuitRelay.encode({ + type: proto.CircuitRelay.Type.HOP, + srcPeer: { + id: this.swarm._peerInfo.id.id, + addrs: srcMas.map((addr) => addr.buffer) + }, + dstPeer: { + id: PeerId.createFromB58String(dstMa.getPeerId()).id, + addrs: [dstMa.buffer] + } + }), cb) + }, + (cb) => streamHandler.read(cb), + (msg, cb) => { + const message = proto.CircuitRelay.decode(msg) + if (message.type !== proto.CircuitRelay.Type.STATUS) { + return cb(new Error(`Got invalid message type - ` + + `expected ${proto.CircuitRelay.Type.STATUS} got ${message.type}`)) + } + + if (message.code !== proto.CircuitRelay.Status.SUCCESS) { + return cb(new Error(`Got ${message.code} error code trying to dial over relay`)) + } + + cb(null, new Connection(streamHandler.rest())) + } + ], callback) + } + + /** + * Dial a relay peer by its PeerInfo + * + * @param {PeerInfo} peer - the PeerInfo of the relay peer + * @param {Function} cb - a callback with the connection to the relay peer + * @returns {Function|void} + * @private + */ + _dialRelay (peer, cb) { + cb = once(cb || (() => {})) + + this.swarm.dial(peer, multicodec.relay, once((err, conn) => { + if (err) { + log.err(err) + return cb(err) + } + cb(null, new StreamHandler(conn)) + })) + } +} + +module.exports = Dialer diff --git a/src/circuit/hop.js b/src/circuit/hop.js new file mode 100644 index 0000000..9ac0f86 --- /dev/null +++ b/src/circuit/hop.js @@ -0,0 +1,197 @@ +'use strict' + +require('setimmediate') +require('safe-buffer') + +const pull = require('pull-stream') +const debug = require('debug') +const PeerInfo = require('peer-info') +const PeerId = require('peer-id') +const EE = require('events').EventEmitter +const once = require('once') +const utilsFactory = require('./utils') +const StreamHandler = require('./stream-handler') +const assignInWith = require('lodash/assignInWith') +const proto = require('../protocol') +const multiaddr = require('multiaddr') + +const multicodec = require('./../multicodec') + +const log = debug('libp2p:swarm:circuit:relay') +log.err = debug('libp2p:swarm:circuit:error:relay') + +class Hop extends EE { + /** + * Construct a Circuit object + * + * This class will handle incoming circuit connections and + * either start a relay or hand the relayed connection to + * the swarm + * + * @param {Swarm} swarm + * @param {Object} options + */ + constructor (swarm, options) { + super() + this.swarm = swarm + this.peerInfo = this.swarm._peerInfo + this.utils = utilsFactory(swarm) + this.config = assignInWith( + { + active: false, + enabled: false + }, + options, + (orig, src) => typeof src === 'undefined' ? false : src) + + this.active = this.config.active + } + + /** + * Handle the relay message + * + * @param {CircuitRelay} message + * @param {StreamHandler} streamHandler + * @returns {*} + */ + handle (message, streamHandler) { + if (!this.config.enabled) { + return this.utils.writeResponse(streamHandler, proto.CircuitRelay.Status.HOP_CANT_SPEAK_RELAY) + } + + if (message.type === proto.CircuitRelay.Type.CAN_HOP) { + return this.utils.writeResponse(streamHandler, proto.CircuitRelay.Status.SUCCESS) + } + + const srcPeerId = PeerId.createFromBytes(message.dstPeer.id) + if (srcPeerId.toB58String() === this.peerInfo.id.toB58String()) { + return this.utils.writeResponse(streamHandler, proto.CircuitRelay.Status.HOP_CANT_RELAY_TO_SELF) + } + + const dstPeerId = PeerId.createFromBytes(message.dstPeer.id).toB58String() + if (!message.dstPeer.addrs.length) { + // TODO: use encapsulate here + const addr = multiaddr(`/p2p-circuit/ipfs/${dstPeerId}`).buffer + message.dstPeer.addrs.push(addr) + } + + this.utils.validateAddrs(message, streamHandler, proto.CircuitRelay.Type.HOP, (err) => { + if (err) { + return log(err) + } + + let dstPeer + try { + dstPeer = this.swarm._peerBook.get(dstPeerId) + if (!dstPeer.isConnected() && !this.active) { + throw new Error('No Connection to peer') + } + } catch (err) { + if (!this.active) { + log.err(err) + setImmediate(() => this.emit('circuit:error', err)) + return this.utils.writeResponse(streamHandler, proto.CircuitRelay.Status.HOP_NO_CONN_TO_DST) + } + } + + return this._circuit(streamHandler.rest(), message, (err) => { + if (err) { + log.err(err) + setImmediate(() => this.emit('circuit:error', err)) + } + setImmediate(() => this.emit('circuit:success')) + }) + }) + } + + /** + * Attempt to make a circuit from A <-> R <-> B where R is this relay + * + * @param {Connection} conn - the source connection + * @param {CircuitRelay} message - the message with the src and dst entries + * @param {Function} cb - callback to signal success or failure + * @returns {void} + * @private + */ + _circuit (conn, message, cb) { + this._dialPeer(message.dstPeer, (err, dstConn) => { + const srcStreamHandler = new StreamHandler(conn) + if (err) { + this.utils.writeResponse(srcStreamHandler, proto.CircuitRelay.Status.HOP_CANT_DIAL_DST) + pull(pull.empty(), srcStreamHandler.rest()) + log.err(err) + return cb(err) + } + + return this.utils.writeResponse(srcStreamHandler, proto.CircuitRelay.Status.SUCCESS, (err) => { + if (err) { + log.err(err) + return cb(err) + } + + const streamHandler = new StreamHandler(dstConn) + const stopMsg = Object.assign({}, message, { + type: proto.CircuitRelay.Type.STOP // change the message type + }) + streamHandler.write(proto.CircuitRelay.encode(stopMsg), (err) => { + if (err) { + const errStreamHandler = new StreamHandler(conn) + this.utils.writeResponse(errStreamHandler, proto.CircuitRelay.Status.HOP_CANT_OPEN_DST_STREAM) + pull(pull.empty(), errStreamHandler.rest()) + + log.err(err) + return cb(err) + } + + streamHandler.read((err, msg) => { + if (err) { + log.err(err) + return cb(err) + } + + const srcConn = srcStreamHandler.rest() + if (msg.status === proto.CircuitRelay.Status.Success) { + // circuit the src and dst streams + pull( + srcConn, + streamHandler.rest(), + srcConn + ) + + cb() + } else { + // close/end the source stream if there was an error + pull( + pull.empty(), + srcConn + ) + } + }) + }) + }) + }) + } + + /** + * Dial the dest peer and create a circuit + * + * @param {Multiaddr} dstPeer + * @param {Function} callback + * @returns {Function|void} + * @private + */ + _dialPeer (dstPeer, callback) { + const peerInfo = new PeerInfo(PeerId.createFromBytes(dstPeer.id)) + dstPeer.addrs.forEach((a) => peerInfo.multiaddrs.add(a)) + this.swarm.dial(peerInfo, multicodec.relay, once((err, conn) => { + if (err) { + log.err(err) + return callback(err) + } + + callback(null, conn) + })) + } +} + +module.exports = Hop diff --git a/src/circuit/stop.js b/src/circuit/stop.js new file mode 100644 index 0000000..8d554fb --- /dev/null +++ b/src/circuit/stop.js @@ -0,0 +1,46 @@ +'use strict' + +const setImmediate = require('async/setImmediate') + +const EE = require('events').EventEmitter +const Connection = require('interface-connection').Connection +const utilsFactory = require('./utils') +const PeerInfo = require('peer-info') +const proto = require('../protocol') +const series = require('async/series') + +const debug = require('debug') + +const log = debug('libp2p:circuit:stop') +log.err = debug('libp2p:circuit:error:stop') + +class Stop extends EE { + constructor (swarm) { + super() + this.swarm = swarm + this.utils = utilsFactory(swarm) + } + + handle (message, streamHandler, callback) { + callback = callback || (() => {}) + + series([ + (cb) => this.utils.validateAddrs(message, streamHandler, proto.CircuitRelay.Type.STOP, cb), + (cb) => this.utils.writeResponse(streamHandler, proto.CircuitRelay.Status.Success, cb) + ], (err) => { + if (err) { + callback(err) + return log(err) + } + + const peerInfo = new PeerInfo(message.srcPeer.id) + message.srcPeer.addrs.forEach((addr) => peerInfo.multiaddrs.add(addr)) + const newConn = new Connection(streamHandler.rest()) + newConn.setPeerInfo(peerInfo) + setImmediate(() => this.emit('connection', newConn)) + callback(null, newConn) + }) + } +} + +module.exports = Stop diff --git a/src/circuit/stream-handler.js b/src/circuit/stream-handler.js new file mode 100644 index 0000000..e00945c --- /dev/null +++ b/src/circuit/stream-handler.js @@ -0,0 +1,117 @@ +'use strict' + +const pull = require('pull-stream') +const lp = require('pull-length-prefixed') +const handshake = require('pull-handshake') + +const debug = require('debug') +const log = debug('libp2p:circuit:stream-handler') +log.err = debug('libp2p:circuit:error:stream-handler') + +class StreamHandler { + /** + * Create a stream handler for connection + * + * @param {Connection} conn - connection to read/write + * @param {Function|undefined} cb - handshake callback called on error + * @param {Number} timeout - handshake timeout + * @param {Number} maxLength - max bytes length of message + */ + constructor (conn, cb, timeout, maxLength) { + this.conn = conn + this.stream = null + this.shake = null + this.timeout = cb || 1000 * 60 + this.maxLength = maxLength || 4096 + + if (typeof cb === 'function') { + this.timeout = timeout || 1000 * 60 + } + + this.stream = handshake({timeout: this.timeout}, cb) + this.shake = this.stream.handshake + + pull(this.stream, conn, this.stream) + } + + isValid () { + return this.conn && this.shake && this.stream + } + + /** + * Read and decode message + * + * @param {Function} cb + * @returns {void|Function} + */ + read (cb) { + if (!this.isValid()) { + cb(new Error(`handler is not in a valid state`)) + } + + lp.decodeFromReader(this.shake, {maxLength: this.maxLength}, (err, msg) => { + if (err) { + log.err(err) + // this.shake.abort(err) + return cb(err) + } + + return cb(null, msg) + }) + } + + /** + * Encode and write array of buffers + * + * @param {Buffer[]} msg + * @param {Function} [cb] + * @returns {Function} + */ + write (msg, cb) { + cb = cb || (() => {}) + + if (!this.isValid()) { + cb(new Error(`handler is not in a valid state`)) + } + + pull( + pull.values([msg]), + lp.encode(), + pull.collect((err, encoded) => { + if (err) { + log.err(err) + this.shake.abort(err) + return cb(err) + } + + encoded.forEach((e) => this.shake.write(e)) + cb() + }) + ) + } + + /** + * Get the raw Connection + * + * @returns {null|Connection|*} + */ + getRawConn () { + return this.conn + } + + /** + * Return the handshake rest stream and invalidate handler + * + * @return {*|{source, sink}} + */ + rest () { + const rest = this.shake.rest() + + this.conn = null + this.stream = null + this.shake = null + return rest + } +} + +module.exports = StreamHandler diff --git a/src/circuit/utils.js b/src/circuit/utils.js new file mode 100644 index 0000000..3b8ee10 --- /dev/null +++ b/src/circuit/utils.js @@ -0,0 +1,129 @@ +'use strict' + +const multiaddr = require('multiaddr') +const PeerInfo = require('peer-info') +const PeerId = require('peer-id') +const proto = require('../protocol') + +module.exports = function (swarm) { + /** + * Get b58 string from multiaddr or peerinfo + * + * @param {Multiaddr|PeerInfo} peer + * @return {*} + */ + function getB58String (peer) { + let b58Id = null + if (multiaddr.isMultiaddr(peer)) { + const relayMa = multiaddr(peer) + b58Id = relayMa.getPeerId() + } else if (PeerInfo.isPeerInfo(peer)) { + b58Id = peer.id.toB58String() + } + + return b58Id + } + + /** + * Helper to make a peer info from a multiaddrs + * + * @param {Multiaddr|PeerInfo|PeerId} ma + * @param {Swarm} swarm + * @return {PeerInfo} + * @private + */ + // TODO: this is ripped off of libp2p, should probably be a generally available util function + function peerInfoFromMa (peer) { + let p + // PeerInfo + if (PeerInfo.isPeerInfo(peer)) { + p = peer + // Multiaddr instance (not string) + } else if (multiaddr.isMultiaddr(peer)) { + const peerIdB58Str = peer.getPeerId() + try { + p = swarm._peerBook.get(peerIdB58Str) + } catch (err) { + p = new PeerInfo(PeerId.createFromB58String(peerIdB58Str)) + } + p.multiaddrs.add(peer) + // PeerId + } else if (PeerId.isPeerId(peer)) { + const peerIdB58Str = peer.toB58String() + p = swarm._peerBook.has(peerIdB58Str) ? swarm._peerBook.get(peerIdB58Str) : peer + } + + return p + } + + /** + * Checks if peer has an existing connection + * + * @param {String} peerId + * @param {Swarm} swarm + * @return {Boolean} + */ + function isPeerConnected (peerId) { + return swarm.muxedConns[peerId] || swarm.conns[peerId] + } + + /** + * Write a response + * + * @param {StreamHandler} streamHandler + * @param {CircuitRelay.Status} status + * @param {Function} cb + * @returns {*} + */ + function writeResponse (streamHandler, status, cb) { + cb = cb || (() => {}) + streamHandler.write(proto.CircuitRelay.encode({ + type: proto.CircuitRelay.Type.STATUS, + code: status + })) + return cb() + } + + /** + * Validate incomming HOP/STOP message + * + * @param {CircuitRelay} msg + * @param {StreamHandler} streamHandler + * @param {CircuitRelay.Type} type + * @returns {*} + * @param {Function} cb + */ + function validateAddrs (msg, streamHandler, type, cb) { + try { + msg.dstPeer.addrs.forEach((addr) => { + return multiaddr(addr) + }) + } catch (err) { + writeResponse(streamHandler, type === proto.CircuitRelay.Type.HOP + ? proto.CircuitRelay.Status.HOP_DST_MULTIADDR_INVALID + : proto.CircuitRelay.Status.STOP_DST_MULTIADDR_INVALID) + return cb(err) + } + + try { + msg.srcPeer.addrs.forEach((addr) => { + return multiaddr(addr) + }) + } catch (err) { + writeResponse(streamHandler, type === proto.CircuitRelay.Type.HOP + ? proto.CircuitRelay.Status.HOP_SRC_MULTIADDR_INVALID + : proto.CircuitRelay.Status.STOP_SRC_MULTIADDR_INVALID) + return cb(err) + } + + return cb(null) + } + + return { + getB58String: getB58String, + peerInfoFromMa: peerInfoFromMa, + isPeerConnected: isPeerConnected, + validateAddrs: validateAddrs, + writeResponse: writeResponse + } +} diff --git a/src/index.js b/src/index.js new file mode 100644 index 0000000..e4c7fed --- /dev/null +++ b/src/index.js @@ -0,0 +1,3 @@ +'use strict' + +module.exports = require('./circuit') diff --git a/src/listener.js b/src/listener.js new file mode 100644 index 0000000..d68a1a2 --- /dev/null +++ b/src/listener.js @@ -0,0 +1,140 @@ +'use strict' + +const setImmediate = require('async/setImmediate') + +const multicodec = require('./multicodec') +const EE = require('events').EventEmitter +const multiaddr = require('multiaddr') +const mafmt = require('mafmt') +const Stop = require('./circuit/stop') +const Hop = require('./circuit/hop') +const proto = require('./protocol') +const utilsFactory = require('./circuit/utils') + +const StreamHandler = require('./circuit/stream-handler') + +const debug = require('debug') + +const log = debug('libp2p:circuit:listener') +log.err = debug('libp2p:circuit:error:listener') + +module.exports = (swarm, options, connHandler) => { + const listener = new EE() + const utils = utilsFactory(swarm) + + listener.stopHandler = new Stop(swarm) + listener.hopHandler = new Hop(swarm, options.hop) + + /** + * Add swarm handler and listen for incoming connections + * + * @param {Multiaddr} ma + * @param {Function} callback + * @return {void} + */ + listener.listen = (ma, callback) => { + callback = callback || (() => {}) + + swarm.handle(multicodec.relay, (relayProto, conn) => { + const streamHandler = new StreamHandler(conn) + + streamHandler.read((err, msg) => { + if (err) { + log.err(err) + return + } + + let request = null + try { + request = proto.CircuitRelay.decode(msg) + } catch (err) { + return utils.writeResponse(streamHandler, proto.CircuitRelay.Status.MALFORMED_MESSAGE) + } + + switch (request.type) { + case proto.CircuitRelay.Type.CAN_HOP: + case proto.CircuitRelay.Type.HOP: { + return listener.hopHandler.handle(request, streamHandler) + } + + case proto.CircuitRelay.Type.STOP: { + return listener.stopHandler.handle(request, streamHandler, connHandler) + } + + default: { + return utils.writeResponse(streamHandler, proto.CircuitRelay.Status.INVALID_MSG_TYPE) + } + } + }) + }) + + setImmediate(() => listener.emit('listen')) + callback() + } + + /** + * Remove swarm listener + * + * @param {Function} cb + * @return {void} + */ + listener.close = (cb) => { + swarm.unhandle(multicodec.stop) + setImmediate(() => listener.emit('close')) + cb() + } + + /** + * Get fixed up multiaddrs + * + * NOTE: This method will grab the peers multiaddrs and expand them such that: + * + * a) If it's an existing /p2p-circuit address for a specific relay i.e. + * `/ip4/0.0.0.0/tcp/0/ipfs/QmRelay/p2p-circuit` this method will expand the + * address to `/ip4/0.0.0.0/tcp/0/ipfs/QmRelay/p2p-circuit/ipfs/QmPeer` where + * `QmPeer` is this peers id + * b) If it's not a /p2p-circuit address, it will encapsulate the address as a /p2p-circuit + * addr such that dials a relay uses that address to dial this peer + * + * @param {Function} callback + * @return {void} + */ + listener.getAddrs = (callback) => { + let addrs = swarm._peerInfo.multiaddrs.toArray() + + // get all the explicit relay addrs excluding self + let p2pAddrs = addrs.filter((addr) => { + return mafmt.Circuit.matches(addr) && + !addr.toString().includes(swarm._peerInfo.id.toB58String()) + }) + + // use the explicit relays instead of any relay + if (p2pAddrs.length) { + addrs = p2pAddrs + } + + let listenAddrs = [] + addrs.forEach((addr) => { + const peerMa = `/p2p-circuit/ipfs/${swarm._peerInfo.id.toB58String()}` + if (addr.toString() === peerMa) { + listenAddrs.push(multiaddr(peerMa)) + return + } + + if (!mafmt.Circuit.matches(addr)) { + if (addr.getPeerId()) { + // by default we're reachable over any relay + listenAddrs.push(multiaddr(`/p2p-circuit`).encapsulate(addr)) + } else { + listenAddrs.push(multiaddr(`/p2p-circuit`).encapsulate(`${addr}/ipfs/${swarm._peerInfo.id.toB58String()}`)) + } + } else { + listenAddrs.push(addr.encapsulate(`/ipfs/${swarm._peerInfo.id.toB58String()}`)) + } + }) + + callback(null, listenAddrs) + } + + return listener +} diff --git a/src/multicodec.js b/src/multicodec.js new file mode 100644 index 0000000..bcdb978 --- /dev/null +++ b/src/multicodec.js @@ -0,0 +1,5 @@ +'use strict' + +module.exports = { + relay: '/libp2p/circuit/relay/0.1.0' +} diff --git a/src/protocol/index.js b/src/protocol/index.js new file mode 100644 index 0000000..d403960 --- /dev/null +++ b/src/protocol/index.js @@ -0,0 +1,4 @@ +'use strict' + +const protobuf = require('protocol-buffers') +module.exports = protobuf(require('./proto.js')) diff --git a/src/protocol/proto.js b/src/protocol/proto.js new file mode 100644 index 0000000..32a65f1 --- /dev/null +++ b/src/protocol/proto.js @@ -0,0 +1,43 @@ +'use strict' +module.exports = ` +message CircuitRelay { + + enum Status { + SUCCESS = 100; + HOP_SRC_ADDR_TOO_LONG = 220; + HOP_DST_ADDR_TOO_LONG = 221; + HOP_SRC_MULTIADDR_INVALID = 250; + HOP_DST_MULTIADDR_INVALID = 251; + HOP_NO_CONN_TO_DST = 260; + HOP_CANT_DIAL_DST = 261; + HOP_CANT_OPEN_DST_STREAM = 262; + HOP_CANT_SPEAK_RELAY = 270; + HOP_CANT_RELAY_TO_SELF = 280; + STOP_SRC_ADDR_TOO_LONG = 320; + STOP_DST_ADDR_TOO_LONG = 321; + STOP_SRC_MULTIADDR_INVALID = 350; + STOP_DST_MULTIADDR_INVALID = 351; + STOP_RELAY_REFUSED = 390; + MALFORMED_MESSAGE = 400; + } + + enum Type { // RPC identifier, either HOP, STOP or STATUS + HOP = 1; + STOP = 2; + STATUS = 3; + CAN_HOP = 4; + } + + message Peer { + required bytes id = 1; // peer id + repeated bytes addrs = 2; // peer's known addresses + } + + optional Type type = 1; // Type of the message + + optional Peer srcPeer = 2; // srcPeer and dstPeer are used when Type is HOP or STATUS + optional Peer dstPeer = 3; + + optional Status code = 4; // Status code, used when Type is STATUS +} +` diff --git a/test/dialer.spec.js b/test/dialer.spec.js new file mode 100644 index 0000000..2d4d0d0 --- /dev/null +++ b/test/dialer.spec.js @@ -0,0 +1,268 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 5] */ + +'use strict' + +const Dialer = require('../src/circuit/dialer') +const nodes = require('./fixtures/nodes') +const Connection = require('interface-connection').Connection +const multiaddr = require('multiaddr') +const handshake = require('pull-handshake') +const PeerInfo = require('peer-info') +const PeerId = require('peer-id') +const waterfall = require('async/waterfall') +const pull = require('pull-stream') +const lp = require('pull-length-prefixed') +const proto = require('../src/protocol') +const StreamHandler = require('../src/circuit/stream-handler') +const utilsFactory = require('../src/circuit/utils') + +const sinon = require('sinon') +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) + +describe(`dialer tests`, function () { + describe(`.dial`, function () { + const dialer = sinon.createStubInstance(Dialer) + + beforeEach(function () { + dialer.relayPeers = new Map() + dialer.relayPeers.set(nodes.node2.id, new Connection()) + dialer.relayPeers.set(nodes.node3.id, new Connection()) + dialer.dial.callThrough() + }) + + afterEach(function () { + dialer._dialPeer.reset() + }) + + it(`fail on non circuit addr`, function () { + const dstMa = multiaddr(`/ipfs/${nodes.node4.id}`) + expect(() => dialer.dial(dstMa, (err) => { + err.to.match(/invalid circuit address/) + })) + }) + + it(`dial a peer`, function (done) { + const dstMa = multiaddr(`/p2p-circuit/ipfs/${nodes.node3.id}`) + dialer._dialPeer.callsFake(function (dstMa, relay, callback) { + return callback(null, dialer.relayPeers.get(nodes.node3.id)) + }) + + dialer.dial(dstMa, (err, conn) => { + expect(err).to.be.null() + expect(conn).to.be.an.instanceOf(Connection) + done() + }) + }) + + it(`dial a peer over the specified relay`, function (done) { + const dstMa = multiaddr(`/ipfs/${nodes.node3.id}/p2p-circuit/ipfs/${nodes.node4.id}`) + dialer._dialPeer.callsFake(function (dstMa, relay, callback) { + expect(relay.toString()).to.equal(`/ipfs/${nodes.node3.id}`) + return callback(null, new Connection()) + }) + + dialer.dial(dstMa, (err, conn) => { + expect(err).to.be.null() + expect(conn).to.be.an.instanceOf(Connection) + done() + }) + }) + }) + + describe(`.canHop`, function () { + const dialer = sinon.createStubInstance(Dialer) + + let stream = null + let shake = null + let fromConn = null + let peer = new PeerInfo(PeerId.createFromB58String('QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA')) + + beforeEach(function () { + stream = handshake({ timeout: 1000 * 60 }) + shake = stream.handshake + fromConn = new Connection(stream) + + dialer.relayPeers = new Map() + dialer.utils = utilsFactory({}) + dialer.canHop.callThrough() + }) + + afterEach(function () { + dialer._dialRelay.reset() + }) + + it(`should handle successful CAN_HOP`, function () { + pull( + pull.values([proto.CircuitRelay.encode({ + type: proto.CircuitRelay.type.HOP, + code: proto.CircuitRelay.Status.SUCCESS + })]), + lp.encode(), + pull.collect((err, encoded) => { + expect(err).to.be.null() + encoded.forEach((e) => shake.write(e)) + dialer._dialRelay.callsFake((peer, cb) => { + cb(null, new StreamHandler(fromConn)) + }) + }) + ) + + dialer.canHop(peer, (err) => { + expect(err).to.be.null() + expect(dialer.relayPeers.has(peer.id.toB58String())).to.be.ok() + }) + }) + + it(`should handle failed CAN_HOP`, function () { + pull( + pull.values([proto.CircuitRelay.encode({ + type: proto.CircuitRelay.type.HOP, + code: proto.CircuitRelay.Status.HOP_CANT_SPEAK_RELAY + })]), + lp.encode(), + pull.collect((err, encoded) => { + expect(err).to.be.null() + encoded.forEach((e) => shake.write(e)) + dialer._dialRelay.callsFake((peer, cb) => { + cb(null, new StreamHandler(fromConn)) + }) + }) + ) + + dialer.canHop(peer, (err) => { + expect(err).to.be.null() + expect(dialer.relayPeers.has(peer.id.toB58String())).to.not.be.ok() + }) + }) + }) + + describe(`._dialPeer`, function () { + const dialer = sinon.createStubInstance(Dialer) + + beforeEach(function () { + dialer.relayPeers = new Map() + dialer.relayPeers.set(nodes.node1.id, new Connection()) + dialer.relayPeers.set(nodes.node2.id, new Connection()) + dialer.relayPeers.set(nodes.node3.id, new Connection()) + dialer._dialPeer.callThrough() + }) + + afterEach(function () { + dialer._negotiateRelay.reset() + }) + + it(`should dial a peer over any relay`, function (done) { + const dstMa = multiaddr(`/ipfs/${nodes.node4.id}`) + dialer._negotiateRelay.callsFake(function (conn, dstMa, callback) { + if (conn === dialer.relayPeers.get(nodes.node3.id)) { + return callback(null, dialer.relayPeers.get(nodes.node3.id)) + } + + callback(new Error(`error`)) + }) + + dialer._dialPeer(dstMa, (err, conn) => { + expect(err).to.be.null() + expect(conn).to.be.an.instanceOf(Connection) + expect(conn).to.deep.equal(dialer.relayPeers.get(nodes.node3.id)) + done() + }) + }) + + it(`should fail dialing a peer over any relay`, function (done) { + const dstMa = multiaddr(`/ipfs/${nodes.node4.id}`) + dialer._negotiateRelay.callsFake(function (conn, dstMa, callback) { + callback(new Error(`error`)) + }) + + dialer._dialPeer(dstMa, (err, conn) => { + expect(conn).to.be.undefined() + expect(err).to.not.be.null() + expect(err).to.equal(`no relay peers were found or all relays failed to dial`) + done() + }) + }) + }) + + describe(`._negotiateRelay`, function () { + const dialer = sinon.createStubInstance(Dialer) + const dstMa = multiaddr(`/ipfs/${nodes.node4.id}`) + + let conn + let stream + let shake + let callback = sinon.stub() + + beforeEach(function (done) { + waterfall([ + (cb) => PeerId.createFromJSON(nodes.node4, cb), + (peerId, cb) => PeerInfo.create(peerId, cb), + (peer, cb) => { + peer.multiaddrs.add(`/p2p-circuit/ipfs/QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`) + dialer.swarm = { + _peerInfo: peer + } + cb() + }, + (cb) => { + dialer.relayConns = new Map() + dialer._negotiateRelay.callThrough() + stream = handshake({ timeout: 1000 * 60 }) + shake = stream.handshake + conn = new Connection() + conn.setPeerInfo(new PeerInfo(PeerId.createFromB58String(`QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`))) + conn.setInnerConn(stream) + dialer._negotiateRelay(conn, dstMa, callback) + cb() + } + ], done) + }) + + afterEach(() => { + callback.reset() + }) + + it(`should write the correct dst addr`, function (done) { + lp.decodeFromReader(shake, (err, msg) => { + shake.write(proto.CircuitRelay.encode({ + type: proto.CircuitRelay.Type.STATUS, + code: proto.CircuitRelay.Status.SUCCESS + })) + expect(err).to.be.null() + expect(proto.CircuitRelay.decode(msg).dstPeer.addrs[0]).to.deep.equal(dstMa.buffer) + done() + }) + }) + + it(`should handle failed relay negotiation`, function (done) { + callback.callsFake((err, msg) => { + expect(err).to.not.be.null() + expect(err).to.be.an.instanceOf(Error) + expect(err.message).to.be.equal(`Got 400 error code trying to dial over relay`) + expect(callback.calledOnce).to.be.ok() + done() + }) + + // send failed message + lp.decodeFromReader(shake, (err, msg) => { + if (err) return done(err) + + pull( + pull.values([proto.CircuitRelay.encode({ + type: proto.CircuitRelay.Type.STATUS, + code: proto.CircuitRelay.Status.MALFORMED_MESSAGE + })]), // send arbitrary non 200 code + lp.encode(), + pull.collect((err, encoded) => { + expect(err).to.be.null() + encoded.forEach((e) => shake.write(e)) + }) + ) + }) + }) + }) +}) diff --git a/test/fixtures/nodes.js b/test/fixtures/nodes.js new file mode 100644 index 0000000..71a274d --- /dev/null +++ b/test/fixtures/nodes.js @@ -0,0 +1,25 @@ +'use strict' + +exports.node1 = { + id: 'QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE', + privKey: 'CAASpwkwggSjAgEAAoIBAQDJwzJPar4nylKY71Mm5q2BOED8uPf1ILvIi15VwVZWqter6flnlii/RKEcBypPbFqJHHa56MvybgQgrFmHKwDjnJvq4jyOZfR+o/D/99Ft1p2FAEBjImSXAgNpK4YsbyV5r0Q1+Avcj++aWWlLu6enUrL9WGzeUkf0U5L6XwXEPRUQdEojAIQi241P1hyqXX5gKAZVGqcPtKb6p1db3fcXodkS1G6JR90TopJHCqTCECp3SB9c6LlG7KXU92sIHJBlhOEEzGkEI1pM1SWnNnW5VLEypU7P56ifzzp4QxPNiJeC+cmE5SrgR3cXP44iKOuNVRJwBpCh5oNYqECzgqJ9AgMBAAECggEBAJpCdqXHrAmKJCqv2HiGqCODGhTfax1s4IYNIJwaTOPIjUrwgfKUGSVb2H4wcEX3RyVLsO6lMcFyIg/FFlJFK9HavE8SmFAbXZqxx6I9HE+JZjf5IEFrW1Mlg+wWDejNNe7adSF6O79wATaWo+32VNGWZilTQTGd4UvJ1jc9DZCh8zZeNhm4C6exXD45gMB0HI1t2ZNl47scsBEE4rV+s7F7y8Yk/tIsf0wSI/H8KSXS5I9aFxr3Z9c3HOfbVwhnIfNUDqcFTeU5BnhByYNLJ4v9xGj7puidcabVXkt2zLmm/LHbKVeGzec9LW5D+KkuB/pKaslsCXN6bVlu+SbVr9UCgYEA7MXfzZw36vDyfn4LPCN0wgzz11uh3cm31QzOPlWpA7hIsL/eInpvc8wa9yBRC1sRk41CedPHn913MR6EJi0Ne6/B1QOmRYBUjr60VPRNdTXCAiLykjXg6+TZ+AKnxlUGK1hjTo8krhpWq7iD/JchVlLoqDAXGFHvSxN0H3WEUm8CgYEA2iWC9w1v+YHfT2PXcLxYde9EuLVkIS4TM7Kb0N3wr/4+K4xWjVXuaJJLJoAbihNAZw0Y+2s1PswDUEpSG0jXeNXLs6XcQxYSEAu/pFdvHFeg2BfwVQoeEFlWyTJR29uti9/APaXMo8FSVAPPR5lKZLStJDM9hEfAPfUaHyic39MCgYAKQbwjNQw7Ejr+/cjQzxxkt5jskFyftfhPs2FP0/ghYB9OANHHnpQraQEWCYFZQ5WsVac2jdUM+NQL/a1t1e/Klt+HscPHKPsAwAQh1f9w/2YrH4ZwjQL0VRKYKs1HyzEcOZT7tzm4jQ2KHNEi5Q0dpzPK7WJivFHoZ6xVHIsh4wKBgAQq20mk9BKsLHvzyFXbA0WdgI6WyIbpvmwqaVegJcz26nEiiTTCA3/z64OcxunoXD6bvXJwJeBBPX73LIJg7dzdGLsh3AdcEJRF5S9ajEDaW7RFIM4/FzvwuPu2/mFY3QPjDmUfGb23H7+DIx6XCxjJatVaNT6lsEJ+wDUALZ8JAoGAO0YJSEziA7y0dXPK5azkJUMJ5yaN+zRDmoBnEggza34rQW0s16NnIR0EBzKGwbpNyePlProv4dQEaLF1kboKsSYvV2rW2ftLVdNqBHEUYFRC9ofPctCxwM1YU21TI2/k1squ+swApg2EHMev2+WKd+jpVPIbCIvJ3AjiAKZtiGQ=', + pubKey: 'CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDJwzJPar4nylKY71Mm5q2BOED8uPf1ILvIi15VwVZWqter6flnlii/RKEcBypPbFqJHHa56MvybgQgrFmHKwDjnJvq4jyOZfR+o/D/99Ft1p2FAEBjImSXAgNpK4YsbyV5r0Q1+Avcj++aWWlLu6enUrL9WGzeUkf0U5L6XwXEPRUQdEojAIQi241P1hyqXX5gKAZVGqcPtKb6p1db3fcXodkS1G6JR90TopJHCqTCECp3SB9c6LlG7KXU92sIHJBlhOEEzGkEI1pM1SWnNnW5VLEypU7P56ifzzp4QxPNiJeC+cmE5SrgR3cXP44iKOuNVRJwBpCh5oNYqECzgqJ9AgMBAAE=' +} + +exports.node2 = { + id: 'QmYJjAri5soV8RbeQcHaYYcTAYTET17QTvcoFMyKvRDTXe', + privKey: 'CAASpgkwggSiAgEAAoIBAQDt7YgUeBQsoN/lrgo690mB7yEh8G9iXhZiDecgZCLRRSl3v2cH9w4WjhoW9erfnVbdoTqkCK+se8uK01ySi/ubQQDPcrjacXTa6wAuRTbCG/0bUR9RxKtxZZBS1HaY7L923ulgGDTiVaRQ3JQqhzmQkaU0ikNcluSGaw0kmhXP6JmcL+wndKgW5VD9etcp2Qlk8uUFC/GAO90cOAuER3wnI3ocHGm9on9zyb97g4TDzIfjSaTW4Wanmx2yVbURQxmCba16X3LT9IMPqQaGOzq3+EewMLeCESbUm/uJaJLdqWrWRK4oNzxcMgmUkzav+s476HdA9CRo72am+g3Vdq+lAgMBAAECggEAcByKD6MZVoIjnlVo6qoVUA1+3kAuK/rLrz5/1wp4QYXGaW+eO+mVENm6v3D3UJESGnLbb+nL5Ymbunmn2EHvuBNkL1wOcJgfiPxM5ICmscaAeHu8N0plwpQp8m28yIheG8Qj0az2VmQmfhfCFVwMquuGHgC8hwdu/Uu6MLIObx1xjtaGbY9kk7nzAeXHeJ4RDeuNN0QrYuQVKwrIz1NtPNDR/cli298ZXJcm+HEhBCIHVIYpAq6BHSuiXVqPGEOYWYXo+yVhEtDJ8BmNqlN1Y1s6bnfu/tFkKUN6iQQ46vYnQEGTGR9lg7J/c6tqfRs9FcywWb9J1SX6HxPO8184zQKBgQD6vDYl20UT4ZtrzhFfMyV/1QUqFM/TdwNuiOsIewHBol9o7aOjrxrrbYVa1HOxETyBjmFsW+iIfOVl61SG2HcU4CG+O2s9WBo4JdRlOm4YQ8/83xO3YfbXzuTx8BMCyP/i1uPIZTKQFFAN0HiL96r4L60xHoWB7tQsbZiEbIO/2wKBgQDy7HnkgVeTld6o0+sT84FYRUotjDB00oUWiSeGtj0pFC4yIxhMhD8QjKiWoJyJItcoCsQ/EncuuwwRtuXi83793lJQR1DBYd+TSPg0M8J1pw97fUIPi/FU+jHtrsx7Vn/7Bk9voictsYVLAfbi68tYdsZpAaYOWYMY9NUfVuAmfwKBgCYZDwk1hgt9TkZVK2KRvPLthTldrC5veQAEoeHJ/vxTFbg105V9d9Op8odYnLOc8NqmrbrvRCfpAlo4JcHPhliPrdDf6m2Jw4IgjWNMO4pIU4QSyUYmBoHIGBWC6wCTVf47tKSwa7xkub0/nfF2km3foKtD/fk+NtMBXBlS+7ndAoGAJo6GIlCtN82X07AfJcGGjB4jUetoXYJ0gUkvruAKARUk5+xOFQcAg33v3EiNz+5pu/9JesFRjWc+2Sjwf/8p7t10ry1Ckg8Yz2XLj22PteDYQj91VsZdfaFgf1s5NXJbSdqMjSltkoEUqP0c1JOcaOQhRdVvJ+PpPPLPSPQfC70CgYBvJE1I06s7BEM1DOli3VyfNaJDI4k9W2dCJOU6Bh2MNmbdRjM3xnpOKH5SqRlCz/oI9pn4dxgbX6WPg331MD9CNYy2tt5KBQRrSuDj8p4jlzMIpX36hsyTTrzYU6WWSIPz6jXW8IexXKvXEmr8TVb78ZPiQfbG012cdUhAJniNgg==', + pubKey: 'CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDt7YgUeBQsoN/lrgo690mB7yEh8G9iXhZiDecgZCLRRSl3v2cH9w4WjhoW9erfnVbdoTqkCK+se8uK01ySi/ubQQDPcrjacXTa6wAuRTbCG/0bUR9RxKtxZZBS1HaY7L923ulgGDTiVaRQ3JQqhzmQkaU0ikNcluSGaw0kmhXP6JmcL+wndKgW5VD9etcp2Qlk8uUFC/GAO90cOAuER3wnI3ocHGm9on9zyb97g4TDzIfjSaTW4Wanmx2yVbURQxmCba16X3LT9IMPqQaGOzq3+EewMLeCESbUm/uJaJLdqWrWRK4oNzxcMgmUkzav+s476HdA9CRo72am+g3Vdq+lAgMBAAE=' +} + +exports.node3 = { + id: 'QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA', + privKey: 'CAASpwkwggSjAgEAAoIBAQDdnGp0X7Pix5dIawfyuffVryRDRS5JXdyjayKUkgikJLYoiijB5TakrFKhx1SDKpmVLxxqAGz8m5iA2cHwetIQXTZvdYx7XXxv332En3ji8TiGRUiEFM8KQ5WCJ5G7yw8R2pv/pYdnMrPd04QbtSCn0cFVCiiA2Zkl5KnwBo/lf+sVI/TEeiwmVD9nxi13qWgBTmCysqH8Ppyu8fq+bQgqRZSlalVDswyIhgWlepPkD0uYakJJhhOxY+2RlbNhGY0qjRyMTYou2uR/hfd6j8uR++WdB0v3+DYWG2Kc3sWa4BLYb5r4trvQGO1Iagnwuk3AVoi7PldsaInekzWEVljDAgMBAAECggEAXx0jE49/xXWkmJBXePYYSL5C8hxfIV4HtJvm251R2CFpjTy/AXk/Wq4bSRQkUaeXA1CVAWntXP3rFmJfurb8McnP80agZNJa9ikV1jYbzEt71yUlWosT0XPwV0xkYBVnAmKxUafZ1ZENYcfGi53RxjVgpP8XIzZBZOIfjcVDPVw9NAOzQmq4i3DJEz5xZAkaeSM8mn5ZFl1JMBUOgyOHB7d4BWd3zuLyvnn0/08HlsaSUl0mZa3f2Lm2NlsjOiNfMCJTOIT+xDEP9THm5n2cqieSjvtpAZzV4kcoD0rB8OsyHQlFAEXzkgELDr5dVXji0rrIdVz8stYAKGfi996OAQKBgQDuviV1sc+ClJQA59vqbBiKxWqcuCKMzvmL4Yk1e/AkQeRt+JX9kALWzBx65fFmHTj4Lus8AIQoiruPxa0thtqh/m3SlucWnrdaW410xbz3KqQWS7bx+0sFWZIEi4N+PESrIYhtVbFuRiabYgliqdSU9shxtXXnvfhjl+9quZltiwKBgQDtoUCKqrZbm0bmzLvpnKdNodg1lUHaKGgEvWgza2N1t3b/GE07iha2KO3hBDta3bdfIEEOagY8o13217D0VIGsYNKpiEGLEeNIjfcXBEqAKiTfa/sXUfTprpWBZQ/7ZS+eZIYtQjq14EHa7ifAby1v3yDrMIuxphz5JfKdXFgYqQKBgHr47FikPwu2tkmFJCyqgzWvnEufOQSoc7eOc1tePIKggiX2/mM+M4gqWJ0hJeeAM+D6YeZlKa2sUBItMxeZN7JrWGw5mEx5cl4TfFhipgP2LdDiLRiVZL4bte+rYQ67wm8XdatDkYIIlkhBBi6Q5dPZDcQsQNAedPvvvb2OXi4jAoGBAKp06FpP+L2fle2LYSRDlhNvDCvrpDA8mdkEkRGJb/AKKdb09LnH5WDH3VNy+KzGrHoVJfWUAmNPAOFHeYzabaZcUeEAd5utui7afytIjbSABrEpwRTKWneiH2aROzSnMdBZ5ZHjlz/N3Q+RlHxKg/piwTdUPHCzasch/HX6vsr5AoGAGvhCNPKyCwpu8Gg5GQdx0yN6ZPar9wieD345cLanDZWKkLRQbo4SfkfoS+PDfOLzDbWFdPRnWQ0qhdPm3D/N1YD/nudHqbeDlx0dj/6lEHmmPKFFO2kiNFEhn8DycNGbvWyVBKksacuRXav21+LvW+TatUkRMhi8fgRoypnbJjg=', + pubKey: 'CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDdnGp0X7Pix5dIawfyuffVryRDRS5JXdyjayKUkgikJLYoiijB5TakrFKhx1SDKpmVLxxqAGz8m5iA2cHwetIQXTZvdYx7XXxv332En3ji8TiGRUiEFM8KQ5WCJ5G7yw8R2pv/pYdnMrPd04QbtSCn0cFVCiiA2Zkl5KnwBo/lf+sVI/TEeiwmVD9nxi13qWgBTmCysqH8Ppyu8fq+bQgqRZSlalVDswyIhgWlepPkD0uYakJJhhOxY+2RlbNhGY0qjRyMTYou2uR/hfd6j8uR++WdB0v3+DYWG2Kc3sWa4BLYb5r4trvQGO1Iagnwuk3AVoi7PldsaInekzWEVljDAgMBAAE=' +} + +exports.node4 = { + id: 'QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy', + privKey: 'CAASqAkwggSkAgEAAoIBAQC6pg6LYWbY+49SOYdYap6RPqKqZxg80IXeo3hiTUbiGtTruxVYZpnz3UbernL9J9mwlXJGRUQJUKmXmi1yePTQiyclpH0KyPefaWLbpxJQdCBI1TPZpDWo2hutWSPqhKBU1QyH2FLKQPWLdxuIX1cNFPtIlSl5gCxN6oiDIwh7++kxNM1G+d5XgJX6iHLlLoNv3Wn6XYX+VtYdyZRFk8gYyT2BrISbxmsrSjSOodwUUzF8TYTjsqW6ksL2x0mrRm2cMM9evULktqwU+I8l9ulASDbFWBXUToXaZSL9M+Oq5JvZO0WIjPeYVpAgWladtayhdxg5dBv8aTbDaM5DZvyRAgMBAAECggEAR65YbZz1k6Vg0HI5kXI4/YzxicHYJBrtHqjnJdGJxHILjZCmzPFydJ5phkG29ZRlXRS381bMn0s0Jn3WsFzVoHWgjitSvl6aAsXFapgKR42hjHcc15vh47wH3xYZ3gobTRkZG96vRO+XnX0bvM7orqR9MM3gRMI9wZqt3LcKnhpiqSlyEZ3Zehu7ZZ8B+XcUw42H6ZTXgmg5mCFEjS/1rVt+EsdZl7Ll7jHigahPA6qMjyRiZB6T20qQ0FFYfmaNuRuuC6cWUXf8DOgnEjMB/Mi/Feoip9bTqNBrVYn2XeDxdMv5pDznNKXpalsMkZwx5FpNOMKnIMdQFyAGtkeQ9QKBgQD3rjTiulitpbbQBzF8VXeymtMJAbR1TAqNv2yXoowhL3JZaWICM7nXHjjsJa3UzJygbi8bO0KWrw7tY0nUbPy5SmHtNYhmUsEjiTjqEnNRrYN68tEKr0HlgX+9rArsjOcwucl2svFSfk+rTYDHU5neZkDDhu1QmnZm/pQI92Lo4wKBgQDA6wpMd53fmX9DhWegs3xelRStcqBTw1ucWVRyPgY1hO1cJ0oReYIXKEw9CHNLW0RHvnVM26kRnqCl+dTcg7dhLuqrckuyQyY1KcRYG1ryJnz3euucaSF2UCsZCHvFNV7Vz8dszUMUVCogWmroVP6HE/BoazUCNh25s/dNwE+i+wKBgEfa1WL1luaBzgCaJaQhk4FQY2sYgIcLEYDACTwQn0C9aBpCdXmYEhEzpmX0JHM5DTOJ48atsYrPrK/3/yJOoB8NUk2kGzc8SOYLWGSoB6aphRx1N2o3IBH6ONoJAH5R/nxnWehCz7oUBP74lCS/v0MDPUS8bzrUJQeKUd4sDxjrAoGBAIRO7rJA+1qF+J1DWi4ByxNHJXZLfh/UhPj23w628SU1dGDWZVsUvZ7KOXdGW2RcRLj7q5E5uXtnEoCillViVJtnRPSun7Gzkfm2Gn3ezQH0WZKVkA+mnpd5JgW2JsS69L6pEPnS0OWZT4b+3AFZgXL8vs2ucR2CJeLdxYdilHuPAoGBAPLCzBkAboXZZtvEWqzqtVNqdMrjLHihFrpg4TXSsk8+ZQZCVN+sRyTGTvBX8+Jvx4at6ClaSgT3eJ/412fEH6CHvrFXjUE9W9y6X0axxaT63y1OXmFiB/hU3vjLWZKZWSDGNS7St02fYri4tWmGtJDjYG1maLRhMSzcoj4fP1xz', + pubKey: 'CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC6pg6LYWbY+49SOYdYap6RPqKqZxg80IXeo3hiTUbiGtTruxVYZpnz3UbernL9J9mwlXJGRUQJUKmXmi1yePTQiyclpH0KyPefaWLbpxJQdCBI1TPZpDWo2hutWSPqhKBU1QyH2FLKQPWLdxuIX1cNFPtIlSl5gCxN6oiDIwh7++kxNM1G+d5XgJX6iHLlLoNv3Wn6XYX+VtYdyZRFk8gYyT2BrISbxmsrSjSOodwUUzF8TYTjsqW6ksL2x0mrRm2cMM9evULktqwU+I8l9ulASDbFWBXUToXaZSL9M+Oq5JvZO0WIjPeYVpAgWladtayhdxg5dBv8aTbDaM5DZvyRAgMBAAE=' +} diff --git a/test/helpers/test-node.js b/test/helpers/test-node.js new file mode 100644 index 0000000..998bf07 --- /dev/null +++ b/test/helpers/test-node.js @@ -0,0 +1,22 @@ +'use strict' + +const Libp2p = require('libp2p') +const secio = require('libp2p-secio') + +class TestNode extends Libp2p { + constructor (peerInfo, transports, muxer, options) { + options = options || {} + + const modules = { + transport: transports, + connection: { + muxer: [muxer], + crypto: options.isCrypto ? [secio] : null + }, + discovery: [] + } + super(modules, peerInfo, null, options) + } +} + +module.exports = TestNode diff --git a/test/helpers/utils.js b/test/helpers/utils.js new file mode 100644 index 0000000..de198ca --- /dev/null +++ b/test/helpers/utils.js @@ -0,0 +1,110 @@ +'use strict' + +const TestNode = require('./test-node') +const PeerInfo = require('peer-info') +const PeerId = require('peer-id') +const eachAsync = require('async/each') +const pull = require('pull-stream') + +exports.createNodes = function createNodes (configNodes, callback) { + const nodes = {} + eachAsync(Object.keys(configNodes), (key, cb1) => { + let config = configNodes[key] + + const setup = (err, peer) => { + if (err) { + callback(err) + } + + eachAsync(config.addrs, (addr, cb2) => { + peer.multiaddrs.add(addr) + cb2() + }, (err) => { + if (err) { + return callback(err) + } + + nodes[key] = new TestNode(peer, config.transports, config.muxer, config.config) + cb1() + }) + } + + if (config.id) { + PeerId.createFromJSON(config.id, (err, peerId) => { + if (err) return callback(err) + PeerInfo.create(peerId, setup) + }) + } else { + PeerInfo.create(setup) + } + }, (err) => { + if (err) { + return callback(err) + } + + startNodes(nodes, (err) => { + if (err) { + callback(err) + } + + callback(null, nodes) + }) + }) +} + +function startNodes (nodes, callback) { + eachAsync(Object.keys(nodes), + (key, cb) => { + nodes[key].start(cb) + }, + (err) => { + if (err) { + return callback(err) + } + callback(null) + }) +} + +exports.stopNodes = function stopNodes (nodes, callback) { + eachAsync(Object.keys(nodes), + (key, cb) => { + nodes[key].stop(cb) + }, + (err) => { + if (err) { + return callback(err) + } + callback() + }) +} + +function reverse (protocol, conn) { + pull( + conn, + pull.map((data) => { + return data.toString().split('').reverse().join('') + }), + conn + ) +} + +exports.dialAndReverse = function dialAndRevers (srcNode, dstNode, vals, done) { + dstNode.handle('/ipfs/reverse/1.0.0', reverse) + + srcNode.dial(dstNode.peerInfo, '/ipfs/reverse/1.0.0', (err, conn) => { + if (err) return done(err) + + pull( + pull.values(vals), + conn, + pull.collect((err, data) => { + if (err) return done(err) + + let reversed = data.map((val, i) => { + return val.toString() + }) + + srcNode.hangUp(srcNode.peerInfo, () => done(null, reversed)) + })) + }) +} diff --git a/test/hop.spec.js b/test/hop.spec.js new file mode 100644 index 0000000..8edd1b5 --- /dev/null +++ b/test/hop.spec.js @@ -0,0 +1,232 @@ +/* eslint-env mocha */ +'use strict' + +const Hop = require('../src/circuit/hop') +const nodes = require('./fixtures/nodes') +const Connection = require('interface-connection').Connection +const handshake = require('pull-handshake') +const waterfall = require('async/waterfall') +const PeerInfo = require('peer-info') +const PeerId = require('peer-id') +const multiaddr = require('multiaddr') +const lp = require('pull-length-prefixed') +const proto = require('../src/protocol') +const StreamHandler = require('../src/circuit/stream-handler') + +const sinon = require('sinon') +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) + +describe('relay', function () { + describe(`should handle circuit requests`, function () { + let relay + let swarm + let fromConn + let stream + let shake + + beforeEach(function (done) { + stream = handshake({ timeout: 1000 * 60 }) + shake = stream.handshake + fromConn = new Connection(stream) + fromConn.setPeerInfo(new PeerInfo(PeerId.createFromB58String('QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA'))) + + let peers = { + QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE: + new PeerInfo(PeerId.createFromB58String(`QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`)), + QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA: + new PeerInfo(PeerId.createFromB58String(`QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`)), + QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy: + new PeerInfo(PeerId.createFromB58String(`QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`)) + } + + Object.keys(peers).forEach((key) => { peers[key]._connectedMultiaddr = true }) // make it truthy + + waterfall([ + (cb) => PeerId.createFromJSON(nodes.node4, cb), + (peerId, cb) => PeerInfo.create(peerId, cb), + (peer, cb) => { + peer.multiaddrs.add('/p2p-circuit/ipfs/QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE') + swarm = { + _peerInfo: peer, + conns: { + QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE: new Connection(), + QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA: new Connection(), + QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy: new Connection() + }, + _peerBook: { + get: (peer) => { + if (!peers[peer]) { + throw new Error() + } + + return peers[peer] + } + } + } + + cb() + } + ], () => { + relay = new Hop(swarm, { enabled: true }) + relay._circuit = sinon.stub() + relay._circuit.callsArg(2, null, new Connection()) + done() + }) + }) + + afterEach(() => { + relay._circuit.reset() + }) + + it(`should handle a valid circuit request`, function (done) { + let relayMsg = { + type: proto.CircuitRelay.Type.HOP, + srcPeer: { + id: PeerId.createFromB58String(`QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`).id, + addrs: [multiaddr(`/ipfs/QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`).buffer] + }, + dstPeer: { + id: PeerId.createFromB58String(`QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).id, + addrs: [multiaddr(`/ipfs/QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).buffer] + } + } + + relay.on('circuit:success', () => { + expect(relay._circuit.calledWith(sinon.match.any, relayMsg)).to.be.ok() + done() + }) + + relay.handle(relayMsg, new StreamHandler(fromConn)) + }) + + it(`should handle a request to passive circuit`, function (done) { + let relayMsg = { + type: proto.CircuitRelay.Type.HOP, + srcPeer: { + id: PeerId.createFromB58String(`QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).id, + addrs: [multiaddr(`/ipfs/QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).buffer] + }, + dstPeer: { + id: PeerId.createFromB58String(`QmYJjAri5soV8RbeQcHaYYcTAYTET17QTvcoFMyKvRDTXe`).id, + addrs: [multiaddr(`/ipfs/QmYJjAri5soV8RbeQcHaYYcTAYTET17QTvcoFMyKvRDTXe`).buffer] + } + } + + relay.active = false + lp.decodeFromReader(shake, (err, msg) => { + expect(err).to.be.null() + + const response = proto.CircuitRelay.decode(msg) + expect(response.code).to.equal(proto.CircuitRelay.Status.HOP_NO_CONN_TO_DST) + expect(response.type).to.equal(proto.CircuitRelay.Type.STATUS) + done() + }) + + relay.handle(relayMsg, new StreamHandler(fromConn)) + }) + + it(`should handle a request to active circuit`, function (done) { + let relayMsg = { + type: proto.CircuitRelay.Type.HOP, + srcPeer: { + id: PeerId.createFromB58String(`QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).id, + addrs: [multiaddr(`/ipfs/QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).buffer] + }, + dstPeer: { + id: PeerId.createFromB58String(`QmYJjAri5soV8RbeQcHaYYcTAYTET17QTvcoFMyKvRDTXe`).id, + addrs: [multiaddr(`/ipfs/QmYJjAri5soV8RbeQcHaYYcTAYTET17QTvcoFMyKvRDTXe`).buffer] + } + } + + relay.active = true + relay.on('circuit:success', () => { + expect(relay._circuit.calledWith(sinon.match.any, relayMsg)).to.be.ok() + done() + }) + + relay.on('circuit:error', (err) => { + done(err) + }) + + relay.handle(relayMsg, new StreamHandler(fromConn)) + }) + + it(`not dial to self`, function (done) { + let relayMsg = { + type: proto.CircuitRelay.Type.HOP, + srcPeer: { + id: PeerId.createFromB58String(`QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`).id, + addrs: [multiaddr(`/ipfs/QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`).buffer] + }, + dstPeer: { + id: PeerId.createFromB58String(`QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`).id, + addrs: [multiaddr(`/ipfs/QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`).buffer] + } + } + + lp.decodeFromReader(shake, (err, msg) => { + expect(err).to.be.null() + + const response = proto.CircuitRelay.decode(msg) + expect(response.code).to.equal(proto.CircuitRelay.Status.HOP_CANT_RELAY_TO_SELF) + expect(response.type).to.equal(proto.CircuitRelay.Type.STATUS) + done() + }) + + relay.handle(relayMsg, new StreamHandler(fromConn)) + }) + + it(`fail on invalid src address`, function (done) { + let relayMsg = { + type: proto.CircuitRelay.Type.HOP, + srcPeer: { + id: `sdfkjsdnfkjdsb`, + addrs: [`sdfkjsdnfkjdsb`] + }, + dstPeer: { + id: PeerId.createFromB58String(`QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).id, + addrs: [multiaddr(`/ipfs/QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).buffer] + } + } + + lp.decodeFromReader(shake, (err, msg) => { + expect(err).to.be.null() + + const response = proto.CircuitRelay.decode(msg) + expect(response.code).to.equal(proto.CircuitRelay.Status.HOP_SRC_MULTIADDR_INVALID) + expect(response.type).to.equal(proto.CircuitRelay.Type.STATUS) + done() + }) + + relay.handle(relayMsg, new StreamHandler(fromConn)) + }) + + it(`fail on invalid dst address`, function (done) { + let relayMsg = { + type: proto.CircuitRelay.Type.HOP, + srcPeer: { + id: PeerId.createFromB58String(`QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).id, + addrs: [multiaddr(`/ipfs/QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).buffer] + }, + dstPeer: { + id: PeerId.createFromB58String(`QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`).id, + addrs: [`sdfkjsdnfkjdsb`] + } + } + + lp.decodeFromReader(shake, (err, msg) => { + expect(err).to.be.null() + + const response = proto.CircuitRelay.decode(msg) + expect(response.code).to.equal(proto.CircuitRelay.Status.HOP_DST_MULTIADDR_INVALID) + expect(response.type).to.equal(proto.CircuitRelay.Type.STATUS) + done() + }) + + relay.handle(relayMsg, new StreamHandler(fromConn)) + }) + }) +}) diff --git a/test/listener.spec.js b/test/listener.spec.js new file mode 100644 index 0000000..ab1c87e --- /dev/null +++ b/test/listener.spec.js @@ -0,0 +1,254 @@ +/* eslint-env mocha */ +'use strict' + +const Listener = require('../src/listener') +const nodes = require('./fixtures/nodes') +const waterfall = require('async/waterfall') +const PeerInfo = require('peer-info') +const PeerId = require('peer-id') +const multiaddr = require('multiaddr') +const handshake = require('pull-handshake') +const Connection = require('interface-connection').Connection +const proto = require('../src/protocol') +const lp = require('pull-length-prefixed') +const pull = require('pull-stream') +const multicodec = require('../src/multicodec') + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) +const sinon = require('sinon') + +describe('listener', function () { + describe(`listen`, function () { + let swarm = null + let handlerSpy = null + let listener = null + let stream = null + let shake = null + let conn = null + + beforeEach(function (done) { + stream = handshake({timeout: 1000 * 60}) + shake = stream.handshake + conn = new Connection(stream) + conn.setPeerInfo(new PeerInfo(PeerId.createFromB58String('QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE'))) + + waterfall([ + (cb) => PeerId.createFromJSON(nodes.node4, cb), + (peerId, cb) => PeerInfo.create(peerId, cb), + (peer, cb) => { + swarm = { + _peerInfo: peer, + handle: sinon.spy((proto, h) => { + handlerSpy = sinon.spy(h) + }), + conns: { + QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE: new Connection() + } + } + + listener = Listener(swarm, {}, () => {}) + listener.listen() + cb() + } + ], done) + }) + + afterEach(() => { + listener = null + }) + + it(`should handle HOP`, function (done) { + handlerSpy(multicodec.relay, conn) + + let relayMsg = { + type: proto.CircuitRelay.Type.HOP, + srcPeer: { + id: `QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`, + addrs: [`/ipfs/QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`] + }, + dstPeer: { + id: `QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`, + addrs: [`/ipfs/QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`] + } + } + + listener.hopHandler.handle = (message, conn) => { + expect(message.type).to.equal(proto.CircuitRelay.Type.HOP) + + expect(message.srcPeer.id.toString()).to.equal(relayMsg.srcPeer.id) + expect(message.srcPeer.addrs[0].toString()).to.equal(relayMsg.srcPeer.addrs[0]) + + expect(message.dstPeer.id.toString()).to.equal(relayMsg.dstPeer.id) + expect(message.dstPeer.addrs[0].toString()).to.equal(relayMsg.dstPeer.addrs[0]) + + done() + } + + pull( + pull.values([proto.CircuitRelay.encode(relayMsg)]), + lp.encode(), + pull.collect((err, encoded) => { + expect(err).to.be.null() + encoded.forEach((e) => shake.write(e)) + }) + ) + }) + + it(`should handle STOP`, function (done) { + handlerSpy(multicodec.relay, conn) + + let relayMsg = { + type: proto.CircuitRelay.Type.STOP, + srcPeer: { + id: `QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`, + addrs: [`/ipfs/QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`] + }, + dstPeer: { + id: `QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`, + addrs: [`/ipfs/QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`] + } + } + + listener.stopHandler.handle = (message, conn) => { + expect(message.type).to.equal(proto.CircuitRelay.Type.STOP) + + expect(message.srcPeer.id.toString()).to.equal(relayMsg.srcPeer.id) + expect(message.srcPeer.addrs[0].toString()).to.equal(relayMsg.srcPeer.addrs[0]) + + expect(message.dstPeer.id.toString()).to.equal(relayMsg.dstPeer.id) + expect(message.dstPeer.addrs[0].toString()).to.equal(relayMsg.dstPeer.addrs[0]) + + done() + } + + pull( + pull.values([proto.CircuitRelay.encode(relayMsg)]), + lp.encode(), + pull.collect((err, encoded) => { + expect(err).to.be.null() + encoded.forEach((e) => shake.write(e)) + }) + ) + }) + + it(`should handle CAN_HOP`, function (done) { + handlerSpy(multicodec.relay, conn) + + let relayMsg = { + type: proto.CircuitRelay.Type.CAN_HOP, + srcPeer: { + id: `QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`, + addrs: [`/ipfs/QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`] + }, + dstPeer: { + id: `QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`, + addrs: [`/ipfs/QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`] + } + } + + listener.hopHandler.handle = (message, conn) => { + expect(message.type).to.equal(proto.CircuitRelay.Type.CAN_HOP) + + expect(message.srcPeer.id.toString()).to.equal(relayMsg.srcPeer.id) + expect(message.srcPeer.addrs[0].toString()).to.equal(relayMsg.srcPeer.addrs[0]) + + expect(message.dstPeer.id.toString()).to.equal(relayMsg.dstPeer.id) + expect(message.dstPeer.addrs[0].toString()).to.equal(relayMsg.dstPeer.addrs[0]) + + done() + } + + pull( + pull.values([proto.CircuitRelay.encode(relayMsg)]), + lp.encode(), + pull.collect((err, encoded) => { + expect(err).to.be.null() + encoded.forEach((e) => shake.write(e)) + }) + ) + }) + + it(`should handle invalid message correctly`, function (done) { + handlerSpy(multicodec.relay, conn) + + let relayMsg = { + type: 100000, + srcPeer: { + id: Buffer.from(`QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`), + addrs: [multiaddr(`/ipfs/QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`).buffer] + }, + dstPeer: { + id: Buffer.from(`QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`), + addrs: [multiaddr(`/ipfs/QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`).buffer] + } + } + + pull( + pull.values([Buffer.from([relayMsg])]), + lp.encode(), + pull.collect((err, encoded) => { + expect(err).to.be.null() + encoded.forEach((e) => shake.write(e)) + }), + lp.decodeFromReader(shake, {maxLength: this.maxLength}, (err, msg) => { + expect(err).to.be.null() + expect(proto.CircuitRelay.decode(msg).type).to.equal(proto.CircuitRelay.Type.STATUS) + expect(proto.CircuitRelay.decode(msg).code).to.equal(proto.CircuitRelay.Status.MALFORMED_MESSAGE) + done() + }) + ) + }) + }) + + describe(`getAddrs`, function () { + let swarm = null + let listener = null + let peerInfo = null + + beforeEach(function (done) { + waterfall([ + (cb) => PeerId.createFromJSON(nodes.node4, cb), + (peerId, cb) => PeerInfo.create(peerId, cb), + (peer, cb) => { + swarm = { + _peerInfo: peer + } + + peerInfo = peer + listener = Listener(swarm, {}, () => {}) + cb() + } + ], done) + }) + + afterEach(() => { + peerInfo = null + }) + + it(`should return correct addrs`, function () { + peerInfo.multiaddrs.add(`/ip4/0.0.0.0/tcp/4002`) + peerInfo.multiaddrs.add(`/ip4/127.0.0.1/tcp/4003/ws`) + + listener.getAddrs((err, addrs) => { + expect(err).to.be.null() + expect(addrs).to.deep.equal([ + multiaddr(`/p2p-circuit/ip4/0.0.0.0/tcp/4002/ipfs/QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`), + multiaddr(`/p2p-circuit/ip4/127.0.0.1/tcp/4003/ws/ipfs/QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`)]) + }) + }) + + it(`don't return default addrs in an explicit p2p-circuit addres`, function () { + peerInfo.multiaddrs.add(`/ip4/127.0.0.1/tcp/4003/ws`) + peerInfo.multiaddrs.add(`/p2p-circuit/ip4/0.0.0.0/tcp/4002`) + listener.getAddrs((err, addrs) => { + expect(err).to.be.null() + expect(addrs[0] + .toString()) + .to.equal(`/p2p-circuit/ip4/0.0.0.0/tcp/4002/ipfs/QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`) + }) + }) + }) +}) diff --git a/test/proto.spec.js b/test/proto.spec.js new file mode 100644 index 0000000..d12595a --- /dev/null +++ b/test/proto.spec.js @@ -0,0 +1,50 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) + +const multiaddr = require('multiaddr') + +const proto = require('../src/protocol') + +describe('protocol', function () { + let msgObject = null + let message = null + + before(() => { + msgObject = { + type: proto.CircuitRelay.Type.HOP, + srcPeer: { + id: Buffer.from('QmSource'), + addrs: [ + multiaddr('/p2p-circuit/ipfs/QmSource').buffer, + multiaddr('/p2p-circuit/ip4/0.0.0.0/tcp/9000/ipfs/QmSource').buffer, + multiaddr('/ip4/0.0.0.0/tcp/9000/ipfs/QmSource').buffer + ] + }, + dstPeer: { + id: Buffer.from('QmDest'), + addrs: [ + multiaddr('/p2p-circuit/ipfs/QmDest').buffer, + multiaddr('/p2p-circuit/ip4/1.1.1.1/tcp/9000/ipfs/QmDest').buffer, + multiaddr('/ip4/1.1.1.1/tcp/9000/ipfs/QmDest').buffer + ] + } + } + + let buff = proto.CircuitRelay.encode(msgObject) + message = proto.CircuitRelay.decode(buff) + }) + + it(`should source and dest`, () => { + expect(message.srcPeer).to.deep.equal(msgObject.srcPeer) + expect(message.dstPeer).to.deep.equal(msgObject.dstPeer) + }) + + it(`should encode message`, () => { + expect(message.message).to.deep.equal(msgObject.message) + }) +}) diff --git a/test/stop.spec.js b/test/stop.spec.js new file mode 100644 index 0000000..c85414f --- /dev/null +++ b/test/stop.spec.js @@ -0,0 +1,86 @@ +/* eslint-env mocha */ +'use strict' + +const Stop = require('../src/circuit/stop') +const nodes = require('./fixtures/nodes') +const Connection = require('interface-connection').Connection +const handshake = require('pull-handshake') +const waterfall = require('async/waterfall') +const PeerInfo = require('peer-info') +const PeerId = require('peer-id') +const StreamHandler = require('../src/circuit/stream-handler') +const proto = require('../src/protocol') + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) + +describe('stop', function () { + describe(`handle relayed connections`, function () { + let stopHandler + + let swarm + let conn + let stream + + beforeEach(function (done) { + stream = handshake({timeout: 1000 * 60}) + conn = new Connection(stream) + conn.setPeerInfo(new PeerInfo(PeerId.createFromB58String('QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE'))) + + waterfall([ + (cb) => PeerId.createFromJSON(nodes.node4, cb), + (peerId, cb) => PeerInfo.create(peerId, cb), + (peer, cb) => { + peer.multiaddrs.add('/p2p-circuit/ipfs/QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE') + swarm = { + _peerInfo: peer, + conns: { + QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE: new Connection() + } + } + + stopHandler = new Stop(swarm) + cb() + } + ], done) + }) + + it(`handle request with a valid multiaddr`, function (done) { + stopHandler.handle({ + type: proto.CircuitRelay.Type.STOP, + srcPeer: { + id: `QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`, + addrs: [`/ipfs/QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`] + }, + dstPeer: { + id: `QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`, + addrs: [`/ipfs/QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`] + } + }, new StreamHandler(conn), (err, conn) => { + expect(err).to.not.exist() + expect(conn).to.be.instanceOf(Connection) + done() + }) + }) + + it(`handle request with invalid multiaddr`, function (done) { + stopHandler.handle({ + type: proto.CircuitRelay.Type.STOP, + srcPeer: { + id: `QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`, + addrs: [`dsfsdfsdf`] + }, + dstPeer: { + id: `QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`, + addrs: [`sdflksdfndsklfnlkdf`] + } + }, new StreamHandler(conn), (err, conn) => { + expect(err).to.be.not.null() + expect(conn).to.not.exist() + done() + }) + }) + }) +})