Skip to content

Commit

Permalink
Add compression/acks/retry conf to Kafka output plugin
Browse files Browse the repository at this point in the history
The following configuration is now possible

	## CompressionCodec represents the various compression codecs
recognized by Kafka in messages.
	##  "none" : No compression
	##  "gzip" : Gzip compression
	##  "snappy" : Snappy compression
	# compression_codec = "none"

	##  RequiredAcks is used in Produce Requests to tell the broker how
many replica acknowledgements it must see before responding
	##  "none" : the producer never waits for an acknowledgement from the
broker. This option provides the lowest latency but the weakest
durability guarantees (some data will be lost when a server fails).
	##  "leader" : the producer gets an acknowledgement after the leader
replica has received the data. This option provides better durability
as the client waits until the server acknowledges the request as
successful (only messages that were written to the now-dead leader but
not yet replicated will be lost).
	##  "leader_and_replicas" : the producer gets an acknowledgement after
all in-sync replicas have received the data. This option provides the
best durability, we guarantee that no messages will be lost as long as
at least one in sync replica remains.
	# required_acks = "leader_and_replicas"

	##  The total number of times to retry sending a message
	# max_retry = "3"
  • Loading branch information
framiere authored and sparrc committed Apr 1, 2016
1 parent 9347a70 commit 6ff0fc6
Showing 1 changed file with 81 additions and 4 deletions.
85 changes: 81 additions & 4 deletions plugins/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package kafka
import (
"crypto/tls"
"fmt"
"strconv"
"strings"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
Expand All @@ -19,6 +21,12 @@ type Kafka struct {
Topic string
// Routing Key Tag
RoutingTag string `toml:"routing_tag"`
// Compression Codec Tag
CompressionCodec string
// RequiredAcks Tag
RequiredAcks string
// MaxRetry Tag
MaxRetry string

// Legacy SSL config options
// TLS client certificate
Expand Down Expand Up @@ -53,6 +61,21 @@ var sampleConfig = `
## ie, if this tag exists, it's value will be used as the routing key
routing_tag = "host"
## CompressionCodec represents the various compression codecs recognized by Kafka in messages.
## "none" : No compression
## "gzip" : Gzip compression
## "snappy" : Snappy compression
# compression_codec = "none"
## RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding
## "none" : the producer never waits for an acknowledgement from the broker. This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).
## "leader" : the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost).
## "leader_and_replicas" : the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be lost as long as at least one in sync replica remains.
# required_acks = "leader_and_replicas"
## The total number of times to retry sending a message
# max_retry = "3"
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
Expand All @@ -71,12 +94,66 @@ func (k *Kafka) SetSerializer(serializer serializers.Serializer) {
k.serializer = serializer
}

func requiredAcks(value string) (sarama.RequiredAcks, error) {
switch strings.ToLower(value) {
case "none":
return sarama.NoResponse, nil
case "leader":
return sarama.WaitForLocal, nil
case "", "leader_and_replicas":
return sarama.WaitForAll, nil
default:
return 0, fmt.Errorf("Failed to recognize required_acks: %s", value)
}
}

func compressionCodec(value string) (sarama.CompressionCodec, error) {
switch strings.ToLower(value) {
case "gzip":
return sarama.CompressionGZIP, nil
case "snappy":
return sarama.CompressionSnappy, nil
case "", "none":
return sarama.CompressionNone, nil
default:
return 0, fmt.Errorf("Failed to recognize compression_codec: %s", value)
}
}

func maxRetry(value string) (int, error) {
if value == "" {
return 3, nil
}
maxRetry, err := strconv.Atoi(value)
if err != nil {
return -1, fmt.Errorf("Failed to parse max_retry: %s", value)
}
if maxRetry < 0 {
return -1, fmt.Errorf("max_retry is %s but it should not be negative", value)
}
return maxRetry, nil
}

func (k *Kafka) Connect() error {
config := sarama.NewConfig()
// Wait for all in-sync replicas to ack the message
config.Producer.RequiredAcks = sarama.WaitForAll
// Retry up to 10 times to produce the message
config.Producer.Retry.Max = 10

requiredAcks, err := requiredAcks(k.RequiredAcks)
if err != nil {
return err
}
config.Producer.RequiredAcks = requiredAcks

compressionCodec, err := compressionCodec(k.CompressionCodec)
if err != nil {
return err
}
config.Producer.Compression = compressionCodec

maxRetry, err := maxRetry(k.MaxRetry)
if err != nil {
return err
}
config.Producer.Retry.Max = maxRetry

// Legacy support ssl config
if k.Certificate != "" {
Expand Down

0 comments on commit 6ff0fc6

Please sign in to comment.