Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.15](backport #40572) [libbeat] Stop publisher properly #40617

Merged
merged 1 commit into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ https:/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Lower logging level to debug when attempting to configure beats with unknown fields from autodiscovered events/environments {pull}[37816][37816]
- Set timeout of 1 minute for FQDN requests {pull}37756[37756]
- Update Go version to 1.22.6. {pull}40528[40528]
- Aborts all active connections for Elasticsearch output. {pull}40572[40572]
- Closes beat Publisher on beat stop and by the Agent manager. {pull}40572[40572]

*Auditbeat*

Expand Down
29 changes: 20 additions & 9 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"runtime/debug"
"strconv"
"strings"
"sync"
"time"

"github.com/gofrs/uuid"
Expand Down Expand Up @@ -386,6 +387,10 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
}
outputFactory := b.makeOutputFactory(b.Config.Output)
settings := pipeline.Settings{
// Since now publisher is closed on Stop, we want to give some
// time to ack any pending events by default to avoid
// changing on stop behavior too much.
WaitClose: time.Second,
Processors: b.processors,
InputQueueSize: b.InputQueueSize,
}
Expand All @@ -396,10 +401,6 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {

reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader()))

// TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet,
// but refine publisher to disconnect clients on stop automatically
// defer pipeline.Close()

b.Publisher = publisher
beater, err := bt(&b.Beat, sub)
if err != nil {
Expand Down Expand Up @@ -512,22 +513,32 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {
}

ctx, cancel := context.WithCancel(context.Background())

// stopBeat must be idempotent since it will be called both from a signal and by the manager.
// Since publisher.Close is not safe to be called more than once this is necessary.
var once sync.Once
stopBeat := func() {
b.Instrumentation.Tracer().Close()
beater.Stop()
once.Do(func() {
b.Instrumentation.Tracer().Close()
// If the publisher has a Close() method, call it before stopping the beater.
if c, ok := b.Publisher.(io.Closer); ok {
c.Close()
}
beater.Stop()
})
}
svc.HandleSignals(stopBeat, cancel)

// Allow the manager to stop a currently running beats out of bound.
b.Manager.SetStopCallback(stopBeat)

err = b.loadDashboards(ctx, false)
if err != nil {
return err
}

logp.Info("%s start running.", b.Info.Beat)

// Allow the manager to stop a currently running beats out of bound.
b.Manager.SetStopCallback(beater.Stop)

err = beater.Run(&b.Beat)
if b.shouldReexec {
if err := b.reexec(); err != nil {
Expand Down
12 changes: 11 additions & 1 deletion libbeat/esleg/eslegclient/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package eslegclient

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -62,6 +63,11 @@ type Connection struct {
responseBuffer *bytes.Buffer

isServerless bool

// requests will share the same cancellable context
// so they can be aborted on Close()
reqsContext context.Context
cancelReqs func()
}

// ConnectionSettings are the settings needed for a Connection
Expand Down Expand Up @@ -178,12 +184,15 @@ func NewConnection(s ConnectionSettings) (*Connection, error) {
logger.Info("kerberos client created")
}

ctx, cancelFunc := context.WithCancel(context.Background())
conn := Connection{
ConnectionSettings: s,
HTTP: esClient,
Encoder: encoder,
log: logger,
responseBuffer: bytes.NewBuffer(nil),
reqsContext: ctx,
cancelReqs: cancelFunc,
}

if s.APIKey != "" {
Expand Down Expand Up @@ -317,6 +326,7 @@ func (conn *Connection) Ping() (ESPingData, error) {
// Close closes a connection.
func (conn *Connection) Close() error {
conn.HTTP.CloseIdleConnections()
conn.cancelReqs()
return nil
}

Expand Down Expand Up @@ -391,7 +401,7 @@ func (conn *Connection) execRequest(
method, url string,
body io.Reader,
) (int, []byte, error) {
req, err := http.NewRequest(method, url, body) //nolint:noctx // keep legacy behaviour
req, err := http.NewRequestWithContext(conn.reqsContext, method, url, body)
if err != nil {
conn.log.Warnf("Failed to create request %+v", err)
return 0, nil, err
Expand Down
11 changes: 1 addition & 10 deletions libbeat/publisher/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,7 @@ func (c *outputController) WaitClose(timeout time.Duration) error {
c.consumer.close()
close(c.workerChan)

// Signal the output workers to close. This step is a hint, and carries
// no guarantees. For example, on close the Elasticsearch output workers
// will close idle connections, but will not change any behavior for
// active connections, giving any remaining events a chance to ingest
// before we terminate.
// Signal the output workers to close.
for _, out := range c.workers {
out.Close()
}
Expand Down Expand Up @@ -209,11 +205,6 @@ func (c *outputController) closeQueue(timeout time.Duration) {
// pipeline but it was shut down before any output was set.
// In this case, return nil and Pipeline.ConnectWith will pass on a
// real error to the caller.
// NOTE: under the current shutdown process, Pipeline.Close (and hence
// outputController.Close) is ~never called. So even if we did have
// blocked callers here, in a real shutdown they will never be woken
// up. But in hopes of a day when the shutdown process is more robust,
// I've decided to do the right thing here anyway.
req.responseChan <- nil
}
}
Expand Down
22 changes: 15 additions & 7 deletions libbeat/tests/integration/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package integration

import (
"encoding/json"
"io/ioutil"
"io"
"net/http"
"testing"
"time"
Expand Down Expand Up @@ -57,12 +57,14 @@ output.console:
mockbeat.WriteConfigFile(cfg)
mockbeat.Start()
mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second)
time.Sleep(time.Second)

r, err := http.Get("http://localhost:5066")
r, err := http.Get("http://localhost:5066") //nolint:noctx // fine for tests
require.NoError(t, err)
require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code")

body, err := ioutil.ReadAll(r.Body)
body, err := io.ReadAll(r.Body)
r.Body.Close()
require.NoError(t, err)
var m map[string]interface{}
err = json.Unmarshal(body, &m)
Expand All @@ -88,12 +90,14 @@ output.console:
mockbeat.WriteConfigFile(cfg)
mockbeat.Start()
mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second)
time.Sleep(time.Second)

r, err := http.Get("http://localhost:5066/stats")
r, err := http.Get("http://localhost:5066/stats") //nolint:noctx // fine for tests
require.NoError(t, err)
require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code")

body, err := ioutil.ReadAll(r.Body)
body, err := io.ReadAll(r.Body)
r.Body.Close()
require.NoError(t, err)
var m Stats

Expand Down Expand Up @@ -121,8 +125,10 @@ output.console:
mockbeat.WriteConfigFile(cfg)
mockbeat.Start()
mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second)
time.Sleep(time.Second)

r, err := http.Get("http://localhost:5066/not-exist")
r, err := http.Get("http://localhost:5066/not-exist") //nolint:noctx // fine for tests
r.Body.Close()
require.NoError(t, err)
require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code")
}
Expand All @@ -143,8 +149,10 @@ output.console:
mockbeat.WriteConfigFile(cfg)
mockbeat.Start()
mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second)
time.Sleep(time.Second)

r, err := http.Get("http://localhost:5066/debug/pprof/")
r, err := http.Get("http://localhost:5066/debug/pprof/") //nolint:noctx // fine for tests
r.Body.Close()
require.NoError(t, err)
require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code")
}
Loading