diff --git a/filebeat/docs/inputs/input-kafka.asciidoc b/filebeat/docs/inputs/input-kafka.asciidoc index e134fb86404..33f3347662a 100644 --- a/filebeat/docs/inputs/input-kafka.asciidoc +++ b/filebeat/docs/inputs/input-kafka.asciidoc @@ -168,6 +168,71 @@ Configuration options for Kerberos authentication. See <> for more information. +[float] +===== `parsers` + +This option expects a list of parsers that the payload has to go through. + +Available parsers: + +* `ndjson` +* `multiline` + +[float] +===== `ndjson` + +These options make it possible for {beatname_uc} to decode the payload as +JSON messages. + +Example configuration: + +[source,yaml] +---- +- ndjson: + keys_under_root: true + add_error_key: true + message_key: log +---- + +*`keys_under_root`*:: By default, the decoded JSON is placed under a "json" key +in the output document. If you enable this setting, the keys are copied to the top +level of the output document. The default is false. + +*`overwrite_keys`*:: If `keys_under_root` and this setting are enabled, then the +values from the decoded JSON object overwrite the fields that {beatname_uc} +normally adds (type, source, offset, etc.) in case of conflicts. + +*`expand_keys`*:: If this setting is enabled, {beatname_uc} will recursively +de-dot keys in the decoded JSON, and expand them into a hierarchical object +structure. For example, `{"a.b.c": 123}` would be expanded into `{"a":{"b":{"c":123}}}`. +This setting should be enabled when the input is produced by an +https://github.com/elastic/ecs-logging[ECS logger]. + +*`add_error_key`*:: If this setting is enabled, {beatname_uc} adds an +"error.message" and "error.type: json" key in case of JSON unmarshalling errors +or when a `message_key` is defined in the configuration but cannot be used. + +*`message_key`*:: An optional configuration setting that specifies a JSON key on +which to apply the line filtering and multiline settings. If specified, the key +must be at the top level in the JSON object and the value associated with the +key must be a string, otherwise no filtering or multiline aggregation will +occur. + +*`document_id`*:: An optional configuration setting that specifies the JSON key to +set the document ID. If configured, the field will be removed from the original +JSON document and stored in `@metadata._id`. + +*`ignore_decoding_error`*:: An optional configuration setting that specifies whether +JSON decoding errors should be logged. If set to true, errors will not +be logged. The default is false. + +[float] +===== `multiline` + +Options that control how {beatname_uc} deals with log messages that span +multiple lines. See <> for more information about +configuring multiline options. + [id="{beatname_lc}-input-{type}-common-options"] include::../inputs/input-common-options.asciidoc[] diff --git a/filebeat/include/list.go b/filebeat/include/list.go index e4c1396d973..51b04d4f92c 100644 --- a/filebeat/include/list.go +++ b/filebeat/include/list.go @@ -23,7 +23,6 @@ import ( // Import packages that need to register themselves.
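For a concrete picture of how the `parsers` options above fit into a kafka input configuration, here is a minimal sketch written the same way the integration tests in this change build their configurations with `common.MustNewConfigFrom`. The broker address, topic, and group ID are placeholders, and the parser options simply mirror the YAML example in the documentation above.

[source,go]
----
package main

import (
	"fmt"

	"github.com/elastic/beats/v7/libbeat/common"
)

func main() {
	// Hypothetical kafka input configuration with an ndjson parser,
	// mirroring the YAML example in the documentation above.
	cfg := common.MustNewConfigFrom(common.MapStr{
		"hosts":    []string{"localhost:9092"},
		"topics":   []string{"filebeat-json"},
		"group_id": "filebeat",
		"parsers": []common.MapStr{
			{
				"ndjson": common.MapStr{
					"keys_under_root": true,
					"add_error_key":   true,
					"message_key":     "log",
				},
			},
		},
	})
	fmt.Println("config loaded:", cfg != nil)
}
----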
_ "github.com/elastic/beats/v7/filebeat/input/container" _ "github.com/elastic/beats/v7/filebeat/input/docker" - _ "github.com/elastic/beats/v7/filebeat/input/kafka" _ "github.com/elastic/beats/v7/filebeat/input/log" _ "github.com/elastic/beats/v7/filebeat/input/mqtt" _ "github.com/elastic/beats/v7/filebeat/input/redis" diff --git a/filebeat/input/default-inputs/inputs.go b/filebeat/input/default-inputs/inputs.go index 881f3664efd..95dd18e35f6 100644 --- a/filebeat/input/default-inputs/inputs.go +++ b/filebeat/input/default-inputs/inputs.go @@ -20,6 +20,7 @@ package inputs import ( "github.com/elastic/beats/v7/filebeat/beater" "github.com/elastic/beats/v7/filebeat/input/filestream" + "github.com/elastic/beats/v7/filebeat/input/kafka" "github.com/elastic/beats/v7/filebeat/input/unix" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" @@ -36,6 +37,7 @@ func Init(info beat.Info, log *logp.Logger, components beater.StateStore) []v2.P func genericInputs(log *logp.Logger, components beater.StateStore) []v2.Plugin { return []v2.Plugin{ filestream.Plugin(log, components), + kafka.Plugin(), unix.Plugin(), } } diff --git a/filebeat/input/kafka/config.go b/filebeat/input/kafka/config.go index c69b2522a4f..338182e460a 100644 --- a/filebeat/input/kafka/config.go +++ b/filebeat/input/kafka/config.go @@ -28,9 +28,9 @@ import ( "github.com/elastic/beats/v7/libbeat/common/kafka" "github.com/elastic/beats/v7/libbeat/common/transport/kerberos" "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" - "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/libbeat/monitoring/adapter" + "github.com/elastic/beats/v7/libbeat/reader/parser" ) type kafkaInputConfig struct { @@ -53,6 +53,7 @@ type kafkaInputConfig struct { Username string `config:"username"` Password string `config:"password"` ExpandEventListFromField string `config:"expand_event_list_from_field"` + Parsers parser.Config `config:",inline"` } type kafkaFetch struct { @@ -215,7 +216,6 @@ func newSaramaConfig(config kafkaInputConfig) (*sarama.Config, error) { ) if err := k.Validate(); err != nil { - logp.Err("Invalid kafka configuration: %v", err) return nil, err } return k, nil diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index add3c664224..9c872f1a14b 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -21,96 +21,181 @@ import ( "context" "encoding/json" "fmt" + "io" "strings" "sync" "time" + "github.com/elastic/beats/v7/libbeat/common/atomic" + "github.com/Shopify/sarama" + "github.com/pkg/errors" - "github.com/elastic/beats/v7/filebeat/channel" - "github.com/elastic/beats/v7/filebeat/input" + input "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/acker" "github.com/elastic/beats/v7/libbeat/common/backoff" "github.com/elastic/beats/v7/libbeat/common/kafka" + "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/libbeat/logp" - - "github.com/pkg/errors" + "github.com/elastic/beats/v7/libbeat/reader" + "github.com/elastic/beats/v7/libbeat/reader/parser" ) -func init() { - err := input.Register("kafka", NewInput) +const pluginName = "kafka" + +// Plugin creates a new filestream input plugin for creating a stateful input. 
+func Plugin() input.Plugin { + return input.Plugin{ + Name: pluginName, + Stability: feature.Stable, + Deprecated: false, + Info: "Kafka input", + Doc: "The Kafka input consumes events from topics by connecting to the configured kafka brokers", + Manager: input.ConfigureWith(configure), + } +} + +func configure(cfg *common.Config) (input.Input, error) { + config := defaultConfig() + if err := cfg.Unpack(&config); err != nil { + return nil, err + } + + saramaConfig, err := newSaramaConfig(config) if err != nil { - panic(err) + return nil, errors.Wrap(err, "initializing Sarama config") } + return NewInput(config, saramaConfig) +} + +func NewInput(config kafkaInputConfig, saramaConfig *sarama.Config) (*kafkaInput, error) { + return &kafkaInput{config: config, saramaConfig: saramaConfig}, nil } -// Input contains the input and its config type kafkaInput struct { config kafkaInputConfig saramaConfig *sarama.Config - context input.Context - outlet channel.Outleter saramaWaitGroup sync.WaitGroup // indicates a sarama consumer group is active - log *logp.Logger - runOnce sync.Once } -// NewInput creates a new kafka input -func NewInput( - cfg *common.Config, - connector channel.Connector, - inputContext input.Context, -) (input.Input, error) { +func (input *kafkaInput) Name() string { return pluginName } - config := defaultConfig() - if err := cfg.Unpack(&config); err != nil { - return nil, errors.Wrap(err, "reading kafka input config") +func (input *kafkaInput) Test(ctx input.TestContext) error { + client, err := sarama.NewClient(input.config.Hosts, input.saramaConfig) + if err != nil { + ctx.Logger.Error(err) + return err + } + defer client.Close() + + topics, err := client.Topics() + if err != nil { + ctx.Logger.Error(err) + return err + } + + var missingTopics []string + for _, neededTopic := range input.config.Topics { + if !contains(topics, neededTopic) { + missingTopics = append(missingTopics, neededTopic) + } + } + + if len(missingTopics) > 0 { + return fmt.Errorf("of configured topics %v, topics %v are not in available topics %v", input.config.Topics, missingTopics, topics) } - out, err := connector.ConnectWith(cfg, beat.ClientConfig{ + return nil +} + +func (input *kafkaInput) Run(ctx input.Context, pipeline beat.Pipeline) error { + log := ctx.Logger.Named("kafka input").With("hosts", input.config.Hosts) + + client, err := pipeline.ConnectWith(beat.ClientConfig{ ACKHandler: acker.ConnectionOnly( acker.EventPrivateReporter(func(_ int, events []interface{}) { for _, event := range events { if meta, ok := event.(eventMeta); ok { - meta.handler.ack(meta.message) + meta.ackHandler() } } }), ), - CloseRef: doneChannelContext(inputContext.Done), - WaitClose: config.WaitClose, + CloseRef: ctx.Cancelation, + WaitClose: input.config.WaitClose, }) if err != nil { - return nil, err + return err } - saramaConfig, err := newSaramaConfig(config) - if err != nil { - return nil, errors.Wrap(err, "initializing Sarama config") + log.Info("Starting Kafka input") + defer log.Info("Kafka input stopped") + + // Sarama uses standard go contexts to control cancellation, so we need + // to wrap our input context channel in that interface. + goContext := doneChannelContext(ctx) + + // If the consumer fails to connect, we use exponential backoff with + // jitter up to 8 * the initial backoff interval. + connectDelay := backoff.NewEqualJitterBackoff( + ctx.Cancelation.Done(), + input.config.ConnectBackoff, + 8*input.config.ConnectBackoff, + ) + + for goContext.Err() == nil { + // Connect to Kafka with a new consumer group.
+ consumerGroup, err := sarama.NewConsumerGroup( + input.config.Hosts, + input.config.GroupID, + input.saramaConfig, + ) + if err != nil { + log.Errorw("Error initializing kafka consumer group", "error", err) + connectDelay.Wait() + continue + } + // We've successfully connected, reset the backoff timer. + connectDelay.Reset() + + // We have a connected consumer group now, try to start the main event + // loop by calling Consume (which starts an asynchronous consumer). + // In an ideal run, this function never returns until shutdown; if it + // does, it means the errors have been logged and the consumer group + // has been closed, so we try creating a new one in the next iteration. + input.runConsumerGroup(log, client, goContext, consumerGroup) } - input := &kafkaInput{ - config: config, - saramaConfig: saramaConfig, - context: inputContext, - outlet: out, - log: logp.NewLogger("kafka input").With("hosts", config.Hosts), + if ctx.Cancelation.Err() == context.Canceled { + return nil + } else { + return ctx.Cancelation.Err() } +} - return input, nil +// Stop doesn't need to do anything because the kafka consumer group and the +// input's outlet both have a context based on input.context.Done and will +// shut themselves down, since the done channel is already closed as part of +// the shutdown process in Runner.Stop(). +func (input *kafkaInput) Stop() { +} + +// Wait should shut down the input and wait for it to complete, however (see +// Stop above) we don't need to take actions to shut down as long as the +// input.config.Done channel is closed, so we just make a (currently no-op) +// call to Stop() and then wait for sarama to signal completion. +func (input *kafkaInput) Wait() { + input.Stop() + // Wait for sarama to shut down + input.saramaWaitGroup.Wait() } -func (input *kafkaInput) runConsumerGroup( - context context.Context, consumerGroup sarama.ConsumerGroup, -) { +func (input *kafkaInput) runConsumerGroup(log *logp.Logger, client beat.Client, context context.Context, consumerGroup sarama.ConsumerGroup) { handler := &groupHandler{ version: input.config.Version, - outlet: input.outlet, + client: client, + parsers: input.config.Parsers, // expandEventListFromField will be assigned the configuration option expand_event_list_from_field expandEventListFromField: input.config.ExpandEventListFromField, - log: input.log, + log: log, } input.saramaWaitGroup.Add(1) @@ -122,70 +207,20 @@ func (input *kafkaInput) runConsumerGroup( // Listen asynchronously to any errors during the consume process go func() { for err := range consumerGroup.Errors() { - input.log.Errorw("Error reading from kafka", "error", err) + log.Errorw("Error reading from kafka", "error", err) } }() err := consumerGroup.Consume(context, input.config.Topics, handler) if err != nil { - input.log.Errorw("Kafka consume error", "error", err) + log.Errorw("Kafka consume error", "error", err) } } -// Run starts the input by scanning for incoming messages and errors. -func (input *kafkaInput) Run() { - input.runOnce.Do(func() { - go func() { - // Sarama uses standard go contexts to control cancellation, so we need - // to wrap our input context channel in that interface. - context := doneChannelContext(input.context.Done) - - // If the consumer fails to connect, we use exponential backoff with - // jitter up to 8 * the initial backoff interval. 
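The reconnect loop in `Run` above retries consumer-group creation with an equal-jitter backoff capped at eight times the initial interval. A standalone sketch of that retry shape, where the `connect` function is a hypothetical stand-in for `sarama.NewConsumerGroup`:

[source,go]
----
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/elastic/beats/v7/libbeat/common/backoff"
)

// connect is a hypothetical stand-in for sarama.NewConsumerGroup: it fails
// for the first few attempts and then succeeds.
func connect(attempt int) error {
	if attempt < 3 {
		return errors.New("broker not reachable")
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	initial := 100 * time.Millisecond
	// Same shape as the input: equal jitter, capped at 8x the initial interval.
	connectDelay := backoff.NewEqualJitterBackoff(ctx.Done(), initial, 8*initial)

	for attempt := 0; ctx.Err() == nil; attempt++ {
		if err := connect(attempt); err != nil {
			fmt.Println("connect failed, backing off:", err)
			connectDelay.Wait()
			continue
		}
		// Connected: reset the backoff so a later disconnect starts small again.
		connectDelay.Reset()
		fmt.Println("connected after", attempt, "retries")
		return
	}
}
----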
- backoff := backoff.NewEqualJitterBackoff( - input.context.Done, - input.config.ConnectBackoff, - 8*input.config.ConnectBackoff) - - for context.Err() == nil { - // Connect to Kafka with a new consumer group. - consumerGroup, err := sarama.NewConsumerGroup( - input.config.Hosts, input.config.GroupID, input.saramaConfig) - if err != nil { - input.log.Errorw( - "Error initializing kafka consumer group", "error", err) - backoff.Wait() - continue - } - // We've successfully connected, reset the backoff timer. - backoff.Reset() - - // We have a connected consumer group now, try to start the main event - // loop by calling Consume (which starts an asynchronous consumer). - // In an ideal run, this function never returns until shutdown; if it - // does, it means the errors have been logged and the consumer group - // has been closed, so we try creating a new one in the next iteration. - input.runConsumerGroup(context, consumerGroup) - } - }() - }) -} - -// Stop doesn't need to do anything because the kafka consumer group and the -// input's outlet both have a context based on input.context.Done and will -// shut themselves down, since the done channel is already closed as part of -// the shutdown process in Runner.Stop(). -func (input *kafkaInput) Stop() { -} - -// Wait should shut down the input and wait for it to complete, however (see -// Stop above) we don't need to take actions to shut down as long as the -// input.config.Done channel is closed, so we just make a (currently no-op) -// call to Stop() and then wait for sarama to signal completion. -func (input *kafkaInput) Wait() { - input.Stop() - // Wait for sarama to shut down - input.saramaWaitGroup.Wait() +// The metadata attached to incoming events, so they can be ACKed once they've +// been successfully sent. +type eventMeta struct { + ackHandler func() } func arrayForKafkaHeaders(headers []*sarama.RecordHeader) []string { @@ -209,25 +244,23 @@ func arrayForKafkaHeaders(headers []*sarama.RecordHeader) []string { // channels that are more common in the beats codebase. // TODO(faec): Generalize this to a common utility in a shared library // (https://github.com/elastic/beats/issues/13125). -type channelCtx <-chan struct{} - -func doneChannelContext(ch <-chan struct{}) context.Context { - return channelCtx(ch) +type channelCtx struct { + ctx input.Context } -func (c channelCtx) Deadline() (deadline time.Time, ok bool) { return } +func doneChannelContext(ctx input.Context) context.Context { + return channelCtx{ctx} +} +func (c channelCtx) Deadline() (deadline time.Time, ok bool) { + return +} func (c channelCtx) Done() <-chan struct{} { - return (<-chan struct{})(c) + return c.ctx.Cancelation.Done() } func (c channelCtx) Err() error { - select { - case <-c: - return context.Canceled - default: - return nil - } + return c.ctx.Cancelation.Err() } -func (c channelCtx) Value(key interface{}) interface{} { return nil } +func (c channelCtx) Value(_ interface{}) interface{} { return nil } // The group handler for the sarama consumer group interface. In addition to // providing the basic consumption callbacks needed by sarama, groupHandler is @@ -237,68 +270,13 @@ type groupHandler struct { sync.Mutex version kafka.Version session sarama.ConsumerGroupSession - outlet channel.Outleter + client beat.Client + parsers parser.Config // if the fileset using this input expects to receive multiple messages bundled under a specific field then this value is assigned // ex. 
in this case are the azure fielsets where the events are found under the json object "records" - expandEventListFromField string + expandEventListFromField string // TODO log *logp.Logger -} - -// The metadata attached to incoming events so they can be ACKed once they've -// been successfully sent. -type eventMeta struct { - handler *groupHandler - message *sarama.ConsumerMessage -} - -func (h *groupHandler) createEvents( - sess sarama.ConsumerGroupSession, - claim sarama.ConsumerGroupClaim, - message *sarama.ConsumerMessage, -) []beat.Event { - timestamp := time.Now() - kafkaFields := common.MapStr{ - "topic": claim.Topic(), - "partition": claim.Partition(), - "offset": message.Offset, - "key": string(message.Key), - } - - version, versionOk := h.version.Get() - if versionOk && version.IsAtLeast(sarama.V0_10_0_0) { - timestamp = message.Timestamp - if !message.BlockTimestamp.IsZero() { - kafkaFields["block_timestamp"] = message.BlockTimestamp - } - } - if versionOk && version.IsAtLeast(sarama.V0_11_0_0) { - kafkaFields["headers"] = arrayForKafkaHeaders(message.Headers) - } - - // if expandEventListFromField has been set, then a check for the actual json object will be done and a return for multiple messages is executed - var events []beat.Event - var messages []string - if h.expandEventListFromField == "" { - messages = []string{string(message.Value)} - } else { - messages = h.parseMultipleMessages(message.Value) - } - for _, msg := range messages { - event := beat.Event{ - Timestamp: timestamp, - Fields: common.MapStr{ - "message": msg, - "kafka": kafkaFields, - }, - Private: eventMeta{ - handler: h, - message: message, - }, - } - events = append(events, event) - - } - return events + reader reader.Reader } func (h *groupHandler) Setup(session sarama.ConsumerGroupSession) error { @@ -325,34 +303,172 @@ func (h *groupHandler) ack(message *sarama.ConsumerMessage) { } } -func (h *groupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { - for msg := range claim.Messages() { - events := h.createEvents(sess, claim, msg) - for _, event := range events { - h.outlet.OnEvent(event) +func (h *groupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + reader := h.createReader(claim) + parser := h.parsers.Create(reader) + for h.session.Context().Err() == nil { + message, err := parser.Next() + if err == io.EOF { + return nil + } + if err != nil { + return err + } + h.client.Publish(beat.Event{ + Timestamp: message.Ts, + Meta: message.Meta, + Fields: message.Fields, + }) + } + return nil +} + +func (h *groupHandler) createReader(claim sarama.ConsumerGroupClaim) reader.Reader { + if h.expandEventListFromField != "" { + return &listFromFieldReader{ + claim: claim, + groupHandler: h, + field: h.expandEventListFromField, + log: h.log, } } + return &recordReader{ + claim: claim, + groupHandler: h, + log: h.log, + } +} + +type recordReader struct { + claim sarama.ConsumerGroupClaim + groupHandler *groupHandler + log *logp.Logger +} + +func (m *recordReader) Close() error { + return nil +} + +func (m *recordReader) Next() (reader.Message, error) { + msg, ok := <-m.claim.Messages() + if !ok { + return reader.Message{}, io.EOF + } + + timestamp, kafkaFields := composeEventMetadata(m.claim, m.groupHandler, msg) + ackHandler := func() { + m.groupHandler.ack(msg) + } + return composeMessage(timestamp, msg.Value, kafkaFields, ackHandler), nil +} + +type listFromFieldReader struct { + claim sarama.ConsumerGroupClaim + 
groupHandler *groupHandler + buffer []reader.Message + field string + log *logp.Logger +} + +func (l *listFromFieldReader) Close() error { return nil } +func (l *listFromFieldReader) Next() (reader.Message, error) { + if len(l.buffer) != 0 { + return l.returnFromBuffer() + } + + msg, ok := <-l.claim.Messages() + if !ok { + return reader.Message{}, io.EOF + } + + timestamp, kafkaFields := composeEventMetadata(l.claim, l.groupHandler, msg) + messages := l.parseMultipleMessages(msg.Value) + + neededAcks := atomic.MakeInt(len(messages)) + ackHandler := func() { + if neededAcks.Dec() == 0 { + l.groupHandler.ack(msg) + } + } + for _, message := range messages { + newBuffer := append(l.buffer, composeMessage(timestamp, []byte(message), kafkaFields, ackHandler)) + l.buffer = newBuffer + } + + return l.returnFromBuffer() +} + +func (l *listFromFieldReader) returnFromBuffer() (reader.Message, error) { + next := l.buffer[0] + newBuffer := l.buffer[1:] + l.buffer = newBuffer + return next, nil +} + // parseMultipleMessages will try to split the message into multiple ones based on the group field provided by the configuration -func (h *groupHandler) parseMultipleMessages(bMessage []byte) []string { +func (l *listFromFieldReader) parseMultipleMessages(bMessage []byte) []string { var obj map[string][]interface{} err := json.Unmarshal(bMessage, &obj) if err != nil { - h.log.Errorw(fmt.Sprintf("Kafka desirializing multiple messages using the group object %s", h.expandEventListFromField), "error", err) + l.log.Errorw(fmt.Sprintf("Kafka deserializing multiple messages using the group object %s", l.field), "error", err) return []string{} } var messages []string - if len(obj[h.expandEventListFromField]) > 0 { - for _, ms := range obj[h.expandEventListFromField] { - js, err := json.Marshal(ms) - if err == nil { - messages = append(messages, string(js)) - } else { - h.log.Errorw(fmt.Sprintf("Kafka serializing message %s", ms), "error", err) - } + for _, ms := range obj[l.field] { + js, err := json.Marshal(ms) + if err == nil { + messages = append(messages, string(js)) + } else { + l.log.Errorw(fmt.Sprintf("Kafka serializing message %s", ms), "error", err) } } return messages } + +func composeEventMetadata(claim sarama.ConsumerGroupClaim, handler *groupHandler, msg *sarama.ConsumerMessage) (time.Time, common.MapStr) { + timestamp := time.Now() + kafkaFields := common.MapStr{ + "topic": claim.Topic(), + "partition": claim.Partition(), + "offset": msg.Offset, + "key": string(msg.Key), + } + + version, versionOk := handler.version.Get() + if versionOk && version.IsAtLeast(sarama.V0_10_0_0) { + timestamp = msg.Timestamp + if !msg.BlockTimestamp.IsZero() { + kafkaFields["block_timestamp"] = msg.BlockTimestamp + } + } + if versionOk && version.IsAtLeast(sarama.V0_11_0_0) { + kafkaFields["headers"] = arrayForKafkaHeaders(msg.Headers) + } + return timestamp, kafkaFields +} + +func composeMessage(timestamp time.Time, content []byte, kafkaFields common.MapStr, ackHandler func()) reader.Message { + return reader.Message{ + Ts: timestamp, + Content: content, + Fields: common.MapStr{ + "kafka": kafkaFields, + "message": string(content), + }, + Meta: common.MapStr{ + "ackHandler": ackHandler, + }, + } +} + +func contains(elements []string, element string) bool { + for _, e := range elements { + if e == element { + return true + } + } + return false +} diff --git a/filebeat/input/kafka/input_test.go b/filebeat/input/kafka/input_test.go index e83c0e908a8..a239149cb48 100644 --- a/filebeat/input/kafka/input_test.go +++
b/filebeat/input/kafka/input_test.go @@ -22,15 +22,32 @@ package kafka import ( "testing" - "github.com/elastic/beats/v7/filebeat/input/inputtest" + "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/tests/resources" ) func TestNewInputDone(t *testing.T) { - config := common.MapStr{ + config := common.MustNewConfigFrom(common.MapStr{ "hosts": "localhost:9092", "topics": "messages", "group_id": "filebeat", - } - inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) + }) + + AssertNotStartedInputCanBeDone(t, config) +} + +// AssertNotStartedInputCanBeDone checks that the context of an input can be +// done before starting the input, and it doesn't leak goroutines. This is +// important to confirm that leaks don't happen with CheckConfig. +func AssertNotStartedInputCanBeDone(t *testing.T, configMap *common.Config) { + goroutines := resources.NewGoroutinesChecker() + defer goroutines.Check(t) + + config, err := common.NewConfigFrom(configMap) + require.NoError(t, err) + + _, err = Plugin().Manager.Create(config) + require.NoError(t, err) } diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go index c7ad9fc999c..2f168eb6daf 100644 --- a/filebeat/input/kafka/kafka_integration_test.go +++ b/filebeat/input/kafka/kafka_integration_test.go @@ -20,20 +20,22 @@ package kafka import ( + "context" "fmt" "math/rand" "os" "strconv" "strings" - "sync" "testing" "time" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/logp" + beattest "github.com/elastic/beats/v7/libbeat/publisher/testing" + "github.com/Shopify/sarama" "github.com/stretchr/testify/assert" - "github.com/elastic/beats/v7/filebeat/channel" - "github.com/elastic/beats/v7/filebeat/input" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" _ "github.com/elastic/beats/v7/libbeat/outputs/codec/format" @@ -45,41 +47,6 @@ const ( kafkaDefaultPort = "9092" ) -type eventInfo struct { - events []beat.Event -} - -type eventCapturer struct { - closed bool - c chan struct{} - closeOnce sync.Once - events chan beat.Event -} - -func NewEventCapturer(events chan beat.Event) channel.Outleter { - return &eventCapturer{ - c: make(chan struct{}), - events: events, - } -} - -func (o *eventCapturer) OnEvent(event beat.Event) bool { - o.events <- event - return true -} - -func (o *eventCapturer) Close() error { - o.closeOnce.Do(func() { - o.closed = true - close(o.c) - }) - return nil -} - -func (o *eventCapturer) Done() <-chan struct{} { - return o.c -} - type testMessage struct { message string headers []sarama.RecordHeader @@ -93,12 +60,7 @@ func recordHeader(key, value string) sarama.RecordHeader { } func TestInput(t *testing.T) { - id := strconv.Itoa(rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int()) - testTopic := fmt.Sprintf("Filebeat-TestInput-%s", id) - context := input.Context{ - Done: make(chan struct{}), - BeatDone: make(chan struct{}), - } + testTopic := createTestTopicName() // Send test messages to the topic for the input to read. messages := []testMessage{ @@ -129,22 +91,10 @@ func TestInput(t *testing.T) { "wait_close": 0, }) - // Route input events through our capturer instead of sending through ES. 
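The `listFromFieldReader` added above fans a single Kafka record out into several beat events and only acknowledges the underlying record once every derived event has been acknowledged, by sharing a countdown across the per-event ack handlers. A standalone sketch of that counting pattern using the standard library (the real code uses libbeat's `atomic.MakeInt`; names here are hypothetical):

[source,go]
----
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	derivedEvents := []string{`{"val":"val1"}`, `{"val":"val2"}`}

	// One shared countdown for all events derived from the same Kafka record;
	// the record is acknowledged only when the counter reaches zero.
	neededAcks := int64(len(derivedEvents))
	ackRecord := func() { fmt.Println("kafka record acknowledged") }

	ackHandler := func() {
		if atomic.AddInt64(&neededAcks, -1) == 0 {
			ackRecord()
		}
	}

	// Simulate the publisher acknowledging each derived event.
	var wg sync.WaitGroup
	for _, ev := range derivedEvents {
		wg.Add(1)
		go func(ev string) {
			defer wg.Done()
			fmt.Println("event published:", ev)
			ackHandler()
		}(ev)
	}
	wg.Wait()
}
----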
- events := make(chan beat.Event, 100) - defer close(events) - capturer := NewEventCapturer(events) - defer capturer.Close() - connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) { - return channel.SubOutlet(capturer), nil - }) - - input, err := NewInput(config, connector, context) - if err != nil { - t.Fatal(err) - } - - // Run the input and wait for finalization - input.Run() + client := beattest.NewChanClient(100) + defer client.Close() + events := client.Channel + input, cancel := run(t, config, client) timeout := time.After(30 * time.Second) for range messages { @@ -169,7 +119,7 @@ func TestInput(t *testing.T) { // Close the done channel and make sure the beat shuts down in a reasonable // amount of time. - close(context.Done) + cancel() didClose := make(chan struct{}) go func() { input.Wait() @@ -184,12 +134,7 @@ func TestInput(t *testing.T) { } func TestInputWithMultipleEvents(t *testing.T) { - id := strconv.Itoa(rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int()) - testTopic := fmt.Sprintf("Filebeat-TestInput-%s", id) - context := input.Context{ - Done: make(chan struct{}), - BeatDone: make(chan struct{}), - } + testTopic := createTestTopicName() // Send test messages to the topic for the input to read. message := testMessage{ @@ -209,40 +154,155 @@ func TestInputWithMultipleEvents(t *testing.T) { "expand_event_list_from_field": "records", }) - // Route input events through our capturer instead of sending through ES. - events := make(chan beat.Event, 100) - defer close(events) - capturer := NewEventCapturer(events) - defer capturer.Close() - connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) { - return channel.SubOutlet(capturer), nil - }) + client := beattest.NewChanClient(100) + defer client.Close() + events := client.Channel + input, cancel := run(t, config, client) - input, err := NewInput(config, connector, context) - if err != nil { - t.Fatal(err) + timeout := time.After(30 * time.Second) + select { + case event := <-events: + text, err := event.Fields.GetValue("message") + if err != nil { + t.Fatal(err) + } + msgs := []string{"{\"val\":\"val1\"}", "{\"val\":\"val2\"}"} + assert.Contains(t, msgs, text) + checkMatchingHeaders(t, event, message.headers) + case <-timeout: + t.Fatal("timeout waiting for incoming events") } - // Run the input and wait for finalization - input.Run() + cancel() + // Close the done channel and make sure the beat shuts down in a reasonable + // amount of time. + didClose := make(chan struct{}) + go func() { + input.Wait() + close(didClose) + }() + + select { + case <-time.After(30 * time.Second): + t.Fatal("timeout waiting for beat to shut down") + case <-didClose: + } +} + +func TestInputWithJsonPayload(t *testing.T) { + testTopic := createTestTopicName() + + // Send test message to the topic for the input to read. 
+ message := testMessage{ + message: "{\"val\":\"val1\"}", + headers: []sarama.RecordHeader{ + recordHeader("X-Test-Header", "test header value"), + }, + } + writeToKafkaTopic(t, testTopic, message.message, message.headers, time.Second*20) + + // Setup the input config + config := common.MustNewConfigFrom(common.MapStr{ + "hosts": getTestKafkaHost(), + "topics": []string{testTopic}, + "group_id": "filebeat", + "wait_close": 0, + "parsers": []common.MapStr{ + { + "ndjson": common.MapStr{ + "target": "", + }, + }, + }, + }) + + client := beattest.NewChanClient(100) + defer client.Close() + events := client.Channel + input, cancel := run(t, config, client) timeout := time.After(30 * time.Second) select { case event := <-events: - text, err := event.Fields.GetValue("message") + text, err := event.Fields.GetValue("val") if err != nil { t.Fatal(err) } - msgs := []string{"{\"val\":\"val1\"}", "{\"val\":\"val2\"}"} + msgs := []string{"val1"} assert.Contains(t, msgs, text) checkMatchingHeaders(t, event, message.headers) case <-timeout: t.Fatal("timeout waiting for incoming events") } + cancel() + // Close the done channel and make sure the beat shuts down in a reasonable + // amount of time. + didClose := make(chan struct{}) + go func() { + input.Wait() + close(didClose) + }() + + select { + case <-time.After(30 * time.Second): + t.Fatal("timeout waiting for beat to shut down") + case <-didClose: + } +} + +func TestInputWithJsonPayloadAndMultipleEvents(t *testing.T) { + testTopic := createTestTopicName() + + // Send test messages to the topic for the input to read. + message := testMessage{ + message: "{\"records\": [{\"val\":\"val1\"}, {\"val\":\"val2\"}]}", + headers: []sarama.RecordHeader{ + recordHeader("X-Test-Header", "test header value"), + }, + } + writeToKafkaTopic(t, testTopic, message.message, message.headers, time.Second*20) + + // Setup the input config + config := common.MustNewConfigFrom(common.MapStr{ + "hosts": getTestKafkaHost(), + "topics": []string{testTopic}, + "group_id": "filebeat", + "wait_close": 0, + "expand_event_list_from_field": "records", + "parsers": []common.MapStr{ + { + "ndjson": common.MapStr{ + "target": "", + }, + }, + }, + }) + + client := beattest.NewChanClient(100) + defer client.Close() + events := client.Channel + input, cancel := run(t, config, client) + + timeout := time.After(30 * time.Second) + for i := 0; i < 2; i++ { + select { + case event := <-events: + text, err := event.Fields.GetValue("val") + if err != nil { + t.Fatal(err) + } + msgs := []string{"val1", "val2"} + assert.Contains(t, msgs, text) + checkMatchingHeaders(t, event, message.headers) + case <-timeout: + t.Fatal("timeout waiting for incoming events") + } + } + + cancel() // Close the done channel and make sure the beat shuts down in a reasonable // amount of time. - close(context.Done) didClose := make(chan struct{}) go func() { input.Wait() @@ -256,6 +316,44 @@ func TestInputWithMultipleEvents(t *testing.T) { } } +func TestTest(t *testing.T) { + testTopic := createTestTopicName() + + // Send test messages to the topic for the input to read. 
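The `expand_event_list_from_field: records` setting used in the test above splits the payload into one message per array element before the ndjson parser runs, which is why the test expects two events, each with a top-level `val` field. A standalone sketch of that splitting step, the same shape as `parseMultipleMessages` earlier in this change:

[source,go]
----
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	payload := []byte(`{"records": [{"val":"val1"}, {"val":"val2"}]}`)

	// Decode the wrapper object and re-serialize each element of the
	// configured field ("records") as its own message.
	var obj map[string][]interface{}
	if err := json.Unmarshal(payload, &obj); err != nil {
		panic(err)
	}
	for _, record := range obj["records"] {
		doc, err := json.Marshal(record)
		if err != nil {
			continue
		}
		fmt.Println(string(doc)) // {"val":"val1"} then {"val":"val2"}
	}
}
----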
+ message := testMessage{ + message: "{\"records\": [{\"val\":\"val1\"}, {\"val\":\"val2\"}]}", + headers: []sarama.RecordHeader{ + recordHeader("X-Test-Header", "test header value"), + }, + } + writeToKafkaTopic(t, testTopic, message.message, message.headers, time.Second*20) + + // Setup the input config + config := common.MustNewConfigFrom(common.MapStr{ + "hosts": getTestKafkaHost(), + "topics": []string{testTopic}, + "group_id": "filebeat", + }) + + inp, err := Plugin().Manager.Create(config) + if err != nil { + t.Fatal(err) + } + + err = inp.Test(v2.TestContext{ + Logger: logp.NewLogger("kafka_test"), + }) + if err != nil { + t.Fatal(err) + } +} + +func createTestTopicName() string { + id := strconv.Itoa(rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int()) + testTopic := fmt.Sprintf("Filebeat-TestInput-%s", id) + return testTopic +} + func findMessage(t *testing.T, text string, msgs []testMessage) *testMessage { var msg *testMessage for _, m := range msgs { @@ -274,23 +372,19 @@ func checkMatchingHeaders( ) { kafka, err := event.Fields.GetValue("kafka") if err != nil { - t.Error(err) - return + t.Fatal(err) } kafkaMap, ok := kafka.(common.MapStr) if !ok { - t.Error("event.Fields.kafka isn't MapStr") - return + t.Fatal("event.Fields.kafka isn't MapStr") } headers, err := kafkaMap.GetValue("headers") if err != nil { - t.Error(err) - return + t.Fatal(err) } headerArray, ok := headers.([]string) if !ok { - t.Error("event.Fields.kafka.headers isn't a []string") - return + t.Fatal("event.Fields.kafka.headers isn't a []string") } assert.Equal(t, len(expected), len(headerArray)) for i := 0; i < len(expected); i++ { @@ -357,3 +451,27 @@ func writeToKafkaTopic( t.Fatal(err) } } + +func run(t *testing.T, cfg *common.Config, client *beattest.ChanClient) (*kafkaInput, func()) { + inp, err := Plugin().Manager.Create(cfg) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := newV2Context() + t.Cleanup(cancel) + + pipeline := beattest.ConstClient(client) + input := inp.(*kafkaInput) + go input.Run(ctx, pipeline) + return input, cancel +} + +func newV2Context() (v2.Context, func()) { + ctx, cancel := context.WithCancel(context.Background()) + return v2.Context{ + Logger: logp.NewLogger("kafka_test"), + ID: "test_id", + Cancelation: ctx, + }, cancel +}
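For completeness, a sketch of how the `run` and `newV2Context` helpers above could be reused by an additional integration test. It assumes the same test file and imports; the topic contents and assertions are hypothetical and follow the same pattern as the tests in this change.

[source,go]
----
// A hypothetical additional test reusing the helpers above; it is a sketch of
// the intended usage pattern, not part of this change.
func TestInputSketch(t *testing.T) {
	testTopic := createTestTopicName()
	writeToKafkaTopic(t, testTopic, "hello kafka", nil, 20*time.Second)

	config := common.MustNewConfigFrom(common.MapStr{
		"hosts":    getTestKafkaHost(),
		"topics":   []string{testTopic},
		"group_id": "filebeat",
	})

	client := beattest.NewChanClient(100)
	defer client.Close()
	input, cancel := run(t, config, client)

	select {
	case event := <-client.Channel:
		text, err := event.Fields.GetValue("message")
		if err != nil {
			t.Fatal(err)
		}
		assert.Equal(t, "hello kafka", text)
	case <-time.After(30 * time.Second):
		t.Fatal("timeout waiting for incoming events")
	}

	cancel()
	input.Wait()
}
----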