Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] PublishAsyncMaxPending not reset after server disconnect #1346

Merged
merged 2 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions jetstream/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ type (
stallCh chan struct{}
doneCh chan struct{}
rr *rand.Rand
// connStatusCh receives RECONNECTING and CLOSED connection status changes;
// it is used to reset pending async publish acks when the connection drops
connStatusCh chan (nats.Status)
}

pubAckResponse struct {
Expand Down Expand Up @@ -296,6 +298,10 @@ func (js *jetStream) newAsyncReply() (string, error) {
js.publisher.replySubject = sub
js.publisher.rr = rand.New(rand.NewSource(time.Now().UnixNano()))
}
if js.publisher.connStatusCh == nil {
js.publisher.connStatusCh = js.conn.StatusChanged(nats.RECONNECTING, nats.CLOSED)
go js.resetPendingAcksOnReconnect()
}
var sb strings.Builder
sb.WriteString(js.publisher.replyPrefix)
rn := js.publisher.rr.Int63()
Expand Down Expand Up @@ -378,6 +384,21 @@ func (js *jetStream) handleAsyncReply(m *nats.Msg) {
js.publisher.Unlock()
}

// resetPendingAcksOnReconnect is meant to run in its own goroutine (it is
// started from newAsyncReply). It blocks on the publisher's connection status
// channel and, for every status other than nats.CLOSED, fails all pending
// async publish futures with nats.ErrDisconnected and clears the pending set.
//
// The goroutine exits when the status channel is closed or when the
// connection reports nats.CLOSED.
func (js *jetStream) resetPendingAcksOnReconnect() {
	// Capture the channel once under the lock so the loop never reads the
	// shared field without synchronization.
	js.publisher.Lock()
	connStatusCh := js.publisher.connStatusCh
	js.publisher.Unlock()

	for {
		// Wait for a status change WITHOUT holding the publisher lock, so
		// publishing and ack handling are not blocked in the meantime.
		newStatus, ok := <-connStatusCh
		if !ok || newStatus == nats.CLOSED {
			return
		}

		js.publisher.Lock()
		for _, paf := range js.publisher.acks {
			paf.err = nats.ErrDisconnected
		}
		js.publisher.acks = nil
		// Nothing is pending any more: release anyone waiting on the done
		// channel (handed out by PublishAsyncComplete), otherwise they would
		// block forever since no further ack will arrive to close it.
		if js.publisher.doneCh != nil {
			close(js.publisher.doneCh)
			js.publisher.doneCh = nil
		}
		js.publisher.Unlock()
	}
}

// registerPAF will register for a PubAckFuture.
func (js *jetStream) registerPAF(id string, paf *pubAckFuture) (int, int) {
js.publisher.Lock()
Expand Down
64 changes: 64 additions & 0 deletions jetstream/test/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1234,3 +1234,67 @@ func TestPublishMsgAsyncWithPendingMsgs(t *testing.T) {
}
})
}

// TestPublishAsyncResetPendingOnReconnect publishes a batch of messages
// asynchronously, shuts the server down, and checks that the number of
// pending async publishes drops to zero. After the server is restarted every
// collected ack future must either succeed or fail with ErrDisconnected or
// ErrNoResponders.
func TestPublishAsyncResetPendingOnReconnect(t *testing.T) {
	const msgCount = 100

	srv := RunBasicJetStreamServer()
	defer shutdownJSServerAndRemoveStorage(t, srv)

	nc, err := nats.Connect(srv.ClientURL())
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	js, err := jetstream.New(nc)
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	defer nc.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	streamCfg := jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}
	if _, err = js.CreateStream(ctx, streamCfg); err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	publishErrs := make(chan error, 1)
	finished := make(chan struct{}, 1)
	futures := make(chan jetstream.PubAckFuture, msgCount)

	// Publish in the background; the futures channel is sized so that the
	// publisher never blocks on handing over a future.
	go func() {
		for i := 0; i < msgCount; i++ {
			future, err := js.PublishAsync("FOO.A", []byte("hello"))
			if err != nil {
				publishErrs <- err
				return
			}
			futures <- future
		}
		close(futures)
		finished <- struct{}{}
	}()

	select {
	case <-finished:
	case err := <-publishErrs:
		t.Fatalf("Unexpected error during publish: %v", err)
	case <-time.After(5 * time.Second):
		t.Fatalf("Did not receive completion signal")
	}

	// Take the server away and give the client a moment to notice.
	srv.Shutdown()
	time.Sleep(100 * time.Millisecond)

	pending := js.PublishAsyncPending()
	if pending != 0 {
		t.Fatalf("Expected no pending messages after server shutdown; got: %d", pending)
	}

	srv = RunBasicJetStreamServer()
	defer shutdownJSServerAndRemoveStorage(t, srv)

	// Every future must resolve: either acked before the shutdown, or failed
	// with one of the two expected errors.
	for future := range futures {
		select {
		case <-future.Ok():
		case err := <-future.Err():
			disconnected := errors.Is(err, nats.ErrDisconnected)
			noResponders := errors.Is(err, nats.ErrNoResponders)
			if !disconnected && !noResponders {
				t.Fatalf("Expected error: %v or %v; got: %v", nats.ErrDisconnected, nats.ErrNoResponders, err)
			}
		case <-time.After(5 * time.Second):
			t.Fatalf("Did not receive completion signal")
		}
	}
}
38 changes: 31 additions & 7 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,14 @@ type js struct {
opts *jsOpts

// For async publish context.
mu sync.RWMutex
rpre string
rsub *Subscription
pafs map[string]*pubAckFuture
stc chan struct{}
dch chan struct{}
rr *rand.Rand
mu sync.RWMutex
rpre string
rsub *Subscription
pafs map[string]*pubAckFuture
stc chan struct{}
dch chan struct{}
rr *rand.Rand
connStatusCh chan (Status)
}

