Skip to content

Commit

Permalink
Bound registry work in progress
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Cristian Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Feb 7, 2020
1 parent a9a31a0 commit 8728040
Show file tree
Hide file tree
Showing 11 changed files with 255 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,68 @@
package io.opentelemetry.sdk.metrics;

import io.opentelemetry.metrics.InstrumentWithBinding.BoundInstrument;
import io.opentelemetry.metrics.LabelSet;
import java.util.concurrent.atomic.AtomicLong;

/**
* Abstract class that extends the functionality of the BoundInstrument.
*
* <p>It atomically counts the number of references (usages) while also keeping a state of
* mapped/unmapped into an external map. It uses an atomic value where the least significant bit is
* used to keep the state of mapping ('1' is used for unmapped and '0' is for mapped) and the rest
* of the bits are used for reference (usage) counting.
*/
abstract class AbstractBoundInstrument implements BoundInstrument {
private final LabelSet labels;
// Atomically counts the number of references (usages) while also keeping a state of
// mapped/unmapped into a registry map.
private final AtomicLong refCountMapped;
private final Aggregator aggregator;

AbstractBoundInstrument(LabelSet labels) {
this.labels = labels;
// todo: associate with an aggregator/accumulator
AbstractBoundInstrument(Aggregator aggregator) {
this.aggregator = aggregator;
this.refCountMapped = new AtomicLong(0);
}

@Override
public void unbind() {}
/**
* Returns {@code true} if the entry is still mapped and increases the reference usages, if
* unmapped returns {@code false}.
*
* @return {@code true} if successful.
*/
final boolean bind() {
// Every reference adds/removes 2 instead of 1 to avoid changing the mapping bit.
return (refCountMapped.addAndGet(2L) & 1L) == 0;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof AbstractBoundInstrument)) {
public final void unbind() {
// Every reference adds/removes 2 instead of 1 to avoid changing the mapping bit.
refCountMapped.getAndAdd(-2L);
}

/**
* Flips the mapped bit to "unmapped" state and returns true if both of the following conditions
* are true upon entry to this function: 1) There are no active references; 2) The mapped bit is
* in "mapped" state; otherwise no changes are done to mapped bit and false is returned.
*
* @return {@code true} if successful.
*/
final boolean tryUnmap() {
if (refCountMapped.get() != 0) {
// Still references (usages) to this bound or already unmapped.
return false;
}
return refCountMapped.compareAndSet(0L, 1L);
}

AbstractBoundInstrument that = (AbstractBoundInstrument) o;
final void recordLong(long value) {
aggregator.recordLong(value);
}

return labels.equals(that.labels);
final void recordDouble(double value) {
aggregator.recordDouble(value);
}

@Override
public int hashCode() {
return labels.hashCode();
final void checkpoint() {
aggregator.checkpoint();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ abstract class AbstractInstrument implements Instrument {
this.labelKeys = labelKeys;
}

void collect(RecordProcessor recordProcessor) {
// TODO: Make this abstract.
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright 2020, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opentelemetry.sdk.metrics;

import io.opentelemetry.metrics.LabelSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

abstract class AbstractInstrumentWithBinding<B extends AbstractBoundInstrument>
extends AbstractInstrument {
private final ConcurrentHashMap<LabelSet, B> boundLabels;
private final ReentrantLock collectLock;

AbstractInstrumentWithBinding(
String name,
String description,
String unit,
Map<String, String> constantLabels,
List<String> labelKeys) {
super(name, description, unit, constantLabels, labelKeys);
boundLabels = new ConcurrentHashMap<>();
collectLock = new ReentrantLock();
}

// Cannot make this "bind" because of a Java problem if we make this class also implement the
// InstrumentWithBinding then the subclass will fail to compile because of different "bind"
// signature. This is a good trade-off.
final B bindInternal(LabelSet labelSet) {
B bound = boundLabels.get(labelSet);
if (bound != null && bound.bind()) {
// At this moment it is guaranteed that the Bound is in the map and will not be removed.
return bound;
}

// Missing entry or no longer mapped, try to add a new entry.
bound = newBound();
while (true) {
B oldBound = boundLabels.putIfAbsent(labelSet, bound);
if (oldBound != null) {
if (oldBound.bind()) {
// At this moment it is guaranteed that the Bound is in the map and will not be removed.
return oldBound;
}
// Try to remove the oldBound. This will race with the collect method, but only one will
// succeed.
boundLabels.remove(labelSet, oldBound);
continue;
}
return bound;
}
}

/**
* Collects records from all the entries (labelSet, Bound) that changed since the last collect()
* call.
*/
@Override
final void collect(RecordProcessor recordProcessor) {
collectLock.lock();
try {
for (Map.Entry<LabelSet, B> entry : boundLabels.entrySet()) {
if (entry.getValue().tryUnmap()) {
// If able to unmap then remove the record from the current Map. This can race with the
// acquire but because we requested a specific value only one will succeed.
boundLabels.remove(entry.getKey(), entry.getValue());
}

entry.getValue().checkpoint();
}
} finally {
collectLock.unlock();
}
}

abstract B newBound();
}
13 changes: 4 additions & 9 deletions sdk/src/main/java/io/opentelemetry/sdk/metrics/Aggregator.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,7 @@

/** Aggregator represents the interface for all the available aggregations. */
@ThreadSafe
interface Aggregator<T extends Aggregator<?>> {

/**
* Merge aggregated values between the current instance and the given {@code aggregator}.
*
* @param aggregator value to merge with.
*/
void merge(T aggregator);

interface Aggregator {
/**
* Updates the current aggregator with a newly recorded {@code long} value.
*
Expand All @@ -42,4 +34,7 @@ interface Aggregator<T extends Aggregator<?>> {
* @param value the new {@code double} value to be added.
*/
void recordDouble(double value);

/** Checkpoints this aggregator by saving the previous value and resetting the current value. */
void checkpoint();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@

import io.opentelemetry.metrics.DoubleCounter;
import io.opentelemetry.metrics.LabelSet;
import io.opentelemetry.sdk.metrics.DoubleCounterSdk.BoundInstrument;
import java.util.List;
import java.util.Map;

final class DoubleCounterSdk extends AbstractInstrument implements DoubleCounter {
final class DoubleCounterSdk extends AbstractInstrumentWithBinding<BoundInstrument>
implements DoubleCounter {

private final boolean monotonic;

Expand All @@ -38,14 +40,19 @@ private DoubleCounterSdk(

@Override
public void add(double delta, LabelSet labelSet) {
BoundDoubleCounter boundDoubleCounter = bind(labelSet);
boundDoubleCounter.add(delta);
boundDoubleCounter.unbind();
BoundInstrument boundInstrument = bind(labelSet);
boundInstrument.add(delta);
boundInstrument.unbind();
}

@Override
public BoundDoubleCounter bind(LabelSet labelSet) {
return new BoundInstrument(labelSet, monotonic);
public BoundInstrument bind(LabelSet labelSet) {
return bindInternal(labelSet);
}

@Override
BoundInstrument newBound() {
return new BoundInstrument(monotonic);
}

@Override
Expand All @@ -72,13 +79,12 @@ public int hashCode() {
return result;
}

private static final class BoundInstrument extends AbstractBoundInstrument
implements BoundDoubleCounter {
static final class BoundInstrument extends AbstractBoundInstrument implements BoundDoubleCounter {

private final boolean monotonic;

BoundInstrument(LabelSet labels, boolean monotonic) {
super(labels);
BoundInstrument(boolean monotonic) {
super(new DoubleSumAggregator());
this.monotonic = monotonic;
}

Expand All @@ -87,7 +93,7 @@ public void add(double delta) {
if (monotonic && delta < 0) {
throw new IllegalArgumentException("monotonic counters can only increase");
}
// todo: pass through to an aggregator/accumulator
recordDouble(delta);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@

import io.opentelemetry.metrics.DoubleMeasure;
import io.opentelemetry.metrics.LabelSet;
import io.opentelemetry.sdk.metrics.DoubleMeasureSdk.BoundInstrument;
import java.util.List;
import java.util.Map;

final class DoubleMeasureSdk extends AbstractInstrument implements DoubleMeasure {
final class DoubleMeasureSdk extends AbstractInstrumentWithBinding<BoundInstrument>
implements DoubleMeasure {

private final boolean absolute;

Expand All @@ -38,14 +40,19 @@ private DoubleMeasureSdk(

@Override
public void record(double value, LabelSet labelSet) {
BoundDoubleMeasure boundDoubleMeasure = bind(labelSet);
boundDoubleMeasure.record(value);
boundDoubleMeasure.unbind();
BoundInstrument boundInstrument = bind(labelSet);
boundInstrument.record(value);
boundInstrument.unbind();
}

@Override
public BoundDoubleMeasure bind(LabelSet labelSet) {
return new BoundInstrument(labelSet, this.absolute);
public BoundInstrument bind(LabelSet labelSet) {
return bindInternal(labelSet);
}

@Override
BoundInstrument newBound() {
return new BoundInstrument(this.absolute);
}

@Override
Expand All @@ -72,13 +79,12 @@ public int hashCode() {
return result;
}

private static final class BoundInstrument extends AbstractBoundInstrument
implements BoundDoubleMeasure {
static final class BoundInstrument extends AbstractBoundInstrument implements BoundDoubleMeasure {

private final boolean absolute;

BoundInstrument(LabelSet labels, boolean absolute) {
super(labels);
BoundInstrument(boolean absolute) {
super(null);
this.absolute = absolute;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,28 @@

import com.google.common.util.concurrent.AtomicDouble;

final class DoubleSumAggregator implements Aggregator<DoubleSumAggregator> {
final class DoubleSumAggregator implements Aggregator {
// TODO: Change to use DoubleAdder when changed to java8.
private final AtomicDouble current;
private final AtomicDouble checkpoint;

DoubleSumAggregator() {
current = new AtomicDouble(0.0);
}

@Override
public void merge(DoubleSumAggregator other) {
this.current.getAndAdd(other.current.get());
checkpoint = new AtomicDouble(0.0);
}

@Override
public void recordLong(long value) {
throw new UnsupportedOperationException("This Aggregator does not support long values");
throw new UnsupportedOperationException("This is a DoubleSumAggregator");
}

@Override
public void recordDouble(double value) {
current.getAndAdd(value);
}

@Override
public void checkpoint() {
checkpoint.getAndAdd(current.getAndSet(0));
}
}
Loading

0 comments on commit 8728040

Please sign in to comment.