Skip to content

Commit

Permalink
kgo: track topic IDs in the fetch session
Browse files Browse the repository at this point in the history
When using a fetch session, if we stop fetching a topic or partition, we
send that information in the fetch request.

If we forget an entire topic, we no longer keep any cursor for the topic
internally -- we are outright no longer fetching it -- so we previously
had no topic ID to put in the fetch request for the forgotten topic.
The forgotten topic was therefore sent in the fetch request without its
ID, and this would cause an NPE in Kafka.

Now, when we add a topic to the session, we also save the topic ID.
We use this for two purposes:
* Now we correctly send the forgotten topic ID
* We also can pin fetch requests to non-topic-ID versions if any
  topic is missing an ID at any point in the session (i.e. if a
  forgotten topic has no ID)

Lastly, we add a guard in metadata updating to ignore any update that is
missing a topic ID when we previously had one for that topic.

Closes #535.
  • Loading branch information
twmb committed Aug 27, 2023
1 parent a1a2a45 commit 8a9a459
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 5 deletions.
17 changes: 16 additions & 1 deletion pkg/kgo/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,18 @@ func (cl *Client) mergeTopicPartitions(
)
}

if !isProduce {
var noID [16]byte
if newTP.cursor.topicID == noID && oldTP.cursor.topicID != noID {
cl.cfg.logger.Log(LogLevelWarn, "metadata update is missing the topic ID when we previously had one, ignoring update",
"topic", topic,
"partition", part,
)
retryWhy.add(topic, int32(part), errMissingTopicID)
continue
}
}

// If the tp data is the same, we simply copy over the records
// and cursor pointers.
//
Expand Down Expand Up @@ -866,7 +878,10 @@ func (cl *Client) mergeTopicPartitions(
}
}

var errEpochRewind = errors.New("epoch rewind")
var (
errEpochRewind = errors.New("epoch rewind")
errMissingTopicID = errors.New("missing topic ID")
)

// multiUpdateWhy is a three-level nested map: a kerrOrString key, then a
// string key, then a set of int32 values (the empty struct carries no data).
// NOTE(review): presumably reason -> topic -> partitions; confirm against add.
type multiUpdateWhy map[kerrOrString]map[string]map[int32]struct{}

Expand Down
34 changes: 30 additions & 4 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -1871,7 +1871,7 @@ func (f *fetchRequest) adjustPreferringLag() {

func (*fetchRequest) Key() int16 { return 1 }
func (f *fetchRequest) MaxVersion() int16 {
if f.disableIDs {
if f.disableIDs || f.session.disableIDs {
return 12
}
return 15
Expand Down Expand Up @@ -1905,7 +1905,7 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte {
partitions := f.usedOffsets[topic]

var reqTopic *kmsg.FetchRequestTopic
sessionTopic := f.session.lookupTopic(topic)
sessionTopic := f.session.lookupTopic(topic, f.topic2id)

var usedTopic map[int32]struct{}
if sessionUsed != nil {
Expand Down Expand Up @@ -1972,6 +1972,20 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte {
}
if len(partitions) == 0 {
delete(f.session.used, topic)
id := f.session.t2id[topic]
delete(f.session.t2id, topic)
// If we deleted a topic that was missing an ID, we clear disableIDs
// and then set it again if any topic still in the session lacks an ID.
var noID [16]byte
if id == noID {
f.session.disableIDs = false
for _, id := range f.session.t2id {
if id == noID {
f.session.disableIDs = true
break
}
}
}
}
}
}
Expand All @@ -1996,13 +2010,17 @@ type fetchSession struct {
epoch int32

used map[string]map[int32]fetchSessionOffsetEpoch // what we have in the session so far
t2id map[string][16]byte

killed bool // if we cannot use a session anymore
disableIDs bool // if anything in t2id has no ID
killed bool // if we cannot use a session anymore
}

// kill marks the session as unusable: the epoch is set to -1, the tracked
// partitions and topic IDs are dropped, and disableIDs is cleared. Once
// killed is set, lookupTopic returns nil for every topic.
func (s *fetchSession) kill() {
s.epoch = -1
s.used = nil
s.t2id = nil
s.disableIDs = false
s.killed = true
}

Expand All @@ -2015,6 +2033,8 @@ func (s *fetchSession) reset() {
}
s.epoch = 0
s.used = nil
s.t2id = nil
s.disableIDs = false
}

// bumpEpoch bumps the epoch and saves the session id.
Expand All @@ -2035,17 +2055,23 @@ func (s *fetchSession) bumpEpoch(id int32) {
s.id = id
}

func (s *fetchSession) lookupTopic(topic string) fetchSessionTopic {
func (s *fetchSession) lookupTopic(topic string, t2id map[string][16]byte) fetchSessionTopic {
if s.killed {
return nil
}
if s.used == nil {
s.used = make(map[string]map[int32]fetchSessionOffsetEpoch)
s.t2id = make(map[string][16]byte)
}
t := s.used[topic]
if t == nil {
t = make(map[int32]fetchSessionOffsetEpoch)
s.used[topic] = t
id := t2id[topic]
s.t2id[topic] = id
if id == ([16]byte{}) {
s.disableIDs = true
}
}
return t
}
Expand Down

0 comments on commit 8a9a459

Please sign in to comment.