Skip to content

Commit

Permalink
db: refactor disk usage estimate using range annotations
Browse files Browse the repository at this point in the history
This change updates `db.EstimateDiskUsage` to use range annotations to
estimate the disk usage of a key range. This should improve the
performance of repeated disk usage estimates for similar or identical
key ranges.

At the Cockroach layer we use `db.EstimateDiskUsage` in a few places,
most notably when [computing MVCC span stats](https:/cockroachdb/cockroach/blob/master/pkg/server/span_stats_server.go#L217).

Informs: #3793
  • Loading branch information
anish-shanbhag committed Aug 14, 2024
1 parent a6d2952 commit 4c6c47b
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 95 deletions.
108 changes: 43 additions & 65 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,14 @@ type DB struct {
// validating is set to true when validation is running.
validating bool
}

// annotators contains various instances of manifest.Annotator which
// should be protected from concurrent access.
annotators struct {
totalSizeAnnotator *manifest.Annotator[uint64]
remoteSizeAnnotator *manifest.Annotator[uint64]
externalSizeAnnotator *manifest.Annotator[uint64]
}
}

// Normally equal to time.Now() but may be overridden in tests.
Expand Down Expand Up @@ -2228,6 +2236,31 @@ func (d *DB) SSTables(opts ...SSTablesOption) ([][]SSTableInfo, error) {
return destLevels, nil
}

// makeFileSizeAnnotator returns an annotator that computes the total size of
// files that meet some criteria defined by filter.
func (d *DB) makeFileSizeAnnotator(filter func(f *fileMetadata) bool) *manifest.Annotator[uint64] {
return &manifest.Annotator[uint64]{
Aggregator: manifest.SumAggregator{
AccumulateFunc: func(f *fileMetadata) (uint64, bool) {
if filter(f) {
return f.Size, true
}
return 0, true
},
AccumulatePartialOverlapFunc: func(f *fileMetadata, bounds base.UserKeyBounds) uint64 {
if filter(f) {
size, err := d.tableCache.estimateSize(f, bounds.Start, bounds.End.Key)
if err != nil {
return 0
}
return size
}
return 0
},
},
}
}

// EstimateDiskUsage returns the estimated filesystem space used in bytes for
// storing the range `[start, end]`. The estimation is computed as follows:
//
Expand All @@ -2254,7 +2287,9 @@ func (d *DB) EstimateDiskUsageByBackingType(
if err := d.closed.Load(); err != nil {
panic(err)
}
if d.opts.Comparer.Compare(start, end) > 0 {

bounds := base.UserKeyBoundsInclusive(start, end)
if !bounds.Valid(d.cmp) {
return 0, 0, 0, errors.New("invalid key-range specified (start > end)")
}

Expand All @@ -2264,70 +2299,13 @@ func (d *DB) EstimateDiskUsageByBackingType(
readState := d.loadReadState()
defer readState.unref()

for level, files := range readState.current.Levels {
iter := files.Iter()
if level > 0 {
// We can only use `Overlaps` to restrict `files` at L1+ since at L0 it
// expands the range iteratively until it has found a set of files that
// do not overlap any other L0 files outside that set.
overlaps := readState.current.Overlaps(level, base.UserKeyBoundsInclusive(start, end))
iter = overlaps.Iter()
}
for file := iter.First(); file != nil; file = iter.Next() {
if d.opts.Comparer.Compare(start, file.Smallest.UserKey) <= 0 &&
d.opts.Comparer.Compare(file.Largest.UserKey, end) <= 0 {
// The range fully contains the file, so skip looking it up in
// table cache/looking at its indexes, and add the full file size.
meta, err := d.objProvider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum)
if err != nil {
return 0, 0, 0, err
}
if meta.IsRemote() {
remoteSize += file.Size
if meta.Remote.CleanupMethod == objstorage.SharedNoCleanup {
externalSize += file.Size
}
}
totalSize += file.Size
} else if d.opts.Comparer.Compare(file.Smallest.UserKey, end) <= 0 &&
d.opts.Comparer.Compare(start, file.Largest.UserKey) <= 0 {
var size uint64
var err error
if file.Virtual {
err = d.tableCache.withVirtualReader(
file.VirtualMeta(),
func(r sstable.VirtualReader) (err error) {
size, err = r.EstimateDiskUsage(start, end)
return err
},
)
} else {
err = d.tableCache.withReader(
file.PhysicalMeta(),
func(r *sstable.Reader) (err error) {
size, err = r.EstimateDiskUsage(start, end)
return err
},
)
}
if err != nil {
return 0, 0, 0, err
}
meta, err := d.objProvider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum)
if err != nil {
return 0, 0, 0, err
}
if meta.IsRemote() {
remoteSize += size
if meta.Remote.CleanupMethod == objstorage.SharedNoCleanup {
externalSize += size
}
}
totalSize += size
}
}
}
return totalSize, remoteSize, externalSize, nil
d.mu.Lock()
defer d.mu.Unlock()

totalSize = *d.mu.annotators.totalSizeAnnotator.VersionRangeAnnotation(readState.current, bounds)
remoteSize = *d.mu.annotators.remoteSizeAnnotator.VersionRangeAnnotation(readState.current, bounds)
externalSize = *d.mu.annotators.externalSizeAnnotator.VersionRangeAnnotation(readState.current, bounds)
return
}

