Skip to content

Commit

Permalink
kadm: set per-partition errors on missing offsets in CommitOffsets
Browse files Browse the repository at this point in the history
If we error when the response is not exactly as expected, then we could
commit but not know the results.
  • Loading branch information
twmb committed Jan 6, 2022
1 parent 32425df commit e0b520c
Showing 1 changed file with 22 additions and 16 deletions.
38 changes: 22 additions & 16 deletions pkg/kadm/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,33 +587,39 @@ func (cl *Client) CommitOffsets(ctx context.Context, group string, os Offsets) (
}

rs := make(OffsetResponses)
for i, t := range resp.Topics {
for _, t := range resp.Topics {
rt := make(map[int32]OffsetResponse)
rs[t.Topic] = rt
if i >= len(req.Topics) {
return nil, fmt.Errorf("topic %q at response index %d was not in offset commit request", t.Topic, i)
}
reqt := req.Topics[i]
if reqt.Topic != t.Topic {
return nil, fmt.Errorf("topic %q at response index %d does not match request topic %q", t.Topic, i, reqt.Topic)
}
for j, p := range t.Partitions {
if j >= len(reqt.Partitions) {
return nil, fmt.Errorf("topic %q partition %d at response index %d was not in offset commit request", t.Topic, p.Partition, j)
}
reqp := reqt.Partitions[j]
if reqp.Partition != p.Partition {
return nil, fmt.Errorf("topic %q partition %d at response index %d does not match request partition %d", t.Topic, p.Partition, j, reqp.Partition)
}
for _, p := range t.Partitions {
rt[p.Partition] = OffsetResponse{
Offset: os[t.Topic][p.Partition],
Err: kerr.ErrorForCode(p.ErrorCode),
}
}
}

for t, ps := range os {
respt := rs[t]
if respt == nil {
respt = make(map[int32]OffsetResponse)
rs[t] = respt
}
for p, o := range ps {
if _, exists := respt[p]; exists {
continue
}
respt[p] = OffsetResponse{
Offset: o,
Err: errOffsetCommitMissing,
}
}
}

return rs, nil
}

var errOffsetCommitMissing = errors.New("partition missing in commit response")

// CommitAllOffsets is identical to CommitOffsets, but returns an error if the
// offset commit was successful, but some offset within the commit failed to be
// committed.
Expand Down

0 comments on commit e0b520c

Please sign in to comment.