Skip to content

Commit

Permalink
bench: add -compression, -max-inflight, -batch-max-bytes
Browse files Browse the repository at this point in the history
More ways to produce!
  • Loading branch information
twmb committed Jun 16, 2021
1 parent 229f9da commit 05a6b8a
Showing 1 changed file with 27 additions and 4 deletions.
31 changes: 27 additions & 4 deletions examples/bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@ var (
pprofPort = flag.String("pprof", ":9876", "port to bind to for pprof, if non-empty")

recordBytes = flag.Int("record-bytes", 100, "bytes per record (producing)")
noCompression = flag.Bool("no-compression", false, "set to disable snappy compression (producing)")
noCompression = flag.Bool("no-compression", false, "set to disable compression (alias for -compression none, producing)")
compression = flag.String("compression", "snappy", "compression algorithm to use (none,gzip,snappy,lz4,zstd, for producing)")
poolProduce = flag.Bool("pool", false, "if true, use a sync.Pool to reuse record structs/slices (producing)")

noIdempotency = flag.Bool("disable-idempotency", false, "if true, disable idempotency (force 1 produce rps")
logLevel = flag.String("log-level", "", "if non-empty, use a basic logger with this log level (debug, info, warn, error)")
noIdempotency = flag.Bool("disable-idempotency", false, "if true, disable idempotency (force 1 produce rps)")
maxInflight = flag.Int("max-inflight", 1, "if idempotency is disabled, set this to increase the max-inflight (increase duplicate risk, producing)")
linger = flag.Duration("linger", 0, "if non-zero, linger to use when producing")
batchMaxBytes = flag.Int("batch-max-bytes", 1000000, "the maximum batch size to allow per-partition (must be less than Kafka's max.message.bytes, producing)")

logLevel = flag.String("log-level", "", "if non-empty, use a basic logger with this log level (debug, info, warn, error)")

consume = flag.Bool("consume", false, "if true, consume rather than produce")
group = flag.String("group", "", "if non-empty, group to use for consuming rather than direct partition consuming (consuming)")
Expand Down Expand Up @@ -82,9 +85,11 @@ func main() {
// We have good compression, so we want to limit what we read
// back because snappy deflation will balloon our memory usage.
kgo.FetchMaxBytes(5 << 20),
kgo.BatchMaxBytes(int32(*batchMaxBytes)),
}
if *noIdempotency {
opts = append(opts, kgo.DisableIdempotentWrite())
opts = append(opts, kgo.MaxProduceInflight(*maxInflight))
}
if *consume {
opts = append(opts, kgo.ConsumeTopics(*topic))
Expand All @@ -103,14 +108,32 @@ func main() {
opts = append(opts, kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelWarn, nil)))
case "error":
opts = append(opts, kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelError, nil)))
default:
die("unrecognized log level %s", *logLevel)
}

if *linger != 0 {
opts = append(opts, kgo.Linger(*linger))
}
if *noCompression {
opts = append(opts, kgo.BatchCompression(kgo.NoCompression()))
} else {
switch strings.ToLower(*compression) {
case "", "none":
opts = append(opts, kgo.BatchCompression(kgo.NoCompression()))
case "gzip":
opts = append(opts, kgo.BatchCompression(kgo.GzipCompression()))
case "snappy":
opts = append(opts, kgo.BatchCompression(kgo.SnappyCompression()))
case "lz4":
opts = append(opts, kgo.BatchCompression(kgo.Lz4Compression()))
case "zstd":
opts = append(opts, kgo.BatchCompression(kgo.ZstdCompression()))
default:
die("unrecognized compression %s", *compression)
}
}

if *dialTLS {
opts = append(opts, kgo.Dialer((new(tls.Dialer)).DialContext))
}
Expand Down

0 comments on commit 05a6b8a

Please sign in to comment.