From afac7624c9a9aa50228b7b691da43de048919769 Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Tue, 29 Aug 2023 12:55:38 +0200 Subject: [PATCH 1/3] Ensure OpenTelemetryBridge respects Agents sampling decisions (cherry picked from commit 0422eb92e88200bddaa1868e6fc80a9c1e45f35f) --- src/Elastic.Apm/Api/Tracer.cs | 10 +- src/Elastic.Apm/Model/Span.cs | 10 +- src/Elastic.Apm/Model/Transaction.cs | 21 ++- .../OpenTelemetry/ElasticActivityListener.cs | 173 ++++++++++-------- .../ActivityIntegrationTests.cs | 54 ++++++ 5 files changed, 173 insertions(+), 95 deletions(-) diff --git a/src/Elastic.Apm/Api/Tracer.cs b/src/Elastic.Apm/Api/Tracer.cs index c2e895178..5c810dbf9 100644 --- a/src/Elastic.Apm/Api/Tracer.cs +++ b/src/Elastic.Apm/Api/Tracer.cs @@ -67,18 +67,20 @@ public ITransaction StartTransaction(string name, string type, DistributedTracin internal Transaction StartTransactionInternal(string name, string type, long? timestamp = null, bool ignoreActivity = false, string id = null, string traceId = null, DistributedTracingData distributedTracingData = null, - IEnumerable links = null + IEnumerable links = null, + Activity current = null ) - => StartTransactionInternal(name, type, distributedTracingData, ignoreActivity, timestamp, id, traceId, links); + => StartTransactionInternal(name, type, distributedTracingData, ignoreActivity, timestamp, id, traceId, links, current: current); private Transaction StartTransactionInternal(string name, string type, DistributedTracingData distributedTracingData = null, - bool ignoreActivity = false, long? timestamp = null, string id = null, string traceId = null, IEnumerable links = null + bool ignoreActivity = false, long? timestamp = null, string id = null, string traceId = null, IEnumerable links = null, + Activity current = null ) { var currentConfig = _configurationProvider.CurrentSnapshot; var retVal = new Transaction(_logger, name, type, new Sampler(currentConfig.TransactionSampleRate), distributedTracingData , _sender, currentConfig, CurrentExecutionSegmentsContainer, _apmServerInfo, _breakdownMetricsProvider, ignoreActivity, timestamp, id, - traceId: traceId, links: links) + traceId: traceId, links: links, current: current) { Service = _service }; _logger.Debug()?.Log("Starting {TransactionValue}", retVal); diff --git a/src/Elastic.Apm/Model/Span.cs b/src/Elastic.Apm/Model/Span.cs index 882669292..c31fbb203 100644 --- a/src/Elastic.Apm/Model/Span.cs +++ b/src/Elastic.Apm/Model/Span.cs @@ -53,8 +53,7 @@ private Span(double duration, string id, string name, string parentId) ParentId = parentId; } - public Span( - string name, + public Span(string name, string type, string parentId, string traceId, @@ -69,7 +68,8 @@ public Span( long? timestamp = null, bool isExitSpan = false, string id = null, - IEnumerable links = null + IEnumerable links = null, + Activity current = null ) { InstrumentationFlag = instrumentationFlag; @@ -387,11 +387,11 @@ public ISpan StartSpan(string name, string type, string subType = null, string a internal Span StartSpanInternal(string name, string type, string subType = null, string action = null, InstrumentationFlag instrumentationFlag = InstrumentationFlag.None, bool captureStackTraceOnStart = false, long? timestamp = null, - string id = null, bool isExitSpan = false, IEnumerable links = null + string id = null, bool isExitSpan = false, IEnumerable links = null, Activity current = null ) { var retVal = new Span(name, type, Id, TraceId, _enclosingTransaction, _payloadSender, _logger, _currentExecutionSegmentsContainer, - _apmServerInfo, this, instrumentationFlag, captureStackTraceOnStart, timestamp, isExitSpan, id, links); + _apmServerInfo, this, instrumentationFlag, captureStackTraceOnStart, timestamp, isExitSpan, id, links, current: current); if (!string.IsNullOrEmpty(subType)) retVal.Subtype = subType; diff --git a/src/Elastic.Apm/Model/Transaction.cs b/src/Elastic.Apm/Model/Transaction.cs index 89f3d64d5..880e058dc 100644 --- a/src/Elastic.Apm/Model/Transaction.cs +++ b/src/Elastic.Apm/Model/Transaction.cs @@ -100,6 +100,7 @@ internal Transaction(ApmAgent agent, string name, string type, long? timestamp = /// An optional parameter to pass the id of the transaction /// An optional parameter to pass a trace id which will be applied to the transaction /// Span links associated with this transaction + /// Current activity that represents this transaction internal Transaction( IApmLogger logger, string name, @@ -115,7 +116,8 @@ internal Transaction( long? timestamp = null, string id = null, string traceId = null, - IEnumerable links = null + IEnumerable links = null, + Activity current = null ) { Configuration = configuration; @@ -142,10 +144,19 @@ internal Transaction( (configuration.TraceContinuationStrategy == ConfigConsts.SupportedValues.RestartExternal && (distributedTracingData?.TraceState == null || distributedTracingData is { TraceState: { SampleRate: null } })); + // For each new transaction, start an Activity if we're not ignoring them. // If Activity.Current is not null, the started activity will be a child activity, // so the traceid and tracestate of the parent will flow to it. - if (!ignoreActivity) + + // If the transaction is created as the result of an activity that is passed directly use that as the activity representing this + // transaction + if (current != null) + _activity = current; + + // Otherwise we will start an activity explicitly and ensure it trace_id and trace_state respect our bookkeeping. + // Unless explicitly asked not to through `ignoreActivity`: (https://github.com/elastic/apm-agent-dotnet/issues/867#issuecomment-650170150) + else if (!ignoreActivity) _activity = StartActivity(shouldRestartTrace); var isSamplingFromDistributedTracingData = false; @@ -721,12 +732,12 @@ public ISpan StartSpan(string name, string type, string subType = null, string a internal Span StartSpanInternal(string name, string type, string subType = null, string action = null, InstrumentationFlag instrumentationFlag = InstrumentationFlag.None, bool captureStackTraceOnStart = false, long? timestamp = null, - string id = null, bool isExitSpan = false, IEnumerable links = null + string id = null, bool isExitSpan = false, IEnumerable links = null, Activity current = null ) { var retVal = new Span(name, type, Id, TraceId, this, _sender, _logger, _currentExecutionSegmentsContainer, _apmServerInfo, - instrumentationFlag: instrumentationFlag, captureStackTraceOnStart: captureStackTraceOnStart, timestamp: timestamp, id: id, - isExitSpan: isExitSpan, links: links); + instrumentationFlag: instrumentationFlag, captureStackTraceOnStart: captureStackTraceOnStart, timestamp: timestamp, + isExitSpan: isExitSpan, id: id, links: links, current: current); ChildDurationTimer.OnChildStart(retVal.Timestamp); if (!string.IsNullOrEmpty(subType)) diff --git a/src/Elastic.Apm/OpenTelemetry/ElasticActivityListener.cs b/src/Elastic.Apm/OpenTelemetry/ElasticActivityListener.cs index bc6e63c1d..a0f9e08f4 100644 --- a/src/Elastic.Apm/OpenTelemetry/ElasticActivityListener.cs +++ b/src/Elastic.Apm/OpenTelemetry/ElasticActivityListener.cs @@ -10,6 +10,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; +using System.Runtime.CompilerServices; using Elastic.Apm.Api; using Elastic.Apm.DiagnosticListeners; using Elastic.Apm.DistributedTracing; @@ -21,11 +22,11 @@ namespace Elastic.Apm.OpenTelemetry { public class ElasticActivityListener : IDisposable { - private static readonly string[] ServerPortAttributeKeys = new[] { SemanticConventions.ServerPort, SemanticConventions.NetPeerPort }; - private static readonly string[] ServerAddressAttributeKeys = new[] { SemanticConventions.ServerAddress, SemanticConventions.NetPeerName, SemanticConventions.NetPeerIp }; + private static readonly string[] ServerPortAttributeKeys = { SemanticConventions.ServerPort, SemanticConventions.NetPeerPort }; + private static readonly string[] ServerAddressAttributeKeys = { SemanticConventions.ServerAddress, SemanticConventions.NetPeerName, SemanticConventions.NetPeerIp }; - private readonly ConcurrentDictionary _activeSpans = new(); - private readonly ConcurrentDictionary _activeTransactions = new(); + private readonly ConditionalWeakTable _activeSpans = new(); + private readonly ConditionalWeakTable _activeTransactions = new(); internal ElasticActivityListener(IApmAgent agent, HttpTraceConfiguration httpTraceConfiguration) => (_logger, _httpTraceConfiguration) = (agent.Logger?.Scoped(nameof(ElasticActivityListener)), httpTraceConfiguration); @@ -41,6 +42,7 @@ internal void Start(Tracer tracerInternal) _httpTraceConfiguration?.AddTracer(new ElasticSearchHttpNonTracer()); _tracer = tracerInternal; + Listener = new ActivityListener { ActivityStarted = ActivityStarted, @@ -61,66 +63,73 @@ internal void Start(Tracer tracerInternal) if (KnownListeners.KnownListenersList.Contains(activity.DisplayName)) return; - Transaction transaction = null; - var spanLinks = new List(activity.Links.Count()); - if (activity.Links != null && activity.Links.Any()) + if (activity.Links.Any()) { foreach (var link in activity.Links) spanLinks.Add(new SpanLink(link.Context.SpanId.ToString(), link.Context.TraceId.ToString())); } - if (activity?.Context != null && activity.ParentId != null && _tracer.CurrentTransaction == null) - { - var dt = TraceContext.TryExtractTracingData(activity.ParentId.ToString(), activity.Context.TraceState); + var timestamp = TimeUtils.ToTimestamp(activity.StartTimeUtc); + if (!CreateTransactionForActivity(activity, timestamp, spanLinks)) + CreateSpanForActivity(activity, timestamp, spanLinks); - transaction = _tracer.StartTransactionInternal(activity.DisplayName, "unknown", - TimeUtils.ToTimestamp(activity.StartTimeUtc), true, activity.SpanId.ToString(), - distributedTracingData: dt, links: spanLinks); - } - else if (activity.ParentId == null) - { - transaction = _tracer.StartTransactionInternal(activity.DisplayName, "unknown", - TimeUtils.ToTimestamp(activity.StartTimeUtc), true, activity.SpanId.ToString(), - activity.TraceId.ToString(), links: spanLinks); - } - else - { - Span newSpan; - if (_tracer.CurrentSpan == null) - { - newSpan = (_tracer.CurrentTransaction as Transaction)?.StartSpanInternal(activity.DisplayName, "unknown", - timestamp: TimeUtils.ToTimestamp(activity.StartTimeUtc), id: activity.SpanId.ToString(), links: spanLinks); - } - else - { - newSpan = (_tracer.CurrentSpan as Span)?.StartSpanInternal(activity.DisplayName, "unknown", - timestamp: TimeUtils.ToTimestamp(activity.StartTimeUtc), id: activity.SpanId.ToString(), links: spanLinks); - } + }; - if (newSpan != null) - { - newSpan.Otel = new OTel { SpanKind = activity.Kind.ToString() }; + private bool CreateTransactionForActivity(Activity activity, long timestamp, List spanLinks) + { + Transaction transaction = null; + if (activity.ParentId != null && _tracer.CurrentTransaction == null) + { + var dt = TraceContext.TryExtractTracingData(activity.ParentId, activity.Context.TraceState); - if (activity.Kind == ActivityKind.Internal) - { - newSpan.Type = "app"; - newSpan.Subtype = "internal"; - } + transaction = _tracer.StartTransactionInternal(activity.DisplayName, "unknown", + timestamp, true, activity.SpanId.ToString(), + distributedTracingData: dt, links: spanLinks, current: activity); + } + else if (activity.ParentId == null) + { + transaction = _tracer.StartTransactionInternal(activity.DisplayName, "unknown", + timestamp, true, activity.SpanId.ToString(), + activity.TraceId.ToString(), links: spanLinks, current: activity); + } - if (activity.Id != null) - _activeSpans.TryAdd(activity.Id, newSpan); - } - } + if (transaction == null) return false; - if (transaction != null) - { - transaction.Otel = new OTel { SpanKind = activity.Kind.ToString() }; + transaction.Otel = new OTel { SpanKind = activity.Kind.ToString() }; - if (activity.Id != null) - _activeTransactions.TryAdd(activity.Id, transaction); - } - }; + if (activity.Id != null) + _activeTransactions.AddOrUpdate(activity, transaction); + return true; + } + + private void CreateSpanForActivity(Activity activity, long timestamp, List spanLinks) + { + Span newSpan; + if (_tracer.CurrentSpan == null) + { + newSpan = (_tracer.CurrentTransaction as Transaction)?.StartSpanInternal(activity.DisplayName, "unknown", + timestamp: timestamp, id: activity.SpanId.ToString(), links: spanLinks, current: activity); + } + else + { + newSpan = (_tracer.CurrentSpan as Span)?.StartSpanInternal(activity.DisplayName, "unknown", + timestamp: timestamp, id: activity.SpanId.ToString(), links: spanLinks, current: activity); + } + + if (newSpan == null) return; + + newSpan.Otel = new OTel { SpanKind = activity.Kind.ToString() }; + + if (activity.Kind == ActivityKind.Internal) + { + newSpan.Type = "app"; + newSpan.Subtype = "internal"; + } + + if (activity.Id != null) + _activeSpans.AddOrUpdate(activity, newSpan); + } private Action ActivityStopped => activity => @@ -130,49 +139,51 @@ internal void Start(Tracer tracerInternal) _logger.Trace()?.Log("ActivityStopped called with `null` activity. Ignoring `null` activity."); return; } + activity.Stop(); _logger.Trace()?.Log($"ActivityStopped: name:{activity.DisplayName} id:{activity.Id} traceId:{activity.TraceId}"); if (KnownListeners.KnownListenersList.Contains(activity.DisplayName)) return; - if (activity.Id != null) + if (activity.Id == null) return; + + if (_activeTransactions.TryGetValue(activity, out var transaction)) { - if (_activeTransactions.TryRemove(activity.Id, out var transaction)) - { - transaction.Duration = activity.Duration.TotalMilliseconds; + _activeTransactions.Remove(activity); + transaction.Duration = activity.Duration.TotalMilliseconds; - if (activity.TagObjects.Any()) - transaction.Otel.Attributes = new Dictionary(); + if (activity.TagObjects.Any()) + transaction.Otel.Attributes = new Dictionary(); - foreach (var tag in activity.TagObjects) - transaction.Otel.Attributes.Add(tag.Key, tag.Value); + foreach (var tag in activity.TagObjects) + transaction.Otel.Attributes.Add(tag.Key, tag.Value); - InferTransactionType(transaction, activity); + InferTransactionType(transaction, activity); - // By default we set unknown outcome - transaction.Outcome = Outcome.Unknown; + // By default we set unknown outcome + transaction.Outcome = Outcome.Unknown; #if NET6_0_OR_GREATER - switch (activity.Status) - { - case ActivityStatusCode.Unset: - transaction.Outcome = Outcome.Unknown; - break; - case ActivityStatusCode.Ok: - transaction.Outcome = Outcome.Success; - break; - case ActivityStatusCode.Error: - transaction.Outcome = Outcome.Failure; - break; - } -#endif - - transaction.End(); - } - else if (_activeSpans.TryRemove(activity.Id, out var span)) + switch (activity.Status) { - UpdateSpan(activity, span); + case ActivityStatusCode.Unset: + transaction.Outcome = Outcome.Unknown; + break; + case ActivityStatusCode.Ok: + transaction.Outcome = Outcome.Success; + break; + case ActivityStatusCode.Error: + transaction.Outcome = Outcome.Failure; + break; } +#endif + + transaction.End(); + } + else if (_activeSpans.TryGetValue(activity, out var span)) + { + _activeSpans.Remove(activity); + UpdateSpan(activity, span); } }; diff --git a/test/Elastic.Apm.Tests/ActivityIntegrationTests.cs b/test/Elastic.Apm.Tests/ActivityIntegrationTests.cs index 3752f3c47..a57a61848 100644 --- a/test/Elastic.Apm.Tests/ActivityIntegrationTests.cs +++ b/test/Elastic.Apm.Tests/ActivityIntegrationTests.cs @@ -2,12 +2,16 @@ // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information +using System; using System.Diagnostics; +using System.Linq; using System.Threading; +using System.Threading.Tasks; using Elastic.Apm.Tests.Utilities; using Elastic.Apm.Tests.Utilities.XUnit; using FluentAssertions; using Xunit; +using Xunit.Abstractions; namespace Elastic.Apm.Tests { @@ -15,6 +19,12 @@ namespace Elastic.Apm.Tests [CaptureRestoreActivityIdFormat] public class ActivityIntegrationTests { + private readonly ITestOutputHelper _testOutputHelper; + + public ActivityIntegrationTests(ITestOutputHelper testOutputHelper) { + _testOutputHelper = testOutputHelper; + } + /// /// Makes sure that in case there is an active activity, the agent reuses its TraceId when it starts a new transaction. /// The prerequisite is that the IdFormat is W3C @@ -143,5 +153,49 @@ public void MultipleTransactionInOneActivity() payloadSender.Transactions[0].Id.Should().NotBe(payloadSender.Transactions[1].Id); activity.Stop(); } + + /// + /// Makes sure that transactions on the same Activity are part of the same trace. + /// + [Fact] + public async Task ActivityRespectsSampling() + { + const int count = 100; + const double rate = 0.5; + + Activity.Current = null; + Activity.DefaultIdFormat = ActivityIdFormat.W3C; + var source = new ActivitySource(GetType().FullName, "1.0.0"); + + var payloadSender = new MockPayloadSender(); + var config = new MockConfiguration( + transactionSampleRate: rate.ToString("N2") + + ); + using var components = new TestAgentComponents( + apmServerInfo: MockApmServerInfo.Version716, + configuration: config, + payloadSender: payloadSender + + ); + using var agent = new ApmAgent(components); + for (var i = 0; i < count; i++) + { + using var transaction = source.StartActivity($"Trace {i}"); + using var span = new Activity("UnitTestActivity").Start(); + await Task.Delay(1); + } + //Activity.Current.Should().BeNull(); + payloadSender.WaitForTransactions(count: count); + + var sampled = payloadSender.Transactions.Where(t => t.IsSampled).ToArray(); + sampled.Length.Should().BeLessThan(count); + sampled.Length.Should().BeGreaterThan(count / 10); + + var sampledSpans = payloadSender.Spans.Where(t => t.IsSampled).ToArray(); + sampledSpans.Length.Should().Be(sampled.Length); + + + } } } From 1bc310bb612ae68b4df2cf965f6bc49ea5267e5b Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Wed, 30 Aug 2023 14:40:10 +0200 Subject: [PATCH 2/3] fix formatting --- test/Elastic.Apm.Tests/ActivityIntegrationTests.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/Elastic.Apm.Tests/ActivityIntegrationTests.cs b/test/Elastic.Apm.Tests/ActivityIntegrationTests.cs index a57a61848..569426a71 100644 --- a/test/Elastic.Apm.Tests/ActivityIntegrationTests.cs +++ b/test/Elastic.Apm.Tests/ActivityIntegrationTests.cs @@ -21,7 +21,8 @@ public class ActivityIntegrationTests { private readonly ITestOutputHelper _testOutputHelper; - public ActivityIntegrationTests(ITestOutputHelper testOutputHelper) { + public ActivityIntegrationTests(ITestOutputHelper testOutputHelper) + { _testOutputHelper = testOutputHelper; } From 23c99ffb2f546d28251efb0c63dac7b6d79c9328 Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Wed, 30 Aug 2023 16:29:50 +0200 Subject: [PATCH 3/3] ElasticActivityListener should not be tested on fullframework since its not available there --- test/Elastic.Apm.Tests/ActivityIntegrationTests.cs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/test/Elastic.Apm.Tests/ActivityIntegrationTests.cs b/test/Elastic.Apm.Tests/ActivityIntegrationTests.cs index 569426a71..86870c850 100644 --- a/test/Elastic.Apm.Tests/ActivityIntegrationTests.cs +++ b/test/Elastic.Apm.Tests/ActivityIntegrationTests.cs @@ -19,13 +19,6 @@ namespace Elastic.Apm.Tests [CaptureRestoreActivityIdFormat] public class ActivityIntegrationTests { - private readonly ITestOutputHelper _testOutputHelper; - - public ActivityIntegrationTests(ITestOutputHelper testOutputHelper) - { - _testOutputHelper = testOutputHelper; - } - /// /// Makes sure that in case there is an active activity, the agent reuses its TraceId when it starts a new transaction. /// The prerequisite is that the IdFormat is W3C @@ -154,7 +147,7 @@ public void MultipleTransactionInOneActivity() payloadSender.Transactions[0].Id.Should().NotBe(payloadSender.Transactions[1].Id); activity.Stop(); } - +#if NET5_0_OR_GREATER /// /// Makes sure that transactions on the same Activity are part of the same trace. /// @@ -198,5 +191,6 @@ public async Task ActivityRespectsSampling() } +#endif } }