Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Api discovery #3973

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 9 additions & 32 deletions cmd/postgres-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"go.opentelemetry.io/otel"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/healthz"

Expand Down Expand Up @@ -157,6 +156,10 @@ func main() {
// deprecation warnings when using an older version of a resource for backwards compatibility).
rest.SetDefaultWarningHandler(rest.NoWarnings{})

apis, err := runtime.NewAPIDiscoveryRunner(cfg)
assertNoError(err)
assertNoError(apis.Read())

options, err := initManager()
assertNoError(err)

Expand All @@ -165,13 +168,17 @@ func main() {
options.BaseContext = func() context.Context {
ctx := context.Background()
ctx = feature.NewContext(ctx, features)
ctx = runtime.NewAPIContext(ctx, apis)
return ctx
}

mgr, err := runtime.NewManager(cfg, options)
assertNoError(err)
assertNoError(mgr.Add(apis))

openshift := isOpenshift(cfg)
openshift := apis.Has(runtime.API{
Group: "security.openshift.io", Kind: "SecurityContextConstraints",
})
if openshift {
log.Info("detected OpenShift environment")
}
Expand Down Expand Up @@ -275,33 +282,3 @@ func addControllersToManager(mgr runtime.Manager, openshift bool, log logging.Lo
os.Exit(1)
}
}

func isOpenshift(cfg *rest.Config) bool {
const sccGroupName, sccKind = "security.openshift.io", "SecurityContextConstraints"

client, err := discovery.NewDiscoveryClientForConfig(cfg)
assertNoError(err)

groups, err := client.ServerGroups()
if err != nil {
assertNoError(err)
}
for _, g := range groups.Groups {
if g.Name != sccGroupName {
continue
}
for _, v := range g.Versions {
resourceList, err := client.ServerResourcesForGroupVersion(v.GroupVersion)
if err != nil {
assertNoError(err)
}
for _, r := range resourceList.APIResources {
if r.Kind == sccKind {
return true
}
}
}
}

return false
}
238 changes: 238 additions & 0 deletions internal/controller/runtime/api_discovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
// Copyright 2023 - 2024 Crunchy Data Solutions, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package runtime

import (
"context"
"errors"
"sync"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"

"github.com/crunchydata/postgres-operator/internal/logging"
)

// API is a combination of Group, Version, and Kind that can be used to check
// what is available in the Kubernetes API. There are four ways to populate it:
// 1. Group without Version nor Kind means any resource in that Group.
// 2. Group with Version but no Kind means any resource in that GV.
// 3. Group with Kind but no Version means that Kind in any Version of the Group.
// 4. Group with Version and Kind means that exact GVK.
type API = schema.GroupVersionKind

type APIs interface {
Has(API) bool
HasAll(...API) bool
HasOne(...API) bool
}

// APISet implements [APIs] using empty struct for minimal memory consumption.
type APISet map[API]struct{}

func NewAPISet(api ...API) APISet {
s := make(APISet)

for i := range api {
s[api[i]] = struct{}{}
s[API{Group: api[i].Group}] = struct{}{}
s[API{Group: api[i].Group, Version: api[i].Version}] = struct{}{}
s[API{Group: api[i].Group, Kind: api[i].Kind}] = struct{}{}
}

return s
}

// Has returns true when api is available in s.
func (s APISet) Has(api API) bool { return s.HasOne(api) }

// HasAll returns true when every api is available in s.
func (s APISet) HasAll(api ...API) bool {
for i := range api {
if _, present := s[api[i]]; !present {
return false
}
}
return true
}

// HasOne returns true when at least one api is available in s.
func (s APISet) HasOne(api ...API) bool {
for i := range api {
if _, present := s[api[i]]; present {
return true
}
}
return false
}

type APIDiscoveryRunner struct {
Client interface {
ServerGroups() (*metav1.APIGroupList, error)
ServerResourcesForGroupVersion(string) (*metav1.APIResourceList, error)
}

refresh time.Duration

want []API
have struct {
sync.RWMutex
APISet
}
}

// NewAPIDiscoveryRunner creates an [APIDiscoveryRunner] that periodically reads
// what APIs are available in the Kubernetes at config.
func NewAPIDiscoveryRunner(config *rest.Config) (*APIDiscoveryRunner, error) {
dc, err := discovery.NewDiscoveryClientForConfig(config)

runner := &APIDiscoveryRunner{
Client: dc,
refresh: 10 * time.Minute,
want: []API{
{Group: "cert-manager.io", Kind: "Certificate"},
{Group: "gateway.networking.k8s.io", Kind: "ReferenceGrant"},
{Group: "security.openshift.io", Kind: "SecurityContextConstraints"},
{Group: "snapshot.storage.k8s.io", Kind: "VolumeSnapshot"},
{Group: "trust.cert-manager.io", Kind: "Bundle"},
},
}

return runner, err
}

// NeedLeaderElection returns false so that r runs on any [manager.Manager],
// regardless of which is elected leader in the Kubernetes namespace.
func (r *APIDiscoveryRunner) NeedLeaderElection() bool { return false }

// Read fetches available APIs from Kubernetes.
func (r *APIDiscoveryRunner) Read() error {

// Build an index of the APIs we want to know about.
wantAPIs := make(map[string]map[string]sets.Set[string])
for _, want := range r.want {
if wantAPIs[want.Group] == nil {
wantAPIs[want.Group] = make(map[string]sets.Set[string])
}
if wantAPIs[want.Group][want.Version] == nil {
wantAPIs[want.Group][want.Version] = sets.New[string]()
}
if want.Kind != "" {
wantAPIs[want.Group][want.Version].Insert(want.Kind)
}
}

// Fetch Groups and Versions from Kubernetes.
groups, err := r.Client.ServerGroups()
if err != nil {
return err
}

// Build an index of the Groups, GVs, GKs, and GVKs available in Kuberentes
// that we want to know about.
haveWantedAPIs := make(map[API]struct{})
for _, apiG := range groups.Groups {
var haveG string = apiG.Name
haveWantedAPIs[API{Group: haveG}] = struct{}{}

for _, apiGV := range apiG.Versions {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we even need to loop through the Versions if we don't have the Group in wantAPIs? 🤔

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The expensive part was calling Kubernetes to retrieve the groups and versions. IMV, there's little benefit to discarding that information here.

var haveV string = apiGV.Version
haveWantedAPIs[API{Group: haveG, Version: haveV}] = struct{}{}

// Only fetch Resources when there are Kinds we want to know about.
if wantAPIs[haveG][""].Len() == 0 && wantAPIs[haveG][haveV].Len() == 0 {
continue
}

resources, err := r.Client.ServerResourcesForGroupVersion(apiGV.GroupVersion)
if err != nil {
return err
}

for _, apiR := range resources.APIResources {
var haveK string = apiR.Kind
haveWantedAPIs[API{Group: haveG, Kind: haveK}] = struct{}{}
haveWantedAPIs[API{Group: haveG, Kind: haveK, Version: haveV}] = struct{}{}
}
}
}

r.have.Lock()
r.have.APISet = haveWantedAPIs
r.have.Unlock()

return nil
}

// Start periodically reads the Kuberentes API. It blocks until ctx is cancelled.
func (r *APIDiscoveryRunner) Start(ctx context.Context) error {
ticker := time.NewTicker(r.refresh)
defer ticker.Stop()

log := logging.FromContext(ctx).WithValues("controller", "kubernetes")

for {
select {
case <-ticker.C:
if err := r.Read(); err != nil {
log.Error(err, "Unable to detect Kubernetes APIs")
}
case <-ctx.Done():
// TODO(controller-runtime): Fixed in v0.19.0
// https:/kubernetes-sigs/controller-runtime/issues/1927
if errors.Is(ctx.Err(), context.Canceled) {
return nil
}
return ctx.Err()
}
}
}

// Has returns true when api is available in Kuberentes.
func (r *APIDiscoveryRunner) Has(api API) bool { return r.HasOne(api) }

// HasAll returns true when every api is available in Kubernetes.
func (r *APIDiscoveryRunner) HasAll(api ...API) bool {
r.have.RLock()
defer r.have.RUnlock()
return r.have.HasAll(api...)
}

// HasOne returns true when at least one api is available in Kubernetes.
func (r *APIDiscoveryRunner) HasOne(api ...API) bool {
r.have.RLock()
defer r.have.RUnlock()
return r.have.HasOne(api...)
}

type apiContextKey struct{}

// Kubernetes returns the APIs previously stored by [NewAPIContext].
// When nothing was stored, it returns an empty [APISet].
func Kubernetes(ctx context.Context) APIs {
cbandy marked this conversation as resolved.
Show resolved Hide resolved
if apis, ok := ctx.Value(apiContextKey{}).(APIs); ok {
cbandy marked this conversation as resolved.
Show resolved Hide resolved
return apis
}
return APISet{}
}

// NewAPIContext returns a copy of ctx containing apis. Retrieve it using [Kubernetes].
func NewAPIContext(ctx context.Context, apis APIs) context.Context {
return context.WithValue(ctx, apiContextKey{}, apis)
}
85 changes: 85 additions & 0 deletions internal/controller/runtime/api_discovery_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2023 - 2024 Crunchy Data Solutions, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package runtime

import (
"context"
"testing"

"gotest.tools/v3/assert"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

func TestAPISet(t *testing.T) {
t.Parallel()

var zero APISet
assert.Assert(t, !zero.Has(API{Group: "security.openshift.io"}))
assert.Assert(t, !zero.Has(API{Group: "security.openshift.io", Kind: "SecurityContextConstraints"}))
assert.Assert(t, !zero.HasAll(API{Group: "security.openshift.io"}, API{Group: "snapshot.storage.k8s.io"}))
assert.Assert(t, !zero.HasOne(API{Group: "security.openshift.io"}, API{Group: "snapshot.storage.k8s.io"}))

empty := NewAPISet()
assert.Assert(t, !empty.Has(API{Group: "security.openshift.io"}))
assert.Assert(t, !empty.Has(API{Group: "security.openshift.io", Kind: "SecurityContextConstraints"}))

one := NewAPISet(
API{Group: "security.openshift.io", Kind: "SecurityContextConstraints"},
)
assert.Assert(t, one.Has(API{Group: "security.openshift.io"}))
assert.Assert(t, one.Has(API{Group: "security.openshift.io", Kind: "SecurityContextConstraints"}))
assert.Assert(t, !one.HasAll(API{Group: "snapshot.storage.k8s.io"}, API{Group: "security.openshift.io"}))
assert.Assert(t, !one.HasOne(API{Group: "snapshot.storage.k8s.io"}))
assert.Assert(t, one.HasOne(API{Group: "snapshot.storage.k8s.io"}, API{Group: "security.openshift.io"}))

two := NewAPISet(
API{Group: "security.openshift.io", Kind: "SecurityContextConstraints"},
API{Group: "snapshot.storage.k8s.io", Kind: "VolumeSnapshot"},
)
assert.Assert(t, two.Has(API{Group: "security.openshift.io"}))
assert.Assert(t, two.Has(API{Group: "snapshot.storage.k8s.io"}))
assert.Assert(t, two.HasAll(API{Group: "snapshot.storage.k8s.io"}, API{Group: "security.openshift.io"}))
assert.Assert(t, two.HasOne(API{Group: "snapshot.storage.k8s.io"}))
assert.Assert(t, two.HasOne(API{Group: "snapshot.storage.k8s.io"}, API{Group: "security.openshift.io"}))
}

func TestAPIContext(t *testing.T) {
t.Parallel()

// The background context always return false.
ctx := context.Background()

assert.Assert(t, !Kubernetes(ctx).Has(API{Group: "security.openshift.io"}))
assert.Assert(t, !Kubernetes(ctx).Has(API{Group: "snapshot.storage.k8s.io"}))

// An initialized context returns what is stored.
set := NewAPISet(API{Group: "security.openshift.io", Kind: "SecurityContextConstraints"})
ctx = NewAPIContext(ctx, set)

assert.Assert(t, Kubernetes(ctx).Has(API{Group: "security.openshift.io"}))
assert.Assert(t, !Kubernetes(ctx).Has(API{Group: "snapshot.storage.k8s.io"}))

// The stored value is mutable within the context.
set[API{Group: "snapshot.storage.k8s.io"}] = struct{}{}
assert.Assert(t, Kubernetes(ctx).Has(API{Group: "snapshot.storage.k8s.io"}))
}

func TestAPIDiscoveryRunnerInterfaces(t *testing.T) {
var _ APIs = new(APIDiscoveryRunner)
var _ manager.Runnable = new(APIDiscoveryRunner)

var runnable manager.LeaderElectionRunnable = new(APIDiscoveryRunner)
assert.Assert(t, false == runnable.NeedLeaderElection())
}
Loading
Loading