Disentangle metric reporting from the actual reconciler. #2762

Merged · 1 commit · Jun 8, 2020
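In outline, the commit replaces per-reconcile metric goroutines (the blocks removed from pipelinerun.go and taskrun.go below) with a single long-lived reporting loop per controller, started once in NewController and stopped by context cancellation. The following is a minimal, generic sketch of that pattern — reportPeriodically and the gauge callback are hypothetical stand-ins, not the Tekton types in this diff:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// reportPeriodically is a hypothetical stand-in for ReportRunningPipelineRuns /
// ReportRunningTaskRuns: it surfaces a gauge on a fixed period instead of on
// every reconcile, and exits when the controller's context is cancelled.
func reportPeriodically(ctx context.Context, period time.Duration, report func() error) {
	for {
		select {
		case <-ctx.Done():
			return // controller shut down; stop reporting
		case <-time.After(period):
			if err := report(); err != nil {
				fmt.Printf("failed to report metrics: %v\n", err)
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	// Started once at controller construction, like the `go metrics.Report...`
	// lines added in the controller.go diffs below.
	go reportPeriodically(ctx, 100*time.Millisecond, func() error {
		fmt.Println("running PipelineRuns: 0") // stand-in for the real gauge
		return nil
	})
	time.Sleep(350 * time.Millisecond)
	cancel() // the reporting goroutine observes ctx.Done() and exits
	time.Sleep(50 * time.Millisecond)
}
```

Because the loop observes the lister on a fixed period, the "running" gauge no longer depends on a reconcile happening to fire — which is the disentanglement the title refers to.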
pkg/reconciler/pipelinerun/controller.go (2 additions, 4 deletions)

@@ -40,10 +40,6 @@ import (
"knative.dev/pkg/tracker"
)

const (
resyncPeriod = 10 * time.Hour
)

// NewController instantiates a new controller.Impl from knative.dev/pkg/controller
func NewController(namespace string, images pipeline.Images) func(context.Context, configmap.Watcher) *controller.Impl {
return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
@@ -103,6 +99,8 @@ func NewController(namespace string, images pipeline.Images) func(context.Context, configmap.Watcher) *controller.Impl {
c.configStore = config.NewStore(images, c.Logger.Named("config-store"))
c.configStore.WatchConfigs(cmw)

go metrics.ReportRunningPipelineRuns(ctx, pipelineRunInformer.Lister())

return impl
}
}
pkg/reconciler/pipelinerun/metrics.go (25 additions, 0 deletions)

@@ -29,6 +29,7 @@ import (
"go.opencensus.io/tag"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
)

@@ -56,13 +57,18 @@ type Recorder struct {
pipelineRun tag.Key
namespace tag.Key
status tag.Key

ReportingPeriod time.Duration
}

// NewRecorder creates a new metrics recorder instance
// to log the PipelineRun related metrics
func NewRecorder() (*Recorder, error) {
r := &Recorder{
initialized: true,

// Default to 30s intervals.
ReportingPeriod: 30 * time.Second,
[Review thread]

Member: It could be nice if we keep this configurable? 🤔

Author: It's a public field, so it can be changed after the Recorder is created, but I see that this is just consumed by NewController without really having a place to hook into. I'm happy to do whatever is conventional for Tekton here.

Member: I'm not sure as well (I was away for some time); it's better if @afrittoli and @vdemeester comment on it. In my opinion we can have it as it is, and if required in the future we can provide the relevant configuration option :) However, it might be nice to document it, so users are aware of the 30-second window for updated metrics.

[A sketch of overriding ReportingPeriod follows this file's diff.]
}

pipeline, err := tag.NewKey("pipeline")
@@ -184,3 +190,22 @@ func (r *Recorder) RunningPipelineRuns(lister listers.PipelineRunLister) error {

return nil
}

// ReportRunningPipelineRuns invokes RunningPipelineRuns on our configured ReportingPeriod
// until the context is cancelled.
func (r *Recorder) ReportRunningPipelineRuns(ctx context.Context, lister listers.PipelineRunLister) {
logger := logging.FromContext(ctx)
for {
select {
case <-ctx.Done():
// When the context is cancelled, stop reporting.
return

case <-time.After(r.ReportingPeriod):
// Every 30s surface a metric for the number of running pipelines.
if err := r.RunningPipelineRuns(lister); err != nil {
logger.Warnf("Failed to log the metrics : %v", err)
}
}
}
}
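On the review thread above: since ReportingPeriod is an exported field, whatever constructs the Recorder can override the 30s default before the goroutine starts. A hedged sketch of such wiring — ctx, logger, and pipelineRunInformer are assumed from the surrounding NewController code, and the override itself is illustrative, not part of this PR:

```go
// Hypothetical wiring, roughly as it could appear in NewController; ctx,
// logger, and pipelineRunInformer come from the surrounding controller code.
metrics, err := NewRecorder()
if err != nil {
	logger.Fatalf("failed to create pipelinerun metrics recorder: %v", err)
}
// Exported field: tune the interval before the reporting goroutine starts.
metrics.ReportingPeriod = 10 * time.Second
go metrics.ReportRunningPipelineRuns(ctx, pipelineRunInformer.Lister())
```

As for the loop itself, time.After allocates a fresh timer on every iteration; at a 30-second period that cost is negligible and it keeps the select simple, whereas a time.Ticker would trade the per-iteration allocation for explicit Stop bookkeeping.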
pkg/reconciler/pipelinerun/pipelinerun.go (0 additions, 12 deletions)

@@ -230,14 +230,12 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
}
}

var updated bool
if !equality.Semantic.DeepEqual(original.Status, pr.Status) {
if _, err := c.updateStatus(pr); err != nil {
c.Logger.Warn("Failed to update PipelineRun status", zap.Error(err))
c.Recorder.Event(pr, corev1.EventTypeWarning, eventReasonFailed, "PipelineRun failed to update")
return multierror.Append(merr, err)
}
updated = true
}

// When we update the status only, we use updateStatus to minimize the chances of
@@ -250,16 +248,6 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
c.Recorder.Event(pr, corev1.EventTypeWarning, eventReasonFailed, "PipelineRun failed to update labels/annotations")
return multierror.Append(merr, err)
}
updated = true
}

if updated {
go func(metrics *Recorder) {
err := metrics.RunningPipelineRuns(c.pipelineRunLister)
if err != nil {
c.Logger.Warnf("Failed to log the metrics : %v", err)
}
}(c.metrics)
}

return merr.ErrorOrNil()
pkg/reconciler/taskrun/controller.go (3 additions, 0 deletions)

@@ -106,6 +106,9 @@ func NewController(namespace string, images pipeline.Images) func(context.Context, configmap.Watcher) *controller.Impl {

c.configStore = config.NewStore(c.Logger.Named("config-store"))
c.configStore.WatchConfigs(cmw)

go metrics.ReportRunningTaskRuns(ctx, taskRunInformer.Lister())

return impl
}
}
pkg/reconciler/taskrun/metrics.go (25 additions, 0 deletions)

@@ -30,6 +30,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
)

@@ -69,13 +70,18 @@ type Recorder struct {
pipeline tag.Key
pipelineRun tag.Key
pod tag.Key

ReportingPeriod time.Duration
}

// NewRecorder creates a new metrics recorder instance
// to log the TaskRun related metrics
func NewRecorder() (*Recorder, error) {
r := &Recorder{
initialized: true,

// Default to reporting metrics every 30s.
ReportingPeriod: 30 * time.Second,
}

task, err := tag.NewKey("task")
@@ -250,6 +256,25 @@ func (r *Recorder) RunningTaskRuns(lister listers.TaskRunLister) error {
return nil
}

// ReportRunningTaskRuns invokes RunningTaskRuns on our configured ReportingPeriod
// until the context is cancelled.
func (r *Recorder) ReportRunningTaskRuns(ctx context.Context, lister listers.TaskRunLister) {
logger := logging.FromContext(ctx)
for {
select {
case <-ctx.Done():
// When the context is cancelled, stop reporting.
return

case <-time.After(r.ReportingPeriod):
// Every 30s surface a metric for the number of running tasks.
if err := r.RunningTaskRuns(lister); err != nil {
logger.Warnf("Failed to log the metrics : %v", err)
}
}
}
}

// RecordPodLatency logs the duration required to schedule the pod for TaskRun
// returns an error if its failed to log the metrics
func (r *Recorder) RecordPodLatency(pod *corev1.Pod, tr *v1beta1.TaskRun) error {
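The pipelinerun and taskrun reporting loops above are near-identical. This PR keeps them separate, but a later consolidation could lean on k8s.io/apimachinery's wait helpers instead of a hand-rolled select — a sketch under that assumption (reportUntilCancelled is hypothetical, and wait.UntilWithContext runs the function immediately and then every period, a small behavioral difference from the wait-first loops in this diff):

```go
import (
	"context"

	"k8s.io/apimachinery/pkg/util/wait"
	"knative.dev/pkg/logging"
)

// Hypothetical shared helper; Recorder stands for either package's type.
// wait.UntilWithContext invokes the callback every ReportingPeriod and
// returns once ctx is cancelled, so callers still just `go r.report...`.
func (r *Recorder) reportUntilCancelled(ctx context.Context, report func() error) {
	logger := logging.FromContext(ctx)
	wait.UntilWithContext(ctx, func(context.Context) {
		if err := report(); err != nil {
			logger.Warnf("Failed to log the metrics: %v", err)
		}
	}, r.ReportingPeriod)
}
```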
pkg/reconciler/taskrun/taskrun.go (0 additions, 13 deletions)

@@ -421,8 +421,6 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1beta1.TaskRun,
// Push changes (if any) to the TaskRun status, labels and annotations to
// TaskRun definition in etcd
func (c *Reconciler) updateStatusLabelsAndAnnotations(tr, original *v1beta1.TaskRun) error {
var updated bool

if !equality.Semantic.DeepEqual(original.Status, tr.Status) {
// If we didn't change anything then don't call updateStatus.
// This is important because the copy we loaded from the informer's
@@ -432,7 +430,6 @@ func (c *Reconciler) updateStatusLabelsAndAnnotations(tr, original *v1beta1.TaskRun) error {
c.Logger.Warn("Failed to update taskRun status", zap.Error(err))
return err
}
updated = true
}

// When we update the status only, we use updateStatus to minimize the chances of
@@ -444,16 +441,6 @@ func (c *Reconciler) updateStatusLabelsAndAnnotations(tr, original *v1beta1.TaskRun) error {
c.Logger.Warn("Failed to update TaskRun labels/annotations", zap.Error(err))
return err
}
updated = true
}

if updated {
go func(metrics *Recorder) {
err := metrics.RunningTaskRuns(c.taskRunLister)
if err != nil {
c.Logger.Warnf("Failed to log the metrics : %v", err)
}
}(c.metrics)
}

return nil