diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientBaseTest.groovy b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientBaseTest.groovy
index 0d77d5daaaa3..052396fdd489 100644
--- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientBaseTest.groovy
+++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientBaseTest.groovy
@@ -6,6 +6,8 @@ package io.opentelemetry.instrumentation.kafkaclients
 
 import io.opentelemetry.instrumentation.test.InstrumentationSpecification
+import org.testcontainers.utility.DockerImageName
+
 import java.time.Duration
 import org.apache.kafka.clients.admin.AdminClient
 import org.apache.kafka.clients.admin.NewTopic
@@ -46,7 +48,7 @@ abstract class KafkaClientBaseTest extends InstrumentationSpecification {
   static TopicPartition topicPartition = new TopicPartition(SHARED_TOPIC, 0)
 
   def setupSpec() {
-    kafka = new KafkaContainer()
+    kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"))
       .withLogConsumer(new Slf4jLogConsumer(logger))
       .waitingFor(Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1))
       .withStartupTimeout(Duration.ofMinutes(1))
diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/README.md b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/README.md
new file mode 100644
index 000000000000..a7bf0e6cbde7
--- /dev/null
+++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/README.md
@@ -0,0 +1,225 @@
+# Kafka Metrics
+
+The Kafka client exposes metrics via the `org.apache.kafka.common.metrics.MetricsReporter` interface.
+OpenTelemetry provides an implementation that bridges the metrics into OpenTelemetry.
+
+To use it, merge the config properties
+from `KafkaTelemetry.create(OpenTelemetry).metricConfigProperties()`
+with the configuration used when creating your producer or consumer.
+
+Note: Kafka reports several metrics at multiple attribute granularities. For
+example, `records-consumed-total` is reported with the attribute keys `[client-id]`
+and `[client-id, topic]`. If you analyze the sum of records consumed, ignoring dimensions, backends
+are likely to double count. The implementation detects this scenario and only records the most
+granular set of attributes available. In the case
+of `records-consumed-total`, it reports `[client-id, topic]` and ignores `[client-id]`.
+
+The following table shows the full set of metrics exposed by the kafka client, and the corresponding
+OpenTelemetry metric each maps to (if available). Empty values in the Instrument Name, Instrument
+Description, etc. columns indicate that there is no registered mapping for the metric and its data
+is NOT collected.
+
+| Metric Group | Metric Name | Attribute Keys | Instrument Name | Instrument Description | Instrument Type |
+|--------------|-------------|----------------|-----------------|------------------------|-----------------|
+| `app-info` | `commit-id` | `client-id` | | | |
+| `app-info` | `start-time-ms` | `client-id` | | | |
+| `app-info` | `version` | `client-id` | | | |
+| `consumer-coordinator-metrics` | `assigned-partitions` | `client-id` | `kafka.consumer.assigned_partitions` | The number of partitions currently assigned to this consumer | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-coordinator-metrics` | `commit-latency-avg` | `client-id` | `kafka.consumer.commit_latency_avg` | The average time taken for a commit request | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-coordinator-metrics` | `commit-latency-max` | `client-id` | `kafka.consumer.commit_latency_max` | The max time taken for a commit request | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-coordinator-metrics` | `commit-rate` | `client-id` | `kafka.consumer.commit_rate` | The number of commit calls per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-coordinator-metrics` | `commit-total` | `client-id` | `kafka.consumer.commit_total` | The total number of commit calls | `DOUBLE_OBSERVABLE_COUNTER` |
+| `consumer-coordinator-metrics` | `failed-rebalance-rate-per-hour` | `client-id` | `kafka.consumer.failed_rebalance_rate_per_hour` | The number of failed rebalance events per hour | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-coordinator-metrics` | `failed-rebalance-total` | `client-id` | `kafka.consumer.failed_rebalance_total` | The total number of failed rebalance events | `DOUBLE_OBSERVABLE_COUNTER` |
+| `consumer-coordinator-metrics` | `heartbeat-rate` | `client-id` | `kafka.consumer.heartbeat_rate` | The number of heartbeats per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-coordinator-metrics` | `heartbeat-response-time-max` | `client-id` | `kafka.consumer.heartbeat_response_time_max` | The max time taken to receive a response to a heartbeat request | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-coordinator-metrics` | `heartbeat-total` | `client-id` | `kafka.consumer.heartbeat_total` | The total number of heartbeats | `DOUBLE_OBSERVABLE_COUNTER` |
+| `consumer-coordinator-metrics` | `join-rate` | `client-id` | `kafka.consumer.join_rate` | The number of group joins per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-coordinator-metrics` | `join-time-avg` | `client-id` | `kafka.consumer.join_time_avg` | The average time taken for a group rejoin | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-coordinator-metrics` | `join-time-max` | `client-id` | `kafka.consumer.join_time_max` | The max time taken for a group rejoin | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-coordinator-metrics` | `join-total` | `client-id` | `kafka.consumer.join_total` | The total number of group joins | `DOUBLE_OBSERVABLE_COUNTER` |
+| `consumer-coordinator-metrics` | `last-heartbeat-seconds-ago` | `client-id` | `kafka.consumer.last_heartbeat_seconds_ago` | The number of seconds since the last coordinator heartbeat was sent | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-coordinator-metrics` | `last-rebalance-seconds-ago` | `client-id` | `kafka.consumer.last_rebalance_seconds_ago` | The number of seconds since the last successful rebalance event | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-coordinator-metrics` | `partition-assigned-latency-avg` | `client-id` | `kafka.consumer.partition_assigned_latency_avg` | The average time taken for a partition-assigned rebalance listener callback | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-coordinator-metrics` | `partition-assigned-latency-max` | `client-id` | `kafka.consumer.partition_assigned_latency_max` | The max time taken for a partition-assigned rebalance listener callback | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-coordinator-metrics` | `partition-lost-latency-avg` | `client-id` | `kafka.consumer.partition_lost_latency_avg` | The average time taken for a partition-lost rebalance listener callback | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-coordinator-metrics` | `partition-lost-latency-max` | `client-id` | `kafka.consumer.partition_lost_latency_max` | The max time taken for a partition-lost rebalance listener callback | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-coordinator-metrics` | `partition-revoked-latency-avg` | `client-id` | `kafka.consumer.partition_revoked_latency_avg` | The average time taken for a partition-revoked rebalance listener callback | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-coordinator-metrics` | `partition-revoked-latency-max` | `client-id` | `kafka.consumer.partition_revoked_latency_max` | The max time taken for a partition-revoked rebalance listener callback | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-coordinator-metrics` | `rebalance-latency-avg` | `client-id` | `kafka.consumer.rebalance_latency_avg` | The average time taken for a group to complete a successful rebalance, which may be composed of several failed re-trials until it succeeded | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-coordinator-metrics` | `rebalance-latency-max` | `client-id` | `kafka.consumer.rebalance_latency_max` | The max time taken for a group to complete a successful rebalance, which may be composed of several failed re-trials until it succeeded | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-coordinator-metrics` | `rebalance-latency-total` | `client-id` | `kafka.consumer.rebalance_latency_total` | The total number of milliseconds this consumer has spent in successful rebalances since creation | `DOUBLE_OBSERVABLE_COUNTER` |
+| `consumer-coordinator-metrics` | `rebalance-rate-per-hour` | `client-id` | `kafka.consumer.rebalance_rate_per_hour` | The number of successful rebalance events per hour, each event is composed of several failed re-trials until it succeeded | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-coordinator-metrics` | `rebalance-total` | `client-id` | `kafka.consumer.rebalance_total` | The total number of successful rebalance events, each event is composed of several failed re-trials until it succeeded | `DOUBLE_OBSERVABLE_COUNTER` |
+| `consumer-coordinator-metrics` | `sync-rate` | `client-id` | `kafka.consumer.sync_rate` | The number of group syncs per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-coordinator-metrics` | `sync-time-avg` | `client-id` | `kafka.consumer.sync_time_avg` | The average time taken for a group sync | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-coordinator-metrics` | `sync-time-max` | `client-id` | `kafka.consumer.sync_time_max` | The max time taken for a group sync | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-coordinator-metrics` | `sync-total` | `client-id` | `kafka.consumer.sync_total` | The total number of group syncs | `DOUBLE_OBSERVABLE_COUNTER` |
+| `consumer-fetch-manager-metrics` | `bytes-consumed-rate` | `client-id` | | | |
+| `consumer-fetch-manager-metrics` | `bytes-consumed-rate` | `client-id`,`topic` | `kafka.consumer.bytes_consumed_rate` | The average number of bytes consumed per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-fetch-manager-metrics` | `bytes-consumed-total` | `client-id` | | | |
+| `consumer-fetch-manager-metrics` | `bytes-consumed-total` | `client-id`,`topic` | `kafka.consumer.bytes_consumed_total` | The total number of bytes consumed | `DOUBLE_OBSERVABLE_COUNTER` |
+| `consumer-fetch-manager-metrics` | `fetch-latency-avg` | `client-id` | `kafka.consumer.fetch_latency_avg` | The average time taken for a fetch request. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-fetch-manager-metrics` | `fetch-latency-max` | `client-id` | `kafka.consumer.fetch_latency_max` | The max time taken for any fetch request. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-fetch-manager-metrics` | `fetch-rate` | `client-id` | `kafka.consumer.fetch_rate` | The number of fetch requests per second. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-fetch-manager-metrics` | `fetch-size-avg` | `client-id` | | | |
+| `consumer-fetch-manager-metrics` | `fetch-size-avg` | `client-id`,`topic` | `kafka.consumer.fetch_size_avg` | The average number of bytes fetched per request | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-fetch-manager-metrics` | `fetch-size-max` | `client-id` | | | |
+| `consumer-fetch-manager-metrics` | `fetch-size-max` | `client-id`,`topic` | `kafka.consumer.fetch_size_max` | The maximum number of bytes fetched per request | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-fetch-manager-metrics` | `fetch-throttle-time-avg` | `client-id` | `kafka.consumer.fetch_throttle_time_avg` | The average throttle time in ms | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-fetch-manager-metrics` | `fetch-throttle-time-max` | `client-id` | `kafka.consumer.fetch_throttle_time_max` | The maximum throttle time in ms | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-fetch-manager-metrics` | `fetch-total` | `client-id` | `kafka.consumer.fetch_total` | The total number of fetch requests. | `DOUBLE_OBSERVABLE_COUNTER` |
+| `consumer-fetch-manager-metrics` | `preferred-read-replica` | `client-id`,`topic`,`partition` | | | |
+| `consumer-fetch-manager-metrics` | `records-consumed-rate` | `client-id` | | | |
+| `consumer-fetch-manager-metrics` | `records-consumed-rate` | `client-id`,`topic` | `kafka.consumer.records_consumed_rate` | The average number of records consumed per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-fetch-manager-metrics` | `records-consumed-total` | `client-id` | | | |
+| `consumer-fetch-manager-metrics` | `records-consumed-total` | `client-id`,`topic` | `kafka.consumer.records_consumed_total` | The total number of records consumed | `DOUBLE_OBSERVABLE_COUNTER` |
+| `consumer-fetch-manager-metrics` | `records-lag` | `client-id`,`topic`,`partition` | `kafka.consumer.records_lag` | The latest lag of the partition | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-fetch-manager-metrics` | `records-lag-avg` | `client-id`,`topic`,`partition` | `kafka.consumer.records_lag_avg` | The average lag of the partition | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-fetch-manager-metrics` | `records-lag-max` | `client-id` | | | |
+| `consumer-fetch-manager-metrics` | `records-lag-max` | `client-id`,`topic`,`partition` | `kafka.consumer.records_lag_max` | The maximum lag in terms of number of records for any partition in this window | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-fetch-manager-metrics` | `records-lead` | `client-id`,`topic`,`partition` | `kafka.consumer.records_lead` | The latest lead of the partition | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-fetch-manager-metrics` | `records-lead-avg` | `client-id`,`topic`,`partition` | `kafka.consumer.records_lead_avg` | The average lead of the partition | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-fetch-manager-metrics` | `records-lead-min` | `client-id` | | | |
+| `consumer-fetch-manager-metrics` | `records-lead-min` | `client-id`,`topic`,`partition` | `kafka.consumer.records_lead_min` | The minimum lead in terms of number of records for any partition in this window | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-fetch-manager-metrics` | `records-per-request-avg` | `client-id` | | | |
+| `consumer-fetch-manager-metrics` | `records-per-request-avg` | `client-id`,`topic` | `kafka.consumer.records_per_request_avg` | The average number of records in each request | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-metrics` | `connection-close-rate` | `client-id` | `kafka.consumer.connection_close_rate` | The number of connections closed per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-metrics` | `connection-close-total` | `client-id` | `kafka.consumer.connection_close_total` | The total number of connections closed | `DOUBLE_OBSERVABLE_COUNTER` |
+| `consumer-metrics` | `connection-count` | `client-id` | `kafka.consumer.connection_count` | The current number of active connections. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-metrics` | `connection-creation-rate` | `client-id` | `kafka.consumer.connection_creation_rate` | The number of new connections established per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-metrics` | `connection-creation-total` | `client-id` | `kafka.consumer.connection_creation_total` | The total number of new connections established | `DOUBLE_OBSERVABLE_COUNTER` |
+| `consumer-metrics` | `failed-authentication-rate` | `client-id` | `kafka.consumer.failed_authentication_rate` | The number of connections with failed authentication per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-metrics` | `failed-authentication-total` | `client-id` | `kafka.consumer.failed_authentication_total` | The total number of connections with failed authentication | `DOUBLE_OBSERVABLE_COUNTER` |
+| `consumer-metrics` | `failed-reauthentication-rate` | `client-id` | `kafka.consumer.failed_reauthentication_rate` | The number of failed re-authentication of connections per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-metrics` | `failed-reauthentication-total` | `client-id` | `kafka.consumer.failed_reauthentication_total` | The total number of failed re-authentication of connections | `DOUBLE_OBSERVABLE_COUNTER` |
+| `consumer-metrics` | `incoming-byte-rate` | `client-id` | | | |
+| `consumer-metrics` | `incoming-byte-total` | `client-id` | | | |
+| `consumer-metrics` | `io-ratio` | `client-id` | `kafka.consumer.io_ratio` | The fraction of time the I/O thread spent doing I/O | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-metrics` | `io-time-ns-avg` | `client-id` | `kafka.consumer.io_time_ns_avg` | The average length of time for I/O per select call in nanoseconds. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-metrics` | `io-wait-ratio` | `client-id` | `kafka.consumer.io_wait_ratio` | The fraction of time the I/O thread spent waiting | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-metrics` | `io-wait-time-ns-avg` | `client-id` | `kafka.consumer.io_wait_time_ns_avg` | The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-metrics` | `io-waittime-total` | `client-id` | `kafka.consumer.io_waittime_total` | The total time the I/O thread spent waiting | `DOUBLE_OBSERVABLE_COUNTER` |
+| `consumer-metrics` | `iotime-total` | `client-id` | `kafka.consumer.iotime_total` | The total time the I/O thread spent doing I/O | `DOUBLE_OBSERVABLE_COUNTER` |
+| `consumer-metrics` | `last-poll-seconds-ago` | `client-id` | `kafka.consumer.last_poll_seconds_ago` | The number of seconds since the last poll() invocation. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-metrics` | `network-io-rate` | `client-id` | `kafka.consumer.network_io_rate` | The number of network operations (reads or writes) on all connections per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-metrics` | `network-io-total` | `client-id` | `kafka.consumer.network_io_total` | The total number of network operations (reads or writes) on all connections | `DOUBLE_OBSERVABLE_COUNTER` |
+| `consumer-metrics` | `outgoing-byte-rate` | `client-id` | | | |
+| `consumer-metrics` | `outgoing-byte-total` | `client-id` | | | |
+| `consumer-metrics` | `poll-idle-ratio-avg` | `client-id` | `kafka.consumer.poll_idle_ratio_avg` | The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-metrics` | `reauthentication-latency-avg` | `client-id` | `kafka.consumer.reauthentication_latency_avg` | The average latency observed due to re-authentication | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-metrics` | `reauthentication-latency-max` | `client-id` | `kafka.consumer.reauthentication_latency_max` | The max latency observed due to re-authentication | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-metrics` | `request-rate` | `client-id` | | | |
+| `consumer-metrics` | `request-size-avg` | `client-id` | | | |
+| `consumer-metrics` | `request-size-max` | `client-id` | | | |
+| `consumer-metrics` | `request-total` | `client-id` | | | |
+| `consumer-metrics` | `response-rate` | `client-id` | | | |
+| `consumer-metrics` | `response-total` | `client-id` | | | |
+| `consumer-metrics` | `select-rate` | `client-id` | `kafka.consumer.select_rate` | The number of times the I/O layer checked for new I/O to perform per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-metrics` | `select-total` | `client-id` | `kafka.consumer.select_total` | The total number of times the I/O layer checked for new I/O to perform | `DOUBLE_OBSERVABLE_COUNTER` |
+| `consumer-metrics` | `successful-authentication-no-reauth-total` | `client-id` | `kafka.consumer.successful_authentication_no_reauth_total` | The total number of connections with successful authentication where the client does not support re-authentication | `DOUBLE_OBSERVABLE_COUNTER` |
+| `consumer-metrics` | `successful-authentication-rate` | `client-id` | `kafka.consumer.successful_authentication_rate` | The number of connections with successful authentication per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-metrics` | `successful-authentication-total` | `client-id` | `kafka.consumer.successful_authentication_total` | The total number of connections with successful authentication | `DOUBLE_OBSERVABLE_COUNTER` |
+| `consumer-metrics` | `successful-reauthentication-rate` | `client-id` | `kafka.consumer.successful_reauthentication_rate` | The number of successful re-authentication of connections per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-metrics` | `successful-reauthentication-total` | `client-id` | `kafka.consumer.successful_reauthentication_total` | The total number of successful re-authentication of connections | `DOUBLE_OBSERVABLE_COUNTER` |
+| `consumer-metrics` | `time-between-poll-avg` | `client-id` | `kafka.consumer.time_between_poll_avg` | The average delay between invocations of poll(). | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-metrics` | `time-between-poll-max` | `client-id` | `kafka.consumer.time_between_poll_max` | The max delay between invocations of poll(). | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-node-metrics` | `incoming-byte-rate` | `client-id`,`node-id` | `kafka.consumer.incoming_byte_rate` | The number of bytes read off all sockets per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-node-metrics` | `incoming-byte-total` | `client-id`,`node-id` | `kafka.consumer.incoming_byte_total` | The total number of bytes read off all sockets | `DOUBLE_OBSERVABLE_COUNTER` |
+| `consumer-node-metrics` | `outgoing-byte-rate` | `client-id`,`node-id` | `kafka.consumer.outgoing_byte_rate` | The number of outgoing bytes sent to all servers per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-node-metrics` | `outgoing-byte-total` | `client-id`,`node-id` | `kafka.consumer.outgoing_byte_total` | The total number of outgoing bytes sent to all servers | `DOUBLE_OBSERVABLE_COUNTER` |
+| `consumer-node-metrics` | `request-latency-avg` | `client-id`,`node-id` | `kafka.consumer.request_latency_avg` | | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-node-metrics` | `request-latency-max` | `client-id`,`node-id` | `kafka.consumer.request_latency_max` | | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-node-metrics` | `request-rate` | `client-id`,`node-id` | `kafka.consumer.request_rate` | The number of requests sent per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-node-metrics` | `request-size-avg` | `client-id`,`node-id` | `kafka.consumer.request_size_avg` | The average size of requests sent. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-node-metrics` | `request-size-max` | `client-id`,`node-id` | `kafka.consumer.request_size_max` | The maximum size of any request sent. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-node-metrics` | `request-total` | `client-id`,`node-id` | `kafka.consumer.request_total` | The total number of requests sent | `DOUBLE_OBSERVABLE_COUNTER` |
+| `consumer-node-metrics` | `response-rate` | `client-id`,`node-id` | `kafka.consumer.response_rate` | The number of responses received per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `consumer-node-metrics` | `response-total` | `client-id`,`node-id` | `kafka.consumer.response_total` | The total number of responses received | `DOUBLE_OBSERVABLE_COUNTER` |
+| `kafka-metrics-count` | `count` | `client-id` | | | |
+| `producer-metrics` | `batch-size-avg` | `client-id` | `kafka.producer.batch_size_avg` | The average number of bytes sent per partition per-request. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `batch-size-max` | `client-id` | `kafka.producer.batch_size_max` | The max number of bytes sent per partition per-request. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `batch-split-rate` | `client-id` | `kafka.producer.batch_split_rate` | The average number of batch splits per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `batch-split-total` | `client-id` | `kafka.producer.batch_split_total` | The total number of batch splits | `DOUBLE_OBSERVABLE_COUNTER` |
+| `producer-metrics` | `buffer-available-bytes` | `client-id` | `kafka.producer.buffer_available_bytes` | The total amount of buffer memory that is not being used (either unallocated or in the free list). | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `buffer-exhausted-rate` | `client-id` | `kafka.producer.buffer_exhausted_rate` | The average per-second number of record sends that are dropped due to buffer exhaustion | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `buffer-exhausted-total` | `client-id` | `kafka.producer.buffer_exhausted_total` | The total number of record sends that are dropped due to buffer exhaustion | `DOUBLE_OBSERVABLE_COUNTER` |
+| `producer-metrics` | `buffer-total-bytes` | `client-id` | `kafka.producer.buffer_total_bytes` | The maximum amount of buffer memory the client can use (whether or not it is currently used). | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `bufferpool-wait-ratio` | `client-id` | `kafka.producer.bufferpool_wait_ratio` | The fraction of time an appender waits for space allocation. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `bufferpool-wait-time-total` | `client-id` | `kafka.producer.bufferpool_wait_time_total` | The total time an appender waits for space allocation. | `DOUBLE_OBSERVABLE_COUNTER` |
+| `producer-metrics` | `compression-rate-avg` | `client-id` | `kafka.producer.compression_rate_avg` | The average compression rate of record batches. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `connection-close-rate` | `client-id` | `kafka.producer.connection_close_rate` | The number of connections closed per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `connection-close-total` | `client-id` | `kafka.producer.connection_close_total` | The total number of connections closed | `DOUBLE_OBSERVABLE_COUNTER` |
+| `producer-metrics` | `connection-count` | `client-id` | `kafka.producer.connection_count` | The current number of active connections. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `connection-creation-rate` | `client-id` | `kafka.producer.connection_creation_rate` | The number of new connections established per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `connection-creation-total` | `client-id` | `kafka.producer.connection_creation_total` | The total number of new connections established | `DOUBLE_OBSERVABLE_COUNTER` |
+| `producer-metrics` | `failed-authentication-rate` | `client-id` | `kafka.producer.failed_authentication_rate` | The number of connections with failed authentication per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `failed-authentication-total` | `client-id` | `kafka.producer.failed_authentication_total` | The total number of connections with failed authentication | `DOUBLE_OBSERVABLE_COUNTER` |
+| `producer-metrics` | `failed-reauthentication-rate` | `client-id` | `kafka.producer.failed_reauthentication_rate` | The number of failed re-authentication of connections per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `failed-reauthentication-total` | `client-id` | `kafka.producer.failed_reauthentication_total` | The total number of failed re-authentication of connections | `DOUBLE_OBSERVABLE_COUNTER` |
+| `producer-metrics` | `incoming-byte-rate` | `client-id` | | | |
+| `producer-metrics` | `incoming-byte-total` | `client-id` | | | |
+| `producer-metrics` | `io-ratio` | `client-id` | `kafka.producer.io_ratio` | The fraction of time the I/O thread spent doing I/O | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `io-time-ns-avg` | `client-id` | `kafka.producer.io_time_ns_avg` | The average length of time for I/O per select call in nanoseconds. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `io-wait-ratio` | `client-id` | `kafka.producer.io_wait_ratio` | The fraction of time the I/O thread spent waiting | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `io-wait-time-ns-avg` | `client-id` | `kafka.producer.io_wait_time_ns_avg` | The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `io-waittime-total` | `client-id` | `kafka.producer.io_waittime_total` | The total time the I/O thread spent waiting | `DOUBLE_OBSERVABLE_COUNTER` |
+| `producer-metrics` | `iotime-total` | `client-id` | `kafka.producer.iotime_total` | The total time the I/O thread spent doing I/O | `DOUBLE_OBSERVABLE_COUNTER` |
+| `producer-metrics` | `metadata-age` | `client-id` | `kafka.producer.metadata_age` | The age in seconds of the current producer metadata being used. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `network-io-rate` | `client-id` | `kafka.producer.network_io_rate` | The number of network operations (reads or writes) on all connections per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `network-io-total` | `client-id` | `kafka.producer.network_io_total` | The total number of network operations (reads or writes) on all connections | `DOUBLE_OBSERVABLE_COUNTER` |
+| `producer-metrics` | `outgoing-byte-rate` | `client-id` | | | |
+| `producer-metrics` | `outgoing-byte-total` | `client-id` | | | |
+| `producer-metrics` | `produce-throttle-time-avg` | `client-id` | `kafka.producer.produce_throttle_time_avg` | The average time in ms a request was throttled by a broker | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `produce-throttle-time-max` | `client-id` | `kafka.producer.produce_throttle_time_max` | The maximum time in ms a request was throttled by a broker | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `reauthentication-latency-avg` | `client-id` | `kafka.producer.reauthentication_latency_avg` | The average latency observed due to re-authentication | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `reauthentication-latency-max` | `client-id` | `kafka.producer.reauthentication_latency_max` | The max latency observed due to re-authentication | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `record-error-rate` | `client-id` | | | |
+| `producer-metrics` | `record-error-total` | `client-id` | | | |
+| `producer-metrics` | `record-queue-time-avg` | `client-id` | `kafka.producer.record_queue_time_avg` | The average time in ms record batches spent in the send buffer. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `record-queue-time-max` | `client-id` | `kafka.producer.record_queue_time_max` | The maximum time in ms record batches spent in the send buffer. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `record-retry-rate` | `client-id` | | | |
+| `producer-metrics` | `record-retry-total` | `client-id` | | | |
+| `producer-metrics` | `record-send-rate` | `client-id` | | | |
+| `producer-metrics` | `record-send-total` | `client-id` | | | |
+| `producer-metrics` | `record-size-avg` | `client-id` | `kafka.producer.record_size_avg` | The average record size | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `record-size-max` | `client-id` | `kafka.producer.record_size_max` | The maximum record size | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `records-per-request-avg` | `client-id` | `kafka.producer.records_per_request_avg` | The average number of records per request. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `request-latency-avg` | `client-id` | | | |
+| `producer-metrics` | `request-latency-max` | `client-id` | | | |
+| `producer-metrics` | `request-rate` | `client-id` | | | |
+| `producer-metrics` | `request-size-avg` | `client-id` | | | |
+| `producer-metrics` | `request-size-max` | `client-id` | | | |
+| `producer-metrics` | `request-total` | `client-id` | | | |
+| `producer-metrics` | `requests-in-flight` | `client-id` | `kafka.producer.requests_in_flight` | The current number of in-flight requests awaiting a response. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `response-rate` | `client-id` | | | |
+| `producer-metrics` | `response-total` | `client-id` | | | |
+| `producer-metrics` | `select-rate` | `client-id` | `kafka.producer.select_rate` | The number of times the I/O layer checked for new I/O to perform per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `select-total` | `client-id` | `kafka.producer.select_total` | The total number of times the I/O layer checked for new I/O to perform | `DOUBLE_OBSERVABLE_COUNTER` |
+| `producer-metrics` | `successful-authentication-no-reauth-total` | `client-id` | `kafka.producer.successful_authentication_no_reauth_total` | The total number of connections with successful authentication where the client does not support re-authentication | `DOUBLE_OBSERVABLE_COUNTER` |
+| `producer-metrics` | `successful-authentication-rate` | `client-id` | `kafka.producer.successful_authentication_rate` | The number of connections with successful authentication per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `successful-authentication-total` | `client-id` | `kafka.producer.successful_authentication_total` | The total number of connections with successful authentication | `DOUBLE_OBSERVABLE_COUNTER` |
+| `producer-metrics` | `successful-reauthentication-rate` | `client-id` | `kafka.producer.successful_reauthentication_rate` | The number of successful re-authentication of connections per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-metrics` | `successful-reauthentication-total` | `client-id` | `kafka.producer.successful_reauthentication_total` | The total number of successful re-authentication of connections | `DOUBLE_OBSERVABLE_COUNTER` |
+| `producer-metrics` | `waiting-threads` | `client-id` | `kafka.producer.waiting_threads` | The number of user threads blocked waiting for buffer memory to enqueue their records | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-node-metrics` | `incoming-byte-rate` | `client-id`,`node-id` | `kafka.producer.incoming_byte_rate` | The number of bytes read off all sockets per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-node-metrics` | `incoming-byte-total` | `client-id`,`node-id` | `kafka.producer.incoming_byte_total` | The total number of bytes read off all sockets | `DOUBLE_OBSERVABLE_COUNTER` |
+| `producer-node-metrics` | `outgoing-byte-rate` | `client-id`,`node-id` | `kafka.producer.outgoing_byte_rate` | The number of outgoing bytes sent to all servers per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-node-metrics` | `outgoing-byte-total` | `client-id`,`node-id` | `kafka.producer.outgoing_byte_total` | The total number of outgoing bytes sent to all servers | `DOUBLE_OBSERVABLE_COUNTER` |
+| `producer-node-metrics` | `request-latency-avg` | `client-id`,`node-id` | `kafka.producer.request_latency_avg` | The average request latency in ms | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-node-metrics` | `request-latency-max` | `client-id`,`node-id` | `kafka.producer.request_latency_max` | The maximum request latency in ms | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-node-metrics` | `request-rate` | `client-id`,`node-id` | `kafka.producer.request_rate` | The number of requests sent per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-node-metrics` | `request-size-avg` | `client-id`,`node-id` | `kafka.producer.request_size_avg` | The average size of requests sent. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-node-metrics` | `request-size-max` | `client-id`,`node-id` | `kafka.producer.request_size_max` | The maximum size of any request sent. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-node-metrics` | `request-total` | `client-id`,`node-id` | `kafka.producer.request_total` | The total number of requests sent | `DOUBLE_OBSERVABLE_COUNTER` |
+| `producer-node-metrics` | `response-rate` | `client-id`,`node-id` | `kafka.producer.response_rate` | The number of responses received per second | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-node-metrics` | `response-total` | `client-id`,`node-id` | `kafka.producer.response_total` | The total number of responses received | `DOUBLE_OBSERVABLE_COUNTER` |
+| `producer-topic-metrics` | `byte-rate` | `client-id`,`topic` | `kafka.producer.byte_rate` | The average number of bytes sent per second for a topic. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-topic-metrics` | `byte-total` | `client-id`,`topic` | `kafka.producer.byte_total` | The total number of bytes sent for a topic. | `DOUBLE_OBSERVABLE_COUNTER` |
+| `producer-topic-metrics` | `compression-rate` | `client-id`,`topic` | `kafka.producer.compression_rate` | The average compression rate of record batches for a topic. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-topic-metrics` | `record-error-rate` | `client-id`,`topic` | `kafka.producer.record_error_rate` | The average per-second number of record sends that resulted in errors | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-topic-metrics` | `record-error-total` | `client-id`,`topic` | `kafka.producer.record_error_total` | The total number of record sends that resulted in errors | `DOUBLE_OBSERVABLE_COUNTER` |
+| `producer-topic-metrics` | `record-retry-rate` | `client-id`,`topic` | `kafka.producer.record_retry_rate` | The average per-second number of retried record sends | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-topic-metrics` | `record-retry-total` | `client-id`,`topic` | `kafka.producer.record_retry_total` | The total number of retried record sends | `DOUBLE_OBSERVABLE_COUNTER` |
+| `producer-topic-metrics` | `record-send-rate` | `client-id`,`topic` | `kafka.producer.record_send_rate` | The average number of records sent per second. | `DOUBLE_OBSERVABLE_GAUGE` |
+| `producer-topic-metrics` | `record-send-total` | `client-id`,`topic` | `kafka.producer.record_send_total` | The total number of records sent. | `DOUBLE_OBSERVABLE_COUNTER` |
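+
+As a concrete sketch of the setup described above, the snippet below wires the reporter into a
+producer. The bootstrap server address, the serializers, and the `openTelemetry` instance are
+illustrative placeholders, not part of this library:
+
+```java
+OpenTelemetry openTelemetry = ...; // obtain your OpenTelemetry instance, e.g. from the SDK
+
+Map<String, Object> config = new HashMap<>();
+config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+// Merge in the properties that register the OpenTelemetry MetricsReporter with the client.
+config.putAll(KafkaTelemetry.create(openTelemetry).metricConfigProperties());
+
+try (KafkaProducer<String, String> producer = new KafkaProducer<>(config)) {
+  // produce records as usual; the client's metrics are bridged into OpenTelemetry
+}
+```
+
+The same pattern applies to consumers, using `ConsumerConfig` keys instead.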
diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts
index beece7127590..87ab9440f76d 100644
--- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts
+++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts
@@ -10,6 +10,10 @@ dependencies {
   testImplementation("com.fasterxml.jackson.core:jackson-databind:2.10.2")
   testImplementation("org.testcontainers:kafka")
+  testImplementation("org.testcontainers:junit-jupiter")
+
+  testCompileOnly("com.google.auto.value:auto-value-annotations")
+  testAnnotationProcessor("com.google.auto.value:auto-value")
 }
 
 tasks {
diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetry.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetry.java
index 1854d885f03d..62cf703a80aa 100644
--- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetry.java
+++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetry.java
@@ -17,17 +17,25 @@
 import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
 import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerRecordGetter;
 import io.opentelemetry.instrumentation.kafka.internal.KafkaHeadersSetter;
+import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.function.BiFunction;
 import java.util.logging.Logger;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.metrics.MetricsReporter;
 
 public final class KafkaTelemetry {
   private static final Logger logger = Logger.getLogger(KafkaTelemetry.class.getName());
@@ -76,6 +84,41 @@ public <K, V> Consumer<K, V> wrap(Consumer<K, V> consumer) {
     return new TracingConsumer<>(consumer, this);
   }
 
+  /**
+   * Produces a set of kafka client config properties (consumer or producer) to register a {@link
+   * MetricsReporter} that records metrics to an {@code openTelemetry} instance. Add these resulting
+   * properties to the configuration map used to initialize a {@link KafkaConsumer} or {@link
+   * KafkaProducer}.
+   *
+   * <p>For producers:
+   *
+   * <pre>{@code
+   * //    Map<String, Object> config = new HashMap<>();
+   * //    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
+   * //    config.putAll(kafkaTelemetry.metricConfigProperties());
+   * //    try (KafkaProducer<?, ?> producer = new KafkaProducer<>(config)) { ... }
+   * }</pre>
+   *
+   * <p>For consumers:
+   *
+   * <pre>{@code
+   * //    Map<String, Object> config = new HashMap<>();
+   * //    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
+   * //    config.putAll(kafkaTelemetry.metricConfigProperties());
+   * //    try (KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(config)) { ... }
+   * }</pre>
+   *
+   * @return the kafka client properties
+   */
+  public Map<String, Object> metricConfigProperties() {
+    Map<String, Object> config = new HashMap<>();
+    config.put(
+        CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
+        OpenTelemetryMetricsReporter.class.getName());
+    config.put(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, openTelemetry);
+    return Collections.unmodifiableMap(config);
+  }
+
   /**
    * Build and inject span into record.
    *
diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporterTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporterTest.java
new file mode 100644
index 000000000000..ddf5bf6f3b13
--- /dev/null
+++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporterTest.java
@@ -0,0 +1,519 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.kafka.internal;
+
+import static java.lang.System.lineSeparator;
+import static java.util.Comparator.comparing;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.google.auto.value.AutoValue;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.instrumentation.kafkaclients.KafkaTelemetry;
+import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
+import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.PointData;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
org.testcontainers.utility.DockerImageName; + +@Testcontainers +class OpenTelemetryMetricsReporterTest { + + private static final Logger logger = + LoggerFactory.getLogger(OpenTelemetryMetricsReporterTest.class); + + private static final List TOPICS = Arrays.asList("foo", "bar", "baz", "qux"); + private static final Random RANDOM = new Random(); + + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + private static KafkaContainer kafka; + private static KafkaProducer producer; + private static KafkaConsumer consumer; + + @BeforeAll + static void beforeAll() { + kafka = + new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3")) + .withLogConsumer(new Slf4jLogConsumer(logger)) + .waitingFor(Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1)) + .withStartupTimeout(Duration.ofMinutes(1)); + kafka.start(); + producer = new KafkaProducer<>(producerConfig()); + consumer = new KafkaConsumer<>(consumerConfig()); + } + + @AfterAll + static void afterAll() { + kafka.stop(); + producer.close(); + consumer.close(); + } + + @AfterEach + void tearDown() { + OpenTelemetryMetricsReporter.resetForTest(); + } + + private static Map producerConfig() { + Map producerConfig = new HashMap<>(); + producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-client-id"); + producerConfig.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + producerConfig.put( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + producerConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); + producerConfig.putAll( + KafkaTelemetry.create(testing.getOpenTelemetry()).metricConfigProperties()); + producerConfig.merge( + CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, + TestMetricsReporter.class.getName(), + (o, o2) -> o + "," + o2); + return producerConfig; + } + + private static Map consumerConfig() { + Map consumerConfig = new HashMap<>(); + consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group"); + consumerConfig.put( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + consumerConfig.put( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2); + consumerConfig.putAll( + KafkaTelemetry.create(testing.getOpenTelemetry()).metricConfigProperties()); + consumerConfig.merge( + CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, + TestMetricsReporter.class.getName(), + (o, o2) -> o + "," + o2); + return consumerConfig; + } + + @Test + void badConfig() { + // Bad producer config + assertThatThrownBy( + () -> { + Map producerConfig = producerConfig(); + producerConfig.remove(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE); + new KafkaProducer<>(producerConfig).close(); + }) + .hasRootCauseInstanceOf(IllegalStateException.class) + .hasRootCauseMessage("Missing required configuration property: opentelemetry.instance"); + assertThatThrownBy( + () -> { + Map producerConfig = producerConfig(); + producerConfig.put( + OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, "foo"); + new KafkaProducer<>(producerConfig).close(); + }) + .hasRootCauseInstanceOf(IllegalStateException.class) + .hasRootCauseMessage( + 
"Configuration property opentelemetry.instance is not instance of OpenTelemetry"); + + // Bad consumer config + assertThatThrownBy( + () -> { + Map consumerConfig = consumerConfig(); + consumerConfig.remove(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE); + new KafkaConsumer<>(consumerConfig).close(); + }) + .hasRootCauseInstanceOf(IllegalStateException.class) + .hasRootCauseMessage("Missing required configuration property: opentelemetry.instance"); + assertThatThrownBy( + () -> { + Map consumerConfig = consumerConfig(); + consumerConfig.put( + OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, "foo"); + new KafkaConsumer<>(consumerConfig).close(); + }) + .hasRootCauseInstanceOf(IllegalStateException.class) + .hasRootCauseMessage( + "Configuration property opentelemetry.instance is not instance of OpenTelemetry"); + } + + @Test + void observeMetrics() { + produceRecords(); + consumeRecords(); + + Set expectedMetricNames = + new HashSet<>( + Arrays.asList( + "kafka.consumer.commit_latency_avg", + "kafka.consumer.commit_latency_max", + "kafka.consumer.commit_rate", + "kafka.consumer.commit_total", + "kafka.consumer.failed_rebalance_rate_per_hour", + "kafka.consumer.failed_rebalance_total", + "kafka.consumer.heartbeat_rate", + "kafka.consumer.heartbeat_response_time_max", + "kafka.consumer.heartbeat_total", + "kafka.consumer.join_rate", + "kafka.consumer.join_time_avg", + "kafka.consumer.join_time_max", + "kafka.consumer.join_total", + "kafka.consumer.last_heartbeat_seconds_ago", + "kafka.consumer.last_rebalance_seconds_ago", + "kafka.consumer.partition_assigned_latency_avg", + "kafka.consumer.partition_assigned_latency_max", + "kafka.consumer.partition_lost_latency_avg", + "kafka.consumer.partition_lost_latency_max", + "kafka.consumer.partition_revoked_latency_avg", + "kafka.consumer.partition_revoked_latency_max", + "kafka.consumer.rebalance_latency_avg", + "kafka.consumer.rebalance_latency_max", + "kafka.consumer.rebalance_latency_total", + "kafka.consumer.rebalance_rate_per_hour", + "kafka.consumer.rebalance_total", + "kafka.consumer.sync_rate", + "kafka.consumer.sync_time_avg", + "kafka.consumer.sync_time_max", + "kafka.consumer.sync_total", + "kafka.consumer.bytes_consumed_rate", + "kafka.consumer.bytes_consumed_total", + "kafka.consumer.fetch_latency_avg", + "kafka.consumer.fetch_latency_max", + "kafka.consumer.fetch_rate", + "kafka.consumer.fetch_size_avg", + "kafka.consumer.fetch_size_max", + "kafka.consumer.fetch_throttle_time_avg", + "kafka.consumer.fetch_throttle_time_max", + "kafka.consumer.fetch_total", + "kafka.consumer.records_consumed_rate", + "kafka.consumer.records_consumed_total", + "kafka.consumer.records_lag", + "kafka.consumer.records_lag_avg", + "kafka.consumer.records_lag_max", + "kafka.consumer.records_lead", + "kafka.consumer.records_lead_avg", + "kafka.consumer.records_lead_min", + "kafka.consumer.records_per_request_avg", + "kafka.consumer.connection_close_rate", + "kafka.consumer.connection_close_total", + "kafka.consumer.connection_count", + "kafka.consumer.connection_creation_rate", + "kafka.consumer.connection_creation_total", + "kafka.consumer.failed_authentication_rate", + "kafka.consumer.failed_authentication_total", + "kafka.consumer.failed_reauthentication_rate", + "kafka.consumer.failed_reauthentication_total", + "kafka.consumer.incoming_byte_rate", + "kafka.consumer.incoming_byte_total", + "kafka.consumer.io_ratio", + "kafka.consumer.io_time_ns_avg", + "kafka.consumer.io_wait_ratio", + "kafka.consumer.io_wait_time_ns_avg", + 
"kafka.consumer.io_waittime_total", + "kafka.consumer.iotime_total", + "kafka.consumer.last_poll_seconds_ago", + "kafka.consumer.network_io_rate", + "kafka.consumer.network_io_total", + "kafka.consumer.outgoing_byte_rate", + "kafka.consumer.outgoing_byte_total", + "kafka.consumer.poll_idle_ratio_avg", + "kafka.consumer.reauthentication_latency_avg", + "kafka.consumer.reauthentication_latency_max", + "kafka.consumer.request_rate", + "kafka.consumer.request_size_avg", + "kafka.consumer.request_size_max", + "kafka.consumer.request_total", + "kafka.consumer.response_rate", + "kafka.consumer.response_total", + "kafka.consumer.select_rate", + "kafka.consumer.select_total", + "kafka.consumer.successful_authentication_no_reauth_total", + "kafka.consumer.successful_authentication_rate", + "kafka.consumer.successful_authentication_total", + "kafka.consumer.successful_reauthentication_rate", + "kafka.consumer.successful_reauthentication_total", + "kafka.consumer.time_between_poll_avg", + "kafka.consumer.time_between_poll_max", + "kafka.consumer.incoming_byte_rate", + "kafka.consumer.incoming_byte_total", + "kafka.consumer.outgoing_byte_rate", + "kafka.consumer.outgoing_byte_total", + "kafka.consumer.request_latency_avg", + "kafka.consumer.request_latency_max", + "kafka.consumer.request_rate", + "kafka.consumer.request_size_avg", + "kafka.consumer.request_size_max", + "kafka.consumer.request_total", + "kafka.consumer.response_rate", + "kafka.consumer.response_total", + "kafka.producer.batch_size_avg", + "kafka.producer.batch_size_max", + "kafka.producer.batch_split_rate", + "kafka.producer.batch_split_total", + "kafka.producer.buffer_available_bytes", + "kafka.producer.buffer_exhausted_rate", + "kafka.producer.buffer_exhausted_total", + "kafka.producer.buffer_total_bytes", + "kafka.producer.bufferpool_wait_ratio", + "kafka.producer.bufferpool_wait_time_total", + "kafka.producer.compression_rate_avg", + "kafka.producer.connection_close_rate", + "kafka.producer.connection_close_total", + "kafka.producer.connection_count", + "kafka.producer.connection_creation_rate", + "kafka.producer.connection_creation_total", + "kafka.producer.failed_authentication_rate", + "kafka.producer.failed_authentication_total", + "kafka.producer.failed_reauthentication_rate", + "kafka.producer.failed_reauthentication_total", + "kafka.producer.incoming_byte_rate", + "kafka.producer.incoming_byte_total", + "kafka.producer.io_ratio", + "kafka.producer.io_time_ns_avg", + "kafka.producer.io_wait_ratio", + "kafka.producer.io_wait_time_ns_avg", + "kafka.producer.io_waittime_total", + "kafka.producer.iotime_total", + "kafka.producer.metadata_age", + "kafka.producer.network_io_rate", + "kafka.producer.network_io_total", + "kafka.producer.outgoing_byte_rate", + "kafka.producer.outgoing_byte_total", + "kafka.producer.produce_throttle_time_avg", + "kafka.producer.produce_throttle_time_max", + "kafka.producer.reauthentication_latency_avg", + "kafka.producer.reauthentication_latency_max", + "kafka.producer.record_error_rate", + "kafka.producer.record_error_total", + "kafka.producer.record_queue_time_avg", + "kafka.producer.record_queue_time_max", + "kafka.producer.record_retry_rate", + "kafka.producer.record_retry_total", + "kafka.producer.record_send_rate", + "kafka.producer.record_send_total", + "kafka.producer.record_size_avg", + "kafka.producer.record_size_max", + "kafka.producer.records_per_request_avg", + "kafka.producer.request_latency_avg", + "kafka.producer.request_latency_max", + "kafka.producer.request_rate", + 
"kafka.producer.request_size_avg", + "kafka.producer.request_size_max", + "kafka.producer.request_total", + "kafka.producer.requests_in_flight", + "kafka.producer.response_rate", + "kafka.producer.response_total", + "kafka.producer.select_rate", + "kafka.producer.select_total", + "kafka.producer.successful_authentication_no_reauth_total", + "kafka.producer.successful_authentication_rate", + "kafka.producer.successful_authentication_total", + "kafka.producer.successful_reauthentication_rate", + "kafka.producer.successful_reauthentication_total", + "kafka.producer.waiting_threads", + "kafka.producer.incoming_byte_rate", + "kafka.producer.incoming_byte_total", + "kafka.producer.outgoing_byte_rate", + "kafka.producer.outgoing_byte_total", + "kafka.producer.request_latency_avg", + "kafka.producer.request_latency_max", + "kafka.producer.request_rate", + "kafka.producer.request_size_avg", + "kafka.producer.request_size_max", + "kafka.producer.request_total", + "kafka.producer.response_rate", + "kafka.producer.response_total", + "kafka.producer.byte_rate", + "kafka.producer.byte_total", + "kafka.producer.compression_rate", + "kafka.producer.record_error_rate", + "kafka.producer.record_error_total", + "kafka.producer.record_retry_rate", + "kafka.producer.record_retry_total", + "kafka.producer.record_send_rate", + "kafka.producer.record_send_total")); + + List metrics = testing.metrics(); + Set metricNames = metrics.stream().map(MetricData::getName).collect(toSet()); + assertThat(metricNames).containsAll(expectedMetricNames); + + assertThat(metrics) + .allSatisfy( + metricData -> { + Set expectedKeys = + metricData.getData().getPoints().stream() + .findFirst() + .map( + point -> + point.getAttributes().asMap().keySet().stream() + .map(AttributeKey::getKey) + .collect(toSet())) + .orElse(Collections.emptySet()); + assertThat(metricData.getData().getPoints()) + .extracting(PointData::getAttributes) + .extracting( + attributes -> + attributes.asMap().keySet().stream() + .map(AttributeKey::getKey) + .collect(toSet())) + .allSatisfy(attributeKeys -> assertThat(attributeKeys).isEqualTo(expectedKeys)); + }); + + // Print mapping table + printMappingTable(); + } + + private static void produceRecords() { + for (int i = 0; i < 100; i++) { + producer.send( + new ProducerRecord<>( + TOPICS.get(RANDOM.nextInt(TOPICS.size())), + 0, + System.currentTimeMillis(), + "key".getBytes(StandardCharsets.UTF_8), + "value".getBytes(StandardCharsets.UTF_8))); + } + } + + private static void consumeRecords() { + consumer.subscribe(TOPICS); + Instant stopTime = Instant.now().plusSeconds(10); + while (Instant.now().isBefore(stopTime)) { + consumer.poll(Duration.ofSeconds(1)); + } + } + + /** + * Print a table mapping kafka metrics to equivalent OpenTelemetry metrics, in markdown format. 
+
+  /**
+   * Print a table mapping kafka metrics to equivalent OpenTelemetry metrics, in markdown format.
+   */
+  private static void printMappingTable() {
+    StringBuilder sb = new StringBuilder();
+    // Append table headers
+    sb.append(
+            "| Metric Group | Metric Name | Attribute Keys | Instrument Name | Instrument Description | Instrument Type |")
+        .append(lineSeparator())
+        .append(
+            "|--------------|-------------|----------------|-----------------|------------------------|-----------------|")
+        .append(lineSeparator());
+    Map<String, List<KafkaMetricId>> kafkaMetricsByGroup =
+        TestMetricsReporter.seenMetrics.stream().collect(groupingBy(KafkaMetricId::getGroup));
+    List<RegisteredObservable> registeredObservables =
+        OpenTelemetryMetricsReporter.getRegisteredObservables();
+    // Iterate through groups in alpha order
+    for (String group : kafkaMetricsByGroup.keySet().stream().sorted().collect(toList())) {
+      List<KafkaMetricId> kafkaMetricIds =
+          kafkaMetricsByGroup.get(group).stream()
+              .sorted(
+                  comparing(KafkaMetricId::getName)
+                      .thenComparing(kafkaMetricId -> kafkaMetricId.getAttributeKeys().size()))
+              .collect(toList());
+      // Iterate through metrics in alpha order by name
+      for (KafkaMetricId kafkaMetricId : kafkaMetricIds) {
+        // Find first (there may be multiple) registered instrument that matches the kafkaMetricId
+        Optional<InstrumentDescriptor> descriptor =
+            registeredObservables.stream()
+                .filter(
+                    registeredObservable ->
+                        KafkaMetricId.create(registeredObservable.getKafkaMetricName())
+                            .equals(kafkaMetricId))
+                .findFirst()
+                .map(RegisteredObservable::getInstrumentDescriptor);
+        // Append table row
+        sb.append(
+            String.format(
+                "| %s | %s | %s | %s | %s | %s |%n",
+                "`" + group + "`",
+                "`" + kafkaMetricId.getName() + "`",
+                kafkaMetricId.getAttributeKeys().stream()
+                    .map(key -> "`" + key + "`")
+                    .collect(joining(",")),
+                descriptor.map(i -> "`" + i.getName() + "`").orElse(""),
+                descriptor.map(InstrumentDescriptor::getDescription).orElse(""),
+                descriptor.map(i -> "`" + i.getInstrumentType() + "`").orElse("")));
+      }
+    }
+    logger.info("Mapping table" + lineSeparator() + sb);
+  }
+
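+  // A printed row looks roughly like the following (illustrative sketch; the description column
+  // comes from the kafka client and varies by client version):
+  //
+  // | `producer-metrics` | `io-ratio` | `client-id` | `kafka.producer.io_ratio` | ... | `DOUBLE_OBSERVABLE_GAUGE` |
+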
+  /**
+   * This class is internal and is hence not for public use. Its APIs are unstable and can change
+   * at any time.
+   */
+  public static class TestMetricsReporter implements MetricsReporter {
+
+    private static final Set<KafkaMetricId> seenMetrics = new HashSet<>();
+
+    @Override
+    public void init(List<KafkaMetric> list) {
+      list.forEach(this::metricChange);
+    }
+
+    @Override
+    public void metricChange(KafkaMetric kafkaMetric) {
+      seenMetrics.add(KafkaMetricId.create(kafkaMetric.metricName()));
+    }
+
+    @Override
+    public void metricRemoval(KafkaMetric kafkaMetric) {}
+
+    @Override
+    public void close() {}
+
+    @Override
+    public void configure(Map<String, ?> map) {}
+  }
+
+  @AutoValue
+  abstract static class KafkaMetricId {
+
+    abstract String getGroup();
+
+    abstract String getName();
+
+    abstract Set<String> getAttributeKeys();
+
+    static KafkaMetricId create(MetricName metricName) {
+      return new AutoValue_OpenTelemetryMetricsReporterTest_KafkaMetricId(
+          metricName.group(), metricName.name(), metricName.tags().keySet());
+    }
+  }
+}
diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/InstrumentDescriptor.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/InstrumentDescriptor.java
new file mode 100644
index 000000000000..51b52130b904
--- /dev/null
+++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/InstrumentDescriptor.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.kafka.internal;
+
+import com.google.auto.value.AutoValue;
+
+/**
+ * A description of an OpenTelemetry metric instrument.
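+ *
+ * <p>For example, a sketch of the descriptor for a gauge instrument (values are illustrative):
+ *
+ * <pre>{@code
+ * InstrumentDescriptor gauge =
+ *     InstrumentDescriptor.createDoubleGauge(
+ *         "kafka.producer.io_ratio", "description from the kafka metric");
+ * }</pre>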
+ */
+@AutoValue
+abstract class InstrumentDescriptor {
+
+  static final String INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_GAUGE = "DOUBLE_OBSERVABLE_GAUGE";
+  static final String INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_COUNTER = "DOUBLE_OBSERVABLE_COUNTER";
+
+  abstract String getName();
+
+  abstract String getDescription();
+
+  abstract String getInstrumentType();
+
+  static InstrumentDescriptor createDoubleGauge(String name, String description) {
+    return new AutoValue_InstrumentDescriptor(
+        name, description, INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_GAUGE);
+  }
+
+  static InstrumentDescriptor createDoubleCounter(String name, String description) {
+    return new AutoValue_InstrumentDescriptor(
+        name, description, INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_COUNTER);
+  }
+}
diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaMetricRegistry.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaMetricRegistry.java
new file mode 100644
index 000000000000..d1beb57ffa4e
--- /dev/null
+++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaMetricRegistry.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.kafka.internal;
+
+import static io.opentelemetry.instrumentation.kafka.internal.InstrumentDescriptor.INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_COUNTER;
+import static io.opentelemetry.instrumentation.kafka.internal.InstrumentDescriptor.INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_GAUGE;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+
+/**
+ * A registry mapping kafka metrics to corresponding OpenTelemetry metric definitions.
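+ *
+ * <p>For example (an illustration of the naming logic below, not an exhaustive contract): the
+ * kafka metric {@code io-ratio} in group {@code producer-metrics} maps to an instrument named
+ * {@code kafka.producer.io_ratio}:
+ *
+ * <pre>{@code
+ * // "producer-metrics" contains "producer" -> instrument name prefix "kafka.producer."
+ * String instrumentName = "kafka." + "producer" + "." + "io-ratio".replace("-", "_");
+ * }</pre>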
+ */
+final class KafkaMetricRegistry {
+
+  private static final Set<String> groups = new HashSet<>(Arrays.asList("consumer", "producer"));
+  private static final Map<Class<?>, String> measurableToInstrumentType = new HashMap<>();
+  private static final Map<String, String> descriptionCache = new ConcurrentHashMap<>();
+
+  static {
+    Map<String, String> classNameToType = new HashMap<>();
+    classNameToType.put(
+        "org.apache.kafka.common.metrics.stats.Rate", INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_GAUGE);
+    classNameToType.put(
+        "org.apache.kafka.common.metrics.stats.Avg", INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_GAUGE);
+    classNameToType.put(
+        "org.apache.kafka.common.metrics.stats.Max", INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_GAUGE);
+    classNameToType.put(
+        "org.apache.kafka.common.metrics.stats.Value", INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_GAUGE);
+    classNameToType.put(
+        "org.apache.kafka.common.metrics.stats.CumulativeSum",
+        INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_COUNTER);
+    classNameToType.put(
+        "org.apache.kafka.common.metrics.stats.CumulativeCount",
+        INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_COUNTER);
+
+    for (Map.Entry<String, String> entry : classNameToType.entrySet()) {
+      try {
+        measurableToInstrumentType.put(Class.forName(entry.getKey()), entry.getValue());
+      } catch (ClassNotFoundException e) {
+        // Class doesn't exist in this version of kafka client - skip
+      }
+    }
+  }
+
+  @Nullable
+  static RegisteredObservable getRegisteredObservable(Meter meter, KafkaMetric kafkaMetric) {
+    // If the metric is not a Measurable, we can't map it to an instrument
+    Class<? extends Measurable> measurable = getMeasurable(kafkaMetric);
+    if (measurable == null) {
+      return null;
+    }
+    MetricName metricName = kafkaMetric.metricName();
+    Optional<String> matchingGroup =
+        groups.stream().filter(group -> metricName.group().contains(group)).findFirst();
+    // Only map metrics that have a matching group
+    if (!matchingGroup.isPresent()) {
+      return null;
+    }
+    String instrumentName =
+        "kafka." + matchingGroup.get() + "." + metricName.name().replace("-", "_");
+    String instrumentDescription =
+        descriptionCache.computeIfAbsent(instrumentName, s -> metricName.description());
+    String instrumentType =
+        measurableToInstrumentType.getOrDefault(measurable, INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_GAUGE);
+
+    InstrumentDescriptor instrumentDescriptor =
+        toInstrumentDescriptor(instrumentType, instrumentName, instrumentDescription);
+    Attributes attributes = toAttributes(metricName.tags());
+    AutoCloseable observable =
+        createObservable(meter, attributes, instrumentDescriptor, kafkaMetric);
+    return RegisteredObservable.create(metricName, instrumentDescriptor, attributes, observable);
+  }
+
+  @Nullable
+  private static Class<? extends Measurable> getMeasurable(KafkaMetric kafkaMetric) {
+    try {
+      return kafkaMetric.measurable().getClass();
+    } catch (IllegalStateException e) {
+      // KafkaMetric.measurable() throws if the metric is not backed by a Measurable
+      return null;
+    }
+  }
+
+  private static InstrumentDescriptor toInstrumentDescriptor(
+      String instrumentType, String instrumentName, String instrumentDescription) {
+    switch (instrumentType) {
+      case INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_GAUGE:
+        return InstrumentDescriptor.createDoubleGauge(instrumentName, instrumentDescription);
+      case INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_COUNTER:
+        return InstrumentDescriptor.createDoubleCounter(instrumentName, instrumentDescription);
+      default: // Continue below to throw
+    }
+    throw new IllegalStateException("Unrecognized instrument type. This is a bug.");
+  }
+
+  private static Attributes toAttributes(Map<String, String> tags) {
+    AttributesBuilder attributesBuilder = Attributes.builder();
+    tags.forEach(attributesBuilder::put);
+    return attributesBuilder.build();
+  }
+
+  private static AutoCloseable createObservable(
+      Meter meter,
+      Attributes attributes,
+      InstrumentDescriptor instrumentDescriptor,
+      KafkaMetric kafkaMetric) {
+    Consumer<ObservableDoubleMeasurement> callback =
+        observableMeasurement -> observableMeasurement.record(kafkaMetric.value(), attributes);
+    switch (instrumentDescriptor.getInstrumentType()) {
+      case INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_GAUGE:
+        return meter
+            .gaugeBuilder(instrumentDescriptor.getName())
+            .setDescription(instrumentDescriptor.getDescription())
+            .buildWithCallback(callback);
+      case INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_COUNTER:
+        return meter
+            .counterBuilder(instrumentDescriptor.getName())
+            .setDescription(instrumentDescriptor.getDescription())
+            .ofDoubles()
+            .buildWithCallback(callback);
+      default: // Continue below to throw
+    }
+    // TODO: add support for other instrument types and value types as needed for new instruments.
+    // This should not happen.
+    throw new IllegalStateException("Unrecognized instrument type. This is a bug.");
This is a bug."); + } + + private KafkaMetricRegistry() {} +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporter.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporter.java new file mode 100644 index 000000000000..f888ff621613 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporter.java @@ -0,0 +1,162 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafka.internal; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.instrumentation.api.internal.GuardedBy; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricsReporter; + +/** + * A {@link MetricsReporter} which bridges Kafka metrics to OpenTelemetry metrics. + * + *
+ * <p>To configure, use:
+ *
+ * <pre>{@code
+ * // KafkaTelemetry.create(OpenTelemetry).metricConfigProperties()
+ * }</pre>
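+ *
+ * <p>For example, a sketch of wiring the bridge into a consumer (assumes the {@code
+ * KafkaTelemetry} entry point from the kafka-clients library instrumentation; variable names are
+ * illustrative):
+ *
+ * <pre>{@code
+ * Map<String, Object> config = new HashMap<>();
+ * config.put("bootstrap.servers", "localhost:9092");
+ * // ... key/value deserializers and other required client settings omitted
+ * config.putAll(KafkaTelemetry.create(openTelemetry).metricConfigProperties());
+ * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(config);
+ * }</pre>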
+ *
+ * <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
+ * at any time.
+ */
+public final class OpenTelemetryMetricsReporter implements MetricsReporter {
+
+  public static final String CONFIG_KEY_OPENTELEMETRY_INSTANCE = "opentelemetry.instance";
+
+  private static final Logger logger =
+      Logger.getLogger(OpenTelemetryMetricsReporter.class.getName());
+
+  private volatile Meter meter;
+
+  private static final Object lock = new Object();
+
+  @GuardedBy("lock")
+  private static final List<RegisteredObservable> registeredObservables = new ArrayList<>();
+
+  /** Reset for test by closing all registered instruments. */
+  static void resetForTest() {
+    closeAllInstruments();
+  }
+
+  // Visible for test
+  static List<RegisteredObservable> getRegisteredObservables() {
+    synchronized (lock) {
+      return new ArrayList<>(registeredObservables);
+    }
+  }
+
+  @Override
+  public void init(List<KafkaMetric> metrics) {
+    metrics.forEach(this::metricChange);
+  }
+
+  @Override
+  public void metricChange(KafkaMetric metric) {
+    Meter currentMeter = meter;
+    if (currentMeter == null) {
+      // Ignore if meter hasn't been initialized in configure(Map<String, ?>)
+      return;
+    }
+
+    RegisteredObservable registeredObservable =
+        KafkaMetricRegistry.getRegisteredObservable(currentMeter, metric);
+    if (registeredObservable == null) {
+      // The kafka metric does not map to an instrument
+      return;
+    }
+
+    Set<AttributeKey<?>> attributeKeys = registeredObservable.getAttributes().asMap().keySet();
+    synchronized (lock) {
+      for (Iterator<RegisteredObservable> it = registeredObservables.iterator(); it.hasNext(); ) {
+        RegisteredObservable curRegisteredObservable = it.next();
+        Set<AttributeKey<?>> curAttributeKeys =
+            curRegisteredObservable.getAttributes().asMap().keySet();
+        if (curRegisteredObservable.getKafkaMetricName().equals(metric.metricName())) {
+          logger.log(Level.FINEST, "Replacing instrument: {0}", curRegisteredObservable);
+          closeInstrument(curRegisteredObservable.getObservable());
+          it.remove();
+        } else if (curRegisteredObservable
+                .getInstrumentDescriptor()
+                .equals(registeredObservable.getInstrumentDescriptor())
+            && attributeKeys.size() > curAttributeKeys.size()
+            && attributeKeys.containsAll(curAttributeKeys)) {
+          // Keep only the most granular attribute set for an instrument, so that backends don't
+          // double count overlapping dimensions
+          logger.log(
+              Level.FINEST,
+              "Replacing instrument with higher dimension version: {0}",
+              curRegisteredObservable);
+          closeInstrument(curRegisteredObservable.getObservable());
+          it.remove();
+        }
+      }
+
+      registeredObservables.add(registeredObservable);
+    }
+  }
+
+  @Override
+  public void metricRemoval(KafkaMetric metric) {
+    logger.log(Level.FINEST, "Metric removed: {0}", metric.metricName());
+    synchronized (lock) {
+      for (Iterator<RegisteredObservable> it = registeredObservables.iterator(); it.hasNext(); ) {
+        RegisteredObservable current = it.next();
+        if (current.getKafkaMetricName().equals(metric.metricName())) {
+          closeInstrument(current.getObservable());
+          it.remove();
+        }
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    closeAllInstruments();
+  }
+
+  private static void closeAllInstruments() {
+    synchronized (lock) {
+      for (Iterator<RegisteredObservable> it = registeredObservables.iterator(); it.hasNext(); ) {
+        closeInstrument(it.next().getObservable());
+        it.remove();
+      }
+    }
+  }
+
+  private static void closeInstrument(AutoCloseable observable) {
+    try {
+      observable.close();
+    } catch (Exception e) {
+      throw new IllegalStateException("Error occurred closing instrument", e);
+    }
+  }
+
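+  /**
+   * A sketch of the configuration this reporter expects (typically supplied via {@code
+   * KafkaTelemetry.create(OpenTelemetry).metricConfigProperties()} rather than set by hand):
+   *
+   * <pre>{@code
+   * config.put(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, openTelemetry);
+   * }</pre>
+   */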
+  @Override
+  public void configure(Map<String, ?> configs) {
+    Object openTelemetry = configs.get(CONFIG_KEY_OPENTELEMETRY_INSTANCE);
+    if (openTelemetry == null) {
+      throw new IllegalStateException(
+          "Missing required configuration property: " + CONFIG_KEY_OPENTELEMETRY_INSTANCE);
+    }
+    if (!(openTelemetry instanceof OpenTelemetry)) {
+      throw new IllegalStateException(
+          "Configuration property "
+              + CONFIG_KEY_OPENTELEMETRY_INSTANCE
+              + " is not an instance of OpenTelemetry");
+    }
+    meter = ((OpenTelemetry) openTelemetry).getMeter("io.opentelemetry.kafka-clients");
+  }
+}
diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/RegisteredObservable.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/RegisteredObservable.java
new file mode 100644
index 000000000000..4717b63d53f6
--- /dev/null
+++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/RegisteredObservable.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.kafka.internal;
+
+import com.google.auto.value.AutoValue;
+import io.opentelemetry.api.common.Attributes;
+import org.apache.kafka.common.MetricName;
+
+/** A kafka metric name paired with the OpenTelemetry instrument it has been bridged to. */
+@AutoValue
+abstract class RegisteredObservable {
+
+  abstract MetricName getKafkaMetricName();
+
+  abstract InstrumentDescriptor getInstrumentDescriptor();
+
+  abstract Attributes getAttributes();
+
+  abstract AutoCloseable getObservable();
+
+  static RegisteredObservable create(
+      MetricName metricName,
+      InstrumentDescriptor instrumentDescriptor,
+      Attributes attributes,
+      AutoCloseable observable) {
+    return new AutoValue_RegisteredObservable(
+        metricName, instrumentDescriptor, attributes, observable);
+  }
+}