Skip to content

Commit

Permalink
feat: replace message_ttl with static max retry count (ory#2638)
Browse files Browse the repository at this point in the history
This PR replaces the `courier.message_ttl` configuration option with a `courier.message_retries` option to limit how often the sending of a message is retried before it is marked as `abandoned`. 

BREAKING CHANGES: The `courier.message_ttl` config key is removed and replaced by the counter `courier.message_retries`.

Closes ory#402
Closes ory#1598
  • Loading branch information
jonas-jonas authored Aug 4, 2022
1 parent 7ffbcd2 commit a409ad2
Show file tree
Hide file tree
Showing 18 changed files with 213 additions and 78 deletions.
14 changes: 6 additions & 8 deletions .schemastore/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1471,15 +1471,13 @@
"/conf/courier-templates"
]
},
"message_ttl": {
"description": "Defines a Time-To-Live for courier messages that could not be delivered. After the defined TTL has expired for a message that message is abandoned.",
"type": "string",
"pattern": "^([0-9]+(ns|us|ms|s|m|h))+$",
"default": "1h",
"message_retries": {
"description": "Defines the maximum number of times the sending of a message is retried after it failed before it is marked as abandoned",
"type": "integer",
"default": 5,
"examples": [
"1h",
"1m",
"1s"
10,
60
]
},
"smtp": {
Expand Down
64 changes: 38 additions & 26 deletions courier/courier_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,37 @@ package courier

import (
"context"
"time"

"github.com/pkg/errors"
)

func (c *courier) DispatchMessage(ctx context.Context, msg Message) error {
maxRetries := c.deps.CourierConfig(ctx).CourierMessageRetries()

if msg.SendCount > maxRetries {
if err := c.deps.CourierPersister().SetMessageStatus(ctx, msg.ID, MessageStatusAbandoned); err != nil {
c.deps.Logger().
WithError(err).
WithField("message_id", msg.ID).
Error(`Unable to reset the retried message's status to "abandoned".`)
return err
}

// Skip the message
c.deps.Logger().
WithField("message_id", msg.ID).
Warnf(`Message was abandoned because it did not deliver after %d attempts`, msg.SendCount)
return nil
}

if err := c.deps.CourierPersister().IncrementMessageSendCount(ctx, msg.ID); err != nil {
c.deps.Logger().
WithError(err).
WithField("message_id", msg.ID).
Error(`Unable to increment the message's "send_count" field`)
return err
}

switch msg.Type {
case MessageTypeEmail:
if err := c.dispatchEmail(ctx, msg); err != nil {
Expand Down Expand Up @@ -48,35 +73,22 @@ func (c *courier) DispatchQueue(ctx context.Context) error {
return err
}

ttl := c.deps.CourierConfig(ctx).CourierMessageTTL()

for k, msg := range messages {
if time.Now().After(msg.CreatedAt.Add(ttl)) {
if err := c.deps.CourierPersister().SetMessageStatus(ctx, msg.ID, MessageStatusAbandoned); err != nil {
if c.failOnError {
return err
}
c.deps.Logger().
WithError(err).
WithField("message_id", msg.ID).
Error(`Unable to reset the timed out message's status to "abandoned".`)
}
} else {
if err := c.DispatchMessage(ctx, msg); err != nil {
for _, replace := range messages[k:] {
if err := c.deps.CourierPersister().SetMessageStatus(ctx, replace.ID, MessageStatusQueued); err != nil {
if c.failOnError {
return err
}
c.deps.Logger().
WithError(err).
WithField("message_id", replace.ID).
Error(`Unable to reset the failed message's status to "queued".`)
if err := c.DispatchMessage(ctx, msg); err != nil {

for _, replace := range messages[k:] {
if err := c.deps.CourierPersister().SetMessageStatus(ctx, replace.ID, MessageStatusQueued); err != nil {
if c.failOnError {
return err
}
c.deps.Logger().
WithError(err).
WithField("message_id", replace.ID).
Error(`Unable to reset the failed message's status to "queued".`)
}

return err
}

return err
}
}

Expand Down
75 changes: 65 additions & 10 deletions courier/courier_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,75 @@ package courier_test
import (
"context"
"testing"
"time"

"github.com/gofrs/uuid"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"

"github.com/ory/kratos/courier"
"github.com/ory/kratos/courier/template"
templates "github.com/ory/kratos/courier/template/email"
"github.com/ory/kratos/driver/config"
"github.com/ory/kratos/internal"
)

func TestMessageTTL(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
// queueNewMessage queues a stub test email on the given courier and returns
// the ID reported by QueueEmail. The calling test fails immediately if
// queueing returns an error.
func queueNewMessage(t *testing.T, ctx context.Context, c courier.Courier, d template.Dependencies) uuid.UUID {
	t.Helper()

	stub := &templates.TestStubModel{
		To:      "[email protected]",
		Subject: "test-subject-1",
		Body:    "test-body-1",
	}

	messageID, queueErr := c.QueueEmail(ctx, templates.NewTestStub(d, stub))
	require.NoError(t, queueErr)

	return messageID
}

// TestDispatchMessageWithInvalidSMTP exercises DispatchMessage with
// courier.message_retries set to 5 and an SMTP URL at which no SMTP server is
// reachable, covering both a failed delivery and a message whose send count
// already exceeds the retry limit.
func TestDispatchMessageWithInvalidSMTP(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	conf, reg := internal.NewRegistryDefaultWithDSN(t, "")
	// Only the retry limit is configured; the courier.message_ttl key was
	// replaced by courier.message_retries.
	conf.MustSet(config.ViperKeyCourierMessageRetries, 5)
	conf.MustSet(config.ViperKeyCourierSMTPURL, "http://foo.url")

	reg.Logger().Level = logrus.TraceLevel

	c := reg.Courier(ctx)

	t.Run("case=failed sending", func(t *testing.T) {
		id := queueNewMessage(t, ctx, c, reg)
		message, err := reg.CourierPersister().LatestQueuedMessage(ctx)
		require.NoError(t, err)
		require.Equal(t, id, message.ID)

		// Sending the email fails, because there is no SMTP server at foo.url.
		err = c.DispatchMessage(ctx, *message)
		require.Error(t, err)

		// The message must still be available for a later attempt.
		messages, err := reg.CourierPersister().NextMessages(ctx, 10)
		require.NoError(t, err)
		require.Len(t, messages, 1)
	})

	t.Run("case=max retries reached", func(t *testing.T) {
		id := queueNewMessage(t, ctx, c, reg)
		message, err := reg.CourierPersister().LatestQueuedMessage(ctx)
		require.NoError(t, err)
		require.Equal(t, id, message.ID)

		// One more than the configured limit of 5, so the message is skipped
		// and DispatchMessage reports no error.
		message.SendCount = 6

		err = c.DispatchMessage(ctx, *message)
		require.NoError(t, err)

		// The error is deliberately ignored: NextMessages presumably reports
		// an empty queue as an error (TODO confirm), so only the absence of
		// messages is asserted here.
		messages, _ := reg.CourierPersister().NextMessages(ctx, 1)
		require.Empty(t, messages)
	})
}

func TestDispatchMessage2(t *testing.T) {
ctx := context.Background()

conf, reg := internal.NewRegistryDefaultWithDSN(t, "")
conf.MustSet(config.ViperKeyCourierMessageRetries, 1)

c := reg.Courier(ctx)

Expand All @@ -39,9 +86,17 @@ func TestMessageTTL(t *testing.T) {
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id)

c.DispatchQueue(ctx)
// Fails to deliver the first time
err = c.DispatchQueue(ctx)
require.Error(t, err)

// Retry once, as we set above - still fails
err = c.DispatchQueue(ctx)
require.Error(t, err)

time.Sleep(1 * time.Second)
// The send count (2) now exceeds the configured retry limit (1), so the message is abandoned instead of being sent again and no error is returned
err = c.DispatchQueue(ctx)
require.NoError(t, err)

var message courier.Message
err = reg.Persister().GetConnection(ctx).
Expand Down
21 changes: 11 additions & 10 deletions courier/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,21 @@ const (

// swagger:ignore
type Message struct {
ID uuid.UUID `json:"-" faker:"-" db:"id"`
NID uuid.UUID `json:"-" faker:"-" db:"nid"`
Status MessageStatus `json:"-" db:"status"`
Type MessageType `json:"-" db:"type"`
Recipient string `json:"-" db:"recipient"`
Body string `json:"-" db:"body"`
Subject string `json:"-" db:"subject"`
TemplateType TemplateType `json:"-" db:"template_type"`
ID uuid.UUID `json:"id" faker:"-" db:"id"`
NID uuid.UUID `json:"-" faker:"-" db:"nid"`
Status MessageStatus `json:"status" db:"status"`
Type MessageType `json:"type" db:"type"`
Recipient string `json:"recipient" db:"recipient"`
Body string `json:"body" db:"body"`
Subject string `json:"subject" db:"subject"`
TemplateType TemplateType `json:"template_type" db:"template_type"`
TemplateData []byte `json:"-" db:"template_data"`
SendCount int `json:"send_count" db:"send_count"`

// CreatedAt is a helper struct field for gobuffalo.pop.
CreatedAt time.Time `json:"-" faker:"-" db:"created_at"`
CreatedAt time.Time `json:"created_at" faker:"-" db:"created_at"`
// UpdatedAt is a helper struct field for gobuffalo.pop.
UpdatedAt time.Time `json:"-" faker:"-" db:"updated_at"`
UpdatedAt time.Time `json:"updated_at" faker:"-" db:"updated_at"`
}

func (m Message) TableName(ctx context.Context) string {
Expand Down
2 changes: 2 additions & 0 deletions courier/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type (
SetMessageStatus(context.Context, uuid.UUID, MessageStatus) error

LatestQueuedMessage(ctx context.Context) (*Message, error)

IncrementMessageSendCount(context.Context, uuid.UUID) error
}
PersistenceProvider interface {
CourierPersister() Persister
Expand Down
14 changes: 14 additions & 0 deletions courier/smtp.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"net/textproto"
"strconv"
"time"

Expand Down Expand Up @@ -199,6 +200,19 @@ func (c *courier) dispatchEmail(ctx context.Context, msg Message) error {
// WithField("email_to", msg.Recipient).
WithField("message_from", from).
Error("Unable to send email using SMTP connection.")

var protoErr *textproto.Error
if containsProtoErr := errors.As(err, &protoErr); containsProtoErr && protoErr.Code >= 500 {
// See https://en.wikipedia.org/wiki/List_of_SMTP_server_return_codes
// If the SMTP server responds with 5xx, sending the message should not be retried (without changing something about the request)
if err := c.deps.CourierPersister().SetMessageStatus(ctx, msg.ID, MessageStatusAbandoned); err != nil {
c.deps.Logger().
WithError(err).
WithField("message_id", msg.ID).
Error(`Unable to reset the retried message's status to "abandoned".`)
return err
}
}
return errors.WithStack(err)
}

Expand Down
12 changes: 12 additions & 0 deletions courier/test/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,18 @@ func TestPersister(ctx context.Context, newNetworkUnlessExisting NetworkWrapper,
require.ErrorIs(t, err, courier.ErrQueueEmpty)
})

t.Run("case=incrementing send count", func(t *testing.T) {
originalSendCount := messages[0].SendCount
require.NoError(t, p.SetMessageStatus(ctx, messages[0].ID, courier.MessageStatusQueued))

require.NoError(t, p.IncrementMessageSendCount(ctx, messages[0].ID))
ms, err := p.NextMessages(ctx, 1)
require.NoError(t, err)
require.Len(t, ms, 1)
assert.Equal(t, messages[0].ID, ms[0].ID)
assert.Equal(t, originalSendCount+1, ms[0].SendCount)
})

t.Run("case=network", func(t *testing.T) {
id := x.NewUUID()

Expand Down
8 changes: 4 additions & 4 deletions driver/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ const (
ViperKeyCourierSMSRequestConfig = "courier.sms.request_config"
ViperKeyCourierSMSEnabled = "courier.sms.enabled"
ViperKeyCourierSMSFrom = "courier.sms.from"
ViperKeyCourierMessageTTL = "courier.message_ttl"
ViperKeyCourierMessageRetries = "courier.message_retries"
ViperKeySecretsDefault = "secrets.default"
ViperKeySecretsCookie = "secrets.cookie"
ViperKeySecretsCipher = "secrets.cipher"
Expand Down Expand Up @@ -257,7 +257,7 @@ type (
CourierTemplatesVerificationValid() *CourierEmailTemplate
CourierTemplatesRecoveryInvalid() *CourierEmailTemplate
CourierTemplatesRecoveryValid() *CourierEmailTemplate
CourierMessageTTL() time.Duration
CourierMessageRetries() int
}
)

Expand Down Expand Up @@ -948,8 +948,8 @@ func (p *Config) CourierTemplatesRecoveryValid() *CourierEmailTemplate {
return p.CourierTemplatesHelper(ViperKeyCourierTemplatesRecoveryValidEmail)
}

func (p *Config) CourierMessageTTL() time.Duration {
return p.p.DurationF(ViperKeyCourierMessageTTL, time.Hour)
// CourierMessageRetries reads the courier.message_retries setting, using 5
// when the key is not configured.
func (p *Config) CourierMessageRetries() int {
	const fallbackRetries = 5

	return p.p.IntF(ViperKeyCourierMessageRetries, fallbackRetries)
}

func (p *Config) CourierSMTPHeaders() map[string]string {
Expand Down
6 changes: 3 additions & 3 deletions driver/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1110,13 +1110,13 @@ func TestCourierMessageTTL(t *testing.T) {

t.Run("case=configs set", func(t *testing.T) {
conf, _ := config.New(ctx, logrusx.New("", ""), os.Stderr,
configx.WithConfigFiles("stub/.kratos.courier.messageTTL.yaml"), configx.SkipValidation())
assert.Equal(t, conf.CourierMessageTTL(), time.Duration(5*time.Minute))
configx.WithConfigFiles("stub/.kratos.courier.message_retries.yaml"), configx.SkipValidation())
assert.Equal(t, conf.CourierMessageRetries(), 10)
})

t.Run("case=defaults", func(t *testing.T) {
conf, _ := config.New(ctx, logrusx.New("", ""), os.Stderr, configx.SkipValidation())
assert.Equal(t, conf.CourierMessageTTL(), time.Duration(1*time.Hour))
assert.Equal(t, conf.CourierMessageRetries(), 5)
})
}

Expand Down
2 changes: 0 additions & 2 deletions driver/config/stub/.kratos.courier.messageTTL.yaml

This file was deleted.

2 changes: 2 additions & 0 deletions driver/config/stub/.kratos.courier.message_retries.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
courier:
message_retries: 10
14 changes: 6 additions & 8 deletions embedx/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1549,15 +1549,13 @@
"/conf/courier-templates"
]
},
"message_ttl": {
"description": "Defines a Time-To-Live for courier messages that could not be delivered. After the defined TTL has expired for a message that message is abandoned.",
"type": "string",
"pattern": "^([0-9]+(ns|us|ms|s|m|h))+$",
"default": "1h",
"message_retries": {
"description": "Defines the maximum number of times the sending of a message is retried after it failed before it is marked as abandoned",
"type": "integer",
"default": 5,
"examples": [
"1h",
"1m",
"1s"
10,
60
]
},
"smtp": {
Expand Down
Loading

0 comments on commit a409ad2

Please sign in to comment.