Skip to content

Commit

Permalink
Allow logs to be mutated by LogProcessor (#4643)
Browse files Browse the repository at this point in the history
* Allow logs to be mutated by LogProcessor

* wip

* Improve test coverage
  • Loading branch information
jack-berg authored Aug 11, 2022
1 parent dcb9bbb commit b979ea1
Show file tree
Hide file tree
Showing 23 changed files with 433 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -519,9 +519,9 @@ private static void testLogExporter(LogExporter logExporter) {

try (Scope unused = Span.wrap(spanContext).makeCurrent()) {
logEmitter
.logBuilder()
.logRecordBuilder()
.setBody("log body")
.setAttributes(Attributes.builder().put("key", "value").build())
.setAllAttributes(Attributes.builder().put("key", "value").build())
.setSeverity(Severity.DEBUG)
.setSeverityText("DEBUG")
.setEpoch(Instant.now())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ void configures() throws Exception {

LogEmitter logEmitter =
autoConfiguredOpenTelemetrySdk.getOpenTelemetrySdk().getSdkLogEmitterProvider().get("test");
logEmitter.logBuilder().setBody("debug log message").setSeverity(Severity.DEBUG).emit();
logEmitter.logBuilder().setBody("info log message").setSeverity(Severity.INFO).emit();
logEmitter.logRecordBuilder().setBody("debug log message").setSeverity(Severity.DEBUG).emit();
logEmitter.logRecordBuilder().setBody("info log message").setSeverity(Severity.INFO).emit();

await().untilAsserted(() -> assertThat(otlpTraceRequests).hasSize(1));

Expand Down
45 changes: 0 additions & 45 deletions sdk/logs/src/main/java/io/opentelemetry/sdk/logs/LogBuilder.java

This file was deleted.

12 changes: 6 additions & 6 deletions sdk/logs/src/main/java/io/opentelemetry/sdk/logs/LogEmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@
/**
* A {@link LogEmitter} is the entry point into a log pipeline.
*
* <p>Obtain a log builder via {@link #logBuilder()}, add properties using the setters, and emit it
* to downstream {@link LogProcessor}(s) via {@link LogBuilder#emit()}.
* <p>Obtain a {@link #logRecordBuilder()}, add properties using the setters, and emit it to
* downstream {@link LogProcessor}(s) via {@link LogRecordBuilder#emit()}.
*/
@ThreadSafe
public interface LogEmitter {

/**
* Return a {@link LogBuilder} to emit a log.
* Return a {@link LogRecordBuilder} to emit a log record.
*
* <p>Build the log using the {@link LogBuilder} setters, and emit it to downstream {@link
* LogProcessor}(s) via {@link LogBuilder#emit()}.
* <p>Build the log record using the {@link LogRecordBuilder} setters, and emit it to downstream
* {@link LogProcessor}(s) via {@link LogRecordBuilder#emit()}.
*/
LogBuilder logBuilder();
LogRecordBuilder logRecordBuilder();
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package io.opentelemetry.sdk.logs;

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.data.LogData;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -15,8 +14,8 @@
import javax.annotation.concurrent.ThreadSafe;

/**
* {@link LogProcessor} is the interface to allow synchronous hooks for logs emitted by {@link
* LogEmitter}s.
* {@link LogProcessor} is the interface to allow synchronous hooks for log records emitted by
* {@link LogEmitter}s.
*/
@ThreadSafe
public interface LogProcessor extends Closeable {
Expand Down Expand Up @@ -48,11 +47,11 @@ static LogProcessor composite(Iterable<LogProcessor> processors) {
}

/**
* Emit a log.
* Called when a {@link LogEmitter} {@link LogRecordBuilder#emit()}s a log record.
*
* @param logData the log
* @param logRecord the log record
*/
void emit(LogData logData);
void onEmit(ReadWriteLogRecord logRecord);

/**
* Shutdown the log processor.
Expand All @@ -64,7 +63,7 @@ default CompletableResultCode shutdown() {
}

/**
* Process all logs that have not yet been processed.
* Process all log records that have not yet been processed.
*
* @return result
*/
Expand All @@ -73,7 +72,8 @@ default CompletableResultCode forceFlush() {
}

/**
* Closes this {@link LogProcessor} after processing any remaining logs, releasing any resources.
* Closes this {@link LogProcessor} after processing any remaining log records, releasing any
* resources.
*/
@Override
default void close() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.logs;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.logs.data.Severity;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

/**
* Used to construct and emit logs from a {@link LogEmitter}.
*
* <p>Obtain a {@link LogEmitter#logRecordBuilder()}, add properties using the setters, and emit the
* log to downstream {@link LogProcessor}(s) by calling {@link #emit()}.
*/
public interface LogRecordBuilder {

/** Set the epoch timestamp using the timestamp and unit. */
LogRecordBuilder setEpoch(long timestamp, TimeUnit unit);

/** Set the epoch timestamp using the instant. */
LogRecordBuilder setEpoch(Instant instant);

/** Set the context. */
LogRecordBuilder setContext(Context context);

/** Set the severity. */
LogRecordBuilder setSeverity(Severity severity);

/** Set the severity text. */
LogRecordBuilder setSeverityText(String severityText);

/** Set the body string. */
LogRecordBuilder setBody(String body);

/**
* Sets attributes. If the {@link LogRecordBuilder} previously contained a mapping for any of the
* keys, the old values are replaced by the specified values.
*/
@SuppressWarnings("unchecked")
default LogRecordBuilder setAllAttributes(Attributes attributes) {
if (attributes == null || attributes.isEmpty()) {
return this;
}
attributes.forEach(
(attributeKey, value) -> setAttribute((AttributeKey<Object>) attributeKey, value));
return this;
}

/** Sets an attribute. */
<T> LogRecordBuilder setAttribute(AttributeKey<T> key, T value);

/** Emit the log to downstream {@link LogProcessor}(s). */
void emit();
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package io.opentelemetry.sdk.logs;

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.data.LogData;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
Expand All @@ -32,9 +31,9 @@ static LogProcessor create(List<LogProcessor> logProcessorsList) {
}

@Override
public void emit(LogData logData) {
public void onEmit(ReadWriteLogRecord logRecord) {
for (LogProcessor logProcessor : logProcessors) {
logProcessor.emit(logData);
logProcessor.onEmit(logRecord);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

package io.opentelemetry.sdk.logs;

import io.opentelemetry.sdk.logs.data.LogData;

final class NoopLogProcessor implements LogProcessor {
private static final NoopLogProcessor INSTANCE = new NoopLogProcessor();

Expand All @@ -17,5 +15,5 @@ static LogProcessor getInstance() {
private NoopLogProcessor() {}

@Override
public void emit(LogData logData) {}
public void onEmit(ReadWriteLogRecord logRecord) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.logs;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.sdk.logs.data.LogData;

/** A log record that can be read from and written to. */
public interface ReadWriteLogRecord {

/**
* Sets an attribute on the log record. If the log record previously contained a mapping for the
* key, the old value is replaced by the specified value.
*
* <p>Note: the behavior of null values is undefined, and hence strongly discouraged.
*/
<T> ReadWriteLogRecord setAttribute(AttributeKey<T> key, T value);

// TODO: add additional setters

/** Return an immutable {@link LogData} instance representing this log record. */
LogData toLogData();

// TODO: add additional log record accessors. Currently, all fields can be accessed indirectly via
// #toLogData() with at the expense of additional allocations.

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ final class SdkLogEmitter implements LogEmitter {
}

@Override
public LogBuilder logBuilder() {
return new SdkLogBuilder(logEmitterSharedState, instrumentationScopeInfo);
public LogRecordBuilder logRecordBuilder() {
return new SdkLogRecordBuilder(logEmitterSharedState, instrumentationScopeInfo);
}

// VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ public SdkLogEmitterProviderBuilder setResource(Resource resource) {

/**
* Assign a {@link Supplier} of {@link LogLimits}. {@link LogLimits} will be retrieved each time a
* {@link LogEmitter#logBuilder()} is called.
* {@link LogEmitter#logRecordBuilder()} is called.
*
* <p>The {@code logLimitsSupplier} must be thread-safe and return immediately (no remote calls,
* as contention free as possible).
*
* @param logLimitsSupplier the supplier that will be used to retrieve the {@link LogLimits} for
* every {@link LogBuilder}.
* every {@link LogRecordBuilder}.
* @return this
*/
public SdkLogEmitterProviderBuilder setLogLimits(Supplier<LogLimits> logLimitsSupplier) {
Expand All @@ -55,8 +55,8 @@ public SdkLogEmitterProviderBuilder setLogLimits(Supplier<LogLimits> logLimitsSu
}

/**
* Add a log processor. {@link LogProcessor#emit(LogData)} will be called each time a log is
* emitted by {@link LogEmitter} instances obtained from the {@link SdkLogEmitterProvider}.
* Add a log processor. {@link LogProcessor#onEmit(ReadWriteLogRecord)} will be called each time a
* log is emitted by {@link LogEmitter} instances obtained from the {@link SdkLogEmitterProvider}.
*
* @param processor the log processor
* @return this
Expand Down
Loading

0 comments on commit b979ea1

Please sign in to comment.