Skip to content

Commit

Permalink
[libbeat] Stop publisher properly (#40572)
Browse files Browse the repository at this point in the history
* Stop publisher properly

* Just call beater.Stop from manager

* Delete duplicated lines

* Make call to stopBeat idempotent

* Add context at request creation to not break tracing

* Remove unused lint

* Add default WaitClose timeout

* Adjust wait on close time

* Add delay to account for the stop of the publisher

* Fix lint issues

* Fix lint issues

* Fix lint
  • Loading branch information
marc-gr authored Aug 26, 2024
1 parent af33fad commit 4808269
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 27 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Allow port number 0 in the community ID flowhash processor {pull}40259[40259]
- Fix handling of escaped brackets in syslog structured data. {issue}40445[40445] {pull}40446[40446]
- Update Go version to 1.22.6. {pull}40528[40528]
- Abort all active connections for the Elasticsearch output on close. {pull}40572[40572]
- Close the beat Publisher when the beat stops, whether it is stopped by a signal or by the Agent manager. {pull}40572[40572]

*Auditbeat*

Expand Down
29 changes: 20 additions & 9 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"runtime/debug"
"strconv"
"strings"
"sync"
"time"

"github.com/gofrs/uuid"
Expand Down Expand Up @@ -392,6 +393,10 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
}
outputFactory := b.makeOutputFactory(b.Config.Output)
settings := pipeline.Settings{
// Now that the publisher is closed on Stop, give pending events
// some time to be acknowledged by default, so that the on-stop
// behavior does not change too much.
WaitClose: time.Second,
Processors: b.processors,
InputQueueSize: b.InputQueueSize,
}
Expand All @@ -402,10 +407,6 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {

reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader()))

// TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet,
// but refine publisher to disconnect clients on stop automatically
// defer pipeline.Close()

