diff --git a/service/worker/diagnostics/activities_test.go b/service/worker/diagnostics/activities_test.go index 44ca2657c70..cd5493a438b 100644 --- a/service/worker/diagnostics/activities_test.go +++ b/service/worker/diagnostics/activities_test.go @@ -26,6 +26,7 @@ import ( "context" "encoding/json" "testing" + "time" "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" @@ -37,6 +38,12 @@ import ( "github.com/uber/cadence/service/worker/diagnostics/invariants" ) +const ( + workflowTimeoutSecond = int32(110) + testTimeStamp = int64(2547596872371000000) + timeUnit = time.Second +) + func Test__retrieveExecutionHistory(t *testing.T) { dwtest := testDiagnosticWorkflow(t) result, err := dwtest.retrieveExecutionHistory(context.Background(), retrieveExecutionHistoryInputParams{ @@ -52,13 +59,24 @@ func Test__retrieveExecutionHistory(t *testing.T) { func Test__identifyTimeouts(t *testing.T) { dwtest := testDiagnosticWorkflow(t) - workflowTimeoutSecondInBytes, err := json.Marshal(int32(10)) + workflowTimeoutData := invariants.ExecutionTimeoutMetadata{ + ExecutionTime: 110 * time.Second, + ConfiguredTimeout: 110 * time.Second, + LastOngoingEvent: &types.HistoryEvent{ + ID: 1, + Timestamp: common.Int64Ptr(testTimeStamp), + WorkflowExecutionStartedEventAttributes: &types.WorkflowExecutionStartedEventAttributes{ + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(workflowTimeoutSecond), + }, + }, + } + workflowTimeoutDataInBytes, err := json.Marshal(workflowTimeoutData) require.NoError(t, err) expectedResult := []invariants.InvariantCheckResult{ { InvariantType: invariants.TimeoutTypeExecution.String(), Reason: "START_TO_CLOSE", - Metadata: workflowTimeoutSecondInBytes, + Metadata: workflowTimeoutDataInBytes, }, } result, err := dwtest.identifyTimeouts(context.Background(), identifyTimeoutsInputParams{history: testWorkflowExecutionHistoryResponse()}) @@ -82,12 +100,15 @@ func testWorkflowExecutionHistoryResponse() *types.GetWorkflowExecutionHistoryRe 
History: &types.History{ Events: []*types.HistoryEvent{ { - ID: 1, + ID: 1, + Timestamp: common.Int64Ptr(testTimeStamp), WorkflowExecutionStartedEventAttributes: &types.WorkflowExecutionStartedEventAttributes{ - ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(10), + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(workflowTimeoutSecond), }, }, { + ID: 2, + Timestamp: common.Int64Ptr(testTimeStamp + int64(workflowTimeoutSecond)*timeUnit.Nanoseconds()), WorkflowExecutionTimedOutEventAttributes: &types.WorkflowExecutionTimedOutEventAttributes{TimeoutType: types.TimeoutTypeStartToClose.Ptr()}, }, }, diff --git a/service/worker/diagnostics/invariants/timeout.go b/service/worker/diagnostics/invariants/timeout.go index b9b8c5354ff..9dd41d2f77d 100644 --- a/service/worker/diagnostics/invariants/timeout.go +++ b/service/worker/diagnostics/invariants/timeout.go @@ -24,8 +24,7 @@ package invariants import ( "context" - "encoding/json" - "fmt" + "time" "github.com/uber/cadence/common/types" ) @@ -47,24 +46,26 @@ func (t *timeout) Check(context.Context) ([]InvariantCheckResult, error) { events := t.workflowExecutionHistory.GetHistory().GetEvents() for _, event := range events { if event.WorkflowExecutionTimedOutEventAttributes != nil { - timeoutType := event.GetWorkflowExecutionTimedOutEventAttributes().GetTimeoutType().String() timeoutLimit := getWorkflowExecutionConfiguredTimeout(events) + data := ExecutionTimeoutMetadata{ + ExecutionTime: getExecutionTime(1, event.ID, events), + ConfiguredTimeout: time.Duration(timeoutLimit) * time.Second, + LastOngoingEvent: events[len(events)-2], + } result = append(result, InvariantCheckResult{ InvariantType: TimeoutTypeExecution.String(), - Reason: timeoutType, - Metadata: timeoutLimitInBytes(timeoutLimit), + Reason: event.GetWorkflowExecutionTimedOutEventAttributes().GetTimeoutType().String(), + Metadata: marshalData(data), }) } if event.ActivityTaskTimedOutEventAttributes != nil { - timeoutType := 
event.GetActivityTaskTimedOutEventAttributes().GetTimeoutType() - eventScheduledID := event.GetActivityTaskTimedOutEventAttributes().GetScheduledEventID() - timeoutLimit, err := getActivityTaskConfiguredTimeout(eventScheduledID, timeoutType, events) + timeoutLimit, err := getActivityTaskConfiguredTimeout(event, events) if err != nil { return nil, err } result = append(result, InvariantCheckResult{ InvariantType: TimeoutTypeActivity.String(), - Reason: timeoutType.String(), + Reason: event.GetActivityTaskTimedOutEventAttributes().GetTimeoutType().String(), Metadata: timeoutLimitInBytes(timeoutLimit), }) } @@ -77,83 +78,18 @@ func (t *timeout) Check(context.Context) ([]InvariantCheckResult, error) { }) } if event.ChildWorkflowExecutionTimedOutEventAttributes != nil { - timeoutType := event.GetChildWorkflowExecutionTimedOutEventAttributes().TimeoutType.String() - childWfInitiatedID := event.GetChildWorkflowExecutionTimedOutEventAttributes().GetInitiatedEventID() - timeoutLimit := getChildWorkflowExecutionConfiguredTimeout(childWfInitiatedID, events) + timeoutLimit := getChildWorkflowExecutionConfiguredTimeout(event, events) + data := ChildWfTimeoutMetadata{ + ExecutionTime: getExecutionTime(event.GetChildWorkflowExecutionTimedOutEventAttributes().StartedEventID, event.ID, events), + ConfiguredTimeout: time.Duration(timeoutLimit) * time.Second, + Execution: event.GetChildWorkflowExecutionTimedOutEventAttributes().WorkflowExecution, + } result = append(result, InvariantCheckResult{ InvariantType: TimeoutTypeChildWorkflow.String(), - Reason: timeoutType, - Metadata: timeoutLimitInBytes(timeoutLimit), + Reason: event.GetChildWorkflowExecutionTimedOutEventAttributes().TimeoutType.String(), + Metadata: marshalData(data), }) } } return result, nil } - -func reasonForDecisionTaskTimeouts(event *types.HistoryEvent, allEvents []*types.HistoryEvent) (string, []byte) { - eventScheduledID := event.GetDecisionTaskTimedOutEventAttributes().GetScheduledEventID() - attr := 
event.GetDecisionTaskTimedOutEventAttributes() - cause := attr.GetCause() - switch cause { - case types.DecisionTaskTimedOutCauseTimeout: - return attr.TimeoutType.String(), timeoutLimitInBytes(getDecisionTaskConfiguredTimeout(eventScheduledID, allEvents)) - case types.DecisionTaskTimedOutCauseReset: - newRunID := attr.GetNewRunID() - return attr.Reason, []byte(newRunID) - default: - return "valid cause not available for decision task timeout", nil - } -} - -func getWorkflowExecutionConfiguredTimeout(events []*types.HistoryEvent) int32 { - for _, event := range events { - if event.ID == 1 { // event 1 is workflow execution started event - return event.GetWorkflowExecutionStartedEventAttributes().GetExecutionStartToCloseTimeoutSeconds() - } - } - return 0 -} - -func getActivityTaskConfiguredTimeout(eventScheduledID int64, timeoutType types.TimeoutType, events []*types.HistoryEvent) (int32, error) { - for _, event := range events { - if event.ID == eventScheduledID { - attr := event.GetActivityTaskScheduledEventAttributes() - switch timeoutType { - case types.TimeoutTypeHeartbeat: - return attr.GetHeartbeatTimeoutSeconds(), nil - case types.TimeoutTypeScheduleToClose: - return attr.GetScheduleToCloseTimeoutSeconds(), nil - case types.TimeoutTypeScheduleToStart: - return attr.GetScheduleToStartTimeoutSeconds(), nil - case types.TimeoutTypeStartToClose: - return attr.GetStartToCloseTimeoutSeconds(), nil - default: - return 0, fmt.Errorf("unknown timeout type") - } - } - } - return 0, fmt.Errorf("activity scheduled event not found") -} - -func getDecisionTaskConfiguredTimeout(eventScheduledID int64, events []*types.HistoryEvent) int32 { - for _, event := range events { - if event.ID == eventScheduledID { - return event.GetDecisionTaskScheduledEventAttributes().GetStartToCloseTimeoutSeconds() - } - } - return 0 -} - -func getChildWorkflowExecutionConfiguredTimeout(wfInitiatedID int64, events []*types.HistoryEvent) int32 { - for _, event := range events { - if event.ID == 
wfInitiatedID { - return event.GetStartChildWorkflowExecutionInitiatedEventAttributes().GetExecutionStartToCloseTimeoutSeconds() - } - } - return 0 -} - -func timeoutLimitInBytes(val int32) []byte { - valInBytes, _ := json.Marshal(val) - return valInBytes -} diff --git a/service/worker/diagnostics/invariants/timeout_test.go b/service/worker/diagnostics/invariants/timeout_test.go index 34e1aa5736e..e16ac4040d6 100644 --- a/service/worker/diagnostics/invariants/timeout_test.go +++ b/service/worker/diagnostics/invariants/timeout_test.go @@ -26,6 +26,7 @@ import ( "context" "encoding/json" "testing" + "time" "github.com/stretchr/testify/require" @@ -36,10 +37,34 @@ import ( const ( workflowTimeoutSecond = int32(110) taskTimeoutSecond = int32(50) + testTimeStamp = int64(2547596872371000000) + timeUnit = time.Second ) func Test__Check(t *testing.T) { - workflowTimeoutSecondInBytes, err := json.Marshal(workflowTimeoutSecond) + workflowTimeoutData := ExecutionTimeoutMetadata{ + ExecutionTime: 110 * time.Second, + ConfiguredTimeout: 110 * time.Second, + LastOngoingEvent: &types.HistoryEvent{ + ID: 1, + Timestamp: common.Int64Ptr(testTimeStamp), + WorkflowExecutionStartedEventAttributes: &types.WorkflowExecutionStartedEventAttributes{ + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(workflowTimeoutSecond), + }, + }, + } + childWfTimeoutData := ChildWfTimeoutMetadata{ + ExecutionTime: 110 * time.Second, + ConfiguredTimeout: 110 * time.Second, + Execution: &types.WorkflowExecution{ + WorkflowID: "123", + RunID: "abc", + }, + } + workflowTimeoutDataInBytes, err := json.Marshal(workflowTimeoutData) + require.NoError(t, err) + childWfTimeoutDataInBytes, err := json.Marshal(childWfTimeoutData) + require.NoError(t, err) taskTimeoutSecondInBytes, err := json.Marshal(taskTimeoutSecond) require.NoError(t, err) testCases := []struct { @@ -55,7 +80,7 @@ func Test__Check(t *testing.T) { { InvariantType: TimeoutTypeExecution.String(), Reason: "START_TO_CLOSE", - Metadata: 
workflowTimeoutSecondInBytes, + Metadata: workflowTimeoutDataInBytes, }, }, err: nil, @@ -67,7 +92,7 @@ func Test__Check(t *testing.T) { { InvariantType: TimeoutTypeChildWorkflow.String(), Reason: "START_TO_CLOSE", - Metadata: workflowTimeoutSecondInBytes, + Metadata: childWfTimeoutDataInBytes, }, }, err: nil, @@ -124,12 +149,15 @@ func wfTimeoutHistory() *types.GetWorkflowExecutionHistoryResponse { History: &types.History{ Events: []*types.HistoryEvent{ { - ID: 1, + ID: 1, + Timestamp: common.Int64Ptr(testTimeStamp), WorkflowExecutionStartedEventAttributes: &types.WorkflowExecutionStartedEventAttributes{ ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(workflowTimeoutSecond), }, }, { + ID: 2, + Timestamp: common.Int64Ptr(testTimeStamp + int64(workflowTimeoutSecond)*timeUnit.Nanoseconds()), WorkflowExecutionTimedOutEventAttributes: &types.WorkflowExecutionTimedOutEventAttributes{TimeoutType: types.TimeoutTypeStartToClose.Ptr()}, }, }, @@ -142,15 +170,30 @@ func childWfTimeoutHistory() *types.GetWorkflowExecutionHistoryResponse { History: &types.History{ Events: []*types.HistoryEvent{ { - ID: 22, + ID: 1, StartChildWorkflowExecutionInitiatedEventAttributes: &types.StartChildWorkflowExecutionInitiatedEventAttributes{ ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(workflowTimeoutSecond), }, }, { + ID: 2, + Timestamp: common.Int64Ptr(testTimeStamp), + ChildWorkflowExecutionStartedEventAttributes: &types.ChildWorkflowExecutionStartedEventAttributes{ + InitiatedEventID: 1, + }, + }, + { + ID: 3, + Timestamp: common.Int64Ptr(testTimeStamp + int64(workflowTimeoutSecond)*timeUnit.Nanoseconds()), ChildWorkflowExecutionTimedOutEventAttributes: &types.ChildWorkflowExecutionTimedOutEventAttributes{ - InitiatedEventID: 22, - TimeoutType: types.TimeoutTypeStartToClose.Ptr()}, + InitiatedEventID: 1, + StartedEventID: 2, + TimeoutType: types.TimeoutTypeStartToClose.Ptr(), + WorkflowExecution: &types.WorkflowExecution{ + WorkflowID: "123", + RunID: "abc", + }, + }, }, }, 
}, diff --git a/service/worker/diagnostics/invariants/timeout_utils.go b/service/worker/diagnostics/invariants/timeout_utils.go new file mode 100644 index 00000000000..c870cf0a0cf --- /dev/null +++ b/service/worker/diagnostics/invariants/timeout_utils.go @@ -0,0 +1,119 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+
+package invariants
+
+import (
+	"encoding/json"
+	"fmt"
+	"time"
+
+	"github.com/uber/cadence/common"
+	"github.com/uber/cadence/common/types"
+)
+
+// reasonForDecisionTaskTimeouts returns the reason string and metadata bytes
+// reported for a decision task timeout event, chosen by the timeout cause.
+func reasonForDecisionTaskTimeouts(event *types.HistoryEvent, allEvents []*types.HistoryEvent) (string, []byte) {
+	attr := event.GetDecisionTaskTimedOutEventAttributes()
+	switch attr.GetCause() {
+	case types.DecisionTaskTimedOutCauseTimeout:
+		// Metadata is the JSON-encoded timeout configured on the scheduled event.
+		return attr.TimeoutType.String(), timeoutLimitInBytes(getDecisionTaskConfiguredTimeout(attr.GetScheduledEventID(), allEvents))
+	case types.DecisionTaskTimedOutCauseReset:
+		// Metadata is the raw bytes of the new run ID, not JSON.
+		return attr.Reason, []byte(attr.GetNewRunID())
+	default:
+		return "valid cause not available for decision task timeout", nil
+	}
+}
+
+// getWorkflowExecutionConfiguredTimeout returns the execution start-to-close
+// timeout, in seconds, from the event with ID 1, or 0 if no such event exists.
+func getWorkflowExecutionConfiguredTimeout(events []*types.HistoryEvent) int32 {
+	for _, event := range events {
+		if event.ID == 1 { // event 1 is workflow execution started event
+			return event.GetWorkflowExecutionStartedEventAttributes().GetExecutionStartToCloseTimeoutSeconds()
+		}
+	}
+	return 0
+}
+
+// getActivityTaskConfiguredTimeout returns the configured timeout, in seconds,
+// that matches the timeout type of activity timeout event e, read from the
+// scheduled event it references. It errors if that event or type is unknown.
+func getActivityTaskConfiguredTimeout(e *types.HistoryEvent, events []*types.HistoryEvent) (int32, error) {
+	eventScheduledID := e.GetActivityTaskTimedOutEventAttributes().GetScheduledEventID()
+	timeoutType := e.GetActivityTaskTimedOutEventAttributes().GetTimeoutType()
+	for _, event := range events {
+		if event.ID == eventScheduledID {
+			attr := event.GetActivityTaskScheduledEventAttributes()
+			switch timeoutType {
+			case types.TimeoutTypeHeartbeat:
+				return attr.GetHeartbeatTimeoutSeconds(), nil
+			case types.TimeoutTypeScheduleToClose:
+				return attr.GetScheduleToCloseTimeoutSeconds(), nil
+			case types.TimeoutTypeScheduleToStart:
+				return attr.GetScheduleToStartTimeoutSeconds(), nil
+			case types.TimeoutTypeStartToClose:
+				return attr.GetStartToCloseTimeoutSeconds(), nil
+			default:
+				return 0, fmt.Errorf("unknown timeout type")
+			}
+		}
+	}
+	return 0, fmt.Errorf("activity scheduled event not found")
+}
+
+// getDecisionTaskConfiguredTimeout returns the start-to-close timeout, in
+// seconds, of the event with ID eventScheduledID, or 0 if it is not in events.
+func getDecisionTaskConfiguredTimeout(eventScheduledID int64, events []*types.HistoryEvent) int32 {
+	for _, event := range events {
+		if event.ID == eventScheduledID {
+			return event.GetDecisionTaskScheduledEventAttributes().GetStartToCloseTimeoutSeconds()
+		}
+	}
+	return 0
+}
+
+// getChildWorkflowExecutionConfiguredTimeout returns the execution start-to-close
+// timeout, in seconds, from the initiated event referenced by e, or 0 if absent.
+func getChildWorkflowExecutionConfiguredTimeout(e *types.HistoryEvent, events []*types.HistoryEvent) int32 {
+	wfInitiatedID := e.GetChildWorkflowExecutionTimedOutEventAttributes().GetInitiatedEventID()
+	for _, event := range events {
+		if event.ID == wfInitiatedID {
+			return event.GetStartChildWorkflowExecutionInitiatedEventAttributes().GetExecutionStartToCloseTimeoutSeconds()
+		}
+	}
+	return 0
+}
+
+// timeoutLimitInBytes returns the JSON encoding of val.
+func timeoutLimitInBytes(val int32) []byte {
+	// Marshalling an int32 cannot fail, so the error is safe to discard.
+	valInBytes, _ := json.Marshal(val)
+	return valInBytes
+}
+
+// getExecutionTime returns the time elapsed between the event with ID startID
+// and the event with ID timeoutID, based on their nanosecond timestamps.
+//
+// Events are matched by ID instead of by slice position, so the caller's
+// slice is never reordered (Check calls this while ranging over it) and a
+// history whose IDs are not contiguous from 1 cannot index out of range.
+// It returns 0 when either event is not present in events.
+func getExecutionTime(startID, timeoutID int64, events []*types.HistoryEvent) time.Duration {
+	var firstEvent, lastEvent *types.HistoryEvent
+	for _, event := range events {
+		if event.ID == startID {
+			firstEvent = event
+		}
+		if event.ID == timeoutID {
+			lastEvent = event
+		}
+	}
+	if firstEvent == nil || lastEvent == nil {
+		return 0
+	}
+	return time.Unix(0, common.Int64Default(lastEvent.Timestamp)).Sub(time.Unix(0, common.Int64Default(firstEvent.Timestamp)))
+}
+
+// marshalData returns the JSON encoding of rc, or nil if rc cannot be
+// marshalled; metadata is best-effort, so the error is not propagated.
+func marshalData(rc any) []byte {
+	data, err := json.Marshal(rc)
+	if err != nil {
+		return nil
+	}
+	return data
+}
diff --git a/service/worker/diagnostics/invariants/types.go b/service/worker/diagnostics/invariants/types.go
index 93e87bedced..67062c3a7bd 100644
--- a/service/worker/diagnostics/invariants/types.go
+++ b/service/worker/diagnostics/invariants/types.go
@@ -22,6 +22,12 @@
 
 package invariants
 
+import (
+	"time"
+
+	"github.com/uber/cadence/common/types"
+)
+
 type TimeoutType string
 
 const (
@@ -34,3 +40,20 @@ const (
 func (tt TimeoutType) String() string {
 	return string(tt)
 }
+
+// ExecutionTimeoutMetadata is the metadata reported for a workflow execution
+// timeout. The time.Duration fields are encoded by encoding/json as integer
+// nanoseconds.
+type ExecutionTimeoutMetadata struct {
+	ExecutionTime     time.Duration
+	ConfiguredTimeout time.Duration
+	LastOngoingEvent  *types.HistoryEvent
+}
+
+// ChildWfTimeoutMetadata is the metadata reported for a child workflow
+// execution timeout.
+type ChildWfTimeoutMetadata struct {
+	ExecutionTime     time.Duration
+	ConfiguredTimeout time.Duration
+	Execution         *types.WorkflowExecution
+}