Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Search: drop use of TagCache, extract tags and tag values on-demand #1068

Merged
merged 30 commits into from
Dec 2, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
654efd3
Search: drop use of TagCache, extract tags and tag values on-demand
kvrhdn Oct 21, 2021
f4faba8
Fix compile-error in tests
kvrhdn Oct 21, 2021
e8b5d5f
fmt
kvrhdn Oct 21, 2021
b02db6c
Remove TagCache :cat-salute:
kvrhdn Oct 21, 2021
8a177ed
Add error handling
kvrhdn Oct 21, 2021
30c453e
lint
kvrhdn Oct 21, 2021
ec96dc0
Clean up and optimisations
kvrhdn Oct 24, 2021
c260a9a
Merge branch 'main' into drop-tag-cache
kvrhdn Oct 25, 2021
689eba8
Merge branch 'main' into drop-tag-cache
kvrhdn Oct 26, 2021
5f2d553
Fix compilation errors
kvrhdn Oct 26, 2021
f4b62c6
Refactor methods for consistency with instance.Search
kvrhdn Oct 26, 2021
d16fef7
Tweak tag lookup binary search to eliminate last comparison and fetch…
mdisibio Oct 25, 2021
ba89666
Use tempofb.FindTag
kvrhdn Oct 26, 2021
fbb66c1
Add SearchDataMap RangeKeys and RangeKeyValues
kvrhdn Oct 26, 2021
f3dfef9
make fmt
kvrhdn Oct 26, 2021
744be0c
Update CHANGELOG.md
kvrhdn Oct 26, 2021
b8724ee
Cast to string once
kvrhdn Oct 27, 2021
16570c7
Reuse SearchEntry buffer where possible
mdisibio Oct 27, 2021
ab3b5b4
fix key/value typo
mdisibio Oct 27, 2021
f0ceedc
Merge pull request #1 from mdisibio/drog-tag-cache-2
Oct 28, 2021
b66d696
Merge branch 'main' into drop-tag-cache
kvrhdn Oct 28, 2021
15292e1
Merge branch 'main' into drop-tag-cache
kvrhdn Oct 28, 2021
1fd1015
Merge branch 'main' into drop-tag-cache
kvrhdn Nov 8, 2021
6fb4a90
Merge branch 'main' into drop-tag-cache
kvrhdn Nov 8, 2021
6943eb3
Merge branch 'main' into pr-1068
annanay25 Nov 25, 2021
362d97c
Add limit on response size for a tag-values query
annanay25 Nov 25, 2021
f7e95f0
Lint and CHANGELOG
annanay25 Dec 1, 2021
535263c
lint, fix test by adding userID to ctx
annanay25 Dec 2, 2021
c33b392
Merge branch 'main' into pr-1068
annanay25 Dec 2, 2021
c0d8ac5
Merge branch 'main' into pr-1068
annanay25 Dec 2, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions modules/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,6 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
if err != nil {
level.Error(log.WithUserID(instance.instanceID, log.Logger)).Log("msg", "failed to complete block", "err", err)
}

// periodically purge tag cache, keep tags within complete block timeout (i.e. data that is locally)
instance.PurgeExpiredSearchTags(time.Now().Add(-i.cfg.CompleteBlockTimeout))
}

