Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable strict context check and fix some context issues. #2637

Merged
merged 14 commits into from
Mar 29, 2021
6 changes: 6 additions & 0 deletions gradle/java.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,12 @@ tasks.withType(Test).configureEach {
testLogging {
exceptionFormat = 'full'
}

// There's no real harm in setting this for all tests even if any happen to not be using context
// propagation.
jvmArgs "-Dio.opentelemetry.context.enableStrictContext=true"
// TODO(anuraaga): Have agent map unshaded to shaded.
jvmArgs "-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=true"
}

tasks.withType(AbstractArchiveTask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,7 @@ compileVersion101TestGroovy {
classpath = classpath.plus(files(compileVersion101TestScala.destinationDir))
dependsOn compileVersion101TestScala
}

tasks.withType(Test) {
jvmArgs "-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice to see where this fails

}
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,6 @@ private void populateGenericAttributes(Span span, ExecutionAttributes attributes
@Override
public void afterExecution(
Context.AfterExecution context, ExecutionAttributes executionAttributes) {
Scope scope = executionAttributes.getAttribute(SCOPE_ATTRIBUTE);
if (scope != null) {
scope.close();
}
io.opentelemetry.context.Context otelContext = getContext(executionAttributes);
clearAttributes(executionAttributes);
Span span = Span.fromContext(otelContext);
Expand Down Expand Up @@ -168,6 +164,10 @@ public void onExecutionFailure(
}

private void clearAttributes(ExecutionAttributes executionAttributes) {
Scope scope = executionAttributes.getAttribute(SCOPE_ATTRIBUTE);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😱

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get it. Both why this change is significant and your reaction to it :D

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a real context leak with big damage - if sync AWS SDK request failed and a subsequent client request of some form was made, it would parent to the AWS SDK request (well it'd be suppressed as a result usually)! Hooray for strict context checking

if (scope != null) {
scope.close();
}
executionAttributes.putAttribute(CONTEXT_ATTRIBUTE, null);
executionAttributes.putAttribute(AWS_SDK_REQUEST_ATTRIBUTE, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ muzzle {

tasks.withType(Test) {
jvmArgs "-Dotel.instrumentation.executors.include=ExecutorInstrumentationTest\$CustomThreadPoolExecutor"
// NB(anuraaga): Unlike other similar exclusions this one does not seem to fail 100% of the time.
jvmArgs "-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false"
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static Scope enter(@Advice.This Runnable thiz) {
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void exit(@Advice.Enter Scope scope) {
public static void exit(@Advice.This Runnable thiz, @Advice.Enter Scope scope) {
if (scope != null) {
scope.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,3 @@ dependencies {

latestDepTestLibrary group: 'com.squareup.okhttp', name: 'okhttp', version: '[2.6,3)'
}


Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* always stores and retrieves them from the agent context, even when accessed from the application.
* All other accesses are to the concrete application context.
*/
public class AgentContextStorage implements ContextStorage {
public class AgentContextStorage implements ContextStorage, AutoCloseable {

private static final Logger logger = LoggerFactory.getLogger(AgentContextStorage.class);

Expand Down Expand Up @@ -145,6 +145,16 @@ public Context current() {
return new AgentContextWrapper(io.opentelemetry.context.Context.current(), applicationContext);
}

@Override
public void close() throws Exception {
io.opentelemetry.context.ContextStorage agentStorage =
io.opentelemetry.context.ContextStorage.get();
System.out.println(agentStorage.getClass());
if (agentStorage instanceof AutoCloseable) {
((AutoCloseable) agentStorage).close();
}
}

public static class AgentContextWrapper implements Context {
private final io.opentelemetry.context.Context agentContext;
private final Context applicationContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,18 @@ public void consumeMessageBefore(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
Context traceContext = tracer.startSpan(Context.current(), context.getMsgList());
ContextAndScope contextAndScope = new ContextAndScope(traceContext, traceContext.makeCurrent());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can ContextAndScope be removed now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, thanks

context.setMqTraceContext(contextAndScope);
Context otelContext = tracer.startSpan(Context.current(), context.getMsgList());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like similar netty libraries, there's no thread guarantee for before and end. Since there's no downstream instrumentation it's not a big deal anyways

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about e.g. user-provided message hooks that want to augment existing span?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need to provide something like RocketMqTracing.getOpenTelemetryContext(ConsumeMessageContext) to let them extract it I guess. In the meantime, if someone had such code using Context.current() in consumeMessageAfter it would generally be buggy with the chance of Context.current() not actually corresponding to the hook's message due to context leakage so this fix is important first.

I hope such hooks will be able to use our instrumentation-api instead of that though.

context.setMqTraceContext(otelContext);
}

@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
if (context.getMqTraceContext() instanceof ContextAndScope) {
ContextAndScope contextAndScope = (ContextAndScope) context.getMqTraceContext();
contextAndScope.closeScope();
tracer.end(contextAndScope.getContext());
if (context.getMqTraceContext() instanceof Context) {
Context otelContext = (Context) context.getMqTraceContext();
tracer.end(otelContext);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
final class TracingSendMessageHookImpl implements SendMessageHook {

private final RocketMqProducerTracer tracer;
private boolean propagationEnabled;
private final boolean propagationEnabled;

TracingSendMessageHookImpl(RocketMqProducerTracer tracer, boolean propagationEnabled) {
this.tracer = tracer;
Expand All @@ -32,27 +32,25 @@ public void sendMessageBefore(SendMessageContext context) {
if (context == null) {
return;
}
Context traceContext =
Context otelContext =
tracer.startProducerSpan(Context.current(), context.getBrokerAddr(), context.getMessage());
if (propagationEnabled) {
GlobalOpenTelemetry.getPropagators()
.getTextMapPropagator()
.inject(traceContext, context.getMessage().getProperties(), SETTER);
.inject(otelContext, context.getMessage().getProperties(), SETTER);
}
ContextAndScope contextAndScope = new ContextAndScope(traceContext, traceContext.makeCurrent());
context.setMqTraceContext(contextAndScope);
context.setMqTraceContext(otelContext);
}

@Override
public void sendMessageAfter(SendMessageContext context) {
if (context == null || context.getMqTraceContext() == null || context.getSendResult() == null) {
return;
}
if (context.getMqTraceContext() instanceof ContextAndScope) {
ContextAndScope contextAndScope = (ContextAndScope) context.getMqTraceContext();
tracer.afterProduce(contextAndScope.getContext(), context.getSendResult());
contextAndScope.closeScope();
tracer.end(contextAndScope.getContext());
if (context.getMqTraceContext() instanceof Context) {
Context otelContext = (Context) context.getMqTraceContext();
tracer.afterProduce(otelContext, context.getSendResult());
tracer.end(otelContext);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,7 @@ dependencies {

// Run Slick library tests along with the rest of tests
test.dependsOn slickTest

tasks.withType(Test) {
jvmArgs "-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false"
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ dependencies {
tasks.withType(Test) {
// TODO run tests both with and without experimental span attributes
jvmArgs '-Dotel.instrumentation.spring-webflux.experimental-span-attributes=true'
jvmArgs "-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false"

systemProperty "testLatestDeps", testLatestDeps
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,46 @@ public static boolean shouldAttachStateToTask(Object task) {
Class<?> taskClass = task.getClass();
Class<?> enclosingClass = taskClass.getEnclosingClass();

// not much point in propagating root context
// plus it causes failures under otel.javaagent.testing.fail-on-context-leak=true
return Context.current() != Context.root()
// TODO Workaround for
// https:/open-telemetry/opentelemetry-java-instrumentation/issues/787
&& !taskClass.getName().equals("org.apache.tomcat.util.net.NioEndpoint$SocketProcessor")
// Avoid context leak on jetty. Runnable submitted from SelectChannelEndPoint is used to
// process a new request which should not have context from them current request.
&& (enclosingClass == null
|| !enclosingClass.getName().equals("org.eclipse.jetty.io.nio.SelectChannelEndPoint"))
// Don't instrument the executor's own runnables. These runnables may never return until
// netty shuts down.
&& (enclosingClass == null
|| !enclosingClass
.getName()
.equals("io.netty.util.concurrent.SingleThreadEventExecutor"));
if (Context.current() == Context.root()) {
// not much point in propagating root context
// plus it causes failures under otel.javaagent.testing.fail-on-context-leak=true
return false;
}

// TODO Workaround for
// https:/open-telemetry/opentelemetry-java-instrumentation/issues/787
if (taskClass.getName().equals("org.apache.tomcat.util.net.NioEndpoint$SocketProcessor")) {
return false;
}

if (enclosingClass != null) {
// Avoid context leak on jetty. Runnable submitted from SelectChannelEndPoint is used to
// process a new request which should not have context from them current request.
if (enclosingClass.getName().equals("org.eclipse.jetty.io.nio.SelectChannelEndPoint")) {
return false;
}

// Don't instrument the executor's own runnables. These runnables may never return until
// netty shuts down.
if (enclosingClass.getName().equals("io.netty.util.concurrent.SingleThreadEventExecutor")) {
return false;
}

// OkHttp task runner is a lazily-initialized shared pool of continuosly running threads
// similar to an event loop. The submitted tasks themselves should already be instrumented to
// allow async propagation.
if (enclosingClass.getName().equals("okhttp3.internal.concurrent.TaskRunner")) {
return false;
}

// OkHttp connection pool lazily initializes a long running task to detect expired connections
// and should not itself be instrumented.
if (enclosingClass.getName().equals("com.squareup.okhttp.ConnectionPool")) {
return false;
}
}

return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import groovy.transform.stc.ClosureParams
import groovy.transform.stc.SimpleType
import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.trace.Span
import io.opentelemetry.context.ContextStorage
import io.opentelemetry.instrumentation.test.asserts.InMemoryExporterAssert
import io.opentelemetry.instrumentation.testing.InstrumentationTestRunner
import io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil
Expand Down Expand Up @@ -36,6 +37,13 @@ abstract class InstrumentationSpecification extends Specification {
testRunner().clearAllExportedData()
}

def cleanup() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, perhaps we should consider adding an afterTest() method to InstrumentationTestRunnerand calling it here and in InstrumentationExtension?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing out InstrumentationExtension - adding afterTest seemed a bit weird since it's the same code for agent and library. For now I just copied the code since it isn't so much and I think we can revisit later.

ContextStorage storage = ContextStorage.get()
if (storage instanceof AutoCloseable) {
((AutoCloseable) storage).close()
}
}

def cleanupSpec() {
testRunner().afterTestClass()
}
Expand Down