Skip to content

Commit

Permalink
config: drop ProduceRequestTimeout to 10s, doc more
Browse files Browse the repository at this point in the history
30s is too generous. Even 10s is likely too generous, since usually
writes at p99 finish within 100ms.
  • Loading branch information
twmb committed Oct 28, 2021
1 parent 6912cfe commit df80a52
Showing 1 changed file with 7 additions and 5 deletions.
12 changes: 7 additions & 5 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (cfg *cfg) validate() error {
// Some random producer settings.
{name: "max buffered records", v: int64(cfg.maxBufferedRecords), allowed: 1, badcmp: i64lt},
{name: "linger", v: int64(cfg.linger), allowed: int64(time.Minute), badcmp: i64gt, durs: true},
{name: "produce timeout", v: int64(cfg.produceTimeout), allowed: int64(time.Second), badcmp: i64lt, durs: true},
{name: "produce timeout", v: int64(cfg.produceTimeout), allowed: int64(100 * time.Millisecond), badcmp: i64lt, durs: true},
{name: "record timeout", v: int64(cfg.recordTimeout), allowed: int64(time.Second), badcmp: func(l, r int64) (bool, string) {
if l == 0 {
return false, "" // we print nothing when things are good
Expand Down Expand Up @@ -446,7 +446,7 @@ func defaultCfg() cfg {
compression: []CompressionCodec{SnappyCompression(), NoCompression()},
maxRecordBatchBytes: 1000000, // Kafka max.message.bytes default is 1000012
maxBufferedRecords: 10000,
produceTimeout: 30 * time.Second,
produceTimeout: 10 * time.Second,
recordRetries: math.MaxInt64, // effectively unbounded
partitioner: StickyKeyPartitioner(nil), // default to how Kafka partitions

Expand Down Expand Up @@ -874,11 +874,13 @@ func RecordPartitioner(partitioner Partitioner) ProducerOpt {
}

// ProduceRequestTimeout sets how long Kafka broker's are allowed to respond to
// produce requests, overriding the default 30s. If a broker exceeds this
// produce requests, overriding the default 10s. If a broker exceeds this
// duration, it will reply with a request timeout error.
//
// This corresponds to Kafka's request.timeout.ms setting, but only applies to
// produce requests.
// This somewhat corresponds to Kafka's request.timeout.ms setting, but only
// applies to produce requests. This settings sets the TimeoutMillis field in
// the produce request itself. The ConnTimeoutOverhead is applied as a write
// limit and read limit in addition to this.
func ProduceRequestTimeout(limit time.Duration) ProducerOpt {
return producerOpt{func(cfg *cfg) { cfg.produceTimeout = limit }}
}
Expand Down

0 comments on commit df80a52

Please sign in to comment.