Skip to content

Commit

Permalink
Moved logging and metrics to AddWorkflowExecutionTerminatedEvent. Mad…
Browse files Browse the repository at this point in the history
…e `terminateReason` a parameter to `terminateWorkflow`
  • Loading branch information
fimanishi committed Jul 1, 2024
1 parent 2f2f9c7 commit 8e519a7
Show file tree
Hide file tree
Showing 14 changed files with 26 additions and 140 deletions.
2 changes: 0 additions & 2 deletions service/history/decision/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,8 +572,6 @@ Update_History_Loop:
common.FailureReasonTransactionSizeExceedsLimit,
[]byte(updateErr.Error()),
execution.IdentityHistoryService,
handler.logger,
handler.metricsClient,
); err != nil {
return nil, err
}
Expand Down
2 changes: 0 additions & 2 deletions service/history/engine/engineimpl/reapply_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ func (e *historyEngineImpl) ReapplyEvents(
wfContext,
mutableState,
execution.NoopReleaseFn,
e.logger,
e.metricsClient,
),
ndc.EventsReapplicationResetWorkflowReason,
toReapplyEvents,
Expand Down
2 changes: 0 additions & 2 deletions service/history/engine/engineimpl/reset_workflow_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,6 @@ func (e *historyEngineImpl) ResetWorkflowExecution(
currentContext,
currentMutableState,
currentReleaseFn,
e.logger,
e.metricsClient,
),
request.GetReason(),
nil,
Expand Down
2 changes: 0 additions & 2 deletions service/history/engine/engineimpl/start_workflow_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,8 +614,6 @@ UpdateWorkflowLoop:
TerminateIfRunningReason,
getTerminateIfRunningDetails(workflowExecution.GetRunID()),
execution.IdentityHistoryService,
e.logger,
e.metricsClient,
); err != nil {
if err == workflow.ErrStaleState {
// Handler detected that cached workflow mutable could potentially be stale
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,6 @@ func (e *historyEngineImpl) TerminateWorkflowExecution(
request.GetReason(),
request.GetDetails(),
request.GetIdentity(),
e.logger,
e.metricsClient,
)
})
}
14 changes: 14 additions & 0 deletions service/history/execution/mutable_state_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1181,6 +1181,20 @@ func (e *mutableStateBuilder) AddWorkflowExecutionTerminatedEvent(
if err := e.ReplicateWorkflowExecutionTerminatedEvent(firstEventID, event); err != nil {
return nil, err
}

domainName := e.GetDomainEntry().GetInfo().Name

e.logger.Info(
"Workflow execution terminated.",
tag.WorkflowDomainName(domainName),
tag.WorkflowID(e.GetExecutionInfo().WorkflowID),
tag.WorkflowRunID(e.GetExecutionInfo().RunID),
tag.WorkflowTerminationReason(reason),
)

scopeWithDomainTag := e.metricsClient.Scope(metrics.HistoryTerminateWorkflowExecutionScope).Tagged(metrics.DomainTag(domainName))
scopeWithDomainTag.IncCounter(metrics.WorkflowTerminateCounterPerDomain)

return event, nil
}

Expand Down
43 changes: 10 additions & 33 deletions service/history/execution/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ import (
"fmt"

"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
)
Expand Down Expand Up @@ -59,12 +56,10 @@ type (
workflowImpl struct {
clusterMetadata cluster.Metadata

ctx context.Context
context Context
mutableState MutableState
releaseFn ReleaseFunc
logger log.Logger
metricsClient metrics.Client
ctx context.Context
context Context
mutableState MutableState
releaseFn ReleaseFunc
}
)

Expand All @@ -75,19 +70,15 @@ func NewWorkflow(
context Context,
mutableState MutableState,
releaseFn ReleaseFunc,
logger log.Logger,
metricsClient metrics.Client,
) Workflow {

return &workflowImpl{
ctx: ctx,
clusterMetadata: clusterMetadata,

context: context,
mutableState: mutableState,
releaseFn: releaseFn,
logger: logger,
metricsClient: metricsClient,
context: context,
mutableState: mutableState,
releaseFn: releaseFn,
}
}

