Skip to content

Commit

Permalink
sink: bugfix sequence number wrapping for EOS
Browse files Browse the repository at this point in the history
KIP-98 specifies that a sequence number will "wrap" on overflow. There
is no description of what wrapping is, so the obvious implementation is
just to wrap negative.

While browsing the Kafka source, I noticed that they actually just wrap
back to 0. So, I tested producing >2**31 records, and the first batch
after wrapping causes INVALID_RECORD problems. I then changed to this
logic (mirroring Kafka's) and producing >2**31 records is fine.
  • Loading branch information
twmb committed May 16, 2022
1 parent f57c050 commit 27880b4
Showing 1 changed file with 6 additions and 1 deletion.
7 changes: 6 additions & 1 deletion pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"hash/crc32"
"math"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -122,7 +123,11 @@ func (s *sink) createReq(id int64, epoch int16) (*produceRequest, *kmsg.AddParti
recBuf.inflight++

recBuf.batchDrainIdx++
recBuf.seq += int32(len(batch.records))
if recBuf.seq > math.MaxInt32-int32(len(batch.records)) {
recBuf.seq = int32(len(batch.records)) - (math.MaxInt32 - recBuf.seq) - 1
} else {
recBuf.seq += int32(len(batch.records))
}
moreToDrain = moreToDrain || recBuf.tryStopLingerForDraining()
recBuf.mu.Unlock()

Expand Down

0 comments on commit 27880b4

Please sign in to comment.