From 28526fa0b7580cbedb500b500c35ce7ad135a8a9 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 26 Aug 2024 17:33:56 +0200 Subject: [PATCH] [libbeat] Stop publisher properly (#40572) * Stop publisher properly * Just call beater.Stop from manager * Delete duplicated lines * Make call to stopBeat idempotent * Add context at request creation to not break tracing * Remove unused lint * Add default WaitClose timeout * Adjust wait on close time * Add delay to account for the stop of the publisher * Fix lint issues * Fix lint issues * Fix lint (cherry picked from commit 480826919329d700a9c70a7f30ea4941b2022100) --- CHANGELOG.next.asciidoc | 2 ++ libbeat/cmd/instance/beat.go | 29 ++++++++++++++++-------- libbeat/esleg/eslegclient/connection.go | 12 +++++++++- libbeat/publisher/pipeline/controller.go | 11 +-------- libbeat/tests/integration/http_test.go | 22 ++++++++++++------ 5 files changed, 49 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 4dd7d83a032..430774bec2a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -76,6 +76,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Lower logging level to debug when attempting to configure beats with unknown fields from autodiscovered events/environments {pull}[37816][37816] - Set timeout of 1 minute for FQDN requests {pull}37756[37756] - Update Go version to 1.22.6. {pull}40528[40528] +- Aborts all active connections for Elasticsearch output. {pull}40572[40572] +- Closes beat Publisher on beat stop and by the Agent manager. 
{pull}40572[40572] *Auditbeat* diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index c15d9b8c200..939cdff56ea 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -35,6 +35,7 @@ import ( "runtime/debug" "strconv" "strings" + "sync" "time" "github.com/gofrs/uuid" @@ -386,6 +387,10 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { } outputFactory := b.makeOutputFactory(b.Config.Output) settings := pipeline.Settings{ + // Since the publisher is now closed on Stop, we want to give some + // time to ack any pending events by default to avoid + // changing the on-stop behavior too much. + WaitClose: time.Second, Processors: b.processors, InputQueueSize: b.InputQueueSize, } @@ -396,10 +401,6 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader())) - // TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet, - // but refine publisher to disconnect clients on stop automatically - // defer pipeline.Close() - b.Publisher = publisher beater, err := bt(&b.Beat, sub) if err != nil { @@ -512,12 +513,25 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { } ctx, cancel := context.WithCancel(context.Background()) + + // stopBeat must be idempotent since it will be called both from a signal and by the manager. + // Since publisher.Close is not safe to be called more than once this is necessary. + var once sync.Once stopBeat := func() { - b.Instrumentation.Tracer().Close() - beater.Stop() + once.Do(func() { + b.Instrumentation.Tracer().Close() + // If the publisher has a Close() method, call it before stopping the beater. + if c, ok := b.Publisher.(io.Closer); ok { + c.Close() + } + beater.Stop() + }) } svc.HandleSignals(stopBeat, cancel) + // Allow the manager to stop a currently running beat out of band. 
+ b.Manager.SetStopCallback(stopBeat) + err = b.loadDashboards(ctx, false) if err != nil { return err @@ -525,9 +539,6 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { logp.Info("%s start running.", b.Info.Beat) - // Allow the manager to stop a currently running beats out of bound. - b.Manager.SetStopCallback(beater.Stop) - err = beater.Run(&b.Beat) if b.shouldReexec { if err := b.reexec(); err != nil { diff --git a/libbeat/esleg/eslegclient/connection.go b/libbeat/esleg/eslegclient/connection.go index 122a47b8081..6f98935fab7 100644 --- a/libbeat/esleg/eslegclient/connection.go +++ b/libbeat/esleg/eslegclient/connection.go @@ -19,6 +19,7 @@ package eslegclient import ( "bytes" + "context" "encoding/base64" "encoding/json" "fmt" @@ -62,6 +63,11 @@ type Connection struct { responseBuffer *bytes.Buffer isServerless bool + + // requests will share the same cancellable context + // so they can be aborted on Close() + reqsContext context.Context + cancelReqs func() } // ConnectionSettings are the settings needed for a Connection @@ -178,12 +184,15 @@ func NewConnection(s ConnectionSettings) (*Connection, error) { logger.Info("kerberos client created") } + ctx, cancelFunc := context.WithCancel(context.Background()) conn := Connection{ ConnectionSettings: s, HTTP: esClient, Encoder: encoder, log: logger, responseBuffer: bytes.NewBuffer(nil), + reqsContext: ctx, + cancelReqs: cancelFunc, } if s.APIKey != "" { @@ -317,6 +326,7 @@ func (conn *Connection) Ping() (ESPingData, error) { // Close closes a connection. 
func (conn *Connection) Close() error { conn.HTTP.CloseIdleConnections() + conn.cancelReqs() return nil } @@ -391,7 +401,7 @@ func (conn *Connection) execRequest( method, url string, body io.Reader, ) (int, []byte, error) { - req, err := http.NewRequest(method, url, body) //nolint:noctx // keep legacy behaviour + req, err := http.NewRequestWithContext(conn.reqsContext, method, url, body) if err != nil { conn.log.Warnf("Failed to create request %+v", err) return 0, nil, err diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index 4c27494fa68..4ac2373bcea 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -113,11 +113,7 @@ func (c *outputController) WaitClose(timeout time.Duration) error { c.consumer.close() close(c.workerChan) - // Signal the output workers to close. This step is a hint, and carries - // no guarantees. For example, on close the Elasticsearch output workers - // will close idle connections, but will not change any behavior for - // active connections, giving any remaining events a chance to ingest - // before we terminate. + // Signal the output workers to close. for _, out := range c.workers { out.Close() } @@ -209,11 +205,6 @@ func (c *outputController) closeQueue(timeout time.Duration) { // pipeline but it was shut down before any output was set. // In this case, return nil and Pipeline.ConnectWith will pass on a // real error to the caller. - // NOTE: under the current shutdown process, Pipeline.Close (and hence - // outputController.Close) is ~never called. So even if we did have - // blocked callers here, in a real shutdown they will never be woken - // up. But in hopes of a day when the shutdown process is more robust, - // I've decided to do the right thing here anyway. 
req.responseChan <- nil } } diff --git a/libbeat/tests/integration/http_test.go b/libbeat/tests/integration/http_test.go index bb2f7bde924..41382ab9e09 100644 --- a/libbeat/tests/integration/http_test.go +++ b/libbeat/tests/integration/http_test.go @@ -21,7 +21,7 @@ package integration import ( "encoding/json" - "io/ioutil" + "io" "net/http" "testing" "time" @@ -57,12 +57,14 @@ output.console: mockbeat.WriteConfigFile(cfg) mockbeat.Start() mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second) + time.Sleep(time.Second) - r, err := http.Get("http://localhost:5066") + r, err := http.Get("http://localhost:5066") //nolint:noctx // fine for tests require.NoError(t, err) require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code") - body, err := ioutil.ReadAll(r.Body) + body, err := io.ReadAll(r.Body) + r.Body.Close() require.NoError(t, err) var m map[string]interface{} err = json.Unmarshal(body, &m) @@ -88,12 +90,14 @@ output.console: mockbeat.WriteConfigFile(cfg) mockbeat.Start() mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second) + time.Sleep(time.Second) - r, err := http.Get("http://localhost:5066/stats") + r, err := http.Get("http://localhost:5066/stats") //nolint:noctx // fine for tests require.NoError(t, err) require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code") - body, err := ioutil.ReadAll(r.Body) + body, err := io.ReadAll(r.Body) + r.Body.Close() require.NoError(t, err) var m Stats @@ -121,8 +125,10 @@ output.console: mockbeat.WriteConfigFile(cfg) mockbeat.Start() mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second) + time.Sleep(time.Second) - r, err := http.Get("http://localhost:5066/not-exist") + r, err := http.Get("http://localhost:5066/not-exist") //nolint:noctx // fine for tests + r.Body.Close() require.NoError(t, err) require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code") } @@ -143,8 +149,10 @@ output.console: mockbeat.WriteConfigFile(cfg) mockbeat.Start() 
mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second) + time.Sleep(time.Second) - r, err := http.Get("http://localhost:5066/debug/pprof/") + r, err := http.Get("http://localhost:5066/debug/pprof/") //nolint:noctx // fine for tests + r.Body.Close() require.NoError(t, err) require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code") }