b.Publisher = publisher
beater, err := bt(&b.Beat, sub)
if err != nil {
Expand Down Expand Up @@ -518,22 +519,32 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {
}

ctx, cancel := context.WithCancel(context.Background())

// stopBeat must be idempotent because it is called both from the signal handler and by the manager,
// and publisher.Close is not safe to call more than once.
var once sync.Once
stopBeat := func() {
b.Instrumentation.Tracer().Close()
beater.Stop()
once.Do(func() {
b.Instrumentation.Tracer().Close()
// If the publisher has a Close() method, call it before stopping the beater.
if c, ok := b.Publisher.(io.Closer); ok {
c.Close()
}
beater.Stop()
})
}
svc.HandleSignals(stopBeat, cancel)

// Allow the manager to stop a currently running beat out of band.
b.Manager.SetStopCallback(stopBeat)

err = b.loadDashboards(ctx, false)
if err != nil {
return err
}

logp.Info("%s start running.", b.Info.Beat)

// Allow the manager to stop a currently running beat out of band.
b.Manager.SetStopCallback(beater.Stop)

err = beater.Run(&b.Beat)
if b.shouldReexec {
if err := b.reexec(); err != nil {
Expand Down
12 changes: 11 additions & 1 deletion libbeat/esleg/eslegclient/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package eslegclient

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -62,6 +63,11 @@ type Connection struct {
responseBuffer *bytes.Buffer

isServerless bool

// requests will share the same cancellable context
// so they can be aborted on Close()
reqsContext context.Context
cancelReqs func()
}

// ConnectionSettings are the settings needed for a Connection
Expand Down Expand Up @@ -178,12 +184,15 @@ func NewConnection(s ConnectionSettings) (*Connection, error) {
logger.Info("kerberos client created")
}

ctx, cancelFunc := context.WithCancel(context.Background())
conn := Connection{
ConnectionSettings: s,
HTTP: esClient,
Encoder: encoder,
log: logger,
responseBuffer: bytes.NewBuffer(nil),
reqsContext: ctx,
cancelReqs: cancelFunc,
}

if s.APIKey != "" {
Expand Down Expand Up @@ -317,6 +326,7 @@ func (conn *Connection) Ping() (ESPingData, error) {
// Close closes a connection.
//
// It asks conn.HTTP to close its idle connections and then calls
// conn.cancelReqs, the cancel function of the context stored in
// conn.reqsContext. execRequest attaches that context to the requests it
// creates, so cancelling it aborts them. Close always returns a nil error.
func (conn *Connection) Close() error {
// Release idle connections held by the HTTP client.
conn.HTTP.CloseIdleConnections()
// Cancel the shared request context.
// NOTE(review): requests created with conn.reqsContext after this point
// presumably fail immediately because the context is already cancelled —
// confirm callers do not reuse a closed Connection.
conn.cancelReqs()
return nil
}

Expand Down Expand Up @@ -391,7 +401,7 @@ func (conn *Connection) execRequest(
method, url string,
body io.Reader,
) (int, []byte, error) {
req, err := http.NewRequest(method, url, body) //nolint:noctx // keep legacy behaviour
req, err := http.NewRequestWithContext(conn.reqsContext, method, url, body)
if err != nil {
conn.log.Warnf("Failed to create request %+v", err)
return 0, nil, err
Expand Down
11 changes: 1 addition & 10 deletions libbeat/publisher/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,7 @@ func (c *outputController) WaitClose(timeout time.Duration) error {
c.consumer.close()
close(c.workerChan)

// Signal the output workers to close. This step is a hint, and carries
// no guarantees. For example, on close the Elasticsearch output workers
// will close idle connections, but will not change any behavior for
// active connections, giving any remaining events a chance to ingest
// before we terminate.
// Signal the output workers to close.
for _, out := range c.workers {
out.Close()
}
Expand Down Expand Up @@ -209,11 +205,6 @@ func (c *outputController) closeQueue(timeout time.Duration) {
// pipeline but it was shut down before any output was set.
// In this case, return nil and Pipeline.ConnectWith will pass on a
// real error to the caller.
// NOTE: under the current shutdown process, Pipeline.Close (and hence
// outputController.Close) is ~never called. So even if we did have
// blocked callers here, in a real shutdown they will never be woken
// up. But in hopes of a day when the shutdown process is more robust,
// I've decided to do the right thing here anyway.
req.responseChan <- nil
}
}
Expand Down
22 changes: 15 additions & 7 deletions libbeat/tests/integration/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package integration

import (
"encoding/json"
"io/ioutil"
"io"
"net/http"
"testing"
"time"
Expand Down Expand Up @@ -57,12 +57,14 @@ output.console:
mockbeat.WriteConfigFile(cfg)
mockbeat.Start()
mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second)
time.Sleep(time.Second)

r, err := http.Get("http://localhost:5066")
r, err := http.Get("http://localhost:5066") //nolint:noctx // fine for tests
require.NoError(t, err)
require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code")

body, err := ioutil.ReadAll(r.Body)
body, err := io.ReadAll(r.Body)
r.Body.Close()
require.NoError(t, err)
var m map[string]interface{}
err = json.Unmarshal(body, &m)
Expand All @@ -88,12 +90,14 @@ output.console:
mockbeat.WriteConfigFile(cfg)
mockbeat.Start()
mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second)
time.Sleep(time.Second)

r, err := http.Get("http://localhost:5066/stats")
r, err := http.Get("http://localhost:5066/stats") //nolint:noctx // fine for tests
require.NoError(t, err)
require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code")

body, err := ioutil.ReadAll(r.Body)
body, err := io.ReadAll(r.Body)
r.Body.Close()
require.NoError(t, err)
var m Stats

Expand Down Expand Up @@ -121,8 +125,10 @@ output.console:
mockbeat.WriteConfigFile(cfg)
mockbeat.Start()
mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second)
time.Sleep(time.Second)

r, err := http.Get("http://localhost:5066/not-exist")
r, err := http.Get("http://localhost:5066/not-exist") //nolint:noctx // fine for tests
r.Body.Close()
require.NoError(t, err)
require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code")
}
Expand All @@ -143,8 +149,10 @@ output.console:
mockbeat.WriteConfigFile(cfg)
mockbeat.Start()
mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second)
time.Sleep(time.Second)

r, err := http.Get("http://localhost:5066/debug/pprof/")
r, err := http.Get("http://localhost:5066/debug/pprof/") //nolint:noctx // fine for tests
r.Body.Close()
require.NoError(t, err)
require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code")
}

0 comments on commit 4808269

Please sign in to comment.