type jsOpts struct {
Expand Down Expand Up @@ -666,6 +667,10 @@ func (js *js) newAsyncReply() string {
js.rsub = sub
js.rr = rand.New(rand.NewSource(time.Now().UnixNano()))
}
if js.connStatusCh == nil {
js.connStatusCh = js.nc.StatusChanged(RECONNECTING, CLOSED)
go js.resetPendingAcksOnReconnect()
}
var sb strings.Builder
sb.WriteString(js.rpre)
rn := js.rr.Int63()
Expand All @@ -679,12 +684,31 @@ func (js *js) newAsyncReply() string {
return sb.String()
}

// resetPendingAcksOnReconnect is meant to run in its own goroutine (it is
// started from newAsyncReply). It blocks on the connection status channel
// and, for every status other than CLOSED, marks all pending async publish
// futures as failed with ErrDisconnected and clears the pending set.
//
// The goroutine exits when the status channel is closed (see
// cleanupReplySub) or when the connection reports CLOSED.
func (js *js) resetPendingAcksOnReconnect() {
	// Capture the channel once under the lock. cleanupReplySub closes the
	// channel and then sets js.connStatusCh to nil; re-reading the field
	// could therefore yield a nil channel and block this goroutine forever.
	js.mu.Lock()
	connStatusCh := js.connStatusCh
	js.mu.Unlock()

	for {
		// Wait for a status change WITHOUT holding js.mu: holding the mutex
		// across this blocking receive would stall every other user of the
		// async publish state until the connection status changes.
		newStatus, ok := <-connStatusCh
		if !ok || newStatus == CLOSED {
			// The lock is not held here, so returning leaves js.mu free.
			return
		}

		js.mu.Lock()
		for _, paf := range js.pafs {
			paf.err = ErrDisconnected
		}
		js.pafs = nil
		// Nothing is pending any more: release anyone waiting on the done
		// channel (handed out by PublishAsyncComplete), otherwise they would
		// block forever since no further ack will arrive to close it.
		if js.dch != nil {
			close(js.dch)
			js.dch = nil
		}
		js.mu.Unlock()
	}
}

// cleanupReplySub releases the resources backing async publishing while
// holding js.mu: it unsubscribes the reply subscription, if one exists, and
// closes the connection status channel, if one exists. Both fields are reset
// to nil afterwards. The error returned by Unsubscribe is intentionally
// ignored.
func (js *js) cleanupReplySub() {
	js.mu.Lock()

	// Drop the reply subscription used to receive publish acks.
	if sub := js.rsub; sub != nil {
		sub.Unsubscribe()
		js.rsub = nil
	}

	// Closing the status channel lets its receiver observe the closure.
	if statusCh := js.connStatusCh; statusCh != nil {
		close(statusCh)
		js.connStatusCh = nil
	}

	js.mu.Unlock()
}

Expand Down
55 changes: 55 additions & 0 deletions test/js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7376,6 +7376,61 @@ func TestJetStreamPublishAsync(t *testing.T) {
}
}

// TestPublishAsyncResetPendingOnReconnect publishes a batch of messages
// asynchronously, shuts the server down, and checks that the number of
// pending async publishes drops to zero. After the server is restarted every
// collected ack future must either succeed or fail with ErrDisconnected or
// ErrNoResponders.
func TestPublishAsyncResetPendingOnReconnect(t *testing.T) {
	const msgCount = 100

	srv := RunBasicJetStreamServer()
	defer shutdownJSServerAndRemoveStorage(t, srv)

	nc, js := jsClient(t, srv)
	defer nc.Close()

	// The stream must exist before anything is published to "FOO".
	streamCfg := &nats.StreamConfig{Name: "TEST", Subjects: []string{"FOO"}}
	if _, err := js.AddStream(streamCfg); err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	publishErrs := make(chan error, 1)
	finished := make(chan struct{}, 1)
	futures := make(chan nats.PubAckFuture, msgCount)

	// Publish in the background; the futures channel is sized so that the
	// publisher never blocks on handing over a future.
	go func() {
		for i := 0; i < msgCount; i++ {
			future, err := js.PublishAsync("FOO", []byte("hello"))
			if err != nil {
				publishErrs <- err
				return
			}
			futures <- future
		}
		close(futures)
		finished <- struct{}{}
	}()

	select {
	case <-finished:
	case err := <-publishErrs:
		t.Fatalf("Unexpected error during publish: %v", err)
	case <-time.After(5 * time.Second):
		t.Fatalf("Did not receive completion signal")
	}

	// Take the server away and give the client a moment to notice.
	srv.Shutdown()
	time.Sleep(100 * time.Millisecond)

	pending := js.PublishAsyncPending()
	if pending != 0 {
		t.Fatalf("Expected no pending messages after server shutdown; got: %d", pending)
	}

	srv = RunBasicJetStreamServer()
	defer shutdownJSServerAndRemoveStorage(t, srv)

	// Every future must resolve: either acked before the shutdown, or failed
	// with one of the two expected errors.
	for future := range futures {
		select {
		case <-future.Ok():
		case err := <-future.Err():
			disconnected := errors.Is(err, nats.ErrDisconnected)
			noResponders := errors.Is(err, nats.ErrNoResponders)
			if !disconnected && !noResponders {
				t.Fatalf("Expected error: %v or %v; got: %v", nats.ErrDisconnected, nats.ErrNoResponders, err)
			}
		case <-time.After(5 * time.Second):
			t.Fatalf("Did not receive completion signal")
		}
	}
}

func TestJetStreamPublishAsyncPerf(t *testing.T) {
// Comment out below to run this benchmark.
t.SkipNow()
Expand Down