Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(v2): generate cache key and hash key #5849

Merged
merged 5 commits into from
Jun 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions v2/cacheutils/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package cacheutils

import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"

"github.com/kubeflow/pipelines/v2/third_party/pipeline_spec"
)

type CacheKey struct {
inputArtifactNames map[string]artifactNameList
inputParameters map[string]pipeline_spec.Value
outputArtifactsSpec map[string]pipeline_spec.RuntimeArtifact
outputParametersSpec map[string]string
containerSpec containerSpec
}

type artifactNameList struct {
// A list of artifact Names.
artifactNames []string
}

type containerSpec struct {
image string
cmdArgs []string
}

func GenerateFingerPrint(cacheKey CacheKey) (string, error) {
b, err := json.Marshal(cacheKey)
if err != nil {
return "", fmt.Errorf("failed to marshal cache key: %w", err)
}
hash := sha256.New()
hash.Write(b)
md := hash.Sum(nil)
executionHashKey := hex.EncodeToString(md)
return executionHashKey, nil

}

func GenerateCacheKey(
inputs *pipeline_spec.ExecutorInput_Inputs,
outputs *pipeline_spec.ExecutorInput_Outputs,
outputParametersTypeMap map[string]string,
cmdArgs []string, image string) (*CacheKey, error) {

cacheKey := CacheKey{
inputArtifactNames: make(map[string]artifactNameList),
inputParameters: make(map[string]pipeline_spec.Value),
outputArtifactsSpec: make(map[string]pipeline_spec.RuntimeArtifact),
outputParametersSpec: make(map[string]string),
}

for inputArtifactName, inputArtifactList := range inputs.GetArtifacts() {
artifactNameList := artifactNameList{artifactNames: make([]string, 0)}
for _, artifact := range inputArtifactList.Artifacts {
artifactNameList.artifactNames = append(artifactNameList.artifactNames, artifact.GetName())
}
cacheKey.inputArtifactNames[inputArtifactName] = artifactNameList
}

for inputParameterName, inputParameterValue := range inputs.GetParameters() {
cacheKey.inputParameters[inputParameterName] = pipeline_spec.Value{
Value: inputParameterValue.Value,
}
}

for outputArtifactName, outputArtifactList := range outputs.GetArtifacts() {
if len(outputArtifactList.Artifacts) == 0 {
continue
}
// TODO: Support multiple artifacts someday, probably through the v2 engine.
outputArtifact := outputArtifactList.Artifacts[0]
outputArtifactWithUriWiped := pipeline_spec.RuntimeArtifact{
Name: outputArtifact.GetName(),
Type: outputArtifact.GetType(),
Metadata: outputArtifact.GetMetadata(),
}
cacheKey.outputArtifactsSpec[outputArtifactName] = outputArtifactWithUriWiped
}

for outputParameterName, _ := range outputs.GetParameters() {
outputParameterType, ok := outputParametersTypeMap[outputParameterName]
if !ok {
return nil, fmt.Errorf("unknown parameter %q found in ExecutorInput_Outputs", outputParameterName)
}

cacheKey.outputParametersSpec[outputParameterName] = outputParameterType
}

cacheKey.containerSpec = containerSpec{
image: image,
cmdArgs: cmdArgs,
}

return &cacheKey, nil

}
151 changes: 151 additions & 0 deletions v2/cacheutils/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package cacheutils

import (
"encoding/json"
"fmt"
"google.golang.org/protobuf/testing/protocmp"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/kubeflow/pipelines/v2/third_party/pipeline_spec"
"google.golang.org/protobuf/types/known/structpb"
)

