Skip to content

Commit

Permalink
TEP-0090: Refactor execution status validation
Browse files Browse the repository at this point in the history
Today, the execution status validation is long and has duplicated
code. We need to validate execution status in matrix, a feature
proposed in TEP-0090. To set up for that, we are refactoring the
execution status validation in this change which includes splitting
up the function and reusing common logic.
  • Loading branch information
jerop committed Mar 1, 2022
1 parent 7d7d02c commit cbff0f7
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 76 deletions.
76 changes: 76 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,82 @@ func (pt *PipelineTask) validateMatrix(ctx context.Context) (errs *apis.FieldErr
return errs
}

func (pt *PipelineTask) validateExecutionStatusVariablesDisallowed() (errs *apis.FieldError) {
for _, param := range pt.Params {
if expressions, ok := GetVarSubstitutionExpressionsForParam(param); ok {
errs = errs.Also(validateContainsExecutionStatusVariablesDisallowed(expressions, "value").
ViaFieldKey("params", param.Name))
}
}
for i, we := range pt.WhenExpressions {
if expressions, ok := we.GetVarSubstitutionExpressions(); ok {
errs = errs.Also(validateContainsExecutionStatusVariablesDisallowed(expressions, "").
ViaFieldIndex("when", i))
}
}
return errs
}

func (pt *PipelineTask) validateExecutionStatusVariablesAllowed(ptNames sets.String) (errs *apis.FieldError) {
for _, param := range pt.Params {
if expressions, ok := GetVarSubstitutionExpressionsForParam(param); ok {
errs = errs.Also(validateExecutionStatusVariablesExpressions(expressions, ptNames, "value").
ViaFieldKey("params", param.Name))
}
}
for i, we := range pt.WhenExpressions {
if expressions, ok := we.GetVarSubstitutionExpressions(); ok {
errs = errs.Also(validateExecutionStatusVariablesExpressions(expressions, ptNames, "").
ViaFieldIndex("when", i))
}
}
return errs
}

func validateContainsExecutionStatusVariablesDisallowed(expressions []string, path string) (errs *apis.FieldError) {
if containsExecutionStatusReferences(expressions) {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("pipeline tasks can not refer to execution status"+
" of any other pipeline task or aggregate status of tasks"), path))
}
return errs
}

func containsExecutionStatusReferences(expressions []string) bool {
// validate tasks.pipelineTask.status/tasks.status if this expression is not a result reference
if !LooksLikeContainsResultRefs(expressions) {
for _, e := range expressions {
// check if it contains context variable accessing execution status - $(tasks.taskname.status)
// or an aggregate status - $(tasks.status)
if containsExecutionStatusRef(e) {
return true
}
}
}
return false
}

func validateExecutionStatusVariablesExpressions(expressions []string, ptNames sets.String, fieldPath string) (errs *apis.FieldError) {
// validate tasks.pipelineTask.status if this expression is not a result reference
if !LooksLikeContainsResultRefs(expressions) {
for _, expression := range expressions {
// its a reference to aggregate status of dag tasks - $(tasks.status)
if expression == PipelineTasksAggregateStatus {
continue
}
// check if it contains context variable accessing execution status - $(tasks.taskname.status)
if containsExecutionStatusRef(expression) {
// strip tasks. and .status from tasks.taskname.status to further verify task name
pt := strings.TrimSuffix(strings.TrimPrefix(expression, "tasks."), ".status")
// report an error if the task name does not exist in the list of dag tasks
if !ptNames.Has(pt) {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("pipeline task %s is not defined in the pipeline", pt), fieldPath))
}
}
}
}
return errs
}

// TaskSpecMetadata returns the metadata of the PipelineTask's EmbeddedTask spec.
func (pt *PipelineTask) TaskSpecMetadata() PipelineTaskMetadata {
return pt.TaskSpec.Metadata
Expand Down
78 changes: 6 additions & 72 deletions pkg/apis/pipeline/v1beta1/pipeline_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,91 +203,25 @@ func containsExecutionStatusRef(p string) bool {

// validate dag pipeline tasks, task params can not access execution status of any other task
// dag tasks cannot have param value as $(tasks.pipelineTask.status)
func validateExecutionStatusVariablesInTasks(tasks []PipelineTask) (errs *apis.FieldError) {
func validateExecutionStatusVariablesDisallowed(tasks []PipelineTask) (errs *apis.FieldError) {
for idx, t := range tasks {
for _, param := range t.Params {
// retrieve a list of substitution expression from a param
if ps, ok := GetVarSubstitutionExpressionsForParam(param); ok {
// validate tasks.pipelineTask.status/tasks.status if this expression is not a result reference
if !LooksLikeContainsResultRefs(ps) {
for _, p := range ps {
// check if it contains context variable accessing execution status - $(tasks.taskname.status)
// or an aggregate status - $(tasks.status)
if containsExecutionStatusRef(p) {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("pipeline tasks can not refer to execution status of any other pipeline task"+
" or aggregate status of tasks"), "value").ViaFieldKey("params", param.Name).ViaFieldIndex("tasks", idx))
}
}
}
}
}
for i, we := range t.WhenExpressions {
// retrieve a list of substitution expression from a when expression
if expressions, ok := we.GetVarSubstitutionExpressions(); ok {
// validate tasks.pipelineTask.status/tasks.status if this expression is not a result reference
if !LooksLikeContainsResultRefs(expressions) {
for _, e := range expressions {
// check if it contains context variable accessing execution status - $(tasks.taskname.status)
// or an aggregate status - $(tasks.status)
if containsExecutionStatusRef(e) {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("when expressions in pipeline tasks can not refer to execution status of any other pipeline task"+
" or aggregate status of tasks"), "").ViaFieldIndex("when", i).ViaFieldIndex("tasks", idx))
}
}
}
}
}
errs = errs.Also(t.validateExecutionStatusVariablesDisallowed()).ViaIndex(idx)
}
return errs
}

