Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronous Aggregation storage #1232

Merged
merged 12 commits into from
Mar 11, 2022
Merged
2 changes: 1 addition & 1 deletion api/include/opentelemetry/metrics/observer_result.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class ObserverResult
nostd::enable_if_t<common::detail::is_key_value_iterable<U>::value> * = nullptr>
void Observe(T value, const U &attributes) noexcept
{
this->Observe(value, common::KeyValueIterableView<T>{attributes});
this->Observe(value, common::KeyValueIterableView<U>{attributes});
}

void Observe(T value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,49 @@ class DefaultAggregation
return std::move(std::unique_ptr<Aggregation>(new DropAggregation()));
};
}

static std::unique_ptr<Aggregation> CreateAggregation(AggregationType aggregation_type,
InstrumentDescriptor instrument_descriptor)
{
switch (aggregation_type)
{
case AggregationType::kDrop:
return std::unique_ptr<Aggregation>(new DropAggregation());
break;
case AggregationType::kHistogram:
if (instrument_descriptor.value_type_ == InstrumentValueType::kLong)
{
return std::unique_ptr<Aggregation>(new LongHistogramAggregation());
}
else
{
return std::unique_ptr<Aggregation>(new DoubleHistogramAggregation());
}
break;
case AggregationType::kLastValue:
if (instrument_descriptor.value_type_ == InstrumentValueType::kLong)
{
return std::unique_ptr<Aggregation>(new LongLastValueAggregation());
}
else
{
return std::unique_ptr<Aggregation>(new DoubleLastValueAggregation());
}
break;
case AggregationType::kSum:
if (instrument_descriptor.value_type_ == InstrumentValueType::kLong)
{
return std::unique_ptr<Aggregation>(new LongSumAggregation(true));
}
else
{
return std::unique_ptr<Aggregation>(new DoubleSumAggregation(true));
}
break;
default:
return DefaultAggregation::CreateAggregation(instrument_descriptor);
}
}
};

} // namespace metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
#pragma once
#ifndef ENABLE_METRICS_PREVIEW

# include <map>
# include "opentelemetry/common/key_value_iterable_view.h"
# include "opentelemetry/sdk/metrics/instruments.h"
# include "opentelemetry/sdk/metrics/metric_reader.h"
# include "opentelemetry/sdk/metrics/state/sync_metric_storage.h"

# include <map>
# include "opentelemetry/sdk/metrics/view/attributes_processor.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
Expand Down Expand Up @@ -50,7 +50,7 @@ class DefaultMeasurementProcessor : public MeasurementProcessor
InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter,
InstrumentValueType::kLong};
metric_storages_[MakeKey(reader)] = std::unique_ptr<SyncMetricStorage>(
new SyncMetricStorage(instr_desc, AggregationType::kSum));
new SyncMetricStorage(instr_desc, AggregationType::kSum, new DefaultAttributesProcessor()));
return true;
}

Expand Down
44 changes: 44 additions & 0 deletions sdk/include/opentelemetry/sdk/metrics/observer_result.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#pragma once
#ifndef ENABLE_METRICS_PREVIEW
# include "opentelemetry/common/key_value_iterable.h"
# include "opentelemetry/metrics/observer_result.h"
# include "opentelemetry/sdk/metrics/state/attributes_hashmap.h"
# include "opentelemetry/sdk/metrics/view/attributes_processor.h"

# include <map>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{
template <class T>
class ObserverResult final : public opentelemetry::metrics::ObserverResult<T>
{
public:
ObserverResult(const AttributesProcessor *attributes_processor)
: attributes_processor_(attributes_processor)
{}

virtual void Observe(T value) noexcept override { data_.insert({{}, value}); }

virtual void Observer(T value,
esigo marked this conversation as resolved.
Show resolved Hide resolved
const opentelemetry::common::KeyValueIterable &attributes) noexcept override
{
auto attr = attributes_processor_->process(attributes);
data_.insert({attr, value});
}

std::unordered_map<MetricAttributes, T, AttributeHashGenerator> data_;
esigo marked this conversation as resolved.
Show resolved Hide resolved

private:
const AttributesProcessor *attributes_processor_;
};
} // namespace metrics
} // namespace sdk