func (i *Ingester) flushLoop(j int) {
Expand Down
22 changes: 12 additions & 10 deletions modules/ingester/ingester_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ func (i *Ingester) SearchTags(ctx context.Context, req *tempopb.SearchTagsReques
return &tempopb.SearchTagsResponse{}, nil
}

tags := inst.GetSearchTags()

resp := &tempopb.SearchTagsResponse{
TagNames: tags,
tags, err := inst.GetSearchTags(ctx)
if err != nil {
return nil, err
}

return resp, nil
return &tempopb.SearchTagsResponse{
TagNames: tags,
}, nil
}

func (i *Ingester) SearchTagValues(ctx context.Context, req *tempopb.SearchTagValuesRequest) (*tempopb.SearchTagValuesResponse, error) {
Expand All @@ -56,11 +57,12 @@ func (i *Ingester) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVa
return &tempopb.SearchTagValuesResponse{}, nil
}

vals := inst.GetSearchTagValues(req.TagName)

resp := &tempopb.SearchTagValuesResponse{
TagValues: vals,
vals, err := inst.GetSearchTagValues(ctx, req.TagName)
if err != nil {
return nil, err
}

return resp, nil
return &tempopb.SearchTagValuesResponse{
TagValues: vals,
}, nil
}
6 changes: 0 additions & 6 deletions modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ type instance struct {
searchHeadBlock *searchStreamingBlockEntry
searchAppendBlocks map[*wal.AppendBlock]*searchStreamingBlockEntry
searchCompleteBlocks map[*wal.LocalBlock]*searchLocalBlockEntry
searchTagCache *search.TagCache

lastBlockCut time.Time

Expand Down Expand Up @@ -107,7 +106,6 @@ func newInstance(instanceID string, limiter *Limiter, writer tempodb.Writer, l *
traces: map[uint32]*trace{},
searchAppendBlocks: map[*wal.AppendBlock]*searchStreamingBlockEntry{},
searchCompleteBlocks: map[*wal.LocalBlock]*searchLocalBlockEntry{},
searchTagCache: search.NewTagCache(),

instanceID: instanceID,
tracesCreatedTotal: metricTracesCreatedTotal.WithLabelValues(instanceID),
Expand Down Expand Up @@ -176,10 +174,6 @@ func (i *instance) PushBytes(ctx context.Context, id []byte, traceBytes []byte,
return status.Errorf(codes.FailedPrecondition, "%s max live traces per tenant exceeded: %v", overrides.ErrorPrefixLiveTracesExceeded, err)
}

if searchData != nil {
i.RecordSearchLookupValues(searchData)
}

i.tracesMtx.Lock()
defer i.tracesMtx.Unlock()

Expand Down
144 changes: 134 additions & 10 deletions modules/ingester/instance_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ingester
import (
"context"
"sort"
"time"

cortex_util "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/kit/log/level"
Expand Down Expand Up @@ -181,19 +180,144 @@ func (i *instance) searchLocalBlocks(ctx context.Context, p search.Pipeline, sr
}
}

func (i *instance) GetSearchTags() []string {
return i.searchTagCache.GetNames()
func (i *instance) GetSearchTags(ctx context.Context) ([]string, error) {
tags := map[string]struct{}{}

i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) {
kv := &tempofb.KeyValues{}

for i, ii := 0, entry.TagsLength(); i < ii; i++ {
entry.Tags(kv, i)
tags[string(kv.Key())] = struct{}{}
}
})

extractTagsFromSearchableBlock := func(block search.SearchableBlock) error {
return block.Tags(ctx, tags)
}
err := func() error {
i.blocksMtx.RLock()
defer i.blocksMtx.RUnlock()

err := i.visitSearchableBlocksWAL(ctx, extractTagsFromSearchableBlock)
if err != nil {
return err
}
return i.visitSearchableBlocksLocalBlocks(ctx, extractTagsFromSearchableBlock)
}()
if err != nil {
return nil, err
}

tagsSlice := make([]string, 0, len(tags))
for tag := range tags {
tagsSlice = append(tagsSlice, tag)
}

return tagsSlice, nil
}

func (i *instance) GetSearchTagValues(ctx context.Context, tagName string) ([]string, error) {
values := map[string]struct{}{}

i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) {
kv := &tempofb.KeyValues{}

for i, tagsLength := 0, entry.TagsLength(); i < tagsLength; i++ {
entry.Tags(kv, i)

if string(kv.Key()) == tagName {
kvrhdn marked this conversation as resolved.
Show resolved Hide resolved
for j, valueLength := 0, kv.ValueLength(); j < valueLength; j++ {
values[string(kv.Value(j))] = struct{}{}
}
break
}
}
})

extractTagValuesFromSearchableBlocks := func(block search.SearchableBlock) error {
return block.TagValues(ctx, tagName, values)
}

err := func() error {
i.blocksMtx.RLock()
defer i.blocksMtx.RUnlock()

err := i.visitSearchableBlocksWAL(ctx, extractTagValuesFromSearchableBlocks)
if err != nil {
return err
}
return i.visitSearchableBlocksLocalBlocks(ctx, extractTagValuesFromSearchableBlocks)
}()
if err != nil {
return nil, err
}

valuesSlice := make([]string, 0, len(values))
for tag := range values {
valuesSlice = append(valuesSlice, tag)
}

return valuesSlice, nil
}

