Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement observer instrument #425

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions examples/metrics/observer_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Copyright 2020, OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This example shows how the Observer metric instrument can be used to capture
asynchronous metrics data.
"""
import psutil

from opentelemetry import metrics
from opentelemetry.sdk.metrics import LabelSet, Meter
from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter
from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher
from opentelemetry.sdk.metrics.export.controller import PushController

# Configure a stateful batcher
batcher = UngroupedBatcher(stateful=True)

metrics.set_preferred_meter_implementation(lambda _: Meter(batcher))
meter = metrics.meter()

# Exporter to export metrics to the console
exporter = ConsoleMetricsExporter()

# Configure a push controller
controller = PushController(meter=meter, exporter=exporter, interval=2)


# Callback to gather cpu usage
def get_cpu_usage_callback(observer):
for (number, percent) in enumerate(psutil.cpu_percent(percpu=True)):
label_set = meter.get_label_set({"cpu_number": str(number)})
observer.observe(percent, label_set)


meter.register_observer(
callback=get_cpu_usage_callback,
name="cpu_percent",
description="per-cpu usage",
unit="1",
value_type=float,
label_keys=("cpu_number",),
)


# Callback to gather RAM memory usage
def get_ram_usage_callback(observer):
ram_percent = psutil.virtual_memory().percent
observer.observe(ram_percent, LabelSet())


meter.register_observer(
callback=get_ram_usage_callback,
name="ram_percent",
description="RAM memory usage",
unit="1",
value_type=float,
label_keys=(),
)

input("Press a key to finish...\n")
16 changes: 13 additions & 3 deletions examples/metrics/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,25 @@
exporter = ConsoleMetricsExporter()
# controller collects metrics created from meter and exports it via the
# exporter every interval
controller = PushController(meter, exporter, 5)
controller = PushController(meter=meter, exporter=exporter, interval=5)

# Example to show how to record using the meter
counter = meter.create_metric(
"requests", "number of requests", 1, int, Counter, ("environment",)
name="requests",
description="number of requests",
unit="1",
value_type=int,
metric_type=Counter,
label_keys=("environment",),
)

counter2 = meter.create_metric(
"clicks", "number of clicks", 1, int, Counter, ("environment",)
name="clicks",
description="number of clicks",
unit="1",
value_type=int,
metric_type=Counter,
label_keys=("environment",),
)

# Labelsets are used to identify key-values that are associated with a specific
Expand Down
31 changes: 21 additions & 10 deletions examples/metrics/simple_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from opentelemetry.sdk.metrics.export.controller import PushController

batcher_mode = "stateful"
stateful_bacher = False


def usage(argv):
Expand All @@ -50,7 +51,8 @@ def usage(argv):
# lifetime.
# False indicates the batcher computes checkpoints which describe the updates
# of a single collection period (deltas)
batcher = UngroupedBatcher(batcher_mode == "stateful")
stateful_bacher = batcher_mode == "stateful"
batcher = UngroupedBatcher(stateful=stateful_bacher)

# If a batcher is not provided, a default batcher is used
# Meter is responsible for creating and recording metrics
Expand All @@ -66,15 +68,30 @@ def usage(argv):

# Metric instruments allow to capture measurements
requests_counter = meter.create_metric(
"requests", "number of requests", 1, int, Counter, ("environment",)
name="requests",
description="number of requests",
unit="1",
value_type=int,
metric_type=Counter,
label_keys=("environment",),
)

clicks_counter = meter.create_metric(
"clicks", "number of clicks", 1, int, Counter, ("environment",)
name="clicks",
description="number of clicks",
unit="1",
value_type=int,
metric_type=Counter,
label_keys=("environment",),
)

requests_size = meter.create_metric(
"requests_size", "size of requests", 1, int, Measure, ("environment",)
name="requests_size",
description="size of requests",
unit="1",
value_type=int,
metric_type=Measure,
label_keys=("environment",),
)

# Labelsets are used to identify key-values that are associated with a specific
Expand All @@ -86,21 +103,15 @@ def usage(argv):
# Update the metric instruments using the direct calling convention
requests_size.record(100, staging_label_set)
requests_counter.add(25, staging_label_set)
# Sleep for 5 seconds, exported value should be 25
time.sleep(5)

requests_size.record(5000, staging_label_set)
requests_counter.add(50, staging_label_set)
# Exported value should be 75
time.sleep(5)

requests_size.record(2, testing_label_set)
requests_counter.add(35, testing_label_set)
# There should be two exported values 75 and 35, one for each labelset
time.sleep(5)

clicks_counter.add(5, staging_label_set)
# There should be three exported values, labelsets can be reused for different
# metrics but will be recorded seperately, 75, 35 and 5

time.sleep(5)
126 changes: 75 additions & 51 deletions opentelemetry-api/src/opentelemetry/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,6 @@ def add(self, value: ValueT) -> None:
value: The value to add to the handle.
"""

def set(self, value: ValueT) -> None:
"""No-op implementation of `GaugeHandle` set.

Args:
value: The value to set to the handle.
"""

def record(self, value: ValueT) -> None:
"""No-op implementation of `MeasureHandle` record.

Expand All @@ -71,15 +64,6 @@ def add(self, value: ValueT) -> None:
"""


class GaugeHandle:
def set(self, value: ValueT) -> None:
"""Sets the current value of the handle to ``value``.

Args:
value: The value to set to the handle.
"""


