Skip to content

Commit

Permalink
Support adding 'aws.queue.url' to each individual message span
Browse files Browse the repository at this point in the history
  • Loading branch information
mcculls committed Feb 17, 2023
1 parent 766c8fa commit 97a0248
Show file tree
Hide file tree
Showing 17 changed files with 845 additions and 92 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package datadog.trace.instrumentation.aws.v2;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;

/** AWS SDK v2 instrumentation */
Expand All @@ -19,6 +23,11 @@ public String instrumentedType() {
return "software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder";
}

@Override
public Map<String, String> contextStore() {
return singletonMap("software.amazon.awssdk.core.SdkResponse", "java.lang.String");
}

@Override
public void adviceTransformations(AdviceTransformation transformation) {
transformation.applyAdvice(
Expand All @@ -34,7 +43,9 @@ public static void methodExit(@Advice.Return final List<ExecutionInterceptor> in
return; // list already has our interceptor, return to builder
}
}
interceptors.add(new TracingExecutionInterceptor());
interceptors.add(
new TracingExecutionInterceptor(
InstrumentationContext.get(SdkResponse.class, String.class)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@

import datadog.trace.api.Config;
import datadog.trace.api.TracePropagationStyle;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttribute;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
Expand All @@ -28,6 +30,12 @@ public class TracingExecutionInterceptor implements ExecutionInterceptor {

private static final Logger log = LoggerFactory.getLogger(TracingExecutionInterceptor.class);

private final ContextStore<SdkResponse, String> contextStore;

public TracingExecutionInterceptor(ContextStore<SdkResponse, String> contextStore) {
this.contextStore = contextStore;
}

@Override
public void beforeExecution(
final Context.BeforeExecution context, final ExecutionAttributes executionAttributes) {
Expand Down Expand Up @@ -99,6 +107,13 @@ public void afterExecution(
DECORATE.beforeFinish(span);
span.finish();
}
if (!AWS_LEGACY_TRACING && isPollingRequest(context.request())) {
// store queueUrl inside response for SqsReceiveResultInstrumentation
context
.request()
.getValueForField("QueueUrl", String.class)
.ifPresent(queueUrl -> contextStore.put(context.response(), queueUrl));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ public class SqsDecorator extends MessagingClientDecorator {
public static final CharSequence SQS_RECEIVE = UTF8BytesString.create("SQS.ReceiveMessage");
public static final CharSequence SQS_DELIVER = UTF8BytesString.create("SQS.DeliverMessage");

public static final boolean SQS_LEGACY_TRACING =
Config.get().isLegacyTracingEnabled(false, "sqs");
public static final boolean SQS_LEGACY_TRACING = Config.get().isLegacyTracingEnabled(true, "sqs");

private final String spanKind;
private final CharSequence spanType;
Expand Down Expand Up @@ -66,12 +65,17 @@ protected String spanKind() {
return spanKind;
}

public void onConsume(final AgentSpan span) {
public void onConsume(final AgentSpan span, final String queueUrl) {
span.setResourceName(SQS_RECEIVE);
span.setTag("aws.service", "AmazonSQS");
span.setTag("aws.operation", "ReceiveMessageRequest");
span.setTag("aws.agent", COMPONENT_NAME);
span.setTag("aws.queue.url", queueUrl);
}

public void onTimeInQueue(final AgentSpan span) {
span.setResourceName(SQS_DELIVER);
public void onTimeInQueue(final AgentSpan span, final String queueUrl) {
span.setServiceName("sqs");
span.setResourceName(SQS_DELIVER);
span.setTag("aws.queue.url", queueUrl);
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
package datadog.trace.instrumentation.aws.v1.sqs;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.InstrumentationContext;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;

@AutoService(Instrumenter.class)
Expand All @@ -17,19 +23,28 @@ public class SqsReceiveRequestInstrumentation extends AbstractSqsInstrumentation

@Override
public String instrumentedType() {
return "com.amazonaws.services.sqs.model.ReceiveMessageRequest";
return "com.amazonaws.services.sqs.AmazonSQSClient";
}

@Override
public Map<String, String> contextStore() {
return singletonMap(
"com.amazonaws.services.sqs.model.ReceiveMessageResult", "java.lang.String");
}

@Override
public void adviceTransformations(AdviceTransformation transformation) {
transformation.applyAdvice(
isConstructor().or(isMethod().and(namedOneOf("setAttributeNames", "withAttributeNames"))),
isMethod()
.and(namedOneOf("receiveMessage", "executeReceiveMessage"))
.and(takesArgument(0, named("com.amazonaws.services.sqs.model.ReceiveMessageRequest")))
.and(returns(named("com.amazonaws.services.sqs.model.ReceiveMessageResult"))),
getClass().getName() + "$ReceiveMessageRequestAdvice");
}

public static class ReceiveMessageRequestAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.This ReceiveMessageRequest request) {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(0) ReceiveMessageRequest request) {
if (Config.get().isSqsPropagationEnabled()) {
// ReceiveMessageRequest always returns a mutable list which we can append to
List<String> attributeNames = request.getAttributeNames();
Expand All @@ -41,5 +56,14 @@ public static void onExit(@Advice.This ReceiveMessageRequest request) {
attributeNames.add("AWSTraceHeader");
}
}

@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(
@Advice.Argument(0) ReceiveMessageRequest request,
@Advice.Return ReceiveMessageResult result) {
// store queueUrl inside response for SqsReceiveResultInstrumentation
InstrumentationContext.get(ReceiveMessageResult.class, String.class)
.put(result, request.getQueueUrl());
}
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package datadog.trace.instrumentation.aws.v1.sqs;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.InstrumenterConfig;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.InstrumentationContext;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;

@AutoService(Instrumenter.class)
Expand Down Expand Up @@ -39,6 +41,12 @@ public String[] helperClassNames() {
};
}

@Override
public Map<String, String> contextStore() {
return singletonMap(
"com.amazonaws.services.sqs.model.ReceiveMessageResult", "java.lang.String");
}

@Override
public void adviceTransformations(AdviceTransformation transformation) {
transformation.applyAdvice(
Expand All @@ -47,12 +55,15 @@ public void adviceTransformations(AdviceTransformation transformation) {

public static class GetMessagesAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.Return(readOnly = false) List<Message> messages) {
if (messages != null
&& !messages.isEmpty()
&& !(messages instanceof TracingList)
&& !(activeSpan() instanceof AgentTracer.NoopAgentSpan)) {
messages = new TracingList(messages);
public static void onExit(
@Advice.This ReceiveMessageResult result,
@Advice.Return(readOnly = false) List<Message> messages) {
if (messages != null && !messages.isEmpty() && !(messages instanceof TracingList)) {
String queueUrl =
InstrumentationContext.get(ReceiveMessageResult.class, String.class).get(result);
if (queueUrl != null) {
messages = new TracingList(messages, queueUrl);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ public class TracingIterator<L extends Iterator<Message>> implements Iterator<Me
private static final Logger log = LoggerFactory.getLogger(TracingIterator.class);

protected final L delegate;
private final String queueUrl;

public TracingIterator(L delegate) {
public TracingIterator(L delegate, String queueUrl) {
this.delegate = delegate;
this.queueUrl = queueUrl;
}

@Override
Expand Down Expand Up @@ -57,7 +59,7 @@ protected void startNewMessageSpan(Message message) {
} else {
queueSpan = startSpan(AWS_HTTP, spanContext, MILLISECONDS.toMicros(timeInQueueStart));
BROKER_DECORATE.afterStart(queueSpan);
BROKER_DECORATE.onTimeInQueue(queueSpan);
BROKER_DECORATE.onTimeInQueue(queueSpan, queueUrl);
span = startSpan(AWS_HTTP, queueSpan.context());
BROKER_DECORATE.beforeFinish(queueSpan);
// The queueSpan will be finished after inner span has been activated to ensure that
Expand All @@ -67,7 +69,7 @@ protected void startNewMessageSpan(Message message) {
span = startSpan(AWS_HTTP, null);
}
CONSUMER_DECORATE.afterStart(span);
CONSUMER_DECORATE.onConsume(span);
CONSUMER_DECORATE.onConsume(span, queueUrl);
activateNext(span);
if (null != queueSpan) {
queueSpan.finish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@

public class TracingList implements List<Message> {
private final List<Message> delegate;
private final String queueUrl;

public TracingList(List<Message> delegate) {
public TracingList(List<Message> delegate, String queueUrl) {
this.delegate = delegate;
this.queueUrl = queueUrl;
}

@Override
Expand Down Expand Up @@ -85,8 +87,7 @@ public void clear() {

@Override
public Message get(int index) {
// TODO: should this be instrumented as well?
return delegate.get(index);
return delegate.get(index); // not currently covered by iteration span
}

@Override
Expand Down Expand Up @@ -122,12 +123,12 @@ public ListIterator<Message> listIterator() {
@Override
public ListIterator<Message> listIterator(int index) {
// every iteration will add spans. Not only the very first one
return new TracingListIterator(delegate.listIterator(index));
return new TracingListIterator(delegate.listIterator(index), queueUrl);
}

@Override
public List<Message> subList(int fromIndex, int toIndex) {
return new TracingList(delegate.subList(fromIndex, toIndex));
return new TracingList(delegate.subList(fromIndex, toIndex), queueUrl);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
public class TracingListIterator extends TracingIterator<ListIterator<Message>>
implements ListIterator<Message> {

public TracingListIterator(ListIterator<Message> delegate) {
super(delegate);
public TracingListIterator(ListIterator<Message> delegate, String queueUrl) {
super(delegate, queueUrl);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class SqsClientTest extends AgentTestRunner {
protected void configurePreAgent() {
super.configurePreAgent()
// Set a service name that gets sorted early with SORT_BY_NAMES
injectSysConfig(GeneralConfig.SERVICE_NAME, "A")
injectSysConfig(GeneralConfig.SERVICE_NAME, "A-service")
}

@Shared
Expand Down Expand Up @@ -63,6 +63,8 @@ class SqsClientTest extends AgentTestRunner {
})
def messages = client.receiveMessage(queueUrl).messages

messages.forEach {/* consume to create message spans */ }

then:
def sendSpan
assertTraces(2) {
Expand Down Expand Up @@ -99,24 +101,18 @@ class SqsClientTest extends AgentTestRunner {
serviceName "sqs"
operationName "aws.http"
resourceName "SQS.ReceiveMessage"
spanType DDSpanTypes.HTTP_CLIENT
spanType DDSpanTypes.MESSAGE_CONSUMER
errored false
measured true
parent()
childOf(sendSpan)
tags {
"$Tags.COMPONENT" "java-aws-sdk"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.HTTP_URL" "http://localhost:${address.port}/"
"$Tags.HTTP_METHOD" "POST"
"$Tags.HTTP_STATUS" 200
"$Tags.PEER_PORT" address.port
"$Tags.PEER_HOSTNAME" "localhost"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER
"aws.service" "AmazonSQS"
"aws.endpoint" "http://localhost:${address.port}"
"aws.operation" "ReceiveMessageRequest"
"aws.agent" "java-aws-sdk"
"aws.queue.url" "http://localhost:${address.port}/000000000000/somequeue"
defaultTags()
defaultTags(true)
}
}
}
Expand Down Expand Up @@ -236,24 +232,18 @@ class SqsClientTest extends AgentTestRunner {
serviceName "sqs"
operationName "aws.http"
resourceName "SQS.ReceiveMessage"
spanType DDSpanTypes.HTTP_CLIENT
spanType DDSpanTypes.MESSAGE_CONSUMER
errored false
measured true
parent()
childOf(sendSpan)
tags {
"$Tags.COMPONENT" "java-aws-sdk"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.HTTP_URL" "http://localhost:${address.port}/"
"$Tags.HTTP_METHOD" "POST"
"$Tags.HTTP_STATUS" 200
"$Tags.PEER_PORT" address.port
"$Tags.PEER_HOSTNAME" "localhost"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER
"aws.service" "AmazonSQS"
"aws.endpoint" "http://localhost:${address.port}"
"aws.operation" "ReceiveMessageRequest"
"aws.agent" "java-aws-sdk"
"aws.queue.url" "http://localhost:${address.port}/000000000000/somequeue"
defaultTags()
defaultTags(true)
}
}
}
Expand Down
Loading

0 comments on commit 97a0248

Please sign in to comment.