Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate the DAG when validating the pipeline spec (aka at creation) #578

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,17 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package pipeline
package v1alpha1

import (
"fmt"
"strings"

"github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1"
"github.com/knative/build-pipeline/pkg/list"
)

// Node represents a Task in a pipeline.
type Node struct {
// Task represent the PipelineTask in Pipeline
Task v1alpha1.PipelineTask
Task PipelineTask
// Prev represent all the Previous task Nodes for the current Task
Prev []*Node
// Next represent all the Next task Nodes for the current Task
Expand All @@ -41,11 +38,11 @@ type DAG struct {
}

// Returns an empty Pipeline DAG
func new() *DAG {
func newDAG() *DAG {
return &DAG{Nodes: map[string]*Node{}}
}

func (g *DAG) addPipelineTask(t v1alpha1.PipelineTask) (*Node, error) {
func (g *DAG) addPipelineTask(t PipelineTask) (*Node, error) {
if _, ok := g.Nodes[t.Name]; ok {
return nil, fmt.Errorf("duplicate pipeline taks")
}
Expand Down Expand Up @@ -87,100 +84,15 @@ func visit(currentName string, nodes []*Node, path []string, visited map[string]
}

func getVisitedPath(path []string) string {
// Reverse the path since we traversed the graph using prev pointers.
// Reverse the path since we traversed the DAG using prev pointers.
for i := len(path)/2 - 1; i >= 0; i-- {
opp := len(path) - 1 - i
path[i], path[opp] = path[opp], path[i]
}
return strings.Join(path, " -> ")
}

// GetSchedulable returns a map of PipelineTask that can be scheduled (keyed
// by the name of the PipelineTask) given a list of successfully finished doneTasks.
// It returns tasks which have all dependecies marked as done, and thus can be scheduled. If the
// specified doneTasks are invalid (i.e. if it is indicated that a Task is
// done, but the previous Tasks are not done), an error is returned.
func (g *DAG) GetSchedulable(doneTasks ...string) (map[string]v1alpha1.PipelineTask, error) {
roots := g.getRoots()
tm := toMap(doneTasks...)
d := map[string]v1alpha1.PipelineTask{}

visited := map[string]struct{}{}
for _, root := range roots {
schedulable := findSchedulable(root, visited, tm)
for _, task := range schedulable {
d[task.Name] = task
}
}

visitedNames := make([]string, len(visited))
for v := range visited {
visitedNames = append(visitedNames, v)
}

notVisited := list.DiffLeft(doneTasks, visitedNames)
if len(notVisited) > 0 {
return map[string]v1alpha1.PipelineTask{}, fmt.Errorf("invalid list of done tasks; some tasks were indicated completed without ancestors being done: %v", notVisited)
}

return d, nil
}

func (g *DAG) getRoots() []*Node {
n := []*Node{}
for _, node := range g.Nodes {
if len(node.Prev) == 0 {
n = append(n, node)
}
}
return n
}

func findSchedulable(n *Node, visited map[string]struct{}, doneTasks map[string]struct{}) []v1alpha1.PipelineTask {
if _, ok := visited[n.Task.Name]; ok {
return []v1alpha1.PipelineTask{}
}
visited[n.Task.Name] = struct{}{}
if _, ok := doneTasks[n.Task.Name]; ok {
schedulable := []v1alpha1.PipelineTask{}
// This one is done! Take note of it and look at the next candidate
for _, next := range n.Next {
if _, ok := visited[next.Task.Name]; !ok {
schedulable = append(schedulable, findSchedulable(next, visited, doneTasks)...)
}
}
return schedulable
}
// This one isn't done! Return it if it's schedulable
if isSchedulable(doneTasks, n.Prev) {
return []v1alpha1.PipelineTask{n.Task}
}
// This one isn't done, but it also isn't ready to schedule
return []v1alpha1.PipelineTask{}
}

func isSchedulable(doneTasks map[string]struct{}, prevs []*Node) bool {
if len(prevs) == 0 {
return true
}
collected := []string{}
for _, n := range prevs {
if _, ok := doneTasks[n.Task.Name]; ok {
collected = append(collected, n.Task.Name)
}
}
return len(collected) == len(prevs)
}

func toMap(t ...string) map[string]struct{} {
m := make(map[string]struct{}, len(t))
for _, s := range t {
m[s] = struct{}{}
}
return m
}

func addLink(pt v1alpha1.PipelineTask, previousTask string, nodes map[string]*Node) error {
func addLink(pt PipelineTask, previousTask string, nodes map[string]*Node) error {
prev, ok := nodes[previousTask]
if !ok {
return fmt.Errorf("Task %s depends on %s but %s wasn't present in Pipeline", pt.Name, previousTask, previousTask)
Expand All @@ -192,18 +104,18 @@ func addLink(pt v1alpha1.PipelineTask, previousTask string, nodes map[string]*No
return nil
}

// Build returns a valid pipeline DAG. Returns error if the pipeline is invalid
func Build(p *v1alpha1.Pipeline) (*DAG, error) {
d := new()
// BuildDAG returns a valid pipeline DAG. Returns error if the pipeline is invalid
func BuildDAG(tasks []PipelineTask) (*DAG, error) {
d := newDAG()

// Add all Tasks mentioned in the `PipelineSpec`
for _, pt := range p.Spec.Tasks {
for _, pt := range tasks {
if _, err := d.addPipelineTask(pt); err != nil {
return nil, fmt.Errorf("task %s is already present in graph, can't add it again: %v", pt.Name, err)
return nil, fmt.Errorf("task %s is already present in DAG, can't add it again: %v", pt.Name, err)
}
}
// Process all from and runAfter constraints to add task dependency
for _, pt := range p.Spec.Tasks {
for _, pt := range tasks {
for _, previousTask := range pt.RunAfter {
if err := addLink(pt, previousTask, d.Nodes); err != nil {
return nil, fmt.Errorf("couldn't add link between %s and %s: %v", pt.Name, previousTask, err)
Expand Down
Loading