class MeasureHandle:
def record(self, value: ValueT) -> None:
"""Records the given ``value`` to this handle.
Expand Down Expand Up @@ -121,7 +105,7 @@ def get_handle(self, label_set: LabelSet) -> "object":

Handles are useful to reduce the cost of repeatedly recording a metric
with a pre-defined set of label values. All metric kinds (counter,
gauge, measure) support declaring a set of required label keys. The
measure) support declaring a set of required label keys. The
values corresponding to these keys should be specified in every handle.
"Unspecified" label values, in cases where a handle is requested but
a value was not provided are permitted.
Expand Down Expand Up @@ -150,14 +134,6 @@ def add(self, value: ValueT, label_set: LabelSet) -> None:
label_set: `LabelSet` to associate with the returned handle.
"""

def set(self, value: ValueT, label_set: LabelSet) -> None:
"""No-op implementation of `Gauge` set.

Args:
value: The value to set the gauge metric to.
label_set: `LabelSet` to associate with the returned handle.
"""

def record(self, value: ValueT, label_set: LabelSet) -> None:
"""No-op implementation of `Measure` record.

Expand All @@ -183,28 +159,6 @@ def add(self, value: ValueT, label_set: LabelSet) -> None:
"""


class Gauge(Metric):
"""A gauge type metric that expresses a pre-calculated value.

Gauge metrics have a value that is either ``Set`` by explicit
instrumentation or observed through a callback. This kind of metric
should be used when the metric cannot be expressed as a sum or because
the measurement interval is arbitrary.
"""

def get_handle(self, label_set: LabelSet) -> "GaugeHandle":
"""Gets a `GaugeHandle`."""
return GaugeHandle()

def set(self, value: ValueT, label_set: LabelSet) -> None:
"""Sets the value of the gauge to ``value``.

Args:
value: The value to set the gauge metric to.
label_set: `LabelSet` to associate with the returned handle.
"""


class Measure(Metric):
"""A measure type metric that represent raw stats that are recorded.

Expand All @@ -224,15 +178,46 @@ def record(self, value: ValueT, label_set: LabelSet) -> None:
"""


MetricT = TypeVar("MetricT", Counter, Gauge, Measure)
class Observer:
mauriciovasquezbernal marked this conversation as resolved.
Show resolved Hide resolved
"""An observer type metric instrument used to capture a current set of values.


Observer instruments are asynchronous, a callback is invoked with the
observer instrument as argument allowing the user to capture multiple
values per collection interval.
"""

def observe(self, value: ValueT, label_set: LabelSet) -> None:
"""Captures ``value`` to the observer.

Args:
value: The value to capture to this observer metric.
label_set: `LabelSet` associated to ``value``.
"""


class DefaultObserver(Observer):
"""No-op implementation of ``Observer``."""

def observe(self, value: ValueT, label_set: LabelSet) -> None:
"""Captures ``value`` to the observer.

Args:
value: The value to capture to this observer metric.
label_set: `LabelSet` associated to ``value``.
"""


MetricT = TypeVar("MetricT", Counter, Measure, Observer)
ObserverCallbackT = Callable[[Observer], None]


# pylint: disable=unused-argument
class Meter(abc.ABC):
"""An interface to allow the recording of metrics.

`Metric` s are used for recording pre-defined aggregation (gauge and
counter), or raw values (measure) in which the aggregation and labels
`Metric` s are used for recording pre-defined aggregation (counter),
or raw values (measure) in which the aggregation and labels
for the exported metric are deferred.
"""

Expand Down Expand Up @@ -272,14 +257,41 @@ def create_metric(
Args:
name: The name of the metric.
description: Human-readable description of the metric.
unit: Unit of the metric values.
unit: Unit of the metric values following the UCUM convention
(https://unitsofmeasure.org/ucum.html).
value_type: The type of values being recorded by the metric.
metric_type: The type of metric being created.
label_keys: The keys for the labels with dynamic values.
enabled: Whether to report the metric by default.
Returns: A new ``metric_type`` metric with values of ``value_type``.
"""

@abc.abstractmethod
def register_observer(
self,
callback: ObserverCallbackT,
name: str,
description: str,
unit: str,
value_type: Type[ValueT],
label_keys: Sequence[str] = (),
enabled: bool = True,
) -> "Observer":
"""Registers an ``Observer`` metric instrument.

Args:
callback: Callback invoked each collection interval with the
observer as argument.
name: The name of the metric.
description: Human-readable description of the metric.
unit: Unit of the metric values following the UCUM convention
(https://unitsofmeasure.org/ucum.html).
value_type: The type of values being recorded by the metric.
label_keys: The keys for the labels with dynamic values.
enabled: Whether to report the metric by default.
Returns: A new ``Observer`` metric instrument.
"""

@abc.abstractmethod
def get_label_set(self, labels: Dict[str, str]) -> "LabelSet":
"""Gets a `LabelSet` with the given labels.
Expand Down Expand Up @@ -314,6 +326,18 @@ def create_metric(
# pylint: disable=no-self-use
return DefaultMetric()

def register_observer(
self,
callback: ObserverCallbackT,
name: str,
description: str,
unit: str,
value_type: Type[ValueT],
label_keys: Sequence[str] = (),
enabled: bool = True,
) -> "Observer":
return DefaultObserver()

def get_label_set(self, labels: Dict[str, str]) -> "LabelSet":
# pylint: disable=no-self-use
return DefaultLabelSet()
Expand Down
Loading