From 877dc3130da43a7fb7229c34c39ed6baf1ff2fb9 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Thu, 12 Sep 2024 12:56:52 -0400 Subject: [PATCH] Fix elasticsearch re-connection after network error When the Elasticsearch client fails to publish events, it ends up calling `Close` in the connection (that is reused). To cancel the in-flight requests, the context is cancelled and a new one is created to used in future requests. The callback to check the version holds a reference to the connection via a closure, now the Elasticsearch client holds a pointer to that connection, so whenever Close is called, the callback can create a request with the new, not cancelled, context. An integration test is added to ensure the ES output can always recover from network errors. --- CHANGELOG.next.asciidoc | 1 + NOTICE.txt | 53 +++++++ go.mod | 2 + go.sum | 4 + libbeat/esleg/eslegclient/connection.go | 3 + .../connection_integration_test.go | 7 + libbeat/outputs/elasticsearch/client.go | 8 +- .../tests/integration/elasticsearch_test.go | 142 ++++++++++++++++++ 8 files changed, 218 insertions(+), 2 deletions(-) create mode 100644 libbeat/tests/integration/elasticsearch_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 1f0970ba3e37..d474fd7c4ff7 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -112,6 +112,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Aborts all active connections for Elasticsearch output. {pull}40572[40572] - Closes beat Publisher on beat stop and by the Agent manager. {pull}40572[40572] - The journald input now restarts if there is an error/crash {issue}32782[32782] {pull}40558[40558] +- Fix Elasticsearch output not recovering from network errors {issue}40705[40705] {pull}40794[40794] *Auditbeat* diff --git a/NOTICE.txt b/NOTICE.txt index b0a0438eb298..9dbe33a1c3e1 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -16057,6 +16057,29 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/mito@v1.15.0/LI limitations under the License. +-------------------------------------------------------------------------------- +Dependency : github.com/elastic/mock-es +Version: v0.0.0-20240712014503-e5b47ece0015 +Licence type (autodetected): Apache-2.0 +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/elastic/mock-es@v0.0.0-20240712014503-e5b47ece0015/LICENSE: + +Copyright 2024 Elasticsearch B.V. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + -------------------------------------------------------------------------------- Dependency : github.com/elastic/tk-btf Version: v0.1.0 @@ -47725,6 +47748,36 @@ Contents of probable licence file $GOMODCACHE/github.com/matttproud/golang_proto limitations under the License. +-------------------------------------------------------------------------------- +Dependency : github.com/mileusna/useragent +Version: v1.3.4 +Licence type (autodetected): MIT +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/mileusna/useragent@v1.3.4/LICENSE.md: + +MIT License + +Copyright (c) 2017 Miloš Mileusnić + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + -------------------------------------------------------------------------------- Dependency : github.com/minio/asm2plan9s Version: v0.0.0-20200509001527-cdd76441f9d8 diff --git a/go.mod b/go.mod index 56318068f272..4d2056b2ad80 100644 --- a/go.mod +++ b/go.mod @@ -197,6 +197,7 @@ require ( github.com/elastic/go-elasticsearch/v8 v8.14.0 github.com/elastic/go-sfdc v0.0.0-20240621062639-bcc8456508ff github.com/elastic/mito v1.15.0 + github.com/elastic/mock-es v0.0.0-20240712014503-e5b47ece0015 github.com/elastic/tk-btf v0.1.0 github.com/elastic/toutoumomoma v0.0.0-20240626215117-76e39db18dfb github.com/foxcpp/go-mockdns v0.0.0-20201212160233-ede2f9158d15 @@ -333,6 +334,7 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect + github.com/mileusna/useragent v1.3.4 // indirect github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect diff --git a/go.sum b/go.sum index df15b7c47913..4b137cb26c20 100644 --- a/go.sum +++ b/go.sum @@ -602,6 +602,8 @@ github.com/elastic/gosigar v0.14.3 h1:xwkKwPia+hSfg9GqrCUKYdId102m9qTJIIr7egmK/u github.com/elastic/gosigar v0.14.3/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= github.com/elastic/mito v1.15.0 h1:MicOxLSVkgU2Aonbh3i+++66Wl5wvD8y9gALK8PQDYs= github.com/elastic/mito v1.15.0/go.mod h1:J+wCf4HccW2YoSFmZMGu+d06gN+WmnIlj5ehBqine74= +github.com/elastic/mock-es v0.0.0-20240712014503-e5b47ece0015 h1:z8cC8GASpPo8yKlbnXI36HQ/BM9wYjhBPNbDjAWm0VU= +github.com/elastic/mock-es v0.0.0-20240712014503-e5b47ece0015/go.mod h1:qH9DX/Dmflz6EAtaks/+2SsdQzecVAKE174Zl66hk7E= github.com/elastic/pkcs8 v1.0.0 h1:HhitlUKxhN288kcNcYkjW6/ouvuwJWd9ioxpjnD9jVA= github.com/elastic/pkcs8 v1.0.0/go.mod h1:ipsZToJfq1MxclVTwpG7U/bgeDtf+0HkUiOxebk95+0= github.com/elastic/ristretto v0.1.1-0.20220602190459-83b0895ca5b3 h1:ChPwRVv1RR4a0cxoGjKcyWjTEpxYfm5gydMIzo32cAw= @@ -1298,6 +1300,8 @@ github.com/miekg/dns v1.1.29/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7 github.com/miekg/dns v1.1.42 h1:gWGe42RGaIqXQZ+r3WUGEKBEtvPHY2SXo4dqixDNxuY= github.com/miekg/dns v1.1.42/go.mod h1:+evo5L0630/F6ca/Z9+GAqzhjGyn8/c+TBaOyfEl0V4= github.com/mileusna/useragent v0.0.0-20190129205925-3e331f0949a5/go.mod h1:JWhYAp2EXqUtsxTKdeGlY8Wp44M7VxThC9FEoNGi2IE= +github.com/mileusna/useragent v1.3.4 h1:MiuRRuvGjEie1+yZHO88UBYg8YBC/ddF6T7F56i3PCk= +github.com/mileusna/useragent v1.3.4/go.mod h1:3d8TOmwL/5I8pJjyVDteHtgDGcefrFUX4ccGOMKNYYc= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= diff --git a/libbeat/esleg/eslegclient/connection.go b/libbeat/esleg/eslegclient/connection.go index 6a22132080f9..2d3579e9a9b9 100644 --- a/libbeat/esleg/eslegclient/connection.go +++ b/libbeat/esleg/eslegclient/connection.go @@ -326,6 +326,9 @@ func (conn *Connection) Ping() (ESPingData, error) { // Close closes a connection. func (conn *Connection) Close() error { conn.HTTP.CloseIdleConnections() + conn.cancelReqs() + // Creates a new context to be use in new requests + conn.reqsContext, conn.cancelReqs = context.WithCancel(context.Background()) return nil } diff --git a/libbeat/esleg/eslegclient/connection_integration_test.go b/libbeat/esleg/eslegclient/connection_integration_test.go index b4e277ed1a6b..3b0afe575a48 100644 --- a/libbeat/esleg/eslegclient/connection_integration_test.go +++ b/libbeat/esleg/eslegclient/connection_integration_test.go @@ -45,6 +45,13 @@ func TestConnect(t *testing.T) { assert.NoError(t, err) } +func TestConnectionCanBeClosedAndReused(t *testing.T) { + conn := getTestingElasticsearch(t) + assert.NoError(t, conn.Connect(), "first connect must succeed") + assert.NoError(t, conn.Close(), "close must succeed") + assert.NoError(t, conn.Connect(), "calling connect after close must succeed") +} + func TestConnectWithProxy(t *testing.T) { wrongPort, err := net.Listen("tcp", "localhost:0") require.NoError(t, err) diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 70c4cc1cce50..e2282ea0eaf8 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -48,7 +48,11 @@ var ( // Client is an elasticsearch client. type Client struct { - conn eslegclient.Connection + // We use a pointer here because the connection holds a context + // that needs to be recreated in case of error and the connection + // that is passed to this client is also used in a closure, we need + // to ensure both hold a reference to the same instance of the connection. + conn *eslegclient.Connection indexSelector outputs.IndexSelector pipelineSelector *outil.Selector @@ -186,7 +190,7 @@ func NewClient( pLogIndex.Start() pLogIndexTryDeadLetter.Start() client := &Client{ - conn: *conn, + conn: conn, indexSelector: s.indexSelector, pipelineSelector: pipeline, observer: observer, diff --git a/libbeat/tests/integration/elasticsearch_test.go b/libbeat/tests/integration/elasticsearch_test.go new file mode 100644 index 000000000000..a076e767f279 --- /dev/null +++ b/libbeat/tests/integration/elasticsearch_test.go @@ -0,0 +1,142 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build integration + +package integration + +import ( + "errors" + "io" + "net/http" + "testing" + "time" + + "github.com/gofrs/uuid/v5" + "github.com/rcrowley/go-metrics" + "github.com/stretchr/testify/require" + + "github.com/elastic/mock-es/pkg/api" +) + +var esCfg = ` +mockbeat: +logging: + level: debug + selectors: + - publisher_pipeline_output + - esclientleg +queue.mem: + events: 4096 + flush.min_events: 8 + flush.timeout: 0.1s +output.elasticsearch: + allow_older_versions: true + hosts: + - "http://localhost:4242" + backoff: + init: 0.1s + max: 0.2s +` + +func TestESOutputRecoversFromNetworkError(t *testing.T) { + mockbeat := NewBeat(t, "mockbeat", "../../libbeat.test") + mockbeat.WriteConfigFile(esCfg) + + s, mr := startMockES(t, "localhost:4242") + + mockbeat.Start() + + // 1. Wait for one _bulk call + waitForEventToBePublished(t, mr) + + // 2. Stop the mock-es server + if err := s.Close(); err != nil { + t.Fatalf("cannot close mock-es server: %s", err) + } + + // 3. Wait for connection error logs + mockbeat.WaitForLogs( + `Get \"http://localhost:4242\": dial tcp 127.0.0.1:4242: connect: connection refused`, + 2*time.Second, + "did not find connection refused error") + + mockbeat.WaitForLogs( + "Attempting to reconnect to backoff(elasticsearch(http://localhost:4242)) with 2 reconnect attempt(s)", + 2*time.Second, + "did not find two tries to reconnect") + + // 4. Restart mock-es on the same port + s, mr = startMockES(t, "localhost:4242") + + // 5. Wait for reconnection logs + mockbeat.WaitForLogs( + "Connection to backoff(elasticsearch(http://localhost:4242)) established", + 5*time.Second, // There is a backoff, so ensure we wait enough + "did not find re connection confirmation") + + // 6. Ensure one new call to _bulk is made + waitForEventToBePublished(t, mr) + s.Close() +} + +func startMockES(t *testing.T, addr string) (*http.Server, metrics.Registry) { + uid := uuid.Must(uuid.NewV4()) + mr := metrics.NewRegistry() + es := api.NewAPIHandler(uid, "foo2", mr, time.Now().Add(24*time.Hour), 0, 0, 0, 0, 0) + + s := http.Server{Addr: addr, Handler: es} + go func() { + if err := s.ListenAndServe(); !errors.Is(http.ErrServerClosed, err) { + t.Errorf("could not start mock-es server: %s", err) + } + }() + + require.Eventually(t, func() bool { + resp, err := http.Get("http://" + addr) + if err != nil { + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + return false + } + return true + }, + time.Second, time.Millisecond, "first server must be up") + + return &s, mr +} + +func waitForEventToBePublished(t *testing.T, mr metrics.Registry) { + t.Helper() + require.Eventually(t, func() bool { + total := mr.Get("bulk.create.total") + if total == nil { + return false + } + + sc, ok := total.(*metrics.StandardCounter) + if !ok { + t.Fatalf("expecting 'bulk.create.total' to be *metrics.StandardCounter, but got '%T' instead", + total, + ) + } + + return sc.Count() > 1 + }, + 10*time.Second, 100*time.Millisecond, + "at least one bulk request must be made") +}