From 1b5c95e435e77d4a9c448dcd13b0116d1ddeffdc Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Wed, 11 Dec 2019 14:50:27 +0000 Subject: [PATCH 1/4] refactor: convert block and pin APIs to async/await --- src/cli/commands/pin/ls.js | 5 +- src/core/components/block/get.js | 16 ++ src/core/components/block/put.js | 51 ++++++ src/core/components/block/rm.js | 61 +++++++ src/core/components/block/stat.js | 21 +++ src/core/components/block/utils.js | 17 ++ src/core/components/index.js | 8 +- src/core/components/init.js | 21 ++- src/core/components/pin.js | 248 ----------------------------- src/core/components/pin/add.js | 74 +++++++++ src/core/components/pin/gc-lock.js | 83 ---------- src/core/components/pin/ls.js | 101 ++++++++++++ src/core/components/pin/rm.js | 69 ++++++++ src/core/components/start.js | 7 +- src/core/components/stop.js | 6 +- src/http/api/resources/pin.js | 17 +- src/utils/mutex.js | 52 ------ 17 files changed, 444 insertions(+), 413 deletions(-) create mode 100644 src/core/components/block/get.js create mode 100644 src/core/components/block/put.js create mode 100644 src/core/components/block/rm.js create mode 100644 src/core/components/block/stat.js create mode 100644 src/core/components/block/utils.js delete mode 100644 src/core/components/pin.js create mode 100644 src/core/components/pin/add.js delete mode 100644 src/core/components/pin/gc-lock.js create mode 100644 src/core/components/pin/ls.js create mode 100644 src/core/components/pin/rm.js delete mode 100644 src/utils/mutex.js diff --git a/src/cli/commands/pin/ls.js b/src/cli/commands/pin/ls.js index 5f75b6e410..7fbc6a2f7b 100644 --- a/src/cli/commands/pin/ls.js +++ b/src/cli/commands/pin/ls.js @@ -34,14 +34,13 @@ module.exports = { resolve((async () => { const paths = ipfsPath const ipfs = await getIpfs() - const results = await ipfs.pin.ls(paths, { type }) - results.forEach((res) => { + for await (const res of ipfs.pin.ls(paths, { type })) { let line = cidToString(res.hash, { base: cidBase }) if (!quiet) { line += ` ${res.type}` } print(line) - }) + } })()) } } diff --git a/src/core/components/block/get.js b/src/core/components/block/get.js new file mode 100644 index 0000000000..da0fe34dda --- /dev/null +++ b/src/core/components/block/get.js @@ -0,0 +1,16 @@ +'use strict' + +const { cleanCid } = require('./utils') + +module.exports = function block ({ blockService, preload }) { + return async function get (cid, options) { // eslint-disable-line require-await + options = options || {} + cid = cleanCid(cid) + + if (options.preload !== false) { + preload(cid) + } + + return blockService.get(cid) + } +} diff --git a/src/core/components/block/put.js b/src/core/components/block/put.js new file mode 100644 index 0000000000..526bc23e7f --- /dev/null +++ b/src/core/components/block/put.js @@ -0,0 +1,51 @@ +'use strict' + +const Block = require('ipfs-block') +const multihashing = require('multihashing-async') +const CID = require('cids') + +module.exports = ({ blockService, gcLock, preload }) => { + return async function put (block, options) { + options = options || {} + + if (Array.isArray(block)) { + throw new Error('Array is not supported') + } + + if (!Block.isBlock(block)) { + if (options.cid && CID.isCID(options.cid)) { + block = new Block(block, options.cid) + } else { + const mhtype = options.mhtype || 'sha2-256' + const format = options.format || 'dag-pb' + let cidVersion + + if (options.version == null) { + // Pick appropriate CID version + cidVersion = mhtype === 'sha2-256' && format === 'dag-pb' ? 
0 : 1 + } else { + cidVersion = options.version + } + + const multihash = await multihashing(block, mhtype) + const cid = new CID(cidVersion, format, multihash) + + block = new Block(block, cid) + } + } + + const release = await gcLock.readLock() + + try { + await blockService.put(block) + + if (options.preload !== false) { + preload(block.cid) + } + + return block + } finally { + release() + } + } +} diff --git a/src/core/components/block/rm.js b/src/core/components/block/rm.js new file mode 100644 index 0000000000..6fd8162f75 --- /dev/null +++ b/src/core/components/block/rm.js @@ -0,0 +1,61 @@ +'use strict' + +const CID = require('cids') +const errCode = require('err-code') +const { PinTypes } = require('./pin/pin-manager') +const { cleanCid } = require('./utils') + +module.exports = ({ blockService, gcLock, pinManager }) => { + return async function * rm (cids, options) { + options = options || {} + + if (!Array.isArray(cids)) { + cids = [cids] + } + + // We need to take a write lock here to ensure that adding and removing + // blocks are exclusive operations + const release = await gcLock.writeLock() + + try { + for (let cid of cids) { + cid = cleanCid(cid) + + const result = { + hash: cid.toString() + } + + try { + const pinResult = await pinManager.isPinnedWithType(cid, PinTypes.all) + + if (pinResult.pinned) { + if (CID.isCID(pinResult.reason)) { // eslint-disable-line max-depth + throw errCode(new Error(`pinned via ${pinResult.reason}`)) + } + + throw errCode(new Error(`pinned: ${pinResult.reason}`)) + } + + // remove has check when https://github.com/ipfs/js-ipfs-block-service/pull/88 is merged + const has = await blockService._repo.blocks.has(cid) + + if (!has) { + throw errCode(new Error('block not found'), 'ERR_BLOCK_NOT_FOUND') + } + + await blockService.delete(cid) + } catch (err) { + if (!options.force) { + result.error = `cannot remove ${cid}: ${err.message}` + } + } + + if (!options.quiet) { + yield result + } + } + } finally { + release() + } + } +} diff --git a/src/core/components/block/stat.js b/src/core/components/block/stat.js new file mode 100644 index 0000000000..2a99e33704 --- /dev/null +++ b/src/core/components/block/stat.js @@ -0,0 +1,21 @@ +'use strict' + +const { cleanCid } = require('./utils') + +module.exports = ({ blockService, preload }) => { + return async function stat (cid, options) { + options = options || {} + cid = cleanCid(cid) + + if (options.preload !== false) { + preload(cid) + } + + const block = await blockService.get(cid) + + return { + key: cid.toString(), + size: block.data.length + } + } +} diff --git a/src/core/components/block/utils.js b/src/core/components/block/utils.js new file mode 100644 index 0000000000..76ca4fa293 --- /dev/null +++ b/src/core/components/block/utils.js @@ -0,0 +1,17 @@ +'use strict' + +const CID = require('cids') +const errCode = require('err-code') + +exports.cleanCid = cid => { + if (CID.isCID(cid)) { + return cid + } + + // CID constructor knows how to do the cleaning :) + try { + return new CID(cid) + } catch (err) { + throw errCode(err, 'ERR_INVALID_CID') + } +} diff --git a/src/core/components/index.js b/src/core/components/index.js index 44d922712a..676fa0d951 100644 --- a/src/core/components/index.js +++ b/src/core/components/index.js @@ -3,12 +3,16 @@ exports.add = require('./add') exports.config = require('./config') exports.init = require('./init') +exports.pin = { + add: require('./pin/add'), + ls: require('./pin/ls'), + rm: require('./pin/rm') +} exports.start = require('./start') exports.stop = 
require('./stop') exports.legacy = { // TODO: these will be removed as the new API is completed dag: require('./dag'), libp2p: require('./libp2p'), - object: require('./object'), - pin: require('./pin') + object: require('./object') } diff --git a/src/core/components/init.js b/src/core/components/init.js index 089d6148dc..6d2c84245a 100644 --- a/src/core/components/init.js +++ b/src/core/components/init.js @@ -9,7 +9,7 @@ const getDefaultConfig = require('../runtime/config-nodejs.js') const createRepo = require('../runtime/repo-nodejs') const Keychain = require('libp2p-keychain') const NoKeychain = require('./no-keychain') -const GCLock = require('./pin/gc-lock') +const mortice = require('mortice') const { DAGNode } = require('ipld-dag-pb') const UnixFs = require('ipfs-unixfs') const multicodec = require('multicodec') @@ -95,19 +95,21 @@ module.exports = ({ const preload = createPreloader(constructorOptions.preload) await preload.start() - const gcLock = new GCLock(constructorOptions.repoOwner, { - // Make sure GCLock is specific to repo, for tests where there are - // multiple instances of IPFS - morticeId: repo.path - }) - + // Make sure GC lock is specific to repo, for tests where there are + // multiple instances of IPFS + const gcLock = mortice(repo.path, { singleProcess: constructorOptions.repoOwner }) const dag = Commands.legacy.dag({ _ipld: ipld, _preload: preload }) const object = Commands.legacy.object({ _ipld: ipld, _preload: preload, dag, _gcLock: gcLock }) const pinManager = new PinManager(repo, dag) await pinManager.load() - const pin = Commands.legacy.pin({ _ipld: ipld, _preload: preload, object, _repo: repo, _pinManager: pinManager }) + const pin = { + add: Commands.pin.add({ pinManager, gcLock, dag, object }), + ls: Commands.pin.ls({ pinManager, object }), + rm: Commands.pin.rm({ pinManager, gcLock, object }) + } + const add = Commands.add({ ipld, dag, preload, pin, gcLock, constructorOptions }) if (!isInitialized && !options.emptyRepo) { @@ -135,6 +137,7 @@ module.exports = ({ ipld, keychain, peerInfo, + pin, pinManager, preload, print, @@ -277,6 +280,7 @@ function createApi ({ ipld, keychain, peerInfo, + pin, pinManager, preload, print, @@ -301,6 +305,7 @@ function createApi ({ add, config: Commands.config({ repo }), init: () => { throw new AlreadyInitializedError() }, + pin, start } diff --git a/src/core/components/pin.js b/src/core/components/pin.js deleted file mode 100644 index 176e5f5cc8..0000000000 --- a/src/core/components/pin.js +++ /dev/null @@ -1,248 +0,0 @@ -/* eslint max-nested-callbacks: ["error", 8] */ -'use strict' - -const callbackify = require('callbackify') -const errCode = require('err-code') -const multibase = require('multibase') -const { resolvePath } = require('../utils') -const PinManager = require('./pin/pin-manager') -const PinTypes = PinManager.PinTypes - -module.exports = (self) => { - const dag = self.dag - const pinManager = self._pinManager || new PinManager(self._repo, dag) - - const pin = { - add: callbackify.variadic(async (paths, options) => { - options = options || {} - - const recursive = options.recursive !== false - const cids = await resolvePath(self.object, paths) - const pinAdd = async () => { - const results = [] - - // verify that each hash can be pinned - for (const cid of cids) { - const key = cid.toBaseEncodedString() - - if (recursive) { - if (pinManager.recursivePins.has(key)) { - // it's already pinned recursively - results.push(key) - - continue - } - - // entire graph of nested links should be pinned, - // so make sure 
we have all the objects - await pinManager.fetchCompleteDag(key, { preload: options.preload }) - - // found all objects, we can add the pin - results.push(key) - } else { - if (pinManager.recursivePins.has(key)) { - // recursive supersedes direct, can't have both - throw new Error(`${key} already pinned recursively`) - } - - if (!pinManager.directPins.has(key)) { - // make sure we have the object - await dag.get(cid, { preload: options.preload }) - } - - results.push(key) - } - } - - // update the pin sets in memory - const pinset = recursive ? pinManager.recursivePins : pinManager.directPins - results.forEach(key => pinset.add(key)) - - // persist updated pin sets to datastore - await pinManager.flushPins() - - return results.map(hash => ({ hash })) - } - - // When adding a file, we take a lock that gets released after pinning - // is complete, so don't take a second lock here - const lock = Boolean(options.lock) - - if (!lock) { - return pinAdd() - } - - const release = await self._gcLock.readLock() - - try { - await pinAdd() - } finally { - release() - } - }), - - rm: callbackify.variadic(async (paths, options) => { - options = options || {} - - const recursive = options.recursive == null ? true : options.recursive - - if (options.cidBase && !multibase.names.includes(options.cidBase)) { - throw errCode(new Error('invalid multibase'), 'ERR_INVALID_MULTIBASE') - } - - const cids = await resolvePath(self.object, paths) - const release = await self._gcLock.readLock() - const results = [] - - try { - // verify that each hash can be unpinned - for (const cid of cids) { - const res = await pinManager.isPinnedWithType(cid, PinTypes.all) - - const { pinned, reason } = res - const key = cid.toBaseEncodedString() - - if (!pinned) { - throw new Error(`${key} is not pinned`) - } - - switch (reason) { - case (PinTypes.recursive): - if (!recursive) { - throw new Error(`${key} is pinned recursively`) - } - - results.push(key) - - break - case (PinTypes.direct): - results.push(key) - - break - default: - throw new Error(`${key} is pinned indirectly under ${reason}`) - } - } - - // update the pin sets in memory - results.forEach(key => { - if (recursive && pinManager.recursivePins.has(key)) { - pinManager.recursivePins.delete(key) - } else { - pinManager.directPins.delete(key) - } - }) - - // persist updated pin sets to datastore - await pinManager.flushPins() - - self.log(`Removed pins: ${results}`) - - return results.map(hash => ({ hash })) - } finally { - release() - } - }), - - ls: callbackify.variadic(async (paths, options) => { - options = options || {} - - let type = PinTypes.all - - if (paths && paths.type) { - options = paths - paths = null - } - - if (options.type) { - type = options.type - if (typeof options.type === 'string') { - type = options.type.toLowerCase() - } - const err = PinManager.checkPinType(type) - if (err) { - throw err - } - } - - if (paths) { - // check the pinned state of specific hashes - const cids = await resolvePath(self.object, paths) - const results = [] - - for (const cid of cids) { - const { key, reason, pinned } = await pinManager.isPinnedWithType(cid, type) - - if (pinned) { - switch (reason) { - case PinTypes.direct: - case PinTypes.recursive: - results.push({ - hash: key, - type: reason - }) - break - default: - results.push({ - hash: key, - type: `${PinTypes.indirect} through ${reason}` - }) - } - } - } - - if (!results.length) { - throw new Error(`path '${paths}' is not pinned`) - } - - return results - } - - // show all pinned items of type - let pins = [] - - 
if (type === PinTypes.direct || type === PinTypes.all) { - pins = pins.concat( - Array.from(pinManager.directPins).map(hash => ({ - type: PinTypes.direct, - hash - })) - ) - } - - if (type === PinTypes.recursive || type === PinTypes.all) { - pins = pins.concat( - Array.from(pinManager.recursivePins).map(hash => ({ - type: PinTypes.recursive, - hash - })) - ) - } - - if (type === PinTypes.indirect || type === PinTypes.all) { - const indirects = await pinManager.getIndirectKeys(options) - - pins = pins - // if something is pinned both directly and indirectly, - // report the indirect entry - .filter(({ hash }) => - !indirects.includes(hash) || - (indirects.includes(hash) && !pinManager.directPins.has(hash)) - ) - .concat(indirects.map(hash => ({ - type: PinTypes.indirect, - hash - }))) - - return pins - } - - return pins - }), - - // used by tests - pinManager - } - - return pin -} diff --git a/src/core/components/pin/add.js b/src/core/components/pin/add.js new file mode 100644 index 0000000000..07d6142d72 --- /dev/null +++ b/src/core/components/pin/add.js @@ -0,0 +1,74 @@ +/* eslint max-nested-callbacks: ["error", 8] */ +'use strict' + +const { resolvePath } = require('../utils') + +module.exports = ({ pinManager, gcLock, dag, object }) => { + return async function add (paths, options) { + options = options || {} + + const recursive = options.recursive !== false + const cids = await resolvePath(object, paths) + const pinAdd = async () => { + const results = [] + + // verify that each hash can be pinned + for (const cid of cids) { + const key = cid.toBaseEncodedString() + + if (recursive) { + if (pinManager.recursivePins.has(key)) { + // it's already pinned recursively + results.push(key) + + continue + } + + // entire graph of nested links should be pinned, + // so make sure we have all the objects + await pinManager.fetchCompleteDag(key, { preload: options.preload }) + + // found all objects, we can add the pin + results.push(key) + } else { + if (pinManager.recursivePins.has(key)) { + // recursive supersedes direct, can't have both + throw new Error(`${key} already pinned recursively`) + } + + if (!pinManager.directPins.has(key)) { + // make sure we have the object + await dag.get(cid, { preload: options.preload }) + } + + results.push(key) + } + } + + // update the pin sets in memory + const pinset = recursive ? 
pinManager.recursivePins : pinManager.directPins + results.forEach(key => pinset.add(key)) + + // persist updated pin sets to datastore + await pinManager.flushPins() + + return results.map(hash => ({ hash })) + } + + // When adding a file, we take a lock that gets released after pinning + // is complete, so don't take a second lock here + const lock = Boolean(options.lock) + + if (!lock) { + return pinAdd() + } + + const release = await gcLock.readLock() + + try { + await pinAdd() + } finally { + release() + } + } +} diff --git a/src/core/components/pin/gc-lock.js b/src/core/components/pin/gc-lock.js deleted file mode 100644 index faceea12cf..0000000000 --- a/src/core/components/pin/gc-lock.js +++ /dev/null @@ -1,83 +0,0 @@ -'use strict' - -const pull = require('pull-stream/pull') -const pullThrough = require('pull-stream/throughs/through') -const pullAsyncMap = require('pull-stream/throughs/async-map') -const Mutex = require('../../../utils/mutex') -const log = require('debug')('ipfs:gc:lock') - -class GCLock { - constructor (repoOwner, options) { - options = options || {} - - this.mutex = new Mutex(repoOwner, { ...options, log }) - } - - readLock () { - return this.mutex.readLock() - } - - writeLock () { - return this.mutex.writeLock() - } - - pullReadLock (lockedPullFn) { - return this.pullLock('readLock', lockedPullFn) - } - - pullWriteLock (lockedPullFn) { - return this.pullLock('writeLock', lockedPullFn) - } - - pullLock (type, lockedPullFn) { - const pullLocker = new PullLocker(this.mutex, type) - - return pull( - pullLocker.take(), - lockedPullFn(), - pullLocker.release() - ) - } -} - -class PullLocker { - constructor (mutex, type) { - this.mutex = mutex - this.type = type - - // The function to call to release the lock. It is set when the lock is taken - this.releaseLock = null - } - - take () { - return pullAsyncMap((i, cb) => { - // Check if the lock has already been acquired. - // Note: new items will only come through the pull stream once the first - // item has acquired a lock. 
- if (this.releaseLock) { - // The lock has been acquired so return immediately - return cb(null, i) - } - - // Request the lock - this.mutex[this.type]() - .then(release => { - // Save the release function to be called when the stream completes - this.releaseLock = release - - // The lock has been granted, so run the locked piece of code - cb(null, i) - }, cb) - }) - } - - // Releases the lock - release () { - return pullThrough(null, (err) => { - // When the stream completes, release the lock - this.releaseLock(err) - }) - } -} - -module.exports = GCLock diff --git a/src/core/components/pin/ls.js b/src/core/components/pin/ls.js new file mode 100644 index 0000000000..bc4e92e5e2 --- /dev/null +++ b/src/core/components/pin/ls.js @@ -0,0 +1,101 @@ +/* eslint max-nested-callbacks: ["error", 8] */ +'use strict' + +const { resolvePath } = require('../utils') +const PinManager = require('./pin/pin-manager') +const PinTypes = PinManager.PinTypes + +module.exports = ({ pinManager, object }) => { + return async function * ls (paths, options) { + options = options || {} + + let type = PinTypes.all + + if (paths && paths.type) { + options = paths + paths = null + } + + if (options.type) { + type = options.type + if (typeof options.type === 'string') { + type = options.type.toLowerCase() + } + const err = PinManager.checkPinType(type) + if (err) { + throw err + } + } + + if (paths) { + // check the pinned state of specific hashes + const cids = await resolvePath(object, paths) + + for (let i = 0; i < cids.length; i++) { + const cid = cids[i] + const { key, reason, pinned } = await pinManager.isPinnedWithType(cid, type) + + if (pinned) { + switch (reason) { + case PinTypes.direct: + case PinTypes.recursive: + yield { + hash: key, + type: reason + } + break + default: + yield { + hash: key, + type: `${PinTypes.indirect} through ${reason}` + } + } + } else { + throw new Error(`path '${paths[i]}' is not pinned`) + } + } + + return + } + + // show all pinned items of type + let pins = [] + + if (type === PinTypes.direct || type === PinTypes.all) { + pins = pins.concat( + Array.from(pinManager.directPins).map(hash => ({ + type: PinTypes.direct, + hash + })) + ) + } + + if (type === PinTypes.recursive || type === PinTypes.all) { + pins = pins.concat( + Array.from(pinManager.recursivePins).map(hash => ({ + type: PinTypes.recursive, + hash + })) + ) + } + + if (type === PinTypes.indirect || type === PinTypes.all) { + const indirects = await pinManager.getIndirectKeys(options) + + pins = pins + // if something is pinned both directly and indirectly, + // report the indirect entry + .filter(({ hash }) => + !indirects.includes(hash) || + (indirects.includes(hash) && !pinManager.directPins.has(hash)) + ) + .concat(indirects.map(hash => ({ + type: PinTypes.indirect, + hash + }))) + } + + // FIXME: https://github.com/ipfs/js-ipfs/issues/2244 + yield * pins + } +} diff --git a/src/core/components/pin/rm.js b/src/core/components/pin/rm.js new file mode 100644 index 0000000000..6d27c20254 --- /dev/null +++ b/src/core/components/pin/rm.js @@ -0,0 +1,69 @@ +'use strict' + +const errCode = require('err-code') +const multibase = require('multibase') +const { resolvePath } = require('../utils') +const { PinTypes } = require('./pin/pin-manager') + +module.exports = ({ pinManager, gcLock, object }) => { + return async function rm (paths, options) { + options = options || {} + + const recursive = options.recursive == null ? 
true : options.recursive + + if (options.cidBase && !multibase.names.includes(options.cidBase)) { + throw errCode(new Error('invalid multibase'), 'ERR_INVALID_MULTIBASE') + } + + const cids = await resolvePath(object, paths) + const release = await gcLock.readLock() + const results = [] + + try { + // verify that each hash can be unpinned + for (const cid of cids) { + const res = await pinManager.isPinnedWithType(cid, PinTypes.all) + + const { pinned, reason } = res + const key = cid.toBaseEncodedString() + + if (!pinned) { + throw new Error(`${key} is not pinned`) + } + + switch (reason) { + case (PinTypes.recursive): + if (!recursive) { + throw new Error(`${key} is pinned recursively`) + } + + results.push(key) + + break + case (PinTypes.direct): + results.push(key) + + break + default: + throw new Error(`${key} is pinned indirectly under ${reason}`) + } + } + + // update the pin sets in memory + results.forEach(key => { + if (recursive && pinManager.recursivePins.has(key)) { + pinManager.recursivePins.delete(key) + } else { + pinManager.directPins.delete(key) + } + }) + + // persist updated pin sets to datastore + await pinManager.flushPins() + + return results.map(hash => ({ hash })) + } finally { + release() + } + } +} diff --git a/src/core/components/start.js b/src/core/components/start.js index f9f41c7458..84e7e7aaaf 100644 --- a/src/core/components/start.js +++ b/src/core/components/start.js @@ -109,7 +109,11 @@ function createApi ({ }) { const dag = Commands.legacy.dag({ _ipld: ipld, _preload: preload }) const object = Commands.legacy.object({ _ipld: ipld, _preload: preload, dag, _gcLock: gcLock }) - const pin = Commands.legacy.pin({ _ipld: ipld, _preload: preload, object, _repo: repo, _pinManager: pinManager }) + const pin = { + add: Commands.pin.add({ pinManager, gcLock, dag, object }), + ls: Commands.pin.ls({ pinManager, object }), + rm: Commands.pin.rm({ pinManager, gcLock, object }) + } const add = Commands.add({ ipld, dag, preload, pin, gcLock, constructorOptions }) const stop = Commands.stop({ @@ -133,6 +137,7 @@ function createApi ({ add, config: Commands.config({ repo }), init: () => { throw new AlreadyInitializedError() }, + pin, start: () => apiManager.api, stop } diff --git a/src/core/components/stop.js b/src/core/components/stop.js index 4e2a9bb036..2478e7dd0a 100644 --- a/src/core/components/stop.js +++ b/src/core/components/stop.js @@ -78,7 +78,11 @@ function createApi ({ }) { const dag = Commands.legacy.dag({ _ipld: ipld, _preload: preload }) const object = Commands.legacy.object({ _ipld: ipld, _preload: preload, dag, _gcLock: gcLock }) - const pin = Commands.legacy.pin({ _ipld: ipld, _preload: preload, object, _repo: repo, _pinManager: pinManager }) + const pin = { + add: Commands.pin.add({ pinManager, gcLock, dag, object }), + ls: Commands.pin.ls({ pinManager, object }), + rm: Commands.pin.rm({ pinManager, gcLock, object }) + } const add = Commands.add({ ipld, dag, preload, pin, gcLock, constructorOptions }) const start = Commands.start({ diff --git a/src/http/api/resources/pin.js b/src/http/api/resources/pin.js index 576d9be88d..593b10de69 100644 --- a/src/http/api/resources/pin.js +++ b/src/http/api/resources/pin.js @@ -4,6 +4,7 @@ const multibase = require('multibase') const Joi = require('@hapi/joi') const Boom = require('@hapi/boom') const isIpfs = require('is-ipfs') +const toStream = require('it-to-stream') const { cidToString } = require('../../../utils/cid') function parseArgs (request, h) { @@ -52,21 +53,7 @@ exports.ls = { async handler (request, h) { 
const { ipfs } = request.server.app const { path, type } = request.pre.args - - let result - try { - result = await ipfs.pin.ls(path, { type }) - } catch (err) { - throw Boom.boomify(err) - } - - return h.response({ - Keys: result.reduce((acc, v) => { - const prop = cidToString(v.hash, { base: request.query['cid-base'] }) - acc[prop] = { Type: v.type } - return acc - }, {}) - }) + return h.response(toStream.readable(ipfs.pin.ls(path, { type }))) } } diff --git a/src/utils/mutex.js b/src/utils/mutex.js deleted file mode 100644 index 8cb3df36cc..0000000000 --- a/src/utils/mutex.js +++ /dev/null @@ -1,52 +0,0 @@ -'use strict' - -const assert = require('assert') -const mortice = require('mortice') -const noop = () => {} - -// Wrap mortice to present a callback interface -class Mutex { - constructor (repoOwner, options) { - options = options || {} - - this.mutex = mortice(options.morticeId, { - singleProcess: repoOwner - }) - - this.log = options.log || noop - this.lockId = 0 - } - - readLock () { - return this._lock('readLock') - } - - writeLock () { - return this._lock('writeLock') - } - - /** - * Request a read or write lock - * - * @param {String} type The type of lock: readLock / writeLock - * @returns {Promise} - */ - async _lock (type) { - assert(typeof type === 'string', `first argument to Mutex.${type}() must be a string, got ${typeof type}`) - - const lockId = this.lockId++ - this.log(`[${lockId}] ${type} requested`) - - // Get a Promise for the lock, wrap it for logging - const release = await this.mutex[type]() - - this.log(`[${lockId}] ${type} started`) - - return () => { - this.log(`[${lockId}] ${type} released`) - release() - } - } -} - -module.exports = Mutex From 070236cf97037a6fe924f127b9b503da80f9e927 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Wed, 11 Dec 2019 14:56:56 +0000 Subject: [PATCH 2/4] fix: actually expose block API --- src/cli/commands/block/rm.js | 2 +- src/core/components/block/get.js | 2 +- src/core/components/index.js | 6 ++++++ src/core/components/init.js | 6 ++++++ src/core/components/start.js | 6 ++++++ src/core/components/stop.js | 7 +++++++ src/http/api/resources/block.js | 2 +- 7 files changed, 28 insertions(+), 3 deletions(-) diff --git a/src/cli/commands/block/rm.js b/src/cli/commands/block/rm.js index 1f92ed1a06..83982e2273 100644 --- a/src/cli/commands/block/rm.js +++ b/src/cli/commands/block/rm.js @@ -25,7 +25,7 @@ module.exports = { const ipfs = await getIpfs() let errored = false - for await (const result of ipfs.block._rmAsyncIterator(hash, { + for await (const result of ipfs.block.rm(hash, { force, quiet })) { diff --git a/src/core/components/block/get.js b/src/core/components/block/get.js index da0fe34dda..afc95d8b45 100644 --- a/src/core/components/block/get.js +++ b/src/core/components/block/get.js @@ -2,7 +2,7 @@ const { cleanCid } = require('./utils') -module.exports = function block ({ blockService, preload }) { +module.exports = ({ blockService, preload }) => { return async function get (cid, options) { // eslint-disable-line require-await options = options || {} cid = cleanCid(cid) diff --git a/src/core/components/index.js b/src/core/components/index.js index 676fa0d951..bd1cdd2add 100644 --- a/src/core/components/index.js +++ b/src/core/components/index.js @@ -1,6 +1,12 @@ 'use strict' exports.add = require('./add') +exports.block = { + get: require('./block/get'), + put: require('./block/put'), + rm: require('./block/rm'), + stat: require('./block/stat') +} exports.config = require('./config') exports.init = require('./init') 
exports.pin = { diff --git a/src/core/components/init.js b/src/core/components/init.js index 6d2c84245a..f409ad62f3 100644 --- a/src/core/components/init.js +++ b/src/core/components/init.js @@ -303,6 +303,12 @@ function createApi ({ const api = { add, + block: { + get: Commands.block.get({ blockService, preload }), + put: Commands.block.put({ blockService, gcLock, preload }), + rm: Commands.block.rm({ blockService, gcLock, pinManager }), + stat: Commands.block.stat({ blockService, preload }) + }, config: Commands.config({ repo }), init: () => { throw new AlreadyInitializedError() }, pin, diff --git a/src/core/components/start.js b/src/core/components/start.js index 84e7e7aaaf..147493a824 100644 --- a/src/core/components/start.js +++ b/src/core/components/start.js @@ -135,6 +135,12 @@ function createApi ({ const api = { add, + block: { + get: Commands.block.get({ blockService, preload }), + put: Commands.block.put({ blockService, gcLock, preload }), + rm: Commands.block.rm({ blockService, gcLock, pinManager }), + stat: Commands.block.stat({ blockService, preload }) + }, config: Commands.config({ repo }), init: () => { throw new AlreadyInitializedError() }, pin, diff --git a/src/core/components/stop.js b/src/core/components/stop.js index 2478e7dd0a..64cac5d3bc 100644 --- a/src/core/components/stop.js +++ b/src/core/components/stop.js @@ -102,8 +102,15 @@ function createApi ({ const api = { add, + block: { + get: Commands.block.get({ blockService, preload }), + put: Commands.block.put({ blockService, gcLock, preload }), + rm: Commands.block.rm({ blockService, gcLock, pinManager }), + stat: Commands.block.stat({ blockService, preload }) + }, config: Commands.config({ repo }), init: () => { throw new AlreadyInitializedError() }, + pin, start, stop: () => apiManager.api } diff --git a/src/http/api/resources/block.js b/src/http/api/resources/block.js index c88b25b15f..a33a1cd311 100644 --- a/src/http/api/resources/block.js +++ b/src/http/api/resources/block.js @@ -132,7 +132,7 @@ exports.rm = { return streamResponse(request, h, async (output) => { try { - for await (const result of request.server.app.ipfs.block._rmAsyncIterator(arg, { + for await (const result of request.server.app.ipfs.block.rm(arg, { force, quiet })) { From a48204466f6ba1e62431de14ea22b03c4321ad82 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 16 Dec 2019 21:25:49 +0000 Subject: [PATCH 3/4] refactor: convert dag API to async/await (#2664) * refactor: convert dag API to async/await * refactor: pull out repeated name to codec conversion code --- src/cli/commands/dag/resolve.js | 3 +- src/core/components/dag.js | 170 ----------------------------- src/core/components/dag/get.js | 34 ++++++ src/core/components/dag/put.js | 70 ++++++++++++ src/core/components/dag/resolve.js | 15 +++ src/core/components/dag/tree.js | 15 +++ src/core/components/dag/utils.js | 48 ++++++++ src/core/components/index.js | 7 +- src/core/components/init.js | 12 +- src/core/components/start.js | 8 +- src/core/components/stop.js | 8 +- src/http/api/resources/dag.js | 2 +- 12 files changed, 215 insertions(+), 177 deletions(-) delete mode 100644 src/core/components/dag.js create mode 100644 src/core/components/dag/get.js create mode 100644 src/core/components/dag/put.js create mode 100644 src/core/components/dag/resolve.js create mode 100644 src/core/components/dag/tree.js create mode 100644 src/core/components/dag/utils.js diff --git a/src/cli/commands/dag/resolve.js b/src/cli/commands/dag/resolve.js index bba7886034..7a9907f427 100644 --- 
a/src/cli/commands/dag/resolve.js +++ b/src/cli/commands/dag/resolve.js @@ -19,10 +19,9 @@ module.exports = { const options = {} try { - const result = await ipfs.dag.resolve(ref, options) let lastCid - for (const res of result) { + for await (const res of ipfs.dag.resolve(ref, options)) { if (CID.isCID(res.value)) { lastCid = res.value } diff --git a/src/core/components/dag.js b/src/core/components/dag.js deleted file mode 100644 index fd704e8139..0000000000 --- a/src/core/components/dag.js +++ /dev/null @@ -1,170 +0,0 @@ -'use strict' - -const callbackify = require('callbackify') -const CID = require('cids') -const all = require('async-iterator-all') -const errCode = require('err-code') -const multicodec = require('multicodec') - -function parseArgs (cid, path, options) { - options = options || {} - - // Allow options in path position - if (path !== undefined && typeof path !== 'string') { - options = path - path = undefined - } - - if (typeof cid === 'string') { - if (cid.startsWith('/ipfs/')) { - cid = cid.substring(6) - } - - const split = cid.split('/') - - try { - cid = new CID(split[0]) - } catch (err) { - throw errCode(err, 'ERR_INVALID_CID') - } - - split.shift() - - if (split.length > 0) { - path = split.join('/') - } else { - path = path || '/' - } - } else if (Buffer.isBuffer(cid)) { - try { - cid = new CID(cid) - } catch (err) { - throw errCode(err, 'ERR_INVALID_CID') - } - } - - return [ - cid, - path, - options - ] -} - -module.exports = function dag (self) { - return { - put: callbackify.variadic(async (dagNode, options) => { - options = options || {} - - if (options.cid && (options.format || options.hashAlg)) { - throw new Error('Can\'t put dag node. Please provide either `cid` OR `format` and `hashAlg` options.') - } else if (((options.format && !options.hashAlg) || (!options.format && options.hashAlg))) { - throw new Error('Can\'t put dag node. Please provide `format` AND `hashAlg` options.') - } - - const optionDefaults = { - format: multicodec.DAG_CBOR, - hashAlg: multicodec.SHA2_256 - } - - // The IPLD expects the format and hashAlg as constants - if (options.format && typeof options.format === 'string') { - const constantName = options.format.toUpperCase().replace(/-/g, '_') - options.format = multicodec[constantName] - } - if (options.hashAlg && typeof options.hashAlg === 'string') { - const constantName = options.hashAlg.toUpperCase().replace(/-/g, '_') - options.hashAlg = multicodec[constantName] - } - - options = options.cid ? options : Object.assign({}, optionDefaults, options) - - // js-ipld defaults to verion 1 CIDs. 
Hence set version 0 explicitly for - // dag-pb nodes - if (options.version === undefined) { - if (options.format === multicodec.DAG_PB && options.hashAlg === multicodec.SHA2_256) { - options.version = 0 - } else { - options.version = 1 - } - } - - let release - - if (options.pin) { - release = await self._gcLock.readLock() - } - - try { - const cid = await self._ipld.put(dagNode, options.format, { - hashAlg: options.hashAlg, - cidVersion: options.version - }) - - if (options.pin) { - await self.pin.add(cid, { - lock: false - }) - } - - if (options.preload !== false) { - self._preload(cid) - } - - return cid - } finally { - if (release) { - release() - } - } - }), - - get: callbackify.variadic(async (cid, path, options) => { - [cid, path, options] = parseArgs(cid, path, options) - - if (options.preload !== false) { - self._preload(cid) - } - - if (path == null || path === '/') { - const value = await self._ipld.get(cid) - - return { - value, - remainderPath: '' - } - } else { - let result - - for await (const entry of self._ipld.resolve(cid, path)) { - if (options.localResolve) { - return entry - } - - result = entry - } - - return result - } - }), - - tree: callbackify.variadic(async (cid, path, options) => { // eslint-disable-line require-await - [cid, path, options] = parseArgs(cid, path, options) - - if (options.preload !== false) { - self._preload(cid) - } - - return all(self._ipld.tree(cid, path, options)) - }), - - resolve: callbackify.variadic(async (cid, path, options) => { // eslint-disable-line require-await - [cid, path, options] = parseArgs(cid, path, options) - - if (options.preload !== false) { - self._preload(cid) - } - - return all(self._ipld.resolve(cid, path)) - }) - } -} diff --git a/src/core/components/dag/get.js b/src/core/components/dag/get.js new file mode 100644 index 0000000000..11c17152bc --- /dev/null +++ b/src/core/components/dag/get.js @@ -0,0 +1,34 @@ +'use strict' + +const { parseArgs } = require('./utils') + +module.exports = ({ ipld, preload }) => { + return async function get (cid, path, options) { + [cid, path, options] = parseArgs(cid, path, options) + + if (options.preload !== false) { + preload(cid) + } + + if (path == null || path === '/') { + const value = await ipld.get(cid) + + return { + value, + remainderPath: '' + } + } else { + let result + + for await (const entry of ipld.resolve(cid, path)) { + if (options.localResolve) { + return entry + } + + result = entry + } + + return result + } + } +} diff --git a/src/core/components/dag/put.js b/src/core/components/dag/put.js new file mode 100644 index 0000000000..301c87ba8c --- /dev/null +++ b/src/core/components/dag/put.js @@ -0,0 +1,70 @@ +'use strict' + +const multicodec = require('multicodec') +const nameToCodec = name => multicodec[name.toUpperCase().replace(/-/g, '_')] + +module.exports = ({ ipld, pin, gcLock, preload }) => { + return async function put (dagNode, options) { + options = options || {} + + if (options.cid && (options.format || options.hashAlg)) { + throw new Error('Can\'t put dag node. Please provide either `cid` OR `format` and `hashAlg` options.') + } else if (((options.format && !options.hashAlg) || (!options.format && options.hashAlg))) { + throw new Error('Can\'t put dag node. 
Please provide `format` AND `hashAlg` options.') + } + + const optionDefaults = { + format: multicodec.DAG_CBOR, + hashAlg: multicodec.SHA2_256 + } + + // The IPLD expects the format and hashAlg as constants + if (options.format && typeof options.format === 'string') { + options.format = nameToCodec(options.format) + } + if (options.hashAlg && typeof options.hashAlg === 'string') { + options.hashAlg = nameToCodec(options.hashAlg) + } + + options = options.cid ? options : Object.assign({}, optionDefaults, options) + + // js-ipld defaults to verion 1 CIDs. Hence set version 0 explicitly for + // dag-pb nodes + if (options.version === undefined) { + if (options.format === multicodec.DAG_PB && options.hashAlg === multicodec.SHA2_256) { + options.version = 0 + } else { + options.version = 1 + } + } + + let release + + if (options.pin) { + release = await gcLock.readLock() + } + + try { + const cid = await ipld.put(dagNode, options.format, { + hashAlg: options.hashAlg, + cidVersion: options.version + }) + + if (options.pin) { + await pin.add(cid, { + lock: false + }) + } + + if (options.preload !== false) { + preload(cid) + } + + return cid + } finally { + if (release) { + release() + } + } + } +} diff --git a/src/core/components/dag/resolve.js b/src/core/components/dag/resolve.js new file mode 100644 index 0000000000..e95e5b526f --- /dev/null +++ b/src/core/components/dag/resolve.js @@ -0,0 +1,15 @@ +'use strict' + +const { parseArgs } = require('./utils') + +module.exports = ({ ipld, preload }) => { + return async function * resolve (cid, path, options) { // eslint-disable-line require-await + [cid, path, options] = parseArgs(cid, path, options) + + if (options.preload !== false) { + preload(cid) + } + + yield * ipld.resolve(cid, path) + } +} diff --git a/src/core/components/dag/tree.js b/src/core/components/dag/tree.js new file mode 100644 index 0000000000..07d2d03e65 --- /dev/null +++ b/src/core/components/dag/tree.js @@ -0,0 +1,15 @@ +'use strict' + +const { parseArgs } = require('./utils') + +module.exports = ({ ipld, preload }) => { + return async function * tree (cid, path, options) { // eslint-disable-line require-await + [cid, path, options] = parseArgs(cid, path, options) + + if (options.preload !== false) { + preload(cid) + } + + yield * ipld.tree(cid, path, options) + } +} diff --git a/src/core/components/dag/utils.js b/src/core/components/dag/utils.js new file mode 100644 index 0000000000..810b0e2f9a --- /dev/null +++ b/src/core/components/dag/utils.js @@ -0,0 +1,48 @@ +'use strict' + +const CID = require('cids') +const errCode = require('err-code') + +exports.parseArgs = (cid, path, options) => { + options = options || {} + + // Allow options in path position + if (path !== undefined && typeof path !== 'string') { + options = path + path = undefined + } + + if (typeof cid === 'string') { + if (cid.startsWith('/ipfs/')) { + cid = cid.substring(6) + } + + const split = cid.split('/') + + try { + cid = new CID(split[0]) + } catch (err) { + throw errCode(err, 'ERR_INVALID_CID') + } + + split.shift() + + if (split.length > 0) { + path = split.join('/') + } else { + path = path || '/' + } + } else if (Buffer.isBuffer(cid)) { + try { + cid = new CID(cid) + } catch (err) { + throw errCode(err, 'ERR_INVALID_CID') + } + } + + return [ + cid, + path, + options + ] +} diff --git a/src/core/components/index.js b/src/core/components/index.js index bd1cdd2add..96598727c8 100644 --- a/src/core/components/index.js +++ b/src/core/components/index.js @@ -8,6 +8,12 @@ exports.block = { stat: 
require('./block/stat') } exports.config = require('./config') +exports.dag = { + get: require('./dag/get'), + put: require('./dag/put'), + resolve: require('./dag/resolve'), + tree: require('./dag/tree') +} exports.init = require('./init') exports.pin = { add: require('./pin/add'), @@ -18,7 +24,6 @@ exports.start = require('./start') exports.stop = require('./stop') exports.legacy = { // TODO: these will be removed as the new API is completed - dag: require('./dag'), libp2p: require('./libp2p'), object: require('./object') } diff --git a/src/core/components/init.js b/src/core/components/init.js index f409ad62f3..2dda0eba04 100644 --- a/src/core/components/init.js +++ b/src/core/components/init.js @@ -98,7 +98,11 @@ module.exports = ({ // Make sure GC lock is specific to repo, for tests where there are // multiple instances of IPFS const gcLock = mortice(repo.path, { singleProcess: constructorOptions.repoOwner }) - const dag = Commands.legacy.dag({ _ipld: ipld, _preload: preload }) + const dag = { + get: Commands.dag.get({ ipld, preload }), + resolve: Commands.dag.resolve({ ipld, preload }), + tree: Commands.dag.tree({ ipld, preload }) + } const object = Commands.legacy.object({ _ipld: ipld, _preload: preload, dag, _gcLock: gcLock }) const pinManager = new PinManager(repo, dag) @@ -110,6 +114,9 @@ module.exports = ({ rm: Commands.pin.rm({ pinManager, gcLock, object }) } + // FIXME: resolve this circular dependency + dag.put = Commands.dag.put({ ipld, pin, gcLock, preload }) + const add = Commands.add({ ipld, dag, preload, pin, gcLock, constructorOptions }) if (!isInitialized && !options.emptyRepo) { @@ -132,6 +139,7 @@ module.exports = ({ apiManager, constructorOptions, blockService, + dag, gcLock, initOptions: options, ipld, @@ -275,6 +283,7 @@ function createApi ({ apiManager, constructorOptions, blockService, + dag, gcLock, initOptions, ipld, @@ -310,6 +319,7 @@ function createApi ({ stat: Commands.block.stat({ blockService, preload }) }, config: Commands.config({ repo }), + dag, init: () => { throw new AlreadyInitializedError() }, pin, start diff --git a/src/core/components/start.js b/src/core/components/start.js index 147493a824..e7bd337f13 100644 --- a/src/core/components/start.js +++ b/src/core/components/start.js @@ -107,13 +107,19 @@ function createApi ({ print, repo }) { - const dag = Commands.legacy.dag({ _ipld: ipld, _preload: preload }) + const dag = { + get: Commands.dag.get({ ipld, preload }), + resolve: Commands.dag.resolve({ ipld, preload }), + tree: Commands.dag.tree({ ipld, preload }) + } const object = Commands.legacy.object({ _ipld: ipld, _preload: preload, dag, _gcLock: gcLock }) const pin = { add: Commands.pin.add({ pinManager, gcLock, dag, object }), ls: Commands.pin.ls({ pinManager, object }), rm: Commands.pin.rm({ pinManager, gcLock, object }) } + // FIXME: resolve this circular dependency + dag.put = Commands.dag.put({ ipld, pin, gcLock, preload }) const add = Commands.add({ ipld, dag, preload, pin, gcLock, constructorOptions }) const stop = Commands.stop({ diff --git a/src/core/components/stop.js b/src/core/components/stop.js index 64cac5d3bc..dcec2f00b5 100644 --- a/src/core/components/stop.js +++ b/src/core/components/stop.js @@ -76,13 +76,19 @@ function createApi ({ print, repo }) { - const dag = Commands.legacy.dag({ _ipld: ipld, _preload: preload }) + const dag = { + get: Commands.dag.get({ ipld, preload }), + resolve: Commands.dag.resolve({ ipld, preload }), + tree: Commands.dag.tree({ ipld, preload }) + } const object = Commands.legacy.object({ _ipld: 
ipld, _preload: preload, dag, _gcLock: gcLock }) const pin = { add: Commands.pin.add({ pinManager, gcLock, dag, object }), ls: Commands.pin.ls({ pinManager, object }), rm: Commands.pin.rm({ pinManager, gcLock, object }) } + // FIXME: resolve this circular dependency + dag.put = Commands.dag.put({ ipld, pin, gcLock, preload }) const add = Commands.add({ ipld, dag, preload, pin, gcLock, constructorOptions }) const start = Commands.start({ diff --git a/src/http/api/resources/dag.js b/src/http/api/resources/dag.js index 436382bc38..f3dffc1b4f 100644 --- a/src/http/api/resources/dag.js +++ b/src/http/api/resources/dag.js @@ -248,7 +248,7 @@ exports.resolve = { let lastRemainderPath = path if (path) { - const result = ipfs._ipld.resolve(lastCid, path) + const result = ipfs.dag.resolve(lastCid, path) while (true) { const resolveResult = (await result.next()).value if (!CID.isCID(resolveResult.value)) { From 13870926f7f2ae074566d4fa5f798f7a8db056d3 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 16 Dec 2019 23:56:53 +0000 Subject: [PATCH 4/4] refactor: review feedback --- src/core/components/block/rm.js | 58 ++++++++++++++++++--------------- src/core/components/pin/ls.js | 45 +++++++++---------------- src/core/components/pin/rm.js | 52 ++++++++++++++--------------- 3 files changed, 72 insertions(+), 83 deletions(-) diff --git a/src/core/components/block/rm.js b/src/core/components/block/rm.js index 6fd8162f75..8c251a9665 100644 --- a/src/core/components/block/rm.js +++ b/src/core/components/block/rm.js @@ -2,9 +2,13 @@ const CID = require('cids') const errCode = require('err-code') +const { parallelMap, filter } = require('streaming-iterables') +const pipe = require('it-pipe') const { PinTypes } = require('./pin/pin-manager') const { cleanCid } = require('./utils') +const BLOCK_RM_CONCURRENCY = 8 + module.exports = ({ blockService, gcLock, pinManager }) => { return async function * rm (cids, options) { options = options || {} @@ -18,42 +22,42 @@ module.exports = ({ blockService, gcLock, pinManager }) => { const release = await gcLock.writeLock() try { - for (let cid of cids) { - cid = cleanCid(cid) + yield * pipe( + cids, + parallelMap(BLOCK_RM_CONCURRENCY, async cid => { + cid = cleanCid(cid) - const result = { - hash: cid.toString() - } + const result = { hash: cid.toString() } - try { - const pinResult = await pinManager.isPinnedWithType(cid, PinTypes.all) + try { + const pinResult = await pinManager.isPinnedWithType(cid, PinTypes.all) - if (pinResult.pinned) { - if (CID.isCID(pinResult.reason)) { // eslint-disable-line max-depth - throw errCode(new Error(`pinned via ${pinResult.reason}`)) - } + if (pinResult.pinned) { + if (CID.isCID(pinResult.reason)) { // eslint-disable-line max-depth + throw errCode(new Error(`pinned via ${pinResult.reason}`)) + } - throw errCode(new Error(`pinned: ${pinResult.reason}`)) - } + throw errCode(new Error(`pinned: ${pinResult.reason}`)) + } - // remove has check when https://github.com/ipfs/js-ipfs-block-service/pull/88 is merged - const has = await blockService._repo.blocks.has(cid) + // remove has check when https://github.com/ipfs/js-ipfs-block-service/pull/88 is merged + const has = await blockService._repo.blocks.has(cid) - if (!has) { - throw errCode(new Error('block not found'), 'ERR_BLOCK_NOT_FOUND') - } + if (!has) { + throw errCode(new Error('block not found'), 'ERR_BLOCK_NOT_FOUND') + } - await blockService.delete(cid) - } catch (err) { - if (!options.force) { - result.error = `cannot remove ${cid}: ${err.message}` + await blockService.delete(cid) 
+ } catch (err) { + if (!options.force) { + result.error = `cannot remove ${cid}: ${err.message}` + } } - } - if (!options.quiet) { - yield result - } - } + return result + }), + filter(() => !options.quiet) + ) } finally { release() } diff --git a/src/core/components/pin/ls.js b/src/core/components/pin/ls.js index bc4e92e5e2..a316b35cd1 100644 --- a/src/core/components/pin/ls.js +++ b/src/core/components/pin/ls.js @@ -1,9 +1,12 @@ /* eslint max-nested-callbacks: ["error", 8] */ 'use strict' +const { parallelMap } = require('streaming-iterables') const { resolvePath } = require('../utils') const PinManager = require('./pin/pin-manager') -const PinTypes = PinManager.PinTypes +const { PinTypes } = PinManager + +const PIN_LS_CONCURRENCY = 8 module.exports = ({ pinManager, object }) => { return async function * ls (paths, options) { @@ -31,29 +34,19 @@ module.exports = ({ pinManager, object }) => { // check the pinned state of specific hashes const cids = await resolvePath(object, paths) - for (let i = 0; i < cids.length; i++) { - const cid = cids[i] + yield * parallelMap(PIN_LS_CONCURRENCY, async cid => { const { key, reason, pinned } = await pinManager.isPinnedWithType(cid, type) - if (pinned) { - switch (reason) { - case PinTypes.direct: - case PinTypes.recursive: - yield { - hash: key, - type: reason - } - break - default: - yield { - hash: key, - type: `${PinTypes.indirect} through ${reason}` - } - } - } else { - throw new Error(`path '${paths[i]}' is not pinned`) + if (!pinned) { + throw new Error(`path '${paths[cids.indexOf(cid)]}' is not pinned`) } - } + + if (reason === PinTypes.direct || reason === PinTypes.recursive) { + return { hash: key, type: reason } + } + + return { hash: key, type: `${PinTypes.indirect} through ${reason}` } + }, cids) return } @@ -85,14 +78,8 @@ module.exports = ({ pinManager, object }) => { pins = pins // if something is pinned both directly and indirectly, // report the indirect entry - .filter(({ hash }) => - !indirects.includes(hash) || - (indirects.includes(hash) && !pinManager.directPins.has(hash)) - ) - .concat(indirects.map(hash => ({ - type: PinTypes.indirect, - hash - }))) + .filter(({ hash }) => !indirects.includes(hash) || !pinManager.directPins.has(hash)) + .concat(indirects.map(hash => ({ type: PinTypes.indirect, hash }))) } // FIXME: https://github.com/ipfs/js-ipfs/issues/2244 diff --git a/src/core/components/pin/rm.js b/src/core/components/pin/rm.js index 6d27c20254..0147abd59e 100644 --- a/src/core/components/pin/rm.js +++ b/src/core/components/pin/rm.js @@ -2,14 +2,18 @@ const errCode = require('err-code') const multibase = require('multibase') +const { parallelMap, collect } = require('streaming-iterables') +const pipe = require('it-pipe') const { resolvePath } = require('../utils') const { PinTypes } = require('./pin/pin-manager') +const PIN_RM_CONCURRENCY = 8 + module.exports = ({ pinManager, gcLock, object }) => { return async function rm (paths, options) { options = options || {} - const recursive = options.recursive == null ? 
true : options.recursive + const recursive = options.recursive !== false if (options.cidBase && !multibase.names.includes(options.cidBase)) { throw errCode(new Error('invalid multibase'), 'ERR_INVALID_MULTIBASE') @@ -17,37 +21,31 @@ module.exports = ({ pinManager, gcLock, object }) => { const cids = await resolvePath(object, paths) const release = await gcLock.readLock() - const results = [] try { // verify that each hash can be unpinned - for (const cid of cids) { - const res = await pinManager.isPinnedWithType(cid, PinTypes.all) - - const { pinned, reason } = res - const key = cid.toBaseEncodedString() - - if (!pinned) { - throw new Error(`${key} is not pinned`) - } + const results = await pipe( + cids, + parallelMap(PIN_RM_CONCURRENCY, async cid => { + const res = await pinManager.isPinnedWithType(cid, PinTypes.all) - switch (reason) { - case (PinTypes.recursive): - if (!recursive) { - throw new Error(`${key} is pinned recursively`) - } + const { pinned, reason } = res + const key = cid.toBaseEncodedString() - results.push(key) - - break - case (PinTypes.direct): - results.push(key) - - break - default: + if (!pinned) { + throw new Error(`${key} is not pinned`) + } + if (reason !== PinTypes.recursive && reason !== PinTypes.direct) { throw new Error(`${key} is pinned indirectly under ${reason}`) - } - } + } + if (reason === PinTypes.recursive && !recursive) { + throw new Error(`${key} is pinned recursively`) + } + + return key + }), + collect + ) // update the pin sets in memory results.forEach(key => { @@ -61,7 +59,7 @@ module.exports = ({ pinManager, gcLock, object }) => { // persist updated pin sets to datastore await pinManager.flushPins() - return results.map(hash => ({ hash })) + return results } finally { release() }
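
Usage note: after these patches, `pin.ls`, `block.rm` and `dag.resolve` return async iterables rather than resolving to arrays, mirroring the `for await` loops in the CLI commands and the `it-to-stream` wrapping in the HTTP `pin.ls` handler above. Below is a minimal consumer-side sketch; the `ipfs` instance and `cid` argument are assumed to exist already and the names are illustrative only, not part of the patches themselves.

'use strict'

// Minimal sketch: consuming the async-iterable APIs introduced by these
// patches. `ipfs` is assumed to be an already-initialized node and `cid`
// a CID (or CID string) it can resolve; both are placeholders.
async function example (ipfs, cid) {
  // pin.ls now yields { hash, type } entries one at a time
  for await (const pin of ipfs.pin.ls()) {
    console.log(`${pin.hash} ${pin.type}`)
  }

  // block.rm yields one result per block; failures are reported per item
  // via an `error` property unless `force` is set
  for await (const res of ipfs.block.rm(cid, { force: false, quiet: false })) {
    if (res.error) {
      console.error(res.error)
    }
  }

  // dag.resolve yields each entry visited while resolving an IPFS path
  for await (const entry of ipfs.dag.resolve(`/ipfs/${cid}`)) {
    console.log(entry.value)
  }
}

module.exports = example

Streaming results this way lets callers begin printing or piping output before the full result set is computed, which is what the refactored CLI commands and the HTTP `pin.ls` handler rely on.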