Skip to content

Commit

Permalink
[CHANGED] Add default timeout in JetStream when empty context is used
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio committed Jul 15, 2023
1 parent 79f5f2f commit ecb1e04
Show file tree
Hide file tree
Showing 8 changed files with 526 additions and 109 deletions.
16 changes: 16 additions & 0 deletions jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ type (

// Info returns [ConsumerInfo] for a given consumer
func (p *pullConsumer) Info(ctx context.Context) (*ConsumerInfo, error) {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
infoSubject := apiSubj(p.jetStream.apiPrefix, fmt.Sprintf(apiConsumerInfoT, p.stream, p.name))
var resp consumerInfoResponse

Expand All @@ -81,6 +85,10 @@ func (p *pullConsumer) CachedInfo() *ConsumerInfo {
}

func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg ConsumerConfig) (Consumer, error) {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
req := createConsumerRequest{
Stream: stream,
Config: &cfg,
Expand Down Expand Up @@ -142,6 +150,10 @@ func generateConsName() string {
}

func getConsumer(ctx context.Context, js *jetStream, stream, name string) (Consumer, error) {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
if err := validateConsumerName(name); err != nil {
return nil, err
}
Expand Down Expand Up @@ -172,6 +184,10 @@ func getConsumer(ctx context.Context, js *jetStream, stream, name string) (Consu
}

func deleteConsumer(ctx context.Context, js *jetStream, stream, consumer string) error {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
if err := validateConsumerName(consumer); err != nil {
return err
}
Expand Down
46 changes: 46 additions & 0 deletions jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"regexp"
"strings"
"time"

"github.com/nats-io/nats.go"
"github.com/nats-io/nuid"
Expand Down Expand Up @@ -195,6 +196,9 @@ type (
}
)

// defaultAPITimeout is used if context.Background() or context.TODO() is passed to API calls.
const defaultAPITimeout = 5 * time.Second

var subjectRegexp = regexp.MustCompile(`^[^ >]*[>]?$`)

// New returns a new JetStream instance.
Expand Down Expand Up @@ -297,6 +301,10 @@ func (js *jetStream) CreateStream(ctx context.Context, cfg StreamConfig) (Stream
if err := validateStreamName(cfg.Name); err != nil {
return nil, err
}
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
ncfg := cfg
// If we have a mirror and an external domain, convert to ext.APIPrefix.
if ncfg.Mirror != nil && ncfg.Mirror.Domain != "" {
Expand Down Expand Up @@ -378,6 +386,10 @@ func (js *jetStream) UpdateStream(ctx context.Context, cfg StreamConfig) (Stream
if err := validateStreamName(cfg.Name); err != nil {
return nil, err
}
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}

req, err := json.Marshal(cfg)
if err != nil {
Expand Down Expand Up @@ -409,6 +421,10 @@ func (js *jetStream) Stream(ctx context.Context, name string) (Stream, error) {
if err := validateStreamName(name); err != nil {
return nil, err
}
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
infoSubject := apiSubj(js.apiPrefix, fmt.Sprintf(apiStreamInfoT, name))

var resp streamInfoResponse
Expand All @@ -434,6 +450,10 @@ func (js *jetStream) DeleteStream(ctx context.Context, name string) error {
if err := validateStreamName(name); err != nil {
return err
}
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
deleteSubject := apiSubj(js.apiPrefix, fmt.Sprintf(apiStreamDeleteT, name))
var resp streamDeleteResponse

Expand Down Expand Up @@ -518,6 +538,10 @@ func validateSubject(subject string) error {
}

func (js *jetStream) AccountInfo(ctx context.Context) (*AccountInfo, error) {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
var resp accountInfoResponse

infoSubject := apiSubj(js.apiPrefix, apiAccountInfo)
Expand Down Expand Up @@ -548,6 +572,10 @@ func (js *jetStream) ListStreams(ctx context.Context) StreamInfoLister {
errs: make(chan error, 1),
}
go func() {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
for {
page, err := l.streamInfos(ctx)
if err != nil && !errors.Is(err, ErrEndOfData) {
Expand Down Expand Up @@ -590,6 +618,10 @@ func (js *jetStream) StreamNames(ctx context.Context) StreamNameLister {
errs: make(chan error, 1),
}
go func() {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
for {
page, err := l.streamNames(ctx)
if err != nil && !errors.Is(err, ErrEndOfData) {
Expand All @@ -615,6 +647,10 @@ func (js *jetStream) StreamNames(ctx context.Context) StreamNameLister {
}

func (js *jetStream) StreamNameBySubject(ctx context.Context, subject string) (string, error) {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
if err := validateSubject(subject); err != nil {
return "", err
}
Expand Down Expand Up @@ -700,3 +736,13 @@ func (s *streamLister) streamNames(ctx context.Context) ([]string, error) {
s.offset += len(resp.Streams)
return resp.Streams, nil
}

// wrapContextWithoutDeadline wraps context without deadline with default timeout.
// If deadline is already set, it will be returned as is, and cancel() will be nil.
// Caller should check if cancel() is nil before calling it.
func wrapContextWithoutDeadline(ctx context.Context) (context.Context, context.CancelFunc) {
if _, ok := ctx.Deadline(); ok {
return ctx, nil
}
return context.WithTimeout(ctx, defaultAPITimeout)
}
6 changes: 4 additions & 2 deletions jetstream/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,10 @@ func (m *jetStreamMsg) ackReply(ctx context.Context, ackType ackType, sync bool,
m.Unlock()

if sync {
if _, hasDeadline := ctx.Deadline(); !hasDeadline {
return nats.ErrNoDeadlineContext
var cancel context.CancelFunc
ctx, cancel = wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
}

Expand Down
4 changes: 4 additions & 0 deletions jetstream/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ func (js *jetStream) Publish(ctx context.Context, subj string, data []byte, opts

// PublishMsg publishes a Msg to a stream from JetStream.
func (js *jetStream) PublishMsg(ctx context.Context, m *nats.Msg, opts ...PublishOpt) (*PubAck, error) {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
o := pubOpts{
retryWait: DefaultPubRetryWait,
retryAttempts: DefaultPubRetryAttempts,
Expand Down
24 changes: 24 additions & 0 deletions jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ func (s *stream) DeleteConsumer(ctx context.Context, name string) error {
// [WithDeletedDetails] - use to display the information about messages deleted from a stream
// [WithSubjectFilter] - use to display the information about messages stored on given subjects
func (s *stream) Info(ctx context.Context, opts ...StreamInfoOpt) (*StreamInfo, error) {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
var infoReq *streamInfoRequest
for _, opt := range opts {
if infoReq == nil {
Expand Down Expand Up @@ -280,6 +284,10 @@ func (s *stream) CachedInfo() *StreamInfo {
// [WithPurgeSequence] - can be used to set a specific sequence number up to which (but not including) messages will be purged from a stream
// [WithPurgeKeep] - can be used to set the number of messages to be kept in the stream after purge.
func (s *stream) Purge(ctx context.Context, opts ...StreamPurgeOpt) error {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
var purgeReq StreamPurgeRequest
for _, opt := range opts {
if err := opt(&purgeReq); err != nil {
Expand Down Expand Up @@ -321,6 +329,10 @@ func (s *stream) GetLastMsgForSubject(ctx context.Context, subject string) (*Raw
}

func (s *stream) getMsg(ctx context.Context, mreq *apiMsgGetRequest) (*RawStreamMsg, error) {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
req, err := json.Marshal(mreq)
if err != nil {
return nil, err
Expand Down Expand Up @@ -448,6 +460,10 @@ func (s *stream) SecureDeleteMsg(ctx context.Context, seq uint64) error {
}

func (s *stream) deleteMsg(ctx context.Context, req *msgDeleteRequest) error {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
r, err := json.Marshal(req)
if err != nil {
return err
Expand All @@ -471,6 +487,10 @@ func (s *stream) ListConsumers(ctx context.Context) ConsumerInfoLister {
errs: make(chan error, 1),
}
go func() {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
for {
page, err := l.consumerInfos(ctx, s.name)
if err != nil && !errors.Is(err, ErrEndOfData) {
Expand Down Expand Up @@ -514,6 +534,10 @@ func (s *stream) ConsumerNames(ctx context.Context) ConsumerNameLister {
errs: make(chan error, 1),
}
go func() {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
for {
page, err := l.consumerNames(ctx, s.name)
if err != nil && !errors.Is(err, ErrEndOfData) {
Expand Down
Loading

0 comments on commit ecb1e04

Please sign in to comment.