From 67548186320f36367c3b641761657620107cbd97 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Tue, 24 Aug 2021 08:07:41 +0800 Subject: [PATCH] Introduce model.Trace (#5983) Introduce type model.Trace and field APMEvent.Trace, replacing {Error,Span,Transaction}.TraceID. --- model/apmevent.go | 2 + model/apmevent_test.go | 5 + model/error.go | 5 +- model/modeldecoder/rumv3/decoder.go | 6 +- model/modeldecoder/rumv3/metadata_test.go | 1 + model/modeldecoder/rumv3/transaction_test.go | 2 +- model/modeldecoder/v2/decoder.go | 6 +- model/modeldecoder/v2/metadata_test.go | 2 + model/span.go | 6 +- model/span_test.go | 4 +- model/trace.go | 34 ++++++ model/transaction.go | 5 +- processor/otel/exceptions_test.go | 6 +- processor/otel/metadata_test.go | 1 + processor/otel/traces.go | 9 +- processor/otel/traces_test.go | 4 +- .../eventstorage/sharded_bench_test.go | 6 +- .../eventstorage/storage_bench_test.go | 5 +- .../sampling/eventstorage/storage_test.go | 4 +- x-pack/apm-server/sampling/groups.go | 2 +- x-pack/apm-server/sampling/groups_test.go | 36 +++--- x-pack/apm-server/sampling/processor.go | 16 +-- .../sampling/processor_bench_test.go | 13 +-- x-pack/apm-server/sampling/processor_test.go | 104 ++++++++++-------- 24 files changed, 165 insertions(+), 119 deletions(-) create mode 100644 model/trace.go diff --git a/model/apmevent.go b/model/apmevent.go index 8c06713fc7e..7715585a41d 100644 --- a/model/apmevent.go +++ b/model/apmevent.go @@ -53,6 +53,7 @@ type APMEvent struct { Network Network Session Session URL URL + Trace Trace // Timestamp holds the event timestamp. // @@ -132,6 +133,7 @@ func (e *APMEvent) BeatEvent(ctx context.Context) beat.Event { fields.maybeSetMapStr("event", e.Event.fields()) fields.maybeSetMapStr("url", e.URL.fields()) fields.maybeSetMapStr("session", e.Session.fields()) + fields.maybeSetMapStr("trace", e.Trace.fields()) fields.maybeSetString("message", e.Message) return event } diff --git a/model/apmevent_test.go b/model/apmevent_test.go index a071a292d63..456689b0372 100644 --- a/model/apmevent_test.go +++ b/model/apmevent_test.go @@ -39,6 +39,7 @@ func TestAPMEventFields(t *testing.T) { outcome := "success" destinationAddress := "1.2.3.4" destinationPort := 1234 + traceID := "trace_id" for _, test := range []struct { input APMEvent @@ -73,6 +74,7 @@ func TestAPMEventFields(t *testing.T) { Message: "bottle", Transaction: &Transaction{}, Timestamp: time.Date(2019, 1, 3, 15, 17, 4, 908.596*1e6, time.FixedZone("+0100", 3600)), + Trace: Trace{ID: traceID}, }, output: common.MapStr{ // common fields @@ -102,6 +104,9 @@ func TestAPMEventFields(t *testing.T) { "c": 123, }, "message": "bottle", + "trace": common.MapStr{ + "id": traceID, + }, // fields related to APMEvent.Transaction "processor": common.MapStr{ diff --git a/model/error.go b/model/error.go index 1ef6b47bf0d..2458d6e1fe9 100644 --- a/model/error.go +++ b/model/error.go @@ -39,7 +39,6 @@ const ( type Error struct { ID string TransactionID string - TraceID string ParentID string GroupingKey string @@ -102,11 +101,9 @@ func (e *Error) fields() common.MapStr { transaction.maybeSetBool("sampled", e.TransactionSampled) fields.maybeSetMapStr("transaction", common.MapStr(transaction)) - var parent, trace mapStr + var parent mapStr parent.maybeSetString("id", e.ParentID) - trace.maybeSetString("id", e.TraceID) fields.maybeSetMapStr("parent", common.MapStr(parent)) - fields.maybeSetMapStr("trace", common.MapStr(trace)) var errorFields mapStr errorFields.maybeSetString("id", e.ID) diff --git a/model/modeldecoder/rumv3/decoder.go b/model/modeldecoder/rumv3/decoder.go index 7d59d2825d6..8be60dca7ce 100644 --- a/model/modeldecoder/rumv3/decoder.go +++ b/model/modeldecoder/rumv3/decoder.go @@ -173,7 +173,7 @@ func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch span := input.Base mapToSpanModel(&s, &span) span.Span.TransactionID = transaction.Transaction.ID - span.Span.TraceID = transaction.Transaction.TraceID + span.Trace = transaction.Trace *batch = append(*batch, span) } spans := (*batch)[offset:] @@ -277,7 +277,7 @@ func mapToErrorModel(from *errorEvent, event *model.APMEvent) { event.Timestamp = from.Timestamp.Val } if from.TraceID.IsSet() { - out.TraceID = from.TraceID.Val + event.Trace.ID = from.TraceID.Val } if from.Transaction.Sampled.IsSet() { val := from.Transaction.Sampled.Val @@ -800,7 +800,7 @@ func mapToTransactionModel(from *transaction, event *model.APMEvent) { out.SpanCount.Started = &started } if from.TraceID.IsSet() { - out.TraceID = from.TraceID.Val + event.Trace.ID = from.TraceID.Val } if from.Type.IsSet() { out.Type = from.Type.Val diff --git a/model/modeldecoder/rumv3/metadata_test.go b/model/modeldecoder/rumv3/metadata_test.go index 22ee257d201..14b8da2ab50 100644 --- a/model/modeldecoder/rumv3/metadata_test.go +++ b/model/modeldecoder/rumv3/metadata_test.go @@ -65,6 +65,7 @@ func metadataExceptions(keys ...string) func(key string) bool { "Host", "Event", "Session", + "Trace", "URL", // event-specific fields diff --git a/model/modeldecoder/rumv3/transaction_test.go b/model/modeldecoder/rumv3/transaction_test.go index 33b4c9499dd..3614ac3534b 100644 --- a/model/modeldecoder/rumv3/transaction_test.go +++ b/model/modeldecoder/rumv3/transaction_test.go @@ -74,7 +74,7 @@ func TestDecodeNestedTransaction(t *testing.T) { start := time.Duration(20 * 1000 * 1000) assert.Equal(t, now.Add(start), batch[3].Timestamp) //add start to timestamp assert.Equal(t, "100", batch[3].Span.TransactionID) - assert.Equal(t, "1", batch[3].Span.TraceID) + assert.Equal(t, "1", batch[3].Trace.ID) assert.Equal(t, "100", batch[3].Span.ParentID) for _, event := range batch { diff --git a/model/modeldecoder/v2/decoder.go b/model/modeldecoder/v2/decoder.go index 23149dae4b9..cb242bdaa58 100644 --- a/model/modeldecoder/v2/decoder.go +++ b/model/modeldecoder/v2/decoder.go @@ -334,7 +334,7 @@ func mapToErrorModel(from *errorEvent, config modeldecoder.Config, event *model. event.Timestamp = from.Timestamp.Val } if from.TraceID.IsSet() { - out.TraceID = from.TraceID.Val + event.Trace.ID = from.TraceID.Val } if from.Transaction.Sampled.IsSet() { val := from.Transaction.Sampled.Val @@ -930,7 +930,7 @@ func mapToSpanModel(from *span, config modeldecoder.Config, event *model.APMEven ) } if from.TraceID.IsSet() { - out.TraceID = from.TraceID.Val + event.Trace.ID = from.TraceID.Val } if from.TransactionID.IsSet() { out.TransactionID = from.TransactionID.Val @@ -1126,7 +1126,7 @@ func mapToTransactionModel(from *transaction, config modeldecoder.Config, event event.Timestamp = from.Timestamp.Val } if from.TraceID.IsSet() { - out.TraceID = from.TraceID.Val + event.Trace.ID = from.TraceID.Val } if from.Type.IsSet() { out.Type = from.Type.Val diff --git a/model/modeldecoder/v2/metadata_test.go b/model/modeldecoder/v2/metadata_test.go index 3f4caf17216..1c4c36007e8 100644 --- a/model/modeldecoder/v2/metadata_test.go +++ b/model/modeldecoder/v2/metadata_test.go @@ -87,6 +87,8 @@ func isUnmappedMetadataField(key string) bool { "Session.ID", "Session", "Session.Sequence", + "Trace", + "Trace.ID", "URL", "URL.Original", "URL.Scheme", diff --git a/model/span.go b/model/span.go index a69b2f8630a..a3a9ce92e4b 100644 --- a/model/span.go +++ b/model/span.go @@ -41,7 +41,6 @@ type Span struct { TransactionID string ParentID string ChildIDs []string - TraceID string Message *Message Name string @@ -141,10 +140,7 @@ func (e *Span) fields(apmEvent *APMEvent) common.MapStr { fields := mapStr{"processor": spanProcessorEntry} - var trace, transaction, parent mapStr - if trace.maybeSetString("id", e.TraceID) { - fields.set("trace", common.MapStr(trace)) - } + var transaction, parent mapStr if transaction.maybeSetString("id", e.TransactionID) { fields.set("transaction", common.MapStr(transaction)) } diff --git a/model/span_test.go b/model/span_test.go index 85a3feaf336..b1f4c8419ef 100644 --- a/model/span_test.go +++ b/model/span_test.go @@ -30,7 +30,7 @@ import ( func TestSpanTransform(t *testing.T) { path := "test/path" start := 0.65 - hexID, parentID, traceID := "0147258369012345", "abcdef0123456789", "01234567890123456789abcdefa" + hexID, parentID := "0147258369012345", "abcdef0123456789" subtype := "amqp" action := "publish" timestamp := time.Date(2019, 1, 3, 15, 17, 4, 908.596*1e6, @@ -65,7 +65,6 @@ func TestSpanTransform(t *testing.T) { Msg: "Full Span", Span: Span{ ID: hexID, - TraceID: traceID, ParentID: parentID, Name: "myspan", Type: "myspantype", @@ -135,7 +134,6 @@ func TestSpanTransform(t *testing.T) { }, "processor": common.MapStr{"event": "span", "name": "transaction"}, "timestamp": common.MapStr{"us": timestampUs}, - "trace": common.MapStr{"id": traceID}, "parent": common.MapStr{"id": parentID}, "http": common.MapStr{ "response": common.MapStr{"status_code": statusCode}, diff --git a/model/trace.go b/model/trace.go new file mode 100644 index 00000000000..3933d24894c --- /dev/null +++ b/model/trace.go @@ -0,0 +1,34 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package model + +import ( + "github.com/elastic/beats/v7/libbeat/common" +) + +// Trace holds information about a distributed trace. +type Trace struct { + // ID holds a unique identifier of the trace. + ID string +} + +func (t *Trace) fields() common.MapStr { + var fields mapStr + fields.maybeSetString("id", t.ID) + return common.MapStr(fields) +} diff --git a/model/transaction.go b/model/transaction.go index c62d77ad1cb..9a7fd50765e 100644 --- a/model/transaction.go +++ b/model/transaction.go @@ -39,7 +39,6 @@ var ( type Transaction struct { ID string ParentID string - TraceID string Type string Name string @@ -74,11 +73,9 @@ func (e *Transaction) fields() common.MapStr { "processor": transactionProcessorEntry, } - var parent, trace mapStr + var parent mapStr parent.maybeSetString("id", e.ParentID) - trace.maybeSetString("id", e.TraceID) fields.maybeSetMapStr("parent", common.MapStr(parent)) - fields.maybeSetMapStr("trace", common.MapStr(trace)) if e.HTTP != nil { fields.maybeSetMapStr("http", e.HTTP.transactionTopLevelFields()) } diff --git a/processor/otel/exceptions_test.go b/processor/otel/exceptions_test.go index 93f2f515510..9a547613210 100644 --- a/processor/otel/exceptions_test.go +++ b/processor/otel/exceptions_test.go @@ -114,8 +114,8 @@ Caused by: LowLevelException Service: service, Agent: agent, Timestamp: timestamp, + Trace: transactionEvent.Trace, Error: &model.Error{ - TraceID: transactionEvent.Transaction.TraceID, ParentID: transactionEvent.Transaction.ID, TransactionID: transactionEvent.Transaction.ID, TransactionType: transactionEvent.Transaction.Type, @@ -161,8 +161,8 @@ Caused by: LowLevelException Service: service, Agent: agent, Timestamp: timestamp, + Trace: transactionEvent.Trace, Error: &model.Error{ - TraceID: transactionEvent.Transaction.TraceID, ParentID: transactionEvent.Transaction.ID, TransactionID: transactionEvent.Transaction.ID, TransactionType: transactionEvent.Transaction.Type, @@ -317,8 +317,8 @@ func TestEncodeSpanEventsNonJavaExceptions(t *testing.T) { Service: service, Agent: agent, Timestamp: timestamp, + Trace: transactionEvent.Trace, Error: &model.Error{ - TraceID: transactionEvent.Transaction.TraceID, ParentID: transactionEvent.Transaction.ID, TransactionID: transactionEvent.Transaction.ID, TransactionType: transactionEvent.Transaction.Type, diff --git a/processor/otel/metadata_test.go b/processor/otel/metadata_test.go index 0fcf584a0d3..9c9c66fe21b 100644 --- a/processor/otel/metadata_test.go +++ b/processor/otel/metadata_test.go @@ -234,6 +234,7 @@ func transformResourceMetadata(t *testing.T, resourceAttrs map[string]pdata.Attr otelSpan.SetSpanID(pdata.NewSpanID([8]byte{2})) events := transformTraces(t, traces) events[0].Transaction = nil + events[0].Trace = model.Trace{} events[0].Event.Outcome = "" events[0].Timestamp = time.Time{} return events[0] diff --git a/processor/otel/traces.go b/processor/otel/traces.go index 4601b63f2f3..7c010f4845b 100644 --- a/processor/otel/traces.go +++ b/processor/otel/traces.go @@ -190,9 +190,6 @@ func (c *Consumer) convertSpan( parentID = otelSpan.ParentSpanID().HexString() } - traceID := otelSpan.TraceID().HexString() - spanID := otelSpan.SpanID().HexString() - startTime := otelSpan.StartTimestamp().AsTime() endTime := otelSpan.EndTimestamp().AsTime() var durationMillis float64 @@ -206,15 +203,16 @@ func (c *Consumer) convertSpan( // now, we assume that the majority of consumption is passive, and // therefore start a transaction whenever span kind == consumer. name := otelSpan.Name() + spanID := otelSpan.SpanID().HexString() event := baseEvent event.Labels = initEventLabels(event.Labels) event.Timestamp = startTime.Add(timeDelta) + event.Trace.ID = otelSpan.TraceID().HexString() event.Event.Outcome = spanStatusOutcome(otelSpan.Status()) if root || otelSpan.Kind() == pdata.SpanKindServer || otelSpan.Kind() == pdata.SpanKindConsumer { event.Transaction = &model.Transaction{ ID: spanID, ParentID: parentID, - TraceID: traceID, Duration: durationMillis, Name: name, Sampled: true, @@ -224,7 +222,6 @@ func (c *Consumer) convertSpan( event.Span = &model.Span{ ID: spanID, ParentID: parentID, - TraceID: traceID, Duration: durationMillis, Name: name, } @@ -948,7 +945,6 @@ func convertJaegerErrorSpanEvent(logger *logp.Logger, event pdata.SpanEvent) *mo func addTransactionCtxToErr(transaction *model.Transaction, err *model.Error) { err.TransactionID = transaction.ID - err.TraceID = transaction.TraceID err.ParentID = transaction.ID err.HTTP = transaction.HTTP err.Custom = transaction.Custom @@ -957,7 +953,6 @@ func addTransactionCtxToErr(transaction *model.Transaction, err *model.Error) { } func addSpanCtxToErr(span *model.Span, err *model.Error) { - err.TraceID = span.TraceID err.ParentID = span.ID } diff --git a/processor/otel/traces_test.go b/processor/otel/traces_test.go index 0fe56481307..b840044c8da 100644 --- a/processor/otel/traces_test.go +++ b/processor/otel/traces_test.go @@ -790,8 +790,8 @@ func TestConsumer_JaegerTraceID(t *testing.T) { require.NoError(t, (&otel.Consumer{Processor: recorder}).ConsumeTraces(context.Background(), traces)) batch := *batches[0] - assert.Equal(t, "00000000000000000000000046467830", batch[0].Transaction.TraceID) - assert.Equal(t, "00000000464678300000000046467830", batch[1].Transaction.TraceID) + assert.Equal(t, "00000000000000000000000046467830", batch[0].Trace.ID) + assert.Equal(t, "00000000464678300000000046467830", batch[1].Trace.ID) } func TestConsumer_JaegerTransaction(t *testing.T) { diff --git a/x-pack/apm-server/sampling/eventstorage/sharded_bench_test.go b/x-pack/apm-server/sampling/eventstorage/sharded_bench_test.go index 3b6d82b9dba..86f15d80923 100644 --- a/x-pack/apm-server/sampling/eventstorage/sharded_bench_test.go +++ b/x-pack/apm-server/sampling/eventstorage/sharded_bench_test.go @@ -24,7 +24,7 @@ func BenchmarkShardedWriteTransactionUncontended(b *testing.B) { b.RunParallel(func(pb *testing.PB) { traceID := uuid.Must(uuid.NewV4()).String() transaction := &model.APMEvent{ - Transaction: &model.Transaction{TraceID: traceID, ID: traceID}, + Transaction: &model.Transaction{ID: traceID}, } for pb.Next() { if err := sharded.WriteTraceEvent(traceID, traceID, transaction); err != nil { @@ -48,9 +48,7 @@ func BenchmarkShardedWriteTransactionContended(b *testing.B) { b.RunParallel(func(pb *testing.PB) { transactionID := uuid.Must(uuid.NewV4()).String() transaction := &model.APMEvent{ - Transaction: &model.Transaction{ - TraceID: traceID, ID: transactionID, - }, + Transaction: &model.Transaction{ID: transactionID}, } for pb.Next() { if err := sharded.WriteTraceEvent(traceID, transactionID, transaction); err != nil { diff --git a/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go b/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go index d505d3a5f49..8a9241163be 100644 --- a/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go +++ b/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go @@ -28,7 +28,7 @@ func BenchmarkWriteTransaction(b *testing.B) { traceID := hex.EncodeToString([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) transactionID := hex.EncodeToString([]byte{1, 2, 3, 4, 5, 6, 7, 8}) transaction := &model.APMEvent{ - Transaction: &model.Transaction{TraceID: traceID, ID: transactionID}, + Transaction: &model.Transaction{ID: transactionID}, } b.ResetTimer() @@ -69,8 +69,7 @@ func BenchmarkReadEvents(b *testing.B) { transactionID := uuid.Must(uuid.NewV4()).String() transaction := &model.APMEvent{ Transaction: &model.Transaction{ - TraceID: traceID, - ID: transactionID, + ID: transactionID, }, } if err := readWriter.WriteTraceEvent(traceID, transactionID, transaction); err != nil { diff --git a/x-pack/apm-server/sampling/eventstorage/storage_test.go b/x-pack/apm-server/sampling/eventstorage/storage_test.go index c8638c72020..fd0f2f054f2 100644 --- a/x-pack/apm-server/sampling/eventstorage/storage_test.go +++ b/x-pack/apm-server/sampling/eventstorage/storage_test.go @@ -43,7 +43,7 @@ func testWriteEvents(t *testing.T, numSpans int) { traceID := uuid.Must(uuid.NewV4()).String() transactionID := uuid.Must(uuid.NewV4()).String() transaction := model.APMEvent{ - Transaction: &model.Transaction{TraceID: traceID, ID: transactionID}, + Transaction: &model.Transaction{ID: transactionID}, } assert.NoError(t, readWriter.WriteTraceEvent(traceID, transactionID, &transaction)) @@ -51,7 +51,7 @@ func testWriteEvents(t *testing.T, numSpans int) { for i := 0; i < numSpans; i++ { spanID := uuid.Must(uuid.NewV4()).String() span := model.APMEvent{ - Span: &model.Span{TraceID: traceID, ID: spanID}, + Span: &model.Span{ID: spanID}, } assert.NoError(t, readWriter.WriteTraceEvent(traceID, spanID, &span)) spanEvents = append(spanEvents, span) diff --git a/x-pack/apm-server/sampling/groups.go b/x-pack/apm-server/sampling/groups.go index bc9d23c3cf0..2a619e826d6 100644 --- a/x-pack/apm-server/sampling/groups.go +++ b/x-pack/apm-server/sampling/groups.go @@ -163,7 +163,7 @@ func (g *traceGroup) sampleTrace(transactionEvent *model.APMEvent) (bool, error) g.mu.Lock() defer g.mu.Unlock() g.total++ - return g.reservoir.Sample(transactionEvent.Transaction.Duration, transactionEvent.Transaction.TraceID), nil + return g.reservoir.Sample(transactionEvent.Transaction.Duration, transactionEvent.Trace.ID), nil } // finalizeSampledTraces locks the groups, appends their current trace IDs to diff --git a/x-pack/apm-server/sampling/groups_test.go b/x-pack/apm-server/sampling/groups_test.go index 7b29c41253e..776b82a7ee5 100644 --- a/x-pack/apm-server/sampling/groups_test.go +++ b/x-pack/apm-server/sampling/groups_test.go @@ -25,10 +25,12 @@ func TestTraceGroupsPolicies(t *testing.T) { Event: model.Event{ Outcome: traceOutcome, }, + Trace: model.Trace{ + ID: uuid.Must(uuid.NewV4()).String(), + }, Transaction: &model.Transaction{ - Name: traceName, - TraceID: uuid.Must(uuid.NewV4()).String(), - ID: uuid.Must(uuid.NewV4()).String(), + Name: traceName, + ID: uuid.Must(uuid.NewV4()).String(), }, } } @@ -98,10 +100,12 @@ func TestTraceGroupsMax(t *testing.T) { Service: model.Service{ Name: serviceName, }, + Trace: model.Trace{ + ID: uuid.Must(uuid.NewV4()).String(), + }, Transaction: &model.Transaction{ - Name: "whatever", - TraceID: uuid.Must(uuid.NewV4()).String(), - ID: uuid.Must(uuid.NewV4()).String(), + Name: "whatever", + ID: uuid.Must(uuid.NewV4()).String(), }, }) require.NoError(t, err) @@ -110,10 +114,12 @@ func TestTraceGroupsMax(t *testing.T) { } admitted, err := groups.sampleTrace(&model.APMEvent{ + Trace: model.Trace{ + ID: uuid.Must(uuid.NewV4()).String(), + }, Transaction: &model.Transaction{ - Name: "overflow", - TraceID: uuid.Must(uuid.NewV4()).String(), - ID: uuid.Must(uuid.NewV4()).String(), + Name: "overflow", + ID: uuid.Must(uuid.NewV4()).String(), }, }) assert.Equal(t, errTooManyTraceGroups, err) @@ -131,9 +137,11 @@ func TestTraceGroupReservoirResize(t *testing.T) { sendTransactions := func(n int) { for i := 0; i < n; i++ { groups.sampleTrace(&model.APMEvent{ + Trace: model.Trace{ + ID: "0102030405060708090a0b0c0d0e0f10", + }, Transaction: &model.Transaction{ - TraceID: "0102030405060708090a0b0c0d0e0f10", - ID: "0102030405060708", + ID: "0102030405060708", }, }) } @@ -171,9 +179,11 @@ func TestTraceGroupReservoirResizeMinimum(t *testing.T) { sendTransactions := func(n int) { for i := 0; i < n; i++ { groups.sampleTrace(&model.APMEvent{ + Trace: model.Trace{ + ID: "0102030405060708090a0b0c0d0e0f10", + }, Transaction: &model.Transaction{ - TraceID: "0102030405060708090a0b0c0d0e0f10", - ID: "0102030405060708", + ID: "0102030405060708", }, }) } diff --git a/x-pack/apm-server/sampling/processor.go b/x-pack/apm-server/sampling/processor.go index 94750d3ae92..5ca32aa753b 100644 --- a/x-pack/apm-server/sampling/processor.go +++ b/x-pack/apm-server/sampling/processor.go @@ -208,7 +208,7 @@ func (p *Processor) processTransaction(event *model.APMEvent) (report, stored bo return true, false, nil } - traceSampled, err := p.storage.IsTraceSampled(event.Transaction.TraceID) + traceSampled, err := p.storage.IsTraceSampled(event.Trace.ID) switch err { case nil: // Tail-sampling decision has been made: report the transaction @@ -226,7 +226,7 @@ func (p *Processor) processTransaction(event *model.APMEvent) (report, stored bo // Non-root transaction: write to local storage while we wait // for a sampling decision. return false, true, p.storage.WriteTraceEvent( - event.Transaction.TraceID, event.Transaction.ID, event, + event.Trace.ID, event.Transaction.ID, event, ) } @@ -251,24 +251,24 @@ sampling policies without service name specified. // This is a local optimisation only. To avoid creating network // traffic and load on Elasticsearch for uninteresting root // transactions, we do not propagate this to other APM Servers. - return false, false, p.storage.WriteTraceSampled(event.Transaction.TraceID, false) + return false, false, p.storage.WriteTraceSampled(event.Trace.ID, false) } // The root transaction was admitted to the sampling reservoir, so we // can proceed to write the transaction to storage; we may index it later, // after finalising the sampling decision. return false, true, p.storage.WriteTraceEvent( - event.Transaction.TraceID, event.Transaction.ID, event, + event.Trace.ID, event.Transaction.ID, event, ) } func (p *Processor) processSpan(event *model.APMEvent) (report, stored bool, _ error) { - traceSampled, err := p.storage.IsTraceSampled(event.Span.TraceID) + traceSampled, err := p.storage.IsTraceSampled(event.Trace.ID) if err != nil { if err == eventstorage.ErrNotFound { // Tail-sampling decision has not yet been made, write event to local storage. return false, true, p.storage.WriteTraceEvent( - event.Span.TraceID, event.Span.ID, event, + event.Trace.ID, event.Span.ID, event, ) } return false, false, err @@ -465,11 +465,11 @@ func (p *Processor) Run() error { // at-most-once, not guaranteed. for _, event := range events { if event.Transaction != nil { - if err := p.storage.DeleteTraceEvent(event.Transaction.TraceID, event.Transaction.ID); err != nil { + if err := p.storage.DeleteTraceEvent(event.Trace.ID, event.Transaction.ID); err != nil { return errors.Wrap(err, "failed to delete transaction from local storage") } } else if event.Span != nil { - if err := p.storage.DeleteTraceEvent(event.Span.TraceID, event.Span.ID); err != nil { + if err := p.storage.DeleteTraceEvent(event.Trace.ID, event.Span.ID); err != nil { return errors.Wrap(err, "failed to delete span from local storage") } } diff --git a/x-pack/apm-server/sampling/processor_bench_test.go b/x-pack/apm-server/sampling/processor_bench_test.go index 2a20fc0034d..3c881443848 100644 --- a/x-pack/apm-server/sampling/processor_bench_test.go +++ b/x-pack/apm-server/sampling/processor_bench_test.go @@ -37,21 +37,20 @@ func BenchmarkProcess(b *testing.B) { binary.LittleEndian.PutUint64(traceID[8:], rng.Uint64()) transactionID := traceID[:8] spanID := traceID[8:] + trace := model.Trace{ID: hex.EncodeToString(traceID[:])} transaction := &model.Transaction{ - TraceID: hex.EncodeToString(traceID[:]), - ID: hex.EncodeToString(transactionID), + ID: hex.EncodeToString(transactionID), } spanParentID := hex.EncodeToString(transactionID) span := &model.Span{ - TraceID: hex.EncodeToString(traceID[:]), ID: hex.EncodeToString(spanID), ParentID: spanParentID, } batch := model.Batch{ - {Transaction: transaction}, - {Span: span}, - {Span: span}, - {Span: span}, + {Trace: trace, Transaction: transaction}, + {Trace: trace, Span: span}, + {Trace: trace, Span: span}, + {Trace: trace, Span: span}, } if err := processor.ProcessBatch(context.Background(), &batch); err != nil { b.Fatal(err) diff --git a/x-pack/apm-server/sampling/processor_test.go b/x-pack/apm-server/sampling/processor_test.go index f36889a69b2..ef3b3022df4 100644 --- a/x-pack/apm-server/sampling/processor_test.go +++ b/x-pack/apm-server/sampling/processor_test.go @@ -37,8 +37,10 @@ func TestProcessUnsampled(t *testing.T) { defer processor.Stop(context.Background()) in := model.Batch{{ + Trace: model.Trace{ + ID: "0102030405060708090a0b0c0d0e0f10", + }, Transaction: &model.Transaction{ - TraceID: "0102030405060708090a0b0c0d0e0f10", ID: "0102030405060708", Sampled: false, }, @@ -56,19 +58,19 @@ func TestProcessAlreadyTailSampled(t *testing.T) { // Seed event storage with a tail-sampling decisions, to show that // subsequent events in the trace will be reported immediately. - traceID1 := "0102030405060708090a0b0c0d0e0f10" - traceID2 := "0102030405060708090a0b0c0d0e0f11" + trace1 := model.Trace{ID: "0102030405060708090a0b0c0d0e0f10"} + trace2 := model.Trace{ID: "0102030405060708090a0b0c0d0e0f11"} withBadger(t, config.StorageDir, func(db *badger.DB) { storage := eventstorage.New(db, eventstorage.JSONCodec{}, time.Minute) writer := storage.NewReadWriter() defer writer.Close() - assert.NoError(t, writer.WriteTraceSampled(traceID1, true)) + assert.NoError(t, writer.WriteTraceSampled(trace1.ID, true)) assert.NoError(t, writer.Flush()) storage = eventstorage.New(db, eventstorage.JSONCodec{}, -1) // expire immediately writer = storage.NewReadWriter() defer writer.Close() - assert.NoError(t, writer.WriteTraceSampled(traceID2, true)) + assert.NoError(t, writer.WriteTraceSampled(trace2.ID, true)) assert.NoError(t, writer.Flush()) }) @@ -78,29 +80,25 @@ func TestProcessAlreadyTailSampled(t *testing.T) { defer processor.Stop(context.Background()) transaction1 := &model.Transaction{ - TraceID: traceID1, ID: "0102030405060708", Sampled: true, } span1 := &model.Span{ - TraceID: traceID1, - ID: "0102030405060709", + ID: "0102030405060709", } transaction2 := &model.Transaction{ - TraceID: traceID2, ID: "0102030405060710", Sampled: true, } span2 := &model.Span{ - TraceID: traceID2, - ID: "0102030405060711", + ID: "0102030405060711", } batch := model.Batch{ - {Transaction: transaction1}, - {Transaction: transaction2}, - {Span: span1}, - {Span: span2}, + {Trace: trace1, Transaction: transaction1}, + {Trace: trace2, Transaction: transaction2}, + {Trace: trace1, Span: span1}, + {Trace: trace2, Span: span2}, } err = processor.ProcessBatch(context.Background(), &batch) require.NoError(t, err) @@ -109,8 +107,8 @@ func TestProcessAlreadyTailSampled(t *testing.T) { // reported immediately, whereas the second ones should be written storage since // they were received after the trace sampling entry expired. assert.Equal(t, model.Batch{ - {Transaction: transaction1}, - {Span: span1}, + {Trace: trace1, Transaction: transaction1}, + {Trace: trace1, Span: span1}, }, batch) expectedMonitoring := monitoring.MakeFlatSnapshot() @@ -127,15 +125,15 @@ func TestProcessAlreadyTailSampled(t *testing.T) { defer reader.Close() var batch model.Batch - err := reader.ReadTraceEvents(traceID1, &batch) + err := reader.ReadTraceEvents(trace1.ID, &batch) assert.NoError(t, err) assert.Zero(t, batch) - err = reader.ReadTraceEvents(traceID2, &batch) + err = reader.ReadTraceEvents(trace2.ID, &batch) assert.NoError(t, err) assert.Equal(t, model.Batch{ - {Transaction: transaction2}, - {Span: span2}, + {Trace: trace2, Transaction: transaction2}, + {Trace: trace2, Span: span2}, }, batch) }) } @@ -150,30 +148,32 @@ func TestProcessLocalTailSampling(t *testing.T) { processor, err := sampling.NewProcessor(config) require.NoError(t, err) - traceID1 := "0102030405060708090a0b0c0d0e0f10" - traceID2 := "0102030405060708090a0b0c0d0e0f11" - trace1Events := model.Batch{ - {Transaction: &model.Transaction{ - TraceID: traceID1, + trace1 := model.Trace{ID: "0102030405060708090a0b0c0d0e0f10"} + trace2 := model.Trace{ID: "0102030405060708090a0b0c0d0e0f11"} + trace1Events := model.Batch{{ + Trace: trace1, + Transaction: &model.Transaction{ ID: "0102030405060708", Duration: 123, Sampled: true, - }}, - {Span: &model.Span{ - TraceID: traceID1, + }, + }, { + Trace: trace1, + Span: &model.Span{ ID: "0102030405060709", Duration: 123, - }}, - } - trace2Events := model.Batch{ - {Transaction: &model.Transaction{ - TraceID: traceID2, + }, + }} + trace2Events := model.Batch{{ + Trace: trace2, + Transaction: &model.Transaction{ ID: "0102030405060710", Duration: 456, Sampled: true, - }}, - {Span: &model.Span{ - TraceID: traceID2, + }, + }, { + Trace: trace2, + Span: &model.Span{ ID: "0102030405060711", Duration: 456, }}, @@ -207,11 +207,11 @@ func TestProcessLocalTailSampling(t *testing.T) { case <-time.After(50 * time.Millisecond): } - unsampledTraceID := traceID2 + unsampledTraceID := trace2.ID sampledTraceEvents := trace1Events unsampledTraceEvents := trace2Events - if sampledTraceID == traceID2 { - unsampledTraceID = traceID1 + if sampledTraceID == trace2.ID { + unsampledTraceID = trace1.ID unsampledTraceEvents = trace1Events sampledTraceEvents = trace2Events } @@ -266,8 +266,10 @@ func TestProcessLocalTailSamplingUnsampled(t *testing.T) { traceID := uuid.Must(uuid.NewV4()).String() traceIDs[i] = traceID batch := model.Batch{{ + Trace: model.Trace{ + ID: traceID, + }, Transaction: &model.Transaction{ - TraceID: traceID, ID: traceID, Duration: 1, Sampled: true, @@ -331,9 +333,11 @@ func TestProcessLocalTailSamplingPolicyOrder(t *testing.T) { require.NoError(t, err) events[i] = model.APMEvent{ Service: service, + Trace: model.Trace{ + ID: fmt.Sprintf("%x", traceIDBytes[:]), + }, Transaction: &model.Transaction{ Name: "trace_name", - TraceID: fmt.Sprintf("%x", traceIDBytes[:]), ID: fmt.Sprintf("%x", traceIDBytes[8:]), Duration: 123, Sampled: true, @@ -399,8 +403,10 @@ func TestProcessRemoteTailSampling(t *testing.T) { traceID1 := "0102030405060708090a0b0c0d0e0f10" traceID2 := "0102030405060708090a0b0c0d0e0f11" trace1Events := model.Batch{{ + Trace: model.Trace{ + ID: traceID1, + }, Span: &model.Span{ - TraceID: traceID1, ID: "0102030405060709", Duration: 123, }, @@ -481,8 +487,10 @@ func TestGroupsMonitoring(t *testing.T) { for i := 0; i < config.MaxDynamicServices+1; i++ { err := processor.ProcessBatch(context.Background(), &model.Batch{{ Service: model.Service{Name: fmt.Sprintf("service_%d", i)}, + Trace: model.Trace{ + ID: uuid.Must(uuid.NewV4()).String(), + }, Transaction: &model.Transaction{ - TraceID: uuid.Must(uuid.NewV4()).String(), ID: "0102030405060709", Duration: 123, Sampled: true, @@ -509,8 +517,10 @@ func TestStorageMonitoring(t *testing.T) { for i := 0; i < 100; i++ { traceID := uuid.Must(uuid.NewV4()).String() batch := model.Batch{{ + Trace: model.Trace{ + ID: traceID, + }, Transaction: &model.Transaction{ - TraceID: traceID, ID: traceID, Duration: 123, Sampled: true, @@ -552,8 +562,10 @@ func TestStorageGC(t *testing.T) { for i := 0; i < n; i++ { traceID := uuid.Must(uuid.NewV4()).String() batch := model.Batch{{ + Trace: model.Trace{ + ID: traceID, + }, Span: &model.Span{ - TraceID: traceID, ID: traceID, Duration: 123, },