Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[libbeat] Stop publisher properly #40572

Merged
merged 15 commits into from
Aug 26, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ https:/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Allow port number 0 in the community ID flowhash processor {pull}40259[40259]
- Fix handling of escaped brackets in syslog structured data. {issue}40445[40445] {pull}40446[40446]
- Update Go version to 1.22.6. {pull}40528[40528]
- Aborts all active connections for Elasticsearch output. {pull}40572[40572]
- Closes beat Publisher on beat stop and by the Agent manager. {pull}40572[40572]

*Auditbeat*

Expand Down
29 changes: 20 additions & 9 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"runtime/debug"
"strconv"
"strings"
"sync"
"time"

"github.com/gofrs/uuid"
Expand Down Expand Up @@ -392,6 +393,10 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
}
outputFactory := b.makeOutputFactory(b.Config.Output)
settings := pipeline.Settings{
// Since now publisher is closed on Stop, we want to give some
// time to ack any pending events by default to avoid
// changing on stop behavior too much.
WaitClose: time.Second,
Processors: b.processors,
InputQueueSize: b.InputQueueSize,
}
Expand All @@ -402,10 +407,6 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {

reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader()))

// TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet,
// but refine publisher to disconnect clients on stop automatically
// defer pipeline.Close()

b.Publisher = publisher
beater, err := bt(&b.Beat, sub)
if err != nil {
Expand Down Expand Up @@ -518,22 +519,32 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {
}

ctx, cancel := context.WithCancel(context.Background())

// stopBeat must be idempotent since it will be called both from a signal and by the manager.
// Since publisher.Close is not safe to be called more than once this is necessary.
var once sync.Once
stopBeat := func() {
b.Instrumentation.Tracer().Close()
beater.Stop()
once.Do(func() {
b.Instrumentation.Tracer().Close()
// If the publisher has a Close() method, call it before stopping the beater.
if c, ok := b.Publisher.(io.Closer); ok {
c.Close()
}
beater.Stop()
})
}
svc.HandleSignals(stopBeat, cancel)

// Allow the manager to stop a currently running beats out of bound.
b.Manager.SetStopCallback(stopBeat)

err = b.loadDashboards(ctx, false)
if err != nil {
return err
}

logp.Info("%s start running.", b.Info.Beat)

// Allow the manager to stop a currently running beats out of bound.
b.Manager.SetStopCallback(beater.Stop)

err = beater.Run(&b.Beat)
if b.shouldReexec {
if err := b.reexec(); err != nil {
Expand Down
12 changes: 11 additions & 1 deletion libbeat/esleg/eslegclient/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package eslegclient

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -62,6 +63,11 @@ type Connection struct {
responseBuffer *bytes.Buffer

isServerless bool

// requests will share the same cancellable context
// so they can be aborted on Close()
reqsContext context.Context
cancelReqs func()
}

// ConnectionSettings are the settings needed for a Connection
Expand Down Expand Up @@ -178,12 +184,15 @@ func NewConnection(s ConnectionSettings) (*Connection, error) {
logger.Info("kerberos client created")
}

ctx, cancelFunc := context.WithCancel(context.Background())
conn := Connection{
ConnectionSettings: s,
HTTP: esClient,
Encoder: encoder,
log: logger,
responseBuffer: bytes.NewBuffer(nil),
reqsContext: ctx,
cancelReqs: cancelFunc,
}

if s.APIKey != "" {
Expand Down Expand Up @@ -317,6 +326,7 @@ func (conn *Connection) Ping() (ESPingData, error) {
// Close closes a connection.
func (conn *Connection) Close() error {
conn.HTTP.CloseIdleConnections()
conn.cancelReqs()
return nil
}

Expand Down Expand Up @@ -391,7 +401,7 @@ func (conn *Connection) execRequest(
method, url string,
body io.Reader,
) (int, []byte, error) {
req, err := http.NewRequest(method, url, body) //nolint:noctx // keep legacy behaviour
req, err := http.NewRequestWithContext(conn.reqsContext, method, url, body)
if err != nil {
conn.log.Warnf("Failed to create request %+v", err)
return 0, nil, err
Expand Down
11 changes: 1 addition & 10 deletions libbeat/publisher/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,7 @@ func (c *outputController) WaitClose(timeout time.Duration) error {
c.consumer.close()
close(c.workerChan)

// Signal the output workers to close. This step is a hint, and carries
// no guarantees. For example, on close the Elasticsearch output workers
// will close idle connections, but will not change any behavior for
// active connections, giving any remaining events a chance to ingest
// before we terminate.
// Signal the output workers to close.
for _, out := range c.workers {
out.Close()
}
Expand Down Expand Up @@ -209,11 +205,6 @@ func (c *outputController) closeQueue(timeout time.Duration) {
// pipeline but it was shut down before any output was set.
// In this case, return nil and Pipeline.ConnectWith will pass on a
// real error to the caller.
// NOTE: under the current shutdown process, Pipeline.Close (and hence
// outputController.Close) is ~never called. So even if we did have
// blocked callers here, in a real shutdown they will never be woken
// up. But in hopes of a day when the shutdown process is more robust,
// I've decided to do the right thing here anyway.
req.responseChan <- nil
}
}
Expand Down
22 changes: 15 additions & 7 deletions libbeat/tests/integration/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package integration

import (
"encoding/json"
"io/ioutil"
"io"
"net/http"
"testing"
"time"
Expand Down Expand Up @@ -57,12 +57,14 @@ output.console:
mockbeat.WriteConfigFile(cfg)
mockbeat.Start()
mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second)
time.Sleep(time.Second)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Blocker]
I see why just WaitForLogs("Starting stats endpoint" isn't enough:

s.log.Info("Starting stats endpoint")
go func(l net.Listener) {
s.log.Infof("Metrics endpoint listening on: %s (configured: %s)", l.Addr().String(), s.config.Host)
err := http.Serve(l, s.mux)

but we don't use time.Sleep anymore, unless there is no other way. However here there is a better alternative. See below


r, err := http.Get("http://localhost:5066")
r, err := http.Get("http://localhost:5066") //nolint:noctx // fine for tests
require.NoError(t, err)
require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code")
Comment on lines +62 to 64
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Blocker] cont.

you could use it instead

Suggested change
r, err := http.Get("http://localhost:5066") //nolint:noctx // fine for tests
require.NoError(t, err)
require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code")
buff := &bytes.Buffer{}
require.Eventually(t, func() bool {
buff.Reset()
r, err := http.Get("http://localhost:5066")
if err != nil {
_, _ = fmt.Fprintf(buff, "stats endpoint not available: %v", err)
return false
}
if r.StatusCode != http.StatusOK {
_, _ = fmt.Fprintf(buff, "stats endpoint: bad HTTPnstatus: %s",
r.Status)
return false
}
return true
}, time.Second, 100*time.Millisecond,
"stats endpoint never become available: %s", buff)

if you want, you could even remove the WaitForLogs. Just don't mix WaitForLogs and Eventually because WaitForLogs uses eventually underneath.


body, err := ioutil.ReadAll(r.Body)
body, err := io.ReadAll(r.Body)
r.Body.Close()
require.NoError(t, err)
var m map[string]interface{}
err = json.Unmarshal(body, &m)
Expand All @@ -88,12 +90,14 @@ output.console:
mockbeat.WriteConfigFile(cfg)
mockbeat.Start()
mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second)
time.Sleep(time.Second)

r, err := http.Get("http://localhost:5066/stats")
r, err := http.Get("http://localhost:5066/stats") //nolint:noctx // fine for tests
require.NoError(t, err)
require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code")

body, err := ioutil.ReadAll(r.Body)
body, err := io.ReadAll(r.Body)
r.Body.Close()
require.NoError(t, err)
var m Stats

Expand Down Expand Up @@ -121,8 +125,10 @@ output.console:
mockbeat.WriteConfigFile(cfg)
mockbeat.Start()
mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second)
time.Sleep(time.Second)

r, err := http.Get("http://localhost:5066/not-exist")
r, err := http.Get("http://localhost:5066/not-exist") //nolint:noctx // fine for tests
r.Body.Close()
require.NoError(t, err)
require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code")
}
Expand All @@ -143,8 +149,10 @@ output.console:
mockbeat.WriteConfigFile(cfg)
mockbeat.Start()
mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second)
time.Sleep(time.Second)

r, err := http.Get("http://localhost:5066/debug/pprof/")
r, err := http.Get("http://localhost:5066/debug/pprof/") //nolint:noctx // fine for tests
r.Body.Close()
require.NoError(t, err)
require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code")
}
Loading