// validate finally tasks accessing execution status of a dag task specified in the pipeline
// $(tasks.pipelineTask.status) is invalid if pipelineTask is not defined as a dag task
func validateExecutionStatusVariablesInFinally(tasks []PipelineTask, finally []PipelineTask) (errs *apis.FieldError) {
// creating a list of pipelineTask names to validate tasks.<name>.status
ptNames := PipelineTaskList(tasks).Names()
func validateExecutionStatusVariablesAllowed(tasksNames sets.String, finally []PipelineTask) (errs *apis.FieldError) {
for idx, t := range finally {
for _, param := range t.Params {
if expressions, ok := GetVarSubstitutionExpressionsForParam(param); ok {
errs = errs.Also(validateExecutionStatusVariablesExpressions(expressions, ptNames, "value").ViaFieldKey(
"params", param.Name).ViaFieldIndex("finally", idx))
}
}
for i, we := range t.WhenExpressions {
if expressions, ok := we.GetVarSubstitutionExpressions(); ok {
errs = errs.Also(validateExecutionStatusVariablesExpressions(expressions, ptNames, "").ViaFieldIndex(
"when", i).ViaFieldIndex("finally", idx))
}
}
}
return errs
}

func validateExecutionStatusVariablesExpressions(expressions []string, ptNames sets.String, fieldPath string) (errs *apis.FieldError) {
// validate tasks.pipelineTask.status if this expression is not a result reference
if !LooksLikeContainsResultRefs(expressions) {
for _, expression := range expressions {
// its a reference to aggregate status of dag tasks - $(tasks.status)
if expression == PipelineTasksAggregateStatus {
continue
}
// check if it contains context variable accessing execution status - $(tasks.taskname.status)
if containsExecutionStatusRef(expression) {
// strip tasks. and .status from tasks.taskname.status to further verify task name
pt := strings.TrimSuffix(strings.TrimPrefix(expression, "tasks."), ".status")
// report an error if the task name does not exist in the list of dag tasks
if !ptNames.Has(pt) {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("pipeline task %s is not defined in the pipeline", pt), fieldPath))
}
}
}
errs = errs.Also(t.validateExecutionStatusVariablesAllowed(tasksNames).ViaIndex(idx))
}
return errs
}

func validateExecutionStatusVariables(tasks []PipelineTask, finallyTasks []PipelineTask) (errs *apis.FieldError) {
errs = errs.Also(validateExecutionStatusVariablesInTasks(tasks))
errs = errs.Also(validateExecutionStatusVariablesInFinally(tasks, finallyTasks))
errs = errs.Also(validateExecutionStatusVariablesDisallowed(tasks).ViaField("tasks"))
errs = errs.Also(validateExecutionStatusVariablesAllowed(PipelineTaskList(tasks).Names(), finallyTasks).ViaField("finally"))
return errs
}

Expand Down
5 changes: 1 addition & 4 deletions pkg/apis/pipeline/v1beta1/pipeline_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2420,10 +2420,7 @@ func TestPipelineTasksExecutionStatus(t *testing.T) {
}},
expectedError: *apis.ErrGeneric("").Also(&apis.FieldError{
Message: `invalid value: pipeline tasks can not refer to execution status of any other pipeline task or aggregate status of tasks`,
Paths: []string{"tasks[0].params[bar-status].value"},
}).Also(&apis.FieldError{
Message: `invalid value: when expressions in pipeline tasks can not refer to execution status of any other pipeline task or aggregate status of tasks`,
Paths: []string{"tasks[0].when[0]"},
Paths: []string{"tasks[0].params[bar-status].value", "tasks[0].when[0]"},
}),
}, {
name: "invalid string variable in dag task accessing aggregate status of tasks",
Expand Down

0 comments on commit cbff0f7

Please sign in to comment.