Skip to content

Commit

Permalink
Merge pull request #2334 from DataDog/rgs/tpe-instrumentation
Browse files Browse the repository at this point in the history
Targeted Instrumentation for ThreadPoolExecutor
  • Loading branch information
richardstartin authored Jan 27, 2021
2 parents 297a362 + 525bf5d commit 9ba354e
Show file tree
Hide file tree
Showing 8 changed files with 218 additions and 45 deletions.
Original file line number Diff line number Diff line change
@@ -1,38 +1,14 @@
package datadog.trace.bootstrap.instrumentation.java.concurrent;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope;

import datadog.trace.context.TraceScope;

public final class ComparableRunnable<T extends Runnable & Comparable<T>>
implements Runnable, Comparable<ComparableRunnable<T>> {

private final T delegate;
private final TraceScope.Continuation continuation;
public final class ComparableRunnable<T extends Runnable & Comparable<T>> extends Wrapper<T>
implements Comparable<ComparableRunnable<T>> {

public ComparableRunnable(T delegate) {
this.delegate = delegate;
TraceScope.Continuation continuation = null;
TraceScope scope = activeScope();
if (null != scope) {
continuation = scope.capture();
}
this.continuation = continuation;
super(delegate);
}

@Override
public int compareTo(ComparableRunnable<T> o) {
return delegate.compareTo(o.delegate);
}

@Override
public void run() {
try (TraceScope scope = activate()) {
delegate.run();
}
}

private TraceScope activate() {
return null == continuation ? null : continuation.activate();
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package datadog.trace.bootstrap.instrumentation.java.concurrent;

import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -40,6 +42,8 @@ public static ExcludeType fromFieldType(String typeName) {
}

private static final ExcludeType[] SKIP_TYPE_VALUES = ExcludeType.values();
private static final Map<ExcludeType, List<String>> SKIP_TYPE_PREFIXES =
new EnumMap<>(ExcludeType.class);

public static boolean exclude(ExcludeType type, Object instance) {
return SKIP.get(instance.getClass()).contains(type);
Expand All @@ -58,6 +62,12 @@ protected EnumSet<ExcludeType> computeValue(Class<?> clazz) {
for (ExcludeType type : SKIP_TYPE_VALUES) {
if (exclude(type, name)) {
skipTypes.add(type);
} else {
for (String prefix : SKIP_TYPE_PREFIXES.get(type)) {
if (name.startsWith(prefix)) {
skipTypes.add(type);
}
}
}
}
return skipTypes;
Expand All @@ -71,6 +81,11 @@ protected EnumSet<ExcludeType> computeValue(Class<?> clazz) {
for (ExcludeType type : ExcludeType.values()) {
excludedClassNames.put(type, new HashSet<String>());
}
for (ExcludeType type : SKIP_TYPE_VALUES) {
SKIP_TYPE_PREFIXES.put(type, new ArrayList<String>());
}
// TODO generic prefix registration
SKIP_TYPE_PREFIXES.get(ExcludeType.RUNNABLE).add("slick.");
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package datadog.trace.bootstrap.instrumentation.java.concurrent;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope;

import datadog.trace.context.TraceScope;

public class Wrapper<T extends Runnable> implements Runnable {

protected final T delegate;
private final TraceScope.Continuation continuation;

public Wrapper(T delegate) {
this.delegate = delegate;
TraceScope.Continuation continuation = null;
TraceScope scope = activeScope();
if (null != scope) {
continuation = scope.capture();
}
this.continuation = continuation;
}

@Override
public void run() {
try (TraceScope scope = activate()) {
delegate.run();
}
}

public void cancel() {
if (null != continuation) {
continuation.cancel();
}
}

public T unwrap() {
return delegate;
}

private TraceScope activate() {
return null == continuation ? null : continuation.activate();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import datadog.trace.bootstrap.instrumentation.java.concurrent.Wrapper;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -64,18 +65,24 @@ public static final class Reject {
// must execute after in case the handler actually runs the runnable,
// which is preferable to cancelling the continuation
@Advice.OnMethodExit(onThrowable = Throwable.class)
public static void reject(@Advice.Argument(0) Runnable runnable) {
public static void reject(@Advice.Argument(readOnly = false, value = 0) Runnable runnable) {
// not handling rejected work (which will often not manifest in an exception being thrown)
// leads to unclosed continuations when executors get busy
// note that this does not handle rejection mechanisms used in Scala, so those need to be
// handled another way
if (runnable instanceof RunnableFuture) {
cancelTask(
InstrumentationContext.get(RunnableFuture.class, State.class),
(RunnableFuture) runnable);
if (runnable instanceof Wrapper) {
Wrapper<?> wrapper = (Wrapper<?>) runnable;
wrapper.cancel();
runnable = wrapper.unwrap();
} else {
if (runnable instanceof RunnableFuture) {
cancelTask(
InstrumentationContext.get(RunnableFuture.class, State.class),
(RunnableFuture<?>) runnable);
}
// paranoid about double instrumentation until RunnableInstrumentation is removed
cancelTask(InstrumentationContext.get(Runnable.class, State.class), runnable);
}
// paranoid about double instrumentation until RunnableInstrumentation is removed
cancelTask(InstrumentationContext.get(Runnable.class, State.class), runnable);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package datadog.trace.instrumentation.java.concurrent;

import static datadog.trace.agent.tooling.bytebuddy.matcher.DDElementMatchers.extendsClass;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.ExcludeType.RUNNABLE;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.exclude;
import static datadog.trace.instrumentation.java.concurrent.AbstractExecutorInstrumentation.EXEC_NAME;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.instrumentation.java.concurrent.ComparableRunnable;
import datadog.trace.bootstrap.instrumentation.java.concurrent.Wrapper;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.RunnableFuture;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

@AutoService(Instrumenter.class)
public final class ThreadPoolExecutorInstrumentation extends Instrumenter.Tracing {

public ThreadPoolExecutorInstrumentation() {
super(EXEC_NAME);
}

@Override
public ElementMatcher<? super TypeDescription> typeMatcher() {
return extendsClass(named("java.util.concurrent.ThreadPoolExecutor"));
}

@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
Map<ElementMatcher<MethodDescription>, String> transformers = new HashMap<>(8);
transformers.put(named("execute").and(isMethod()), getClass().getName() + "$Execute");
transformers.put(
named("beforeExecute")
.and(isMethod())
.and(takesArgument(1, named(Runnable.class.getName()))),
getClass().getName() + "$BeforeExecute");
transformers.put(
named("afterExecute")
.and(isMethod())
.and(takesArgument(0, named(Runnable.class.getName()))),
getClass().getName() + "$AfterExecute");
transformers.put(
named("remove").and(isMethod()).and(returns(Runnable.class)),
getClass().getName() + "$Remove");
return Collections.unmodifiableMap(transformers);
}

public static final class Execute {
@SuppressWarnings({"rawtypes", "unchecked"})
@Advice.OnMethodEnter
public static void wrap(@Advice.Argument(readOnly = false, value = 0) Runnable task) {
if (task instanceof RunnableFuture || null == task || exclude(RUNNABLE, task)) {
return;
}
if (task instanceof Comparable) {
task = new ComparableRunnable(task);
} else {
task = new Wrapper<>(task);
}
}
}

public static final class BeforeExecute {

@Advice.OnMethodEnter
public static void unwrap(@Advice.Argument(readOnly = false, value = 1) Runnable task) {
if (task instanceof Wrapper) {
task = ((Wrapper<?>) task).unwrap();
}
}
}

public static final class AfterExecute {

@Advice.OnMethodEnter
public static void unwrap(@Advice.Argument(readOnly = false, value = 0) Runnable task) {
if (task instanceof Wrapper) {
task = ((Wrapper<?>) task).unwrap();
}
}
}

public static final class Remove {

@Advice.OnMethodExit
public static void unwrap(@Advice.Return(readOnly = false) Runnable removed) {
if (removed instanceof Wrapper) {
Wrapper<?> wrapper = ((Wrapper<?>) removed);
wrapper.cancel();
removed = wrapper.unwrap();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public ElementMatcher<? super TypeDescription> typeMatcher() {
"java.util.concurrent.AbstractExecutorService",
"java.util.concurrent.CompletableFuture$ThreadPerTaskExecutor",
"java.util.concurrent.SubmissionPublisher$ThreadPerTaskExecutor",
"java.util.concurrent.ThreadPoolExecutor",
"org.glassfish.grizzly.threadpool.GrizzlyExecutorService");
}

Expand Down Expand Up @@ -86,11 +85,7 @@ public static final class NewTaskFor {
public static void execute(
@Advice.This Executor executor,
@Advice.Argument(value = 0, readOnly = false) Runnable task) {
// TODO write a slick instrumentation and instrument these types directly
if (task instanceof RunnableFuture
|| null == task
|| exclude(RUNNABLE, task)
|| task.getClass().getName().startsWith("slick.")) {
if (task instanceof RunnableFuture || null == task || exclude(RUNNABLE, task)) {
// no wrapping required
} else if (task instanceof Comparable) {
task = new ComparableRunnable(task);
Expand All @@ -114,11 +109,7 @@ public static final class Wrap {
public static void execute(
@Advice.This Executor executor,
@Advice.Argument(value = 0, readOnly = false) Runnable task) {
// TODO write a slick instrumentation and instrument these types directly
if (task instanceof RunnableFuture
|| null == task
|| exclude(RUNNABLE, task)
|| task.getClass().getName().startsWith("slick.")) {
if (task instanceof RunnableFuture || null == task || exclude(RUNNABLE, task)) {
// no wrapping required
} else if (task instanceof Comparable) {
task = new ComparableRunnable(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,15 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
"invokeAny" | invokeAny | new CustomThreadPoolExecutor()
"invokeAny with timeout" | invokeAnyTimeout | new CustomThreadPoolExecutor()


"execute Runnable" | executeRunnable | new TypeAwareThreadPoolExecutor()
"submit Runnable" | submitRunnable | new TypeAwareThreadPoolExecutor()
"submit Callable" | submitCallable | new TypeAwareThreadPoolExecutor()
"invokeAll" | invokeAll | new TypeAwareThreadPoolExecutor()
"invokeAll with timeout" | invokeAllTimeout | new TypeAwareThreadPoolExecutor()
"invokeAny" | invokeAny | new TypeAwareThreadPoolExecutor()
"invokeAny with timeout" | invokeAnyTimeout | new TypeAwareThreadPoolExecutor()

// Internal executor used by CompletableFuture
"execute Runnable" | executeRunnable | java7SafeCompletableFutureThreadPerTaskExecutor()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import datadog.trace.bootstrap.instrumentation.java.concurrent.Wrapper;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TypeAwareThreadPoolExecutor extends ThreadPoolExecutor {
public TypeAwareThreadPoolExecutor() {
super(2, 2, 0, TimeUnit.MICROSECONDS, new ArrayBlockingQueue<Runnable>(100));
}

@Override
public boolean remove(Runnable task) {
assertNotWrapper(task);
return super.remove(task);
}

@Override
protected void beforeExecute(Thread t, Runnable r) {
assertNotWrapper(r);
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
assertNotWrapper(r);
}

private void assertNotWrapper(Runnable r) {
assert !(r instanceof Wrapper);
}
}

0 comments on commit 9ba354e

Please sign in to comment.