Skip to content

Commit

Permalink
fix:add graceful shutdown to courier handler (#296)
Browse files Browse the repository at this point in the history
Courier would not stop with the provided Background handler.
This changes the methods of Courier so that the graceful package can be
used in the same way as the http endpoints can be used.

Closes #295
  • Loading branch information
Gibheer authored and aeneasr committed Apr 7, 2020
1 parent e9b124c commit ddff709
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 9 deletions.
4 changes: 2 additions & 2 deletions cmd/daemon/serve.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package daemon

import (
stdctx "context"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -165,9 +164,10 @@ func sqa(cmd *cobra.Command, d driver.Driver) *metricsx.Service {
func bgTasks(d driver.Driver, wg *sync.WaitGroup, cmd *cobra.Command, args []string) {
defer wg.Done()

if err := d.Registry().Courier().Work(stdctx.Background()); err != nil {
if err := graceful.Graceful(d.Registry().Courier().Work, d.Registry().Courier().Shutdown); err != nil {
d.Logger().WithError(err).Fatalf("Failed to run courier worker.")
}
d.Logger().Println("courier worker was shutdown gracefully")
}

func ServeAll(d driver.Driver) func(cmd *cobra.Command, args []string) {
Expand Down
26 changes: 20 additions & 6 deletions courier/courier.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type (
dialer *gomail.Dialer
d smtpDependencies
c configuration.Provider
// graceful shutdown handling
ctx context.Context
shutdown context.CancelFunc
}
Provider interface {
Courier() *Courier
Expand All @@ -38,9 +41,12 @@ func NewSMTP(d smtpDependencies, c configuration.Provider) *Courier {
sslSkipVerify, _ := strconv.ParseBool(uri.Query().Get("skip_ssl_verify"))
password, _ := uri.User.Password()
port, _ := strconv.ParseInt(uri.Port(), 10, 64)
ctx, cancel := context.WithCancel(context.Background())
return &Courier{
d: d,
c: c,
d: d,
c: c,
ctx: ctx,
shutdown: cancel,
dialer: &gomail.Dialer{
Host: uri.Hostname(),
Port: int(port),
Expand Down Expand Up @@ -82,20 +88,28 @@ func (m *Courier) QueueEmail(ctx context.Context, t EmailTemplate) (uuid.UUID, e
return message.ID, nil
}

func (m *Courier) Work(ctx context.Context) error {
func (m *Courier) Work() error {
errChan := make(chan error)
defer close(errChan)

go m.watchMessages(ctx, errChan)
go m.watchMessages(m.ctx, errChan)

select {
case <-ctx.Done():
return ctx.Err()
case <-m.ctx.Done():
if m.ctx.Err() == context.Canceled {
return nil
}
return m.ctx.Err()
case err := <-errChan:
return err
}
}

func (m *Courier) Shutdown(ctx context.Context) error {
m.shutdown()
return nil
}

func (m *Courier) watchMessages(ctx context.Context, errChan chan error) {
for {
if err := backoff.Retry(func() error {
Expand Down
2 changes: 1 addition & 1 deletion courier/courier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestSMTP(t *testing.T) {
c := reg.Courier()

go func() {
require.NoError(t, c.Work(context.Background()))
require.NoError(t, c.Work())
}()

t.Run("case=queue messages", func(t *testing.T) {
Expand Down

0 comments on commit ddff709

Please sign in to comment.