Skip to content

Commit

Permalink
feat: report at the unit level the error msg of the first FAILED or D…
Browse files Browse the repository at this point in the history
…EGRADED stream
  • Loading branch information
pkoutsovasilis committed May 21, 2024
1 parent 0daeed8 commit 2e1e21f
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 99 deletions.
56 changes: 25 additions & 31 deletions x-pack/libbeat/management/unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ type clientUnit interface {
}

// agentUnit implements status.StatusReporter and holds an unitState
// fot the input as well as a unitState for each stream of
// for the input as well as a unitState for each stream of
// the input in when this a client.UnitTypeInput.
type agentUnit struct {
softDeleted bool
mtx sync.Mutex
logger *logp.Logger
clientUnit clientUnit
input unitState
streamIDs []string
streamStates map[string]unitState
softDeleted bool
mtx sync.Mutex
logger *logp.Logger
clientUnit clientUnit
inputLevelState unitState
streamIDs []string
streamStates map[string]unitState
}

// getUnitState converts status.Status to client.UnitState
Expand Down Expand Up @@ -208,37 +208,31 @@ func (u *agentUnit) ID() string {
func (u *agentUnit) calcState() (status.Status, string) {
// for type output return the unit state directly as it has no streams
if u.clientUnit.Type() == client.UnitTypeOutput {
return u.input.state, u.input.msg
return u.inputLevelState.state, u.inputLevelState.msg
}

// if input state is not running return the input state
if u.input.state != status.Running {
return u.input.state, u.input.msg
// if inputLevelState state is not running return the inputLevelState state
if u.inputLevelState.state != status.Running {
return u.inputLevelState.state, u.inputLevelState.msg
}

// input state is marked as running, check the stream states
currStatus := status.Running
countFailed := 0
countDegraded := 0
// inputLevelState state is marked as running, check the stream states
reportedStatus := status.Running
reportedMsg := "Healthy"
for _, streamState := range u.streamStates {
switch streamState.state {
case status.Degraded:
countDegraded++
if currStatus == status.Running {
currStatus = status.Degraded
if reportedStatus != status.Degraded {
reportedStatus = status.Degraded
reportedMsg = streamState.msg
}
case status.Failed:
countFailed++
currStatus = status.Failed
// return the first failed stream
return streamState.state, streamState.msg
}
}

if currStatus == status.Running {
return currStatus, "Healthy"
}

return currStatus, fmt.Sprintf("Out of %d streams, %d are failed, %d are degraded",
len(u.streamStates), countFailed, countDegraded)
return reportedStatus, reportedMsg
}

// Type of the unit.
Expand All @@ -262,11 +256,11 @@ func (u *agentUnit) UpdateState(state status.Status, msg string, payload map[str
return nil
}

if u.input.state == state && u.input.msg == msg {
if u.inputLevelState.state == state && u.inputLevelState.msg == msg {
return nil
}

u.input = unitState{
u.inputLevelState = unitState{
state: state,
msg: msg,
}
Expand Down Expand Up @@ -345,8 +339,8 @@ func (u *agentUnit) update(cu *client.Unit) {
u.clientUnit = cu

inputStatus := getStatus(cu.Expected().State)
if u.input.state != inputStatus {
u.input = unitState{
if u.inputLevelState.state != inputStatus {
u.inputLevelState = unitState{
state: inputStatus,
}
}
Expand Down
165 changes: 97 additions & 68 deletions x-pack/libbeat/management/unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,22 @@ import (
)

func TestUnitUpdate(t *testing.T) {

type StatusUpdate struct {
status status.Status
msg string
}

const (
Healthy = "Healthy"
Failed = "Failed"
Degraded = "Degraded"
)

unitCfg := &mockClientUnit{
expected: client.Expected{
Config: &proto.UnitExpectedConfig{
Id: "input-1",
Id: "inputLevelState-1",
Streams: []*proto.Stream{
{Id: "stream-1"},
{Id: "stream-2"},
Expand All @@ -29,126 +41,141 @@ func TestUnitUpdate(t *testing.T) {
cases := []struct {
name string
unit *mockClientUnit
inputStatus status.Status
streamStates map[string]status.Status
inputLevelStatus StatusUpdate
streamStates map[string]StatusUpdate
expectedUnitStatus client.UnitState
expectedUnitMsg string
}{
{
name: "all running",
unit: unitCfg,
inputStatus: status.Running,
streamStates: map[string]status.Status{
"stream-1": status.Running,
"stream-2": status.Running,
name: "all running",
unit: unitCfg,
inputLevelStatus: StatusUpdate{status.Running, Healthy},
streamStates: map[string]StatusUpdate{
"stream-1": {status.Running, Healthy},
"stream-2": {status.Running, Healthy},
},
expectedUnitStatus: client.UnitStateHealthy,
expectedUnitMsg: Healthy,
},
{
name: "input failed",
unit: unitCfg,
inputStatus: status.Failed,
streamStates: map[string]status.Status{
"stream-1": status.Running,
"stream-2": status.Running,
name: "inputLevelState failed",
unit: unitCfg,
inputLevelStatus: StatusUpdate{status.Failed, Failed},
streamStates: map[string]StatusUpdate{
"stream-1": {status.Running, Healthy},
"stream-2": {status.Running, Healthy},
},
expectedUnitStatus: client.UnitStateFailed,
expectedUnitMsg: Failed,
},
{
name: "input stopping",
unit: unitCfg,
inputStatus: status.Stopping,
streamStates: map[string]status.Status{
"stream-1": status.Running,
"stream-2": status.Running,
name: "inputLevelState stopping",
unit: unitCfg,
inputLevelStatus: StatusUpdate{status.Stopping, ""},
streamStates: map[string]StatusUpdate{
"stream-1": {status.Running, Healthy},
"stream-2": {status.Running, Healthy},
},
expectedUnitStatus: client.UnitStateStopping,
expectedUnitMsg: "",
},
{
name: "input configuring",
unit: unitCfg,
inputStatus: status.Configuring,
streamStates: map[string]status.Status{
"stream-1": status.Running,
"stream-2": status.Running,
name: "inputLevelState configuring",
unit: unitCfg,
inputLevelStatus: StatusUpdate{status.Configuring, ""},
streamStates: map[string]StatusUpdate{
"stream-1": {status.Running, Healthy},
"stream-2": {status.Running, Healthy},
},
expectedUnitStatus: client.UnitStateConfiguring,
expectedUnitMsg: "",
},
{
name: "input starting",
unit: unitCfg,
inputStatus: status.Starting,
streamStates: map[string]status.Status{
"stream-1": status.Running,
"stream-2": status.Running,
name: "inputLevelState starting",
unit: unitCfg,
inputLevelStatus: StatusUpdate{status.Starting, ""},
streamStates: map[string]StatusUpdate{
"stream-1": {status.Running, Healthy},
"stream-2": {status.Running, Healthy},
},
expectedUnitStatus: client.UnitStateStarting,
expectedUnitMsg: "",
},
{
name: "input degraded",
unit: unitCfg,
inputStatus: status.Degraded,
streamStates: map[string]status.Status{
"stream-1": status.Running,
"stream-2": status.Running,
name: "inputLevelState degraded",
unit: unitCfg,
inputLevelStatus: StatusUpdate{status.Degraded, Degraded},
streamStates: map[string]StatusUpdate{
"stream-1": {status.Running, Healthy},
"stream-2": {status.Running, Healthy},
},
expectedUnitStatus: client.UnitStateDegraded,
expectedUnitMsg: Degraded,
},
{
name: "one stream failed the other running",
unit: unitCfg,
inputStatus: status.Running,
streamStates: map[string]status.Status{
"stream-1": status.Failed,
"stream-2": status.Running,
name: "one stream failed the other running",
unit: unitCfg,
inputLevelStatus: StatusUpdate{status.Running, Healthy},
streamStates: map[string]StatusUpdate{
"stream-1": {status.Failed, Failed},
"stream-2": {status.Running, Healthy},
},
expectedUnitStatus: client.UnitStateFailed,
expectedUnitMsg: Failed,
},
{
name: "one stream failed the other degraded",
unit: unitCfg,
inputStatus: status.Running,
streamStates: map[string]status.Status{
"stream-1": status.Failed,
"stream-2": status.Degraded,
name: "one stream failed the other degraded",
unit: unitCfg,
inputLevelStatus: StatusUpdate{status.Running, Healthy},
streamStates: map[string]StatusUpdate{
"stream-1": {status.Failed, Failed},
"stream-2": {status.Degraded, Degraded},
},
expectedUnitStatus: client.UnitStateFailed,
expectedUnitMsg: Failed,
},
{
name: "one stream running the other degraded",
unit: unitCfg,
inputStatus: status.Running,
streamStates: map[string]status.Status{
"stream-1": status.Running,
"stream-2": status.Degraded,
name: "one stream running the other degraded",
unit: unitCfg,
inputLevelStatus: StatusUpdate{status.Running, Healthy},
streamStates: map[string]StatusUpdate{
"stream-1": {status.Running, Healthy},
"stream-2": {status.Degraded, Degraded},
},
expectedUnitStatus: client.UnitStateDegraded,
expectedUnitMsg: Degraded,
},
{
name: "both streams degraded",
unit: unitCfg,
inputStatus: status.Running,
streamStates: map[string]status.Status{
"stream-1": status.Degraded,
"stream-2": status.Degraded,
name: "both streams degraded",
unit: unitCfg,
inputLevelStatus: StatusUpdate{status.Running, Healthy},
streamStates: map[string]StatusUpdate{
"stream-1": {status.Degraded, Degraded},
"stream-2": {status.Degraded, Degraded},
},
expectedUnitStatus: client.UnitStateDegraded,
expectedUnitMsg: Degraded,
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
aUnit := newAgentUnit(c.unit, nil)
err := aUnit.UpdateState(c.inputStatus, "", nil)
err := aUnit.UpdateState(c.inputLevelStatus.status, c.inputLevelStatus.msg, nil)
if err != nil {
t.Fatal(err)
}

for id, state := range c.streamStates {
aUnit.updateStateForStream(id, state, "")
aUnit.updateStateForStream(id, state.status, state.msg)
}

if c.unit.reportedState != c.expectedUnitStatus {
t.Errorf("expected unit status %s, got %s", c.expectedUnitStatus, aUnit.input.state)
t.Errorf("expected unit status %s, got %s", c.expectedUnitStatus, aUnit.inputLevelState.state)
}

if c.unit.reportedMsg != c.expectedUnitMsg {
t.Errorf("expected unit msg %s, got %s", c.expectedUnitStatus, aUnit.inputLevelState.state)
}
})
}
Expand All @@ -157,19 +184,21 @@ func TestUnitUpdate(t *testing.T) {
type mockClientUnit struct {
expected client.Expected
reportedState client.UnitState
reportedMsg string
}

func (u *mockClientUnit) Expected() client.Expected {
return u.expected
}

func (u *mockClientUnit) UpdateState(state client.UnitState, _ string, _ map[string]interface{}) error {
func (u *mockClientUnit) UpdateState(state client.UnitState, msg string, _ map[string]interface{}) error {
u.reportedState = state
u.reportedMsg = msg
return nil
}

func (u *mockClientUnit) ID() string {
return "input-1"
return "inputLevelState-1"
}

func (u *mockClientUnit) Type() client.UnitType {
Expand Down

0 comments on commit 2e1e21f

Please sign in to comment.