Commit

Merge remote-tracking branch 'upstream/master' into support-ephemeral-container-k8s-provider
MichaelKatsoulis committed Sep 7, 2021
2 parents 7b4d126 + 9d4597a commit 094f149
Showing 28 changed files with 2,146 additions and 483 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.next.asciidoc
@@ -309,6 +309,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD diff]
- Update indentation for azure filebeat configuration. {pull}26604[26604]
- Auditd: Fix Top Exec Commands dashboard visualization. {pull}27638[27638]
- Store offset in `log.offset` field of events from the filestream input. {pull}27688[27688]
- Fix `httpjson` input rate limit processing and documentation. {pull}[]

*Heartbeat*

@@ -518,6 +519,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD diff]
- update ECS field definitions to ECS 1.11.0. {pull}27107[27107]
- The disk queue is now GA. {pull}27515[27515]
- Allow non-padded base64 data to be decoded by decode_base64_field {pull}27311[27311], {issue}27021[27021]
- The Kafka support library Sarama has been updated to 1.29.1. {pull}27717[27717]
- Kafka is now supported up to version 2.8.0. {pull}27720[27720]

*Auditbeat*

@@ -738,6 +741,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD diff]
- Move AWS module and filesets to GA. {pull}27428[27428]
- update ecs.version to ECS 1.11.0. {pull}27107[27107]
- Add base64 Encode functionality to httpjson input. {pull}27681[27681]
- Add `join` and `sprintf` functions to `httpjson` input. {pull}27735[27735]


*Heartbeat*
27 changes: 16 additions & 11 deletions Jenkinsfile
@@ -22,7 +22,7 @@ pipeline {
RUNBLD_DISABLE_NOTIFICATIONS = 'true'
SLACK_CHANNEL = "#beats-build"
SNAPSHOT = 'true'
TERRAFORM_VERSION = "0.12.30"
TERRAFORM_VERSION = "0.13.7"
XPACK_MODULE_PATTERN = '^x-pack\\/[a-z0-9]+beat\\/module\\/([^\\/]+)\\/.*'
}
options {
@@ -277,14 +277,16 @@ def generateStages(Map args = [:]) {
}

def cloud(Map args = [:]) {
withNode(labels: args.label, forceWorkspace: true){
startCloudTestEnv(name: args.directory, dirs: args.dirs)
}
withCloudTestEnv() {
try {
target(context: args.context, command: args.command, directory: args.directory, label: args.label, withModule: args.withModule, isMage: true, id: args.id)
} finally {
terraformCleanup(name: args.directory, dir: args.directory)
withGithubNotify(context: args.context) {
withNode(labels: args.label, forceWorkspace: true){
startCloudTestEnv(name: args.directory, dirs: args.dirs)
}
withCloudTestEnv() {
try {
target(context: args.context, command: args.command, directory: args.directory, label: args.label, withModule: args.withModule, isMage: true, id: args.id)
} finally {
terraformCleanup(name: args.directory, dir: args.directory)
}
}
}
}
@@ -838,8 +840,10 @@ def withCloudTestEnv(Closure body) {
def maskedVars = []
def testTags = "${env.TEST_TAGS}"

// AWS
if (params.allCloudTests || params.awsCloudTests) {
// Allow AWS credentials when the build was configured to do so with:
// - the cloudtests build parameters
// - the aws github label
if (params.allCloudTests || params.awsCloudTests || matchesPrLabel(label: 'aws')) {
testTags = "${testTags},aws"
def aws = getVaultSecret(secret: "${AWS_ACCOUNT_SECRET}").data
if (!aws.containsKey('access_key')) {
@@ -891,6 +895,7 @@ def startCloudTestEnv(Map args = [:]) {
// If it failed then cleanup without failing the build
sh(label: 'Terraform Cleanup', script: ".ci/scripts/terraform-cleanup.sh ${folder}", returnStatus: true)
}
error('startCloudTestEnv: terraform apply failed.')
} finally {
// Archive terraform states in case manual cleanup is needed.
archiveArtifacts(allowEmptyArchive: true, artifacts: '**/terraform.tfstate')
1,210 changes: 1,155 additions & 55 deletions NOTICE.txt

Large diffs are not rendered by default.

67 changes: 66 additions & 1 deletion filebeat/docs/inputs/input-kafka.asciidoc
@@ -51,7 +51,7 @@ link:https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-for-kafka-ecos
[[kafka-input-compatibility]]
==== Compatibility

This input works with all Kafka versions between 0.11 and 2.1.0. Older versions
This input works with all Kafka versions between 0.11 and 2.8.0. Older versions
might work as well, but are not supported.

[id="{beatname_lc}-input-{type}-options"]
@@ -168,6 +168,71 @@ Configuration options for Kerberos authentication.

See <<configuration-kerberos>> for more information.

[float]
===== `parsers`

This option expects a list of parsers that the payload has to go through. A
combined configuration sketch follows the list below.

Available parsers:

* `ndjson`
* `multiline`
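
A minimal sketch of how the two parsers might be combined under the `kafka`
input; the hosts, topic, group, and pattern values here are illustrative
placeholders, not part of the original documentation:

[source,yaml]
----
filebeat.inputs:
- type: kafka
  hosts:
    - localhost:9092          # placeholder broker address
  topics: ["example-logs"]    # placeholder topic
  group_id: "filebeat"
  parsers:
    - ndjson:
        keys_under_root: true
        message_key: log
    - multiline:
        type: pattern
        pattern: '^\['        # assumed example: new events start with "["
        negate: true
        match: after
----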

[float]
===== `ndjson`

These options make it possible for {beatname_uc} to decode the payload as
JSON messages.

Example configuration:

[source,yaml]
----
- ndjson:
    keys_under_root: true
    add_error_key: true
    message_key: log
----

*`keys_under_root`*:: By default, the decoded JSON is placed under a "json" key
in the output document. If you enable this setting, the keys are copied to the
top level of the output document. The default is false.

*`overwrite_keys`*:: If `keys_under_root` and this setting are enabled, then the
values from the decoded JSON object overwrite the fields that {beatname_uc}
normally adds (type, source, offset, etc.) in case of conflicts.

*`expand_keys`*:: If this setting is enabled, {beatname_uc} will recursively
de-dot keys in the decoded JSON, and expand them into a hierarchical object
structure. For example, `{"a.b.c": 123}` would be expanded into `{"a":{"b":{"c":123}}}`.
This setting should be enabled when the input is produced by an
https://github.com/elastic/ecs-logging[ECS logger].

*`add_error_key`*:: If this setting is enabled, {beatname_uc} adds
"error.message" and "error.type: json" keys in case of JSON unmarshalling errors
or when a `message_key` is defined in the configuration but cannot be used.

*`message_key`*:: An optional configuration setting that specifies a JSON key on
which to apply the line filtering and multiline settings. If specified, the key
must be at the top level in the JSON object and the value associated with the
key must be a string, otherwise no filtering or multiline aggregation will
occur.

*`document_id`*:: An optional configuration setting that specifies the JSON key
to use as the document ID. If configured, the field is removed from the original
JSON document and stored in `@metadata._id`.

*`ignore_decoding_error`*:: An optional configuration setting that specifies
whether JSON decoding errors should be logged. If set to true, errors are not
logged. The default is false.

[float]
===== `multiline`

Options that control how {beatname_uc} deals with log messages that span
multiple lines. See <<multiline-examples>> for more information about
configuring multiline options.
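
As a hedged illustration only (the pattern is an assumed example, not taken
from this page), a `multiline` parser that joins indented continuation lines
onto the preceding line might look like:

[source,yaml]
----
parsers:
  - multiline:
      type: pattern
      pattern: '^[[:space:]]'   # assumed example: indented lines are continuations
      negate: false
      match: after
----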

[id="{beatname_lc}-input-{type}-common-options"]
include::../inputs/input-common-options.asciidoc[]

1 change: 0 additions & 1 deletion filebeat/include/list.go

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions filebeat/input/default-inputs/inputs.go
@@ -20,6 +20,7 @@ package inputs
import (
"github.com/elastic/beats/v7/filebeat/beater"
"github.com/elastic/beats/v7/filebeat/input/filestream"
"github.com/elastic/beats/v7/filebeat/input/kafka"
"github.com/elastic/beats/v7/filebeat/input/unix"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
@@ -36,6 +37,7 @@ func Init(info beat.Info, log *logp.Logger, components beater.StateStore) []v2.Plugin {
func genericInputs(log *logp.Logger, components beater.StateStore) []v2.Plugin {
return []v2.Plugin{
filestream.Plugin(log, components),
kafka.Plugin(),
unix.Plugin(),
}
}
11 changes: 8 additions & 3 deletions filebeat/input/filestream/environment_test.go
@@ -39,6 +39,7 @@ import (
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/storetest"
"github.com/elastic/go-concert/unison"
)

type inputTestingEnvironment struct {
@@ -50,7 +51,8 @@ type inputTestingEnvironment struct {
pluginInitOnce sync.Once
plugin v2.Plugin

wg sync.WaitGroup
wg sync.WaitGroup
grp unison.TaskGroup
}

type registryEntry struct {
@@ -70,7 +72,9 @@ func newInputTestingEnvironment(t *testing.T) *inputTestingEnvironment {
}

func (e *inputTestingEnvironment) mustCreateInput(config map[string]interface{}) v2.Input {
e.grp = unison.TaskGroup{}
manager := e.getManager()
manager.Init(&e.grp, v2.ModeRun)
c := common.MustNewConfigFrom(config)
inp, err := manager.Create(c)
if err != nil {
@@ -88,12 +92,13 @@ func (e *inputTestingEnvironment) getManager() v2.InputManager {

func (e *inputTestingEnvironment) startInput(ctx context.Context, inp v2.Input) {
e.wg.Add(1)
go func(wg *sync.WaitGroup) {
go func(wg *sync.WaitGroup, grp *unison.TaskGroup) {
defer wg.Done()
defer grp.Stop()

inputCtx := input.Context{Logger: logp.L(), Cancelation: ctx}
inp.Run(inputCtx, e.pipeline)
}(&e.wg)
}(&e.wg, &e.grp)
}

func (e *inputTestingEnvironment) waitUntilInputStops() {
1 change: 0 additions & 1 deletion filebeat/input/filestream/input_integration_test.go
@@ -196,7 +196,6 @@ func TestFilestreamCloseEOF(t *testing.T) {

// test_empty_lines from test_harvester.py
func TestFilestreamEmptyLine(t *testing.T) {
t.Skip("Flaky test https:/elastic/beats/issues/27585")
env := newInputTestingEnvironment(t)

testlogName := "test.log"
4 changes: 2 additions & 2 deletions filebeat/input/kafka/config.go
@@ -28,9 +28,9 @@ import (
"github.com/elastic/beats/v7/libbeat/common/kafka"
"github.com/elastic/beats/v7/libbeat/common/transport/kerberos"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/monitoring/adapter"
"github.com/elastic/beats/v7/libbeat/reader/parser"
)

type kafkaInputConfig struct {
@@ -53,6 +53,7 @@ type kafkaInputConfig struct {
Username string `config:"username"`
Password string `config:"password"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
Parsers parser.Config `config:",inline"`
}

type kafkaFetch struct {
@@ -215,7 +216,6 @@ func newSaramaConfig(config kafkaInputConfig) (*sarama.Config, error) {
)

if err := k.Validate(); err != nil {
logp.Err("Invalid kafka configuration: %v", err)
return nil, err
}
return k, nil