func TestGenerateCacheKey(t *testing.T) {

tests := []struct {
name string
executorInputInputs *pipeline_spec.ExecutorInput_Inputs
executorInputOutputs *pipeline_spec.ExecutorInput_Outputs
outputParametersTypeMap map[string]string
cmdArgs []string
image string
want *CacheKey
wantErr bool
}{
{
name: "Generate CacheKey Correctly",
executorInputInputs: &pipeline_spec.ExecutorInput_Inputs{
Parameters: map[string]*pipeline_spec.Value{
"message": {Value: &pipeline_spec.Value_StringValue{StringValue: "Some string value"}},
"num_steps": {Value: &pipeline_spec.Value_IntValue{IntValue: 5}},
},
Artifacts: map[string]*pipeline_spec.ArtifactList{
"dataset_one": {
Artifacts: []*pipeline_spec.RuntimeArtifact{
{
Name: "1",
Type: &pipeline_spec.ArtifactTypeSchema{
Kind: &pipeline_spec.ArtifactTypeSchema_InstanceSchema{InstanceSchema: "title: kfp.Dataset\ntype: object\nproperties:\n payload_format:\n type: string\n container_format:\n type: string\n"},
},
Uri: "gs://some-bucket/dataset-one",
Metadata: &structpb.Struct{},
}}},
"dataset_two": {
Artifacts: []*pipeline_spec.RuntimeArtifact{
{
Name: "2",
Type: &pipeline_spec.ArtifactTypeSchema{
Kind: &pipeline_spec.ArtifactTypeSchema_SchemaTitle{SchemaTitle: "kfp.Model"},
},
Uri: "gs://some-bucket/dataset-two",
Metadata: &structpb.Struct{},
}}}},
},

executorInputOutputs: &pipeline_spec.ExecutorInput_Outputs{
Parameters: map[string]*pipeline_spec.ExecutorInput_OutputParameter{
"output_parameter_one": {OutputFile: "/tmp/outputs/output_parameter_one/data"},
"output_parameter_two": {OutputFile: "/tmp/outputs/output_parameter_two/data"},
},
Artifacts: map[string]*pipeline_spec.ArtifactList{
"model": {
Artifacts: []*pipeline_spec.RuntimeArtifact{
{
Name: "model",
Type: &pipeline_spec.ArtifactTypeSchema{
Kind: &pipeline_spec.ArtifactTypeSchema_InstanceSchema{InstanceSchema: "title: kfp.Model\ntype: object\nproperties:\n framework:\n type: string\n framework_version:\n type: string\n"},
},
Uri: "gs://my-bucket/some-prefix/pipeline/task/model",
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{"name": {Kind: &structpb.Value_StringValue{StringValue: "model"}}},
}}}},
"metrics": {
Artifacts: []*pipeline_spec.RuntimeArtifact{
{
Name: "metrics",
Type: &pipeline_spec.ArtifactTypeSchema{
Kind: &pipeline_spec.ArtifactTypeSchema_SchemaTitle{SchemaTitle: "kfp.Metrics"},
},
Uri: "gs://my-bucket/some-prefix/pipeline/task/metrics",
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{"name": {Kind: &structpb.Value_StringValue{StringValue: "metrics"}}},
}}}},
},
OutputFile: "/tmp/kfp_outputs/output_metadata.json",
},
outputParametersTypeMap: map[string]string{
"output_parameter_one": "STRING",
"output_parameter_two": "INT",
},
cmdArgs: []string{"sh", "ec", "test"},
image: "python:3.9",
want: &CacheKey{
inputArtifactNames: map[string]artifactNameList{
"dataset_one": {artifactNames: []string{"1"}},
"dataset_two": {artifactNames: []string{"2"}},
},
inputParameters: map[string]pipeline_spec.Value{
"message": {Value: &pipeline_spec.Value_StringValue{StringValue: "Some string value"}},
"num_steps": {Value: &pipeline_spec.Value_IntValue{IntValue: 5}},
},
outputArtifactsSpec: map[string]pipeline_spec.RuntimeArtifact{
"model": {
Name: "model",
Type: &pipeline_spec.ArtifactTypeSchema{
Kind: &pipeline_spec.ArtifactTypeSchema_InstanceSchema{InstanceSchema: "title: kfp.Model\ntype: object\nproperties:\n framework:\n type: string\n framework_version:\n type: string\n"},
},
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{"name": {Kind: &structpb.Value_StringValue{StringValue: "model"}}},
}},
"metrics": {
Name: "metrics",
Type: &pipeline_spec.ArtifactTypeSchema{
Kind: &pipeline_spec.ArtifactTypeSchema_SchemaTitle{SchemaTitle: "kfp.Metrics"},
},
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{"name": {Kind: &structpb.Value_StringValue{StringValue: "metrics"}}},
}},
},
outputParametersSpec: map[string]string{
"output_parameter_one": "STRING",
"output_parameter_two": "INT",
},
containerSpec: containerSpec{
cmdArgs: []string{"sh", "ec", "test"},
image: "python:3.9",
},
},

