Skip to content

Commit

Permalink
WorkflowExecutionCloseStatus proto mapping (#4080)
Browse files Browse the repository at this point in the history
  • Loading branch information
vytautas-karpavicius authored Mar 31, 2021
1 parent 1f27aae commit caf8fbd
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 29 deletions.
41 changes: 34 additions & 7 deletions common/persistence/workflowStateCloseStatusValidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,21 +138,48 @@ func validateWorkflowCloseStatus(
// ToInternalWorkflowExecutionCloseStatus convert persistence representation of close status to internal representation
func ToInternalWorkflowExecutionCloseStatus(
closeStatus int,
) types.WorkflowExecutionCloseStatus {
) *types.WorkflowExecutionCloseStatus {

switch closeStatus {
case WorkflowCloseStatusNone:
return nil
case WorkflowCloseStatusCompleted:
return types.WorkflowExecutionCloseStatusCompleted
return types.WorkflowExecutionCloseStatusCompleted.Ptr()
case WorkflowCloseStatusFailed:
return types.WorkflowExecutionCloseStatusFailed
return types.WorkflowExecutionCloseStatusFailed.Ptr()
case WorkflowCloseStatusCanceled:
return types.WorkflowExecutionCloseStatusCanceled
return types.WorkflowExecutionCloseStatusCanceled.Ptr()
case WorkflowCloseStatusTerminated:
return types.WorkflowExecutionCloseStatusTerminated
return types.WorkflowExecutionCloseStatusTerminated.Ptr()
case WorkflowCloseStatusContinuedAsNew:
return types.WorkflowExecutionCloseStatusContinuedAsNew
return types.WorkflowExecutionCloseStatusContinuedAsNew.Ptr()
case WorkflowCloseStatusTimedOut:
return types.WorkflowExecutionCloseStatusTimedOut
return types.WorkflowExecutionCloseStatusTimedOut.Ptr()
default:
panic("Invalid value for enum WorkflowExecutionCloseStatus")
}
}

// FromInternalWorkflowExecutionCloseStatus convert internal representation of close status to persistence representation
func FromInternalWorkflowExecutionCloseStatus(
closeStatus *types.WorkflowExecutionCloseStatus,
) int {
if closeStatus == nil {
return WorkflowCloseStatusNone
}
switch *closeStatus {
case types.WorkflowExecutionCloseStatusCompleted:
return WorkflowCloseStatusCompleted
case types.WorkflowExecutionCloseStatusFailed:
return WorkflowCloseStatusFailed
case types.WorkflowExecutionCloseStatusCanceled:
return WorkflowCloseStatusCanceled
case types.WorkflowExecutionCloseStatusTerminated:
return WorkflowCloseStatusTerminated
case types.WorkflowExecutionCloseStatusContinuedAsNew:
return WorkflowCloseStatusContinuedAsNew
case types.WorkflowExecutionCloseStatusTimedOut:
return WorkflowCloseStatusTimedOut
default:
panic("Invalid value for enum WorkflowExecutionCloseStatus")
}
Expand Down
19 changes: 19 additions & 0 deletions common/persistence/workflowStateCloseStatusValidator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package persistence

import (
"github.com/uber/cadence/common/types"
"testing"

"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -188,3 +189,21 @@ func (s *workflowStateCloseStatusSuite) TestUpdateWorkflowStateCloseStatus_Workf
s.NotNil(ValidateUpdateWorkflowStateCloseStatus(WorkflowStateZombie, closeStatus))
}
}

func (s *workflowStateCloseStatusSuite) TestInternalMapping() {
s.Nil(ToInternalWorkflowExecutionCloseStatus(WorkflowCloseStatusNone))
s.Equal(types.WorkflowExecutionCloseStatusCompleted.Ptr(), ToInternalWorkflowExecutionCloseStatus(WorkflowCloseStatusCompleted))
s.Equal(types.WorkflowExecutionCloseStatusFailed.Ptr(), ToInternalWorkflowExecutionCloseStatus(WorkflowCloseStatusFailed))
s.Equal(types.WorkflowExecutionCloseStatusCanceled.Ptr(), ToInternalWorkflowExecutionCloseStatus(WorkflowCloseStatusCanceled))
s.Equal(types.WorkflowExecutionCloseStatusTerminated.Ptr(), ToInternalWorkflowExecutionCloseStatus(WorkflowCloseStatusTerminated))
s.Equal(types.WorkflowExecutionCloseStatusContinuedAsNew.Ptr(), ToInternalWorkflowExecutionCloseStatus(WorkflowCloseStatusContinuedAsNew))
s.Equal(types.WorkflowExecutionCloseStatusTimedOut.Ptr(), ToInternalWorkflowExecutionCloseStatus(WorkflowCloseStatusTimedOut))

s.Equal(WorkflowCloseStatusNone, FromInternalWorkflowExecutionCloseStatus(nil))
s.Equal(WorkflowCloseStatusCompleted, FromInternalWorkflowExecutionCloseStatus(types.WorkflowExecutionCloseStatusCompleted.Ptr()))
s.Equal(WorkflowCloseStatusFailed, FromInternalWorkflowExecutionCloseStatus(types.WorkflowExecutionCloseStatusFailed.Ptr()))
s.Equal(WorkflowCloseStatusCanceled, FromInternalWorkflowExecutionCloseStatus(types.WorkflowExecutionCloseStatusCanceled.Ptr()))
s.Equal(WorkflowCloseStatusTerminated, FromInternalWorkflowExecutionCloseStatus(types.WorkflowExecutionCloseStatusTerminated.Ptr()))
s.Equal(WorkflowCloseStatusContinuedAsNew, FromInternalWorkflowExecutionCloseStatus(types.WorkflowExecutionCloseStatusContinuedAsNew.Ptr()))
s.Equal(WorkflowCloseStatusTimedOut, FromInternalWorkflowExecutionCloseStatus(types.WorkflowExecutionCloseStatusTimedOut.Ptr()))
}
19 changes: 7 additions & 12 deletions common/types/mapper/proto/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
historyv1 "github.com/uber/cadence/.gen/proto/history/v1"
sharedv1 "github.com/uber/cadence/.gen/proto/shared/v1"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
)

Expand Down Expand Up @@ -278,7 +279,7 @@ func FromHistoryGetMutableStateResponse(t *types.GetMutableStateResponse) *histo
}
var workflowCloseState *types.WorkflowExecutionCloseStatus
if t.WorkflowCloseState != nil {
workflowCloseState = types.WorkflowExecutionCloseStatus(*t.WorkflowCloseState).Ptr()
workflowCloseState = persistence.ToInternalWorkflowExecutionCloseStatus(int(*t.WorkflowCloseState))
}
return &historyv1.GetMutableStateResponse{
WorkflowExecution: FromWorkflowExecution(t.Execution),
Expand All @@ -305,10 +306,6 @@ func ToHistoryGetMutableStateResponse(t *historyv1.GetMutableStateResponse) *typ
if t == nil {
return nil
}
var workflowCloseState *int32
if s := ToWorkflowExecutionCloseStatus(t.WorkflowCloseState); s != nil {
workflowCloseState = common.Int32Ptr(int32(*s))
}
return &types.GetMutableStateResponse{
Execution: ToWorkflowExecution(t.WorkflowExecution),
WorkflowType: ToWorkflowType(t.WorkflowType),
Expand All @@ -324,7 +321,7 @@ func ToHistoryGetMutableStateResponse(t *historyv1.GetMutableStateResponse) *typ
EventStoreVersion: t.EventStoreVersion,
CurrentBranchToken: t.CurrentBranchToken,
WorkflowState: ToWorkflowState(t.WorkflowState),
WorkflowCloseState: workflowCloseState,
WorkflowCloseState: common.Int32Ptr(int32(persistence.FromInternalWorkflowExecutionCloseStatus(ToWorkflowExecutionCloseStatus(t.WorkflowCloseState)))),
VersionHistories: ToVersionHistories(t.VersionHistories),
IsStickyTaskListEnabled: t.IsStickyTaskListEnabled,
IsWorkflowRunning: t.WorkflowState == sharedv1.WorkflowState_WORKFLOW_STATE_RUNNING,
Expand Down Expand Up @@ -461,9 +458,10 @@ func FromHistoryPollMutableStateResponse(t *types.PollMutableStateResponse) *his
if t == nil {
return nil
}

var workflowCloseState *types.WorkflowExecutionCloseStatus
if t.WorkflowCloseState != nil {
workflowCloseState = types.WorkflowExecutionCloseStatus(*t.WorkflowCloseState).Ptr()
workflowCloseState = persistence.ToInternalWorkflowExecutionCloseStatus(int(*t.WorkflowCloseState))
}
return &historyv1.PollMutableStateResponse{
WorkflowExecution: FromWorkflowExecution(t.Execution),
Expand All @@ -488,10 +486,7 @@ func ToHistoryPollMutableStateResponse(t *historyv1.PollMutableStateResponse) *t
if t == nil {
return nil
}
var workflowCloseState *int32
if s := ToWorkflowExecutionCloseStatus(t.WorkflowCloseState); s != nil {
workflowCloseState = common.Int32Ptr(int32(*s))
}

return &types.PollMutableStateResponse{
Execution: ToWorkflowExecution(t.WorkflowExecution),
WorkflowType: ToWorkflowType(t.WorkflowType),
Expand All @@ -507,7 +502,7 @@ func ToHistoryPollMutableStateResponse(t *historyv1.PollMutableStateResponse) *t
CurrentBranchToken: t.CurrentBranchToken,
VersionHistories: ToVersionHistories(t.VersionHistories),
WorkflowState: ToWorkflowState(t.WorkflowState),
WorkflowCloseState: workflowCloseState,
WorkflowCloseState: common.Int32Ptr(int32(persistence.FromInternalWorkflowExecutionCloseStatus(ToWorkflowExecutionCloseStatus(t.WorkflowCloseState)))),
}
}

Expand Down
4 changes: 2 additions & 2 deletions common/types/mapper/proto/history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestHistoryGetMutableStateRequest(t *testing.T) {
}
}
func TestHistoryGetMutableStateResponse(t *testing.T) {
for _, item := range []*types.GetMutableStateResponse{nil, {}, &testdata.HistoryGetMutableStateResponse} {
for _, item := range []*types.GetMutableStateResponse{nil, &testdata.HistoryGetMutableStateResponse} {
assert.Equal(t, item, ToHistoryGetMutableStateResponse(FromHistoryGetMutableStateResponse(item)))
}
}
Expand Down Expand Up @@ -125,7 +125,7 @@ func TestHistoryPollMutableStateRequest(t *testing.T) {
}
}
func TestHistoryPollMutableStateResponse(t *testing.T) {
for _, item := range []*types.PollMutableStateResponse{nil, {}, &testdata.HistoryPollMutableStateResponse} {
for _, item := range []*types.PollMutableStateResponse{nil, &testdata.HistoryPollMutableStateResponse} {
assert.Equal(t, item, ToHistoryPollMutableStateResponse(FromHistoryPollMutableStateResponse(item)))
}
}
Expand Down
6 changes: 3 additions & 3 deletions common/types/testdata/service_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ var (
EventStoreVersion: EventStoreVersion,
CurrentBranchToken: BranchToken,
WorkflowState: common.Int32Ptr(persistence.WorkflowStateRunning),
WorkflowCloseState: common.Int32Ptr(3),
WorkflowCloseState: common.Int32Ptr(persistence.WorkflowCloseStatusTimedOut),
VersionHistories: &VersionHistories,
IsStickyTaskListEnabled: true,
}
Expand Down Expand Up @@ -100,8 +100,8 @@ var (
StickyTaskListScheduleToStartTimeout: &Duration1,
CurrentBranchToken: BranchToken,
VersionHistories: &VersionHistories,
WorkflowState: common.Int32Ptr(2),
WorkflowCloseState: common.Int32Ptr(3),
WorkflowState: common.Int32Ptr(persistence.WorkflowStateCorrupted),
WorkflowCloseState: common.Int32Ptr(persistence.WorkflowCloseStatusTimedOut),
}
HistoryPurgeDLQMessagesRequest = AdminPurgeDLQMessagesRequest
HistoryQueryWorkflowRequest = types.HistoryQueryWorkflowRequest{
Expand Down
5 changes: 2 additions & 3 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1039,7 +1039,7 @@ func (e *historyEngineImpl) QueryWorkflow(
return &types.HistoryQueryWorkflowResponse{
Response: &types.QueryWorkflowResponse{
QueryRejected: &types.QueryRejected{
CloseStatus: persistence.ToInternalWorkflowExecutionCloseStatus(int(closeStatus)).Ptr(),
CloseStatus: persistence.ToInternalWorkflowExecutionCloseStatus(int(closeStatus)),
},
},
}, nil
Expand Down Expand Up @@ -1486,8 +1486,7 @@ func (e *historyEngineImpl) DescribeWorkflowExecution(
}
if executionInfo.State == persistence.WorkflowStateCompleted {
// for closed workflow
closeStatus := persistence.ToInternalWorkflowExecutionCloseStatus(executionInfo.CloseStatus)
result.WorkflowExecutionInfo.CloseStatus = &closeStatus
result.WorkflowExecutionInfo.CloseStatus = persistence.ToInternalWorkflowExecutionCloseStatus(executionInfo.CloseStatus)
completionEvent, err := mutableState.GetCompletionEvent(ctx)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (t *transferActiveTaskExecutor) processCloseExecution(
workflowStartTimestamp,
workflowExecutionTimestamp.UnixNano(),
workflowCloseTimestamp,
workflowCloseStatus,
*workflowCloseStatus,
workflowHistoryLength,
task.GetTaskID(),
visibilityMemo,
Expand Down
2 changes: 1 addition & 1 deletion service/history/task/transfer_standby_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (t *transferStandbyTaskExecutor) processCloseExecution(
workflowStartTimestamp,
workflowExecutionTimestamp.UnixNano(),
workflowCloseTimestamp,
workflowCloseStatus,
*workflowCloseStatus,
workflowHistoryLength,
transferTask.GetTaskID(),
visibilityMemo,
Expand Down

0 comments on commit caf8fbd

Please sign in to comment.