Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Wf-Diagnostics] Improve identification of activity timeouts #6232

Merged
merged 1 commit into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions service/worker/diagnostics/invariants/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ func (t *timeout) Check(context.Context) ([]InvariantCheckResult, error) {
})
}
if event.ActivityTaskTimedOutEventAttributes != nil {
timeoutLimit, err := getActivityTaskConfiguredTimeout(event, events)
metadata, err := getActivityTaskMetadata(event, events)
if err != nil {
return nil, err
}
result = append(result, InvariantCheckResult{
InvariantType: TimeoutTypeActivity.String(),
Reason: event.GetActivityTaskTimedOutEventAttributes().GetTimeoutType().String(),
Metadata: timeoutLimitInBytes(timeoutLimit),
Metadata: marshalData(metadata),
})
}
if event.DecisionTaskTimedOutEventAttributes != nil {
Expand Down
53 changes: 45 additions & 8 deletions service/worker/diagnostics/invariants/timeout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
taskTimeoutSecond = int32(50)
testTimeStamp = int64(2547596872371000000)
timeUnit = time.Second
testTasklist = "test-tasklist"
)

func Test__Check(t *testing.T) {
Expand All @@ -61,10 +62,27 @@ func Test__Check(t *testing.T) {
RunID: "abc",
},
}
activityTimeoutData := ActivityTimeoutMetadata{
TimeoutType: types.TimeoutTypeScheduleToStart.Ptr(),
ConfiguredTimeout: 50 * time.Second,
TimeElapsed: 50 * time.Second,
RetryPolicy: nil,
HeartBeatTimeout: 0,
Tasklist: &types.TaskList{
Name: testTasklist,
Kind: nil,
},
}
workflowTimeoutDataInBytes, err := json.Marshal(workflowTimeoutData)
require.NoError(t, err)
childWfTimeoutDataInBytes, err := json.Marshal(childWfTimeoutData)
require.NoError(t, err)
activityTimeoutDataInBytes, err := json.Marshal(activityTimeoutData)
require.NoError(t, err)
activityTimeoutData.TimeoutType = types.TimeoutTypeHeartbeat.Ptr()
activityTimeoutData.HeartBeatTimeout = 50 * time.Second
activityHeartBeatTimeoutDataInBytes, err := json.Marshal(activityTimeoutData)
require.NoError(t, err)
taskTimeoutSecondInBytes, err := json.Marshal(taskTimeoutSecond)
require.NoError(t, err)
testCases := []struct {
Expand Down Expand Up @@ -104,12 +122,12 @@ func Test__Check(t *testing.T) {
{
InvariantType: TimeoutTypeActivity.String(),
Reason: "SCHEDULE_TO_START",
Metadata: taskTimeoutSecondInBytes,
Metadata: activityTimeoutDataInBytes,
},
{
InvariantType: TimeoutTypeActivity.String(),
Reason: "HEARTBEAT",
Metadata: taskTimeoutSecondInBytes,
Metadata: activityHeartBeatTimeoutDataInBytes,
},
},
err: nil,
Expand Down Expand Up @@ -205,28 +223,47 @@ func activityTimeoutHistory() *types.GetWorkflowExecutionHistoryResponse {
History: &types.History{
Events: []*types.HistoryEvent{
{
ID: 5,
ID: 1,
Timestamp: common.Int64Ptr(testTimeStamp),
ActivityTaskScheduledEventAttributes: &types.ActivityTaskScheduledEventAttributes{
ScheduleToStartTimeoutSeconds: common.Int32Ptr(taskTimeoutSecond),
TaskList: &types.TaskList{
Name: testTasklist,
Kind: nil,
},
},
},
{
ID: 2,
Timestamp: common.Int64Ptr(testTimeStamp + int64(taskTimeoutSecond)*timeUnit.Nanoseconds()),
ActivityTaskTimedOutEventAttributes: &types.ActivityTaskTimedOutEventAttributes{
ScheduledEventID: 5,
StartedEventID: 6,
ScheduledEventID: 1,
TimeoutType: types.TimeoutTypeScheduleToStart.Ptr(),
},
},
{
ID: 21,
ID: 3,
ActivityTaskScheduledEventAttributes: &types.ActivityTaskScheduledEventAttributes{
HeartbeatTimeoutSeconds: common.Int32Ptr(taskTimeoutSecond),
TaskList: &types.TaskList{
Name: testTasklist,
Kind: nil,
},
},
},
{
ActivityTaskTimedOutEventAttributes: &types.ActivityTaskTimedOutEventAttributes{
ID: 4,
Timestamp: common.Int64Ptr(testTimeStamp),
ActivityTaskStartedEventAttributes: &types.ActivityTaskStartedEventAttributes{
ScheduledEventID: 21,
StartedEventID: 22,
},
},
{
ID: 5,
Timestamp: common.Int64Ptr(testTimeStamp + int64(taskTimeoutSecond)*timeUnit.Nanoseconds()),
ActivityTaskTimedOutEventAttributes: &types.ActivityTaskTimedOutEventAttributes{
ScheduledEventID: 3,
StartedEventID: 4,
TimeoutType: types.TimeoutTypeHeartbeat.Ptr(),
},
},
Expand Down
30 changes: 23 additions & 7 deletions service/worker/diagnostics/invariants/timeout_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,27 +56,43 @@
return 0
}

func getActivityTaskConfiguredTimeout(e *types.HistoryEvent, events []*types.HistoryEvent) (int32, error) {
func getActivityTaskMetadata(e *types.HistoryEvent, events []*types.HistoryEvent) (ActivityTimeoutMetadata, error) {
eventScheduledID := e.GetActivityTaskTimedOutEventAttributes().GetScheduledEventID()
eventstartedID := e.GetActivityTaskTimedOutEventAttributes().StartedEventID
timeoutType := e.GetActivityTaskTimedOutEventAttributes().GetTimeoutType()
var configuredTimeout int32
var timeElapsed time.Duration
for _, event := range events {
if event.ID == eventScheduledID {
attr := event.GetActivityTaskScheduledEventAttributes()
switch timeoutType {
case types.TimeoutTypeHeartbeat:
return attr.GetHeartbeatTimeoutSeconds(), nil
configuredTimeout = attr.GetHeartbeatTimeoutSeconds()
timeElapsed = getExecutionTime(eventstartedID, e.ID, events)
case types.TimeoutTypeScheduleToClose:
return attr.GetScheduleToCloseTimeoutSeconds(), nil
configuredTimeout = attr.GetScheduleToCloseTimeoutSeconds()
timeElapsed = getExecutionTime(eventScheduledID, e.ID, events)

Check warning on line 74 in service/worker/diagnostics/invariants/timeout_utils.go

View check run for this annotation

Codecov / codecov/patch

service/worker/diagnostics/invariants/timeout_utils.go#L73-L74

Added lines #L73 - L74 were not covered by tests
case types.TimeoutTypeScheduleToStart:
return attr.GetScheduleToStartTimeoutSeconds(), nil
configuredTimeout = attr.GetScheduleToStartTimeoutSeconds()
timeElapsed = getExecutionTime(eventScheduledID, e.ID, events)
case types.TimeoutTypeStartToClose:
return attr.GetStartToCloseTimeoutSeconds(), nil
configuredTimeout = attr.GetStartToCloseTimeoutSeconds()
timeElapsed = getExecutionTime(eventstartedID, e.ID, events)

Check warning on line 80 in service/worker/diagnostics/invariants/timeout_utils.go

View check run for this annotation

Codecov / codecov/patch

service/worker/diagnostics/invariants/timeout_utils.go#L79-L80

Added lines #L79 - L80 were not covered by tests
default:
return 0, fmt.Errorf("unknown timeout type")
return ActivityTimeoutMetadata{}, fmt.Errorf("unknown timeout type")

Check warning on line 82 in service/worker/diagnostics/invariants/timeout_utils.go

View check run for this annotation

Codecov / codecov/patch

service/worker/diagnostics/invariants/timeout_utils.go#L82

Added line #L82 was not covered by tests
}
return ActivityTimeoutMetadata{
TimeoutType: timeoutType.Ptr(),
ConfiguredTimeout: time.Duration(configuredTimeout) * time.Second,
TimeElapsed: timeElapsed,
RetryPolicy: attr.RetryPolicy,
HeartBeatTimeout: time.Duration(attr.GetHeartbeatTimeoutSeconds()) * time.Second,
Tasklist: attr.TaskList,
}, nil
}

}
return 0, fmt.Errorf("activity scheduled event not found")
return ActivityTimeoutMetadata{}, fmt.Errorf("activity scheduled event not found")

Check warning on line 95 in service/worker/diagnostics/invariants/timeout_utils.go

View check run for this annotation

Codecov / codecov/patch

service/worker/diagnostics/invariants/timeout_utils.go#L95

Added line #L95 was not covered by tests
}

func getDecisionTaskConfiguredTimeout(eventScheduledID int64, events []*types.HistoryEvent) int32 {
Expand Down
9 changes: 9 additions & 0 deletions service/worker/diagnostics/invariants/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,12 @@ type ChildWfTimeoutMetadata struct {
ConfiguredTimeout time.Duration
Execution *types.WorkflowExecution
}

type ActivityTimeoutMetadata struct {
TimeoutType *types.TimeoutType
ConfiguredTimeout time.Duration
TimeElapsed time.Duration
RetryPolicy *types.RetryPolicy
HeartBeatTimeout time.Duration
Tasklist *types.TaskList
}
Loading