wantErr: false,
},
}
for _, test := range tests {

t.Run(test.name, func(t *testing.T) {
got, err := GenerateCacheKey(test.executorInputInputs, test.executorInputOutputs, test.outputParametersTypeMap, test.cmdArgs, test.image)
if (err != nil) != test.wantErr {
t.Errorf("GenerateCacheKey() error = %v", err)
return
}

if diff := cmp.Diff(test.want, got, cmpopts.EquateEmpty(), protocmp.Transform(), cmp.AllowUnexported(CacheKey{}, artifactNameList{}, containerSpec{})); diff != "" {
t.Errorf("GenerateCacheKey() = %+v, want %+v\nDiff (-want, +got)\n%s", got, test.want, diff)
s, _ := json.MarshalIndent(test.want, "", " ")
fmt.Printf("Want\n%s", s)
}

})
}
}
3 changes: 1 addition & 2 deletions v2/cmd/launch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package main
import (
"context"
"flag"

"github.com/golang/glog"
"github.com/kubeflow/pipelines/v2/component"
)
Expand Down Expand Up @@ -52,7 +51,7 @@ func main() {
glog.Exitf("Failed to create component launcher: %v", err)
}

if err := launcher.RunComponent(ctx, flag.Args()[0], flag.Args()[1:]...); err != nil {
if err := launcher.RunComponent(ctx, flag.Args()); err != nil {
glog.Exitf("Failed to execute component: %v", err)
}
}
22 changes: 20 additions & 2 deletions v2/component/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"strings"

"github.com/golang/glog"
"github.com/kubeflow/pipelines/v2/cacheutils"
"github.com/kubeflow/pipelines/v2/metadata"
"github.com/kubeflow/pipelines/v2/objectstore"
"github.com/kubeflow/pipelines/v2/third_party/pipeline_spec"
Expand Down Expand Up @@ -240,11 +241,27 @@ func NewLauncher(runtimeInfo string, options *LauncherOptions) (*Launcher, error

// RunComponent runs the current KFP component using the specified command and
// arguments.
func (l *Launcher) RunComponent(ctx context.Context, cmd string, args ...string) error {
func (l *Launcher) RunComponent(ctx context.Context, cmdArgs []string) error {
cmd := cmdArgs[0]
args := make([]string, len(cmdArgs)-1)
_ = copy(args, cmdArgs[1:])
executorInput, err := l.runtimeInfo.generateExecutorInput(l.generateOutputURI, outputMetadataFilepath)
if err != nil {
return fmt.Errorf("failure while generating ExecutorInput: %w", err)
}
outputParametersTypeMap:= make(map[string]string)
for outputParameterName, outputParameter := range l.runtimeInfo.OutputParameters {
outputParametersTypeMap[outputParameterName] = outputParameter.Type

}
cacheKey, err := cacheutils.GenerateCacheKey(executorInput.GetInputs(), executorInput.GetOutputs(), outputParametersTypeMap, cmdArgs, l.options.ContainerImage)
if err != nil {
return fmt.Errorf("failure while generating CacheKey: %w", err)
}
_, err = cacheutils.GenerateFingerPrint(*cacheKey)
if err != nil {
return fmt.Errorf("failure while generating FingerPrint: %w", err)
}

if err := l.prepareInputs(ctx, executorInput); err != nil {
return err
Expand All @@ -267,7 +284,6 @@ func (l *Launcher) RunComponent(ctx context.Context, cmd string, args ...string)
}
args[i] = arg
}

// Record Execution in MLMD.
// TODO(neuromage): Refactor launcher.go and split these functions up into
// testable units.
Expand Down Expand Up @@ -482,6 +498,8 @@ func localPathForURI(uri string) (string, error) {
return "", fmt.Errorf("found URI with unsupported storage scheme: %s", uri)
}



func (l *Launcher) prepareInputs(ctx context.Context, executorInput *pipeline_spec.ExecutorInput) error {
executorInputJSON, err := protojson.Marshal(executorInput)
if err != nil {
Expand Down