Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor TransportServer controller #6389

Merged
merged 1 commit into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 0 additions & 219 deletions internal/k8s/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,14 +556,6 @@ func (nsi *namespacedInformer) addPolicyHandler(handlers cache.ResourceEventHand
nsi.cacheSyncs = append(nsi.cacheSyncs, informer.HasSynced)
}

func (nsi *namespacedInformer) addTransportServerHandler(handlers cache.ResourceEventHandlerFuncs) {
informer := nsi.confSharedInformerFactory.K8s().V1().TransportServers().Informer()
informer.AddEventHandler(handlers)
nsi.transportServerLister = informer.GetStore()

nsi.cacheSyncs = append(nsi.cacheSyncs, informer.HasSynced)
}

func (lbc *LoadBalancerController) addNamespaceHandler(handlers cache.ResourceEventHandlerFuncs, nsLabel string) {
optionsModifier := func(options *meta_v1.ListOptions) {
options.LabelSelector = nsLabel
Expand Down Expand Up @@ -1238,35 +1230,6 @@ func (lbc *LoadBalancerController) syncPolicy(task task) {
// Note: updating the status of a policy based on a reload is not needed.
}

func (lbc *LoadBalancerController) syncTransportServer(task task) {
key := task.Key
var obj interface{}
var tsExists bool
var err error

ns, _, _ := cache.SplitMetaNamespaceKey(key)
obj, tsExists, err = lbc.getNamespacedInformer(ns).transportServerLister.GetByKey(key)
if err != nil {
lbc.syncQueue.Requeue(task, err)
return
}

var changes []ResourceChange
var problems []ConfigurationProblem

if !tsExists {
glog.V(2).Infof("Deleting TransportServer: %v\n", key)
changes, problems = lbc.configuration.DeleteTransportServer(key)
} else {
glog.V(2).Infof("Adding or Updating TransportServer: %v\n", key)
ts := obj.(*conf_v1.TransportServer)
changes, problems = lbc.configuration.AddOrUpdateTransportServer(ts)
}

lbc.processChanges(changes)
lbc.processProblems(problems)
}

func (lbc *LoadBalancerController) syncVirtualServer(task task) {
key := task.Key
var obj interface{}
Expand Down Expand Up @@ -1437,45 +1400,6 @@ func (lbc *LoadBalancerController) processChanges(changes []ResourceChange) {
}
}

func (lbc *LoadBalancerController) updateTransportServerStatusAndEventsOnDelete(tsConfig *TransportServerConfiguration, changeError string, deleteErr error) {
eventType := api_v1.EventTypeWarning
eventTitle := "Rejected"
eventWarningMessage := ""
var state string

// TransportServer either became invalid or lost its host or listener
if changeError != "" {
state = conf_v1.StateInvalid
eventWarningMessage = fmt.Sprintf("with error: %s", changeError)
} else if len(tsConfig.Warnings) > 0 {
state = conf_v1.StateWarning
eventWarningMessage = fmt.Sprintf("with warning(s): %s", formatWarningMessages(tsConfig.Warnings))
}

// we don't need to report anything if eventWarningMessage is empty
// in that case, the resource was deleted because its class became incorrect
// (some other Ingress Controller will handle it)

if eventWarningMessage != "" {
if deleteErr != nil {
eventType = api_v1.EventTypeWarning
eventTitle = "RejectedWithError"
eventWarningMessage = fmt.Sprintf("%s; but was not applied: %v", eventWarningMessage, deleteErr)
state = conf_v1.StateInvalid
}

msg := fmt.Sprintf("TransportServer %s was rejected %s", getResourceKey(&tsConfig.TransportServer.ObjectMeta), eventWarningMessage)
lbc.recorder.Eventf(tsConfig.TransportServer, eventType, eventTitle, msg)

if lbc.reportCustomResourceStatusEnabled() {
err := lbc.statusUpdater.UpdateTransportServerStatus(tsConfig.TransportServer, state, eventTitle, msg)
if err != nil {
glog.Errorf("Error when updating the status for TransportServer %v/%v: %v", tsConfig.TransportServer.Namespace, tsConfig.TransportServer.Name, err)
}
}
}
}

// UpdateVirtualServerStatusAndEventsOnDelete updates the virtual server status and events
func (lbc *LoadBalancerController) UpdateVirtualServerStatusAndEventsOnDelete(vsConfig *VirtualServerConfiguration, changeError string, deleteErr error) {
eventType := api_v1.EventTypeWarning
Expand Down Expand Up @@ -1688,44 +1612,6 @@ func (lbc *LoadBalancerController) updateRegularIngressStatusAndEvents(ingConfig
}
}

func (lbc *LoadBalancerController) updateTransportServerStatusAndEvents(tsConfig *TransportServerConfiguration, warnings configs.Warnings, operationErr error) {
eventTitle := "AddedOrUpdated"
eventType := api_v1.EventTypeNormal
eventWarningMessage := ""
state := conf_v1.StateValid

if len(tsConfig.Warnings) > 0 {
eventType = api_v1.EventTypeWarning
eventTitle = "AddedOrUpdatedWithWarning"
eventWarningMessage = fmt.Sprintf("with warning(s): %s", formatWarningMessages(tsConfig.Warnings))
state = conf_v1.StateWarning
}

if messages, ok := warnings[tsConfig.TransportServer]; ok {
eventType = api_v1.EventTypeWarning
eventTitle = "AddedOrUpdatedWithWarning"
eventWarningMessage = fmt.Sprintf("with warning(s): %s", formatWarningMessages(messages))
state = conf_v1.StateWarning
}

if operationErr != nil {
eventType = api_v1.EventTypeWarning
eventTitle = "AddedOrUpdatedWithError"
eventWarningMessage = fmt.Sprintf("%s; but was not applied: %v", eventWarningMessage, operationErr)
state = conf_v1.StateInvalid
}

msg := fmt.Sprintf("Configuration for %v was added or updated %s", getResourceKey(&tsConfig.TransportServer.ObjectMeta), eventWarningMessage)
lbc.recorder.Eventf(tsConfig.TransportServer, eventType, eventTitle, msg)

if lbc.reportCustomResourceStatusEnabled() {
err := lbc.statusUpdater.UpdateTransportServerStatus(tsConfig.TransportServer, state, eventTitle, msg)
if err != nil {
glog.Errorf("Error when updating the status for TransportServer %v/%v: %v", tsConfig.TransportServer.Namespace, tsConfig.TransportServer.Name, err)
}
}
}

func (lbc *LoadBalancerController) updateVirtualServerStatusAndEvents(vsConfig *VirtualServerConfiguration, warnings configs.Warnings, operationErr error) {
eventType := api_v1.EventTypeNormal
eventTitle := "AddedOrUpdated"
Expand Down Expand Up @@ -1870,15 +1756,6 @@ func (lbc *LoadBalancerController) updateVirtualServerMetrics() {
lbc.metricsCollector.SetVirtualServerRoutes(vsrCount)
}

func (lbc *LoadBalancerController) updateTransportServerMetrics() {
if !lbc.areCustomResourcesEnabled {
return
}

metrics := lbc.configuration.GetTransportServerMetrics()
lbc.metricsCollector.SetTransportServers(metrics.TotalTLSPassthrough, metrics.TotalTCP, metrics.TotalUDP)
}

func (lbc *LoadBalancerController) syncService(task task) {
key := task.Key

Expand Down Expand Up @@ -2214,45 +2091,6 @@ func (lbc *LoadBalancerController) updateVirtualServerRoutesStatusFromEvents() e
return nil
}

func (lbc *LoadBalancerController) updateTransportServersStatusFromEvents() error {
var allErrs []error
for _, nsi := range lbc.namespacedInformers {
for _, obj := range nsi.transportServerLister.List() {
ts := obj.(*conf_v1.TransportServer)

events, err := lbc.client.CoreV1().Events(ts.Namespace).List(context.TODO(),
meta_v1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%v,involvedObject.uid=%v", ts.Name, ts.UID)})
if err != nil {
allErrs = append(allErrs, fmt.Errorf("error trying to get events for TransportServer %v/%v: %w", ts.Namespace, ts.Name, err))
break
}

if len(events.Items) == 0 {
continue
}

var timestamp time.Time
var latestEvent api_v1.Event
for _, event := range events.Items {
if event.CreationTimestamp.After(timestamp) {
latestEvent = event
}
}

err = lbc.statusUpdater.UpdateTransportServerStatus(ts, getStatusFromEventTitle(latestEvent.Reason), latestEvent.Reason, latestEvent.Message)
if err != nil {
allErrs = append(allErrs, err)
}
}
}

if len(allErrs) > 0 {
return fmt.Errorf("not all TransportServers statuses were updated: %v", allErrs)
}

return nil
}

func getIPAddressesFromEndpoints(endpoints []podEndpoint) []string {
var endps []string
for _, ep := range endpoints {
Expand Down Expand Up @@ -3028,63 +2866,6 @@ func (lbc *LoadBalancerController) getTransportServerBackupEndpointsAndKey(trans
return bendps, backupEndpointsKey
}

func (lbc *LoadBalancerController) createTransportServerEx(transportServer *conf_v1.TransportServer, listenerPort int) *configs.TransportServerEx {
endpoints := make(map[string][]string)
externalNameSvcs := make(map[string]bool)
podsByIP := make(map[string]string)
disableIPV6 := lbc.configuration.isIPV6Disabled

for _, u := range transportServer.Spec.Upstreams {
podEndps, external, err := lbc.getEndpointsForUpstream(transportServer.Namespace, u.Service, uint16(u.Port))
if err == nil && external && lbc.isNginxPlus {
externalNameSvcs[configs.GenerateExternalNameSvcKey(transportServer.Namespace, u.Service)] = true
}
if err != nil {
glog.Warningf("Error getting Endpoints for Upstream %v: %v", u.Name, err)
}

// subselector is not supported yet in TransportServer upstreams. That's why we pass "nil" here
endpointsKey := configs.GenerateEndpointsKey(transportServer.Namespace, u.Service, nil, uint16(u.Port))

endps := getIPAddressesFromEndpoints(podEndps)
endpoints[endpointsKey] = endps

if lbc.isNginxPlus && lbc.isPrometheusEnabled {
for _, endpoint := range podEndps {
podsByIP[endpoint.Address] = endpoint.PodName
}
}

if u.Backup != "" && u.BackupPort != nil {
bendps, backupEndpointsKey := lbc.getTransportServerBackupEndpointsAndKey(transportServer, u, externalNameSvcs)
endpoints[backupEndpointsKey] = bendps
}
}

scrtRefs := make(map[string]*secrets.SecretReference)

if transportServer.Spec.TLS != nil && transportServer.Spec.TLS.Secret != "" {
scrtKey := transportServer.Namespace + "/" + transportServer.Spec.TLS.Secret

scrtRef := lbc.secretStore.GetSecret(scrtKey)
if scrtRef.Error != nil {
glog.Warningf("Error trying to get the secret %v for TransportServer %v: %v", scrtKey, transportServer.Name, scrtRef.Error)
}

scrtRefs[scrtKey] = scrtRef
}

return &configs.TransportServerEx{
ListenerPort: listenerPort,
TransportServer: transportServer,
Endpoints: endpoints,
PodsByIP: podsByIP,
ExternalNameSvcs: externalNameSvcs,
DisableIPV6: disableIPV6,
SecretRefs: scrtRefs,
}
}

func (lbc *LoadBalancerController) getEndpointsForUpstream(namespace string, upstreamService string, upstreamPort uint16) (endps []podEndpoint, isExternal bool, err error) {
svc, err := lbc.getServiceForUpstream(namespace, upstreamService, upstreamPort)
if err != nil {
Expand Down
34 changes: 0 additions & 34 deletions internal/k8s/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,40 +326,6 @@ func createVirtualServerRouteHandlers(lbc *LoadBalancerController) cache.Resourc
}
}

func createTransportServerHandlers(lbc *LoadBalancerController) cache.ResourceEventHandlerFuncs {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ts := obj.(*conf_v1.TransportServer)
glog.V(3).Infof("Adding TransportServer: %v", ts.Name)
lbc.AddSyncQueue(ts)
},
DeleteFunc: func(obj interface{}) {
ts, isTs := obj.(*conf_v1.TransportServer)
if !isTs {
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.V(3).Infof("Error received unexpected object: %v", obj)
return
}
ts, ok = deletedState.Obj.(*conf_v1.TransportServer)
if !ok {
glog.V(3).Infof("Error DeletedFinalStateUnknown contained non-TransportServer object: %v", deletedState.Obj)
return
}
}
glog.V(3).Infof("Removing TransportServer: %v", ts.Name)
lbc.AddSyncQueue(ts)
},
UpdateFunc: func(old, cur interface{}) {
curTs := cur.(*conf_v1.TransportServer)
if !reflect.DeepEqual(old, cur) {
glog.V(3).Infof("TransportServer %v changed, syncing", curTs.Name)
lbc.AddSyncQueue(curTs)
}
},
}
}

func createPolicyHandlers(lbc *LoadBalancerController) cache.ResourceEventHandlerFuncs {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
Expand Down
Loading
Loading