Skip to content

Commit

Permalink
One more micrometer AsyncInstrumentRegistry fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Mateusz Rzeszutek committed Jan 14, 2022
1 parent 9583ca6 commit e599517
Showing 1 changed file with 98 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.instrumentation.api.internal.GuardedBy;
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.ToDoubleFunction;
import java.util.function.ToLongFunction;
Expand All @@ -32,23 +31,17 @@ final class AsyncInstrumentRegistry {
// OpentelemetryMeterRegistry is GC'd
private final WeakReference<Meter> meter;

// we're always locking lock on the whole instrument map; the add/remove methods aren't called
// that often, so it's probably better to opt for correctness in that case - there is a small
// window between removing a single measurement and removing the whole instrument (if it has no
// more measurements) when potentially a new measurement could be added; a ConcurrentHashMap
// wouldn't be enough in this case
// values from the maps below are never removed - that is because the underlying OpenTelemetry
// async instruments are never removed; if we removed the recorder and tried to register it once
// again OTel would log an error and basically ignore the new callback
// these maps are GC'd together with this AsyncInstrumentRegistry instance - that is, when the
// whole OpenTelemetry Meter gets GC'd

@GuardedBy("gauges")
private final Map<String, DoubleMeasurementsRecorder> gauges = new HashMap<>();

@GuardedBy("doubleCounters")
private final Map<String, DoubleMeasurementsRecorder> doubleCounters = new HashMap<>();

@GuardedBy("longCounters")
private final Map<String, LongMeasurementsRecorder> longCounters = new HashMap<>();

@GuardedBy("upDownDoubleCounters")
private final Map<String, DoubleMeasurementsRecorder> upDownDoubleCounters = new HashMap<>();
private final Map<String, DoubleMeasurementsRecorder> gauges = new ConcurrentHashMap<>();
private final Map<String, DoubleMeasurementsRecorder> doubleCounters = new ConcurrentHashMap<>();
private final Map<String, LongMeasurementsRecorder> longCounters = new ConcurrentHashMap<>();
private final Map<String, DoubleMeasurementsRecorder> upDownDoubleCounters =
new ConcurrentHashMap<>();

AsyncInstrumentRegistry(Meter meter) {
this.meter = new WeakReference<>(meter);
Expand All @@ -71,29 +64,22 @@ <T> AsyncMeasurementHandle buildGauge(
@Nullable T obj,
ToDoubleFunction<T> objMetric) {

synchronized (gauges) {
// use the gauges map as lock for the recorder state - this way all gauge-related mutable
// state will always be accessed in synchronized(gauges)
Object recorderLock = gauges;

DoubleMeasurementsRecorder recorder =
gauges.computeIfAbsent(
name,
n -> {
DoubleMeasurementsRecorder recorderCallback =
new DoubleMeasurementsRecorder(recorderLock);
otelMeter()
.gaugeBuilder(name)
.setDescription(description)
.setUnit(baseUnit)
.buildWithCallback(recorderCallback);
return recorderCallback;
});
recorder.addMeasurement(
attributes, new DoubleMeasurementSource(obj, (ToDoubleFunction<Object>) objMetric));

return new AsyncMeasurementHandle(gauges, name, attributes);
}
DoubleMeasurementsRecorder recorder =
gauges.computeIfAbsent(
name,
n -> {
DoubleMeasurementsRecorder recorderCallback = new DoubleMeasurementsRecorder();
otelMeter()
.gaugeBuilder(name)
.setDescription(description)
.setUnit(baseUnit)
.buildWithCallback(recorderCallback);
return recorderCallback;
});
recorder.addMeasurement(
attributes, new DoubleMeasurementSource(obj, (ToDoubleFunction<Object>) objMetric));

return new AsyncMeasurementHandle(recorder, attributes);
}

<T> AsyncMeasurementHandle buildDoubleCounter(
Expand All @@ -113,30 +99,23 @@ <T> AsyncMeasurementHandle buildDoubleCounter(
@Nullable T obj,
ToDoubleFunction<T> objMetric) {

synchronized (doubleCounters) {
// use the counters map as lock for the recorder state - this way all double counter-related
// mutable state will always be accessed in synchronized(doubleCounters)
Object recorderLock = doubleCounters;

DoubleMeasurementsRecorder recorder =
doubleCounters.computeIfAbsent(
name,
n -> {
DoubleMeasurementsRecorder recorderCallback =
new DoubleMeasurementsRecorder(recorderLock);
otelMeter()
.counterBuilder(name)
.setDescription(description)
.setUnit(baseUnit)
.ofDoubles()
.buildWithCallback(recorderCallback);
return recorderCallback;
});
recorder.addMeasurement(
attributes, new DoubleMeasurementSource(obj, (ToDoubleFunction<Object>) objMetric));

return new AsyncMeasurementHandle(doubleCounters, name, attributes);
}
DoubleMeasurementsRecorder recorder =
doubleCounters.computeIfAbsent(
name,
n -> {
DoubleMeasurementsRecorder recorderCallback = new DoubleMeasurementsRecorder();
otelMeter()
.counterBuilder(name)
.setDescription(description)
.setUnit(baseUnit)
.ofDoubles()
.buildWithCallback(recorderCallback);
return recorderCallback;
});
recorder.addMeasurement(
attributes, new DoubleMeasurementSource(obj, (ToDoubleFunction<Object>) objMetric));

return new AsyncMeasurementHandle(recorder, attributes);
}

<T> AsyncMeasurementHandle buildLongCounter(
Expand All @@ -147,29 +126,22 @@ <T> AsyncMeasurementHandle buildLongCounter(
@Nullable T obj,
ToLongFunction<T> objMetric) {

synchronized (longCounters) {
// use the counters map as lock for the recorder state - this way all gauge-related mutable
// state will always be accessed in synchronized(longCounters)
Object recorderLock = longCounters;

LongMeasurementsRecorder recorder =
longCounters.computeIfAbsent(
name,
n -> {
LongMeasurementsRecorder recorderCallback =
new LongMeasurementsRecorder(recorderLock);
otelMeter()
.counterBuilder(name)
.setDescription(description)
.setUnit(baseUnit)
.buildWithCallback(recorderCallback);
return recorderCallback;
});
recorder.addMeasurement(
attributes, new LongMeasurementSource(obj, (ToLongFunction<Object>) objMetric));

return new AsyncMeasurementHandle(longCounters, name, attributes);
}
LongMeasurementsRecorder recorder =
longCounters.computeIfAbsent(
name,
n -> {
LongMeasurementsRecorder recorderCallback = new LongMeasurementsRecorder();
otelMeter()
.counterBuilder(name)
.setDescription(description)
.setUnit(baseUnit)
.buildWithCallback(recorderCallback);
return recorderCallback;
});
recorder.addMeasurement(
attributes, new LongMeasurementSource(obj, (ToLongFunction<Object>) objMetric));

return new AsyncMeasurementHandle(recorder, attributes);
}

<T> AsyncMeasurementHandle buildUpDownDoubleCounter(
Expand All @@ -180,30 +152,23 @@ <T> AsyncMeasurementHandle buildUpDownDoubleCounter(
T obj,
ToDoubleFunction<T> objMetric) {

synchronized (upDownDoubleCounters) {
// use the counters map as lock for the recorder state - this way all double counter-related
// mutable state will always be accessed in synchronized(upDownDoubleCounters)
Object recorderLock = upDownDoubleCounters;

DoubleMeasurementsRecorder recorder =
upDownDoubleCounters.computeIfAbsent(
name,
n -> {
DoubleMeasurementsRecorder recorderCallback =
new DoubleMeasurementsRecorder(recorderLock);
otelMeter()
.upDownCounterBuilder(name)
.setDescription(description)
.setUnit(baseUnit)
.ofDoubles()
.buildWithCallback(recorderCallback);
return recorderCallback;
});
recorder.addMeasurement(
attributes, new DoubleMeasurementSource(obj, (ToDoubleFunction<Object>) objMetric));

return new AsyncMeasurementHandle(upDownDoubleCounters, name, attributes);
}
DoubleMeasurementsRecorder recorder =
upDownDoubleCounters.computeIfAbsent(
name,
n -> {
DoubleMeasurementsRecorder recorderCallback = new DoubleMeasurementsRecorder();
otelMeter()
.upDownCounterBuilder(name)
.setDescription(description)
.setUnit(baseUnit)
.ofDoubles()
.buildWithCallback(recorderCallback);
return recorderCallback;
});
recorder.addMeasurement(
attributes, new DoubleMeasurementSource(obj, (ToDoubleFunction<Object>) objMetric));

return new AsyncMeasurementHandle(recorder, attributes);
}

private Meter otelMeter() {
Expand All @@ -217,79 +182,46 @@ private Meter otelMeter() {

private abstract static class MeasurementsRecorder<I> {

private final Object lock;

@GuardedBy("lock")
private final Map<Attributes, I> measurements = new HashMap<>();

protected MeasurementsRecorder(Object lock) {
this.lock = lock;
}

Map<Attributes, I> copyForRead() {
synchronized (lock) {
return new HashMap<>(measurements);
}
}
final Map<Attributes, I> measurements = new ConcurrentHashMap<>();

void addMeasurement(Attributes attributes, I info) {
synchronized (lock) {
measurements.put(attributes, info);
}
measurements.put(attributes, info);
}

void removeMeasurement(Attributes attributes) {
synchronized (lock) {
measurements.remove(attributes);
}
}

boolean isEmpty() {
synchronized (lock) {
return measurements.isEmpty();
}
measurements.remove(attributes);
}
}

private static final class DoubleMeasurementsRecorder
extends MeasurementsRecorder<DoubleMeasurementSource>
implements Consumer<ObservableDoubleMeasurement> {

private DoubleMeasurementsRecorder(Object lock) {
super(lock);
}

@Override
public void accept(ObservableDoubleMeasurement measurement) {
copyForRead()
.forEach(
(attributes, gauge) -> {
Object obj = gauge.objWeakRef.get();
if (obj != null) {
measurement.record(gauge.metricFunction.applyAsDouble(obj), attributes);
}
});
measurements.forEach(
(attributes, gauge) -> {
Object obj = gauge.objWeakRef.get();
if (obj != null) {
measurement.record(gauge.metricFunction.applyAsDouble(obj), attributes);
}
});
}
}

private static final class LongMeasurementsRecorder
extends MeasurementsRecorder<LongMeasurementSource>
implements Consumer<ObservableLongMeasurement> {

private LongMeasurementsRecorder(Object lock) {
super(lock);
}

@Override
public void accept(ObservableLongMeasurement measurement) {
copyForRead()
.forEach(
(attributes, gauge) -> {
Object obj = gauge.objWeakRef.get();
if (obj != null) {
measurement.record(gauge.metricFunction.applyAsLong(obj), attributes);
}
});
measurements.forEach(
(attributes, gauge) -> {
Object obj = gauge.objWeakRef.get();
if (obj != null) {
measurement.record(gauge.metricFunction.applyAsLong(obj), attributes);
}
});
}
}

Expand Down Expand Up @@ -317,32 +249,16 @@ private LongMeasurementSource(@Nullable Object obj, ToLongFunction<Object> metri

static final class AsyncMeasurementHandle {

@GuardedBy("instrumentRegistry")
private final Map<String, ? extends MeasurementsRecorder<?>> instrumentRegistry;

private final String name;
private final MeasurementsRecorder<?> measurementsRecorder;
private final Attributes attributes;

AsyncMeasurementHandle(
Map<String, ? extends MeasurementsRecorder<?>> instrumentRegistry,
String name,
Attributes attributes) {
this.instrumentRegistry = instrumentRegistry;
this.name = name;
AsyncMeasurementHandle(MeasurementsRecorder<?> measurementsRecorder, Attributes attributes) {
this.measurementsRecorder = measurementsRecorder;
this.attributes = attributes;
}

void remove() {
synchronized (instrumentRegistry) {
MeasurementsRecorder<?> recorder = instrumentRegistry.get(name);
if (recorder != null) {
recorder.removeMeasurement(attributes);
// if this was the last measurement then let's remove the whole recorder
if (recorder.isEmpty()) {
instrumentRegistry.remove(name);
}
}
}
measurementsRecorder.removeMeasurement(attributes);
}
}
}

0 comments on commit e599517

Please sign in to comment.