diff --git a/integration-tests/otlp/src/main/java/io/opentelemetry/integrationtest/OtlpExporterIntegrationTest.java b/integration-tests/otlp/src/main/java/io/opentelemetry/integrationtest/OtlpExporterIntegrationTest.java index 02223e07cf1..ab0d4ae3e29 100644 --- a/integration-tests/otlp/src/main/java/io/opentelemetry/integrationtest/OtlpExporterIntegrationTest.java +++ b/integration-tests/otlp/src/main/java/io/opentelemetry/integrationtest/OtlpExporterIntegrationTest.java @@ -519,9 +519,9 @@ private static void testLogExporter(LogExporter logExporter) { try (Scope unused = Span.wrap(spanContext).makeCurrent()) { logEmitter - .logBuilder() + .logRecordBuilder() .setBody("log body") - .setAttributes(Attributes.builder().put("key", "value").build()) + .setAllAttributes(Attributes.builder().put("key", "value").build()) .setSeverity(Severity.DEBUG) .setSeverityText("DEBUG") .setEpoch(Instant.now()) diff --git a/sdk-extensions/autoconfigure/src/testFullConfig/java/io/opentelemetry/sdk/autoconfigure/FullConfigTest.java b/sdk-extensions/autoconfigure/src/testFullConfig/java/io/opentelemetry/sdk/autoconfigure/FullConfigTest.java index 53c72a3c31e..59dc3e63225 100644 --- a/sdk-extensions/autoconfigure/src/testFullConfig/java/io/opentelemetry/sdk/autoconfigure/FullConfigTest.java +++ b/sdk-extensions/autoconfigure/src/testFullConfig/java/io/opentelemetry/sdk/autoconfigure/FullConfigTest.java @@ -213,8 +213,8 @@ void configures() throws Exception { LogEmitter logEmitter = autoConfiguredOpenTelemetrySdk.getOpenTelemetrySdk().getSdkLogEmitterProvider().get("test"); - logEmitter.logBuilder().setBody("debug log message").setSeverity(Severity.DEBUG).emit(); - logEmitter.logBuilder().setBody("info log message").setSeverity(Severity.INFO).emit(); + logEmitter.logRecordBuilder().setBody("debug log message").setSeverity(Severity.DEBUG).emit(); + logEmitter.logRecordBuilder().setBody("info log message").setSeverity(Severity.INFO).emit(); await().untilAsserted(() -> assertThat(otlpTraceRequests).hasSize(1)); diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/LogBuilder.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/LogBuilder.java deleted file mode 100644 index af66889c917..00000000000 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/LogBuilder.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.sdk.logs; - -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.context.Context; -import io.opentelemetry.sdk.logs.data.Severity; -import java.time.Instant; -import java.util.concurrent.TimeUnit; - -/** - * Used to construct and emit logs from a {@link LogEmitter}. - * - *

Obtain a {@link LogBuilder} via {@link LogEmitter#logBuilder()}, add properties using the - * setters, and emit the log to downstream {@link LogProcessor}(s) by calling {@link #emit()}. - */ -public interface LogBuilder { - - /** Set the epoch timestamp using the timestamp and unit. */ - LogBuilder setEpoch(long timestamp, TimeUnit unit); - - /** Set the epoch timestamp using the instant. */ - LogBuilder setEpoch(Instant instant); - - /** Set the context. */ - LogBuilder setContext(Context context); - - /** Set the severity. */ - LogBuilder setSeverity(Severity severity); - - /** Set the severity text. */ - LogBuilder setSeverityText(String severityText); - - /** Set the body string. */ - LogBuilder setBody(String body); - - /** Set the attributes. */ - LogBuilder setAttributes(Attributes attributes); - - /** Emit the log to downstream {@link LogProcessor}(s). */ - void emit(); -} diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/LogEmitter.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/LogEmitter.java index 37a0a48a822..451a175b52b 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/LogEmitter.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/LogEmitter.java @@ -10,17 +10,17 @@ /** * A {@link LogEmitter} is the entry point into a log pipeline. * - *

Obtain a log builder via {@link #logBuilder()}, add properties using the setters, and emit it - * to downstream {@link LogProcessor}(s) via {@link LogBuilder#emit()}. + *

Obtain a {@link #logRecordBuilder()}, add properties using the setters, and emit it to + * downstream {@link LogProcessor}(s) via {@link LogRecordBuilder#emit()}. */ @ThreadSafe public interface LogEmitter { /** - * Return a {@link LogBuilder} to emit a log. + * Return a {@link LogRecordBuilder} to emit a log record. * - *

Build the log using the {@link LogBuilder} setters, and emit it to downstream {@link - * LogProcessor}(s) via {@link LogBuilder#emit()}. + *

Build the log record using the {@link LogRecordBuilder} setters, and emit it to downstream + * {@link LogProcessor}(s) via {@link LogRecordBuilder#emit()}. */ - LogBuilder logBuilder(); + LogRecordBuilder logRecordBuilder(); } diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/LogProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/LogProcessor.java index 77a6cbb8858..a31e67b1e35 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/LogProcessor.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/LogProcessor.java @@ -6,7 +6,6 @@ package io.opentelemetry.sdk.logs; import io.opentelemetry.sdk.common.CompletableResultCode; -import io.opentelemetry.sdk.logs.data.LogData; import java.io.Closeable; import java.util.ArrayList; import java.util.Arrays; @@ -15,8 +14,8 @@ import javax.annotation.concurrent.ThreadSafe; /** - * {@link LogProcessor} is the interface to allow synchronous hooks for logs emitted by {@link - * LogEmitter}s. + * {@link LogProcessor} is the interface to allow synchronous hooks for log records emitted by + * {@link LogEmitter}s. */ @ThreadSafe public interface LogProcessor extends Closeable { @@ -48,11 +47,11 @@ static LogProcessor composite(Iterable processors) { } /** - * Emit a log. + * Called when a {@link LogEmitter} {@link LogRecordBuilder#emit()}s a log record. * - * @param logData the log + * @param logRecord the log record */ - void emit(LogData logData); + void onEmit(ReadWriteLogRecord logRecord); /** * Shutdown the log processor. @@ -64,7 +63,7 @@ default CompletableResultCode shutdown() { } /** - * Process all logs that have not yet been processed. + * Process all log records that have not yet been processed. * * @return result */ @@ -73,7 +72,8 @@ default CompletableResultCode forceFlush() { } /** - * Closes this {@link LogProcessor} after processing any remaining logs, releasing any resources. + * Closes this {@link LogProcessor} after processing any remaining log records, releasing any + * resources. */ @Override default void close() { diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/LogRecordBuilder.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/LogRecordBuilder.java new file mode 100644 index 00000000000..e1d5c2a463f --- /dev/null +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/LogRecordBuilder.java @@ -0,0 +1,60 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.logs.data.Severity; +import java.time.Instant; +import java.util.concurrent.TimeUnit; + +/** + * Used to construct and emit logs from a {@link LogEmitter}. + * + *

Obtain a {@link LogEmitter#logRecordBuilder()}, add properties using the setters, and emit the + * log to downstream {@link LogProcessor}(s) by calling {@link #emit()}. + */ +public interface LogRecordBuilder { + + /** Set the epoch timestamp using the timestamp and unit. */ + LogRecordBuilder setEpoch(long timestamp, TimeUnit unit); + + /** Set the epoch timestamp using the instant. */ + LogRecordBuilder setEpoch(Instant instant); + + /** Set the context. */ + LogRecordBuilder setContext(Context context); + + /** Set the severity. */ + LogRecordBuilder setSeverity(Severity severity); + + /** Set the severity text. */ + LogRecordBuilder setSeverityText(String severityText); + + /** Set the body string. */ + LogRecordBuilder setBody(String body); + + /** + * Sets attributes. If the {@link LogRecordBuilder} previously contained a mapping for any of the + * keys, the old values are replaced by the specified values. + */ + @SuppressWarnings("unchecked") + default LogRecordBuilder setAllAttributes(Attributes attributes) { + if (attributes == null || attributes.isEmpty()) { + return this; + } + attributes.forEach( + (attributeKey, value) -> setAttribute((AttributeKey) attributeKey, value)); + return this; + } + + /** Sets an attribute. */ + LogRecordBuilder setAttribute(AttributeKey key, T value); + + /** Emit the log to downstream {@link LogProcessor}(s). */ + void emit(); +} diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/MultiLogProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/MultiLogProcessor.java index 8f07c00a3b6..aa91ac41481 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/MultiLogProcessor.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/MultiLogProcessor.java @@ -6,7 +6,6 @@ package io.opentelemetry.sdk.logs; import io.opentelemetry.sdk.common.CompletableResultCode; -import io.opentelemetry.sdk.logs.data.LogData; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -32,9 +31,9 @@ static LogProcessor create(List logProcessorsList) { } @Override - public void emit(LogData logData) { + public void onEmit(ReadWriteLogRecord logRecord) { for (LogProcessor logProcessor : logProcessors) { - logProcessor.emit(logData); + logProcessor.onEmit(logRecord); } } diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/NoopLogProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/NoopLogProcessor.java index a7ef2f2f22d..c3b5e0bf353 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/NoopLogProcessor.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/NoopLogProcessor.java @@ -5,8 +5,6 @@ package io.opentelemetry.sdk.logs; -import io.opentelemetry.sdk.logs.data.LogData; - final class NoopLogProcessor implements LogProcessor { private static final NoopLogProcessor INSTANCE = new NoopLogProcessor(); @@ -17,5 +15,5 @@ static LogProcessor getInstance() { private NoopLogProcessor() {} @Override - public void emit(LogData logData) {} + public void onEmit(ReadWriteLogRecord logRecord) {} } diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/ReadWriteLogRecord.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/ReadWriteLogRecord.java new file mode 100644 index 00000000000..c7f8e0b0e8f --- /dev/null +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/ReadWriteLogRecord.java @@ -0,0 +1,30 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.sdk.logs.data.LogData; + +/** A log record that can be read from and written to. */ +public interface ReadWriteLogRecord { + + /** + * Sets an attribute on the log record. If the log record previously contained a mapping for the + * key, the old value is replaced by the specified value. + * + *

Note: the behavior of null values is undefined, and hence strongly discouraged. + */ + ReadWriteLogRecord setAttribute(AttributeKey key, T value); + + // TODO: add additional setters + + /** Return an immutable {@link LogData} instance representing this log record. */ + LogData toLogData(); + + // TODO: add additional log record accessors. Currently, all fields can be accessed indirectly via + // #toLogData() with at the expense of additional allocations. + +} diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogEmitter.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogEmitter.java index ae4f73039b8..bc81cb0fb7c 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogEmitter.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogEmitter.java @@ -21,8 +21,8 @@ final class SdkLogEmitter implements LogEmitter { } @Override - public LogBuilder logBuilder() { - return new SdkLogBuilder(logEmitterSharedState, instrumentationScopeInfo); + public LogRecordBuilder logRecordBuilder() { + return new SdkLogRecordBuilder(logEmitterSharedState, instrumentationScopeInfo); } // VisibleForTesting diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogEmitterProviderBuilder.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogEmitterProviderBuilder.java index 315c5234a67..d3b9aab071a 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogEmitterProviderBuilder.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogEmitterProviderBuilder.java @@ -39,13 +39,13 @@ public SdkLogEmitterProviderBuilder setResource(Resource resource) { /** * Assign a {@link Supplier} of {@link LogLimits}. {@link LogLimits} will be retrieved each time a - * {@link LogEmitter#logBuilder()} is called. + * {@link LogEmitter#logRecordBuilder()} is called. * *

The {@code logLimitsSupplier} must be thread-safe and return immediately (no remote calls, * as contention free as possible). * * @param logLimitsSupplier the supplier that will be used to retrieve the {@link LogLimits} for - * every {@link LogBuilder}. + * every {@link LogRecordBuilder}. * @return this */ public SdkLogEmitterProviderBuilder setLogLimits(Supplier logLimitsSupplier) { @@ -55,8 +55,8 @@ public SdkLogEmitterProviderBuilder setLogLimits(Supplier logLimitsSu } /** - * Add a log processor. {@link LogProcessor#emit(LogData)} will be called each time a log is - * emitted by {@link LogEmitter} instances obtained from the {@link SdkLogEmitterProvider}. + * Add a log processor. {@link LogProcessor#onEmit(ReadWriteLogRecord)} will be called each time a + * log is emitted by {@link LogEmitter} instances obtained from the {@link SdkLogEmitterProvider}. * * @param processor the log processor * @return this diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogBuilder.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilder.java similarity index 60% rename from sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogBuilder.java rename to sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilder.java index e44f70d870c..c147411bca3 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogBuilder.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilder.java @@ -5,20 +5,20 @@ package io.opentelemetry.sdk.logs; -import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; -import io.opentelemetry.sdk.internal.AttributeUtil; +import io.opentelemetry.sdk.internal.AttributesMap; import io.opentelemetry.sdk.logs.data.Body; import io.opentelemetry.sdk.logs.data.Severity; import java.time.Instant; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; -/** {@link SdkLogBuilder} is the SDK implementation of {@link LogBuilder}. */ -final class SdkLogBuilder implements LogBuilder { +/** SDK implementation of {@link LogRecordBuilder}. */ +final class SdkLogRecordBuilder implements LogRecordBuilder { private final LogEmitterSharedState logEmitterSharedState; private final LogLimits logLimits; @@ -29,9 +29,9 @@ final class SdkLogBuilder implements LogBuilder { private Severity severity = Severity.UNDEFINED_SEVERITY_NUMBER; @Nullable private String severityText; private Body body = Body.empty(); - private Attributes attributes = Attributes.empty(); + @Nullable private AttributesMap attributes; - SdkLogBuilder( + SdkLogRecordBuilder( LogEmitterSharedState logEmitterSharedState, InstrumentationScopeInfo instrumentationScopeInfo) { this.logEmitterSharedState = logEmitterSharedState; @@ -40,48 +40,52 @@ final class SdkLogBuilder implements LogBuilder { } @Override - public LogBuilder setEpoch(long timestamp, TimeUnit unit) { + public LogRecordBuilder setEpoch(long timestamp, TimeUnit unit) { this.epochNanos = unit.toNanos(timestamp); return this; } @Override - public LogBuilder setEpoch(Instant instant) { + public LogRecordBuilder setEpoch(Instant instant) { this.epochNanos = TimeUnit.SECONDS.toNanos(instant.getEpochSecond()) + instant.getNano(); return this; } @Override - public LogBuilder setContext(Context context) { - spanContext = Span.fromContext(context).getSpanContext(); + public LogRecordBuilder setContext(Context context) { + this.spanContext = Span.fromContext(context).getSpanContext(); return this; } @Override - public LogBuilder setSeverity(Severity severity) { + public LogRecordBuilder setSeverity(Severity severity) { this.severity = severity; return this; } @Override - public LogBuilder setSeverityText(String severityText) { + public LogRecordBuilder setSeverityText(String severityText) { this.severityText = severityText; return this; } @Override - public LogBuilder setBody(String body) { + public LogRecordBuilder setBody(String body) { this.body = Body.string(body); return this; } @Override - public LogBuilder setAttributes(Attributes attributes) { - this.attributes = - AttributeUtil.applyAttributesLimit( - attributes, - logLimits.getMaxNumberOfAttributes(), - logLimits.getMaxAttributeValueLength()); + public LogRecordBuilder setAttribute(AttributeKey key, T value) { + if (key == null || key.getKey().isEmpty() || value == null) { + return this; + } + if (this.attributes == null) { + this.attributes = + AttributesMap.create( + logLimits.getMaxNumberOfAttributes(), logLimits.getMaxAttributeValueLength()); + } + this.attributes.put(key, value); return this; } @@ -92,11 +96,14 @@ public void emit() { } logEmitterSharedState .getLogProcessor() - .emit( - SdkLogData.create( + .onEmit( + SdkReadWriteLogRecord.create( + logEmitterSharedState.getLogLimits(), logEmitterSharedState.getResource(), instrumentationScopeInfo, - this.epochNanos == 0 ? logEmitterSharedState.getClock().now() : this.epochNanos, + this.epochNanos == 0 + ? this.logEmitterSharedState.getClock().now() + : this.epochNanos, spanContext, severity, severityText, diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkReadWriteLogRecord.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkReadWriteLogRecord.java new file mode 100644 index 00000000000..bb1eb6336db --- /dev/null +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkReadWriteLogRecord.java @@ -0,0 +1,121 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.internal.GuardedBy; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.internal.AttributesMap; +import io.opentelemetry.sdk.logs.data.Body; +import io.opentelemetry.sdk.logs.data.LogData; +import io.opentelemetry.sdk.logs.data.Severity; +import io.opentelemetry.sdk.resources.Resource; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +@ThreadSafe +class SdkReadWriteLogRecord implements ReadWriteLogRecord { + + private final LogLimits logLimits; + private final Resource resource; + private final InstrumentationScopeInfo instrumentationScopeInfo; + private final long epochNanos; + private final SpanContext spanContext; + private final Severity severity; + @Nullable private final String severityText; + private final Body body; + private final Object lock = new Object(); + + @GuardedBy("lock") + @Nullable + private AttributesMap attributes; + + private SdkReadWriteLogRecord( + LogLimits logLimits, + Resource resource, + InstrumentationScopeInfo instrumentationScopeInfo, + long epochNanos, + SpanContext spanContext, + Severity severity, + @Nullable String severityText, + Body body, + @Nullable AttributesMap attributes) { + this.logLimits = logLimits; + this.resource = resource; + this.instrumentationScopeInfo = instrumentationScopeInfo; + this.epochNanos = epochNanos; + this.spanContext = spanContext; + this.severity = severity; + this.severityText = severityText; + this.body = body; + this.attributes = attributes; + } + + /** Create the log record with the given configuration and emit it to the {@code logProcessor}. */ + static SdkReadWriteLogRecord create( + LogLimits logLimits, + Resource resource, + InstrumentationScopeInfo instrumentationScopeInfo, + long epochNanos, + SpanContext spanContext, + Severity severity, + @Nullable String severityText, + Body body, + @Nullable AttributesMap attributes) { + return new SdkReadWriteLogRecord( + logLimits, + resource, + instrumentationScopeInfo, + epochNanos, + spanContext, + severity, + severityText, + body, + attributes); + } + + @Override + public ReadWriteLogRecord setAttribute(AttributeKey key, T value) { + if (key == null || key.getKey().isEmpty() || value == null) { + return this; + } + synchronized (lock) { + if (attributes == null) { + attributes = + AttributesMap.create( + logLimits.getMaxNumberOfAttributes(), logLimits.getMaxAttributeValueLength()); + } + attributes.put(key, value); + } + return this; + } + + private Attributes getImmutableAttributes() { + synchronized (lock) { + if (attributes == null || attributes.isEmpty()) { + return Attributes.empty(); + } + return attributes.immutableCopy(); + } + } + + @Override + public LogData toLogData() { + synchronized (lock) { + return SdkLogData.create( + resource, + instrumentationScopeInfo, + epochNanos, + spanContext, + severity, + severityText, + body, + getImmutableAttributes()); + } + } +} diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogProcessor.java index 096bd66a4ce..014eac2ebc8 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogProcessor.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogProcessor.java @@ -13,6 +13,7 @@ import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.internal.DaemonThreadFactory; import io.opentelemetry.sdk.logs.LogProcessor; +import io.opentelemetry.sdk.logs.ReadWriteLogRecord; import io.opentelemetry.sdk.logs.data.LogData; import java.util.ArrayList; import java.util.Collections; @@ -79,11 +80,11 @@ public static BatchLogProcessorBuilder builder(LogExporter logExporter) { } @Override - public void emit(LogData logData) { - if (logData == null) { + public void onEmit(ReadWriteLogRecord logRecord) { + if (logRecord == null) { return; } - worker.addLog(logData); + worker.addLog(logRecord); } @Override @@ -121,7 +122,7 @@ private static final class Worker implements Runnable { private long nextExportTime; - private final Queue queue; + private final Queue queue; // When waiting on the logs queue, exporter thread sets this atomic to the number of more // logs it needs before doing an export. Writer threads would then wait for the queue to reach // logsNeeded size before notifying the exporter thread about new entries. @@ -140,7 +141,7 @@ private Worker( long scheduleDelayNanos, int maxExportBatchSize, long exporterTimeoutNanos, - Queue queue) { + Queue queue) { this.logExporter = logExporter; this.scheduleDelayNanos = scheduleDelayNanos; this.maxExportBatchSize = maxExportBatchSize; @@ -182,7 +183,7 @@ private Worker( this.batch = new ArrayList<>(this.maxExportBatchSize); } - private void addLog(LogData logData) { + private void addLog(ReadWriteLogRecord logData) { if (!queue.offer(logData)) { processedLogsCounter.add(1, droppedAttrs); } else { @@ -201,7 +202,7 @@ public void run() { flush(); } while (!queue.isEmpty() && batch.size() < maxExportBatchSize) { - batch.add(queue.poll()); + batch.add(queue.poll().toLogData()); } if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) { exportCurrentBatch(); @@ -226,9 +227,9 @@ public void run() { private void flush() { int logsToFlush = queue.size(); while (logsToFlush > 0) { - LogData logData = queue.poll(); - assert logData != null; - batch.add(logData); + ReadWriteLogRecord logRecord = queue.poll(); + assert logRecord != null; + batch.add(logRecord.toLogData()); logsToFlush--; if (batch.size() >= maxExportBatchSize) { exportCurrentBatch(); diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogProcessor.java index ff1049ae742..70745e3f3de 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogProcessor.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogProcessor.java @@ -9,6 +9,7 @@ import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.logs.LogProcessor; +import io.opentelemetry.sdk.logs.ReadWriteLogRecord; import io.opentelemetry.sdk.logs.data.LogData; import java.util.Collections; import java.util.List; @@ -57,9 +58,9 @@ public static LogProcessor create(LogExporter exporter) { } @Override - public void emit(LogData logData) { + public void onEmit(ReadWriteLogRecord logRecord) { try { - List logs = Collections.singletonList(logData); + List logs = Collections.singletonList(logRecord.toLogData()); CompletableResultCode result = logExporter.export(logs); pendingExports.add(result); result.whenComplete( diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/MultiLogProcessorTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/MultiLogProcessorTest.java index a3ef8771de6..bb938177f9c 100644 --- a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/MultiLogProcessorTest.java +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/MultiLogProcessorTest.java @@ -11,9 +11,6 @@ import static org.mockito.Mockito.when; import io.opentelemetry.sdk.common.CompletableResultCode; -import io.opentelemetry.sdk.logs.data.LogData; -import io.opentelemetry.sdk.logs.data.Severity; -import io.opentelemetry.sdk.testing.logs.TestLogData; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -28,8 +25,7 @@ class MultiLogProcessorTest { @Mock private LogProcessor logProcessor1; @Mock private LogProcessor logProcessor2; - private static final LogData logData = - TestLogData.builder().setSeverity(Severity.DEBUG).setBody("message").build(); + @Mock private ReadWriteLogRecord logRecord; @BeforeEach void setup() { @@ -43,7 +39,7 @@ void setup() { void empty() { LogProcessor multiLogProcessor = LogProcessor.composite(); assertThat(multiLogProcessor).isInstanceOf(NoopLogProcessor.class); - multiLogProcessor.emit(logData); + multiLogProcessor.onEmit(logRecord); multiLogProcessor.shutdown(); } @@ -56,9 +52,9 @@ void oneLogProcessor() { @Test void twoLogProcessor() { LogProcessor multiLogProcessor = LogProcessor.composite(logProcessor1, logProcessor2); - multiLogProcessor.emit(logData); - verify(logProcessor1).emit(same(logData)); - verify(logProcessor2).emit(same(logData)); + multiLogProcessor.onEmit(logRecord); + verify(logProcessor1).onEmit(same(logRecord)); + verify(logProcessor2).onEmit(same(logRecord)); multiLogProcessor.forceFlush(); verify(logProcessor1).forceFlush(); diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/NoopLogProcessorTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/NoopLogProcessorTest.java index 03652b5fb82..83cde3899c2 100644 --- a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/NoopLogProcessorTest.java +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/NoopLogProcessorTest.java @@ -7,16 +7,20 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThat; -import io.opentelemetry.sdk.logs.data.Severity; -import io.opentelemetry.sdk.testing.logs.TestLogData; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +@ExtendWith(MockitoExtension.class) class NoopLogProcessorTest { + @Mock private ReadWriteLogRecord logRecord; + @Test void noCrash() { LogProcessor logProcessor = NoopLogProcessor.getInstance(); - logProcessor.emit(TestLogData.builder().setSeverity(Severity.DEBUG).setBody("message").build()); + logProcessor.onEmit(logRecord); assertThat(logProcessor.forceFlush().isSuccess()).isEqualTo(true); assertThat(logProcessor.shutdown().isSuccess()).isEqualTo(true); } diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLogEmitterProviderTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLogEmitterProviderTest.java index 7c339e2bc05..e0bf8ae2a00 100644 --- a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLogEmitterProviderTest.java +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLogEmitterProviderTest.java @@ -5,23 +5,31 @@ package io.opentelemetry.sdk.logs; +import static io.opentelemetry.sdk.testing.assertj.LogAssertions.assertThat; import static org.assertj.core.api.Assertions.as; -import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.context.Context; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.logs.data.LogData; +import io.opentelemetry.sdk.logs.data.Severity; import io.opentelemetry.sdk.resources.Resource; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -204,6 +212,54 @@ void logEmitterBuilder_DefaultEmitterName() { .isEqualTo(SdkLogEmitterProvider.DEFAULT_EMITTER_NAME); } + @Test + void logEmitterBuilder_WithLogProcessor() { + Resource resource = Resource.builder().put("r1", "v1").build(); + AtomicReference logData = new AtomicReference<>(); + sdkLogEmitterProvider = + SdkLogEmitterProvider.builder() + .setResource(resource) + .addLogProcessor( + logRecord -> { + logRecord.setAttribute(null, null); + // Overwrite k1 + logRecord.setAttribute(AttributeKey.stringKey("k1"), "new-v1"); + // Add new attribute k3 + logRecord.setAttribute(AttributeKey.stringKey("k3"), "v3"); + logData.set(logRecord.toLogData()); + }) + .build(); + + SpanContext spanContext = + SpanContext.create( + "33333333333333333333333333333333", + "7777777777777777", + TraceFlags.getSampled(), + TraceState.getDefault()); + sdkLogEmitterProvider + .get("test") + .logRecordBuilder() + .setEpoch(100, TimeUnit.NANOSECONDS) + .setContext(Span.wrap(spanContext).storeInContext(Context.root())) + .setSeverity(Severity.DEBUG) + .setSeverityText("debug") + .setBody("body") + .setAttribute(AttributeKey.stringKey("k1"), "v1") + .setAttribute(AttributeKey.stringKey("k2"), "v2") + .emit(); + + assertThat(logData.get()) + .hasResource(resource) + .hasInstrumentationScope(InstrumentationScopeInfo.create("test")) + .hasEpochNanos(100) + .hasSpanContext(spanContext) + .hasSeverity(Severity.DEBUG) + .hasSeverityText("debug") + .hasBody("body") + .hasAttributes( + Attributes.builder().put("k1", "new-v1").put("k2", "v2").put("k3", "v3").build()); + } + @Test void forceFlush() { sdkLogEmitterProvider.forceFlush(); @@ -229,12 +285,12 @@ void canSetClock() { long now = TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()); Clock clock = mock(Clock.class); when(clock.now()).thenReturn(now); - List seenLogs = new ArrayList<>(); + List seenLogs = new ArrayList<>(); logProcessor = seenLogs::add; sdkLogEmitterProvider = SdkLogEmitterProvider.builder().setClock(clock).addLogProcessor(logProcessor).build(); - sdkLogEmitterProvider.logEmitterBuilder(null).build().logBuilder().emit(); + sdkLogEmitterProvider.logEmitterBuilder(null).build().logRecordBuilder().emit(); assertThat(seenLogs.size()).isEqualTo(1); - assertThat(seenLogs.get(0).getEpochNanos()).isEqualTo(now); + assertThat(seenLogs.get(0).toLogData().getEpochNanos()).isEqualTo(now); } } diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLogEmitterTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLogEmitterTest.java index 44c04288fea..03ee9a43eca 100644 --- a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLogEmitterTest.java +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLogEmitterTest.java @@ -11,27 +11,31 @@ import static io.opentelemetry.api.common.AttributeKey.stringArrayKey; import static io.opentelemetry.sdk.testing.assertj.LogAssertions.assertThat; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.api.internal.StringUtils; import io.opentelemetry.sdk.common.Clock; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; -import io.opentelemetry.sdk.logs.data.LogData; import io.opentelemetry.sdk.resources.Resource; import java.util.Arrays; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; class SdkLogEmitterTest { @Test - void logBuilder() { + void logRecordBuilder() { LogEmitterSharedState state = mock(LogEmitterSharedState.class); InstrumentationScopeInfo info = InstrumentationScopeInfo.create("foo"); - AtomicReference seenLog = new AtomicReference<>(); + AtomicReference seenLog = new AtomicReference<>(); LogProcessor logProcessor = seenLog::set; Clock clock = mock(Clock.class); when(clock.now()).thenReturn(5L); @@ -41,29 +45,29 @@ void logBuilder() { when(state.getClock()).thenReturn(clock); SdkLogEmitter emitter = new SdkLogEmitter(state, info); - LogBuilder logBuilder = emitter.logBuilder(); - logBuilder.setBody("foo"); + LogRecordBuilder logRecordBuilder = emitter.logRecordBuilder(); + logRecordBuilder.setBody("foo"); // Have to test through the builder - logBuilder.emit(); - assertThat(seenLog.get()).hasBody("foo").hasEpochNanos(5); + logRecordBuilder.emit(); + assertThat(seenLog.get().toLogData()).hasBody("foo").hasEpochNanos(5); } @Test - void logBuilder_maxAttributeLength() { + void logRecordBuilder_maxAttributeLength() { int maxLength = 25; - AtomicReference seenLog = new AtomicReference<>(); + AtomicReference seenLog = new AtomicReference<>(); SdkLogEmitterProvider logEmitterProvider = SdkLogEmitterProvider.builder() .addLogProcessor(seenLog::set) .setLogLimits(() -> LogLimits.builder().setMaxAttributeValueLength(maxLength).build()) .build(); - LogBuilder logBuilder = logEmitterProvider.get("test").logBuilder(); + LogRecordBuilder logRecordBuilder = logEmitterProvider.get("test").logRecordBuilder(); String strVal = StringUtils.padLeft("", maxLength); String tooLongStrVal = strVal + strVal; - logBuilder - .setAttributes( + logRecordBuilder + .setAllAttributes( Attributes.builder() .put("string", tooLongStrVal) .put("boolean", true) @@ -76,7 +80,7 @@ void logBuilder_maxAttributeLength() { .build()) .emit(); - Attributes attributes = seenLog.get().getAttributes(); + Attributes attributes = seenLog.get().toLogData().getAttributes(); assertThat(attributes) .containsEntry("string", strVal) @@ -90,9 +94,9 @@ void logBuilder_maxAttributeLength() { } @Test - void logBuilder_maxAttributes() { + void logRecordBuilder_maxAttributes() { int maxNumberOfAttrs = 8; - AtomicReference seenLog = new AtomicReference<>(); + AtomicReference seenLog = new AtomicReference<>(); SdkLogEmitterProvider logEmitterProvider = SdkLogEmitterProvider.builder() .addLogProcessor(seenLog::set) @@ -105,10 +109,27 @@ void logBuilder_maxAttributes() { attributesBuilder.put("key" + i, i); } - logEmitterProvider.get("test").logBuilder().setAttributes(attributesBuilder.build()).emit(); + logEmitterProvider + .get("test") + .logRecordBuilder() + .setAllAttributes(attributesBuilder.build()) + .emit(); // NOTE: cannot guarantee which attributes are retained, only that there are no more that the // max - assertThat(seenLog.get().getAttributes()).hasSize(maxNumberOfAttrs); + assertThat(seenLog.get().toLogData().getAttributes()).hasSize(maxNumberOfAttrs); + } + + @Test + void logRecordBuilder_AfterShutdown() { + LogProcessor logProcessor = mock(LogProcessor.class); + when(logProcessor.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + SdkLogEmitterProvider logEmitterProvider = + SdkLogEmitterProvider.builder().addLogProcessor(logProcessor).build(); + + logEmitterProvider.shutdown().join(10, TimeUnit.SECONDS); + logEmitterProvider.get("test").logRecordBuilder().emit(); + + verify(logProcessor, never()).onEmit(any()); } } diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLogBuilderTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilderTest.java similarity index 83% rename from sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLogBuilderTest.java rename to sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilderTest.java index fef884415aa..73ac77e3f5d 100644 --- a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLogBuilderTest.java +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilderTest.java @@ -9,6 +9,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanContext; @@ -18,7 +19,6 @@ import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.logs.data.Body; -import io.opentelemetry.sdk.logs.data.LogData; import io.opentelemetry.sdk.logs.data.Severity; import io.opentelemetry.sdk.resources.Resource; import java.time.Instant; @@ -34,15 +34,15 @@ @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) -class SdkLogBuilderTest { +class SdkLogRecordBuilderTest { private static final Resource RESOURCE = Resource.empty(); private static final InstrumentationScopeInfo SCOPE_INFO = InstrumentationScopeInfo.empty(); @Mock LogEmitterSharedState logEmitterSharedState; - private final AtomicReference emittedLog = new AtomicReference<>(); - private SdkLogBuilder builder; + private final AtomicReference emittedLog = new AtomicReference<>(); + private SdkLogRecordBuilder builder; @BeforeEach void setup() { @@ -51,7 +51,7 @@ void setup() { when(logEmitterSharedState.getResource()).thenReturn(RESOURCE); when(logEmitterSharedState.getClock()).thenReturn(Clock.getDefault()); - builder = new SdkLogBuilder(logEmitterSharedState, SCOPE_INFO); + builder = new SdkLogRecordBuilder(logEmitterSharedState, SCOPE_INFO); } @Test @@ -60,7 +60,6 @@ void emit_AllFields() { String bodyStr = "body"; String sevText = "sevText"; Severity severity = Severity.DEBUG3; - Attributes attrs = Attributes.empty(); SpanContext spanContext = SpanContext.create( "33333333333333333333333333333333", @@ -71,17 +70,19 @@ void emit_AllFields() { builder.setBody(bodyStr); builder.setEpoch(123, TimeUnit.SECONDS); builder.setEpoch(now); - builder.setAttributes(attrs); + builder.setAttribute(null, null); + builder.setAttribute(AttributeKey.stringKey("k1"), "v1"); + builder.setAllAttributes(Attributes.builder().put("k2", "v2").put("k3", "v3").build()); builder.setContext(Span.wrap(spanContext).storeInContext(Context.root())); builder.setSeverity(severity); builder.setSeverityText(sevText); builder.emit(); - assertThat(emittedLog.get()) + assertThat(emittedLog.get().toLogData()) .hasResource(RESOURCE) .hasInstrumentationScope(SCOPE_INFO) .hasBody(bodyStr) .hasEpochNanos(TimeUnit.SECONDS.toNanos(now.getEpochSecond()) + now.getNano()) - .hasAttributes(attrs) + .hasAttributes(Attributes.builder().put("k1", "v1").put("k2", "v2").put("k3", "v3").build()) .hasSpanContext(spanContext) .hasSeverity(severity) .hasSeverityText(sevText); @@ -95,7 +96,7 @@ void emit_NoFields() { builder.emit(); - assertThat(emittedLog.get()) + assertThat(emittedLog.get().toLogData()) .hasResource(RESOURCE) .hasInstrumentationScope(SCOPE_INFO) .hasBody(Body.empty().asString()) @@ -104,13 +105,4 @@ void emit_NoFields() { .hasSpanContext(SpanContext.getInvalid()) .hasSeverity(Severity.UNDEFINED_SEVERITY_NUMBER); } - - @Test - void emit_AfterShutdown() { - when(logEmitterSharedState.hasBeenShutdown()).thenReturn(true); - - builder.emit(); - - assertThat(emittedLog.get()).isNull(); - } } diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/BatchLogProcessorTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/BatchLogProcessorTest.java index 65bf757a1e5..2ca660cf47d 100644 --- a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/BatchLogProcessorTest.java +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/BatchLogProcessorTest.java @@ -58,7 +58,7 @@ private void emitLog(SdkLogEmitterProvider sdkLogEmitterProvider, String message sdkLogEmitterProvider .logEmitterBuilder(getClass().getName()) .build() - .logBuilder() + .logRecordBuilder() .setBody(message) .emit(); } @@ -283,11 +283,7 @@ void emitMoreLogsThanTheMaximumLimit() { void ignoresNullLogs() { BatchLogProcessor processor = BatchLogProcessor.builder(mockLogExporter).build(); try { - assertThatCode( - () -> { - processor.emit(null); - }) - .doesNotThrowAnyException(); + assertThatCode(() -> processor.onEmit(null)).doesNotThrowAnyException(); } finally { processor.shutdown(); } diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/InMemoryLogExporterTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/InMemoryLogExporterTest.java index 0ca68f32817..850f206ec9a 100644 --- a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/InMemoryLogExporterTest.java +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/InMemoryLogExporterTest.java @@ -42,9 +42,9 @@ void tearDown() { @Test void getFinishedLogItems() { - logEmitter.logBuilder().setSeverity(DEBUG).setBody("message 1").emit(); - logEmitter.logBuilder().setSeverity(DEBUG).setBody("message 2").emit(); - logEmitter.logBuilder().setSeverity(DEBUG).setBody("message 3").emit(); + logEmitter.logRecordBuilder().setSeverity(DEBUG).setBody("message 1").emit(); + logEmitter.logRecordBuilder().setSeverity(DEBUG).setBody("message 2").emit(); + logEmitter.logRecordBuilder().setSeverity(DEBUG).setBody("message 3").emit(); List logItems = exporter.getFinishedLogItems(); assertThat(logItems).isNotNull(); @@ -56,9 +56,9 @@ void getFinishedLogItems() { @Test void reset() { - logEmitter.logBuilder().setSeverity(DEBUG).setBody("message 1").emit(); - logEmitter.logBuilder().setSeverity(DEBUG).setBody("message 2").emit(); - logEmitter.logBuilder().setSeverity(DEBUG).setBody("message 3").emit(); + logEmitter.logRecordBuilder().setSeverity(DEBUG).setBody("message 1").emit(); + logEmitter.logRecordBuilder().setSeverity(DEBUG).setBody("message 2").emit(); + logEmitter.logRecordBuilder().setSeverity(DEBUG).setBody("message 3").emit(); List logItems = exporter.getFinishedLogItems(); assertThat(logItems).isNotNull(); assertThat(logItems.size()).isEqualTo(3); @@ -69,9 +69,9 @@ void reset() { @Test void shutdown() { - logEmitter.logBuilder().setSeverity(DEBUG).setBody("message 1").emit(); - logEmitter.logBuilder().setSeverity(DEBUG).setBody("message 2").emit(); - logEmitter.logBuilder().setSeverity(DEBUG).setBody("message 3").emit(); + logEmitter.logRecordBuilder().setSeverity(DEBUG).setBody("message 1").emit(); + logEmitter.logRecordBuilder().setSeverity(DEBUG).setBody("message 2").emit(); + logEmitter.logRecordBuilder().setSeverity(DEBUG).setBody("message 3").emit(); List logItems = exporter.getFinishedLogItems(); assertThat(logItems).isNotNull(); assertThat(logItems.size()).isEqualTo(3); @@ -79,7 +79,7 @@ void shutdown() { exporter.shutdown(); assertThat(exporter.getFinishedLogItems()).isEmpty(); // Cannot add new elements after the shutdown. - logEmitter.logBuilder().setSeverity(DEBUG).setBody("message 1").emit(); + logEmitter.logRecordBuilder().setSeverity(DEBUG).setBody("message 1").emit(); assertThat(exporter.getFinishedLogItems()).isEmpty(); } diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/SimpleLogProcessorTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/SimpleLogProcessorTest.java index f279fb58c45..8f52aabbc52 100644 --- a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/SimpleLogProcessorTest.java +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/SimpleLogProcessorTest.java @@ -5,11 +5,11 @@ package io.opentelemetry.sdk.logs.export; -import static io.opentelemetry.sdk.logs.data.Severity.DEBUG; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -18,6 +18,7 @@ import io.opentelemetry.internal.testing.slf4j.SuppressLogger; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.logs.LogProcessor; +import io.opentelemetry.sdk.logs.ReadWriteLogRecord; import io.opentelemetry.sdk.logs.data.LogData; import io.opentelemetry.sdk.testing.logs.TestLogData; import java.util.Collections; @@ -33,7 +34,10 @@ @MockitoSettings(strictness = Strictness.LENIENT) class SimpleLogProcessorTest { + private static final LogData LOG_DATA = TestLogData.builder().build(); + @Mock private LogExporter logExporter; + @Mock private ReadWriteLogRecord readWriteLogRecord; private LogProcessor logProcessor; @@ -42,6 +46,7 @@ void setUp() { logProcessor = SimpleLogProcessor.create(logExporter); when(logExporter.export(anyCollection())).thenReturn(CompletableResultCode.ofSuccess()); when(logExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + when(readWriteLogRecord.toLogData()).thenReturn(LOG_DATA); } @Test @@ -52,20 +57,18 @@ void create_NullExporter() { } @Test - void addLogRecord() { - LogData logData = TestLogData.builder().setSeverity(DEBUG).setBody("Log message").build(); - logProcessor.emit(logData); - verify(logExporter).export(Collections.singletonList(logData)); + void onEmit() { + logProcessor.onEmit(readWriteLogRecord); + verify(logExporter).export(Collections.singletonList(LOG_DATA)); } @Test @SuppressLogger(SimpleLogProcessor.class) - void addLogRecord_ExporterError() { - LogData logData = TestLogData.builder().setSeverity(DEBUG).setBody("Log message").build(); + void onEmit_ExporterError() { when(logExporter.export(any())).thenThrow(new RuntimeException("Exporter error!")); - logProcessor.emit(logData); - logProcessor.emit(logData); - verify(logExporter, times(2)).export(Collections.singletonList(logData)); + logProcessor.onEmit(readWriteLogRecord); + logProcessor.onEmit(readWriteLogRecord); + verify(logExporter, times(2)).export(anyList()); } @Test @@ -75,11 +78,10 @@ void forceFlush() { when(logExporter.export(any())).thenReturn(export1, export2); - LogData logData = TestLogData.builder().setSeverity(DEBUG).setBody("Log message").build(); - logProcessor.emit(logData); - logProcessor.emit(logData); + logProcessor.onEmit(readWriteLogRecord); + logProcessor.onEmit(readWriteLogRecord); - verify(logExporter, times(2)).export(Collections.singletonList(logData)); + verify(logExporter, times(2)).export(Collections.singletonList(LOG_DATA)); CompletableResultCode flush = logProcessor.forceFlush(); assertThat(flush.isDone()).isFalse(); @@ -99,11 +101,10 @@ void shutdown() { when(logExporter.export(any())).thenReturn(export1, export2); - LogData logData = TestLogData.builder().setSeverity(DEBUG).setBody("Log message").build(); - logProcessor.emit(logData); - logProcessor.emit(logData); + logProcessor.onEmit(readWriteLogRecord); + logProcessor.onEmit(readWriteLogRecord); - verify(logExporter, times(2)).export(Collections.singletonList(logData)); + verify(logExporter, times(2)).export(Collections.singletonList(LOG_DATA)); CompletableResultCode shutdown = logProcessor.shutdown(); assertThat(shutdown.isDone()).isFalse();