Skip to content

Commit

Permalink
Stop publisher properly
Browse files Browse the repository at this point in the history
  • Loading branch information
marc-gr committed Aug 21, 2024
1 parent 8eb0f42 commit c75d9f9
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 17 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ https:/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Allow port number 0 in the community ID flowhash processor {pull}40259[40259]
- Fix handling of escaped brackets in syslog structured data. {issue}40445[40445] {pull}40446[40446]
- Update Go version to 1.22.6. {pull}40528[40528]
- Aborts all active connections for Elasticsearch output. {pull}[]
- Closes beat Publisher on beat stop and by the Agent manager. {issue}[] {pull}[]

*Auditbeat*

Expand Down
13 changes: 6 additions & 7 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,6 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {

reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader()))

// TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet,
// but refine publisher to disconnect clients on stop automatically
// defer pipeline.Close()

b.Publisher = publisher
beater, err := bt(&b.Beat, sub)
if err != nil {
Expand Down Expand Up @@ -520,9 +516,15 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {
ctx, cancel := context.WithCancel(context.Background())
stopBeat := func() {
b.Instrumentation.Tracer().Close()
// If the publisher has a Close() method, call it before stopping the beater.
if c, ok := b.Publisher.(io.Closer); ok {
c.Close()
}
beater.Stop()
}
svc.HandleSignals(stopBeat, cancel)
// Allow the manager to stop a currently running beats out of bound.
b.Manager.SetStopCallback(stopBeat)

err = b.loadDashboards(ctx, false)
if err != nil {
Expand All @@ -531,9 +533,6 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {

logp.Info("%s start running.", b.Info.Beat)

// Allow the manager to stop a currently running beats out of bound.
b.Manager.SetStopCallback(beater.Stop)

err = beater.Run(&b.Beat)
if b.shouldReexec {
if err := b.reexec(); err != nil {
Expand Down
11 changes: 11 additions & 0 deletions libbeat/esleg/eslegclient/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package eslegclient

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -62,6 +63,11 @@ type Connection struct {
responseBuffer *bytes.Buffer

isServerless bool

// requests will share the same cancellable context
// so they can be aborted on Close()
reqsContext context.Context
cancelReqs func()
}

// ConnectionSettings are the settings needed for a Connection
Expand Down Expand Up @@ -178,12 +184,15 @@ func NewConnection(s ConnectionSettings) (*Connection, error) {
logger.Info("kerberos client created")
}

ctx, cancelFunc := context.WithCancel(context.Background())
conn := Connection{
ConnectionSettings: s,
HTTP: esClient,
Encoder: encoder,
log: logger,
responseBuffer: bytes.NewBuffer(nil),
reqsContext: ctx,
cancelReqs: cancelFunc,
}

if s.APIKey != "" {
Expand Down Expand Up @@ -317,6 +326,7 @@ func (conn *Connection) Ping() (ESPingData, error) {
// Close closes a connection.
func (conn *Connection) Close() error {
conn.HTTP.CloseIdleConnections()
conn.cancelReqs()
return nil
}

Expand Down Expand Up @@ -486,6 +496,7 @@ func (conn *Connection) execHTTPRequest(req *http.Request) (int, []byte, error)
req.Host = host
}

req = req.WithContext(conn.reqsContext)
resp, err := conn.HTTP.Do(req)
if err != nil {
return 0, nil, err
Expand Down
11 changes: 1 addition & 10 deletions libbeat/publisher/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,7 @@ func (c *outputController) WaitClose(timeout time.Duration) error {
c.consumer.close()
close(c.workerChan)

// Signal the output workers to close. This step is a hint, and carries
// no guarantees. For example, on close the Elasticsearch output workers
// will close idle connections, but will not change any behavior for
// active connections, giving any remaining events a chance to ingest
// before we terminate.
// Signal the output workers to close.
for _, out := range c.workers {
out.Close()
}
Expand Down Expand Up @@ -209,11 +205,6 @@ func (c *outputController) closeQueue(timeout time.Duration) {
// pipeline but it was shut down before any output was set.
// In this case, return nil and Pipeline.ConnectWith will pass on a
// real error to the caller.
// NOTE: under the current shutdown process, Pipeline.Close (and hence
// outputController.Close) is ~never called. So even if we did have
// blocked callers here, in a real shutdown they will never be woken
// up. But in hopes of a day when the shutdown process is more robust,
// I've decided to do the right thing here anyway.
req.responseChan <- nil
}
}
Expand Down

0 comments on commit c75d9f9

Please sign in to comment.