Skip to content

Commit

Permalink
[SDK] Add AdaptingCircularBufferCounter for exponential histograms (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
euroelessar authored Jun 1, 2023
1 parent 8ba9529 commit f5fd906
Show file tree
Hide file tree
Showing 6 changed files with 506 additions and 0 deletions.
158 changes: 158 additions & 0 deletions sdk/include/opentelemetry/sdk/metrics/data/circular_buffer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#pragma once

#include "opentelemetry/nostd/variant.h"

#include <limits>
#include <vector>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{

/**
* An integer array that automatically expands its memory consumption (via copy/allocation) when
* reaching limits. This assumes counts remain low, to lower memory overhead.
*
* This class is NOT thread-safe. It is expected to be behind a synchronized incrementer.
*
* Instances start by attempting to store one-byte per-cell in the integer array. As values grow,
* this will automatically instantiate the next-size integer array (uint8_t -> uint16_t -> uint32_t
* -> uint64_t) and copy over values into the larger array. This class expects most usage to remain
* within the uint8_t boundary (e.g. cell values < 256).
*/
class AdaptingIntegerArray
{
public:
// Construct an adapting integer array of a given size.
explicit AdaptingIntegerArray(size_t size) : backing_(std::vector<uint8_t>(size, 0)) {}
AdaptingIntegerArray(const AdaptingIntegerArray &other) = default;
AdaptingIntegerArray(AdaptingIntegerArray &&other) = default;
AdaptingIntegerArray &operator=(const AdaptingIntegerArray &other) = default;
AdaptingIntegerArray &operator=(AdaptingIntegerArray &&other) = default;

/**
* Increments the value at the specified index by the given count in the array.
*
* @param index The index of the value to increment.
* @param count The count by which to increment the value.
*/
void Increment(size_t index, uint64_t count);

/**
* Returns the value at the specified index from the array.
*
* @param index The index of the value to retrieve.
* @return The value at the specified index.
*/
uint64_t Get(size_t index) const;

/**
* Returns the size of the array.
*
* @return The size of the array.
*/
size_t Size() const;

/**
* Clears the array, resetting all values to zero.
*/
void Clear();

private:
void EnlargeToFit(uint64_t value);

nostd::variant<std::vector<uint8_t>,
std::vector<uint16_t>,
std::vector<uint32_t>,
std::vector<uint64_t>>
backing_;
};

/**
* A circle-buffer-backed exponential counter.
*
* The first recorded value becomes the 'base_index'. Going backwards leads to start/stop index.
*
* This expand start/end index as it sees values.
*
* This class is NOT thread-safe. It is expected to be behind a synchronized incrementer.
*/
class AdaptingCircularBufferCounter
{
public:
explicit AdaptingCircularBufferCounter(size_t max_size) : backing_(max_size) {}
AdaptingCircularBufferCounter(const AdaptingCircularBufferCounter &other) = default;
AdaptingCircularBufferCounter(AdaptingCircularBufferCounter &&other) = default;
AdaptingCircularBufferCounter &operator=(const AdaptingCircularBufferCounter &other) = default;
AdaptingCircularBufferCounter &operator=(AdaptingCircularBufferCounter &&other) = default;

/**
* The first index with a recording. May be negative.
*
* Note: the returned value is not meaningful when Empty returns true.
*
* @return the first index with a recording.
*/
int32_t StartIndex() const { return start_index_; }

/**
* The last index with a recording. May be negative.
*
* Note: the returned value is not meaningful when Empty returns true.
*
* @return The last index with a recording.
*/
int32_t EndIndex() const { return end_index_; }

/**
* Returns true if no recordings, false if at least one recording.
*/
bool Empty() const { return base_index_ == kNullIndex; }

/**
* Returns the maximum number of buckets allowed in this counter.
*/
size_t MaxSize() const { return backing_.Size(); }

/** Resets all bucket counts to zero and resets index start/end tracking. **/
void Clear();

/**
* Persist new data at index, incrementing by delta amount.
*
* @param index The index of where to perform the incrementation.
* @param delta How much to increment the index by.
* @return success status.
*/
bool Increment(int32_t index, uint64_t delta);

/**
* Get the number of recordings for the given index.
*
* @return the number of recordings for the index, or 0 if the index is out of bounds.
*/
uint64_t Get(int32_t index);

private:
size_t ToBufferIndex(int32_t index) const;

static constexpr int32_t kNullIndex = std::numeric_limits<int32_t>::min();

// Index of the first populated element, may be kNullIndex if container is empty.
int32_t start_index_ = kNullIndex;
// Index of the last populated element, may be kNullIndex if container is empty.
int32_t end_index_ = kNullIndex;
// Index corresponding to the element located at the start of the backing array, may be kNullIndex
// if container is empty.
int32_t base_index_ = kNullIndex;
AdaptingIntegerArray backing_;
};

} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
1 change: 1 addition & 0 deletions sdk/src/metrics/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ add_library(
aggregation/histogram_aggregation.cc
aggregation/lastvalue_aggregation.cc
aggregation/sum_aggregation.cc
data/circular_buffer.cc
exemplar/filter.cc
exemplar/reservoir.cc
sync_instruments.cc)
Expand Down
182 changes: 182 additions & 0 deletions sdk/src/metrics/data/circular_buffer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#include "opentelemetry/sdk/metrics/data/circular_buffer.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{

