Skip to content

Commit

Permalink
Ensure OpenTelemetryBridge respects Agents sampling decisions
Browse files Browse the repository at this point in the history
  • Loading branch information
Mpdreamz committed Aug 29, 2023
1 parent cd06cf0 commit 0422eb9
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 95 deletions.
10 changes: 6 additions & 4 deletions src/Elastic.Apm/Api/Tracer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,20 @@ public ITransaction StartTransaction(string name, string type, DistributedTracin
internal Transaction StartTransactionInternal(string name, string type,
long? timestamp = null, bool ignoreActivity = false, string id = null, string traceId = null,
DistributedTracingData distributedTracingData = null,
IEnumerable<SpanLink> links = null
IEnumerable<SpanLink> links = null,
Activity current = null
)
=> StartTransactionInternal(name, type, distributedTracingData, ignoreActivity, timestamp, id, traceId, links);
=> StartTransactionInternal(name, type, distributedTracingData, ignoreActivity, timestamp, id, traceId, links, current: current);

private Transaction StartTransactionInternal(string name, string type, DistributedTracingData distributedTracingData = null,
bool ignoreActivity = false, long? timestamp = null, string id = null, string traceId = null, IEnumerable<SpanLink> links = null
bool ignoreActivity = false, long? timestamp = null, string id = null, string traceId = null, IEnumerable<SpanLink> links = null,
Activity current = null
)
{
var currentConfig = _configurationProvider.CurrentSnapshot;
var retVal = new Transaction(_logger, name, type, new Sampler(currentConfig.TransactionSampleRate), distributedTracingData
, _sender, currentConfig, CurrentExecutionSegmentsContainer, _apmServerInfo, _breakdownMetricsProvider, ignoreActivity, timestamp, id,
traceId: traceId, links: links)
traceId: traceId, links: links, current: current)
{ Service = _service };

_logger.Debug()?.Log("Starting {TransactionValue}", retVal);
Expand Down
10 changes: 5 additions & 5 deletions src/Elastic.Apm/Model/Span.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ private Span(double duration, string id, string name, string parentId)
ParentId = parentId;
}

