Skip to content

Commit

Permalink
[release]Support for Polling
Browse files Browse the repository at this point in the history
This release adds the following features:
- Adds environment variables `DF_SERVICE_POLLING_INTERVAL` and `DF_USE_DOCKER_SERVICE_EVENTS` to configure service polling. Details can be found at the [Configuration documentation](http://swarmlistener.dockerflow.com/config/).
- Adds environment variables `DF_NODE_POLLING_INTERVAL` and `DF_USE_DOCKER_NODE_EVENTS` to configure node polling. Details can be found at the [Configuration documentation](http://swarmlistener.dockerflow.com/config/).
- More robust handling of events and sending out notifications.
  • Loading branch information
thomasjpfan authored Jun 1, 2018
1 parent cf38abd commit 6d47a88
Show file tree
Hide file tree
Showing 11 changed files with 434 additions and 31 deletions.
4 changes: 3 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ ENV DF_DOCKER_HOST="unix:///var/run/docker.sock" \
DF_NOTIFY_LABEL="com.df.notify" \
DF_INCLUDE_NODE_IP_INFO="false" \
DF_SERVICE_POLLING_INTERVAL="-1" \
DF_USE_DOCKER_SERVICE_EVENTS="true"
DF_USE_DOCKER_SERVICE_EVENTS="true" \
DF_NODE_POLLING_INTERVAL="-1" \
DF_USE_DOCKER_NODE_EVENTS="true"

EXPOSE 8080

Expand Down
6 changes: 4 additions & 2 deletions args.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@ import (

type args struct {
ServicePollingInterval int
NodePollingInterval int
Retry int
RetryInterval int
}

func getArgs() *args {
return &args{
ServicePollingInterval: getValue(-1, "DF_SERVICE_POLLING_INTERVAL"),
Retry: getValue(1, "DF_RETRY"),
RetryInterval: getValue(0, "DF_RETRY_INTERVAL"),
NodePollingInterval: getValue(-1, "DF_NODE_POLLING_INTERVAL"),
Retry: getValue(1, "DF_RETRY"),
RetryInterval: getValue(0, "DF_RETRY_INTERVAL"),
}
}

Expand Down
2 changes: 2 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ The following environment variables can be used when creating the `swarm-listene
|DF_RETRY_INTERVAL |Time between each notificationo request retry, in seconds.<br>**Default**: `5`<br>**Example**:`10`|
|DF_SERVICE_POLLING_INTERVAL |Time between each service polling request, in seconds. When this value is set less than or equal to zero, service polling is disabled.<br>**Default**: `-1`<br>**Example**:`20`|
|DF_USE_DOCKER_SERVICE_EVENTS|Use docker events api to get service updates.<br>**Default**:`true`|
|DF_NODE_POLLING_INTERVAL |Time between each node polling request, in seconds. When this value is set less than or equal to zero, node polling is disabled.<br>**Default**: `-1`<br>**Example**:`20`|
|DF_USE_DOCKER_NODE_EVENTS|Use docker events api to get node updates.<br>**Default**:`true`|
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ func main() {

l.Printf("Starting Docker Flow: Swarm Listener")
args := getArgs()
swarmListener, err := service.NewSwarmListenerFromEnv(args.Retry, args.RetryInterval, args.ServicePollingInterval, l)
swarmListener, err := service.NewSwarmListenerFromEnv(
args.Retry, args.RetryInterval,
args.ServicePollingInterval, args.NodePollingInterval, l)
if err != nil {
l.Printf("Failed to initialize Docker Flow: Swarm Listener")
l.Printf("ERROR: %v", err)
Expand Down
18 changes: 18 additions & 0 deletions service/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,16 @@ func (m *nodeCacherMock) Get(ID string) (NodeMini, bool) {
return args.Get(0).(NodeMini), args.Bool(1)
}

func (m *nodeCacherMock) IsNewOrUpdated(n NodeMini) bool {
args := m.Called(n)
return args.Bool(0)
}

func (m *nodeCacherMock) Keys() map[string]struct{} {
args := m.Called()
return args.Get(0).(map[string]struct{})
}

type notifyDistributorMock struct {
mock.Mock
}
Expand All @@ -154,3 +164,11 @@ type swarmServicePollingMock struct {
func (m *swarmServicePollingMock) Run(eventChan chan<- Event) {
m.Called(eventChan)
}

type nodePollingMock struct {
mock.Mock
}

func (m *nodePollingMock) Run(eventChan chan<- Event) {
m.Called(eventChan)
}
36 changes: 35 additions & 1 deletion service/nodecache.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package service

import "sync"

// NodeCacher caches sevices
type NodeCacher interface {
InsertAndCheck(n NodeMini) bool
IsNewOrUpdated(n NodeMini) bool
Delete(ID string)
Get(ID string) (NodeMini, bool)
Keys() map[string]struct{}
}

// NodeCache implements `NodeCacher`
// Not threadsafe!
type NodeCache struct {
cache map[string]NodeMini
mux sync.RWMutex
}

// NewNodeCache creates a new `NewNodeCache`
Expand All @@ -23,6 +28,9 @@ func NewNodeCache() *NodeCache {
// InsertAndCheck inserts `NodeMini` into cache
// If the node is new or updated `InsertAndCheck` returns true.
func (c *NodeCache) InsertAndCheck(n NodeMini) bool {
c.mux.Lock()
defer c.mux.Unlock()

cachedNode, ok := c.cache[n.ID]
c.cache[n.ID] = n

Expand All @@ -31,11 +39,37 @@ func (c *NodeCache) InsertAndCheck(n NodeMini) bool {

// Delete removes node from cache
func (c *NodeCache) Delete(ID string) {
c.mux.Lock()
defer c.mux.Unlock()

delete(c.cache, ID)
}

// Get gets node from cache
func (c NodeCache) Get(ID string) (NodeMini, bool) {
func (c *NodeCache) Get(ID string) (NodeMini, bool) {
c.mux.RLock()
defer c.mux.RUnlock()

v, ok := c.cache[ID]
return v, ok
}

// IsNewOrUpdated returns true if node is new or updated
func (c *NodeCache) IsNewOrUpdated(n NodeMini) bool {
c.mux.RLock()
defer c.mux.RUnlock()

cachedNode, ok := c.cache[n.ID]
return !ok || !n.Equal(cachedNode)
}

// Keys return the keys of the cache
func (c *NodeCache) Keys() map[string]struct{} {
c.mux.RLock()
defer c.mux.RUnlock()
output := map[string]struct{}{}
for key := range c.cache {
output[key] = struct{}{}
}
return output
}
37 changes: 37 additions & 0 deletions service/nodecache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,43 @@ func (s *NodeCacheTestSuite) Test_GetAndRemove_NotInCache_ReturnsFalse() {
s.False(ok)
}

func (s *NodeCacheTestSuite) Test_IsNewOrUpdated_NodeInCache() {
s.Cache.InsertAndCheck(s.NMini)
s.AssertInCache(s.NMini)

newOrUpdated := s.Cache.IsNewOrUpdated(s.NMini)
s.False(newOrUpdated)
}

func (s *NodeCacheTestSuite) Test_IsNewOrUpdated_NodeNotInCache() {
newOrUpdated := s.Cache.IsNewOrUpdated(s.NMini)
s.True(newOrUpdated)
}

func (s *NodeCacheTestSuite) Test_IsNewOrUpdated_NodeIsDifferentCache() {

s.Cache.InsertAndCheck(s.NMini)
s.AssertInCache(s.NMini)

anotherNMini := getNewNodeMini()
anotherNMini.State = swarm.NodeStateDown

newOrUpdated := s.Cache.IsNewOrUpdated(anotherNMini)
s.True(newOrUpdated)

}

func (s *NodeCacheTestSuite) Test_Keys() {
s.Cache.InsertAndCheck(s.NMini)
s.AssertInCache(s.NMini)

keys := s.Cache.Keys()

s.Require().Len(keys, 1)
s.Contains(keys, s.NMini.ID)

}

func (s *NodeCacheTestSuite) AssertInCache(nm NodeMini) {
ss, ok := s.Cache.Get(nm.ID)
s.True(ok)
Expand Down
87 changes: 87 additions & 0 deletions service/nodepoller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package service

import (
"context"
"log"
"time"

"github.com/docker/docker/api/types/swarm"
)

// NodePolling provides an interface for polling node changes
type NodePolling interface {
Run(eventChan chan<- Event)
}

// NodePoller implements `NodePolling`
type NodePoller struct {
Client NodeInspector
Cache NodeCacher
PollingInterval int
MinifyFunc func(swarm.Node) NodeMini
Log *log.Logger
}

// NewNodePoller creates a new `NodePoller`
func NewNodePoller(
client NodeInspector,
cache NodeCacher,
pollingInterval int,
minifyFunc func(swarm.Node) NodeMini,
log *log.Logger,
) *NodePoller {
return &NodePoller{
Client: client,
Cache: cache,
PollingInterval: pollingInterval,
MinifyFunc: minifyFunc,
Log: log,
}
}

// Run starts poller and places events onto `eventChan`
func (n NodePoller) Run(eventChan chan<- Event) {

if n.PollingInterval <= 0 {
return
}

ctx := context.Background()

n.Log.Printf("Polling for Node Changes")
time.Sleep(time.Duration(n.PollingInterval) * time.Second)

for {
nodes, err := n.Client.NodeList(ctx)
if err != nil {
n.Log.Printf("ERROR (NodePoller): %v", err)
} else {
nowTimeNano := time.Now().UTC().UnixNano()
keys := n.Cache.Keys()
for _, node := range nodes {
delete(keys, node.ID)

nodeMini := n.MinifyFunc(node)
if n.Cache.IsNewOrUpdated(nodeMini) {
eventChan <- Event{
Type: EventTypeCreate,
ID: node.ID,
TimeNano: nowTimeNano,
UseCache: true,
}
}
}

// Remaining key sare removal events
for k := range keys {
eventChan <- Event{
Type: EventTypeRemove,
ID: k,
TimeNano: nowTimeNano,
UseCache: true,
}
}
}
time.Sleep(time.Duration(n.PollingInterval) * time.Second)
}
}
Loading

0 comments on commit 6d47a88

Please sign in to comment.