Skip to content

Commit

Permalink
kadm: fix occasionally empty topic/partitions in Lag
Browse files Browse the repository at this point in the history
If listing end offsets errored, then the lag would have no
topic/partition. We now add Topic and Partition to the Lag itself so
that we can always ensure we set these fields to be non-empty (and this
makes it easier to reason about what field to use if you use
groupLags.Sorted()).
  • Loading branch information
twmb committed Sep 21, 2023
1 parent 85ac96c commit bfd07b2
Showing 1 changed file with 19 additions and 13 deletions.
32 changes: 19 additions & 13 deletions pkg/kadm/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -1162,7 +1162,9 @@ func (cl *Client) DeleteOffsets(ctx context.Context, group string, s TopicsSet)
type GroupMemberLag struct {
// Member is a reference to the group member consuming this partition.
// If the group is in state Empty, the member will be nil.
Member *DescribedGroupMember
Member *DescribedGroupMember
Topic string // Topic is the topic this lag is for.
Partition int32 // Partition is the partition this lag is for.

Commit Offset // Commit is this member's current offset commit.
End ListedOffset // EndOffset is a reference to the end offset of this partition.
Expand Down Expand Up @@ -1201,13 +1203,13 @@ func (l GroupLag) Sorted() []GroupMemberLag {
}
sort.Slice(all, func(i, j int) bool {
l, r := all[i], all[j]
if l.End.Topic < r.End.Topic {
if l.Topic < r.Topic {
return true
}
if l.End.Topic > r.End.Topic {
if l.Topic > r.Topic {
return false
}
return l.End.Partition < r.End.Partition
return l.Partition < r.Partition
})
return all
}
Expand Down Expand Up @@ -1563,11 +1565,13 @@ func CalculateGroupLag(
}

lt[p] = GroupMemberLag{
Member: &group.Members[mi],
Commit: pcommit.Offset,
End: pend,
Lag: lag,
Err: perr,
Member: &group.Members[mi],
Topic: t.Topic,
Partition: p,
Commit: pcommit.Offset,
End: pend,
Lag: lag,
Err: perr,
}

}
Expand Down Expand Up @@ -1614,10 +1618,12 @@ func calculateEmptyLag(commit OffsetResponses, endOffsets ListedOffsets) GroupLa
}

lt[p] = GroupMemberLag{
Commit: pcommit.Offset,
End: pend,
Lag: lag,
Err: perr,
Commit: pcommit.Offset,
Topic: t,
Partition: p,
End: pend,
Lag: lag,
Err: perr,
}
}
}
Expand Down

0 comments on commit bfd07b2

Please sign in to comment.