namespace
{

struct AdaptingIntegerArrayIncrement
{
size_t index;
uint64_t count;

template <typename T>
uint64_t operator()(std::vector<T> &backing)
{
const uint64_t result = backing[index] + count;
OPENTELEMETRY_LIKELY_IF(result <= uint64_t(std::numeric_limits<T>::max()))
{
backing[index] = static_cast<T>(result);
return 0;
}
return result;
}
};

struct AdaptingIntegerArrayGet
{
size_t index;

template <typename T>
uint64_t operator()(const std::vector<T> &backing)
{
return backing[index];
}
};

struct AdaptingIntegerArraySize
{
template <typename T>
size_t operator()(const std::vector<T> &backing)
{
return backing.size();
}
};

struct AdaptingIntegerArrayClear
{
template <typename T>
void operator()(std::vector<T> &backing)
{
std::fill(backing.begin(), backing.end(), static_cast<T>(0));
}
};

struct AdaptingIntegerArrayCopy
{
template <class T1, class T2>
void operator()(const std::vector<T1> &from, std::vector<T2> &to)
{
for (size_t i = 0; i < from.size(); i++)
{
to[i] = static_cast<T2>(from[i]);
}
}
};

} // namespace

void AdaptingIntegerArray::Increment(size_t index, uint64_t count)
{
const uint64_t result = nostd::visit(AdaptingIntegerArrayIncrement{index, count}, backing_);
OPENTELEMETRY_LIKELY_IF(result == 0) { return; }
EnlargeToFit(result);
Increment(index, count);
}

uint64_t AdaptingIntegerArray::Get(size_t index) const
{
return nostd::visit(AdaptingIntegerArrayGet{index}, backing_);
}

size_t AdaptingIntegerArray::Size() const
{
return nostd::visit(AdaptingIntegerArraySize{}, backing_);
}

void AdaptingIntegerArray::Clear()
{
nostd::visit(AdaptingIntegerArrayClear{}, backing_);
}

void AdaptingIntegerArray::EnlargeToFit(uint64_t value)
{
const size_t backing_size = Size();
decltype(backing_) backing;
if (value <= std::numeric_limits<uint16_t>::max())
{
backing = std::vector<uint16_t>(backing_size, 0);
}
else if (value <= std::numeric_limits<uint32_t>::max())
{
backing = std::vector<uint32_t>(backing_size, 0);
}
else
{
backing = std::vector<uint64_t>(backing_size, 0);
}
std::swap(backing_, backing);
nostd::visit(AdaptingIntegerArrayCopy{}, backing, backing_);
}

void AdaptingCircularBufferCounter::Clear()
{
start_index_ = kNullIndex;
end_index_ = kNullIndex;
base_index_ = kNullIndex;
backing_.Clear();
}

bool AdaptingCircularBufferCounter::Increment(int32_t index, uint64_t delta)
{
if (Empty())
{
start_index_ = index;
end_index_ = index;
base_index_ = index;
backing_.Increment(0, delta);
return true;
}

if (index > end_index_)
{
// Move end, check max size.
if (index + 1 > static_cast<int32_t>(backing_.Size()) + start_index_)
{
return false;
}
end_index_ = index;
}
else if (index < start_index_)
{
// Move end, check max size.
if (end_index_ + 1 > static_cast<int32_t>(backing_.Size()) + index)
{
return false;
}
start_index_ = index;
}
backing_.Increment(ToBufferIndex(index), delta);
return true;
}

uint64_t AdaptingCircularBufferCounter::Get(int32_t index)
{
if (index < start_index_ || index > end_index_)
{
return 0;
}
return backing_.Get(ToBufferIndex(index));
}

size_t AdaptingCircularBufferCounter::ToBufferIndex(int32_t index) const
{
// Figure out the index relative to the start of the circular buffer.
if (index < base_index_)
{
// If index is before the base one, wrap around.
return static_cast<size_t>(index + backing_.Size() - base_index_);
}
return static_cast<size_t>(index - base_index_);
}

} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
15 changes: 15 additions & 0 deletions sdk/test/metrics/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,21 @@ cc_test(
],
)

cc_test(
name = "circular_buffer_counter_test",
srcs = [
"circular_buffer_counter_test.cc",
],
tags = [
"metrics",
"test",
],
deps = [
"//sdk/src/metrics",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "histogram_aggregation_test",
srcs = [
Expand Down
1 change: 1 addition & 0 deletions sdk/test/metrics/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ foreach(
histogram_aggregation_test
attributes_processor_test
attributes_hashmap_test
circular_buffer_counter_test
histogram_test
sync_metric_storage_counter_test
sync_metric_storage_histogram_test
Expand Down
Loading

0 comments on commit f5fd906

Please sign in to comment.