Skip to content

Commit

Permalink
Migrate tekton to the cloudevents sdk v2
Browse files Browse the repository at this point in the history
While I was here, I also refactored some of the CloudEvents integration, some major points:

you now send spec version 1.0 formatted events
ID is not the task run name, this was a bad idea if you want to send more than one event for that run. The task run name is now used as the subject.
ID is now a UUID.
The event that is created for sending is only created once, and then used for all deliveries. This means that all subscribers get the same event content and ID.
The event payload is marshaled inside the Event object. I show examples in the code and tests of how to set and get the payload as your custom struct.
Removed a lot of the layers passing clients and event components around. Better to build the event and then call send on the client directly. This deleted a lot of code that was doing many things, like create an event, and then attempt to send it.

Signed-off-by: Scott Nichols <[email protected]>
  • Loading branch information
Scott Nichols committed Mar 20, 2020
1 parent 212e3d4 commit 4202b69
Show file tree
Hide file tree
Showing 146 changed files with 4,654 additions and 6,219 deletions.
2 changes: 0 additions & 2 deletions examples/v1alpha1/taskruns/cloud-event.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ spec:
with open("content.txt", mode="wb") as f:
f.write(content)
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(b'<html><body><h1>POST!</h1></body></html>')
def do_GET(self):
with open("content.txt", mode="rb") as f:
Expand Down
6 changes: 2 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ require (
cloud.google.com/go v0.47.0 // indirect
contrib.go.opencensus.io/exporter/stackdriver v0.12.8 // indirect
github.com/GoogleCloudPlatform/cloud-builders/gcs-fetcher v0.0.0-20191203181535-308b93ad1f39
github.com/cloudevents/sdk-go v1.0.0
github.com/cloudevents/sdk-go/v2 v2.0.0-preview6
github.com/ghodss/yaml v1.0.0
github.com/go-openapi/spec v0.19.4 // indirect
github.com/gogo/protobuf v1.3.1 // indirect
github.com/golang/protobuf v1.3.3 // indirect
github.com/google/go-cmp v0.4.0
github.com/google/go-containerregistry v0.0.0-20200115214256-379933c9c22b
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/uuid v1.1.1
github.com/googleapis/gnostic v0.3.1 // indirect
github.com/hashicorp/go-multierror v1.0.0
github.com/hashicorp/golang-lru v0.5.3
Expand All @@ -36,14 +37,12 @@ require (
go.uber.org/multierr v1.4.0 // indirect
go.uber.org/zap v1.13.0
golang.org/x/lint v0.0.0-20200130185559-910be7a94367 // indirect
golang.org/x/net v0.0.0-20200202094626-16171245cfb2 // indirect
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4 // indirect
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
golang.org/x/tools v0.0.0-20200214144324-88be01311a71 // indirect
gomodules.xyz/jsonpatch/v2 v2.1.0 // indirect
google.golang.org/appengine v1.6.5 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
k8s.io/api v0.17.3
k8s.io/apiextensions-apiserver v0.17.3 // indirect
k8s.io/apimachinery v0.17.3
Expand All @@ -52,7 +51,6 @@ require (
k8s.io/gengo v0.0.0-20191108084044-e500ee069b5c // indirect
k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a
knative.dev/caching v0.0.0-20200116200605-67bca2c83dfa
knative.dev/eventing-contrib v0.11.2
knative.dev/pkg v0.0.0-20200227193851-2fe8db300072
)

Expand Down
17 changes: 13 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,10 @@ github.com/census-instrumentation/opencensus-proto v0.2.0/go.mod h1:f6KPmirojxKA
github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudevents/sdk-go v1.0.0 h1:gS5I0s2qPmdc4GBPlUmzZU7RH30BaiOdcRJ1RkXnPrc=
github.com/cloudevents/sdk-go v1.0.0/go.mod h1:3TkmM0cFqkhCHOq5JzzRU/RxRkwzoS8TZ+G448qVTog=
github.com/cloudevents/sdk-go v1.1.2 h1:mg/7d+BzubBPrPpH1bdeF85BQZYV85j7Ljqat3+m+qE=
github.com/cloudevents/sdk-go v1.1.2/go.mod h1:ss+jWJ88wypiewnPEzChSBzTYXGpdcILoN9YHk8uhTQ=
github.com/cloudevents/sdk-go/v2 v2.0.0-preview6 h1:IYRl0IoWKRZE+1f9Zlm8qtc6zKg/wq4k7TKBDbIq1rg=
github.com/cloudevents/sdk-go/v2 v2.0.0-preview6/go.mod h1:zbpxUOAoR8b4Shvo1GyHbcL/S/hgr1WJ3rNuMUgrlh0=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
github.com/containerd/containerd v1.3.0 h1:xjvXQWABwS2uiv3TWgQt5Uth60Gu86LTGZXMJkjc7rY=
github.com/containerd/containerd v1.3.0/go.mod h1:bC6axHOhabU15QhwfG7w5PipXdVtMXFTttgp+kVtyUA=
Expand Down Expand Up @@ -377,11 +379,15 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxv
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lightstep/tracecontext.go v0.0.0-20181129014701-1757c391b1ac h1:+2b6iGRJe3hvV/yVXrd41yVEjxuFHxasJqDhkIjS4gk=
github.com/lightstep/tracecontext.go v0.0.0-20181129014701-1757c391b1ac/go.mod h1:Frd2bnT3w5FB5q49ENTfVlztJES+1k/7lyWX2+9gq/M=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
Expand Down Expand Up @@ -543,13 +549,16 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/tektoncd/plumbing v0.0.0-20200217163359-cd0db6e567d2 h1:BksmpUwtap3THXJ8Z4KGcotsvpRdFQKySjDHgtc22lA=
github.com/tektoncd/plumbing v0.0.0-20200217163359-cd0db6e567d2/go.mod h1:QZHgU07PRBTRF6N57w4+ApRu8OgfYLFNqCDlfEZaD9Y=
github.com/tektoncd/plumbing/pipelinerun-logs v0.0.0-20191206114338-712d544c2c21/go.mod h1:S62EUWtqmejjJgUMOGB1CCCHRp6C706laH06BoALkzU=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/vdemeester/k8s-pkg-credentialprovider v0.0.0-20200107171650-7c61ffa44238 h1:o+AFkRxPBxf23y/mH9RM5doyTxQHRFpq6psWi7012X0=
github.com/vdemeester/k8s-pkg-credentialprovider v0.0.0-20200107171650-7c61ffa44238/go.mod h1:JwQJCMWpUDqjZrB5jpw0f5VbN7U95zxFy1ZDpoEarGo=
Expand Down Expand Up @@ -606,6 +615,8 @@ golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413 h1:ULYEB3JvPRE/IfO+9uO7vKV/xzVTO7XPAwm8xbf4w2g=
golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72 h1:+ELyKg6m8UBf0nPFSqD0mi7zUfwPyXo23HNjMnXPz7w=
golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down Expand Up @@ -876,8 +887,6 @@ k8s.io/utils v0.0.0-20191114184206-e782cd3c129f h1:GiPwtSzdP43eI1hpPCbROQCCIgCui
k8s.io/utils v0.0.0-20191114184206-e782cd3c129f/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
knative.dev/caching v0.0.0-20200116200605-67bca2c83dfa h1:mxrur6DsVK8uIjhIq7c1OMls4YjBcRlyvnh3Vx13a0M=
knative.dev/caching v0.0.0-20200116200605-67bca2c83dfa/go.mod h1:dHXFU6CGlLlbzaWc32g80cR92iuBSpsslDNBWI8C7eg=
knative.dev/eventing-contrib v0.11.2 h1:xncT+JrokPG+hPUFJwue8ubPpzmziV9GUIZqYt01JDo=
knative.dev/eventing-contrib v0.11.2/go.mod h1:SnXZgSGgMSMLNFTwTnpaOH7hXDzTFtw0J8OmHflNx3g=
knative.dev/pkg v0.0.0-20200227193851-2fe8db300072 h1:ZituYY5obt+2k4JRN5hNmtX00KXDBXGMohS6n2xf250=
knative.dev/pkg v0.0.0-20200227193851-2fe8db300072/go.mod h1:pgODObA1dTyhNoFxPZTTjNWfx6F0aKsKzn+vaT9XO/Q=
modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ limitations under the License.
package cloudevent

import (
"context"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/hashicorp/go-multierror"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
resource "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1"
Expand Down Expand Up @@ -67,6 +69,15 @@ func cloudEventDeliveryFromTargets(targets []string) []v1alpha1.CloudEventDelive
// the TaskRun is complete. `tr` is used to obtain the list of targets but also
// to construct the body of the
func SendCloudEvents(tr *v1alpha1.TaskRun, ceclient CEClient, logger *zap.SugaredLogger) error {
logger = logger.With(zap.String("taskrun", tr.Name))

// Make the event we would like to send:
event, err := EventForTaskRun(tr)
if err != nil || event == nil {
logger.With(zap.Error(err)).Error("failed to produce a cloudevent from TaskRun.")
return err
}

// Using multierror here so we can attempt to send all cloud events defined,
// regardless of whether they fail or not, and report all failed ones
var merr *multierror.Error
Expand All @@ -77,20 +88,24 @@ func SendCloudEvents(tr *v1alpha1.TaskRun, ceclient CEClient, logger *zap.Sugare
if eventStatus.Condition != v1alpha1.CloudEventConditionUnknown || eventStatus.RetryCount > 0 {
continue
}
_, err := SendTaskRunCloudEvent(cloudEventDelivery.Target, tr, logger, ceclient)

// Send the event.
err := ceclient.Send(cloudevents.ContextWithTarget(context.Background(), cloudEventDelivery.Target), *event)

// Record the result.
eventStatus.SentAt = &metav1.Time{Time: time.Now()}
eventStatus.RetryCount++
if err != nil {
merr = multierror.Append(merr, err)
eventStatus.Condition = v1alpha1.CloudEventConditionFailed
eventStatus.Error = merr.Error()
} else {
logger.Infof("Sent event for target %s", cloudEventDelivery.Target)
logger.Infow("Event sent.", zap.String("target", cloudEventDelivery.Target))
eventStatus.Condition = v1alpha1.CloudEventConditionSent
}
}
if merr != nil && merr.Len() > 0 {
logger.Errorf("Failed to send %d cloud events for TaskRun %s", merr.Len(), tr.Name)
logger.With(zap.Error(merr)).Errorw("Failed to send events for TaskRun.", zap.Int("count", merr.Len()))
}
return merr.ErrorOrNil()
}
98 changes: 31 additions & 67 deletions pkg/reconciler/taskrun/resources/cloudevent/cloudevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,15 @@ limitations under the License.
package cloudevent

import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"

"github.com/cloudevents/sdk-go/pkg/cloudevents"
"github.com/cloudevents/sdk-go/pkg/cloudevents/client"
cecontext "github.com/cloudevents/sdk-go/pkg/cloudevents/context"
"github.com/cloudevents/sdk-go/pkg/cloudevents/types"
"github.com/google/uuid"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"go.uber.org/zap"
"knative.dev/eventing-contrib/pkg/kncloudevents"
"knative.dev/pkg/apis"

"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
Expand All @@ -49,8 +43,12 @@ const (
TektonTaskRunFailedV1 TektonEventType = "dev.tekton.event.task.failed.v1"
)

// CEClient matches the `Client` interface from github.com/cloudevents/sdk-go/pkg/cloudevents
type CEClient client.Client
func (t TektonEventType) String() string {
return string(t)
}

// CEClient matches the `Client` interface from github.com/cloudevents/sdk-go/v2/cloudevents
type CEClient cloudevents.Client

// TektonCloudEventData type is used to marshal and unmarshal the payload of
// a Tekton cloud event. It only includes a TaskRun for now. Using a type opens
Expand All @@ -66,68 +64,34 @@ func NewTektonCloudEventData(taskRun *v1alpha1.TaskRun) TektonCloudEventData {
}
}

// SendCloudEvent sends a Cloud Event to the specified SinkURI
func SendCloudEvent(sinkURI, eventID, eventSourceURI string, data []byte, eventType TektonEventType, logger *zap.SugaredLogger, cloudEventClient CEClient) (cloudevents.Event, error) {
var event cloudevents.Event

cloudEventSource := types.ParseURLRef(eventSourceURI)
if cloudEventSource == nil {
logger.Errorf("Invalid eventSourceURI: %s", eventSourceURI)
return event, fmt.Errorf("invalid eventSourceURI: %s", eventSourceURI)
}

event = cloudevents.Event{
Context: cloudevents.EventContextV02{
ID: eventID,
Type: string(eventType),
Source: *cloudEventSource,
Time: &types.Timestamp{Time: time.Now()},
Extensions: nil,
}.AsV02(),
Data: data,
}
ctxt := cecontext.WithTarget(context.TODO(), sinkURI)
_, _, err := cloudEventClient.Send(ctxt, event)
if err != nil {
logger.Errorf("Error sending the cloud-event: %s", err)
return event, err
}
return event, nil
}

// SendTaskRunCloudEvent sends a cloud event for a TaskRun
func SendTaskRunCloudEvent(sinkURI string, taskRun *v1alpha1.TaskRun, logger *zap.SugaredLogger, cloudEventClient CEClient) (cloudevents.Event, error) {
var event cloudevents.Event
var err error
// Check if a client was provided, if not build one on the fly
if cloudEventClient == nil {
cloudEventClient, err = kncloudevents.NewDefaultClient()
if err != nil {
logger.Errorf("Error creating the cloud-event client: %s", err)
return event, err
}
}
// EventForTaskRun will create a new event based on a TaskRun,
// or return an error if not possible.
func EventForTaskRun(taskRun *v1alpha1.TaskRun) (*cloudevents.Event, error) {
// Check if the TaskRun is defined
if taskRun == nil {
return event, errors.New("Cannot send an event for an empty TaskRun")
return nil, errors.New("Cannot send an event for an empty TaskRun")
}
eventID := taskRun.ObjectMeta.Name
taskRunStatus := taskRun.Status.GetCondition(apis.ConditionSucceeded)
var eventType TektonEventType
event := cloudevents.NewEvent()
event.SetID(uuid.New().String())
event.SetSubject(taskRun.ObjectMeta.Name)
event.SetSource(taskRun.ObjectMeta.SelfLink) // TODO: SelfLink is deprecated

c := taskRun.Status.GetCondition(apis.ConditionSucceeded)
switch {
case taskRunStatus.IsUnknown():
eventType = TektonTaskRunUnknownV1
case taskRunStatus.IsFalse():
eventType = TektonTaskRunFailedV1
case taskRunStatus.IsTrue():
eventType = TektonTaskRunSuccessfulV1
case c.IsUnknown():
event.SetType(TektonTaskRunUnknownV1.String())
case c.IsFalse():
event.SetType(TektonTaskRunFailedV1.String())
case c.IsTrue():
event.SetType(TektonTaskRunSuccessfulV1.String())
default:
return event, fmt.Errorf("unknown condition for in TaskRun.Status %s", taskRunStatus.Status)
return nil, fmt.Errorf("unknown condition for in TaskRun.Status %s", c.Status)
}

if err := event.SetData(cloudevents.ApplicationJSON, NewTektonCloudEventData(taskRun)); err != nil {
return nil, err
}
eventSourceURI := taskRun.ObjectMeta.SelfLink
data, _ := json.Marshal(NewTektonCloudEventData(taskRun))
event, err = SendCloudEvent(sinkURI, eventID, eventSourceURI, data, eventType, logger, cloudEventClient)
return event, err
return &event, nil
}

// GetCloudEventDeliveryCompareOptions returns compare options to sort
Expand Down
Loading

0 comments on commit 4202b69

Please sign in to comment.