diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f5a4ae8c3f..d6f85c2710b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -90,6 +90,7 @@ Internal types are updated to use `scope` instead of `instrumentation_library`. * [ENHANCEMENT] metrics-generator: handle collisions between user defined and default dimensions [#1794](https://github.com/grafana/tempo/pull/1794) (@stoewer) * [ENHANCEMENT] distributor: Log span names when `distributor.log_received_spans.include_all_attributes` is on [#1790](https://github.com/grafana/tempo/pull/1790) (@suraciii) * [ENHANCEMENT] metrics-generator: truncate label names and values exceeding a configurable length [#1897](https://github.com/grafana/tempo/pull/1897) (@kvrhdn) +* [ENHANCEMENT] Add parquet WAL [#1878](https://github.com/grafana/tempo/pull/1878) (@joe-elliott, @mdisibio) * [BUGFIX] Honor caching and buffering settings when finding traces by id [#1697](https://github.com/grafana/tempo/pull/1697) (@joe-elliott) * [BUGFIX] Correctly propagate errors from the iterator layer up through the queriers [#1723](https://github.com/grafana/tempo/pull/1723) (@joe-elliott) * [BUGFIX] Make multitenancy work with HTTP [#1781](https://github.com/grafana/tempo/pull/1781) (@gouthamve) diff --git a/cmd/tempo-serverless/cloud-run/go.mod b/cmd/tempo-serverless/cloud-run/go.mod index 64302f1ca90..44d0216d89b 100644 --- a/cmd/tempo-serverless/cloud-run/go.mod +++ b/cmd/tempo-serverless/cloud-run/go.mod @@ -32,6 +32,7 @@ require ( github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/cristalhq/hedgedhttp v0.7.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dimchansky/utfbom v1.1.1 // indirect github.com/dustin/go-humanize v1.0.0 // indirect @@ -88,6 +89,7 @@ require ( github.com/pelletier/go-toml/v2 v2.0.1 // indirect github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/pkg/errors v0.9.1 // 
indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.13.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.37.0 // indirect @@ -108,6 +110,7 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/viper v1.12.0 // indirect + github.com/stretchr/testify v1.8.1 // indirect github.com/subosito/gotenv v1.3.0 // indirect github.com/uber-go/atomic v1.4.0 // indirect github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect diff --git a/cmd/tempo-serverless/cloud-run/go.sum b/cmd/tempo-serverless/cloud-run/go.sum index baa35763c44..2d2191cdcf1 100644 --- a/cmd/tempo-serverless/cloud-run/go.sum +++ b/cmd/tempo-serverless/cloud-run/go.sum @@ -486,14 +486,18 @@ github.com/spf13/viper v1.12.0 h1:CZ7eSOd3kZoaYDLbXnmzgQI5RlciuXBMA+18HwHRfZQ= github.com/spf13/viper v1.12.0/go.mod h1:b6COn30jlNxbm/V2IqWiNWkJ+vZNiMNksliPCiuKtSI= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify 
v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/subosito/gotenv v1.3.0 h1:mjC+YW8QpAdXibNi+vNWgzmgBH4+5l5dCXv8cNysBLI= github.com/subosito/gotenv v1.3.0/go.mod h1:YzJjq/33h7nrwdY+iHMhEOEEbW0ovIz0tB6t6PwAXzs= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU= diff --git a/cmd/tempo-serverless/lambda/go.mod b/cmd/tempo-serverless/lambda/go.mod index c0079055079..6a196a918f0 100644 --- a/cmd/tempo-serverless/lambda/go.mod +++ b/cmd/tempo-serverless/lambda/go.mod @@ -33,6 +33,7 @@ require ( github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/cristalhq/hedgedhttp v0.7.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dimchansky/utfbom v1.1.1 // indirect github.com/dustin/go-humanize v1.0.0 // indirect @@ -89,6 +90,7 @@ require ( github.com/pelletier/go-toml/v2 v2.0.1 // indirect github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.13.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.37.0 // indirect @@ -109,6 +111,7 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/viper v1.12.0 // indirect + github.com/stretchr/testify v1.8.1 // indirect github.com/subosito/gotenv v1.3.0 // indirect github.com/uber-go/atomic v1.4.0 // indirect github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect diff --git a/cmd/tempo-serverless/lambda/go.sum b/cmd/tempo-serverless/lambda/go.sum index 7712a415e43..13060d9c253 100644 
--- a/cmd/tempo-serverless/lambda/go.sum +++ b/cmd/tempo-serverless/lambda/go.sum @@ -492,14 +492,18 @@ github.com/spf13/viper v1.12.0 h1:CZ7eSOd3kZoaYDLbXnmzgQI5RlciuXBMA+18HwHRfZQ= github.com/spf13/viper v1.12.0/go.mod h1:b6COn30jlNxbm/V2IqWiNWkJ+vZNiMNksliPCiuKtSI= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/subosito/gotenv v1.3.0 h1:mjC+YW8QpAdXibNi+vNWgzmgBH4+5l5dCXv8cNysBLI= github.com/subosito/gotenv v1.3.0/go.mod h1:YzJjq/33h7nrwdY+iHMhEOEEbW0ovIz0tB6t6PwAXzs= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU= diff --git a/modules/ingester/ingester_test.go b/modules/ingester/ingester_test.go index f4bd5bd91b6..b8c457d8834 100644 --- a/modules/ingester/ingester_test.go +++ b/modules/ingester/ingester_test.go @@ -175,8 
+175,7 @@ func TestWal(t *testing.T) { require.NoError(t, err, "unexpected error querying") trace.SortTrace(foundTrace.Trace) - equal := proto.Equal(traces[i], foundTrace.Trace) - require.True(t, equal) + test.TracesEqual(t, traces[i], foundTrace.Trace) } } @@ -189,20 +188,23 @@ func TestSearchWAL(t *testing.T) { inst, _ := i.getOrCreateInstance("test") require.NotNil(t, inst) + dec := model.MustNewSegmentDecoder(model.CurrentEncoding) + // create some search data id := make([]byte, 16) _, err = rand.Read(id) require.NoError(t, err) trace := test.MakeTrace(10, id) - traceBytes, err := trace.Marshal() + b1, err := dec.PrepareForWrite(trace, 0, 0) require.NoError(t, err) + entry := &tempofb.SearchEntryMutable{} entry.TraceID = id entry.AddTag("foo", "bar") searchBytes := entry.ToBytes() // push to instance - require.NoError(t, inst.PushBytes(context.Background(), id, traceBytes, searchBytes)) + require.NoError(t, inst.PushBytes(context.Background(), id, b1, searchBytes)) // Write wal require.NoError(t, inst.CutCompleteTraces(0, true)) @@ -336,6 +338,7 @@ func defaultIngesterModule(t testing.TB, tmpDir string) *Ingester { }, WAL: &wal.Config{ Filepath: tmpDir, + Version: "v2", }, }, }, log.NewNopLogger()) diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go index 05a5c13be9f..e18495b7389 100644 --- a/modules/ingester/instance.go +++ b/modules/ingester/instance.go @@ -32,6 +32,7 @@ import ( "github.com/grafana/tempo/tempodb/backend/local" "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/encoding/common" + v2 "github.com/grafana/tempo/tempodb/encoding/v2" "github.com/grafana/tempo/tempodb/search" ) @@ -240,6 +241,11 @@ func (i *instance) CutCompleteTraces(cutoff time.Duration, immediate bool) error tracesToCut := i.tracesToCut(cutoff, immediate) segmentDecoder := model.MustNewSegmentDecoder(model.CurrentEncoding) + // Sort by ID + sort.Slice(tracesToCut, func(i, j int) bool { + return bytes.Compare(tracesToCut[i].traceID, 
tracesToCut[j].traceID) == -1 + }) + for _, t := range tracesToCut { // sort batches before cutting to reduce combinations during compaction sortByteSlices(t.batches) @@ -259,7 +265,9 @@ func (i *instance) CutCompleteTraces(cutoff time.Duration, immediate bool) error tempopb.ReuseByteSlices(t.batches) } - return nil + i.blocksMtx.Lock() + defer i.blocksMtx.Unlock() + return i.headBlock.Flush() } // CutBlockIfReady cuts a completingBlock from the HeadBlock if ready. @@ -274,11 +282,18 @@ func (i *instance) CutBlockIfReady(maxBlockLifetime time.Duration, maxBlockBytes now := time.Now() if i.lastBlockCut.Add(maxBlockLifetime).Before(now) || i.headBlock.DataLength() >= maxBlockBytes || immediate { + + // Final flush + err := i.headBlock.Flush() + if err != nil { + return uuid.Nil, fmt.Errorf("failed to flush head block: %w", err) + } + completingBlock := i.headBlock i.completingBlocks = append(i.completingBlocks, completingBlock) - err := i.resetHeadBlock() + err = i.resetHeadBlock() if err != nil { return uuid.Nil, fmt.Errorf("failed to resetHeadBlock: %w", err) } @@ -451,7 +466,7 @@ func (i *instance) FindTraceByID(ctx context.Context, id []byte) (*tempopb.Trace combiner.Consume(completeTrace) // headBlock - tr, err := i.headBlock.FindTraceByID(ctx, id, common.SearchOptions{}) + tr, err := i.headBlock.FindTraceByID(ctx, id, common.DefaultSearchOptions()) if err != nil { return nil, fmt.Errorf("headBlock.FindTraceByID failed: %w", err) } @@ -459,7 +474,7 @@ func (i *instance) FindTraceByID(ctx context.Context, id []byte) (*tempopb.Trace // completingBlock for _, c := range i.completingBlocks { - tr, err = c.FindTraceByID(ctx, id, common.SearchOptions{}) + tr, err = c.FindTraceByID(ctx, id, common.DefaultSearchOptions()) if err != nil { return nil, fmt.Errorf("completingBlock.FindTraceByID failed: %w", err) } @@ -468,7 +483,7 @@ func (i *instance) FindTraceByID(ctx context.Context, id []byte) (*tempopb.Trace // completeBlock for _, c := range i.completeBlocks { - 
found, err := c.FindTraceByID(ctx, id, common.SearchOptions{}) + found, err := c.FindTraceByID(ctx, id, common.DefaultSearchOptions()) if err != nil { return nil, fmt.Errorf("completeBlock.FindTraceByID failed: %w", err) } @@ -538,21 +553,23 @@ func (i *instance) resetHeadBlock() error { i.headBlock = newHeadBlock i.lastBlockCut = time.Now() - // Create search data wal file - f, enc, err := i.writer.WAL().NewFile(i.headBlock.BlockMeta().BlockID, i.instanceID, searchDir) - if err != nil { - return err - } + // Create search data wal file if needed + if i.useFlatbufferSearch || i.headBlock.BlockMeta().Version == v2.VersionString { + f, enc, err := i.writer.WAL().NewFile(i.headBlock.BlockMeta().BlockID, i.instanceID, searchDir) + if err != nil { + return err + } - b, err := search.NewStreamingSearchBlockForFile(f, i.headBlock.BlockMeta().BlockID, enc) - if err != nil { - return err - } - if i.searchHeadBlock != nil { - i.searchAppendBlocks[oldHeadBlock.BlockMeta().BlockID.String()] = i.searchHeadBlock - } - i.searchHeadBlock = &searchStreamingBlockEntry{ - b: b, + b, err := search.NewStreamingSearchBlockForFile(f, i.headBlock.BlockMeta().BlockID, enc) + if err != nil { + return err + } + if i.searchHeadBlock != nil { + i.searchAppendBlocks[oldHeadBlock.BlockMeta().BlockID.String()] = i.searchHeadBlock + } + i.searchHeadBlock = &searchStreamingBlockEntry{ + b: b, + } } return nil } diff --git a/modules/ingester/instance_search.go b/modules/ingester/instance_search.go index 3d55aa9e0fb..0559a7e82d7 100644 --- a/modules/ingester/instance_search.go +++ b/modules/ingester/instance_search.go @@ -51,7 +51,7 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem // deadlocking with other activity (ingest, flushing), caused by releasing // and then attempting to retake the lock. 
i.blocksMtx.RLock() - i.searchWAL(ctx, p, sr) + i.searchWAL(ctx, req, p, sr) i.searchLocalBlocks(ctx, req, p, sr) i.blocksMtx.RUnlock() @@ -143,7 +143,7 @@ func (i *instance) searchLiveTraces(ctx context.Context, p search.Pipeline, sr * } // searchWAL starts a search task for every WAL block. Must be called under lock. -func (i *instance) searchWAL(ctx context.Context, p search.Pipeline, sr *search.Results) { +func (i *instance) searchWAL(ctx context.Context, req *tempopb.SearchRequest, p search.Pipeline, sr *search.Results) { searchFunc := func(e *searchStreamingBlockEntry) { span, ctx := opentracing.StartSpanFromContext(ctx, "instance.searchWAL") defer span.Finish() @@ -162,11 +162,43 @@ func (i *instance) searchWAL(ctx context.Context, p search.Pipeline, sr *search. } } + searchWalBlock := func(b common.WALBlock) { + span, ctx := opentracing.StartSpanFromContext(ctx, "instance.searchWALBlock", opentracing.Tags{ + "blockID": b.BlockMeta().BlockID, + }) + defer span.Finish() + defer sr.FinishWorker() + + resp, err := b.Search(ctx, req, common.DefaultSearchOptions()) + if err != nil { + level.Error(log.Logger).Log("msg", "error searching wal block", "blockID", b.BlockMeta().BlockID.String(), "err", err) + return + } + + sr.AddBlockInspected() + sr.AddBytesInspected(resp.Metrics.InspectedBytes) + for _, r := range resp.Traces { + sr.AddResult(ctx, r) + } + } + // head block - sr.StartWorker() - go searchFunc(i.searchHeadBlock) + if i.headBlock != nil { + sr.StartWorker() + go searchWalBlock(i.headBlock) + } + + if i.searchHeadBlock != nil { + sr.StartWorker() + go searchFunc(i.searchHeadBlock) + } // completing blocks + for _, b := range i.completingBlocks { + sr.StartWorker() + go searchWalBlock(b) + } + for _, e := range i.searchAppendBlocks { sr.StartWorker() go searchFunc(e) @@ -223,10 +255,7 @@ func (i *instance) searchLocalBlocks(ctx context.Context, req *tempopb.SearchReq span.LogFields(ot_log.Event("local block entry mtx acquired")) span.SetTag("blockID", 
blockID) - resp, err := e.Search(ctx, req, common.SearchOptions{ - ReadBufferCount: 32, - ReadBufferSize: 1024 * 1024, - }) + resp, err := e.Search(ctx, req, common.DefaultSearchOptions()) if err != nil { level.Error(log.Logger).Log("msg", "error searching local block", "blockID", blockID, "err", err) return @@ -286,10 +315,7 @@ func (i *instance) SearchTags(ctx context.Context) (*tempopb.SearchTagsResponse, continue } - err = b.SearchTags(ctx, distinctValues.Collect, common.SearchOptions{ - ReadBufferCount: 32, - ReadBufferSize: 1024 * 1024, - }) + err = b.SearchTags(ctx, distinctValues.Collect, common.DefaultSearchOptions()) if err == common.ErrUnsupported { level.Warn(log.Logger).Log("msg", "block does not support tag search", "blockID", b.BlockMeta().BlockID) continue @@ -358,10 +384,7 @@ func (i *instance) SearchTagValues(ctx context.Context, tagName string) (*tempop continue } - err = b.SearchTagValues(ctx, tagName, distinctValues.Collect, common.SearchOptions{ - ReadBufferCount: 32, - ReadBufferSize: 1024 * 1024, - }) + err = b.SearchTagValues(ctx, tagName, distinctValues.Collect, common.DefaultSearchOptions()) if err == common.ErrUnsupported { level.Warn(log.Logger).Log("msg", "block does not support tag value search", "blockID", b.BlockMeta().BlockID) continue @@ -429,12 +452,14 @@ func (i *instance) visitSearchableBlocksWAL(ctx context.Context, visitFn func(bl return visitFn(entry.b) } - err := visitUnderLock(i.searchHeadBlock) - if err != nil { - return err - } - if err := ctx.Err(); err != nil { - return err + if i.searchHeadBlock != nil { + err := visitUnderLock(i.searchHeadBlock) + if err != nil { + return err + } + if err := ctx.Err(); err != nil { + return err + } } for _, b := range i.searchAppendBlocks { diff --git a/modules/ingester/instance_search_test.go b/modules/ingester/instance_search_test.go index cb292970209..9723f66be64 100644 --- a/modules/ingester/instance_search_test.go +++ b/modules/ingester/instance_search_test.go @@ -88,6 +88,9 @@ 
func TestInstanceSearch(t *testing.T) { assert.NoError(t, err) assert.Len(t, sr.Traces, len(ids)) checkEqual(t, ids, sr) + + err = ingester.stopping(nil) + require.NoError(t, err) }) } } @@ -153,7 +156,6 @@ func testSearchTagsAndValues(t *testing.T, ctx context.Context, i *instance, tag sort.Strings(srv.TagValues) assert.Contains(t, sr.TagNames, tagName) - assert.Equal(t, tagName, sr.TagNames[0]) assert.Equal(t, expectedTagValues, srv.TagValues) } diff --git a/modules/ingester/instance_test.go b/modules/ingester/instance_test.go index c51c18aa858..f44de54ff3f 100644 --- a/modules/ingester/instance_test.go +++ b/modules/ingester/instance_test.go @@ -374,6 +374,8 @@ func TestInstanceCutCompleteTraces(t *testing.T) { } func TestInstanceCutBlockIfReady(t *testing.T) { + dec := model.MustNewSegmentDecoder(model.CurrentEncoding) + tt := []struct { name string maxBlockLifetime time.Duration @@ -416,14 +418,16 @@ func TestInstanceCutBlockIfReady(t *testing.T) { instance, _ := defaultInstance(t) for i := 0; i < tc.pushCount; i++ { - request := makeRequest([]byte{}) - err := instance.PushBytesRequest(context.Background(), request) + tr := test.MakeTrace(1, uuid.Nil[:]) + bytes, err := dec.PrepareForWrite(tr, 0, 0) + require.NoError(t, err) + err = instance.PushBytes(context.Background(), uuid.Nil[:], bytes, nil) require.NoError(t, err) } // Defaults if tc.maxBlockBytes == 0 { - tc.maxBlockBytes = 5000 + tc.maxBlockBytes = 100000 } if tc.maxBlockLifetime == 0 { tc.maxBlockLifetime = time.Hour @@ -688,7 +692,7 @@ func makeRequestWithByteLimit(maxBytes int, traceID []byte) *tempopb.PushBytesRe batch := test.MakeBatch(1, traceID) for batch.Size() < maxBytes { - batch.ScopeSpans[0].Spans = append(batch.ScopeSpans[0].Spans, test.MakeSpan(traceID)) + batch.ScopeSpans[0].Spans = append(batch.ScopeSpans[0].Spans, test.MakeSpanWithAttributeCount(traceID, 0)) } return makePushBytesRequest(traceID, batch) diff --git a/modules/storage/config.go b/modules/storage/config.go index 
780767c10ff..b63220caec9 100644 --- a/modules/storage/config.go +++ b/modules/storage/config.go @@ -15,6 +15,7 @@ import ( "github.com/grafana/tempo/tempodb/backend/s3" "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/encoding/common" + v2 "github.com/grafana/tempo/tempodb/encoding/v2" "github.com/grafana/tempo/tempodb/pool" "github.com/grafana/tempo/tempodb/wal" ) @@ -36,6 +37,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.Trace.WAL = &wal.Config{} f.StringVar(&cfg.Trace.WAL.Filepath, util.PrefixConfig(prefix, "trace.wal.path"), "/var/tempo/wal", "Path at which store WAL blocks.") + cfg.Trace.WAL.Version = v2.VersionString cfg.Trace.WAL.Encoding = backend.EncSnappy cfg.Trace.WAL.SearchEncoding = backend.EncNone cfg.Trace.WAL.IngestionSlack = 2 * time.Minute diff --git a/pkg/util/test/req.go b/pkg/util/test/req.go index 64484a960be..8816db05a73 100644 --- a/pkg/util/test/req.go +++ b/pkg/util/test/req.go @@ -1,7 +1,9 @@ package test import ( + "encoding/json" "math/rand" + "testing" "time" "github.com/gogo/protobuf/proto" @@ -9,21 +11,36 @@ import ( v1_common "github.com/grafana/tempo/pkg/tempopb/common/v1" v1_resource "github.com/grafana/tempo/pkg/tempopb/resource/v1" v1_trace "github.com/grafana/tempo/pkg/tempopb/trace/v1" + "github.com/stretchr/testify/require" ) func MakeSpan(traceID []byte) *v1_trace.Span { + return MakeSpanWithAttributeCount(traceID, rand.Int()%10+1) +} + +func MakeSpanWithAttributeCount(traceID []byte, count int) *v1_trace.Span { + attributes := make([]*v1_common.KeyValue, 0, count) + for i := 0; i < count; i++ { + attributes = append(attributes, &v1_common.KeyValue{ + Key: RandomString(), + Value: &v1_common.AnyValue{Value: &v1_common.AnyValue_StringValue{StringValue: RandomString()}}, + }) + } + now := time.Now() s := &v1_trace.Span{ - Name: "test", - TraceId: traceID, - SpanId: make([]byte, 8), - Kind: v1_trace.Span_SPAN_KIND_CLIENT, + Name: "test", + TraceId: 
traceID, + SpanId: make([]byte, 8), + ParentSpanId: make([]byte, 8), + Kind: v1_trace.Span_SPAN_KIND_CLIENT, Status: &v1_trace.Status{ Code: 1, Message: "OK", }, StartTimeUnixNano: uint64(now.UnixNano()), EndTimeUnixNano: uint64(now.Add(time.Second).UnixNano()), + Attributes: attributes, DroppedLinksCount: rand.Uint32(), DroppedAttributesCount: rand.Uint32(), } @@ -188,3 +205,12 @@ func RandomString() string { } return string(s) } + +func TracesEqual(t *testing.T, t1 *tempopb.Trace, t2 *tempopb.Trace) { + if !proto.Equal(t1, t2) { + wantJSON, _ := json.MarshalIndent(t1, "", " ") + gotJSON, _ := json.MarshalIndent(t2, "", " ") + + require.Equal(t, string(wantJSON), string(gotJSON)) + } +} diff --git a/tempodb/compactor_test.go b/tempodb/compactor_test.go index bb6d5cd355d..49856498673 100644 --- a/tempodb/compactor_test.go +++ b/tempodb/compactor_test.go @@ -93,6 +93,7 @@ func testCompactionRoundtrip(t *testing.T, targetBlockVersion string) { }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), + Version: v2.VersionString, }, BlocklistPoll: 0, }, log.NewNopLogger()) @@ -237,6 +238,7 @@ func testSameIDCompaction(t *testing.T, targetBlockVersion string) { }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), + Version: v2.VersionString, }, BlocklistPoll: 0, }, log.NewNopLogger()) @@ -378,6 +380,7 @@ func TestCompactionUpdatesBlocklist(t *testing.T) { }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), + Version: v2.VersionString, }, BlocklistPoll: 0, }, log.NewNopLogger()) @@ -447,6 +450,7 @@ func TestCompactionMetrics(t *testing.T) { }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), + Version: v2.VersionString, }, BlocklistPoll: 0, }, log.NewNopLogger()) @@ -519,6 +523,7 @@ func TestCompactionIteratesThroughTenants(t *testing.T) { }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), + Version: v2.VersionString, }, BlocklistPoll: 0, }, log.NewNopLogger()) @@ -590,6 +595,7 @@ func testCompactionHonorsBlockStartEndTimes(t *testing.T, 
targetBlockVersion str }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), + Version: v2.VersionString, IngestionSlack: time.Since(time.Unix(0, 0)), // Let us use obvious start/end times below }, BlocklistPoll: 0, diff --git a/tempodb/config.go b/tempodb/config.go index e0d5c0a7ea9..94f0f991ec7 100644 --- a/tempodb/config.go +++ b/tempodb/config.go @@ -125,7 +125,12 @@ func validateConfig(cfg *Config) error { return errors.New("block config should be non-nil") } - err := common.ValidateConfig(cfg.Block) + err := wal.ValidateConfig(cfg.WAL) + if err != nil { + return fmt.Errorf("wal config validation failed: %w", err) + } + + err = common.ValidateConfig(cfg.Block) if err != nil { return fmt.Errorf("block config validation failed: %w", err) } diff --git a/tempodb/config_test.go b/tempodb/config_test.go index aedefa7a803..5e7c6db5ad4 100644 --- a/tempodb/config_test.go +++ b/tempodb/config_test.go @@ -8,7 +8,7 @@ import ( ) func TestApplyToOptions(t *testing.T) { - opts := common.SearchOptions{} + opts := common.DefaultSearchOptions() cfg := SearchConfig{} // test defaults diff --git a/tempodb/encoding/common/interfaces.go b/tempodb/encoding/common/interfaces.go index ba5fa0d4f18..83a128329ab 100644 --- a/tempodb/encoding/common/interfaces.go +++ b/tempodb/encoding/common/interfaces.go @@ -42,6 +42,19 @@ type SearchOptions struct { CacheControl CacheControl } +// DefaultSearchOptions() is used in a lot of places such as local ingester searches. It is important +// in these cases to set a reasonable read buffer size and count to prevent constant tiny readranges +// against the local backend. +// TODO: Note that there is another method of creating "default search options" that looks like this: +// tempodb.SearchConfig{}.ApplyToOptions(&searchOpts). we should consolidate these. 
+func DefaultSearchOptions() SearchOptions { + return SearchOptions{ + ReadBufferCount: 32, + ReadBufferSize: 1024 * 1024, + ChunkSizeBytes: 4 * 1024 * 1024, + } +} + type Compactor interface { Compact(ctx context.Context, l log.Logger, r backend.Reader, writerCallback func(*backend.BlockMeta, time.Time) backend.Writer, inputs []*backend.BlockMeta) ([]*backend.BlockMeta, error) } @@ -77,8 +90,9 @@ type WALBlock interface { BackendBlock Append(id ID, b []byte, start, end uint32) error + Flush() error + DataLength() uint64 - Length() int Iterator() (Iterator, error) Clear() error } diff --git a/tempodb/encoding/common/types.go b/tempodb/encoding/common/types.go index ed7774762aa..40aa4b2f0de 100644 --- a/tempodb/encoding/common/types.go +++ b/tempodb/encoding/common/types.go @@ -1,7 +1,79 @@ package common +import ( + "bytes" + "hash" + "hash/fnv" + "sort" +) + // This file contains types that need to be referenced by both the ./encoding and ./encoding/vX packages. // It primarily exists here to break dependency loops. // ID in TempoDB type ID []byte + +type idMapEntry[T any] struct { + id ID + entry T +} + +// IDMap is a helper for recording and checking for IDs. Not safe for concurrent use. +type IDMap[T any] struct { + m map[uint64]idMapEntry[T] + h hash.Hash64 +} + +func NewIDMap[T any]() *IDMap[T] { + return &IDMap[T]{ + m: map[uint64]idMapEntry[T]{}, + h: fnv.New64(), + } +} + +// tokenForID returns a token for use in a hash map given a span id and span kind +// buffer must be a 4 byte slice and is reused for writing the span kind to the hashing function +// kind is used along with the actual id b/c in zipkin traces span id is not guaranteed to be unique +// as it is shared between client and server spans. 
+func (m *IDMap[T]) tokenFor(id ID) uint64 { + m.h.Reset() + _, _ = m.h.Write(id) + return m.h.Sum64() +} + +func (m *IDMap[T]) Set(id ID, val T) { + m.m[m.tokenFor(id)] = idMapEntry[T]{id, val} +} + +func (m *IDMap[T]) Has(id ID) bool { + _, ok := m.m[m.tokenFor(id)] + return ok +} + +func (m *IDMap[T]) Get(id ID) (T, bool) { + v, ok := m.m[m.tokenFor(id)] + return v.entry, ok +} + +func (m *IDMap[T]) Len() int { + return len(m.m) +} + +func (m *IDMap[T]) ValuesSortedByID() []T { + // Copy and sort entries by ID + entries := make([]idMapEntry[T], 0, len(m.m)) + for _, e := range m.m { + entries = append(entries, e) + } + sort.Slice(entries, func(i, j int) bool { + return bytes.Compare(entries[i].id, entries[j].id) == -1 + }) + + // Copy sorted values + values := make([]T, 0, len(entries)) + for _, e := range entries { + values = append(values, e.entry) + } + + return values +} diff --git a/tempodb/encoding/v2/appender_record.go b/tempodb/encoding/v2/appender_record.go index 4d2473b8d55..9960b142d70 100644 --- a/tempodb/encoding/v2/appender_record.go +++ b/tempodb/encoding/v2/appender_record.go @@ -53,7 +53,12 @@ func (a *recordAppender) Length() int { } func (a *recordAppender) DataLength() uint64 { - return 0 + if len(a.records) == 0 { + return 0 + } + + lastRecord := a.records[len(a.records)-1] + return lastRecord.Start + uint64(lastRecord.Length) } func (a *recordAppender) Complete() error { diff --git a/tempodb/encoding/v2/create_block.go b/tempodb/encoding/v2/create_block.go index 5e55f65d534..c602228edac 100644 --- a/tempodb/encoding/v2/create_block.go +++ b/tempodb/encoding/v2/create_block.go @@ -14,6 +14,11 @@ import ( const DefaultFlushSizeBytes int = 30 * 1024 * 1024 // 30 MiB func CreateBlock(ctx context.Context, cfg *common.BlockConfig, meta *backend.BlockMeta, i common.Iterator, to backend.Writer) (*backend.BlockMeta, error) { + // Default data encoding if needed + if meta.DataEncoding == "" { + meta.DataEncoding = model.CurrentEncoding + } + 
newBlock, err := NewStreamingBlock(cfg, meta.BlockID, meta.TenantID, []*backend.BlockMeta{meta}, meta.TotalObjects) if err != nil { return nil, errors.Wrap(err, "error creating streaming block") @@ -37,8 +42,13 @@ func CreateBlock(ctx context.Context, cfg *common.BlockConfig, meta *backend.Blo if err != nil || tr == nil { return nil, nil, err } - obj, err := dec.PrepareForWrite(tr, 0, 0) // start/end of the blockmeta are used + traceBytes, err := dec.PrepareForWrite(tr, 0, 0) // start/end of the blockmeta are used + if err != nil { + return nil, nil, err + } + + obj, err := dec.ToObject([][]byte{traceBytes}) return id, obj, err } } diff --git a/tempodb/encoding/v2/encoding.go b/tempodb/encoding/v2/encoding.go index a047953f79d..619196761a2 100644 --- a/tempodb/encoding/v2/encoding.go +++ b/tempodb/encoding/v2/encoding.go @@ -2,9 +2,11 @@ package v2 import ( "context" + "io/fs" "time" "github.com/google/uuid" + "github.com/grafana/tempo/pkg/model" "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding/common" ) @@ -36,10 +38,18 @@ func (v Encoding) CreateBlock(ctx context.Context, cfg *common.BlockConfig, meta // OpenWALBlock opens an existing appendable block func (v Encoding) OpenWALBlock(filename string, path string, ingestionSlack time.Duration, additionalStartSlack time.Duration) (common.WALBlock, error, error) { - return newAppendBlockFromFile(filename, path, ingestionSlack, additionalStartSlack) + return openWALBlock(filename, path, ingestionSlack, additionalStartSlack) } // CreateWALBlock creates a new appendable block func (v Encoding) CreateWALBlock(id uuid.UUID, tenantID string, filepath string, e backend.Encoding, dataEncoding string, ingestionSlack time.Duration) (common.WALBlock, error) { - return newAppendBlock(id, tenantID, filepath, e, dataEncoding, ingestionSlack) + // Default data encoding if needed + if dataEncoding == "" { + dataEncoding = model.CurrentEncoding + } + return createWALBlock(id, tenantID, filepath, e, 
dataEncoding, ingestionSlack) +} + +func (v Encoding) OwnsWALBlock(entry fs.DirEntry) bool { + return ownsWALBlock(entry) } diff --git a/tempodb/encoding/v2/append_block.go b/tempodb/encoding/v2/wal_block.go similarity index 80% rename from tempodb/encoding/v2/append_block.go rename to tempodb/encoding/v2/wal_block.go index 8c1609b6456..7d8906f1e5f 100644 --- a/tempodb/encoding/v2/append_block.go +++ b/tempodb/encoding/v2/wal_block.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io/fs" "math" "os" "path/filepath" @@ -24,11 +25,11 @@ import ( const maxDataEncodingLength = 32 -var _ common.WALBlock = (*v2AppendBlock)(nil) +var _ common.WALBlock = (*walBlock)(nil) -// v2AppendBlock is a block that is actively used to append new objects to. It stores all data in the appendFile +// walBlock is a block that is actively used to append new objects to. It stores all data in the appendFile // in the order it was received and an in memory sorted index. -type v2AppendBlock struct { +type walBlock struct { meta *backend.BlockMeta ingestionSlack time.Duration @@ -40,13 +41,13 @@ type v2AppendBlock struct { once sync.Once } -func newAppendBlock(id uuid.UUID, tenantID string, filepath string, e backend.Encoding, dataEncoding string, ingestionSlack time.Duration) (*v2AppendBlock, error) { +func createWALBlock(id uuid.UUID, tenantID string, filepath string, e backend.Encoding, dataEncoding string, ingestionSlack time.Duration) (common.WALBlock, error) { if strings.ContainsRune(dataEncoding, ':') || strings.ContainsRune(dataEncoding, '+') || len([]rune(dataEncoding)) > maxDataEncodingLength { return nil, fmt.Errorf("dataEncoding %s is invalid", dataEncoding) } - h := &v2AppendBlock{ + h := &walBlock{ meta: backend.NewBlockMeta(tenantID, id, VersionString, e, dataEncoding), filepath: filepath, ingestionSlack: ingestionSlack, @@ -70,16 +71,16 @@ func newAppendBlock(id uuid.UUID, tenantID string, filepath string, e backend.En return h, nil } -// newAppendBlockFromFile returns an 
AppendBlock that can not be appended to, but can +// openWALBlock returns a WALBlock that can not be appended to, but can // be completed. It can return a warning or a fatal error -func newAppendBlockFromFile(filename string, path string, ingestionSlack time.Duration, additionalStartSlack time.Duration) (common.WALBlock, error, error) { +func openWALBlock(filename string, path string, ingestionSlack time.Duration, additionalStartSlack time.Duration) (common.WALBlock, error, error) { var warning error blockID, tenantID, version, e, dataEncoding, err := ParseFilename(filename) if err != nil { return nil, nil, fmt.Errorf("parsing wal filename: %w", err) } - b := &v2AppendBlock{ + b := &walBlock{ meta: backend.NewBlockMeta(tenantID, blockID, version, e, dataEncoding), filepath: path, ingestionSlack: ingestionSlack, @@ -131,9 +132,23 @@ func newAppendBlockFromFile(filename string, path string, ingestionSlack time.Du return b, warning, nil } +func ownsWALBlock(entry fs.DirEntry) bool { + // all v2 wal blocks are files + if entry.IsDir() { + return false + } + + _, _, version, _, _, err := ParseFilename(entry.Name()) + if err != nil { + return false + } + + return version == VersionString +} + // Append adds an id and object to this wal block. start/end should indicate the time range // associated with the past object. They are unix epoch seconds.
-func (a *v2AppendBlock) Append(id common.ID, b []byte, start, end uint32) error { +func (a *walBlock) Append(id common.ID, b []byte, start, end uint32) error { err := a.appender.Append(id, b) if err != nil { return err @@ -143,20 +158,20 @@ func (a *v2AppendBlock) Append(id common.ID, b []byte, start, end uint32) error return nil } -func (a *v2AppendBlock) DataLength() uint64 { - return a.appender.DataLength() +func (a *walBlock) Flush() error { + return nil } -func (a *v2AppendBlock) Length() int { - return a.appender.Length() +func (a *walBlock) DataLength() uint64 { + return a.appender.DataLength() } -func (a *v2AppendBlock) BlockMeta() *backend.BlockMeta { +func (a *walBlock) BlockMeta() *backend.BlockMeta { return a.meta } // Iterator returns a common.Iterator that is secretly also a BytesIterator for use internally -func (a *v2AppendBlock) Iterator() (common.Iterator, error) { +func (a *walBlock) Iterator() (common.Iterator, error) { combiner := model.StaticCombiner if a.appendFile != nil { @@ -187,7 +202,7 @@ func (a *v2AppendBlock) Iterator() (common.Iterator, error) { return iterator.(*dedupingIterator), nil } -func (a *v2AppendBlock) Clear() error { +func (a *walBlock) Clear() error { if a.readFile != nil { _ = a.readFile.Close() a.readFile = nil @@ -206,8 +221,8 @@ func (a *v2AppendBlock) Clear() error { } // Find implements common.Finder -func (a *v2AppendBlock) FindTraceByID(ctx context.Context, id common.ID, opts common.SearchOptions) (*tempopb.Trace, error) { - span, _ := opentracing.StartSpanFromContext(ctx, "v2AppendBlock.FindTraceByID") +func (a *walBlock) FindTraceByID(ctx context.Context, id common.ID, opts common.SearchOptions) (*tempopb.Trace, error) { + span, _ := opentracing.StartSpanFromContext(ctx, "v2WalBlock.FindTraceByID") defer span.Finish() combiner := model.StaticCombiner @@ -246,26 +261,26 @@ func (a *v2AppendBlock) FindTraceByID(ctx context.Context, id common.ID, opts co } // Search implements common.Searcher -func (a 
*v2AppendBlock) Search(ctx context.Context, req *tempopb.SearchRequest, opts common.SearchOptions) (*tempopb.SearchResponse, error) { +func (a *walBlock) Search(ctx context.Context, req *tempopb.SearchRequest, opts common.SearchOptions) (*tempopb.SearchResponse, error) { return nil, common.ErrUnsupported } // Search implements common.Searcher -func (a *v2AppendBlock) SearchTags(ctx context.Context, cb common.TagCallback, opts common.SearchOptions) error { +func (a *walBlock) SearchTags(ctx context.Context, cb common.TagCallback, opts common.SearchOptions) error { return common.ErrUnsupported } // SearchTagValues implements common.Searcher -func (a *v2AppendBlock) SearchTagValues(ctx context.Context, tag string, cb common.TagCallback, opts common.SearchOptions) error { +func (a *walBlock) SearchTagValues(ctx context.Context, tag string, cb common.TagCallback, opts common.SearchOptions) error { return common.ErrUnsupported } // Fetch implements traceql.SpansetFetcher -func (a *v2AppendBlock) Fetch(context.Context, traceql.FetchSpansRequest) (traceql.FetchSpansResponse, error) { +func (a *walBlock) Fetch(context.Context, traceql.FetchSpansRequest) (traceql.FetchSpansResponse, error) { return traceql.FetchSpansResponse{}, common.ErrUnsupported } -func (a *v2AppendBlock) fullFilename() string { +func (a *walBlock) fullFilename() string { filename := a.fullFilenameSeparator("+") _, e1 := os.Stat(filename) if errors.Is(e1, os.ErrNotExist) { @@ -279,7 +294,7 @@ func (a *v2AppendBlock) fullFilename() string { return filename } -func (a *v2AppendBlock) fullFilenameSeparator(separator string) string { +func (a *walBlock) fullFilenameSeparator(separator string) string { if a.meta.Version == "v0" { return filepath.Join(a.filepath, fmt.Sprintf("%v%v%v", a.meta.BlockID, separator, a.meta.TenantID)) } @@ -294,7 +309,7 @@ func (a *v2AppendBlock) fullFilenameSeparator(separator string) string { return filepath.Join(a.filepath, filename) } -func (a *v2AppendBlock) file() (*os.File, 
error) { +func (a *walBlock) file() (*os.File, error) { var err error a.once.Do(func() { if a.readFile == nil { @@ -307,7 +322,7 @@ func (a *v2AppendBlock) file() (*os.File, error) { return a.readFile, err } -func (a *v2AppendBlock) adjustTimeRangeForSlack(start uint32, end uint32, additionalStartSlack time.Duration) (uint32, uint32) { +func (a *walBlock) adjustTimeRangeForSlack(start uint32, end uint32, additionalStartSlack time.Duration) (uint32, uint32) { now := time.Now() startOfRange := uint32(now.Add(-a.ingestionSlack).Add(-additionalStartSlack).Unix()) endOfRange := uint32(now.Add(a.ingestionSlack).Unix()) diff --git a/tempodb/encoding/v2/append_block_replay.go b/tempodb/encoding/v2/wal_block_replay.go similarity index 100% rename from tempodb/encoding/v2/append_block_replay.go rename to tempodb/encoding/v2/wal_block_replay.go diff --git a/tempodb/encoding/v2/append_block_test.go b/tempodb/encoding/v2/wal_block_test.go similarity index 96% rename from tempodb/encoding/v2/append_block_test.go rename to tempodb/encoding/v2/wal_block_test.go index 95f733144dc..8e3e358bfab 100644 --- a/tempodb/encoding/v2/append_block_test.go +++ b/tempodb/encoding/v2/wal_block_test.go @@ -19,15 +19,18 @@ import ( "github.com/stretchr/testify/require" ) +// Note: Standard wal block functionality (appending, searching, finding, etc.) 
is tested with all other wal blocks +// in /tempodb/wal/wal_test.go + func TestFullFilename(t *testing.T) { tests := []struct { name string - b *v2AppendBlock + b *walBlock expected string }{ { name: "legacy", - b: &v2AppendBlock{ + b: &walBlock{ meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v0", backend.EncNone, ""), filepath: "/blerg", }, @@ -35,7 +38,7 @@ func TestFullFilename(t *testing.T) { }, { name: "ez-mode", - b: &v2AppendBlock{ + b: &walBlock{ meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v1", backend.EncNone, ""), filepath: "/blerg", }, @@ -43,7 +46,7 @@ func TestFullFilename(t *testing.T) { }, { name: "nopath", - b: &v2AppendBlock{ + b: &walBlock{ meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v1", backend.EncNone, ""), filepath: "", }, @@ -51,7 +54,7 @@ func TestFullFilename(t *testing.T) { }, { name: "gzip", - b: &v2AppendBlock{ + b: &walBlock{ meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v2", backend.EncGZIP, ""), filepath: "", }, @@ -59,7 +62,7 @@ func TestFullFilename(t *testing.T) { }, { name: "lz41M", - b: &v2AppendBlock{ + b: &walBlock{ meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v2", backend.EncLZ4_1M, ""), filepath: "", }, @@ -67,7 +70,7 @@ func TestFullFilename(t *testing.T) { }, { name: "lz4256k", - b: &v2AppendBlock{ + b: &walBlock{ meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v2", backend.EncLZ4_256k, ""), filepath: "", }, @@ -75,7 +78,7 @@ func TestFullFilename(t *testing.T) { }, { name: "lz4M", - b: &v2AppendBlock{ + b: &walBlock{ meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v2", backend.EncLZ4_4M, ""), filepath: "", }, @@ -83,7 +86,7 @@ func TestFullFilename(t *testing.T) { }, { name: "lz64k", - b: &v2AppendBlock{ + b: 
&walBlock{ meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v2", backend.EncLZ4_64k, ""), filepath: "", }, @@ -91,7 +94,7 @@ func TestFullFilename(t *testing.T) { }, { name: "snappy", - b: &v2AppendBlock{ + b: &walBlock{ meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v2", backend.EncSnappy, ""), filepath: "", }, @@ -99,7 +102,7 @@ func TestFullFilename(t *testing.T) { }, { name: "zstd", - b: &v2AppendBlock{ + b: &walBlock{ meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v2", backend.EncZstd, ""), filepath: "", }, @@ -107,7 +110,7 @@ func TestFullFilename(t *testing.T) { }, { name: "data encoding", - b: &v2AppendBlock{ + b: &walBlock{ meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v1", backend.EncNone, "dataencoding"), filepath: "/blerg", }, @@ -124,7 +127,7 @@ func TestFullFilename(t *testing.T) { } func TestAdjustTimeRangeForSlack(t *testing.T) { - a := &v2AppendBlock{ + a := &walBlock{ meta: &backend.BlockMeta{ TenantID: "test", }, @@ -164,7 +167,7 @@ func TestAdjustTimeRangeForSlack(t *testing.T) { func TestPartialBlock(t *testing.T) { blockID := uuid.New() - block, err := newAppendBlock(blockID, testTenantID, t.TempDir(), backend.EncSnappy, "v2", 0) + block, err := createWALBlock(blockID, testTenantID, t.TempDir(), backend.EncSnappy, "v2", 0) require.NoError(t, err, "unexpected error creating block") enc := model.MustNewSegmentDecoder(model.CurrentEncoding) @@ -191,7 +194,7 @@ func TestPartialBlock(t *testing.T) { } // append garbage data - v2Block := block + v2Block := block.(*walBlock) garbo := make([]byte, 100) _, err = rand.Read(garbo) require.NoError(t, err) diff --git a/tempodb/encoding/versioned.go b/tempodb/encoding/versioned.go index f4fd5f88c0e..08390ec71d5 100644 --- a/tempodb/encoding/versioned.go +++ b/tempodb/encoding/versioned.go @@ -3,6 +3,7 @@ package encoding import ( 
"context" "fmt" + "io/fs" "time" "github.com/google/uuid" @@ -43,6 +44,9 @@ type VersionedEncoding interface { // CreateWALBlock creates a new appendable block for the WAL CreateWALBlock(id uuid.UUID, tenantID string, filepath string, e backend.Encoding, dataEncoding string, ingestionSlack time.Duration) (common.WALBlock, error) + + // OwnsWALBlock indicates if this encoding owns the WAL block + OwnsWALBlock(entry fs.DirEntry) bool } // FromVersion returns a versioned encoding for the provided string @@ -62,8 +66,8 @@ func DefaultEncoding() VersionedEncoding { return vparquet.Encoding{} } -// allEncodings returns all encodings -func allEncodings() []VersionedEncoding { +// AllEncodings returns all encodings +func AllEncodings() []VersionedEncoding { return []VersionedEncoding{ v2.Encoding{}, vparquet.Encoding{}, diff --git a/tempodb/encoding/versioned_test.go b/tempodb/encoding/versioned_test.go index a8012b13143..b3ad366ab21 100644 --- a/tempodb/encoding/versioned_test.go +++ b/tempodb/encoding/versioned_test.go @@ -14,7 +14,7 @@ func TestFromVersionErrors(t *testing.T) { } func TestAllVersions(t *testing.T) { - for _, v := range allEncodings() { + for _, v := range AllEncodings() { encoding, err := FromVersion(v.Version()) require.Equal(t, v.Version(), encoding.Version()) diff --git a/tempodb/encoding/vparquet/block_findtracebyid.go b/tempodb/encoding/vparquet/block_findtracebyid.go index 59accd1e24e..e30f73767c6 100644 --- a/tempodb/encoding/vparquet/block_findtracebyid.go +++ b/tempodb/encoding/vparquet/block_findtracebyid.go @@ -7,7 +7,6 @@ import ( "io" "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/log" "github.com/pkg/errors" "github.com/segmentio/parquet-go" "github.com/willf/bloom" @@ -16,6 +15,7 @@ import ( pq "github.com/grafana/tempo/pkg/parquetquery" "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/pkg/util" + "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding/common" 
) @@ -76,9 +76,12 @@ func (b *backendBlock) FindTraceByID(ctx context.Context, traceID common.ID, opt } defer func() { span.SetTag("inspectedBytes", rr.TotalBytesRead.Load()) - //fmt.Println("read bytes:", rr.TotalBytesRead.Load()) }() + return findTraceByID(derivedCtx, traceID, b.meta, pf) +} + +func findTraceByID(ctx context.Context, traceID common.ID, meta *backend.BlockMeta, pf *parquet.File) (*tempopb.Trace, error) { // traceID column index colIndex, _ := pq.GetColumnIndexByPath(pf, TraceIDColumnName) if colIndex == -1 { @@ -117,7 +120,7 @@ func (b *backendBlock) FindTraceByID(ctx context.Context, traceID common.ID, opt return nil, err } if c < 1 { - return nil, fmt.Errorf("failed to read value from page: traceID: %s blockID:%v rowGroupIdx:%d", util.TraceIDToHexString(traceID), b.meta.BlockID, rgIdx) + return nil, fmt.Errorf("failed to read value from page: traceID: %s blockID:%v rowGroupIdx:%d", util.TraceIDToHexString(traceID), meta.BlockID, rgIdx) } min = buf[0].ByteArray() @@ -163,7 +166,7 @@ func (b *backendBlock) FindTraceByID(ctx context.Context, traceID common.ID, opt } // Now iterate the matching row group - iter := parquetquery.NewColumnIterator(derivedCtx, pf.RowGroups()[rowGroup:rowGroup+1], colIndex, "", 1000, parquetquery.NewStringInPredicate([]string{string(traceID)}), "") + iter := parquetquery.NewColumnIterator(ctx, pf.RowGroups()[rowGroup:rowGroup+1], colIndex, "", 1000, parquetquery.NewStringInPredicate([]string{string(traceID)}), "") defer iter.Close() res, err := iter.Next() @@ -190,16 +193,12 @@ func (b *backendBlock) FindTraceByID(ctx context.Context, traceID common.ID, opt return nil, errors.Wrap(err, "seek to row") } - span.LogFields(log.Message("seeked to row"), log.Int64("row", rowMatch)) - tr := new(Trace) err = r.Read(tr) if err != nil { return nil, errors.Wrap(err, "error reading row from backend") } - span.LogFields(log.Message("read trace")) - // convert to proto trace and return return parquetTraceToTempopbTrace(tr), nil } diff 
--git a/tempodb/encoding/vparquet/block_findtracebyid_test.go b/tempodb/encoding/vparquet/block_findtracebyid_test.go index 3cb326b79bd..269fb80dc78 100644 --- a/tempodb/encoding/vparquet/block_findtracebyid_test.go +++ b/tempodb/encoding/vparquet/block_findtracebyid_test.go @@ -8,6 +8,7 @@ import ( "testing" "github.com/google/uuid" + "github.com/segmentio/parquet-go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -96,7 +97,7 @@ func TestBackendBlockFindTraceByID(t *testing.T) { for _, tr := range traces { wantProto := parquetTraceToTempopbTrace(tr) - gotProto, err := b.FindTraceByID(ctx, tr.TraceID, common.SearchOptions{}) + gotProto, err := b.FindTraceByID(ctx, tr.TraceID, common.DefaultSearchOptions()) require.NoError(t, err) require.Equal(t, wantProto, gotProto) } @@ -120,21 +121,23 @@ func TestBackendBlockFindTraceByID_TestData(t *testing.T) { b := newBackendBlock(meta, r) - iter, err := b.Iterator(context.Background()) + iter, err := b.RawIterator(context.Background(), newRowPool(10)) require.NoError(t, err) + sch := parquet.SchemaOf(new(Trace)) for { - tr, err := iter.Next(context.Background()) + _, row, err := iter.Next(context.Background()) require.NoError(t, err) - if tr == nil { + if row == nil { break } - // fmt.Println(tr) - // fmt.Println("going to search for traceID", util.TraceIDToHexString(tr.TraceID)) + tr := &Trace{} + err = sch.Reconstruct(tr, row) + require.NoError(t, err) - protoTr, err := b.FindTraceByID(ctx, tr.TraceID, common.SearchOptions{}) + protoTr, err := b.FindTraceByID(ctx, tr.TraceID, common.DefaultSearchOptions()) require.NoError(t, err) require.NotNil(t, protoTr) } @@ -165,7 +168,7 @@ func BenchmarkFindTraceByID(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - tr, err := block.FindTraceByID(ctx, traceID, defaultSearchOptions()) + tr, err := block.FindTraceByID(ctx, traceID, common.DefaultSearchOptions()) require.NoError(b, err) require.NotNil(b, tr) } diff --git 
a/tempodb/encoding/vparquet/block_iterator.go b/tempodb/encoding/vparquet/block_iterator.go index cf0b3ff1e54..03a6779aae0 100644 --- a/tempodb/encoding/vparquet/block_iterator.go +++ b/tempodb/encoding/vparquet/block_iterator.go @@ -28,15 +28,6 @@ func (b *backendBlock) open(ctx context.Context) (*parquet.File, *parquet.Reader return pf, r, nil } -func (b *backendBlock) Iterator(ctx context.Context) (Iterator, error) { - _, r, err := b.open(ctx) - if err != nil { - return nil, err - } - - return &blockIterator{blockID: b.meta.BlockID.String(), r: r}, nil -} - func (b *backendBlock) RawIterator(ctx context.Context, pool *rowPool) (*rawIterator, error) { pf, r, err := b.open(ctx) if err != nil { @@ -51,27 +42,6 @@ func (b *backendBlock) RawIterator(ctx context.Context, pool *rowPool) (*rawIter return &rawIterator{b.meta.BlockID.String(), r, traceIDIndex, pool}, nil } -type blockIterator struct { - blockID string - r *parquet.Reader //nolint:all //deprecated -} - -func (i *blockIterator) Next(context.Context) (*Trace, error) { - t := &Trace{} - switch err := i.r.Read(t); err { - case nil: - return t, nil - case io.EOF: - return nil, nil - default: - return nil, errors.Wrap(err, fmt.Sprintf("error iterating through block %s", i.blockID)) - } -} - -func (i *blockIterator) Close() { - // parquet reader is shared, lets not close it here -} - type rawIterator struct { blockID string r *parquet.Reader //nolint:all //deprecated diff --git a/tempodb/encoding/vparquet/block_iterator_test.go b/tempodb/encoding/vparquet/block_iterator_test.go index dd591b4c254..9f701bacf5f 100644 --- a/tempodb/encoding/vparquet/block_iterator_test.go +++ b/tempodb/encoding/vparquet/block_iterator_test.go @@ -10,7 +10,7 @@ import ( "github.com/grafana/tempo/tempodb/backend/local" ) -func TestIteratorReadsAllRows(t *testing.T) { +func TestRawIteratorReadsAllRows(t *testing.T) { rawR, _, _, err := local.New(&local.Config{ Path: "./test-data", }) @@ -28,13 +28,13 @@ func TestIteratorReadsAllRows(t 
*testing.T) { b := newBackendBlock(meta, r) - iter, err := b.Iterator(context.Background()) + iter, err := b.RawIterator(context.Background(), newRowPool(10)) require.NoError(t, err) defer iter.Close() actualCount := 0 for { - tr, err := iter.Next(context.Background()) + _, tr, err := iter.Next(context.Background()) if tr == nil { break } diff --git a/tempodb/encoding/vparquet/block_search.go b/tempodb/encoding/vparquet/block_search.go index 559fc66573c..731e37b6095 100644 --- a/tempodb/encoding/vparquet/block_search.go +++ b/tempodb/encoding/vparquet/block_search.go @@ -147,6 +147,10 @@ func (b *backendBlock) SearchTags(ctx context.Context, cb common.TagCallback, op } defer func() { span.SetTag("inspectedBytes", rr.TotalBytesRead.Load()) }() + return searchTags(derivedCtx, cb, pf) +} + +func searchTags(_ context.Context, cb common.TagCallback, pf *parquet.File) error { // find indexes of generic attribute columns resourceKeyIdx, _ := pq.GetColumnIndexByPath(pf, FieldResourceAttrKey) spanKeyIdx, _ := pq.GetColumnIndexByPath(pf, FieldSpanAttrKey) @@ -170,6 +174,7 @@ func (b *backendBlock) SearchTags(ctx context.Context, cb common.TagCallback, op } // now search all row groups + var err error rgs := pf.RowGroups() for _, rg := range rgs { // search all special attributes @@ -264,18 +269,22 @@ func (b *backendBlock) SearchTagValues(ctx context.Context, tag string, cb commo } defer func() { span.SetTag("inspectedBytes", rr.TotalBytesRead.Load()) }() + return searchTagValues(ctx, tag, cb, pf) +} + +func searchTagValues(ctx context.Context, tag string, cb common.TagCallback, pf *parquet.File) error { // labelMappings will indicate whether this is a search for a special or standard // column column := labelMappings[tag] if column == "" { - err = searchStandardTagValues(ctx, tag, pf, cb) + err := searchStandardTagValues(ctx, tag, pf, cb) if err != nil { return fmt.Errorf("unexpected error searching standard tags: %w", err) } return nil } - err = searchSpecialTagValues(ctx, 
column, pf, cb) + err := searchSpecialTagValues(ctx, column, pf, cb) if err != nil { return fmt.Errorf("unexpected error searching special tags: %w", err) } diff --git a/tempodb/encoding/vparquet/block_search_test.go b/tempodb/encoding/vparquet/block_search_test.go index ef9d5ca0b09..86afaf652ec 100644 --- a/tempodb/encoding/vparquet/block_search_test.go +++ b/tempodb/encoding/vparquet/block_search_test.go @@ -175,7 +175,7 @@ func TestBackendBlockSearch(t *testing.T) { } for _, req := range searchesThatMatch { - res, err := b.Search(ctx, req, defaultSearchOptions()) + res, err := b.Search(ctx, req, common.DefaultSearchOptions()) require.NoError(t, err) meta := findInResults(expected.TraceID, res.Traces) @@ -221,7 +221,7 @@ func TestBackendBlockSearch(t *testing.T) { }, } for _, req := range searchesThatDontMatch { - res, err := b.Search(ctx, req, defaultSearchOptions()) + res, err := b.Search(ctx, req, common.DefaultSearchOptions()) require.NoError(t, err) meta := findInResults(expected.TraceID, res.Traces) require.Nil(t, meta, req) @@ -239,7 +239,7 @@ func TestBackendBlockSearchTags(t *testing.T) { } ctx := context.Background() - err := block.SearchTags(ctx, cb, defaultSearchOptions()) + err := block.SearchTags(ctx, cb, common.DefaultSearchOptions()) require.NoError(t, err) // test that all attrs are in found attrs @@ -261,7 +261,7 @@ func TestBackendBlockSearchTagValues(t *testing.T) { assert.Equal(t, val, s, tag) } - err := block.SearchTagValues(ctx, tag, cb, defaultSearchOptions()) + err := block.SearchTagValues(ctx, tag, cb, common.DefaultSearchOptions()) require.NoError(t, err) require.True(t, wasCalled, tag) } @@ -304,14 +304,6 @@ func makeBackendBlockWithTraces(t *testing.T, trs []*Trace) *backendBlock { return b } -func defaultSearchOptions() common.SearchOptions { - return common.SearchOptions{ - ChunkSizeBytes: 1_000_000, - ReadBufferCount: 8, - ReadBufferSize: 4 * 1024 * 1024, - } -} - func makeTraces() ([]*Trace, map[string]string) { traces := 
[]*Trace{} attrVals := make(map[string]string) @@ -427,7 +419,7 @@ func BenchmarkBackendBlockSearchTraces(b *testing.B) { block := newBackendBlock(meta, rr) - opts := defaultSearchOptions() + opts := common.DefaultSearchOptions() opts.StartPage = 10 opts.TotalPages = 10 @@ -467,7 +459,7 @@ func BenchmarkBackendBlockSearchTags(b *testing.B) { require.NoError(b, err) block := newBackendBlock(meta, rr) - opts := defaultSearchOptions() + opts := common.DefaultSearchOptions() d := util.NewDistinctStringCollector(1_000_000) b.ResetTimer() @@ -498,7 +490,7 @@ func BenchmarkBackendBlockSearchTagValues(b *testing.B) { require.NoError(b, err) block := newBackendBlock(meta, rr) - opts := defaultSearchOptions() + opts := common.DefaultSearchOptions() for _, tc := range testCases { b.Run(tc, func(b *testing.B) { diff --git a/tempodb/encoding/vparquet/block_traceql.go b/tempodb/encoding/vparquet/block_traceql.go index e08dc1afcd7..a1d31a92592 100644 --- a/tempodb/encoding/vparquet/block_traceql.go +++ b/tempodb/encoding/vparquet/block_traceql.go @@ -97,7 +97,7 @@ func (b *backendBlock) Fetch(ctx context.Context, req traceql.FetchSpansRequest) } // TODO - route global search options here - pf, _, err := b.openForSearch(ctx, common.SearchOptions{}) + pf, _, err := b.openForSearch(ctx, common.DefaultSearchOptions()) if err != nil { return traceql.FetchSpansResponse{}, err } @@ -181,6 +181,33 @@ func (i *spansetIterator) Next(ctx context.Context) (*traceql.Spanset, error) { return spanset, nil } +// mergeSpansetIterator iterates through a slice of spansetIterators exhausting them +// in order +type mergeSpansetIterator struct { + iters []*spansetIterator + cur int +} + +var _ traceql.SpansetIterator = (*mergeSpansetIterator)(nil) + +func (i *mergeSpansetIterator) Next(ctx context.Context) (*traceql.Spanset, error) { + if i.cur >= len(i.iters) { + return nil, nil + } + + iter := i.iters[i.cur] + spanset, err := iter.Next(ctx) + if err != nil { + return nil, err + } + if spanset == 
nil { + i.cur++ + return i.Next(ctx) + } + + return spanset, nil +} + // fetch is the core logic for executing the given conditions against the parquet columns. The algorithm // can be summarized as a hiearchy of iterators where we iterate related columns together and collect the results // at each level into attributes, spans, and spansets. Each condition (.foo=bar) is pushed down to the one or more diff --git a/tempodb/encoding/vparquet/compactor.go b/tempodb/encoding/vparquet/compactor.go index cbd55f5558c..dd73c8ac9cb 100644 --- a/tempodb/encoding/vparquet/compactor.go +++ b/tempodb/encoding/vparquet/compactor.go @@ -35,7 +35,7 @@ func (c *Compactor) Compact(ctx context.Context, l log.Logger, r backend.Reader, totalRecords int minBlockStart time.Time maxBlockEnd time.Time - bookmarks = make([]*bookmark, 0, len(inputs)) + bookmarks = make([]*bookmark[parquet.Row], 0, len(inputs)) // MaxBytesPerTrace is the largest trace that can be expected, and assumes 1 byte per value on average (same as flushing). 
// Divide by 4 to presumably require 2 slice allocations if we ever see a trace this large pool = newRowPool(c.opts.MaxBytesPerTrace / 4) @@ -64,7 +64,7 @@ func (c *Compactor) Compact(ctx context.Context, l log.Logger, r backend.Reader, return nil, err } - bookmarks = append(bookmarks, newBookmark(iter)) + bookmarks = append(bookmarks, newBookmark[parquet.Row](iter)) } var ( @@ -258,48 +258,6 @@ func (c *Compactor) finishBlock(ctx context.Context, block *streamingBlock, l lo return nil } -type bookmark struct { - iter RawIterator - - currentID common.ID - currentObject parquet.Row - currentErr error -} - -func newBookmark(iter RawIterator) *bookmark { - return &bookmark{ - iter: iter, - } -} - -func (b *bookmark) current(ctx context.Context) ([]byte, parquet.Row, error) { - if b.currentErr != nil { - return nil, nil, b.currentErr - } - - if b.currentObject != nil { - return b.currentID, b.currentObject, nil - } - - b.currentID, b.currentObject, b.currentErr = b.iter.Next(ctx) - return b.currentID, b.currentObject, b.currentErr -} - -func (b *bookmark) done(ctx context.Context) bool { - _, obj, err := b.current(ctx) - - return obj == nil || err != nil -} - -func (b *bookmark) clear() { - b.currentID = nil - b.currentObject = nil -} - -func (b *bookmark) close() { - b.iter.Close() -} - type rowPool struct { pool sync.Pool } diff --git a/tempodb/encoding/vparquet/create.go b/tempodb/encoding/vparquet/create.go index 08ead01f66a..1d8d15d5f97 100644 --- a/tempodb/encoding/vparquet/create.go +++ b/tempodb/encoding/vparquet/create.go @@ -36,24 +36,46 @@ func (b *backendWriter) Close() error { func CreateBlock(ctx context.Context, cfg *common.BlockConfig, meta *backend.BlockMeta, i common.Iterator, r backend.Reader, to backend.Writer) (*backend.BlockMeta, error) { s := newStreamingBlock(ctx, cfg, meta, r, to, tempo_io.NewBufferedWriter) - trp := &Trace{} + var next func(context.Context) (common.ID, *Trace, error) + repool := false + if ii, ok := i.(*commonIterator); ok { + 
// Use interal iterator and avoid translation to/from proto + // TODO - Operate on parquet.Row for even better performance + next = ii.NextTrace + // TODO - Figure out what is wrong with pooling and reenable it + //repool = true + } else { + // Need to convert from proto->parquet obj + trp := &Trace{} + next = func(context.Context) (common.ID, *Trace, error) { + id, tr, err := i.Next(ctx) + if err == io.EOF || tr == nil { + return id, nil, err + } + + // Copy ID to allow it to escape the iterator. + id = append([]byte(nil), id...) + + trp = traceToParquet(id, tr, trp) + return id, trp, nil + } + } + for { - id, tr, err := i.Next(ctx) + _, tr, err := next(ctx) if err == io.EOF || tr == nil { break } - // Copy ID to allow it to escape the iterator. - id = append([]byte(nil), id...) - - trp = traceToParquet(id, tr, trp) - err = s.Add(trp, 0, 0) // start and end time of the wal meta are used. + err = s.Add(tr, 0, 0) // start and end time of the wal meta are used. if err != nil { return nil, err } - // Here we repurpose RowGroupSizeBytes as number of raw column values. - // This is a fairly close approximation. 
+ if repool { + tracePoolPut(tr) + } + if s.EstimatedBufferedBytes() > cfg.RowGroupSizeBytes { _, err = s.Flush() if err != nil { diff --git a/tempodb/encoding/vparquet/encoding.go b/tempodb/encoding/vparquet/encoding.go index 44c7efba91c..7ba166cdb09 100644 --- a/tempodb/encoding/vparquet/encoding.go +++ b/tempodb/encoding/vparquet/encoding.go @@ -2,6 +2,7 @@ package vparquet import ( "context" + "io/fs" "time" "github.com/google/uuid" @@ -35,10 +36,14 @@ func (v Encoding) CreateBlock(ctx context.Context, cfg *common.BlockConfig, meta // OpenWALBlock opens an existing appendable block func (v Encoding) OpenWALBlock(filename string, path string, ingestionSlack time.Duration, additionalStartSlack time.Duration) (common.WALBlock, error, error) { - panic("unsupported") + return openWALBlock(filename, path, ingestionSlack, additionalStartSlack) } // CreateWALBlock creates a new appendable block func (v Encoding) CreateWALBlock(id uuid.UUID, tenantID string, filepath string, e backend.Encoding, dataEncoding string, ingestionSlack time.Duration) (common.WALBlock, error) { - panic("unsupported") + return createWALBlock(id, tenantID, filepath, e, dataEncoding, ingestionSlack) +} + +func (v Encoding) OwnsWALBlock(entry fs.DirEntry) bool { + return ownsWALBlock(entry) } diff --git a/tempodb/encoding/vparquet/interfaces.go b/tempodb/encoding/vparquet/interfaces.go index 4dd2855a9f0..7f3f076084a 100644 --- a/tempodb/encoding/vparquet/interfaces.go +++ b/tempodb/encoding/vparquet/interfaces.go @@ -7,8 +7,8 @@ import ( "github.com/segmentio/parquet-go" ) -type Iterator interface { - Next(context.Context) (*Trace, error) +type TraceIterator interface { + NextTrace(context.Context) (common.ID, *Trace, error) Close() } diff --git a/tempodb/encoding/vparquet/multiblock_iterator.go b/tempodb/encoding/vparquet/multiblock_iterator.go index 7d752d08ff7..74b18cfa8dd 100644 --- a/tempodb/encoding/vparquet/multiblock_iterator.go +++ b/tempodb/encoding/vparquet/multiblock_iterator.go @@ 
-11,32 +11,33 @@ import ( "github.com/grafana/tempo/tempodb/encoding/common" ) -type combineFn func([]parquet.Row) (parquet.Row, error) - -type MultiBlockIterator struct { - bookmarks []*bookmark - combine combineFn +type iteratable interface { + parquet.Row | *Trace } -var _ RawIterator = (*MultiBlockIterator)(nil) +type combineFn[T iteratable] func([]T) (T, error) + +type MultiBlockIterator[T iteratable] struct { + bookmarks []*bookmark[T] + combine combineFn[T] +} -func newMultiblockIterator(bookmarks []*bookmark, combine combineFn) *MultiBlockIterator { - return &MultiBlockIterator{ +func newMultiblockIterator[T iteratable](bookmarks []*bookmark[T], combine combineFn[T]) *MultiBlockIterator[T] { + return &MultiBlockIterator[T]{ bookmarks: bookmarks, combine: combine, } } -func (m *MultiBlockIterator) Next(ctx context.Context) (common.ID, parquet.Row, error) { - +func (m *MultiBlockIterator[T]) Next(ctx context.Context) (common.ID, T, error) { if m.done(ctx) { return nil, nil, io.EOF } var ( lowestID common.ID - lowestObjects []parquet.Row - lowestBookmarks []*bookmark + lowestObjects []T + lowestBookmarks []*bookmark[T] ) // find lowest ID of the new object @@ -78,13 +79,13 @@ func (m *MultiBlockIterator) Next(ctx context.Context) (common.ID, parquet.Row, return lowestID, lowestObject, nil } -func (m *MultiBlockIterator) Close() { +func (m *MultiBlockIterator[T]) Close() { for _, b := range m.bookmarks { b.close() } } -func (m *MultiBlockIterator) done(ctx context.Context) bool { +func (m *MultiBlockIterator[T]) done(ctx context.Context) bool { for _, b := range m.bookmarks { if !b.done(ctx) { return false @@ -92,3 +93,50 @@ func (m *MultiBlockIterator) done(ctx context.Context) bool { } return true } + +type bookmark[T iteratable] struct { + iter genericIterator[T] + + currentID common.ID + currentObject T + currentErr error +} + +func newBookmark[T iteratable](iter genericIterator[T]) *bookmark[T] { + return &bookmark[T]{ + iter: iter, + } +} + +func (b 
*bookmark[T]) current(ctx context.Context) ([]byte, T, error) { + if b.currentErr != nil { + return nil, nil, b.currentErr + } + + if b.currentObject != nil { + return b.currentID, b.currentObject, nil + } + + b.currentID, b.currentObject, b.currentErr = b.iter.Next(ctx) + return b.currentID, b.currentObject, b.currentErr +} + +func (b *bookmark[T]) done(ctx context.Context) bool { + _, obj, err := b.current(ctx) + + return obj == nil || err != nil +} + +func (b *bookmark[T]) clear() { + b.currentID = nil + b.currentObject = nil +} + +func (b *bookmark[T]) close() { + b.iter.Close() +} + +type genericIterator[T iteratable] interface { + Next(ctx context.Context) (common.ID, T, error) + Close() +} diff --git a/tempodb/encoding/vparquet/schema.go b/tempodb/encoding/vparquet/schema.go index bcbc725295b..5ba20ae74d1 100644 --- a/tempodb/encoding/vparquet/schema.go +++ b/tempodb/encoding/vparquet/schema.go @@ -485,7 +485,6 @@ func parquetToProtoEvents(parquetEvents []Event) []*v1_trace.Span_Event { } func parquetTraceToTempopbTrace(parquetTrace *Trace) *tempopb.Trace { - protoTrace := &tempopb.Trace{} protoTrace.Batches = make([]*v1_trace.ResourceSpans, 0, len(parquetTrace.ResourceSpans)) diff --git a/tempodb/encoding/vparquet/wal_block.go b/tempodb/encoding/vparquet/wal_block.go new file mode 100644 index 00000000000..d09bc254035 --- /dev/null +++ b/tempodb/encoding/vparquet/wal_block.go @@ -0,0 +1,597 @@ +package vparquet + +import ( + "context" + "encoding/json" + "fmt" + "io" + "io/fs" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/google/uuid" + "github.com/grafana/tempo/pkg/model" + "github.com/grafana/tempo/pkg/model/trace" + "github.com/grafana/tempo/pkg/tempopb" + "github.com/grafana/tempo/pkg/traceql" + "github.com/grafana/tempo/pkg/warnings" + "github.com/grafana/tempo/tempodb/backend" + "github.com/grafana/tempo/tempodb/encoding/common" + "github.com/pkg/errors" + "github.com/segmentio/parquet-go" +) + +var _ common.WALBlock = 
(*walBlock)(nil) + +// path + filename = folder to create +// path/folder/00001 +// /00002 +// /00003 +// /00004 + +// folder = ++vParquet + +// openWALBlock opens an existing appendable block. It is read-only by +// not assigning a decoder. +func openWALBlock(filename string, path string, ingestionSlack time.Duration, _ time.Duration) (common.WALBlock, error, error) { // jpe what returns a warning? + dir := filepath.Join(path, filename) + _, _, version, err := parseName(filename) + if err != nil { + return nil, nil, err + } + + if version != VersionString { + return nil, nil, fmt.Errorf("mismatched version in vparquet wal: %s, %s, %s", version, path, filename) + } + + metaPath := filepath.Join(dir, backend.MetaName) + metaBytes, err := os.ReadFile(metaPath) + if err != nil { + return nil, nil, fmt.Errorf("error reading wal meta json: %s %w", metaPath, err) + } + + meta := &backend.BlockMeta{} + err = json.Unmarshal(metaBytes, meta) + if err != nil { + return nil, nil, fmt.Errorf("error unmarshaling wal meta json: %s %w", metaPath, err) + } + + b := &walBlock{ + meta: meta, + path: path, + ids: common.NewIDMap[int64](), + ingestionSlack: ingestionSlack, + } + + // read all files in dir + files, err := os.ReadDir(dir) + if err != nil { + return nil, nil, fmt.Errorf("error reading dir: %w", err) + } + + var warning error + for _, f := range files { + if f.Name() == backend.MetaName { + continue + } + + // Ignore 0-byte files which are pages that were + // opened but not flushed. 
+ i, err := f.Info() + if err != nil { + return nil, nil, fmt.Errorf("error getting file info: %s %w", f.Name(), err) + } + if i.Size() == 0 { + continue + } + + // attempt to load in a parquet.file + pf, sz, err := openLocalParquetFile(filepath.Join(dir, f.Name())) + if err != nil { + warning = fmt.Errorf("error opening file: %w", err) + continue + } + + b.flushed = append(b.flushed, &walBlockFlush{ + file: pf, + ids: common.NewIDMap[int64](), + }) + b.flushedSize += sz + } + + // iterate through all files and build meta + for i, page := range b.flushed { + iter := makeIterFunc(context.Background(), page.file.RowGroups(), page.file)(columnPathTraceID, nil, columnPathTraceID) + defer iter.Close() + + for { + match, err := iter.Next() + if err != nil { + return nil, nil, fmt.Errorf("error opening wal folder [%s %d]: %w", b.meta.BlockID.String(), i, err) + } + if match == nil { + break + } + + for _, e := range match.Entries { + switch e.Key { + case columnPathTraceID: + traceID := e.Value.ByteArray() + b.meta.ObjectAdded(traceID, 0, 0) + page.ids.Set(traceID, match.RowNumber[0]) // Save rownumber for the trace ID + } + } + } + } + + return b, warning, nil +} + +// createWALBlock creates a new appendable block +func createWALBlock(id uuid.UUID, tenantID string, filepath string, _ backend.Encoding, dataEncoding string, ingestionSlack time.Duration) (*walBlock, error) { + b := &walBlock{ + meta: &backend.BlockMeta{ + Version: VersionString, + BlockID: id, + TenantID: tenantID, + }, + path: filepath, + ids: common.NewIDMap[int64](), + ingestionSlack: ingestionSlack, + } + + // build folder + err := os.MkdirAll(b.walPath(), os.ModePerm) + if err != nil { + return nil, err + } + + dec, err := model.NewObjectDecoder(dataEncoding) + if err != nil { + return nil, err + } + b.decoder = dec + + err = b.openWriter() + + return b, err +} + +func ownsWALBlock(entry fs.DirEntry) bool { + // all vParquet wal blocks are folders + if !entry.IsDir() { + return false + } + + _, _, 
version, err := parseName(entry.Name()) + if err != nil { + return false + } + + return version == VersionString +} + +type walBlockFlush struct { + file *parquet.File + ids *common.IDMap[int64] +} + +type walBlock struct { + meta *backend.BlockMeta + path string + ingestionSlack time.Duration + + // Unflushed data + buffer *Trace + ids *common.IDMap[int64] + file *os.File + writer *parquet.GenericWriter[*Trace] + decoder model.ObjectDecoder + unflushedSize int64 + + // Flushed data + flushed []*walBlockFlush + flushedSize int64 +} + +func (b *walBlock) BlockMeta() *backend.BlockMeta { + return b.meta // jpe make ingestion slack a handled by BlockMeta +} + +func (b *walBlock) Append(id common.ID, buff []byte, start, end uint32) error { + // if decoder = nil we were created with OpenWALBlock and will not accept writes + if b.decoder == nil { + return nil + } + + trace, err := b.decoder.PrepareForRead(buff) + if err != nil { + return fmt.Errorf("error preparing trace for read: %w", err) + } + + b.buffer = traceToParquet(id, trace, b.buffer) + + start, end = b.adjustTimeRangeForSlack(start, end, 0) + + // add to current + _, err = b.writer.Write([]*Trace{b.buffer}) + if err != nil { + return fmt.Errorf("error writing row: %w", err) + } + + b.meta.ObjectAdded(id, start, end) + b.ids.Set(id, int64(b.ids.Len())) // Next row number + + // This is actually the protobuf size but close enough + // for this purpose and only temporary until next flush. 
+ b.unflushedSize += int64(len(buff)) + + return nil +} + +func (b *walBlock) adjustTimeRangeForSlack(start uint32, end uint32, additionalStartSlack time.Duration) (uint32, uint32) { + now := time.Now() + startOfRange := uint32(now.Add(-b.ingestionSlack).Add(-additionalStartSlack).Unix()) + endOfRange := uint32(now.Add(b.ingestionSlack).Unix()) + + warn := false + if start < startOfRange { + warn = true + start = uint32(now.Unix()) + } + if end > endOfRange { + warn = true + end = uint32(now.Unix()) + } + + if warn { + warnings.Metric.WithLabelValues(b.meta.TenantID, warnings.ReasonOutsideIngestionSlack).Inc() + } + + return start, end +} + +func (b *walBlock) filepathOf(page int) string { + filename := fmt.Sprintf("%010d", page) + filename = filepath.Join(b.walPath(), filename) + return filename +} + +func (b *walBlock) openWriter() (err error) { + + nextFile := len(b.flushed) + 1 + filename := b.filepathOf(nextFile) + + b.file, err = os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return fmt.Errorf("error opening file: %w", err) + } + + if b.writer == nil { + b.writer = parquet.NewGenericWriter[*Trace](b.file) + } else { + b.writer.Reset(b.file) + } + + return nil +} + +func (b *walBlock) Flush() (err error) { + + if b.ids.Len() == 0 { + return nil + } + + // Flush latest meta first + // This mainly contains the slack-adjusted start/end times + metaBytes, err := json.Marshal(b.BlockMeta()) + if err != nil { + return fmt.Errorf("error marshaling meta json: %w", err) + } + + metaPath := filepath.Join(b.walPath(), backend.MetaName) + err = os.WriteFile(metaPath, metaBytes, 0600) + if err != nil { + return fmt.Errorf("error writing meta json: %w", err) + } + + // Now flush/close current writer + err = b.writer.Close() + if err != nil { + return fmt.Errorf("error closing writer: %w", err) + } + + err = b.file.Close() + if err != nil { + return fmt.Errorf("error closing file: %w", err) + } + + pf, sz, err := openLocalParquetFile(b.file.Name()) 
+ if err != nil { + return fmt.Errorf("error opening local file [%s]: %w", b.file.Name(), err) + } + + b.flushed = append(b.flushed, &walBlockFlush{ + file: pf, + ids: b.ids, + }) + b.flushedSize += sz + b.unflushedSize = 0 + b.ids = common.NewIDMap[int64]() + + // Open next one + return b.openWriter() +} + +// DataLength returns estimated size of WAL files on disk. Used for +// cutting WAL files by max size. +func (b *walBlock) DataLength() uint64 { + return uint64(b.flushedSize + b.unflushedSize) +} + +func (b *walBlock) Iterator() (common.Iterator, error) { + bookmarks := make([]*bookmark[*Trace], 0, len(b.flushed)) + for _, page := range b.flushed { + + r := parquet.NewGenericReader[*Trace](page.file) + iter := &traceIterator{reader: r, rowNumbers: page.ids.ValuesSortedByID()} + + bookmarks = append(bookmarks, newBookmark[*Trace](iter)) + } + + iter := newMultiblockIterator(bookmarks, func(ts []*Trace) (*Trace, error) { + t := CombineTraces(ts...) + return t, nil + }) + + return &commonIterator{ + iter: iter, + }, nil +} + +func (b *walBlock) Clear() error { + return os.RemoveAll(b.walPath()) +} + +// jpe what to do with common.SearchOptions? 
+func (b *walBlock) FindTraceByID(ctx context.Context, id common.ID, _ common.SearchOptions) (*tempopb.Trace, error) { + trs := make([]*tempopb.Trace, 0) + + for _, page := range b.flushed { + if rowNumber, ok := page.ids.Get(id); ok { + r := parquet.NewReader(page.file) + err := r.SeekToRow(rowNumber) + if err != nil { + return nil, errors.Wrap(err, "seek to row") + } + + tr := new(Trace) + err = r.Read(tr) + if err != nil { + return nil, errors.Wrap(err, "error reading row from backend") + } + + trp := parquetTraceToTempopbTrace(tr) + + trs = append(trs, trp) + } + } + + combiner := trace.NewCombiner() + for i, tr := range trs { + combiner.ConsumeWithFinal(tr, i == len(trs)-1) + } + + tr, _ := combiner.Result() + return tr, nil +} + +func (b *walBlock) Search(ctx context.Context, req *tempopb.SearchRequest, opts common.SearchOptions) (*tempopb.SearchResponse, error) { + results := &tempopb.SearchResponse{ + Metrics: &tempopb.SearchMetrics{}, + } + + // jpe parrallelize? + for i, page := range b.flushed { + r, err := searchParquetFile(ctx, page.file, req, page.file.RowGroups()) + if err != nil { + return nil, fmt.Errorf("error searching block [%s %d]: %w", b.meta.BlockID.String(), i, err) + } + + results.Traces = append(results.Traces, r.Traces...) + if len(results.Traces) >= int(req.Limit) { + break + } + } + + results.Metrics.InspectedBlocks++ + results.Metrics.InspectedBytes += b.DataLength() + results.Metrics.InspectedTraces += uint32(b.meta.TotalObjects) + + return results, nil +} + +func (b *walBlock) SearchTags(ctx context.Context, cb common.TagCallback, opts common.SearchOptions) error { + // jpe parallelize? 
+ for i, page := range b.flushed { + err := searchTags(ctx, cb, page.file) + if err != nil { + return fmt.Errorf("error searching block [%s %d]: %w", b.meta.BlockID.String(), i, err) + } + } + + return nil +} + +func (b *walBlock) SearchTagValues(ctx context.Context, tag string, cb common.TagCallback, opts common.SearchOptions) error { + // jpe parallelize? + for i, f := range b.flushed { + err := searchTagValues(ctx, tag, cb, f.file) + if err != nil { + return fmt.Errorf("error searching block [%s %d]: %w", b.meta.BlockID.String(), i, err) + } + } + + return nil +} + +func (b *walBlock) Fetch(ctx context.Context, req traceql.FetchSpansRequest) (traceql.FetchSpansResponse, error) { + // todo: this same method is called in backendBlock.Fetch. is there anyway to share this? + err := checkConditions(req.Conditions) + if err != nil { + return traceql.FetchSpansResponse{}, errors.Wrap(err, "conditions invalid") + } + + iters := make([]*spansetIterator, 0, len(b.flushed)) + for _, f := range b.flushed { + iter, err := fetch(ctx, req, f.file) + if err != nil { + return traceql.FetchSpansResponse{}, errors.Wrap(err, "creating fetch iter") + } + iters = append(iters, iter) + } + + // combine iters? + return traceql.FetchSpansResponse{ + Results: &mergeSpansetIterator{ + iters: iters, + }, + }, nil +} + +func (b *walBlock) walPath() string { + filename := fmt.Sprintf("%s+%s+%s", b.meta.BlockID, b.meta.TenantID, VersionString) + return filepath.Join(b.path, filename) +} + +// ++vParquet +func parseName(filename string) (uuid.UUID, string, string, error) { + splits := strings.Split(filename, "+") + + if len(splits) != 3 { + return uuid.UUID{}, "", "", fmt.Errorf("unable to parse %s. unexpected number of segments", filename) + } + + // first segment is blockID + id, err := uuid.Parse(splits[0]) + if err != nil { + return uuid.UUID{}, "", "", fmt.Errorf("unable to parse %s. 
error parsing uuid: %w", filename, err) + } + + // second segment is tenant + tenant := splits[1] + if len(tenant) == 0 { + return uuid.UUID{}, "", "", fmt.Errorf("unable to parse %s. 0 length tenant", filename) + } + + // third segment is version + version := splits[2] + if version != VersionString { + return uuid.UUID{}, "", "", fmt.Errorf("unable to parse %s. unexpected version %s", filename, version) + } + + return id, tenant, version, nil +} + +// jpe iterators feel like a mess, clean up ? + +var tracePool sync.Pool + +func tracePoolGet() *Trace { + o := tracePool.Get() + if o == nil { + return &Trace{} + } + + return o.(*Trace) +} + +func tracePoolPut(t *Trace) { + tracePool.Put(t) +} + +// traceIterator is used to iterate a parquet file and implement iterIterator +// traces are iterated according to the given row numbers, because there is +// not a guarantee that the underlying parquet file is sorted +type traceIterator struct { + reader *parquet.GenericReader[*Trace] + rowNumbers []int64 +} + +func (i *traceIterator) Next(ctx context.Context) (common.ID, *Trace, error) { + if len(i.rowNumbers) == 0 { + return nil, nil, nil + } + + nextRowNumber := i.rowNumbers[0] + i.rowNumbers = i.rowNumbers[1:] + + err := i.reader.SeekToRow(nextRowNumber) + if err != nil { + return nil, nil, err + } + + tr := tracePoolGet() + _, err = i.reader.Read([]*Trace{tr}) + if err != nil { + return nil, nil, err + } + + return tr.TraceID, tr, nil +} + +func (i *traceIterator) Close() { + i.reader.Close() +} + +var _ TraceIterator = (*commonIterator)(nil) +var _ common.Iterator = (*commonIterator)(nil) + +// commonIterator implements both TraceIterator and common.Iterator. 
it is returned from the AppendFile and is meant +// to be passed to a CreateBlock +type commonIterator struct { + iter *MultiBlockIterator[*Trace] +} + +func (i *commonIterator) Next(ctx context.Context) (common.ID, *tempopb.Trace, error) { + id, obj, err := i.iter.Next(ctx) + if err != nil && err != io.EOF { + return nil, nil, err + } + + if obj == nil || err == io.EOF { + return nil, nil, nil + } + + tr := parquetTraceToTempopbTrace(obj) + return id, tr, nil +} + +func (i *commonIterator) NextTrace(ctx context.Context) (common.ID, *Trace, error) { + return i.iter.Next(ctx) +} + +func (i *commonIterator) Close() { + i.iter.Close() +} + +func openLocalParquetFile(filename string) (*parquet.File, int64, error) { + file, err := os.OpenFile(filename, os.O_RDONLY, 0644) + if err != nil { + return nil, 0, fmt.Errorf("error opening file: %w", err) + } + info, err := file.Stat() + if err != nil { + return nil, 0, fmt.Errorf("error getting file info: %w", err) + } + sz := info.Size() + pf, err := parquet.OpenFile(file, sz) + if err != nil { + return nil, 0, fmt.Errorf("error opening parquet file: %w", err) + } + + return pf, sz, nil +} diff --git a/tempodb/encoding/vparquet/wal_block_test.go b/tempodb/encoding/vparquet/wal_block_test.go new file mode 100644 index 00000000000..adf3588aeb5 --- /dev/null +++ b/tempodb/encoding/vparquet/wal_block_test.go @@ -0,0 +1,286 @@ +package vparquet + +import ( + "bytes" + "context" + "os" + "path/filepath" + "testing" + + "github.com/gogo/protobuf/proto" + "github.com/google/uuid" + "github.com/grafana/tempo/pkg/model" + "github.com/grafana/tempo/pkg/model/trace" + "github.com/grafana/tempo/pkg/tempopb" + "github.com/grafana/tempo/pkg/util/test" + "github.com/grafana/tempo/tempodb/backend" + "github.com/grafana/tempo/tempodb/encoding/common" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// Note: Standard wal block functionality (appending, searching, finding, etc.) 
is tested with all other wal blocks +// in /tempodb/wal/wal_test.go + +func TestFullFilename(t *testing.T) { + tests := []struct { + name string + b *walBlock + expected string + }{ + { + name: "basic", + b: &walBlock{ + meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), VersionString, backend.EncNone, ""), + path: "/blerg", + }, + expected: "/blerg/123e4567-e89b-12d3-a456-426614174000+foo+vParquet", + }, + { + name: "no path", + b: &walBlock{ + meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), VersionString, backend.EncNone, ""), + path: "", + }, + expected: "123e4567-e89b-12d3-a456-426614174000+foo+vParquet", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + actual := tc.b.walPath() + assert.Equal(t, tc.expected, actual) + }) + } +} + +// TestPartialReplay verifies that we can best-effort replay a partial/corrupted WAL block. +// This test works by flushing a WAL block across a few pages, corrupting one, and then replaying +// it. 
+func TestPartialReplay(t *testing.T) { + decoder := model.MustNewSegmentDecoder(model.CurrentEncoding) + blockID := uuid.New() + basePath := t.TempDir() + + w, err := createWALBlock(blockID, "fake", basePath, backend.EncNone, model.CurrentEncoding, 0) + require.NoError(t, err) + + // Flush a set of traces across 2 pages + count := 10 + ids := make([]common.ID, count) + trs := make([]*tempopb.Trace, count) + for i := 0; i < count; i++ { + ids[i] = test.ValidTraceID(nil) + trs[i] = test.MakeTrace(10, ids[i]) + trace.SortTrace(trs[i]) + + b1, err := decoder.PrepareForWrite(trs[i], 0, 0) + require.NoError(t, err) + + b2, err := decoder.ToObject([][]byte{b1}) + require.NoError(t, err) + + err = w.Append(ids[i], b2, 0, 0) + require.NoError(t, err) + + if i+1 == count/2 { + require.NoError(t, w.Flush()) + } + } + require.NoError(t, w.Flush()) + + // Delete half of page 2 + fpath := w.filepathOf(1) + info, err := os.Stat(fpath) + require.NoError(t, err) + require.NoError(t, os.Truncate(fpath, info.Size()/2)) + + // Replay, this has a warning on page 2 + w2, warning, err := openWALBlock(filepath.Base(w.walPath()), filepath.Dir(w.walPath()), 0, 0) + require.NoError(t, err) + require.ErrorContains(t, warning, "invalid magic footer of parquet file") + + // Verify we iterate only the records from the first flush + iter, err := w2.Iterator() + require.NoError(t, err) + + gotCount := 0 + for ; ; gotCount++ { + id, tr, err := iter.Next(context.Background()) + require.NoError(t, err) + + if id == nil { + break + } + + // Find trace in the input data + match := 0 + for i := range ids { + if bytes.Equal(ids[i], id) { + match = i + break + } + } + + require.Equal(t, ids[match], id) + require.True(t, proto.Equal(trs[match], tr)) + } + require.Equal(t, count/2, gotCount) +} + +func TestParseFilename(t *testing.T) { + tests := []struct { + name string + filename string + expectUUID uuid.UUID + expectTenant string + expectedVersion string + expectError bool + }{ + { + name: "happy path", 
+ filename: "123e4567-e89b-12d3-a456-426614174000+tenant+vParquet", + expectUUID: uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), + expectTenant: "tenant", + expectedVersion: "vParquet", + }, + { + name: "path fails", + filename: "/blerg/123e4567-e89b-12d3-a456-426614174000+tenant+vParquet", + expectError: true, + }, + { + name: "no +", + filename: "123e4567-e89b-12d3-a456-426614174000", + expectError: true, + }, + { + name: "empty string", + filename: "", + expectError: true, + }, + { + name: "bad uuid", + filename: "123e4+tenant+vParquet", + expectError: true, + }, + { + name: "no tenant", + filename: "123e4567-e89b-12d3-a456-426614174000++vParquet", + expectError: true, + }, + { + name: "no version", + filename: "123e4567-e89b-12d3-a456-426614174000+tenant+", + expectError: true, + }, + { + name: "wrong version", + filename: "123e4567-e89b-12d3-a456-426614174000+tenant+v2", + expectError: true, + }, + { + name: "wrong splits - 4", + filename: "123e4567-e89b-12d3-a456-426614174000+test+test+test", + expectError: true, + }, + { + name: "wrong splits - 2", + filename: "123e4567-e89b-12d3-a456-426614174000+test", + expectError: true, + }, + { + name: "wrong splits - 1", + filename: "123e4567-e89b-12d3-a456-426614174000", + expectError: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + actualUUID, actualTenant, actualVersion, err := parseName(tc.filename) + + if tc.expectError { + require.Error(t, err) + return + } + + require.NoError(t, err) + require.Equal(t, tc.expectUUID, actualUUID) + require.Equal(t, tc.expectTenant, actualTenant) + require.Equal(t, tc.expectedVersion, actualVersion) + }) + } +} + +func TestWalBlockFindTraceByID(t *testing.T) { + testWalBlock(t, func(w *walBlock, ids []common.ID, trs []*tempopb.Trace) { + for i := range ids { + found, err := w.FindTraceByID(context.Background(), ids[i], common.DefaultSearchOptions()) + require.NoError(t, err) + require.NotNil(t, found) + require.True(t, 
proto.Equal(trs[i], found)) + } + }) +} + +func TestWalBlockIterator(t *testing.T) { + testWalBlock(t, func(w *walBlock, ids []common.ID, trs []*tempopb.Trace) { + + iter, err := w.Iterator() + require.NoError(t, err) + + count := 0 + for ; ; count++ { + id, tr, err := iter.Next(context.Background()) + require.NoError(t, err) + + if id == nil { + break + } + + // Find trace in the input data + match := 0 + for i := range ids { + if bytes.Equal(ids[i], id) { + match = i + break + } + } + + require.Equal(t, ids[match], id) + require.True(t, proto.Equal(trs[match], tr)) + } + require.Equal(t, len(ids), count) + }) +} + +func testWalBlock(t *testing.T, f func(w *walBlock, ids []common.ID, trs []*tempopb.Trace)) { + w, err := createWALBlock(uuid.New(), "fake", t.TempDir(), backend.EncNone, model.CurrentEncoding, 0) + require.NoError(t, err) + + decoder := model.MustNewSegmentDecoder(model.CurrentEncoding) + + count := 10 + ids := make([]common.ID, count) + trs := make([]*tempopb.Trace, count) + for i := 0; i < count; i++ { + ids[i] = test.ValidTraceID(nil) + trs[i] = test.MakeTrace(10, ids[i]) + trace.SortTrace(trs[i]) + + b1, err := decoder.PrepareForWrite(trs[i], 0, 0) + require.NoError(t, err) + + b2, err := decoder.ToObject([][]byte{b1}) + require.NoError(t, err) + + err = w.Append(ids[i], b2, 0, 0) + require.NoError(t, err) + } + + require.NoError(t, w.Flush()) + + f(w, ids, trs) +} diff --git a/tempodb/retention_test.go b/tempodb/retention_test.go index 06cc1b5f483..2006d5b3a76 100644 --- a/tempodb/retention_test.go +++ b/tempodb/retention_test.go @@ -16,6 +16,7 @@ import ( "github.com/grafana/tempo/tempodb/backend/local" "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/encoding/common" + v2 "github.com/grafana/tempo/tempodb/encoding/v2" "github.com/grafana/tempo/tempodb/wal" ) @@ -37,6 +38,7 @@ func TestRetention(t *testing.T) { }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), + Version: v2.VersionString, }, BlocklistPoll: 0, 
}, log.NewNopLogger()) @@ -98,6 +100,7 @@ func TestRetentionUpdatesBlocklistImmediately(t *testing.T) { }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), + Version: v2.VersionString, }, BlocklistPoll: 0, }, log.NewNopLogger()) @@ -165,6 +168,7 @@ func TestBlockRetentionOverride(t *testing.T) { }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), + Version: v2.VersionString, }, BlocklistPoll: 0, }, log.NewNopLogger()) diff --git a/tempodb/tempodb.go b/tempodb/tempodb.go index 6279671d886..2d1ac320d53 100644 --- a/tempodb/tempodb.go +++ b/tempodb/tempodb.go @@ -315,7 +315,7 @@ func (rw *readerWriter) Find(ctx context.Context, tenantID string, id common.ID, return nil, nil, nil } - opts := common.SearchOptions{} + opts := common.DefaultSearchOptions() if rw.cfg != nil && rw.cfg.Search != nil { rw.cfg.Search.ApplyToOptions(&opts) } diff --git a/tempodb/tempodb_search_test.go b/tempodb/tempodb_search_test.go index ac56807a9f3..5b0ddfa2dfb 100644 --- a/tempodb/tempodb_search_test.go +++ b/tempodb/tempodb_search_test.go @@ -44,6 +44,7 @@ func testSearchCompleteBlock(t *testing.T, blockVersion string) { }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), + Version: v2.VersionString, IngestionSlack: time.Since(time.Time{}), }, Search: &SearchConfig{ @@ -85,14 +86,14 @@ func testSearchCompleteBlock(t *testing.T, blockVersion string) { meta := block.BlockMeta() for _, req := range searchesThatMatch { - res, err := r.Search(ctx, meta, req, common.SearchOptions{}) + res, err := r.Search(ctx, meta, req, common.DefaultSearchOptions()) require.NoError(t, err) require.Equal(t, 1, len(res.Traces), "search request: %+v", req) require.Equal(t, wantMeta, res.Traces[0], "search request:", req) } for _, req := range searchesThatDontMatch { - res, err := rw.Search(ctx, meta, req, common.SearchOptions{}) + res, err := rw.Search(ctx, meta, req, common.DefaultSearchOptions()) require.NoError(t, err) require.Empty(t, res.Traces, "search request:", req) } diff --git 
a/tempodb/tempodb_test.go b/tempodb/tempodb_test.go index 9e71159bd85..0a4cc03af86 100644 --- a/tempodb/tempodb_test.go +++ b/tempodb/tempodb_test.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/tempo/pkg/model" + "github.com/grafana/tempo/pkg/model/trace" "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/pkg/util/test" "github.com/grafana/tempo/tempodb/backend" @@ -32,10 +33,12 @@ const ( testTenantID2 = "fake2" ) -func testConfig(t *testing.T, enc backend.Encoding, blocklistPoll time.Duration) (Reader, Writer, Compactor, string) { +type testConfigOption func(*Config) + +func testConfig(t *testing.T, enc backend.Encoding, blocklistPoll time.Duration, opts ...testConfigOption) (Reader, Writer, Compactor, string) { tempDir := t.TempDir() - r, w, c, err := New(&Config{ + cfg := &Config{ Backend: "local", Local: &local.Config{ Path: path.Join(tempDir, "traces"), @@ -50,9 +53,16 @@ func testConfig(t *testing.T, enc backend.Encoding, blocklistPoll time.Duration) }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), + Version: v2.VersionString, }, BlocklistPoll: blocklistPoll, - }, log.NewNopLogger()) + } + + for _, opt := range opts { + opt(cfg) + } + + r, w, c, err := New(cfg, log.NewNopLogger()) require.NoError(t, err) return r, w, c, tempDir } @@ -567,7 +577,21 @@ func TestSearchCompactedBlocks(t *testing.T) { } func TestCompleteBlock(t *testing.T) { - _, w, _, _ := testConfig(t, backend.EncLZ4_256k, time.Minute) + for _, from := range encoding.AllEncodings() { + for _, to := range encoding.AllEncodings() { + t.Run(fmt.Sprintf("%s->%s", from.Version(), to.Version()), func(t *testing.T) { + testCompleteBlock(t, from.Version(), to.Version()) + }) + } + } +} + +func testCompleteBlock(t *testing.T, from, to string) { + + _, w, _, _ := testConfig(t, backend.EncLZ4_256k, time.Minute, func(c *Config) { + c.WAL.Version = from + c.Block.Version = to + }) wal := w.WAL() @@ -583,18 +607,22 @@ func TestCompleteBlock(t 
*testing.T) { ids := make([][]byte, 0, numMsgs) for i := 0; i < numMsgs; i++ { id := test.ValidTraceID(nil) - req := test.MakeTrace(rand.Int()%1000, id) + req := test.MakeTrace(rand.Int()%10, id) + trace.SortTrace(req) writeTraceToWal(t, block, dec, id, req, 0, 0) reqs = append(reqs, req) ids = append(ids, id) } + require.NoError(t, block.Flush()) complete, err := w.CompleteBlock(context.Background(), block) require.NoError(t, err, "unexpected error completing block") for i, id := range ids { - found, err := complete.FindTraceByID(context.TODO(), id, common.SearchOptions{}) + found, err := complete.FindTraceByID(context.TODO(), id, common.DefaultSearchOptions()) require.NoError(t, err) + require.NotNil(t, found) + trace.SortTrace(found) require.True(t, proto.Equal(found, reqs[i])) } } @@ -628,6 +656,7 @@ func testCompleteBlockHonorsStartStopTimes(t *testing.T, targetBlockVersion stri WAL: &wal.Config{ IngestionSlack: time.Minute, Filepath: path.Join(tempDir, "wal"), + Version: v2.VersionString, }, BlocklistPoll: 0, }, log.NewNopLogger()) @@ -675,6 +704,7 @@ func TestShouldCache(t *testing.T) { }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), + Version: v2.VersionString, }, BlocklistPoll: 0, CacheMaxBlockAge: time.Hour, @@ -735,8 +765,22 @@ func writeTraceToWal(t require.TestingT, b common.WALBlock, dec model.SegmentDec } func BenchmarkCompleteBlock(b *testing.B) { + enc := encoding.AllEncodings() + + for _, from := range enc { + for _, to := range enc { + b.Run(fmt.Sprintf("%s->%s", from.Version(), to.Version()), func(b *testing.B) { + benchmarkCompleteBlock(b, from, to) + }) + } + } +} + +func benchmarkCompleteBlock(b *testing.B, from, to encoding.VersionedEncoding) { // Create a WAL block with traces traceCount := 10_000 + flushCount := 1000 + tempDir := b.TempDir() _, w, _, err := New(&Config{ Backend: "local", @@ -749,10 +793,13 @@ func BenchmarkCompleteBlock(b *testing.B) { BloomShardSizeBytes: 100_000, Encoding: backend.EncNone, IndexPageSizeBytes: 
1000, + Version: to.Version(), + RowGroupSizeBytes: 30_000_000, }, WAL: &wal.Config{ IngestionSlack: time.Minute, Filepath: path.Join(tempDir, "wal"), + Version: from.Version(), }, BlocklistPoll: 0, }, log.NewNopLogger()) @@ -768,6 +815,10 @@ func BenchmarkCompleteBlock(b *testing.B) { id := test.ValidTraceID(nil) req := test.MakeTrace(10, id) writeTraceToWal(b, blk, dec, id, req, 0, 0) + + if i%flushCount == 0 { + require.NoError(b, blk.Flush()) + } } fmt.Println("Created wal block") diff --git a/tempodb/wal/wal.go b/tempodb/wal/wal.go index c44f69a38fb..3e38df2386d 100644 --- a/tempodb/wal/wal.go +++ b/tempodb/wal/wal.go @@ -14,7 +14,6 @@ import ( "github.com/grafana/tempo/tempodb/backend/local" "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/encoding/common" - v2 "github.com/grafana/tempo/tempodb/encoding/v2" ) const ( @@ -33,9 +32,18 @@ type Config struct { BlocksFilepath string Encoding backend.Encoding `yaml:"encoding"` SearchEncoding backend.Encoding `yaml:"search_encoding"` + Version string `yaml:"version"` IngestionSlack time.Duration `yaml:"ingestion_time_range_slack"` } +func ValidateConfig(b *Config) error { + if _, err := encoding.FromVersion(b.Version); err != nil { + return err + } + + return nil +} + func New(c *Config) (*WAL, error) { if c.Filepath == "" { return nil, fmt.Errorf("please provide a path for the WAL") @@ -88,15 +96,20 @@ func (w *WAL) RescanBlocks(additionalStartSlack time.Duration, log log.Logger) ( return nil, err } - // todo: rescan blocks will need to detect if this is a vParquet or v2 wal file and choose the appropriate encoding - v, err := encoding.FromVersion(v2.VersionString) - if err != nil { - return nil, fmt.Errorf("from version v2 failed %w", err) - } - + encodings := encoding.AllEncodings() blocks := make([]common.WALBlock, 0, len(files)) for _, f := range files { - if f.IsDir() { + // find owner + var owner encoding.VersionedEncoding + for _, e := range encodings { + if e.OwnsWALBlock(f) { + 
owner = e + break + } + } + + if owner == nil { + level.Warn(log).Log("msg", "unowned file entry ignored during wal replay", "file", f.Name(), "err", err) continue } @@ -107,7 +120,7 @@ func (w *WAL) RescanBlocks(additionalStartSlack time.Duration, log log.Logger) ( } level.Info(log).Log("msg", "beginning replay", "file", f.Name(), "size", fileInfo.Size()) - b, warning, err := v.OpenWALBlock(f.Name(), w.c.Filepath, w.c.IngestionSlack, additionalStartSlack) + b, warning, err := owner.OpenWALBlock(f.Name(), w.c.Filepath, w.c.IngestionSlack, additionalStartSlack) remove := false if err != nil { @@ -116,7 +129,7 @@ func (w *WAL) RescanBlocks(additionalStartSlack time.Duration, log log.Logger) ( remove = true } - if b != nil && b.Length() == 0 { + if b != nil && b.DataLength() == 0 { level.Warn(log).Log("msg", "empty wal file. ignoring.", "file", f.Name(), "err", err) remove = true } @@ -126,7 +139,7 @@ func (w *WAL) RescanBlocks(additionalStartSlack time.Duration, log log.Logger) ( } if remove { - err = os.Remove(filepath.Join(w.c.Filepath, f.Name())) + err = os.RemoveAll(filepath.Join(w.c.Filepath, f.Name())) if err != nil { return nil, err } @@ -142,16 +155,20 @@ func (w *WAL) RescanBlocks(additionalStartSlack time.Duration, log log.Logger) ( } func (w *WAL) NewBlock(id uuid.UUID, tenantID string, dataEncoding string) (common.WALBlock, error) { - // todo: take version string and use here - v, err := encoding.FromVersion(v2.VersionString) + return w.newBlock(id, tenantID, dataEncoding, w.c.Version) +} + +func (w *WAL) newBlock(id uuid.UUID, tenantID string, dataEncoding string, blockVersion string) (common.WALBlock, error) { + v, err := encoding.FromVersion(blockVersion) if err != nil { - return nil, fmt.Errorf("from version v2 failed %w", err) + return nil, err } return v.CreateWALBlock(id, tenantID, w.c.Filepath, w.c.Encoding, dataEncoding, w.c.IngestionSlack) } func (w *WAL) NewFile(blockid uuid.UUID, tenantid string, dir string) (*os.File, backend.Encoding, error) 
{ - // search WAL pinned to v2 for now + // This is only used for flatbuffer search. + // pinned to v2 because vParquet doesn't need it. walFileVersion := "v2" p := filepath.Join(w.c.Filepath, dir) diff --git a/tempodb/wal/wal_test.go b/tempodb/wal/wal_test.go index 1497580c940..42977157ac9 100644 --- a/tempodb/wal/wal_test.go +++ b/tempodb/wal/wal_test.go @@ -3,6 +3,7 @@ package wal import ( "bytes" "context" + "fmt" "io" "math/rand" "os" @@ -16,10 +17,16 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/tempo/pkg/model" + "github.com/grafana/tempo/pkg/model/trace" "github.com/grafana/tempo/pkg/tempopb" + "github.com/grafana/tempo/pkg/traceql" + "github.com/grafana/tempo/pkg/util" "github.com/grafana/tempo/pkg/util/test" "github.com/grafana/tempo/tempodb/backend" + "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/encoding/common" + v2 "github.com/grafana/tempo/tempodb/encoding/v2" + "github.com/grafana/tempo/tempodb/encoding/vparquet" ) const ( @@ -46,17 +53,31 @@ func TestCompletedDirIsRemoved(t *testing.T) { } func TestAppendBlockStartEnd(t *testing.T) { + encodings := []encoding.VersionedEncoding{ + v2.Encoding{}, + vparquet.Encoding{}, + } + for _, e := range encodings { + t.Run(e.Version(), func(t *testing.T) { + testAppendBlockStartEnd(t, e) + }) + } +} + +func testAppendBlockStartEnd(t *testing.T, e encoding.VersionedEncoding) { wal, err := New(&Config{ Filepath: t.TempDir(), Encoding: backend.EncNone, - IngestionSlack: 2 * time.Minute, + IngestionSlack: 3 * time.Minute, }) require.NoError(t, err, "unexpected error creating temp wal") blockID := uuid.New() - block, err := wal.NewBlock(blockID, testTenantID, model.CurrentEncoding) + block, err := wal.newBlock(blockID, testTenantID, model.CurrentEncoding, e.Version()) require.NoError(t, err, "unexpected error creating block") + enc := model.MustNewSegmentDecoder(model.CurrentEncoding) + // create a new block and confirm start/end times are correct blockStart := 
uint32(time.Now().Add(-time.Minute).Unix()) blockEnd := uint32(time.Now().Add(time.Minute).Unix()) @@ -64,15 +85,20 @@ func TestAppendBlockStartEnd(t *testing.T) { for i := 0; i < 10; i++ { id := make([]byte, 16) rand.Read(id) + obj := test.MakeTrace(rand.Int()%10+1, id) - tr := test.MakeTrace(10, id) - b, err := model.MustNewSegmentDecoder(model.CurrentEncoding).PrepareForWrite(tr, blockStart, blockEnd) - require.NoError(t, err, "unexpected error writing req") + b1, err := enc.PrepareForWrite(obj, blockStart, blockEnd) + require.NoError(t, err) + + b2, err := enc.ToObject([][]byte{b1}) + require.NoError(t, err) - err = block.Append(id, b, blockStart, blockEnd) + err = block.Append(id, b2, blockStart, blockEnd) require.NoError(t, err, "unexpected error writing req") } + require.NoError(t, block.Flush()) + require.Equal(t, blockStart, uint32(block.BlockMeta().StartTime.Unix())) require.Equal(t, blockEnd, uint32(block.BlockMeta().EndTime.Unix())) @@ -85,102 +111,213 @@ func TestAppendBlockStartEnd(t *testing.T) { require.Equal(t, blockEnd, uint32(blocks[0].BlockMeta().EndTime.Unix())) } -func TestAppendReplayFind(t *testing.T) { - for _, e := range backend.SupportedEncoding { - t.Run(e.String(), func(t *testing.T) { - testAppendReplayFind(t, backend.EncZstd) +func TestIngestionSlack(t *testing.T) { + encodings := []encoding.VersionedEncoding{ + v2.Encoding{}, + vparquet.Encoding{}, + } + for _, e := range encodings { + t.Run(e.Version(), func(t *testing.T) { + testIngestionSlack(t, e) }) } } -func testAppendReplayFind(t *testing.T, e backend.Encoding) { +func testIngestionSlack(t *testing.T, e encoding.VersionedEncoding) { + wal, err := New(&Config{ - Filepath: t.TempDir(), - Encoding: e, + Filepath: t.TempDir(), + Encoding: backend.EncNone, + IngestionSlack: time.Minute, }) require.NoError(t, err, "unexpected error creating temp wal") blockID := uuid.New() - - block, err := wal.NewBlock(blockID, testTenantID, model.CurrentEncoding) + block, err := 
wal.newBlock(blockID, testTenantID, model.CurrentEncoding, e.Version()) require.NoError(t, err, "unexpected error creating block") enc := model.MustNewSegmentDecoder(model.CurrentEncoding) - objects := 1000 - objs := make([]*tempopb.Trace, 0, objects) - ids := make([][]byte, 0, objects) - for i := 0; i < objects; i++ { - id := make([]byte, 16) - rand.Read(id) - obj := test.MakeTrace(rand.Int()%10+1, id) - ids = append(ids, id) + traceStart := uint32(time.Now().Add(-2 * time.Minute).Unix()) // Outside of range + traceEnd := uint32(time.Now().Add(-1 * time.Minute).Unix()) // At end of range - b1, err := enc.PrepareForWrite(obj, 0, 0) - require.NoError(t, err) + // Append a trace + id := make([]byte, 16) + rand.Read(id) + obj := test.MakeTrace(rand.Int()%10+1, id) - b2, err := enc.ToObject([][]byte{b1}) - require.NoError(t, err) + b1, err := enc.PrepareForWrite(obj, traceStart, traceEnd) + require.NoError(t, err) - objs = append(objs, obj) + b2, err := enc.ToObject([][]byte{b1}) + require.NoError(t, err) - err = block.Append(id, b2, 0, 0) - require.NoError(t, err, "unexpected error writing req") - } + appendTime := time.Now() + err = block.Append(id, b2, traceStart, traceEnd) + require.NoError(t, err, "unexpected error writing req") - ctx := context.Background() - for i, id := range ids { - obj, err := block.FindTraceByID(ctx, id, common.SearchOptions{}) - require.NoError(t, err) - require.Equal(t, objs[i], obj) + blockStart := uint32(block.BlockMeta().StartTime.Unix()) + blockEnd := uint32(block.BlockMeta().EndTime.Unix()) + + require.Equal(t, uint32(appendTime.Unix()), blockStart) + require.Equal(t, traceEnd, blockEnd) +} + +func TestFindByTraceID(t *testing.T) { + for _, e := range encoding.AllEncodings() { + t.Run(e.Version(), func(t *testing.T) { + testFindByTraceID(t, e) + }) } +} - blocks, err := wal.RescanBlocks(0, log.NewNopLogger()) - require.NoError(t, err, "unexpected error getting blocks") - require.Len(t, blocks, 1) +func testFindByTraceID(t *testing.T, 
e encoding.VersionedEncoding) { + runWALTest(t, e.Version(), func(ids [][]byte, objs []*tempopb.Trace, block common.WALBlock) { + // find all traces pushed + ctx := context.Background() + for i, id := range ids { + obj, err := block.FindTraceByID(ctx, id, common.DefaultSearchOptions()) + require.NoError(t, err) + require.Equal(t, objs[i], obj) + } + }) +} - iterator, err := blocks[0].Iterator() - require.NoError(t, err) - defer iterator.Close() +func TestIterator(t *testing.T) { + for _, e := range encoding.AllEncodings() { + t.Run(e.Version(), func(t *testing.T) { + testIterator(t, e) + }) + } +} - // append block find - for i, id := range ids { - obj, err := blocks[0].FindTraceByID(ctx, id, common.SearchOptions{}) +func testIterator(t *testing.T, e encoding.VersionedEncoding) { + runWALTest(t, e.Version(), func(ids [][]byte, objs []*tempopb.Trace, block common.WALBlock) { + ctx := context.Background() + + iterator, err := block.Iterator() require.NoError(t, err) - require.Equal(t, objs[i], obj) + defer iterator.Close() + + i := 0 + for { + id, obj, err := iterator.Next(ctx) + if err == io.EOF || id == nil { + break + } else { + require.NoError(t, err) + } + + found := false + j := 0 + for ; j < len(ids); j++ { + if bytes.Equal(ids[j], id) { + found = true + break + } + } + + require.True(t, found) + require.Equal(t, objs[j], obj) + require.Equal(t, ids[j], []byte(id)) + i++ + } + + require.Equal(t, len(objs), i) + }) +} + +func TestSearch(t *testing.T) { + for _, e := range encoding.AllEncodings() { + t.Run(e.Version(), func(t *testing.T) { + testSearch(t, e) + }) } +} - i := 0 - for { - id, obj, err := iterator.Next(ctx) - if err == io.EOF { - break - } else { +func testSearch(t *testing.T, e encoding.VersionedEncoding) { + runWALTest(t, e.Version(), func(ids [][]byte, objs []*tempopb.Trace, block common.WALBlock) { + ctx := context.Background() + + for i, o := range objs { + k, v := findFirstAttribute(o) + require.NotEmpty(t, k) + require.NotEmpty(t, v) + + 
resp, err := block.Search(ctx, &tempopb.SearchRequest{ + Tags: map[string]string{ + k: v, + }, + Limit: 10, + }, common.DefaultSearchOptions()) + if err == common.ErrUnsupported { + return + } require.NoError(t, err) + require.Equal(t, 1, len(resp.Traces)) + require.Equal(t, util.TraceIDToHexString(ids[i]), resp.Traces[0].TraceID) } + }) +} - found := false - j := 0 - for ; j < len(ids); j++ { - if bytes.Equal(ids[j], id) { - found = true - break +func TestFetch(t *testing.T) { + for _, e := range encoding.AllEncodings() { + t.Run(e.Version(), func(t *testing.T) { + testFetch(t, e) + }) + } +} + +func testFetch(t *testing.T, e encoding.VersionedEncoding) { + runWALTest(t, e.Version(), func(ids [][]byte, objs []*tempopb.Trace, block common.WALBlock) { + ctx := context.Background() + + for i, o := range objs { + k, v := findFirstAttribute(o) + require.NotEmpty(t, k) + require.NotEmpty(t, v) + + query := fmt.Sprintf("{ .%s = \"%s\" }", k, v) + condition := traceql.MustExtractCondition(query) + resp, err := block.Fetch(ctx, traceql.FetchSpansRequest{ + Conditions: []traceql.Condition{condition}, + }) + // not all blocks support fetch + if err == common.ErrUnsupported { + return } + require.NoError(t, err) + + // grab the first result and make sure it's the expected id + ss, err := resp.Results.Next(ctx) + require.NoError(t, err) + + expectedID := ids[i] + require.Equal(t, ss.TraceID, expectedID) + + // confirm no more matches + ss, err = resp.Results.Next(ctx) + require.NoError(t, err) + require.Nil(t, ss) } + }) +} - require.True(t, found) - require.Equal(t, objs[j], obj) - require.Equal(t, ids[j], []byte(id)) - i++ +func findFirstAttribute(obj *tempopb.Trace) (string, string) { + for _, b := range obj.Batches { + for _, s := range b.ScopeSpans { + for _, span := range s.Spans { + for _, a := range span.Attributes { + return a.Key, a.Value.GetStringValue() + } + } + } } - require.Equal(t, objects, i) - - err = blocks[0].Clear() - require.NoError(t, err) + return "", 
"" } -func TestInvalidFiles(t *testing.T) { +func TestInvalidFilesAndFoldersAreHandled(t *testing.T) { tempDir := t.TempDir() wal, err := New(&Config{ Filepath: tempDir, @@ -188,93 +325,241 @@ func TestInvalidFiles(t *testing.T) { }) require.NoError(t, err, "unexpected error creating temp wal") - // create one valid block - block, err := wal.NewBlock(uuid.New(), testTenantID, model.CurrentEncoding) - require.NoError(t, err) + // create all valid blocks + for _, e := range encoding.AllEncodings() { + block, err := wal.newBlock(uuid.New(), testTenantID, model.CurrentEncoding, e.Version()) + require.NoError(t, err) - id := make([]byte, 16) - rand.Read(id) - tr := test.MakeTrace(10, id) - b, err := model.MustNewSegmentDecoder(model.CurrentEncoding).PrepareForWrite(tr, 0, 0) - require.NoError(t, err) - err = block.Append(id, b, 0, 0) - require.NoError(t, err) + id := make([]byte, 16) + rand.Read(id) + tr := test.MakeTrace(10, id) + b1, err := model.MustNewSegmentDecoder(model.CurrentEncoding).PrepareForWrite(tr, 0, 0) + require.NoError(t, err) + b2, err := model.MustNewSegmentDecoder(model.CurrentEncoding).ToObject([][]byte{b1}) + require.NoError(t, err) + err = block.Append(id, b2, 0, 0) + require.NoError(t, err) + err = block.Flush() + require.NoError(t, err) + } // create unparseable filename - err = os.WriteFile(filepath.Join(tempDir, "fe0b83eb-a86b-4b6c-9a74-dc272cd5700e+tenant+v2+notanencoding"), []byte{}, 0644) + err = os.WriteFile(filepath.Join(tempDir, "fe0b83eb-a86b-4b6c-9a74-dc272cd5700e:tenant:v2:notanencoding"), []byte{}, 0644) require.NoError(t, err) // create empty block - err = os.WriteFile(filepath.Join(tempDir, "fe0b83eb-a86b-4b6c-9a74-dc272cd5700e+blerg+v2+gzip"), []byte{}, 0644) + err = os.WriteFile(filepath.Join(tempDir, "fe0b83eb-a86b-4b6c-9a74-dc272cd5700e:blerg:v2:gzip"), []byte{}, 0644) require.NoError(t, err) + // create unparseable block + require.NoError(t, os.MkdirAll(filepath.Join(tempDir, 
"fe0b83eb-a86b-4b6c-9a74-dc272cd5700e+tenant+vOther"), os.ModePerm)) + blocks, err := wal.RescanBlocks(0, log.NewNopLogger()) require.NoError(t, err, "unexpected error getting blocks") - require.Len(t, blocks, 1) // this is our 1 valid block from above + require.Len(t, blocks, len(encoding.AllEncodings())) // valid blocks created above - // confirm invalid blocks have been cleaned up - require.NoFileExists(t, filepath.Join(tempDir, "fe0b83eb-a86b-4b6c-9a74-dc272cd5700e+tenant+v2+notanencoding")) - require.NoFileExists(t, filepath.Join(tempDir, "fe0b83eb-a86b-4b6c-9a74-dc272cd5700e+blerg+v2+gzip")) -} + // empty file should have been removed + require.NoFileExists(t, filepath.Join(tempDir, "fe0b83eb-a86b-4b6c-9a74-dc272cd5700e:blerg:v2:gzip")) -func BenchmarkWALNone(b *testing.B) { - benchmarkWriteFindReplay(b, backend.EncNone) + // unparseable files/folder should have been ignored + require.FileExists(t, filepath.Join(tempDir, "fe0b83eb-a86b-4b6c-9a74-dc272cd5700e:tenant:v2:notanencoding")) + require.DirExists(t, filepath.Join(tempDir, "fe0b83eb-a86b-4b6c-9a74-dc272cd5700e+tenant+vOther")) } -func BenchmarkWALSnappy(b *testing.B) { - benchmarkWriteFindReplay(b, backend.EncSnappy) + +func runWALTest(t testing.TB, dbEncoding string, runner func([][]byte, []*tempopb.Trace, common.WALBlock)) { + wal, err := New(&Config{ + Filepath: t.TempDir(), + Encoding: backend.EncNone, + }) + require.NoError(t, err, "unexpected error creating temp wal") + + blockID := uuid.New() + + block, err := wal.newBlock(blockID, testTenantID, model.CurrentEncoding, dbEncoding) + require.NoError(t, err, "unexpected error creating block") + + enc := model.MustNewSegmentDecoder(model.CurrentEncoding) + + objects := 250 + objs := make([]*tempopb.Trace, 0, objects) + ids := make([][]byte, 0, objects) + for i := 0; i < objects; i++ { + id := make([]byte, 16) + rand.Read(id) + obj := test.MakeTrace(rand.Int()%10+1, id) + + trace.SortTrace(obj) + + ids = append(ids, id) + + b1, err := 
enc.PrepareForWrite(obj, 0, 0) + require.NoError(t, err) + + b2, err := enc.ToObject([][]byte{b1}) + require.NoError(t, err) + + objs = append(objs, obj) + + err = block.Append(id, b2, 0, 0) + require.NoError(t, err) + + if i%100 == 0 { + err = block.Flush() + require.NoError(t, err) + } + } + err = block.Flush() + require.NoError(t, err) + + runner(ids, objs, block) + + // rescan blocks + blocks, err := wal.RescanBlocks(0, log.NewNopLogger()) + require.NoError(t, err, "unexpected error getting blocks") + require.Len(t, blocks, 1) + + runner(ids, objs, blocks[0]) + + err = block.Clear() + require.NoError(t, err) } -func BenchmarkWALLZ4(b *testing.B) { - benchmarkWriteFindReplay(b, backend.EncLZ4_1M) + +func BenchmarkAppendFlush(b *testing.B) { + encodings := []string{ + v2.VersionString, + vparquet.VersionString, + } + for _, enc := range encodings { + b.Run(enc, func(b *testing.B) { + runWALBenchmark(b, enc, b.N, nil) + }) + } } -func BenchmarkWALGZIP(b *testing.B) { - benchmarkWriteFindReplay(b, backend.EncGZIP) + +func BenchmarkFindTraceByID(b *testing.B) { + encodings := []string{ + v2.VersionString, + vparquet.VersionString, + } + for _, enc := range encodings { + b.Run(enc, func(b *testing.B) { + runWALBenchmark(b, enc, 1, func(ids [][]byte, objs []*tempopb.Trace, block common.WALBlock) { + ctx := context.Background() + for i := 0; i < b.N; i++ { + j := i % len(ids) + + obj, err := block.FindTraceByID(ctx, ids[j], common.DefaultSearchOptions()) + require.NoError(b, err) + require.Equal(b, objs[j], obj) + } + }) + }) + } } -func BenchmarkWALZSTD(b *testing.B) { - benchmarkWriteFindReplay(b, backend.EncZstd) + +func BenchmarkFindUnknownTraceID(b *testing.B) { + encodings := []string{ + v2.VersionString, + vparquet.VersionString, + } + for _, enc := range encodings { + b.Run(enc, func(b *testing.B) { + runWALBenchmark(b, enc, 1, func(ids [][]byte, objs []*tempopb.Trace, block common.WALBlock) { + for i := 0; i < b.N; i++ { + _, err := 
block.FindTraceByID(context.Background(), common.ID{}, common.DefaultSearchOptions()) + require.NoError(b, err) + } + }) + }) + } } -func BenchmarkWALS2(b *testing.B) { - benchmarkWriteFindReplay(b, backend.EncS2) +func BenchmarkSearch(b *testing.B) { + encodings := []string{ + v2.VersionString, + vparquet.VersionString, + } + for _, enc := range encodings { + b.Run(enc, func(b *testing.B) { + runWALBenchmark(b, enc, 1, func(ids [][]byte, objs []*tempopb.Trace, block common.WALBlock) { + ctx := context.Background() + + for i := 0; i < b.N; i++ { + j := i % len(ids) + id, o := ids[j], objs[j] + + k, v := findFirstAttribute(o) + require.NotEmpty(b, k) + require.NotEmpty(b, v) + + resp, err := block.Search(ctx, &tempopb.SearchRequest{ + Tags: map[string]string{ + k: v, + }, + Limit: 10, + }, common.DefaultSearchOptions()) + if err == common.ErrUnsupported { + return + } + require.NoError(b, err) + require.Equal(b, 1, len(resp.Traces)) + require.Equal(b, util.TraceIDToHexString(id), resp.Traces[0].TraceID) + } + }) + }) + } } -func benchmarkWriteFindReplay(b *testing.B, encoding backend.Encoding) { - objects := 1000 +func runWALBenchmark(b *testing.B, encoding string, flushCount int, runner func([][]byte, []*tempopb.Trace, common.WALBlock)) { + wal, err := New(&Config{ + Filepath: b.TempDir(), + Encoding: backend.EncNone, + }) + require.NoError(b, err, "unexpected error creating temp wal") + + blockID := uuid.New() + + block, err := wal.newBlock(blockID, testTenantID, model.CurrentEncoding, encoding) + require.NoError(b, err, "unexpected error creating block") + + dec := model.MustNewSegmentDecoder(model.CurrentEncoding) + + objects := 250 + traces := make([]*tempopb.Trace, 0, objects) objs := make([][]byte, 0, objects) ids := make([][]byte, 0, objects) for i := 0; i < objects; i++ { id := make([]byte, 16) rand.Read(id) - obj := make([]byte, rand.Intn(100)+1) - rand.Read(obj) + obj := test.MakeTrace(rand.Int()%10+1, id) + + trace.SortTrace(obj) + ids = append(ids, id) 
- objs = append(objs, obj) - } - b.ResetTimer() + traces = append(traces, obj) - for i := 0; i < b.N; i++ { - wal, _ := New(&Config{ - Filepath: b.TempDir(), - Encoding: encoding, - }) + b1, err := dec.PrepareForWrite(obj, 0, 0) + require.NoError(b, err) - blockID := uuid.New() - block, err := wal.NewBlock(blockID, testTenantID, "") + b2, err := dec.ToObject([][]byte{b1}) require.NoError(b, err) - // write - for j, obj := range objs { - err := block.Append(ids[j], obj, 0, 0) - require.NoError(b, err) - } + objs = append(objs, b2) + } - // find - for _, id := range ids { - _, err := block.FindTraceByID(context.Background(), id, common.SearchOptions{}) - require.NoError(b, err) + b.ResetTimer() + + for flush := 0; flush < flushCount; flush++ { + + for i := range objs { + require.NoError(b, block.Append(ids[i], objs[i], 0, 0)) } - // replay - _, err = wal.RescanBlocks(0, log.NewNopLogger()) + err = block.Flush() require.NoError(b, err) } + + if runner != nil { + runner(ids, traces, block) + } }