Skip to content

Commit

Permalink
Add support for updating server parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
Vighneswar Rao Bojja authored and Raul Marrero committed Nov 6, 2019
1 parent 7ab5c57 commit c1b1ca1
Show file tree
Hide file tree
Showing 3 changed files with 718 additions and 78 deletions.
243 changes: 212 additions & 31 deletions client/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,30 @@ import (
"io"
"io/ioutil"
"net/http"
"reflect"
"strings"
)

// APIVersion is a version of NGINX Plus API.
const APIVersion = 5
const (
// APIVersion is a version of NGINX Plus API.
APIVersion = 5

const pathNotFoundCode = "PathNotFound"
pathNotFoundCode = "PathNotFound"
streamContext = true
httpContext = false
defaultServerPort = "80"
)

const streamContext = true
const httpContext = false
// Default values for servers in Upstreams.
var (
defaultMaxConns = 0
defaultMaxFails = 1
defaultFailTimeout = "10s"
defaultSlowStart = "0s"
defaultBackup = false
defaultDown = false
defaultWeight = 1
)

// NginxClient lets you access NGINX Plus API.
type NginxClient struct {
Expand All @@ -29,13 +44,13 @@ type versions []int
type UpstreamServer struct {
ID int `json:"id,omitempty"`
Server string `json:"server"`
MaxConns int `json:"max_conns"`
MaxConns *int `json:"max_conns,omitempty"`
MaxFails *int `json:"max_fails,omitempty"`
FailTimeout string `json:"fail_timeout,omitempty"`
SlowStart string `json:"slow_start,omitempty"`
Route string `json:"route"`
Backup bool `json:"backup"`
Down bool `json:"down"`
Route string `json:"route,omitempty"`
Backup *bool `json:"backup,omitempty"`
Down *bool `json:"down,omitempty"`
Drain bool `json:"drain,omitempty"`
Weight *int `json:"weight,omitempty"`
Service string `json:"service,omitempty"`
Expand All @@ -45,12 +60,12 @@ type UpstreamServer struct {
type StreamUpstreamServer struct {
ID int `json:"id,omitempty"`
Server string `json:"server"`
MaxConns int `json:"max_conns"`
MaxConns *int `json:"max_conns,omitempty"`
MaxFails *int `json:"max_fails,omitempty"`
FailTimeout string `json:"fail_timeout,omitempty"`
SlowStart string `json:"slow_start,omitempty"`
Backup bool `json:"backup"`
Down bool `json:"down"`
Backup *bool `json:"backup,omitempty"`
Down *bool `json:"down,omitempty"`
Weight *int `json:"weight,omitempty"`
Service string `json:"service,omitempty"`
}
Expand Down Expand Up @@ -469,32 +484,96 @@ func (client *NginxClient) DeleteHTTPServer(upstream string, server string) erro
// UpdateHTTPServers updates the servers of the upstream.
// Servers that are in the slice, but don't exist in NGINX will be added to NGINX.
// Servers that aren't in the slice, but exist in NGINX, will be removed from NGINX.
func (client *NginxClient) UpdateHTTPServers(upstream string, servers []UpstreamServer) ([]UpstreamServer, []UpstreamServer, error) {
// Servers that are in the slice and exist in NGINX, but have different parameters, will be updated.
func (client *NginxClient) UpdateHTTPServers(upstream string, servers []UpstreamServer) (added []UpstreamServer, deleted []UpstreamServer, updated []UpstreamServer, err error) {
serversInNginx, err := client.GetHTTPServers(upstream)
if err != nil {
return nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
}

// We assume port 80 if no port is set for servers.
var formattedServers []UpstreamServer
for _, server := range servers {
server.Server = addPortToServer(server.Server)
formattedServers = append(formattedServers, server)
}

toAdd, toDelete := determineUpdates(servers, serversInNginx)
toAdd, toDelete, toUpdate := determineUpdates(formattedServers, serversInNginx)

for _, server := range toAdd {
err := client.AddHTTPServer(upstream, server)
if err != nil {
return nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
}
}

for _, server := range toDelete {
err := client.DeleteHTTPServer(upstream, server.Server)
if err != nil {
return nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
}
}

return toAdd, toDelete, nil
for _, server := range toUpdate {
err := client.UpdateHTTPServer(upstream, server)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
}
}

return toAdd, toDelete, toUpdate, nil
}

func determineUpdates(updatedServers []UpstreamServer, nginxServers []UpstreamServer) (toAdd []UpstreamServer, toRemove []UpstreamServer) {
// haveSameParameters checks if a given server has the same parameters as a server already present in NGINX. Order matters
func haveSameParameters(newServer UpstreamServer, serverNGX UpstreamServer) bool {
newServer.ID = serverNGX.ID

if serverNGX.MaxConns != nil && newServer.MaxConns == nil {
newServer.MaxConns = &defaultMaxConns
}

if serverNGX.MaxFails != nil && newServer.MaxFails == nil {
newServer.MaxFails = &defaultMaxFails
}

if serverNGX.FailTimeout != "" && newServer.FailTimeout == "" {
newServer.FailTimeout = defaultFailTimeout
}

if serverNGX.SlowStart != "" && newServer.SlowStart == "" {
newServer.SlowStart = defaultSlowStart
}

if serverNGX.Backup != nil && newServer.Backup == nil {
newServer.Backup = &defaultBackup
}

if serverNGX.Down != nil && newServer.Down == nil {
newServer.Down = &defaultDown
}

if serverNGX.Weight != nil && newServer.Weight == nil {
newServer.Weight = &defaultWeight
}

return reflect.DeepEqual(newServer, serverNGX)
}

func determineUpdates(updatedServers []UpstreamServer, nginxServers []UpstreamServer) (toAdd []UpstreamServer, toRemove []UpstreamServer, toUpdate []UpstreamServer) {
for _, server := range updatedServers {
updateFound := false
for _, serverNGX := range nginxServers {
if server.Server == serverNGX.Server && !haveSameParameters(server, serverNGX) {
server.ID = serverNGX.ID
updateFound = true
break
}
}
if updateFound {
toUpdate = append(toUpdate, server)
}
}

for _, server := range updatedServers {
found := false
for _, serverNGX := range nginxServers {
Expand Down Expand Up @@ -608,7 +687,7 @@ func (client *NginxClient) delete(path string, expectedStatusCode int) error {
return nil
}

func (client *NginxClient) patch(path string, input interface{}) error {
func (client *NginxClient) patch(path string, input interface{}, expectedStatusCode int) error {
path = fmt.Sprintf("%v/%v/%v/", client.apiEndpoint, APIVersion, path)

jsonInput, err := json.Marshal(input)
Expand All @@ -627,10 +706,10 @@ func (client *NginxClient) patch(path string, input interface{}) error {
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusNoContent {
if resp.StatusCode != expectedStatusCode {
return createResponseMismatchError(resp.Body).Wrap(fmt.Sprintf(
"failed to complete patch request: expected %v response, got %v",
http.StatusNoContent, resp.StatusCode))
expectedStatusCode, resp.StatusCode))
}
return nil
}
Expand Down Expand Up @@ -692,29 +771,43 @@ func (client *NginxClient) DeleteStreamServer(upstream string, server string) er
// UpdateStreamServers updates the servers of the upstream.
// Servers that are in the slice, but don't exist in NGINX will be added to NGINX.
// Servers that aren't in the slice, but exist in NGINX, will be removed from NGINX.
func (client *NginxClient) UpdateStreamServers(upstream string, servers []StreamUpstreamServer) ([]StreamUpstreamServer, []StreamUpstreamServer, error) {
// Servers that are in the slice and exist in NGINX, but have different parameters, will be updated.
func (client *NginxClient) UpdateStreamServers(upstream string, servers []StreamUpstreamServer) (added []StreamUpstreamServer, deleted []StreamUpstreamServer, updated []StreamUpstreamServer, err error) {
serversInNginx, err := client.GetStreamServers(upstream)
if err != nil {
return nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
return nil, nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
}

toAdd, toDelete := determineStreamUpdates(servers, serversInNginx)
var formattedServers []StreamUpstreamServer
for _, server := range servers {
server.Server = addPortToServer(server.Server)
formattedServers = append(formattedServers, server)
}

toAdd, toDelete, toUpdate := determineStreamUpdates(formattedServers, serversInNginx)

for _, server := range toAdd {
err := client.AddStreamServer(upstream, server)
if err != nil {
return nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
return nil, nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
}
}

for _, server := range toDelete {
err := client.DeleteStreamServer(upstream, server.Server)
if err != nil {
return nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
return nil, nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
}
}

for _, server := range toUpdate {
err := client.UpdateStreamServer(upstream, server)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
}
}

return toAdd, toDelete, nil
return toAdd, toDelete, toUpdate, nil
}

func (client *NginxClient) getIDOfStreamServer(upstream string, name string) (int, error) {
Expand All @@ -732,7 +825,55 @@ func (client *NginxClient) getIDOfStreamServer(upstream string, name string) (in
return -1, nil
}

func determineStreamUpdates(updatedServers []StreamUpstreamServer, nginxServers []StreamUpstreamServer) (toAdd []StreamUpstreamServer, toRemove []StreamUpstreamServer) {
// haveSameParametersForStream checks if a given server has the same parameters as a server already present in NGINX. Order matters
func haveSameParametersForStream(newServer StreamUpstreamServer, serverNGX StreamUpstreamServer) bool {
newServer.ID = serverNGX.ID
if serverNGX.MaxConns != nil && newServer.MaxConns == nil {
newServer.MaxConns = &defaultMaxConns
}

if serverNGX.MaxFails != nil && newServer.MaxFails == nil {
newServer.MaxFails = &defaultMaxFails
}

if serverNGX.FailTimeout != "" && newServer.FailTimeout == "" {
newServer.FailTimeout = defaultFailTimeout
}

if serverNGX.SlowStart != "" && newServer.SlowStart == "" {
newServer.SlowStart = defaultSlowStart
}

if serverNGX.Backup != nil && newServer.Backup == nil {
newServer.Backup = &defaultBackup
}

if serverNGX.Down != nil && newServer.Down == nil {
newServer.Down = &defaultDown
}

if serverNGX.Weight != nil && newServer.Weight == nil {
newServer.Weight = &defaultWeight
}

return reflect.DeepEqual(newServer, serverNGX)
}

func determineStreamUpdates(updatedServers []StreamUpstreamServer, nginxServers []StreamUpstreamServer) (toAdd []StreamUpstreamServer, toRemove []StreamUpstreamServer, toUpdate []StreamUpstreamServer) {
for _, server := range updatedServers {
updateFound := false
for _, serverNGX := range nginxServers {
if server.Server == serverNGX.Server && !haveSameParametersForStream(server, serverNGX) {
server.ID = serverNGX.ID
updateFound = true
break
}
}
if updateFound {
toUpdate = append(toUpdate, server)
}
}

for _, server := range updatedServers {
found := false
for _, serverNGX := range nginxServers {
Expand Down Expand Up @@ -1059,7 +1200,7 @@ func (client *NginxClient) modifyKeyValPair(zone string, key string, val string,

path := fmt.Sprintf("%v/keyvals/%v", base, zone)
input := KeyValPairs{key: val}
err := client.patch(path, &input)
err := client.patch(path, &input, http.StatusNoContent)
if err != nil {
return fmt.Errorf("failed to update key value pair for %v/%v zone: %v", base, zone, err)
}
Expand Down Expand Up @@ -1092,7 +1233,7 @@ func (client *NginxClient) deleteKeyValuePair(zone string, key string, stream bo
keyval[key] = nil

path := fmt.Sprintf("%v/keyvals/%v", base, zone)
err := client.patch(path, &keyval)
err := client.patch(path, &keyval, http.StatusNoContent)
if err != nil {
return fmt.Errorf("failed to remove key values pair for %v/%v zone: %v", base, zone, err)
}
Expand Down Expand Up @@ -1125,3 +1266,43 @@ func (client *NginxClient) deleteKeyValPairs(zone string, stream bool) error {
}
return nil
}

// UpdateHTTPServer updates the server of the upstream.
func (client *NginxClient) UpdateHTTPServer(upstream string, server UpstreamServer) error {
path := fmt.Sprintf("http/upstreams/%v/servers/%v", upstream, server.ID)
server.ID = 0
err := client.patch(path, &server, http.StatusOK)
if err != nil {
return fmt.Errorf("failed to update %v server to %v upstream: %v", server.Server, upstream, err)
}

return nil
}

// UpdateStreamServer updates the stream server of the upstream.
func (client *NginxClient) UpdateStreamServer(upstream string, server StreamUpstreamServer) error {
path := fmt.Sprintf("stream/upstreams/%v/servers/%v", upstream, server.ID)
server.ID = 0
err := client.patch(path, &server, http.StatusOK)
if err != nil {
return fmt.Errorf("failed to update %v stream server to %v upstream: %v", server.Server, upstream, err)
}

return nil
}

func addPortToServer(server string) string {
if len(strings.Split(server, ":")) == 2 {
return server
}

if len(strings.Split(server, "]:")) == 2 {
return server
}

if strings.HasPrefix(server, "unix:") {
return server
}

return fmt.Sprintf("%v:%v", server, defaultServerPort)
}
Loading

0 comments on commit c1b1ca1

Please sign in to comment.