This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

feat: use new IPLD API
This is part of the Awesome Endeavour: Async Iterators:
ipfs/js-ipfs#1670
vmx committed Jan 28, 2019
1 parent daee4b7 commit 7ac770c
Showing 15 changed files with 146 additions and 145 deletions.
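
For orientation, the migration applied across the hunks below follows one before/after pattern: callback-based ipld.get(cid, cb) / ipld.put(node, opts, cb) calls become calls that take an array of CIDs or nodes and return a result set consumed with .first() or .all(), and string format/hashAlg names become multicodec codes. The sketch below is illustrative only and is not part of the diff; `ipld` and `cid` stand in for the values used at each call site.

'use strict'

const multicodec = require('multicodec')

// Minimal sketch of the new calling convention adopted in this commit,
// assuming `ipld` is an IPLD instance and `cid` the CID of a dag-pb node.
async function example (ipld, cid) {
  // get() now takes an array of CIDs; .first() resolves to the decoded
  // node itself (the old API passed { value } to a callback)
  const node = await ipld.get([cid]).first()

  // put() now takes an array of nodes, with format/hashAlg given as
  // numeric multicodec codes instead of string names
  const newCid = await ipld.put([node], {
    format: multicodec.DAG_PB,
    hashAlg: multicodec.SHA2_256
  }).first()

  return { node, newCid }
}

module.exports = example
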
3 changes: 2 additions & 1 deletion package.json
@@ -42,7 +42,7 @@
"detect-node": "^2.0.4",
"detect-webworker": "^1.0.0",
"dirty-chai": "^2.0.1",
"ipld": "~0.20.0",
"ipld": "git+https:/ipld/js-ipld.git#new-api-impl",
"ipld-in-memory": "^2.0.0",
"multihashes": "~0.4.14",
"pull-buffer-stream": "^1.0.0",
@@ -65,6 +65,7 @@
"joi": "^14.0.4",
"joi-browser": "^13.4.0",
"mortice": "^1.2.1",
"multicodec": "~0.5.0",
"once": "^1.4.0",
"promisify-es6": "^1.0.3",
"pull-cat": "^1.1.11",
10 changes: 5 additions & 5 deletions src/core/cp.js
@@ -80,9 +80,9 @@ const copyToFile = (context, source, destination, destinationTrail, options, cal
const child = sourceTrail[sourceTrail.length - 1]

waterfall([
(next) => context.ipld.get(parent.cid, next),
(result, next) => addLink(context, {
parent: result.value,
async () => context.ipld.get([parent.cid]).first(),
(node, next) => addLink(context, {
parent: node,
parentCid: parent.cid,
size: child.size,
cid: child.cid,
@@ -165,8 +165,8 @@ const copyToDirectory = (context, sources, destination, destinationTrail, option
const parent = destinationTrail[destinationTrail.length - 1]

waterfall([
(next) => context.ipld.get(parent.cid, next),
(result, next) => next(null, { cid: parent.cid, node: result.value })
async () => context.ipld.get([parent.cid]).first(),
(node, next) => next(null, { cid: parent.cid, node })
].concat(
sourceTrails.map((sourceTrail, index) => {
return (parent, done) => {
25 changes: 13 additions & 12 deletions src/core/utils/add-link.js
@@ -14,6 +14,7 @@ const {
generatePath,
updateHamtDirectory
} = require('./hamt-utils')
const toMulticodecCode = require('./to-multicodec-code')

const defaultOptions = {
parent: undefined,
@@ -42,8 +43,8 @@ const addLink = (context, options, callback) => {
log('Loading parent node', options.parentCid.toBaseEncodedString())

return waterfall([
(cb) => context.ipld.get(options.parentCid, cb),
(result, cb) => cb(null, result.value),
async () => context.ipld.get([options.parentCid]).first(),
(node, cb) => cb(null, node),
(node, cb) => addLink(context, {
...options,
parent: node
@@ -108,17 +109,18 @@ const addToDirectory = (context, options, callback) => {
waterfall([
(done) => DAGNode.rmLink(options.parent, options.name, done),
(parent, done) => DAGNode.addLink(parent, new DAGLink(options.name, options.size, options.cid), done),
(parent, done) => {
async (parent) => {
// Persist the new parent DAGNode
context.ipld.put(parent, {
const cid = await context.ipld.put([parent], {
version: options.cidVersion,
format: options.codec,
hashAlg: options.hashAlg,
format: toMulticodecCode(options.codec),
hashAlg: toMulticodecCode(options.hashAlg),
hashOnly: !options.flush
}, (error, cid) => done(error, {
}).first()
return {
node: parent,
cid
}))
}
}
], callback)
}
@@ -162,10 +164,9 @@ const updateShard = (context, positions, child, options, callback) => {
size: child.size,
multihash: child.cid.buffer
}], {}, done),
({ node: { links: [ shard ] } }, done) => {
return context.ipld.get(shard.cid, (err, result) => {
done(err, { cid: shard.cid, node: result && result.value })
})
async ({ node: { links: [ shard ] } }) => {
const node = await context.ipld.get([shard.cid]).first()
return { cid: shard.cid, node }
},
(result, cb) => updateShardParent(context, bucket, node, link.name, result.node, result.cid, prefix, options, cb)
], cb)
21 changes: 13 additions & 8 deletions src/core/utils/create-node.js
@@ -5,18 +5,23 @@ const UnixFS = require('ipfs-unixfs')
const {
DAGNode
} = require('ipld-dag-pb')
const multicodec = require('multicodec')
const toMulticodecCode = require('./to-multicodec-code')

const createNode = (context, type, options, callback) => {
waterfall([
(done) => DAGNode.create(new UnixFS(type).marshal(), [], done),
(node, done) => context.ipld.put(node, {
version: options.cidVersion,
format: options.format,
hashAlg: options.hashAlg
}, (err, cid) => done(err, {
cid,
node
}))
async (node) => {
const cid = await context.ipld.put([node], {
version: options.cidVersion,
format: toMulticodecCode(options.format),
hashAlg: toMulticodecCode(options.hashAlg)
}).first()
return {
cid,
node
}
}
], callback)
}

78 changes: 39 additions & 39 deletions src/core/utils/hamt-utils.js
@@ -9,6 +9,7 @@ const Bucket = require('hamt-sharding/src/bucket')
const DirSharded = require('ipfs-unixfs-importer/src/importer/dir-sharded')
const log = require('debug')('ipfs:mfs:core:utils:hamt-utils')
const UnixFS = require('ipfs-unixfs')
const toMulticodecCode = require('./to-multicodec-code')

const updateHamtDirectory = (context, links, bucket, options, callback) => {
// update parent with new bit field
@@ -21,17 +22,18 @@ const updateHamtDirectory = (context, links, bucket, options, callback) => {

DAGNode.create(dir.marshal(), links, cb)
},
(parent, done) => {
async (parent) => {
// Persist the new parent DAGNode
context.ipld.put(parent, {
const cid = await context.ipld.put([parent], {
version: options.cidVersion,
format: options.codec,
hashAlg: options.hashAlg,
format: toMulticodecCode(options.codec),
hashAlg: toMulticodecCode(options.hashAlg),
hashOnly: !options.flush
}, (error, cid) => done(error, {
}).first()
return {
node: parent,
cid
}))
}
}
], callback)
}
@@ -137,43 +139,41 @@ const generatePath = (context, fileName, rootNode, callback) => {

// found subshard
log(`Found subshard ${segment.prefix}`)
context.ipld.get(link.cid, (err, result) => {
if (err) {
return next(err)
}

// subshard hasn't been loaded, descend to the next level of the HAMT
if (!path[index - 1]) {
log(`Loaded new subshard ${segment.prefix}`)
const node = result.value

return recreateHamtLevel(node.links, rootBucket, segment.bucket, parseInt(segment.prefix, 16), async (err, bucket) => {
if (err) {
return next(err)
}

const position = await rootBucket._findNewBucketAndPos(fileName)

index++
path.unshift({
bucket: position.bucket,
prefix: toPrefix(position.pos),
node: node
context.ipld.get([link.cid]).first().then(
(node) => {
// subshard hasn't been loaded, descend to the next level of the HAMT
if (!path[index - 1]) {
log(`Loaded new subshard ${segment.prefix}`)

return recreateHamtLevel(node.links, rootBucket, segment.bucket, parseInt(segment.prefix, 16), async (err, bucket) => {
if (err) {
return next(err)
}

const position = await rootBucket._findNewBucketAndPos(fileName)

index++
path.unshift({
bucket: position.bucket,
prefix: toPrefix(position.pos),
node: node
})

next()
})
}

next()
})
}

const nextSegment = path[index - 1]
const nextSegment = path[index - 1]

// add intermediate links to bucket
addLinksToHamtBucket(result.value.links, nextSegment.bucket, rootBucket, (error) => {
nextSegment.node = result.value
// add intermediate links to bucket
addLinksToHamtBucket(node.links, nextSegment.bucket, rootBucket, (error) => {
nextSegment.node = node

next(error)
})
})
next(error)
})
},
(error) => next(error)
)
},
async (err, path) => {
await rootBucket.put(fileName, true)
6 changes: 3 additions & 3 deletions src/core/utils/load-node.js
@@ -10,9 +10,9 @@ const loadNode = (context, dagLink, callback) => {
log(`Loading DAGNode for child ${cid.toBaseEncodedString()}`)

waterfall([
(cb) => context.ipld.get(cid, cb),
(result, cb) => cb(null, {
node: result.value,
async () => context.ipld.get([cid]).first(),
(node, cb) => cb(null, {
node,
cid
})
], callback)
31 changes: 17 additions & 14 deletions src/core/utils/remove-link.js
@@ -12,6 +12,7 @@ const {
generatePath,
updateHamtDirectory
} = require('./hamt-utils')
const toMulticodecCode = require('./to-multicodec-code')

const defaultOptions = {
parent: undefined,
@@ -39,8 +40,8 @@ const removeLink = (context, options, callback) => {
log('Loading parent node', options.parentCid.toBaseEncodedString())

return waterfall([
(cb) => context.ipld.get(options.parentCid, cb),
(result, cb) => cb(null, result.value),
async () => context.ipld.get([options.parentCid]).first(),
(node, cb) => cb(null, node),
(node, cb) => removeLink(context, {
...options,
parent: node
@@ -68,15 +69,16 @@ const removeFromDirectory = (context, options, callback) => {
const removeFromDirectory = (context, options, callback) => {
waterfall([
(cb) => DAGNode.rmLink(options.parent, options.name, cb),
(newParentNode, cb) => {
context.ipld.put(newParentNode, {
async (newParentNode) => {
const cid = await context.ipld.put([newParentNode], {
version: options.cidVersion,
format: options.codec,
hashAlg: options.hashAlg
}, (error, cid) => cb(error, {
format: toMulticodecCode(options.codec),
hashAlg: toMulticodecCode(options.hashAlg)
}).first()
return {
node: newParentNode,
cid
}))
}
},
(result, cb) => {
log('Updated regular directory', result.cid.toBaseEncodedString())
@@ -129,16 +131,17 @@ const updateShard = (context, positions, child, options, callback) => {

return waterfall([
(done) => DAGNode.rmLink(node, link.name, done),
(node, done) => {
context.ipld.put(node, {
async (node) => {
const cid = await context.ipld.put([node], {
version: options.cidVersion,
format: options.codec,
hashAlg: options.hashAlg,
format: toMulticodecCode(options.codec),
hashAlg: toMulticodecCode(options.hashAlg),
hashOnly: !options.flush
}, (error, cid) => done(error, {
}).first()
return {
node,
cid
}))
}
},
(result, done) => {
bucket.del(child.name)
16 changes: 16 additions & 0 deletions src/core/utils/to-multicodec-code.js
@@ -0,0 +1,16 @@
'use strict'

const multicodec = require('multicodec')

// Converts a multicodec name to the corresponding code if it isn't a code
// already
const toMulticodecCode = (name) => {
if (typeof name === 'string') {
const constantName = name.toUpperCase().replace(/-/g, '_')
return multicodec[constantName]
} else {
return name
}
}

module.exports = toMulticodecCode
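
A quick usage sketch for the helper above (not part of the diff; the constant names are the ones the multicodec module exposes, e.g. DAG_PB and SHA2_256, as used elsewhere in this commit):

const multicodec = require('multicodec')
const toMulticodecCode = require('./to-multicodec-code')

// 'dag-pb' is upper-cased and '-' becomes '_', so it resolves to
// multicodec.DAG_PB; numeric codes are returned unchanged
console.log(toMulticodecCode('dag-pb') === multicodec.DAG_PB) // true
console.log(toMulticodecCode('sha2-256') === multicodec.SHA2_256) // true
console.log(toMulticodecCode(multicodec.DAG_PB) === multicodec.DAG_PB) // true
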
4 changes: 3 additions & 1 deletion src/core/utils/update-tree.js
@@ -12,7 +12,9 @@ const updateTree = (context, trail, options, callback) => {
options = Object.assign({}, defaultOptions, options)

waterfall([
(cb) => context.ipld.getMany(trail.map(node => node.cid), cb),
async () => {
return context.ipld.get(trail.map(node => node.cid)).all()
},
(nodes, cb) => {
let index = trail.length - 1

9 changes: 5 additions & 4 deletions src/core/utils/with-mfs-root.js
@@ -7,6 +7,7 @@ const {
} = require('ipld-dag-pb')
const log = require('debug')('ipfs:mfs:utils:with-mfs-root')
const waterfall = require('async/waterfall')
const multicodec = require('multicodec')

const {
MFS_ROOT_KEY
@@ -26,11 +27,11 @@ const withMfsRoot = (context, callback) => {
return waterfall([
// Store an empty node as the root
(next) => DAGNode.create(new UnixFs('directory').marshal(), next),
(node, next) => context.ipld.put(node, {
async (node) => context.ipld.put([node], {
version: 0,
hashAlg: 'sha2-256',
format: 'dag-pb'
}, next),
hashAlg: multicodec.SHA2_256,
format: multicodec.DAG_PB
}).first(),
// Store the Buffer in the datastore
(cid, next) => context.repo.datastore.put(MFS_ROOT_KEY, cid.buffer, (error) => next(error, cid))
], cb)