Skip to content

Commit

Permalink
Add SimpleLogProcessor (#3749)
Browse files Browse the repository at this point in the history
  • Loading branch information
jack-berg authored Oct 15, 2021
1 parent 6d41dda commit 2241ae6
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
Comparing source compatibility of against
No changes.
+++ NEW CLASS: PUBLIC(+) FINAL(+) io.opentelemetry.exporter.logging.otlp.OtlpJsonLoggingLogExporter (not serializable)
+++ CLASS FILE FORMAT VERSION: 52.0 <- n.a.
+++ NEW SUPERCLASS: java.lang.Object
+++ NEW METHOD: PUBLIC(+) STATIC(+) io.opentelemetry.sdk.logs.export.LogExporter create()
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.common.CompletableResultCode export(java.util.Collection)
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.common.CompletableResultCode shutdown()
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.logs.export;

import static java.util.Objects.requireNonNull;

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.LogProcessor;
import io.opentelemetry.sdk.logs.data.LogData;
import io.opentelemetry.sdk.logs.data.LogRecord;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* An implementation of the {@link LogProcessor} that passes {@link LogRecord }it directly to the
* configured exporter.
*
* <p>This processor will cause all logs to be exported directly as they finish, meaning each export
* request will have a single log. Most backends will not perform well with a single log per request
* so unless you know what you're doing, strongly consider using {@link BatchLogProcessor} instead,
* including in special environments such as serverless runtimes. {@link SimpleLogProcessor} is
* generally meant to for testing only.
*/
public final class SimpleLogProcessor implements LogProcessor {

private static final Logger logger = Logger.getLogger(SimpleLogProcessor.class.getName());

private final LogExporter logExporter;
private final Set<CompletableResultCode> pendingExports =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final AtomicBoolean isShutdown = new AtomicBoolean(false);

/**
* Returns a new {@link SimpleLogProcessor} which exports logs to the {@link LogExporter}
* synchronously.
*
* <p>This processor will cause all logs to be exported directly as they finish, meaning each
* export request will have a single log. Most backends will not perform well with a single log
* per request so unless you know what you're doing, strongly consider using {@link
* BatchLogProcessor} instead, including in special environments such as serverless runtimes.
* {@link SimpleLogProcessor} is generally meant to for testing only.
*/
public static LogProcessor create(LogExporter exporter) {
requireNonNull(exporter, "exporter");
return new SimpleLogProcessor(exporter);
}

SimpleLogProcessor(LogExporter logExporter) {
this.logExporter = requireNonNull(logExporter, "logExporter");
}

@Override
public void addLogRecord(LogRecord logRecord) {
try {
List<LogData> logs = Collections.singletonList(logRecord);
final CompletableResultCode result = logExporter.export(logs);
pendingExports.add(result);
result.whenComplete(
() -> {
pendingExports.remove(result);
if (!result.isSuccess()) {
logger.log(Level.FINE, "Exporter failed");
}
});
} catch (RuntimeException e) {
logger.log(Level.WARNING, "Exporter threw an Exception", e);
}
}

@Override
public CompletableResultCode shutdown() {
if (isShutdown.getAndSet(true)) {
return CompletableResultCode.ofSuccess();
}
final CompletableResultCode result = new CompletableResultCode();

final CompletableResultCode flushResult = forceFlush();
flushResult.whenComplete(
() -> {
final CompletableResultCode shutdownResult = logExporter.shutdown();
shutdownResult.whenComplete(
() -> {
if (!flushResult.isSuccess() || !shutdownResult.isSuccess()) {
result.fail();
} else {
result.succeed();
}
});
});

return result;
}

@Override
public CompletableResultCode forceFlush() {
return CompletableResultCode.ofAll(pendingExports);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,46 +5,25 @@

package io.opentelemetry.sdk.logs;

import static io.opentelemetry.sdk.logs.util.TestUtil.createLog;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.awaitility.Awaitility.await;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanId;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceId;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.logs.data.LogData;
import io.opentelemetry.sdk.logs.data.LogRecord;
import io.opentelemetry.sdk.logs.data.Severity;
import io.opentelemetry.sdk.logs.export.BatchLogProcessor;
import io.opentelemetry.sdk.logs.util.TestLogExporter;
import io.opentelemetry.sdk.logs.util.TestLogProcessor;
import io.opentelemetry.sdk.resources.Resource;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;

class LogSinkSdkProviderTest {

private static LogRecord createLog(Severity severity, String message) {
return LogRecord.builder(
Resource.create(Attributes.builder().put("testKey", "testValue").build()),
InstrumentationLibraryInfo.create("instrumentation", "1"))
.setEpochMillis(System.currentTimeMillis())
.setTraceId(TraceId.getInvalid())
.setSpanId(SpanId.getInvalid())
.setFlags(TraceFlags.getDefault().asByte())
.setSeverity(severity)
.setSeverityText("really severe")
.setName("log1")
.setBody(message)
.setAttributes(Attributes.builder().put("animal", "cat").build())
.build();
}

@Test
void testLogSinkSdkProvider() {
TestLogExporter exporter = new TestLogExporter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,20 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.logs.sdk;
package io.opentelemetry.sdk.logs.export;

import static io.opentelemetry.sdk.logs.util.TestUtil.createLog;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
import static org.awaitility.Awaitility.await;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanId;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceId;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.logs.data.LogRecord;
import io.opentelemetry.sdk.logs.data.Severity;
import io.opentelemetry.sdk.logs.export.BatchLogProcessor;
import io.opentelemetry.sdk.logs.util.TestLogExporter;
import io.opentelemetry.sdk.resources.Resource;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;

class BatchLogProcessorTest {

private static LogRecord createLog(Severity severity, String message) {
return LogRecord.builder(
Resource.create(Attributes.builder().put("testKey", "testValue").build()),
InstrumentationLibraryInfo.create("instrumentation", "1"))
.setEpochMillis(System.currentTimeMillis())
.setTraceId(TraceId.getInvalid())
.setSpanId(SpanId.getInvalid())
.setFlags(TraceFlags.getDefault().asByte())
.setSeverity(severity)
.setSeverityText("really severe")
.setName("log1")
.setBody(message)
.setAttributes(Attributes.builder().put("animal", "cat").build())
.build();
}

@Test
void testForceExport() {
int batchSize = 10;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.logs.export;

import static io.opentelemetry.sdk.logs.data.Severity.DEBUG;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.LogProcessor;
import io.opentelemetry.sdk.logs.data.LogRecord;
import io.opentelemetry.sdk.logs.util.TestUtil;
import java.util.Collections;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class SimpleLogProcessorTest {

@Mock private LogExporter logExporter;

private LogProcessor logProcessor;

@BeforeEach
void setUp() {
logProcessor = SimpleLogProcessor.create(logExporter);
when(logExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
}

@Test
void create_NullExporter() {
assertThatThrownBy(() -> SimpleLogProcessor.create(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("exporter");
}

@Test
void addLogRecord() {
LogRecord logRecord = TestUtil.createLog(DEBUG, "Log message");
logProcessor.addLogRecord(logRecord);
verify(logExporter).export(Collections.singletonList(logRecord));
}

@Test
void addLogRecord_ExporterError() {
LogRecord logRecord = TestUtil.createLog(DEBUG, "Log message");
when(logExporter.export(any())).thenThrow(new RuntimeException("Exporter error!"));
logProcessor.addLogRecord(logRecord);
logProcessor.addLogRecord(logRecord);
verify(logExporter, times(2)).export(Collections.singletonList(logRecord));
}

@Test
void forceFlush() {
CompletableResultCode export1 = new CompletableResultCode();
CompletableResultCode export2 = new CompletableResultCode();

when(logExporter.export(any())).thenReturn(export1, export2);

LogRecord logRecord = TestUtil.createLog(DEBUG, "Log message");
logProcessor.addLogRecord(logRecord);
logProcessor.addLogRecord(logRecord);

verify(logExporter, times(2)).export(Collections.singletonList(logRecord));

CompletableResultCode flush = logProcessor.forceFlush();
assertThat(flush.isDone()).isFalse();

export1.succeed();
assertThat(flush.isDone()).isFalse();

export2.succeed();
assertThat(flush.isDone()).isTrue();
assertThat(flush.isSuccess()).isTrue();
}

@Test
void shutdown() {
CompletableResultCode export1 = new CompletableResultCode();
CompletableResultCode export2 = new CompletableResultCode();

when(logExporter.export(any())).thenReturn(export1, export2);

LogRecord logRecord = TestUtil.createLog(DEBUG, "Log message");
logProcessor.addLogRecord(logRecord);
logProcessor.addLogRecord(logRecord);

verify(logExporter, times(2)).export(Collections.singletonList(logRecord));

CompletableResultCode shutdown = logProcessor.shutdown();
assertThat(shutdown.isDone()).isFalse();

export1.succeed();
assertThat(shutdown.isDone()).isFalse();
verify(logExporter, never()).shutdown();

export2.succeed();
assertThat(shutdown.isDone()).isTrue();
assertThat(shutdown.isSuccess()).isTrue();
verify(logExporter).shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.logs.util;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanId;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceId;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.logs.data.LogRecord;
import io.opentelemetry.sdk.logs.data.Severity;
import io.opentelemetry.sdk.resources.Resource;

public final class TestUtil {

public static LogRecord createLog(Severity severity, String message) {
return LogRecord.builder(
Resource.create(Attributes.builder().put("testKey", "testValue").build()),
InstrumentationLibraryInfo.create("instrumentation", "1"))
.setEpochMillis(System.currentTimeMillis())
.setTraceId(TraceId.getInvalid())
.setSpanId(SpanId.getInvalid())
.setFlags(TraceFlags.getDefault().asByte())
.setSeverity(severity)
.setSeverityText("really severe")
.setName("log1")
.setBody(message)
.setAttributes(Attributes.builder().put("animal", "cat").build())
.build();
}

private TestUtil() {}
}

0 comments on commit 2241ae6

Please sign in to comment.