Improve Ingester Record Insertion: Slice -> Map #681

Merged (16 commits) on Apr 30, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,5 +1,6 @@
## main / unreleased

* [ENHANCEMENT] Performance: Improve Ingester Record Insertion. [#681](https://github.com/grafana/tempo/pull/681)
* [ENHANCEMENT] Improve WAL Replay by not rebuilding the WAL. [#668](https://github.com/grafana/tempo/pull/668)
* [ENHANCEMENT] Add config option to disable write extension to the ingesters. [#677](https://github.com/grafana/tempo/pull/677)

4 changes: 3 additions & 1 deletion modules/ingester/flush.go
@@ -214,13 +214,15 @@ func (i *Ingester) handleComplete(op *flushOp) (retry bool, err error) {
return false, nil
}

level.Debug(log.Logger).Log("msg", "completing block", "userid", op.userID)
start := time.Now()
level.Info(log.Logger).Log("msg", "completing block", "userid", op.userID, "blockID", op.blockID)
instance, err := i.getOrCreateInstance(op.userID)
if err != nil {
return false, err
}

err = instance.CompleteBlock(op.blockID)
level.Info(log.Logger).Log("msg", "block completed", "userid", op.userID, "blockID", op.blockID, "duration", time.Since(start))
if err != nil {
handleFailedOp(op, err)

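The flush change is pure instrumentation: block completion is now timed with `time.Now()`/`time.Since()` and logged at info level together with the block ID. A minimal sketch of the same timing pattern, with a stand-in for the real `instance.CompleteBlock` call:

```go
package main

import (
	"log"
	"time"
)

// completeBlock is a stand-in for the real instance.CompleteBlock(blockID).
func completeBlock() error {
	time.Sleep(50 * time.Millisecond)
	return nil
}

func main() {
	start := time.Now()
	log.Println("completing block")

	err := completeBlock()

	// As in the diff above, the duration is logged before the error check,
	// so a failed completion is still timed.
	log.Printf("block completed duration=%s err=%v", time.Since(start), err)
}
```
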
45 changes: 31 additions & 14 deletions tempodb/encoding/appender.go
@@ -1,24 +1,26 @@
package encoding

import (
"bytes"
"sort"
"hash"

"github.com/cespare/xxhash"
"github.com/grafana/tempo/tempodb/encoding/common"
)

// Appender is capable of tracking objects and ids that are added to it
type Appender interface {
Append(common.ID, []byte) error
Complete() error
Records() []*common.Record
Records() []common.Record
RecordsForID(common.ID) []common.Record
Length() int
DataLength() uint64
}

type appender struct {
dataWriter common.DataWriter
records []*common.Record
records map[uint64][]common.Record
hash hash.Hash64
currentOffset uint64
}

@@ -27,6 +29,8 @@ type appender struct {
func NewAppender(dataWriter common.DataWriter) Appender {
return &appender{
dataWriter: dataWriter,
records: map[uint64][]common.Record{},
hash: xxhash.New(),
}
}

@@ -43,24 +47,37 @@ func (a *appender) Append(id common.ID, b []byte) error {
return err
}

i := sort.Search(len(a.records), func(idx int) bool {
return bytes.Compare(a.records[idx].ID, id) == 1
})
a.records = append(a.records, nil)
copy(a.records[i+1:], a.records[i:])
a.records[i] = &common.Record{
a.hash.Reset()
_, _ = a.hash.Write(id)
hash := a.hash.Sum64()

records := a.records[hash]
records = append(records, common.Record{
ID: id,
Start: a.currentOffset,
Length: uint32(bytesWritten),
}

})
a.records[hash] = records
a.currentOffset += uint64(bytesWritten)

return nil
}

func (a *appender) Records() []*common.Record {
return a.records
func (a *appender) Records() []common.Record {
sliceRecords := make([]common.Record, 0, len(a.records))
for _, r := range a.records {
sliceRecords = append(sliceRecords, r...)
}

common.SortRecords(sliceRecords)
return sliceRecords
}

func (a *appender) RecordsForID(id common.ID) []common.Record {
a.hash.Reset()
_, _ = a.hash.Write(id)
hash := a.hash.Sum64()
return a.records[hash]
}

func (a *appender) Length() int {
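This is the heart of the PR: `appender` no longer maintains a sorted `[]*common.Record` that every `Append` must binary-search and shift into (the old `sort.Search` + `copy` path); records are instead bucketed in a `map[uint64][]common.Record` keyed by the xxhash of the ID, and sorting is deferred until `Records()` is called when the block is cut. A minimal standalone sketch of the idea, using simplified types and `xxhash.Sum64` instead of the reused `hash.Hash64`, so treat it as an illustration rather than the Tempo implementation:

```go
package main

import (
	"bytes"
	"fmt"
	"sort"

	"github.com/cespare/xxhash"
)

type record struct {
	ID     []byte
	Start  uint64
	Length uint32
}

type mapAppender struct {
	records map[uint64][]record
	offset  uint64
}

func newMapAppender() *mapAppender {
	return &mapAppender{records: map[uint64][]record{}}
}

// append is amortized O(1): no shifting of previously inserted records.
func (a *mapAppender) append(id []byte, length uint32) {
	h := xxhash.Sum64(id)
	a.records[h] = append(a.records[h], record{ID: id, Start: a.offset, Length: length})
	a.offset += uint64(length)
}

// sortedRecords pays the sort cost once, when the index is actually needed.
func (a *mapAppender) sortedRecords() []record {
	out := make([]record, 0, len(a.records))
	for _, rs := range a.records {
		out = append(out, rs...)
	}
	sort.Slice(out, func(i, j int) bool {
		return bytes.Compare(out[i].ID, out[j].ID) < 0
	})
	return out
}

func main() {
	a := newMapAppender()
	a.append([]byte("bbb"), 10)
	a.append([]byte("aaa"), 20)
	fmt.Println(a.sortedRecords()) // sorted by ID: aaa, then bbb
}
```

Keeping the slice sorted on every insert costs O(n) per `Append` (the `copy` of the slice tail); the map version defers that work to a single O(n log n) sort, which is what the benchmarks further down measure.
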
30 changes: 26 additions & 4 deletions tempodb/encoding/appender_buffered.go
@@ -1,6 +1,9 @@
package encoding

import (
"bytes"
"context"

"github.com/grafana/tempo/tempodb/encoding/common"
)

@@ -11,7 +14,7 @@ type bufferedAppender struct {
writer common.DataWriter

// record keeping
records []*common.Record
records []common.Record
totalObjects int
currentOffset uint64
currentRecord *common.Record
@@ -27,7 +30,7 @@ func NewBufferedAppender(writer common.DataWriter, indexDownsample int, totalObj
return &bufferedAppender{
writer: writer,
indexDownsampleBytes: indexDownsample,
records: make([]*common.Record, 0, totalObjectsEstimate/indexDownsample+1),
records: make([]common.Record, 0, totalObjectsEstimate/indexDownsample+1),
}, nil
}

@@ -59,10 +62,29 @@ func (a *bufferedAppender) Append(id common.ID, b []byte) error {
}

// Records returns a slice of the current records
func (a *bufferedAppender) Records() []*common.Record {
func (a *bufferedAppender) Records() []common.Record {
return a.records
}

func (a *bufferedAppender) RecordsForID(id common.ID) []common.Record {
_, i, _ := common.Records(a.records).Find(context.Background(), id)
if i >= len(a.records) || i < 0 {
return nil
}

sliceRecords := make([]common.Record, 0, 1)
for bytes.Equal(a.records[i].ID, id) {
sliceRecords = append(sliceRecords, a.records[i])

i++
if i >= len(a.records) {
break
}
}

return sliceRecords
}

// Length returns the number of written objects
func (a *bufferedAppender) Length() int {
return a.totalObjects
@@ -98,7 +120,7 @@ func (a *bufferedAppender) flush() error {
a.currentRecord.Length += uint32(bytesWritten)

// update index
a.records = append(a.records, a.currentRecord)
a.records = append(a.records, *a.currentRecord)
a.currentRecord = nil

return nil
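`bufferedAppender` (and `recordAppender` below) keeps its records sorted by ID, so `RecordsForID` can locate the first match and then walk forward while the IDs stay equal. A hedged sketch of that lookup over a plain sorted slice, with `sort.Search` standing in for `common.Records.Find`, which I am assuming behaves like a binary search:

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

type record struct {
	ID     []byte
	Start  uint64
	Length uint32
}

// recordsForID returns every record whose ID equals id, assuming recs is
// sorted by ID so that duplicates are adjacent.
func recordsForID(recs []record, id []byte) []record {
	// Index of the first record whose ID is >= id.
	i := sort.Search(len(recs), func(n int) bool {
		return bytes.Compare(recs[n].ID, id) >= 0
	})

	var out []record
	for i < len(recs) && bytes.Equal(recs[i].ID, id) {
		out = append(out, recs[i])
		i++
	}
	return out
}

func main() {
	recs := []record{
		{ID: []byte("aaa")},
		{ID: []byte("bbb"), Start: 10},
		{ID: []byte("bbb"), Start: 20},
		{ID: []byte("ccc")},
	}
	fmt.Println(recordsForID(recs, []byte("bbb"))) // both "bbb" records
}
```
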
28 changes: 25 additions & 3 deletions tempodb/encoding/appender_record.go
@@ -1,15 +1,18 @@
package encoding

import (
"bytes"
"context"

"github.com/grafana/tempo/tempodb/encoding/common"
)

type recordAppender struct {
records []*common.Record
records []common.Record
}

// NewRecordAppender returns an appender that stores records only.
func NewRecordAppender(records []*common.Record) Appender {
func NewRecordAppender(records []common.Record) Appender {
return &recordAppender{
records: records,
}
@@ -21,10 +24,29 @@ func (a *recordAppender) Append(id common.ID, b []byte) error {
return common.ErrUnsupported
}

func (a *recordAppender) Records() []*common.Record {
func (a *recordAppender) Records() []common.Record {
return a.records
}

func (a *recordAppender) RecordsForID(id common.ID) []common.Record {
_, i, _ := common.Records(a.records).Find(context.Background(), id)
if i >= len(a.records) || i < 0 {
return nil
}

sliceRecords := make([]common.Record, 0, 1)
for bytes.Equal(a.records[i].ID, id) {
sliceRecords = append(sliceRecords, a.records[i])

i++
if i >= len(a.records) {
break
}
}

return sliceRecords
}

func (a *recordAppender) Length() int {
return len(a.records)
}
51 changes: 51 additions & 0 deletions tempodb/encoding/appender_test.go
@@ -0,0 +1,51 @@
package encoding

import (
"math/rand"
"testing"

"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/stretchr/testify/require"
)

type noopDataWriter struct{}

func (n noopDataWriter) Write(common.ID, []byte) (int, error) { return 10, nil }
func (n noopDataWriter) CutPage() (int, error) { return 100, nil }
func (n noopDataWriter) Complete() error { return nil }

func BenchmarkAppender100(b *testing.B) {
benchmarkAppender(b, 100)
}

func BenchmarkAppender1000(b *testing.B) {
benchmarkAppender(b, 1000)
}

func BenchmarkAppender10000(b *testing.B) {
benchmarkAppender(b, 10000)
}

func BenchmarkAppender200000(b *testing.B) {
benchmarkAppender(b, 200000)
}

func BenchmarkAppender500000(b *testing.B) {
benchmarkAppender(b, 500000)
}

func benchmarkAppender(b *testing.B, appendRecords int) {
for i := 0; i < b.N; i++ {
appender := NewAppender(noopDataWriter{})

for j := 0; j < appendRecords; j++ {
id := make([]byte, 16)
_, err := rand.Read(id)
require.NoError(b, err)

err = appender.Append(id, nil)
require.NoError(b, err)
}

_ = appender.Records()
}
}
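The benchmarks above exercise only `Append` plus a single `Records()` call against a no-op `DataWriter`, which isolates the record-insertion cost this PR targets. They run with standard Go tooling, e.g. `go test -bench=Appender -run='^$' ./tempodb/encoding/` (the exact invocation is mine, not part of the PR).
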
6 changes: 3 additions & 3 deletions tempodb/encoding/common/index_reader.go
@@ -7,15 +7,15 @@ import (
)

// Records is a slice of Record
type Records []*Record
type Records []Record

// At implements IndexReader
func (r Records) At(_ context.Context, i int) (*Record, error) {
if i < 0 || i >= len(r) {
return nil, nil
}

return r[i], nil
return &r[i], nil
}

// Find implements IndexReader
@@ -28,5 +28,5 @@ func (r Records) Find(_ context.Context, id ID) (*Record, int, error) {
return nil, -1, nil
}

return r[i], i, nil
return &r[i], i, nil
}
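With `Records` now holding values instead of pointers, `At` and `Find` return `&r[i]`, i.e. pointers into the slice's backing array rather than separately allocated records. A small usage sketch, assuming the post-change signatures shown above (the IDs and setup are invented for illustration):

```go
package main

import (
	"context"
	"fmt"

	"github.com/grafana/tempo/tempodb/encoding/common"
)

func main() {
	recs := common.Records{
		{ID: []byte("aaaa"), Start: 0, Length: 10},
		{ID: []byte("bbbb"), Start: 10, Length: 20},
	}

	rec, i, err := recs.Find(context.Background(), []byte("bbbb"))
	if err != nil || rec == nil {
		fmt.Println("not found")
		return
	}

	// rec aliases recs[i]; it stays valid only while the slice is not
	// appended to, resorted, or otherwise reallocated.
	fmt.Printf("found at %d: start=%d length=%d\n", i, rec.Start, rec.Length)
}
```
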
4 changes: 2 additions & 2 deletions tempodb/encoding/common/record.go
@@ -6,11 +6,11 @@ import (
)

type recordSorter struct {
records []*Record
records []Record
}

// SortRecords sorts a slice of records
func SortRecords(records []*Record) {
func SortRecords(records []Record) {
sort.Sort(&recordSorter{
records: records,
})
10 changes: 5 additions & 5 deletions tempodb/encoding/common/types.go
@@ -33,7 +33,7 @@ type ObjectCombiner interface {
// DataReader is the primary abstraction point for supporting multiple data
// formats.
type DataReader interface {
Read(context.Context, []*Record, []byte) ([][]byte, []byte, error)
Read(context.Context, []Record, []byte) ([][]byte, []byte, error)
Close()

// NextPage can be used to iterate at a page at a time. May return ErrUnsupported for older formats
@@ -67,7 +67,7 @@ type DataWriter interface {
// IndexWriter is used to write paged indexes
type IndexWriter interface {
// Write returns a byte representation of the provided Records
Write([]*Record) ([]byte, error)
Write([]Record) ([]byte, error)
}

// ObjectReaderWriter represents a library of methods to read and write
@@ -81,9 +81,9 @@ type ObjectReaderWriter interface {
// RecordReaderWriter represents a library of methods to read and write
// records
type RecordReaderWriter interface {
MarshalRecords(records []*Record) ([]byte, error)
MarshalRecordsToBuffer(records []*Record, buffer []byte) error
MarshalRecords(records []Record) ([]byte, error)
MarshalRecordsToBuffer(records []Record, buffer []byte) error
RecordCount(b []byte) int
UnmarshalRecord(buff []byte) *Record
UnmarshalRecord(buff []byte) Record
RecordLength() int
}
6 changes: 3 additions & 3 deletions tempodb/encoding/finder_paged.go
@@ -45,7 +45,7 @@ func (f *pagedFinder) Find(ctx context.Context, id common.ID) ([]byte, error) {
}

for {
bytesOne, err := f.findOne(ctx, id, record)
bytesOne, err := f.findOne(ctx, id, *record)
if err != nil {
return nil, err
}
@@ -74,8 +74,8 @@ func (f *pagedFinder) findOne(ctx context.Context, id common.ID, record *common.Record) ([]byte, error) {
return bytesFound, nil
}

func (f *pagedFinder) findOne(ctx context.Context, id common.ID, record *common.Record) ([]byte, error) {
pages, _, err := f.r.Read(ctx, []*common.Record{record}, nil)
func (f *pagedFinder) findOne(ctx context.Context, id common.ID, record common.Record) ([]byte, error) {
pages, _, err := f.r.Read(ctx, []common.Record{record}, nil)
if err != nil {
return nil, err
}