From c1b1ca1515f06a413eaf906c6e47e0a09d1c4bf7 Mon Sep 17 00:00:00 2001 From: Vighneswar Rao Bojja Date: Wed, 9 Oct 2019 12:10:31 +0530 Subject: [PATCH] Add support for updating server parameters --- client/nginx.go | 243 ++++++++++++++++++++++++++++++----- client/nginx_test.go | 295 +++++++++++++++++++++++++++++++++++++++++-- tests/client_test.go | 258 +++++++++++++++++++++++++++++++------ 3 files changed, 718 insertions(+), 78 deletions(-) diff --git a/client/nginx.go b/client/nginx.go index 42bb92f8..839df09f 100644 --- a/client/nginx.go +++ b/client/nginx.go @@ -7,15 +7,30 @@ import ( "io" "io/ioutil" "net/http" + "reflect" + "strings" ) -// APIVersion is a version of NGINX Plus API. -const APIVersion = 5 +const ( + // APIVersion is a version of NGINX Plus API. + APIVersion = 5 -const pathNotFoundCode = "PathNotFound" + pathNotFoundCode = "PathNotFound" + streamContext = true + httpContext = false + defaultServerPort = "80" +) -const streamContext = true -const httpContext = false +// Default values for servers in Upstreams. +var ( + defaultMaxConns = 0 + defaultMaxFails = 1 + defaultFailTimeout = "10s" + defaultSlowStart = "0s" + defaultBackup = false + defaultDown = false + defaultWeight = 1 +) // NginxClient lets you access NGINX Plus API. type NginxClient struct { @@ -29,13 +44,13 @@ type versions []int type UpstreamServer struct { ID int `json:"id,omitempty"` Server string `json:"server"` - MaxConns int `json:"max_conns"` + MaxConns *int `json:"max_conns,omitempty"` MaxFails *int `json:"max_fails,omitempty"` FailTimeout string `json:"fail_timeout,omitempty"` SlowStart string `json:"slow_start,omitempty"` - Route string `json:"route"` - Backup bool `json:"backup"` - Down bool `json:"down"` + Route string `json:"route,omitempty"` + Backup *bool `json:"backup,omitempty"` + Down *bool `json:"down,omitempty"` Drain bool `json:"drain,omitempty"` Weight *int `json:"weight,omitempty"` Service string `json:"service,omitempty"` @@ -45,12 +60,12 @@ type UpstreamServer struct { type StreamUpstreamServer struct { ID int `json:"id,omitempty"` Server string `json:"server"` - MaxConns int `json:"max_conns"` + MaxConns *int `json:"max_conns,omitempty"` MaxFails *int `json:"max_fails,omitempty"` FailTimeout string `json:"fail_timeout,omitempty"` SlowStart string `json:"slow_start,omitempty"` - Backup bool `json:"backup"` - Down bool `json:"down"` + Backup *bool `json:"backup,omitempty"` + Down *bool `json:"down,omitempty"` Weight *int `json:"weight,omitempty"` Service string `json:"service,omitempty"` } @@ -469,32 +484,96 @@ func (client *NginxClient) DeleteHTTPServer(upstream string, server string) erro // UpdateHTTPServers updates the servers of the upstream. // Servers that are in the slice, but don't exist in NGINX will be added to NGINX. // Servers that aren't in the slice, but exist in NGINX, will be removed from NGINX. -func (client *NginxClient) UpdateHTTPServers(upstream string, servers []UpstreamServer) ([]UpstreamServer, []UpstreamServer, error) { +// Servers that are in the slice and exist in NGINX, but have different parameters, will be updated. +func (client *NginxClient) UpdateHTTPServers(upstream string, servers []UpstreamServer) (added []UpstreamServer, deleted []UpstreamServer, updated []UpstreamServer, err error) { serversInNginx, err := client.GetHTTPServers(upstream) if err != nil { - return nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err) + return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err) + } + + // We assume port 80 if no port is set for servers. + var formattedServers []UpstreamServer + for _, server := range servers { + server.Server = addPortToServer(server.Server) + formattedServers = append(formattedServers, server) } - toAdd, toDelete := determineUpdates(servers, serversInNginx) + toAdd, toDelete, toUpdate := determineUpdates(formattedServers, serversInNginx) for _, server := range toAdd { err := client.AddHTTPServer(upstream, server) if err != nil { - return nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err) + return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err) } } for _, server := range toDelete { err := client.DeleteHTTPServer(upstream, server.Server) if err != nil { - return nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err) + return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err) } } - return toAdd, toDelete, nil + for _, server := range toUpdate { + err := client.UpdateHTTPServer(upstream, server) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err) + } + } + + return toAdd, toDelete, toUpdate, nil } -func determineUpdates(updatedServers []UpstreamServer, nginxServers []UpstreamServer) (toAdd []UpstreamServer, toRemove []UpstreamServer) { +// haveSameParameters checks if a given server has the same parameters as a server already present in NGINX. Order matters +func haveSameParameters(newServer UpstreamServer, serverNGX UpstreamServer) bool { + newServer.ID = serverNGX.ID + + if serverNGX.MaxConns != nil && newServer.MaxConns == nil { + newServer.MaxConns = &defaultMaxConns + } + + if serverNGX.MaxFails != nil && newServer.MaxFails == nil { + newServer.MaxFails = &defaultMaxFails + } + + if serverNGX.FailTimeout != "" && newServer.FailTimeout == "" { + newServer.FailTimeout = defaultFailTimeout + } + + if serverNGX.SlowStart != "" && newServer.SlowStart == "" { + newServer.SlowStart = defaultSlowStart + } + + if serverNGX.Backup != nil && newServer.Backup == nil { + newServer.Backup = &defaultBackup + } + + if serverNGX.Down != nil && newServer.Down == nil { + newServer.Down = &defaultDown + } + + if serverNGX.Weight != nil && newServer.Weight == nil { + newServer.Weight = &defaultWeight + } + + return reflect.DeepEqual(newServer, serverNGX) +} + +func determineUpdates(updatedServers []UpstreamServer, nginxServers []UpstreamServer) (toAdd []UpstreamServer, toRemove []UpstreamServer, toUpdate []UpstreamServer) { + for _, server := range updatedServers { + updateFound := false + for _, serverNGX := range nginxServers { + if server.Server == serverNGX.Server && !haveSameParameters(server, serverNGX) { + server.ID = serverNGX.ID + updateFound = true + break + } + } + if updateFound { + toUpdate = append(toUpdate, server) + } + } + for _, server := range updatedServers { found := false for _, serverNGX := range nginxServers { @@ -608,7 +687,7 @@ func (client *NginxClient) delete(path string, expectedStatusCode int) error { return nil } -func (client *NginxClient) patch(path string, input interface{}) error { +func (client *NginxClient) patch(path string, input interface{}, expectedStatusCode int) error { path = fmt.Sprintf("%v/%v/%v/", client.apiEndpoint, APIVersion, path) jsonInput, err := json.Marshal(input) @@ -627,10 +706,10 @@ func (client *NginxClient) patch(path string, input interface{}) error { } defer resp.Body.Close() - if resp.StatusCode != http.StatusNoContent { + if resp.StatusCode != expectedStatusCode { return createResponseMismatchError(resp.Body).Wrap(fmt.Sprintf( "failed to complete patch request: expected %v response, got %v", - http.StatusNoContent, resp.StatusCode)) + expectedStatusCode, resp.StatusCode)) } return nil } @@ -692,29 +771,43 @@ func (client *NginxClient) DeleteStreamServer(upstream string, server string) er // UpdateStreamServers updates the servers of the upstream. // Servers that are in the slice, but don't exist in NGINX will be added to NGINX. // Servers that aren't in the slice, but exist in NGINX, will be removed from NGINX. -func (client *NginxClient) UpdateStreamServers(upstream string, servers []StreamUpstreamServer) ([]StreamUpstreamServer, []StreamUpstreamServer, error) { +// Servers that are in the slice and exist in NGINX, but have different parameters, will be updated. +func (client *NginxClient) UpdateStreamServers(upstream string, servers []StreamUpstreamServer) (added []StreamUpstreamServer, deleted []StreamUpstreamServer, updated []StreamUpstreamServer, err error) { serversInNginx, err := client.GetStreamServers(upstream) if err != nil { - return nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err) + return nil, nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err) } - toAdd, toDelete := determineStreamUpdates(servers, serversInNginx) + var formattedServers []StreamUpstreamServer + for _, server := range servers { + server.Server = addPortToServer(server.Server) + formattedServers = append(formattedServers, server) + } + + toAdd, toDelete, toUpdate := determineStreamUpdates(formattedServers, serversInNginx) for _, server := range toAdd { err := client.AddStreamServer(upstream, server) if err != nil { - return nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err) + return nil, nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err) } } for _, server := range toDelete { err := client.DeleteStreamServer(upstream, server.Server) if err != nil { - return nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err) + return nil, nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err) + } + } + + for _, server := range toUpdate { + err := client.UpdateStreamServer(upstream, server) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err) } } - return toAdd, toDelete, nil + return toAdd, toDelete, toUpdate, nil } func (client *NginxClient) getIDOfStreamServer(upstream string, name string) (int, error) { @@ -732,7 +825,55 @@ func (client *NginxClient) getIDOfStreamServer(upstream string, name string) (in return -1, nil } -func determineStreamUpdates(updatedServers []StreamUpstreamServer, nginxServers []StreamUpstreamServer) (toAdd []StreamUpstreamServer, toRemove []StreamUpstreamServer) { +// haveSameParametersForStream checks if a given server has the same parameters as a server already present in NGINX. Order matters +func haveSameParametersForStream(newServer StreamUpstreamServer, serverNGX StreamUpstreamServer) bool { + newServer.ID = serverNGX.ID + if serverNGX.MaxConns != nil && newServer.MaxConns == nil { + newServer.MaxConns = &defaultMaxConns + } + + if serverNGX.MaxFails != nil && newServer.MaxFails == nil { + newServer.MaxFails = &defaultMaxFails + } + + if serverNGX.FailTimeout != "" && newServer.FailTimeout == "" { + newServer.FailTimeout = defaultFailTimeout + } + + if serverNGX.SlowStart != "" && newServer.SlowStart == "" { + newServer.SlowStart = defaultSlowStart + } + + if serverNGX.Backup != nil && newServer.Backup == nil { + newServer.Backup = &defaultBackup + } + + if serverNGX.Down != nil && newServer.Down == nil { + newServer.Down = &defaultDown + } + + if serverNGX.Weight != nil && newServer.Weight == nil { + newServer.Weight = &defaultWeight + } + + return reflect.DeepEqual(newServer, serverNGX) +} + +func determineStreamUpdates(updatedServers []StreamUpstreamServer, nginxServers []StreamUpstreamServer) (toAdd []StreamUpstreamServer, toRemove []StreamUpstreamServer, toUpdate []StreamUpstreamServer) { + for _, server := range updatedServers { + updateFound := false + for _, serverNGX := range nginxServers { + if server.Server == serverNGX.Server && !haveSameParametersForStream(server, serverNGX) { + server.ID = serverNGX.ID + updateFound = true + break + } + } + if updateFound { + toUpdate = append(toUpdate, server) + } + } + for _, server := range updatedServers { found := false for _, serverNGX := range nginxServers { @@ -1059,7 +1200,7 @@ func (client *NginxClient) modifyKeyValPair(zone string, key string, val string, path := fmt.Sprintf("%v/keyvals/%v", base, zone) input := KeyValPairs{key: val} - err := client.patch(path, &input) + err := client.patch(path, &input, http.StatusNoContent) if err != nil { return fmt.Errorf("failed to update key value pair for %v/%v zone: %v", base, zone, err) } @@ -1092,7 +1233,7 @@ func (client *NginxClient) deleteKeyValuePair(zone string, key string, stream bo keyval[key] = nil path := fmt.Sprintf("%v/keyvals/%v", base, zone) - err := client.patch(path, &keyval) + err := client.patch(path, &keyval, http.StatusNoContent) if err != nil { return fmt.Errorf("failed to remove key values pair for %v/%v zone: %v", base, zone, err) } @@ -1125,3 +1266,43 @@ func (client *NginxClient) deleteKeyValPairs(zone string, stream bool) error { } return nil } + +// UpdateHTTPServer updates the server of the upstream. +func (client *NginxClient) UpdateHTTPServer(upstream string, server UpstreamServer) error { + path := fmt.Sprintf("http/upstreams/%v/servers/%v", upstream, server.ID) + server.ID = 0 + err := client.patch(path, &server, http.StatusOK) + if err != nil { + return fmt.Errorf("failed to update %v server to %v upstream: %v", server.Server, upstream, err) + } + + return nil +} + +// UpdateStreamServer updates the stream server of the upstream. +func (client *NginxClient) UpdateStreamServer(upstream string, server StreamUpstreamServer) error { + path := fmt.Sprintf("stream/upstreams/%v/servers/%v", upstream, server.ID) + server.ID = 0 + err := client.patch(path, &server, http.StatusOK) + if err != nil { + return fmt.Errorf("failed to update %v stream server to %v upstream: %v", server.Server, upstream, err) + } + + return nil +} + +func addPortToServer(server string) string { + if len(strings.Split(server, ":")) == 2 { + return server + } + + if len(strings.Split(server, "]:")) == 2 { + return server + } + + if strings.HasPrefix(server, "unix:") { + return server + } + + return fmt.Sprintf("%v:%v", server, defaultServerPort) +} diff --git a/client/nginx_test.go b/client/nginx_test.go index 4c9e156f..e9e3db41 100644 --- a/client/nginx_test.go +++ b/client/nginx_test.go @@ -6,11 +6,13 @@ import ( ) func TestDetermineUpdates(t *testing.T) { + maxConns := 1 var tests = []struct { updated []UpstreamServer nginx []UpstreamServer expectedToAdd []UpstreamServer expectedToDelete []UpstreamServer + expectedToUpdate []UpstreamServer }{{ updated: []UpstreamServer{ { @@ -95,35 +97,65 @@ func TestDetermineUpdates(t *testing.T) { }}, nginx: []UpstreamServer{ { - ID: 1, Server: "10.0.0.1:80", }, { - ID: 2, Server: "10.0.0.2:80", }, { - ID: 3, Server: "10.0.0.3:80", }, }}, { // empty values - }} + }, + { + updated: []UpstreamServer{ + { + Server: "10.0.0.1:80", + MaxConns: &maxConns, + }, + }, + nginx: []UpstreamServer{ + { + ID: 1, + Server: "10.0.0.1:80", + }, + { + ID: 2, + Server: "10.0.0.2:80", + }, + }, + expectedToDelete: []UpstreamServer{ + { + ID: 2, + Server: "10.0.0.2:80", + }, + }, + expectedToUpdate: []UpstreamServer{ + { + ID: 1, + Server: "10.0.0.1:80", + MaxConns: &maxConns, + }, + }}, + } for _, test := range tests { - toAdd, toDelete := determineUpdates(test.updated, test.nginx) - if !reflect.DeepEqual(toAdd, test.expectedToAdd) || !reflect.DeepEqual(toDelete, test.expectedToDelete) { - t.Errorf("determiteUpdates(%v, %v) = (%v, %v)", test.updated, test.nginx, toAdd, toDelete) + toAdd, toDelete, toUpdate := determineUpdates(test.updated, test.nginx) + if !reflect.DeepEqual(toAdd, test.expectedToAdd) || !reflect.DeepEqual(toDelete, test.expectedToDelete) || !reflect.DeepEqual(toUpdate, test.expectedToUpdate) { + t.Errorf("determineUpdates(%v, %v) = (%v, %v, %v)", test.updated, test.nginx, toAdd, toDelete, toUpdate) } } } func TestStreamDetermineUpdates(t *testing.T) { + maxConns := 1 var tests = []struct { updated []StreamUpstreamServer nginx []StreamUpstreamServer expectedToAdd []StreamUpstreamServer expectedToDelete []StreamUpstreamServer + expectedToUpdate []StreamUpstreamServer }{{ updated: []StreamUpstreamServer{ { @@ -221,12 +253,253 @@ func TestStreamDetermineUpdates(t *testing.T) { }, }}, { // empty values - }} + }, + { + updated: []StreamUpstreamServer{ + { + Server: "10.0.0.1:80", + MaxConns: &maxConns, + }, + }, + nginx: []StreamUpstreamServer{ + { + ID: 1, + Server: "10.0.0.1:80", + }, + { + ID: 2, + Server: "10.0.0.2:80", + }, + }, + expectedToDelete: []StreamUpstreamServer{ + { + ID: 2, + Server: "10.0.0.2:80", + }, + }, + expectedToUpdate: []StreamUpstreamServer{ + { + ID: 1, + Server: "10.0.0.1:80", + MaxConns: &maxConns, + }, + }}, + } + + for _, test := range tests { + toAdd, toDelete, toUpdate := determineStreamUpdates(test.updated, test.nginx) + if !reflect.DeepEqual(toAdd, test.expectedToAdd) || !reflect.DeepEqual(toDelete, test.expectedToDelete) || !reflect.DeepEqual(toUpdate, test.expectedToUpdate) { + t.Errorf("determiteUpdates(%v, %v) = (%v, %v, %v)", test.updated, test.nginx, toAdd, toDelete, toUpdate) + } + } +} + +func TestAddPortToServer(t *testing.T) { + // More info about addresses http://nginx.org/en/docs/http/ngx_http_upstream_module.html#server + var tests = []struct { + address string + expected string + msg string + }{ + { + address: "example.com:8080", + expected: "example.com:8080", + msg: "host and port", + }, + { + address: "127.0.0.1:8080", + expected: "127.0.0.1:8080", + msg: "ipv4 and port", + }, + { + address: "[::]:8080", + expected: "[::]:8080", + msg: "ipv6 and port", + }, + { + address: "unix:/path/to/socket", + expected: "unix:/path/to/socket", + msg: "unix socket", + }, + { + address: "example.com", + expected: "example.com:80", + msg: "host without port", + }, + { + address: "127.0.0.1", + expected: "127.0.0.1:80", + msg: "ipv4 without port", + }, + { + address: "[::]", + expected: "[::]:80", + msg: "ipv6 without port", + }, + } + + for _, test := range tests { + result := addPortToServer(test.address) + if result != test.expected { + t.Errorf("addPortToServer(%v) returned %v but expected %v for %v", test.address, result, test.expected, test.msg) + } + } +} + +func TestHaveSameParameters(t *testing.T) { + tests := []struct { + server UpstreamServer + serverNGX UpstreamServer + expected bool + }{ + { + server: UpstreamServer{}, + serverNGX: UpstreamServer{}, + expected: true, + }, + { + server: UpstreamServer{ID: 2}, + serverNGX: UpstreamServer{ID: 3}, + expected: true, + }, + { + server: UpstreamServer{}, + serverNGX: UpstreamServer{ + MaxConns: &defaultMaxConns, + MaxFails: &defaultMaxFails, + FailTimeout: defaultFailTimeout, + SlowStart: defaultSlowStart, + Backup: &defaultBackup, + Weight: &defaultWeight, + Down: &defaultDown, + }, + expected: true, + }, + { + server: UpstreamServer{ + ID: 1, + Server: "127.0.0.1", + MaxConns: &defaultMaxConns, + MaxFails: &defaultMaxFails, + FailTimeout: defaultFailTimeout, + SlowStart: defaultSlowStart, + Backup: &defaultBackup, + Weight: &defaultWeight, + Down: &defaultDown, + }, + serverNGX: UpstreamServer{ + ID: 1, + Server: "127.0.0.1", + MaxConns: &defaultMaxConns, + MaxFails: &defaultMaxFails, + FailTimeout: defaultFailTimeout, + SlowStart: defaultSlowStart, + Backup: &defaultBackup, + Weight: &defaultWeight, + Down: &defaultDown, + }, + expected: true, + }, + { + server: UpstreamServer{SlowStart: "10s"}, + serverNGX: UpstreamServer{}, + expected: false, + }, + { + server: UpstreamServer{}, + serverNGX: UpstreamServer{SlowStart: "10s"}, + expected: false, + }, + { + server: UpstreamServer{SlowStart: "20s"}, + serverNGX: UpstreamServer{SlowStart: "10s"}, + expected: false, + }, + } + + for _, test := range tests { + result := haveSameParameters(test.server, test.serverNGX) + if result != test.expected { + t.Errorf("haveSameParameters(%v, %v) returned %v but expected %v", test.server, test.serverNGX, result, test.expected) + } + } +} + +func TestHaveSameParametersForStream(t *testing.T) { + tests := []struct { + server StreamUpstreamServer + serverNGX StreamUpstreamServer + expected bool + }{ + { + server: StreamUpstreamServer{}, + serverNGX: StreamUpstreamServer{}, + expected: true, + }, + { + server: StreamUpstreamServer{ID: 2}, + serverNGX: StreamUpstreamServer{ID: 3}, + expected: true, + }, + { + server: StreamUpstreamServer{}, + serverNGX: StreamUpstreamServer{ + MaxConns: &defaultMaxConns, + MaxFails: &defaultMaxFails, + FailTimeout: defaultFailTimeout, + SlowStart: defaultSlowStart, + Backup: &defaultBackup, + Weight: &defaultWeight, + Down: &defaultDown, + }, + expected: true, + }, + { + server: StreamUpstreamServer{ + ID: 1, + Server: "127.0.0.1", + MaxConns: &defaultMaxConns, + MaxFails: &defaultMaxFails, + FailTimeout: defaultFailTimeout, + SlowStart: defaultSlowStart, + Backup: &defaultBackup, + Weight: &defaultWeight, + Down: &defaultDown, + }, + serverNGX: StreamUpstreamServer{ + ID: 1, + Server: "127.0.0.1", + MaxConns: &defaultMaxConns, + MaxFails: &defaultMaxFails, + FailTimeout: defaultFailTimeout, + SlowStart: defaultSlowStart, + Backup: &defaultBackup, + Weight: &defaultWeight, + Down: &defaultDown, + }, + expected: true, + }, + { + server: StreamUpstreamServer{SlowStart: "10s"}, + serverNGX: StreamUpstreamServer{}, + expected: false, + }, + { + server: StreamUpstreamServer{}, + serverNGX: StreamUpstreamServer{SlowStart: "10s"}, + expected: false, + }, + { + server: StreamUpstreamServer{SlowStart: "20s"}, + serverNGX: StreamUpstreamServer{SlowStart: "10s"}, + expected: false, + }, + } for _, test := range tests { - toAdd, toDelete := determineStreamUpdates(test.updated, test.nginx) - if !reflect.DeepEqual(toAdd, test.expectedToAdd) || !reflect.DeepEqual(toDelete, test.expectedToDelete) { - t.Errorf("determiteUpdates(%v, %v) = (%v, %v)", test.updated, test.nginx, toAdd, toDelete) + result := haveSameParametersForStream(test.server, test.serverNGX) + if result != test.expected { + t.Errorf("haveSameParametersForStream(%v, %v) returned %v but expected %v", test.server, test.serverNGX, result, test.expected) } } } diff --git a/tests/client_test.go b/tests/client_test.go index a53b021f..f7a1d126 100644 --- a/tests/client_test.go +++ b/tests/client_test.go @@ -18,8 +18,15 @@ const ( resolverMetric = "resolver_test" ) -var defaultMaxFails = 1 -var defaultWeight = 1 +var ( + defaultMaxConns = 0 + defaultMaxFails = 1 + defaultFailTimeout = "10s" + defaultSlowStart = "0s" + defaultBackup = false + defaultDown = false + defaultWeight = 1 +) func TestStreamClient(t *testing.T) { httpClient := &http.Client{} @@ -32,6 +39,7 @@ func TestStreamClient(t *testing.T) { streamServer := client.StreamUpstreamServer{ Server: "127.0.0.1:8001", } + // test adding a stream server err = c.AddStreamServer(streamUpstream, streamServer) @@ -69,17 +77,17 @@ func TestStreamClient(t *testing.T) { // test updating stream servers streamServers1 := []client.StreamUpstreamServer{ { - Server: "127.0.0.2:8001", + Server: "127.0.0.1:8001", }, { Server: "127.0.0.2:8002", }, { - Server: "127.0.0.2:8003", + Server: "127.0.0.3:8003", }, } - streamAdded, streamDeleted, err := c.UpdateStreamServers(streamUpstream, streamServers1) + streamAdded, streamDeleted, streamUpdated, err := c.UpdateStreamServers(streamUpstream, streamServers1) if err != nil { t.Fatalf("Error when updating servers: %v", err) @@ -90,6 +98,9 @@ func TestStreamClient(t *testing.T) { if len(streamDeleted) != 0 { t.Errorf("The number of deleted servers %v != 0", len(streamDeleted)) } + if len(streamUpdated) != 0 { + t.Errorf("The number of updated servers %v != 0", len(streamUpdated)) + } // test getting servers @@ -103,7 +114,7 @@ func TestStreamClient(t *testing.T) { // updating with the same servers - added, deleted, err := c.UpdateStreamServers(streamUpstream, streamServers1) + added, deleted, updated, err := c.UpdateStreamServers(streamUpstream, streamServers1) if err != nil { t.Fatalf("Error when updating servers: %v", err) @@ -114,6 +125,73 @@ func TestStreamClient(t *testing.T) { if len(deleted) != 0 { t.Errorf("The number of deleted servers %v != 0", len(deleted)) } + if len(updated) != 0 { + t.Errorf("The number of updated servers %v != 0", len(updated)) + } + + // updating one server with different parameters + newMaxConns := 5 + newMaxFails := 6 + newFailTimeout := "15s" + newSlowStart := "10s" + streamServers[0].MaxConns = &newMaxConns + streamServers[0].MaxFails = &newMaxFails + streamServers[0].FailTimeout = newFailTimeout + streamServers[0].SlowStart = newSlowStart + + // updating one server with only one different parameter + streamServers[1].SlowStart = newSlowStart + + added, deleted, updated, err = c.UpdateStreamServers(streamUpstream, streamServers) + if err != nil { + t.Fatalf("Error when updating server with different parameters: %v", err) + } + if len(added) != 0 { + t.Errorf("The number of added servers %v != 0", len(added)) + } + if len(deleted) != 0 { + t.Errorf("The number of deleted servers %v != 0", len(deleted)) + } + if len(updated) != 2 { + t.Errorf("The number of updated servers %v != 2", len(updated)) + } + + streamServers, err = c.GetStreamServers(streamUpstream) + if err != nil { + t.Fatalf("Error when getting servers: %v", err) + } + + for _, srv := range streamServers { + if srv.Server == streamServers[0].Server { + if *srv.MaxConns != newMaxConns { + t.Errorf("The parameter MaxConns of the updated server %v is != %v", *srv.MaxConns, newMaxConns) + } + if *srv.MaxFails != newMaxFails { + t.Errorf("The parameter MaxFails of the updated server %v is != %v", *srv.MaxFails, newMaxFails) + } + if srv.FailTimeout != newFailTimeout { + t.Errorf("The parameter FailTimeout of the updated server %v is != %v", srv.FailTimeout, newFailTimeout) + } + if srv.SlowStart != newSlowStart { + t.Errorf("The parameter SlowStart of the updated server %v is != %v", srv.SlowStart, newSlowStart) + } + } + + if srv.Server == streamServers[1].Server { + if *srv.MaxConns != defaultMaxConns { + t.Errorf("The parameter MaxConns of the updated server %v is != %v", *srv.MaxConns, defaultMaxConns) + } + if *srv.MaxFails != defaultMaxFails { + t.Errorf("The parameter MaxFails of the updated server %v is != %v", *srv.MaxFails, defaultMaxFails) + } + if srv.FailTimeout != defaultFailTimeout { + t.Errorf("The parameter FailTimeout of the updated server %v is != %v", srv.FailTimeout, defaultFailTimeout) + } + if srv.SlowStart != newSlowStart { + t.Errorf("The parameter SlowStart of the updated server %v is != %v", srv.SlowStart, newSlowStart) + } + } + } streamServers2 := []client.StreamUpstreamServer{ { @@ -128,21 +206,24 @@ func TestStreamClient(t *testing.T) { // updating with 2 new servers, 1 existing - added, deleted, err = c.UpdateStreamServers(streamUpstream, streamServers2) + added, deleted, updated, err = c.UpdateStreamServers(streamUpstream, streamServers2) if err != nil { t.Fatalf("Error when updating servers: %v", err) } - if len(added) != 2 { - t.Errorf("The number of added servers %v != 2", len(added)) + if len(added) != 3 { + t.Errorf("The number of added servers %v != 3", len(added)) } - if len(deleted) != 2 { - t.Errorf("The number of deleted servers %v != 2", len(deleted)) + if len(deleted) != 3 { + t.Errorf("The number of deleted servers %v != 3", len(deleted)) + } + if len(updated) != 0 { + t.Errorf("The number of updated servers %v != 0", len(updated)) } // updating with zero servers - removing - added, deleted, err = c.UpdateStreamServers(streamUpstream, []client.StreamUpstreamServer{}) + added, deleted, updated, err = c.UpdateStreamServers(streamUpstream, []client.StreamUpstreamServer{}) if err != nil { t.Fatalf("Error when updating servers: %v", err) @@ -153,6 +234,9 @@ func TestStreamClient(t *testing.T) { if len(deleted) != 3 { t.Errorf("The number of deleted servers %v != 3", len(deleted)) } + if len(updated) != 0 { + t.Errorf("The number of updated servers %v != 0", len(updated)) + } // test getting servers again @@ -175,16 +259,19 @@ func TestStreamUpstreamServer(t *testing.T) { maxFails := 64 weight := 10 + maxConns := 321 + backup := true + down := true streamServer := client.StreamUpstreamServer{ Server: "127.0.0.1:2000", - MaxConns: 321, + MaxConns: &maxConns, MaxFails: &maxFails, FailTimeout: "21s", SlowStart: "12s", Weight: &weight, - Backup: true, - Down: true, + Backup: &backup, + Down: &down, } err = c.AddStreamServer(streamUpstream, streamServer) if err != nil { @@ -205,7 +292,7 @@ func TestStreamUpstreamServer(t *testing.T) { } // remove stream upstream servers - _, _, err = c.UpdateStreamServers(streamUpstream, []client.StreamUpstreamServer{}) + _, _, _, err = c.UpdateStreamServers(streamUpstream, []client.StreamUpstreamServer{}) if err != nil { t.Errorf("Couldn't remove servers: %v", err) } @@ -274,7 +361,7 @@ func TestClient(t *testing.T) { }, } - added, deleted, err := c.UpdateHTTPServers(upstream, servers1) + added, deleted, updated, err := c.UpdateHTTPServers(upstream, servers1) if err != nil { t.Fatalf("Error when updating servers: %v", err) @@ -285,6 +372,9 @@ func TestClient(t *testing.T) { if len(deleted) != 0 { t.Errorf("The number of deleted servers %v != 0", len(deleted)) } + if len(updated) != 0 { + t.Errorf("The number of updated servers %v != 0", len(updated)) + } // test getting servers @@ -300,7 +390,7 @@ func TestClient(t *testing.T) { // updating with the same servers - added, deleted, err = c.UpdateHTTPServers(upstream, servers1) + added, deleted, updated, err = c.UpdateHTTPServers(upstream, servers1) if err != nil { t.Fatalf("Error when updating servers: %v", err) @@ -311,6 +401,73 @@ func TestClient(t *testing.T) { if len(deleted) != 0 { t.Errorf("The number of deleted servers %v != 0", len(deleted)) } + if len(updated) != 0 { + t.Errorf("The number of updated servers %v != 0", len(updated)) + } + + // updating one server with different parameters + newMaxConns := 5 + newMaxFails := 6 + newFailTimeout := "15s" + newSlowStart := "10s" + servers[0].MaxConns = &newMaxConns + servers[0].MaxFails = &newMaxFails + servers[0].FailTimeout = newFailTimeout + servers[0].SlowStart = newSlowStart + + // updating one server with only one different parameter + servers[1].SlowStart = newSlowStart + + added, deleted, updated, err = c.UpdateHTTPServers(upstream, servers) + if err != nil { + t.Fatalf("Error when updating server with different parameters: %v", err) + } + if len(added) != 0 { + t.Errorf("The number of added servers %v != 0", len(added)) + } + if len(deleted) != 0 { + t.Errorf("The number of deleted servers %v != 0", len(deleted)) + } + if len(updated) != 2 { + t.Errorf("The number of updated servers %v != 2", len(updated)) + } + + servers, err = c.GetHTTPServers(upstream) + if err != nil { + t.Fatalf("Error when getting servers: %v", err) + } + + for _, srv := range servers { + if srv.Server == servers[0].Server { + if *srv.MaxConns != newMaxConns { + t.Errorf("The parameter MaxConns of the updated server %v is != %v", *srv.MaxConns, newMaxConns) + } + if *srv.MaxFails != newMaxFails { + t.Errorf("The parameter MaxFails of the updated server %v is != %v", *srv.MaxFails, newMaxFails) + } + if srv.FailTimeout != newFailTimeout { + t.Errorf("The parameter FailTimeout of the updated server %v is != %v", srv.FailTimeout, newFailTimeout) + } + if srv.SlowStart != newSlowStart { + t.Errorf("The parameter SlowStart of the updated server %v is != %v", srv.SlowStart, newSlowStart) + } + } + + if srv.Server == servers[1].Server { + if *srv.MaxConns != defaultMaxConns { + t.Errorf("The parameter MaxConns of the updated server %v is != %v", *srv.MaxConns, defaultMaxConns) + } + if *srv.MaxFails != defaultMaxFails { + t.Errorf("The parameter MaxFails of the updated server %v is != %v", *srv.MaxFails, defaultMaxFails) + } + if srv.FailTimeout != defaultFailTimeout { + t.Errorf("The parameter FailTimeout of the updated server %v is != %v", srv.FailTimeout, defaultFailTimeout) + } + if srv.SlowStart != newSlowStart { + t.Errorf("The parameter SlowStart of the updated server %v is != %v", srv.SlowStart, newSlowStart) + } + } + } servers2 := []client.UpstreamServer{ { @@ -318,14 +475,15 @@ func TestClient(t *testing.T) { }, { Server: "127.0.0.2:8004", - }, { + }, + { Server: "127.0.0.2:8005", }, } // updating with 2 new servers, 1 existing - added, deleted, err = c.UpdateHTTPServers(upstream, servers2) + added, deleted, updated, err = c.UpdateHTTPServers(upstream, servers2) if err != nil { t.Fatalf("Error when updating servers: %v", err) @@ -336,10 +494,13 @@ func TestClient(t *testing.T) { if len(deleted) != 2 { t.Errorf("The number of deleted servers %v != 2", len(deleted)) } + if len(updated) != 0 { + t.Errorf("The number of updated servers %v != 0", len(updated)) + } // updating with zero servers - removing - added, deleted, err = c.UpdateHTTPServers(upstream, []client.UpstreamServer{}) + added, deleted, updated, err = c.UpdateHTTPServers(upstream, []client.UpstreamServer{}) if err != nil { t.Fatalf("Error when updating servers: %v", err) @@ -350,6 +511,9 @@ func TestClient(t *testing.T) { if len(deleted) != 3 { t.Errorf("The number of deleted servers %v != 3", len(deleted)) } + if len(updated) != 0 { + t.Errorf("The number of updated servers %v != 0", len(updated)) + } // test getting servers again @@ -372,16 +536,20 @@ func TestUpstreamServer(t *testing.T) { maxFails := 64 weight := 10 + maxConns := 321 + backup := true + down := true + server := client.UpstreamServer{ Server: "127.0.0.1:2000", - MaxConns: 321, + MaxConns: &maxConns, MaxFails: &maxFails, FailTimeout: "21s", SlowStart: "12s", Weight: &weight, Route: "test", - Backup: true, - Down: true, + Backup: &backup, + Down: &down, } err = c.AddHTTPServer(upstream, server) if err != nil { @@ -402,7 +570,7 @@ func TestUpstreamServer(t *testing.T) { } // remove upstream servers - _, _, err = c.UpdateHTTPServers(upstream, []client.UpstreamServer{}) + _, _, _, err = c.UpdateHTTPServers(upstream, []client.UpstreamServer{}) if err != nil { t.Errorf("Couldn't remove servers: %v", err) } @@ -501,7 +669,7 @@ func TestStats(t *testing.T) { } // cleanup upstream servers - _, _, err = c.UpdateHTTPServers(upstream, []client.UpstreamServer{}) + _, _, _, err = c.UpdateHTTPServers(upstream, []client.UpstreamServer{}) if err != nil { t.Errorf("Couldn't remove servers: %v", err) } @@ -519,11 +687,18 @@ func TestUpstreamServerDefaultParameters(t *testing.T) { } expected := client.UpstreamServer{ + ID: 0, Server: "127.0.0.1:2000", - SlowStart: "0s", + MaxConns: &defaultMaxConns, MaxFails: &defaultMaxFails, - FailTimeout: "10s", + FailTimeout: defaultFailTimeout, + SlowStart: defaultSlowStart, + Route: "", + Backup: &defaultBackup, + Down: &defaultDown, + Drain: false, Weight: &defaultWeight, + Service: "", } err = c.AddHTTPServer(upstream, server) if err != nil { @@ -544,7 +719,7 @@ func TestUpstreamServerDefaultParameters(t *testing.T) { } // remove upstream servers - _, _, err = c.UpdateHTTPServers(upstream, []client.UpstreamServer{}) + _, _, _, err = c.UpdateHTTPServers(upstream, []client.UpstreamServer{}) if err != nil { t.Errorf("Couldn't remove servers: %v", err) } @@ -614,7 +789,7 @@ func TestStreamStats(t *testing.T) { } // cleanup stream upstream servers - _, _, err = c.UpdateStreamServers(streamUpstream, []client.StreamUpstreamServer{}) + _, _, _, err = c.UpdateStreamServers(streamUpstream, []client.StreamUpstreamServer{}) if err != nil { t.Errorf("Couldn't remove stream servers: %v", err) } @@ -632,11 +807,16 @@ func TestStreamUpstreamServerDefaultParameters(t *testing.T) { } expected := client.StreamUpstreamServer{ + ID: 0, Server: "127.0.0.1:2000", - SlowStart: "0s", + MaxConns: &defaultMaxConns, MaxFails: &defaultMaxFails, - FailTimeout: "10s", + FailTimeout: defaultFailTimeout, + SlowStart: defaultSlowStart, + Backup: &defaultBackup, + Down: &defaultDown, Weight: &defaultWeight, + Service: "", } err = c.AddStreamServer(streamUpstream, streamServer) if err != nil { @@ -657,7 +837,7 @@ func TestStreamUpstreamServerDefaultParameters(t *testing.T) { } // cleanup stream upstream servers - _, _, err = c.UpdateStreamServers(streamUpstream, []client.StreamUpstreamServer{}) + _, _, _, err = c.UpdateStreamServers(streamUpstream, []client.StreamUpstreamServer{}) if err != nil { t.Errorf("Couldn't remove stream servers: %v", err) } @@ -993,12 +1173,18 @@ func TestUpstreamServerWithDrain(t *testing.T) { } server := client.UpstreamServer{ + ID: 0, Server: "127.0.0.1:9001", + MaxConns: &defaultMaxConns, MaxFails: &defaultMaxFails, - FailTimeout: "10s", - SlowStart: "0s", - Weight: &defaultWeight, + FailTimeout: defaultFailTimeout, + SlowStart: defaultSlowStart, + Route: "", + Backup: &defaultBackup, + Down: &defaultDown, Drain: true, + Weight: &defaultWeight, + Service: "", } // Get existing upstream servers