Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix 1585 - Multiple cumulative metric collections without measurement recording. #1586

Merged
merged 4 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 20 additions & 17 deletions sdk/src/metrics/state/temporal_metric_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector,
opentelemetry::common::SystemTimestamp last_collection_ts = sdk_start_ts;
AggregationTemporality aggregation_temporarily =
collector->GetAggregationTemporality(instrument_descriptor_.type_);
for (auto &col : collectors)
if (delta_metrics->Size())
{
unreported_metrics_[col.get()].push_back(delta_metrics);
for (auto &col : collectors)
{
unreported_metrics_[col.get()].push_back(delta_metrics);
}
}

// Get the unreported metrics for the `collector` from `unreported metrics stash`
Expand Down Expand Up @@ -88,20 +91,20 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector,
if (aggregation_temporarily == AggregationTemporality::kCumulative)
{
// merge current delta to previous cumulative
last_aggr_hashmap->GetAllEnteries([&merged_metrics, this](const MetricAttributes &attributes,
Aggregation &aggregation) {
auto agg = merged_metrics->Get(attributes);
if (agg)
{
merged_metrics->Set(attributes, agg->Merge(aggregation));
}
else
{
merged_metrics->Set(
attributes, DefaultAggregation::CreateAggregation(instrument_descriptor_, nullptr));
}
return true;
});
last_aggr_hashmap->GetAllEnteries(
[&merged_metrics, this](const MetricAttributes &attributes, Aggregation &aggregation) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this jus reformat?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this is done by clang-format. The only changes are lines 103, and 104 below. To handle the else scenario when there are no measurements recorded since the last collection.

auto agg = merged_metrics->Get(attributes);
if (agg)
{
merged_metrics->Set(attributes, agg->Merge(aggregation));
}
else
{
auto def_agg = DefaultAggregation::CreateAggregation(instrument_descriptor_, nullptr);
merged_metrics->Set(attributes, def_agg->Merge(aggregation));
}
return true;
});
}
last_reported_metrics_[collector] =
LastReportedMetrics{std::move(merged_metrics), collection_ts};
Expand Down Expand Up @@ -137,4 +140,4 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector,

} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
#endif
#endif
62 changes: 61 additions & 1 deletion sdk/test/metrics/sync_metric_storage_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ TEST_P(WritableMetricStorageTestFixture, LongSumAggregation)
}
return true;
});

EXPECT_EQ(count_attributes, 2); // GET and PUT
// In case of delta temporarily, subsequent collection would contain new data points, so resetting
// the counts
if (temporality == AggregationTemporality::kDelta)
Expand All @@ -105,6 +105,34 @@ TEST_P(WritableMetricStorageTestFixture, LongSumAggregation)
expected_total_put_requests = 0;
}

// collect one more time.
collection_ts = std::chrono::system_clock::now();
count_attributes = 0;
ThomsonTan marked this conversation as resolved.
Show resolved Hide resolved
storage.Collect(
collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData data) {
for (auto data_attr : data.point_data_attr_)
{
auto data = opentelemetry::nostd::get<SumPointData>(data_attr.point_data);
if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("RequestType")->second) == "GET")
{
count_attributes++;
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), expected_total_get_requests);
}
else if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("RequestType")->second) == "PUT")
{
count_attributes++;
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), expected_total_put_requests);
}
}
return true;
});
if (temporality == AggregationTemporality::kCumulative)
{
EXPECT_EQ(count_attributes, 2); // GET AND PUT
}

storage.RecordLong(50l, KeyValueIterableView<std::map<std::string, std::string>>(attributes_get),
opentelemetry::context::Context{});
expected_total_get_requests += 50;
Expand Down Expand Up @@ -134,7 +162,9 @@ TEST_P(WritableMetricStorageTestFixture, LongSumAggregation)
}
return true;
});
EXPECT_EQ(count_attributes, 2); // GET and PUT
}

INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestLong,
WritableMetricStorageTestFixture,
::testing::Values(AggregationTemporality::kCumulative,
Expand Down Expand Up @@ -205,6 +235,7 @@ TEST_P(WritableMetricStorageTestFixture, DoubleSumAggregation)
}
return true;
});
EXPECT_EQ(count_attributes, 2); // GET and PUT

// In case of delta temporarily, subsequent collection would contain new data points, so resetting
// the counts
Expand All @@ -214,6 +245,34 @@ TEST_P(WritableMetricStorageTestFixture, DoubleSumAggregation)
expected_total_put_requests = 0;
}

// collect one more time.
collection_ts = std::chrono::system_clock::now();
count_attributes = 0;
storage.Collect(
collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData data) {
for (auto data_attr : data.point_data_attr_)
{
auto data = opentelemetry::nostd::get<SumPointData>(data_attr.point_data);
if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("RequestType")->second) == "GET")
{
count_attributes++;
EXPECT_EQ(opentelemetry::nostd::get<double>(data.value_), expected_total_get_requests);
}
else if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("RequestType")->second) == "PUT")
{
count_attributes++;
EXPECT_EQ(opentelemetry::nostd::get<double>(data.value_), expected_total_put_requests);
}
}
return true;
});
if (temporality == AggregationTemporality::kCumulative)
{
EXPECT_EQ(count_attributes, 2); // GET AND PUT
}

storage.RecordDouble(50.0,
KeyValueIterableView<std::map<std::string, std::string>>(attributes_get),
opentelemetry::context::Context{});
Expand Down Expand Up @@ -245,6 +304,7 @@ TEST_P(WritableMetricStorageTestFixture, DoubleSumAggregation)
}
return true;
});
EXPECT_EQ(count_attributes, 2); // GET and PUT
}
INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestDouble,
WritableMetricStorageTestFixture,
Expand Down