Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ratelimiter usage-counting bugfix: rejected reservations were not counted #6158

Merged
merged 2 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,10 @@ func (c CountedLimiter) Wait(ctx context.Context) error {
}

func (c CountedLimiter) Reserve() clock.Reservation {
c.usage.idle.Store(0) // not idle regardless of the result
Copy link
Contributor Author

@Groxx Groxx Jul 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tbh I think "immediately not-idle" is more technically correct (that's when the token is reserved / it might not be returned by Used(false)), but our use shouldn't really make a difference, and this way all Reserve() impls are completely trivial.


res := c.wrapped.Reserve()
if res.Allow() {
// may be used or canceled, return a wrapped version so it's tracked
// when we know which it is.
return countedReservation{
wrapped: res,
usage: c.usage,
}
return countedReservation{
wrapped: c.wrapped.Reserve(),
usage: c.usage,
}
// cannot be used, just count the rejection immediately
// and return the original so it's a bit cheaper
c.usage.rejected.Add(1)
return res
}

func (c CountedLimiter) Collect() UsageMetrics {
Expand All @@ -100,15 +89,30 @@ func (c countedReservation) Allow() bool {
}

func (c countedReservation) Used(wasUsed bool) {
if wasUsed {
c.usage.allowed.Add(1)
}
c.wrapped.Used(wasUsed)
c.usage.idle.Store(0)
if c.Allow() {
if wasUsed {
// only counts as allowed if used, else it is hopefully rolled back.
// this may or may not restore the token, but it does imply "this limiter did not limit the event".
c.usage.Count(true)
}

// else it was canceled, and not "used".
//
// currently these are not tracked because some other rejection will occur
// and be emitted in all our current uses, but with bad enough luck or
// latency before canceling it could lead to misleading metrics.
// else it was canceled, and not "used".
//
// currently these are not tracked because some other rejection will occur
// and be emitted in all our current uses, but with bad enough luck or
// latency before canceling it could lead to misleading metrics.
} else {
// these reservations cannot be waited on so they cannot become allowed,
// and they cannot be returned, so they are always rejected.
//
// specifically: it is likely that `wasUsed == Allow()`, so false cannot be
// trusted to mean "will not use for some other reason", and the underlying
// rate.Limiter did not change state anyway because it returned the
// pending-token before becoming a clock.Reservation.
c.usage.Count(false)
}
}

func (a *AtomicUsage) Count(allowed bool) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ package internal

import (
"context"
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/assert"
"golang.org/x/sync/errgroup"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/quotas"
)

func TestUsage(t *testing.T) {
Expand Down Expand Up @@ -90,10 +92,87 @@ func TestUsage(t *testing.T) {

r = lim.Reserve()
assert.False(t, r.Allow(), "should not have a token available")
assert.Equal(t, UsageMetrics{0, 1, 0}, lim.Collect(), "not-allowed reservations immediately count rejection")
r.Used(false)
assert.Equal(t, UsageMetrics{0, 1, 0}, lim.Collect(), "not-allowed reservations count as rejection")
})
}

func TestRegression_ReserveCountsCorrectly(t *testing.T) {
run := func(t *testing.T, lim quotas.Limiter, advance func(time.Duration), collect func() UsageMetrics) {
allowed, returned, rejected := 0, 0, 0
for i := 0; ; i++ {
if rejected > 3 {
// normal exit: some rejects occurred.
break // just to get more than 1 to be more interesting
}
if i > 1_000 {
// infinite loop guard because it's a real mess to debug
t.Error("too many attempts, test is not sane. allowed:", allowed, "rejected:", rejected, "returned:", returned)
break
}

r := lim.Reserve()

if rand.Intn(2) == 0 {
// time advancing before canceling should not affect this test because it is not concurrent,
// so only do it sometimes to make sure that's true
advance(time.Millisecond)
}

if r.Allow() {
if i%2 == 0 {
allowed++
r.Used(true)
} else {
returned++
r.Used(false)
}
} else {
rejected++
// try with both true and false.
// expected use is to call with false on all rejects, but it should not be required
r.Used(i%2 == 0)
}
}
usage := collect()
t.Logf("usage: %#v", usage)
assert.NotZero(t, allowed, "should have allowed some requests")
assert.Equal(t, allowed, usage.Allowed, "wrong num of requests allowed")
assert.Equal(t, rejected, usage.Rejected, "wrong num of requests rejected")
assert.Equal(t, 0, usage.Idle, "limiter should never be idle in this test")
}

t.Run("counted", func(t *testing.T) {
// "base" counting-limiter should count correctly
ts := clock.NewMockedTimeSource()
wrapped := clock.NewMockRatelimiter(ts, 1, 100)
lim := NewCountedLimiter(wrapped)

run(t, lim, ts.Advance, lim.Collect)
})
t.Run("shadowed", func(t *testing.T) {
// "shadowed" should call the primary correctly at the very least
ts := clock.NewMockedTimeSource()
wrapped := clock.NewMockRatelimiter(ts, 1, 100)
counted := NewCountedLimiter(wrapped)
lim := NewShadowedLimiter(counted, allowlimiter{})

run(t, lim, ts.Advance, counted.Collect)
})
t.Run("fallback", func(t *testing.T) {
// "fallback" uses a different implementation, but it should count exactly the same.
// TODO: ideally it would actually be the same code, but that's a bit awkward due to needing different interfaces.
ts := clock.NewMockedTimeSource()
wrapped := clock.NewMockRatelimiter(ts, 1, 100)
l := NewFallbackLimiter(allowlimiter{})
l.Update(1) // allows using primary, else it calls the fallback
l.primary = wrapped // cheat, just swap it out

run(t, l, ts.Advance, func() UsageMetrics {
u, _, _ := l.Collect()
return u
})
})
}

// Wait-based tests can block forever if there's an issue, better to fail fast.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,8 @@ func (b *FallbackLimiter) Wait(ctx context.Context) error {
}

func (b *FallbackLimiter) Reserve() clock.Reservation {
res := b.both().Reserve()
return countedReservation{
wrapped: res,
wrapped: b.both().Reserve(),
usage: &b.usage,
}
Comment on lines -206 to 209
Copy link
Contributor Author

@Groxx Groxx Jul 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

main flaw was here: it didn't pass through the Used(false) call when rejected, and did not count rejections.

it should have matched the counted-limiter's implementation. now they're both simple enough that it's fairly unlikely to diverge like this.

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package internal_test
package internal
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mostly to share the allow-limiter. in principle I still like having this external because it doesn't need to be internal, but meh. it's rather minor.


import (
"context"
Expand All @@ -35,27 +35,26 @@ import (

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/quotas"
"github.com/uber/cadence/common/quotas/global/collection/internal"
)

func TestLimiter(t *testing.T) {
t.Run("uses fallback initially", func(t *testing.T) {
m := quotas.NewMockLimiter(gomock.NewController(t))
m.EXPECT().Allow().Times(1).Return(true)
m.EXPECT().Allow().Times(2).Return(false)
lim := internal.NewFallbackLimiter(m)
lim := NewFallbackLimiter(m)

assert.True(t, lim.Allow(), "should return fallback's first response")
assert.False(t, lim.Allow(), "should return fallback's second response")
assert.False(t, lim.Allow(), "should return fallback's third response")

usage, starting, failing := lim.Collect()
assert.Equal(t, internal.UsageMetrics{1, 2, 0}, usage, "usage metrics should match returned values")
assert.Equal(t, UsageMetrics{1, 2, 0}, usage, "usage metrics should match returned values")
assert.True(t, starting, "should still be starting up")
assert.False(t, failing, "should not be failing, still starting up")
})
t.Run("uses primary after update", func(t *testing.T) {
lim := internal.NewFallbackLimiter(allowlimiter{})
lim := NewFallbackLimiter(allowlimiter{})
lim.Update(1_000_000) // large enough to allow millisecond sleeps to refill

time.Sleep(time.Millisecond) // allow some tokens to fill
Expand All @@ -65,11 +64,11 @@ func TestLimiter(t *testing.T) {
usage, startup, failing := lim.Collect()
assert.False(t, failing, "should not use fallback limiter after update")
assert.False(t, startup, "should not be starting up, has had an update")
assert.Equal(t, internal.UsageMetrics{2, 0, 0}, usage, "usage should match behavior")
assert.Equal(t, UsageMetrics{2, 0, 0}, usage, "usage should match behavior")
})

t.Run("collecting usage data resets counts", func(t *testing.T) {
lim := internal.NewFallbackLimiter(allowlimiter{})
lim := NewFallbackLimiter(allowlimiter{})
lim.Update(1)
lim.Allow()
limit, _, _ := lim.Collect()
Expand All @@ -88,7 +87,7 @@ func TestLimiter(t *testing.T) {
})

t.Run("falls back after too many failures", func(t *testing.T) {
lim := internal.NewFallbackLimiter(allowlimiter{}) // fallback behavior is ignored
lim := NewFallbackLimiter(allowlimiter{}) // fallback behavior is ignored
lim.Update(1)
_, startup, failing := lim.Collect()
require.False(t, failing, "should not be using fallback")
Expand All @@ -112,7 +111,7 @@ func TestLimiter(t *testing.T) {
assert.True(t, lim.Allow(), "should return fallback's allowed request")
})
t.Run("failing many times does not accidentally switch away from startup mode", func(t *testing.T) {
lim := internal.NewFallbackLimiter(nil)
lim := NewFallbackLimiter(nil)
for i := 0; i < maxFailedUpdates*10; i++ {
lim.FailedUpdate()
_, startup, failing := lim.Collect()
Expand All @@ -124,14 +123,14 @@ func TestLimiter(t *testing.T) {

t.Run("coverage", func(t *testing.T) {
// easy line to cover to bring to 100%
lim := internal.NewFallbackLimiter(nil)
lim := NewFallbackLimiter(nil)
lim.Update(1)
lim.Update(1) // should go down "no changes needed, return early" path
})
}

func TestLimiterNotRacy(t *testing.T) {
lim := internal.NewFallbackLimiter(allowlimiter{})
lim := NewFallbackLimiter(allowlimiter{})
var g errgroup.Group
const loops = 1000
for i := 0; i < loops; i++ {
Expand Down
Loading