From 2ab2858622cabbf2cad242c39ad7eec263d42573 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Thu, 13 Jul 2023 13:46:57 +0200 Subject: [PATCH] [FIXED] Invalid heartbeat errors in Consume() Signed-off-by: Piotr Piotrowski --- jetstream/README.md | 40 ++++++++++++++++++++++++++++++++++++++-- jetstream/pull.go | 3 ++- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/jetstream/README.md b/jetstream/README.md index 0e64aa215..8eeabf303 100644 --- a/jetstream/README.md +++ b/jetstream/README.md @@ -413,8 +413,12 @@ and new subscription is created for each execution. #### Continuous polling There are 2 ways to achieve push-like behavior using pull consumers in -`jetstream` package. Both `Messages()` and `Consume()` methods perform exactly -the same optimizations and can be used interchangeably. +`jetstream` package. Both `Messages()` and `Consume()` methods perform similar optimizations +and for most cases can be used interchangeably. + +There is an advantage of using `Messages()` instead of `Consume()` for work-queue scenarios, +where messages should be fetched one by one, as it allows for finer control over fetching +single messages on demand. Subject filtering is achieved by configuring a consumer with a `FilterSubject` value. @@ -494,6 +498,34 @@ type PullThresholdMessages int request. An error will be triggered if at least 2 heartbeats are missed (unless `WithMessagesErrOnMissingHeartbeat(false)` is used) +##### Using `Messages()` to fetch single messages one by one + +When implementing work queue, it is possible to use `Messages()` in order to +fetch messages from the server one-by-one, without optimizations and +pre-buffering (to avoid redeliveries when processing messages at slow rate). + +```go +// PullMaxMessages determines how many messages will be sent to the client in a single pull request +iter, _ := cons.Messages(jetstream.PullMaxMessages(1)) +numWorkers := 5 +sem := make(chan struct{}, numWorkers) +for { + sem <- struct{}{} + go func() { + defer func() { + <-sem + }() + msg, err := iter.Next() + if err != nil { + // handle err + } + fmt.Printf("Processing msg: %s\n", string(msg.Data())) + doWork() + msg.Ack() + }() +} +``` + ## Publishing on stream `JetStream` interface allows publishing messages on stream in 2 ways: @@ -562,3 +594,7 @@ ackF, err = js.PublishAsync("ORDERS.new", []byte("hello")) Just as for synchronous publish, `PublishAsync()` and `PublishMsgAsync()` accept options for setting headers. + +## Examples + +You can find more examples of `jetstream` usage [here](https://github.com/nats-io/nats.go/tree/main/examples/jetstream). diff --git a/jetstream/pull.go b/jetstream/pull.go index abb9cefeb..46d4d58af 100644 --- a/jetstream/pull.go +++ b/jetstream/pull.go @@ -178,7 +178,7 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) ( internalHandler := func(msg *nats.Msg) { if sub.hbMonitor != nil { - sub.hbMonitor.Reset(2 * consumeOpts.Heartbeat) + sub.hbMonitor.Stop() } userMsg, msgErr := checkMsg(msg) if !userMsg && msgErr == nil { @@ -187,6 +187,7 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) ( defer func() { sub.Lock() sub.checkPending() + sub.hbMonitor.Reset(2 * consumeOpts.Heartbeat) sub.Unlock() }() if !userMsg {