Skip to content

Commit

Permalink
Upgrade to trampoline based Amalgamate.
Browse files Browse the repository at this point in the history
Created a trampoline to use with the MVCC iterator collection. An async
call will only be performed if necessary, if an asynchronous file read
in required. If the results in the the cache, a descent of the b-tree is
synchronous. Reduces trips to the async event queue. Allows us to use
the decorator pattern for iterators without nesting `async` calls. Any
`async` call is exported through the trampoline. If there are `async`
calls in the trampoline they are awaited `await`ed, if not they are
ignored.

Closes #247.
  • Loading branch information
flatheadmill committed Nov 2, 2020
1 parent c02ef5d commit 2094166
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 59 deletions.
98 changes: 39 additions & 59 deletions locket.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ const Cache = require('b-tree/cache')
const AbstractLevelDOWN = require('abstract-leveldown').AbstractLevelDOWN
const AbstractIterator = require('abstract-leveldown').AbstractIterator

const Trampoline = require('reciprocate')

const Destructible = require('destructible')

const Amalgamator = require('amalgamate')
const Locker = require('amalgamate/locker')

const callbackify = require('prospective/callbackify')

Expand Down Expand Up @@ -89,11 +92,16 @@ class Paginator {
return []
}
} else {
const next = await this._iterator.next()
if (next.done) {
let items = null
const trampoline = new Trampoline
this._iterator.next(trampoline, $items => items = $items)
while (trampoline.seek()) {
await trampoline.shift()
}
if (this._iterator.done) {
return []
} else {
this._items = next.value
this._items = items
this._index = 0
}
}
Expand All @@ -117,8 +125,8 @@ function Iterator (db, options) {
AbstractIterator.call(this, db)
this._constraint = constrain(Buffer.compare, encode, options)
this._db = db
this._versions = this._db._snapshot()
this._paginator = this._db._paginator(this._constraint, this._versions)
this._transaction = this._db._locker.snapshot()
this._paginator = this._db._paginator(this._constraint, this._transaction)
}
util.inherits(Iterator, AbstractIterator)

Expand All @@ -140,7 +148,7 @@ Iterator.prototype._seek = function (target) {
}

Iterator.prototype._end = function (callback) {
this._paginator.release()
this._db._locker.release(this._transaction)
callback()
}

Expand All @@ -161,38 +169,23 @@ Locket.prototype._serializeKey = encode
Locket.prototype._serializeValue = encode

Locket.prototype._open = callbackify(async function (options) {
this._locker = new Locker({ heft: coalesce(options.heft, 1024 * 1024) })
// TODO What is the behavior if you close while opening, or open while
// closing?
this._amalgamator = new Amalgamator(new Destructible('locket'), {
locker: this._locker,
directory: this.location,
cache: new Cache,
comparator: Buffer.compare,
header: {
compose: function (version, method, index, count) {
return { header: { method, index }, count, version }
},
serialize: function (header) {
return Buffer.from(JSON.stringify({
header: {
method: header.header.method,
index: header.header.index
},
count: header.count,
version: header.version.toString()
}))
},
deserialize: function (buffer) {
const header = JSON.parse(buffer.toString())
header.version = BigInt(header.version)
return header
}
},
parts: {
serialize: function (parts) { return parts },
deserialize: function (parts) { return parts }
},
key: {
compare: Buffer.compare,
extract: function (parts) {
return parts[0]
},
serialize: function (key) {
return [ key ]
},
Expand All @@ -201,42 +194,27 @@ Locket.prototype._open = callbackify(async function (options) {
}
},
transformer: function (operation) {
if (operation.type == 'put') {
return {
method: 'insert',
key: operation.key,
parts: [ operation.key, operation.value ]
}
}
return {
method: operation.type == 'put' ? 'insert' : 'remove',
key: encode(operation.key),
value: ('value' in operation) ? encode(operation.value) : null
method: 'remove',
key: operation.key
}
},
...this._options,
...options
})
await this._amalgamator.ready
const counts = {}
this._versions = { 0: true }
for (const stage of this._amalgamator._stages) {
stage.versions = {}
for await (const items of mvcc.riffle.forward(stage.strata, Strata.MIN)) {
for (const item of items) {
const version = item.parts[0].version
if (counts[version] == null) {
assert(item.parts[0].count != null)
this._versions[version] = counts[version] = item.parts[0].count
}
if (0 == --counts[version]) {
stage.versions[version] = true
}
}
}
//await this._amalgamate()
//await this._unstage()
}
await this._amalgamator.count()
await this._amalgamator.locker.rotate()
return []
})

Locket.prototype._snapshot = function () {
return JSON.parse(JSON.stringify(this._versions))
}

// Iteration of the database requires merging the results from the deep storage
// b-tree and the one or two staging logs.
//
Expand All @@ -255,10 +233,10 @@ Locket.prototype._snapshot = function () {
// records that have not been deleted and that match the user's range critera.

//
Locket.prototype._paginator = function (constraint, versions) {
Locket.prototype._paginator = function (constraint, transaction) {
const { key, direction, inclusive } = constraint
const iterator = this._amalgamator.iterator(versions, direction, key, inclusive)
return new Paginator(iterator[Symbol.asyncIterator](), constraint)
const iterator = this._amalgamator.iterator(transaction, direction, key, inclusive)
return new Paginator(iterator, constraint)
}

Locket.prototype._iterator = function (options) {
Expand All @@ -270,18 +248,18 @@ Locket.prototype._get = callbackify(async function (key, options) {
const constraint = constrain(Buffer.compare, encode, {
gte: key, keys: true, values: true, keyAsBuffer: true, valueAsBuffer: true
})
const paginator = this._paginator(constraint, this._snapshot())
const snapshot = this._locker.snapshot()
const paginator = this._paginator(constraint, snapshot)
// TODO How do I reuse Cursor.found out of Riffle et. al.? Eh, no good way
// since we have to advance, merge, dilute, etc. anyway.
const next = await paginator.next()
paginator.release()
this._locker.release(snapshot)
if (next.length != 0 && Buffer.compare(next[0], key) == 0) {
return [ options.asBuffer ? next[1] : next[1].toString() ]
}
throw new Error('NotFoundError: not found')
})


Locket.prototype._put = function (key, value, options, callback) {
this._batch([{ type: 'put', key: key, value: value }], options, callback)
}
Expand All @@ -298,7 +276,9 @@ Locket.prototype._del = function (key, options, callback) {
Locket.prototype._batch = callbackify(async function (batch, options) {
const version = ++this._version
this._versions[version] = true
await this._amalgamator.merge(version, batch, batch.length)
const mutator = this._locker.mutator()
await this._amalgamator.merge(mutator, batch, true)
this._locker.commit(mutator)
return []
})

Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"extant": "^1.0.20",
"riffle": "2.0.0-alpha.40",
"prospective": "0.3.0",
"reciprocate": "0.0.1",
"rescue": "7.0.0-alpha.1",
"rimraf": "3.0.2"
},
Expand Down

0 comments on commit 2094166

Please sign in to comment.