Use metadata informer for ReplicaSets
swiatekm committed Sep 27, 2024
1 parent 57a9b92 commit 6412ce3
Showing 3 changed files with 207 additions and 42 deletions.
123 changes: 123 additions & 0 deletions internal/pkg/composable/providers/kubernetes/metagen.go
@@ -0,0 +1,123 @@
package kubernetes

import (
"github.com/elastic/elastic-agent-autodiscover/kubernetes"
"github.com/elastic/elastic-agent-autodiscover/kubernetes/metadata"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8s "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)

type (
AddResourceMetadataConfig = metadata.AddResourceMetadataConfig
MetaGen = metadata.MetaGen
FieldOptions = metadata.FieldOptions
)

// GetPodMetaGen is a wrapper function that creates a metaGen for the pod resource and has embedded
// nodeMetaGen, namespaceMetaGen, rsMetaGen and jobMetaGen generators
func GetPodMetaGen(
cfg *config.C,
podWatcher kubernetes.Watcher,
nodeWatcher kubernetes.Watcher,
namespaceWatcher kubernetes.Watcher,
replicasetWatcher kubernetes.Watcher,
jobWatcher kubernetes.Watcher,
metaConf *AddResourceMetadataConfig) MetaGen {

var nodeMetaGen, namespaceMetaGen, rsMetaGen, jobMetaGen MetaGen
if nodeWatcher != nil && metaConf.Node.Enabled() {
nodeMetaGen = metadata.NewNodeMetadataGenerator(metaConf.Node, nodeWatcher.Store(), nodeWatcher.Client())
}
if namespaceWatcher != nil && metaConf.Namespace.Enabled() {
namespaceMetaGen = metadata.NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store(), namespaceWatcher.Client())
}
if replicasetWatcher != nil && metaConf.Deployment {
// use our own implementation of this generator, which can avoid tracking the full ReplicaSet resource
// TODO: Remove this after upstreaming the change to the autodiscovery lib
rsMetaGen = NewReplicasetMetadataGenerator(cfg, replicasetWatcher.Store(), replicasetWatcher.Client())
}
if jobWatcher != nil && metaConf.CronJob {
jobMetaGen = metadata.NewJobMetadataGenerator(cfg, jobWatcher.Store(), jobWatcher.Client())
}
metaGen := metadata.NewPodMetadataGenerator(
cfg,
podWatcher.Store(),
podWatcher.Client(),
nodeMetaGen,
namespaceMetaGen,
rsMetaGen,
jobMetaGen,
metaConf)
return metaGen
}

const resourceType = "replicaset"

type replicaset struct {
store cache.Store
resource *metadata.Resource
}

// NewReplicasetMetadataGenerator creates a metagen for replicaset resources
func NewReplicasetMetadataGenerator(cfg *config.C, replicasets cache.Store, client k8s.Interface) MetaGen {
return &replicaset{
resource: metadata.NewResourceMetadataGenerator(cfg, client),
store: replicasets,
}
}

// Generate generates replicaset metadata from a resource object
// Metadata map is in the following form:
//
// {
// "kubernetes": {},
// "some.ecs.field": "asdf"
// }
//
// All Kubernetes fields that need to be stored under the kubernetes. prefix are populated by
// the GenerateK8s method, while fields that are part of ECS are generated by the GenerateECS method.
func (rs *replicaset) Generate(obj kubernetes.Resource, opts ...FieldOptions) mapstr.M {
ecsFields := rs.GenerateECS(obj)
meta := mapstr.M{
"kubernetes": rs.GenerateK8s(obj, opts...),
}
meta.DeepUpdate(ecsFields)
return meta
}

// GenerateECS generates replicaset ECS metadata from a resource object
func (rs *replicaset) GenerateECS(obj kubernetes.Resource) mapstr.M {
return rs.resource.GenerateECS(obj)
}

// GenerateK8s generates replicaset metadata from a resource object
func (rs *replicaset) GenerateK8s(obj kubernetes.Resource, opts ...FieldOptions) mapstr.M {
_, ok := obj.(metav1.Object) // one of the changes from upstream autodiscovery
if !ok {
return nil
}

meta := rs.resource.GenerateK8s(resourceType, obj, opts...)
return meta
}

// GenerateFromName generates replicaset metadata from a replicaset name
func (rs *replicaset) GenerateFromName(name string, opts ...FieldOptions) mapstr.M {
if rs.store == nil {
return nil
}

if obj, ok, _ := rs.store.GetByKey(name); ok {
replicaSet, ok := obj.(kubernetes.Resource) // one of the changes from upstream autodiscovery
if !ok {
return nil
}

return rs.GenerateK8s(replicaSet, opts...)
}

return nil
}
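
For illustration, a minimal usage sketch (not part of the commit; cfg, rsWatcher, and pod are assumed to exist): resolving ReplicaSet metadata for a Pod through its owner reference. Keys in the underlying cache.Store take the "<namespace>/<name>" form for namespaced resources.

rsMetaGen := NewReplicasetMetadataGenerator(cfg, rsWatcher.Store(), rsWatcher.Client())
for _, ref := range pod.GetOwnerReferences() {
	if ref.Kind != "ReplicaSet" {
		continue
	}
	// GenerateFromName looks the ReplicaSet up by its store key and derives
	// metadata (including the owning Deployment's name) from the trimmed
	// object cached by the metadata informer.
	rsMeta := rsMetaGen.GenerateFromName(pod.GetNamespace() + "/" + ref.Name)
	_ = rsMeta
}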
62 changes: 48 additions & 14 deletions internal/pkg/composable/providers/kubernetes/pod.go
@@ -6,11 +6,11 @@ package kubernetes

import (
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"sync"
"time"

v1 "k8s.io/api/apps/v1"

"github.com/elastic/elastic-agent-autodiscover/kubernetes"
"github.com/elastic/elastic-agent-autodiscover/kubernetes/metadata"
"github.com/elastic/elastic-agent-autodiscover/utils"
@@ -20,6 +20,7 @@ import (
"github.com/elastic/elastic-agent-libs/safemapstr"

k8s "k8s.io/client-go/kubernetes"
clientgometa "k8s.io/client-go/metadata"

"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/composable"
@@ -106,12 +107,21 @@ func NewPodEventer(
// Deployment -> ReplicaSet -> Pod
// CronJob -> Job -> Pod
if metaConf.Deployment {
metadataClient, err := GetKubernetesMetadataClient(cfg.KubeConfig, cfg.KubeClientOptions)
if err != nil {
logger.Errorf("Error creating metadata client for %T due to error %+v", &kubernetes.Namespace{}, err)
}
// use a custom watcher here, so we can provide a transform function and limit the data we're storing
replicaSetWatcher, err = NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
SyncTimeout: cfg.SyncPeriod,
Namespace: cfg.Namespace,
HonorReSyncs: true,
}, nil, removeUnnecessaryReplicaSetData)
replicaSetWatcher, err = NewNamedMetaWatcher(
"resource_metadata_enricher_rs",
client,
metadataClient,
schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"},
kubernetes.WatchOptions{
SyncTimeout: cfg.SyncPeriod,
Namespace: cfg.Namespace,
HonorReSyncs: true,
}, nil, removeUnnecessaryReplicaSetData)
if err != nil {
logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
}
@@ -131,7 +141,7 @@ if err != nil {
if err != nil {
return nil, errors.New(err, "failed to unpack configuration")
}
metaGen := metadata.GetPodMetaGen(rawConfig, watcher, nodeWatcher, namespaceWatcher, replicaSetWatcher, jobWatcher, metaConf)
metaGen := GetPodMetaGen(rawConfig, watcher, nodeWatcher, namespaceWatcher, replicaSetWatcher, jobWatcher, metaConf)

p := &pod{
logger: logger,
@@ -546,15 +556,39 @@ func hintsCheck(annotations mapstr.M, container string, prefix string, validate
// removeUnnecessaryReplicaSetData removes all data from a ReplicaSet resource, except what we need to compute
// Pod metadata: the name, namespace, owner references, and resource version.
func removeUnnecessaryReplicaSetData(obj interface{}) (interface{}, error) {
old, ok := obj.(*v1.ReplicaSet)
old, ok := obj.(*metav1.PartialObjectMetadata)
if !ok {
return nil, fmt.Errorf("obj is not a ReplicaSet")
}
transformed := v1.ReplicaSet{}
transformed.ObjectMeta = kubernetes.ObjectMeta{
Name: old.GetName(),
Namespace: old.GetNamespace(),
OwnerReferences: old.GetOwnerReferences(),
transformed := &metav1.PartialObjectMetadata{
ObjectMeta: kubernetes.ObjectMeta{
Name: old.GetName(),
Namespace: old.GetNamespace(),
OwnerReferences: old.GetOwnerReferences(),
ResourceVersion: old.GetResourceVersion(),
},
}
return transformed, nil
}
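
The transform runs before an object is written to the informer's cache, so only the trimmed PartialObjectMetadata is ever stored. The wiring goes through the transformFunc parameter of NewNamedMetaWatcher (outside this hunk); a minimal sketch of how a cache.TransformFunc is typically attached, assuming it is set before the informer starts:

informer := NewMetadataInformer(metadataClient, gvr, opts, nil)
if err := informer.SetTransform(removeUnnecessaryReplicaSetData); err != nil {
	// SetTransform returns an error once the informer has already started.
	// Handle it here, e.g. by returning it to the caller.
}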

// GetKubernetesMetadataClient returns a kubernetes metadata client. If a kubeconfig path is passed,
// it parses that config file to build the client configuration. Otherwise it falls back to the
// KUBECONFIG environment variable and, failing that, to the in-cluster configuration based on the
// service account credentials mounted in the Pod.
func GetKubernetesMetadataClient(kubeconfig string, opt kubernetes.KubeClientOptions) (clientgometa.Interface, error) {
if kubeconfig == "" {
kubeconfig = kubernetes.GetKubeConfigEnvironmentVariable()
}

cfg, err := kubernetes.BuildConfig(kubeconfig)
if err != nil {
return nil, fmt.Errorf("unable to build kube config due to error: %w", err)
}
cfg.QPS = opt.QPS
cfg.Burst = opt.Burst
client, err := clientgometa.NewForConfig(cfg)
if err != nil {
return nil, fmt.Errorf("unable to build kubernetes clientset: %w", err)
}

return client, nil
}
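
For context, a self-contained sketch of what the metadata client returns (assumptions: a reachable cluster, a kubeconfig in the default location, and the "default" namespace). Every item is a metav1.PartialObjectMetadata, so labels, annotations, and owner references are available, but never a ReplicaSet spec:

package main

import (
	"context"
	"fmt"
	"log"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/metadata"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build client config from the default kubeconfig path (an assumption).
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		log.Fatal(err)
	}
	client, err := metadata.NewForConfig(cfg)
	if err != nil {
		log.Fatal(err)
	}
	gvr := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}
	list, err := client.Resource(gvr).Namespace("default").List(context.Background(), metav1.ListOptions{})
	if err != nil {
		log.Fatal(err)
	}
	for _, rs := range list.Items {
		// Each item carries metadata only, no spec or status.
		fmt.Println(rs.GetName(), rs.GetOwnerReferences())
	}
}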
64 changes: 36 additions & 28 deletions internal/pkg/composable/providers/kubernetes/watcher.go
@@ -7,6 +7,10 @@ package kubernetes
import (
"context"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
clientgometa "k8s.io/client-go/metadata"
"time"

"k8s.io/apimachinery/pkg/api/meta"
@@ -65,35 +69,34 @@ type watcher struct {

// NewMetaWatcher initializes the watcher client to provide an events handler for
// resources from the cluster (filtered to the given node)
func NewWatcher(
func NewMetaWatcher(
client kubernetes.Interface,
resource Resource,
metadataClient clientgometa.Interface,
gvr schema.GroupVersionResource,
opts WatchOptions,
indexers cache.Indexers,
transformFunc cache.TransformFunc,
) (Watcher, error) {
return NewNamedWatcher("", client, resource, opts, indexers, transformFunc)
return NewNamedMetaWatcher("", client, metadataClient, gvr, opts, indexers, transformFunc)
}

// NewNamedWatcher initializes the watcher client to provide an events handler for
// NewNamedMetaWatcher initializes the watcher client to provide an events handler for
resources from the cluster (filtered to the given node) and also allows naming the k8s
client's workqueue that is used by the watcher. The workqueue name is important for exposing workqueue
metrics; if it is empty, its metrics will not be logged by the k8s client.
func NewNamedWatcher(
func NewNamedMetaWatcher(
name string,
client kubernetes.Interface,
resource Resource,
metadataClient clientgometa.Interface,
gvr schema.GroupVersionResource,
opts WatchOptions,
indexers cache.Indexers,
transformFunc cache.TransformFunc,
) (Watcher, error) {
var store cache.Store
var queue workqueue.Interface //nolint:staticcheck // TODO: use the typed version
var cachedObject runtime.Object
informer, _, err := autodiscoverK8s.NewInformer(client, resource, opts, indexers)
if err != nil {
return nil, err
}
informer := NewMetadataInformer(metadataClient, gvr, opts, indexers)

store = informer.GetStore()
queue = workqueue.NewNamed(name)
@@ -121,7 +124,7 @@ func NewNamedWatcher(
handler: autodiscoverK8s.NoOpEventHandlerFuncs{},
}

_, err = w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
_, err := w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
w.enqueue(o, add)
},
@@ -141,14 +144,6 @@
// state should just be deduped by autodiscover and not stop/started periodically as would be the case with an update.
w.enqueue(n, add)
}

//We check the type of resource and only if it is namespace or node return the cacheObject
switch resource.(type) {
case *Namespace:
w.cacheObject(o)
case *Node:
w.cacheObject(o)
}
},
})
if err != nil {
@@ -229,15 +224,6 @@ func (w *watcher) enqueue(obj interface{}, state string) {
w.queue.Add(&item{key, obj, state})
}

// cacheObject updates watcher with the old version of cache objects before change during update events
func (w *watcher) cacheObject(o interface{}) {
if old, ok := o.(runtime.Object); !ok {
utilruntime.HandleError(fmt.Errorf("expected object in cache got %#v", o))
} else {
w.cachedObject = old
}
}

// process gets the top of the work queue and processes the object that is received.
func (w *watcher) process(_ context.Context) bool {
obj, quit := w.queue.Get()
@@ -283,3 +269,25 @@ func (w *watcher) process(_ context.Context) bool {

return true
}

// NewMetadataInformer creates an informer for a given resource that only tracks the resource metadata.
func NewMetadataInformer(client clientgometa.Interface, gvr schema.GroupVersionResource, opts WatchOptions, indexers cache.Indexers) cache.SharedInformer {
ctx := context.Background()
if indexers == nil {
indexers = cache.Indexers{}
}
informer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.Resource(gvr).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.Resource(gvr).Watch(ctx, options)
},
},
&metav1.PartialObjectMetadata{},
opts.SyncTimeout,
indexers,
)
return informer
}
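
A brief usage sketch (illustrative, not part of the commit; metadataClient and the ten-minute sync timeout are assumptions) showing the informer running and its store being read back as PartialObjectMetadata:

informer := NewMetadataInformer(metadataClient,
	schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"},
	WatchOptions{SyncTimeout: 10 * time.Minute}, nil)
stop := make(chan struct{})
defer close(stop)
go informer.Run(stop)
if !cache.WaitForCacheSync(stop, informer.HasSynced) {
	// The initial list did not complete; handle the failure here.
}
for _, obj := range informer.GetStore().List() {
	rs := obj.(*metav1.PartialObjectMetadata)
	// Owner references are enough to resolve the owning Deployment.
	_ = rs.GetOwnerReferences()
}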
