Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Heartbeat] Monitor Retries #36147

Merged
merged 79 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
79 commits
Select commit Hold shift + click to select a range
4a23bc8
Simplify task fns
andrewvc Jul 25, 2023
e930348
Checkpoint
andrewvc Jul 25, 2023
6a15943
Checkpoint
andrewvc Jul 25, 2023
bf71c22
Add browser config
andrewvc Jul 25, 2023
b32f58a
GENCHECK
andrewvc Jul 26, 2023
b3184b4
GENCHECK
andrewvc Jul 26, 2023
6e7cc5c
GENCHECK
andrewvc Jul 26, 2023
8d21c79
Working with wrappers
andrewvc Jul 26, 2023
d7fe482
tweaks
andrewvc Jul 26, 2023
f822a1b
Tweaks
andrewvc Jul 26, 2023
1016853
Tweaks
andrewvc Jul 26, 2023
e328ed4
Checkpoint
andrewvc Jul 28, 2023
712c682
Test improvements
andrewvc Jul 28, 2023
c1564ea
Checkpoint
andrewvc Jul 28, 2023
b03c74c
Merge remote-tracking branch 'origin/main' into retestsched
andrewvc Jul 28, 2023
74cc45c
Tests now pass
andrewvc Jul 28, 2023
85ad2ff
Some cleanup
andrewvc Jul 28, 2023
89b9444
Checkpoint
andrewvc Jul 28, 2023
fda313c
Default to one attempt
andrewvc Jul 28, 2023
8069e0f
Make the linter happy
andrewvc Jul 31, 2023
0911be3
Document new fields
andrewvc Jul 31, 2023
b9f739d
Document new fields
andrewvc Jul 31, 2023
2865f69
Remove unused vars
andrewvc Jul 31, 2023
a43874f
Fix scenario
andrewvc Jul 31, 2023
c98acbf
Merge remote-tracking branch 'origin/main' into retestsched
andrewvc Jul 31, 2023
930e34f
Fix legacy enrich test
andrewvc Jul 31, 2023
930e8c9
Remove superfluous check_group test
andrewvc Jul 31, 2023
87c4fab
FMT
andrewvc Jul 31, 2023
291a60a
Add new synthetics to allow testing of synthetics code on non-linux p…
andrewvc Aug 2, 2023
ebef751
Handle states more correctly
andrewvc Aug 2, 2023
c6128eb
Fix the thing where it doesn't work
andrewvc Aug 2, 2023
d94a2bf
Checkpoint
andrewvc Aug 3, 2023
eb3b2dd
Retry scenarios
andrewvc Aug 4, 2023
c176b39
Checkpoint, scenario tests pass
andrewvc Aug 10, 2023
cea50cc
All x-pack tests now pass
andrewvc Aug 10, 2023
1c6e07a
Heartbeat tests now pass
andrewvc Aug 10, 2023
a888614
Merge remote-tracking branch 'origin/main' into retestsched
andrewvc Aug 10, 2023
46dbbe9
Make linter happy
andrewvc Aug 10, 2023
2a3a3ee
Make linter happy
andrewvc Aug 10, 2023
d8ca32f
Make update happy
andrewvc Aug 10, 2023
cb3b4b7
Correctly preallocateA
andrewvc Aug 10, 2023
a97ebe2
More comments
andrewvc Aug 10, 2023
2f852e2
Merge remote-tracking branch 'origin/main' into retestsched
andrewvc Aug 10, 2023
add8785
Tweaks
andrewvc Aug 10, 2023
749d947
make the linter happy
andrewvc Aug 11, 2023
e52d549
make the linter happy
andrewvc Aug 11, 2023
766e7f7
build flag fix
andrewvc Aug 11, 2023
367693e
Make tests happier
andrewvc Aug 11, 2023
7ad8c33
Add synthetics tag to integ tests
andrewvc Aug 11, 2023
af47de8
Merge remote-tracking branch 'origin/main' into retestsched
andrewvc Aug 11, 2023
6af222c
Test improvements
andrewvc Aug 11, 2023
9a13a50
Fix test parallelization in scenarios
andrewvc Aug 11, 2023
49f1f29
Fix tag passing in xpack integ
andrewvc Aug 11, 2023
2cac5fc
Fix missing license header
andrewvc Aug 11, 2023
37d3c24
Fix some state continuity errors
andrewvc Aug 12, 2023
7d877e8
Debug missing deps / other synthetics issues
andrewvc Aug 15, 2023
db08ea2
Check whether browsers can be tested
andrewvc Aug 15, 2023
9e0e003
Add proper build tag
andrewvc Aug 15, 2023
7fabf7f
Cleanup
andrewvc Aug 15, 2023
7ffbdc8
Make linter happy
andrewvc Aug 15, 2023
c596457
Use a more recent node for synthetics testing
andrewvc Aug 15, 2023
b472e9f
Pass node JS env as map keyword
andrewvc Aug 15, 2023
7e6f539
Move browser check to mage
andrewvc Aug 15, 2023
001a10e
Set env
andrewvc Aug 15, 2023
7bdd0e8
Merge remote-tracking branch 'origin/main' into retestsched
andrewvc Aug 25, 2023
0f9e078
Incorporate PR feedback
andrewvc Aug 25, 2023
d06aa9b
Cleanup wrapers
andrewvc Aug 25, 2023
7afa14e
Retry only down monitors
andrewvc Aug 25, 2023
743e6af
Merge remote-tracking branch 'origin' into retestsched
andrewvc Aug 28, 2023
e6d1785
Add summarizer tests
andrewvc Aug 30, 2023
052d846
Fix broken tests
andrewvc Aug 30, 2023
19060ea
Fix typo
andrewvc Aug 30, 2023
a2222e7
Fix imports order
andrewvc Aug 30, 2023
5844e72
Add more test assertions
andrewvc Aug 30, 2023
8d0845b
Remove debugging int
andrewvc Aug 30, 2023
ed5ba55
Finish removing summary ID
andrewvc Aug 30, 2023
43b2441
Fix check group ids
andrewvc Aug 31, 2023
62542c1
Address PR Feedback
andrewvc Aug 31, 2023
8eeff78
Merge branch 'main' into retestsched
andrewvc Aug 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions heartbeat/monitors/jobs/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,19 @@ func Wrap(job Job, wrapper JobWrapper) Job {
return WrapAll(cont, wrapper), err
}
}

