From 8728040bacad30fd0e6392f573bb157eddb42129 Mon Sep 17 00:00:00 2001 From: Bogdan Cristian Drutu Date: Thu, 6 Feb 2020 18:30:19 -0800 Subject: [PATCH] Bound registry work in progress Signed-off-by: Bogdan Cristian Drutu --- .../sdk/metrics/AbstractBoundInstrument.java | 67 ++++++++++---- .../sdk/metrics/AbstractInstrument.java | 4 + .../AbstractInstrumentWithBinding.java | 92 +++++++++++++++++++ .../opentelemetry/sdk/metrics/Aggregator.java | 13 +-- .../sdk/metrics/DoubleCounterSdk.java | 28 +++--- .../sdk/metrics/DoubleMeasureSdk.java | 26 ++++-- .../sdk/metrics/DoubleSumAggregator.java | 16 ++-- .../sdk/metrics/LongCounterSdk.java | 28 +++--- .../sdk/metrics/LongMeasureSdk.java | 26 ++++-- .../sdk/metrics/LongSumAggregator.java | 12 ++- .../sdk/metrics/RecordProcessor.java | 23 +++++ 11 files changed, 255 insertions(+), 80 deletions(-) create mode 100644 sdk/src/main/java/io/opentelemetry/sdk/metrics/AbstractInstrumentWithBinding.java create mode 100644 sdk/src/main/java/io/opentelemetry/sdk/metrics/RecordProcessor.java diff --git a/sdk/src/main/java/io/opentelemetry/sdk/metrics/AbstractBoundInstrument.java b/sdk/src/main/java/io/opentelemetry/sdk/metrics/AbstractBoundInstrument.java index 44cf11dc21b..dc0dbc595d3 100644 --- a/sdk/src/main/java/io/opentelemetry/sdk/metrics/AbstractBoundInstrument.java +++ b/sdk/src/main/java/io/opentelemetry/sdk/metrics/AbstractBoundInstrument.java @@ -17,35 +17,68 @@ package io.opentelemetry.sdk.metrics; import io.opentelemetry.metrics.InstrumentWithBinding.BoundInstrument; -import io.opentelemetry.metrics.LabelSet; +import java.util.concurrent.atomic.AtomicLong; +/** + * Abstract class that extends the functionality of the BoundInstrument. + * + *

It atomically counts the number of references (usages) while also keeping a state of + * mapped/unmapped into an external map. It uses an atomic value where the least significant bit is + * used to keep the state of mapping ('1' is used for unmapped and '0' is for mapped) and the rest + * of the bits are used for reference (usage) counting. + */ abstract class AbstractBoundInstrument implements BoundInstrument { - private final LabelSet labels; + // Atomically counts the number of references (usages) while also keeping a state of + // mapped/unmapped into a registry map. + private final AtomicLong refCountMapped; + private final Aggregator aggregator; - AbstractBoundInstrument(LabelSet labels) { - this.labels = labels; - // todo: associate with an aggregator/accumulator + AbstractBoundInstrument(Aggregator aggregator) { + this.aggregator = aggregator; + this.refCountMapped = new AtomicLong(0); } - @Override - public void unbind() {} + /** + * Returns {@code true} if the entry is still mapped and increases the reference usages, if + * unmapped returns {@code false}. + * + * @return {@code true} if successful. + */ + final boolean bind() { + // Every reference adds/removes 2 instead of 1 to avoid changing the mapping bit. + return (refCountMapped.addAndGet(2L) & 1L) == 0; + } @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof AbstractBoundInstrument)) { + public final void unbind() { + // Every reference adds/removes 2 instead of 1 to avoid changing the mapping bit. + refCountMapped.getAndAdd(-2L); + } + + /** + * Flips the mapped bit to "unmapped" state and returns true if both of the following conditions + * are true upon entry to this function: 1) There are no active references; 2) The mapped bit is + * in "mapped" state; otherwise no changes are done to mapped bit and false is returned. + * + * @return {@code true} if successful. + */ + final boolean tryUnmap() { + if (refCountMapped.get() != 0) { + // Still references (usages) to this bound or already unmapped. return false; } + return refCountMapped.compareAndSet(0L, 1L); + } - AbstractBoundInstrument that = (AbstractBoundInstrument) o; + final void recordLong(long value) { + aggregator.recordLong(value); + } - return labels.equals(that.labels); + final void recordDouble(double value) { + aggregator.recordDouble(value); } - @Override - public int hashCode() { - return labels.hashCode(); + final void checkpoint() { + aggregator.checkpoint(); } } diff --git a/sdk/src/main/java/io/opentelemetry/sdk/metrics/AbstractInstrument.java b/sdk/src/main/java/io/opentelemetry/sdk/metrics/AbstractInstrument.java index 5fa8972c6ab..6f90dc25546 100644 --- a/sdk/src/main/java/io/opentelemetry/sdk/metrics/AbstractInstrument.java +++ b/sdk/src/main/java/io/opentelemetry/sdk/metrics/AbstractInstrument.java @@ -42,6 +42,10 @@ abstract class AbstractInstrument implements Instrument { this.labelKeys = labelKeys; } + void collect(RecordProcessor recordProcessor) { + // TODO: Make this abstract. + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/sdk/src/main/java/io/opentelemetry/sdk/metrics/AbstractInstrumentWithBinding.java b/sdk/src/main/java/io/opentelemetry/sdk/metrics/AbstractInstrumentWithBinding.java new file mode 100644 index 00000000000..c5bc74ccd3b --- /dev/null +++ b/sdk/src/main/java/io/opentelemetry/sdk/metrics/AbstractInstrumentWithBinding.java @@ -0,0 +1,92 @@ +/* + * Copyright 2020, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.sdk.metrics; + +import io.opentelemetry.metrics.LabelSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + +abstract class AbstractInstrumentWithBinding + extends AbstractInstrument { + private final ConcurrentHashMap boundLabels; + private final ReentrantLock collectLock; + + AbstractInstrumentWithBinding( + String name, + String description, + String unit, + Map constantLabels, + List labelKeys) { + super(name, description, unit, constantLabels, labelKeys); + boundLabels = new ConcurrentHashMap<>(); + collectLock = new ReentrantLock(); + } + + // Cannot make this "bind" because of a Java problem if we make this class also implement the + // InstrumentWithBinding then the subclass will fail to compile because of different "bind" + // signature. This is a good trade-off. + final B bindInternal(LabelSet labelSet) { + B bound = boundLabels.get(labelSet); + if (bound != null && bound.bind()) { + // At this moment it is guaranteed that the Bound is in the map and will not be removed. + return bound; + } + + // Missing entry or no longer mapped, try to add a new entry. + bound = newBound(); + while (true) { + B oldBound = boundLabels.putIfAbsent(labelSet, bound); + if (oldBound != null) { + if (oldBound.bind()) { + // At this moment it is guaranteed that the Bound is in the map and will not be removed. + return oldBound; + } + // Try to remove the oldBound. This will race with the collect method, but only one will + // succeed. + boundLabels.remove(labelSet, oldBound); + continue; + } + return bound; + } + } + + /** + * Collects records from all the entries (labelSet, Bound) that changed since the last collect() + * call. + */ + @Override + final void collect(RecordProcessor recordProcessor) { + collectLock.lock(); + try { + for (Map.Entry entry : boundLabels.entrySet()) { + if (entry.getValue().tryUnmap()) { + // If able to unmap then remove the record from the current Map. This can race with the + // acquire but because we requested a specific value only one will succeed. + boundLabels.remove(entry.getKey(), entry.getValue()); + } + + entry.getValue().checkpoint(); + } + } finally { + collectLock.unlock(); + } + } + + abstract B newBound(); +} diff --git a/sdk/src/main/java/io/opentelemetry/sdk/metrics/Aggregator.java b/sdk/src/main/java/io/opentelemetry/sdk/metrics/Aggregator.java index 10840cc112f..03b8b3b927b 100644 --- a/sdk/src/main/java/io/opentelemetry/sdk/metrics/Aggregator.java +++ b/sdk/src/main/java/io/opentelemetry/sdk/metrics/Aggregator.java @@ -20,15 +20,7 @@ /** Aggregator represents the interface for all the available aggregations. */ @ThreadSafe -interface Aggregator> { - - /** - * Merge aggregated values between the current instance and the given {@code aggregator}. - * - * @param aggregator value to merge with. - */ - void merge(T aggregator); - +interface Aggregator { /** * Updates the current aggregator with a newly recorded {@code long} value. * @@ -42,4 +34,7 @@ interface Aggregator> { * @param value the new {@code double} value to be added. */ void recordDouble(double value); + + /** Checkpoints this aggregator by saving the previous value and resetting the current value. */ + void checkpoint(); } diff --git a/sdk/src/main/java/io/opentelemetry/sdk/metrics/DoubleCounterSdk.java b/sdk/src/main/java/io/opentelemetry/sdk/metrics/DoubleCounterSdk.java index 086e7726d18..1e8d2063603 100644 --- a/sdk/src/main/java/io/opentelemetry/sdk/metrics/DoubleCounterSdk.java +++ b/sdk/src/main/java/io/opentelemetry/sdk/metrics/DoubleCounterSdk.java @@ -18,10 +18,12 @@ import io.opentelemetry.metrics.DoubleCounter; import io.opentelemetry.metrics.LabelSet; +import io.opentelemetry.sdk.metrics.DoubleCounterSdk.BoundInstrument; import java.util.List; import java.util.Map; -final class DoubleCounterSdk extends AbstractInstrument implements DoubleCounter { +final class DoubleCounterSdk extends AbstractInstrumentWithBinding + implements DoubleCounter { private final boolean monotonic; @@ -38,14 +40,19 @@ private DoubleCounterSdk( @Override public void add(double delta, LabelSet labelSet) { - BoundDoubleCounter boundDoubleCounter = bind(labelSet); - boundDoubleCounter.add(delta); - boundDoubleCounter.unbind(); + BoundInstrument boundInstrument = bind(labelSet); + boundInstrument.add(delta); + boundInstrument.unbind(); } @Override - public BoundDoubleCounter bind(LabelSet labelSet) { - return new BoundInstrument(labelSet, monotonic); + public BoundInstrument bind(LabelSet labelSet) { + return bindInternal(labelSet); + } + + @Override + BoundInstrument newBound() { + return new BoundInstrument(monotonic); } @Override @@ -72,13 +79,12 @@ public int hashCode() { return result; } - private static final class BoundInstrument extends AbstractBoundInstrument - implements BoundDoubleCounter { + static final class BoundInstrument extends AbstractBoundInstrument implements BoundDoubleCounter { private final boolean monotonic; - BoundInstrument(LabelSet labels, boolean monotonic) { - super(labels); + BoundInstrument(boolean monotonic) { + super(new DoubleSumAggregator()); this.monotonic = monotonic; } @@ -87,7 +93,7 @@ public void add(double delta) { if (monotonic && delta < 0) { throw new IllegalArgumentException("monotonic counters can only increase"); } - // todo: pass through to an aggregator/accumulator + recordDouble(delta); } } diff --git a/sdk/src/main/java/io/opentelemetry/sdk/metrics/DoubleMeasureSdk.java b/sdk/src/main/java/io/opentelemetry/sdk/metrics/DoubleMeasureSdk.java index 9cbb024919f..38d56d0b7bd 100644 --- a/sdk/src/main/java/io/opentelemetry/sdk/metrics/DoubleMeasureSdk.java +++ b/sdk/src/main/java/io/opentelemetry/sdk/metrics/DoubleMeasureSdk.java @@ -18,10 +18,12 @@ import io.opentelemetry.metrics.DoubleMeasure; import io.opentelemetry.metrics.LabelSet; +import io.opentelemetry.sdk.metrics.DoubleMeasureSdk.BoundInstrument; import java.util.List; import java.util.Map; -final class DoubleMeasureSdk extends AbstractInstrument implements DoubleMeasure { +final class DoubleMeasureSdk extends AbstractInstrumentWithBinding + implements DoubleMeasure { private final boolean absolute; @@ -38,14 +40,19 @@ private DoubleMeasureSdk( @Override public void record(double value, LabelSet labelSet) { - BoundDoubleMeasure boundDoubleMeasure = bind(labelSet); - boundDoubleMeasure.record(value); - boundDoubleMeasure.unbind(); + BoundInstrument boundInstrument = bind(labelSet); + boundInstrument.record(value); + boundInstrument.unbind(); } @Override - public BoundDoubleMeasure bind(LabelSet labelSet) { - return new BoundInstrument(labelSet, this.absolute); + public BoundInstrument bind(LabelSet labelSet) { + return bindInternal(labelSet); + } + + @Override + BoundInstrument newBound() { + return new BoundInstrument(this.absolute); } @Override @@ -72,13 +79,12 @@ public int hashCode() { return result; } - private static final class BoundInstrument extends AbstractBoundInstrument - implements BoundDoubleMeasure { + static final class BoundInstrument extends AbstractBoundInstrument implements BoundDoubleMeasure { private final boolean absolute; - BoundInstrument(LabelSet labels, boolean absolute) { - super(labels); + BoundInstrument(boolean absolute) { + super(null); this.absolute = absolute; } diff --git a/sdk/src/main/java/io/opentelemetry/sdk/metrics/DoubleSumAggregator.java b/sdk/src/main/java/io/opentelemetry/sdk/metrics/DoubleSumAggregator.java index 068cf1d7e00..2ec501c5777 100644 --- a/sdk/src/main/java/io/opentelemetry/sdk/metrics/DoubleSumAggregator.java +++ b/sdk/src/main/java/io/opentelemetry/sdk/metrics/DoubleSumAggregator.java @@ -18,26 +18,28 @@ import com.google.common.util.concurrent.AtomicDouble; -final class DoubleSumAggregator implements Aggregator { +final class DoubleSumAggregator implements Aggregator { // TODO: Change to use DoubleAdder when changed to java8. private final AtomicDouble current; + private final AtomicDouble checkpoint; DoubleSumAggregator() { current = new AtomicDouble(0.0); - } - - @Override - public void merge(DoubleSumAggregator other) { - this.current.getAndAdd(other.current.get()); + checkpoint = new AtomicDouble(0.0); } @Override public void recordLong(long value) { - throw new UnsupportedOperationException("This Aggregator does not support long values"); + throw new UnsupportedOperationException("This is a DoubleSumAggregator"); } @Override public void recordDouble(double value) { current.getAndAdd(value); } + + @Override + public void checkpoint() { + checkpoint.getAndAdd(current.getAndSet(0)); + } } diff --git a/sdk/src/main/java/io/opentelemetry/sdk/metrics/LongCounterSdk.java b/sdk/src/main/java/io/opentelemetry/sdk/metrics/LongCounterSdk.java index 0af5ef61e4d..8ef6382dacc 100644 --- a/sdk/src/main/java/io/opentelemetry/sdk/metrics/LongCounterSdk.java +++ b/sdk/src/main/java/io/opentelemetry/sdk/metrics/LongCounterSdk.java @@ -18,10 +18,12 @@ import io.opentelemetry.metrics.LabelSet; import io.opentelemetry.metrics.LongCounter; +import io.opentelemetry.sdk.metrics.LongCounterSdk.BoundInstrument; import java.util.List; import java.util.Map; -final class LongCounterSdk extends AbstractInstrument implements LongCounter { +final class LongCounterSdk extends AbstractInstrumentWithBinding + implements LongCounter { private final boolean monotonic; @@ -38,14 +40,19 @@ private LongCounterSdk( @Override public void add(long delta, LabelSet labelSet) { - BoundLongCounter boundLongCounter = bind(labelSet); - boundLongCounter.add(delta); - boundLongCounter.unbind(); + BoundInstrument boundInstrument = bind(labelSet); + boundInstrument.add(delta); + boundInstrument.unbind(); } @Override - public BoundLongCounter bind(LabelSet labelSet) { - return new BoundInstrument(labelSet, monotonic); + public BoundInstrument bind(LabelSet labelSet) { + return bindInternal(labelSet); + } + + @Override + BoundInstrument newBound() { + return new BoundInstrument(monotonic); } @Override @@ -72,13 +79,12 @@ public int hashCode() { return result; } - private static final class BoundInstrument extends AbstractBoundInstrument - implements BoundLongCounter { + static final class BoundInstrument extends AbstractBoundInstrument implements BoundLongCounter { private final boolean monotonic; - BoundInstrument(LabelSet labels, boolean monotonic) { - super(labels); + BoundInstrument(boolean monotonic) { + super(new LongSumAggregator()); this.monotonic = monotonic; } @@ -87,7 +93,7 @@ public void add(long delta) { if (monotonic && delta < 0) { throw new IllegalArgumentException("monotonic counters can only increase"); } - // todo: pass through to an aggregator/accumulator + recordLong(delta); } } diff --git a/sdk/src/main/java/io/opentelemetry/sdk/metrics/LongMeasureSdk.java b/sdk/src/main/java/io/opentelemetry/sdk/metrics/LongMeasureSdk.java index 14d921e247f..6d5262c8940 100644 --- a/sdk/src/main/java/io/opentelemetry/sdk/metrics/LongMeasureSdk.java +++ b/sdk/src/main/java/io/opentelemetry/sdk/metrics/LongMeasureSdk.java @@ -18,10 +18,12 @@ import io.opentelemetry.metrics.LabelSet; import io.opentelemetry.metrics.LongMeasure; +import io.opentelemetry.sdk.metrics.LongMeasureSdk.BoundInstrument; import java.util.List; import java.util.Map; -final class LongMeasureSdk extends AbstractInstrument implements LongMeasure { +final class LongMeasureSdk extends AbstractInstrumentWithBinding + implements LongMeasure { private final boolean absolute; @@ -38,14 +40,19 @@ private LongMeasureSdk( @Override public void record(long value, LabelSet labelSet) { - BoundLongMeasure boundLongMeasure = bind(labelSet); - boundLongMeasure.record(value); - boundLongMeasure.unbind(); + BoundInstrument boundInstrument = bind(labelSet); + boundInstrument.record(value); + boundInstrument.unbind(); } @Override - public BoundLongMeasure bind(LabelSet labelSet) { - return new BoundInstrument(labelSet, this.absolute); + public BoundInstrument bind(LabelSet labelSet) { + return bindInternal(labelSet); + } + + @Override + BoundInstrument newBound() { + return new BoundInstrument(this.absolute); } @Override @@ -72,13 +79,12 @@ public int hashCode() { return result; } - private static final class BoundInstrument extends AbstractBoundInstrument - implements BoundLongMeasure { + static final class BoundInstrument extends AbstractBoundInstrument implements BoundLongMeasure { private final boolean absolute; - BoundInstrument(LabelSet labels, boolean absolute) { - super(labels); + BoundInstrument(boolean absolute) { + super(null); this.absolute = absolute; } diff --git a/sdk/src/main/java/io/opentelemetry/sdk/metrics/LongSumAggregator.java b/sdk/src/main/java/io/opentelemetry/sdk/metrics/LongSumAggregator.java index 9801ad7a2ec..95aee59b098 100644 --- a/sdk/src/main/java/io/opentelemetry/sdk/metrics/LongSumAggregator.java +++ b/sdk/src/main/java/io/opentelemetry/sdk/metrics/LongSumAggregator.java @@ -18,17 +18,19 @@ import java.util.concurrent.atomic.AtomicLong; -final class LongSumAggregator implements Aggregator { +final class LongSumAggregator implements Aggregator { // TODO: Change to use LongAdder when changed to java8. private final AtomicLong current; + private final AtomicLong checkpoint; LongSumAggregator() { current = new AtomicLong(0L); + checkpoint = new AtomicLong(0); } @Override - public void merge(LongSumAggregator other) { - this.current.getAndAdd(other.current.get()); + public void recordDouble(double value) { + throw new UnsupportedOperationException("This Aggregator does not support double values"); } @Override @@ -37,7 +39,7 @@ public void recordLong(long value) { } @Override - public void recordDouble(double value) { - throw new UnsupportedOperationException("This Aggregator does not support double values"); + public void checkpoint() { + checkpoint.getAndAdd(this.checkpoint.getAndSet(0)); } } diff --git a/sdk/src/main/java/io/opentelemetry/sdk/metrics/RecordProcessor.java b/sdk/src/main/java/io/opentelemetry/sdk/metrics/RecordProcessor.java new file mode 100644 index 00000000000..484c64af01b --- /dev/null +++ b/sdk/src/main/java/io/opentelemetry/sdk/metrics/RecordProcessor.java @@ -0,0 +1,23 @@ +/* + * Copyright 2020, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.sdk.metrics; + +import io.opentelemetry.metrics.LabelSet; + +public interface RecordProcessor { + void process(LabelSet labelSet, Aggregator aggregator); +}