diff --git a/service/history/queue/timer_queue_processor.go b/service/history/queue/timer_queue_processor.go index 230035a74e4..691612f5938 100644 --- a/service/history/queue/timer_queue_processor.go +++ b/service/history/queue/timer_queue_processor.go @@ -64,6 +64,7 @@ type timerQueueProcessor struct { activeTaskExecutor task.Executor activeQueueProcessor *timerQueueProcessorBase standbyQueueProcessors map[string]*timerQueueProcessorBase + standbyTaskExecutors []task.Executor standbyQueueTimerGates map[string]RemoteTimerGate } @@ -100,6 +101,7 @@ func NewTimerQueueProcessor( logger, ) + standbyTaskExecutors := make([]task.Executor, 0, len(shard.GetClusterMetadata().GetRemoteClusterInfo())) standbyQueueProcessors := make(map[string]*timerQueueProcessorBase) standbyQueueTimerGates := make(map[string]RemoteTimerGate) for clusterName := range shard.GetClusterMetadata().GetRemoteClusterInfo() { @@ -123,6 +125,7 @@ func NewTimerQueueProcessor( clusterName, config, ) + standbyTaskExecutors = append(standbyTaskExecutors, standbyTaskExecutor) standbyQueueProcessors[clusterName], standbyQueueTimerGates[clusterName] = newTimerQueueStandbyProcessor( clusterName, shard, @@ -154,6 +157,7 @@ func NewTimerQueueProcessor( activeQueueProcessor: activeQueueProcessor, standbyQueueProcessors: standbyQueueProcessors, standbyQueueTimerGates: standbyQueueTimerGates, + standbyTaskExecutors: standbyTaskExecutors, } } @@ -178,10 +182,17 @@ func (t *timerQueueProcessor) Stop() { if !t.shard.GetConfig().QueueProcessorEnableGracefulSyncShutdown() { t.activeQueueProcessor.Stop() + // stop active executor after queue processor + t.activeTaskExecutor.Stop() for _, standbyQueueProcessor := range t.standbyQueueProcessors { standbyQueueProcessor.Stop() } + // stop standby executors after queue processors + for _, standbyTaskExecutor := range t.standbyTaskExecutors { + standbyTaskExecutor.Stop() + } + close(t.shutdownChan) common.AwaitWaitGroup(&t.shutdownWG, time.Minute) return @@ -194,9 +205,15 @@ 
func (t *timerQueueProcessor) Stop() { t.logger.Warn("transferQueueProcessor timed out on shut down", tag.LifeCycleStopTimedout) } t.activeQueueProcessor.Stop() + t.activeTaskExecutor.Stop() for _, standbyQueueProcessor := range t.standbyQueueProcessors { standbyQueueProcessor.Stop() } + + // stop standby executors after queue processors + for _, standbyTaskExecutor := range t.standbyTaskExecutors { + standbyTaskExecutor.Stop() + } } func (t *timerQueueProcessor) NotifyNewTask(clusterName string, info *hcommon.NotifyTaskInfo) { diff --git a/service/history/task/cross_cluster_source_task_executor.go b/service/history/task/cross_cluster_source_task_executor.go index 6a2566a757a..b1c2dba7505 100644 --- a/service/history/task/cross_cluster_source_task_executor.go +++ b/service/history/task/cross_cluster_source_task_executor.go @@ -110,6 +110,9 @@ func (t *crossClusterSourceTaskExecutor) Execute( return err } +// Stop is a no-op for now; it exists only to satisfy the Executor interface. +func (t *crossClusterSourceTaskExecutor) Stop() {} + func (t *crossClusterSourceTaskExecutor) executeStartChildExecutionTask( ctx context.Context, task *crossClusterSourceTask, diff --git a/service/history/task/cross_cluster_source_task_executor_test.go b/service/history/task/cross_cluster_source_task_executor_test.go index c82f1367d20..916e962d9ee 100644 --- a/service/history/task/cross_cluster_source_task_executor_test.go +++ b/service/history/task/cross_cluster_source_task_executor_test.go @@ -123,6 +123,7 @@ func (s *crossClusterSourceTaskExecutorSuite) SetupTest() { } func (s *crossClusterSourceTaskExecutorSuite) TearDownTest() { + defer s.executor.Stop() s.controller.Finish() s.mockShard.Finish(s.T()) } diff --git a/service/history/task/cross_cluster_target_task_executor.go b/service/history/task/cross_cluster_target_task_executor.go index 0bb8a575624..76a2c16910c 100644 --- a/service/history/task/cross_cluster_target_task_executor.go +++ b/service/history/task/cross_cluster_target_task_executor.go @@ -143,6 +143,9 @@ func (t 
*crossClusterTargetTaskExecutor) Execute( return nil } +// Stop is a no-op for now; it exists only to satisfy the Executor interface. +func (t *crossClusterTargetTaskExecutor) Stop() {} + func (t *crossClusterTargetTaskExecutor) executeStartChildExecutionTask( ctx context.Context, task *crossClusterTargetTask, diff --git a/service/history/task/cross_cluster_target_task_executor_test.go b/service/history/task/cross_cluster_target_task_executor_test.go index 0f996c1af81..a59c6209fd1 100644 --- a/service/history/task/cross_cluster_target_task_executor_test.go +++ b/service/history/task/cross_cluster_target_task_executor_test.go @@ -85,6 +85,7 @@ func (s *crossClusterTargetTaskExecutorSuite) SetupTest() { } func (s *crossClusterTargetTaskExecutorSuite) TearDownTest() { + defer s.executor.Stop() s.controller.Finish() s.mockShard.Finish(s.T()) } diff --git a/service/history/task/interface.go b/service/history/task/interface.go index fd861f3af84..28d02ac3fe9 100644 --- a/service/history/task/interface.go +++ b/service/history/task/interface.go @@ -72,6 +72,7 @@ type ( // Executor contains the execution logic for Task Executor interface { Execute(task Task, shouldProcessTask bool) error + Stop() } // Filter filters Task diff --git a/service/history/task/interface_mock.go b/service/history/task/interface_mock.go index 11514305b22..c874f456de2 100644 --- a/service/history/task/interface_mock.go +++ b/service/history/task/interface_mock.go @@ -856,6 +856,18 @@ func (mr *MockExecutorMockRecorder) Execute(task, shouldProcessTask interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Execute", reflect.TypeOf((*MockExecutor)(nil).Execute), task, shouldProcessTask) } +// Stop mocks base method. +func (m *MockExecutor) Stop() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Stop") +} + +// Stop indicates an expected call of Stop. 
+func (mr *MockExecutorMockRecorder) Stop() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockExecutor)(nil).Stop)) +} + // MockPriorityAssigner is a mock of PriorityAssigner interface. type MockPriorityAssigner struct { ctrl *gomock.Controller diff --git a/service/history/task/timer_active_task_executor.go b/service/history/task/timer_active_task_executor.go index 950ccb0c692..0abcfc0096a 100644 --- a/service/history/task/timer_active_task_executor.go +++ b/service/history/task/timer_active_task_executor.go @@ -87,25 +87,34 @@ func (t *timerActiveTaskExecutor) Execute( return nil } - ctx, cancel := context.WithTimeout(context.Background(), taskDefaultTimeout) - defer cancel() - switch timerTask.TaskType { case persistence.TaskTypeUserTimer: + ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout) + defer cancel() return t.executeUserTimerTimeoutTask(ctx, timerTask) case persistence.TaskTypeActivityTimeout: + ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout) + defer cancel() return t.executeActivityTimeoutTask(ctx, timerTask) case persistence.TaskTypeDecisionTimeout: + ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout) + defer cancel() return t.executeDecisionTimeoutTask(ctx, timerTask) case persistence.TaskTypeWorkflowTimeout: + ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout) + defer cancel() return t.executeWorkflowTimeoutTask(ctx, timerTask) case persistence.TaskTypeActivityRetryTimer: + ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout) + defer cancel() return t.executeActivityRetryTimerTask(ctx, timerTask) case persistence.TaskTypeWorkflowBackoffTimer: + ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout) + defer cancel() return t.executeWorkflowBackoffTimerTask(ctx, timerTask) case persistence.TaskTypeDeleteHistoryEvent: // special timeout for delete history event - deleteHistoryEventContext, deleteHistoryEventCancel := 
context.WithTimeout(context.Background(), time.Duration(t.config.DeleteHistoryEventContextTimeout())*time.Second) + deleteHistoryEventContext, deleteHistoryEventCancel := context.WithTimeout(t.ctx, time.Duration(t.config.DeleteHistoryEventContextTimeout())*time.Second) defer deleteHistoryEventCancel() return t.executeDeleteHistoryEventTask(deleteHistoryEventContext, timerTask) default: @@ -157,7 +166,7 @@ func (t *timerActiveTaskExecutor) executeUserTimerTimeoutTask( // is encountered, so that we don't need to scan history multiple times // where there're multiple timers with high delay var resurrectedTimer map[string]struct{} - scanWorkflowCtx, cancel := context.WithTimeout(context.Background(), scanWorkflowTimeout) + scanWorkflowCtx, cancel := context.WithTimeout(t.ctx, scanWorkflowTimeout) defer cancel() sortedUserTimers := timerSequence.LoadAndSortUserTimers() @@ -304,7 +313,7 @@ func (t *timerActiveTaskExecutor) executeActivityTimeoutTask( // is encountered, so that we don't need to scan history multiple times // where there're multiple timers with high delay var resurrectedActivity map[int64]struct{} - scanWorkflowCtx, cancel := context.WithTimeout(context.Background(), scanWorkflowTimeout) + scanWorkflowCtx, cancel := context.WithTimeout(t.ctx, scanWorkflowTimeout) defer cancel() // need to clear activity heartbeat timer task mask for new activity timer task creation diff --git a/service/history/task/timer_standby_task_executor.go b/service/history/task/timer_standby_task_executor.go index e70ef22d900..51ac4102fd0 100644 --- a/service/history/task/timer_standby_task_executor.go +++ b/service/history/task/timer_standby_task_executor.go @@ -91,27 +91,34 @@ func (t *timerStandbyTaskExecutor) Execute( return nil } - ctx, cancel := context.WithTimeout(context.Background(), taskDefaultTimeout) - defer cancel() - switch timerTask.TaskType { case persistence.TaskTypeUserTimer: + ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout) + defer cancel() return 
t.executeUserTimerTimeoutTask(ctx, timerTask) case persistence.TaskTypeActivityTimeout: + ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout) + defer cancel() return t.executeActivityTimeoutTask(ctx, timerTask) case persistence.TaskTypeDecisionTimeout: + ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout) + defer cancel() return t.executeDecisionTimeoutTask(ctx, timerTask) case persistence.TaskTypeWorkflowTimeout: + ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout) + defer cancel() return t.executeWorkflowTimeoutTask(ctx, timerTask) case persistence.TaskTypeActivityRetryTimer: // retry backoff timer should not get created on passive cluster // TODO: add error logs return nil case persistence.TaskTypeWorkflowBackoffTimer: + ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout) + defer cancel() return t.executeWorkflowBackoffTimerTask(ctx, timerTask) case persistence.TaskTypeDeleteHistoryEvent: // special timeout for delete history event - deleteHistoryEventContext, deleteHistoryEventCancel := context.WithTimeout(context.Background(), time.Duration(t.config.DeleteHistoryEventContextTimeout())*time.Second) + deleteHistoryEventContext, deleteHistoryEventCancel := context.WithTimeout(t.ctx, time.Duration(t.config.DeleteHistoryEventContextTimeout())*time.Second) defer deleteHistoryEventCancel() return t.executeDeleteHistoryEventTask(deleteHistoryEventContext, timerTask) default: diff --git a/service/history/task/timer_task_executor_base.go b/service/history/task/timer_task_executor_base.go index 1cc0cd0ac67..6474a2d55bb 100644 --- a/service/history/task/timer_task_executor_base.go +++ b/service/history/task/timer_task_executor_base.go @@ -50,6 +50,8 @@ type ( metricsClient metrics.Client config *config.Config throttleRetry *backoff.ThrottleRetry + ctx context.Context + cancelFn context.CancelFunc } ) @@ -61,6 +63,7 @@ func newTimerTaskExecutorBase( metricsClient metrics.Client, config *config.Config, ) *timerTaskExecutorBase { + 
ctx, cancelFn := context.WithCancel(context.Background()) return &timerTaskExecutorBase{ shard: shard, archiverClient: archiverClient, @@ -72,6 +75,8 @@ func newTimerTaskExecutorBase( backoff.WithRetryPolicy(taskRetryPolicy), backoff.WithRetryableError(persistence.IsTransientError), ), + ctx: ctx, + cancelFn: cancelFn, } } @@ -309,3 +314,8 @@ func (t *timerTaskExecutorBase) deleteWorkflowVisibility( } return t.throttleRetry.Do(ctx, op) } + +func (t *timerTaskExecutorBase) Stop() { + t.logger.Info("Stopping timerTaskExecutorBase") + t.cancelFn() +} diff --git a/service/history/task/timer_task_executor_base_test.go b/service/history/task/timer_task_executor_base_test.go index ebea8e6c7be..dbdd7ae3185 100644 --- a/service/history/task/timer_task_executor_base_test.go +++ b/service/history/task/timer_task_executor_base_test.go @@ -115,6 +115,7 @@ func (s *timerQueueTaskExecutorBaseSuite) TearDownTest() { s.controller.Finish() s.mockShard.Finish(s.T()) s.mockArchivalClient.AssertExpectations(s.T()) + s.timerQueueTaskExecutorBase.Stop() } func (s *timerQueueTaskExecutorBaseSuite) TestDeleteWorkflow_NoErr() { diff --git a/service/history/task/transfer_active_task_executor.go b/service/history/task/transfer_active_task_executor.go index 83a3b04a087..802bbd646c6 100644 --- a/service/history/task/transfer_active_task_executor.go +++ b/service/history/task/transfer_active_task_executor.go @@ -161,6 +161,9 @@ func (t *transferActiveTaskExecutor) Execute( } } +// Stop is a no-op for now; it exists only to satisfy the Executor interface. +func (t *transferActiveTaskExecutor) Stop() {} + func (t *transferActiveTaskExecutor) processActivityTask( ctx context.Context, task *persistence.TransferTaskInfo, diff --git a/service/history/task/transfer_active_task_executor_test.go b/service/history/task/transfer_active_task_executor_test.go index 81480f96a79..3d8e54a3fb6 100644 --- a/service/history/task/transfer_active_task_executor_test.go +++ b/service/history/task/transfer_active_task_executor_test.go @@ -215,6 +215,7 @@ func (s 
*transferActiveTaskExecutorSuite) SetupTest() { } func (s *transferActiveTaskExecutorSuite) TearDownTest() { + s.transferActiveTaskExecutor.Stop() s.controller.Finish() s.mockShard.Finish(s.T()) s.mockArchivalClient.AssertExpectations(s.T()) diff --git a/service/history/task/transfer_standby_task_executor.go b/service/history/task/transfer_standby_task_executor.go index c13e6594768..1e4f10ec54f 100644 --- a/service/history/task/transfer_standby_task_executor.go +++ b/service/history/task/transfer_standby_task_executor.go @@ -118,6 +118,9 @@ func (t *transferStandbyTaskExecutor) Execute( } } +// Stop is a no-op for now; it exists only to satisfy the Executor interface. +func (t *transferStandbyTaskExecutor) Stop() {} + func (t *transferStandbyTaskExecutor) processActivityTask( ctx context.Context, transferTask *persistence.TransferTaskInfo, diff --git a/service/history/task/transfer_standby_task_executor_test.go b/service/history/task/transfer_standby_task_executor_test.go index da1e28647e8..8614e7d9b29 100644 --- a/service/history/task/transfer_standby_task_executor_test.go +++ b/service/history/task/transfer_standby_task_executor_test.go @@ -166,6 +166,7 @@ func (s *transferStandbyTaskExecutorSuite) TearDownTest() { s.controller.Finish() s.mockShard.Finish(s.T()) s.mockArchivalClient.AssertExpectations(s.T()) + s.transferStandbyTaskExecutor.Stop() } func (s *transferStandbyTaskExecutorSuite) TestProcessActivityTask_Pending() {