Skip to content

Commit

Permalink
Propagate context to lettuce callbacks (#3839)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit authored Aug 17, 2021
1 parent c54d192 commit 8c175d4
Show file tree
Hide file tree
Showing 16 changed files with 580 additions and 92 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.lettuce.v4_0;

import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.named;

import com.lambdaworks.redis.protocol.AsyncCommand;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext;
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class LettuceAsyncCommandInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("com.lambdaworks.redis.protocol.AsyncCommand");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor(), LettuceAsyncCommandInstrumentation.class.getName() + "$SaveContextAdvice");
transformer.applyAdviceToMethod(
named("complete").or(named("completeExceptionally")).or(named("cancel")),
LettuceAsyncCommandInstrumentation.class.getName() + "$RestoreContextAdvice");
}

@SuppressWarnings("unused")
public static class SaveContextAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void saveContext(@Advice.This AsyncCommand<?, ?, ?> asyncCommand) {
Context context = Java8BytecodeBridge.currentContext();
// get the context that submitted this command and attach it, it will be used to run callbacks
context = context.get(LettuceSingletons.COMMAND_CONTEXT_KEY);
InstrumentationContext.get(AsyncCommand.class, Context.class).put(asyncCommand, context);
}
}

@SuppressWarnings("unused")
public static class RestoreContextAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.This AsyncCommand<?, ?, ?> asyncCommand, @Advice.Local("otelScope") Scope scope) {
Context context =
InstrumentationContext.get(AsyncCommand.class, Context.class).get(asyncCommand);
scope = context.makeCurrent();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit(@Advice.Local("otelScope") Scope scope) {
scope.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ public static void onEnter(
@Advice.Argument(0) RedisCommand<?, ?, ?> command,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
context = instrumenter().start(currentContext(), command);
Context parentContext = currentContext();
context = instrumenter().start(parentContext, command);
// remember the context that called dispatch, it is used in LettuceAsyncCommandInstrumentation
context = context.with(LettuceSingletons.COMMAND_CONTEXT_KEY, parentContext);
scope = context.makeCurrent();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ public LettuceInstrumentationModule() {

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(new LettuceConnectInstrumentation(), new LettuceAsyncCommandsInstrumentation());
return asList(
new LettuceAsyncCommandInstrumentation(),
new LettuceAsyncCommandsInstrumentation(),
new LettuceConnectInstrumentation());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.protocol.RedisCommand;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextKey;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
Expand All @@ -19,9 +21,11 @@ public final class LettuceSingletons {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.lettuce-4.0";

private static final Instrumenter<RedisCommand<?, ?, ?>, Void> INSTRUMENTER;

private static final Instrumenter<RedisURI, Void> CONNECT_INSTRUMENTER;

public static final ContextKey<Context> COMMAND_CONTEXT_KEY =
ContextKey.named("opentelemetry-lettuce-v4_0-context-key");

static {
DbAttributesExtractor<RedisCommand<?, ?, ?>, Void> attributesExtractor =
new LettuceDbAttributesExtractor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/

import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.StatusCode.ERROR

import com.lambdaworks.redis.ClientOptions
Expand Down Expand Up @@ -180,29 +181,44 @@ class LettuceAsyncClientTest extends AgentInstrumentationSpecification {
Consumer<String> consumer = new Consumer<String>() {
@Override
void accept(String res) {
conds.evaluate {
assert res == "TESTVAL"
runWithSpan("callback") {
conds.evaluate {
assert res == "TESTVAL"
}
}
}
}

when:
RedisFuture<String> redisFuture = asyncCommands.get("TESTKEY")
redisFuture.thenAccept(consumer)
runWithSpan("parent") {
RedisFuture<String> redisFuture = asyncCommands.get("TESTKEY")
redisFuture.thenAccept(consumer)
}

then:
conds.await()
assertTraces(1) {
trace(0, 1) {
trace(0, 3) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name "GET"
kind CLIENT
childOf(span(0))
attributes {
"${SemanticAttributes.DB_SYSTEM.key}" "redis"
"${SemanticAttributes.DB_OPERATION.key}" "GET"
"${SemanticAttributes.DB_STATEMENT.key}" "GET"
}
}
span(2) {
name "callback"
kind INTERNAL
childOf(span(0))
}
}
}
}
Expand All @@ -216,40 +232,62 @@ class LettuceAsyncClientTest extends AgentInstrumentationSpecification {
BiFunction<String, Throwable, String> firstStage = new BiFunction<String, Throwable, String>() {
@Override
String apply(String res, Throwable throwable) {
conds.evaluate {
assert res == null
assert throwable == null
runWithSpan("callback1") {
conds.evaluate {
assert res == null
assert throwable == null
}
}
return (res == null ? successStr : res)
}
}
Function<String, Object> secondStage = new Function<String, Object>() {
@Override
Object apply(String input) {
conds.evaluate {
assert input == successStr
runWithSpan("callback2") {
conds.evaluate {
assert input == successStr
}
}
return null
}
}

when:
RedisFuture<String> redisFuture = asyncCommands.get("NON_EXISTENT_KEY")
redisFuture.handleAsync(firstStage).thenApply(secondStage)
runWithSpan("parent") {
RedisFuture<String> redisFuture = asyncCommands.get("NON_EXISTENT_KEY")
redisFuture.handle(firstStage).thenApply(secondStage)
}

then:
conds.await()
assertTraces(1) {
trace(0, 1) {
trace(0, 4) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name "GET"
kind CLIENT
childOf(span(0))
attributes {
"${SemanticAttributes.DB_SYSTEM.key}" "redis"
"${SemanticAttributes.DB_OPERATION.key}" "GET"
"${SemanticAttributes.DB_STATEMENT.key}" "GET"
}
}
span(2) {
name "callback1"
kind INTERNAL
childOf(span(0))
}
span(3) {
name "callback2"
kind INTERNAL
childOf(span(0))
}
}
}
}
Expand All @@ -260,29 +298,44 @@ class LettuceAsyncClientTest extends AgentInstrumentationSpecification {
BiConsumer<String, Throwable> biConsumer = new BiConsumer<String, Throwable>() {
@Override
void accept(String keyRetrieved, Throwable throwable) {
conds.evaluate {
assert keyRetrieved != null
runWithSpan("callback") {
conds.evaluate {
assert keyRetrieved != null
}
}
}
}

when:
RedisFuture<String> redisFuture = asyncCommands.randomkey()
redisFuture.whenCompleteAsync(biConsumer)
runWithSpan("parent") {
RedisFuture<String> redisFuture = asyncCommands.randomkey()
redisFuture.whenCompleteAsync(biConsumer)
}

then:
conds.await()
assertTraces(1) {
trace(0, 1) {
trace(0, 3) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name "RANDOMKEY"
kind CLIENT
childOf(span(0))
attributes {
"${SemanticAttributes.DB_SYSTEM.key}" "redis"
"${SemanticAttributes.DB_OPERATION.key}" "RANDOMKEY"
"${SemanticAttributes.DB_STATEMENT.key}" "RANDOMKEY"
}
}
span(2) {
name "callback"
kind INTERNAL
childOf(span(0))
}
}
}
}
Expand Down Expand Up @@ -397,12 +450,16 @@ class LettuceAsyncClientTest extends AgentInstrumentationSpecification {
setup:
asyncCommands.setAutoFlushCommands(false)
def conds = new AsyncConditions()
RedisFuture redisFuture = asyncCommands.sadd("SKEY", "1", "2")
RedisFuture redisFuture = runWithSpan("parent") {
asyncCommands.sadd("SKEY", "1", "2")
}
redisFuture.whenCompleteAsync({
res, throwable ->
conds.evaluate {
assert throwable != null
assert throwable instanceof CancellationException
runWithSpan("callback") {
conds.evaluate {
assert throwable != null
assert throwable instanceof CancellationException
}
}
})

Expand All @@ -414,17 +471,28 @@ class LettuceAsyncClientTest extends AgentInstrumentationSpecification {
conds.await()
cancelSuccess == true
assertTraces(1) {
trace(0, 1) {
trace(0, 3) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name "SADD"
kind CLIENT
childOf(span(0))
attributes {
"${SemanticAttributes.DB_SYSTEM.key}" "redis"
"${SemanticAttributes.DB_OPERATION.key}" "SADD"
"${SemanticAttributes.DB_STATEMENT.key}" "SADD"
"lettuce.command.cancelled" true
}
}
span(2) {
name "callback"
kind INTERNAL
childOf(span(0))
}
}
}
}
Expand Down
Loading

0 comments on commit 8c175d4

Please sign in to comment.