Skip to content

Commit

Permalink
Reset start_time_unix_nano in _ViewInstrumentMatch for aggregatio…
Browse files Browse the repository at this point in the history
…n creation (#4137)

* test to fail

* Use current time when creating an aggregation in _ViewInstrumentMatch

* add changelog and improve test

* fix changelog

* fix test

Signed-off-by: emdneto <[email protected]>

---------

Signed-off-by: emdneto <[email protected]>
Co-authored-by: Diego Hurtado <[email protected]>
  • Loading branch information
emdneto and ocelotl authored Aug 21, 2024
1 parent b380e53 commit 909708c
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 15 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Implementation of Events API
([#4054](https:/open-telemetry/opentelemetry-python/pull/4054))
- Make log sdk add `exception.message` to logRecord for exceptions whose argument
- Make log sdk add `exception.message` to logRecord for exceptions whose argument
is an exception not a string message
([#4122](https:/open-telemetry/opentelemetry-python/pull/4122))
- Fix use of `link.attributes.dropped`, which may not exist
Expand All @@ -31,6 +31,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#4134](https:/open-telemetry/opentelemetry-python/pull/4134))
- Implement Client Key and Certificate File Support for All OTLP Exporters
([#4116](https:/open-telemetry/opentelemetry-python/pull/4116))
- Remove `_start_time_unix_nano` attribute from `_ViewInstrumentMatch` in favor
of using `time_ns()` at the moment when the aggregation object is created
([#4137](https:/open-telemetry/opentelemetry-python/pull/4137))

## Version 1.26.0/0.47b0 (2024-07-25)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ def __init__(
instrument: Instrument,
instrument_class_aggregation: Dict[type, Aggregation],
):
self._start_time_unix_nano = time_ns()
self._view = view
self._instrument = instrument
self._attributes_aggregation: Dict[frozenset, _Aggregation] = {}
Expand Down Expand Up @@ -107,7 +106,7 @@ def consume_measurement(self, measurement: Measurement) -> None:
self._view._aggregation._create_aggregation(
self._instrument,
attributes,
self._start_time_unix_nano,
time_ns(),
)
)
else:
Expand All @@ -116,7 +115,7 @@ def consume_measurement(self, measurement: Measurement) -> None:
]._create_aggregation(
self._instrument,
attributes,
self._start_time_unix_nano,
time_ns(),
)
self._attributes_aggregation[aggr_key] = aggregation

Expand Down
42 changes: 32 additions & 10 deletions opentelemetry-sdk/tests/metrics/integration_test/test_time_align.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@


class TestTimeAlign(TestCase):

# This delay is needed for these tests to pass when they are run in
# Windows.
delay = 0.001

def test_time_align_cumulative(self):
reader = InMemoryMetricReader()
meter_provider = MeterProvider(metric_readers=[reader])
Expand All @@ -36,9 +41,11 @@ def test_time_align_cumulative(self):
counter_1 = meter.create_counter("counter_1")

counter_0.add(10, {"label": "value1"})
sleep(self.delay)
counter_0.add(10, {"label": "value2"})
sleep(0.5)
sleep(self.delay)
counter_1.add(10, {"label": "value1"})
sleep(self.delay)
counter_1.add(10, {"label": "value2"})

metrics = reader.get_metrics_data()
Expand All @@ -55,12 +62,14 @@ def test_time_align_cumulative(self):
.metrics[1]
.data.data_points
)
self.assertEqual(len(data_points_0_0), 2)
self.assertEqual(len(data_points_0_1), 2)

self.assertEqual(
self.assertLess(
data_points_0_0[0].start_time_unix_nano,
data_points_0_0[1].start_time_unix_nano,
)
self.assertEqual(
self.assertLess(
data_points_0_1[0].start_time_unix_nano,
data_points_0_1[1].start_time_unix_nano,
)
Expand All @@ -83,9 +92,11 @@ def test_time_align_cumulative(self):
)

counter_0.add(10, {"label": "value1"})
sleep(self.delay)
counter_0.add(10, {"label": "value2"})
sleep(0.5)
sleep(self.delay)
counter_1.add(10, {"label": "value1"})
sleep(self.delay)
counter_1.add(10, {"label": "value2"})

metrics = reader.get_metrics_data()
Expand All @@ -103,11 +114,14 @@ def test_time_align_cumulative(self):
.data.data_points
)

self.assertEqual(
self.assertEqual(len(data_points_1_0), 2)
self.assertEqual(len(data_points_1_1), 2)

self.assertLess(
data_points_1_0[0].start_time_unix_nano,
data_points_1_0[1].start_time_unix_nano,
)
self.assertEqual(
self.assertLess(
data_points_1_1[0].start_time_unix_nano,
data_points_1_1[1].start_time_unix_nano,
)
Expand Down Expand Up @@ -161,9 +175,11 @@ def test_time_align_delta(self):
counter_1 = meter.create_counter("counter_1")

counter_0.add(10, {"label": "value1"})
sleep(self.delay)
counter_0.add(10, {"label": "value2"})
sleep(0.5)
sleep(self.delay)
counter_1.add(10, {"label": "value1"})
sleep(self.delay)
counter_1.add(10, {"label": "value2"})

metrics = reader.get_metrics_data()
Expand All @@ -180,12 +196,14 @@ def test_time_align_delta(self):
.metrics[1]
.data.data_points
)
self.assertEqual(len(data_points_0_0), 2)
self.assertEqual(len(data_points_0_1), 2)

self.assertEqual(
self.assertLess(
data_points_0_0[0].start_time_unix_nano,
data_points_0_0[1].start_time_unix_nano,
)
self.assertEqual(
self.assertLess(
data_points_0_1[0].start_time_unix_nano,
data_points_0_1[1].start_time_unix_nano,
)
Expand All @@ -208,9 +226,11 @@ def test_time_align_delta(self):
)

counter_0.add(10, {"label": "value1"})
sleep(self.delay)
counter_0.add(10, {"label": "value2"})
sleep(0.5)
sleep(self.delay)
counter_1.add(10, {"label": "value1"})
sleep(self.delay)
counter_1.add(10, {"label": "value2"})

metrics = reader.get_metrics_data()
Expand All @@ -227,6 +247,8 @@ def test_time_align_delta(self):
.metrics[1]
.data.data_points
)
self.assertEqual(len(data_points_1_0), 2)
self.assertEqual(len(data_points_1_1), 2)

self.assertEqual(
data_points_1_0[0].start_time_unix_nano,
Expand Down
86 changes: 85 additions & 1 deletion opentelemetry-sdk/tests/metrics/test_view_instrument_match.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@

# pylint: disable=protected-access

from time import time_ns
from unittest import TestCase
from unittest.mock import MagicMock, Mock
from unittest.mock import MagicMock, Mock, patch

from opentelemetry.sdk.metrics._internal._view_instrument_match import (
_ViewInstrumentMatch,
Expand Down Expand Up @@ -214,6 +215,89 @@ def test_collect(self):
self.assertEqual(number_data_point.attributes, {"c": "d"})
self.assertEqual(number_data_point.value, 0)

@patch(
"opentelemetry.sdk.metrics._internal._view_instrument_match.time_ns",
side_effect=[0, 1, 2],
)
def test_collect_resets_start_time_unix_nano(self, mock_time_ns):
instrument = Mock(name="instrument")
instrument.instrumentation_scope = self.mock_instrumentation_scope
view_instrument_match = _ViewInstrumentMatch(
view=View(
instrument_name="instrument",
name="name",
aggregation=self.mock_aggregation_factory,
),
instrument=instrument,
instrument_class_aggregation=MagicMock(
**{"__getitem__.return_value": DefaultAggregation()}
),
)
start_time_unix_nano = 0
self.assertEqual(mock_time_ns.call_count, 0)

# +1 call to _create_aggregation
view_instrument_match.consume_measurement(
Measurement(
value=0, instrument=instrument, attributes={"foo": "bar0"}
)
)
view_instrument_match._view._aggregation._create_aggregation.assert_called_with(
instrument, {"foo": "bar0"}, start_time_unix_nano
)
collection_start_time_unix_nano = time_ns()
collected_data_points = view_instrument_match.collect(
AggregationTemporality.CUMULATIVE, collection_start_time_unix_nano
)
self.assertIsNotNone(collected_data_points)
self.assertEqual(len(collected_data_points), 1)

# +1 call to _create_aggregation
view_instrument_match.consume_measurement(
Measurement(
value=0, instrument=instrument, attributes={"foo": "bar1"}
)
)
view_instrument_match._view._aggregation._create_aggregation.assert_called_with(
instrument, {"foo": "bar1"}, 1
)
collection_start_time_unix_nano = time_ns()
collected_data_points = view_instrument_match.collect(
AggregationTemporality.CUMULATIVE, collection_start_time_unix_nano
)
self.assertIsNotNone(collected_data_points)
self.assertEqual(len(collected_data_points), 2)
collected_data_points = view_instrument_match.collect(
AggregationTemporality.CUMULATIVE, collection_start_time_unix_nano
)
# +1 call to create_aggregation
view_instrument_match.consume_measurement(
Measurement(
value=0, instrument=instrument, attributes={"foo": "bar"}
)
)
view_instrument_match._view._aggregation._create_aggregation.assert_called_with(
instrument, {"foo": "bar"}, 2
)
# No new calls to _create_aggregation because attributes remain the same
view_instrument_match.consume_measurement(
Measurement(
value=0, instrument=instrument, attributes={"foo": "bar"}
)
)
view_instrument_match.consume_measurement(
Measurement(
value=0, instrument=instrument, attributes={"foo": "bar"}
)
)
# In total we have 5 calls for _create_aggregation
# 1 from the _ViewInstrumentMatch initialization and 4
# from the consume_measurement calls with different attributes
self.assertEqual(
view_instrument_match._view._aggregation._create_aggregation.call_count,
5,
)

def test_data_point_check(self):
instrument1 = _Counter(
"instrument1",
Expand Down

0 comments on commit 909708c

Please sign in to comment.