Skip to content

Commit

Permalink
Cleanly close agent goroutines
Browse files Browse the repository at this point in the history
  • Loading branch information
edaniels committed Jul 2, 2024
1 parent 3cdd012 commit 822108b
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 5 deletions.
9 changes: 6 additions & 3 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,9 @@ func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit

userBindingRequestHandler: config.BindingRequestHandler,
}
a.connectionStateNotifier = &handlerNotifier{connectionStateFunc: a.onConnectionStateChange}
a.candidateNotifier = &handlerNotifier{candidateFunc: a.onCandidate}
a.selectedCandidatePairNotifier = &handlerNotifier{candidatePairFunc: a.onSelectedCandidatePairChange}
a.connectionStateNotifier = &handlerNotifier{connectionStateFunc: a.onConnectionStateChange, done: make(chan struct{})}
a.candidateNotifier = &handlerNotifier{candidateFunc: a.onCandidate, done: make(chan struct{})}
a.selectedCandidatePairNotifier = &handlerNotifier{candidatePairFunc: a.onSelectedCandidatePairChange, done: make(chan struct{})}

if a.net == nil {
a.net, err = stdnet.NewNet()
Expand Down Expand Up @@ -931,6 +931,9 @@ func (a *Agent) Close() error {

close(a.done)
<-a.taskLoopDone
a.connectionStateNotifier.Close()
a.candidateNotifier.Close()
a.selectedCandidatePairNotifier.Close()
return nil
}

Expand Down
45 changes: 44 additions & 1 deletion agent_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ func (a *Agent) onConnectionStateChange(s ConnectionState) {

type handlerNotifier struct {
sync.Mutex
running bool
running bool
notifiers sync.WaitGroup

connectionStates []ConnectionState
connectionStateFunc func(ConnectionState)
Expand All @@ -55,13 +56,38 @@ type handlerNotifier struct {

selectedCandidatePairs []*CandidatePair
candidatePairFunc func(*CandidatePair)

// State for closing
done chan struct{}
}

func (h *handlerNotifier) Close() {
h.Lock()

select {
case <-h.done:
h.Unlock()
return
default:
}
close(h.done)
h.Unlock()

h.notifiers.Wait()
}

func (h *handlerNotifier) EnqueueConnectionState(s ConnectionState) {
h.Lock()
defer h.Unlock()

select {
case <-h.done:
return
default:
}

notify := func() {
defer h.notifiers.Done()
for {
h.Lock()
if len(h.connectionStates) == 0 {
Expand All @@ -79,6 +105,7 @@ func (h *handlerNotifier) EnqueueConnectionState(s ConnectionState) {
h.connectionStates = append(h.connectionStates, s)
if !h.running {
h.running = true
h.notifiers.Add(1)
go notify()
}
}
Expand All @@ -87,7 +114,14 @@ func (h *handlerNotifier) EnqueueCandidate(c Candidate) {
h.Lock()
defer h.Unlock()

select {
case <-h.done:
return
default:
}

notify := func() {
defer h.notifiers.Done()
for {
h.Lock()
if len(h.candidates) == 0 {
Expand All @@ -105,6 +139,7 @@ func (h *handlerNotifier) EnqueueCandidate(c Candidate) {
h.candidates = append(h.candidates, c)
if !h.running {
h.running = true
h.notifiers.Add(1)
go notify()
}
}
Expand All @@ -113,7 +148,14 @@ func (h *handlerNotifier) EnqueueSelectedCandidatePair(p *CandidatePair) {
h.Lock()
defer h.Unlock()

select {
case <-h.done:
return
default:
}

notify := func() {
defer h.notifiers.Done()
for {
h.Lock()
if len(h.selectedCandidatePairs) == 0 {
Expand All @@ -131,6 +173,7 @@ func (h *handlerNotifier) EnqueueSelectedCandidatePair(p *CandidatePair) {
h.selectedCandidatePairs = append(h.selectedCandidatePairs, p)
if !h.running {
h.running = true
h.notifiers.Add(1)
go notify()
}
}
4 changes: 4 additions & 0 deletions agent_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func TestConnectionStateNotifier(t *testing.T) {
connectionStateFunc: func(_ ConnectionState) {
updates <- struct{}{}
},
done: make(chan struct{}),
}
// Enqueue all updates upfront to ensure that it
// doesn't block
Expand All @@ -38,6 +39,7 @@ func TestConnectionStateNotifier(t *testing.T) {
close(done)
}()
<-done
c.Close()
})
t.Run("TestUpdateOrdering", func(t *testing.T) {
report := test.CheckRoutines(t)
Expand All @@ -47,6 +49,7 @@ func TestConnectionStateNotifier(t *testing.T) {
connectionStateFunc: func(cs ConnectionState) {
updates <- cs
},
done: make(chan struct{}),
}
done := make(chan struct{})
go func() {
Expand All @@ -67,5 +70,6 @@ func TestConnectionStateNotifier(t *testing.T) {
c.EnqueueConnectionState(ConnectionState(i))
}
<-done
c.Close()
})
}
5 changes: 4 additions & 1 deletion agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1423,11 +1423,12 @@ func TestCloseInConnectionStateCallback(t *testing.T) {

isClosed := make(chan interface{})
isConnected := make(chan interface{})
connectionStateConnectedSeen := make(chan interface{})
err = aAgent.OnConnectionStateChange(func(c ConnectionState) {
switch c {
case ConnectionStateConnected:
<-isConnected
assert.NoError(t, aAgent.Close())
close(connectionStateConnectedSeen)
case ConnectionStateClosed:
close(isClosed)
default:
Expand All @@ -1439,6 +1440,8 @@ func TestCloseInConnectionStateCallback(t *testing.T) {

connect(aAgent, bAgent)
close(isConnected)
<-connectionStateConnectedSeen
require.NoError(t, aAgent.Close())

<-isClosed
assert.NoError(t, bAgent.Close())
Expand Down

0 comments on commit 822108b

Please sign in to comment.