func (d *DB) walPreallocateSize() int {
Expand Down
75 changes: 66 additions & 9 deletions internal/manifest/annotator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ type AnnotationAggregator[T any] interface {
Merge(src *T, dst *T) *T
}

// A PartialOverlapAnnotationAggregator is an extension of AnnotationAggregator
// that allows for custom accumulation of range annotations for files that only
// partially overlap with the range.
type PartialOverlapAnnotationAggregator[T any] interface {
AnnotationAggregator[T]
AccumulatePartialOverlap(f *FileMetadata, dst *T, bounds base.UserKeyBounds) *T
}

type annotation struct {
// annotator is a pointer to the Annotator that computed this annotation.
// NB: This is untyped to allow AnnotationAggregator to use Go generics,
Expand Down Expand Up @@ -165,6 +173,15 @@ func (a *Annotator[T]) accumulateRangeAnnotation(

// Accumulate annotations from every item that overlaps the bounds.
for i := leftItem; i < rightItem; i++ {
if i == leftItem || i == rightItem-1 {
if agg, ok := a.Aggregator.(PartialOverlapAnnotationAggregator[T]); ok {
fb := n.items[i].UserKeyBounds()
if cmp(bounds.Start, fb.Start) > 0 || bounds.End.CompareUpperBounds(cmp, fb.End) < 0 {
a.scratch = agg.AccumulatePartialOverlap(n.items[i], a.scratch, bounds)
continue
}
}
}
v, _ := a.Aggregator.Accumulate(n.items[i], a.scratch)
a.scratch = v
}
Expand Down Expand Up @@ -258,6 +275,26 @@ func (a *Annotator[T]) LevelRangeAnnotation(lm LevelMetadata, bounds base.UserKe
return a.scratch
}

// VersionRangeAnnotation calculates the annotation defined by this Annotator
// for all files within the given Version which are within the range
// defined by bounds.
func (a *Annotator[T]) VersionRangeAnnotation(v *Version, bounds base.UserKeyBounds) *T {
accumulateSlice := func(ls LevelSlice) {
if ls.Empty() {
return
}
a.accumulateRangeAnnotation(ls.iter.r, v.cmp.Compare, bounds, false, false)
}
a.scratch = a.Aggregator.Zero(a.scratch)
for _, ls := range v.L0SublevelFiles {
accumulateSlice(ls)
}
for _, lm := range v.Levels[1:] {
accumulateSlice(lm.Slice())
}
return a.scratch
}

// InvalidateAnnotation clears any cached annotations defined by Annotator. A
// pointer to the Annotator is used as the key for pre-calculated values, so
// the same Annotator must be used to clear the appropriate cached annotation.
Expand All @@ -270,27 +307,47 @@ func (a *Annotator[T]) InvalidateLevelAnnotation(lm LevelMetadata) {
a.invalidateNodeAnnotation(lm.tree.root)
}

// sumAggregator defines an Aggregator which sums together a uint64 value
// SumAggregator defines an Aggregator which sums together a uint64 value
// across files.
type sumAggregator struct {
accumulateFunc func(f *FileMetadata) (v uint64, cacheOK bool)
type SumAggregator struct {
AccumulateFunc func(f *FileMetadata) (v uint64, cacheOK bool)
AccumulatePartialOverlapFunc func(f *FileMetadata, bounds base.UserKeyBounds) uint64
}

func (sa sumAggregator) Zero(dst *uint64) *uint64 {
// Zero implements AnnotationAggregator.Zero, returning a new uint64 set to 0.
func (sa SumAggregator) Zero(dst *uint64) *uint64 {
if dst == nil {
return new(uint64)
}
*dst = 0
return dst
}

func (sa sumAggregator) Accumulate(f *FileMetadata, dst *uint64) (v *uint64, cacheOK bool) {
accumulated, ok := sa.accumulateFunc(f)
// Accumulate implements AnnotationAggregator.Accumulate, accumulating a single
// file's uint64 value.
func (sa SumAggregator) Accumulate(f *FileMetadata, dst *uint64) (v *uint64, cacheOK bool) {
accumulated, ok := sa.AccumulateFunc(f)
*dst += accumulated
return dst, ok
}

func (sa sumAggregator) Merge(src *uint64, dst *uint64) *uint64 {
// AccumulatePartialOverlap implements
// PartialOverlapAnnotationAggregator.AccumulatePartialOverlap, accumulating a
// single file's uint64 value for a file which only partially overlaps with the
// range defined by bounds.
func (sa SumAggregator) AccumulatePartialOverlap(
f *FileMetadata, dst *uint64, bounds base.UserKeyBounds,
) *uint64 {
if sa.AccumulatePartialOverlapFunc == nil {
v, _ := sa.Accumulate(f, dst)
return v
}
*dst += sa.AccumulatePartialOverlapFunc(f, bounds)
return dst
}

// Merge implements AnnotationAggregator.Merge by summing two uint64 values.
func (sa SumAggregator) Merge(src *uint64, dst *uint64) *uint64 {
*dst += *src
return dst
}
Expand All @@ -300,8 +357,8 @@ func (sa sumAggregator) Merge(src *uint64, dst *uint64) *uint64 {
// files.
func SumAnnotator(accumulate func(f *FileMetadata) (v uint64, cacheOK bool)) *Annotator[uint64] {
return &Annotator[uint64]{
Aggregator: sumAggregator{
accumulateFunc: accumulate,
Aggregator: SumAggregator{
AccumulateFunc: accumulate,
},
}
}
Expand Down
18 changes: 18 additions & 0 deletions open.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,24 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
d.newIters = d.tableCache.newIters
d.tableNewRangeKeyIter = tableNewRangeKeyIter(d.newIters)

d.mu.annotators.totalSizeAnnotator = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
return true
})
d.mu.annotators.remoteSizeAnnotator = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
meta, err := d.objProvider.Lookup(fileTypeTable, f.FileBacking.DiskFileNum)
if err != nil {
return false
}
return meta.IsRemote()
})
d.mu.annotators.externalSizeAnnotator = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
meta, err := d.objProvider.Lookup(fileTypeTable, f.FileBacking.DiskFileNum)
if err != nil {
return false
}
return meta.IsRemote() && meta.Remote.CleanupMethod == objstorage.SharedNoCleanup
})

var previousOptionsFileNum base.DiskFileNum
var previousOptionsFilename string
for _, filename := range ls {
Expand Down
26 changes: 5 additions & 21 deletions table_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,27 +210,11 @@ func (c *tableCacheContainer) metrics() (CacheMetrics, FilterMetrics) {
func (c *tableCacheContainer) estimateSize(
meta *fileMetadata, lower, upper []byte,
) (size uint64, err error) {
if meta.Virtual {
err = c.withVirtualReader(
meta.VirtualMeta(),
func(r sstable.VirtualReader) (err error) {
size, err = r.EstimateDiskUsage(lower, upper)
return err
},
)
} else {
err = c.withReader(
meta.PhysicalMeta(),
func(r *sstable.Reader) (err error) {
size, err = r.EstimateDiskUsage(lower, upper)
return err
},
)
}
if err != nil {
return 0, err
}
return size, nil
c.withCommonReader(meta, func(cr sstable.CommonReader) error {
size, err = cr.EstimateDiskUsage(lower, upper)
return err
})
return size, err
}

// createCommonReader creates a Reader for this file.
Expand Down

0 comments on commit 4c6c47b

Please sign in to comment.