From b94de65e06a7c990a868e771f9fc282315632a79 Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Sun, 9 Jun 2024 22:49:18 -0700 Subject: [PATCH] flightcontrol: protect contention timeouts Signed-off-by: Tonis Tiigi --- util/flightcontrol/flightcontrol.go | 10 +++++----- util/flightcontrol/flightcontrol_test.go | 22 ++++++++++++++++++++++ 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/util/flightcontrol/flightcontrol.go b/util/flightcontrol/flightcontrol.go index 42cb23678f1bd..15a545b8c65cb 100644 --- a/util/flightcontrol/flightcontrol.go +++ b/util/flightcontrol/flightcontrol.go @@ -3,7 +3,7 @@ package flightcontrol import ( "context" "io" - "runtime" + "math/rand" "sort" "sync" "time" @@ -43,13 +43,13 @@ func (g *Group[T]) Do(ctx context.Context, key string, fn func(ctx context.Conte err = errors.Wrapf(errRetryTimeout, "flightcontrol") return v, err } - runtime.Gosched() if backoff > 0 { - time.Sleep(backoff) - backoff *= 2 + backoff = time.Duration(float64(backoff) * 1.2) } else { - backoff = time.Millisecond + // randomize initial backoff to avoid all goroutines retrying at once + backoff = time.Millisecond + time.Duration(rand.Intn(1e7))*time.Nanosecond } + time.Sleep(backoff) } } diff --git a/util/flightcontrol/flightcontrol_test.go b/util/flightcontrol/flightcontrol_test.go index 5440a8804ecb5..9511b376ebf9a 100644 --- a/util/flightcontrol/flightcontrol_test.go +++ b/util/flightcontrol/flightcontrol_test.go @@ -223,6 +223,28 @@ func TestContention(t *testing.T) { wg.Wait() } +func TestMassiveParallel(t *testing.T) { + var retryCount int64 + g := &Group[string]{} + eg, ctx := errgroup.WithContext(context.Background()) + for i := 0; i < 1000; i++ { + eg.Go(func() error { + _, err := g.Do(ctx, "key", func(ctx context.Context) (string, error) { + return "", errors.Errorf("always fail") + }) + if errors.Is(err, errRetryTimeout) { + atomic.AddInt64(&retryCount, 1) + } + return err + }) + // magic numbers to increase contention + time.Sleep(5 * time.Microsecond) + } + err := eg.Wait() + assert.Error(t, err) + assert.Equal(t, int64(0), retryCount) +} + func testFunc(wait time.Duration, ret string, counter *int64) func(ctx context.Context) (string, error) { return func(ctx context.Context) (string, error) { atomic.AddInt64(counter, 1)