diff --git a/jetstream/consumer.go b/jetstream/consumer.go index c603ad7a5..ecd75d305 100644 --- a/jetstream/consumer.go +++ b/jetstream/consumer.go @@ -55,6 +55,10 @@ type ( // Info returns [ConsumerInfo] for a given consumer func (p *pullConsumer) Info(ctx context.Context) (*ConsumerInfo, error) { + ctx, cancel := wrapContextWithoutDeadline(ctx) + if cancel != nil { + defer cancel() + } infoSubject := apiSubj(p.jetStream.apiPrefix, fmt.Sprintf(apiConsumerInfoT, p.stream, p.name)) var resp consumerInfoResponse @@ -81,6 +85,10 @@ func (p *pullConsumer) CachedInfo() *ConsumerInfo { } func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg ConsumerConfig) (Consumer, error) { + ctx, cancel := wrapContextWithoutDeadline(ctx) + if cancel != nil { + defer cancel() + } req := createConsumerRequest{ Stream: stream, Config: &cfg, @@ -142,6 +150,10 @@ func generateConsName() string { } func getConsumer(ctx context.Context, js *jetStream, stream, name string) (Consumer, error) { + ctx, cancel := wrapContextWithoutDeadline(ctx) + if cancel != nil { + defer cancel() + } if err := validateConsumerName(name); err != nil { return nil, err } @@ -172,6 +184,10 @@ func getConsumer(ctx context.Context, js *jetStream, stream, name string) (Consu } func deleteConsumer(ctx context.Context, js *jetStream, stream, consumer string) error { + ctx, cancel := wrapContextWithoutDeadline(ctx) + if cancel != nil { + defer cancel() + } if err := validateConsumerName(consumer); err != nil { return err } diff --git a/jetstream/jetstream.go b/jetstream/jetstream.go index 51a28586b..6d30bb461 100644 --- a/jetstream/jetstream.go +++ b/jetstream/jetstream.go @@ -20,6 +20,7 @@ import ( "fmt" "regexp" "strings" + "time" "github.com/nats-io/nats.go" "github.com/nats-io/nuid" @@ -195,6 +196,9 @@ type ( } ) +// defaultAPITimeout is used if context.Background() or context.TODO() is passed to API calls. +const defaultAPITimeout = 5 * time.Second + var subjectRegexp = regexp.MustCompile(`^[^ >]*[>]?$`) // New returns a new JetStream instance. @@ -297,6 +301,10 @@ func (js *jetStream) CreateStream(ctx context.Context, cfg StreamConfig) (Stream if err := validateStreamName(cfg.Name); err != nil { return nil, err } + ctx, cancel := wrapContextWithoutDeadline(ctx) + if cancel != nil { + defer cancel() + } ncfg := cfg // If we have a mirror and an external domain, convert to ext.APIPrefix. if ncfg.Mirror != nil && ncfg.Mirror.Domain != "" { @@ -378,6 +386,10 @@ func (js *jetStream) UpdateStream(ctx context.Context, cfg StreamConfig) (Stream if err := validateStreamName(cfg.Name); err != nil { return nil, err } + ctx, cancel := wrapContextWithoutDeadline(ctx) + if cancel != nil { + defer cancel() + } req, err := json.Marshal(cfg) if err != nil { @@ -409,6 +421,10 @@ func (js *jetStream) Stream(ctx context.Context, name string) (Stream, error) { if err := validateStreamName(name); err != nil { return nil, err } + ctx, cancel := wrapContextWithoutDeadline(ctx) + if cancel != nil { + defer cancel() + } infoSubject := apiSubj(js.apiPrefix, fmt.Sprintf(apiStreamInfoT, name)) var resp streamInfoResponse @@ -434,6 +450,10 @@ func (js *jetStream) DeleteStream(ctx context.Context, name string) error { if err := validateStreamName(name); err != nil { return err } + ctx, cancel := wrapContextWithoutDeadline(ctx) + if cancel != nil { + defer cancel() + } deleteSubject := apiSubj(js.apiPrefix, fmt.Sprintf(apiStreamDeleteT, name)) var resp streamDeleteResponse @@ -518,6 +538,10 @@ func validateSubject(subject string) error { } func (js *jetStream) AccountInfo(ctx context.Context) (*AccountInfo, error) { + ctx, cancel := wrapContextWithoutDeadline(ctx) + if cancel != nil { + defer cancel() + } var resp accountInfoResponse infoSubject := apiSubj(js.apiPrefix, apiAccountInfo) @@ -548,6 +572,10 @@ func (js *jetStream) ListStreams(ctx context.Context) StreamInfoLister { errs: make(chan error, 1), } go func() { + ctx, cancel := wrapContextWithoutDeadline(ctx) + if cancel != nil { + defer cancel() + } for { page, err := l.streamInfos(ctx) if err != nil && !errors.Is(err, ErrEndOfData) { @@ -590,6 +618,10 @@ func (js *jetStream) StreamNames(ctx context.Context) StreamNameLister { errs: make(chan error, 1), } go func() { + ctx, cancel := wrapContextWithoutDeadline(ctx) + if cancel != nil { + defer cancel() + } for { page, err := l.streamNames(ctx) if err != nil && !errors.Is(err, ErrEndOfData) { @@ -615,6 +647,10 @@ func (js *jetStream) StreamNames(ctx context.Context) StreamNameLister { } func (js *jetStream) StreamNameBySubject(ctx context.Context, subject string) (string, error) { + ctx, cancel := wrapContextWithoutDeadline(ctx) + if cancel != nil { + defer cancel() + } if err := validateSubject(subject); err != nil { return "", err } @@ -700,3 +736,13 @@ func (s *streamLister) streamNames(ctx context.Context) ([]string, error) { s.offset += len(resp.Streams) return resp.Streams, nil } + +// wrapContextWithoutDeadline wraps context without deadline with default timeout. +// If deadline is already set, it will be returned as is, and cancel() will be nil. +// Caller should check if cancel() is nil before calling it. +func wrapContextWithoutDeadline(ctx context.Context) (context.Context, context.CancelFunc) { + if _, ok := ctx.Deadline(); ok { + return ctx, nil + } + return context.WithTimeout(ctx, defaultAPITimeout) +} diff --git a/jetstream/message.go b/jetstream/message.go index 3d6bf5e59..c7583a753 100644 --- a/jetstream/message.go +++ b/jetstream/message.go @@ -223,8 +223,10 @@ func (m *jetStreamMsg) ackReply(ctx context.Context, ackType ackType, sync bool, m.Unlock() if sync { - if _, hasDeadline := ctx.Deadline(); !hasDeadline { - return nats.ErrNoDeadlineContext + var cancel context.CancelFunc + ctx, cancel = wrapContextWithoutDeadline(ctx) + if cancel != nil { + defer cancel() } } diff --git a/jetstream/publish.go b/jetstream/publish.go index e80f63889..6588060a4 100644 --- a/jetstream/publish.go +++ b/jetstream/publish.go @@ -131,6 +131,10 @@ func (js *jetStream) Publish(ctx context.Context, subj string, data []byte, opts // PublishMsg publishes a Msg to a stream from JetStream. func (js *jetStream) PublishMsg(ctx context.Context, m *nats.Msg, opts ...PublishOpt) (*PubAck, error) { + ctx, cancel := wrapContextWithoutDeadline(ctx) + if cancel != nil { + defer cancel() + } o := pubOpts{ retryWait: DefaultPubRetryWait, retryAttempts: DefaultPubRetryAttempts, diff --git a/jetstream/stream.go b/jetstream/stream.go index 4f29d3bfe..e3ea7e34a 100644 --- a/jetstream/stream.go +++ b/jetstream/stream.go @@ -230,6 +230,10 @@ func (s *stream) DeleteConsumer(ctx context.Context, name string) error { // [WithDeletedDetails] - use to display the information about messages deleted from a stream // [WithSubjectFilter] - use to display the information about messages stored on given subjects func (s *stream) Info(ctx context.Context, opts ...StreamInfoOpt) (*StreamInfo, error) { + ctx, cancel := wrapContextWithoutDeadline(ctx) + if cancel != nil { + defer cancel() + } var infoReq *streamInfoRequest for _, opt := range opts { if infoReq == nil { @@ -280,6 +284,10 @@ func (s *stream) CachedInfo() *StreamInfo { // [WithPurgeSequence] - can be used to set a specific sequence number up to which (but not including) messages will be purged from a stream // [WithPurgeKeep] - can be used to set the number of messages to be kept in the stream after purge. func (s *stream) Purge(ctx context.Context, opts ...StreamPurgeOpt) error { + ctx, cancel := wrapContextWithoutDeadline(ctx) + if cancel != nil { + defer cancel() + } var purgeReq StreamPurgeRequest for _, opt := range opts { if err := opt(&purgeReq); err != nil { @@ -321,6 +329,10 @@ func (s *stream) GetLastMsgForSubject(ctx context.Context, subject string) (*Raw } func (s *stream) getMsg(ctx context.Context, mreq *apiMsgGetRequest) (*RawStreamMsg, error) { + ctx, cancel := wrapContextWithoutDeadline(ctx) + if cancel != nil { + defer cancel() + } req, err := json.Marshal(mreq) if err != nil { return nil, err @@ -448,6 +460,10 @@ func (s *stream) SecureDeleteMsg(ctx context.Context, seq uint64) error { } func (s *stream) deleteMsg(ctx context.Context, req *msgDeleteRequest) error { + ctx, cancel := wrapContextWithoutDeadline(ctx) + if cancel != nil { + defer cancel() + } r, err := json.Marshal(req) if err != nil { return err @@ -471,6 +487,10 @@ func (s *stream) ListConsumers(ctx context.Context) ConsumerInfoLister { errs: make(chan error, 1), } go func() { + ctx, cancel := wrapContextWithoutDeadline(ctx) + if cancel != nil { + defer cancel() + } for { page, err := l.consumerInfos(ctx, s.name) if err != nil && !errors.Is(err, ErrEndOfData) { @@ -514,6 +534,10 @@ func (s *stream) ConsumerNames(ctx context.Context) ConsumerNameLister { errs: make(chan error, 1), } go func() { + ctx, cancel := wrapContextWithoutDeadline(ctx) + if cancel != nil { + defer cancel() + } for { page, err := l.consumerNames(ctx, s.name) if err != nil && !errors.Is(err, ErrEndOfData) { diff --git a/jetstream/test/jetstream_test.go b/jetstream/test/jetstream_test.go index bf9a63cf4..1fcfff72c 100644 --- a/jetstream/test/jetstream_test.go +++ b/jetstream/test/jetstream_test.go @@ -205,31 +205,48 @@ func TestCreateStream(t *testing.T) { name string stream string subject string + timeout time.Duration withError error }{ { name: "create stream, ok", stream: "foo", + timeout: 10 * time.Second, + subject: "FOO.123", + }, + { + name: "with empty context", + stream: "foo", subject: "FOO.123", }, { name: "invalid stream name", stream: "foo.123", subject: "FOO.123", + timeout: 10 * time.Second, withError: jetstream.ErrInvalidStreamName, }, { name: "stream name required", stream: "", subject: "FOO.123", + timeout: 10 * time.Second, withError: jetstream.ErrStreamNameRequired, }, { name: "stream name already in use", stream: "foo", subject: "BAR.123", + timeout: 10 * time.Second, withError: jetstream.ErrStreamNameAlreadyInUse, }, + { + name: "context timeout", + stream: "foo", + subject: "BAR.123", + timeout: 50 * time.Microsecond, + withError: context.DeadlineExceeded, + }, } srv := RunBasicJetStreamServer() @@ -239,8 +256,6 @@ func TestCreateStream(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) - defer cancel() js, err := jetstream.New(nc) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -249,6 +264,12 @@ func TestCreateStream(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + ctx := context.Background() + if test.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(context.Background(), test.timeout) + defer cancel() + } _, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: test.stream, Subjects: []string{test.subject}}) if test.withError != nil { if !errors.Is(err, test.withError) { @@ -415,31 +436,48 @@ func TestUpdateStream(t *testing.T) { name string stream string subject string + timeout time.Duration withError error }{ { name: "update existing stream", stream: "foo", subject: "BAR.123", + timeout: 10 * time.Second, + }, + { + name: "with empty context", + stream: "foo", + subject: "FOO.123", }, { name: "invalid stream name", stream: "foo.123", subject: "FOO.123", + timeout: 10 * time.Second, withError: jetstream.ErrInvalidStreamName, }, { name: "stream name required", stream: "", subject: "FOO.123", + timeout: 10 * time.Second, withError: jetstream.ErrStreamNameRequired, }, { name: "stream not found", stream: "bar", subject: "FOO.123", + timeout: 10 * time.Second, withError: jetstream.ErrStreamNotFound, }, + { + name: "context timeout", + stream: "foo", + subject: "FOO.123", + timeout: 50 * time.Microsecond, + withError: context.DeadlineExceeded, + }, } srv := RunBasicJetStreamServer() @@ -449,19 +487,23 @@ func TestUpdateStream(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) - defer cancel() js, err := jetstream.New(nc) if err != nil { t.Fatalf("Unexpected error: %v", err) } defer nc.Close() - _, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.123"}}) + _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.123"}}) if err != nil { t.Fatalf("Unexpected error: %v", err) } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + ctx := context.Background() + if test.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(context.Background(), test.timeout) + defer cancel() + } s, err := js.UpdateStream(ctx, jetstream.StreamConfig{Name: test.stream, Subjects: []string{test.subject}}) if test.withError != nil { if !errors.Is(err, test.withError) { @@ -488,27 +530,42 @@ func TestStream(t *testing.T) { name string stream string subject string + timeout time.Duration withError error }{ { - name: "get existing stream", + name: "get existing stream", + stream: "foo", + timeout: 10 * time.Second, + }, + { + name: "with empty context", stream: "foo", }, { name: "invalid stream name", stream: "foo.123", + timeout: 10 * time.Second, withError: jetstream.ErrInvalidStreamName, }, { name: "stream name required", stream: "", + timeout: 10 * time.Second, withError: jetstream.ErrStreamNameRequired, }, { name: "stream not found", stream: "bar", + timeout: 10 * time.Second, withError: jetstream.ErrStreamNotFound, }, + { + name: "context timeout", + stream: "foo", + timeout: 50 * time.Microsecond, + withError: context.DeadlineExceeded, + }, } srv := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, srv) @@ -517,20 +574,24 @@ func TestStream(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) - defer cancel() js, err := jetstream.New(nc) if err != nil { t.Fatalf("Unexpected error: %v", err) } defer nc.Close() - _, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.123"}}) + _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.123"}}) if err != nil { t.Fatalf("Unexpected error: %v", err) } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + ctx := context.Background() + if test.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(context.Background(), test.timeout) + defer cancel() + } s, err := js.Stream(ctx, test.stream) if test.withError != nil { if !errors.Is(err, test.withError) { @@ -553,25 +614,34 @@ func TestDeleteStream(t *testing.T) { name string stream string subject string + timeout time.Duration withError error }{ { - name: "delete existing stream", - stream: "foo", + name: "delete existing stream", + stream: "foo", + timeout: 10 * time.Second, + }, + { + name: "with empty context", + stream: "bar", }, { name: "invalid stream name", stream: "foo.123", + timeout: 10 * time.Second, withError: jetstream.ErrInvalidStreamName, }, { name: "stream name required", stream: "", + timeout: 10 * time.Second, withError: jetstream.ErrStreamNameRequired, }, { name: "stream not found", stream: "foo", + timeout: 10 * time.Second, withError: jetstream.ErrStreamNotFound, }, } @@ -582,20 +652,29 @@ func TestDeleteStream(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) - defer cancel() js, err := jetstream.New(nc) if err != nil { t.Fatalf("Unexpected error: %v", err) } defer nc.Close() - _, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.123"}}) + _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.123"}}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "bar", Subjects: []string{"BAR.123"}}) if err != nil { t.Fatalf("Unexpected error: %v", err) } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + ctx := context.Background() + if test.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + } err := js.DeleteStream(ctx, test.stream) if test.withError != nil { if !errors.Is(err, test.withError) { @@ -700,16 +779,29 @@ func TestListStreams(t *testing.T) { tests := []struct { name string streamsNum int + timeout time.Duration withError error }{ { name: "list streams", streamsNum: 260, + timeout: 10 * time.Second, + }, + { + name: "with empty context", + streamsNum: 260, }, { name: "no stream available", + timeout: 10 * time.Second, streamsNum: 0, }, + { + name: "context timeout", + streamsNum: 260, + timeout: 50 * time.Microsecond, + withError: context.DeadlineExceeded, + }, } for _, test := range tests { @@ -721,15 +813,19 @@ func TestListStreams(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() + ctx := context.Background() + if test.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(context.Background(), test.timeout) + defer cancel() + } js, err := jetstream.New(nc) if err != nil { t.Fatalf("Unexpected error: %v", err) } defer nc.Close() for i := 0; i < test.streamsNum; i++ { - _, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: fmt.Sprintf("foo%d", i), Subjects: []string{fmt.Sprintf("FOO.%d", i)}}) + _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{Name: fmt.Sprintf("foo%d", i), Subjects: []string{fmt.Sprintf("FOO.%d", i)}}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -741,7 +837,16 @@ func TestListStreams(t *testing.T) { select { case s := <-streamsList.Info(): streams = append(streams, s) + if test.withError != nil { + t.Fatalf("Expected error: %v; got none", test.withError) + } case err := <-streamsList.Err(): + if test.withError != nil { + if !errors.Is(err, test.withError) { + t.Fatalf("Expected error: %v; got: %v", test.withError, err) + } + return + } if !errors.Is(err, jetstream.ErrEndOfData) { t.Fatalf("Unexpected error: %v", err) } @@ -759,15 +864,28 @@ func TestStreamNames(t *testing.T) { tests := []struct { name string streamsNum int + timeout time.Duration withError error }{ { name: "list streams", streamsNum: 500, + timeout: 10 * time.Second, + }, + { + name: "with empty context", + streamsNum: 500, }, { name: "no stream available", streamsNum: 0, + timeout: 10 * time.Second, + }, + { + name: "context timeout", + streamsNum: 500, + timeout: 50 * time.Microsecond, + withError: context.DeadlineExceeded, }, } @@ -780,15 +898,19 @@ func TestStreamNames(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() + ctx := context.Background() + if test.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, test.timeout) + defer cancel() + } js, err := jetstream.New(nc) if err != nil { t.Fatalf("Unexpected error: %v", err) } defer nc.Close() for i := 0; i < test.streamsNum; i++ { - _, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: fmt.Sprintf("foo%d", i), Subjects: []string{fmt.Sprintf("FOO.%d", i)}}) + _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{Name: fmt.Sprintf("foo%d", i), Subjects: []string{fmt.Sprintf("FOO.%d", i)}}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -800,7 +922,16 @@ func TestStreamNames(t *testing.T) { select { case s := <-streamsList.Name(): streams = append(streams, s) + if test.withError != nil { + t.Fatalf("Expected error: %v; got none", test.withError) + } case err := <-streamsList.Err(): + if test.withError != nil { + if !errors.Is(err, test.withError) { + t.Fatalf("Expected error: %v; got: %v", test.withError, err) + } + return + } if !errors.Is(err, jetstream.ErrEndOfData) { t.Fatalf("Unexpected error: %v", err) } @@ -820,18 +951,27 @@ func TestJetStream_CreateOrUpdateConsumer(t *testing.T) { stream string consumerConfig jetstream.ConsumerConfig shouldCreate bool + timeout time.Duration withError error }{ { name: "create durable pull consumer", stream: "foo", consumerConfig: jetstream.ConsumerConfig{Durable: "dur", AckPolicy: jetstream.AckExplicitPolicy}, + timeout: 10 * time.Second, shouldCreate: true, }, { name: "create ephemeral pull consumer", stream: "foo", consumerConfig: jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}, + timeout: 10 * time.Second, + shouldCreate: true, + }, + { + name: "with empty context", + consumerConfig: jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}, + stream: "foo", shouldCreate: true, }, { @@ -861,6 +1001,13 @@ func TestJetStream_CreateOrUpdateConsumer(t *testing.T) { consumerConfig: jetstream.ConsumerConfig{Durable: "dur.123", AckPolicy: jetstream.AckExplicitPolicy}, withError: jetstream.ErrInvalidConsumerName, }, + { + name: "context timeout", + consumerConfig: jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}, + stream: "foo", + timeout: 50 * time.Microsecond, + withError: context.DeadlineExceeded, + }, } srv := RunBasicJetStreamServer() @@ -876,15 +1023,19 @@ func TestJetStream_CreateOrUpdateConsumer(t *testing.T) { } defer nc.Close() - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - _, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) + _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) if err != nil { t.Fatalf("Unexpected error: %v", err) } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + ctx := context.Background() + if test.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(context.Background(), test.timeout) + defer cancel() + } var sub *nats.Subscription if test.consumerConfig.FilterSubject != "" { sub, err = nc.SubscribeSync(fmt.Sprintf("$JS.API.CONSUMER.CREATE.foo.*.%s", test.consumerConfig.FilterSubject)) @@ -919,17 +1070,25 @@ func TestJetStream_Consumer(t *testing.T) { name string stream string durable string + timeout time.Duration withError error }{ { name: "get existing consumer", stream: "foo", durable: "dur", + timeout: 10 * time.Second, + }, + { + name: "with empty context", + stream: "foo", + durable: "dur", }, { name: "consumer does not exist", stream: "foo", durable: "abc", + timeout: 10 * time.Second, withError: jetstream.ErrConsumerNotFound, }, { @@ -950,6 +1109,13 @@ func TestJetStream_Consumer(t *testing.T) { durable: "dur", withError: jetstream.ErrInvalidStreamName, }, + { + name: "context timeout", + stream: "foo", + durable: "dur", + timeout: 50 * time.Microsecond, + withError: context.DeadlineExceeded, + }, } srv := RunBasicJetStreamServer() @@ -965,19 +1131,23 @@ func TestJetStream_Consumer(t *testing.T) { } defer nc.Close() - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) + s, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) if err != nil { t.Fatalf("Unexpected error: %v", err) } - _, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{Durable: "dur", AckPolicy: jetstream.AckAllPolicy, Description: "desc"}) + _, err = s.CreateOrUpdateConsumer(context.Background(), jetstream.ConsumerConfig{Durable: "dur", AckPolicy: jetstream.AckAllPolicy, Description: "desc"}) if err != nil { t.Fatalf("Unexpected error: %v", err) } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + ctx := context.Background() + if test.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(context.Background(), test.timeout) + defer cancel() + } c, err := js.Consumer(ctx, test.stream, test.durable) if test.withError != nil { if err == nil || !errors.Is(err, test.withError) { @@ -1000,17 +1170,25 @@ func TestJetStream_DeleteConsumer(t *testing.T) { name string stream string durable string + timeout time.Duration withError error }{ { name: "delete existing consumer", stream: "foo", durable: "dur", + timeout: 10 * time.Second, + }, + { + name: "with empty context", + stream: "foo", + durable: "dur2", }, { name: "consumer does not exist", stream: "foo", durable: "dur", + timeout: 10 * time.Second, withError: jetstream.ErrConsumerNotFound, }, { @@ -1031,6 +1209,13 @@ func TestJetStream_DeleteConsumer(t *testing.T) { durable: "dur", withError: jetstream.ErrInvalidStreamName, }, + { + name: "context timeout", + stream: "foo", + durable: "dur", + timeout: 50 * time.Microsecond, + withError: context.DeadlineExceeded, + }, } srv := RunBasicJetStreamServer() @@ -1046,19 +1231,27 @@ func TestJetStream_DeleteConsumer(t *testing.T) { } defer nc.Close() - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) + s, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) if err != nil { t.Fatalf("Unexpected error: %v", err) } - _, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{Durable: "dur", AckPolicy: jetstream.AckAllPolicy, Description: "desc"}) + _, err = s.CreateOrUpdateConsumer(context.Background(), jetstream.ConsumerConfig{Durable: "dur", AckPolicy: jetstream.AckAllPolicy, Description: "desc"}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + _, err = s.CreateOrUpdateConsumer(context.Background(), jetstream.ConsumerConfig{Durable: "dur2", AckPolicy: jetstream.AckAllPolicy, Description: "desc"}) if err != nil { t.Fatalf("Unexpected error: %v", err) } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + ctx := context.Background() + if test.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(context.Background(), test.timeout) + defer cancel() + } err := js.DeleteConsumer(ctx, test.stream, test.durable) if test.withError != nil { if err == nil || !errors.Is(err, test.withError) { @@ -1082,11 +1275,18 @@ func TestStreamNameBySubject(t *testing.T) { name string subject string withError error + timeout time.Duration expected string }{ { name: "get stream name by subject explicit", subject: "FOO.123", + timeout: 10 * time.Second, + expected: "foo", + }, + { + name: "with empty context", + subject: "FOO.123", expected: "foo", }, { @@ -1109,6 +1309,12 @@ func TestStreamNameBySubject(t *testing.T) { subject: "FOO.>.123", withError: jetstream.ErrInvalidSubject, }, + { + name: "context timeout", + subject: "FOO.123", + timeout: 50 * time.Microsecond, + withError: context.DeadlineExceeded, + }, } srv := RunBasicJetStreamServer() @@ -1124,19 +1330,23 @@ func TestStreamNameBySubject(t *testing.T) { } defer nc.Close() - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - _, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) + _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) if err != nil { t.Fatalf("Unexpected error: %v", err) } - _, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: "bar", Subjects: []string{"BAR.ABC"}}) + _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "bar", Subjects: []string{"BAR.ABC"}}) if err != nil { t.Fatalf("Unexpected error: %v", err) } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + ctx := context.Background() + if test.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(context.Background(), test.timeout) + defer cancel() + } name, err := js.StreamNameBySubject(ctx, test.subject) if test.withError != nil { if err == nil || !errors.Is(err, test.withError) { @@ -1150,7 +1360,6 @@ func TestStreamNameBySubject(t *testing.T) { if test.expected != "" && name != test.expected { t.Fatalf("Unexpected stream name; want: %s; got: %s", test.expected, name) } - }) } } diff --git a/jetstream/test/message_test.go b/jetstream/test/message_test.go index fc9efc3d8..c150ba22b 100644 --- a/jetstream/test/message_test.go +++ b/jetstream/test/message_test.go @@ -217,32 +217,6 @@ func TestAckVariants(t *testing.T) { t.Fatalf("Invalid ack body: %q", string(ack.Data)) } }) - t.Run("double ack, empty context", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - srv, nc, js, c := setup(ctx, t) - defer shutdownJSServerAndRemoveStorage(t, srv) - defer nc.Close() - - if _, err := js.Publish(ctx, "FOO.1", []byte("msg")); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - msgs, err := c.Fetch(1) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - msg := <-msgs.Messages() - if msg == nil { - t.Fatalf("No messages available") - } - if err := msgs.Error(); err != nil { - t.Fatalf("unexpected error during fetch: %v", err) - } - - if err := msg.DoubleAck(context.Background()); err == nil || !errors.Is(err, nats.ErrNoDeadlineContext) { - t.Fatalf("Expected error: %v; got: %v", nats.ErrNoDeadlineContext, err) - } - }) t.Run("standard nak", func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() diff --git a/jetstream/test/stream_test.go b/jetstream/test/stream_test.go index 3b53f086f..c8b9ccf10 100644 --- a/jetstream/test/stream_test.go +++ b/jetstream/test/stream_test.go @@ -260,30 +260,44 @@ func TestStreamInfo(t *testing.T) { subjectsFilter string expectedSubjectMsgs map[string]uint64 deletedDetails bool + timeout time.Duration withError error }{ { - name: "info without opts", + name: "info without opts", + timeout: 5 * time.Second, + }, + { + name: "with empty context", }, { name: "with deleted details", deletedDetails: true, + timeout: 5 * time.Second, }, { name: "with subjects filter, one subject", subjectsFilter: "FOO.A", + timeout: 5 * time.Second, expectedSubjectMsgs: map[string]uint64{"FOO.A": 8}, }, { name: "with subjects filter, wildcard subject", subjectsFilter: "FOO.*", + timeout: 5 * time.Second, expectedSubjectMsgs: map[string]uint64{"FOO.A": 8, "FOO.B": 10}, }, { name: "with subjects filter, and deleted details", subjectsFilter: "FOO.A", + timeout: 5 * time.Second, expectedSubjectMsgs: map[string]uint64{"FOO.A": 8}, }, + { + name: "context timeout", + timeout: 50 * time.Microsecond, + withError: context.DeadlineExceeded, + }, } srv := RunBasicJetStreamServer() @@ -299,29 +313,33 @@ func TestStreamInfo(t *testing.T) { } defer nc.Close() - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}, Description: "desc"}) + s, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}, Description: "desc"}) if err != nil { t.Fatalf("Unexpected error: %v", err) } for i := 0; i < 10; i++ { - if _, err := js.Publish(ctx, "FOO.A", []byte(fmt.Sprintf("msg %d on subject A", i))); err != nil { + if _, err := js.Publish(context.Background(), "FOO.A", []byte(fmt.Sprintf("msg %d on subject A", i))); err != nil { t.Fatalf("Unexpected error: %v", err) } - if _, err := js.Publish(ctx, "FOO.B", []byte(fmt.Sprintf("msg %d on subject B", i))); err != nil { + if _, err := js.Publish(context.Background(), "FOO.B", []byte(fmt.Sprintf("msg %d on subject B", i))); err != nil { t.Fatalf("Unexpected error: %v", err) } } - if err := s.DeleteMsg(ctx, 3); err != nil { + if err := s.DeleteMsg(context.Background(), 3); err != nil { t.Fatalf("Unexpected error: %v", err) } - if err := s.DeleteMsg(ctx, 5); err != nil { + if err := s.DeleteMsg(context.Background(), 5); err != nil { t.Fatalf("Unexpected error: %v", err) } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + ctx := context.Background() + if test.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, test.timeout) + defer cancel() + } opts := make([]jetstream.StreamInfoOpt, 0) if test.deletedDetails { opts = append(opts, jetstream.WithDeletedDetails(test.deletedDetails)) @@ -422,13 +440,20 @@ func TestGetMsg(t *testing.T) { opts []jetstream.GetMsgOpt expectedData string expectedHeaders nats.Header + timeout time.Duration withError error }{ { name: "get existing msg", seq: 2, + timeout: 5 * time.Second, expectedData: "msg 1 on subject B", }, + { + name: "with empty context", + seq: 2, + expectedData: `msg 1 on subject B`, + }, { name: "get deleted msg", seq: 3, @@ -458,6 +483,12 @@ func TestGetMsg(t *testing.T) { "X-Nats-Key": {"123"}, }, }, + { + name: "context timeout", + seq: 1, + timeout: 50 * time.Microsecond, + withError: context.DeadlineExceeded, + }, } srv := RunBasicJetStreamServer() @@ -473,21 +504,19 @@ func TestGetMsg(t *testing.T) { } defer nc.Close() - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - s1, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}, Description: "desc"}) + s1, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}, Description: "desc"}) if err != nil { t.Fatalf("Unexpected error: %v", err) } for i := 1; i < 5; i++ { - if _, err := js.Publish(ctx, "FOO.A", []byte(fmt.Sprintf("msg %d on subject A", i))); err != nil { + if _, err := js.Publish(context.Background(), "FOO.A", []byte(fmt.Sprintf("msg %d on subject A", i))); err != nil { t.Fatalf("Unexpected error: %v", err) } - if _, err := js.Publish(ctx, "FOO.B", []byte(fmt.Sprintf("msg %d on subject B", i))); err != nil { + if _, err := js.Publish(context.Background(), "FOO.B", []byte(fmt.Sprintf("msg %d on subject B", i))); err != nil { t.Fatalf("Unexpected error: %v", err) } } - if _, err := js.PublishMsg(ctx, &nats.Msg{ + if _, err := js.PublishMsg(context.Background(), &nats.Msg{ Data: []byte("msg with headers"), Header: map[string][]string{ "X-Nats-Test-Data": {"test_data"}, @@ -497,15 +526,15 @@ func TestGetMsg(t *testing.T) { }); err != nil { t.Fatalf("Unexpected error: %v", err) } - if err := s1.DeleteMsg(ctx, 3); err != nil { + if err := s1.DeleteMsg(context.Background(), 3); err != nil { t.Fatalf("Unexpected error: %v", err) } - if err := s1.DeleteMsg(ctx, 5); err != nil { + if err := s1.DeleteMsg(context.Background(), 5); err != nil { t.Fatalf("Unexpected error: %v", err) } // same stream, but with allow direct - s2, err := js.CreateStream(ctx, + s2, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "bar", Subjects: []string{"BAR.*"}, Description: "desc", @@ -515,14 +544,14 @@ func TestGetMsg(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } for i := 1; i < 5; i++ { - if _, err := js.Publish(ctx, "BAR.A", []byte(fmt.Sprintf("msg %d on subject A", i))); err != nil { + if _, err := js.Publish(context.Background(), "BAR.A", []byte(fmt.Sprintf("msg %d on subject A", i))); err != nil { t.Fatalf("Unexpected error: %v", err) } - if _, err := js.Publish(ctx, "BAR.B", []byte(fmt.Sprintf("msg %d on subject B", i))); err != nil { + if _, err := js.Publish(context.Background(), "BAR.B", []byte(fmt.Sprintf("msg %d on subject B", i))); err != nil { t.Fatalf("Unexpected error: %v", err) } } - if _, err := js.PublishMsg(ctx, &nats.Msg{ + if _, err := js.PublishMsg(context.Background(), &nats.Msg{ Data: []byte("msg with headers"), Header: map[string][]string{ "X-Nats-Test-Data": {"test_data"}, @@ -532,14 +561,20 @@ func TestGetMsg(t *testing.T) { }); err != nil { t.Fatalf("Unexpected error: %v", err) } - if err := s2.DeleteMsg(ctx, 3); err != nil { + if err := s2.DeleteMsg(context.Background(), 3); err != nil { t.Fatalf("Unexpected error: %v", err) } - if err := s2.DeleteMsg(ctx, 5); err != nil { + if err := s2.DeleteMsg(context.Background(), 5); err != nil { t.Fatalf("Unexpected error: %v", err) } for _, test := range tests { + ctx := context.Background() + if test.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, test.timeout) + defer cancel() + } t.Run(fmt.Sprintf("%s - %s", test.name, "allow direct: false"), func(t *testing.T) { msg, err := s1.GetMsg(ctx, test.seq, test.opts...) if test.withError != nil { @@ -587,23 +622,37 @@ func TestGetLastMsgForSubject(t *testing.T) { subject string expectedData string allowDirect bool + timeout time.Duration withError error }{ { name: "get existing msg", subject: "*.A", expectedData: "msg 4 on subject A", + timeout: 5 * time.Second, + }, + { + name: "with empty context", + subject: "*.A", + expectedData: "msg 4 on subject A", }, { name: "get last msg from stream", subject: ">", expectedData: "msg 4 on subject B", + timeout: 5 * time.Second, }, { name: "no messages on subject", subject: "*.Z", withError: jetstream.ErrMsgNotFound, }, + { + name: "context timeout", + subject: "*.A", + timeout: 50 * time.Microsecond, + withError: context.DeadlineExceeded, + }, } srv := RunBasicJetStreamServer() @@ -619,23 +668,21 @@ func TestGetLastMsgForSubject(t *testing.T) { } defer nc.Close() - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - s1, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}, Description: "desc"}) + s1, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}, Description: "desc"}) if err != nil { t.Fatalf("Unexpected error: %v", err) } for i := 1; i < 5; i++ { - if _, err := js.Publish(ctx, "FOO.A", []byte(fmt.Sprintf("msg %d on subject A", i))); err != nil { + if _, err := js.Publish(context.Background(), "FOO.A", []byte(fmt.Sprintf("msg %d on subject A", i))); err != nil { t.Fatalf("Unexpected error: %v", err) } - if _, err := js.Publish(ctx, "FOO.B", []byte(fmt.Sprintf("msg %d on subject B", i))); err != nil { + if _, err := js.Publish(context.Background(), "FOO.B", []byte(fmt.Sprintf("msg %d on subject B", i))); err != nil { t.Fatalf("Unexpected error: %v", err) } } // same stream, but with allow direct - s2, err := js.CreateStream(ctx, + s2, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "bar", Subjects: []string{"BAR.*"}, Description: "desc", @@ -645,15 +692,21 @@ func TestGetLastMsgForSubject(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } for i := 1; i < 5; i++ { - if _, err := js.Publish(ctx, "BAR.A", []byte(fmt.Sprintf("msg %d on subject A", i))); err != nil { + if _, err := js.Publish(context.Background(), "BAR.A", []byte(fmt.Sprintf("msg %d on subject A", i))); err != nil { t.Fatalf("Unexpected error: %v", err) } - if _, err := js.Publish(ctx, "BAR.B", []byte(fmt.Sprintf("msg %d on subject B", i))); err != nil { + if _, err := js.Publish(context.Background(), "BAR.B", []byte(fmt.Sprintf("msg %d on subject B", i))); err != nil { t.Fatalf("Unexpected error: %v", err) } } for _, test := range tests { + ctx := context.Background() + if test.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, test.timeout) + defer cancel() + } t.Run(fmt.Sprintf("%s - %s", test.name, "allow direct: false"), func(t *testing.T) { msg, err := s1.GetLastMsgForSubject(ctx, test.subject) if test.withError != nil { @@ -691,17 +744,29 @@ func TestDeleteMsg(t *testing.T) { tests := []struct { name string seq uint64 + timeout time.Duration withError error }{ { - name: "delete message", - seq: 3, + name: "delete message", + seq: 3, + timeout: 5 * time.Second, + }, + { + name: "with empty context", + seq: 2, }, { name: "msg not found", seq: 10, withError: jetstream.ErrMsgDeleteUnsuccessful, }, + { + name: "context timeout", + seq: 1, + timeout: 50 * time.Microsecond, + withError: context.DeadlineExceeded, + }, } srv := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, srv) @@ -732,6 +797,12 @@ func TestDeleteMsg(t *testing.T) { } for _, test := range tests { + ctx := context.Background() + if test.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, test.timeout) + defer cancel() + } t.Run(test.name, func(t *testing.T) { sub, err := nc.SubscribeSync("$JS.API.STREAM.MSG.DELETE.foo") if err != nil { @@ -839,16 +910,28 @@ func TestListConsumers(t *testing.T) { tests := []struct { name string consumersNum int + timeout time.Duration withError error }{ { name: "list consumers", consumersNum: 500, + timeout: 5 * time.Second, + }, + { + name: "with empty context", + consumersNum: 500, }, { name: "no consumers available", consumersNum: 0, }, + { + name: "context timeout", + consumersNum: 500, + timeout: 50 * time.Microsecond, + withError: context.DeadlineExceeded, + }, } for _, test := range tests { @@ -860,23 +943,27 @@ func TestListConsumers(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() js, err := jetstream.New(nc) if err != nil { t.Fatalf("Unexpected error: %v", err) } defer nc.Close() - s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) + s, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) if err != nil { t.Fatalf("Unexpected error: %v", err) } for i := 0; i < test.consumersNum; i++ { - _, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + _, err = s.CreateOrUpdateConsumer(context.Background(), jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } } + ctx := context.Background() + if test.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, test.timeout) + defer cancel() + } consumersList := s.ListConsumers(ctx) consumers := make([]*jetstream.ConsumerInfo, 0) Loop: @@ -884,7 +971,16 @@ func TestListConsumers(t *testing.T) { select { case s := <-consumersList.Info(): consumers = append(consumers, s) + if test.withError != nil { + t.Fatalf("Expected error: %v; got none", test.withError) + } case err := <-consumersList.Err(): + if test.withError != nil { + if !errors.Is(err, test.withError) { + t.Fatalf("Expected error: %v; got: %v", test.withError, err) + } + return + } if !errors.Is(err, jetstream.ErrEndOfData) { t.Fatalf("Unexpected error: %v", err) } @@ -902,15 +998,28 @@ func TestConsumerNames(t *testing.T) { tests := []struct { name string consumersNum int + timeout time.Duration withError error }{ { name: "list consumer names", consumersNum: 500, + timeout: 5 * time.Second, + }, + { + name: "with empty context", + consumersNum: 500, }, { name: "no consumers available", consumersNum: 0, + timeout: 5 * time.Second, + }, + { + name: "context timeout", + consumersNum: 500, + timeout: 50 * time.Microsecond, + withError: context.DeadlineExceeded, }, } @@ -923,23 +1032,27 @@ func TestConsumerNames(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() js, err := jetstream.New(nc) if err != nil { t.Fatalf("Unexpected error: %v", err) } defer nc.Close() - s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) + s, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) if err != nil { t.Fatalf("Unexpected error: %v", err) } for i := 0; i < test.consumersNum; i++ { - _, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + _, err = s.CreateOrUpdateConsumer(context.Background(), jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) } } + ctx := context.Background() + if test.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, test.timeout) + defer cancel() + } consumersList := s.ConsumerNames(ctx) consumers := make([]string, 0) Loop: @@ -947,7 +1060,16 @@ func TestConsumerNames(t *testing.T) { select { case s := <-consumersList.Name(): consumers = append(consumers, s) + if test.withError != nil { + t.Fatalf("Expected error: %v; got none", test.withError) + } case err := <-consumersList.Err(): + if test.withError != nil { + if !errors.Is(err, test.withError) { + t.Fatalf("Expected error: %v; got: %v", test.withError, err) + } + return + } if !errors.Is(err, jetstream.ErrEndOfData) { t.Fatalf("Unexpected error: %v", err) } @@ -966,42 +1088,58 @@ func TestPurgeStream(t *testing.T) { name string opts []jetstream.StreamPurgeOpt expectedSeq []uint64 + timeout time.Duration withError error }{ { name: "purge all messages", expectedSeq: []uint64{}, + timeout: 5 * time.Second, + }, + { + name: "with empty context", + expectedSeq: []uint64{}, }, { name: "purge on subject", opts: []jetstream.StreamPurgeOpt{jetstream.WithPurgeSubject("FOO.2")}, expectedSeq: []uint64{1, 3, 5, 7, 9}, + timeout: 5 * time.Second, }, { name: "purge with sequence", opts: []jetstream.StreamPurgeOpt{jetstream.WithPurgeSequence(5)}, expectedSeq: []uint64{5, 6, 7, 8, 9, 10}, + timeout: 5 * time.Second, }, { name: "purge with keep", opts: []jetstream.StreamPurgeOpt{jetstream.WithPurgeKeep(3)}, expectedSeq: []uint64{8, 9, 10}, + timeout: 5 * time.Second, }, { name: "purge with filter and sequence", opts: []jetstream.StreamPurgeOpt{jetstream.WithPurgeSubject("FOO.2"), jetstream.WithPurgeSequence(8)}, expectedSeq: []uint64{1, 3, 5, 7, 8, 9, 10}, + timeout: 5 * time.Second, }, { name: "purge with filter and keep", opts: []jetstream.StreamPurgeOpt{jetstream.WithPurgeSubject("FOO.2"), jetstream.WithPurgeKeep(3)}, expectedSeq: []uint64{1, 3, 5, 6, 7, 8, 9, 10}, + timeout: 5 * time.Second, }, { name: "with sequence and keep", opts: []jetstream.StreamPurgeOpt{jetstream.WithPurgeSequence(5), jetstream.WithPurgeKeep(3)}, withError: jetstream.ErrInvalidOption, }, + { + name: "context timeout", + timeout: 50 * time.Microsecond, + withError: context.DeadlineExceeded, + }, } for _, test := range tests { @@ -1013,27 +1151,31 @@ func TestPurgeStream(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) - defer cancel() js, err := jetstream.New(nc) if err != nil { t.Fatalf("Unexpected error: %v", err) } defer nc.Close() - s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) + s, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) if err != nil { t.Fatalf("Unexpected error: %v", err) } for i := 0; i < 5; i++ { - if _, err := js.Publish(ctx, "FOO.1", []byte(fmt.Sprintf("msg %d on FOO.1", i))); err != nil { + if _, err := js.Publish(context.Background(), "FOO.1", []byte(fmt.Sprintf("msg %d on FOO.1", i))); err != nil { t.Fatalf("Unexpected error: %v", err) } - if _, err := js.Publish(ctx, "FOO.2", []byte(fmt.Sprintf("msg %d on FOO.2", i))); err != nil { + if _, err := js.Publish(context.Background(), "FOO.2", []byte(fmt.Sprintf("msg %d on FOO.2", i))); err != nil { t.Fatalf("Unexpected error: %v", err) } } + ctx := context.Background() + if test.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, test.timeout) + defer cancel() + } err = s.Purge(ctx, test.opts...) if test.withError != nil { if err == nil || !errors.Is(err, test.withError) {