func (i *instance) GetSearchTagValues(tagName string) []string {
return i.searchTagCache.GetValues(tagName)
func (i *instance) visitSearchEntriesLiveTraces(ctx context.Context, visit func(entry *tempofb.SearchEntry)) {
span, _ := opentracing.StartSpanFromContext(ctx, "instance.visitSearchEntriesLiveTraces")
defer span.Finish()

i.tracesMtx.Lock()
defer i.tracesMtx.Unlock()

for _, t := range i.traces {
for _, s := range t.searchData {
visit(tempofb.SearchEntryFromBytes(s))
}
}
}

func (i *instance) RecordSearchLookupValues(b []byte) {
s := tempofb.SearchEntryFromBytes(b)
i.searchTagCache.SetData(time.Now(), s)
// visitSearchableBlocksWAL visits every WAL block. Must be called under lock.
func (i *instance) visitSearchableBlocksWAL(ctx context.Context, visit func(block search.SearchableBlock) error) error {
span, _ := opentracing.StartSpanFromContext(ctx, "instance.visitSearchableBlocksWAL")
defer span.Finish()

visitUnderLock := func(entry *searchStreamingBlockEntry) error {
entry.mtx.RLock()
defer entry.mtx.RUnlock()

return visit(entry.b)
}

err := visitUnderLock(i.searchHeadBlock)
if err != nil {
return err
}

for _, b := range i.searchAppendBlocks {
err := visitUnderLock(b)
if err != nil {
return err
}
}
return nil
}

func (i *instance) PurgeExpiredSearchTags(before time.Time) {
i.searchTagCache.PurgeExpired(before)
// visitSearchableBlocksWAL visits every local block. Must be called under lock.
func (i *instance) visitSearchableBlocksLocalBlocks(ctx context.Context, visit func(block search.SearchableBlock) error) error {
span, _ := opentracing.StartSpanFromContext(ctx, "instance.visitSearchableBlocksLocalBlocks")
defer span.Finish()

visitUnderLock := func(entry *searchLocalBlockEntry) error {
entry.mtx.RLock()
defer entry.mtx.RUnlock()

return visit(entry.b)
}

for _, b := range i.searchCompleteBlocks {
err := visitUnderLock(b)
if err != nil {
return err
}
}
return nil
}
6 changes: 4 additions & 2 deletions modules/ingester/instance_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,13 @@ func TestInstanceSearchDoesNotRace(t *testing.T) {
})

go concurrent(func() {
i.GetSearchTags()
_, err := i.GetSearchTags(context.Background())
require.NoError(t, err, "error getting search tags")
})

go concurrent(func() {
i.GetSearchTagValues(tagKey)
_, err := i.GetSearchTagValues(context.Background(), tagKey)
require.NoError(t, err, "error getting search tag values")
})

time.Sleep(2000 * time.Millisecond)
Expand Down
38 changes: 38 additions & 0 deletions tempodb/search/backend_search_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,44 @@ func (s *BackendSearchBlock) BlockID() uuid.UUID {
return s.id
}

func (s *BackendSearchBlock) Tags(ctx context.Context, tags map[string]struct{}) error {
hb, err := s.r.Read(ctx, "search-header", s.id, s.tenantID, true)
if err != nil {
return err
}
header := tempofb.GetRootAsSearchBlockHeader(hb, 0)

kv := &tempofb.KeyValues{}
for i, ii := 0, header.TagsLength(); i < ii; i++ {
header.Tags(kv, i)
tags[string(kv.Key())] = struct{}{}
}

return nil
}

func (s *BackendSearchBlock) TagValues(ctx context.Context, tag string, tagValues map[string]struct{}) error {
hb, err := s.r.Read(ctx, "search-header", s.id, s.tenantID, true)
if err != nil {
return err
}
header := tempofb.GetRootAsSearchBlockHeader(hb, 0)

kv := &tempofb.KeyValues{}
for i, tagsLength := 0, header.TagsLength(); i < tagsLength; i++ {
header.Tags(kv, i)

if string(kv.Key()) == tag {
kvrhdn marked this conversation as resolved.
Show resolved Hide resolved
for j, valueLength := 0, kv.ValueLength(); j < valueLength; j++ {
tagValues[string(kv.Value(j))] = struct{}{}
}
break
}
}

return nil
}

// Search iterates through the block looking for matches.
func (s *BackendSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results) error {
var pageBuf []byte
Expand Down
2 changes: 2 additions & 0 deletions tempodb/search/searchable_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
)

type SearchableBlock interface {
Tags(ctx context.Context, tags map[string]struct{}) error
TagValues(ctx context.Context, tag string, tagValues map[string]struct{}) error
Search(ctx context.Context, p Pipeline, sr *Results) error
}

Expand Down
16 changes: 16 additions & 0 deletions tempodb/search/streaming_search_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,22 @@ func (s *StreamingSearchBlock) Append(ctx context.Context, id common.ID, searchD
return s.appender.Append(id, combined)
}

func (s *StreamingSearchBlock) Tags(ctx context.Context, tags map[string]struct{}) error {
for k := range s.header.Tags {
tags[k] = struct{}{}
}
return nil
}

func (s *StreamingSearchBlock) TagValues(ctx context.Context, tag string, tagValues map[string]struct{}) error {
if values, ok := s.header.Tags[tag]; ok {
for _, v := range values {
tagValues[v] = struct{}{}
}
}
return nil
}

// Search the streaming block.
func (s *StreamingSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results) error {
if s.closed.Load() {
Expand Down
Loading