Skip to content

Commit

Permalink
Fix elasticsearch re-connection after network error
Browse files Browse the repository at this point in the history
When the Elasticsearch client fails to publish events, it ends up
calling `Close` in the connection (that is reused). To cancel the
in-flight requests, the context is cancelled and a new one is created
to used in future requests.

The callback to check the version holds a reference to the connection
via a closure, now the Elasticsearch client holds a pointer to that
connection, so whenever Close is called, the callback can create a
request with the new, not cancelled, context.

An integration test is added to ensure the
ES output can always recover from network errors.
  • Loading branch information
belimawr committed Sep 12, 2024
1 parent df31948 commit 877dc31
Show file tree
Hide file tree
Showing 8 changed files with 218 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ https:/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Aborts all active connections for Elasticsearch output. {pull}40572[40572]
- Closes beat Publisher on beat stop and by the Agent manager. {pull}40572[40572]
- The journald input now restarts if there is an error/crash {issue}32782[32782] {pull}40558[40558]
- Fix Elasticsearch output not recovering from network errors {issue}40705[40705] {pull}40794[40794]

*Auditbeat*

Expand Down
53 changes: 53 additions & 0 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16057,6 +16057,29 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/[email protected]/LI
limitations under the License.


--------------------------------------------------------------------------------
Dependency : github.com/elastic/mock-es
Version: v0.0.0-20240712014503-e5b47ece0015
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/[email protected]/LICENSE:

Copyright 2024 Elasticsearch B.V.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


--------------------------------------------------------------------------------
Dependency : github.com/elastic/tk-btf
Version: v0.1.0
Expand Down Expand Up @@ -47725,6 +47748,36 @@ Contents of probable licence file $GOMODCACHE/github.com/matttproud/golang_proto
limitations under the License.


--------------------------------------------------------------------------------
Dependency : github.com/mileusna/useragent
Version: v1.3.4
Licence type (autodetected): MIT
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/mileusna/[email protected]/LICENSE.md:

MIT License

Copyright (c) 2017 Miloš Mileusnić

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

