Skip to content

Commit

Permalink
Add support to use trace propagated from client (#9506)
Browse files Browse the repository at this point in the history
* Add support to use trace propagated from client

Signed-off-by: Gagan Juneja <[email protected]>

* Add support to use trace propagated from client

Signed-off-by: Gagan Juneja <[email protected]>

* Refactor code

Signed-off-by: Gagan Juneja <[email protected]>

* Add support to use trace propagated from client

Signed-off-by: Gagan Juneja <[email protected]>

* Add support to use trace propagated from client

Signed-off-by: Gagan Juneja <[email protected]>

* Refactor code

Signed-off-by: Gagan Juneja <[email protected]>

* Refactor code

Signed-off-by: Gagan Juneja <[email protected]>

* Merged CHANGELOG

Signed-off-by: Gagan Juneja <[email protected]>

* Empty-Commit

Signed-off-by: Gagan Juneja <[email protected]>

* Empty-Commit

Signed-off-by: Gagan Juneja <[email protected]>

* Empty-Commit

Signed-off-by: Gagan Juneja <[email protected]>

* Empty-Commit

Signed-off-by: Gagan Juneja <[email protected]>

* Empty-Commit

Signed-off-by: Gagan Juneja <[email protected]>

* Empty-Commit

Signed-off-by: Gagan Juneja <[email protected]>

---------

Signed-off-by: Gagan Juneja <[email protected]>
Signed-off-by: Gagan Juneja <[email protected]>
Co-authored-by: Gagan Juneja <[email protected]>
  • Loading branch information
Gaganjuneja and Gagan Juneja authored Aug 28, 2023
1 parent 569d5c2 commit f4106a4
Show file tree
Hide file tree
Showing 16 changed files with 212 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Improve performance of encoding composite keys in multi-term aggregations ([#9412](https:/opensearch-project/OpenSearch/pull/9412))
- Fix sort related ITs for concurrent search ([#9177](https:/opensearch-project/OpenSearch/pull/9466)
- Removing the vec file extension from INDEX_STORE_HYBRID_NIO_EXTENSIONS, to ensure the no performance degradation for vector search via Lucene Engine.([#9528](https:/opensearch-project/OpenSearch/pull/9528)))
- Add support to use trace propagated from client ([#9506](https:/opensearch-project/OpenSearch/pull/9506))
- Separate request-based and settings-based concurrent segment search controls and introduce AggregatorFactory method to determine concurrent search support ([#9469](https:/opensearch-project/OpenSearch/pull/9469))
- [Remote Store] Rate limiter integration for remote store uploads and downloads([#9448](https:/opensearch-project/OpenSearch/pull/9448/))
- [Remote Store] Implicitly use replication type SEGMENT for remote store clusters ([#9264](https:/opensearch-project/OpenSearch/pull/9264))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
*
Expand Down Expand Up @@ -44,7 +47,7 @@ public SpanScope startSpan(String spanName) {

@Override
public SpanScope startSpan(String spanName, Attributes attributes) {
return startSpan(spanName, null, attributes);
return startSpan(spanName, (SpanContext) null, attributes);
}

@Override
Expand Down Expand Up @@ -97,4 +100,10 @@ protected void addDefaultAttributes(Span span) {
span.addAttribute(THREAD_NAME, Thread.currentThread().getName());
}

@Override
public SpanScope startSpan(String spanName, Map<String, List<String>> headers, Attributes attributes) {
Optional<Span> propagatedSpan = tracingTelemetry.getContextPropagator().extractFromHeaders(headers);
return startSpan(spanName, propagatedSpan.map(SpanContext::new).orElse(null), attributes);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.telemetry.tracing;

import org.opensearch.telemetry.tracing.attributes.Attributes;
import org.opensearch.telemetry.tracing.http.HttpTracer;

import java.io.Closeable;

Expand All @@ -18,7 +19,7 @@
*
* All methods on the Tracer object are multi-thread safe.
*/
public interface Tracer extends Closeable {
public interface Tracer extends HttpTracer, Closeable {

/**
* Starts the {@link Span} with given name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@

package org.opensearch.telemetry.tracing;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;

/**
Expand All @@ -23,7 +25,15 @@ public interface TracingContextPropagator {
* @param props properties
* @return current span
*/
Span extract(Map<String, String> props);
Optional<Span> extract(Map<String, String> props);

/**
* Extracts current span from HTTP headers.
*
* @param headers request headers to extract the context from
* @return current span
*/
Optional<Span> extractFromHeaders(Map<String, List<String>> headers);

/**
* Injects tracing context
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.http;

import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.attributes.Attributes;

import java.util.List;
import java.util.Map;

/**
* HttpTracer helps in creating a {@link Span} which reads the incoming tracing information
* from the HttpRequest header and propagate the span accordingly.
*
* All methods on the Tracer object are multi-thread safe.
*/
public interface HttpTracer {
/**
* Start the span with propagating the tracing info from the HttpRequest header.
*
* @param spanName span name.
* @param header http request header.
* @param attributes span attributes.
* @return scope of the span, must be closed with explicit close or with try-with-resource
*/
SpanScope startSpan(String spanName, Map<String, List<String>> header, Attributes attributes);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Contains No-op implementations
*/
package org.opensearch.telemetry.tracing.http;
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.attributes.Attributes;

import java.util.List;
import java.util.Map;

/**
* No-op implementation of Tracer
*
Expand Down Expand Up @@ -51,4 +54,9 @@ public SpanContext getCurrentSpan() {
public void close() {

}

@Override
public SpanScope startSpan(String spanName, Map<String, List<String>> header, Attributes attributes) {
return SpanScope.NO_OP;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
import org.junit.Assert;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -104,14 +108,36 @@ public void testCreateSpanWithParent() {
Assert.assertEquals(parentSpan.getSpan(), defaultTracer.getCurrentSpan().getSpan().getParentSpan());
}

public void testHttpTracer() {
String traceId = "trace_id";
String spanId = "span_id";
TracingTelemetry tracingTelemetry = new MockTracingTelemetry();

DefaultTracer defaultTracer = new DefaultTracer(
tracingTelemetry,
new ThreadContextBasedTracerContextStorage(new ThreadContext(Settings.EMPTY), tracingTelemetry)
);

Map<String, List<String>> requestHeaders = new HashMap<>();
requestHeaders.put("traceparent", Arrays.asList(traceId + "~" + spanId));

SpanScope spanScope = defaultTracer.startSpan("test_span", requestHeaders, Attributes.EMPTY);
SpanContext currentSpan = defaultTracer.getCurrentSpan();
assertNotNull(currentSpan);
assertEquals(traceId, currentSpan.getSpan().getTraceId());
assertEquals(traceId, currentSpan.getSpan().getParentSpan().getTraceId());
assertEquals(spanId, currentSpan.getSpan().getParentSpan().getSpanId());
spanScope.close();
}

public void testCreateSpanWithNullParent() {
TracingTelemetry tracingTelemetry = new MockTracingTelemetry();
DefaultTracer defaultTracer = new DefaultTracer(
tracingTelemetry,
new ThreadContextBasedTracerContextStorage(new ThreadContext(Settings.EMPTY), tracingTelemetry)
);

defaultTracer.startSpan("span_name", null, Attributes.EMPTY);
defaultTracer.startSpan("span_name");

Assert.assertEquals("span_name", defaultTracer.getCurrentSpan().getSpan().getSpanName());
Assert.assertEquals(null, defaultTracer.getCurrentSpan().getSpan().getParentSpan());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,16 @@ public void testRunnableWithParent() throws Exception {
DefaultTracer defaultTracer = new DefaultTracer(new MockTracingTelemetry(), contextStorage);
defaultTracer.startSpan(parentSpanName);
SpanContext parentSpan = defaultTracer.getCurrentSpan();
AtomicReference<SpanContext> currrntSpan = new AtomicReference<>(new SpanContext(null));
AtomicReference<SpanContext> currentSpan = new AtomicReference<>();
final AtomicBoolean isRunnableCompleted = new AtomicBoolean(false);
TraceableRunnable traceableRunnable = new TraceableRunnable(defaultTracer, spanName, parentSpan, Attributes.EMPTY, () -> {
isRunnableCompleted.set(true);
currrntSpan.set(defaultTracer.getCurrentSpan());
currentSpan.set(defaultTracer.getCurrentSpan());
});
traceableRunnable.run();
assertTrue(isRunnableCompleted.get());
assertEquals(spanName, currrntSpan.get().getSpan().getSpanName());
assertEquals(parentSpan.getSpan(), currrntSpan.get().getSpan().getParentSpan());
assertEquals(spanName, currentSpan.get().getSpan().getSpanName());
assertEquals(parentSpan.getSpan(), currentSpan.get().getSpan().getParentSpan());
assertEquals(parentSpan.getSpan(), defaultTracer.getCurrentSpan().getSpan());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@

package org.opensearch.telemetry.tracing;

import org.opensearch.core.common.Strings;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;

import io.opentelemetry.api.OpenTelemetry;
Expand All @@ -32,15 +37,25 @@ public OTelTracingContextPropagator(OpenTelemetry openTelemetry) {
}

@Override
public Span extract(Map<String, String> props) {
public Optional<Span> extract(Map<String, String> props) {
Context context = openTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), props, TEXT_MAP_GETTER);
return Optional.ofNullable(getPropagatedSpan(context));
}

private static OTelPropagatedSpan getPropagatedSpan(Context context) {
if (context != null) {
io.opentelemetry.api.trace.Span span = io.opentelemetry.api.trace.Span.fromContext(context);
return new OTelPropagatedSpan(span);
}
return null;
}

@Override
public Optional<Span> extractFromHeaders(Map<String, List<String>> headers) {
Context context = openTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), headers, HEADER_TEXT_MAP_GETTER);
return Optional.ofNullable(getPropagatedSpan(context));
}

@Override
public void inject(Span currentSpan, BiConsumer<String, String> setter) {
openTelemetry.getPropagators().getTextMapPropagator().inject(context((OTelSpan) currentSpan), setter, TEXT_MAP_SETTER);
Expand Down Expand Up @@ -72,4 +87,23 @@ public String get(Map<String, String> headers, String key) {
}
};

private static final TextMapGetter<Map<String, List<String>>> HEADER_TEXT_MAP_GETTER = new TextMapGetter<>() {
@Override
public Iterable<String> keys(Map<String, List<String>> headers) {
if (headers != null) {
return headers.keySet();
} else {
return Collections.emptySet();
}
}

@Override
public String get(Map<String, List<String>> headers, String key) {
if (headers != null && headers.containsKey(key)) {
return Strings.collectionToCommaDelimitedString(headers.get(key));
}
return null;
}
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

import org.opensearch.test.OpenSearchTestCase;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.opentelemetry.api.OpenTelemetry;
Expand All @@ -19,6 +21,7 @@
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.ContextPropagators;

import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -48,8 +51,39 @@ public void testExtractTracerContextFromHeader() {
OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class);
when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance()));
TracingContextPropagator tracingContextPropagator = new OTelTracingContextPropagator(mockOpenTelemetry);
org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extract(requestHeaders);
org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extract(requestHeaders).orElse(null);
assertEquals(TRACE_ID, span.getTraceId());
assertEquals(SPAN_ID, span.getSpanId());
}

public void testExtractTracerContextFromHttpHeader() {
Map<String, List<String>> requestHeaders = new HashMap<>();
requestHeaders.put("traceparent", Arrays.asList("00-" + TRACE_ID + "-" + SPAN_ID + "-00"));
OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class);
when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance()));
TracingContextPropagator tracingContextPropagator = new OTelTracingContextPropagator(mockOpenTelemetry);
org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extractFromHeaders(requestHeaders).get();
assertEquals(TRACE_ID, span.getTraceId());
assertEquals(SPAN_ID, span.getSpanId());
}

public void testExtractTracerContextFromHttpHeaderNull() {
OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class);
when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance()));
TracingContextPropagator tracingContextPropagator = new OTelTracingContextPropagator(mockOpenTelemetry);
org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extractFromHeaders(null).get();
org.opensearch.telemetry.tracing.Span propagatedSpan = new OTelPropagatedSpan(Span.fromContext(Context.root()));
assertEquals(propagatedSpan.getTraceId(), span.getTraceId());
assertEquals(propagatedSpan.getSpanId(), span.getSpanId());
}

public void testExtractTracerContextFromHttpHeaderEmpty() {
OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class);
when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance()));
TracingContextPropagator tracingContextPropagator = new OTelTracingContextPropagator(mockOpenTelemetry);
org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extractFromHeaders(new HashMap<>()).get();
org.opensearch.telemetry.tracing.Span propagatedSpan = new OTelPropagatedSpan(Span.fromContext(Context.root()));
assertEquals(propagatedSpan.getTraceId(), span.getTraceId());
assertEquals(propagatedSpan.getSpanId(), span.getSpanId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ private Optional<Span> spanFromThreadContext(String key) {
}

private Span spanFromHeader() {
return tracingTelemetry.getContextPropagator().extract(threadContext.getHeaders());
Optional<Span> span = tracingTelemetry.getContextPropagator().extract(threadContext.getHeaders());
return span.orElse(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.opensearch.telemetry.tracing.noop.NoopTracer;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
* Wrapper implementation of Tracer. This delegates call to right tracer based on the tracer settings
Expand Down Expand Up @@ -42,7 +44,7 @@ public SpanScope startSpan(String spanName) {

@Override
public SpanScope startSpan(String spanName, Attributes attributes) {
return startSpan(spanName, null, attributes);
return startSpan(spanName, (SpanContext) null, attributes);
}

@Override
Expand All @@ -66,4 +68,9 @@ public void close() throws IOException {
Tracer getDelegateTracer() {
return telemetrySettings.isTracingEnabled() ? defaultTracer : NoopTracer.INSTANCE;
}

@Override
public SpanScope startSpan(String spanName, Map<String, List<String>> headers, Attributes attributes) {
return defaultTracer.startSpan(spanName, headers, attributes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void testStartSpanWithTracingEnabledInvokesDefaultTracer() throws Excepti
wrappedTracer.startSpan("foo");

assertTrue(wrappedTracer.getDelegateTracer() instanceof DefaultTracer);
verify(mockDefaultTracer).startSpan(eq("foo"), eq(null), any(Attributes.class));
verify(mockDefaultTracer).startSpan(eq("foo"), eq((SpanContext) null), any(Attributes.class));
}
}

Expand All @@ -64,7 +64,7 @@ public void testStartSpanWithTracingEnabledInvokesDefaultTracerWithAttr() throws
wrappedTracer.startSpan("foo", attributes);

assertTrue(wrappedTracer.getDelegateTracer() instanceof DefaultTracer);
verify(mockDefaultTracer).startSpan("foo", null, attributes);
verify(mockDefaultTracer).startSpan("foo", (SpanContext) null, attributes);
}
}

Expand Down
Loading

0 comments on commit f4106a4

Please sign in to comment.