public Span(
string name,
public Span(string name,
string type,
string parentId,
string traceId,
Expand All @@ -69,7 +68,8 @@ public Span(
long? timestamp = null,
bool isExitSpan = false,
string id = null,
IEnumerable<SpanLink> links = null
IEnumerable<SpanLink> links = null,
Activity current = null
)
{
InstrumentationFlag = instrumentationFlag;
Expand Down Expand Up @@ -387,11 +387,11 @@ public ISpan StartSpan(string name, string type, string subType = null, string a

internal Span StartSpanInternal(string name, string type, string subType = null, string action = null,
InstrumentationFlag instrumentationFlag = InstrumentationFlag.None, bool captureStackTraceOnStart = false, long? timestamp = null,
string id = null, bool isExitSpan = false, IEnumerable<SpanLink> links = null
string id = null, bool isExitSpan = false, IEnumerable<SpanLink> links = null, Activity current = null
)
{
var retVal = new Span(name, type, Id, TraceId, _enclosingTransaction, _payloadSender, _logger, _currentExecutionSegmentsContainer,
_apmServerInfo, this, instrumentationFlag, captureStackTraceOnStart, timestamp, isExitSpan, id, links);
_apmServerInfo, this, instrumentationFlag, captureStackTraceOnStart, timestamp, isExitSpan, id, links, current: current);

if (!string.IsNullOrEmpty(subType))
retVal.Subtype = subType;
Expand Down
21 changes: 16 additions & 5 deletions src/Elastic.Apm/Model/Transaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ internal Transaction(ApmAgent agent, string name, string type, long? timestamp =
/// <param name="id">An optional parameter to pass the id of the transaction</param>
/// <param name="traceId">An optional parameter to pass a trace id which will be applied to the transaction</param>
/// <param name="links">Span links associated with this transaction</param>
/// <param name="current">Current activity that represents this transaction</param>
internal Transaction(
IApmLogger logger,
string name,
Expand All @@ -115,7 +116,8 @@ internal Transaction(
long? timestamp = null,
string id = null,
string traceId = null,
IEnumerable<SpanLink> links = null
IEnumerable<SpanLink> links = null,
Activity current = null
)
{
Configuration = configuration;
Expand All @@ -142,10 +144,19 @@ internal Transaction(
(configuration.TraceContinuationStrategy == ConfigConsts.SupportedValues.RestartExternal
&& (distributedTracingData?.TraceState == null || distributedTracingData is { TraceState: { SampleRate: null } }));


// For each new transaction, start an Activity if we're not ignoring them.
// If Activity.Current is not null, the started activity will be a child activity,
// so the traceid and tracestate of the parent will flow to it.
if (!ignoreActivity)

// If the transaction is created as the result of an activity that is passed directly use that as the activity representing this
// transaction
if (current != null)
_activity = current;

// Otherwise we will start an activity explicitly and ensure it trace_id and trace_state respect our bookkeeping.
// Unless explicitly asked not to through `ignoreActivity`: (https:/elastic/apm-agent-dotnet/issues/867#issuecomment-650170150)
else if (!ignoreActivity)
_activity = StartActivity(shouldRestartTrace);

var isSamplingFromDistributedTracingData = false;
Expand Down Expand Up @@ -721,12 +732,12 @@ public ISpan StartSpan(string name, string type, string subType = null, string a

internal Span StartSpanInternal(string name, string type, string subType = null, string action = null,
InstrumentationFlag instrumentationFlag = InstrumentationFlag.None, bool captureStackTraceOnStart = false, long? timestamp = null,
string id = null, bool isExitSpan = false, IEnumerable<SpanLink> links = null
string id = null, bool isExitSpan = false, IEnumerable<SpanLink> links = null, Activity current = null
)
{
var retVal = new Span(name, type, Id, TraceId, this, _sender, _logger, _currentExecutionSegmentsContainer, _apmServerInfo,
instrumentationFlag: instrumentationFlag, captureStackTraceOnStart: captureStackTraceOnStart, timestamp: timestamp, id: id,
isExitSpan: isExitSpan, links: links);
instrumentationFlag: instrumentationFlag, captureStackTraceOnStart: captureStackTraceOnStart, timestamp: timestamp,
isExitSpan: isExitSpan, id: id, links: links, current: current);

ChildDurationTimer.OnChildStart(retVal.Timestamp);
if (!string.IsNullOrEmpty(subType))
Expand Down
173 changes: 92 additions & 81 deletions src/Elastic.Apm/OpenTelemetry/ElasticActivityListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using Elastic.Apm.Api;
using Elastic.Apm.DiagnosticListeners;
using Elastic.Apm.DistributedTracing;
Expand All @@ -21,11 +22,11 @@ namespace Elastic.Apm.OpenTelemetry
{
public class ElasticActivityListener : IDisposable
{
private static readonly string[] ServerPortAttributeKeys = new[] { SemanticConventions.ServerPort, SemanticConventions.NetPeerPort };
private static readonly string[] ServerAddressAttributeKeys = new[] { SemanticConventions.ServerAddress, SemanticConventions.NetPeerName, SemanticConventions.NetPeerIp };
private static readonly string[] ServerPortAttributeKeys = { SemanticConventions.ServerPort, SemanticConventions.NetPeerPort };
private static readonly string[] ServerAddressAttributeKeys = { SemanticConventions.ServerAddress, SemanticConventions.NetPeerName, SemanticConventions.NetPeerIp };

private readonly ConcurrentDictionary<string, Span> _activeSpans = new();
private readonly ConcurrentDictionary<string, Transaction> _activeTransactions = new();
private readonly ConditionalWeakTable<Activity, Span> _activeSpans = new();
private readonly ConditionalWeakTable<Activity, Transaction> _activeTransactions = new();

internal ElasticActivityListener(IApmAgent agent, HttpTraceConfiguration httpTraceConfiguration) => (_logger, _httpTraceConfiguration) =
(agent.Logger?.Scoped(nameof(ElasticActivityListener)), httpTraceConfiguration);
Expand All @@ -41,6 +42,7 @@ internal void Start(Tracer tracerInternal)
_httpTraceConfiguration?.AddTracer(new ElasticSearchHttpNonTracer());
_tracer = tracerInternal;


Listener = new ActivityListener
{
ActivityStarted = ActivityStarted,
Expand All @@ -61,66 +63,73 @@ internal void Start(Tracer tracerInternal)
if (KnownListeners.KnownListenersList.Contains(activity.DisplayName))
return;
Transaction transaction = null;
var spanLinks = new List<SpanLink>(activity.Links.Count());
if (activity.Links != null && activity.Links.Any())
if (activity.Links.Any())
{
foreach (var link in activity.Links)
spanLinks.Add(new SpanLink(link.Context.SpanId.ToString(), link.Context.TraceId.ToString()));
}
if (activity?.Context != null && activity.ParentId != null && _tracer.CurrentTransaction == null)
{
var dt = TraceContext.TryExtractTracingData(activity.ParentId.ToString(), activity.Context.TraceState);
var timestamp = TimeUtils.ToTimestamp(activity.StartTimeUtc);
if (!CreateTransactionForActivity(activity, timestamp, spanLinks))
CreateSpanForActivity(activity, timestamp, spanLinks);
transaction = _tracer.StartTransactionInternal(activity.DisplayName, "unknown",
TimeUtils.ToTimestamp(activity.StartTimeUtc), true, activity.SpanId.ToString(),
distributedTracingData: dt, links: spanLinks);
}
else if (activity.ParentId == null)
{
transaction = _tracer.StartTransactionInternal(activity.DisplayName, "unknown",
TimeUtils.ToTimestamp(activity.StartTimeUtc), true, activity.SpanId.ToString(),
activity.TraceId.ToString(), links: spanLinks);
}
else
{
Span newSpan;
if (_tracer.CurrentSpan == null)
{
newSpan = (_tracer.CurrentTransaction as Transaction)?.StartSpanInternal(activity.DisplayName, "unknown",
timestamp: TimeUtils.ToTimestamp(activity.StartTimeUtc), id: activity.SpanId.ToString(), links: spanLinks);
}
else
{
newSpan = (_tracer.CurrentSpan as Span)?.StartSpanInternal(activity.DisplayName, "unknown",
timestamp: TimeUtils.ToTimestamp(activity.StartTimeUtc), id: activity.SpanId.ToString(), links: spanLinks);
}
};

if (newSpan != null)
{
newSpan.Otel = new OTel { SpanKind = activity.Kind.ToString() };
private bool CreateTransactionForActivity(Activity activity, long timestamp, List<SpanLink> spanLinks)
{
Transaction transaction = null;
if (activity.ParentId != null && _tracer.CurrentTransaction == null)
{
var dt = TraceContext.TryExtractTracingData(activity.ParentId, activity.Context.TraceState);

if (activity.Kind == ActivityKind.Internal)
{
newSpan.Type = "app";
newSpan.Subtype = "internal";
}
transaction = _tracer.StartTransactionInternal(activity.DisplayName, "unknown",
timestamp, true, activity.SpanId.ToString(),
distributedTracingData: dt, links: spanLinks, current: activity);
}
else if (activity.ParentId == null)
{
transaction = _tracer.StartTransactionInternal(activity.DisplayName, "unknown",
timestamp, true, activity.SpanId.ToString(),
activity.TraceId.ToString(), links: spanLinks, current: activity);
}

if (activity.Id != null)
_activeSpans.TryAdd(activity.Id, newSpan);
}
}
if (transaction == null) return false;

if (transaction != null)
{
transaction.Otel = new OTel { SpanKind = activity.Kind.ToString() };
transaction.Otel = new OTel { SpanKind = activity.Kind.ToString() };

if (activity.Id != null)
_activeTransactions.TryAdd(activity.Id, transaction);
}
};
if (activity.Id != null)
_activeTransactions.AddOrUpdate(activity, transaction);
return true;
}

private void CreateSpanForActivity(Activity activity, long timestamp, List<SpanLink> spanLinks)
{
Span newSpan;
if (_tracer.CurrentSpan == null)
{
newSpan = (_tracer.CurrentTransaction as Transaction)?.StartSpanInternal(activity.DisplayName, "unknown",
timestamp: timestamp, id: activity.SpanId.ToString(), links: spanLinks, current: activity);
}
else
{
newSpan = (_tracer.CurrentSpan as Span)?.StartSpanInternal(activity.DisplayName, "unknown",
timestamp: timestamp, id: activity.SpanId.ToString(), links: spanLinks, current: activity);
}

if (newSpan == null) return;

newSpan.Otel = new OTel { SpanKind = activity.Kind.ToString() };

if (activity.Kind == ActivityKind.Internal)
{
newSpan.Type = "app";
newSpan.Subtype = "internal";
}

if (activity.Id != null)
_activeSpans.AddOrUpdate(activity, newSpan);
}

private Action<Activity> ActivityStopped =>
activity =>
Expand All @@ -130,49 +139,51 @@ internal void Start(Tracer tracerInternal)
_logger.Trace()?.Log("ActivityStopped called with `null` activity. Ignoring `null` activity.");
return;
}
activity.Stop();
_logger.Trace()?.Log($"ActivityStopped: name:{activity.DisplayName} id:{activity.Id} traceId:{activity.TraceId}");
if (KnownListeners.KnownListenersList.Contains(activity.DisplayName))
return;
if (activity.Id != null)
if (activity.Id == null) return;
if (_activeTransactions.TryGetValue(activity, out var transaction))
{
if (_activeTransactions.TryRemove(activity.Id, out var transaction))
{
transaction.Duration = activity.Duration.TotalMilliseconds;
_activeTransactions.Remove(activity);
transaction.Duration = activity.Duration.TotalMilliseconds;
if (activity.TagObjects.Any())
transaction.Otel.Attributes = new Dictionary<string, object>();
if (activity.TagObjects.Any())
transaction.Otel.Attributes = new Dictionary<string, object>();
foreach (var tag in activity.TagObjects)
transaction.Otel.Attributes.Add(tag.Key, tag.Value);
foreach (var tag in activity.TagObjects)
transaction.Otel.Attributes.Add(tag.Key, tag.Value);
InferTransactionType(transaction, activity);
InferTransactionType(transaction, activity);
// By default we set unknown outcome
transaction.Outcome = Outcome.Unknown;
// By default we set unknown outcome
transaction.Outcome = Outcome.Unknown;
#if NET6_0_OR_GREATER
switch (activity.Status)
{
case ActivityStatusCode.Unset:
transaction.Outcome = Outcome.Unknown;
break;
case ActivityStatusCode.Ok:
transaction.Outcome = Outcome.Success;
break;
case ActivityStatusCode.Error:
transaction.Outcome = Outcome.Failure;
break;
}
#endif
transaction.End();
}
else if (_activeSpans.TryRemove(activity.Id, out var span))
switch (activity.Status)
{
UpdateSpan(activity, span);
case ActivityStatusCode.Unset:
transaction.Outcome = Outcome.Unknown;
break;
case ActivityStatusCode.Ok:
transaction.Outcome = Outcome.Success;
break;
case ActivityStatusCode.Error:
transaction.Outcome = Outcome.Failure;
break;
}
#endif
transaction.End();
}
else if (_activeSpans.TryGetValue(activity, out var span))
{
_activeSpans.Remove(activity);
UpdateSpan(activity, span);
}
};

Expand Down
Loading

0 comments on commit 0422eb9

Please sign in to comment.