Skip to content
This repository has been archived by the owner on Dec 11, 2023. It is now read-only.

Commit

Permalink
restructure status with backends
Browse files Browse the repository at this point in the history
  • Loading branch information
odacremolbap committed Jul 5, 2023
1 parent 731574f commit 1a13a18
Show file tree
Hide file tree
Showing 6 changed files with 279 additions and 243 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ kubernetes-broker-config-secret-key | KUBERNETES_BROKER_CONFIG_SECRET_KEY | |
kubernetes-observability-configmap-name | KUBERNETES_OBSERVABILITY_CONFIGMAP_NAME || ConfigMap object name that contains the observability configuration.
kubernetes-status-configmap-name | KUBERNETES_STATUS_CONFIGMAP_NAME || ConfigMap object name where the broker instance should write its status.
kubernetes-status-configmap-key | KUBERNETES_STATUS_CONFIGMAP_KEY | status | ConfigMap object key where the broker instance should write its status.
kubernetes-status-resync-period | KUBERNETES_STATUS_RESYNC_PERIOD | PT10S | Period for running pending status write checks using ISO8601.
kubernetes-status-cache-expiration | KUBERNETES_STATUS_CACHE_EXPIRATION | PT1M | Time to wait without forcing a status write to the ConfigMap using ISO8601.
status-reporter-resync-check-period | STATUS_REPORTER_RESYNC_CHECK_PERIOD | PT10S | Period for running status checks for pending changes, using ISO8601.
status-reporter-resync-force-period | STATUS_REPORTER_RESYNC_FORCE_PERIOD | PT1M | Period for running status resync cycles that force status writes, using ISO8601.
config-polling-period | CONFIG_POLLING_PERIOD | PT0S | ISO8601 duration for config polling. Disabled if PT0S. Enabling it will disable other configuration methods.
broker-config | BROKER_CONFIG | | JSON representation of broker configuration. Enabling it will disable other configuration methods.
observability-config | BROKER_CONFIG | | JSON representation of observability configuration. Enabling it will disable other configuration methods.
Expand Down
61 changes: 37 additions & 24 deletions pkg/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,17 @@ const (
)

