Skip to content
This repository has been archived by the owner on Aug 30, 2019. It is now read-only.

writer: simplify monitor #503

Merged
merged 2 commits into from
Oct 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/merge_ini.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"strings"
"time"

"github.com/DataDog/datadog-trace-agent/backoff"
"github.com/DataDog/datadog-trace-agent/model"
"github.com/DataDog/datadog-trace-agent/writer/backoff"
writerconfig "github.com/DataDog/datadog-trace-agent/writer/config"
log "github.com/cihub/seelog"
"github.com/go-ini/ini"
Expand Down
5 changes: 2 additions & 3 deletions config/merge_yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@ import (
"regexp"
"time"

"gopkg.in/yaml.v2"

"github.com/DataDog/datadog-trace-agent/backoff"
"github.com/DataDog/datadog-trace-agent/model"
"github.com/DataDog/datadog-trace-agent/osutil"
"github.com/DataDog/datadog-trace-agent/writer/backoff"
writerconfig "github.com/DataDog/datadog-trace-agent/writer/config"
log "github.com/cihub/seelog"
"gopkg.in/yaml.v2"
)

// apiEndpointPrefix is the URL prefix prepended to the default site value from YamlAgentConfig.
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion writer/config/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package config
import (
"time"

"github.com/DataDog/datadog-trace-agent/backoff"
"github.com/DataDog/datadog-trace-agent/writer/backoff"
)

// QueuablePayloadSenderConf contains the configuration needed by a QueuablePayloadSender to operate.
Expand Down
90 changes: 42 additions & 48 deletions writer/fixtures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"sync"

"github.com/DataDog/datadog-trace-agent/testutil"
log "github.com/cihub/seelog"
)

