diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java index 705351e83f5..136fb67db52 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java @@ -11,6 +11,8 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; +import java.util.function.Consumer; /** * Implementation of the {@code SpanProcessor} that simply forwards all received events to a list of @@ -19,6 +21,15 @@ final class MultiSpanProcessor implements SpanProcessor { private final List spanProcessorsStart; private final List spanProcessorsEnd; + + /** + * Will invoke {@link SpanProcessor#onEnd(ReadableSpan, Consumer)} of all processors from {@link + * #spanProcessorsEnd} in order. The output from the first processor is passed to the second, the + * output from the second to the third and so on. The output of the last processor is passed to + * the {@link Consumer} provided as second argument to this biconsumer. + */ + private BiConsumer> processorsEndInvoker; + private final List spanProcessorsAll; private final AtomicBoolean isShutdown = new AtomicBoolean(false); @@ -46,11 +57,14 @@ public boolean isStartRequired() { return !spanProcessorsStart.isEmpty(); } + @Override + public void onEnd(ReadableSpan span, Consumer spanOutput) { + processorsEndInvoker.accept(span, spanOutput); + } + @Override public void onEnd(ReadableSpan readableSpan) { - for (SpanProcessor spanProcessor : spanProcessorsEnd) { - spanProcessor.onEnd(readableSpan); - } + onEnd(readableSpan, span -> {}); } @Override @@ -91,6 +105,14 @@ private MultiSpanProcessor(List spanProcessors) { spanProcessorsEnd.add(spanProcessor); } } + processorsEndInvoker = (span, drain) -> drain.accept(span); + for (int i = spanProcessorsEnd.size() - 1; i >= 0; i--) { + BiConsumer> nextStage = processorsEndInvoker; + SpanProcessor processor = spanProcessorsEnd.get(i); + processorsEndInvoker = + (span, finalOutput) -> + processor.onEnd(span, outputSpan -> nextStage.accept(outputSpan, finalOutput)); + } } @Override diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java index ba9fb037c84..b23a8fa5d7d 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java @@ -13,6 +13,7 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import javax.annotation.concurrent.ThreadSafe; /** @@ -67,6 +68,11 @@ static SpanProcessor composite(Iterable processors) { */ boolean isStartRequired(); + default void onEnd(ReadableSpan span, Consumer spanOutput) { + onEnd(span); + spanOutput.accept(span); + } + /** * Called when a {@link io.opentelemetry.api.trace.Span} is ended, if the {@link * Span#isRecording()} returns true. diff --git a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java index c51fe61c954..ae24c9554af 100644 --- a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java +++ b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java @@ -8,6 +8,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -36,8 +37,10 @@ class MultiSpanProcessorTest { void setUp() { when(spanProcessor1.isStartRequired()).thenReturn(true); when(spanProcessor1.isEndRequired()).thenReturn(true); + doCallRealMethod().when(spanProcessor1).onEnd(any(), any()); when(spanProcessor1.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); when(spanProcessor1.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + doCallRealMethod().when(spanProcessor2).onEnd(any(), any()); when(spanProcessor2.isStartRequired()).thenReturn(true); when(spanProcessor2.isEndRequired()).thenReturn(true); when(spanProcessor2.forceFlush()).thenReturn(CompletableResultCode.ofSuccess());