Skip to content

Commit

Permalink
fix: ensure connectivity with Kubernetes API on start-up
Browse files Browse the repository at this point in the history
  • Loading branch information
czeslavo committed Sep 11, 2023
1 parent 5bf7fba commit 838ddc8
Show file tree
Hide file tree
Showing 14 changed files with 472 additions and 233 deletions.
9 changes: 7 additions & 2 deletions internal/controllers/crds/dynamic_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ type DynamicCRDController struct {
Controller Controller
RequiredCRDs []schema.GroupVersionResource

startControllersOnce sync.Once
// startControllerOnce ensures that the controller is started only once.
startControllerOnce sync.Once
}

func (r *DynamicCRDController) SetupWithManager(mgr ctrl.Manager) error {
Expand Down Expand Up @@ -87,7 +88,7 @@ func (r *DynamicCRDController) Reconcile(ctx context.Context, req ctrl.Request)
}

var startControllerErr error
r.startControllersOnce.Do(func() {
r.startControllerOnce.Do(func() {
log.V(util.InfoLevel).Info("All required CustomResourceDefinitions are installed, setting up the controller")
startControllerErr = r.setupController(r.Manager)
})
Expand All @@ -98,6 +99,10 @@ func (r *DynamicCRDController) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}

func (r *DynamicCRDController) SetLogger(logger logr.Logger) {
r.Log = logger
}

func (r *DynamicCRDController) allRequiredCRDsInstalled() bool {
return lo.EveryBy(r.RequiredCRDs, func(gvr schema.GroupVersionResource) bool {
return utils.CRDExists(r.Manager.GetClient().RESTMapper(), gvr)
Expand Down
13 changes: 0 additions & 13 deletions internal/manager/conditions.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
package manager

import (
"context"
"fmt"

netv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
ctrl "sigs.k8s.io/controller-runtime"

ctrlutils "github.com/kong/kubernetes-ingress-controller/v2/internal/controllers/utils"
)
Expand Down Expand Up @@ -68,12 +64,3 @@ func negotiateIngressAPI(config *Config, mapper meta.RESTMapper) (IngressAPI, er
}
return NoIngressAPI, nil
}

func ShouldEnableCRDController(ctx context.Context, gvr schema.GroupVersionResource, restMapper meta.RESTMapper) bool {
if !ctrlutils.CRDExists(restMapper, gvr) {
ctrl.LoggerFrom(ctx).WithName("controllers").WithName("crdCondition").
Info(fmt.Sprintf("Disabling controller for Group=%s, Resource=%s due to missing CRD", gvr.GroupVersion(), gvr.Resource))
return false
}
return true
}
48 changes: 0 additions & 48 deletions internal/manager/conditions_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package manager_test

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -80,50 +79,3 @@ func TestIngressControllerConditions(t *testing.T) {
})
}
}

func TestShouldEnableCRDController(t *testing.T) {
knownGvr := schema.GroupVersionResource{
Group: "group",
Version: "v1",
Resource: "resources",
}
unknownGVR := schema.GroupVersionResource{
Group: "otherGroup",
Version: "v1",
Resource: "resources",
}

restMapper := meta.NewDefaultRESTMapper(nil)
restMapper.Add(schema.GroupVersionKind{
Group: knownGvr.Group,
Version: knownGvr.Version,
Kind: "Resource",
}, meta.RESTScopeRoot)

testCases := []struct {
name string
gvr schema.GroupVersionResource
expectedResult bool
}{
{
name: "registered_resource",
gvr: knownGvr,
expectedResult: true,
},
{
name: "not_registered_resource",
gvr: unknownGVR,
expectedResult: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
require.Equal(
t,
tc.expectedResult,
manager.ShouldEnableCRDController(context.Background(), tc.gvr, restMapper),
)
})
}
}
108 changes: 26 additions & 82 deletions internal/manager/controllerdef.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ import (
"github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane"
"github.com/kong/kubernetes-ingress-controller/v2/internal/manager/featuregates"
"github.com/kong/kubernetes-ingress-controller/v2/internal/util/kubernetes/object/status"
kongv1 "github.com/kong/kubernetes-ingress-controller/v2/pkg/apis/configuration/v1"
kongv1alpha1 "github.com/kong/kubernetes-ingress-controller/v2/pkg/apis/configuration/v1alpha1"
kongv1beta1 "github.com/kong/kubernetes-ingress-controller/v2/pkg/apis/configuration/v1beta1"
)

// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -160,14 +157,7 @@ func setupControllers(
// Kong API Controllers
// ---------------------------------------------------------------------------
{
Enabled: c.UDPIngressEnabled && ShouldEnableCRDController(ctx,
schema.GroupVersionResource{
Group: kongv1beta1.GroupVersion.Group,
Version: kongv1beta1.GroupVersion.Version,
Resource: "udpingresses",
},
restMapper,
),
Enabled: c.UDPIngressEnabled,
Controller: &configuration.KongV1Beta1UDPIngressReconciler{
Client: mgr.GetClient(),
Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("UDPIngress"),
Expand All @@ -181,14 +171,7 @@ func setupControllers(
},
},
{
Enabled: c.TCPIngressEnabled && ShouldEnableCRDController(ctx,
schema.GroupVersionResource{
Group: kongv1beta1.GroupVersion.Group,
Version: kongv1beta1.GroupVersion.Version,
Resource: "tcpingresses",
},
restMapper,
),
Enabled: c.TCPIngressEnabled,
Controller: &configuration.KongV1Beta1TCPIngressReconciler{
Client: mgr.GetClient(),
Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("TCPIngress"),
Expand All @@ -203,14 +186,7 @@ func setupControllers(
},
},
{
Enabled: c.KongIngressEnabled && ShouldEnableCRDController(ctx,
schema.GroupVersionResource{
Group: kongv1.GroupVersion.Group,
Version: kongv1.GroupVersion.Version,
Resource: "kongingresses",
},
restMapper,
),
Enabled: c.KongIngressEnabled,
Controller: &configuration.KongV1KongIngressReconciler{
Client: mgr.GetClient(),
Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("KongIngress"),
Expand All @@ -220,14 +196,7 @@ func setupControllers(
},
},
{
Enabled: c.IngressClassParametersEnabled && ShouldEnableCRDController(ctx,
schema.GroupVersionResource{
Group: kongv1alpha1.GroupVersion.Group,
Version: kongv1alpha1.GroupVersion.Version,
Resource: "ingressclassparameterses",
},
restMapper,
),
Enabled: c.IngressClassParametersEnabled,
Controller: &configuration.KongV1Alpha1IngressClassParametersReconciler{
Client: mgr.GetClient(),
Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("IngressClassParameters"),
Expand All @@ -237,14 +206,7 @@ func setupControllers(
},
},
{
Enabled: c.KongPluginEnabled && ShouldEnableCRDController(ctx,
schema.GroupVersionResource{
Group: kongv1.GroupVersion.Group,
Version: kongv1.GroupVersion.Version,
Resource: "kongplugins",
},
restMapper,
),
Enabled: c.KongPluginEnabled,
Controller: &configuration.KongV1KongPluginReconciler{
Client: mgr.GetClient(),
Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("KongPlugin"),
Expand All @@ -257,14 +219,7 @@ func setupControllers(
},
},
{
Enabled: c.KongConsumerEnabled && ShouldEnableCRDController(ctx,
schema.GroupVersionResource{
Group: kongv1.GroupVersion.Group,
Version: kongv1.GroupVersion.Version,
Resource: "kongconsumers",
},
restMapper,
),
Enabled: c.KongConsumerEnabled,
Controller: &configuration.KongV1KongConsumerReconciler{
Client: mgr.GetClient(),
Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("KongConsumer"),
Expand All @@ -278,14 +233,7 @@ func setupControllers(
},
},
{
Enabled: c.KongConsumerEnabled && ShouldEnableCRDController(ctx,
schema.GroupVersionResource{
Group: kongv1beta1.GroupVersion.Group,
Version: kongv1beta1.GroupVersion.Version,
Resource: "kongconsumergroups",
},
restMapper,
),
Enabled: c.KongConsumerEnabled,
Controller: &configuration.KongV1Beta1KongConsumerGroupReconciler{
Client: mgr.GetClient(),
Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("KongConsumerGroup"),
Expand All @@ -299,14 +247,7 @@ func setupControllers(
},
},
{
Enabled: c.KongClusterPluginEnabled && ShouldEnableCRDController(ctx,
schema.GroupVersionResource{
Group: kongv1.GroupVersion.Group,
Version: kongv1.GroupVersion.Version,
Resource: "kongclusterplugins",
},
restMapper,
),
Enabled: c.KongClusterPluginEnabled,
Controller: &configuration.KongV1KongClusterPluginReconciler{
Client: mgr.GetClient(),
Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("KongClusterPlugin"),
Expand All @@ -327,25 +268,28 @@ func setupControllers(
// knative is a special case because it existed before we added feature gates functionality
// for this controller (only) the existing --enable-controller-knativeingress flag overrides
// any feature gate configuration. See FEATURE_GATES.md for more information.
Enabled: (featureGates[featuregates.KnativeFeature] || c.KnativeIngressEnabled) && ShouldEnableCRDController(ctx,
schema.GroupVersionResource{
Enabled: featureGates[featuregates.KnativeFeature] || c.KnativeIngressEnabled,
Controller: &crds.DynamicCRDController{
Manager: mgr,
Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("Dynamic/KnativeV1Alpha1/Ingress"),
CacheSyncTimeout: c.CacheSyncTimeout,
RequiredCRDs: []schema.GroupVersionResource{{
Group: knativev1alpha1.SchemeGroupVersion.Group,
Version: knativev1alpha1.SchemeGroupVersion.Version,
Resource: "ingresses",
}},
Controller: &knative.Knativev1alpha1IngressReconciler{
Client: mgr.GetClient(),
Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("Ingress").WithName("KnativeV1Alpha1"),
Scheme: mgr.GetScheme(),
DataplaneClient: dataplaneClient,
IngressClassName: c.IngressClassName,
DisableIngressClassLookups: !c.IngressClassNetV1Enabled,
StatusQueue: kubernetesStatusQueue,
DataplaneAddressFinder: dataplaneAddressFinder,
CacheSyncTimeout: c.CacheSyncTimeout,
ReferenceIndexers: referenceIndexers,
},
restMapper,
),
Controller: &knative.Knativev1alpha1IngressReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("Ingress").WithName("KnativeV1Alpha1"),
Scheme: mgr.GetScheme(),
DataplaneClient: dataplaneClient,
IngressClassName: c.IngressClassName,
DisableIngressClassLookups: !c.IngressClassNetV1Enabled,
StatusQueue: kubernetesStatusQueue,
DataplaneAddressFinder: dataplaneAddressFinder,
CacheSyncTimeout: c.CacheSyncTimeout,
ReferenceIndexers: referenceIndexers,
},
},
// ---------------------------------------------------------------------------
Expand Down
49 changes: 48 additions & 1 deletion internal/manager/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"errors"
"fmt"
"net/http"
"net/url"
"os"
"time"

"github.com/avast/retry-go/v4"
"github.com/blang/semver/v4"
"github.com/go-logr/logr"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -128,7 +130,11 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d

mgr, err := ctrl.NewManager(kubeconfig, controllerOpts)
if err != nil {
return fmt.Errorf("unable to start controller manager: %w", err)
return fmt.Errorf("unable to create controller manager: %w", err)
}

if err := waitForKubernetesAPIReadiness(ctx, setupLog, mgr); err != nil {
return fmt.Errorf("unable to connect to Kubernetes API: %w", err)
}

setupLog.Info("Initializing Dataplane Client")
Expand Down Expand Up @@ -329,6 +335,47 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d
return mgr.Start(ctx)
}

// waitForKubernetesAPIReadiness waits for the Kubernetes API to be ready. It's used as a prerequisite to run any
// controller components (i.e. Manager along with its Runnables).
// It retries with a timeout of 1m and a fixed delay of 1s.
func waitForKubernetesAPIReadiness(ctx context.Context, logger logr.Logger, mgr manager.Manager) error {
const (
timeout = time.Minute
delay = time.Second
)

ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

readinessEndpointURL, err := url.JoinPath(mgr.GetConfig().Host, "readyz")
if err != nil {
return fmt.Errorf("failed to build readiness check URL: %w", err)
}

return retry.Do(func() error {
// Call the readiness check of the Kubernetes API server: https://kubernetes.io/docs/reference/using-api/health-checks/.
resp, err := mgr.GetHTTPClient().Get(readinessEndpointURL)
if err != nil {
return fmt.Errorf("failed to connect to %q: %w", readinessEndpointURL, err)
}
defer resp.Body.Close()
// We're waiting for the readiness check to return status 200.
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("readiness check %q returned status %d", readinessEndpointURL, resp.StatusCode)
}
return nil
},
retry.Context(ctx),
retry.Delay(delay),
retry.DelayType(retry.FixedDelay),
retry.Attempts(0),
retry.LastErrorOnly(true),
retry.OnRetry(func(n uint, err error) {
logger.Info("Retrying Kubernetes API readiness check after error", "error", err.Error())
}),
)
}

// setupKonnectNodeAgentWithMgr creates and adds Konnect NodeAgent as the manager's Runnable.
// Returns error if failed to create Konnect NodeAgent.
func setupKonnectNodeAgentWithMgr(
Expand Down
Loading

0 comments on commit 838ddc8

Please sign in to comment.