Skip to content

Commit

Permalink
[FIXED] Invalid heartbeat errors in Consume() (#1345)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio committed Aug 22, 2023
1 parent 88f3fad commit 35ba596
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 3 deletions.
40 changes: 38 additions & 2 deletions jetstream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,12 @@ and new subscription is created for each execution.
#### Continuous polling

There are 2 ways to achieve push-like behavior using pull consumers in
`jetstream` package. Both `Messages()` and `Consume()` methods perform exactly
the same optimizations and can be used interchangeably.
`jetstream` package. Both `Messages()` and `Consume()` methods perform similar optimizations
and for most cases can be used interchangeably.

There is an advantage of using `Messages()` instead of `Consume()` for work-queue scenarios,
where messages should be fetched one by one, as it allows for finer control over fetching
single messages on demand.

Subject filtering is achieved by configuring a consumer with a `FilterSubject`
value.
Expand Down Expand Up @@ -494,6 +498,34 @@ type PullThresholdMessages int
request. An error will be triggered if at least 2 heartbeats are missed (unless
`WithMessagesErrOnMissingHeartbeat(false)` is used)

##### Using `Messages()` to fetch single messages one by one

When implementing work queue, it is possible to use `Messages()` in order to
fetch messages from the server one-by-one, without optimizations and
pre-buffering (to avoid redeliveries when processing messages at slow rate).

```go
// PullMaxMessages determines how many messages will be sent to the client in a single pull request
iter, _ := cons.Messages(jetstream.PullMaxMessages(1))
numWorkers := 5
sem := make(chan struct{}, numWorkers)
for {
sem <- struct{}{}
go func() {
defer func() {
<-sem
}()
msg, err := iter.Next()
if err != nil {
// handle err
}
fmt.Printf("Processing msg: %s\n", string(msg.Data()))
doWork()
msg.Ack()
}()
}
```

## Publishing on stream

`JetStream` interface allows publishing messages on stream in 2 ways:
Expand Down Expand Up @@ -562,3 +594,7 @@ ackF, err = js.PublishAsync("ORDERS.new", []byte("hello"))

Just as for synchronous publish, `PublishAsync()` and `PublishMsgAsync()` accept
options for setting headers.

## Examples

You can find more examples of `jetstream` usage [here](https:/nats-io/nats.go/tree/main/examples/jetstream).
3 changes: 2 additions & 1 deletion jetstream/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (

internalHandler := func(msg *nats.Msg) {
if sub.hbMonitor != nil {
sub.hbMonitor.Reset(2 * consumeOpts.Heartbeat)
sub.hbMonitor.Stop()
}
userMsg, msgErr := checkMsg(msg)
if !userMsg && msgErr == nil {
Expand All @@ -187,6 +187,7 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (
defer func() {
sub.Lock()
sub.checkPending()
sub.hbMonitor.Reset(2 * consumeOpts.Heartbeat)
sub.Unlock()
}()
if !userMsg {
Expand Down

0 comments on commit 35ba596

Please sign in to comment.