Skip to content

Commit

Permalink
[FIXED] PublishAsyncMaxPending not reset after server disconnect
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio committed Jul 17, 2023
1 parent 49ae579 commit e5363a7
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 7 deletions.
21 changes: 21 additions & 0 deletions jetstream/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ type (
stallCh chan struct{}
doneCh chan struct{}
rr *rand.Rand
// channel to signal when server is disconnected or conn is closed
connStatusCh chan (nats.Status)
}

pubAckResponse struct {
Expand Down Expand Up @@ -296,6 +298,10 @@ func (js *jetStream) newAsyncReply() (string, error) {
js.publisher.replySubject = sub
js.publisher.rr = rand.New(rand.NewSource(time.Now().UnixNano()))
}
if js.publisher.connStatusCh == nil {
js.publisher.connStatusCh = js.conn.StatusChanged(nats.RECONNECTING, nats.CLOSED)
go js.resetPendingAcksOnReconnect()
}
var sb strings.Builder
sb.WriteString(js.publisher.replyPrefix)
rn := js.publisher.rr.Int63()
Expand Down Expand Up @@ -378,6 +384,21 @@ func (js *jetStream) handleAsyncReply(m *nats.Msg) {
js.publisher.Unlock()
}

func (js *jetStream) resetPendingAcksOnReconnect() {
for {
newStatus, ok := <-js.publisher.connStatusCh
if !ok || newStatus == nats.CLOSED {
return
}
js.publisher.Lock()
for _, paf := range js.publisher.acks {
paf.err = nats.ErrDisconnected
}
js.publisher.acks = nil
js.publisher.Unlock()
}
}

// registerPAF will register for a PubAckFuture.
func (js *jetStream) registerPAF(id string, paf *pubAckFuture) (int, int) {
js.publisher.Lock()
Expand Down
64 changes: 64 additions & 0 deletions jetstream/test/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1234,3 +1234,67 @@ func TestPublishMsgAsyncWithPendingMsgs(t *testing.T) {
}
})
}

func TestPublishAsyncResetPendingOnReconnect(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

errs := make(chan error, 1)
done := make(chan struct{}, 1)
acks := make(chan jetstream.PubAckFuture, 100)
go func() {
for i := 0; i < 100; i++ {
if ack, err := js.PublishAsync("FOO.A", []byte("hello")); err != nil {
errs <- err
return
} else {
acks <- ack
}
}
close(acks)
done <- struct{}{}
}()
select {
case <-done:
case err := <-errs:
t.Fatalf("Unexpected error during publish: %v", err)
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
s.Shutdown()
time.Sleep(100 * time.Millisecond)
if pending := js.PublishAsyncPending(); pending != 0 {
t.Fatalf("Expected no pending messages after server shutdown; got: %d", pending)
}
s = RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

for ack := range acks {
select {
case <-ack.Ok():
case err := <-ack.Err():
if !errors.Is(err, nats.ErrDisconnected) && !errors.Is(err, nats.ErrNoResponders) {
t.Fatalf("Expected error: %v or %v; got: %v", nats.ErrDisconnected, nats.ErrNoResponders, err)
}
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
}
}
38 changes: 31 additions & 7 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,14 @@ type js struct {
opts *jsOpts

// For async publish context.
mu sync.RWMutex
rpre string
rsub *Subscription
pafs map[string]*pubAckFuture
stc chan struct{}
dch chan struct{}
rr *rand.Rand
mu sync.RWMutex
rpre string
rsub *Subscription
pafs map[string]*pubAckFuture
stc chan struct{}
dch chan struct{}
rr *rand.Rand
connStatusCh chan (Status)
}

type jsOpts struct {
Expand Down Expand Up @@ -666,6 +667,10 @@ func (js *js) newAsyncReply() string {
js.rsub = sub
js.rr = rand.New(rand.NewSource(time.Now().UnixNano()))
}
if js.connStatusCh == nil {
js.connStatusCh = js.nc.StatusChanged(RECONNECTING, CLOSED)
go js.resetPendingAcksOnReconnect()
}
var sb strings.Builder
sb.WriteString(js.rpre)
rn := js.rr.Int63()
Expand All @@ -679,12 +684,31 @@ func (js *js) newAsyncReply() string {
return sb.String()
}

func (js *js) resetPendingAcksOnReconnect() {
for {
js.mu.Lock()
newStatus, ok := <-js.connStatusCh
if !ok || newStatus == CLOSED {
return
}
for _, paf := range js.pafs {
paf.err = ErrDisconnected
}
js.pafs = nil
js.mu.Unlock()
}
}

func (js *js) cleanupReplySub() {
js.mu.Lock()
if js.rsub != nil {
js.rsub.Unsubscribe()
js.rsub = nil
}
if js.connStatusCh != nil {
close(js.connStatusCh)
js.connStatusCh = nil
}
js.mu.Unlock()
}

Expand Down
55 changes: 55 additions & 0 deletions test/js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7376,6 +7376,61 @@ func TestJetStreamPublishAsync(t *testing.T) {
}
}

func TestPublishAsyncResetPendingOnReconnect(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

// Now create a stream and expect a PubAck from <-OK().
if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"FOO"}}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}

errs := make(chan error, 1)
done := make(chan struct{}, 1)
acks := make(chan nats.PubAckFuture, 100)
go func() {
for i := 0; i < 100; i++ {
if ack, err := js.PublishAsync("FOO", []byte("hello")); err != nil {
errs <- err
return
} else {
acks <- ack
}
}
close(acks)
done <- struct{}{}
}()
select {
case <-done:
case err := <-errs:
t.Fatalf("Unexpected error during publish: %v", err)
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
s.Shutdown()
time.Sleep(100 * time.Millisecond)
if pending := js.PublishAsyncPending(); pending != 0 {
t.Fatalf("Expected no pending messages after server shutdown; got: %d", pending)
}
s = RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

for ack := range acks {
select {
case <-ack.Ok():
case err := <-ack.Err():
if !errors.Is(err, nats.ErrDisconnected) && !errors.Is(err, nats.ErrNoResponders) {
t.Fatalf("Expected error: %v or %v; got: %v", nats.ErrDisconnected, nats.ErrNoResponders, err)
}
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
}
}

func TestJetStreamPublishAsyncPerf(t *testing.T) {
// Comment out below to run this benchmark.
t.SkipNow()
Expand Down

0 comments on commit e5363a7

Please sign in to comment.