emit kafka topics config as metrics #425

Open · wants to merge 2 commits into base: master
README.md (26 changes: 16 additions & 10 deletions)
@@ -184,16 +184,17 @@ kafka_brokers 3

**Metrics details**

| Name                                               | Exposed information                                                                                           |
|----------------------------------------------------|---------------------------------------------------------------------------------------------------------------|
| `kafka_topic_partitions`                           | Number of partitions for this Topic                                                                            |
| `kafka_topic_partition_current_offset`             | Current Offset of a Broker at Topic/Partition                                                                  |
| `kafka_topic_partition_oldest_offset`              | Oldest Offset of a Broker at Topic/Partition                                                                   |
| `kafka_topic_partition_in_sync_replica`            | Number of In-Sync Replicas for this Topic/Partition                                                            |
| `kafka_topic_partition_leader`                     | Leader Broker ID of this Topic/Partition                                                                       |
| `kafka_topic_partition_leader_is_preferred`        | 1 if Topic/Partition is using the Preferred Broker                                                             |
| `kafka_topic_partition_replicas`                   | Number of Replicas for this Topic/Partition                                                                    |
| `kafka_topic_partition_under_replicated_partition` | 1 if Topic/Partition is under Replicated                                                                       |
| `kafka_topic_config`                               | Topic configuration (`cleanup.policy`, `max.message.bytes`, `retention.bytes`, `retention.ms`, `segment.bytes`) exposed as labels for each topic; the value is always 1 |

**Metrics output example**

@@ -229,6 +230,11 @@ kafka_topic_partition_replicas{partition="0",topic="__consumer_offsets"} 3
# HELP kafka_topic_partition_under_replicated_partition 1 if Topic/Partition is under Replicated
# TYPE kafka_topic_partition_under_replicated_partition gauge
kafka_topic_partition_under_replicated_partition{partition="0",topic="__consumer_offsets"} 0

# HELP kafka_topic_config Topic Configuration
# TYPE kafka_topic_config gauge
kafka_topic_config{cleanup_policy="compact",max_message_bytes="33554432",retention_bytes="-1",retention_ms="115200000",segment_bytes="104857600",topic="__consumer_offsets"} 1

```

### Consumer Groups
kafka_exporter.go (60 changes: 60 additions & 0 deletions)
@@ -56,12 +56,14 @@ var (
	consumergroupLagSum       *prometheus.Desc
	consumergroupLagZookeeper *prometheus.Desc
	consumergroupMembers      *prometheus.Desc
	describeConfigTopic       *prometheus.Desc
)

// Exporter collects Kafka stats from the given server and exports them using
// the prometheus metrics package.
type Exporter struct {
	client       sarama.Client
	adminClient  sarama.ClusterAdmin
	topicFilter  *regexp.Regexp
	topicExclude *regexp.Regexp
	groupFilter  *regexp.Regexp
@@ -116,6 +118,14 @@ type kafkaOpts struct {
	verbosityLogLevel int
}

// TopicConfig holds the per-topic configuration entries that are exported
// as labels on the kafka_topic_config metric.
type TopicConfig struct {
	CleanupPolicy   string
	RetentionMs     string
	MaxMessageBytes string
	SegmentBytes    string
	RetentionBytes  string
}

// CanReadCertAndKey returns true if the certificate and key files already exist,
// otherwise returns false. If only one of the cert and key is readable, it returns an error.
func CanReadCertAndKey(certPath, keyPath string) (bool, error) {
@@ -261,10 +271,17 @@ func NewExporter(opts kafkaOpts, topicFilter string, topicExclude string, groupF
		return nil, errors.Wrap(err, "Error Init Kafka Client")
	}

	adminClient, err := sarama.NewClusterAdminFromClient(client)
	if err != nil {
		return nil, errors.Wrap(err, "Error Init Admin Kafka Client")
	}

	klog.V(TRACE).Infoln("Done Init Clients")
	// Init our exporter.
	return &Exporter{
		client:       client,
		adminClient:  adminClient,
		topicFilter:  regexp.MustCompile(topicFilter),
		topicExclude: regexp.MustCompile(topicExclude),
		groupFilter:  regexp.MustCompile(groupFilter),
@@ -312,6 +329,7 @@ func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {
	ch <- consumergroupLag
	ch <- consumergroupLagZookeeper
	ch <- consumergroupLagSum
	ch <- describeConfigTopic
}

// Collect fetches the stats from configured Kafka location and delivers them
@@ -395,6 +413,42 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
		return
	}

	// Fetch the configuration of every topic and expose the selected
	// entries as labels on the kafka_topic_config metric.
	for _, topic := range topics {
		resource := sarama.ConfigResource{
			Type: sarama.TopicResource,
			Name: topic,
		}
		entries, err := e.adminClient.DescribeConfig(resource)
		if err != nil {
			klog.Errorf("Cannot describe config for topic %s: %v", topic, err)
			continue
		}

		topicConfig := &TopicConfig{}
		for _, entry := range entries {
			switch entry.Name {
			case "cleanup.policy":
				topicConfig.CleanupPolicy = entry.Value
			case "retention.ms":
				topicConfig.RetentionMs = entry.Value
			case "max.message.bytes":
				topicConfig.MaxMessageBytes = entry.Value
			case "segment.bytes":
				topicConfig.SegmentBytes = entry.Value
			case "retention.bytes":
				topicConfig.RetentionBytes = entry.Value
			}
		}

		// The metric value is a constant 1; the configuration itself is
		// carried in the labels, in the order declared on describeConfigTopic.
		ch <- prometheus.MustNewConstMetric(
			describeConfigTopic, prometheus.GaugeValue, 1,
			topic, topicConfig.CleanupPolicy, topicConfig.RetentionMs,
			topicConfig.MaxMessageBytes, topicConfig.SegmentBytes, topicConfig.RetentionBytes,
		)
	}

	topicChannel := make(chan string)

	getTopicMetrics := func(topic string) {
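
For context, the sarama `DescribeConfig` call this loop is built on can be exercised standalone. A minimal sketch, where the broker address `localhost:9092`, the protocol version `V2_0_0_0`, and the `__consumer_offsets` topic are all illustrative assumptions:

```go
package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	// DescribeConfigs needs a broker version >= 0.11; V2_0_0_0 is an assumption.
	cfg.Version = sarama.V2_0_0_0

	admin, err := sarama.NewClusterAdmin([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatalf("creating cluster admin: %v", err)
	}
	defer admin.Close()

	// Ask the cluster for the full configuration of a single topic.
	entries, err := admin.DescribeConfig(sarama.ConfigResource{
		Type: sarama.TopicResource,
		Name: "__consumer_offsets",
	})
	if err != nil {
		log.Fatalf("describing config: %v", err)
	}
	for _, e := range entries {
		fmt.Printf("%s = %s\n", e.Name, e.Value)
	}
}
```

The PR reuses the exporter's existing `sarama.Client` via `NewClusterAdminFromClient` instead of dialing a second connection, which is why only the admin handle above differs from the exporter's setup.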
@@ -892,6 +946,12 @@ func setup(
		[]string{"consumergroup"}, labels,
	)

	describeConfigTopic = prometheus.NewDesc(
		prometheus.BuildFQName(namespace, "topic", "config"),
		"Topic Configuration",
		[]string{"topic", "cleanup_policy", "retention_ms", "max_message_bytes", "segment_bytes", "retention_bytes"}, labels,
	)

	if logSarama {
		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
}
Expand Down