Skip to content

Commit

Permalink
[Wf-Diagnostics] Refactor and improve identification of execution tim…
Browse files Browse the repository at this point in the history
…eouts (#6228)

* Refactor and improve identification of execution timeouts

* lint updates

* Update activities_test.go
  • Loading branch information
sankari165 authored Aug 16, 2024
1 parent c903543 commit beeac59
Show file tree
Hide file tree
Showing 5 changed files with 230 additions and 93 deletions.
29 changes: 25 additions & 4 deletions service/worker/diagnostics/activities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"context"
"encoding/json"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
Expand All @@ -37,6 +38,12 @@ import (
"github.com/uber/cadence/service/worker/diagnostics/invariants"
)

const (
workflowTimeoutSecond = int32(110)
testTimeStamp = int64(2547596872371000000)
timeUnit = time.Second
)

func Test__retrieveExecutionHistory(t *testing.T) {
dwtest := testDiagnosticWorkflow(t)
result, err := dwtest.retrieveExecutionHistory(context.Background(), retrieveExecutionHistoryInputParams{
Expand All @@ -52,13 +59,24 @@ func Test__retrieveExecutionHistory(t *testing.T) {

func Test__identifyTimeouts(t *testing.T) {
dwtest := testDiagnosticWorkflow(t)
workflowTimeoutSecondInBytes, err := json.Marshal(int32(10))
workflowTimeoutData := invariants.ExecutionTimeoutMetadata{
ExecutionTime: 110 * time.Second,
ConfiguredTimeout: 110 * time.Second,
LastOngoingEvent: &types.HistoryEvent{
ID: 1,
Timestamp: common.Int64Ptr(testTimeStamp),
WorkflowExecutionStartedEventAttributes: &types.WorkflowExecutionStartedEventAttributes{
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(workflowTimeoutSecond),
},
},
}
workflowTimeoutDataInBytes, err := json.Marshal(workflowTimeoutData)
require.NoError(t, err)
expectedResult := []invariants.InvariantCheckResult{
{
InvariantType: invariants.TimeoutTypeExecution.String(),
Reason: "START_TO_CLOSE",
Metadata: workflowTimeoutSecondInBytes,
Metadata: workflowTimeoutDataInBytes,
},
}
result, err := dwtest.identifyTimeouts(context.Background(), identifyTimeoutsInputParams{history: testWorkflowExecutionHistoryResponse()})
Expand All @@ -82,12 +100,15 @@ func testWorkflowExecutionHistoryResponse() *types.GetWorkflowExecutionHistoryRe
History: &types.History{
Events: []*types.HistoryEvent{
{
ID: 1,
ID: 1,
Timestamp: common.Int64Ptr(testTimeStamp),
WorkflowExecutionStartedEventAttributes: &types.WorkflowExecutionStartedEventAttributes{
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(10),
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(workflowTimeoutSecond),
},
},
{
ID: 2,
Timestamp: common.Int64Ptr(testTimeStamp + int64(workflowTimeoutSecond)*timeUnit.Nanoseconds()),
WorkflowExecutionTimedOutEventAttributes: &types.WorkflowExecutionTimedOutEventAttributes{TimeoutType: types.TimeoutTypeStartToClose.Ptr()},
},
},
Expand Down
100 changes: 18 additions & 82 deletions service/worker/diagnostics/invariants/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ package invariants

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/uber/cadence/common/types"
)
Expand All @@ -47,24 +46,26 @@ func (t *timeout) Check(context.Context) ([]InvariantCheckResult, error) {
events := t.workflowExecutionHistory.GetHistory().GetEvents()
for _, event := range events {
if event.WorkflowExecutionTimedOutEventAttributes != nil {
timeoutType := event.GetWorkflowExecutionTimedOutEventAttributes().GetTimeoutType().String()
timeoutLimit := getWorkflowExecutionConfiguredTimeout(events)
data := ExecutionTimeoutMetadata{
ExecutionTime: getExecutionTime(1, event.ID, events),
ConfiguredTimeout: time.Duration(timeoutLimit) * time.Second,
LastOngoingEvent: events[len(events)-2],
}
result = append(result, InvariantCheckResult{
InvariantType: TimeoutTypeExecution.String(),
Reason: timeoutType,
Metadata: timeoutLimitInBytes(timeoutLimit),
Reason: event.GetWorkflowExecutionTimedOutEventAttributes().GetTimeoutType().String(),
Metadata: marshalData(data),
})
}
if event.ActivityTaskTimedOutEventAttributes != nil {
timeoutType := event.GetActivityTaskTimedOutEventAttributes().GetTimeoutType()
eventScheduledID := event.GetActivityTaskTimedOutEventAttributes().GetScheduledEventID()
timeoutLimit, err := getActivityTaskConfiguredTimeout(eventScheduledID, timeoutType, events)
timeoutLimit, err := getActivityTaskConfiguredTimeout(event, events)
if err != nil {
return nil, err
}
result = append(result, InvariantCheckResult{
InvariantType: TimeoutTypeActivity.String(),
Reason: timeoutType.String(),
Reason: event.GetActivityTaskTimedOutEventAttributes().GetTimeoutType().String(),
Metadata: timeoutLimitInBytes(timeoutLimit),
})
}
Expand All @@ -77,83 +78,18 @@ func (t *timeout) Check(context.Context) ([]InvariantCheckResult, error) {
})
}
if event.ChildWorkflowExecutionTimedOutEventAttributes != nil {
timeoutType := event.GetChildWorkflowExecutionTimedOutEventAttributes().TimeoutType.String()
childWfInitiatedID := event.GetChildWorkflowExecutionTimedOutEventAttributes().GetInitiatedEventID()
timeoutLimit := getChildWorkflowExecutionConfiguredTimeout(childWfInitiatedID, events)
timeoutLimit := getChildWorkflowExecutionConfiguredTimeout(event, events)
data := ChildWfTimeoutMetadata{
ExecutionTime: getExecutionTime(event.GetChildWorkflowExecutionTimedOutEventAttributes().StartedEventID, event.ID, events),
ConfiguredTimeout: time.Duration(timeoutLimit) * time.Second,
Execution: event.GetChildWorkflowExecutionTimedOutEventAttributes().WorkflowExecution,
}
result = append(result, InvariantCheckResult{
InvariantType: TimeoutTypeChildWorkflow.String(),
Reason: timeoutType,
Metadata: timeoutLimitInBytes(timeoutLimit),
Reason: event.GetChildWorkflowExecutionTimedOutEventAttributes().TimeoutType.String(),
Metadata: marshalData(data),
})
}
}
return result, nil
}

func reasonForDecisionTaskTimeouts(event *types.HistoryEvent, allEvents []*types.HistoryEvent) (string, []byte) {
eventScheduledID := event.GetDecisionTaskTimedOutEventAttributes().GetScheduledEventID()
attr := event.GetDecisionTaskTimedOutEventAttributes()
cause := attr.GetCause()
switch cause {
case types.DecisionTaskTimedOutCauseTimeout:
return attr.TimeoutType.String(), timeoutLimitInBytes(getDecisionTaskConfiguredTimeout(eventScheduledID, allEvents))
case types.DecisionTaskTimedOutCauseReset:
newRunID := attr.GetNewRunID()
return attr.Reason, []byte(newRunID)
default:
return "valid cause not available for decision task timeout", nil
}
}

func getWorkflowExecutionConfiguredTimeout(events []*types.HistoryEvent) int32 {
for _, event := range events {
if event.ID == 1 { // event 1 is workflow execution started event
return event.GetWorkflowExecutionStartedEventAttributes().GetExecutionStartToCloseTimeoutSeconds()
}
}
return 0
}

func getActivityTaskConfiguredTimeout(eventScheduledID int64, timeoutType types.TimeoutType, events []*types.HistoryEvent) (int32, error) {
for _, event := range events {
if event.ID == eventScheduledID {
attr := event.GetActivityTaskScheduledEventAttributes()
switch timeoutType {
case types.TimeoutTypeHeartbeat:
return attr.GetHeartbeatTimeoutSeconds(), nil
case types.TimeoutTypeScheduleToClose:
return attr.GetScheduleToCloseTimeoutSeconds(), nil
case types.TimeoutTypeScheduleToStart:
return attr.GetScheduleToStartTimeoutSeconds(), nil
case types.TimeoutTypeStartToClose:
return attr.GetStartToCloseTimeoutSeconds(), nil
default:
return 0, fmt.Errorf("unknown timeout type")
}
}
}
return 0, fmt.Errorf("activity scheduled event not found")
}

func getDecisionTaskConfiguredTimeout(eventScheduledID int64, events []*types.HistoryEvent) int32 {
for _, event := range events {
if event.ID == eventScheduledID {
return event.GetDecisionTaskScheduledEventAttributes().GetStartToCloseTimeoutSeconds()
}
}
return 0
}

func getChildWorkflowExecutionConfiguredTimeout(wfInitiatedID int64, events []*types.HistoryEvent) int32 {
for _, event := range events {
if event.ID == wfInitiatedID {
return event.GetStartChildWorkflowExecutionInitiatedEventAttributes().GetExecutionStartToCloseTimeoutSeconds()
}
}
return 0
}

func timeoutLimitInBytes(val int32) []byte {
valInBytes, _ := json.Marshal(val)
return valInBytes
}
57 changes: 50 additions & 7 deletions service/worker/diagnostics/invariants/timeout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"context"
"encoding/json"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand All @@ -36,10 +37,34 @@ import (
const (
workflowTimeoutSecond = int32(110)
taskTimeoutSecond = int32(50)
testTimeStamp = int64(2547596872371000000)
timeUnit = time.Second
)

func Test__Check(t *testing.T) {
workflowTimeoutSecondInBytes, err := json.Marshal(workflowTimeoutSecond)
workflowTimeoutData := ExecutionTimeoutMetadata{
ExecutionTime: 110 * time.Second,
ConfiguredTimeout: 110 * time.Second,
LastOngoingEvent: &types.HistoryEvent{
ID: 1,
Timestamp: common.Int64Ptr(testTimeStamp),
WorkflowExecutionStartedEventAttributes: &types.WorkflowExecutionStartedEventAttributes{
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(workflowTimeoutSecond),
},
},
}
childWfTimeoutData := ChildWfTimeoutMetadata{
ExecutionTime: 110 * time.Second,
ConfiguredTimeout: 110 * time.Second,
Execution: &types.WorkflowExecution{
WorkflowID: "123",
RunID: "abc",
},
}
workflowTimeoutDataInBytes, err := json.Marshal(workflowTimeoutData)
require.NoError(t, err)
childWfTimeoutDataInBytes, err := json.Marshal(childWfTimeoutData)
require.NoError(t, err)
taskTimeoutSecondInBytes, err := json.Marshal(taskTimeoutSecond)
require.NoError(t, err)
testCases := []struct {
Expand All @@ -55,7 +80,7 @@ func Test__Check(t *testing.T) {
{
InvariantType: TimeoutTypeExecution.String(),
Reason: "START_TO_CLOSE",
Metadata: workflowTimeoutSecondInBytes,
Metadata: workflowTimeoutDataInBytes,
},
},
err: nil,
Expand All @@ -67,7 +92,7 @@ func Test__Check(t *testing.T) {
{
InvariantType: TimeoutTypeChildWorkflow.String(),
Reason: "START_TO_CLOSE",
Metadata: workflowTimeoutSecondInBytes,
Metadata: childWfTimeoutDataInBytes,
},
},
err: nil,
Expand Down Expand Up @@ -124,12 +149,15 @@ func wfTimeoutHistory() *types.GetWorkflowExecutionHistoryResponse {
History: &types.History{
Events: []*types.HistoryEvent{
{
ID: 1,
ID: 1,
Timestamp: common.Int64Ptr(testTimeStamp),
WorkflowExecutionStartedEventAttributes: &types.WorkflowExecutionStartedEventAttributes{
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(workflowTimeoutSecond),
},
},
{
ID: 2,
Timestamp: common.Int64Ptr(testTimeStamp + int64(workflowTimeoutSecond)*timeUnit.Nanoseconds()),
WorkflowExecutionTimedOutEventAttributes: &types.WorkflowExecutionTimedOutEventAttributes{TimeoutType: types.TimeoutTypeStartToClose.Ptr()},
},
},
Expand All @@ -142,15 +170,30 @@ func childWfTimeoutHistory() *types.GetWorkflowExecutionHistoryResponse {
History: &types.History{
Events: []*types.HistoryEvent{
{
ID: 22,
ID: 1,
StartChildWorkflowExecutionInitiatedEventAttributes: &types.StartChildWorkflowExecutionInitiatedEventAttributes{
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(workflowTimeoutSecond),
},
},
{
ID: 2,
Timestamp: common.Int64Ptr(testTimeStamp),
ChildWorkflowExecutionStartedEventAttributes: &types.ChildWorkflowExecutionStartedEventAttributes{
InitiatedEventID: 1,
},
},
{
ID: 3,
Timestamp: common.Int64Ptr(testTimeStamp + int64(workflowTimeoutSecond)*timeUnit.Nanoseconds()),
ChildWorkflowExecutionTimedOutEventAttributes: &types.ChildWorkflowExecutionTimedOutEventAttributes{
InitiatedEventID: 22,
TimeoutType: types.TimeoutTypeStartToClose.Ptr()},
InitiatedEventID: 1,
StartedEventID: 2,
TimeoutType: types.TimeoutTypeStartToClose.Ptr(),
WorkflowExecution: &types.WorkflowExecution{
WorkflowID: "123",
RunID: "abc",
},
},
},
},
},
Expand Down
Loading

0 comments on commit beeac59

Please sign in to comment.