Skip to content

Commit

Permalink
Do not propagate gRPC deadline when propagating OTel context via java…
Browse files Browse the repository at this point in the history
…agent. (#5543)

* Add test for early return in gRPC pattern.

* Do not propagate gRPC deadline when propagating OTel context via javaagent.
  • Loading branch information
anuraaga authored Mar 11, 2022
1 parent b25043c commit 1d9c23b
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public final class GrpcSingletons {

public static final ServerInterceptor SERVER_INTERCEPTOR;

public static final Context.Storage STORAGE = new ContextStorageBridge();
public static final Context.Storage STORAGE = new ContextStorageBridge(false);

static {
boolean experimentalSpanAttributes =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/
public final class ContextStorageOverride extends Context.Storage {

private static final Context.Storage delegate = new ContextStorageBridge();
private static final Context.Storage delegate = new ContextStorageBridge(true);

@Override
public Context doAttach(Context toAttach) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ public final class ContextStorageBridge extends Context.Storage {
Context.key("otel-context");
private static final Context.Key<Scope> OTEL_SCOPE = Context.key("otel-scope");

private final boolean propagateGrpcDeadline;

public ContextStorageBridge(boolean propagateGrpcDeadline) {
this.propagateGrpcDeadline = propagateGrpcDeadline;
}

@Override
public Context doAttach(Context toAttach) {
io.opentelemetry.context.Context otelContext = io.opentelemetry.context.Context.current();
Expand Down Expand Up @@ -87,6 +93,20 @@ public Context current() {
// This context has already been previously attached and associated with an OTel context. Just
// create a new context referring to the current OTel context to reflect the current stack.
// The previous context is unaffected and will continue to live in its own stack.

if (!propagateGrpcDeadline) {
// Because we are propagating gRPC context via OpenTelemetry here, we may also propagate a
// deadline where it
// wasn't present before. Notably, this could happen with no user intention when using the
// javaagent which will
// add OpenTelemetry propagation automatically, and cause that code to fail with a deadline
// cancellation. While
// ideally we could propagate deadline as well as gRPC intended, we cannot have existing
// code fail because it
// added the javaagent and choose to fork here.
current = current.fork();
}

return current.withValue(OTEL_CONTEXT, otelContext);
}
return current;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
Expand Down Expand Up @@ -1370,6 +1372,87 @@ public void sayHello(
SemanticAttributes.MESSAGE_ID, 2L))))));
}

// Regression test for
// https:/open-telemetry/opentelemetry-java-instrumentation/issues/4169
@Test
void clientCallAfterServerCompleted() throws Exception {
Server backend =
configureServer(
ServerBuilder.forPort(0)
.addService(
new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(
Helloworld.Request request,
StreamObserver<Helloworld.Response> responseObserver) {
responseObserver.onNext(
Helloworld.Response.newBuilder()
.setMessage(request.getName())
.build());
responseObserver.onCompleted();
}
}))
.build()
.start();
ManagedChannel backendChannel = createChannel(backend);
closer.add(() -> backendChannel.shutdownNow().awaitTermination(10, TimeUnit.SECONDS));
closer.add(() -> backend.shutdownNow().awaitTermination());
GreeterGrpc.GreeterBlockingStub backendStub = GreeterGrpc.newBlockingStub(backendChannel);

// This executor does not propagate context without the javaagent available.
ExecutorService executor = Executors.newFixedThreadPool(1);
closer.add(executor::shutdownNow);

CountDownLatch clientCallDone = new CountDownLatch(1);
AtomicReference<Throwable> error = new AtomicReference<>();

Server frontend =
configureServer(
ServerBuilder.forPort(0)
.addService(
new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(
Helloworld.Request request,
StreamObserver<Helloworld.Response> responseObserver) {
responseObserver.onNext(
Helloworld.Response.newBuilder()
.setMessage(request.getName())
.build());
responseObserver.onCompleted();

executor.execute(
() -> {
try {
backendStub.sayHello(request);
} catch (Throwable t) {
error.set(t);
}
clientCallDone.countDown();
});
}
}))
.build()
.start();
ManagedChannel frontendChannel = createChannel(frontend);
closer.add(() -> frontendChannel.shutdownNow().awaitTermination(10, TimeUnit.SECONDS));
closer.add(() -> frontend.shutdownNow().awaitTermination());

GreeterGrpc.GreeterBlockingStub frontendStub = GreeterGrpc.newBlockingStub(frontendChannel);
frontendStub.sayHello(Helloworld.Request.newBuilder().setName("test").build());

// We don't assert on telemetry - the intention of this test is to verify that adding
// instrumentation, either as
// library or javaagent, does not cause exceptions in the business logic. The produced telemetry
// will be different
// for the two cases due to lack of context propagation in the library case, but that isn't what
// we're testing here.

clientCallDone.await(10, TimeUnit.SECONDS);

assertThat(error).hasValue(null);
}

private ManagedChannel createChannel(Server server) throws Exception {
ManagedChannelBuilder<?> channelBuilder =
configureClient(ManagedChannelBuilder.forAddress("localhost", server.getPort()));
Expand Down

0 comments on commit 1d9c23b

Please sign in to comment.