Skip to content

Commit

Permalink
kgo: add PurgeTopicsFrom{Producing,Consuming}
Browse files Browse the repository at this point in the history
For when you are using one client for both producing and consuming to
the same topic.

Closes #543.
  • Loading branch information
twmb committed Sep 16, 2023
1 parent 01651af commit 253e1a9
Showing 1 changed file with 31 additions and 1 deletion.
32 changes: 31 additions & 1 deletion pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,9 @@ func (cl *Client) Ping(ctx context.Context) error {
}

// PurgeTopicsFromClient internally removes all internal information about the
// input topics.
// input topics. If you you want to purge information for only consuming or
// only producing, see the related functions [PurgeTopicsFromConsuming] and
// [PurgeTopicsFromProducing].
//
// For producing, this clears all knowledge that these topics have ever been
// produced to. Producing to the topic again may result in out of order
Expand Down Expand Up @@ -610,6 +612,34 @@ func (cl *Client) PurgeTopicsFromClient(topics ...string) {
cl.mappedMetaMu.Unlock()
}

// PurgeTopicsFromProducing internally removes all internal information for
// producing about the input topics. This runs the producer bit of logic that
// is documented in [PurgeTopicsFromClient]; see that function for more
// details.
func (cl *Client) PurgeTopicsFromProducing(topics ...string) {
if len(topics) == 0 {
return
}
sort.Strings(topics)
cl.blockingMetadataFn(func() {
cl.producer.purgeTopics(topics)
})
}

// PurgeTopicsFromConsuming internally removes all internal information for
// consuming about the input topics. This runs the consumer bit of logic that
// is documented in [PurgeTopicsFromClient]; see that function for more
// details.
func (cl *Client) PurgeTopicsFromConsuming(topics ...string) {
if len(topics) == 0 {
return
}
sort.Strings(topics)
cl.blockingMetadataFn(func() {
cl.consumer.purgeTopics(topics)
})
}

// Parse broker IP/host and port from a string, using the default Kafka port if
// unspecified. Supported address formats:
//
Expand Down

0 comments on commit 253e1a9

Please sign in to comment.