Skip to content

Commit

Permalink
Fix open-telemetry#1588 - Observable Gauge does not reflect updated v…
Browse files Browse the repository at this point in the history
…alues, and send the old value always (open-telemetry#1641)
  • Loading branch information
lalitb authored and yxue committed Dec 5, 2022
1 parent aa47ee5 commit 4a8c540
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 18 deletions.
7 changes: 5 additions & 2 deletions examples/common/metrics_foo_library/foo_library.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,16 @@ class MeasurementFetcher
if (nostd::holds_alternative<
nostd::shared_ptr<opentelemetry::metrics::ObserverResultT<double>>>(observer_result))
{
double val = (rand() % 700) + 1.1;
double random_incr = (rand() % 5) + 1.1;
value_ += random_incr;
nostd::get<nostd::shared_ptr<opentelemetry::metrics::ObserverResultT<double>>>(
observer_result)
->Observe(val /*, labelkv */);
->Observe(value_ /*, labelkv */);
}
}
static double value_;
};
double MeasurementFetcher::value_ = 0.0;
} // namespace

void foo_library::counter_example(const std::string &name)
Expand Down
2 changes: 1 addition & 1 deletion examples/metrics_simple/metrics_ostream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void initMetrics(const std::string &name)
std::unique_ptr<metric_sdk::MeterSelector> observable_meter_selector{
new metric_sdk::MeterSelector(name, version, schema)};
std::unique_ptr<metric_sdk::View> observable_sum_view{
new metric_sdk::View{name, "description", metric_sdk::AggregationType::kSum}};
new metric_sdk::View{name, "test_description", metric_sdk::AggregationType::kSum}};
p->AddView(std::move(observable_instrument_selector), std::move(observable_meter_selector),
std::move(observable_sum_view));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
void Record(const std::unordered_map<MetricAttributes, T, AttributeHashGenerator> &measurements,
opentelemetry::common::SystemTimestamp /* observation_time */) noexcept
{
// process the read measurements - aggregate and store in hashmap
// Async counter always record monotonically increasing values, and the
// exporter/reader can request either for delta or cumulative value.
// So we convert the async counter value to delta before passing it to temporal storage.
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(hashmap_lock_);
for (auto &measurement : measurements)
{
Expand All @@ -53,13 +55,14 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
if (prev)
{
auto delta = prev->Diff(*aggr);
cumulative_hash_map_->Set(measurement.first,
DefaultAggregation::CloneAggregation(
aggregation_type_, instrument_descriptor_, *delta));
// store received value in cumulative map, and the diff in delta map (to pass it to temporal
// storage)
cumulative_hash_map_->Set(measurement.first, std::move(aggr));
delta_hash_map_->Set(measurement.first, std::move(delta));
}
else
{
// store received value in cumulative and delta map.
cumulative_hash_map_->Set(
measurement.first,
DefaultAggregation::CloneAggregation(aggregation_type_, instrument_descriptor_, *aggr));
Expand Down
1 change: 0 additions & 1 deletion sdk/src/metrics/meter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,6 @@ std::unique_ptr<AsyncWritableMetricStorage> Meter::RegisterAsyncMetricStorage(
std::vector<MetricData> Meter::Collect(CollectorHandle *collector,
opentelemetry::common::SystemTimestamp collect_ts) noexcept
{

observable_registry_->Observe(collect_ts);
std::vector<MetricData> metric_data_list;
auto ctx = meter_context_.lock();
Expand Down
1 change: 1 addition & 0 deletions sdk/src/metrics/state/temporal_metric_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector,
opentelemetry::common::SystemTimestamp last_collection_ts = sdk_start_ts;
AggregationTemporality aggregation_temporarily =
collector->GetAggregationTemporality(instrument_descriptor_.type_);

if (delta_metrics->Size())
{
for (auto &col : collectors)
Expand Down
57 changes: 47 additions & 10 deletions sdk/test/metrics/async_metric_storage_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ TEST_P(WritableMetricStorageTestFixture, TestAggregation)
opentelemetry::sdk::metrics::AsyncMetricStorage storage(
instr_desc, AggregationType::kSum, default_attributes_processor.get(),
std::shared_ptr<opentelemetry::sdk::metrics::AggregationConfig>{});
long get_count = 20l;
long put_count = 10l;
size_t attribute_count = 2;
std::unordered_map<MetricAttributes, long, AttributeHashGenerator> measurements = {
{{{"RequestType", "GET"}}, get_count}, {{{"RequestType", "PUT"}}, put_count}};
storage.RecordLong(measurements,
long get_count1 = 20l;
long put_count1 = 10l;
size_t attribute_count = 2;
std::unordered_map<MetricAttributes, long, AttributeHashGenerator> measurements1 = {
{{{"RequestType", "GET"}}, get_count1}, {{{"RequestType", "PUT"}}, put_count1}};
storage.RecordLong(measurements1,
opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now()));

storage.Collect(collector.get(), collectors, sdk_start_ts, collection_ts,
Expand All @@ -123,20 +123,57 @@ TEST_P(WritableMetricStorageTestFixture, TestAggregation)
if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("RequestType")->second) == "GET")
{
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), get_count);
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), get_count1);
}
else if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("RequestType")->second) == "PUT")
{
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), put_count);
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), put_count1);
}
}
return true;
});
// subsequent recording after collection shouldn't fail
storage.RecordLong(measurements,
// monotonic increasing values;
long get_count2 = 50l;
long put_count2 = 70l;

std::unordered_map<MetricAttributes, long, AttributeHashGenerator> measurements2 = {
{{{"RequestType", "GET"}}, get_count2}, {{{"RequestType", "PUT"}}, put_count2}};
storage.RecordLong(measurements2,
opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now()));
EXPECT_EQ(MeasurementFetcher::number_of_attributes, attribute_count);
storage.Collect(
collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData data) {
for (auto data_attr : data.point_data_attr_)
{
auto data = opentelemetry::nostd::get<SumPointData>(data_attr.point_data);
if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("RequestType")->second) == "GET")
{
if (temporality == AggregationTemporality::kCumulative)
{
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), get_count2);
}
else
{
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), get_count2 - get_count1);
}
}
else if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("RequestType")->second) == "PUT")
{
if (temporality == AggregationTemporality::kCumulative)
{
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), put_count2);
}
else
{
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), put_count2 - put_count1);
}
}
}
return true;
});
}

INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestLong,
Expand Down

0 comments on commit 4a8c540

Please sign in to comment.