Skip to content

Commit

Permalink
source: bugfix Timestamp for LogAppendTime
Browse files Browse the repository at this point in the history
As noted in the Record type itself, LogAppendTime uses
batch.MaxTimestamp rather than per-record timestamps.
  • Loading branch information
twmb committed May 29, 2022
1 parent 653010d commit a23a076
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -1426,11 +1426,10 @@ func recordToRecord(
})
}

return &Record{
r := &Record{
Key: record.Key,
Value: record.Value,
Headers: h,
Timestamp: timeFromMillis(batch.FirstTimestamp + int64(record.TimestampDelta)),
Topic: topic,
Partition: partition,
Attrs: RecordAttrs{uint8(batch.Attributes)},
Expand All @@ -1439,6 +1438,12 @@ func recordToRecord(
LeaderEpoch: batch.PartitionLeaderEpoch,
Offset: batch.FirstOffset + int64(record.OffsetDelta),
}
if r.Attrs.TimestampType() == 0 {
r.Timestamp = timeFromMillis(batch.FirstTimestamp + int64(record.TimestampDelta))
} else {
r.Timestamp = timeFromMillis(batch.MaxTimestamp)
}
return r
}

func messageAttrsToRecordAttrs(attrs int8, v0 bool) RecordAttrs {
Expand Down

0 comments on commit a23a076

Please sign in to comment.