Add Stop() to the Executor interface and TimerTaskExecutorBase, and improve context management in TimerQueueProcessor #5920

Merged: 6 commits, Apr 18, 2024
17 changes: 17 additions & 0 deletions service/history/queue/timer_queue_processor.go
@@ -64,6 +64,7 @@ type timerQueueProcessor struct {
activeTaskExecutor task.Executor
activeQueueProcessor *timerQueueProcessorBase
standbyQueueProcessors map[string]*timerQueueProcessorBase
standbyTaskExecutors []task.Executor
standbyQueueTimerGates map[string]RemoteTimerGate
}

@@ -100,6 +101,7 @@ func NewTimerQueueProcessor(
logger,
)

standbyTaskExecutors := make([]task.Executor, 0, len(shard.GetClusterMetadata().GetRemoteClusterInfo()))
standbyQueueProcessors := make(map[string]*timerQueueProcessorBase)
standbyQueueTimerGates := make(map[string]RemoteTimerGate)
for clusterName := range shard.GetClusterMetadata().GetRemoteClusterInfo() {
@@ -123,6 +125,7 @@
clusterName,
config,
)
standbyTaskExecutors = append(standbyTaskExecutors, standbyTaskExecutor)
standbyQueueProcessors[clusterName], standbyQueueTimerGates[clusterName] = newTimerQueueStandbyProcessor(
clusterName,
shard,
@@ -154,6 +157,7 @@
activeQueueProcessor: activeQueueProcessor,
standbyQueueProcessors: standbyQueueProcessors,
standbyQueueTimerGates: standbyQueueTimerGates,
standbyTaskExecutors: standbyTaskExecutors,
}
}

@@ -178,10 +182,17 @@ func (t *timerQueueProcessor) Stop() {

if !t.shard.GetConfig().QueueProcessorEnableGracefulSyncShutdown() {
t.activeQueueProcessor.Stop()
// stop active executor after queue processor
t.activeTaskExecutor.Stop()
Review comment (Contributor): There are also standbyTaskExecutor objects created and passed to newTimerQueueStandbyProcessor in the constructor. We need to stop them as well. The timer queue processor creates them, so it should be responsible for stopping those executors.

Reply (Contributor Author): Thanks, I have added a field tracking the standbyTaskExecutors and stop them at the end.

for _, standbyQueueProcessor := range t.standbyQueueProcessors {
standbyQueueProcessor.Stop()
}

// stop standby executors after queue processors
for _, standbyTaskExecutor := range t.standbyTaskExecutors {
standbyTaskExecutor.Stop()
}

close(t.shutdownChan)
common.AwaitWaitGroup(&t.shutdownWG, time.Minute)
return
@@ -194,9 +205,15 @@ func (t *timerQueueProcessor) Stop() {
t.logger.Warn("transferQueueProcessor timed out on shut down", tag.LifeCycleStopTimedout)
}
t.activeQueueProcessor.Stop()
t.activeTaskExecutor.Stop()
for _, standbyQueueProcessor := range t.standbyQueueProcessors {
standbyQueueProcessor.Stop()
}

// stop standby executors after queue processors
for _, standbyTaskExecutor := range t.standbyTaskExecutors {
standbyTaskExecutor.Stop()
}
}

func (t *timerQueueProcessor) NotifyNewTask(clusterName string, info *hcommon.NotifyTaskInfo) {
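
A minimal sketch of the shutdown ordering discussed in the review thread above, using hypothetical names (timerQueue, queueProcessor, taskExecutor rather than the real cadence types); the point is simply that each queue processor is stopped before the executor it dispatches to, and the component that constructed the standby executors is the one that stops them.

```go
// Sketch only: not the real cadence types. Illustrates the stop ordering in
// timerQueueProcessor.Stop: processors first, then the executors they feed.
package main

import "fmt"

type taskExecutor struct{ name string }

func (e *taskExecutor) Stop() { fmt.Println("executor stopped:", e.name) }

type queueProcessor struct{ name string }

func (p *queueProcessor) Stop() { fmt.Println("processor stopped:", p.name) }

type timerQueue struct {
	activeProcessor   *queueProcessor
	activeExecutor    *taskExecutor
	standbyProcessors map[string]*queueProcessor
	standbyExecutors  []*taskExecutor // constructed here, so stopped here
}

func (t *timerQueue) Stop() {
	t.activeProcessor.Stop()
	t.activeExecutor.Stop() // stop the active executor after its processor
	for _, p := range t.standbyProcessors {
		p.Stop()
	}
	for _, e := range t.standbyExecutors { // stop standby executors last
		e.Stop()
	}
}

func main() {
	t := &timerQueue{
		activeProcessor:   &queueProcessor{name: "active"},
		activeExecutor:    &taskExecutor{name: "active"},
		standbyProcessors: map[string]*queueProcessor{"dc2": {name: "dc2"}},
		standbyExecutors:  []*taskExecutor{{name: "dc2"}},
	}
	t.Stop()
}
```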
3 changes: 3 additions & 0 deletions service/history/task/cross_cluster_source_task_executor.go
@@ -110,6 +110,9 @@ func (t *crossClusterSourceTaskExecutor) Execute(
return err
}

// Empty func for now
func (t *crossClusterSourceTaskExecutor) Stop() {}

func (t *crossClusterSourceTaskExecutor) executeStartChildExecutionTask(
ctx context.Context,
task *crossClusterSourceTask,
@@ -123,6 +123,7 @@ func (s *crossClusterSourceTaskExecutorSuite) SetupTest() {
}

func (s *crossClusterSourceTaskExecutorSuite) TearDownTest() {
defer s.executor.Stop()
s.controller.Finish()
s.mockShard.Finish(s.T())
}
3 changes: 3 additions & 0 deletions service/history/task/cross_cluster_target_task_executor.go
@@ -143,6 +143,9 @@ func (t *crossClusterTargetTaskExecutor) Execute(
return nil
}

// Empty func for now
func (t *crossClusterTargetTaskExecutor) Stop() {}

func (t *crossClusterTargetTaskExecutor) executeStartChildExecutionTask(
ctx context.Context,
task *crossClusterTargetTask,
@@ -85,6 +85,7 @@ func (s *crossClusterTargetTaskExecutorSuite) SetupTest() {
}

func (s *crossClusterTargetTaskExecutorSuite) TearDownTest() {
defer s.executor.Stop()
s.controller.Finish()
s.mockShard.Finish(s.T())
}
1 change: 1 addition & 0 deletions service/history/task/interface.go
@@ -72,6 +72,7 @@ type (
// Executor contains the execution logic for Task
Executor interface {
Execute(task Task, shouldProcessTask bool) error
Stop()
}

// Filter filters Task
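
For executors that hold no background resources (the cross-cluster and transfer executors in this PR), a no-op Stop is enough to satisfy the widened interface. A minimal sketch, assuming a stand-in Task type rather than the real task.Task:

```go
// Sketch of the widened Executor contract with a no-op Stop, as used by the
// cross-cluster and transfer executors in this PR. Task is a stand-in for
// the real task.Task interface.
package main

import "fmt"

type Task interface{ GetTaskID() int64 }

type Executor interface {
	Execute(task Task, shouldProcessTask bool) error
	Stop()
}

type fakeTask struct{ id int64 }

func (t fakeTask) GetTaskID() int64 { return t.id }

type noopStopExecutor struct{}

func (e *noopStopExecutor) Execute(task Task, shouldProcessTask bool) error {
	if !shouldProcessTask {
		return nil // acknowledged but skipped
	}
	fmt.Println("processing task", task.GetTaskID())
	return nil
}

// Stop is a no-op: this executor owns no contexts or goroutines to cancel.
func (e *noopStopExecutor) Stop() {}

func main() {
	var ex Executor = &noopStopExecutor{}
	_ = ex.Execute(fakeTask{id: 7}, true)
	ex.Stop() // safe to call; nothing to tear down
}
```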
12 changes: 12 additions & 0 deletions service/history/task/interface_mock.go

(Generated mock file; diff not rendered.)

21 changes: 15 additions & 6 deletions service/history/task/timer_active_task_executor.go
@@ -87,25 +87,34 @@ func (t *timerActiveTaskExecutor) Execute(
return nil
}

ctx, cancel := context.WithTimeout(context.Background(), taskDefaultTimeout)
defer cancel()

switch timerTask.TaskType {
case persistence.TaskTypeUserTimer:
ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
defer cancel()
return t.executeUserTimerTimeoutTask(ctx, timerTask)
case persistence.TaskTypeActivityTimeout:
ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
defer cancel()
return t.executeActivityTimeoutTask(ctx, timerTask)
case persistence.TaskTypeDecisionTimeout:
ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
defer cancel()
return t.executeDecisionTimeoutTask(ctx, timerTask)
case persistence.TaskTypeWorkflowTimeout:
ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
defer cancel()
return t.executeWorkflowTimeoutTask(ctx, timerTask)
case persistence.TaskTypeActivityRetryTimer:
ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
defer cancel()
return t.executeActivityRetryTimerTask(ctx, timerTask)
case persistence.TaskTypeWorkflowBackoffTimer:
ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
defer cancel()
return t.executeWorkflowBackoffTimerTask(ctx, timerTask)
case persistence.TaskTypeDeleteHistoryEvent:
// special timeout for delete history event
deleteHistoryEventContext, deleteHistoryEventCancel := context.WithTimeout(context.Background(), time.Duration(t.config.DeleteHistoryEventContextTimeout())*time.Second)
deleteHistoryEventContext, deleteHistoryEventCancel := context.WithTimeout(t.ctx, time.Duration(t.config.DeleteHistoryEventContextTimeout())*time.Second)
defer deleteHistoryEventCancel()
return t.executeDeleteHistoryEventTask(deleteHistoryEventContext, timerTask)
default:
@@ -157,7 +166,7 @@ func (t *timerActiveTaskExecutor) executeUserTimerTimeoutTask(
// is encountered, so that we don't need to scan history multiple times
// where there're multiple timers with high delay
var resurrectedTimer map[string]struct{}
scanWorkflowCtx, cancel := context.WithTimeout(context.Background(), scanWorkflowTimeout)
scanWorkflowCtx, cancel := context.WithTimeout(t.ctx, scanWorkflowTimeout)
defer cancel()

sortedUserTimers := timerSequence.LoadAndSortUserTimers()
@@ -304,7 +313,7 @@ func (t *timerActiveTaskExecutor) executeActivityTimeoutTask(
// is encountered, so that we don't need to scan history multiple times
// where there're multiple timers with high delay
var resurrectedActivity map[int64]struct{}
scanWorkflowCtx, cancel := context.WithTimeout(context.Background(), scanWorkflowTimeout)
scanWorkflowCtx, cancel := context.WithTimeout(t.ctx, scanWorkflowTimeout)
defer cancel()

// need to clear activity heartbeat timer task mask for new activity timer task creation
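
A compact sketch of the per-case pattern used above, under stated assumptions (a stripped-down executor with hypothetical helper methods): each task type derives its own timeout from the executor's ctx rather than from context.Background(), so a later Stop() can preempt any of them, while the delete-history case keeps its separate, configurable timeout.

```go
// Sketch only: simplified task-type switch deriving per-case timeouts from e.ctx.
package main

import (
	"context"
	"fmt"
	"time"
)

const (
	taskTypeUserTimer = iota
	taskTypeDeleteHistoryEvent
)

const taskDefaultTimeout = 5 * time.Second

type executor struct {
	ctx                      context.Context
	deleteHistoryTimeoutSecs int // stands in for config.DeleteHistoryEventContextTimeout()
}

func (e *executor) execute(taskType int) error {
	switch taskType {
	case taskTypeUserTimer:
		ctx, cancel := context.WithTimeout(e.ctx, taskDefaultTimeout)
		defer cancel()
		return e.processUserTimer(ctx)
	case taskTypeDeleteHistoryEvent:
		// special, longer timeout, still bounded by the executor's lifetime
		ctx, cancel := context.WithTimeout(e.ctx, time.Duration(e.deleteHistoryTimeoutSecs)*time.Second)
		defer cancel()
		return e.processDeleteHistory(ctx)
	default:
		return fmt.Errorf("unknown task type: %v", taskType)
	}
}

// Hypothetical handlers; they only report whether their context is still live.
func (e *executor) processUserTimer(ctx context.Context) error     { return ctx.Err() }
func (e *executor) processDeleteHistory(ctx context.Context) error { return ctx.Err() }

func main() {
	e := &executor{ctx: context.Background(), deleteHistoryTimeoutSecs: 30}
	fmt.Println(e.execute(taskTypeUserTimer)) // <nil>
}
```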
15 changes: 11 additions & 4 deletions service/history/task/timer_standby_task_executor.go
@@ -91,27 +91,34 @@ func (t *timerStandbyTaskExecutor) Execute(
return nil
}

ctx, cancel := context.WithTimeout(context.Background(), taskDefaultTimeout)
defer cancel()

switch timerTask.TaskType {
case persistence.TaskTypeUserTimer:
ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
defer cancel()
return t.executeUserTimerTimeoutTask(ctx, timerTask)
case persistence.TaskTypeActivityTimeout:
ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
defer cancel()
return t.executeActivityTimeoutTask(ctx, timerTask)
case persistence.TaskTypeDecisionTimeout:
ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
defer cancel()
return t.executeDecisionTimeoutTask(ctx, timerTask)
case persistence.TaskTypeWorkflowTimeout:
ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
defer cancel()
return t.executeWorkflowTimeoutTask(ctx, timerTask)
case persistence.TaskTypeActivityRetryTimer:
// retry backoff timer should not get created on passive cluster
// TODO: add error logs
return nil
case persistence.TaskTypeWorkflowBackoffTimer:
ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
defer cancel()
return t.executeWorkflowBackoffTimerTask(ctx, timerTask)
case persistence.TaskTypeDeleteHistoryEvent:
// special timeout for delete history event
deleteHistoryEventContext, deleteHistoryEventCancel := context.WithTimeout(context.Background(), time.Duration(t.config.DeleteHistoryEventContextTimeout())*time.Second)
deleteHistoryEventContext, deleteHistoryEventCancel := context.WithTimeout(t.ctx, time.Duration(t.config.DeleteHistoryEventContextTimeout())*time.Second)
defer deleteHistoryEventCancel()
return t.executeDeleteHistoryEventTask(deleteHistoryEventContext, timerTask)
default:
10 changes: 10 additions & 0 deletions service/history/task/timer_task_executor_base.go
@@ -50,6 +50,8 @@ type (
metricsClient metrics.Client
config *config.Config
throttleRetry *backoff.ThrottleRetry
ctx context.Context
cancelFn context.CancelFunc
}
)

@@ -61,6 +63,7 @@ func newTimerTaskExecutorBase(
metricsClient metrics.Client,
config *config.Config,
) *timerTaskExecutorBase {
ctx, cancelFn := context.WithCancel(context.Background())
return &timerTaskExecutorBase{
shard: shard,
archiverClient: archiverClient,
@@ -72,6 +75,8 @@
backoff.WithRetryPolicy(taskRetryPolicy),
backoff.WithRetryableError(persistence.IsTransientError),
),
ctx: ctx,
cancelFn: cancelFn,
}
}

@@ -309,3 +314,8 @@ func (t *timerTaskExecutorBase) deleteWorkflowVisibility(
}
return t.throttleRetry.Do(ctx, op)
}

func (t *timerTaskExecutorBase) Stop() {
t.logger.Info("Stopping timerTaskExecutorBase")
t.cancelFn()
}
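
To see how the new ctx/cancelFn wiring lets Stop() interrupt work in flight, here is a self-contained sketch under the same assumptions (an executor base owning a cancellable root context); the retry loop and persistence calls of the real timerTaskExecutorBase are replaced by a blocking wait.

```go
// Sketch: Stop() cancels the executor's root context, which unblocks any
// operation that was started with a context derived from it.
package main

import (
	"context"
	"fmt"
	"time"
)

type executorBase struct {
	ctx      context.Context
	cancelFn context.CancelFunc
}

func newExecutorBase() *executorBase {
	ctx, cancel := context.WithCancel(context.Background())
	return &executorBase{ctx: ctx, cancelFn: cancel}
}

// Stop mirrors timerTaskExecutorBase.Stop: it cancels the root context.
func (e *executorBase) Stop() { e.cancelFn() }

// doWork stands in for a long-running persistence call made with a
// context derived from e.ctx.
func (e *executorBase) doWork() error {
	ctx, cancel := context.WithTimeout(e.ctx, 10*time.Second)
	defer cancel()
	select {
	case <-time.After(time.Minute): // simulated slow operation
		return nil
	case <-ctx.Done():
		return ctx.Err() // context.Canceled once Stop() is called
	}
}

func main() {
	e := newExecutorBase()
	go func() {
		time.Sleep(50 * time.Millisecond)
		e.Stop()
	}()
	fmt.Println(e.doWork()) // context canceled
}
```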
1 change: 1 addition & 0 deletions service/history/task/timer_task_executor_base_test.go
@@ -115,6 +115,7 @@ func (s *timerQueueTaskExecutorBaseSuite) TearDownTest() {
s.controller.Finish()
s.mockShard.Finish(s.T())
s.mockArchivalClient.AssertExpectations(s.T())
s.timerQueueTaskExecutorBase.Stop()
}

func (s *timerQueueTaskExecutorBaseSuite) TestDeleteWorkflow_NoErr() {
3 changes: 3 additions & 0 deletions service/history/task/transfer_active_task_executor.go
@@ -161,6 +161,9 @@ func (t *transferActiveTaskExecutor) Execute(
}
}

// Empty func for now
func (t *transferActiveTaskExecutor) Stop() {}

func (t *transferActiveTaskExecutor) processActivityTask(
ctx context.Context,
task *persistence.TransferTaskInfo,
@@ -215,6 +215,7 @@ func (s *transferActiveTaskExecutorSuite) SetupTest() {
}

func (s *transferActiveTaskExecutorSuite) TearDownTest() {
s.transferActiveTaskExecutor.Stop()
s.controller.Finish()
s.mockShard.Finish(s.T())
s.mockArchivalClient.AssertExpectations(s.T())
3 changes: 3 additions & 0 deletions service/history/task/transfer_standby_task_executor.go
@@ -118,6 +118,9 @@ func (t *transferStandbyTaskExecutor) Execute(
}
}

// Empty func for now
func (t *transferStandbyTaskExecutor) Stop() {}

func (t *transferStandbyTaskExecutor) processActivityTask(
ctx context.Context,
transferTask *persistence.TransferTaskInfo,
@@ -166,6 +166,7 @@ func (s *transferStandbyTaskExecutorSuite) TearDownTest() {
s.controller.Finish()
s.mockShard.Finish(s.T())
s.mockArchivalClient.AssertExpectations(s.T())
s.transferStandbyTaskExecutor.Stop()
}

func (s *transferStandbyTaskExecutorSuite) TestProcessActivityTask_Pending() {