type Instance struct {
backend backend.Interface
ingest *ingest.Instance
subscription *subscriptions.Manager
bcw *cfgbwatcher.Watcher
ocw *cfgowatcher.Watcher
bcp *cfgbpoller.Poller
ocp *cfgopoller.Poller
km *controller.Manager
staticConfig *cfgbroker.Config
status Status
backend backend.Interface
ingest *ingest.Instance
subscription *subscriptions.Manager
bcw *cfgbwatcher.Watcher
ocw *cfgowatcher.Watcher
bcp *cfgbpoller.Poller
ocp *cfgopoller.Poller
km *controller.Manager
staticConfig *cfgbroker.Config
statusManager status.Manager
status Status

logger *zap.SugaredLogger
}
Expand All @@ -60,29 +61,33 @@ func NewInstance(globals *cmd.Globals, b backend.Interface) (*Instance, error) {
globals.Logger.Debug("Creating subscription manager")

// Create status manager to be injected into ingest and subscription manager.
var statusManager status.Manager
statusManager := status.NewManager(
/* Cached status expiry. When reached a re-write of the ConfigMap will be forced */
globals.StatusForcePeriod,
/* Resync period */
globals.StatusCheckPeriod,
globals.Logger.Named("status"),
)

if globals.KubernetesStatusConfigmapName != "" {
kc, err := client.New(config.GetConfigOrDie(), client.Options{})
if err != nil {
return nil, err
}

statusManager = kstatus.NewKubernetesManager(globals.Context,
kbackend := kstatus.NewKubernetesBackend(

// ConfigMap identification
globals.KubernetesStatusConfigmapName,
globals.KubernetesNamespace,
globals.KubernetesStatusConfigmapKey,

// Broker instance
globals.BrokerName,

/* Cached status expiry. When reached a re-write of the ConfigMap will be forced */
globals.StatusCacheExpiration,
/* Resync period */
globals.StatusResyncPeriod,

kc,
globals.Logger.Named("status"))
globals.Logger.Named("kubestatus"),
)

statusManager.RegisterBackendStatusWriters(kbackend)
}

// Create subscription manager.
Expand All @@ -105,10 +110,11 @@ func NewInstance(globals *cmd.Globals, b backend.Interface) (*Instance, error) {

globals.Logger.Debug("Creating broker instance")
broker := &Instance{
backend: b,
ingest: i,
subscription: sm,
status: StatusStopped,
backend: b,
ingest: i,
subscription: sm,
statusManager: statusManager,
status: StatusStopped,

logger: globals.Logger.Named("broker"),
}
Expand Down Expand Up @@ -254,6 +260,13 @@ func (i *Instance) Start(inctx context.Context) error {
}
}()

// Launch status manager
i.logger.Debug("Starting status manager")
grp.Go(func() error {
i.statusManager.Start(ctx)
return nil
})

// Initialization will create structures, execute migrations
// and claim non processed messages from the backend.
i.logger.Debug("Initializing backend")
Expand Down
44 changes: 21 additions & 23 deletions pkg/broker/cmd/globals.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,18 @@ type Globals struct {
KubernetesObservabilityConfigMapName string `help:"ConfigMap object name that contains the observability configuration." env:"KUBERNETES_OBSERVABILITY_CONFIGMAP_NAME"`
KubernetesStatusConfigmapName string `help:"ConfigMap object name where the broker instance should write its status." env:"KUBERNETES_STATUS_CONFIGMAP_NAME"`
KubernetesStatusConfigmapKey string `help:"ConfigMap object key where the broker should write its status." env:"KUBERNETES_STATUS_CONFIGMAP_KEY" default:"status"`
KubernetesStatusResyncPeriod string `help:"Period for running pending status write checks using ISO8601." env:"KUBERNETES_STATUS_RESYNC_PERIOD" default:"PT10S"`
KubernetesStatusCacheExpiration string `help:"Time to wait without forcing a status write to the ConfigMap using ISO8601." env:"KUBERNETES_STATUS_CACHE_EXPIRATION" default:"PT1M"`
StatusReporterResyncCheckPeriod string `help:"Period for running status checks for pending changes, using ISO8601." env:"STATUS_REPORTER_RESYNC_CHECK_PERIOD" default:"PT10S"`
StatusReporterResyncForcePeriod string `help:"Period for running status resync cycles that force status writes, using ISO8601." env:"STATUS_REPORTER_RESYNC_FORCE_PERIOD" default:"PT1M"`

ObservabilityMetricsDomain string `help:"Domain to be used for some metrics reporters." env:"OBSERVABILITY_METRICS_DOMAIN" default:"triggermesh.io/eventing"`

Context context.Context `kong:"-"`
Logger *zap.SugaredLogger `kong:"-"`
LogLevel zap.AtomicLevel `kong:"-"`
PollingPeriod time.Duration `kong:"-"`
ConfigMethod ConfigMethod `kong:"-"`
StatusResyncPeriod time.Duration `kong:"-"`
StatusCacheExpiration time.Duration `kong:"-"`
Context context.Context `kong:"-"`
Logger *zap.SugaredLogger `kong:"-"`
LogLevel zap.AtomicLevel `kong:"-"`
PollingPeriod time.Duration `kong:"-"`
ConfigMethod ConfigMethod `kong:"-"`
StatusCheckPeriod time.Duration `kong:"-"`
StatusForcePeriod time.Duration `kong:"-"`
}

func (s *Globals) Validate() error {
Expand Down Expand Up @@ -176,21 +176,19 @@ func (s *Globals) Validate() error {
msg = append(msg, "Either Kubernetes Secret or local file configuration must be informed.")
}

// If the status is enabled, parse durations for resync and expired cache.
if s.KubernetesStatusConfigmapName != "" {
p, err := period.Parse(s.KubernetesStatusResyncPeriod)
if err != nil {
msg = append(msg, fmt.Sprintf("Kubernetes status resync period is not an ISO8601 duration: %v", err))
} else {
s.StatusResyncPeriod = p.DurationApprox()
}
// parse durations for resync and expired cache.
p, err := period.Parse(s.StatusReporterResyncCheckPeriod)
if err != nil {
msg = append(msg, fmt.Sprintf("status resync check period is not an ISO8601 duration: %v", err))
} else {
s.StatusCheckPeriod = p.DurationApprox()
}

p, err = period.Parse(s.KubernetesStatusCacheExpiration)
if err != nil {
msg = append(msg, fmt.Sprintf("Kubernetes status cache expiration is not an ISO8601 duration: %v", err))
} else {
s.StatusCacheExpiration = p.DurationApprox()
}
p, err = period.Parse(s.StatusReporterResyncForcePeriod)
if err != nil {
msg = append(msg, fmt.Sprintf("status resync force period is not an ISO8601 duration: %v", err))
} else {
s.StatusForcePeriod = p.DurationApprox()
}

if len(msg) != 0 {
Expand Down
Loading

0 comments on commit 1a13a18

Please sign in to comment.