Expand Down Expand Up @@ -198,7 +189,7 @@ func (r *workflowImpl) SuppressBy(
currentCluster := r.clusterMetadata.GetCurrentClusterName()

if currentCluster == lastWriteCluster {
return TransactionPolicyActive, r.terminateWorkflow(lastWriteVersion, incomingLastWriteVersion, r.logger, r.metricsClient)
return TransactionPolicyActive, r.terminateWorkflow(lastWriteVersion, incomingLastWriteVersion, WorkflowTerminationReason)
}
return TransactionPolicyPassive, r.zombiefyWorkflow()
}
Expand Down Expand Up @@ -260,8 +251,7 @@ func (r *workflowImpl) failDecision(
func (r *workflowImpl) terminateWorkflow(
lastWriteVersion int64,
incomingLastWriteVersion int64,
logger log.Logger,
metricsClient metrics.Client,
terminateReason string,
) error {

eventBatchFirstEventID := r.GetMutableState().GetNextEventID()
Expand All @@ -276,24 +266,11 @@ func (r *workflowImpl) terminateWorkflow(

_, err := r.mutableState.AddWorkflowExecutionTerminatedEvent(
eventBatchFirstEventID,
WorkflowTerminationReason,
terminateReason,
[]byte(fmt.Sprintf("terminated by version: %v", incomingLastWriteVersion)),
WorkflowTerminationIdentity,
)

domainName := r.mutableState.GetDomainEntry().GetInfo().Name

logger.Info(
"Workflow execution terminated.",
tag.WorkflowDomainName(domainName),
tag.WorkflowID(r.mutableState.GetExecutionInfo().WorkflowID),
tag.WorkflowRunID(r.mutableState.GetExecutionInfo().RunID),
tag.WorkflowTerminationReason(WorkflowTerminationReason),
)

scopeWithDomainTag := metricsClient.Scope(metrics.HistoryTerminateWorkflowExecutionScope).Tagged(metrics.DomainTag(domainName))
scopeWithDomainTag.IncCounter(metrics.WorkflowTerminateCounterPerDomain)

return err
}

Expand Down
23 changes: 1 addition & 22 deletions service/history/execution/workflow_execution_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@

package execution

import (
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/types"
)
import "github.com/uber/cadence/common/types"

// TerminateWorkflow is a helper function to terminate workflow
func TerminateWorkflow(
Expand All @@ -34,8 +29,6 @@ func TerminateWorkflow(
terminateReason string,
terminateDetails []byte,
terminateIdentity string,
logger log.Logger,
metricsClient metrics.Client,
) error {

if decision, ok := mutableState.GetInFlightDecision(); ok {
Expand All @@ -54,19 +47,5 @@ func TerminateWorkflow(
terminateDetails,
terminateIdentity,
)

domainName := mutableState.GetDomainEntry().GetInfo().Name

logger.Info(
"Workflow execution terminated.",
tag.WorkflowDomainName(domainName),
tag.WorkflowID(mutableState.GetExecutionInfo().WorkflowID),
tag.WorkflowRunID(mutableState.GetExecutionInfo().RunID),
tag.WorkflowTerminationReason(terminateReason),
)

scopeWithDomainTag := metricsClient.Scope(metrics.HistoryTerminateWorkflowExecutionScope).Tagged(metrics.DomainTag(domainName))
scopeWithDomainTag.IncCounter(metrics.WorkflowTerminateCounterPerDomain)

return err
}
51 changes: 0 additions & 51 deletions service/history/execution/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package execution

import (
"context"
"fmt"
"reflect"
"runtime"
"testing"
Expand All @@ -31,16 +30,10 @@ import (
"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/history/constants"
)

type (
Expand All @@ -51,9 +44,6 @@ type (
controller *gomock.Controller
mockContext *MockContext
mockMutableState *MockMutableState
logger log.Logger
metricsClient metrics.Client
testScope tally.TestScope

domainID string
workflowID string
Expand All @@ -72,9 +62,6 @@ func (s *workflowSuite) SetupTest() {
s.controller = gomock.NewController(s.T())
s.mockContext = NewMockContext(s.controller)
s.mockMutableState = NewMockMutableState(s.controller)
s.logger = log.NewNoop()
s.testScope = tally.NewTestScope("test", nil)
s.metricsClient = metrics.NewClient(s.testScope, metrics.History)

s.domainID = uuid.New()
s.workflowID = "some random workflow ID"
Expand Down Expand Up @@ -102,8 +89,6 @@ func (s *workflowSuite) TestGetMethods() {
s.mockContext,
s.mockMutableState,
NoopReleaseFn,
s.logger,
s.metricsClient,
)

s.Equal(s.mockContext, nDCWorkflow.GetContext())
Expand Down Expand Up @@ -183,8 +168,6 @@ func (s *workflowSuite) TestSuppressWorkflowBy_Error() {
s.mockContext,
s.mockMutableState,
NoopReleaseFn,
s.logger,
s.metricsClient,
)

incomingMockContext := NewMockContext(s.controller)
Expand All @@ -195,8 +178,6 @@ func (s *workflowSuite) TestSuppressWorkflowBy_Error() {
incomingMockContext,
incomingMockMutableState,
NoopReleaseFn,
s.logger,
s.metricsClient,
)

// cannot suppress by older workflow
Expand Down Expand Up @@ -237,23 +218,12 @@ func (s *workflowSuite) TestSuppressWorkflowBy_Terminate() {
RunID: s.runID,
LastEventTaskID: lastEventTaskID,
}).AnyTimes()
domainCacheEntry := cache.NewDomainCacheEntryForTest(
&persistence.DomainInfo{Name: constants.TestDomainName},
&persistence.DomainConfig{},
false,
&persistence.DomainReplicationConfig{},
1,
common.Int64Ptr(1),
)
s.mockMutableState.EXPECT().GetDomainEntry().Return(domainCacheEntry).Times(1)
nDCWorkflow := NewWorkflow(
context.Background(),
cluster.TestActiveClusterMetadata,
s.mockContext,
s.mockMutableState,
NoopReleaseFn,
s.logger,
s.metricsClient,
)

incomingRunID := uuid.New()
Expand All @@ -267,8 +237,6 @@ func (s *workflowSuite) TestSuppressWorkflowBy_Terminate() {
incomingMockContext,
incomingMockMutableState,
NoopReleaseFn,
s.logger,
s.metricsClient,
)
incomingMockMutableState.EXPECT().GetLastWriteVersion().Return(incomingLastEventVersion, nil).AnyTimes()
incomingMockMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{
Expand Down Expand Up @@ -314,9 +282,6 @@ func (s *workflowSuite) TestSuppressWorkflowBy_Terminate() {
policy, err = nDCWorkflow.SuppressBy(incomingNDCWorkflow)
s.NoError(err)
s.Equal(TransactionPolicyActive, policy)
countersSnapshot := s.testScope.Snapshot().Counters()
s.Contains(countersSnapshot, fmt.Sprintf("test.workflow_terminate_counter_per_domain+domain=%v,operation=TerminateWorkflowExecution", constants.TestDomainName))
s.Equal(int64(1), countersSnapshot[fmt.Sprintf("test.workflow_terminate_counter_per_domain+domain=%v,operation=TerminateWorkflowExecution", constants.TestDomainName)].Value())
}

func (s *workflowSuite) TestSuppressWorkflowBy_Zombiefy() {
Expand All @@ -338,8 +303,6 @@ func (s *workflowSuite) TestSuppressWorkflowBy_Zombiefy() {
s.mockContext,
s.mockMutableState,
NoopReleaseFn,
s.logger,
s.metricsClient,
)

incomingRunID := uuid.New()
Expand All @@ -353,8 +316,6 @@ func (s *workflowSuite) TestSuppressWorkflowBy_Zombiefy() {
incomingMockContext,
incomingMockMutableState,
NoopReleaseFn,
s.logger,
s.metricsClient,
)
incomingMockMutableState.EXPECT().GetLastWriteVersion().Return(incomingLastEventVersion, nil).AnyTimes()
incomingMockMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{
Expand Down Expand Up @@ -389,8 +350,6 @@ func (s *workflowSuite) TestRevive_Zombie_Error() {
s.mockContext,
s.mockMutableState,
NoopReleaseFn,
s.logger,
s.metricsClient,
)
err := nDCWorkflow.Revive()
s.Error(err)
Expand All @@ -407,8 +366,6 @@ func (s *workflowSuite) TestRevive_Zombie_Success() {
s.mockContext,
s.mockMutableState,
NoopReleaseFn,
s.logger,
s.metricsClient,
)
err := nDCWorkflow.Revive()
s.NoError(err)
Expand All @@ -423,8 +380,6 @@ func (s *workflowSuite) TestRevive_NonZombie_Success() {
s.mockContext,
s.mockMutableState,
NoopReleaseFn,
s.logger,
s.metricsClient,
)
err := nDCWorkflow.Revive()
s.NoError(err)
Expand Down Expand Up @@ -455,8 +410,6 @@ func (s *workflowSuite) TestFlushBufferedEvents_Success() {
s.mockContext,
s.mockMutableState,
NoopReleaseFn,
s.logger,
s.metricsClient,
)
err := nDCWorkflow.FlushBufferedEvents()
s.NoError(err)
Expand All @@ -472,8 +425,6 @@ func (s *workflowSuite) TestFlushBufferedEvents_NoBuffer_Success() {
s.mockContext,
s.mockMutableState,
NoopReleaseFn,
s.logger,
s.metricsClient,
)
err := nDCWorkflow.FlushBufferedEvents()
s.NoError(err)
Expand All @@ -496,8 +447,6 @@ func (s *workflowSuite) TestFlushBufferedEvents_NoDecision_Success() {
s.mockContext,
s.mockMutableState,
NoopReleaseFn,
s.logger,
s.metricsClient,
)
err := nDCWorkflow.FlushBufferedEvents()
s.NoError(err)
Expand Down
2 changes: 0 additions & 2 deletions service/history/ndc/branch_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,6 @@ func (r *branchManagerImpl) flushBufferedEvents(
r.context,
r.mutableState,
execution.NoopReleaseFn,
r.logger,
r.shard.GetMetricsClient(),
)
if err := targetWorkflow.FlushBufferedEvents(); err != nil {
return 0, nil, err
Expand Down
Loading

0 comments on commit 8e519a7

Please sign in to comment.