Skip to content

Commit

Permalink
Use IndexPrefix for kafka and logstash output. (elastic#10841)
Browse files Browse the repository at this point in the history
Output index differed to Elasticsearch output.

fixes elastic#10839
  • Loading branch information
simitt committed Feb 25, 2019
1 parent 4b21d37 commit 5e4c60b
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 45 deletions.
8 changes: 8 additions & 0 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ func defaultConfig() kafkaConfig {
}
}

func readConfig(cfg *common.Config) (*kafkaConfig, error) {
c := defaultConfig()
if err := cfg.Unpack(&c); err != nil {
return nil, err
}
return &c, nil
}

func (c *kafkaConfig) Validate() error {
if len(c.Hosts) == 0 {
return errors.New("no hosts configured")
Expand Down
13 changes: 4 additions & 9 deletions libbeat/outputs/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,13 @@ func TestConfigAcceptValid(t *testing.T) {
for name, test := range tests {
test := test
t.Run(name, func(t *testing.T) {
c, err := common.NewConfigFrom(test)
c := common.MustNewConfigFrom(test)
c.SetString("hosts", 0, "localhost")
cfg, err := readConfig(c)
if err != nil {
t.Fatalf("Can not create test configuration: %v", err)
}
c.SetString("hosts", 0, "localhost")

cfg := defaultConfig()
if err := c.Unpack(&cfg); err != nil {
t.Fatalf("Unpacking configuration failed: %v", err)
}

if _, err := newSaramaConfig(&cfg); err != nil {
if _, err := newSaramaConfig(cfg); err != nil {
t.Fatalf("Failure creating sarama config: %v", err)
}
})
Expand Down
8 changes: 4 additions & 4 deletions libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func makeKafka(
) (outputs.Group, error) {
debugf("initialize kafka output")

config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
config, err := readConfig(cfg)
if err != nil {
return outputs.Fail(err)
}

Expand All @@ -91,7 +91,7 @@ func makeKafka(
return outputs.Fail(err)
}

libCfg, err := newSaramaConfig(&config)
libCfg, err := newSaramaConfig(config)
if err != nil {
return outputs.Fail(err)
}
Expand All @@ -106,7 +106,7 @@ func makeKafka(
return outputs.Fail(err)
}

client, err := newKafkaClient(observer, hosts, beat.Beat, config.Key, topic, codec, libCfg)
client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, config.Key, topic, codec, libCfg)
if err != nil {
return outputs.Fail(err)
}
Expand Down
3 changes: 2 additions & 1 deletion libbeat/outputs/kafka/kafka_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func TestKafkaPublish(t *testing.T) {
}

t.Run(name, func(t *testing.T) {
grp, err := makeKafka(beat.Info{Beat: "libbeat"}, outputs.NewNilObserver(), cfg)
grp, err := makeKafka(beat.Info{Beat: "libbeat", IndexPrefix: "testbeat"}, outputs.NewNilObserver(), cfg)
if err != nil {
t.Fatal(err)
}
Expand All @@ -208,6 +208,7 @@ func TestKafkaPublish(t *testing.T) {
if err := output.Connect(); err != nil {
t.Fatal(err)
}
assert.Equal(t, output.index, "testbeat")
defer output.Close()

// publish test events
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/logstash/async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestAsyncStructuredEvent(t *testing.T) {
}

func makeAsyncTestClient(conn *transport.Client) testClientDriver {
config := defaultConfig
config := defaultConfig()
config.Timeout = 1 * time.Second
config.Pipelining = 3
client, err := newAsyncClient(beat.Info{}, conn, outputs.NewNilObserver(), &config)
Expand Down
55 changes: 37 additions & 18 deletions libbeat/outputs/logstash/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ package logstash
import (
"time"

"github.com/elastic/beats/libbeat/beat"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/libbeat/outputs/transport"
)
Expand All @@ -46,24 +50,39 @@ type Backoff struct {
Max time.Duration
}

var defaultConfig = Config{
Port: 5044,
LoadBalance: false,
Pipelining: 2,
BulkMaxSize: 2048,
SlowStart: false,
CompressionLevel: 3,
Timeout: 30 * time.Second,
MaxRetries: 3,
TTL: 0 * time.Second,
Backoff: Backoff{
Init: 1 * time.Second,
Max: 60 * time.Second,
},
EscapeHTML: true,
func defaultConfig() Config {
return Config{
Port: 5044,
LoadBalance: false,
Pipelining: 2,
BulkMaxSize: 2048,
SlowStart: false,
CompressionLevel: 3,
Timeout: 30 * time.Second,
MaxRetries: 3,
TTL: 0 * time.Second,
Backoff: Backoff{
Init: 1 * time.Second,
Max: 60 * time.Second,
},
EscapeHTML: false,
}
}

func newConfig() *Config {
c := defaultConfig
return &c
func readConfig(cfg *common.Config, info beat.Info) (*Config, error) {
c := defaultConfig()

if err := cfg.Unpack(&c); err != nil {
return nil, err
}

if cfg.HasField("port") {
cfgwarn.Deprecate("7.0.0", "The Logstash outputs port setting")
}

if c.Index == "" {
c.Index = info.IndexPrefix
}

return &c, nil
}
100 changes: 100 additions & 0 deletions libbeat/outputs/logstash/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package logstash

import (
"testing"
"time"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"

"github.com/stretchr/testify/assert"
)

func TestConfig(t *testing.T) {

info := beat.Info{Beat: "testbeat", Name: "foo", IndexPrefix: "bar"}
for name, test := range map[string]struct {
config *common.Config
expectedConfig *Config
err bool
}{
"default config": {
config: common.MustNewConfigFrom([]byte(`{ }`)),
expectedConfig: &Config{
LoadBalance: false,
Pipelining: 2,
BulkMaxSize: 2048,
SlowStart: false,
CompressionLevel: 3,
Timeout: 30 * time.Second,
MaxRetries: 3,
TTL: 0 * time.Second,
Backoff: Backoff{
Init: 1 * time.Second,
Max: 60 * time.Second,
},
EscapeHTML: false,
Index: "bar",
},
},
"config given": {
config: common.MustNewConfigFrom(common.MapStr{
"index": "beat-index",
"loadbalance": true,
"bulk_max_size": 1024,
"slow_start": false,
}),
expectedConfig: &Config{
LoadBalance: true,
BulkMaxSize: 1024,
Pipelining: 2,
SlowStart: false,
CompressionLevel: 3,
Timeout: 30 * time.Second,
MaxRetries: 3,
TTL: 0 * time.Second,
Backoff: Backoff{
Init: 1 * time.Second,
Max: 60 * time.Second,
},
EscapeHTML: false,
Index: "beat-index",
},
},
"removed config setting": {
config: common.MustNewConfigFrom(common.MapStr{
"port": "8080",
}),
expectedConfig: nil,
err: true,
},
} {
t.Run(name, func(t *testing.T) {
cfg, err := readConfig(test.config, info)
if test.err {
assert.Error(t, err)
assert.Nil(t, cfg)
} else {
assert.NoError(t, err)
assert.Equal(t, test.expectedConfig, cfg)
}
})
}
}
12 changes: 2 additions & 10 deletions libbeat/outputs/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package logstash
import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
Expand All @@ -43,19 +42,12 @@ func makeLogstash(
observer outputs.Observer,
cfg *common.Config,
) (outputs.Group, error) {
if !cfg.HasField("index") {
cfg.SetString("index", -1, beat.Beat)
}

config := newConfig()
if err := cfg.Unpack(config); err != nil {
config, err := readConfig(cfg, beat)
if err != nil {
return outputs.Fail(err)
}

if cfg.HasField("port") {
cfgwarn.Deprecate("7.0.0", "The Logstash outputs port setting")
}

hosts, err := outputs.ReadHostList(cfg)
if err != nil {
return outputs.Fail(err)
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/logstash/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func newClientServerTCP(t *testing.T, to time.Duration) *clientServer {
}

func makeTestClient(conn *transport.Client) testClientDriver {
config := defaultConfig
config := defaultConfig()
config.Timeout = 1 * time.Second
config.TTL = 5 * time.Second
client, err := newSyncClient(beat.Info{}, conn, outputs.NewNilObserver(), &config)
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/logstash/window_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestShrinkWindowSizeNeverZero(t *testing.T) {

windowSize := 124
var w window
w.init(windowSize, defaultConfig.BulkMaxSize)
w.init(windowSize, defaultConfig().BulkMaxSize)

w.windowSize = int32(windowSize)
for i := 0; i < 100; i++ {
Expand Down

0 comments on commit 5e4c60b

Please sign in to comment.