Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parquet WAL #1878

Merged
merged 39 commits into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
1bc0a59
vParquet WALBlock
joe-elliott Sep 22, 2022
8ea93f1
tests
joe-elliott Sep 26, 2022
192ff79
first pass fetch
joe-elliott Oct 24, 2022
c40b302
todo
joe-elliott Oct 31, 2022
72c9ca7
Fix flaky wal tests, add benchmark just for write path
mdisibio Nov 2, 2022
fa28664
Reuse buffer/writer for efficiency, fix timestamp replay
mdisibio Nov 2, 2022
49647be
Make wal block version configurable, update many tests
mdisibio Nov 2, 2022
b4ba183
ingester flush wal after cutting traces
mdisibio Nov 2, 2022
3f10f34
Merge branch 'main' into parquet-wal
mdisibio Nov 4, 2022
d26ffa5
Fix parquet slack time handling by writing meta.json along with data …
mdisibio Nov 4, 2022
39b7a6a
Don't use parquet.GenericBuffer
mdisibio Nov 4, 2022
a0e2031
Pool trace buffers
mdisibio Nov 4, 2022
38b26d9
Add more wal benchmarks
mdisibio Nov 4, 2022
5d6b158
Store map of trace ids to flushed page
mdisibio Nov 7, 2022
7c67bec
Read fewer columns on wal replay
mdisibio Nov 7, 2022
d794ccb
Add completeblock benchmark for all encoding combinations
mdisibio Nov 7, 2022
08c0f4f
Use internal iterator when completing parquet wal -> parquet complete…
mdisibio Nov 7, 2022
a5d98fe
Fix tests and lint
mdisibio Nov 7, 2022
b5c2538
Rename/cleanup/fix benchmark config
mdisibio Nov 8, 2022
8803880
Wire up searching wal blocks, and don't write flatbuffer data for wal…
mdisibio Nov 8, 2022
5906d6f
Fix flaky test
mdisibio Nov 8, 2022
5c3b98d
Test all combinations of wal->complete versions, fix bug in v2 create…
mdisibio Nov 8, 2022
52059e5
Use pooling in parquet wal->complete process
mdisibio Nov 8, 2022
a75d66d
Global trace pooling instead of per-iterator
mdisibio Nov 9, 2022
e0a6ddf
Disable pooling again
mdisibio Nov 9, 2022
1532780
Merge branch 'main' into parquet-wal
mdisibio Nov 9, 2022
da5fcb8
Honor ingestionslack, lint
mdisibio Nov 9, 2022
fe9b5a1
changelog
mdisibio Nov 9, 2022
edb2712
review feedback
mdisibio Nov 9, 2022
6663964
Fix test
mdisibio Nov 9, 2022
04bf1dc
Ingester flush traces in order by ID, so we can skip buffering in the…
mdisibio Nov 21, 2022
29f12e9
comment
mdisibio Nov 21, 2022
f51937a
comment
mdisibio Nov 21, 2022
6784137
Parquet wal maintain index of traces to rownumber to allow out-of-ord…
mdisibio Nov 22, 2022
675fc64
Merge branch 'main' into parquet-wal
mdisibio Nov 22, 2022
a7fac91
update mod
mdisibio Nov 22, 2022
cc14484
cleanup
mdisibio Nov 22, 2022
8b0464b
Add partial replay test for parquet wal block
mdisibio Nov 22, 2022
e058431
lint
mdisibio Nov 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ Internal types are updated to use `scope` instead of `instrumentation_library`.
* [ENHANCEMENT] metrics-generator: extract `status_message` field from spans [#1786](https://github.com/grafana/tempo/pull/1786), [#1794](https://github.com/grafana/tempo/pull/1794) (@stoewer)
* [ENHANCEMENT] metrics-generator: handle collisions between user defined and default dimensions [#1794](https://github.com/grafana/tempo/pull/1794) (@stoewer)
* [ENHANCEMENT] distributor: Log span names when `distributor.log_received_spans.include_all_attributes` is on [#1790](https://github.com/grafana/tempo/pull/1790) (@suraciii)
* [ENHANCEMENT] Add parquet WAL [#1878](https://github.com/grafana/tempo/pull/1878) (@joe-elliott, @mdisibio)
* [BUGFIX] Honor caching and buffering settings when finding traces by id [#1697](https://github.com/grafana/tempo/pull/1697) (@joe-elliott)
* [BUGFIX] Correctly propagate errors from the iterator layer up through the queriers [#1723](https://github.com/grafana/tempo/pull/1723) (@joe-elliott)
* [BUGFIX] Make multitenancy work with HTTP [#1781](https://github.com/grafana/tempo/pull/1781) (@gouthamve)
Expand Down
20 changes: 12 additions & 8 deletions modules/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func TestPushQueryAllEncodings(t *testing.T) {
TraceID: traceID,
})
require.NoError(t, err, "unexpected error querying")
trace.SortTrace(foundTrace.Trace)
equal := proto.Equal(traces[i], foundTrace.Trace)
require.True(t, equal)
}
Expand Down Expand Up @@ -152,8 +153,8 @@ func TestWal(t *testing.T) {
TraceID: traceID,
})
require.NoError(t, err, "unexpected error querying")
equal := proto.Equal(traces[i], foundTrace.Trace)
require.True(t, equal)
trace.SortTrace(foundTrace.Trace)
test.TracesEqual(t, traces[i], foundTrace.Trace)
}

// a block that has been replayed should have a flush queue entry to complete it
Expand All @@ -173,8 +174,7 @@ func TestWal(t *testing.T) {
require.NoError(t, err, "unexpected error querying")

trace.SortTrace(foundTrace.Trace)
equal := proto.Equal(traces[i], foundTrace.Trace)
require.True(t, equal)
test.TracesEqual(t, traces[i], foundTrace.Trace)
}
}

Expand All @@ -187,20 +187,23 @@ func TestSearchWAL(t *testing.T) {
inst, _ := i.getOrCreateInstance("test")
require.NotNil(t, inst)

dec := model.MustNewSegmentDecoder(model.CurrentEncoding)

// create some search data
id := make([]byte, 16)
_, err = rand.Read(id)
require.NoError(t, err)
trace := test.MakeTrace(10, id)
traceBytes, err := trace.Marshal()
b1, err := dec.PrepareForWrite(trace, 0, 0)
require.NoError(t, err)

entry := &tempofb.SearchEntryMutable{}
entry.TraceID = id
entry.AddTag("foo", "bar")
searchBytes := entry.ToBytes()

// push to instance
require.NoError(t, inst.PushBytes(context.Background(), id, traceBytes, searchBytes))
require.NoError(t, inst.PushBytes(context.Background(), id, b1, searchBytes))

// Write wal
require.NoError(t, inst.CutCompleteTraces(0, true))
Expand Down Expand Up @@ -307,8 +310,8 @@ func TestFlush(t *testing.T) {
TraceID: traceID,
})
require.NoError(t, err, "unexpected error querying")
equal := proto.Equal(traces[i], foundTrace.Trace)
require.True(t, equal)
trace.SortTrace(foundTrace.Trace)
test.TracesEqual(t, traces[i], foundTrace.Trace)
}
}