type StatefulWrapper[T any] interface {
Wrap(j Job) Job
}

type StatefulWrapperFactory[T any] func(rootJob Job) StatefulWrapper[T]

func WrapStateful[T StatefulWrapper[T]](makeSW StatefulWrapperFactory[T]) JobWrapper {
return func(j Job) Job {
return func(event *beat.Event) ([]Job, error) {
sw := makeSW(j)
conts, err := sw.Wrap(j)(event)
return WrapAll(conts, sw.Wrap), err
}
}
}
62 changes: 62 additions & 0 deletions heartbeat/monitors/jobs/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package jobs
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/heartbeat/eventext"
Expand All @@ -29,6 +30,67 @@ import (
"github.com/elastic/go-lookslike/testslike"
)

type TestSWrapper struct {
rootCounter int
contCounter int
}

func (tsw *TestSWrapper) Wrap(j Job) Job {
return func(event *beat.Event) ([]Job, error) {
tsw.contCounter++
eventext.MergeEventFields(event, mapstr.M{"cont_counter": tsw.contCounter, "root_counter": tsw.rootCounter})
return j(event)
}
}

func TestStatefulWrapper(t *testing.T) {
innerj1Fields := mapstr.M{
"innerjob": "innervalue",
"innercount": 1,
}
innerj2Fields := mapstr.M{
"innerjob": "innervalue",
"innercount": 2,
}

var j Job = func(event *beat.Event) ([]Job, error) {
eventext.MergeEventFields(event, innerj1Fields)

return []Job{
func(event *beat.Event) ([]Job, error) {
eventext.MergeEventFields(event, innerj2Fields)
return nil, nil
},
}, nil
}

wrapper := WrapStateful[*TestSWrapper](func(rootJob Job) StatefulWrapper[*TestSWrapper] { return &TestSWrapper{rootCounter: 1} })

// Run this ten times to ensure state is not carried across retries
for i := 0; i < 10; i++ {
events, err := ExecJobAndConts(t, wrapper(j))
assert.NoError(t, err)
assert.Len(t, events, 2)
testslike.Test(
t,
lookslike.Compose(
lookslike.MustCompile(innerj1Fields),
lookslike.MustCompile(mapstr.M{"cont_counter": 1, "root_counter": 1}),
),
events[0].Fields,
)
testslike.Test(
t,
lookslike.Compose(
lookslike.MustCompile(innerj2Fields),
lookslike.MustCompile(mapstr.M{"cont_counter": 2, "root_counter": 1}),
),
events[1].Fields,
)
}

}

