Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NATS Ack/Nak to nats jetstream V3 Finish #1104

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions protocol/nats_jetstream/v3/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/format"
"github.com/cloudevents/sdk-go/v2/binding/spec"
"github.com/cloudevents/sdk-go/v2/protocol"
)

const (
Expand Down Expand Up @@ -105,6 +106,35 @@ func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter)

// Finish *must* be called when message from a Receiver can be forgotten by the receiver.
func (m *Message) Finish(err error) error {
// Ack and Nak first checks to see if the message has been acknowleged
// and if Ack/Nak was done, it immediately returns an error without applying any logic to the message on the server.
// Nak will only be sent if the error given is explictly a NACK error(protocol.ResultNACK).
// AckPolicy effects if an explict Ack/Nak is needed.
// AckExplicit: The default policy. Each individual message must be acknowledged.
// Recommended for most reliability and functionality.
// AckNone: No acknowledgment needed; the server assumes acknowledgment on delivery.
// AckAll: Acknowledge only the last message received in a series; all previous messages are automatically acknowledged.
// Will acknowledge all pending messages for all subscribers for Pull Consumer.
// see: github.com/nats-io/nats.go/jetstream/ConsumerConfig.AckPolicy
if m.Msg == nil {
return nil
}
if protocol.IsNACK(err) {
if err = m.Msg.Nak(); err != jetstream.ErrMsgAlreadyAckd {
return err
}
}
if protocol.IsACK(err) {
if err = m.Msg.Ack(); err != jetstream.ErrMsgAlreadyAckd {
return err
}
}

// In the case that we receive an unknown error, the intent of whether the message should Ack/Nak is unknown.
// When this happens, the ack/nak behavior will be based on the consumer configuration. There are several options such as:
// AckPolicy, AckWait, MaxDeliver, MaxAckPending
// that determine how messages would be redelivered by the server.
// [consumers configuration]: https://docs.nats.io/nats-concepts/jetstream/consumers#configuration
return nil
}

Expand Down
123 changes: 122 additions & 1 deletion protocol/nats_jetstream/v3/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"reflect"
"testing"

"github.com/cloudevents/sdk-go/v2/binding/spec"
bindingtest "github.com/cloudevents/sdk-go/v2/binding/test"
"github.com/cloudevents/sdk-go/v2/protocol"

"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/test"
Expand All @@ -23,11 +25,17 @@ import (

type jetStreamMsg struct {
jetstream.Msg
msg *nats.Msg
msg *nats.Msg
ackCalled bool
ackErr error
nackCalled bool
nackErr error
}

func (j *jetStreamMsg) Data() []byte { return j.msg.Data }
func (j *jetStreamMsg) Headers() nats.Header { return j.msg.Header }
func (j *jetStreamMsg) Ack() error { j.ackCalled = true; return j.ackErr }
func (j *jetStreamMsg) Nak() error { j.nackCalled = true; return j.nackErr }

var (
outBinaryMessage = bindingtest.MockBinaryMessage{
Expand Down Expand Up @@ -190,3 +198,116 @@ func TestGetExtension(t *testing.T) {
})
}
}

