Skip to content

Commit

Permalink
migrate to cloudevents sdk v2
Browse files Browse the repository at this point in the history
  • Loading branch information
Scott Nichols committed Mar 18, 2020
1 parent 153f1d1 commit 5e51fc6
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 49 deletions.
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ require (
cloud.google.com/go v0.47.0 // indirect
contrib.go.opencensus.io/exporter/stackdriver v0.12.8 // indirect
github.com/GoogleCloudPlatform/cloud-builders/gcs-fetcher v0.0.0-20191203181535-308b93ad1f39
github.com/cloudevents/sdk-go v1.0.0
github.com/cloudevents/sdk-go v1.1.2
github.com/cloudevents/sdk-go/v2 v2.0.0-preview4
github.com/ghodss/yaml v1.0.0
github.com/go-openapi/spec v0.19.4 // indirect
github.com/gogo/protobuf v1.3.1 // indirect
Expand Down Expand Up @@ -36,14 +37,12 @@ require (
go.uber.org/multierr v1.4.0 // indirect
go.uber.org/zap v1.13.0
golang.org/x/lint v0.0.0-20200130185559-910be7a94367 // indirect
golang.org/x/net v0.0.0-20200202094626-16171245cfb2 // indirect
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4 // indirect
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
golang.org/x/tools v0.0.0-20200214144324-88be01311a71 // indirect
gomodules.xyz/jsonpatch/v2 v2.1.0 // indirect
google.golang.org/appengine v1.6.5 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
k8s.io/api v0.17.3
k8s.io/apiextensions-apiserver v0.17.3 // indirect
k8s.io/apimachinery v0.17.3
Expand Down
14 changes: 14 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudevents/sdk-go v1.0.0 h1:gS5I0s2qPmdc4GBPlUmzZU7RH30BaiOdcRJ1RkXnPrc=
github.com/cloudevents/sdk-go v1.0.0/go.mod h1:3TkmM0cFqkhCHOq5JzzRU/RxRkwzoS8TZ+G448qVTog=
github.com/cloudevents/sdk-go v1.1.2 h1:mg/7d+BzubBPrPpH1bdeF85BQZYV85j7Ljqat3+m+qE=
github.com/cloudevents/sdk-go v1.1.2/go.mod h1:ss+jWJ88wypiewnPEzChSBzTYXGpdcILoN9YHk8uhTQ=
github.com/cloudevents/sdk-go v1.2.0-preview4/go.mod h1:xV7GfuhjnJoK6+2MgCk3kfkoO4YRIuARdY3UpSwGz+U=
github.com/cloudevents/sdk-go v2.0.0-preview4+incompatible/go.mod h1:xV7GfuhjnJoK6+2MgCk3kfkoO4YRIuARdY3UpSwGz+U=
github.com/cloudevents/sdk-go/v2 v2.0.0-preview4 h1:rpy3ci622ihe6+JDkJ2A5CQEv1D+T/lDQzJ9dUO95Us=
github.com/cloudevents/sdk-go/v2 v2.0.0-preview4/go.mod h1:1Bg+Ps3/JUl/c7PUYEmG/nixhH10dBWKa/BPUNopu0M=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
github.com/containerd/containerd v1.3.0 h1:xjvXQWABwS2uiv3TWgQt5Uth60Gu86LTGZXMJkjc7rY=
github.com/containerd/containerd v1.3.0/go.mod h1:bC6axHOhabU15QhwfG7w5PipXdVtMXFTttgp+kVtyUA=
Expand Down Expand Up @@ -377,11 +383,15 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxv
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lightstep/tracecontext.go v0.0.0-20181129014701-1757c391b1ac h1:+2b6iGRJe3hvV/yVXrd41yVEjxuFHxasJqDhkIjS4gk=
github.com/lightstep/tracecontext.go v0.0.0-20181129014701-1757c391b1ac/go.mod h1:Frd2bnT3w5FB5q49ENTfVlztJES+1k/7lyWX2+9gq/M=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
Expand Down Expand Up @@ -543,6 +553,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/tektoncd/plumbing v0.0.0-20200217163359-cd0db6e567d2 h1:BksmpUwtap3THXJ8Z4KGcotsvpRdFQKySjDHgtc22lA=
github.com/tektoncd/plumbing v0.0.0-20200217163359-cd0db6e567d2/go.mod h1:QZHgU07PRBTRF6N57w4+ApRu8OgfYLFNqCDlfEZaD9Y=
github.com/tektoncd/plumbing/pipelinerun-logs v0.0.0-20191206114338-712d544c2c21/go.mod h1:S62EUWtqmejjJgUMOGB1CCCHRp6C706laH06BoALkzU=
Expand Down Expand Up @@ -606,6 +618,8 @@ golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413 h1:ULYEB3JvPRE/IfO+9uO7vKV/xzVTO7XPAwm8xbf4w2g=
golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72 h1:+ELyKg6m8UBf0nPFSqD0mi7zUfwPyXo23HNjMnXPz7w=
golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down
52 changes: 21 additions & 31 deletions pkg/reconciler/taskrun/resources/cloudevent/cloudevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,11 @@ import (
"errors"
"fmt"
"strings"
"time"

"github.com/cloudevents/sdk-go/pkg/cloudevents"
"github.com/cloudevents/sdk-go/pkg/cloudevents/client"
cecontext "github.com/cloudevents/sdk-go/pkg/cloudevents/context"
"github.com/cloudevents/sdk-go/pkg/cloudevents/types"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"go.uber.org/zap"
"knative.dev/eventing-contrib/pkg/kncloudevents"
"knative.dev/pkg/apis"

"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
Expand All @@ -49,8 +44,8 @@ const (
TektonTaskRunFailedV1 TektonEventType = "dev.tekton.event.task.failed.v1"
)

// CEClient matches the `Client` interface from github.com/cloudevents/sdk-go/pkg/cloudevents
type CEClient client.Client
// CEClient matches the `Client` interface from github.com/cloudevents/sdk-go/v1/cloudevents
type CEClient cloudevents.Client

// TektonCloudEventData type is used to marshal and unmarshal the payload of
// a Tekton cloud event. It only includes a TaskRun for now. Using a type opens
Expand All @@ -68,28 +63,17 @@ func NewTektonCloudEventData(taskRun *v1alpha1.TaskRun) TektonCloudEventData {

// SendCloudEvent sends a Cloud Event to the specified SinkURI
func SendCloudEvent(sinkURI, eventID, eventSourceURI string, data []byte, eventType TektonEventType, logger *zap.SugaredLogger, cloudEventClient CEClient) (cloudevents.Event, error) {
var event cloudevents.Event

cloudEventSource := types.ParseURLRef(eventSourceURI)
if cloudEventSource == nil {
logger.Errorf("Invalid eventSourceURI: %s", eventSourceURI)
return event, fmt.Errorf("invalid eventSourceURI: %s", eventSourceURI)
}

event = cloudevents.Event{
Context: cloudevents.EventContextV02{
ID: eventID,
Type: string(eventType),
Source: *cloudEventSource,
Time: &types.Timestamp{Time: time.Now()},
Extensions: nil,
}.AsV02(),
Data: data,
event := cloudevents.NewEvent()
event.SetID(eventID)
event.SetSource(eventSourceURI)
event.SetType(string(eventType))
if err := event.SetData(cloudevents.ApplicationJSON, data); err != nil {
logger.Errorf("Error setting the cloudevents data: %s", err)
return event, err
}
ctxt := cecontext.WithTarget(context.TODO(), sinkURI)
_, _, err := cloudEventClient.Send(ctxt, event)
if err != nil {
logger.Errorf("Error sending the cloud-event: %s", err)
ctxt := cloudevents.ContextWithTarget(context.Background(), sinkURI)
if err := cloudEventClient.Send(ctxt, event); err != nil {
logger.Errorf("Error sending the cloudevents: %s", err)
return event, err
}
return event, nil
Expand All @@ -101,9 +85,15 @@ func SendTaskRunCloudEvent(sinkURI string, taskRun *v1alpha1.TaskRun, logger *za
var err error
// Check if a client was provided, if not build one on the fly
if cloudEventClient == nil {
cloudEventClient, err = kncloudevents.NewDefaultClient()
p, err := cloudevents.NewHTTP()
if err != nil {
logger.Errorf("Error creating the cloudevents http protocol: %s", err)
return event, err
}

cloudEventClient, err = cloudevents.NewClient(p, cloudevents.WithUUIDs(), cloudevents.WithTimeNow())
if err != nil {
logger.Errorf("Error creating the cloud-event client: %s", err)
logger.Errorf("Error creating the cloudevents client: %s", err)
return event, err
}
}
Expand Down
13 changes: 5 additions & 8 deletions pkg/reconciler/taskrun/resources/cloudevent/cloudevent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cloudevent
import (
"encoding/json"
"fmt"
cloudevents "github.com/cloudevents/sdk-go/v2"
"regexp"
"testing"

Expand All @@ -28,7 +29,6 @@ import (
"github.com/tektoncd/pipeline/test/names"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/eventing-contrib/pkg/kncloudevents"
"knative.dev/pkg/apis"
duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1"
)
Expand All @@ -38,7 +38,7 @@ const (
invalidSinkURI = "invalid_URI"
defaultEventID = "event1234"
defaultEventSourceURI = "/taskrun/1234"
invalidEventSourceURI = "htt%23p://_invalid_URI##"
invalidEventSourceURI = "htt%23p://_invalid_URI"
defaultEventType = TektonTaskRunUnknownV1
taskRunName = "faketaskrunname"
invalidConditionSuccessStatus = "foobar"
Expand All @@ -47,7 +47,7 @@ const (
var (
defaultRawData = []byte(`{"metadata": {"name":"faketaskrun"}}`)
nilEventType TektonEventType
defaultCloudEventClient, _ = kncloudevents.NewDefaultClient()
defaultCloudEventClient, _ = cloudevents.NewDefaultClient()
happyClientBehaviour = FakeClientBehaviour{SendSuccessfully: true}
failingClientBehaviour = FakeClientBehaviour{SendSuccessfully: false}
)
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestSendCloudEvent(t *testing.T) {
eventSourceURI: invalidEventSourceURI,
cloudEventClient: defaultCloudEventClient,
wantErr: true,
errRegexp: fmt.Sprintf("invalid eventSourceURI: %s", invalidEventSourceURI),
errRegexp: fmt.Sprintf(`parse %s`, invalidEventSourceURI),
}} {
t.Run(c.desc, func(t *testing.T) {
logger, _ := logging.NewLogger("", "")
Expand Down Expand Up @@ -164,10 +164,7 @@ func TestSendTaskRunCloudEvent(t *testing.T) {
t.Errorf("Wrong Event Type (-want +got) = %s", diff)
}
wantData, _ := json.Marshal(NewTektonCloudEventData(c.taskRun))
gotData, err := event.DataBytes()
if err != nil {
t.Fatalf("Could not get data from event %v: %v", event, err)
}
gotData := event.Data()
if diff := cmp.Diff(wantData, gotData); diff != "" {
t.Errorf("Wrong Event data (-want +got) = %s", diff)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import (
"context"
"fmt"

"github.com/cloudevents/sdk-go/pkg/cloudevents"
"github.com/cloudevents/sdk-go/pkg/cloudevents/client"
cloudevents "github.com/cloudevents/sdk-go/v2"
)

// FakeClientBehaviour defines how the client will behave
Expand All @@ -37,22 +36,32 @@ type FakeClient struct {
}

// NewFakeClient is a FakeClient factory, it returns a client for the target
func NewFakeClient(behaviour *FakeClientBehaviour) client.Client {
func NewFakeClient(behaviour *FakeClientBehaviour) cloudevents.Client {
c := FakeClient{
behaviour: behaviour,
}
return c
}

// Send fakes the Send method from kncloudevents.NewDefaultClient
func (c FakeClient) Send(ctx context.Context, event cloudevents.Event) (context.Context, *cloudevents.Event, error) {
// Send fakes the Send method from cloudevents.Client
func (c FakeClient) Send(ctx context.Context, event cloudevents.Event) error {
c.event = event
if c.behaviour.SendSuccessfully {
return ctx, &event, nil
return nil
}
return ctx, nil, fmt.Errorf("%s had to fail", event.Context.GetID())
return fmt.Errorf("%s had to fail", event.Context.GetID())
}

// Send fakes the Send method from cloudevents.Client
func (c FakeClient) Request(ctx context.Context, event cloudevents.Event) (*cloudevents.Event, error) {
c.event = event
if c.behaviour.SendSuccessfully {
return &event, nil
}
return nil, fmt.Errorf("%s had to fail", event.Context.GetID())
}


// StartReceiver fakes the StartReceiver method from kncloudevents.NewDefaultClient
func (c FakeClient) StartReceiver(ctx context.Context, fn interface{}) error {
return nil
Expand Down

0 comments on commit 5e51fc6

Please sign in to comment.