Disentangle metric reporting from the actual reconciler.
This changes the metric reporting to happen periodically vs. being
triggered by reconciliation.  The previous method was prone to stale
data because it was driven by the informer cache immediately following
writes through the client, and might not fix itself since the status
may not change (even on resync every 10h).  With this change it should
exhibit the correct value within 30s + {informer delay}, where the 30s
is configurable on the Recorders.

Fixes: tektoncd#2729
mattmoor committed Jun 5, 2020
1 parent 550e3b4 commit 2f44504
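
To make the new flow concrete, here is a minimal wiring sketch of how a controller starts the decoupled reporter. It is not part of the diff: the error handling and the 10s override are purely illustrative (the default ReportingPeriod is 30s), while NewRecorder, ReportingPeriod, and ReportRunningPipelineRuns are the names introduced in the changes below.

    // Sketch only: start periodic metric reporting, decoupled from Reconcile.
    metrics, err := NewRecorder()
    if err != nil {
        logger.Fatalf("Failed to create pipelinerun metrics recorder: %v", err) // hypothetical handling
    }
    // ReportingPeriod defaults to 30s; it can be tuned per Recorder.
    metrics.ReportingPeriod = 10 * time.Second // illustrative override, not in the commit
    // The goroutine keeps reporting until ctx is cancelled (see metrics.go below).
    go metrics.ReportRunningPipelineRuns(ctx, pipelineRunInformer.Lister())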
Showing 6 changed files with 54 additions and 29 deletions.
6 changes: 2 additions & 4 deletions pkg/reconciler/pipelinerun/controller.go
@@ -40,10 +40,6 @@ import (
"knative.dev/pkg/tracker"
)

const (
resyncPeriod = 10 * time.Hour
)

// NewController instantiates a new controller.Impl from knative.dev/pkg/controller
func NewController(namespace string, images pipeline.Images) func(context.Context, configmap.Watcher) *controller.Impl {
return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
@@ -103,6 +99,8 @@ func NewController(namespace string, images pipeline.Images) func(context.Context, configmap.Watcher) *controller.Impl {
c.configStore = config.NewStore(images, c.Logger.Named("config-store"))
c.configStore.WatchConfigs(cmw)

go metrics.ReportRunningPipelineRuns(ctx, pipelineRunInformer.Lister())

return impl
}
}
25 changes: 25 additions & 0 deletions pkg/reconciler/pipelinerun/metrics.go
@@ -29,6 +29,7 @@ import (
"go.opencensus.io/tag"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
)

@@ -56,13 +57,18 @@ type Recorder struct {
pipelineRun tag.Key
namespace tag.Key
status tag.Key

ReportingPeriod time.Duration
}

// NewRecorder creates a new metrics recorder instance
// to log the PipelineRun related metrics
func NewRecorder() (*Recorder, error) {
r := &Recorder{
initialized: true,

// Default to 30s intervals.
ReportingPeriod: 30 * time.Second,
}

pipeline, err := tag.NewKey("pipeline")
@@ -184,3 +190,22 @@ func (r *Recorder) RunningPipelineRuns(lister listers.PipelineRunLister) error {

return nil
}

// ReportRunningPipelineRuns invokes RunningPipelineRuns at our configured ReportingPeriod
// until the context is cancelled.
func (r *Recorder) ReportRunningPipelineRuns(ctx context.Context, lister listers.PipelineRunLister) {
logger := logging.FromContext(ctx)
for {
select {
case <-ctx.Done():
// When the context is cancelled, stop reporting.
return

case <-time.After(r.ReportingPeriod):
// Every ReportingPeriod, surface a metric for the number of running pipelines.
if err := r.RunningPipelineRuns(lister); err != nil {
logger.Warnf("Failed to log the metrics : %v", err)
}
}
}
}
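
The reporter's lifetime is tied to the context passed to it, so shutting it down is just a matter of cancelling that context. A hedged usage sketch (the recorder and lister variables are assumed to exist, as set up in controller.go above):

    ctx, cancel := context.WithCancel(context.Background())
    go recorder.ReportRunningPipelineRuns(ctx, pipelineRunLister)
    // ... later, on controller shutdown:
    cancel() // the select observes ctx.Done() and the reporting goroutine returns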
12 changes: 0 additions & 12 deletions pkg/reconciler/pipelinerun/pipelinerun.go
@@ -230,14 +230,12 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
}
}

var updated bool
if !equality.Semantic.DeepEqual(original.Status, pr.Status) {
if _, err := c.updateStatus(pr); err != nil {
c.Logger.Warn("Failed to update PipelineRun status", zap.Error(err))
c.Recorder.Event(pr, corev1.EventTypeWarning, eventReasonFailed, "PipelineRun failed to update")
return multierror.Append(merr, err)
}
updated = true
}

// When we update the status only, we use updateStatus to minimize the chances of
@@ -250,16 +248,6 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
c.Recorder.Event(pr, corev1.EventTypeWarning, eventReasonFailed, "PipelineRun failed to update labels/annotations")
return multierror.Append(merr, err)
}
updated = true
}

if updated {
go func(metrics *Recorder) {
err := metrics.RunningPipelineRuns(c.pipelineRunLister)
if err != nil {
c.Logger.Warnf("Failed to log the metrics : %v", err)
}
}(c.metrics)
}

return merr.ErrorOrNil()
2 changes: 2 additions & 0 deletions pkg/reconciler/taskrun/controller.go
@@ -103,6 +103,8 @@ func NewController(namespace string, images pipeline.Images) func(context.Context, configmap.Watcher) *controller.Impl {
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

go metrics.ReportRunningTaskRuns(ctx, taskRunInformer.Lister())

return impl
}
}
25 changes: 25 additions & 0 deletions pkg/reconciler/taskrun/metrics.go
@@ -30,6 +30,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
)

@@ -69,13 +70,18 @@ type Recorder struct {
pipeline tag.Key
pipelineRun tag.Key
pod tag.Key

ReportingPeriod time.Duration
}

// NewRecorder creates a new metrics recorder instance
// to log the TaskRun related metrics
func NewRecorder() (*Recorder, error) {
r := &Recorder{
initialized: true,

// Default to reporting metrics every 30s.
ReportingPeriod: 30 * time.Second,
}

task, err := tag.NewKey("task")
@@ -250,6 +256,25 @@ func (r *Recorder) RunningTaskRuns(lister listers.TaskRunLister) error {
return nil
}

// ReportRunningTaskRuns invokes RunningTaskRuns at our configured ReportingPeriod
// until the context is cancelled.
func (r *Recorder) ReportRunningTaskRuns(ctx context.Context, lister listers.TaskRunLister) {
logger := logging.FromContext(ctx)
for {
select {
case <-ctx.Done():
// When the context is cancelled, stop reporting.
return

case <-time.After(r.ReportingPeriod):
// Every ReportingPeriod, surface a metric for the number of running tasks.
if err := r.RunningTaskRuns(lister); err != nil {
logger.Warnf("Failed to log the metrics : %v", err)
}
}
}
}

// RecordPodLatency logs the duration required to schedule the pod for TaskRun
// returns an error if it failed to log the metrics
func (r *Recorder) RecordPodLatency(pod *corev1.Pod, tr *v1beta1.TaskRun) error {
13 changes: 0 additions & 13 deletions pkg/reconciler/taskrun/taskrun.go
@@ -412,8 +412,6 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1beta1.TaskRun,
// Push changes (if any) to the TaskRun status, labels and annotations to
// TaskRun definition in etcd
func (c *Reconciler) updateStatusLabelsAndAnnotations(tr, original *v1beta1.TaskRun) error {
var updated bool

if !equality.Semantic.DeepEqual(original.Status, tr.Status) {
// If we didn't change anything then don't call updateStatus.
// This is important because the copy we loaded from the informer's
@@ -423,7 +421,6 @@ func (c *Reconciler) updateStatusLabelsAndAnnotations(tr, original *v1beta1.TaskRun) error {
c.Logger.Warn("Failed to update taskRun status", zap.Error(err))
return err
}
updated = true
}

// When we update the status only, we use updateStatus to minimize the chances of
@@ -435,16 +432,6 @@ func (c *Reconciler) updateStatusLabelsAndAnnotations(tr, original *v1beta1.TaskRun) error {
c.Logger.Warn("Failed to update TaskRun labels/annotations", zap.Error(err))
return err
}
updated = true
}

if updated {
go func(metrics *Recorder) {
err := metrics.RunningTaskRuns(c.taskRunLister)
if err != nil {
c.Logger.Warnf("Failed to log the metrics : %v", err)
}
}(c.metrics)
}

return nil