func TestFinish(t *testing.T) {
type args struct {
err error
ackErr error
nakErr error
}
type wants struct {
err error
ackCalled bool
nackCalled bool
}
tests := []struct {
name string
args args
wants wants
}{
{
name: "nil error given",
args: args{
err: nil,
},
wants: wants{
err: nil,
ackCalled: true,
nackCalled: false,
},
},
{
name: "ACK error given",
args: args{
err: protocol.ResultACK,
},
wants: wants{
err: nil,
ackCalled: true,
nackCalled: false,
},
},
{
name: "NACK error given",
args: args{
err: protocol.ResultNACK,
},
wants: wants{
err: nil,
ackCalled: false,
nackCalled: true,
},
stephen-totty-hpe marked this conversation as resolved.
Show resolved Hide resolved
},
{
name: "unknown error given",
args: args{
err: errors.New("unknown"),
},
wants: wants{
err: nil,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we're slurping the error and returning nil? Is this the correct behavior when finishing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the previous behavior. I am open to changing this behavior if there is support to change it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, good question if we want consistency (but ambiguous behavior) or fix it. What do you think? Perhaps someone using v3 would not use the prior implementations and therefore we should do what's right for users?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After looking at
https:/cloudevents/sdk-go/blob/main/v2/client/client.go#L263
and
https:/cloudevents/sdk-go/blob/main/v2/client/invoker.go#L59

I believe the error returned from Finish is only used for logging:
cecontext.LoggerFrom(ctx).Warn("Error while handling a message: ", err)

So the real question is more about whether we should call Ack() or Nak() in the unknown error case.
In the case that Ack() or Nak() is not called, the ACK/NACK behavior on the server is based on how the NATS consumer is setup.
There are three options: AckNonePolicy, AckAllPolicy, AckExplicitPolicy

So I would think if you had AckNonePolicy or AckAllPolicy, not calling Ack() would result in the server auto-acking.
If you had AckExplicitPolicy set, then failing to call Ack() would result in the message being redelivered.

This might be correct as it would "default" to how the user setup the consumer.
NATS has a MaxDeliver option which would give up after the sending "MaxDeliver times", but the default is "unlimited".

I am inclined to give the user the benefit of defining that behavior, but I am not married to that feeling.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that sounds reasonable! Feel free to document the behavior e.g., adding a README or code comments.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment.

ackCalled: false,
nackCalled: false,
},
},
{
name: "jetstream.ErrMsgAlreadyAckd error returned from Ack",
args: args{
err: protocol.ResultACK,
ackErr: jetstream.ErrMsgAlreadyAckd,
},
wants: wants{
err: nil,
ackCalled: true,
nackCalled: false,
},
},
{
name: "jetstream.ErrMsgAlreadyAckd error returned from Nak",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, there's a case when a (protocol) NACK would contain NATS ACK-ed in this implementation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nak and ack make the same NATS call deep down with a minor difference in the ackType argument passed via the API. The ErrMsgAlreadyAckd error that comes back simply means that the message has been already "marked" as having been processed. It is used for both ack and nak.

In practice, a message should not be marked ack or nak before calling Finish. But there may be cases where a binding may have a behavior to handle MalformedEvent or a responder may be interjected. In these cases, I believe Finish will still only called once. The error check is simply there to not send back an error in the unlikely case somehow Finish is called more than once.

args: args{
err: protocol.ResultNACK,
nakErr: jetstream.ErrMsgAlreadyAckd,
},
wants: wants{
err: nil,
ackCalled: false,
nackCalled: true,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
binaryReceiverMessage.ackCalled = false
binaryReceiverMessage.ackErr = tt.args.ackErr
binaryReceiverMessage.nackCalled = false
binaryReceiverMessage.nackErr = tt.args.nakErr
message := NewMessage(binaryReceiverMessage)
if message == nil {
t.Errorf("Error in NewMessage!")
}
gotErr := message.Finish(tt.args.err)
if gotErr != tt.wants.err {
t.Errorf("ExpectedErr %s, while got %s", tt.wants.err, gotErr)
}
var mockMessage *jetStreamMsg
if message != nil {
mockMessage = message.Msg.(*jetStreamMsg)
}
if mockMessage.ackCalled != tt.wants.ackCalled {
t.Errorf("ExpectedAck %t, while got %t", tt.wants.ackCalled, mockMessage.ackCalled)
}
if mockMessage.nackCalled != tt.wants.nackCalled {
t.Errorf("ExpectedNack %t, while got %t", tt.wants.nackCalled, mockMessage.nackCalled)
}
})
}
}
2 changes: 1 addition & 1 deletion protocol/nats_jetstream/v3/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func WithConnection(conn *nats.Conn) ProtocolOption {
// WithJetStreamOptions sets jetstream options used in the protocol sender and receiver
func WithJetStreamOptions(jetStreamOpts []jetstream.JetStreamOpt) ProtocolOption {
return func(p *Protocol) error {
p.jetSteamOpts = jetStreamOpts
p.jetStreamOpts = jetStreamOpts
return nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion protocol/nats_jetstream/v3/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ func TestWithJetStreamOptions(t *testing.T) {
wants: wants{
err: nil,
protocol: &Protocol{
jetSteamOpts: jetStreamOpts,
jetStreamOpts: jetStreamOpts,
},
},
},
Expand Down
6 changes: 3 additions & 3 deletions protocol/nats_jetstream/v3/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ type Protocol struct {
natsOpts []nats.Option

// jetstream options
jetSteamOpts []jetstream.JetStreamOpt
jetStream jetstream.JetStream
jetStreamOpts []jetstream.JetStreamOpt
jetStream jetstream.JetStream

// receiver
incoming chan msgErr
Expand Down Expand Up @@ -76,7 +76,7 @@ func New(ctx context.Context, opts ...ProtocolOption) (*Protocol, error) {
}
}

if p.jetStream, errConnection = jetstream.New(p.conn, p.jetSteamOpts...); errConnection != nil {
if p.jetStream, errConnection = jetstream.New(p.conn, p.jetStreamOpts...); errConnection != nil {
return nil, errConnection
}

Expand Down