OPENTELEMETRY_END_NAMESPACE
#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#pragma once
#ifndef ENABLE_METRICS_PREVIEW
# include "opentelemetry/sdk/common/attributemap_hash.h"
# include "opentelemetry/sdk/metrics/instruments.h"

# include "opentelemetry/sdk/instrumentationlibrary/instrumentation_library.h"
# include "opentelemetry/sdk/metrics/aggregation/default_aggregation.h"
# include "opentelemetry/sdk/metrics/state/attributes_hashmap.h"
# include "opentelemetry/sdk/metrics/state/metric_storage.h"
# include "opentelemetry/sdk/metrics/view/attributes_processor.h"
# include "opentelemetry/sdk/resource/resource.h"

# include <memory>
# include "opentelemetry/sdk/metrics/observer_result.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{

template <class T>
class AsyncMetricStorage : public MetricStorage
{
public:
AsyncMetricStorage(InstrumentDescriptor instrument_descriptor,
const AggregationType aggregation_type,
void (*measurement_callback)(opentelemetry::metrics::ObserverResult<T> &),
const AttributesProcessor *attributes_processor)
: instrument_descriptor_(instrument_descriptor),
esigo marked this conversation as resolved.
Show resolved Hide resolved
aggregation_type_{aggregation_type},
measurement_collection_callback_{measurement_callback},
active_attributes_hashmap_(new AttributesHashMap()),
attributes_processor_{attributes_processor}
{}

bool Collect(
MetricCollector *collector,
nostd::span<MetricCollector *> collectors,
opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library,
opentelemetry::sdk::resource::Resource *resource,
nostd::function_ref<bool(MetricData &)> metric_collection_callback) noexcept override
{
opentelemetry::sdk::metrics::ObserverResult<T> ob_res(attributes_processor_);

// read the measurement using configured callback
measurement_collection_callback_(ob_res);

// process the read measurements - aggregate and store in hashmap
for (auto &measurement : ob_res.data_)
{
auto agg = DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_);
agg->Aggregate(measurement.second);
active_attributes_hashmap_->Set(measurement.first, std::move(agg));
}

// TBD -> read aggregation from hashmap, and perform metric collection
MetricData metric_data;
if (metric_collection_callback(metric_data))
{
return true;
}
return false;
}

private:
void (*measurement_collection_callback_)(opentelemetry::metrics::ObserverResult<T> &);
std::unique_ptr<AttributesHashMap> active_attributes_hashmap_;
InstrumentDescriptor instrument_descriptor_;
AggregationType aggregation_type_;
const AttributesProcessor *attributes_processor_;
};

} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
#endif
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class MetricStorage
nostd::span<MetricCollector *> collectors,
opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library,
opentelemetry::sdk::resource::Resource *resource,
nostd::function_ref<bool(MetricData)> callback) noexcept = 0;
nostd::function_ref<bool(MetricData &)> callback) noexcept = 0;
};

class WritableMetricStorage
Expand All @@ -48,7 +48,7 @@ class NoopMetricStorage : public MetricStorage
nostd::span<MetricCollector *> collectors,
opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library,
opentelemetry::sdk::resource::Resource *resource,
nostd::function_ref<bool(MetricData)> callback) noexcept override
nostd::function_ref<bool(MetricData &)> callback) noexcept override
{
MetricData metric_data;
if (callback(metric_data))
Expand Down
74 changes: 14 additions & 60 deletions sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage
{

public:
SyncMetricStorage(
InstrumentDescriptor instrument_descriptor,
const AggregationType aggregation_type,
const AttributesProcessor *attributes_processor = new DefaultAttributesProcessor())
SyncMetricStorage(InstrumentDescriptor instrument_descriptor,
const AggregationType aggregation_type,
const AttributesProcessor *attributes_processor)
: instrument_descriptor_(instrument_descriptor),
aggregation_type_{aggregation_type},
attributes_hashmap_(new AttributesHashMap()),
attributes_processor_{attributes_processor}
{
create_default_aggregation_ = [&]() -> std::unique_ptr<Aggregation> {
return std::move(this->create_aggregation());
return std::move(
DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_));
};
}

Expand All @@ -45,8 +45,7 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage
{
return;
}
auto aggregation = attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_);
aggregation->Aggregate(value);
attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_)->Aggregate(value);
}