func TestWrapAll(t *testing.T) {
type args struct {
jobs []Job
Expand Down
25 changes: 14 additions & 11 deletions heartbeat/monitors/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type Monitor struct {
// stats is the countersRecorder used to record lifecycle events
// for global metrics + telemetry
stats plugin.RegistryRecorder

monitorStateTracker *monitorstate.Tracker
}

// String prints a description of the monitor in a threadsafe way. It is important that this use threadsafe
Expand Down Expand Up @@ -125,15 +127,16 @@ func newMonitorUnsafe(
}

m := &Monitor{
stdFields: standardFields,
pluginName: pluginFactory.Name,
addTask: addTask,
configuredJobs: []*configuredJob{},
pubClient: pubClient,
internalsMtx: sync.Mutex{},
config: config,
stats: pluginFactory.Stats,
state: MON_INIT,
stdFields: standardFields,
pluginName: pluginFactory.Name,
addTask: addTask,
configuredJobs: []*configuredJob{},
pubClient: pubClient,
internalsMtx: sync.Mutex{},
config: config,
stats: pluginFactory.Stats,
state: MON_INIT,
monitorStateTracker: monitorstate.NewTracker(stateLoader, false),
}

if m.stdFields.ID == "" {
Expand All @@ -156,7 +159,7 @@ func newMonitorUnsafe(

var wrappedJobs []jobs.Job
if err == nil {
wrappedJobs = wrappers.WrapCommon(p.Jobs, m.stdFields, stateLoader)
wrappedJobs = wrappers.WrapCommon(p.Jobs, m.stdFields, stateLoader, 2)
} else {
// If we've hit an error at this point, still run on schedule, but always return an error.
// This way the error is clearly communicated through to kibana.
Expand All @@ -179,7 +182,7 @@ func newMonitorUnsafe(
// We need to use the lightweight wrapping for error jobs
// since browser wrapping won't write summaries, but the fake job here is
// effectively a lightweight job
wrappedJobs = wrappers.WrapLightweight(p.Jobs, m.stdFields, monitorstate.NewTracker(stateLoader, false))
wrappedJobs = wrappers.WrapLightweight(p.Jobs, m.stdFields, monitorstate.NewTracker(stateLoader, false), 2)
}

m.endpoints = p.Endpoints
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/monitors/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (p Plugin) Close() error {

// RunWrapped runs the plug-in with the provided wrappers returning a channel of resultant events.
func (p Plugin) RunWrapped(fields stdfields.StdMonitorFields) chan *beat.Event {
wj := wrappers.WrapCommon(p.Jobs, fields, nil)
wj := wrappers.WrapCommon(p.Jobs, fields, nil, 2)
results := make(chan *beat.Event)

var runJob func(j jobs.Job)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestStatesESLoader(t *testing.T) {

monID := etc.createTestMonitorStateInES(t, testStatus)
// Since we've continued this state it should register the initial state
ms := etc.tracker.getCurrentState(monID)
ms := etc.tracker.GetCurrentState(monID)
require.True(t, ms.StartedAt.After(testStart.Add(-time.Nanosecond)), "timestamp for new state is off")
requireMSStatusCount(t, ms, testStatus, 1)

Expand Down
2 changes: 2 additions & 0 deletions heartbeat/monitors/wrappers/monitorstate/monitorstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const (
StatusUp StateStatus = "up"
StatusDown StateStatus = "down"
StatusFlapping StateStatus = "flap"
// Nil, essentially
StatusEmpty StateStatus = ""
)

func newMonitorState(sf stdfields.StdMonitorFields, status StateStatus, ctr int, flappingEnabled bool) *State {
Expand Down
12 changes: 10 additions & 2 deletions heartbeat/monitors/wrappers/monitorstate/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (t *Tracker) RecordStatus(sf stdfields.StdMonitorFields, newStatus StateSta
t.mtx.Lock()
defer t.mtx.Unlock()

state := t.getCurrentState(sf)
state := t.GetCurrentState(sf)
if state == nil {
state = newMonitorState(sf, newStatus, 0, t.flappingEnabled)
logp.L().Infof("initializing new state for monitor %s: %s", sf.ID, state.String())
Expand All @@ -74,7 +74,15 @@ func (t *Tracker) RecordStatus(sf stdfields.StdMonitorFields, newStatus StateSta
return state.copy()
}

func (t *Tracker) getCurrentState(sf stdfields.StdMonitorFields) (state *State) {
func (t *Tracker) GetCurrentStatus(sf stdfields.StdMonitorFields) StateStatus {
s := t.GetCurrentState(sf)
if s == nil {
return StatusEmpty
}
return s.Status
}

func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields) (state *State) {
if state, ok := t.states[sf.ID]; ok {
return state
}
Expand Down
Loading
Loading