diff --git a/pkg/reconciler/v1alpha1/pipeline/resources/dag.go b/pkg/apis/pipeline/v1alpha1/dag.go similarity index 51% rename from pkg/reconciler/v1alpha1/pipeline/resources/dag.go rename to pkg/apis/pipeline/v1alpha1/dag.go index 766a1596fc7..2e08c0b52eb 100644 --- a/pkg/reconciler/v1alpha1/pipeline/resources/dag.go +++ b/pkg/apis/pipeline/v1alpha1/dag.go @@ -14,20 +14,17 @@ See the License for the specific language governing permissions and limitations under the License. */ -package pipeline +package v1alpha1 import ( "fmt" "strings" - - "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" - "github.com/knative/build-pipeline/pkg/list" ) // Node represents a Task in a pipeline. type Node struct { // Task represent the PipelineTask in Pipeline - Task v1alpha1.PipelineTask + Task PipelineTask // Prev represent all the Previous task Nodes for the current Task Prev []*Node // Next represent all the Next task Nodes for the current Task @@ -41,11 +38,11 @@ type DAG struct { } // Returns an empty Pipeline DAG -func new() *DAG { +func newDAG() *DAG { return &DAG{Nodes: map[string]*Node{}} } -func (g *DAG) addPipelineTask(t v1alpha1.PipelineTask) (*Node, error) { +func (g *DAG) addPipelineTask(t PipelineTask) (*Node, error) { if _, ok := g.Nodes[t.Name]; ok { return nil, fmt.Errorf("duplicate pipeline taks") } @@ -87,7 +84,7 @@ func visit(currentName string, nodes []*Node, path []string, visited map[string] } func getVisitedPath(path []string) string { - // Reverse the path since we traversed the graph using prev pointers. + // Reverse the path since we traversed the DAG using prev pointers. for i := len(path)/2 - 1; i >= 0; i-- { opp := len(path) - 1 - i path[i], path[opp] = path[opp], path[i] @@ -95,92 +92,7 @@ func getVisitedPath(path []string) string { return strings.Join(path, " -> ") } -// GetSchedulable returns a map of PipelineTask that can be scheduled (keyed -// by the name of the PipelineTask) given a list of successfully finished doneTasks. -// It returns tasks which have all dependecies marked as done, and thus can be scheduled. If the -// specified doneTasks are invalid (i.e. if it is indicated that a Task is -// done, but the previous Tasks are not done), an error is returned. -func (g *DAG) GetSchedulable(doneTasks ...string) (map[string]v1alpha1.PipelineTask, error) { - roots := g.getRoots() - tm := toMap(doneTasks...) - d := map[string]v1alpha1.PipelineTask{} - - visited := map[string]struct{}{} - for _, root := range roots { - schedulable := findSchedulable(root, visited, tm) - for _, task := range schedulable { - d[task.Name] = task - } - } - - visitedNames := make([]string, len(visited)) - for v := range visited { - visitedNames = append(visitedNames, v) - } - - notVisited := list.DiffLeft(doneTasks, visitedNames) - if len(notVisited) > 0 { - return map[string]v1alpha1.PipelineTask{}, fmt.Errorf("invalid list of done tasks; some tasks were indicated completed without ancestors being done: %v", notVisited) - } - - return d, nil -} - -func (g *DAG) getRoots() []*Node { - n := []*Node{} - for _, node := range g.Nodes { - if len(node.Prev) == 0 { - n = append(n, node) - } - } - return n -} - -func findSchedulable(n *Node, visited map[string]struct{}, doneTasks map[string]struct{}) []v1alpha1.PipelineTask { - if _, ok := visited[n.Task.Name]; ok { - return []v1alpha1.PipelineTask{} - } - visited[n.Task.Name] = struct{}{} - if _, ok := doneTasks[n.Task.Name]; ok { - schedulable := []v1alpha1.PipelineTask{} - // This one is done! Take note of it and look at the next candidate - for _, next := range n.Next { - if _, ok := visited[next.Task.Name]; !ok { - schedulable = append(schedulable, findSchedulable(next, visited, doneTasks)...) - } - } - return schedulable - } - // This one isn't done! Return it if it's schedulable - if isSchedulable(doneTasks, n.Prev) { - return []v1alpha1.PipelineTask{n.Task} - } - // This one isn't done, but it also isn't ready to schedule - return []v1alpha1.PipelineTask{} -} - -func isSchedulable(doneTasks map[string]struct{}, prevs []*Node) bool { - if len(prevs) == 0 { - return true - } - collected := []string{} - for _, n := range prevs { - if _, ok := doneTasks[n.Task.Name]; ok { - collected = append(collected, n.Task.Name) - } - } - return len(collected) == len(prevs) -} - -func toMap(t ...string) map[string]struct{} { - m := make(map[string]struct{}, len(t)) - for _, s := range t { - m[s] = struct{}{} - } - return m -} - -func addLink(pt v1alpha1.PipelineTask, previousTask string, nodes map[string]*Node) error { +func addLink(pt PipelineTask, previousTask string, nodes map[string]*Node) error { prev, ok := nodes[previousTask] if !ok { return fmt.Errorf("Task %s depends on %s but %s wasn't present in Pipeline", pt.Name, previousTask, previousTask) @@ -192,18 +104,18 @@ func addLink(pt v1alpha1.PipelineTask, previousTask string, nodes map[string]*No return nil } -// Build returns a valid pipeline DAG. Returns error if the pipeline is invalid -func Build(p *v1alpha1.Pipeline) (*DAG, error) { - d := new() +// BuildDAG returns a valid pipeline DAG. Returns error if the pipeline is invalid +func BuildDAG(tasks []PipelineTask) (*DAG, error) { + d := newDAG() // Add all Tasks mentioned in the `PipelineSpec` - for _, pt := range p.Spec.Tasks { + for _, pt := range tasks { if _, err := d.addPipelineTask(pt); err != nil { - return nil, fmt.Errorf("task %s is already present in graph, can't add it again: %v", pt.Name, err) + return nil, fmt.Errorf("task %s is already present in DAG, can't add it again: %v", pt.Name, err) } } // Process all from and runAfter constraints to add task dependency - for _, pt := range p.Spec.Tasks { + for _, pt := range tasks { for _, previousTask := range pt.RunAfter { if err := addLink(pt, previousTask, d.Nodes); err != nil { return nil, fmt.Errorf("couldn't add link between %s and %s: %v", pt.Name, previousTask, err) diff --git a/pkg/apis/pipeline/v1alpha1/dag_test.go b/pkg/apis/pipeline/v1alpha1/dag_test.go new file mode 100644 index 00000000000..d490b7c270b --- /dev/null +++ b/pkg/apis/pipeline/v1alpha1/dag_test.go @@ -0,0 +1,327 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "testing" + + "github.com/knative/build-pipeline/pkg/list" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func sameNodes(l, r []*Node) error { + lNames, rNames := []string{}, []string{} + for _, n := range l { + lNames = append(lNames, n.Task.Name) + } + for _, n := range r { + rNames = append(rNames, n.Task.Name) + } + + return list.IsSame(lNames, rNames) +} + +func assertSameDAG(t *testing.T, l, r *DAG) { + t.Helper() + lKeys, rKeys := []string{}, []string{} + + for k := range l.Nodes { + lKeys = append(lKeys, k) + } + for k := range r.Nodes { + rKeys = append(rKeys, k) + } + + // For the DAGs to be the same, they must contain the same nodes + err := list.IsSame(lKeys, rKeys) + if err != nil { + t.Fatalf("DAGS contain different nodes: %v", err) + } + + // If they contain the same nodes, the DAGs will be the same if all + // of the nodes have the same linkages + for k, rn := range r.Nodes { + ln := l.Nodes[k] + + err := sameNodes(rn.Prev, ln.Prev) + if err != nil { + t.Errorf("The %s nodes in the DAG have different previous nodes: %v", k, err) + } + err = sameNodes(rn.Next, ln.Next) + if err != nil { + t.Errorf("The %s nodes in the DAG have different next nodes: %v", k, err) + } + } +} + +func TestBuild_Parallel(t *testing.T) { + a := PipelineTask{Name: "a"} + b := PipelineTask{Name: "b"} + c := PipelineTask{Name: "c"} + + // This test make sure we can create a Pipeline with no links between any Tasks + // (all tasks run in parallel) + // a b c + p := &Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "pipeline"}, + Spec: PipelineSpec{ + Tasks: []PipelineTask{a, b, c}, + }, + } + expectedDAG := &DAG{ + Nodes: map[string]*Node{ + "a": {Task: a}, + "b": {Task: b}, + "c": {Task: c}, + }, + } + g, err := BuildDAG(p.Spec.Tasks) + if err != nil { + t.Fatalf("didn't expect error creating valid Pipeline %v but got %v", p, err) + } + assertSameDAG(t, expectedDAG, g) +} + +func TestBuild_JoinMultipleRoots(t *testing.T) { + a := PipelineTask{Name: "a"} + b := PipelineTask{Name: "b"} + c := PipelineTask{Name: "c"} + xDependsOnA := PipelineTask{ + Name: "x", + Resources: &PipelineTaskResources{ + Inputs: []PipelineTaskInputResource{{From: []string{"a"}}}, + }, + } + yDependsOnARunsAfterB := PipelineTask{ + Name: "y", + RunAfter: []string{"b"}, + Resources: &PipelineTaskResources{ + Inputs: []PipelineTaskInputResource{{From: []string{"a"}}}, + }, + } + zDependsOnX := PipelineTask{ + Name: "z", + Resources: &PipelineTaskResources{ + Inputs: []PipelineTaskInputResource{{From: []string{"x"}}}, + }, + } + + // a b c + // | \ / + // x y + // | + // z + + nodeA := &Node{Task: a} + nodeB := &Node{Task: b} + nodeC := &Node{Task: c} + nodeX := &Node{Task: xDependsOnA} + nodeY := &Node{Task: yDependsOnARunsAfterB} + nodeZ := &Node{Task: zDependsOnX} + + nodeA.Next = []*Node{nodeX, nodeY} + nodeB.Next = []*Node{nodeY} + nodeX.Prev = []*Node{nodeA} + nodeX.Next = []*Node{nodeZ} + nodeY.Prev = []*Node{nodeA, nodeB} + nodeZ.Prev = []*Node{nodeX} + + expectedDAG := &DAG{ + Nodes: map[string]*Node{ + "a": nodeA, + "b": nodeB, + "c": nodeC, + "x": nodeX, + "y": nodeY, + "z": nodeZ}, + } + p := &Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "pipeline"}, + Spec: PipelineSpec{ + Tasks: []PipelineTask{a, xDependsOnA, yDependsOnARunsAfterB, zDependsOnX, b, c}, + }, + } + g, err := BuildDAG(p.Spec.Tasks) + if err != nil { + t.Fatalf("didn't expect error creating valid Pipeline %v but got %v", p, err) + } + assertSameDAG(t, expectedDAG, g) +} + +func TestBuild_FanInFanOut(t *testing.T) { + a := PipelineTask{Name: "a"} + dDependsOnA := PipelineTask{ + Name: "d", + Resources: &PipelineTaskResources{ + Inputs: []PipelineTaskInputResource{{From: []string{"a"}}}, + }, + } + eRunsAfterA := PipelineTask{ + Name: "e", + RunAfter: []string{"a"}, + } + fDependsOnDAndE := PipelineTask{ + Name: "f", + Resources: &PipelineTaskResources{ + Inputs: []PipelineTaskInputResource{{From: []string{"d", "e"}}}, + }, + } + gRunsAfterF := PipelineTask{ + Name: "g", + RunAfter: []string{"f"}, + } + + // This test make sure we don't detect cycle (A -> B -> B -> …) when there is not. + // This means we "visit" a twice, from two different path ; but there is no cycle. + // a + // / \ + // d e + // \ / + // f + // | + // g + nodeA := &Node{Task: a} + nodeD := &Node{Task: dDependsOnA} + nodeE := &Node{Task: eRunsAfterA} + nodeF := &Node{Task: fDependsOnDAndE} + nodeG := &Node{Task: gRunsAfterF} + + nodeA.Next = []*Node{nodeD, nodeE} + nodeD.Prev = []*Node{nodeA} + nodeD.Next = []*Node{nodeF} + nodeE.Prev = []*Node{nodeA} + nodeE.Next = []*Node{nodeF} + nodeF.Prev = []*Node{nodeD, nodeE} + nodeF.Next = []*Node{nodeG} + nodeG.Prev = []*Node{nodeF} + + expectedDAG := &DAG{ + Nodes: map[string]*Node{ + "a": nodeA, + "d": nodeD, + "e": nodeE, + "f": nodeF, + "g": nodeG, + }, + } + p := &Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "pipeline"}, + Spec: PipelineSpec{ + Tasks: []PipelineTask{a, dDependsOnA, eRunsAfterA, fDependsOnDAndE, gRunsAfterF}, + }, + } + g, err := BuildDAG(p.Spec.Tasks) + if err != nil { + t.Fatalf("didn't expect error creating valid Pipeline %v but got %v", p, err) + } + assertSameDAG(t, expectedDAG, g) +} + +func TestBuild_Invalid(t *testing.T) { + a := PipelineTask{Name: "a"} + xDependsOnA := PipelineTask{ + Name: "x", + Resources: &PipelineTaskResources{ + Inputs: []PipelineTaskInputResource{{From: []string{"a"}}}, + }, + } + zDependsOnX := PipelineTask{ + Name: "z", + Resources: &PipelineTaskResources{ + Inputs: []PipelineTaskInputResource{{From: []string{"x"}}}, + }, + } + aDependsOnZ := PipelineTask{ + Name: "a", + Resources: &PipelineTaskResources{ + Inputs: []PipelineTaskInputResource{{From: []string{"z"}}}, + }, + } + xAfterA := PipelineTask{ + Name: "x", + RunAfter: []string{"a"}, + } + zAfterX := PipelineTask{ + Name: "z", + RunAfter: []string{"x"}, + } + aAfterZ := PipelineTask{ + Name: "a", + RunAfter: []string{"z"}, + } + selfLinkFrom := PipelineTask{ + Name: "a", + Resources: &PipelineTaskResources{ + Inputs: []PipelineTaskInputResource{{From: []string{"a"}}}, + }, + } + selfLinkAfter := PipelineTask{ + Name: "a", + RunAfter: []string{"a"}, + } + invalidTaskFrom := PipelineTask{ + Name: "a", + Resources: &PipelineTaskResources{ + Inputs: []PipelineTaskInputResource{{From: []string{"none"}}}, + }, + } + invalidTaskAfter := PipelineTask{ + Name: "a", + RunAfter: []string{"none"}, + } + + tcs := []struct { + name string + spec PipelineSpec + }{{ + name: "self-link-from", + spec: PipelineSpec{Tasks: []PipelineTask{selfLinkFrom}}, + }, { + name: "self-link-after", + spec: PipelineSpec{Tasks: []PipelineTask{selfLinkAfter}}, + }, { + name: "cycle-from", + spec: PipelineSpec{Tasks: []PipelineTask{xDependsOnA, zDependsOnX, aDependsOnZ}}, + }, { + name: "cycle-runAfter", + spec: PipelineSpec{Tasks: []PipelineTask{xAfterA, zAfterX, aAfterZ}}, + }, { + name: "cycle-both", + spec: PipelineSpec{Tasks: []PipelineTask{xDependsOnA, zAfterX, aDependsOnZ}}, + }, { + name: "duplicate-tasks", + spec: PipelineSpec{Tasks: []PipelineTask{a, a}}, + }, { + name: "invalid-task-name-from", + spec: PipelineSpec{Tasks: []PipelineTask{invalidTaskFrom}}, + }, { + name: "invalid-task-name-after", + spec: PipelineSpec{Tasks: []PipelineTask{invalidTaskAfter}}, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + p := &Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: tc.name}, + Spec: tc.spec, + } + if _, err := BuildDAG(p.Spec.Tasks); err == nil { + t.Errorf("expected to see an error for invalid DAG in pipeline %v but had none", tc.spec) + } + }) + } +} diff --git a/pkg/apis/pipeline/v1alpha1/pipeline_validation.go b/pkg/apis/pipeline/v1alpha1/pipeline_validation.go index c4c14a4f7fc..28e20329bdc 100644 --- a/pkg/apis/pipeline/v1alpha1/pipeline_validation.go +++ b/pkg/apis/pipeline/v1alpha1/pipeline_validation.go @@ -58,8 +58,8 @@ func validateDeclaredResources(ps *PipelineSpec) error { return nil } -func isOutput(task PipelineTask, resource string) bool { - for _, output := range task.Resources.Outputs { +func isOutput(outputs []PipelineTaskOutputResource, resource string) bool { + for _, output := range outputs { if output.Resource == resource { return true } @@ -69,29 +69,29 @@ func isOutput(task PipelineTask, resource string) bool { // validateFrom ensures that the `from` values make sense: that they rely on values from Tasks // that ran previously, and that the PipelineResource is actually an output of the Task it should come from. -// TODO(#168) when pipelines don't just execute linearly this will need to be more sophisticated func validateFrom(tasks []PipelineTask) error { - for i, t := range tasks { + taskOutputs := map[string][]PipelineTaskOutputResource{} + for _, task := range tasks { + var to []PipelineTaskOutputResource + if task.Resources != nil { + to = make([]PipelineTaskOutputResource, len(task.Resources.Outputs)) + for i, o := range task.Resources.Outputs { + to[i] = o + } + } + taskOutputs[task.Name] = to + } + for _, t := range tasks { if t.Resources != nil { for _, rd := range t.Resources.Inputs { for _, pb := range rd.From { - if i == 0 { - return fmt.Errorf("first Task in Pipeline can't depend on anything before it (b/c there is nothing)") - } - found := false - // Look for previous Task that satisfies constraint - for j := i - 1; j >= 0; j-- { - if tasks[j].Name == pb { - // The input resource must actually be an output of the from tasks - if !isOutput(tasks[j], rd.Resource) { - return fmt.Errorf("the resource %s from %s must be an output but is an input", rd.Resource, pb) - } - found = true - } - } + outputs, found := taskOutputs[pb] if !found { return fmt.Errorf("expected resource %s to be from task %s, but task %s doesn't exist", rd.Resource, pb, pb) } + if !isOutput(outputs, rd.Resource) { + return fmt.Errorf("the resource %s from %s must be an output but is an input", rd.Resource, pb) + } } } } @@ -99,6 +99,16 @@ func validateFrom(tasks []PipelineTask) error { return nil } +// validateGraph ensures the Pipeline's dependency Graph (DAG) make sense: that there is no dependency +// cycle or that they rely on values from Tasks that ran previously, and that the PipelineResource +// is actually an output of the Task it should come from. +func validateGraph(tasks []PipelineTask) error { + if _, err := BuildDAG(tasks); err != nil { + return err + } + return nil +} + // Validate checks that taskNames in the Pipeline are valid and that the graph // of Tasks expressed in the Pipeline makes sense. func (ps *PipelineSpec) Validate() *apis.FieldError { @@ -126,6 +136,11 @@ func (ps *PipelineSpec) Validate() *apis.FieldError { return apis.ErrInvalidValue(err.Error(), "spec.tasks.resources.inputs.from") } + // Validate the pipeline task graph + if err := validateGraph(ps.Tasks); err != nil { + return apis.ErrInvalidValue(err.Error(), "spec.tasks") + } + // The parameter variables should be valid if err := validatePipelineParameterVariables(ps.Tasks, ps.Params); err != nil { return err diff --git a/pkg/apis/pipeline/v1alpha1/pipeline_validation_test.go b/pkg/apis/pipeline/v1alpha1/pipeline_validation_test.go index 0da44288e1e..c51695e2385 100644 --- a/pkg/apis/pipeline/v1alpha1/pipeline_validation_test.go +++ b/pkg/apis/pipeline/v1alpha1/pipeline_validation_test.go @@ -52,16 +52,6 @@ func TestPipelineSpec_Validate_Error(t *testing.T) { tb.PipelineTaskInputResource("the-resource", "great-resource", tb.From("bar"))), )), }, - { - name: "from task is afterward", - p: tb.Pipeline("pipeline", "namespace", tb.PipelineSpec( - tb.PipelineDeclaredResource("great-resource", v1alpha1.PipelineResourceTypeGit), - tb.PipelineTask("foo", "foo-task", - tb.PipelineTaskInputResource("the-resource", "great-resource", tb.From("bar"))), - tb.PipelineTask("bar", "bar-task", - tb.PipelineTaskOutputResource("the-resource", "great-resource")), - )), - }, { name: "unused resources declared", p: tb.Pipeline("pipeline", "namespace", tb.PipelineSpec( @@ -113,6 +103,13 @@ func TestPipelineSpec_Validate_Error(t *testing.T) { tb.PipelineTask("foo", "foo-task", tb.PipelineTaskParam("a-param", "${params.foo} and ${params.does-not-exist}")))), }, + { + name: "invalid dependency graph between the tasks", + p: tb.Pipeline("foo", "namespace", tb.PipelineSpec( + tb.PipelineTask("foo", "foo", tb.RunAfter("bar")), + tb.PipelineTask("bar", "bar", tb.RunAfter("foo")), + )), + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/apis/pipeline/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/pipeline/v1alpha1/zz_generated.deepcopy.go index bf2fa76e781..bbc7d10363d 100644 --- a/pkg/apis/pipeline/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/pipeline/v1alpha1/zz_generated.deepcopy.go @@ -166,6 +166,34 @@ func (in *ClusterTaskList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DAG) DeepCopyInto(out *DAG) { + *out = *in + if in.Nodes != nil { + in, out := &in.Nodes, &out.Nodes + *out = make(map[string]*Node, len(*in)) + for key, val := range *in { + if val == nil { + (*out)[key] = nil + } else { + (*out)[key] = new(Node) + val.DeepCopyInto((*out)[key]) + } + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DAG. +func (in *DAG) DeepCopy() *DAG { + if in == nil { + return nil + } + out := new(DAG) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *GCSResource) DeepCopyInto(out *GCSResource) { *out = *in @@ -245,6 +273,47 @@ func (in *Inputs) DeepCopy() *Inputs { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Node) DeepCopyInto(out *Node) { + *out = *in + in.Task.DeepCopyInto(&out.Task) + if in.Prev != nil { + in, out := &in.Prev, &out.Prev + *out = make([]*Node, len(*in)) + for i := range *in { + if (*in)[i] == nil { + (*out)[i] = nil + } else { + (*out)[i] = new(Node) + (*in)[i].DeepCopyInto((*out)[i]) + } + } + } + if in.Next != nil { + in, out := &in.Next, &out.Next + *out = make([]*Node, len(*in)) + for i := range *in { + if (*in)[i] == nil { + (*out)[i] = nil + } else { + (*out)[i] = new(Node) + (*in)[i].DeepCopyInto((*out)[i]) + } + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Node. +func (in *Node) DeepCopy() *Node { + if in == nil { + return nil + } + out := new(Node) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Outputs) DeepCopyInto(out *Outputs) { *out = *in diff --git a/pkg/reconciler/v1alpha1/pipeline/dag/dag.go b/pkg/reconciler/v1alpha1/pipeline/dag/dag.go new file mode 100644 index 00000000000..79ee92ec04d --- /dev/null +++ b/pkg/reconciler/v1alpha1/pipeline/dag/dag.go @@ -0,0 +1,109 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dag + +import ( + "fmt" + + "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" + "github.com/knative/build-pipeline/pkg/list" +) + +// GetSchedulable returns a map of PipelineTask that can be scheduled (keyed +// by the name of the PipelineTask) given a list of successfully finished doneTasks. +// It returns tasks which have all dependecies marked as done, and thus can be scheduled. If the +// specified doneTasks are invalid (i.e. if it is indicated that a Task is +// done, but the previous Tasks are not done), an error is returned. +func GetSchedulable(g *v1alpha1.DAG, doneTasks ...string) (map[string]v1alpha1.PipelineTask, error) { + roots := getRoots(g) + tm := toMap(doneTasks...) + d := map[string]v1alpha1.PipelineTask{} + + visited := map[string]struct{}{} + for _, root := range roots { + schedulable := findSchedulable(root, visited, tm) + for _, task := range schedulable { + d[task.Name] = task + } + } + + visitedNames := make([]string, len(visited)) + for v := range visited { + visitedNames = append(visitedNames, v) + } + + notVisited := list.DiffLeft(doneTasks, visitedNames) + if len(notVisited) > 0 { + return map[string]v1alpha1.PipelineTask{}, fmt.Errorf("invalid list of done tasks; some tasks were indicated completed without ancestors being done: %v", notVisited) + } + + return d, nil +} + +func getRoots(g *v1alpha1.DAG) []*v1alpha1.Node { + n := []*v1alpha1.Node{} + for _, node := range g.Nodes { + if len(node.Prev) == 0 { + n = append(n, node) + } + } + return n +} + +func findSchedulable(n *v1alpha1.Node, visited map[string]struct{}, doneTasks map[string]struct{}) []v1alpha1.PipelineTask { + if _, ok := visited[n.Task.Name]; ok { + return []v1alpha1.PipelineTask{} + } + visited[n.Task.Name] = struct{}{} + if _, ok := doneTasks[n.Task.Name]; ok { + schedulable := []v1alpha1.PipelineTask{} + // This one is done! Take note of it and look at the next candidate + for _, next := range n.Next { + if _, ok := visited[next.Task.Name]; !ok { + schedulable = append(schedulable, findSchedulable(next, visited, doneTasks)...) + } + } + return schedulable + } + // This one isn't done! Return it if it's schedulable + if isSchedulable(doneTasks, n.Prev) { + return []v1alpha1.PipelineTask{n.Task} + } + // This one isn't done, but it also isn't ready to schedule + return []v1alpha1.PipelineTask{} +} + +func isSchedulable(doneTasks map[string]struct{}, prevs []*v1alpha1.Node) bool { + if len(prevs) == 0 { + return true + } + collected := []string{} + for _, n := range prevs { + if _, ok := doneTasks[n.Task.Name]; ok { + collected = append(collected, n.Task.Name) + } + } + return len(collected) == len(prevs) +} + +func toMap(t ...string) map[string]struct{} { + m := make(map[string]struct{}, len(t)) + for _, s := range t { + m[s] = struct{}{} + } + return m +} diff --git a/pkg/reconciler/v1alpha1/pipeline/dag/dag_test.go b/pkg/reconciler/v1alpha1/pipeline/dag/dag_test.go new file mode 100644 index 00000000000..70127683b37 --- /dev/null +++ b/pkg/reconciler/v1alpha1/pipeline/dag/dag_test.go @@ -0,0 +1,183 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dag + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" +) + +func testGraph(t *testing.T) *v1alpha1.DAG { + // b a + // | / \ + // | | x + // | | / | + // | y | + // \ / z + // w + t.Helper() + g, err := v1alpha1.BuildDAG([]v1alpha1.PipelineTask{ + { + Name: "a", + }, + { + Name: "b", + }, + { + Name: "w", + RunAfter: []string{"b", "y"}, + }, + { + Name: "x", + RunAfter: []string{"a"}, + }, + { + Name: "y", + RunAfter: []string{"a", "x"}, + }, + { + Name: "z", + RunAfter: []string{"x"}, + }, + }) + if err != nil { + t.Fatal(err) + } + return g +} + +func TestGetSchedulable(t *testing.T) { + g := testGraph(t) + tcs := []struct { + name string + finished []string + expectedTasks map[string]v1alpha1.PipelineTask + }{{ + name: "nothing-done", + finished: []string{}, + expectedTasks: map[string]v1alpha1.PipelineTask{ + "a": {Name: "a"}, + "b": {Name: "b"}, + }, + }, { + name: "a-done", + finished: []string{"a"}, + expectedTasks: map[string]v1alpha1.PipelineTask{ + "b": {Name: "b"}, + "x": {Name: "x"}, + }, + }, { + name: "b-done", + finished: []string{"b"}, + expectedTasks: map[string]v1alpha1.PipelineTask{ + "a": {Name: "a"}, + }, + }, { + name: "a-and-b-done", + finished: []string{"a", "b"}, + expectedTasks: map[string]v1alpha1.PipelineTask{ + "x": {Name: "x"}, + }, + }, { + name: "a-x-done", + finished: []string{"a", "x"}, + expectedTasks: map[string]v1alpha1.PipelineTask{ + "b": {Name: "b"}, + "y": {Name: "y"}, + "z": {Name: "z"}, + }, + }, { + name: "a-x-b-done", + finished: []string{"a", "x", "b"}, + expectedTasks: map[string]v1alpha1.PipelineTask{ + "y": {Name: "y"}, + "z": {Name: "z"}, + }, + }, { + name: "a-x-y-done", + finished: []string{"a", "x", "y"}, + expectedTasks: map[string]v1alpha1.PipelineTask{ + "b": {Name: "b"}, + "z": {Name: "z"}, + }, + }, { + name: "a-x-y-done", + finished: []string{"a", "x", "y"}, + expectedTasks: map[string]v1alpha1.PipelineTask{ + "b": {Name: "b"}, + "z": {Name: "z"}, + }, + }, { + name: "a-x-y-b-done", + finished: []string{"a", "x", "y", "b"}, + expectedTasks: map[string]v1alpha1.PipelineTask{ + "w": {Name: "w"}, + "z": {Name: "z"}, + }, + }} + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + tasks, err := GetSchedulable(g, tc.finished...) + if err != nil { + t.Fatalf("Didn't expect error when getting next tasks for %v but got %v", tc.finished, err) + } + if d := cmp.Diff(tasks, tc.expectedTasks, cmpopts.IgnoreFields(v1alpha1.PipelineTask{}, "RunAfter")); d != "" { + t.Errorf("expected that with %v done, %v would be ready to schedule but was different: %s", tc.finished, tc.expectedTasks, d) + } + }) + } +} + +func TestGetSchedulable_Invalid(t *testing.T) { + g := testGraph(t) + tcs := []struct { + name string + finished []string + }{{ + // x can't be completed on its own b/c it depends on a + name: "only-x", + finished: []string{"x"}, + }, { + // y can't be completed on its own b/c it depends on a and x + name: "only-y", + finished: []string{"y"}, + }, { + // w can't be completed on its own b/c it depends on y and b + name: "only-w", + finished: []string{"w"}, + }, { + name: "only-y-and-x", + finished: []string{"y", "x"}, + }, { + name: "only-y-and-w", + finished: []string{"y", "w"}, + }, { + name: "only-x-and-w", + finished: []string{"x", "w"}, + }} + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + _, err := GetSchedulable(g, tc.finished...) + if err == nil { + t.Fatalf("Expected error for invalid done tasks %v but got none", tc.finished) + } + }) + } +} diff --git a/pkg/reconciler/v1alpha1/pipeline/resources/dag_test.go b/pkg/reconciler/v1alpha1/pipeline/resources/dag_test.go deleted file mode 100644 index 3fbcef6ea66..00000000000 --- a/pkg/reconciler/v1alpha1/pipeline/resources/dag_test.go +++ /dev/null @@ -1,487 +0,0 @@ -/* -Copyright 2018 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package pipeline - -import ( - "testing" - - "github.com/google/go-cmp/cmp" - "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" - "github.com/knative/build-pipeline/pkg/list" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -func sameNodes(l, r []*Node) error { - lNames, rNames := []string{}, []string{} - for _, n := range l { - lNames = append(lNames, n.Task.Name) - } - for _, n := range r { - rNames = append(rNames, n.Task.Name) - } - - return list.IsSame(lNames, rNames) -} - -func assertSameDAG(t *testing.T, l, r *DAG) { - t.Helper() - lKeys, rKeys := []string{}, []string{} - - for k := range l.Nodes { - lKeys = append(lKeys, k) - } - for k := range r.Nodes { - rKeys = append(rKeys, k) - } - - // For the DAGs to be the same, they must contain the same nodes - err := list.IsSame(lKeys, rKeys) - if err != nil { - t.Fatalf("DAGS contain different nodes: %v", err) - } - - // If they contain the same nodes, the DAGs will be the same if all - // of the nodes have the same linkages - for k, rn := range r.Nodes { - ln := l.Nodes[k] - - err := sameNodes(rn.Prev, ln.Prev) - if err != nil { - t.Errorf("The %s nodes in the DAG have different previous nodes: %v", k, err) - } - err = sameNodes(rn.Next, ln.Next) - if err != nil { - t.Errorf("The %s nodes in the DAG have different next nodes: %v", k, err) - } - } -} - -func TestBuild_Parallel(t *testing.T) { - a := v1alpha1.PipelineTask{Name: "a"} - b := v1alpha1.PipelineTask{Name: "b"} - c := v1alpha1.PipelineTask{Name: "c"} - - // This test make sure we can create a Pipeline with no links between any Tasks - // (all tasks run in parallel) - // a b c - p := &v1alpha1.Pipeline{ - ObjectMeta: metav1.ObjectMeta{Name: "pipeline"}, - Spec: v1alpha1.PipelineSpec{ - Tasks: []v1alpha1.PipelineTask{a, b, c}, - }, - } - expectedDAG := &DAG{ - Nodes: map[string]*Node{ - "a": {Task: a}, - "b": {Task: b}, - "c": {Task: c}, - }, - } - g, err := Build(p) - if err != nil { - t.Fatalf("didn't expect error creating valid Pipeline %v but got %v", p, err) - } - assertSameDAG(t, expectedDAG, g) -} - -func TestBuild_JoinMultipleRoots(t *testing.T) { - a := v1alpha1.PipelineTask{Name: "a"} - b := v1alpha1.PipelineTask{Name: "b"} - c := v1alpha1.PipelineTask{Name: "c"} - xDependsOnA := v1alpha1.PipelineTask{ - Name: "x", - Resources: &v1alpha1.PipelineTaskResources{ - Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"a"}}}, - }, - } - yDependsOnARunsAfterB := v1alpha1.PipelineTask{ - Name: "y", - RunAfter: []string{"b"}, - Resources: &v1alpha1.PipelineTaskResources{ - Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"a"}}}, - }, - } - zDependsOnX := v1alpha1.PipelineTask{ - Name: "z", - Resources: &v1alpha1.PipelineTaskResources{ - Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"x"}}}, - }, - } - - // a b c - // | \ / - // x y - // | - // z - - nodeA := &Node{Task: a} - nodeB := &Node{Task: b} - nodeC := &Node{Task: c} - nodeX := &Node{Task: xDependsOnA} - nodeY := &Node{Task: yDependsOnARunsAfterB} - nodeZ := &Node{Task: zDependsOnX} - - nodeA.Next = []*Node{nodeX, nodeY} - nodeB.Next = []*Node{nodeY} - nodeX.Prev = []*Node{nodeA} - nodeX.Next = []*Node{nodeZ} - nodeY.Prev = []*Node{nodeA, nodeB} - nodeZ.Prev = []*Node{nodeX} - - expectedDAG := &DAG{ - Nodes: map[string]*Node{ - "a": nodeA, - "b": nodeB, - "c": nodeC, - "x": nodeX, - "y": nodeY, - "z": nodeZ}, - } - p := &v1alpha1.Pipeline{ - ObjectMeta: metav1.ObjectMeta{Name: "pipeline"}, - Spec: v1alpha1.PipelineSpec{ - Tasks: []v1alpha1.PipelineTask{a, xDependsOnA, yDependsOnARunsAfterB, zDependsOnX, b, c}, - }, - } - g, err := Build(p) - if err != nil { - t.Fatalf("didn't expect error creating valid Pipeline %v but got %v", p, err) - } - assertSameDAG(t, expectedDAG, g) -} - -func TestBuild_FanInFanOut(t *testing.T) { - a := v1alpha1.PipelineTask{Name: "a"} - dDependsOnA := v1alpha1.PipelineTask{ - Name: "d", - Resources: &v1alpha1.PipelineTaskResources{ - Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"a"}}}, - }, - } - eRunsAfterA := v1alpha1.PipelineTask{ - Name: "e", - RunAfter: []string{"a"}, - } - fDependsOnDAndE := v1alpha1.PipelineTask{ - Name: "f", - Resources: &v1alpha1.PipelineTaskResources{ - Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"d", "e"}}}, - }, - } - gRunsAfterF := v1alpha1.PipelineTask{ - Name: "g", - RunAfter: []string{"f"}, - } - - // This test make sure we don't detect cycle (A -> B -> B -> …) when there is not. - // This means we "visit" a twice, from two different path ; but there is no cycle. - // a - // / \ - // d e - // \ / - // f - // | - // g - nodeA := &Node{Task: a} - nodeD := &Node{Task: dDependsOnA} - nodeE := &Node{Task: eRunsAfterA} - nodeF := &Node{Task: fDependsOnDAndE} - nodeG := &Node{Task: gRunsAfterF} - - nodeA.Next = []*Node{nodeD, nodeE} - nodeD.Prev = []*Node{nodeA} - nodeD.Next = []*Node{nodeF} - nodeE.Prev = []*Node{nodeA} - nodeE.Next = []*Node{nodeF} - nodeF.Prev = []*Node{nodeD, nodeE} - nodeF.Next = []*Node{nodeG} - nodeG.Prev = []*Node{nodeF} - - expectedDAG := &DAG{ - Nodes: map[string]*Node{ - "a": nodeA, - "d": nodeD, - "e": nodeE, - "f": nodeF, - "g": nodeG, - }, - } - p := &v1alpha1.Pipeline{ - ObjectMeta: metav1.ObjectMeta{Name: "pipeline"}, - Spec: v1alpha1.PipelineSpec{ - Tasks: []v1alpha1.PipelineTask{a, dDependsOnA, eRunsAfterA, fDependsOnDAndE, gRunsAfterF}, - }, - } - g, err := Build(p) - if err != nil { - t.Fatalf("didn't expect error creating valid Pipeline %v but got %v", p, err) - } - assertSameDAG(t, expectedDAG, g) -} - -func TestBuild_Invalid(t *testing.T) { - a := v1alpha1.PipelineTask{Name: "a"} - xDependsOnA := v1alpha1.PipelineTask{ - Name: "x", - Resources: &v1alpha1.PipelineTaskResources{ - Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"a"}}}, - }, - } - zDependsOnX := v1alpha1.PipelineTask{ - Name: "z", - Resources: &v1alpha1.PipelineTaskResources{ - Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"x"}}}, - }, - } - aDependsOnZ := v1alpha1.PipelineTask{ - Name: "a", - Resources: &v1alpha1.PipelineTaskResources{ - Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"z"}}}, - }, - } - xAfterA := v1alpha1.PipelineTask{ - Name: "x", - RunAfter: []string{"a"}, - } - zAfterX := v1alpha1.PipelineTask{ - Name: "z", - RunAfter: []string{"x"}, - } - aAfterZ := v1alpha1.PipelineTask{ - Name: "a", - RunAfter: []string{"z"}, - } - selfLinkFrom := v1alpha1.PipelineTask{ - Name: "a", - Resources: &v1alpha1.PipelineTaskResources{ - Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"a"}}}, - }, - } - selfLinkAfter := v1alpha1.PipelineTask{ - Name: "a", - RunAfter: []string{"a"}, - } - invalidTaskFrom := v1alpha1.PipelineTask{ - Name: "a", - Resources: &v1alpha1.PipelineTaskResources{ - Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"none"}}}, - }, - } - invalidTaskAfter := v1alpha1.PipelineTask{ - Name: "a", - RunAfter: []string{"none"}, - } - - tcs := []struct { - name string - spec v1alpha1.PipelineSpec - }{{ - name: "self-link-from", - spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{selfLinkFrom}}, - }, { - name: "self-link-after", - spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{selfLinkAfter}}, - }, { - name: "cycle-from", - spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{xDependsOnA, zDependsOnX, aDependsOnZ}}, - }, { - name: "cycle-runAfter", - spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{xAfterA, zAfterX, aAfterZ}}, - }, { - name: "cycle-both", - spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{xDependsOnA, zAfterX, aDependsOnZ}}, - }, { - name: "duplicate-tasks", - spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{a, a}}, - }, { - name: "invalid-task-name-from", - spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{invalidTaskFrom}}, - }, { - name: "invalid-task-name-after", - spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{invalidTaskAfter}}, - }, - } - for _, tc := range tcs { - t.Run(tc.name, func(t *testing.T) { - p := &v1alpha1.Pipeline{ - ObjectMeta: metav1.ObjectMeta{Name: tc.name}, - Spec: tc.spec, - } - if _, err := Build(p); err == nil { - t.Errorf("expected to see an error for invalid DAG in pipeline %v but had none", tc.spec) - } - }) - } -} - -func testGraph() *DAG { - // b a - // | / \ - // | | x - // | | / | - // | y | - // \ / z - // w - g := new() - g.Nodes["a"] = &Node{ - Task: v1alpha1.PipelineTask{Name: "a"}, - } - g.Nodes["b"] = &Node{ - Task: v1alpha1.PipelineTask{Name: "b"}, - } - g.Nodes["x"] = &Node{ - Task: v1alpha1.PipelineTask{Name: "x"}, - } - linkPipelineTasks(g.Nodes["a"], g.Nodes["x"]) - - g.Nodes["y"] = &Node{ - Task: v1alpha1.PipelineTask{Name: "y"}, - } - linkPipelineTasks(g.Nodes["a"], g.Nodes["y"]) - linkPipelineTasks(g.Nodes["x"], g.Nodes["y"]) - - g.Nodes["z"] = &Node{ - Task: v1alpha1.PipelineTask{Name: "z"}, - } - linkPipelineTasks(g.Nodes["x"], g.Nodes["z"]) - - g.Nodes["w"] = &Node{ - Task: v1alpha1.PipelineTask{Name: "w"}, - } - linkPipelineTasks(g.Nodes["y"], g.Nodes["w"]) - linkPipelineTasks(g.Nodes["b"], g.Nodes["w"]) - return g -} - -func TestGetSchedulable(t *testing.T) { - g := testGraph() - tcs := []struct { - name string - finished []string - expectedTasks map[string]v1alpha1.PipelineTask - }{{ - name: "nothing-done", - finished: []string{}, - expectedTasks: map[string]v1alpha1.PipelineTask{ - "a": {Name: "a"}, - "b": {Name: "b"}, - }, - }, { - name: "a-done", - finished: []string{"a"}, - expectedTasks: map[string]v1alpha1.PipelineTask{ - "b": {Name: "b"}, - "x": {Name: "x"}, - }, - }, { - name: "b-done", - finished: []string{"b"}, - expectedTasks: map[string]v1alpha1.PipelineTask{ - "a": {Name: "a"}, - }, - }, { - name: "a-and-b-done", - finished: []string{"a", "b"}, - expectedTasks: map[string]v1alpha1.PipelineTask{ - "x": {Name: "x"}, - }, - }, { - name: "a-x-done", - finished: []string{"a", "x"}, - expectedTasks: map[string]v1alpha1.PipelineTask{ - "b": {Name: "b"}, - "y": {Name: "y"}, - "z": {Name: "z"}, - }, - }, { - name: "a-x-b-done", - finished: []string{"a", "x", "b"}, - expectedTasks: map[string]v1alpha1.PipelineTask{ - "y": {Name: "y"}, - "z": {Name: "z"}, - }, - }, { - name: "a-x-y-done", - finished: []string{"a", "x", "y"}, - expectedTasks: map[string]v1alpha1.PipelineTask{ - "b": {Name: "b"}, - "z": {Name: "z"}, - }, - }, { - name: "a-x-y-done", - finished: []string{"a", "x", "y"}, - expectedTasks: map[string]v1alpha1.PipelineTask{ - "b": {Name: "b"}, - "z": {Name: "z"}, - }, - }, { - name: "a-x-y-b-done", - finished: []string{"a", "x", "y", "b"}, - expectedTasks: map[string]v1alpha1.PipelineTask{ - "w": {Name: "w"}, - "z": {Name: "z"}, - }, - }} - for _, tc := range tcs { - t.Run(tc.name, func(t *testing.T) { - tasks, err := g.GetSchedulable(tc.finished...) - if err != nil { - t.Fatalf("Didn't expect error when getting next tasks for %v but got %v", tc.finished, err) - } - if d := cmp.Diff(tasks, tc.expectedTasks); d != "" { - t.Errorf("expected that with %v done, %v would be ready to schedule but was different: %s", tc.finished, tc.expectedTasks, d) - } - }) - } -} - -func TestGetSchedulable_Invalid(t *testing.T) { - g := testGraph() - tcs := []struct { - name string - finished []string - }{{ - // x can't be completed on its own b/c it depends on a - name: "only-x", - finished: []string{"x"}, - }, { - // y can't be completed on its own b/c it depends on a and x - name: "only-y", - finished: []string{"y"}, - }, { - // w can't be completed on its own b/c it depends on y and b - name: "only-w", - finished: []string{"w"}, - }, { - name: "only-y-and-x", - finished: []string{"y", "x"}, - }, { - name: "only-y-and-w", - finished: []string{"y", "w"}, - }, { - name: "only-x-and-w", - finished: []string{"x", "w"}, - }} - for _, tc := range tcs { - t.Run(tc.name, func(t *testing.T) { - _, err := g.GetSchedulable(tc.finished...) - if err == nil { - t.Fatalf("Expected error for invalid done tasks %v but got none", tc.finished) - } - }) - } -} diff --git a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go index 71f412f8185..ef895bad197 100644 --- a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go @@ -28,7 +28,7 @@ import ( informers "github.com/knative/build-pipeline/pkg/client/informers/externalversions/pipeline/v1alpha1" listers "github.com/knative/build-pipeline/pkg/client/listers/pipeline/v1alpha1" "github.com/knative/build-pipeline/pkg/reconciler" - dag "github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/pipeline/resources" + "github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/pipeline/dag" "github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/pipelinerun/config" "github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/pipelinerun/resources" "github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/taskrun" @@ -222,7 +222,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er p = p.DeepCopy() - d, err := dag.Build(p) + d, err := v1alpha1.BuildDAG(p.Spec.Tasks) if err != nil { // This Run has failed, so we need to mark it as failed and stop reconciling it pr.Status.SetCondition(&duckv1alpha1.Condition{ @@ -336,7 +336,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er return cancelPipelineRun(pr, pipelineState, c.PipelineClientSet) } - candidateTasks, err := d.GetSchedulable(pipelineState.SuccessfulPipelineTaskNames()...) + candidateTasks, err := dag.GetSchedulable(d, pipelineState.SuccessfulPipelineTaskNames()...) if err != nil { c.Logger.Errorf("Error getting potential next tasks for valid pipelinerun %s: %v", pr.Name, err) }