Expand All @@ -333,6 +336,7 @@ func defaultIngesterModule(t testing.TB, tmpDir string) *Ingester {
},
WAL: &wal.Config{
Filepath: tmpDir,
Version: "v2",
},
},
}, log.NewNopLogger())
Expand Down
50 changes: 31 additions & 19 deletions modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/grafana/tempo/tempodb/backend/local"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
v2 "github.com/grafana/tempo/tempodb/encoding/v2"
"github.com/grafana/tempo/tempodb/search"
)

Expand Down Expand Up @@ -259,7 +260,9 @@ func (i *instance) CutCompleteTraces(cutoff time.Duration, immediate bool) error
tempopb.ReuseByteSlices(t.batches)
}

return nil
i.blocksMtx.Lock()
defer i.blocksMtx.Unlock()
return i.headBlock.Flush()
}

// CutBlockIfReady cuts a completingBlock from the HeadBlock if ready.
Expand All @@ -274,11 +277,18 @@ func (i *instance) CutBlockIfReady(maxBlockLifetime time.Duration, maxBlockBytes

now := time.Now()
if i.lastBlockCut.Add(maxBlockLifetime).Before(now) || i.headBlock.DataLength() >= maxBlockBytes || immediate {

// Final flush
err := i.headBlock.Flush()
if err != nil {
return uuid.Nil, fmt.Errorf("failed to flush head block: %w", err)
}

completingBlock := i.headBlock

i.completingBlocks = append(i.completingBlocks, completingBlock)

err := i.resetHeadBlock()
err = i.resetHeadBlock()
if err != nil {
return uuid.Nil, fmt.Errorf("failed to resetHeadBlock: %w", err)
}
Expand Down Expand Up @@ -451,15 +461,15 @@ func (i *instance) FindTraceByID(ctx context.Context, id []byte) (*tempopb.Trace
combiner.Consume(completeTrace)

// headBlock
tr, err := i.headBlock.FindTraceByID(ctx, id, common.SearchOptions{})
tr, err := i.headBlock.FindTraceByID(ctx, id, common.DefaultSearchOptions())
if err != nil {
return nil, fmt.Errorf("headBlock.FindTraceByID failed: %w", err)
}
combiner.Consume(tr)

// completingBlock
for _, c := range i.completingBlocks {
tr, err = c.FindTraceByID(ctx, id, common.SearchOptions{})
tr, err = c.FindTraceByID(ctx, id, common.DefaultSearchOptions())
if err != nil {
return nil, fmt.Errorf("completingBlock.FindTraceByID failed: %w", err)
}
Expand All @@ -468,7 +478,7 @@ func (i *instance) FindTraceByID(ctx context.Context, id []byte) (*tempopb.Trace

// completeBlock
for _, c := range i.completeBlocks {
found, err := c.FindTraceByID(ctx, id, common.SearchOptions{})
found, err := c.FindTraceByID(ctx, id, common.DefaultSearchOptions())
if err != nil {
return nil, fmt.Errorf("completeBlock.FindTraceByID failed: %w", err)
}
Expand Down Expand Up @@ -538,21 +548,23 @@ func (i *instance) resetHeadBlock() error {
i.headBlock = newHeadBlock
i.lastBlockCut = time.Now()

// Create search data wal file
f, enc, err := i.writer.WAL().NewFile(i.headBlock.BlockMeta().BlockID, i.instanceID, searchDir)
if err != nil {
return err
}
// Create search data wal file if needed
if i.useFlatbufferSearch || i.headBlock.BlockMeta().Version == v2.VersionString {
f, enc, err := i.writer.WAL().NewFile(i.headBlock.BlockMeta().BlockID, i.instanceID, searchDir)
if err != nil {
return err
}

b, err := search.NewStreamingSearchBlockForFile(f, i.headBlock.BlockMeta().BlockID, enc)
if err != nil {
return err
}
if i.searchHeadBlock != nil {
i.searchAppendBlocks[oldHeadBlock.BlockMeta().BlockID.String()] = i.searchHeadBlock
}
i.searchHeadBlock = &searchStreamingBlockEntry{
b: b,
b, err := search.NewStreamingSearchBlockForFile(f, i.headBlock.BlockMeta().BlockID, enc)
if err != nil {
return err
}
if i.searchHeadBlock != nil {
i.searchAppendBlocks[oldHeadBlock.BlockMeta().BlockID.String()] = i.searchHeadBlock
}
i.searchHeadBlock = &searchStreamingBlockEntry{
b: b,
}
}
return nil
}
Expand Down
69 changes: 47 additions & 22 deletions modules/ingester/instance_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem
// deadlocking with other activity (ingest, flushing), caused by releasing
// and then attempting to retake the lock.
i.blocksMtx.RLock()
i.searchWAL(ctx, p, sr)
i.searchWAL(ctx, req, p, sr)
i.searchLocalBlocks(ctx, req, p, sr)
i.blocksMtx.RUnlock()

Expand Down Expand Up @@ -143,7 +143,7 @@ func (i *instance) searchLiveTraces(ctx context.Context, p search.Pipeline, sr *
}

// searchWAL starts a search task for every WAL block. Must be called under lock.
func (i *instance) searchWAL(ctx context.Context, p search.Pipeline, sr *search.Results) {
func (i *instance) searchWAL(ctx context.Context, req *tempopb.SearchRequest, p search.Pipeline, sr *search.Results) {
searchFunc := func(e *searchStreamingBlockEntry) {
span, ctx := opentracing.StartSpanFromContext(ctx, "instance.searchWAL")
defer span.Finish()
Expand All @@ -162,11 +162,43 @@ func (i *instance) searchWAL(ctx context.Context, p search.Pipeline, sr *search.
}
}

searchWalBlock := func(b common.WALBlock) {
span, ctx := opentracing.StartSpanFromContext(ctx, "instance.searchWALBlock", opentracing.Tags{
"blockID": b.BlockMeta().BlockID,
})
defer span.Finish()
defer sr.FinishWorker()

resp, err := b.Search(ctx, req, common.DefaultSearchOptions())
if err != nil {
level.Error(log.Logger).Log("msg", "error searching wal block", "blockID", b.BlockMeta().BlockID.String(), "err", err)
return
}

sr.AddBlockInspected()
sr.AddBytesInspected(resp.Metrics.InspectedBytes)
for _, r := range resp.Traces {
sr.AddResult(ctx, r)
}
}

// head block
sr.StartWorker()
go searchFunc(i.searchHeadBlock)
if i.headBlock != nil {
sr.StartWorker()
go searchWalBlock(i.headBlock)
}

if i.searchHeadBlock != nil {
sr.StartWorker()
go searchFunc(i.searchHeadBlock)
}

// completing blocks
for _, b := range i.completingBlocks {
sr.StartWorker()
go searchWalBlock(b)
}

for _, e := range i.searchAppendBlocks {
sr.StartWorker()
go searchFunc(e)
Expand Down Expand Up @@ -223,10 +255,7 @@ func (i *instance) searchLocalBlocks(ctx context.Context, req *tempopb.SearchReq
span.LogFields(ot_log.Event("local block entry mtx acquired"))
span.SetTag("blockID", blockID)

resp, err := e.Search(ctx, req, common.SearchOptions{
ReadBufferCount: 32,
ReadBufferSize: 1024 * 1024,
})
resp, err := e.Search(ctx, req, common.DefaultSearchOptions())
if err != nil {
level.Error(log.Logger).Log("msg", "error searching local block", "blockID", blockID, "err", err)
return
Expand Down Expand Up @@ -286,10 +315,7 @@ func (i *instance) SearchTags(ctx context.Context) (*tempopb.SearchTagsResponse,
continue
}

err = b.SearchTags(ctx, distinctValues.Collect, common.SearchOptions{
ReadBufferCount: 32,
ReadBufferSize: 1024 * 1024,
})
err = b.SearchTags(ctx, distinctValues.Collect, common.DefaultSearchOptions())
if err == common.ErrUnsupported {
level.Warn(log.Logger).Log("msg", "block does not support tag search", "blockID", b.BlockMeta().BlockID)
continue
Expand Down Expand Up @@ -358,10 +384,7 @@ func (i *instance) SearchTagValues(ctx context.Context, tagName string) (*tempop
continue
}

err = b.SearchTagValues(ctx, tagName, distinctValues.Collect, common.SearchOptions{
ReadBufferCount: 32,
ReadBufferSize: 1024 * 1024,
})
err = b.SearchTagValues(ctx, tagName, distinctValues.Collect, common.DefaultSearchOptions())
if err == common.ErrUnsupported {
level.Warn(log.Logger).Log("msg", "block does not support tag value search", "blockID", b.BlockMeta().BlockID)
continue
Expand Down Expand Up @@ -429,12 +452,14 @@ func (i *instance) visitSearchableBlocksWAL(ctx context.Context, visitFn func(bl
return visitFn(entry.b)
}

err := visitUnderLock(i.searchHeadBlock)
if err != nil {
return err
}
if err := ctx.Err(); err != nil {
return err
if i.searchHeadBlock != nil {
err := visitUnderLock(i.searchHeadBlock)
if err != nil {
return err
}
if err := ctx.Err(); err != nil {
return err
}
}

for _, b := range i.searchAppendBlocks {
Expand Down
4 changes: 3 additions & 1 deletion modules/ingester/instance_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ func TestInstanceSearch(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, sr.Traces, len(ids))
checkEqual(t, ids, sr)

err = ingester.stopping(nil)
require.NoError(t, err)
})
}
}
Expand Down Expand Up @@ -153,7 +156,6 @@ func testSearchTagsAndValues(t *testing.T, ctx context.Context, i *instance, tag

sort.Strings(srv.TagValues)
assert.Contains(t, sr.TagNames, tagName)
assert.Equal(t, tagName, sr.TagNames[0])
assert.Equal(t, expectedTagValues, srv.TagValues)
}

Expand Down
12 changes: 8 additions & 4 deletions modules/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,8 @@ func TestInstanceCutCompleteTraces(t *testing.T) {
}

func TestInstanceCutBlockIfReady(t *testing.T) {
dec := model.MustNewSegmentDecoder(model.CurrentEncoding)

tt := []struct {
name string
maxBlockLifetime time.Duration
Expand Down Expand Up @@ -416,14 +418,16 @@ func TestInstanceCutBlockIfReady(t *testing.T) {
instance, _ := defaultInstance(t)

for i := 0; i < tc.pushCount; i++ {
request := makeRequest([]byte{})
err := instance.PushBytesRequest(context.Background(), request)
tr := test.MakeTrace(1, uuid.Nil[:])
bytes, err := dec.PrepareForWrite(tr, 0, 0)
require.NoError(t, err)
err = instance.PushBytes(context.Background(), uuid.Nil[:], bytes, nil)
require.NoError(t, err)
}

// Defaults
if tc.maxBlockBytes == 0 {
tc.maxBlockBytes = 1000
tc.maxBlockBytes = 100000
}
if tc.maxBlockLifetime == 0 {
tc.maxBlockLifetime = time.Hour
Expand Down Expand Up @@ -688,7 +692,7 @@ func makeRequestWithByteLimit(maxBytes int, traceID []byte) *tempopb.PushBytesRe
batch := test.MakeBatch(1, traceID)

for batch.Size() < maxBytes {
batch.ScopeSpans[0].Spans = append(batch.ScopeSpans[0].Spans, test.MakeSpan(traceID))
batch.ScopeSpans[0].Spans = append(batch.ScopeSpans[0].Spans, test.MakeSpanWithAttributeCount(traceID, 0))
}

return makePushBytesRequest(traceID, batch)
Expand Down
2 changes: 2 additions & 0 deletions modules/storage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/grafana/tempo/tempodb/backend/s3"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
v2 "github.com/grafana/tempo/tempodb/encoding/v2"
"github.com/grafana/tempo/tempodb/pool"
"github.com/grafana/tempo/tempodb/wal"
)
Expand All @@ -36,6 +37,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)

cfg.Trace.WAL = &wal.Config{}
f.StringVar(&cfg.Trace.WAL.Filepath, util.PrefixConfig(prefix, "trace.wal.path"), "/var/tempo/wal", "Path at which store WAL blocks.")
cfg.Trace.WAL.Version = v2.VersionString
cfg.Trace.WAL.Encoding = backend.EncSnappy
cfg.Trace.WAL.SearchEncoding = backend.EncNone
cfg.Trace.WAL.IngestionSlack = 2 * time.Minute
Expand Down
Loading