From 8b617b97966d6df8d6fab4f8ea4a4a6ed38da315 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Thu, 18 Apr 2024 10:17:12 -0700 Subject: [PATCH 1/4] Add stop() to Executor interface, implementing such method for timerTaskExecutorBase and calling that in timerqueueprocessor to prevent context leak --- service/history/queue/timer_queue_processor.go | 3 +++ .../task/cross_cluster_source_task_executor.go | 3 +++ .../task/cross_cluster_target_task_executor.go | 3 +++ service/history/task/interface.go | 1 + service/history/task/interface_mock.go | 12 ++++++++++++ service/history/task/timer_active_task_executor.go | 8 ++++---- service/history/task/timer_standby_task_executor.go | 4 ++-- service/history/task/timer_task_executor_base.go | 10 ++++++++++ .../history/task/transfer_active_task_executor.go | 3 +++ .../history/task/transfer_standby_task_executor.go | 3 +++ 10 files changed, 44 insertions(+), 6 deletions(-) diff --git a/service/history/queue/timer_queue_processor.go b/service/history/queue/timer_queue_processor.go index 230035a74e4..57dbadd39e9 100644 --- a/service/history/queue/timer_queue_processor.go +++ b/service/history/queue/timer_queue_processor.go @@ -178,6 +178,8 @@ func (t *timerQueueProcessor) Stop() { if !t.shard.GetConfig().QueueProcessorEnableGracefulSyncShutdown() { t.activeQueueProcessor.Stop() + // stop active executor after queue processor + t.activeTaskExecutor.Stop() for _, standbyQueueProcessor := range t.standbyQueueProcessors { standbyQueueProcessor.Stop() } @@ -194,6 +196,7 @@ func (t *timerQueueProcessor) Stop() { t.logger.Warn("transferQueueProcessor timed out on shut down", tag.LifeCycleStopTimedout) } t.activeQueueProcessor.Stop() + t.activeQueueProcessor.Stop() for _, standbyQueueProcessor := range t.standbyQueueProcessors { standbyQueueProcessor.Stop() } diff --git a/service/history/task/cross_cluster_source_task_executor.go b/service/history/task/cross_cluster_source_task_executor.go index 6a2566a757a..b1c2dba7505 100644 --- a/service/history/task/cross_cluster_source_task_executor.go +++ b/service/history/task/cross_cluster_source_task_executor.go @@ -110,6 +110,9 @@ func (t *crossClusterSourceTaskExecutor) Execute( return err } +// Empty func for now +func (t *crossClusterSourceTaskExecutor) Stop() {} + func (t *crossClusterSourceTaskExecutor) executeStartChildExecutionTask( ctx context.Context, task *crossClusterSourceTask, diff --git a/service/history/task/cross_cluster_target_task_executor.go b/service/history/task/cross_cluster_target_task_executor.go index 0bb8a575624..76a2c16910c 100644 --- a/service/history/task/cross_cluster_target_task_executor.go +++ b/service/history/task/cross_cluster_target_task_executor.go @@ -143,6 +143,9 @@ func (t *crossClusterTargetTaskExecutor) Execute( return nil } +// Empty func for now +func (t *crossClusterTargetTaskExecutor) Stop() {} + func (t *crossClusterTargetTaskExecutor) executeStartChildExecutionTask( ctx context.Context, task *crossClusterTargetTask, diff --git a/service/history/task/interface.go b/service/history/task/interface.go index fd861f3af84..28d02ac3fe9 100644 --- a/service/history/task/interface.go +++ b/service/history/task/interface.go @@ -72,6 +72,7 @@ type ( // Executor contains the execution logic for Task Executor interface { Execute(task Task, shouldProcessTask bool) error + Stop() } // Filter filters Task diff --git a/service/history/task/interface_mock.go b/service/history/task/interface_mock.go index 11514305b22..c874f456de2 100644 --- a/service/history/task/interface_mock.go +++ b/service/history/task/interface_mock.go @@ -856,6 +856,18 @@ func (mr *MockExecutorMockRecorder) Execute(task, shouldProcessTask interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Execute", reflect.TypeOf((*MockExecutor)(nil).Execute), task, shouldProcessTask) } +// Stop mocks base method. +func (m *MockExecutor) Stop() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Stop") +} + +// Stop indicates an expected call of Stop. +func (mr *MockExecutorMockRecorder) Stop() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockExecutor)(nil).Stop)) +} + // MockPriorityAssigner is a mock of PriorityAssigner interface. type MockPriorityAssigner struct { ctrl *gomock.Controller diff --git a/service/history/task/timer_active_task_executor.go b/service/history/task/timer_active_task_executor.go index 950ccb0c692..74c0f6e91ba 100644 --- a/service/history/task/timer_active_task_executor.go +++ b/service/history/task/timer_active_task_executor.go @@ -87,7 +87,7 @@ func (t *timerActiveTaskExecutor) Execute( return nil } - ctx, cancel := context.WithTimeout(context.Background(), taskDefaultTimeout) + ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout) defer cancel() switch timerTask.TaskType { @@ -105,7 +105,7 @@ func (t *timerActiveTaskExecutor) Execute( return t.executeWorkflowBackoffTimerTask(ctx, timerTask) case persistence.TaskTypeDeleteHistoryEvent: // special timeout for delete history event - deleteHistoryEventContext, deleteHistoryEventCancel := context.WithTimeout(context.Background(), time.Duration(t.config.DeleteHistoryEventContextTimeout())*time.Second) + deleteHistoryEventContext, deleteHistoryEventCancel := context.WithTimeout(t.ctx, time.Duration(t.config.DeleteHistoryEventContextTimeout())*time.Second) defer deleteHistoryEventCancel() return t.executeDeleteHistoryEventTask(deleteHistoryEventContext, timerTask) default: @@ -157,7 +157,7 @@ func (t *timerActiveTaskExecutor) executeUserTimerTimeoutTask( // is encountered, so that we don't need to scan history multiple times // where there're multiple timers with high delay var resurrectedTimer map[string]struct{} - scanWorkflowCtx, cancel := context.WithTimeout(context.Background(), scanWorkflowTimeout) + scanWorkflowCtx, cancel := context.WithTimeout(t.ctx, scanWorkflowTimeout) defer cancel() sortedUserTimers := timerSequence.LoadAndSortUserTimers() @@ -304,7 +304,7 @@ func (t *timerActiveTaskExecutor) executeActivityTimeoutTask( // is encountered, so that we don't need to scan history multiple times // where there're multiple timers with high delay var resurrectedActivity map[int64]struct{} - scanWorkflowCtx, cancel := context.WithTimeout(context.Background(), scanWorkflowTimeout) + scanWorkflowCtx, cancel := context.WithTimeout(t.ctx, scanWorkflowTimeout) defer cancel() // need to clear activity heartbeat timer task mask for new activity timer task creation diff --git a/service/history/task/timer_standby_task_executor.go b/service/history/task/timer_standby_task_executor.go index e70ef22d900..ecc41168108 100644 --- a/service/history/task/timer_standby_task_executor.go +++ b/service/history/task/timer_standby_task_executor.go @@ -91,7 +91,7 @@ func (t *timerStandbyTaskExecutor) Execute( return nil } - ctx, cancel := context.WithTimeout(context.Background(), taskDefaultTimeout) + ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout) defer cancel() switch timerTask.TaskType { @@ -111,7 +111,7 @@ func (t *timerStandbyTaskExecutor) Execute( return t.executeWorkflowBackoffTimerTask(ctx, timerTask) case persistence.TaskTypeDeleteHistoryEvent: // special timeout for delete history event - deleteHistoryEventContext, deleteHistoryEventCancel := context.WithTimeout(context.Background(), time.Duration(t.config.DeleteHistoryEventContextTimeout())*time.Second) + deleteHistoryEventContext, deleteHistoryEventCancel := context.WithTimeout(t.ctx, time.Duration(t.config.DeleteHistoryEventContextTimeout())*time.Second) defer deleteHistoryEventCancel() return t.executeDeleteHistoryEventTask(deleteHistoryEventContext, timerTask) default: diff --git a/service/history/task/timer_task_executor_base.go b/service/history/task/timer_task_executor_base.go index 1cc0cd0ac67..6474a2d55bb 100644 --- a/service/history/task/timer_task_executor_base.go +++ b/service/history/task/timer_task_executor_base.go @@ -50,6 +50,8 @@ type ( metricsClient metrics.Client config *config.Config throttleRetry *backoff.ThrottleRetry + ctx context.Context + cancelFn context.CancelFunc } ) @@ -61,6 +63,7 @@ func newTimerTaskExecutorBase( metricsClient metrics.Client, config *config.Config, ) *timerTaskExecutorBase { + ctx, cancelFn := context.WithCancel(context.Background()) return &timerTaskExecutorBase{ shard: shard, archiverClient: archiverClient, @@ -72,6 +75,8 @@ func newTimerTaskExecutorBase( backoff.WithRetryPolicy(taskRetryPolicy), backoff.WithRetryableError(persistence.IsTransientError), ), + ctx: ctx, + cancelFn: cancelFn, } } @@ -309,3 +314,8 @@ func (t *timerTaskExecutorBase) deleteWorkflowVisibility( } return t.throttleRetry.Do(ctx, op) } + +func (t *timerTaskExecutorBase) Stop() { + t.logger.Info("Stopping timerTaskExecutorBase") + t.cancelFn() +} diff --git a/service/history/task/transfer_active_task_executor.go b/service/history/task/transfer_active_task_executor.go index 83a3b04a087..802bbd646c6 100644 --- a/service/history/task/transfer_active_task_executor.go +++ b/service/history/task/transfer_active_task_executor.go @@ -161,6 +161,9 @@ func (t *transferActiveTaskExecutor) Execute( } } +// Empty func for now +func (t *transferActiveTaskExecutor) Stop() {} + func (t *transferActiveTaskExecutor) processActivityTask( ctx context.Context, task *persistence.TransferTaskInfo, diff --git a/service/history/task/transfer_standby_task_executor.go b/service/history/task/transfer_standby_task_executor.go index c13e6594768..1e4f10ec54f 100644 --- a/service/history/task/transfer_standby_task_executor.go +++ b/service/history/task/transfer_standby_task_executor.go @@ -118,6 +118,9 @@ func (t *transferStandbyTaskExecutor) Execute( } } +// Empty func for now +func (t *transferStandbyTaskExecutor) Stop() {} + func (t *transferStandbyTaskExecutor) processActivityTask( ctx context.Context, transferTask *persistence.TransferTaskInfo, From 446fc2717de7575482c64030be044bb47bc93af1 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Thu, 18 Apr 2024 11:25:30 -0700 Subject: [PATCH 2/4] unit tests --- service/history/queue/timer_queue_processor.go | 2 +- service/history/task/cross_cluster_source_task_executor_test.go | 1 + service/history/task/cross_cluster_target_task_executor_test.go | 1 + service/history/task/transfer_active_task_executor_test.go | 1 + service/history/task/transfer_standby_task_executor_test.go | 1 + 5 files changed, 5 insertions(+), 1 deletion(-) diff --git a/service/history/queue/timer_queue_processor.go b/service/history/queue/timer_queue_processor.go index 57dbadd39e9..aea37050b75 100644 --- a/service/history/queue/timer_queue_processor.go +++ b/service/history/queue/timer_queue_processor.go @@ -196,7 +196,7 @@ func (t *timerQueueProcessor) Stop() { t.logger.Warn("transferQueueProcessor timed out on shut down", tag.LifeCycleStopTimedout) } t.activeQueueProcessor.Stop() - t.activeQueueProcessor.Stop() + t.activeTaskExecutor.Stop() for _, standbyQueueProcessor := range t.standbyQueueProcessors { standbyQueueProcessor.Stop() } diff --git a/service/history/task/cross_cluster_source_task_executor_test.go b/service/history/task/cross_cluster_source_task_executor_test.go index c82f1367d20..916e962d9ee 100644 --- a/service/history/task/cross_cluster_source_task_executor_test.go +++ b/service/history/task/cross_cluster_source_task_executor_test.go @@ -123,6 +123,7 @@ func (s *crossClusterSourceTaskExecutorSuite) SetupTest() { } func (s *crossClusterSourceTaskExecutorSuite) TearDownTest() { + defer s.executor.Stop() s.controller.Finish() s.mockShard.Finish(s.T()) } diff --git a/service/history/task/cross_cluster_target_task_executor_test.go b/service/history/task/cross_cluster_target_task_executor_test.go index 0f996c1af81..a59c6209fd1 100644 --- a/service/history/task/cross_cluster_target_task_executor_test.go +++ b/service/history/task/cross_cluster_target_task_executor_test.go @@ -85,6 +85,7 @@ func (s *crossClusterTargetTaskExecutorSuite) SetupTest() { } func (s *crossClusterTargetTaskExecutorSuite) TearDownTest() { + defer s.executor.Stop() s.controller.Finish() s.mockShard.Finish(s.T()) } diff --git a/service/history/task/transfer_active_task_executor_test.go b/service/history/task/transfer_active_task_executor_test.go index 81480f96a79..3d8e54a3fb6 100644 --- a/service/history/task/transfer_active_task_executor_test.go +++ b/service/history/task/transfer_active_task_executor_test.go @@ -215,6 +215,7 @@ func (s *transferActiveTaskExecutorSuite) SetupTest() { } func (s *transferActiveTaskExecutorSuite) TearDownTest() { + s.transferActiveTaskExecutor.Stop() s.controller.Finish() s.mockShard.Finish(s.T()) s.mockArchivalClient.AssertExpectations(s.T()) diff --git a/service/history/task/transfer_standby_task_executor_test.go b/service/history/task/transfer_standby_task_executor_test.go index da1e28647e8..8614e7d9b29 100644 --- a/service/history/task/transfer_standby_task_executor_test.go +++ b/service/history/task/transfer_standby_task_executor_test.go @@ -166,6 +166,7 @@ func (s *transferStandbyTaskExecutorSuite) TearDownTest() { s.controller.Finish() s.mockShard.Finish(s.T()) s.mockArchivalClient.AssertExpectations(s.T()) + s.transferStandbyTaskExecutor.Stop() } func (s *transferStandbyTaskExecutorSuite) TestProcessActivityTask_Pending() { From b11e8e43c6693de5de02bebaa66181df22ef4572 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Thu, 18 Apr 2024 12:07:32 -0700 Subject: [PATCH 3/4] more tests --- service/history/task/timer_task_executor_base_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/service/history/task/timer_task_executor_base_test.go b/service/history/task/timer_task_executor_base_test.go index ebea8e6c7be..dbdd7ae3185 100644 --- a/service/history/task/timer_task_executor_base_test.go +++ b/service/history/task/timer_task_executor_base_test.go @@ -115,6 +115,7 @@ func (s *timerQueueTaskExecutorBaseSuite) TearDownTest() { s.controller.Finish() s.mockShard.Finish(s.T()) s.mockArchivalClient.AssertExpectations(s.T()) + s.timerQueueTaskExecutorBase.Stop() } func (s *timerQueueTaskExecutorBaseSuite) TestDeleteWorkflow_NoErr() { From ffd171c71d89fb5bcd16e4a93e9366e797dd038d Mon Sep 17 00:00:00 2001 From: Tim Li Date: Thu, 18 Apr 2024 12:45:36 -0700 Subject: [PATCH 4/4] Respond to comments --- service/history/queue/timer_queue_processor.go | 14 ++++++++++++++ .../history/task/timer_active_task_executor.go | 15 ++++++++++++--- .../history/task/timer_standby_task_executor.go | 13 ++++++++++--- 3 files changed, 36 insertions(+), 6 deletions(-) diff --git a/service/history/queue/timer_queue_processor.go b/service/history/queue/timer_queue_processor.go index aea37050b75..691612f5938 100644 --- a/service/history/queue/timer_queue_processor.go +++ b/service/history/queue/timer_queue_processor.go @@ -64,6 +64,7 @@ type timerQueueProcessor struct { activeTaskExecutor task.Executor activeQueueProcessor *timerQueueProcessorBase standbyQueueProcessors map[string]*timerQueueProcessorBase + standbyTaskExecutors []task.Executor standbyQueueTimerGates map[string]RemoteTimerGate } @@ -100,6 +101,7 @@ func NewTimerQueueProcessor( logger, ) + standbyTaskExecutors := make([]task.Executor, 0, len(shard.GetClusterMetadata().GetRemoteClusterInfo())) standbyQueueProcessors := make(map[string]*timerQueueProcessorBase) standbyQueueTimerGates := make(map[string]RemoteTimerGate) for clusterName := range shard.GetClusterMetadata().GetRemoteClusterInfo() { @@ -123,6 +125,7 @@ func NewTimerQueueProcessor( clusterName, config, ) + standbyTaskExecutors = append(standbyTaskExecutors, standbyTaskExecutor) standbyQueueProcessors[clusterName], standbyQueueTimerGates[clusterName] = newTimerQueueStandbyProcessor( clusterName, shard, @@ -154,6 +157,7 @@ func NewTimerQueueProcessor( activeQueueProcessor: activeQueueProcessor, standbyQueueProcessors: standbyQueueProcessors, standbyQueueTimerGates: standbyQueueTimerGates, + standbyTaskExecutors: standbyTaskExecutors, } } @@ -184,6 +188,11 @@ func (t *timerQueueProcessor) Stop() { standbyQueueProcessor.Stop() } + // stop standby executors after queue processors + for _, standbyTaskExecutor := range t.standbyTaskExecutors { + standbyTaskExecutor.Stop() + } + close(t.shutdownChan) common.AwaitWaitGroup(&t.shutdownWG, time.Minute) return @@ -200,6 +209,11 @@ func (t *timerQueueProcessor) Stop() { for _, standbyQueueProcessor := range t.standbyQueueProcessors { standbyQueueProcessor.Stop() } + + // stop standby executors after queue processors + for _, standbyTaskExecutor := range t.standbyTaskExecutors { + standbyTaskExecutor.Stop() + } } func (t *timerQueueProcessor) NotifyNewTask(clusterName string, info *hcommon.NotifyTaskInfo) { diff --git a/service/history/task/timer_active_task_executor.go b/service/history/task/timer_active_task_executor.go index 74c0f6e91ba..0abcfc0096a 100644 --- a/service/history/task/timer_active_task_executor.go +++ b/service/history/task/timer_active_task_executor.go @@ -87,21 +87,30 @@ func (t *timerActiveTaskExecutor) Execute( return nil } - ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout) - defer cancel() - switch timerTask.TaskType { case persistence.TaskTypeUserTimer: + ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout) + defer cancel() return t.executeUserTimerTimeoutTask(ctx, timerTask) case persistence.TaskTypeActivityTimeout: + ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout) + defer cancel() return t.executeActivityTimeoutTask(ctx, timerTask) case persistence.TaskTypeDecisionTimeout: + ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout) + defer cancel() return t.executeDecisionTimeoutTask(ctx, timerTask) case persistence.TaskTypeWorkflowTimeout: + ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout) + defer cancel() return t.executeWorkflowTimeoutTask(ctx, timerTask) case persistence.TaskTypeActivityRetryTimer: + ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout) + defer cancel() return t.executeActivityRetryTimerTask(ctx, timerTask) case persistence.TaskTypeWorkflowBackoffTimer: + ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout) + defer cancel() return t.executeWorkflowBackoffTimerTask(ctx, timerTask) case persistence.TaskTypeDeleteHistoryEvent: // special timeout for delete history event diff --git a/service/history/task/timer_standby_task_executor.go b/service/history/task/timer_standby_task_executor.go index ecc41168108..51ac4102fd0 100644 --- a/service/history/task/timer_standby_task_executor.go +++ b/service/history/task/timer_standby_task_executor.go @@ -91,23 +91,30 @@ func (t *timerStandbyTaskExecutor) Execute( return nil } - ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout) - defer cancel() - switch timerTask.TaskType { case persistence.TaskTypeUserTimer: + ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout) + defer cancel() return t.executeUserTimerTimeoutTask(ctx, timerTask) case persistence.TaskTypeActivityTimeout: + ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout) + defer cancel() return t.executeActivityTimeoutTask(ctx, timerTask) case persistence.TaskTypeDecisionTimeout: + ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout) + defer cancel() return t.executeDecisionTimeoutTask(ctx, timerTask) case persistence.TaskTypeWorkflowTimeout: + ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout) + defer cancel() return t.executeWorkflowTimeoutTask(ctx, timerTask) case persistence.TaskTypeActivityRetryTimer: // retry backoff timer should not get created on passive cluster // TODO: add error logs return nil case persistence.TaskTypeWorkflowBackoffTimer: + ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout) + defer cancel() return t.executeWorkflowBackoffTimerTask(ctx, timerTask) case persistence.TaskTypeDeleteHistoryEvent: // special timeout for delete history event