Skip to content

Commit

Permalink
Added Executor Interface and TimerTaskExecutorBase with stop() Method…
Browse files Browse the repository at this point in the history
… and improve context management in TimerQueueProcessor #5920

* Added stop() to the Executor interface and created an empty stop() function for future implementations in Executors.
* Implemented stop() in TimerTaskExecutorBase.
* Added context and cancelFn in TimerTaskExecutorBase struct and replaced context.Background() with internal context to prevent memory leaks.
* Called executor.stop() in TimerQueueProcessor to prevent context leaks.
  • Loading branch information
timl3136 authored Apr 18, 2024
1 parent 6564701 commit 16c4775
Show file tree
Hide file tree
Showing 15 changed files with 83 additions and 10 deletions.
17 changes: 17 additions & 0 deletions service/history/queue/timer_queue_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type timerQueueProcessor struct {
activeTaskExecutor task.Executor
activeQueueProcessor *timerQueueProcessorBase
standbyQueueProcessors map[string]*timerQueueProcessorBase
standbyTaskExecutors []task.Executor
standbyQueueTimerGates map[string]RemoteTimerGate
}

Expand Down Expand Up @@ -100,6 +101,7 @@ func NewTimerQueueProcessor(
logger,
)

standbyTaskExecutors := make([]task.Executor, 0, len(shard.GetClusterMetadata().GetRemoteClusterInfo()))
standbyQueueProcessors := make(map[string]*timerQueueProcessorBase)
standbyQueueTimerGates := make(map[string]RemoteTimerGate)
for clusterName := range shard.GetClusterMetadata().GetRemoteClusterInfo() {
Expand All @@ -123,6 +125,7 @@ func NewTimerQueueProcessor(
clusterName,
config,
)
standbyTaskExecutors = append(standbyTaskExecutors, standbyTaskExecutor)
standbyQueueProcessors[clusterName], standbyQueueTimerGates[clusterName] = newTimerQueueStandbyProcessor(
clusterName,
shard,
Expand Down Expand Up @@ -154,6 +157,7 @@ func NewTimerQueueProcessor(
activeQueueProcessor: activeQueueProcessor,
standbyQueueProcessors: standbyQueueProcessors,
standbyQueueTimerGates: standbyQueueTimerGates,
standbyTaskExecutors: standbyTaskExecutors,
}
}

Expand All @@ -178,10 +182,17 @@ func (t *timerQueueProcessor) Stop() {

if !t.shard.GetConfig().QueueProcessorEnableGracefulSyncShutdown() {
t.activeQueueProcessor.Stop()
// stop active executor after queue processor
t.activeTaskExecutor.Stop()
for _, standbyQueueProcessor := range t.standbyQueueProcessors {
standbyQueueProcessor.Stop()
}

// stop standby executors after queue processors
for _, standbyTaskExecutor := range t.standbyTaskExecutors {
standbyTaskExecutor.Stop()
}

close(t.shutdownChan)
common.AwaitWaitGroup(&t.shutdownWG, time.Minute)
return
Expand All @@ -194,9 +205,15 @@ func (t *timerQueueProcessor) Stop() {
t.logger.Warn("transferQueueProcessor timed out on shut down", tag.LifeCycleStopTimedout)
}
t.activeQueueProcessor.Stop()
t.activeTaskExecutor.Stop()
for _, standbyQueueProcessor := range t.standbyQueueProcessors {
standbyQueueProcessor.Stop()
}

// stop standby executors after queue processors
for _, standbyTaskExecutor := range t.standbyTaskExecutors {
standbyTaskExecutor.Stop()
}
}

func (t *timerQueueProcessor) NotifyNewTask(clusterName string, info *hcommon.NotifyTaskInfo) {
Expand Down
3 changes: 3 additions & 0 deletions service/history/task/cross_cluster_source_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ func (t *crossClusterSourceTaskExecutor) Execute(
return err
}

// Empty func for now
func (t *crossClusterSourceTaskExecutor) Stop() {}

func (t *crossClusterSourceTaskExecutor) executeStartChildExecutionTask(
ctx context.Context,
task *crossClusterSourceTask,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func (s *crossClusterSourceTaskExecutorSuite) SetupTest() {
}

func (s *crossClusterSourceTaskExecutorSuite) TearDownTest() {
defer s.executor.Stop()
s.controller.Finish()
s.mockShard.Finish(s.T())
}
Expand Down
3 changes: 3 additions & 0 deletions service/history/task/cross_cluster_target_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ func (t *crossClusterTargetTaskExecutor) Execute(
return nil
}

// Empty func for now
func (t *crossClusterTargetTaskExecutor) Stop() {}

func (t *crossClusterTargetTaskExecutor) executeStartChildExecutionTask(
ctx context.Context,
task *crossClusterTargetTask,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (s *crossClusterTargetTaskExecutorSuite) SetupTest() {
}

func (s *crossClusterTargetTaskExecutorSuite) TearDownTest() {
defer s.executor.Stop()
s.controller.Finish()
s.mockShard.Finish(s.T())
}
Expand Down
1 change: 1 addition & 0 deletions service/history/task/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type (
// Executor contains the execution logic for Task
Executor interface {
Execute(task Task, shouldProcessTask bool) error
Stop()
}

// Filter filters Task
Expand Down
12 changes: 12 additions & 0 deletions service/history/task/interface_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 15 additions & 6 deletions service/history/task/timer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,25 +87,34 @@ func (t *timerActiveTaskExecutor) Execute(
return nil
}

ctx, cancel := context.WithTimeout(context.Background(), taskDefaultTimeout)
defer cancel()

switch timerTask.TaskType {
case persistence.TaskTypeUserTimer:
ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
defer cancel()
return t.executeUserTimerTimeoutTask(ctx, timerTask)
case persistence.TaskTypeActivityTimeout:
ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
defer cancel()
return t.executeActivityTimeoutTask(ctx, timerTask)
case persistence.TaskTypeDecisionTimeout:
ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
defer cancel()
return t.executeDecisionTimeoutTask(ctx, timerTask)
case persistence.TaskTypeWorkflowTimeout:
ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
defer cancel()
return t.executeWorkflowTimeoutTask(ctx, timerTask)
case persistence.TaskTypeActivityRetryTimer:
ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
defer cancel()
return t.executeActivityRetryTimerTask(ctx, timerTask)
case persistence.TaskTypeWorkflowBackoffTimer:
ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
defer cancel()
return t.executeWorkflowBackoffTimerTask(ctx, timerTask)
case persistence.TaskTypeDeleteHistoryEvent:
// special timeout for delete history event
deleteHistoryEventContext, deleteHistoryEventCancel := context.WithTimeout(context.Background(), time.Duration(t.config.DeleteHistoryEventContextTimeout())*time.Second)
deleteHistoryEventContext, deleteHistoryEventCancel := context.WithTimeout(t.ctx, time.Duration(t.config.DeleteHistoryEventContextTimeout())*time.Second)
defer deleteHistoryEventCancel()
return t.executeDeleteHistoryEventTask(deleteHistoryEventContext, timerTask)
default:
Expand Down Expand Up @@ -157,7 +166,7 @@ func (t *timerActiveTaskExecutor) executeUserTimerTimeoutTask(
// is encountered, so that we don't need to scan history multiple times
// where there're multiple timers with high delay
var resurrectedTimer map[string]struct{}
scanWorkflowCtx, cancel := context.WithTimeout(context.Background(), scanWorkflowTimeout)
scanWorkflowCtx, cancel := context.WithTimeout(t.ctx, scanWorkflowTimeout)
defer cancel()

sortedUserTimers := timerSequence.LoadAndSortUserTimers()
Expand Down Expand Up @@ -304,7 +313,7 @@ func (t *timerActiveTaskExecutor) executeActivityTimeoutTask(
// is encountered, so that we don't need to scan history multiple times
// where there're multiple timers with high delay
var resurrectedActivity map[int64]struct{}
scanWorkflowCtx, cancel := context.WithTimeout(context.Background(), scanWorkflowTimeout)
scanWorkflowCtx, cancel := context.WithTimeout(t.ctx, scanWorkflowTimeout)
defer cancel()

// need to clear activity heartbeat timer task mask for new activity timer task creation
Expand Down
15 changes: 11 additions & 4 deletions service/history/task/timer_standby_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,27 +91,34 @@ func (t *timerStandbyTaskExecutor) Execute(
return nil
}

ctx, cancel := context.WithTimeout(context.Background(), taskDefaultTimeout)
defer cancel()

switch timerTask.TaskType {
case persistence.TaskTypeUserTimer:
ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
defer cancel()
return t.executeUserTimerTimeoutTask(ctx, timerTask)
case persistence.TaskTypeActivityTimeout:
ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
defer cancel()
return t.executeActivityTimeoutTask(ctx, timerTask)
case persistence.TaskTypeDecisionTimeout:
ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
defer cancel()
return t.executeDecisionTimeoutTask(ctx, timerTask)
case persistence.TaskTypeWorkflowTimeout:
ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
defer cancel()
return t.executeWorkflowTimeoutTask(ctx, timerTask)
case persistence.TaskTypeActivityRetryTimer:
// retry backoff timer should not get created on passive cluster
// TODO: add error logs
return nil
case persistence.TaskTypeWorkflowBackoffTimer:
ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
defer cancel()
return t.executeWorkflowBackoffTimerTask(ctx, timerTask)
case persistence.TaskTypeDeleteHistoryEvent:
// special timeout for delete history event
deleteHistoryEventContext, deleteHistoryEventCancel := context.WithTimeout(context.Background(), time.Duration(t.config.DeleteHistoryEventContextTimeout())*time.Second)
deleteHistoryEventContext, deleteHistoryEventCancel := context.WithTimeout(t.ctx, time.Duration(t.config.DeleteHistoryEventContextTimeout())*time.Second)
defer deleteHistoryEventCancel()
return t.executeDeleteHistoryEventTask(deleteHistoryEventContext, timerTask)
default:
Expand Down
10 changes: 10 additions & 0 deletions service/history/task/timer_task_executor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type (
metricsClient metrics.Client
config *config.Config
throttleRetry *backoff.ThrottleRetry
ctx context.Context
cancelFn context.CancelFunc
}
)

Expand All @@ -61,6 +63,7 @@ func newTimerTaskExecutorBase(
metricsClient metrics.Client,
config *config.Config,
) *timerTaskExecutorBase {
ctx, cancelFn := context.WithCancel(context.Background())
return &timerTaskExecutorBase{
shard: shard,
archiverClient: archiverClient,
Expand All @@ -72,6 +75,8 @@ func newTimerTaskExecutorBase(
backoff.WithRetryPolicy(taskRetryPolicy),
backoff.WithRetryableError(persistence.IsTransientError),
),
ctx: ctx,
cancelFn: cancelFn,
}
}

Expand Down Expand Up @@ -309,3 +314,8 @@ func (t *timerTaskExecutorBase) deleteWorkflowVisibility(
}
return t.throttleRetry.Do(ctx, op)
}

func (t *timerTaskExecutorBase) Stop() {
t.logger.Info("Stopping timerTaskExecutorBase")
t.cancelFn()
}
1 change: 1 addition & 0 deletions service/history/task/timer_task_executor_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func (s *timerQueueTaskExecutorBaseSuite) TearDownTest() {
s.controller.Finish()
s.mockShard.Finish(s.T())
s.mockArchivalClient.AssertExpectations(s.T())
s.timerQueueTaskExecutorBase.Stop()
}

func (s *timerQueueTaskExecutorBaseSuite) TestDeleteWorkflow_NoErr() {
Expand Down
3 changes: 3 additions & 0 deletions service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ func (t *transferActiveTaskExecutor) Execute(
}
}

// Empty func for now
func (t *transferActiveTaskExecutor) Stop() {}

func (t *transferActiveTaskExecutor) processActivityTask(
ctx context.Context,
task *persistence.TransferTaskInfo,
Expand Down
1 change: 1 addition & 0 deletions service/history/task/transfer_active_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ func (s *transferActiveTaskExecutorSuite) SetupTest() {
}

func (s *transferActiveTaskExecutorSuite) TearDownTest() {
s.transferActiveTaskExecutor.Stop()
s.controller.Finish()
s.mockShard.Finish(s.T())
s.mockArchivalClient.AssertExpectations(s.T())
Expand Down
3 changes: 3 additions & 0 deletions service/history/task/transfer_standby_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ func (t *transferStandbyTaskExecutor) Execute(
}
}

// Empty func for now
func (t *transferStandbyTaskExecutor) Stop() {}

func (t *transferStandbyTaskExecutor) processActivityTask(
ctx context.Context,
transferTask *persistence.TransferTaskInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func (s *transferStandbyTaskExecutorSuite) TearDownTest() {
s.controller.Finish()
s.mockShard.Finish(s.T())
s.mockArchivalClient.AssertExpectations(s.T())
s.transferStandbyTaskExecutor.Stop()
}

func (s *transferStandbyTaskExecutorSuite) TestProcessActivityTask_Pending() {
Expand Down

0 comments on commit 16c4775

Please sign in to comment.