From 79c066437e5a0345e7fcfd4eabc0e228944f7b17 Mon Sep 17 00:00:00 2001 From: Matt Moore Date: Fri, 5 Jun 2020 07:09:38 -0700 Subject: [PATCH] Disentangle metric reporting from the actual reconciler. This changes the metric reporting to happen periodically vs. being triggered by reconciliation. The previous method was prone to stale data because it was driven by the informer cache immediately following writes through the client, and might not fix itself since the status may not change (even on resync every 10h). With this change it should exhibit the correct value within 30s + {informer delay}, where the 30s is configurable on the Recorders. Fixes: #2729 --- pkg/reconciler/pipelinerun/controller.go | 6 ++---- pkg/reconciler/pipelinerun/metrics.go | 25 +++++++++++++++++++++++ pkg/reconciler/pipelinerun/pipelinerun.go | 12 ----------- pkg/reconciler/taskrun/controller.go | 3 +++ pkg/reconciler/taskrun/metrics.go | 25 +++++++++++++++++++++++ pkg/reconciler/taskrun/taskrun.go | 13 ------------ 6 files changed, 55 insertions(+), 29 deletions(-) diff --git a/pkg/reconciler/pipelinerun/controller.go b/pkg/reconciler/pipelinerun/controller.go index b8c161856c8..2920f5fe808 100644 --- a/pkg/reconciler/pipelinerun/controller.go +++ b/pkg/reconciler/pipelinerun/controller.go @@ -40,10 +40,6 @@ import ( "knative.dev/pkg/tracker" ) -const ( - resyncPeriod = 10 * time.Hour -) - // NewController instantiates a new controller.Impl from knative.dev/pkg/controller func NewController(namespace string, images pipeline.Images) func(context.Context, configmap.Watcher) *controller.Impl { return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl { @@ -103,6 +99,8 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex c.configStore = config.NewStore(images, c.Logger.Named("config-store")) c.configStore.WatchConfigs(cmw) + go metrics.ReportRunningPipelineRuns(ctx, pipelineRunInformer.Lister()) + return impl } } diff --git a/pkg/reconciler/pipelinerun/metrics.go b/pkg/reconciler/pipelinerun/metrics.go index 776ef82762a..b91fe1af236 100644 --- a/pkg/reconciler/pipelinerun/metrics.go +++ b/pkg/reconciler/pipelinerun/metrics.go @@ -29,6 +29,7 @@ import ( "go.opencensus.io/tag" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" + "knative.dev/pkg/logging" "knative.dev/pkg/metrics" ) @@ -56,6 +57,8 @@ type Recorder struct { pipelineRun tag.Key namespace tag.Key status tag.Key + + ReportingPeriod time.Duration } // NewRecorder creates a new metrics recorder instance @@ -63,6 +66,9 @@ type Recorder struct { func NewRecorder() (*Recorder, error) { r := &Recorder{ initialized: true, + + // Default to 30s intervals. + ReportingPeriod: 30 * time.Second, } pipeline, err := tag.NewKey("pipeline") @@ -184,3 +190,22 @@ func (r *Recorder) RunningPipelineRuns(lister listers.PipelineRunLister) error { return nil } + +// ReportRunningPipelineRuns invokes RunningPipelineRuns on our configured PeriodSeconds +// until the context is cancelled. +func (r *Recorder) ReportRunningPipelineRuns(ctx context.Context, lister listers.PipelineRunLister) { + logger := logging.FromContext(ctx) + for { + select { + case <-ctx.Done(): + // When the context is cancelled, stop reporting. + return + + case <-time.After(r.ReportingPeriod): + // Every 30s surface a metric for the number of running pipelines. + if err := r.RunningPipelineRuns(lister); err != nil { + logger.Warnf("Failed to log the metrics : %v", err) + } + } + } +} diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index ea7c0ded648..a49147740ca 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -230,14 +230,12 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { } } - var updated bool if !equality.Semantic.DeepEqual(original.Status, pr.Status) { if _, err := c.updateStatus(pr); err != nil { c.Logger.Warn("Failed to update PipelineRun status", zap.Error(err)) c.Recorder.Event(pr, corev1.EventTypeWarning, eventReasonFailed, "PipelineRun failed to update") return multierror.Append(merr, err) } - updated = true } // When we update the status only, we use updateStatus to minimize the chances of @@ -250,16 +248,6 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { c.Recorder.Event(pr, corev1.EventTypeWarning, eventReasonFailed, "PipelineRun failed to update labels/annotations") return multierror.Append(merr, err) } - updated = true - } - - if updated { - go func(metrics *Recorder) { - err := metrics.RunningPipelineRuns(c.pipelineRunLister) - if err != nil { - c.Logger.Warnf("Failed to log the metrics : %v", err) - } - }(c.metrics) } return merr.ErrorOrNil() diff --git a/pkg/reconciler/taskrun/controller.go b/pkg/reconciler/taskrun/controller.go index 7c9bf6caf4e..f84d38e8002 100644 --- a/pkg/reconciler/taskrun/controller.go +++ b/pkg/reconciler/taskrun/controller.go @@ -106,6 +106,9 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex c.configStore = config.NewStore(c.Logger.Named("config-store")) c.configStore.WatchConfigs(cmw) + + go metrics.ReportRunningTaskRuns(ctx, taskRunInformer.Lister()) + return impl } } diff --git a/pkg/reconciler/taskrun/metrics.go b/pkg/reconciler/taskrun/metrics.go index 681c686424b..3ce9a0e97f4 100644 --- a/pkg/reconciler/taskrun/metrics.go +++ b/pkg/reconciler/taskrun/metrics.go @@ -30,6 +30,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "knative.dev/pkg/logging" "knative.dev/pkg/metrics" ) @@ -69,6 +70,8 @@ type Recorder struct { pipeline tag.Key pipelineRun tag.Key pod tag.Key + + ReportingPeriod time.Duration } // NewRecorder creates a new metrics recorder instance @@ -76,6 +79,9 @@ type Recorder struct { func NewRecorder() (*Recorder, error) { r := &Recorder{ initialized: true, + + // Default to reporting metrics every 30s. + ReportingPeriod: 30 * time.Second, } task, err := tag.NewKey("task") @@ -250,6 +256,25 @@ func (r *Recorder) RunningTaskRuns(lister listers.TaskRunLister) error { return nil } +// ReportRunningTaskRuns invokes RunningTaskRuns on our configured PeriodSeconds +// until the context is cancelled. +func (r *Recorder) ReportRunningTaskRuns(ctx context.Context, lister listers.TaskRunLister) { + logger := logging.FromContext(ctx) + for { + select { + case <-ctx.Done(): + // When the context is cancelled, stop reporting. + return + + case <-time.After(r.ReportingPeriod): + // Every 30s surface a metric for the number of running tasks. + if err := r.RunningTaskRuns(lister); err != nil { + logger.Warnf("Failed to log the metrics : %v", err) + } + } + } +} + // RecordPodLatency logs the duration required to schedule the pod for TaskRun // returns an error if its failed to log the metrics func (r *Recorder) RecordPodLatency(pod *corev1.Pod, tr *v1beta1.TaskRun) error { diff --git a/pkg/reconciler/taskrun/taskrun.go b/pkg/reconciler/taskrun/taskrun.go index d6a499fa275..ca91020bceb 100644 --- a/pkg/reconciler/taskrun/taskrun.go +++ b/pkg/reconciler/taskrun/taskrun.go @@ -421,8 +421,6 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1beta1.TaskRun, // Push changes (if any) to the TaskRun status, labels and annotations to // TaskRun definition in ectd func (c *Reconciler) updateStatusLabelsAndAnnotations(tr, original *v1beta1.TaskRun) error { - var updated bool - if !equality.Semantic.DeepEqual(original.Status, tr.Status) { // If we didn't change anything then don't call updateStatus. // This is important because the copy we loaded from the informer's @@ -432,7 +430,6 @@ func (c *Reconciler) updateStatusLabelsAndAnnotations(tr, original *v1beta1.Task c.Logger.Warn("Failed to update taskRun status", zap.Error(err)) return err } - updated = true } // When we update the status only, we use updateStatus to minimize the chances of @@ -444,16 +441,6 @@ func (c *Reconciler) updateStatusLabelsAndAnnotations(tr, original *v1beta1.Task c.Logger.Warn("Failed to update TaskRun labels/annotations", zap.Error(err)) return err } - updated = true - } - - if updated { - go func(metrics *Recorder) { - err := metrics.RunningTaskRuns(c.taskRunLister) - if err != nil { - c.Logger.Warnf("Failed to log the metrics : %v", err) - } - }(c.metrics) } return nil