diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go b/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go index fa59d70c9f0..914fcd9a535 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go @@ -290,22 +290,28 @@ func (facts *PipelineRunFacts) IsRunning() bool { return false } -// IsGracefullyCancelled returns true if the PipelineRun won't be scheduling any new Task because it was gracefully cancelled +// IsCancelled returns true if the PipelineRun was cancelled +func (facts *PipelineRunFacts) IsCancelled() bool { + return facts.SpecStatus == v1beta1.PipelineRunSpecStatusCancelledDeprecated || + facts.SpecStatus == v1beta1.PipelineRunSpecStatusCancelled +} + +// IsGracefullyCancelled returns true if the PipelineRun was gracefully cancelled func (facts *PipelineRunFacts) IsGracefullyCancelled() bool { return facts.SpecStatus == v1beta1.PipelineRunSpecStatusCancelledRunFinally } -// IsGracefullyStopped returns true if the PipelineRun won't be scheduling any new Task because it was gracefully stopped +// IsGracefullyStopped returns true if the PipelineRun was gracefully stopped func (facts *PipelineRunFacts) IsGracefullyStopped() bool { return facts.SpecStatus == v1beta1.PipelineRunSpecStatusStoppedRunFinally } // DAGExecutionQueue returns a list of DAG tasks which needs to be scheduled next func (facts *PipelineRunFacts) DAGExecutionQueue() (PipelineRunState, error) { - tasks := PipelineRunState{} - // when pipeline run is stopping, gracefully cancelled or stopped, do not schedule any new task and only + var tasks PipelineRunState + // when pipeline run is stopping, cancelled, gracefully cancelled or stopped, do not schedule any new task and only // wait for all running tasks to complete and report their status - if !facts.IsStopping() && !facts.IsGracefullyCancelled() && !facts.IsGracefullyStopped() { + if !facts.IsStopping() && !facts.IsCancelled() && !facts.IsGracefullyCancelled() && !facts.IsGracefullyStopped() { // candidateTasks is initialized to DAG root nodes to start pipeline execution // candidateTasks is derived based on successfully finished tasks and/or skipped tasks candidateTasks, err := dag.GetSchedulable(facts.TasksGraph, facts.successfulOrSkippedDAGTasks()...) @@ -318,13 +324,12 @@ func (facts *PipelineRunFacts) DAGExecutionQueue() (PipelineRunState, error) { } // GetFinalTasks returns a list of final tasks without any taskRun associated with it -// GetFinalTasks returns final tasks only when all DAG tasks have finished executing successfully or skipped or -// any one DAG task resulted in failure +// GetFinalTasks returns final tasks only when all DAG tasks have finished executing successfully or have been skipped func (facts *PipelineRunFacts) GetFinalTasks() PipelineRunState { tasks := PipelineRunState{} finalCandidates := sets.NewString() - // check either pipeline has finished executing all DAG pipelineTasks - // or any one of the DAG pipelineTask has failed + // check either pipeline has finished executing all DAG pipelineTasks, + // where "finished executing" means succeeded, failed, or skipped. if facts.checkDAGTasksDone() { // return list of tasks with all final tasks for _, t := range facts.State { diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go b/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go index 12dcf2142d4..ed1eec7c6ad 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go @@ -220,7 +220,7 @@ func TestPipelineRunFacts_CheckDAGTasksDoneDone(t *testing.T) { t.Run(tc.name, func(t *testing.T) { d, err := dagFromState(tc.state) if err != nil { - t.Fatalf("Unexpected error while buildig DAG for state %v: %v", tc.state, err) + t.Fatalf("Unexpected error while building DAG for state %v: %v", tc.state, err) } facts := PipelineRunFacts{ State: tc.state, @@ -600,6 +600,177 @@ func TestGetNextTaskWithRetries(t *testing.T) { } } +// TestDAGExecutionQueue tests the DAGExecutionQueue function for PipelineTasks +// in different states (without dependencies on each other) and the PipelineRun in different states. +func TestDAGExecutionQueue(t *testing.T) { + createdTask := ResolvedPipelineRunTask{ + PipelineTask: &v1beta1.PipelineTask{ + Name: "createdtask", + TaskRef: &v1beta1.TaskRef{Name: "task"}, + }, + TaskRunName: "createdtask", + ResolvedTaskResources: &resources.ResolvedTaskResources{ + TaskSpec: &task.Spec, + }, + } + createdRun := ResolvedPipelineRunTask{ + PipelineTask: &v1beta1.PipelineTask{ + Name: "createdrun", + TaskRef: &v1beta1.TaskRef{Name: "task"}, + }, + RunName: "createdrun", + CustomTask: true, + } + runningTask := ResolvedPipelineRunTask{ + PipelineTask: &v1beta1.PipelineTask{ + Name: "runningtask", + TaskRef: &v1beta1.TaskRef{Name: "task"}, + }, + TaskRunName: "runningtask", + TaskRun: newTaskRun(trs[0]), + ResolvedTaskResources: &resources.ResolvedTaskResources{ + TaskSpec: &task.Spec, + }, + } + runningRun := ResolvedPipelineRunTask{ + PipelineTask: &v1beta1.PipelineTask{ + Name: "runningrun", + TaskRef: &v1beta1.TaskRef{Name: "task"}, + }, + RunName: "runningrun", + Run: newRun(runs[0]), + CustomTask: true, + } + successfulTask := ResolvedPipelineRunTask{ + PipelineTask: &v1beta1.PipelineTask{ + Name: "successfultask", + TaskRef: &v1beta1.TaskRef{Name: "task"}, + }, + TaskRunName: "successfultask", + TaskRun: makeSucceeded(trs[0]), + ResolvedTaskResources: &resources.ResolvedTaskResources{ + TaskSpec: &task.Spec, + }, + } + successfulRun := ResolvedPipelineRunTask{ + PipelineTask: &v1beta1.PipelineTask{ + Name: "successfulrun", + TaskRef: &v1beta1.TaskRef{Name: "task"}, + }, + RunName: "successfulrun", + Run: makeRunSucceeded(runs[0]), + CustomTask: true, + } + failedTask := ResolvedPipelineRunTask{ + PipelineTask: &v1beta1.PipelineTask{ + Name: "failedtask", + TaskRef: &v1beta1.TaskRef{Name: "task"}, + }, + TaskRunName: "failedtask", + TaskRun: makeFailed(trs[0]), + ResolvedTaskResources: &resources.ResolvedTaskResources{ + TaskSpec: &task.Spec, + }, + } + failedRun := ResolvedPipelineRunTask{ + PipelineTask: &v1beta1.PipelineTask{ + Name: "failedrun", + TaskRef: &v1beta1.TaskRef{Name: "task"}, + }, + RunName: "failedrun", + Run: makeRunFailed(runs[0]), + CustomTask: true, + } + failedTaskWithRetries := ResolvedPipelineRunTask{ + PipelineTask: &v1beta1.PipelineTask{ + Name: "failedtaskwithretries", + TaskRef: &v1beta1.TaskRef{Name: "task"}, + Retries: 1, + }, + TaskRunName: "failedtaskwithretries", + TaskRun: makeFailed(trs[0]), + ResolvedTaskResources: &resources.ResolvedTaskResources{ + TaskSpec: &task.Spec, + }, + } + failedRunWithRetries := ResolvedPipelineRunTask{ + PipelineTask: &v1beta1.PipelineTask{ + Name: "failedrunwithretries", + TaskRef: &v1beta1.TaskRef{Name: "task"}, + Retries: 1, + }, + RunName: "failedrunwithretries", + Run: makeRunFailed(runs[0]), + CustomTask: true, + } + tcs := []struct { + name string + state PipelineRunState + specStatus v1beta1.PipelineRunSpecStatus + want PipelineRunState + }{{ + name: "cancelled", + specStatus: v1beta1.PipelineRunSpecStatusCancelled, + state: PipelineRunState{ + &createdTask, &createdRun, + &runningTask, &runningRun, &successfulTask, &successfulRun, + &failedTaskWithRetries, &failedRunWithRetries, + }, + }, { + name: "gracefully cancelled", + specStatus: v1beta1.PipelineRunSpecStatusCancelledRunFinally, + state: PipelineRunState{ + &createdTask, &createdRun, + &runningTask, &runningRun, &successfulTask, &successfulRun, + &failedTaskWithRetries, &failedRunWithRetries, + }, + }, { + name: "gracefully stopped", + specStatus: v1beta1.PipelineRunSpecStatusStoppedRunFinally, + state: PipelineRunState{ + &createdTask, &createdRun, &runningTask, &runningRun, &successfulTask, &successfulRun, + &failedTaskWithRetries, &failedRunWithRetries, + }, + }, { + name: "running", + state: PipelineRunState{ + &createdTask, &createdRun, &runningTask, &runningRun, + &failedTaskWithRetries, &failedRunWithRetries, &successfulTask, &successfulRun, + }, + want: PipelineRunState{&createdTask, &createdRun, &failedTaskWithRetries, &failedRunWithRetries}, + }, { + name: "stopped", + state: PipelineRunState{ + &createdTask, &createdRun, &runningTask, &runningRun, + &successfulTask, &successfulRun, &failedTask, &failedRun, + }, + }, { + name: "all tasks finished", + state: PipelineRunState{&successfulTask, &successfulRun, &failedTask, &failedRun}, + }} + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + d, err := dagFromState(tc.state) + if err != nil { + t.Fatalf("Unexpected error while building DAG for state %v: %v", tc.state, err) + } + facts := PipelineRunFacts{ + State: tc.state, + SpecStatus: tc.specStatus, + TasksGraph: d, + FinalTasksGraph: &dag.Graph{}, + } + queue, err := facts.DAGExecutionQueue() + if err != nil { + t.Errorf("unexpected error getting DAG execution queue: %s", err) + } + if d := cmp.Diff(tc.want, queue); d != "" { + t.Errorf("Didn't get expected execution queue: %s", diff.PrintWantGot(d)) + } + }) + } +} + func TestPipelineRunState_SuccessfulOrSkippedDAGTasks(t *testing.T) { largePipelineState := buildPipelineStateWithLargeDepencyGraph(t) tcs := []struct { @@ -685,7 +856,7 @@ func TestPipelineRunState_SuccessfulOrSkippedDAGTasks(t *testing.T) { t.Run(tc.name, func(t *testing.T) { d, err := dagFromState(tc.state) if err != nil { - t.Fatalf("Unexpected error while buildig DAG for state %v: %v", tc.state, err) + t.Fatalf("Unexpected error while building DAG for state %v: %v", tc.state, err) } facts := PipelineRunFacts{ State: tc.state, @@ -910,11 +1081,11 @@ func TestPipelineRunState_GetFinalTasks(t *testing.T) { for _, tc := range tcs { dagGraph, err := dag.Build(v1beta1.PipelineTaskList(tc.DAGTasks), v1beta1.PipelineTaskList(tc.DAGTasks).Deps()) if err != nil { - t.Fatalf("Unexpected error while buildig DAG for pipelineTasks %v: %v", tc.DAGTasks, err) + t.Fatalf("Unexpected error while building DAG for pipelineTasks %v: %v", tc.DAGTasks, err) } finalGraph, err := dag.Build(v1beta1.PipelineTaskList(tc.finalTasks), map[string][]string{}) if err != nil { - t.Fatalf("Unexpected error while buildig DAG for final pipelineTasks %v: %v", tc.finalTasks, err) + t.Fatalf("Unexpected error while building DAG for final pipelineTasks %v: %v", tc.finalTasks, err) } t.Run(tc.name, func(t *testing.T) { facts := PipelineRunFacts{ @@ -1231,11 +1402,11 @@ func TestGetPipelineConditionStatus(t *testing.T) { } d, err := dagFromState(tc.state) if err != nil { - t.Fatalf("Unexpected error while buildig DAG for state %v: %v", tc.state, err) + t.Fatalf("Unexpected error while building DAG for state %v: %v", tc.state, err) } dfinally, err := dagFromState(tc.finallyState) if err != nil { - t.Fatalf("Unexpected error while buildig DAG for finally state %v: %v", tc.finallyState, err) + t.Fatalf("Unexpected error while building DAG for finally state %v: %v", tc.finallyState, err) } facts := PipelineRunFacts{ State: tc.state, @@ -1374,11 +1545,11 @@ func TestGetPipelineConditionStatus_WithFinalTasks(t *testing.T) { } d, err := dag.Build(v1beta1.PipelineTaskList(tc.dagTasks), v1beta1.PipelineTaskList(tc.dagTasks).Deps()) if err != nil { - t.Fatalf("Unexpected error while buildig graph for DAG tasks %v: %v", tc.dagTasks, err) + t.Fatalf("Unexpected error while building graph for DAG tasks %v: %v", tc.dagTasks, err) } df, err := dag.Build(v1beta1.PipelineTaskList(tc.finalTasks), map[string][]string{}) if err != nil { - t.Fatalf("Unexpected error while buildig graph for final tasks %v: %v", tc.finalTasks, err) + t.Fatalf("Unexpected error while building graph for final tasks %v: %v", tc.finalTasks, err) } facts := PipelineRunFacts{ State: tc.state, @@ -1404,7 +1575,7 @@ func TestGetPipelineConditionStatus_WithFinalTasks(t *testing.T) { func TestGetPipelineConditionStatus_PipelineTimeouts(t *testing.T) { d, err := dagFromState(oneFinishedState) if err != nil { - t.Fatalf("Unexpected error while buildig DAG for state %v: %v", oneFinishedState, err) + t.Fatalf("Unexpected error while building DAG for state %v: %v", oneFinishedState, err) } pr := &v1beta1.PipelineRun{ ObjectMeta: metav1.ObjectMeta{Name: "pipelinerun-no-tasks-started"}, @@ -1670,7 +1841,7 @@ func TestPipelineRunFacts_GetPipelineTaskStatus(t *testing.T) { t.Run(tc.name, func(t *testing.T) { d, err := dag.Build(v1beta1.PipelineTaskList(tc.dagTasks), v1beta1.PipelineTaskList(tc.dagTasks).Deps()) if err != nil { - t.Fatalf("Unexpected error while buildig graph for DAG tasks %v: %v", tc.dagTasks, err) + t.Fatalf("Unexpected error while building graph for DAG tasks %v: %v", tc.dagTasks, err) } facts := PipelineRunFacts{ State: tc.state,