Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Search: Enforce Max Trace Size Bytes #1318

Merged
merged 5 commits into from
Mar 2, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* [ENHANCEMENT] Added a configuration option `search_prefer_self` to allow the queriers to do some work while also leveraging serverless in search. [#1307](https:/grafana/tempo/pull/1307) (@joe-elliott)
* [ENHANCEMENT] Make trace combination/compaction more efficient [#1291](https:/grafana/tempo/pull/1291) (@mdisibio)
* [ENHANCEMENT] Add Content-Type headers to query-frontend paths [#1306](https:/grafana/tempo/pull/1306) (@wperron)
* [ENHANCEMENT] Make search respect per tenant `max_bytes_per_trace` and added `skippedTraces` to returned search metrics. [#1318](https:/grafana/tempo/pull/1318) (@joe-elliott)
* [BUGFIX]: Remove unnecessary PersistentVolumeClaim [#1245](https:/grafana/tempo/issues/1245)
* [BUGFIX] Fixed issue when query-frontend doesn't log request details when request is cancelled [#1136](https:/grafana/tempo/issues/1136) (@adityapwr)
* [BUGFIX] Update OTLP port in examples (docker-compose & kubernetes) from legacy ports (55680/55681) to new ports (4317/4318) [#1294](https:/grafana/tempo/pull/1294) (@mapno)
Expand Down
14 changes: 13 additions & 1 deletion cmd/tempo-serverless/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (
"gopkg.in/yaml.v2"
)

const envConfigPrefix = "TEMPO"
const (
envConfigPrefix = "TEMPO"
)

// used to initialize a reader one time
var (
Expand All @@ -50,6 +52,11 @@ func Handler(r *http.Request) (*tempopb.SearchResponse, *HTTPError) {
return nil, httpError("parsing search request", err, http.StatusBadRequest)
}

maxBytes, err := api.ExtractServerlessParams(r)
if err != nil {
return nil, httpError("extracting serverless params", err, http.StatusBadRequest)
}

// load config, fields are set through env vars TEMPO_
reader, cfg, err := loadBackend()
if err != nil {
Expand Down Expand Up @@ -116,6 +123,11 @@ func Handler(r *http.Request) (*tempopb.SearchResponse, *HTTPError) {
resp.Metrics.InspectedTraces++
resp.Metrics.InspectedBytes += uint64(len(obj))

if maxBytes > 0 && len(obj) > maxBytes {
resp.Metrics.SkippedTraces++
continue
}

metadata, err := decoder.Matches(id, obj, searchReq.SearchReq)
if err != nil {
return nil, httpError("matching", err, http.StatusInternalServerError)
Expand Down
1 change: 1 addition & 0 deletions modules/frontend/searchsharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (r *searchResponse) addResponse(res *tempopb.SearchResponse) {
r.resultsMetrics.InspectedBytes += res.Metrics.InspectedBytes
r.resultsMetrics.InspectedTraces += res.Metrics.InspectedTraces
r.resultsMetrics.SkippedBlocks += res.Metrics.SkippedBlocks
r.resultsMetrics.SkippedTraces += res.Metrics.SkippedTraces
}

func (r *searchResponse) shouldQuit() bool {
Expand Down
3 changes: 3 additions & 0 deletions modules/frontend/searchsharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ func TestSearchSharderRoundTrip(t *testing.T) {
InspectedBlocks: 2,
InspectedBytes: 3,
SkippedBlocks: 4,
SkippedTraces: 9,
}},
status2: 200,
response2: &tempopb.SearchResponse{
Expand All @@ -507,6 +508,7 @@ func TestSearchSharderRoundTrip(t *testing.T) {
InspectedBlocks: 6,
InspectedBytes: 7,
SkippedBlocks: 8,
SkippedTraces: 10,
}},
expectedStatus: 200,
expectedResponse: &tempopb.SearchResponse{
Expand All @@ -525,6 +527,7 @@ func TestSearchSharderRoundTrip(t *testing.T) {
InspectedBlocks: 1,
InspectedBytes: 10,
SkippedBlocks: 12,
SkippedTraces: 19,
}},
},
{
Expand Down
20 changes: 18 additions & 2 deletions modules/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,14 @@ func (q *Querier) SearchBlock(ctx context.Context, req *tempopb.SearchBlockReque
}

// proxy externally!
tenantID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, errors.Wrap(err, "error extracting org id for externalEndpoint")
}
maxBytes := q.limits.MaxBytesPerTrace(tenantID)

endpoint := q.cfg.SearchExternalEndpoints[rand.Intn(len(q.cfg.SearchExternalEndpoints))]
return searchExternalEndpoint(ctx, endpoint, req)
return searchExternalEndpoint(ctx, endpoint, maxBytes, req)
}

func (q *Querier) internalSearchBlock(ctx context.Context, req *tempopb.SearchBlockRequest) (*tempopb.SearchResponse, error) {
Expand Down Expand Up @@ -441,6 +447,8 @@ func (q *Querier) internalSearchBlock(ctx context.Context, req *tempopb.SearchBl
DataEncoding: req.DataEncoding,
}

maxBytes := q.limits.MaxBytesPerTrace(tenantID)

var searchErr error
respMtx := sync.Mutex{}
resp := &tempopb.SearchResponse{
Expand All @@ -458,6 +466,13 @@ func (q *Querier) internalSearchBlock(ctx context.Context, req *tempopb.SearchBl
resp.Metrics.InspectedBytes += uint64(len(obj))
respMtx.Unlock()

if maxBytes > 0 && len(obj) > maxBytes {
respMtx.Lock()
resp.Metrics.SkippedTraces++
respMtx.Unlock()
return false
}

metadata, err := decoder.Matches(id, obj, req.SearchReq)

respMtx.Lock()
Expand Down Expand Up @@ -524,7 +539,7 @@ func (q *Querier) postProcessSearchResults(req *tempopb.SearchRequest, rr []resp
return response
}

func searchExternalEndpoint(ctx context.Context, externalEndpoint string, searchReq *tempopb.SearchBlockRequest) (*tempopb.SearchResponse, error) {
func searchExternalEndpoint(ctx context.Context, externalEndpoint string, maxBytes int, searchReq *tempopb.SearchBlockRequest) (*tempopb.SearchResponse, error) {
req, err := http.NewRequest(http.MethodGet, externalEndpoint, nil)
if err != nil {
return nil, fmt.Errorf("external endpoint failed to make new request: %w", err)
Expand All @@ -533,6 +548,7 @@ func searchExternalEndpoint(ctx context.Context, externalEndpoint string, search
if err != nil {
return nil, fmt.Errorf("external endpoint failed to build search block request: %w", err)
}
req = api.AddServerlessParams(req, maxBytes)
err = user.InjectOrgIDIntoHTTPRequest(ctx, req)
if err != nil {
return nil, fmt.Errorf("external endpoint failed to inject tenant id: %w", err)
Expand Down
6 changes: 5 additions & 1 deletion modules/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/grafana/tempo/modules/ingester/client"
"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/stretchr/testify/require"
"github.com/uber-go/atomic"
Expand Down Expand Up @@ -60,7 +61,10 @@ func TestQuerierUsesSearchExternalEndpoint(t *testing.T) {
for _, tc := range tests {
numExternalRequests.Store(0)

q, err := New(tc.cfg, client.Config{}, nil, nil, nil)
o, err := overrides.NewOverrides(overrides.Limits{})
require.NoError(t, err)

q, err := New(tc.cfg, client.Config{}, nil, nil, o)
require.NoError(t, err)

for i := 0; i < tc.queriesToExecute; i++ {
Expand Down
34 changes: 34 additions & 0 deletions pkg/api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ const (
urlParamDataEncoding = "dataEncoding"
urlParamVersion = "version"

// maxBytes (serverless only)
urlParamMaxBytes = "maxBytes"

HeaderAccept = "Accept"
HeaderContentType = "Content-Type"
HeaderAcceptProtobuf = "application/protobuf"
Expand Down Expand Up @@ -334,6 +337,37 @@ func BuildSearchBlockRequest(req *http.Request, searchReq *tempopb.SearchBlockRe
return req, nil
}

// AddServerlessParams takes an already existing http.Request and adds maxBytes
// to it
func AddServerlessParams(req *http.Request, maxBytes int) *http.Request {
if req == nil {
req = &http.Request{
URL: &url.URL{},
}
}

q := req.URL.Query()
q.Set(urlParamMaxBytes, strconv.FormatInt(int64(maxBytes), 10))
req.URL.RawQuery = q.Encode()

return req
}

// ExtractServerlessParams extracts params for the serverless functions from
// an http.Request
func ExtractServerlessParams(req *http.Request) (int, error) {
s, exists := extractQueryParam(req, urlParamMaxBytes)
if !exists {
return 0, nil
}
maxBytes, err := strconv.ParseInt(s, 10, 32)
if err != nil {
return 0, fmt.Errorf("invalid maxBytes: %w", err)
}

return int(maxBytes), nil
}

func extractQueryParam(r *http.Request, param string) (string, bool) {
value := r.URL.Query().Get(param)
return value, value != ""
Expand Down
28 changes: 28 additions & 0 deletions pkg/api/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/grafana/tempo/cmd/tempo-query/tempo"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// For licensing reasons these strings exist in two packages. This test exists to make sure they don't
Expand Down Expand Up @@ -446,3 +447,30 @@ func TestBuildSearchRequest(t *testing.T) {
assert.Equal(t, tc.query, actualURL.URL.String())
}
}

func TestAddServerlessParams(t *testing.T) {
actualURL := AddServerlessParams(nil, 10)
assert.Equal(t, "?maxBytes=10", actualURL.URL.String())

req, err := http.NewRequest("GET", "http://example.com", nil)
require.NoError(t, err)

actualURL = AddServerlessParams(req, 10)
assert.Equal(t, "http://example.com?maxBytes=10", actualURL.URL.String())
}

func TestExtractServerlessParam(t *testing.T) {
r := httptest.NewRequest("GET", "http://example.com", nil)
maxBytes, err := ExtractServerlessParams(r)
require.NoError(t, err)
assert.Equal(t, 0, maxBytes)

r = httptest.NewRequest("GET", "http://example.com?maxBytes=13", nil)
maxBytes, err = ExtractServerlessParams(r)
require.NoError(t, err)
assert.Equal(t, 13, maxBytes)

r = httptest.NewRequest("GET", "http://example.com?maxBytes=blerg", nil)
_, err = ExtractServerlessParams(r)
assert.Error(t, err)
}
Loading