diff --git a/instrumentation/kotlinx-coroutines/javaagent/build.gradle.kts b/instrumentation/kotlinx-coroutines/javaagent/build.gradle.kts index 95e72f360382..2a4293ad9f9f 100644 --- a/instrumentation/kotlinx-coroutines/javaagent/build.gradle.kts +++ b/instrumentation/kotlinx-coroutines/javaagent/build.gradle.kts @@ -18,14 +18,20 @@ muzzle { versions.set("[1.3.9,)") } } + dependencies { compileOnly("io.opentelemetry:opentelemetry-extension-kotlin") compileOnly("org.jetbrains.kotlin:kotlin-stdlib-jdk8") + testInstrumentation(project(":instrumentation:reactor:reactor-3.1:javaagent")) + testImplementation("io.opentelemetry:opentelemetry-extension-kotlin") testImplementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8") + testImplementation(project(":instrumentation:reactor:reactor-3.1:library")) + // Use first version with flow support since we have tests for it. testLibrary("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0") + testLibrary("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.3.0") } tasks { diff --git a/instrumentation/kotlinx-coroutines/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentation.java b/instrumentation/kotlinx-coroutines/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentation.java index ae1134a6ea9e..9910a0c53503 100644 --- a/instrumentation/kotlinx-coroutines/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentation.java +++ b/instrumentation/kotlinx-coroutines/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentation.java @@ -6,7 +6,6 @@ package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines; import static net.bytebuddy.matcher.ElementMatchers.named; -import static net.bytebuddy.matcher.ElementMatchers.namedOneOf; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; @@ -19,40 +18,27 @@ public class KotlinCoroutinesInstrumentation implements TypeInstrumentation { @Override public ElementMatcher typeMatcher() { - return named("kotlinx.coroutines.BuildersKt"); + return named("kotlinx.coroutines.CoroutineContextKt"); } @Override public void transform(TypeTransformer transformer) { transformer.applyAdviceToMethod( - namedOneOf("launch", "launch$default") + named("newCoroutineContext") .and(takesArgument(1, named("kotlin.coroutines.CoroutineContext"))), - this.getClass().getName() + "$LaunchAdvice"); - transformer.applyAdviceToMethod( - namedOneOf("runBlocking", "runBlocking$default") - .and(takesArgument(0, named("kotlin.coroutines.CoroutineContext"))), - this.getClass().getName() + "$RunBlockingAdvice"); + this.getClass().getName() + "$ContextAdvice"); } @SuppressWarnings("unused") - public static class LaunchAdvice { + public static class ContextAdvice { @Advice.OnMethodEnter public static void enter( @Advice.Argument(value = 1, readOnly = false) CoroutineContext coroutineContext) { - coroutineContext = - KotlinCoroutinesInstrumentationHelper.addOpenTelemetryContext(coroutineContext); - } - } - - @SuppressWarnings("unused") - public static class RunBlockingAdvice { - - @Advice.OnMethodEnter - public static void enter( - @Advice.Argument(value = 0, readOnly = false) CoroutineContext coroutineContext) { - coroutineContext = - KotlinCoroutinesInstrumentationHelper.addOpenTelemetryContext(coroutineContext); + if (coroutineContext != null) { + coroutineContext = + KotlinCoroutinesInstrumentationHelper.addOpenTelemetryContext(coroutineContext); + } } } } diff --git a/instrumentation/kotlinx-coroutines/javaagent/src/test/groovy/KotlinCoroutineInstrumentationTest.groovy b/instrumentation/kotlinx-coroutines/javaagent/src/test/groovy/KotlinCoroutineInstrumentationTest.groovy index fcf9ce792675..8ab030cf46be 100644 --- a/instrumentation/kotlinx-coroutines/javaagent/src/test/groovy/KotlinCoroutineInstrumentationTest.groovy +++ b/instrumentation/kotlinx-coroutines/javaagent/src/test/groovy/KotlinCoroutineInstrumentationTest.groovy @@ -225,4 +225,87 @@ class KotlinCoroutineInstrumentationTest extends AgentInstrumentationSpecificati assert seenItersA.equals(expectedIters) assert seenItersB.equals(expectedIters) } + + def "kotlin traced mono"() { + setup: + KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher) + + when: + kotlinTest.tracedMono() + + then: + assertTraces(1) { + trace(0, 2) { + span(0) { + name "parent" + attributes { + } + } + span("child") { + childOf span(0) + attributes { + } + } + } + } + + where: + dispatcher << dispatchersToTest + } + + def "kotlin traced mono with context propagation operator"() { + setup: + KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher) + + when: + kotlinTest.tracedMonoContextPropagationOperator() + + then: + assertTraces(1) { + trace(0, 2) { + span(0) { + name "parent" + attributes { + } + } + span("child") { + childOf span(0) + attributes { + } + } + } + } + + where: + dispatcher << dispatchersToTest + } + + def "kotlin traced flux"() { + setup: + KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher) + + when: + kotlinTest.tracedFlux() + + then: + assertTraces(1) { + trace(0, 4) { + span(0) { + name "parent" + attributes { + } + } + (0..2).each { + span("child_$it") { + childOf span(0) + attributes { + } + } + } + } + } + + where: + dispatcher << dispatchersToTest + } } diff --git a/instrumentation/kotlinx-coroutines/javaagent/src/test/kotlin/KotlinCoroutineTests.kt b/instrumentation/kotlinx-coroutines/javaagent/src/test/kotlin/KotlinCoroutineTests.kt index 8e511908bd65..4afd2e48c67c 100644 --- a/instrumentation/kotlinx-coroutines/javaagent/src/test/kotlin/KotlinCoroutineTests.kt +++ b/instrumentation/kotlinx-coroutines/javaagent/src/test/kotlin/KotlinCoroutineTests.kt @@ -5,7 +5,9 @@ import io.opentelemetry.api.GlobalOpenTelemetry import io.opentelemetry.api.trace.Tracer +import io.opentelemetry.context.Context import io.opentelemetry.extension.kotlin.asContextElement +import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope @@ -19,6 +21,11 @@ import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.consumeAsFlow import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch +import kotlinx.coroutines.reactive.awaitSingle +import kotlinx.coroutines.reactive.collect +import kotlinx.coroutines.reactor.ReactorContext +import kotlinx.coroutines.reactor.flux +import kotlinx.coroutines.reactor.mono import kotlinx.coroutines.runBlocking import kotlinx.coroutines.selects.select import kotlinx.coroutines.withContext @@ -125,6 +132,38 @@ class KotlinCoroutineTests(private val dispatcher: CoroutineDispatcher) { } } + fun tracedMono(): Unit = runTest { + mono(dispatcher) { + tracedChild("child") + }.awaitSingle() + } + + fun tracedMonoContextPropagationOperator(): Unit = runTest { + val currentContext = Context.current() + // clear current context to ensure that ContextPropagationOperator is used for context propagation + withContext(Context.root().asContextElement()) { + val mono = mono(dispatcher) { + // extract context from reactor and propagate it into coroutine + val reactorContext = coroutineContext[ReactorContext.Key]?.context + val otelContext = ContextPropagationOperator.getOpenTelemetryContext(reactorContext, Context.current()) + withContext(otelContext.asContextElement()) { + tracedChild("child") + } + } + ContextPropagationOperator.runWithContext(mono, currentContext).awaitSingle() + } + } + + fun tracedFlux() = runTest { + flux(dispatcher) { + repeat(3) { + tracedChild("child_$it") + send(it) + } + }.collect { + } + } + fun launchConcurrentSuspendFunctions(numIters: Int) { runBlocking { for (i in 0 until numIters) {