// payloadConstructedHandlerArgs encodes the arguments passed to a PayloadConstructedHandler call.
Expand All @@ -19,8 +18,8 @@ type payloadConstructedHandlerArgs struct {
type testEndpoint struct {
sync.RWMutex
err error
successPayloads []Payload
errorPayloads []Payload
successPayloads []*Payload
errorPayloads []*Payload
}

func (e *testEndpoint) BaseURL() string { return "<testEndpoint>" }
Expand All @@ -31,9 +30,9 @@ func (e *testEndpoint) Write(payload *Payload) error {
e.Lock()
defer e.Unlock()
if e.err != nil {
e.errorPayloads = append(e.errorPayloads, *payload)
e.errorPayloads = append(e.errorPayloads, payload)
} else {
e.successPayloads = append(e.successPayloads, *payload)
e.successPayloads = append(e.successPayloads, payload)
}
return e.err
}
Expand All @@ -45,14 +44,14 @@ func (e *testEndpoint) Error() error {
}

// ErrorPayloads returns all the error payloads registered with the test endpoint.
func (e *testEndpoint) ErrorPayloads() []Payload {
func (e *testEndpoint) ErrorPayloads() []*Payload {
e.RLock()
defer e.RUnlock()
return e.errorPayloads
}

// SuccessPayloads returns all the success payloads registered with the test endpoint.
func (e *testEndpoint) SuccessPayloads() []Payload {
func (e *testEndpoint) SuccessPayloads() []*Payload {
e.RLock()
defer e.RUnlock()
return e.successPayloads
Expand Down Expand Up @@ -120,7 +119,7 @@ func (c *testPayloadSender) Run() {
}

// Payloads allows access to all payloads recorded as being successfully sent by this sender.
func (c *testPayloadSender) Payloads() []Payload {
func (c *testPayloadSender) Payloads() []*Payload {
return c.testEndpoint.SuccessPayloads()
}

Expand All @@ -135,13 +134,9 @@ func (c *testPayloadSender) setEndpoint(endpoint Endpoint) {

// testPayloadSenderMonitor monitors a PayloadSender and stores all events
type testPayloadSenderMonitor struct {
SuccessEvents []SenderSuccessEvent
FailureEvents []SenderFailureEvent
RetryEvents []SenderRetryEvent

events []monitorEvent
sender PayloadSender

exit chan struct{}
exit chan struct{}
}

// newTestPayloadSenderMonitor creates a new testPayloadSenderMonitor monitoring the specified sender.
Expand All @@ -163,21 +158,11 @@ func (m *testPayloadSenderMonitor) Run() {

for {
select {
case event := <-m.sender.Monitor():
if event == nil {
continue
}

switch event := event.(type) {
case SenderSuccessEvent:
m.SuccessEvents = append(m.SuccessEvents, event)
case SenderFailureEvent:
m.FailureEvents = append(m.FailureEvents, event)
case SenderRetryEvent:
m.RetryEvents = append(m.RetryEvents, event)
default:
log.Errorf("Unknown event of type %T", event)
case event, ok := <-m.sender.monitor():
if !ok {
continue // wait for exit
}
m.events = append(m.events, event)
case <-m.exit:
return
}
Expand All @@ -191,34 +176,43 @@ func (m *testPayloadSenderMonitor) Stop() {
}

// SuccessPayloads returns a slice containing all successful payloads.
func (m *testPayloadSenderMonitor) SuccessPayloads() []Payload {
result := make([]Payload, len(m.SuccessEvents))

for i, successEvent := range m.SuccessEvents {
result[i] = *successEvent.Payload
}

return result
func (m *testPayloadSenderMonitor) SuccessPayloads() []*Payload {
return m.eventPayloads(eventTypeSuccess)
}

// FailurePayloads returns a slice containing all failed payloads.
func (m *testPayloadSenderMonitor) FailurePayloads() []Payload {
result := make([]Payload, len(m.FailureEvents))

for i, successEvent := range m.FailureEvents {
result[i] = *successEvent.Payload
}
func (m *testPayloadSenderMonitor) FailurePayloads() []*Payload {
return m.eventPayloads(eventTypeFailure)
}

return result
// FailureEvents returns all failure events.
func (m *testPayloadSenderMonitor) FailureEvents() []monitorEvent {
return m.eventsByType(eventTypeFailure)
}

// RetryPayloads returns a slice containing all failed payloads.
func (m *testPayloadSenderMonitor) RetryPayloads() []Payload {
result := make([]Payload, len(m.RetryEvents))
func (m *testPayloadSenderMonitor) RetryPayloads() []*Payload {
return m.eventPayloads(eventTypeRetry)
}

for i, successEvent := range m.RetryEvents {
result[i] = *successEvent.Payload
func (m *testPayloadSenderMonitor) eventPayloads(t eventType) []*Payload {
res := make([]*Payload, 0)
for _, e := range m.events {
if e.typ != t {
continue
}
res = append(res, e.payload)
}
return res
}

return result
func (m *testPayloadSenderMonitor) eventsByType(t eventType) []monitorEvent {
res := make([]monitorEvent, 0)
for _, e := range m.events {
if e.typ != t {
continue
}
res = append(res, e)
}
return res
}
12 changes: 6 additions & 6 deletions writer/multi_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ var _ PayloadSender = (*multiSender)(nil)
// events.
type multiSender struct {
senders []PayloadSender
mwg sync.WaitGroup // monitor funnel waitgroup
mch chan interface{} // monitor funneling channel
mwg sync.WaitGroup // monitor funnel waitgroup
mch chan monitorEvent // monitor funneling channel
}

// newMultiSender returns a new PayloadSender which forwards all sent payloads to all
Expand All @@ -29,7 +29,7 @@ func newMultiSender(endpoints []Endpoint, cfg config.QueuablePayloadSenderConf)
}
return &multiSender{
senders: senders,
mch: make(chan interface{}, len(senders)),
mch: make(chan monitorEvent, len(senders)),
}
}

Expand All @@ -40,12 +40,12 @@ func (w *multiSender) Start() {
}
for _, sender := range w.senders {
w.mwg.Add(1)
go func(ch <-chan interface{}) {
go func(ch <-chan monitorEvent) {
defer w.mwg.Done()
for event := range ch {
w.mch <- event
}
}(sender.Monitor())
}(sender.monitor())
}
}

Expand All @@ -65,7 +65,7 @@ func (w *multiSender) Send(p *Payload) {
}
}

func (w *multiSender) Monitor() <-chan interface{} { return w.mch }
func (w *multiSender) monitor() <-chan monitorEvent { return w.mch }
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why make Monitor private? This way, no one outside of this package will be able to use a sender?


// Run implements PayloadSender.
func (w *multiSender) Run() { /* no-op */ }
Expand Down
31 changes: 17 additions & 14 deletions writer/multi_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestMultiSender(t *testing.T) {
t.Run("Start", func(t *testing.T) {
mock1 := newMockSender()
mock2 := newMockSender()
multi := &multiSender{senders: []PayloadSender{mock1, mock2}, mch: make(chan interface{})}
multi := &multiSender{senders: []PayloadSender{mock1, mock2}, mch: make(chan monitorEvent)}
multi.Start()
defer multi.Stop()

Expand All @@ -71,7 +71,7 @@ func TestMultiSender(t *testing.T) {
t.Run("Stop", func(t *testing.T) {
mock1 := newMockSender()
mock2 := newMockSender()
multi := &multiSender{senders: []PayloadSender{mock1, mock2}, mch: make(chan interface{})}
multi := &multiSender{senders: []PayloadSender{mock1, mock2}, mch: make(chan monitorEvent)}
multi.Stop()

assert := assert.New(t)
Expand All @@ -89,7 +89,7 @@ func TestMultiSender(t *testing.T) {
mock1 := newMockSender()
mock2 := newMockSender()
p := &Payload{CreationDate: time.Now(), Bytes: []byte{1, 2, 3}}
multi := &multiSender{senders: []PayloadSender{mock1, mock2}, mch: make(chan interface{})}
multi := &multiSender{senders: []PayloadSender{mock1, mock2}, mch: make(chan monitorEvent)}
multi.Send(p)

assert := assert.New(t)
Expand All @@ -100,16 +100,19 @@ func TestMultiSender(t *testing.T) {
t.Run("funnel", func(t *testing.T) {
mock1 := newMockSender()
mock2 := newMockSender()
multi := &multiSender{senders: []PayloadSender{mock1, mock2}, mch: make(chan interface{})}
multi := &multiSender{senders: []PayloadSender{mock1, mock2}, mch: make(chan monitorEvent)}
multi.Start()
defer multi.Stop()

mock1.monitor <- "ping1"
mock2.monitor <- "ping2"
event1 := monitorEvent{typ: eventTypeSuccess, stats: sendStats{host: "ABC"}}
event2 := monitorEvent{typ: eventTypeFailure, stats: sendStats{host: "QWE"}}

mock1.monitorCh <- event1
mock2.monitorCh <- event2

assert.ElementsMatch(t,
[]string{"ping1", "ping2"},
[]string{(<-multi.mch).(string), (<-multi.mch).(string)},
[]monitorEvent{event1, event2},
[]monitorEvent{<-multi.mch, <-multi.mch},
)
})
}
Expand Down Expand Up @@ -144,19 +147,19 @@ type mockPayloadSender struct {

mu sync.Mutex
sendCalls []*Payload
monitor chan interface{}
monitorCh chan monitorEvent
}

func newMockSender() *mockPayloadSender {
return &mockPayloadSender{monitor: make(chan interface{})}
return &mockPayloadSender{monitorCh: make(chan monitorEvent)}
}

func (m *mockPayloadSender) Reset() {
atomic.SwapUint64(&m.startCalls, 0)
atomic.SwapUint64(&m.stopCalls, 0)
m.mu.Lock()
m.sendCalls = m.sendCalls[:0]
m.monitor = make(chan interface{})
m.monitorCh = make(chan monitorEvent)
m.mu.Unlock()
}

Expand All @@ -171,7 +174,7 @@ func (m *mockPayloadSender) StartCalls() int {
// Stop must be called only once. It closes the monitor channel.
func (m *mockPayloadSender) Stop() {
atomic.AddUint64(&m.stopCalls, 1)
close(m.monitor)
close(m.monitorCh)
}

func (m *mockPayloadSender) StopCalls() int {
Expand All @@ -190,10 +193,10 @@ func (m *mockPayloadSender) SendCalls() []*Payload {
return m.sendCalls
}

func (m *mockPayloadSender) Monitor() <-chan interface{} {
func (m *mockPayloadSender) monitor() <-chan monitorEvent {
m.mu.Lock()
defer m.mu.Unlock()
return m.monitor
return m.monitorCh
}

func (m *mockPayloadSender) Run() {}
Expand Down
Loading