From 73f3515b044f8e47db52b4db526daed88d07d1b4 Mon Sep 17 00:00:00 2001 From: WenTao Ou Date: Wed, 6 Apr 2022 15:41:42 +0800 Subject: [PATCH] Merge main into async changes (#1321) --- .github/.codecov.yaml | 6 +- .github/workflows/ci.yml | 4 +- .github/workflows/stale.yml | 2 +- examples/http/CMakeLists.txt | 9 +- exporters/elasticsearch/CMakeLists.txt | 2 +- exporters/jaeger/CMakeLists.txt | 2 +- exporters/ostream/BUILD | 59 ++--- exporters/otlp/CMakeLists.txt | 2 +- exporters/zipkin/CMakeLists.txt | 3 +- ext/src/http/client/curl/CMakeLists.txt | 19 +- ext/test/http/CMakeLists.txt | 6 +- ext/test/w3c_tracecontext_test/CMakeLists.txt | 6 +- .../sdk/metrics/aggregation/aggregation.h | 40 +++- .../metrics/aggregation/default_aggregation.h | 8 +- .../metrics/aggregation/drop_aggregation.h | 20 +- .../aggregation/histogram_aggregation.h | 40 ++-- .../aggregation/lastvalue_aggregation.h | 22 +- .../sdk/metrics/aggregation/sum_aggregation.h | 44 ++-- .../sdk/metrics/async_instruments.h | 53 ++-- .../sdk/metrics/data/metric_data.h | 15 +- .../sdk/metrics/data/point_data.h | 56 +++-- .../sdk/metrics/export/metric_producer.h | 23 +- .../export/periodic_exporting_metric_reader.h | 72 ++++++ .../opentelemetry/sdk/metrics/instruments.h | 2 +- sdk/include/opentelemetry/sdk/metrics/meter.h | 7 +- .../opentelemetry/sdk/metrics/meter_context.h | 1 - .../sdk/metrics/metric_exporter.h | 9 +- .../opentelemetry/sdk/metrics/metric_reader.h | 8 +- .../sdk/metrics/state/async_metric_storage.h | 12 +- .../sdk/metrics/state/attributes_hashmap.h | 19 +- .../sdk/metrics/state/metric_collector.h | 2 +- .../sdk/metrics/state/metric_storage.h | 10 +- .../sdk/metrics/state/sync_metric_storage.h | 28 ++- sdk/src/metrics/CMakeLists.txt | 2 + .../aggregation/histogram_aggregation.cc | 95 +++++--- .../aggregation/lastvalue_aggregation.cc | 103 ++++++-- .../metrics/aggregation/sum_aggregation.cc | 102 +++++--- .../periodic_exporting_metric_reader.cc | 101 ++++++++ sdk/src/metrics/meter.cc | 29 +-- sdk/src/metrics/metric_reader.cc | 13 +- sdk/src/metrics/state/metric_collector.cc | 11 +- sdk/src/metrics/state/sync_metric_storage.cc | 131 ++++++++++ sdk/test/metrics/CMakeLists.txt | 3 +- sdk/test/metrics/aggregation_test.cc | 38 ++- sdk/test/metrics/async_instruments_test.cc | 29 +-- sdk/test/metrics/async_metric_storage_test.cc | 10 +- sdk/test/metrics/meter_provider_sdk_test.cc | 5 +- sdk/test/metrics/metric_reader_test.cc | 4 +- .../periodic_exporting_metric_reader_test.cc | 81 +++++++ sdk/test/metrics/sync_metric_storage_test.cc | 226 +++++++++++++++++- 50 files changed, 1190 insertions(+), 404 deletions(-) create mode 100644 sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h create mode 100644 sdk/src/metrics/export/periodic_exporting_metric_reader.cc create mode 100644 sdk/src/metrics/state/sync_metric_storage.cc create mode 100644 sdk/test/metrics/periodic_exporting_metric_reader_test.cc diff --git a/.github/.codecov.yaml b/.github/.codecov.yaml index e2c304a9b4..2b3003ff57 100644 --- a/.github/.codecov.yaml +++ b/.github/.codecov.yaml @@ -12,11 +12,7 @@ coverage: informational: true target: auto threshold: 10% - patch: - default: - informational: true - target: auto - threshold: 10% + patch: false parsers: gcov: diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a851728a2e..3b0af15acb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -4,7 +4,7 @@ on: push: branches: [ main ] pull_request: - branches: [ main ] + branches: [ main, async-changes ] jobs: cmake_test: @@ -375,7 +375,7 @@ jobs: - name: run tests and generate report run: ./ci/do_ci.sh code.coverage - name: upload report - uses: codecov/codecov-action@v2.1.0 + uses: codecov/codecov-action@v3 with: file: /home/runner/build/coverage.info diff --git a/.github/workflows/stale.yml b/.github/workflows/stale.yml index 53ebe3259b..e55a93d08d 100644 --- a/.github/workflows/stale.yml +++ b/.github/workflows/stale.yml @@ -13,4 +13,4 @@ jobs: close-issue-message: 'Closed as inactive. Feel free to reopen if this is still an issue.' days-before-issue-stale: 60 days-before-issue-close: 7 - exempt-pr-labels: 'do-not-stale' + exempt-issue-labels: 'do-not-stale' diff --git a/examples/http/CMakeLists.txt b/examples/http/CMakeLists.txt index 2eddcfe03f..a1181c93a4 100644 --- a/examples/http/CMakeLists.txt +++ b/examples/http/CMakeLists.txt @@ -10,10 +10,11 @@ else() add_executable(http_server server.cc) target_link_libraries( - http_client ${CMAKE_THREAD_LIBS_INIT} opentelemetry_trace http_client_curl - opentelemetry_exporter_ostream_span ${CURL_LIBRARIES}) + http_client ${CMAKE_THREAD_LIBS_INIT} opentelemetry_trace + opentelemetry_http_client_curl opentelemetry_exporter_ostream_span + ${CURL_LIBRARIES}) target_link_libraries( - http_server ${CMAKE_THREAD_LIBS_INIT} opentelemetry_trace http_client_curl - opentelemetry_exporter_ostream_span) + http_server ${CMAKE_THREAD_LIBS_INIT} opentelemetry_trace + opentelemetry_http_client_curl opentelemetry_exporter_ostream_span) endif() diff --git a/exporters/elasticsearch/CMakeLists.txt b/exporters/elasticsearch/CMakeLists.txt index 1538a6f44a..9b2833854e 100644 --- a/exporters/elasticsearch/CMakeLists.txt +++ b/exporters/elasticsearch/CMakeLists.txt @@ -10,7 +10,7 @@ target_include_directories( target_link_libraries( opentelemetry_exporter_elasticsearch_logs - PUBLIC opentelemetry_trace opentelemetry_logs http_client_curl) + PUBLIC opentelemetry_trace opentelemetry_logs opentelemetry_http_client_curl) install( TARGETS opentelemetry_exporter_elasticsearch_logs diff --git a/exporters/jaeger/CMakeLists.txt b/exporters/jaeger/CMakeLists.txt index 0659c399b0..6d3d8b88fb 100644 --- a/exporters/jaeger/CMakeLists.txt +++ b/exporters/jaeger/CMakeLists.txt @@ -31,7 +31,7 @@ target_include_directories( target_link_libraries( opentelemetry_exporter_jaeger_trace - PUBLIC opentelemetry_resources http_client_curl + PUBLIC opentelemetry_resources opentelemetry_http_client_curl PRIVATE thrift::thrift) if(MSVC) diff --git a/exporters/ostream/BUILD b/exporters/ostream/BUILD index cca74d6693..f74d896344 100644 --- a/exporters/ostream/BUILD +++ b/exporters/ostream/BUILD @@ -43,36 +43,37 @@ cc_library( ], ) -cc_library( - name = "ostream_metric_exporter", - srcs = [ - "src/metric_exporter.cc", - ], - hdrs = [ - "include/opentelemetry/exporters/ostream/metric_exporter.h", - ], - strip_include_prefix = "include", - tags = [ - "metrics", - "ostream", - ], - deps = [ - "//sdk/src/metrics", - ], -) +# TODO - Uncomment once MetricData interface is finalised +#cc_library( +# name = "ostream_metric_exporter", +# srcs = [ +# "src/metric_exporter.cc", +# ], +# hdrs = [ +# "include/opentelemetry/exporters/ostream/metric_exporter.h", +# ], +# strip_include_prefix = "include", +# tags = [ +# "metrics", +# "ostream", +# ], +# deps = [ +# "//sdk/src/metrics", +# ], +#) -cc_test( - name = "ostream_metric_test", - srcs = ["test/ostream_metric_test.cc"], - tags = [ - "ostream", - "test", - ], - deps = [ - ":ostream_metric_exporter", - "@com_google_googletest//:gtest_main", - ], -) +#cc_test( +# name = "ostream_metric_test", +# srcs = ["test/ostream_metric_test.cc"], +# tags = [ +# "ostream", +# "test", +# ], +# deps = [ +# ":ostream_metric_exporter", +# "@com_google_googletest//:gtest_main", +# ], +#) cc_test( name = "ostream_metrics_test_deprecated", diff --git a/exporters/otlp/CMakeLists.txt b/exporters/otlp/CMakeLists.txt index 0332267922..1dba7ea1c7 100755 --- a/exporters/otlp/CMakeLists.txt +++ b/exporters/otlp/CMakeLists.txt @@ -46,7 +46,7 @@ if(WITH_OTLP_HTTP) PROPERTIES EXPORT_NAME otlp_http_client) target_link_libraries( opentelemetry_exporter_otlp_http_client - PUBLIC opentelemetry_sdk opentelemetry_proto http_client_curl + PUBLIC opentelemetry_sdk opentelemetry_proto opentelemetry_http_client_curl nlohmann_json::nlohmann_json) if(nlohmann_json_clone) add_dependencies(opentelemetry_exporter_otlp_http_client diff --git a/exporters/zipkin/CMakeLists.txt b/exporters/zipkin/CMakeLists.txt index b9591324fd..b82339f1ee 100644 --- a/exporters/zipkin/CMakeLists.txt +++ b/exporters/zipkin/CMakeLists.txt @@ -20,7 +20,8 @@ add_library(opentelemetry_exporter_zipkin_trace src/zipkin_exporter.cc target_link_libraries( opentelemetry_exporter_zipkin_trace - PUBLIC opentelemetry_trace http_client_curl nlohmann_json::nlohmann_json) + PUBLIC opentelemetry_trace opentelemetry_http_client_curl + nlohmann_json::nlohmann_json) install( TARGETS opentelemetry_exporter_zipkin_trace diff --git a/ext/src/http/client/curl/CMakeLists.txt b/ext/src/http/client/curl/CMakeLists.txt index 64486b96db..78a81cfe3e 100644 --- a/ext/src/http/client/curl/CMakeLists.txt +++ b/ext/src/http/client/curl/CMakeLists.txt @@ -1,22 +1,23 @@ find_package(CURL) if(CURL_FOUND) - add_library(http_client_curl http_client_factory_curl.cc http_client_curl.cc) + add_library(opentelemetry_http_client_curl http_client_factory_curl.cc + http_client_curl.cc) - set_target_properties(http_client_curl PROPERTIES EXPORT_NAME - http_client_curl) + set_target_properties(opentelemetry_http_client_curl + PROPERTIES EXPORT_NAME http_client_curl) if(TARGET CURL::libcurl) - target_link_libraries(http_client_curl PUBLIC opentelemetry_ext - CURL::libcurl) + target_link_libraries(opentelemetry_http_client_curl + PUBLIC opentelemetry_ext CURL::libcurl) else() - target_include_directories(http_client_curl + target_include_directories(opentelemetry_http_client_curl INTERFACE "${CURL_INCLUDE_DIRS}") - target_link_libraries(http_client_curl PUBLIC opentelemetry_ext - ${CURL_LIBRARIES}) + target_link_libraries(opentelemetry_http_client_curl + PUBLIC opentelemetry_ext ${CURL_LIBRARIES}) endif() install( - TARGETS http_client_curl + TARGETS opentelemetry_http_client_curl EXPORT "${PROJECT_NAME}-target" RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} diff --git a/ext/test/http/CMakeLists.txt b/ext/test/http/CMakeLists.txt index 13b7c8206e..341648085d 100644 --- a/ext/test/http/CMakeLists.txt +++ b/ext/test/http/CMakeLists.txt @@ -7,10 +7,12 @@ if(CURL_FOUND) ${CMAKE_THREAD_LIBS_INIT}) if(TARGET CURL::libcurl) - target_link_libraries(${FILENAME} CURL::libcurl http_client_curl) + target_link_libraries(${FILENAME} opentelemetry_http_client_curl + CURL::libcurl) else() include_directories(${CURL_INCLUDE_DIRS}) - target_link_libraries(${FILENAME} ${CURL_LIBRARIES} http_client_curl) + target_link_libraries(${FILENAME} ${CURL_LIBRARIES} + opentelemetry_http_client_curl) endif() gtest_add_tests( TARGET ${FILENAME} diff --git a/ext/test/w3c_tracecontext_test/CMakeLists.txt b/ext/test/w3c_tracecontext_test/CMakeLists.txt index 30e2f5d0f3..ea74a8eeb0 100644 --- a/ext/test/w3c_tracecontext_test/CMakeLists.txt +++ b/ext/test/w3c_tracecontext_test/CMakeLists.txt @@ -7,9 +7,9 @@ else() add_executable(w3c_tracecontext_test main.cc) target_link_libraries( w3c_tracecontext_test - PRIVATE ${CMAKE_THREAD_LIBS_INIT} opentelemetry_trace http_client_curl - opentelemetry_exporter_ostream_span ${CURL_LIBRARIES} - nlohmann_json::nlohmann_json) + PRIVATE ${CMAKE_THREAD_LIBS_INIT} opentelemetry_trace + opentelemetry_http_client_curl opentelemetry_exporter_ostream_span + ${CURL_LIBRARIES} nlohmann_json::nlohmann_json) if(nlohmann_json_clone) add_dependencies(w3c_tracecontext_test nlohmann_json::nlohmann_json) endif() diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/aggregation.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/aggregation.h index 3bb133ac15..7ec9a6ea2b 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/aggregation.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/aggregation.h @@ -11,16 +11,6 @@ namespace sdk { namespace metrics { -class InstrumentMonotonicityAwareAggregation -{ -public: - InstrumentMonotonicityAwareAggregation(bool is_monotonic) : is_monotonic_(is_monotonic) {} - bool IsMonotonic() { return is_monotonic_; } - -private: - bool is_monotonic_; -}; - class Aggregation { public: @@ -28,7 +18,35 @@ class Aggregation virtual void Aggregate(double value, const PointAttributes &attributes = {}) noexcept = 0; - virtual PointType Collect() noexcept = 0; + /** + * Returns the result of the merge of the two aggregations. + * + * This should always assume that the aggregations do not overlap and merge together for a new + * cumulative report. + * + * @param delta the newly captured (delta) aggregation + * @return the result of the merge of the given aggregation. + */ + + virtual std::unique_ptr Merge(const Aggregation &delta) const noexcept = 0; + + /** + * Returns a new delta aggregation by comparing two cumulative measurements. + * + * @param next the newly captured (cumulative) aggregation. + * @return The resulting delta aggregation. + */ + virtual std::unique_ptr Diff(const Aggregation &next) const noexcept = 0; + + /** + * Returns the point data that the aggregation will produce. + * + * @return PointType + */ + + virtual PointType ToPoint() const noexcept = 0; + + virtual ~Aggregation() = default; }; } // namespace metrics diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h index f193864926..b5a1283d26 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h @@ -30,8 +30,8 @@ class DefaultAggregation case InstrumentType::kUpDownCounter: case InstrumentType::kObservableUpDownCounter: return (instrument_descriptor.value_type_ == InstrumentValueType::kLong) - ? std::move(std::unique_ptr(new LongSumAggregation(true))) - : std::move(std::unique_ptr(new DoubleSumAggregation(true))); + ? std::move(std::unique_ptr(new LongSumAggregation())) + : std::move(std::unique_ptr(new DoubleSumAggregation())); break; case InstrumentType::kHistogram: return (instrument_descriptor.value_type_ == InstrumentValueType::kLong) @@ -79,11 +79,11 @@ class DefaultAggregation case AggregationType::kSum: if (instrument_descriptor.value_type_ == InstrumentValueType::kLong) { - return std::unique_ptr(new LongSumAggregation(true)); + return std::unique_ptr(new LongSumAggregation()); } else { - return std::unique_ptr(new DoubleSumAggregation(true)); + return std::unique_ptr(new DoubleSumAggregation()); } break; default: diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/drop_aggregation.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/drop_aggregation.h index 0af5dd43a5..4e29fa2e46 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/drop_aggregation.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/drop_aggregation.h @@ -14,6 +14,10 @@ namespace sdk namespace metrics { +/** + * A null Aggregation which denotes no aggregation should occur. + */ + class DropAggregation : public Aggregation { public: @@ -23,7 +27,21 @@ class DropAggregation : public Aggregation void Aggregate(double value, const PointAttributes &attributes = {}) noexcept override {} - PointType Collect() noexcept override { return DropPointData(); } + std::unique_ptr Merge(const Aggregation &delta) const noexcept override + { + return std::unique_ptr(new DropAggregation()); + } + + std::unique_ptr Diff(const Aggregation &next) const noexcept override + { + return std::unique_ptr(new DropAggregation()); + } + + PointType ToPoint() const noexcept override + { + static DropPointData point_data; + return point_data; + } }; } // namespace metrics } // namespace sdk diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/histogram_aggregation.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/histogram_aggregation.h index 5a66ecb243..8f33fa27b4 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/histogram_aggregation.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/histogram_aggregation.h @@ -13,57 +13,47 @@ namespace sdk { namespace metrics { -template -static inline void PopulateHistogramDataPoint(HistogramPointData &histogram, - opentelemetry::common::SystemTimestamp epoch_nanos, - T sum, - uint64_t count, - std::vector &counts, - std::vector boundaries) -{ - histogram.epoch_nanos_ = epoch_nanos; - histogram.boundaries_ = boundaries; - histogram.sum_ = sum; - histogram.counts_ = counts; - histogram.count_ = count; -} class LongHistogramAggregation : public Aggregation { public: LongHistogramAggregation(); + LongHistogramAggregation(HistogramPointData &&); void Aggregate(long value, const PointAttributes &attributes = {}) noexcept override; void Aggregate(double value, const PointAttributes &attributes = {}) noexcept override {} - PointType Collect() noexcept override; + virtual std::unique_ptr Merge(const Aggregation &delta) const noexcept override; + + virtual std::unique_ptr Diff(const Aggregation &next) const noexcept override; + + PointType ToPoint() const noexcept override; private: opentelemetry::common::SpinLockMutex lock_; - std::vector boundaries_; - long sum_; - std::vector counts_; - uint64_t count_; + HistogramPointData point_data_; }; class DoubleHistogramAggregation : public Aggregation { public: DoubleHistogramAggregation(); + DoubleHistogramAggregation(HistogramPointData &&); void Aggregate(long value, const PointAttributes &attributes = {}) noexcept override {} void Aggregate(double value, const PointAttributes &attributes = {}) noexcept override; - PointType Collect() noexcept override; + virtual std::unique_ptr Merge(const Aggregation &delta) const noexcept override; + + virtual std::unique_ptr Diff(const Aggregation &next) const noexcept override; + + PointType ToPoint() const noexcept override; private: - opentelemetry::common::SpinLockMutex lock_; - std::vector boundaries_; - double sum_; - std::vector counts_; - uint64_t count_; + mutable opentelemetry::common::SpinLockMutex lock_; + mutable HistogramPointData point_data_; }; } // namespace metrics diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/lastvalue_aggregation.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/lastvalue_aggregation.h index 092a0df30e..7f185d51a1 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/lastvalue_aggregation.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/lastvalue_aggregation.h @@ -17,34 +17,42 @@ class LongLastValueAggregation : public Aggregation { public: LongLastValueAggregation(); + LongLastValueAggregation(LastValuePointData &&); void Aggregate(long value, const PointAttributes &attributes = {}) noexcept override; void Aggregate(double value, const PointAttributes &attributes = {}) noexcept override {} - PointType Collect() noexcept override; + virtual std::unique_ptr Merge(const Aggregation &delta) const noexcept override; + + virtual std::unique_ptr Diff(const Aggregation &next) const noexcept override; + + PointType ToPoint() const noexcept override; private: opentelemetry::common::SpinLockMutex lock_; - long value_; - bool is_lastvalue_valid_; + LastValuePointData point_data_; }; class DoubleLastValueAggregation : public Aggregation { public: DoubleLastValueAggregation(); + DoubleLastValueAggregation(LastValuePointData &&); void Aggregate(long value, const PointAttributes &attributes = {}) noexcept override {} void Aggregate(double value, const PointAttributes &attributes = {}) noexcept override; - PointType Collect() noexcept override; + virtual std::unique_ptr Merge(const Aggregation &delta) const noexcept override; + + virtual std::unique_ptr Diff(const Aggregation &next) const noexcept override; + + PointType ToPoint() const noexcept override; private: - opentelemetry::common::SpinLockMutex lock_; - double value_; - bool is_lastvalue_valid_; + mutable opentelemetry::common::SpinLockMutex lock_; + mutable LastValuePointData point_data_; }; } // namespace metrics diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/sum_aggregation.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/sum_aggregation.h index ff99cec733..b0f0169b24 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/sum_aggregation.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/sum_aggregation.h @@ -14,52 +14,46 @@ namespace sdk namespace metrics { -template -static inline void PopulateSumPointData(SumPointData &sum, - opentelemetry::common::SystemTimestamp start_ts, - opentelemetry::common::SystemTimestamp end_ts, - T value, - bool is_monotonic) -{ - sum.start_epoch_nanos_ = start_ts; - sum.end_epoch_nanos_ = end_ts; - sum.value_ = value; - sum.is_monotonic_ = is_monotonic; - sum.aggregation_temporality_ = AggregationTemporality::kDelta; -} - -class LongSumAggregation : public Aggregation, InstrumentMonotonicityAwareAggregation +class LongSumAggregation : public Aggregation { public: - LongSumAggregation(bool is_monotonic); + LongSumAggregation(); + LongSumAggregation(SumPointData &&); void Aggregate(long value, const PointAttributes &attributes = {}) noexcept override; void Aggregate(double value, const PointAttributes &attributes = {}) noexcept override {} - PointType Collect() noexcept override; + virtual std::unique_ptr Merge(const Aggregation &delta) const noexcept override; + + virtual std::unique_ptr Diff(const Aggregation &next) const noexcept override; + + PointType ToPoint() const noexcept override; private: opentelemetry::common::SpinLockMutex lock_; - opentelemetry::common::SystemTimestamp start_epoch_nanos_; - long sum_; + SumPointData point_data_; }; -class DoubleSumAggregation : public Aggregation, InstrumentMonotonicityAwareAggregation +class DoubleSumAggregation : public Aggregation { public: - DoubleSumAggregation(bool is_monotonic); + DoubleSumAggregation(); + DoubleSumAggregation(SumPointData &&); void Aggregate(long value, const PointAttributes &attributes = {}) noexcept override {} void Aggregate(double value, const PointAttributes &attributes = {}) noexcept override; - PointType Collect() noexcept override; + virtual std::unique_ptr Merge(const Aggregation &delta) const noexcept override; + + virtual std::unique_ptr Diff(const Aggregation &next) const noexcept override; + + PointType ToPoint() const noexcept override; private: - opentelemetry::common::SpinLockMutex lock_; - opentelemetry::common::SystemTimestamp start_epoch_nanos_; - double sum_; + mutable opentelemetry::common::SpinLockMutex lock_; + SumPointData point_data_; }; } // namespace metrics diff --git a/sdk/include/opentelemetry/sdk/metrics/async_instruments.h b/sdk/include/opentelemetry/sdk/metrics/async_instruments.h index f38ad6a3b4..8b1f76377b 100644 --- a/sdk/include/opentelemetry/sdk/metrics/async_instruments.h +++ b/sdk/include/opentelemetry/sdk/metrics/async_instruments.h @@ -6,7 +6,6 @@ # include "opentelemetry/metrics/async_instruments.h" # include "opentelemetry/metrics/observer_result.h" # include "opentelemetry/nostd/string_view.h" -# include "opentelemetry/sdk/instrumentationlibrary/instrumentation_library.h" # include "opentelemetry/sdk/metrics/instruments.h" OPENTELEMETRY_BEGIN_NAMESPACE namespace sdk @@ -19,22 +18,14 @@ class Asynchronous { public: Asynchronous(nostd::string_view name, - const opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary - *instrumentation_library, void (*callback)(opentelemetry::metrics::ObserverResult &), nostd::string_view description = "", nostd::string_view unit = "") - : name_(name), - instrumentation_library_{instrumentation_library}, - callback_(callback), - description_(description), - unit_(unit) + : name_(name), callback_(callback), description_(description), unit_(unit) {} protected: std::string name_; - const opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary - *instrumentation_library_; void (*callback_)(opentelemetry::metrics::ObserverResult &); std::string description_; std::string unit_; @@ -45,12 +36,10 @@ class LongObservableCounter : public opentelemetry::metrics::ObservableCounter &), nostd::string_view description = "", nostd::string_view unit = "") - : Asynchronous(name, instrumentation_library, callback, description, unit) + : Asynchronous(name, callback, description, unit) {} }; @@ -60,12 +49,10 @@ class DoubleObservableCounter : public opentelemetry::metrics::ObservableCounter { public: DoubleObservableCounter(nostd::string_view name, - const opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary - *instrumentation_library, void (*callback)(opentelemetry::metrics::ObserverResult &), nostd::string_view description = "", nostd::string_view unit = "") - : Asynchronous(name, instrumentation_library, callback, description, unit) + : Asynchronous(name, callback, description, unit) {} }; @@ -75,12 +62,10 @@ class LongObservableGauge : public opentelemetry::metrics::ObservableGauge { public: LongObservableGauge(nostd::string_view name, - const opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary - *instrumentation_library, void (*callback)(opentelemetry::metrics::ObserverResult &), nostd::string_view description = "", nostd::string_view unit = "") - : Asynchronous(name, instrumentation_library, callback, description, unit) + : Asynchronous(name, callback, description, unit) {} }; @@ -90,12 +75,10 @@ class DoubleObservableGauge : public opentelemetry::metrics::ObservableGauge &), nostd::string_view description = "", nostd::string_view unit = "") - : Asynchronous(name, instrumentation_library, callback, description, unit) + : Asynchronous(name, callback, description, unit) {} }; @@ -104,14 +87,11 @@ class LongObservableUpDownCounter : public opentelemetry::metrics::ObservableUpD public Asynchronous { public: - LongObservableUpDownCounter( - nostd::string_view name, - const opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary - *instrumentation_library, - void (*callback)(opentelemetry::metrics::ObserverResult &), - nostd::string_view description = "", - nostd::string_view unit = "") - : Asynchronous(name, instrumentation_library, callback, description, unit) + LongObservableUpDownCounter(nostd::string_view name, + void (*callback)(opentelemetry::metrics::ObserverResult &), + nostd::string_view description = "", + nostd::string_view unit = "") + : Asynchronous(name, callback, description, unit) {} }; @@ -121,14 +101,11 @@ class DoubleObservableUpDownCounter public Asynchronous { public: - DoubleObservableUpDownCounter( - nostd::string_view name, - const opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary - *instrumentation_library, - void (*callback)(opentelemetry::metrics::ObserverResult &), - nostd::string_view description = "", - nostd::string_view unit = "") - : Asynchronous(name, instrumentation_library, callback, description, unit) + DoubleObservableUpDownCounter(nostd::string_view name, + void (*callback)(opentelemetry::metrics::ObserverResult &), + nostd::string_view description = "", + nostd::string_view unit = "") + : Asynchronous(name, callback, description, unit) {} }; diff --git a/sdk/include/opentelemetry/sdk/metrics/data/metric_data.h b/sdk/include/opentelemetry/sdk/metrics/data/metric_data.h index cb7bc4cf80..738d4540f7 100644 --- a/sdk/include/opentelemetry/sdk/metrics/data/metric_data.h +++ b/sdk/include/opentelemetry/sdk/metrics/data/metric_data.h @@ -17,18 +17,23 @@ namespace sdk namespace metrics { -using PointAttributes = opentelemetry::sdk::common::AttributeMap; +using PointAttributes = opentelemetry::sdk::common::OrderedAttributeMap; using PointType = opentelemetry::nostd:: variant; +struct PointDataAttributes +{ + PointAttributes attributes; + PointType point_data; +}; + class MetricData { public: - opentelemetry::sdk::resource::Resource *resource_; - opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library_; - PointAttributes attributes_; InstrumentDescriptor instrument_descriptor; - PointType point_data_; + opentelemetry::common::SystemTimestamp start_ts; + opentelemetry::common::SystemTimestamp end_ts; + std::vector point_data_attr_; }; } // namespace metrics diff --git a/sdk/include/opentelemetry/sdk/metrics/data/point_data.h b/sdk/include/opentelemetry/sdk/metrics/data/point_data.h index 5bbcbd0b1e..714f96c9af 100644 --- a/sdk/include/opentelemetry/sdk/metrics/data/point_data.h +++ b/sdk/include/opentelemetry/sdk/metrics/data/point_data.h @@ -8,7 +8,7 @@ # include "opentelemetry/sdk/metrics/instruments.h" # include "opentelemetry/version.h" -# include +# include OPENTELEMETRY_BEGIN_NAMESPACE namespace sdk @@ -17,40 +17,62 @@ namespace metrics { using ValueType = nostd::variant; -using ListType = nostd::variant, std::vector>; +using ListType = nostd::variant, std::list>; + +// TODO: remove ctors and initializers from below classes when GCC<5 stops shipping on Ubuntu class SumPointData { public: - opentelemetry::common::SystemTimestamp start_epoch_nanos_; - opentelemetry::common::SystemTimestamp end_epoch_nanos_; - ValueType value_; - AggregationTemporality aggregation_temporality_; - bool is_monotonic_; + // TODO: remove ctors and initializers when GCC<5 stops shipping on Ubuntu + SumPointData(SumPointData &&) = default; + SumPointData(const SumPointData &) = default; + SumPointData &operator=(SumPointData &&) = default; + SumPointData() = default; + + ValueType value_ = {}; }; class LastValuePointData { public: - opentelemetry::common::SystemTimestamp epoch_nanos_; - bool is_lastvalue_valid_; - ValueType value_; + // TODO: remove ctors and initializers when GCC<5 stops shipping on Ubuntu + LastValuePointData(LastValuePointData &&) = default; + LastValuePointData(const LastValuePointData &) = default; + LastValuePointData &operator=(LastValuePointData &&) = default; + LastValuePointData() = default; + + ValueType value_ = {}; + bool is_lastvalue_valid_ = {}; + opentelemetry::common::SystemTimestamp sample_ts_ = {}; }; class HistogramPointData { public: - opentelemetry::common::SystemTimestamp epoch_nanos_; - ListType boundaries_; - ValueType sum_; - std::vector counts_; - uint64_t count_; + // TODO: remove ctors and initializers when GCC<5 stops shipping on Ubuntu + HistogramPointData(HistogramPointData &&) = default; + HistogramPointData &operator=(HistogramPointData &&) = default; + HistogramPointData(const HistogramPointData &) = default; + HistogramPointData() = default; + + ListType boundaries_ = {}; + ValueType sum_ = {}; + std::vector counts_ = {}; + uint64_t count_ = {}; }; class DropPointData -{}; +{ +public: + // TODO: remove ctors and initializers when GCC<5 stops shipping on Ubuntu + DropPointData(DropPointData &&) = default; + DropPointData(const DropPointData &) = default; + DropPointData() = default; + DropPointData &operator=(DropPointData &&) = default; +}; } // namespace metrics } // namespace sdk OPENTELEMETRY_END_NAMESPACE -#endif \ No newline at end of file +#endif diff --git a/sdk/include/opentelemetry/sdk/metrics/export/metric_producer.h b/sdk/include/opentelemetry/sdk/metrics/export/metric_producer.h index 7c9ee57755..d3b38759ad 100644 --- a/sdk/include/opentelemetry/sdk/metrics/export/metric_producer.h +++ b/sdk/include/opentelemetry/sdk/metrics/export/metric_producer.h @@ -3,13 +3,33 @@ #pragma once #ifndef ENABLE_METRICS_PREVIEW +# include "opentelemetry/sdk/instrumentationlibrary/instrumentation_library.h" # include "opentelemetry/sdk/metrics/data/metric_data.h" +# include "opentelemetry/sdk/resource/resource.h" + OPENTELEMETRY_BEGIN_NAMESPACE namespace sdk { namespace metrics { +/** + * Metric Data to be exported along with resources and + * Instrumentation library. + */ +struct InstrumentationInfoMetrics +{ + const opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary + *instrumentation_library_; + std::vector metric_data_; +}; + +struct ResourceMetrics +{ + const opentelemetry::sdk::resource::Resource *resource_; + std::vector instrumentation_info_metric_data_; +}; + /** * MetricProducer is the interface that is used to make metric data available to the * OpenTelemetry exporters. Implementations should be stateful, in that each call to @@ -27,7 +47,8 @@ class MetricProducer * * @return a status of completion of method. */ - virtual bool Collect(nostd::function_ref callback) noexcept = 0; + virtual bool Collect( + nostd::function_ref callback) noexcept = 0; }; } // namespace metrics diff --git a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h new file mode 100644 index 0000000000..29125a6ea2 --- /dev/null +++ b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h @@ -0,0 +1,72 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once +#ifndef ENABLE_METRICS_PREVIEW + +# include "opentelemetry/sdk/metrics/metric_reader.h" +# include "opentelemetry/version.h" + +# include +# include +# include +# include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ + +class MetricExporter; +/** + * Struct to hold PeriodicExortingMetricReader options. + */ + +constexpr std::chrono::milliseconds kExportIntervalMillis = std::chrono::milliseconds(60000); +constexpr std::chrono::milliseconds kExportTimeOutMillis = std::chrono::milliseconds(30000); +struct PeriodicExportingMetricReaderOptions +{ + + /* The time interval between two consecutive exports. */ + std::chrono::milliseconds export_interval_millis = + std::chrono::milliseconds(kExportIntervalMillis); + + /* how long the export can run before it is cancelled. */ + std::chrono::milliseconds export_timeout_millis = std::chrono::milliseconds(kExportTimeOutMillis); +}; + +class PeriodicExportingMetricReader : public MetricReader +{ + +public: + PeriodicExportingMetricReader( + std::unique_ptr exporter, + const PeriodicExportingMetricReaderOptions &option, + AggregationTemporality aggregation_temporality = AggregationTemporality::kCumulative); + +private: + bool OnForceFlush(std::chrono::microseconds timeout) noexcept override; + + bool OnShutDown(std::chrono::microseconds timeout) noexcept override; + + void OnInitialized() noexcept override; + + std::unique_ptr exporter_; + std::chrono::milliseconds export_interval_millis_; + std::chrono::milliseconds export_timeout_millis_; + + void DoBackgroundWork(); + + /* The background worker thread */ + std::thread worker_thread_; + + /* Synchronization primitives */ + std::condition_variable cv_; + std::mutex cv_m_; +}; + +} // namespace metrics +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE +#endif diff --git a/sdk/include/opentelemetry/sdk/metrics/instruments.h b/sdk/include/opentelemetry/sdk/metrics/instruments.h index 15e6a25705..5d85357143 100644 --- a/sdk/include/opentelemetry/sdk/metrics/instruments.h +++ b/sdk/include/opentelemetry/sdk/metrics/instruments.h @@ -40,7 +40,7 @@ enum class AggregationTemporality { kUnspecified, kDelta, - kCummulative + kCumulative }; struct InstrumentDescriptor diff --git a/sdk/include/opentelemetry/sdk/metrics/meter.h b/sdk/include/opentelemetry/sdk/metrics/meter.h index fc9fb36503..4a6ea26aeb 100644 --- a/sdk/include/opentelemetry/sdk/metrics/meter.h +++ b/sdk/include/opentelemetry/sdk/metrics/meter.h @@ -102,10 +102,9 @@ class Meter final : public opentelemetry::metrics::Meter const sdk::instrumentationlibrary::InstrumentationLibrary *GetInstrumentationLibrary() const noexcept; - /** collect metrics across all the meters **/ - bool collect(CollectorHandle *collector, - opentelemetry::common::SystemTimestamp collect_ts, - nostd::function_ref callback) noexcept; + /** collect metrics across all the instruments configured for the meter **/ + std::vector Collect(CollectorHandle *collector, + opentelemetry::common::SystemTimestamp collect_ts) noexcept; private: // order of declaration is important here - instrumentation library should destroy after diff --git a/sdk/include/opentelemetry/sdk/metrics/meter_context.h b/sdk/include/opentelemetry/sdk/metrics/meter_context.h index 8a31821353..74f67a1c46 100644 --- a/sdk/include/opentelemetry/sdk/metrics/meter_context.h +++ b/sdk/include/opentelemetry/sdk/metrics/meter_context.h @@ -111,7 +111,6 @@ class MeterContext : public std::enable_shared_from_this /** * Adds a meter to the list of configured meters. - * * Note: This method is INTERNAL to sdk not thread safe. * * @param meter diff --git a/sdk/include/opentelemetry/sdk/metrics/metric_exporter.h b/sdk/include/opentelemetry/sdk/metrics/metric_exporter.h index 3769259472..b6b5acf3a9 100644 --- a/sdk/include/opentelemetry/sdk/metrics/metric_exporter.h +++ b/sdk/include/opentelemetry/sdk/metrics/metric_exporter.h @@ -30,11 +30,9 @@ class MetricExporter /** * Exports a batch of metrics recordables. This method must not be called * concurrently for the same exporter instance. - * @param spans a span of unique pointers to metrics data + * @param data metrics data */ - virtual opentelemetry::sdk::common::ExportResult Export( - const nostd::span> - &records) noexcept = 0; + virtual opentelemetry::sdk::common::ExportResult Export(const ResourceMetrics &data) noexcept = 0; /** * Force flush the exporter. @@ -49,9 +47,6 @@ class MetricExporter */ virtual bool Shutdown( std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept = 0; - -private: - AggregationTemporality aggregation_temporality_; }; } // namespace metrics } // namespace sdk diff --git a/sdk/include/opentelemetry/sdk/metrics/metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/metric_reader.h index 1f7cee8d30..94924315f8 100644 --- a/sdk/include/opentelemetry/sdk/metrics/metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/metric_reader.h @@ -6,8 +6,8 @@ # include "opentelemetry/common/spin_lock_mutex.h" # include "opentelemetry/sdk/common/global_log_handler.h" # include "opentelemetry/sdk/metrics/data/metric_data.h" +# include "opentelemetry/sdk/metrics/export/metric_producer.h" # include "opentelemetry/sdk/metrics/instruments.h" - # include "opentelemetry/version.h" # include @@ -19,7 +19,6 @@ namespace sdk namespace metrics { -class MetricProducer; /** * MetricReader defines the interface to collect metrics from SDK */ @@ -27,7 +26,7 @@ class MetricReader { public: MetricReader( - AggregationTemporality aggregation_temporality = AggregationTemporality::kCummulative); + AggregationTemporality aggregation_temporality = AggregationTemporality::kCumulative); void SetMetricProducer(MetricProducer *metric_producer); @@ -35,7 +34,7 @@ class MetricReader * Collect the metrics from SDK. * @return return the status of the operation. */ - bool Collect(nostd::function_ref callback) noexcept; + bool Collect(nostd::function_ref callback) noexcept; AggregationTemporality GetAggregationTemporality() const noexcept; @@ -58,6 +57,7 @@ class MetricReader virtual void OnInitialized() noexcept {} +protected: bool IsShutdown() const noexcept; private: diff --git a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h index 019d533006..cfbf521538 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h @@ -4,17 +4,15 @@ #pragma once #ifndef ENABLE_METRICS_PREVIEW # include "opentelemetry/sdk/common/attributemap_hash.h" -# include "opentelemetry/sdk/metrics/instruments.h" - -# include "opentelemetry/sdk/instrumentationlibrary/instrumentation_library.h" # include "opentelemetry/sdk/metrics/aggregation/default_aggregation.h" +# include "opentelemetry/sdk/metrics/instruments.h" +# include "opentelemetry/sdk/metrics/observer_result.h" # include "opentelemetry/sdk/metrics/state/attributes_hashmap.h" +# include "opentelemetry/sdk/metrics/state/metric_collector.h" # include "opentelemetry/sdk/metrics/state/metric_storage.h" # include "opentelemetry/sdk/metrics/view/attributes_processor.h" -# include "opentelemetry/sdk/resource/resource.h" # include -# include "opentelemetry/sdk/metrics/observer_result.h" OPENTELEMETRY_BEGIN_NAMESPACE namespace sdk @@ -41,7 +39,7 @@ class AsyncMetricStorage : public MetricStorage nostd::span> collectors, opentelemetry::common::SystemTimestamp sdk_start_ts, opentelemetry::common::SystemTimestamp collection_ts, - nostd::function_ref metric_collection_callback) noexcept override + nostd::function_ref metric_collection_callback) noexcept override { opentelemetry::sdk::metrics::ObserverResult ob_res(attributes_processor_); @@ -58,7 +56,7 @@ class AsyncMetricStorage : public MetricStorage // TBD -> read aggregation from hashmap, and perform metric collection MetricData metric_data; - if (metric_collection_callback(metric_data)) + if (metric_collection_callback(std::move(metric_data))) { return true; } diff --git a/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h b/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h index 32301f8038..50d40e0ae6 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h @@ -37,7 +37,7 @@ class AttributesHashMap public: Aggregation *Get(const MetricAttributes &attributes) const { - std::lock_guard guard(GetLock()); + std::lock_guard guard(lock_); auto it = hash_map_.find(attributes); if (it != hash_map_.end()) { @@ -52,7 +52,7 @@ class AttributesHashMap */ bool Has(const MetricAttributes &attributes) const { - std::lock_guard guard(GetLock()); + std::lock_guard guard(lock_); return (hash_map_.find(attributes) == hash_map_.end()) ? false : true; } @@ -64,7 +64,8 @@ class AttributesHashMap Aggregation *GetOrSetDefault(const MetricAttributes &attributes, std::function()> aggregation_callback) { - std::lock_guard guard(GetLock()); + std::lock_guard guard(lock_); + auto it = hash_map_.find(attributes); if (it != hash_map_.end()) { @@ -80,7 +81,7 @@ class AttributesHashMap */ void Set(const MetricAttributes &attributes, std::unique_ptr value) { - std::lock_guard guard(GetLock()); + std::lock_guard guard(lock_); hash_map_[attributes] = std::move(value); } @@ -90,7 +91,7 @@ class AttributesHashMap bool GetAllEnteries( nostd::function_ref callback) const { - std::lock_guard guard(GetLock()); + std::lock_guard guard(lock_); for (auto &kv : hash_map_) { if (!callback(kv.first, *(kv.second.get()))) @@ -106,7 +107,7 @@ class AttributesHashMap */ size_t Size() { - std::lock_guard guard(GetLock()); + std::lock_guard guard(lock_); return hash_map_.size(); } @@ -114,11 +115,7 @@ class AttributesHashMap std::unordered_map, AttributeHashGenerator> hash_map_; - static opentelemetry::common::SpinLockMutex &GetLock() noexcept - { - static opentelemetry::common::SpinLockMutex lock; - return lock; - } + mutable opentelemetry::common::SpinLockMutex lock_; }; } // namespace metrics diff --git a/sdk/include/opentelemetry/sdk/metrics/state/metric_collector.h b/sdk/include/opentelemetry/sdk/metrics/state/metric_collector.h index 51c9cf6eff..20372f5209 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/metric_collector.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/metric_collector.h @@ -41,7 +41,7 @@ class MetricCollector : public MetricProducer, public CollectorHandle * * @return a status of completion of method. */ - bool Collect(nostd::function_ref callback) noexcept override; + bool Collect(nostd::function_ref callback) noexcept override; bool ForceFlush(std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept; diff --git a/sdk/include/opentelemetry/sdk/metrics/state/metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/metric_storage.h index 20f7d56140..e0ba55cbfe 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/metric_storage.h @@ -25,7 +25,7 @@ class MetricStorage nostd::span> collectors, opentelemetry::common::SystemTimestamp sdk_start_ts, opentelemetry::common::SystemTimestamp collection_ts, - nostd::function_ref callback) noexcept = 0; + nostd::function_ref callback) noexcept = 0; }; class WritableMetricStorage @@ -54,14 +54,10 @@ class NoopMetricStorage : public MetricStorage nostd::span> collectors, opentelemetry::common::SystemTimestamp sdk_start_ts, opentelemetry::common::SystemTimestamp collection_ts, - nostd::function_ref callback) noexcept override + nostd::function_ref callback) noexcept override { MetricData metric_data; - if (callback(metric_data)) - { - return true; - } - return false; + return callback(std::move(metric_data)); } }; diff --git a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h index bfe50b152d..c16f33ede2 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h @@ -9,11 +9,14 @@ # include "opentelemetry/sdk/metrics/aggregation/default_aggregation.h" # include "opentelemetry/sdk/metrics/exemplar/reservoir.h" # include "opentelemetry/sdk/metrics/state/attributes_hashmap.h" +# include "opentelemetry/sdk/metrics/state/metric_collector.h" # include "opentelemetry/sdk/metrics/state/metric_storage.h" + # include "opentelemetry/sdk/metrics/view/attributes_processor.h" # include "opentelemetry/sdk/metrics/view/view.h" # include "opentelemetry/sdk/resource/resource.h" +# include # include OPENTELEMETRY_BEGIN_NAMESPACE @@ -22,6 +25,12 @@ namespace sdk namespace metrics { +struct LastReportedMetrics +{ + std::unique_ptr attributes_map; + opentelemetry::common::SystemTimestamp collection_ts; +}; + class SyncMetricStorage : public MetricStorage, public WritableMetricStorage { @@ -73,7 +82,6 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage { return; } - exemplar_reservoir_->OfferMeasurement(value, {}, context, std::chrono::system_clock::now()); attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_)->Aggregate(value); } @@ -88,7 +96,6 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage { return; } - exemplar_reservoir_->OfferMeasurement(value, attributes, context, std::chrono::system_clock::now()); auto attr = attributes_processor_->process(attributes); @@ -99,20 +106,19 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage nostd::span> collectors, opentelemetry::common::SystemTimestamp sdk_start_ts, opentelemetry::common::SystemTimestamp collection_ts, - nostd::function_ref callback) noexcept override - { - MetricData data; - if (callback(data)) - { - return true; - } - return false; - } + nostd::function_ref callback) noexcept override; private: InstrumentDescriptor instrument_descriptor_; AggregationType aggregation_type_; + + // hashmap to maintain the metrics for delta collection (i.e, collection since last Collect call) std::unique_ptr attributes_hashmap_; + // unreported metrics stash for all the collectors + std::unordered_map>> + unreported_metrics_; + // last reported metrics stash for all the collectors. + std::unordered_map last_reported_metrics_; const AttributesProcessor *attributes_processor_; std::function()> create_default_aggregation_; nostd::shared_ptr exemplar_reservoir_; diff --git a/sdk/src/metrics/CMakeLists.txt b/sdk/src/metrics/CMakeLists.txt index 119477eb06..b6656b5bf8 100644 --- a/sdk/src/metrics/CMakeLists.txt +++ b/sdk/src/metrics/CMakeLists.txt @@ -4,7 +4,9 @@ add_library( meter.cc meter_context.cc metric_reader.cc + export/periodic_exporting_metric_reader.cc state/metric_collector.cc + state/sync_metric_storage.cc aggregation/histogram_aggregation.cc aggregation/lastvalue_aggregation.cc aggregation/sum_aggregation.cc diff --git a/sdk/src/metrics/aggregation/histogram_aggregation.cc b/sdk/src/metrics/aggregation/histogram_aggregation.cc index 2ebe3715bb..60f65c51b1 100644 --- a/sdk/src/metrics/aggregation/histogram_aggregation.cc +++ b/sdk/src/metrics/aggregation/histogram_aggregation.cc @@ -13,65 +13,104 @@ namespace metrics { LongHistogramAggregation::LongHistogramAggregation() - : boundaries_{0l, 5l, 10l, 25l, 50l, 75l, 100l, 250l, 500l, 1000l}, - counts_(boundaries_.size() + 1, 0), - sum_(0l), - count_(0) +{ + + point_data_.boundaries_ = std::list{0l, 5l, 10l, 25l, 50l, 75l, 100l, 250l, 500l, 1000l}; + point_data_.counts_ = + std::vector(nostd::get>(point_data_.boundaries_).size() + 1, 0); + point_data_.sum_ = 0l; + point_data_.count_ = 0; +} + +LongHistogramAggregation::LongHistogramAggregation(HistogramPointData &&data) + : point_data_{std::move(data)} {} void LongHistogramAggregation::Aggregate(long value, const PointAttributes &attributes) noexcept { const std::lock_guard locked(lock_); - count_ += 1; - sum_ += value; - for (auto it = boundaries_.begin(); it != boundaries_.end(); ++it) + point_data_.count_ += 1; + point_data_.sum_ = nostd::get(point_data_.sum_) + value; + size_t index = 0; + for (auto it = nostd::get>(point_data_.boundaries_).begin(); + it != nostd::get>(point_data_.boundaries_).end(); ++it) { if (value < *it) { - counts_[std::distance(boundaries_.begin(), it)] += 1; + point_data_.counts_[index] += 1; return; } + index++; } } -PointType LongHistogramAggregation::Collect() noexcept +std::unique_ptr LongHistogramAggregation::Merge( + const Aggregation &delta) const noexcept { - HistogramPointData point_data; - auto epoch_nanos = std::chrono::system_clock::now(); - const std::lock_guard locked(lock_); - PopulateHistogramDataPoint(point_data, epoch_nanos, sum_, count_, counts_, boundaries_); - return point_data; + return nullptr; +} + +std::unique_ptr LongHistogramAggregation::Diff(const Aggregation &next) const noexcept +{ + return nullptr; +} + +PointType LongHistogramAggregation::ToPoint() const noexcept +{ + return point_data_; } DoubleHistogramAggregation::DoubleHistogramAggregation() - : boundaries_{0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 1000.0}, - counts_(boundaries_.size() + 1, 0), - sum_(0.0), - count_(0) +{ + + point_data_.boundaries_ = + std::list{0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 1000.0}; + point_data_.counts_ = + std::vector(nostd::get>(point_data_.boundaries_).size() + 1, 0); + point_data_.sum_ = 0.0; + point_data_.count_ = 0; +} + +DoubleHistogramAggregation::DoubleHistogramAggregation(HistogramPointData &&data) + : point_data_{std::move(data)} {} void DoubleHistogramAggregation::Aggregate(double value, const PointAttributes &attributes) noexcept { const std::lock_guard locked(lock_); - count_ += 1; - sum_ += value; - for (auto it = boundaries_.begin(); it != boundaries_.end(); ++it) + point_data_.count_ += 1; + point_data_.sum_ = nostd::get(point_data_.sum_) + value; + size_t index = 0; + for (auto it = nostd::get>(point_data_.boundaries_).begin(); + it != nostd::get>(point_data_.boundaries_).end(); ++it) { if (value < *it) { - counts_[std::distance(boundaries_.begin(), it)] += 1; + point_data_.counts_[index] += 1; return; } + index++; } } -PointType DoubleHistogramAggregation::Collect() noexcept +std::unique_ptr DoubleHistogramAggregation::Merge( + const Aggregation &delta) const noexcept { - HistogramPointData point_data; - auto epoch_nanos = std::chrono::system_clock::now(); - const std::lock_guard locked(lock_); - PopulateHistogramDataPoint(point_data, epoch_nanos, sum_, count_, counts_, boundaries_); - return point_data; + // TODO - Implement me + return nullptr; +} + +std::unique_ptr DoubleHistogramAggregation::Diff( + const Aggregation &next) const noexcept +{ + // TODO - Implement me + return nullptr; +} + +PointType DoubleHistogramAggregation::ToPoint() const noexcept +{ + // TODO Implement me + return point_data_; } } // namespace metrics diff --git a/sdk/src/metrics/aggregation/lastvalue_aggregation.cc b/sdk/src/metrics/aggregation/lastvalue_aggregation.cc index 290b526344..9c0252be31 100644 --- a/sdk/src/metrics/aggregation/lastvalue_aggregation.cc +++ b/sdk/src/metrics/aggregation/lastvalue_aggregation.cc @@ -14,46 +14,109 @@ namespace sdk namespace metrics { -LongLastValueAggregation::LongLastValueAggregation() : is_lastvalue_valid_(false) {} +LongLastValueAggregation::LongLastValueAggregation() +{ + point_data_.is_lastvalue_valid_ = false; + point_data_.value_ = 0l; +} +LongLastValueAggregation::LongLastValueAggregation(LastValuePointData &&data) + : point_data_{std::move(data)} +{} void LongLastValueAggregation::Aggregate(long value, const PointAttributes &attributes) noexcept { const std::lock_guard locked(lock_); - is_lastvalue_valid_ = true; - value_ = value; + point_data_.is_lastvalue_valid_ = true; + point_data_.value_ = value; } -PointType LongLastValueAggregation::Collect() noexcept +std::unique_ptr LongLastValueAggregation::Merge( + const Aggregation &delta) const noexcept { - const std::lock_guard locked(lock_); - if (!is_lastvalue_valid_) + if (nostd::get(ToPoint()).sample_ts_.time_since_epoch() > + nostd::get(delta.ToPoint()).sample_ts_.time_since_epoch()) + { + LastValuePointData merge_data = std::move(nostd::get(ToPoint())); + return std::unique_ptr(new LongLastValueAggregation(std::move(merge_data))); + } + else { - return LastValuePointData{ - opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now()), false, 0l}; + LastValuePointData merge_data = std::move(nostd::get(delta.ToPoint())); + return std::unique_ptr(new LongLastValueAggregation(std::move(merge_data))); } - return LastValuePointData{ - opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now()), true, value_}; } -DoubleLastValueAggregation::DoubleLastValueAggregation() : is_lastvalue_valid_(false) {} +std::unique_ptr LongLastValueAggregation::Diff(const Aggregation &next) const noexcept +{ + if (nostd::get(ToPoint()).sample_ts_.time_since_epoch() > + nostd::get(next.ToPoint()).sample_ts_.time_since_epoch()) + { + LastValuePointData diff_data = std::move(nostd::get(ToPoint())); + return std::unique_ptr(new LongLastValueAggregation(std::move(diff_data))); + } + else + { + LastValuePointData diff_data = std::move(nostd::get(next.ToPoint())); + return std::unique_ptr(new LongLastValueAggregation(std::move(diff_data))); + } +} + +PointType LongLastValueAggregation::ToPoint() const noexcept +{ + return point_data_; +} + +DoubleLastValueAggregation::DoubleLastValueAggregation() +{ + point_data_.is_lastvalue_valid_ = false; + point_data_.value_ = 0.0; +} +DoubleLastValueAggregation::DoubleLastValueAggregation(LastValuePointData &&data) + : point_data_{std::move(data)} +{} void DoubleLastValueAggregation::Aggregate(double value, const PointAttributes &attributes) noexcept { const std::lock_guard locked(lock_); - is_lastvalue_valid_ = true; - value_ = value; + point_data_.is_lastvalue_valid_ = true; + point_data_.value_ = value; } -PointType DoubleLastValueAggregation::Collect() noexcept +std::unique_ptr DoubleLastValueAggregation::Merge( + const Aggregation &delta) const noexcept { - const std::lock_guard locked(lock_); - if (!is_lastvalue_valid_) + if (nostd::get(ToPoint()).sample_ts_.time_since_epoch() > + nostd::get(delta.ToPoint()).sample_ts_.time_since_epoch()) + { + LastValuePointData merge_data = std::move(nostd::get(ToPoint())); + return std::unique_ptr(new LongLastValueAggregation(std::move(merge_data))); + } + else { - return LastValuePointData{ - opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now()), false, 0.0}; + LastValuePointData merge_data = std::move(nostd::get(delta.ToPoint())); + return std::unique_ptr(new LongLastValueAggregation(std::move(merge_data))); } - return LastValuePointData{ - opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now()), true, value_}; +} + +std::unique_ptr DoubleLastValueAggregation::Diff( + const Aggregation &next) const noexcept +{ + if (nostd::get(ToPoint()).sample_ts_.time_since_epoch() > + nostd::get(next.ToPoint()).sample_ts_.time_since_epoch()) + { + LastValuePointData diff_data = std::move(nostd::get(ToPoint())); + return std::unique_ptr(new LongLastValueAggregation(std::move(diff_data))); + } + else + { + LastValuePointData diff_data = std::move(nostd::get(next.ToPoint())); + return std::unique_ptr(new LongLastValueAggregation(std::move(diff_data))); + } +} + +PointType DoubleLastValueAggregation::ToPoint() const noexcept +{ + return point_data_; } } // namespace metrics } // namespace sdk diff --git a/sdk/src/metrics/aggregation/sum_aggregation.cc b/sdk/src/metrics/aggregation/sum_aggregation.cc index e0d1330f68..94b871cd34 100644 --- a/sdk/src/metrics/aggregation/sum_aggregation.cc +++ b/sdk/src/metrics/aggregation/sum_aggregation.cc @@ -3,8 +3,10 @@ #ifndef ENABLE_METRICS_PREVIEW # include "opentelemetry/sdk/metrics/aggregation/sum_aggregation.h" +# include "opentelemetry/sdk/metrics/data/point_data.h" # include "opentelemetry/version.h" +# include # include OPENTELEMETRY_BEGIN_NAMESPACE @@ -13,54 +15,90 @@ namespace sdk namespace metrics { -LongSumAggregation::LongSumAggregation(bool is_monotonic) - : InstrumentMonotonicityAwareAggregation(is_monotonic), - start_epoch_nanos_(opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now())), - sum_(0l) -{} +LongSumAggregation::LongSumAggregation() +{ + point_data_.value_ = 0l; +} + +LongSumAggregation::LongSumAggregation(SumPointData &&data) : point_data_{std::move(data)} {} void LongSumAggregation::Aggregate(long value, const PointAttributes &attributes) noexcept { const std::lock_guard locked(lock_); - sum_ += value; + point_data_.value_ = nostd::get(point_data_.value_) + value; } -PointType LongSumAggregation::Collect() noexcept +std::unique_ptr LongSumAggregation::Merge(const Aggregation &delta) const noexcept { - opentelemetry::common::SystemTimestamp current_ts(std::chrono::system_clock::now()); - SumPointData sum; - { - const std::lock_guard locked(lock_); - PopulateSumPointData(sum, start_epoch_nanos_, current_ts, sum_, IsMonotonic()); - start_epoch_nanos_ = current_ts; - sum_ = 0; - } - return sum; + long merge_value = + nostd::get( + nostd::get((static_cast(delta).ToPoint())) + .value_) + + nostd::get(nostd::get(ToPoint()).value_); + std::unique_ptr aggr(new LongSumAggregation()); + static_cast(aggr.get())->point_data_.value_ = merge_value; + return aggr; } -DoubleSumAggregation::DoubleSumAggregation(bool is_monotonic) - : InstrumentMonotonicityAwareAggregation(is_monotonic), - start_epoch_nanos_(opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now())), - sum_(0L) -{} +std::unique_ptr LongSumAggregation::Diff(const Aggregation &next) const noexcept +{ + + long diff_value = nostd::get(nostd::get( + (static_cast(next).ToPoint())) + .value_) - + nostd::get(nostd::get(ToPoint()).value_); + std::unique_ptr aggr(new LongSumAggregation()); + static_cast(aggr.get())->point_data_.value_ = diff_value; + return aggr; +} + +PointType LongSumAggregation::ToPoint() const noexcept +{ + return point_data_; +} + +DoubleSumAggregation::DoubleSumAggregation() +{ + point_data_.value_ = 0.0; +} + +DoubleSumAggregation::DoubleSumAggregation(SumPointData &&data) : point_data_(std::move(data)) {} void DoubleSumAggregation::Aggregate(double value, const PointAttributes &attributes) noexcept { const std::lock_guard locked(lock_); - sum_ += value; + point_data_.value_ = nostd::get(point_data_.value_) + value; +} + +std::unique_ptr DoubleSumAggregation::Merge(const Aggregation &delta) const noexcept +{ + double merge_value = + nostd::get( + nostd::get((static_cast(delta).ToPoint())) + .value_) + + nostd::get(nostd::get(ToPoint()).value_); + std::unique_ptr aggr(new DoubleSumAggregation()); + static_cast(aggr.get())->point_data_.value_ = merge_value; + return aggr; } -PointType DoubleSumAggregation::Collect() noexcept +std::unique_ptr DoubleSumAggregation::Diff(const Aggregation &next) const noexcept { - opentelemetry::common::SystemTimestamp current_ts(std::chrono::system_clock::now()); - SumPointData sum; - { - const std::lock_guard locked(lock_); - PopulateSumPointData(sum, start_epoch_nanos_, current_ts, sum_, IsMonotonic()); - start_epoch_nanos_ = current_ts; - sum_ = 0; - } - return sum; + + double diff_value = + nostd::get( + nostd::get((static_cast(next).ToPoint())) + .value_) - + nostd::get(nostd::get(ToPoint()).value_); + std::unique_ptr aggr(new DoubleSumAggregation()); + static_cast(aggr.get())->point_data_.value_ = diff_value; + return aggr; +} + +PointType DoubleSumAggregation::ToPoint() const noexcept +{ + const std::lock_guard locked(lock_); + return point_data_; } } // namespace metrics diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc new file mode 100644 index 0000000000..f11f84544f --- /dev/null +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -0,0 +1,101 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#ifndef ENABLE_METRICS_PREVIEW +# include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h" +# include "opentelemetry/sdk/common/global_log_handler.h" +# include "opentelemetry/sdk/metrics/metric_exporter.h" + +# include +# include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ + +PeriodicExportingMetricReader::PeriodicExportingMetricReader( + std::unique_ptr exporter, + const PeriodicExportingMetricReaderOptions &option, + AggregationTemporality aggregation_temporality) + : MetricReader(aggregation_temporality), + exporter_{std::move(exporter)}, + export_interval_millis_{option.export_interval_millis}, + export_timeout_millis_{option.export_timeout_millis} +{ + if (export_interval_millis_ <= export_timeout_millis_) + { + OTEL_INTERNAL_LOG_WARN( + "[Periodic Exporting Metric Reader] Invalid configuration: " + "export_interval_millis_ should be less than export_timeout_millis_, using default values"); + export_interval_millis_ = kExportIntervalMillis; + export_timeout_millis_ = kExportTimeOutMillis; + } +} + +void PeriodicExportingMetricReader::OnInitialized() noexcept +{ + worker_thread_ = std::thread(&PeriodicExportingMetricReader::DoBackgroundWork, this); +} + +void PeriodicExportingMetricReader::DoBackgroundWork() +{ + std::unique_lock lk(cv_m_); + do + { + if (IsShutdown()) + { + break; + } + std::atomic cancel_export_for_timeout{false}; + auto start = std::chrono::steady_clock::now(); + auto future_receive = std::async(std::launch::async, [this, &cancel_export_for_timeout] { + Collect([this, &cancel_export_for_timeout](ResourceMetrics &metric_data) { + if (cancel_export_for_timeout) + { + OTEL_INTERNAL_LOG_ERROR( + "[Periodic Exporting Metric Reader] Collect took longer configured time: " + << export_timeout_millis_.count() << " ms, and timed out"); + return false; + } + this->exporter_->Export(metric_data); + return true; + }); + }); + std::future_status status; + do + { + status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_)); + if (status == std::future_status::timeout) + { + cancel_export_for_timeout = true; + break; + } + } while (status != std::future_status::ready); + auto end = std::chrono::steady_clock::now(); + auto export_time_ms = std::chrono::duration_cast(end - start); + auto remaining_wait_interval_ms = export_interval_millis_ - export_time_ms; + cv_.wait_for(lk, remaining_wait_interval_ms); + } while (true); +} + +bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeout) noexcept +{ + return exporter_->ForceFlush(timeout); +} + +bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout) noexcept +{ + if (worker_thread_.joinable()) + { + cv_.notify_one(); + worker_thread_.join(); + } + return exporter_->Shutdown(timeout); +} + +} // namespace metrics +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE +#endif \ No newline at end of file diff --git a/sdk/src/metrics/meter.cc b/sdk/src/metrics/meter.cc index 20e3b02768..e7ca822b6d 100644 --- a/sdk/src/metrics/meter.cc +++ b/sdk/src/metrics/meter.cc @@ -64,7 +64,7 @@ nostd::shared_ptr> Meter::CreateLongObservableC nostd::string_view unit) noexcept { return nostd::shared_ptr>{ - new LongObservableCounter(name, GetInstrumentationLibrary(), callback, description, unit)}; + new LongObservableCounter(name, callback, description, unit)}; } nostd::shared_ptr> Meter::CreateDoubleObservableCounter( @@ -74,7 +74,7 @@ nostd::shared_ptr> Meter::CreateDoubleObserva nostd::string_view unit) noexcept { return nostd::shared_ptr>{ - new DoubleObservableCounter(name, GetInstrumentationLibrary(), callback, description, unit)}; + new DoubleObservableCounter(name, callback, description, unit)}; } nostd::shared_ptr> Meter::CreateLongHistogram( @@ -112,7 +112,7 @@ nostd::shared_ptr> Meter::CreateLongObservableGau nostd::string_view unit) noexcept { return nostd::shared_ptr>{ - new LongObservableGauge(name, GetInstrumentationLibrary(), callback, description, unit)}; + new LongObservableGauge(name, callback, description, unit)}; } nostd::shared_ptr> Meter::CreateDoubleObservableGauge( @@ -122,7 +122,7 @@ nostd::shared_ptr> Meter::CreateDoubleObservabl nostd::string_view unit) noexcept { return nostd::shared_ptr>{ - new DoubleObservableGauge(name, GetInstrumentationLibrary(), callback, description, unit)}; + new DoubleObservableGauge(name, callback, description, unit)}; } nostd::shared_ptr> Meter::CreateLongUpDownCounter( @@ -159,8 +159,8 @@ nostd::shared_ptr> Meter::CreateLongObser nostd::string_view description, nostd::string_view unit) noexcept { - return nostd::shared_ptr>{new LongObservableUpDownCounter( - name, GetInstrumentationLibrary(), callback, description, unit)}; + return nostd::shared_ptr>{ + new LongObservableUpDownCounter(name, callback, description, unit)}; } nostd::shared_ptr> @@ -170,8 +170,7 @@ Meter::CreateDoubleObservableUpDownCounter(nostd::string_view name, nostd::string_view unit) noexcept { return nostd::shared_ptr>{ - new DoubleObservableUpDownCounter(name, GetInstrumentationLibrary(), callback, description, - unit)}; + new DoubleObservableUpDownCounter(name, callback, description, unit)}; } const sdk::instrumentationlibrary::InstrumentationLibrary *Meter::GetInstrumentationLibrary() @@ -211,22 +210,20 @@ std::unique_ptr Meter::RegisterMetricStorage( } /** collect metrics across all the meters **/ -bool Meter::collect(CollectorHandle *collector, - opentelemetry::common::SystemTimestamp collect_ts, - nostd::function_ref callback) noexcept +std::vector Meter::Collect(CollectorHandle *collector, + opentelemetry::common::SystemTimestamp collect_ts) noexcept { - std::vector data; + std::vector metric_data_list; for (auto &metric_storage : storage_registry_) { - // TBD - this needs to be asynchronous metric_storage.second->Collect(collector, meter_context_->GetCollectors(), meter_context_->GetSDKStartTime(), collect_ts, - [&callback](MetricData &metric_data) { - callback(metric_data); + [&metric_data_list](MetricData metric_data) { + metric_data_list.push_back(metric_data); return true; }); } - return true; + return metric_data_list; } } // namespace metrics diff --git a/sdk/src/metrics/metric_reader.cc b/sdk/src/metrics/metric_reader.cc index 8238ad2a55..eafcc60e7e 100644 --- a/sdk/src/metrics/metric_reader.cc +++ b/sdk/src/metrics/metric_reader.cc @@ -20,6 +20,7 @@ MetricReader::MetricReader(AggregationTemporality aggregation_temporality) void MetricReader::SetMetricProducer(MetricProducer *metric_producer) { metric_producer_ = metric_producer; + OnInitialized(); } AggregationTemporality MetricReader::GetAggregationTemporality() const noexcept @@ -27,7 +28,8 @@ AggregationTemporality MetricReader::GetAggregationTemporality() const noexcept return aggregation_temporality_; } -bool MetricReader::Collect(nostd::function_ref callback) noexcept +bool MetricReader::Collect( + nostd::function_ref callback) noexcept { if (!metric_producer_) { @@ -46,18 +48,21 @@ bool MetricReader::Collect(nostd::function_ref callback) noexc bool MetricReader::Shutdown(std::chrono::microseconds timeout) noexcept { bool status = true; - if (IsShutdown()) { OTEL_INTERNAL_LOG_WARN("MetricReader::Shutdown - Cannot invoke shutdown twice!"); } + + { + const std::lock_guard locked(lock_); + shutdown_ = true; + } + if (!OnShutDown(timeout)) { status = false; OTEL_INTERNAL_LOG_WARN("MetricReader::OnShutDown Shutdown failed. Will not be tried again!"); } - const std::lock_guard locked(lock_); - shutdown_ = true; return status; } diff --git a/sdk/src/metrics/state/metric_collector.cc b/sdk/src/metrics/state/metric_collector.cc index 5b9fc4ab70..6cae7bf12c 100644 --- a/sdk/src/metrics/state/metric_collector.cc +++ b/sdk/src/metrics/state/metric_collector.cc @@ -29,13 +29,20 @@ AggregationTemporality MetricCollector::GetAggregationTemporality() noexcept return metric_reader_->GetAggregationTemporality(); } -bool MetricCollector::Collect(nostd::function_ref callback) noexcept +bool MetricCollector::Collect( + nostd::function_ref callback) noexcept { + ResourceMetrics resource_metrics; for (auto &meter : meter_context_->GetMeters()) { auto collection_ts = std::chrono::system_clock::now(); - meter->collect(this, collection_ts, callback); + InstrumentationInfoMetrics instrumentation_info_metrics; + instrumentation_info_metrics.metric_data_ = meter->Collect(this, collection_ts); + instrumentation_info_metrics.instrumentation_library_ = meter->GetInstrumentationLibrary(); + resource_metrics.instrumentation_info_metric_data_.push_back(instrumentation_info_metrics); } + resource_metrics.resource_ = &meter_context_->GetResource(); + callback(resource_metrics); return true; } diff --git a/sdk/src/metrics/state/sync_metric_storage.cc b/sdk/src/metrics/state/sync_metric_storage.cc new file mode 100644 index 0000000000..f42de82b4b --- /dev/null +++ b/sdk/src/metrics/state/sync_metric_storage.cc @@ -0,0 +1,131 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#ifndef ENABLE_METRICS_PREVIEW + +# include "opentelemetry/sdk/metrics/state/sync_metric_storage.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ + +bool SyncMetricStorage::Collect(CollectorHandle *collector, + nostd::span> collectors, + opentelemetry::common::SystemTimestamp sdk_start_ts, + opentelemetry::common::SystemTimestamp collection_ts, + nostd::function_ref callback) noexcept +{ + opentelemetry::common::SystemTimestamp last_collection_ts = sdk_start_ts; + auto aggregation_temporarily = collector->GetAggregationTemporality(); + + // Add the current delta metrics to `unreported metrics stash` for all the collectors, + // this will also empty the delta metrics hashmap, and make it available for + // recordings + std::shared_ptr delta_metrics = std::move(attributes_hashmap_); + attributes_hashmap_.reset(new AttributesHashMap); + for (auto &col : collectors) + { + unreported_metrics_[col.get()].push_back(delta_metrics); + } + + // Get the unreported metrics for the `collector` from `unreported metrics stash` + // since last collection, this will also cleanup the unreported metrics for `collector` + // from the stash. + auto present = unreported_metrics_.find(collector); + if (present == unreported_metrics_.end()) + { + // no unreported metrics for the collector, return. + return true; + } + auto unreported_list = std::move(present->second); + + // Iterate over the unreporter metrics for `collector` and store result in `merged_metrics` + std::unique_ptr merged_metrics(new AttributesHashMap); + for (auto &agg_hashmap : unreported_list) + { + agg_hashmap->GetAllEnteries([&merged_metrics, this](const MetricAttributes &attributes, + Aggregation &aggregation) { + auto agg = merged_metrics->Get(attributes); + if (agg) + { + merged_metrics->Set(attributes, std::move(agg->Merge(aggregation))); + } + else + { + merged_metrics->Set( + attributes, + std::move( + DefaultAggregation::CreateAggregation(instrument_descriptor_)->Merge(aggregation))); + merged_metrics->GetAllEnteries( + [](const MetricAttributes &attr, Aggregation &aggr) { return true; }); + } + return true; + }); + } + // Get the last reported metrics for the `collector` from `last reported metrics` stash + // - If the aggregation_temporarily for the collector is cumulative + // - Merge the last reported metrics with unreported metrics (which is in merged_metrics), + // Final result of merge would be in merged_metrics. + // - Move the final merge to the `last reported metrics` stash. + // - If the aggregation_temporarily is delta + // - Store the unreported metrics for `collector` (which is in merged_mtrics) to + // `last reported metrics` stash. + + auto reported = last_reported_metrics_.find(collector); + if (reported != last_reported_metrics_.end()) + { + last_collection_ts = last_reported_metrics_[collector].collection_ts; + auto last_aggr_hashmap = std::move(last_reported_metrics_[collector].attributes_map); + if (aggregation_temporarily == AggregationTemporality::kCumulative) + { + // merge current delta to previous cumulative + last_aggr_hashmap->GetAllEnteries( + [&merged_metrics, this](const MetricAttributes &attributes, Aggregation &aggregation) { + auto agg = merged_metrics->Get(attributes); + if (agg) + { + merged_metrics->Set(attributes, agg->Merge(aggregation)); + } + else + { + merged_metrics->Set(attributes, + DefaultAggregation::CreateAggregation(instrument_descriptor_)); + } + return true; + }); + } + last_reported_metrics_[collector] = + LastReportedMetrics{std::move(merged_metrics), collection_ts}; + } + else + { + merged_metrics->GetAllEnteries( + [](const MetricAttributes &attr, Aggregation &aggr) { return true; }); + last_reported_metrics_.insert( + std::make_pair(collector, LastReportedMetrics{std::move(merged_metrics), collection_ts})); + } + + // Generate the MetricData from the final merged_metrics, and invoke callback over it. + + AttributesHashMap *result_to_export = (last_reported_metrics_[collector]).attributes_map.get(); + MetricData metric_data; + metric_data.instrument_descriptor = instrument_descriptor_; + metric_data.start_ts = last_collection_ts; + metric_data.end_ts = collection_ts; + result_to_export->GetAllEnteries( + [&metric_data](const MetricAttributes &attributes, Aggregation &aggregation) { + PointDataAttributes point_data_attr; + point_data_attr.point_data = aggregation.ToPoint(); + point_data_attr.attributes = attributes; + metric_data.point_data_attr_.push_back(point_data_attr); + return true; + }); + return callback(metric_data); +} + +} // namespace metrics +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE +#endif diff --git a/sdk/test/metrics/CMakeLists.txt b/sdk/test/metrics/CMakeLists.txt index fa1f22c73a..faf2f2b49f 100644 --- a/sdk/test/metrics/CMakeLists.txt +++ b/sdk/test/metrics/CMakeLists.txt @@ -11,7 +11,8 @@ foreach( observer_result_test sync_instruments_test async_instruments_test - metric_reader_test) + metric_reader_test + periodic_exporting_metric_reader_test) add_executable(${testname} "${testname}.cc") target_link_libraries( ${testname} ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} diff --git a/sdk/test/metrics/aggregation_test.cc b/sdk/test/metrics/aggregation_test.cc index ec753111ff..a32826b145 100644 --- a/sdk/test/metrics/aggregation_test.cc +++ b/sdk/test/metrics/aggregation_test.cc @@ -13,82 +13,80 @@ using namespace opentelemetry::sdk::metrics; namespace nostd = opentelemetry::nostd; TEST(Aggregation, LongSumAggregation) { - LongSumAggregation aggr(true); - auto data = aggr.Collect(); + LongSumAggregation aggr; + auto data = aggr.ToPoint(); ASSERT_TRUE(nostd::holds_alternative(data)); auto sum_data = nostd::get(data); ASSERT_TRUE(nostd::holds_alternative(sum_data.value_)); EXPECT_EQ(nostd::get(sum_data.value_), 0l); - EXPECT_EQ(sum_data.is_monotonic_, true); EXPECT_NO_THROW(aggr.Aggregate(12l, {})); EXPECT_NO_THROW(aggr.Aggregate(0l, {})); - sum_data = nostd::get(aggr.Collect()); + sum_data = nostd::get(aggr.ToPoint()); EXPECT_EQ(nostd::get(sum_data.value_), 12l); } TEST(Aggregation, DoubleSumAggregation) { - DoubleSumAggregation aggr(true); - auto data = aggr.Collect(); + DoubleSumAggregation aggr; + auto data = aggr.ToPoint(); ASSERT_TRUE(nostd::holds_alternative(data)); auto sum_data = nostd::get(data); ASSERT_TRUE(nostd::holds_alternative(sum_data.value_)); EXPECT_EQ(nostd::get(sum_data.value_), 0); - EXPECT_EQ(sum_data.is_monotonic_, true); EXPECT_NO_THROW(aggr.Aggregate(12.0, {})); EXPECT_NO_THROW(aggr.Aggregate(1.0, {})); - sum_data = nostd::get(aggr.Collect()); + sum_data = nostd::get(aggr.ToPoint()); EXPECT_EQ(nostd::get(sum_data.value_), 13.0); } TEST(Aggregation, LongLastValueAggregation) { LongLastValueAggregation aggr; - auto data = aggr.Collect(); + auto data = aggr.ToPoint(); ASSERT_TRUE(nostd::holds_alternative(data)); auto lastvalue_data = nostd::get(data); ASSERT_TRUE(nostd::holds_alternative(lastvalue_data.value_)); EXPECT_EQ(lastvalue_data.is_lastvalue_valid_, false); EXPECT_NO_THROW(aggr.Aggregate(12l, {})); EXPECT_NO_THROW(aggr.Aggregate(1l, {})); - lastvalue_data = nostd::get(aggr.Collect()); + lastvalue_data = nostd::get(aggr.ToPoint()); EXPECT_EQ(nostd::get(lastvalue_data.value_), 1.0); } TEST(Aggregation, DoubleLastValueAggregation) { DoubleLastValueAggregation aggr; - auto data = aggr.Collect(); + auto data = aggr.ToPoint(); ASSERT_TRUE(nostd::holds_alternative(data)); auto lastvalue_data = nostd::get(data); ASSERT_TRUE(nostd::holds_alternative(lastvalue_data.value_)); EXPECT_EQ(lastvalue_data.is_lastvalue_valid_, false); EXPECT_NO_THROW(aggr.Aggregate(12.0, {})); EXPECT_NO_THROW(aggr.Aggregate(1.0, {})); - lastvalue_data = nostd::get(aggr.Collect()); + lastvalue_data = nostd::get(aggr.ToPoint()); EXPECT_EQ(nostd::get(lastvalue_data.value_), 1.0); } TEST(Aggregation, LongHistogramAggregation) { LongHistogramAggregation aggr; - auto data = aggr.Collect(); + auto data = aggr.ToPoint(); ASSERT_TRUE(nostd::holds_alternative(data)); auto histogram_data = nostd::get(data); ASSERT_TRUE(nostd::holds_alternative(histogram_data.sum_)); - ASSERT_TRUE(nostd::holds_alternative>(histogram_data.boundaries_)); + ASSERT_TRUE(nostd::holds_alternative>(histogram_data.boundaries_)); EXPECT_EQ(nostd::get(histogram_data.sum_), 0); EXPECT_EQ(histogram_data.count_, 0); EXPECT_NO_THROW(aggr.Aggregate(12l, {})); // lies in fourth bucket EXPECT_NO_THROW(aggr.Aggregate(100l, {})); // lies in eight bucket - histogram_data = nostd::get(aggr.Collect()); + histogram_data = nostd::get(aggr.ToPoint()); EXPECT_EQ(nostd::get(histogram_data.sum_), 112); EXPECT_EQ(histogram_data.count_, 2); EXPECT_EQ(histogram_data.counts_[3], 1); EXPECT_EQ(histogram_data.counts_[7], 1); EXPECT_NO_THROW(aggr.Aggregate(13l, {})); // lies in fourth bucket EXPECT_NO_THROW(aggr.Aggregate(252l, {})); // lies in ninth bucket - histogram_data = nostd::get(aggr.Collect()); + histogram_data = nostd::get(aggr.ToPoint()); EXPECT_EQ(histogram_data.count_, 4); EXPECT_EQ(histogram_data.counts_[3], 2); EXPECT_EQ(histogram_data.counts_[8], 1); @@ -97,23 +95,23 @@ TEST(Aggregation, LongHistogramAggregation) TEST(Aggregation, DoubleHistogramAggregation) { DoubleHistogramAggregation aggr; - auto data = aggr.Collect(); + auto data = aggr.ToPoint(); ASSERT_TRUE(nostd::holds_alternative(data)); auto histogram_data = nostd::get(data); ASSERT_TRUE(nostd::holds_alternative(histogram_data.sum_)); - ASSERT_TRUE(nostd::holds_alternative>(histogram_data.boundaries_)); + ASSERT_TRUE(nostd::holds_alternative>(histogram_data.boundaries_)); EXPECT_EQ(nostd::get(histogram_data.sum_), 0); EXPECT_EQ(histogram_data.count_, 0); EXPECT_NO_THROW(aggr.Aggregate(12.0, {})); // lies in fourth bucket EXPECT_NO_THROW(aggr.Aggregate(100.0, {})); // lies in eight bucket - histogram_data = nostd::get(aggr.Collect()); + histogram_data = nostd::get(aggr.ToPoint()); EXPECT_EQ(nostd::get(histogram_data.sum_), 112); EXPECT_EQ(histogram_data.count_, 2); EXPECT_EQ(histogram_data.counts_[3], 1); EXPECT_EQ(histogram_data.counts_[7], 1); EXPECT_NO_THROW(aggr.Aggregate(13.0, {})); // lies in fourth bucket EXPECT_NO_THROW(aggr.Aggregate(252.0, {})); // lies in ninth bucket - histogram_data = nostd::get(aggr.Collect()); + histogram_data = nostd::get(aggr.ToPoint()); EXPECT_EQ(histogram_data.count_, 4); EXPECT_EQ(histogram_data.counts_[3], 2); EXPECT_EQ(histogram_data.counts_[8], 1); diff --git a/sdk/test/metrics/async_instruments_test.cc b/sdk/test/metrics/async_instruments_test.cc index ad3a81b031..ff9504f78c 100644 --- a/sdk/test/metrics/async_instruments_test.cc +++ b/sdk/test/metrics/async_instruments_test.cc @@ -3,16 +3,12 @@ #ifndef ENABLE_METRICS_PREVIEW # include "opentelemetry/sdk/metrics/async_instruments.h" -# include "opentelemetry/sdk/instrumentationlibrary/instrumentation_library.h" # include using namespace opentelemetry; -using namespace opentelemetry::sdk::instrumentationlibrary; using namespace opentelemetry::sdk::metrics; -auto instrumentation_library = InstrumentationLibrary::Create("opentelemetry-cpp", "0.1.0"); - using M = std::map; void asyc_generate_measurements_long(opentelemetry::metrics::ObserverResult &observer) {} @@ -22,44 +18,43 @@ void asyc_generate_measurements_double(opentelemetry::metrics::ObserverResult &observer) {}; - EXPECT_NO_THROW(LongObservableCounter counter("long_counter", instrumentation_library.get(), - asyc_generate_meas_long, "description", "1")); + EXPECT_NO_THROW( + LongObservableCounter counter("long_counter", asyc_generate_meas_long, "description", "1")); } TEST(AsyncInstruments, DoubleObservableCounter) { auto asyc_generate_meas_double = [](opentelemetry::metrics::ObserverResult &observer) {}; - EXPECT_NO_THROW(DoubleObservableCounter counter("long_counter", instrumentation_library.get(), - asyc_generate_meas_double, "description", "1")); + EXPECT_NO_THROW(DoubleObservableCounter counter("long_counter", asyc_generate_meas_double, + "description", "1")); } TEST(AsyncInstruments, LongObservableGauge) { auto asyc_generate_meas_long = [](opentelemetry::metrics::ObserverResult &observer) {}; - EXPECT_NO_THROW(LongObservableGauge counter("long_counter", instrumentation_library.get(), - asyc_generate_meas_long, "description", "1")); + EXPECT_NO_THROW( + LongObservableGauge counter("long_counter", asyc_generate_meas_long, "description", "1")); } TEST(AsyncInstruments, DoubleObservableGauge) { auto asyc_generate_meas_double = [](opentelemetry::metrics::ObserverResult &observer) {}; - EXPECT_NO_THROW(DoubleObservableGauge counter("long_counter", instrumentation_library.get(), - asyc_generate_meas_double, "description", "1")); + EXPECT_NO_THROW( + DoubleObservableGauge counter("long_counter", asyc_generate_meas_double, "description", "1")); } TEST(AsyncInstruments, LongObservableUpDownCounter) { auto asyc_generate_meas_long = [](opentelemetry::metrics::ObserverResult &observer) {}; - EXPECT_NO_THROW(LongObservableUpDownCounter counter("long_counter", instrumentation_library.get(), - asyc_generate_meas_long, "description", "1")); + EXPECT_NO_THROW(LongObservableUpDownCounter counter("long_counter", asyc_generate_meas_long, + "description", "1")); } TEST(AsyncInstruments, DoubleObservableUpDownCounter) { auto asyc_generate_meas_double = [](opentelemetry::metrics::ObserverResult &observer) {}; - EXPECT_NO_THROW( - DoubleObservableUpDownCounter counter("long_counter", instrumentation_library.get(), - asyc_generate_meas_double, "description", "1")); + EXPECT_NO_THROW(DoubleObservableUpDownCounter counter("long_counter", asyc_generate_meas_double, + "description", "1")); } #endif diff --git a/sdk/test/metrics/async_metric_storage_test.cc b/sdk/test/metrics/async_metric_storage_test.cc index 0527c7f0ca..512e24472c 100644 --- a/sdk/test/metrics/async_metric_storage_test.cc +++ b/sdk/test/metrics/async_metric_storage_test.cc @@ -38,10 +38,14 @@ void measurement_fetch(opentelemetry::metrics::ObserverResult &observer_re TEST(AsyncMetricStorageTest, BasicTests) { - auto metric_callback = [](MetricData &metric_data) { return true; }; + auto metric_callback = [](MetricData &&metric_data) { return true; }; InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter, InstrumentValueType::kLong}; + auto sdk_start_ts = std::chrono::system_clock::now(); + // Some computation here + auto collection_ts = std::chrono::system_clock::now() + std::chrono::seconds(5); + std::vector> exporters; std::shared_ptr meter_context(new MeterContext(std::move(exporters))); std::unique_ptr metric_reader(new MockMetricReader(AggregationTemporality::kDelta)); @@ -53,7 +57,7 @@ TEST(AsyncMetricStorageTest, BasicTests) opentelemetry::sdk::metrics::AsyncMetricStorage storage( instr_desc, AggregationType::kSum, &measurement_fetch, new DefaultAttributesProcessor()); - storage.Collect(collector.get(), collectors, std::chrono::system_clock::now(), - std::chrono::system_clock::now(), metric_callback); + EXPECT_NO_THROW( + storage.Collect(collector.get(), collectors, sdk_start_ts, collection_ts, metric_callback)); } #endif \ No newline at end of file diff --git a/sdk/test/metrics/meter_provider_sdk_test.cc b/sdk/test/metrics/meter_provider_sdk_test.cc index 015d3023ae..3c059cc44c 100644 --- a/sdk/test/metrics/meter_provider_sdk_test.cc +++ b/sdk/test/metrics/meter_provider_sdk_test.cc @@ -18,8 +18,7 @@ class MockMetricExporter : public MetricExporter public: MockMetricExporter() = default; - opentelemetry::sdk::common::ExportResult Export( - const opentelemetry::nostd::span> &records) noexcept override + opentelemetry::sdk::common::ExportResult Export(const ResourceMetrics &records) noexcept override { return opentelemetry::sdk::common::ExportResult::kSuccess; } @@ -40,9 +39,7 @@ class MockMetricReader : public MetricReader { public: virtual bool OnForceFlush(std::chrono::microseconds timeout) noexcept override { return true; } - virtual bool OnShutDown(std::chrono::microseconds timeout) noexcept override { return true; } - virtual void OnInitialized() noexcept override {} }; diff --git a/sdk/test/metrics/metric_reader_test.cc b/sdk/test/metrics/metric_reader_test.cc index 214f522118..61a4364934 100644 --- a/sdk/test/metrics/metric_reader_test.cc +++ b/sdk/test/metrics/metric_reader_test.cc @@ -17,9 +17,7 @@ class MockMetricReader : public MetricReader MockMetricReader(AggregationTemporality aggr_temporality) : MetricReader(aggr_temporality) {} virtual bool OnForceFlush(std::chrono::microseconds timeout) noexcept override { return true; } - virtual bool OnShutDown(std::chrono::microseconds timeout) noexcept override { return true; } - virtual void OnInitialized() noexcept override {} }; @@ -37,6 +35,6 @@ TEST(MetricReaderTest, BasicTests) std::shared_ptr meter_context2(new MeterContext(std::move(exporters))); MetricProducer *metric_producer = new MetricCollector(std::move(meter_context2), std::move(metric_reader2)); - EXPECT_NO_THROW(metric_producer->Collect([](MetricData data) { return true; })); + EXPECT_NO_THROW(metric_producer->Collect([](ResourceMetrics &metric_data) { return true; })); } #endif diff --git a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc new file mode 100644 index 0000000000..5219f31100 --- /dev/null +++ b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc @@ -0,0 +1,81 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#ifndef ENABLE_METRICS_PREVIEW + +# include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h" +# include "opentelemetry/sdk/metrics/export/metric_producer.h" +# include "opentelemetry/sdk/metrics/metric_exporter.h" + +# include + +using namespace opentelemetry; +using namespace opentelemetry::sdk::instrumentationlibrary; +using namespace opentelemetry::sdk::metrics; + +class MockPushMetricExporter : public MetricExporter +{ +public: + opentelemetry::sdk::common::ExportResult Export(const ResourceMetrics &record) noexcept override + { + records_.push_back(record); + return opentelemetry::sdk::common::ExportResult::kSuccess; + } + + bool ForceFlush( + std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override + { + return false; + } + + bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override + { + return true; + } + + size_t GetDataCount() { return records_.size(); } + +private: + std::vector records_; +}; + +class MockMetricProducer : public MetricProducer +{ +public: + MockMetricProducer(std::chrono::microseconds sleep_ms = std::chrono::microseconds::zero()) + : sleep_ms_{sleep_ms}, data_sent_size_(0) + {} + + bool Collect(nostd::function_ref callback) noexcept override + { + std::this_thread::sleep_for(sleep_ms_); + data_sent_size_++; + ResourceMetrics data; + callback(data); + return true; + } + + size_t GetDataCount() { return data_sent_size_; } + +private: + std::chrono::microseconds sleep_ms_; + size_t data_sent_size_; +}; + +TEST(PeriodicExporingMetricReader, BasicTests) +{ + std::unique_ptr exporter(new MockPushMetricExporter()); + PeriodicExportingMetricReaderOptions options; + options.export_timeout_millis = std::chrono::milliseconds(200); + options.export_interval_millis = std::chrono::milliseconds(500); + auto exporter_ptr = exporter.get(); + PeriodicExportingMetricReader reader(std::move(exporter), options); + MockMetricProducer producer; + reader.SetMetricProducer(&producer); + std::this_thread::sleep_for(std::chrono::milliseconds(2000)); + reader.Shutdown(); + EXPECT_EQ(static_cast(exporter_ptr)->GetDataCount(), + static_cast(&producer)->GetDataCount()); +} + +#endif \ No newline at end of file diff --git a/sdk/test/metrics/sync_metric_storage_test.cc b/sdk/test/metrics/sync_metric_storage_test.cc index c4bd154460..b0440fdbb3 100644 --- a/sdk/test/metrics/sync_metric_storage_test.cc +++ b/sdk/test/metrics/sync_metric_storage_test.cc @@ -12,23 +12,235 @@ # include using namespace opentelemetry::sdk::metrics; +using namespace opentelemetry::common; using M = std::map; -TEST(WritableMetricStorageTest, BasicTests) +class MockCollectorHandle : public CollectorHandle { - InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter, +public: + MockCollectorHandle(AggregationTemporality temp) : temporality(temp) {} + + AggregationTemporality GetAggregationTemporality() noexcept override { return temporality; } + +private: + AggregationTemporality temporality; +}; + +class WritableMetricStorageTestFixture : public ::testing::TestWithParam +{}; + +TEST_P(WritableMetricStorageTestFixture, LongSumAggregation) +{ + AggregationTemporality temporality = GetParam(); + auto sdk_start_ts = std::chrono::system_clock::now(); + long expected_total_get_requests = 0; + long expected_total_put_requests = 0; + InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter, InstrumentValueType::kLong}; + std::map attributes_get = {{"RequestType", "GET"}}; + std::map attributes_put = {{"RequestType", "PUT"}}; opentelemetry::sdk::metrics::SyncMetricStorage storage( instr_desc, AggregationType::kSum, new DefaultAttributesProcessor(), NoExemplarReservoir::GetNoExemplarReservoir()); - EXPECT_NO_THROW(storage.RecordLong(10l, opentelemetry::context::Context{})); - EXPECT_NO_THROW(storage.RecordDouble(10.10, opentelemetry::context::Context{})); + + storage.RecordLong(10l, KeyValueIterableView>(attributes_get), + opentelemetry::context::Context{}); + expected_total_get_requests += 10; + + EXPECT_NO_THROW(storage.RecordLong( + 30l, KeyValueIterableView>(attributes_put), + opentelemetry::context::Context{})); + expected_total_put_requests += 30; + + storage.RecordLong(20l, KeyValueIterableView>(attributes_get), + opentelemetry::context::Context{}); + expected_total_get_requests += 20; + + EXPECT_NO_THROW(storage.RecordLong( + 40l, KeyValueIterableView>(attributes_put), + opentelemetry::context::Context{})); + expected_total_put_requests += 40; + + std::shared_ptr collector(new MockCollectorHandle(temporality)); + std::vector> collectors; + collectors.push_back(collector); + + // Some computation here + auto collection_ts = std::chrono::system_clock::now(); + size_t count_attributes = 0; + storage.Collect( + collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData data) { + for (auto data_attr : data.point_data_attr_) + { + auto data = opentelemetry::nostd::get(data_attr.point_data); + if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "GET") + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), expected_total_get_requests); + count_attributes++; + } + else if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "PUT") + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), expected_total_put_requests); + count_attributes++; + } + } + return true; + }); + + // In case of delta temporarily, subsequent collection would contain new data points, so resetting + // the counts + if (temporality == AggregationTemporality::kDelta) + { + expected_total_get_requests = 0; + expected_total_put_requests = 0; + } + EXPECT_NO_THROW(storage.RecordLong( - 10l, opentelemetry::common::KeyValueIterableView({{"abc", "123"}, {"xyz", "456"}}), + 50l, KeyValueIterableView>(attributes_get), + opentelemetry::context::Context{})); + expected_total_get_requests += 50; + EXPECT_NO_THROW(storage.RecordLong( + 40l, KeyValueIterableView>(attributes_put), + opentelemetry::context::Context{})); + expected_total_put_requests += 40; + + collection_ts = std::chrono::system_clock::now(); + count_attributes = 0; + storage.Collect( + collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData data) { + for (auto data_attr : data.point_data_attr_) + { + auto data = opentelemetry::nostd::get(data_attr.point_data); + if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "GET") + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), expected_total_get_requests); + count_attributes++; + } + else if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "PUT") + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), expected_total_put_requests); + count_attributes++; + } + } + return true; + }); +} +INSTANTIATE_TEST_CASE_P(WritableMetricStorageTestLong, + WritableMetricStorageTestFixture, + ::testing::Values(AggregationTemporality::kCumulative, + AggregationTemporality::kDelta)); + +TEST_P(WritableMetricStorageTestFixture, DoubleSumAggregation) +{ + AggregationTemporality temporality = GetParam(); + auto sdk_start_ts = std::chrono::system_clock::now(); + double expected_total_get_requests = 0; + double expected_total_put_requests = 0; + InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter, + InstrumentValueType::kDouble}; + std::map attributes_get = {{"RequestType", "GET"}}; + std::map attributes_put = {{"RequestType", "PUT"}}; + + opentelemetry::sdk::metrics::SyncMetricStorage storage( + instr_desc, AggregationType::kSum, new DefaultAttributesProcessor(), + NoExemplarReservoir::GetNoExemplarReservoir()); + + storage.RecordDouble(10.0, + KeyValueIterableView>(attributes_get), + opentelemetry::context::Context{}); + expected_total_get_requests += 10; + + EXPECT_NO_THROW(storage.RecordDouble( + 30.0, KeyValueIterableView>(attributes_put), opentelemetry::context::Context{})); + expected_total_put_requests += 30; - EXPECT_NO_THROW(storage.RecordDouble(10.10, opentelemetry::common::KeyValueIterableView({}), - opentelemetry::context::Context{})); + storage.RecordDouble(20.0, + KeyValueIterableView>(attributes_get), + opentelemetry::context::Context{}); + expected_total_get_requests += 20; + + EXPECT_NO_THROW(storage.RecordDouble( + 40.0, KeyValueIterableView>(attributes_put), + opentelemetry::context::Context{})); + expected_total_put_requests += 40; + + std::shared_ptr collector(new MockCollectorHandle(temporality)); + std::vector> collectors; + collectors.push_back(collector); + + // Some computation here + auto collection_ts = std::chrono::system_clock::now(); + size_t count_attributes = 0; + storage.Collect( + collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData data) { + for (auto data_attr : data.point_data_attr_) + { + auto data = opentelemetry::nostd::get(data_attr.point_data); + if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "GET") + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), expected_total_get_requests); + count_attributes++; + } + else if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "PUT") + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), expected_total_put_requests); + count_attributes++; + } + } + return true; + }); + + // In case of delta temporarily, subsequent collection would contain new data points, so resetting + // the counts + if (temporality == AggregationTemporality::kDelta) + { + expected_total_get_requests = 0; + expected_total_put_requests = 0; + } + + EXPECT_NO_THROW(storage.RecordDouble( + 50.0, KeyValueIterableView>(attributes_get), + opentelemetry::context::Context{})); + expected_total_get_requests += 50; + EXPECT_NO_THROW(storage.RecordDouble( + 40.0, KeyValueIterableView>(attributes_put), + opentelemetry::context::Context{})); + expected_total_put_requests += 40; + + collection_ts = std::chrono::system_clock::now(); + count_attributes = 0; + storage.Collect( + collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData data) { + for (auto data_attr : data.point_data_attr_) + { + auto data = opentelemetry::nostd::get(data_attr.point_data); + if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "GET") + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), expected_total_get_requests); + count_attributes++; + } + else if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "PUT") + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), expected_total_put_requests); + count_attributes++; + } + } + return true; + }); } +INSTANTIATE_TEST_CASE_P(WritableMetricStorageTestDouble, + WritableMetricStorageTestFixture, + ::testing::Values(AggregationTemporality::kCumulative, + AggregationTemporality::kDelta)); + #endif