diff --git a/pkg/reconciler/pipelinerun/controller.go b/pkg/reconciler/pipelinerun/controller.go index b8c161856c8..2920f5fe808 100644 --- a/pkg/reconciler/pipelinerun/controller.go +++ b/pkg/reconciler/pipelinerun/controller.go @@ -40,10 +40,6 @@ import ( "knative.dev/pkg/tracker" ) -const ( - resyncPeriod = 10 * time.Hour -) - // NewController instantiates a new controller.Impl from knative.dev/pkg/controller func NewController(namespace string, images pipeline.Images) func(context.Context, configmap.Watcher) *controller.Impl { return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl { @@ -103,6 +99,8 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex c.configStore = config.NewStore(images, c.Logger.Named("config-store")) c.configStore.WatchConfigs(cmw) + go metrics.ReportRunningPipelineRuns(ctx, pipelineRunInformer.Lister()) + return impl } } diff --git a/pkg/reconciler/pipelinerun/metrics.go b/pkg/reconciler/pipelinerun/metrics.go index 776ef82762a..b91fe1af236 100644 --- a/pkg/reconciler/pipelinerun/metrics.go +++ b/pkg/reconciler/pipelinerun/metrics.go @@ -29,6 +29,7 @@ import ( "go.opencensus.io/tag" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" + "knative.dev/pkg/logging" "knative.dev/pkg/metrics" ) @@ -56,6 +57,8 @@ type Recorder struct { pipelineRun tag.Key namespace tag.Key status tag.Key + + ReportingPeriod time.Duration } // NewRecorder creates a new metrics recorder instance @@ -63,6 +66,9 @@ type Recorder struct { func NewRecorder() (*Recorder, error) { r := &Recorder{ initialized: true, + + // Default to 30s intervals. + ReportingPeriod: 30 * time.Second, } pipeline, err := tag.NewKey("pipeline") @@ -184,3 +190,22 @@ func (r *Recorder) RunningPipelineRuns(lister listers.PipelineRunLister) error { return nil } + +// ReportRunningPipelineRuns invokes RunningPipelineRuns on our configured PeriodSeconds +// until the context is cancelled. +func (r *Recorder) ReportRunningPipelineRuns(ctx context.Context, lister listers.PipelineRunLister) { + logger := logging.FromContext(ctx) + for { + select { + case <-ctx.Done(): + // When the context is cancelled, stop reporting. + return + + case <-time.After(r.ReportingPeriod): + // Every 30s surface a metric for the number of running pipelines. + if err := r.RunningPipelineRuns(lister); err != nil { + logger.Warnf("Failed to log the metrics : %v", err) + } + } + } +} diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index ea7c0ded648..a49147740ca 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -230,14 +230,12 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { } } - var updated bool if !equality.Semantic.DeepEqual(original.Status, pr.Status) { if _, err := c.updateStatus(pr); err != nil { c.Logger.Warn("Failed to update PipelineRun status", zap.Error(err)) c.Recorder.Event(pr, corev1.EventTypeWarning, eventReasonFailed, "PipelineRun failed to update") return multierror.Append(merr, err) } - updated = true } // When we update the status only, we use updateStatus to minimize the chances of @@ -250,16 +248,6 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { c.Recorder.Event(pr, corev1.EventTypeWarning, eventReasonFailed, "PipelineRun failed to update labels/annotations") return multierror.Append(merr, err) } - updated = true - } - - if updated { - go func(metrics *Recorder) { - err := metrics.RunningPipelineRuns(c.pipelineRunLister) - if err != nil { - c.Logger.Warnf("Failed to log the metrics : %v", err) - } - }(c.metrics) } return merr.ErrorOrNil() diff --git a/pkg/reconciler/taskrun/controller.go b/pkg/reconciler/taskrun/controller.go index 7c9bf6caf4e..f84d38e8002 100644 --- a/pkg/reconciler/taskrun/controller.go +++ b/pkg/reconciler/taskrun/controller.go @@ -106,6 +106,9 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex c.configStore = config.NewStore(c.Logger.Named("config-store")) c.configStore.WatchConfigs(cmw) + + go metrics.ReportRunningTaskRuns(ctx, taskRunInformer.Lister()) + return impl } } diff --git a/pkg/reconciler/taskrun/metrics.go b/pkg/reconciler/taskrun/metrics.go index 681c686424b..3ce9a0e97f4 100644 --- a/pkg/reconciler/taskrun/metrics.go +++ b/pkg/reconciler/taskrun/metrics.go @@ -30,6 +30,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "knative.dev/pkg/logging" "knative.dev/pkg/metrics" ) @@ -69,6 +70,8 @@ type Recorder struct { pipeline tag.Key pipelineRun tag.Key pod tag.Key + + ReportingPeriod time.Duration } // NewRecorder creates a new metrics recorder instance @@ -76,6 +79,9 @@ type Recorder struct { func NewRecorder() (*Recorder, error) { r := &Recorder{ initialized: true, + + // Default to reporting metrics every 30s. + ReportingPeriod: 30 * time.Second, } task, err := tag.NewKey("task") @@ -250,6 +256,25 @@ func (r *Recorder) RunningTaskRuns(lister listers.TaskRunLister) error { return nil } +// ReportRunningTaskRuns invokes RunningTaskRuns on our configured PeriodSeconds +// until the context is cancelled. +func (r *Recorder) ReportRunningTaskRuns(ctx context.Context, lister listers.TaskRunLister) { + logger := logging.FromContext(ctx) + for { + select { + case <-ctx.Done(): + // When the context is cancelled, stop reporting. + return + + case <-time.After(r.ReportingPeriod): + // Every 30s surface a metric for the number of running tasks. + if err := r.RunningTaskRuns(lister); err != nil { + logger.Warnf("Failed to log the metrics : %v", err) + } + } + } +} + // RecordPodLatency logs the duration required to schedule the pod for TaskRun // returns an error if its failed to log the metrics func (r *Recorder) RecordPodLatency(pod *corev1.Pod, tr *v1beta1.TaskRun) error { diff --git a/pkg/reconciler/taskrun/taskrun.go b/pkg/reconciler/taskrun/taskrun.go index d6a499fa275..ca91020bceb 100644 --- a/pkg/reconciler/taskrun/taskrun.go +++ b/pkg/reconciler/taskrun/taskrun.go @@ -421,8 +421,6 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1beta1.TaskRun, // Push changes (if any) to the TaskRun status, labels and annotations to // TaskRun definition in ectd func (c *Reconciler) updateStatusLabelsAndAnnotations(tr, original *v1beta1.TaskRun) error { - var updated bool - if !equality.Semantic.DeepEqual(original.Status, tr.Status) { // If we didn't change anything then don't call updateStatus. // This is important because the copy we loaded from the informer's @@ -432,7 +430,6 @@ func (c *Reconciler) updateStatusLabelsAndAnnotations(tr, original *v1beta1.Task c.Logger.Warn("Failed to update taskRun status", zap.Error(err)) return err } - updated = true } // When we update the status only, we use updateStatus to minimize the chances of @@ -444,16 +441,6 @@ func (c *Reconciler) updateStatusLabelsAndAnnotations(tr, original *v1beta1.Task c.Logger.Warn("Failed to update TaskRun labels/annotations", zap.Error(err)) return err } - updated = true - } - - if updated { - go func(metrics *Recorder) { - err := metrics.RunningTaskRuns(c.taskRunLister) - if err != nil { - c.Logger.Warnf("Failed to log the metrics : %v", err) - } - }(c.metrics) } return nil