Skip to content

Commit

Permalink
support GetWithHint and optimize PutWithHint (dgraph-io#161)
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood authored Jan 15, 2020
1 parent 0783c47 commit 905fadf
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 61 deletions.
164 changes: 104 additions & 60 deletions skl/skl.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,32 +247,21 @@ func (s *Skiplist) findNear(key []byte, less bool, allowEqual bool) (*node, bool
}
}

// findSpliceForLevel returns (outBefore, outAfter) with outBefore.key <= key <= outAfter.key.
// findSpliceForLevel returns (outBefore, outAfter, match) with outBefore.key < key <= outAfter.key.
// The input "before" tells us where to start looking.
// If we found a node with the same key, then we return outBefore = outAfter.
// If we found a node with the same key, then we return match = true.
// Otherwise, outBefore.key < key < outAfter.key.
func (s *Skiplist) findSpliceForLevel(key []byte, before, after *node, level int) (*node, *node) {
func (s *Skiplist) findSpliceForLevel(key []byte, before *node, level int) (*node, *node, bool) {
for {
// Assume before.key < key.
next := s.getNext(before, level)
if next == nil {
return before, next
return before, next, false
}
var cmp int
if next == after {
// We compared the same node on the upper level, no need to compare again.
cmp = -1
} else {
nextKey := next.key(s.arena)
cmp = y.CompareKeysWithVer(key, nextKey)
}
if cmp == 0 {
// Equality case.
return next, next
}
if cmp < 0 {
// before.key < key < next.key. We are done for this level.
return before, next
nextKey := next.key(s.arena)
cmp := y.CompareKeysWithVer(key, nextKey)
if cmp <= 0 {
return before, next, cmp == 0
}
before = next // Keep moving right on this level.
}
Expand All @@ -290,8 +279,56 @@ func (s *Skiplist) Put(key []byte, v y.ValueStruct) {
// Hint is used to speed up sequential write.
type Hint struct {
height int32
prev [maxHeight + 1]*node
next [maxHeight + 1]*node

// hitHeight is used to reduce cost of calculateRecomputeHeight.
// For random workload, comparing hint keys from bottom up is wasted work.
// So we record the hit height of the last operation, only grow recompute height from near that height.
hitHeight int32
prev [maxHeight + 1]*node
next [maxHeight + 1]*node
}

func (s *Skiplist) calculateRecomputeHeight(key []byte, hint *Hint, listHeight int32) int32 {
if hint.height < listHeight {
// Either splice is never used or list height has grown, we recompute all.
hint.prev[listHeight] = s.head
hint.next[listHeight] = nil
hint.height = int32(listHeight)
hint.hitHeight = hint.height
return listHeight
}
recomputeHeight := hint.hitHeight - 2
if recomputeHeight < 0 {
recomputeHeight = 0
}
for recomputeHeight < listHeight {
prevNode := hint.prev[recomputeHeight]
nextNode := hint.next[recomputeHeight]
prevNext := s.getNext(prevNode, int(recomputeHeight))
if prevNext != nextNode {
recomputeHeight++
continue
}
if prevNode != s.head &&
prevNode != nil &&
y.CompareKeysWithVer(key, prevNode.key(s.arena)) <= 0 {
// Key is before splice.
for prevNode == hint.prev[recomputeHeight] {
recomputeHeight++
}
continue
}
if nextNode != nil && y.CompareKeysWithVer(key, nextNode.key(s.arena)) > 0 {
// Key is after splice.
for nextNode == hint.next[recomputeHeight] {
recomputeHeight++
}
continue
}
break
}
hint.hitHeight = recomputeHeight
return recomputeHeight
}

// PutWithHint inserts the key-value pair with Hint for better sequential write performance.
Expand All @@ -311,51 +348,21 @@ func (s *Skiplist) PutWithHint(key []byte, v y.ValueStruct, hint *Hint) {
}
listHeight = s.getHeight()
}
recomputeHeight := int32(0)
spliceIsValid := hint != nil
if hint == nil {
hint = new(Hint)
}
if hint.height < listHeight {
// Either splice is never used or list height has grown, we recompute all.
hint.prev[listHeight] = s.head
hint.next[listHeight] = nil
hint.height = listHeight
recomputeHeight = listHeight
} else {
for recomputeHeight < listHeight {
prevNext := s.getNext(hint.prev[recomputeHeight], int(recomputeHeight))
if prevNext != hint.next[recomputeHeight] {
recomputeHeight++
} else if hint.prev[recomputeHeight] != s.head &&
hint.prev[recomputeHeight] != nil &&
y.CompareKeysWithVer(key, hint.prev[recomputeHeight].key(s.arena)) <= 0 {
// Key is before splice.
bad := hint.prev[recomputeHeight]
for bad == hint.prev[recomputeHeight] {
recomputeHeight++
}
} else if hint.next[recomputeHeight] != nil && y.CompareKeysWithVer(key, hint.next[recomputeHeight].key(s.arena)) > 0 {
// Key is after splice.
bad := hint.next[recomputeHeight]
for bad == hint.next[recomputeHeight] {
recomputeHeight++
}
} else {
break
}
}
}
recomputeHeight := s.calculateRecomputeHeight(key, hint, listHeight)
if recomputeHeight > 0 {
for i := recomputeHeight - 1; i >= 0; i-- {
hint.prev[i], hint.next[i] = s.findSpliceForLevel(key, hint.prev[i+1], hint.next[i+1], int(i))
if hint.prev[i] == hint.next[i] {
var match bool
hint.prev[i], hint.next[i], match = s.findSpliceForLevel(key, hint.prev[i+1], int(i))
if match {
// In place update.
match := hint.prev[i]
match.setValue(s.arena, v)
for i >= 0 {
hint.prev[i] = match
hint.next[i] = match
hint.next[i].setValue(s.arena, v)
for i > 0 {
hint.prev[i-1] = hint.prev[i]
hint.next[i-1] = hint.next[i]
i--
}
return
Expand All @@ -379,7 +386,7 @@ func (s *Skiplist) PutWithHint(key []byte, v y.ValueStruct, hint *Hint) {
// CAS failed. We need to recompute prev and next.
// It is unlikely to be helpful to try to use a different level as we redo the search,
// because it is unlikely that lots of nodes are inserted between prev[i] and next[i].
hint.prev[i], hint.next[i] = s.findSpliceForLevel(key, hint.prev[i], nil, i)
hint.prev[i], hint.next[i], _ = s.findSpliceForLevel(key, hint.prev[i], i)
if i > 0 {
spliceIsValid = false
}
Expand All @@ -388,12 +395,49 @@ func (s *Skiplist) PutWithHint(key []byte, v y.ValueStruct, hint *Hint) {
if spliceIsValid {
for i := 0; i < height; i++ {
hint.prev[i] = x
hint.next[i] = s.getNext(x, i)
}
} else {
hint.height = 0
}
}

func (s *Skiplist) GetWithHint(key []byte, hint *Hint) y.ValueStruct {
if hint == nil {
hint = new(Hint)
}
listHeight := s.getHeight()
recomputeHeight := s.calculateRecomputeHeight(key, hint, listHeight)
var n *node
if recomputeHeight > 0 {
for i := recomputeHeight - 1; i >= 0; i-- {
var match bool
hint.prev[i], hint.next[i], match = s.findSpliceForLevel(key, hint.prev[i+1], int(i))
if match {
n = hint.next[i]
for j := i; j >= 0; j-- {
hint.prev[j] = n
hint.next[j] = s.getNext(n, int(j))
}
break
}
}
} else {
n = hint.next[0]
}
if n == nil {
return y.ValueStruct{}
}
nextKey := s.arena.getKey(n.keyOffset, n.keySize)
if !y.SameKey(key, nextKey) {
return y.ValueStruct{}
}
valOffset, valSize := n.getValueOffset()
vs := s.arena.getVal(valOffset, valSize)
vs.Version = y.ParseTs(nextKey)
return vs
}

// Empty returns if the Skiplist is empty.
func (s *Skiplist) Empty() bool {
return s.findLast() == nil
Expand Down
102 changes: 102 additions & 0 deletions skl/skl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/binary"
"fmt"
"math/rand"
"sort"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -381,6 +382,31 @@ func TestPutWithHint(t *testing.T) {
require.True(t, cntGot == cnt)
}

func TestGetWithHint(t *testing.T) {
rand.Seed(time.Now().Unix())
l := NewSkiplist(arenaSize)
var keys [][]byte
sp := new(Hint)
for {
if l.arena.size() > arenaSize-256 {
break
}
key := randomKey()
keys = append(keys, key)
l.PutWithHint(key, y.ValueStruct{Value: key}, sp)
}
hint := new(Hint)
for _, key := range keys {
bytes.Equal(l.GetWithHint(key, hint).Value, key)
}
sort.Slice(keys, func(i, j int) bool {
return bytes.Compare(keys[i], keys[j]) < 0
})
for _, key := range keys {
bytes.Equal(l.GetWithHint(key, hint).Value, key)
}
}

func TestPutLargeValue(t *testing.T) {
l := NewSkiplist(arenaSize)
key := randomKey()
Expand Down Expand Up @@ -457,3 +483,79 @@ func BenchmarkReadWriteMap(b *testing.B) {
})
}
}

// Standard test. Some fraction is read. Some fraction is write. Writes have
// to go through mutex lock.
func BenchmarkGetSequential(b *testing.B) {
size := 300000
keys, l, _ := buildKeysAndList(size)
b.ResetTimer()
for i := 0; i < b.N; i++ {
key := keys[i%size]
l.Get(key)
}
}

func BenchmarkGetWithHintSequential(b *testing.B) {
size := 300000
keys, l, hint := buildKeysAndList(size)
b.ResetTimer()
for i := 0; i < b.N; i++ {
key := keys[i%size]
l.GetWithHint(key, hint)
}
}

func buildKeysAndList(size int) ([][]byte, *Skiplist, *Hint) {
l := NewSkiplist(32 * 1024 * 1024)
keys := make([][]byte, size)
hint := new(Hint)
for i := 0; i < size; i++ {
keys[i] = y.KeyWithTs([]byte(fmt.Sprintf("%05d", i)), 0)
}
for i := 0; i < size; i++ {
key := keys[i]
l.PutWithHint(key, y.ValueStruct{Value: []byte{byte(i)}}, hint)
}
return keys, l, hint
}

func BenchmarkGetRandom(b *testing.B) {
size := 300000
keys, l, _ := buildKeysAndList(size)
b.ResetTimer()
r := rand.New(rand.NewSource(1))
for i := 0; i < b.N; i++ {
key := keys[r.Int()%size]
l.Get(key)
}
}

func BenchmarkGetWithHintRandom(b *testing.B) {
size := 300000
keys, l, hint := buildKeysAndList(size)
b.ResetTimer()
r := rand.New(rand.NewSource(1))
for i := 0; i < b.N; i++ {
key := keys[r.Int()%size]
l.GetWithHint(key, hint)
}
}

func BenchmarkPutWithHint(b *testing.B) {
l := NewSkiplist(16 * 1024 * 1024)
size := 100000
keys := make([][]byte, size)
for i := 0; i < size; i++ {
keys[i] = y.KeyWithTs([]byte(fmt.Sprintf("%05d", i)), 0)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
hint := new(Hint)
l = NewSkiplist(16 * 1024 * 1024)
for j := 0; j < size; j++ {
key := keys[j]
l.PutWithHint(key, y.ValueStruct{Value: []byte{byte(j)}}, hint)
}
}
}
1 change: 0 additions & 1 deletion table/merge_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ func (mt *MergeIterator) FillValue(vs *y.ValueStruct) {
}
}


// NewMergeIterator creates a merge iterator
func NewMergeIterator(iters []y.Iterator, reverse bool) y.Iterator {
if len(iters) == 0 {
Expand Down

0 comments on commit 905fadf

Please sign in to comment.