Skip to content

Commit

Permalink
Support app shutdown before app.Done() is called (#805)
Browse files Browse the repository at this point in the history
* Record app shutdown and offer the signal on app.Done()

* Add data race test

* Add lock for shutdown signal

* Pull test into function and block go routines on ready channel

* lint

* Return channel early if shutdown signal received

* Remove comment

* Remove fmt qualifier for assert error message

* Clarify comment

* Assert no error on shutdown for TestDataRace

* Apply suggestions from code review

* go fmt

Co-authored-by: Abhinav Gupta <[email protected]>
  • Loading branch information
manjari25 and abhinav authored Nov 10, 2021
1 parent 5bb9132 commit cc87b09
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 6 deletions.
17 changes: 13 additions & 4 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,9 @@ type App struct {
errorHooks []ErrorHandler
validate bool
// Used to signal shutdowns.
donesMu sync.RWMutex
dones []chan os.Signal
donesMu sync.Mutex // guards dones and shutdownSig
dones []chan os.Signal
shutdownSig os.Signal

osExit func(code int) // os.Exit override; used for testing only
}
Expand Down Expand Up @@ -787,11 +788,19 @@ func (app *App) Stop(ctx context.Context) (err error) {
// using the Shutdown functionality (see the Shutdowner documentation for details).
func (app *App) Done() <-chan os.Signal {
c := make(chan os.Signal, 1)
signal.Notify(c, _sigINT, _sigTERM)

app.donesMu.Lock()
defer app.donesMu.Unlock()
// If shutdown signal has been received already
// send it and return. If not, wait for user to send a termination
// signal.
if app.shutdownSig != nil {
c <- app.shutdownSig
return c
}

signal.Notify(c, _sigINT, _sigTERM)
app.dones = append(app.dones, c)
app.donesMu.Unlock()
return c
}

Expand Down
6 changes: 4 additions & 2 deletions shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ func (app *App) shutdowner() Shutdowner {
}

func (app *App) broadcastSignal(signal os.Signal) error {
app.donesMu.RLock()
defer app.donesMu.RUnlock()
app.donesMu.Lock()
defer app.donesMu.Unlock()

app.shutdownSig = signal

var unsent int
for _, done := range app.dones {
Expand Down
61 changes: 61 additions & 0 deletions shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
package fx_test

import (
"context"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/fx"
"go.uber.org/fx/fxtest"
)
Expand Down Expand Up @@ -65,4 +68,62 @@ func TestShutdown(t *testing.T) {
"unexpected error returned when shutdown is called with a blocked channel")
assert.NotNil(t, <-done, "done channel did not receive signal")
})

t.Run("shutdown app before calling Done()", func(t *testing.T) {
t.Parallel()

var s fx.Shutdowner
app := fxtest.New(
t,
fx.Populate(&s),
)

require.NoError(t, app.Start(context.Background()), "error starting app")
assert.NoError(t, s.Shutdown(), "error in app shutdown")
done1, done2 := app.Done(), app.Done()
defer app.Stop(context.Background())
// Receiving on done1 and done2 will deadlock in the event that app.Done()
// doesn't work as expected.
assert.NotNil(t, <-done1, "done channel 1 did not receive signal")
assert.NotNil(t, <-done2, "done channel 2 did not receive signal")
})
}

func TestDataRace(t *testing.T) {
t.Parallel()

var s fx.Shutdowner
app := fxtest.New(
t,
fx.Populate(&s),
)
require.NoError(t, app.Start(context.Background()), "error starting app")

const N = 50
ready := make(chan struct{}) // used to orchestrate goroutines for Done() and ShutdownOption()
var wg sync.WaitGroup // tracks and waits for all goroutines

// Spawn N goroutines, each of which call app.Done() and assert
// the signal received.
wg.Add(N)
for i := 0; i < N; i++ {
i := i
go func() {
defer wg.Done()
<-ready
done := app.Done()
assert.NotNil(t, <-done, "done channel %v did not receive signal", i)
}()
}

// call Shutdown()
wg.Add(1)
go func() {
<-ready
defer wg.Done()
assert.NoError(t, s.Shutdown(), "error in app shutdown")
}()

close(ready)
wg.Wait()
}

0 comments on commit cc87b09

Please sign in to comment.