Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/websocket: do minor clean-up in main loop #40145

Merged
merged 2 commits into from
Jul 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 30 additions & 34 deletions x-pack/filebeat/input/websocket/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@
package websocket

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/url"
"reflect"
"time"

"github.com/google/cel-go/cel"
"github.com/gorilla/websocket"
"go.uber.org/zap/zapcore"
"google.golang.org/protobuf/types/known/structpb"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
Expand Down Expand Up @@ -109,7 +112,15 @@
headers := formHeader(cfg)
c, resp, err := websocket.DefaultDialer.DialContext(ctx, url, headers)
if resp != nil && resp.Body != nil {
log.Debugw("websocket connection response", "body", resp.Body)
var buf bytes.Buffer
if log.Core().Enabled(zapcore.DebugLevel) {
const limit = 1e4
io.CopyN(&buf, resp.Body, limit)

Check failure on line 118 in x-pack/filebeat/input/websocket/input.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Error return value of `io.CopyN` is not checked (errcheck)
}
if n, _ := io.Copy(io.Discard, resp.Body); n != 0 && buf.Len() != 0 {
buf.WriteString("... truncated")
}
log.Debugw("websocket connection response", "body", &buf)
resp.Body.Close()
}
if err != nil {
Expand All @@ -119,41 +130,26 @@
}
defer c.Close()

done := make(chan error)

go func() {
defer close(done)
for {
_, message, err := c.ReadMessage()
if err != nil {
metrics.errorsTotal.Inc()
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
log.Errorw("websocket connection closed", "error", err)
} else {
log.Errorw("failed to read websocket data", "error", err)
}
done <- err
return
}
metrics.receivedBytesTotal.Add(uint64(len(message)))
state["response"] = message
log.Debugw("received websocket message", logp.Namespace("websocket"), string(message))
err = i.processAndPublishData(ctx, metrics, prg, ast, state, cursor, pub, log)
if err != nil {
metrics.errorsTotal.Inc()
log.Errorw("failed to process and publish data", "error", err)
done <- err
return
for {
_, message, err := c.ReadMessage()
if err != nil {
metrics.errorsTotal.Inc()
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
log.Errorw("websocket connection closed", "error", err)
} else {
log.Errorw("failed to read websocket data", "error", err)
}
return err
}
metrics.receivedBytesTotal.Add(uint64(len(message)))
state["response"] = message
log.Debugw("received websocket message", logp.Namespace("websocket"), string(message))
err = i.processAndPublishData(ctx, metrics, prg, ast, state, cursor, pub, log)
if err != nil {
metrics.errorsTotal.Inc()
log.Errorw("failed to process and publish data", "error", err)
return err
}
}()

// blocks until done is closed, context is cancelled or an error is received
select {
case err := <-done:
return err
case <-ctx.Done():
return ctx.Err()
}
}

Expand Down
Loading