diff --git a/v2/cacheutils/cache.go b/v2/cacheutils/cache.go new file mode 100644 index 00000000000..3db17b568f2 --- /dev/null +++ b/v2/cacheutils/cache.go @@ -0,0 +1,100 @@ +package cacheutils + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + + "github.com/kubeflow/pipelines/v2/third_party/pipeline_spec" +) + +type CacheKey struct { + inputArtifactNames map[string]artifactNameList + inputParameters map[string]pipeline_spec.Value + outputArtifactsSpec map[string]pipeline_spec.RuntimeArtifact + outputParametersSpec map[string]string + containerSpec containerSpec +} + +type artifactNameList struct { + // A list of artifact Names. + artifactNames []string +} + +type containerSpec struct { + image string + cmdArgs []string +} + +func GenerateFingerPrint(cacheKey CacheKey) (string, error) { + b, err := json.Marshal(cacheKey) + if err != nil { + return "", fmt.Errorf("failed to marshal cache key: %w", err) + } + hash := sha256.New() + hash.Write(b) + md := hash.Sum(nil) + executionHashKey := hex.EncodeToString(md) + return executionHashKey, nil + +} + +func GenerateCacheKey( + inputs *pipeline_spec.ExecutorInput_Inputs, + outputs *pipeline_spec.ExecutorInput_Outputs, + outputParametersTypeMap map[string]string, + cmdArgs []string, image string) (*CacheKey, error) { + + cacheKey := CacheKey{ + inputArtifactNames: make(map[string]artifactNameList), + inputParameters: make(map[string]pipeline_spec.Value), + outputArtifactsSpec: make(map[string]pipeline_spec.RuntimeArtifact), + outputParametersSpec: make(map[string]string), + } + + for inputArtifactName, inputArtifactList := range inputs.GetArtifacts() { + artifactNameList := artifactNameList{artifactNames: make([]string, 0)} + for _, artifact := range inputArtifactList.Artifacts { + artifactNameList.artifactNames = append(artifactNameList.artifactNames, artifact.GetName()) + } + cacheKey.inputArtifactNames[inputArtifactName] = artifactNameList + } + + for inputParameterName, inputParameterValue := range inputs.GetParameters() { + cacheKey.inputParameters[inputParameterName] = pipeline_spec.Value{ + Value: inputParameterValue.Value, + } + } + + for outputArtifactName, outputArtifactList := range outputs.GetArtifacts() { + if len(outputArtifactList.Artifacts) == 0 { + continue + } + // TODO: Support multiple artifacts someday, probably through the v2 engine. + outputArtifact := outputArtifactList.Artifacts[0] + outputArtifactWithUriWiped := pipeline_spec.RuntimeArtifact{ + Name: outputArtifact.GetName(), + Type: outputArtifact.GetType(), + Metadata: outputArtifact.GetMetadata(), + } + cacheKey.outputArtifactsSpec[outputArtifactName] = outputArtifactWithUriWiped + } + + for outputParameterName, _ := range outputs.GetParameters() { + outputParameterType, ok := outputParametersTypeMap[outputParameterName] + if !ok { + return nil, fmt.Errorf("unknown parameter %q found in ExecutorInput_Outputs", outputParameterName) + } + + cacheKey.outputParametersSpec[outputParameterName] = outputParameterType + } + + cacheKey.containerSpec = containerSpec{ + image: image, + cmdArgs: cmdArgs, + } + + return &cacheKey, nil + +} \ No newline at end of file diff --git a/v2/cacheutils/cache_test.go b/v2/cacheutils/cache_test.go new file mode 100644 index 00000000000..6817c97fe89 --- /dev/null +++ b/v2/cacheutils/cache_test.go @@ -0,0 +1,151 @@ +package cacheutils + +import ( + "encoding/json" + "fmt" + "google.golang.org/protobuf/testing/protocmp" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/kubeflow/pipelines/v2/third_party/pipeline_spec" + "google.golang.org/protobuf/types/known/structpb" +) + +func TestGenerateCacheKey(t *testing.T) { + + tests := []struct { + name string + executorInputInputs *pipeline_spec.ExecutorInput_Inputs + executorInputOutputs *pipeline_spec.ExecutorInput_Outputs + outputParametersTypeMap map[string]string + cmdArgs []string + image string + want *CacheKey + wantErr bool + }{ + { + name: "Generate CacheKey Correctly", + executorInputInputs: &pipeline_spec.ExecutorInput_Inputs{ + Parameters: map[string]*pipeline_spec.Value{ + "message": {Value: &pipeline_spec.Value_StringValue{StringValue: "Some string value"}}, + "num_steps": {Value: &pipeline_spec.Value_IntValue{IntValue: 5}}, + }, + Artifacts: map[string]*pipeline_spec.ArtifactList{ + "dataset_one": { + Artifacts: []*pipeline_spec.RuntimeArtifact{ + { + Name: "1", + Type: &pipeline_spec.ArtifactTypeSchema{ + Kind: &pipeline_spec.ArtifactTypeSchema_InstanceSchema{InstanceSchema: "title: kfp.Dataset\ntype: object\nproperties:\n payload_format:\n type: string\n container_format:\n type: string\n"}, + }, + Uri: "gs://some-bucket/dataset-one", + Metadata: &structpb.Struct{}, + }}}, + "dataset_two": { + Artifacts: []*pipeline_spec.RuntimeArtifact{ + { + Name: "2", + Type: &pipeline_spec.ArtifactTypeSchema{ + Kind: &pipeline_spec.ArtifactTypeSchema_SchemaTitle{SchemaTitle: "kfp.Model"}, + }, + Uri: "gs://some-bucket/dataset-two", + Metadata: &structpb.Struct{}, + }}}}, + }, + + executorInputOutputs: &pipeline_spec.ExecutorInput_Outputs{ + Parameters: map[string]*pipeline_spec.ExecutorInput_OutputParameter{ + "output_parameter_one": {OutputFile: "/tmp/outputs/output_parameter_one/data"}, + "output_parameter_two": {OutputFile: "/tmp/outputs/output_parameter_two/data"}, + }, + Artifacts: map[string]*pipeline_spec.ArtifactList{ + "model": { + Artifacts: []*pipeline_spec.RuntimeArtifact{ + { + Name: "model", + Type: &pipeline_spec.ArtifactTypeSchema{ + Kind: &pipeline_spec.ArtifactTypeSchema_InstanceSchema{InstanceSchema: "title: kfp.Model\ntype: object\nproperties:\n framework:\n type: string\n framework_version:\n type: string\n"}, + }, + Uri: "gs://my-bucket/some-prefix/pipeline/task/model", + Metadata: &structpb.Struct{ + Fields: map[string]*structpb.Value{"name": {Kind: &structpb.Value_StringValue{StringValue: "model"}}}, + }}}}, + "metrics": { + Artifacts: []*pipeline_spec.RuntimeArtifact{ + { + Name: "metrics", + Type: &pipeline_spec.ArtifactTypeSchema{ + Kind: &pipeline_spec.ArtifactTypeSchema_SchemaTitle{SchemaTitle: "kfp.Metrics"}, + }, + Uri: "gs://my-bucket/some-prefix/pipeline/task/metrics", + Metadata: &structpb.Struct{ + Fields: map[string]*structpb.Value{"name": {Kind: &structpb.Value_StringValue{StringValue: "metrics"}}}, + }}}}, + }, + OutputFile: "/tmp/kfp_outputs/output_metadata.json", + }, + outputParametersTypeMap: map[string]string{ + "output_parameter_one": "STRING", + "output_parameter_two": "INT", + }, + cmdArgs: []string{"sh", "ec", "test"}, + image: "python:3.9", + want: &CacheKey{ + inputArtifactNames: map[string]artifactNameList{ + "dataset_one": {artifactNames: []string{"1"}}, + "dataset_two": {artifactNames: []string{"2"}}, + }, + inputParameters: map[string]pipeline_spec.Value{ + "message": {Value: &pipeline_spec.Value_StringValue{StringValue: "Some string value"}}, + "num_steps": {Value: &pipeline_spec.Value_IntValue{IntValue: 5}}, + }, + outputArtifactsSpec: map[string]pipeline_spec.RuntimeArtifact{ + "model": { + Name: "model", + Type: &pipeline_spec.ArtifactTypeSchema{ + Kind: &pipeline_spec.ArtifactTypeSchema_InstanceSchema{InstanceSchema: "title: kfp.Model\ntype: object\nproperties:\n framework:\n type: string\n framework_version:\n type: string\n"}, + }, + Metadata: &structpb.Struct{ + Fields: map[string]*structpb.Value{"name": {Kind: &structpb.Value_StringValue{StringValue: "model"}}}, + }}, + "metrics": { + Name: "metrics", + Type: &pipeline_spec.ArtifactTypeSchema{ + Kind: &pipeline_spec.ArtifactTypeSchema_SchemaTitle{SchemaTitle: "kfp.Metrics"}, + }, + Metadata: &structpb.Struct{ + Fields: map[string]*structpb.Value{"name": {Kind: &structpb.Value_StringValue{StringValue: "metrics"}}}, + }}, + }, + outputParametersSpec: map[string]string{ + "output_parameter_one": "STRING", + "output_parameter_two": "INT", + }, + containerSpec: containerSpec{ + cmdArgs: []string{"sh", "ec", "test"}, + image: "python:3.9", + }, + }, + + wantErr: false, + }, + } + for _, test := range tests { + + t.Run(test.name, func(t *testing.T) { + got, err := GenerateCacheKey(test.executorInputInputs, test.executorInputOutputs, test.outputParametersTypeMap, test.cmdArgs, test.image) + if (err != nil) != test.wantErr { + t.Errorf("GenerateCacheKey() error = %v", err) + return + } + + if diff := cmp.Diff(test.want, got, cmpopts.EquateEmpty(), protocmp.Transform(), cmp.AllowUnexported(CacheKey{}, artifactNameList{}, containerSpec{})); diff != "" { + t.Errorf("GenerateCacheKey() = %+v, want %+v\nDiff (-want, +got)\n%s", got, test.want, diff) + s, _ := json.MarshalIndent(test.want, "", " ") + fmt.Printf("Want\n%s", s) + } + + }) + } +} diff --git a/v2/cmd/launch/main.go b/v2/cmd/launch/main.go index 7badd1920d3..e38b2774f1a 100644 --- a/v2/cmd/launch/main.go +++ b/v2/cmd/launch/main.go @@ -16,7 +16,6 @@ package main import ( "context" "flag" - "github.com/golang/glog" "github.com/kubeflow/pipelines/v2/component" ) @@ -52,7 +51,7 @@ func main() { glog.Exitf("Failed to create component launcher: %v", err) } - if err := launcher.RunComponent(ctx, flag.Args()[0], flag.Args()[1:]...); err != nil { + if err := launcher.RunComponent(ctx, flag.Args()); err != nil { glog.Exitf("Failed to execute component: %v", err) } } diff --git a/v2/component/launcher.go b/v2/component/launcher.go index dc5db54de68..79b998d321b 100644 --- a/v2/component/launcher.go +++ b/v2/component/launcher.go @@ -31,6 +31,7 @@ import ( "strings" "github.com/golang/glog" + "github.com/kubeflow/pipelines/v2/cacheutils" "github.com/kubeflow/pipelines/v2/metadata" "github.com/kubeflow/pipelines/v2/objectstore" "github.com/kubeflow/pipelines/v2/third_party/pipeline_spec" @@ -240,11 +241,27 @@ func NewLauncher(runtimeInfo string, options *LauncherOptions) (*Launcher, error // RunComponent runs the current KFP component using the specified command and // arguments. -func (l *Launcher) RunComponent(ctx context.Context, cmd string, args ...string) error { +func (l *Launcher) RunComponent(ctx context.Context, cmdArgs []string) error { + cmd := cmdArgs[0] + args := make([]string, len(cmdArgs)-1) + _ = copy(args, cmdArgs[1:]) executorInput, err := l.runtimeInfo.generateExecutorInput(l.generateOutputURI, outputMetadataFilepath) if err != nil { return fmt.Errorf("failure while generating ExecutorInput: %w", err) } + outputParametersTypeMap:= make(map[string]string) + for outputParameterName, outputParameter := range l.runtimeInfo.OutputParameters { + outputParametersTypeMap[outputParameterName] = outputParameter.Type + + } + cacheKey, err := cacheutils.GenerateCacheKey(executorInput.GetInputs(), executorInput.GetOutputs(), outputParametersTypeMap, cmdArgs, l.options.ContainerImage) + if err != nil { + return fmt.Errorf("failure while generating CacheKey: %w", err) + } + _, err = cacheutils.GenerateFingerPrint(*cacheKey) + if err != nil { + return fmt.Errorf("failure while generating FingerPrint: %w", err) + } if err := l.prepareInputs(ctx, executorInput); err != nil { return err @@ -267,7 +284,6 @@ func (l *Launcher) RunComponent(ctx context.Context, cmd string, args ...string) } args[i] = arg } - // Record Execution in MLMD. // TODO(neuromage): Refactor launcher.go and split these functions up into // testable units. @@ -482,6 +498,8 @@ func localPathForURI(uri string) (string, error) { return "", fmt.Errorf("found URI with unsupported storage scheme: %s", uri) } + + func (l *Launcher) prepareInputs(ctx context.Context, executorInput *pipeline_spec.ExecutorInput) error { executorInputJSON, err := protojson.Marshal(executorInput) if err != nil {