Skip to content

Commit

Permalink
add BasicConsistentPartitioner / ManualPartitioner
Browse files Browse the repository at this point in the history
ManualPartition makes it such that you can set a partition field on the
record struct and just use that.
  • Loading branch information
twmb committed Jun 7, 2021
1 parent 62e8fff commit 6808a55
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 3 deletions.
50 changes: 50 additions & 0 deletions pkg/kgo/partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (

// Partitioner creates topic partitioners to determine which partition messages
// should be sent to.
//
// Note that a record struct is unmodified (minus a potential default topic)
// from producing through partitioning, so you can set fields in the record
// struct before producing to aid in partitioning with a custom partitioner.
type Partitioner interface {
// forTopic returns a partitioner for an individual topic. It is
// guaranteed that only one record will use the an individual topic's
Expand All @@ -30,6 +34,52 @@ type TopicPartitioner interface {
Partition(r *Record, n int) int
}

// BasicConsistentPartitioner wraps a single function to provide a Partitioner
// and TopicPartitioner (that function is essentially a combination of
// Partitioner.ForTopic and TopicPartitioner.Partition).
//
// As a minimal example, if you do not care about the topic and you set the
// partition before producing:
//
// kgo.BasicConsistentPartitioner(func(topic) func(*Record, int) int {
// return func(r *Record, n int) int {
// return int(r.Partition)
// }
// })
func BasicConsistentPartitioner(partition func(string) func(r *Record, n int) int) Partitioner {
return &basicPartitioner{partition}
}

type basicPartitioner struct {
fn func(string) func(*Record, int) int
}

func (b *basicPartitioner) ForTopic(t string) TopicPartitioner {
return &basicTopicPartitioner{b.fn(t)}
}

type basicTopicPartitioner struct {
fn func(*Record, int) int
}

func (*basicTopicPartitioner) OnNewBatch() {}
func (*basicTopicPartitioner) RequiresConsistency(*Record) bool { return true }
func (b *basicTopicPartitioner) Partition(r *Record, n int) int { return b.fn(r, n) }

// ManualPartitioner is a partitioner that simply returns the Partition field
// that is already set on any record.
//
// Any record with an invalid partition will be immediately failed. This
// partitioner is simply the partitioner that is demonstrated in the
// BasicConsistentPartitioner documentation.
func ManualPartitioner() Partitioner {
return BasicConsistentPartitioner(func(string) func(*Record, int) int {
return func(r *Record, _ int) int {
return int(r.Partition)
}
})
}

// StickyPartitioner is the same as StickyKeyPartitioner, but with no logic to
// consistently hash keys. That is, this only partitions according to the
// sticky partition strategy.
Expand Down
5 changes: 4 additions & 1 deletion pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,10 @@ func (f *FirstErrPromise) Err() error {
//
// If the record has an empty Topic field, the client will use a default topic
// if the client was configured with one via ProduceTopic, otherwise the record
// will be failed immediately.
// will be failed immediately. The Partition field is ignored (setting it does
// not set which partition will be produced to), but, because the field is set
// only when finishing a record successfully, you can set the Partition field
// yourself and use the ManualPartitioner to obey the Partition field.
//
// If the record is too large to fit in a batch on its own in a produce
// request, the promise will be called with kerr.MessageTooLarge and there will
Expand Down
6 changes: 4 additions & 2 deletions pkg/kgo/record_and_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,10 @@ type Record struct {

// Partition is the partition that a record is written to.
//
// For producing, this is left unset. This will be set by the client
// as appropriate.
// For producing, this is left unset. This will be set by the client as
// appropriate. Alternatively, you can use the ManualPartitioner, which
// makes it such that this field is always the field chosen when
// partitioning (i.e., you partition manually ahead of time).
Partition int32

// Attrs specifies what attributes were on this record.
Expand Down

0 comments on commit 6808a55

Please sign in to comment.