void RecordLong(long value,
Expand All @@ -57,9 +56,8 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage
return;
}

auto attr = attributes_processor_->process(attributes);
auto aggregation = attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_);
aggregation->Aggregate(value);
auto attr = attributes_processor_->process(attributes);
attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_)->Aggregate(value);
}

void RecordDouble(double value) noexcept override
Expand All @@ -69,8 +67,7 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage
return;
}

auto aggregation = attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_);
aggregation->Aggregate(value);
attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_)->Aggregate(value);
}

void RecordDouble(double value,
Expand All @@ -81,20 +78,19 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage
return;
}

auto attr = attributes_processor_->process(attributes);
auto aggregation = attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_);
aggregation->Aggregate(value);
auto attr = attributes_processor_->process(attributes);
attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_)->Aggregate(value);
}

bool Collect(
MetricCollector *collector,
nostd::span<MetricCollector *> collectors,
opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library,
opentelemetry::sdk::resource::Resource *resource,
nostd::function_ref<bool(MetricData)> callback) noexcept override
nostd::function_ref<bool(MetricData &)> callback) noexcept override
{

if (callback(MetricData()))
MetricData metric_data;
if (callback(metric_data))
{
return true;
}
Expand All @@ -107,48 +103,6 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage
std::unique_ptr<AttributesHashMap> attributes_hashmap_;
const AttributesProcessor *attributes_processor_;
std::function<std::unique_ptr<Aggregation>()> create_default_aggregation_;

std::unique_ptr<Aggregation> create_aggregation()
{
switch (aggregation_type_)
{
case AggregationType::kDrop:
return std::move(std::unique_ptr<Aggregation>(new DropAggregation()));
break;
case AggregationType::kHistogram:
if (instrument_descriptor_.value_type_ == InstrumentValueType::kLong)
{
return std::move(std::unique_ptr<Aggregation>(new LongHistogramAggregation()));
}
else
{
return std::move(std::unique_ptr<Aggregation>(new DoubleHistogramAggregation()));
}
break;
case AggregationType::kLastValue:
if (instrument_descriptor_.value_type_ == InstrumentValueType::kLong)
{
return std::move(std::unique_ptr<Aggregation>(new LongLastValueAggregation()));
}
else
{
return std::move(std::unique_ptr<Aggregation>(new DoubleLastValueAggregation()));
}
break;
case AggregationType::kSum:
if (instrument_descriptor_.value_type_ == InstrumentValueType::kLong)
{
return std::move(std::unique_ptr<Aggregation>(new LongSumAggregation(true)));
}
else
{
return std::move(std::unique_ptr<Aggregation>(new DoubleSumAggregation(true)));
}
break;
default:
return std::move(DefaultAggregation::CreateAggregation(instrument_descriptor_));
}
}
};

} // namespace metrics
Expand Down
32 changes: 32 additions & 0 deletions sdk/test/metrics/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,38 @@ cc_test(
],
)

cc_test(
name = "async_metric_storage_test",
srcs = [
"async_metric_storage_test.cc",
],
tags = [
"metrics",
"test",
],
deps = [
"//sdk/src/metrics",
"//sdk/src/resource",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "observer_result_test",
srcs = [
"observer_result_test.cc",
],
tags = [
"metrics",
"test",
],
deps = [
"//sdk/src/metrics",
"//sdk/src/resource",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "multi_metric_storage_test",
srcs = [
Expand Down
Loading