TraceQL: Grouping #2490

Merged (19 commits) on May 23, 2023
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -22,6 +22,7 @@
 ```
 * [FEATURE] Add support for `q` query param in `/api/v2/search/<tag.name>/values` to filter results based on a TraceQL query [#2253](https:/grafana/tempo/pull/2253) (@mapno)
   To make use of filtering, configure `autocomplete_filtering_enabled`.
+* [FEATURE] Add support for `by()` and `coalesce()` to TraceQL. [#2490](https:/grafana/tempo/pull/2490)
 * [FEATURE] Add a GRPC streaming endpoint for traceql search [#2366](https:/grafana/tempo/pull/2366) (@joe-elliott)
 * [FEATURE] Add new API to summarize span metrics from generators [#2481](https:/grafana/tempo/pull/2481) (@zalegrala)
 * [ENHANCEMENT] Add `scope` parameter to `/api/search/tags` [#2282](https:/grafana/tempo/pull/2282) (@joe-elliott)
@@ -37,6 +38,7 @@ To make use of filtering, configure `autocomplete_filtering_enabled`.
 * [ENHANCEMENT] Introduce `overrides.Interface` to decouple implementation from usage [#2482](https:/grafana/tempo/pull/2482) (@kvrhdn)
 * [BUGFIX] tempodb integer divide by zero error [#2167](https:/grafana/tempo/issues/2167) (@kroksys)
 * [BUGFIX] metrics-generator: ensure Prometheus will scale up shards when remote write is lagging behind [#2463](https:/grafana/tempo/issues/2463) (@kvrhdn)
+* [BUGFIX] Fix an issue where `matched` and other spanset-level attributes were not persisted to TraceQL results. [#2490](https:/grafana/tempo/pull/2490)
 * [CHANGE] **Breaking Change** Rename s3.insecure_skip_verify [#2407](https:/grafana/tempo/pull/2407) (@zalegrala)
 ```yaml
 storage:
50 changes: 26 additions & 24 deletions docs/sources/tempo/api_docs/_index.md
@@ -185,30 +185,31 @@ $ curl -G -s http://localhost:3200/api/search --data-urlencode 'q={ status=error
 {
   "traces": [
     {
-      "traceID": "169bdefcae1f19",
-      "rootServiceName": "gme-ruler",
-      "rootTraceName": "rule",
-      "startTimeUnixNano": "1675090379953800000",
-      "durationMs": 3,
-      "spanSet": {
-        "spans": [
-          {
-            "spanID": "45b795d0c4f9f6ae",
-            "startTimeUnixNano": "1675090379955688000",
-            "durationNanos": "525000",
-            "attributes": [
-              {
-                "key": "status",
-                "value": {
-                  "stringValue": "error"
-                }
-              }
-            ]
-          }
-        ],
-        "matched": 1
-      }
+      "traceID": "2f3e0cee77ae5dc9c17ade3689eb2e54",
+      "rootServiceName": "shop-backend",
+      "rootTraceName": "update-billing",
+      "startTimeUnixNano": "1684778327699392724",
+      "durationMs": 557,
+      "spanSets": [
+        {
+          "spans": [
+            {
+              "spanID": "563d623c76514f8e",
+              "startTimeUnixNano": "1684778327735077898",
+              "durationNanos": "446979497",
+              "attributes": [
+                {
+                  "key": "status",
+                  "value": {
+                    "stringValue": "error"
+                  }
+                }
+              ]
+            }
+          ],
+          "matched": 1
+        }
+      ]
     }
   ],
   "metrics": {
     "totalBlocks": 13
@@ -591,7 +592,8 @@ message TraceSearchMetadata {
   string rootTraceName = 3;
   uint64 startTimeUnixNano = 4;
   uint32 durationMs = 5;
-  SpanSet spanSet = 6;
+  SpanSet spanSet = 6; // deprecated. use SpanSets field below
+  repeated SpanSet spanSets = 7;
 }
 
 message SpanSet {
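Because the deprecated `spanSet` field is kept for backwards compatibility, a client talking to both old and new servers likely needs to read whichever field is populated. A minimal sketch, with field names assumed from the proto above rather than taken from the generated code:

```go
package main

import "fmt"

// Trimmed stand-ins for the generated tempopb types.
type SpanSet struct {
	Matched uint32
}

type TraceSearchMetadata struct {
	SpanSet  *SpanSet   // deprecated single-spanset field (tag 6)
	SpanSets []*SpanSet // new repeated field (tag 7)
}

// spanSets prefers the new repeated field and falls back to the
// deprecated one, so results from older servers are still visible.
func spanSets(m *TraceSearchMetadata) []*SpanSet {
	if len(m.SpanSets) > 0 {
		return m.SpanSets
	}
	if m.SpanSet != nil {
		return []*SpanSet{m.SpanSet}
	}
	return nil
}

func main() {
	oldStyle := &TraceSearchMetadata{SpanSet: &SpanSet{Matched: 1}}
	newStyle := &TraceSearchMetadata{SpanSets: []*SpanSet{{Matched: 1}, {Matched: 2}}}
	fmt.Println(len(spanSets(oldStyle)), len(spanSets(newStyle))) // 1 2
}
```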
9 changes: 9 additions & 0 deletions docs/sources/tempo/traceql/_index.md
@@ -216,6 +216,15 @@ For example, find traces that have more than 3 spans with an attribute `http.sta
 { span.http.status_code = 200 } | count() > 3
 ```
 
+## Grouping
+
+TraceQL supports a grouping pipeline operator, `by()`, which groups spans by arbitrary attributes. This can be useful
+for finding something like a single service with more than one error:
+
+```
+{ error = true } | by(resource.service.name) | count() > 1
+```
+
 ## Arithmetic
 
 TraceQL supports arbitrary arithmetic in your queries. This can be useful to make queries more human readable:
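The changelog also adds `coalesce()`, which this docs change does not cover. A plausible composition (an assumption, not taken from this diff) is to merge the grouped spansets back into a single spanset after filtering:

```
{ error = true } | by(resource.service.name) | count() > 1 | coalesce()
```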
3 changes: 1 addition & 2 deletions docs/sources/tempo/traceql/architecture.md
@@ -35,8 +35,7 @@ For more information about TraceQL’s design, refer to the [TraceQL Concepts de
 
 ### Future work
 
 - Additional aggregates, such as `max()`, `min()`, and others.
-- Grouping
 - Increase OTEL support: Events, Lists, ILS Scope, etc.
 - Structural Queries
 - Metrics
 - Pipeline comparisons
27 changes: 6 additions & 21 deletions modules/frontend/search_progress.go
@@ -3,11 +3,10 @@ package frontend
 import (
 	"context"
 	"net/http"
-	"sort"
 	"sync"
 
-	"github.com/grafana/tempo/pkg/search"
 	"github.com/grafana/tempo/pkg/tempopb"
+	"github.com/grafana/tempo/pkg/traceql"
 )
 
 // searchProgressFactory is used to provide a way to construct a shardedSearchProgress to the searchSharder. It exists
@@ -43,7 +42,7 @@ type searchProgress struct {
 	statusMsg string
 	ctx       context.Context
 
-	resultsMap       map[string]*tempopb.TraceSearchMetadata
+	resultsCombiner  *traceql.MetadataCombiner
 	resultsMetrics   *tempopb.SearchMetrics
 	finishedRequests int
 
@@ -62,7 +61,7 @@ func newSearchProgress(ctx context.Context, limit, totalJobs, totalBlocks, total
 			TotalBlockBytes: uint64(totalBlockBytes),
 			TotalJobs:       uint32(totalJobs),
 		},
-		resultsMap: map[string]*tempopb.TraceSearchMetadata{},
+		resultsCombiner: traceql.NewMetadataCombiner(),
 	}
 }
 
@@ -86,15 +85,7 @@ func (r *searchProgress) addResponse(res *tempopb.SearchResponse) {
 	defer r.mtx.Unlock()
 
 	for _, t := range res.Traces {
-		if _, ok := r.resultsMap[t.TraceID]; !ok {
-			r.resultsMap[t.TraceID] = t
-		} else {
-			// combine into the incoming trace and then set in the map. this prevents
-			// race conditions on pointers to traces that we've already returned from
-			// .result()
-			search.CombineSearchResults(t, r.resultsMap[t.TraceID])
-			r.resultsMap[t.TraceID] = t
-		}
+		r.resultsCombiner.AddMetadata(t)
 	}
 
 	// purposefully ignoring TotalBlocks as that value is set by the sharder
@@ -126,7 +117,7 @@ func (r *searchProgress) internalShouldQuit() bool {
 	if r.statusCode/100 != 2 {
 		return true
 	}
-	if len(r.resultsMap) > r.limit {
+	if r.resultsCombiner.Count() > r.limit {
 		return true
 	}
 
@@ -154,15 +145,9 @@ func (r *searchProgress) result() *shardedSearchResults {
 			TotalJobs:       r.resultsMetrics.TotalJobs,
 			TotalBlockBytes: r.resultsMetrics.TotalBlockBytes,
 		},
+		Traces: r.resultsCombiner.Metadata(),
 	}
 
-	for _, t := range r.resultsMap {
-		searchRes.Traces = append(searchRes.Traces, t)
-	}
-	sort.Slice(searchRes.Traces, func(i, j int) bool {
-		return searchRes.Traces[i].StartTimeUnixNano > searchRes.Traces[j].StartTimeUnixNano
-	})
-
 	res.response = searchRes
 
 	return res
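The new `traceql.MetadataCombiner` centralizes the map-based dedupe, combine, and sort logic that this diff removes from `searchProgress`. A rough self-contained sketch of the behavior the diff implies (the real implementation lives in `pkg/traceql`; the combine rules here are assumptions carried over from the deleted `search.CombineSearchResults`):

```go
package main

import (
	"fmt"
	"sort"
)

// TraceSearchMetadata is a trimmed stand-in for tempopb.TraceSearchMetadata.
type TraceSearchMetadata struct {
	TraceID           string
	RootServiceName   string
	StartTimeUnixNano uint64
	DurationMs        uint32
}

// MetadataCombiner dedupes results by trace ID, keeping the earliest start
// time and the longest duration across partial results for the same trace.
type MetadataCombiner struct {
	byID map[string]*TraceSearchMetadata
}

func NewMetadataCombiner() *MetadataCombiner {
	return &MetadataCombiner{byID: map[string]*TraceSearchMetadata{}}
}

func (c *MetadataCombiner) AddMetadata(m *TraceSearchMetadata) {
	existing, ok := c.byID[m.TraceID]
	if !ok {
		c.byID[m.TraceID] = m
		return
	}
	if existing.RootServiceName == "" {
		existing.RootServiceName = m.RootServiceName
	}
	if m.StartTimeUnixNano < existing.StartTimeUnixNano { // earliest start
		existing.StartTimeUnixNano = m.StartTimeUnixNano
	}
	if m.DurationMs > existing.DurationMs { // longest duration
		existing.DurationMs = m.DurationMs
	}
}

func (c *MetadataCombiner) Count() int { return len(c.byID) }

// Metadata returns the combined results, most recent start time first,
// matching the sort the callers used to do themselves.
func (c *MetadataCombiner) Metadata() []*TraceSearchMetadata {
	out := make([]*TraceSearchMetadata, 0, len(c.byID))
	for _, m := range c.byID {
		out = append(out, m)
	}
	sort.Slice(out, func(i, j int) bool {
		return out[i].StartTimeUnixNano > out[j].StartTimeUnixNano
	})
	return out
}

func main() {
	c := NewMetadataCombiner()
	c.AddMetadata(&TraceSearchMetadata{TraceID: "a", StartTimeUnixNano: 200, DurationMs: 5})
	c.AddMetadata(&TraceSearchMetadata{TraceID: "a", StartTimeUnixNano: 100, DurationMs: 9})
	c.AddMetadata(&TraceSearchMetadata{TraceID: "b", StartTimeUnixNano: 300, DurationMs: 1})
	fmt.Println(c.Count())               // 2
	fmt.Println(c.Metadata()[0].TraceID) // b (latest start first)
}
```

Keeping the combine behind one type also explains the other call-site changes below: both the frontend and the ingester now call `AddMetadata`, `Count`, and `Metadata` instead of maintaining their own maps.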
25 changes: 4 additions & 21 deletions modules/ingester/instance_search.go
@@ -3,7 +3,6 @@ package ingester
 import (
 	"context"
 	"fmt"
-	"sort"
 	"strings"
 	"sync"
 
@@ -53,7 +52,7 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem
 	sr.AllWorkersStarted()
 
 	// read and combine search results
-	resultsMap := map[string]*tempopb.TraceSearchMetadata{}
+	combiner := traceql.NewMetadataCombiner()
 
 	// collect results from all the goroutines via sr.Results channel.
 	// range loop will exit when sr.Results channel is closed.
@@ -63,14 +62,8 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem
 			return nil, sr.Error()
 		}
 
-		// Dedupe/combine results
-		if existing := resultsMap[result.TraceID]; existing != nil {
-			search.CombineSearchResults(existing, result)
-		} else {
-			resultsMap[result.TraceID] = result
-		}
-
-		if len(resultsMap) >= maxResults {
+		combiner.AddMetadata(result)
+		if combiner.Count() >= maxResults {
 			sr.Close() // signal pending workers to exit
 			break
 		}
@@ -81,18 +74,8 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem
 		return nil, sr.Error()
 	}
 
-	results := make([]*tempopb.TraceSearchMetadata, 0, len(resultsMap))
-	for _, result := range resultsMap {
-		results = append(results, result)
-	}
-
-	// Sort
-	sort.Slice(results, func(i, j int) bool {
-		return results[i].StartTimeUnixNano > results[j].StartTimeUnixNano
-	})
-
 	return &tempopb.SearchResponse{
-		Traces: results,
+		Traces:  combiner.Metadata(),
 		Metrics: &tempopb.SearchMetrics{
 			InspectedTraces: sr.TracesInspected(),
 			InspectedBytes:  sr.BytesInspected(),
27 changes: 0 additions & 27 deletions pkg/search/util.go
@@ -59,30 +59,3 @@ func GetVirtualIntrinsicValues() []string {
 		traceql.IntrinsicStatus.String(),
 	}
 }
-
-// CombineSearchResults overlays the incoming search result with the existing result. This is required
-// for the following reason: a trace may be present in multiple blocks, or in partial segments
-// in live traces. The results should reflect elements of all segments.
-func CombineSearchResults(existing *tempopb.TraceSearchMetadata, incoming *tempopb.TraceSearchMetadata) {
-	if existing.TraceID == "" {
-		existing.TraceID = incoming.TraceID
-	}
-
-	if existing.RootServiceName == "" {
-		existing.RootServiceName = incoming.RootServiceName
-	}
-
-	if existing.RootTraceName == "" {
-		existing.RootTraceName = incoming.RootTraceName
-	}
-
-	// Earliest start time.
-	if existing.StartTimeUnixNano > incoming.StartTimeUnixNano {
-		existing.StartTimeUnixNano = incoming.StartTimeUnixNano
-	}
-
-	// Longest duration
-	if existing.DurationMs < incoming.DurationMs {
-		existing.DurationMs = incoming.DurationMs
-	}
-}