Skip to content

Commit

Permalink
Add support for updating upstream servers
Browse files Browse the repository at this point in the history
  • Loading branch information
Vighneswar Rao Bojja committed Oct 17, 2019
1 parent 4b32c44 commit aabcebe
Show file tree
Hide file tree
Showing 6 changed files with 395 additions and 94 deletions.
114 changes: 92 additions & 22 deletions client/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"io/ioutil"
"net/http"
"reflect"
)

// APIVersion is a version of NGINX Plus API.
Expand All @@ -29,13 +30,13 @@ type versions []int
type UpstreamServer struct {
ID int `json:"id,omitempty"`
Server string `json:"server"`
MaxConns int `json:"max_conns"`
MaxConns *int `json:"max_conns,omitempty"`
MaxFails *int `json:"max_fails,omitempty"`
FailTimeout string `json:"fail_timeout,omitempty"`
SlowStart string `json:"slow_start,omitempty"`
Route string `json:"route"`
Backup bool `json:"backup"`
Down bool `json:"down"`
Route string `json:"route,omitempty"`
Backup *bool `json:"backup,omitempty"`
Down *bool `json:"down,omitempty"`
Drain bool `json:"drain,omitempty"`
Weight *int `json:"weight,omitempty"`
Service string `json:"service,omitempty"`
Expand All @@ -45,12 +46,12 @@ type UpstreamServer struct {
type StreamUpstreamServer struct {
ID int `json:"id,omitempty"`
Server string `json:"server"`
MaxConns int `json:"max_conns"`
MaxConns *int `json:"max_conns,omitempty"`
MaxFails *int `json:"max_fails,omitempty"`
FailTimeout string `json:"fail_timeout,omitempty"`
SlowStart string `json:"slow_start,omitempty"`
Backup bool `json:"backup"`
Down bool `json:"down"`
Backup *bool `json:"backup,omitempty"`
Down *bool `json:"down,omitempty"`
Weight *int `json:"weight,omitempty"`
Service string `json:"service,omitempty"`
}
Expand Down Expand Up @@ -469,32 +470,54 @@ func (client *NginxClient) DeleteHTTPServer(upstream string, server string) erro
// UpdateHTTPServers updates the servers of the upstream.
// Servers that are in the slice, but don't exist in NGINX will be added to NGINX.
// Servers that aren't in the slice, but exist in NGINX, will be removed from NGINX.
func (client *NginxClient) UpdateHTTPServers(upstream string, servers []UpstreamServer) ([]UpstreamServer, []UpstreamServer, error) {
// Servers that are in the slice and exist NGINX, but have different parameters, will be updated.
func (client *NginxClient) UpdateHTTPServers(upstream string, servers []UpstreamServer) (added []UpstreamServer, deleted []UpstreamServer, updated []UpstreamServer, err error) {
serversInNginx, err := client.GetHTTPServers(upstream)
if err != nil {
return nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
}

toAdd, toDelete := determineUpdates(servers, serversInNginx)
toAdd, toDelete, toUpdate := determineUpdates(servers, serversInNginx)

for _, server := range toAdd {
err := client.AddHTTPServer(upstream, server)
if err != nil {
return nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
}
}

for _, server := range toDelete {
err := client.DeleteHTTPServer(upstream, server.Server)
if err != nil {
return nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
}
}

return toAdd, toDelete, nil
for _, server := range toUpdate {
err := client.UpdateHTTPServer(upstream, server)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
}
}

return toAdd, toDelete, toUpdate, nil
}

func determineUpdates(updatedServers []UpstreamServer, nginxServers []UpstreamServer) (toAdd []UpstreamServer, toRemove []UpstreamServer) {
func determineUpdates(updatedServers []UpstreamServer, nginxServers []UpstreamServer) (toAdd []UpstreamServer, toRemove []UpstreamServer, toUpdate []UpstreamServer) {
for _, server := range updatedServers {
updateFound := false
for _, serverNGX := range nginxServers {
server.ID = serverNGX.ID
if server.Server == serverNGX.Server && !reflect.DeepEqual(server, serverNGX) {
updateFound = true
break
}
}
if updateFound {
toUpdate = append(toUpdate, server)
}
}

for _, server := range updatedServers {
found := false
for _, serverNGX := range nginxServers {
Expand Down Expand Up @@ -627,7 +650,7 @@ func (client *NginxClient) patch(path string, input interface{}) error {
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusNoContent {
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
return createResponseMismatchError(resp.Body).Wrap(fmt.Sprintf(
"failed to complete patch request: expected %v response, got %v",
http.StatusNoContent, resp.StatusCode))
Expand Down Expand Up @@ -692,29 +715,37 @@ func (client *NginxClient) DeleteStreamServer(upstream string, server string) er
// UpdateStreamServers updates the servers of the upstream.
// Servers that are in the slice, but don't exist in NGINX will be added to NGINX.
// Servers that aren't in the slice, but exist in NGINX, will be removed from NGINX.
func (client *NginxClient) UpdateStreamServers(upstream string, servers []StreamUpstreamServer) ([]StreamUpstreamServer, []StreamUpstreamServer, error) {
// Servers that are in the slice and exist NGINX, but have different parameters, will be updated.
func (client *NginxClient) UpdateStreamServers(upstream string, servers []StreamUpstreamServer) (added []StreamUpstreamServer, deleted []StreamUpstreamServer, updated []StreamUpstreamServer, err error) {
serversInNginx, err := client.GetStreamServers(upstream)
if err != nil {
return nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
return nil, nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
}

toAdd, toDelete := determineStreamUpdates(servers, serversInNginx)
toAdd, toDelete, toUpdate := determineStreamUpdates(servers, serversInNginx)

for _, server := range toAdd {
err := client.AddStreamServer(upstream, server)
if err != nil {
return nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
return nil, nil, nil, fmt.Errorf("add - failed to update stream servers of %v upstream: %v", upstream, err)
}
}

for _, server := range toDelete {
err := client.DeleteStreamServer(upstream, server.Server)
if err != nil {
return nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
return nil, nil, nil, fmt.Errorf("delete - failed to update stream servers of %v upstream: %v", upstream, err)
}
}

return toAdd, toDelete, nil
for _, server := range toUpdate {
err := client.UpdateStreamServer(upstream, server)
if err != nil {
return nil, nil, nil, fmt.Errorf("update - failed to update stream servers of %v upstream: %v", upstream, err)
}
}

return toAdd, toDelete, toUpdate, nil
}

func (client *NginxClient) getIDOfStreamServer(upstream string, name string) (int, error) {
Expand All @@ -732,7 +763,22 @@ func (client *NginxClient) getIDOfStreamServer(upstream string, name string) (in
return -1, nil
}

func determineStreamUpdates(updatedServers []StreamUpstreamServer, nginxServers []StreamUpstreamServer) (toAdd []StreamUpstreamServer, toRemove []StreamUpstreamServer) {
func determineStreamUpdates(updatedServers []StreamUpstreamServer, nginxServers []StreamUpstreamServer) (toAdd []StreamUpstreamServer, toRemove []StreamUpstreamServer, toUpdate []StreamUpstreamServer) {
for _, server := range updatedServers {
updateFound := false
for _, serverNGX := range nginxServers {
server.ID = serverNGX.ID

if server.Server == serverNGX.Server && !reflect.DeepEqual(server, serverNGX) {
updateFound = true
break
}
}
if updateFound {
toUpdate = append(toUpdate, server)
}
}

for _, server := range updatedServers {
found := false
for _, serverNGX := range nginxServers {
Expand Down Expand Up @@ -1125,3 +1171,27 @@ func (client *NginxClient) deleteKeyValPairs(zone string, stream bool) error {
}
return nil
}

// UpdateHTTPServer updates the server of the upstream.
func (client *NginxClient) UpdateHTTPServer(upstream string, server UpstreamServer) error {
path := fmt.Sprintf("http/upstreams/%v/servers/%v", upstream, server.ID)
server.ID = 0
err := client.patch(path, &server)
if err != nil {
return fmt.Errorf("failed to update %v server to %v upstream: %v", server.Server, upstream, err)
}

return nil
}

// UpdateStreamServer updates the server of the upstream.
func (client *NginxClient) UpdateStreamServer(upstream string, server StreamUpstreamServer) error {
path := fmt.Sprintf("stream/upstreams/%v/servers/%v", upstream, server.ID)
server.ID = 0
err := client.patch(path, &server)
if err != nil {
return fmt.Errorf("failed to update %v stream server to %v upstream: %v", server.Server, upstream, err)
}

return nil
}
80 changes: 78 additions & 2 deletions client/nginx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func TestDetermineUpdates(t *testing.T) {
}}

for _, test := range tests {
toAdd, toDelete := determineUpdates(test.updated, test.nginx)
toAdd, toDelete, _ := determineUpdates(test.updated, test.nginx)
if !reflect.DeepEqual(toAdd, test.expectedToAdd) || !reflect.DeepEqual(toDelete, test.expectedToDelete) {
t.Errorf("determiteUpdates(%v, %v) = (%v, %v)", test.updated, test.nginx, toAdd, toDelete)
}
Expand Down Expand Up @@ -224,9 +224,85 @@ func TestStreamDetermineUpdates(t *testing.T) {
}}

for _, test := range tests {
toAdd, toDelete := determineStreamUpdates(test.updated, test.nginx)
toAdd, toDelete, _ := determineStreamUpdates(test.updated, test.nginx)
if !reflect.DeepEqual(toAdd, test.expectedToAdd) || !reflect.DeepEqual(toDelete, test.expectedToDelete) {
t.Errorf("determiteUpdates(%v, %v) = (%v, %v)", test.updated, test.nginx, toAdd, toDelete)
}
}
}

func TestDetermineUpdatesWithUpdate(t *testing.T) {
maxConns := 1
var tests = []struct {
updated []UpstreamServer
nginx []UpstreamServer
expectedToUpdate UpstreamServer
}{{
updated: []UpstreamServer{
{
Server: "10.0.0.1:80",
MaxConns: &maxConns,
},
},
nginx: []UpstreamServer{
{
Server: "10.0.0.1:80",
},
{
ID: 2,
Server: "10.0.0.2:80",
},
},
expectedToUpdate: UpstreamServer{
Server: "10.0.0.1:80",
MaxConns: &maxConns,
}},}

for _, test := range tests {
_, _, toUpdate := determineUpdates(test.updated, test.nginx)

for _, update := range toUpdate {
if !reflect.DeepEqual(update, test.expectedToUpdate) {
t.Errorf("determiteUpdates(%v, %v) = (%v)", test.updated, test.nginx, toUpdate)
}
}
}
}

func TestStreamDetermineUpdatesWithUpdate(t *testing.T) {
maxConns := 1
var tests = []struct {
updated []StreamUpstreamServer
nginx []StreamUpstreamServer
expectedToUpdate StreamUpstreamServer
}{{
updated: []StreamUpstreamServer{
{
Server: "10.0.0.1:80",
MaxConns: &maxConns,
},
},
nginx: []StreamUpstreamServer{
{
Server: "10.0.0.1:80",
},
{
ID: 2,
Server: "10.0.0.2:80",
},
},
expectedToUpdate: StreamUpstreamServer{
Server: "10.0.0.1:80",
MaxConns: &maxConns,
}},}

for _, test := range tests {
_, _, toUpdate := determineStreamUpdates(test.updated, test.nginx)

for _, update := range toUpdate {
if !reflect.DeepEqual(update, test.expectedToUpdate) {
t.Errorf("determiteUpdates(%v, %v) = (%v)", test.updated, test.nginx, toUpdate)
}
}
}
}
6 changes: 3 additions & 3 deletions docker/nginx.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ http {
default_type application/octet-stream;

log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';

access_log /var/log/nginx/access.log main;

Expand Down Expand Up @@ -58,4 +58,4 @@ stream {
zone_sync;
zone_sync_server nginx-plus-test:7777 resolve;
}
}
}
26 changes: 13 additions & 13 deletions docker/nginx_no_stream.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,26 @@ pid /var/run/nginx.pid;


events {
worker_connections 1024;
worker_connections 1024;
}


http {
include /etc/nginx/mime.types;
default_type application/octet-stream;
include /etc/nginx/mime.types;
default_type application/octet-stream;

log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';

access_log /var/log/nginx/access.log main;
access_log /var/log/nginx/access.log main;

sendfile on;
#tcp_nopush on;
sendfile on;
#tcp_nopush on;

keepalive_timeout 65;
keepalive_timeout 65;

#gzip on;
#gzip on;

include /etc/nginx/conf.d/*.conf;
}
include /etc/nginx/conf.d/*.conf;
}
Loading

0 comments on commit aabcebe

Please sign in to comment.