--------------------------------------------------------------------------------
Dependency : github.com/minio/asm2plan9s
Version: v0.0.0-20200509001527-cdd76441f9d8
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ require (
github.com/elastic/go-elasticsearch/v8 v8.14.0
github.com/elastic/go-sfdc v0.0.0-20240621062639-bcc8456508ff
github.com/elastic/mito v1.15.0
github.com/elastic/mock-es v0.0.0-20240712014503-e5b47ece0015
github.com/elastic/tk-btf v0.1.0
github.com/elastic/toutoumomoma v0.0.0-20240626215117-76e39db18dfb
github.com/foxcpp/go-mockdns v0.0.0-20201212160233-ede2f9158d15
Expand Down Expand Up @@ -333,6 +334,7 @@ require (
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mileusna/useragent v1.3.4 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,8 @@ github.com/elastic/gosigar v0.14.3 h1:xwkKwPia+hSfg9GqrCUKYdId102m9qTJIIr7egmK/u
github.com/elastic/gosigar v0.14.3/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
github.com/elastic/mito v1.15.0 h1:MicOxLSVkgU2Aonbh3i+++66Wl5wvD8y9gALK8PQDYs=
github.com/elastic/mito v1.15.0/go.mod h1:J+wCf4HccW2YoSFmZMGu+d06gN+WmnIlj5ehBqine74=
github.com/elastic/mock-es v0.0.0-20240712014503-e5b47ece0015 h1:z8cC8GASpPo8yKlbnXI36HQ/BM9wYjhBPNbDjAWm0VU=
github.com/elastic/mock-es v0.0.0-20240712014503-e5b47ece0015/go.mod h1:qH9DX/Dmflz6EAtaks/+2SsdQzecVAKE174Zl66hk7E=
github.com/elastic/pkcs8 v1.0.0 h1:HhitlUKxhN288kcNcYkjW6/ouvuwJWd9ioxpjnD9jVA=
github.com/elastic/pkcs8 v1.0.0/go.mod h1:ipsZToJfq1MxclVTwpG7U/bgeDtf+0HkUiOxebk95+0=
github.com/elastic/ristretto v0.1.1-0.20220602190459-83b0895ca5b3 h1:ChPwRVv1RR4a0cxoGjKcyWjTEpxYfm5gydMIzo32cAw=
Expand Down Expand Up @@ -1298,6 +1300,8 @@ github.com/miekg/dns v1.1.29/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7
github.com/miekg/dns v1.1.42 h1:gWGe42RGaIqXQZ+r3WUGEKBEtvPHY2SXo4dqixDNxuY=
github.com/miekg/dns v1.1.42/go.mod h1:+evo5L0630/F6ca/Z9+GAqzhjGyn8/c+TBaOyfEl0V4=
github.com/mileusna/useragent v0.0.0-20190129205925-3e331f0949a5/go.mod h1:JWhYAp2EXqUtsxTKdeGlY8Wp44M7VxThC9FEoNGi2IE=
github.com/mileusna/useragent v1.3.4 h1:MiuRRuvGjEie1+yZHO88UBYg8YBC/ddF6T7F56i3PCk=
github.com/mileusna/useragent v1.3.4/go.mod h1:3d8TOmwL/5I8pJjyVDteHtgDGcefrFUX4ccGOMKNYYc=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI=
Expand Down
3 changes: 3 additions & 0 deletions libbeat/esleg/eslegclient/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,9 @@ func (conn *Connection) Ping() (ESPingData, error) {
// Close closes a connection.
func (conn *Connection) Close() error {
conn.HTTP.CloseIdleConnections()
conn.cancelReqs()
// Creates a new context to be use in new requests
conn.reqsContext, conn.cancelReqs = context.WithCancel(context.Background())
return nil
}

Expand Down
7 changes: 7 additions & 0 deletions libbeat/esleg/eslegclient/connection_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ func TestConnect(t *testing.T) {
assert.NoError(t, err)
}

func TestConnectionCanBeClosedAndReused(t *testing.T) {
conn := getTestingElasticsearch(t)
assert.NoError(t, conn.Connect(), "first connect must succeed")
assert.NoError(t, conn.Close(), "close must succeed")
assert.NoError(t, conn.Connect(), "calling connect after close must succeed")
}

func TestConnectWithProxy(t *testing.T) {
wrongPort, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
Expand Down
8 changes: 6 additions & 2 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ var (

// Client is an elasticsearch client.
type Client struct {
conn eslegclient.Connection
// We use a pointer here because the connection holds a context
// that needs to be recreated in case of error and the connection
// that is passed to this client is also used in a closure, we need
// to ensure both hold a reference to the same instance of the connection.
conn *eslegclient.Connection

indexSelector outputs.IndexSelector
pipelineSelector *outil.Selector
Expand Down Expand Up @@ -186,7 +190,7 @@ func NewClient(
pLogIndex.Start()
pLogIndexTryDeadLetter.Start()
client := &Client{
conn: *conn,
conn: conn,
indexSelector: s.indexSelector,
pipelineSelector: pipeline,
observer: observer,
Expand Down
142 changes: 142 additions & 0 deletions libbeat/tests/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//go:build integration

package integration

import (
"errors"
"io"
"net/http"
"testing"
"time"

"github.com/gofrs/uuid/v5"
"github.com/rcrowley/go-metrics"
"github.com/stretchr/testify/require"

"github.com/elastic/mock-es/pkg/api"
)

var esCfg = `
mockbeat:
logging:
level: debug
selectors:
- publisher_pipeline_output
- esclientleg
queue.mem:
events: 4096
flush.min_events: 8
flush.timeout: 0.1s
output.elasticsearch:
allow_older_versions: true
hosts:
- "http://localhost:4242"
backoff:
init: 0.1s
max: 0.2s
`

func TestESOutputRecoversFromNetworkError(t *testing.T) {
mockbeat := NewBeat(t, "mockbeat", "../../libbeat.test")
mockbeat.WriteConfigFile(esCfg)

s, mr := startMockES(t, "localhost:4242")

mockbeat.Start()

// 1. Wait for one _bulk call
waitForEventToBePublished(t, mr)

// 2. Stop the mock-es server
if err := s.Close(); err != nil {
t.Fatalf("cannot close mock-es server: %s", err)
}

// 3. Wait for connection error logs
mockbeat.WaitForLogs(
`Get \"http://localhost:4242\": dial tcp 127.0.0.1:4242: connect: connection refused`,
2*time.Second,
"did not find connection refused error")

mockbeat.WaitForLogs(
"Attempting to reconnect to backoff(elasticsearch(http://localhost:4242)) with 2 reconnect attempt(s)",
2*time.Second,
"did not find two tries to reconnect")

// 4. Restart mock-es on the same port
s, mr = startMockES(t, "localhost:4242")

// 5. Wait for reconnection logs
mockbeat.WaitForLogs(
"Connection to backoff(elasticsearch(http://localhost:4242)) established",
5*time.Second, // There is a backoff, so ensure we wait enough
"did not find re connection confirmation")

// 6. Ensure one new call to _bulk is made
waitForEventToBePublished(t, mr)
s.Close()
}

func startMockES(t *testing.T, addr string) (*http.Server, metrics.Registry) {
uid := uuid.Must(uuid.NewV4())
mr := metrics.NewRegistry()
es := api.NewAPIHandler(uid, "foo2", mr, time.Now().Add(24*time.Hour), 0, 0, 0, 0, 0)

s := http.Server{Addr: addr, Handler: es}

Check failure on line 102 in libbeat/tests/integration/elasticsearch_test.go

View workflow job for this annotation

GitHub Actions / lint (windows)

G112: Potential Slowloris Attack because ReadHeaderTimeout is not configured in the http.Server (gosec)
go func() {
if err := s.ListenAndServe(); !errors.Is(http.ErrServerClosed, err) {
t.Errorf("could not start mock-es server: %s", err)
}
}()

require.Eventually(t, func() bool {
resp, err := http.Get("http://" + addr)

Check failure on line 110 in libbeat/tests/integration/elasticsearch_test.go

View workflow job for this annotation

GitHub Actions / lint (windows)

net/http.Get must not be called (noctx)
if err != nil {
io.Copy(io.Discard, resp.Body)

Check failure on line 112 in libbeat/tests/integration/elasticsearch_test.go

View workflow job for this annotation

GitHub Actions / lint (windows)

Error return value of `io.Copy` is not checked (errcheck)
resp.Body.Close()
return false
}
return true
},
time.Second, time.Millisecond, "first server must be up")

return &s, mr
}

func waitForEventToBePublished(t *testing.T, mr metrics.Registry) {
t.Helper()
require.Eventually(t, func() bool {
total := mr.Get("bulk.create.total")
if total == nil {
return false
}

sc, ok := total.(*metrics.StandardCounter)
if !ok {
t.Fatalf("expecting 'bulk.create.total' to be *metrics.StandardCounter, but got '%T' instead",
total,
)
}

return sc.Count() > 1
},
10*time.Second, 100*time.Millisecond,
"at least one bulk request must be made")
}

0 comments on commit 877dc31

Please sign in to comment.