Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable strict context check and fix some context issues. #2637

Merged
merged 14 commits into from
Mar 29, 2021
6 changes: 6 additions & 0 deletions gradle/java.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,12 @@ tasks.withType(Test).configureEach {
testLogging {
exceptionFormat = 'full'
}

// There's no real harm in setting this for all tests even if any happen to not be using context
// propagation.
jvmArgs "-Dio.opentelemetry.context.enableStrictContext=true"
// TODO(anuraaga): Have agent map unshaded to shaded.
jvmArgs "-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=true"
}

tasks.withType(AbstractArchiveTask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,8 @@ compileVersion101TestGroovy {
classpath = classpath.plus(files(compileVersion101TestScala.destinationDir))
dependsOn compileVersion101TestScala
}

tasks.withType(Test) {
// https:/open-telemetry/opentelemetry-java-instrumentation/issues/2639
jvmArgs "-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice to see where this fails

}
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,6 @@ private void populateGenericAttributes(Span span, ExecutionAttributes attributes
@Override
public void afterExecution(
Context.AfterExecution context, ExecutionAttributes executionAttributes) {
Scope scope = executionAttributes.getAttribute(SCOPE_ATTRIBUTE);
if (scope != null) {
scope.close();
}
io.opentelemetry.context.Context otelContext = getContext(executionAttributes);
clearAttributes(executionAttributes);
Span span = Span.fromContext(otelContext);
Expand Down Expand Up @@ -168,6 +164,10 @@ public void onExecutionFailure(
}

private void clearAttributes(ExecutionAttributes executionAttributes) {
Scope scope = executionAttributes.getAttribute(SCOPE_ATTRIBUTE);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😱

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get it. Both why this change is significant and your reaction to it :D

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a real context leak with big damage - if sync AWS SDK request failed and a subsequent client request of some form was made, it would parent to the AWS SDK request (well it'd be suppressed as a result usually)! Hooray for strict context checking

if (scope != null) {
scope.close();
}
executionAttributes.putAttribute(CONTEXT_ATTRIBUTE, null);
executionAttributes.putAttribute(AWS_SDK_REQUEST_ATTRIBUTE, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.Callable
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.ForkJoinTask
import java.util.concurrent.Future
Expand Down Expand Up @@ -134,7 +135,7 @@ class ExecutorInstrumentationTest extends AgentInstrumentationSpecification {

def "#poolImpl '#name' wrap lambdas"() {
setup:
def pool = poolImpl
ExecutorService pool = poolImpl
def m = method
def w = wrap

Expand All @@ -160,7 +161,7 @@ class ExecutorInstrumentationTest extends AgentInstrumentationSpecification {
}

cleanup:
pool?.shutdown()
pool.shutdown()

where:
name | method | wrap | poolImpl
Expand All @@ -173,7 +174,7 @@ class ExecutorInstrumentationTest extends AgentInstrumentationSpecification {

def "#poolImpl '#name' reports after canceled jobs"() {
setup:
def pool = poolImpl
ExecutorService pool = poolImpl
def m = method
List<JavaAsyncChild> children = new ArrayList<>()
List<Future> jobFutures = new ArrayList<>()
Expand Down Expand Up @@ -216,6 +217,11 @@ class ExecutorInstrumentationTest extends AgentInstrumentationSpecification {
expect:
waitForTraces(1).size() == 1

// Wait for shutdown since we didn't wait on task completion and want to confirm any pending
// ones clean up context.
pool.shutdown()
pool.awaitTermination(10, TimeUnit.SECONDS)

where:
name | method | poolImpl
"submit Runnable" | submitRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,9 @@ dependencies {

tasks.withType(Test) {
jvmArgs "-Dotel.instrumentation.grizzly.enabled=true"
}

tasks.withType(Test) {
// https:/open-telemetry/opentelemetry-java-instrumentation/issues/2640
jvmArgs "-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false"
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ tasks.withType(Test) {
jvmArgs "-Dotel.instrumentation.hystrix.experimental-span-attributes=true"
// Disable so failure testing below doesn't inadvertently change the behavior.
jvmArgs "-Dhystrix.command.default.circuitBreaker.enabled=false"
jvmArgs "-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false"

// Uncomment for debugging:
// jvmArgs "-Dhystrix.command.default.execution.timeout.enabled=false"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,3 @@ dependencies {

latestDepTestLibrary group: 'com.squareup.okhttp', name: 'okhttp', version: '[2.6,3)'
}


Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* always stores and retrieves them from the agent context, even when accessed from the application.
* All other accesses are to the concrete application context.
*/
public class AgentContextStorage implements ContextStorage {
public class AgentContextStorage implements ContextStorage, AutoCloseable {

private static final Logger logger = LoggerFactory.getLogger(AgentContextStorage.class);

Expand Down Expand Up @@ -145,6 +145,15 @@ public Context current() {
return new AgentContextWrapper(io.opentelemetry.context.Context.current(), applicationContext);
}

@Override
public void close() throws Exception {
io.opentelemetry.context.ContextStorage agentStorage =
io.opentelemetry.context.ContextStorage.get();
if (agentStorage instanceof AutoCloseable) {
((AutoCloseable) agentStorage).close();
}
}

public static class AgentContextWrapper implements Context {
private final io.opentelemetry.context.Context agentContext;
private final Context applicationContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@ dependencies {
testImplementation group: 'com.sun.activation', name: 'jakarta.activation', version: '1.2.2'
}
}

tasks.withType(Test) {
jvmArgs "-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this mean that our ratpack instrumentation actually leaks context, or is it an artifact of something else? If we are really leaking contexts, should we create issues to fix all of these, so we make sure to circle back and fix them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, filed an issue

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,18 @@ public void consumeMessageBefore(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
Context traceContext = tracer.startSpan(Context.current(), context.getMsgList());
ContextAndScope contextAndScope = new ContextAndScope(traceContext, traceContext.makeCurrent());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can ContextAndScope be removed now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, thanks

context.setMqTraceContext(contextAndScope);
Context otelContext = tracer.startSpan(Context.current(), context.getMsgList());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like similar netty libraries, there's no thread guarantee for before and end. Since there's no downstream instrumentation it's not a big deal anyways

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about e.g. user-provided message hooks that want to augment existing span?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need to provide something like RocketMqTracing.getOpenTelemetryContext(ConsumeMessageContext) to let them extract it I guess. In the meantime, if someone had such code using Context.current() in consumeMessageAfter it would generally be buggy with the chance of Context.current() not actually corresponding to the hook's message due to context leakage so this fix is important first.

I hope such hooks will be able to use our instrumentation-api instead of that though.

context.setMqTraceContext(otelContext);
}

@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
if (context.getMqTraceContext() instanceof ContextAndScope) {
ContextAndScope contextAndScope = (ContextAndScope) context.getMqTraceContext();
contextAndScope.closeScope();
tracer.end(contextAndScope.getContext());
if (context.getMqTraceContext() instanceof Context) {
Context otelContext = (Context) context.getMqTraceContext();
tracer.end(otelContext);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
final class TracingSendMessageHookImpl implements SendMessageHook {

private final RocketMqProducerTracer tracer;
private boolean propagationEnabled;
private final boolean propagationEnabled;

TracingSendMessageHookImpl(RocketMqProducerTracer tracer, boolean propagationEnabled) {
this.tracer = tracer;
Expand All @@ -32,27 +32,25 @@ public void sendMessageBefore(SendMessageContext context) {
if (context == null) {
return;
}
Context traceContext =
Context otelContext =
tracer.startProducerSpan(Context.current(), context.getBrokerAddr(), context.getMessage());
if (propagationEnabled) {
GlobalOpenTelemetry.getPropagators()
.getTextMapPropagator()
.inject(traceContext, context.getMessage().getProperties(), SETTER);
.inject(otelContext, context.getMessage().getProperties(), SETTER);
}
ContextAndScope contextAndScope = new ContextAndScope(traceContext, traceContext.makeCurrent());
context.setMqTraceContext(contextAndScope);
context.setMqTraceContext(otelContext);
}

@Override
public void sendMessageAfter(SendMessageContext context) {
if (context == null || context.getMqTraceContext() == null || context.getSendResult() == null) {
return;
}
if (context.getMqTraceContext() instanceof ContextAndScope) {
ContextAndScope contextAndScope = (ContextAndScope) context.getMqTraceContext();
tracer.afterProduce(contextAndScope.getContext(), context.getSendResult());
contextAndScope.closeScope();
tracer.end(contextAndScope.getContext());
if (context.getMqTraceContext() instanceof Context) {
Context otelContext = (Context) context.getMqTraceContext();
tracer.afterProduce(otelContext, context.getSendResult());
tracer.end(otelContext);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Future
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ThreadPoolExecutor
Expand Down Expand Up @@ -87,7 +88,7 @@ class ScalaExecutorInstrumentationTest extends AgentInstrumentationSpecification

def "#poolImpl '#name' reports after canceled jobs"() {
setup:
def pool = poolImpl
ExecutorService pool = poolImpl
def m = method
List<ScalaAsyncChild> children = new ArrayList<>()
List<Future> jobFutures = new ArrayList<>()
Expand Down Expand Up @@ -129,6 +130,11 @@ class ScalaExecutorInstrumentationTest extends AgentInstrumentationSpecification
expect:
waitForTraces(1).size() == 1

// Wait for shutdown to make sure any remaining tasks finish and cleanup context since we don't
// wait on the tasks.
pool.shutdown()
pool.awaitTermination(10, TimeUnit.SECONDS)

where:
name | method | poolImpl
"submit Runnable" | submitRunnable | new ForkJoinPool()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ dependencies {
tasks.withType(Test) {
// TODO run tests both with and without experimental span attributes
jvmArgs '-Dotel.instrumentation.spring-webflux.experimental-span-attributes=true'
// TODO(anuraaga): There is no actual context leak - it just seems that the server-side does not
// fully complete processing before the test cases finish, which is when we check for context
// leaks. Adding Thread.sleep(1000) just before checking for leaks allows it to pass but is not
// a good approach. Come up with a better one and enable this.
jvmArgs "-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false"

systemProperty "testLatestDeps", testLatestDeps
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,43 @@
/** Utils for concurrent instrumentations. */
public class ExecutorInstrumentationUtils {

private static final ClassValue<Boolean> NOT_INSTRUMENTED_RUNNABLE_ENCLOSING_CLASS =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice, i'm trying to remember to look out for opportunities to use also 😄

new ClassValue<Boolean>() {
@Override
protected Boolean computeValue(Class<?> enclosingClass) {
// Avoid context leak on jetty. Runnable submitted from SelectChannelEndPoint is used to
// process a new request which should not have context from them current request.
if (enclosingClass.getName().equals("org.eclipse.jetty.io.nio.SelectChannelEndPoint")) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just an idea, but maybe those class names should be specified by instrumentation modules? Each instrumented library may bring its own set of excluded classes (also see AdditionalLibraryIgnoresMatcher) and maybe it'd better to define them in the lib instrumentation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah! That's the approach Datadog has implemented, it would be nice for us to follow it at some point.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice - I originally thought about adding some void excludeClasses(ExcludeBuilder) method to InstrumentationModule

return true;
}

// Don't instrument the executor's own runnables. These runnables may never return until
// netty shuts down.
if (enclosingClass
.getName()
.equals("io.netty.util.concurrent.SingleThreadEventExecutor")) {
return true;
}

// OkHttp task runner is a lazily-initialized shared pool of continuosly running threads
// similar to an event loop. The submitted tasks themselves should already be instrumented
// to
// allow async propagation.
if (enclosingClass.getName().equals("okhttp3.internal.concurrent.TaskRunner")) {
return true;
}

// OkHttp connection pool lazily initializes a long running task to detect expired
// connections
// and should not itself be instrumented.
if (enclosingClass.getName().equals("com.squareup.okhttp.ConnectionPool")) {
return true;
}

return false;
}
};

/**
* Checks if given task should get state attached.
*
Expand All @@ -28,22 +65,37 @@ public static boolean shouldAttachStateToTask(Object task) {
Class<?> taskClass = task.getClass();
Class<?> enclosingClass = taskClass.getEnclosingClass();

// not much point in propagating root context
// plus it causes failures under otel.javaagent.testing.fail-on-context-leak=true
return Context.current() != Context.root()
// TODO Workaround for
// https:/open-telemetry/opentelemetry-java-instrumentation/issues/787
&& !taskClass.getName().equals("org.apache.tomcat.util.net.NioEndpoint$SocketProcessor")
// Avoid context leak on jetty. Runnable submitted from SelectChannelEndPoint is used to
// process a new request which should not have context from them current request.
&& (enclosingClass == null
|| !enclosingClass.getName().equals("org.eclipse.jetty.io.nio.SelectChannelEndPoint"))
// Don't instrument the executor's own runnables. These runnables may never return until
// netty shuts down.
&& (enclosingClass == null
|| !enclosingClass
.getName()
.equals("io.netty.util.concurrent.SingleThreadEventExecutor"));
if (Context.current() == Context.root()) {
// not much point in propagating root context
// plus it causes failures under otel.javaagent.testing.fail-on-context-leak=true
return false;
}

// ForkJoinPool threads are initialized lazily and continue to handle tasks similar to an event
// loop. They should not have context propagated to the base of the thread, tasks themselves
// will have it through other means.
if (taskClass.getName().equals("java.util.concurrent.ForkJoinWorkerThread")) {
return false;
}

// ThreadPoolExecutor worker threads may be initialized lazily and manage interruption of other
// threads. The actual tasks being run on those threads will propagate context but we should not
// propagate onto this management thread.
if (taskClass.getName().equals("java.util.concurrent.ThreadPoolExecutor$Worker")) {
return false;
}

// TODO Workaround for
// https:/open-telemetry/opentelemetry-java-instrumentation/issues/787
if (taskClass.getName().equals("org.apache.tomcat.util.net.NioEndpoint$SocketProcessor")) {
return false;
}

if (enclosingClass != null && NOT_INSTRUMENTED_RUNNABLE_ENCLOSING_CLASS.get(enclosingClass)) {
return false;
}

return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import groovy.transform.stc.ClosureParams
import groovy.transform.stc.SimpleType
import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.trace.Span
import io.opentelemetry.context.ContextStorage
import io.opentelemetry.instrumentation.test.asserts.InMemoryExporterAssert
import io.opentelemetry.instrumentation.testing.InstrumentationTestRunner
import io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil
Expand Down Expand Up @@ -36,6 +37,13 @@ abstract class InstrumentationSpecification extends Specification {
testRunner().clearAllExportedData()
}

def cleanup() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, perhaps we should consider adding an afterTest() method to InstrumentationTestRunnerand calling it here and in InstrumentationExtension?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing out InstrumentationExtension - adding afterTest seemed a bit weird since it's the same code for agent and library. For now I just copied the code since it isn't so much and I think we can revisit later.

ContextStorage storage = ContextStorage.get()
if (storage instanceof AutoCloseable) {
((AutoCloseable) storage).close()
}
}

def cleanupSpec() {
testRunner().afterTestClass()
}
Expand Down