Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace policy and action limiters with a checkin limiter #3255

Merged
merged 10 commits into from
Feb 9, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@
kind: enhancement

# Change summary; a 80ish characters long description of the change.
summary: Replace policy throttle with limiter
summary: Move action limiter to checkin response writer

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: |
Replace the existing policy_throttle setting with a policy_limit
setting that controls the same thing, how fast policies are passed to
subscriptions to control mass responses. If no policy_limit settings
are specified the policy_throttle is used with burst 1.
Remove the policy_limiter that was added by 3008 and not released.
Move the action limiter to the checkin handler's write response method before a gzip.Writer is retrieved from the pool or allocated.
This allows fleet-server to control gzip.Writer access for responses that contain either actions from the action dispatcher, or a
POLICY_CHANGE action from the policy monitor.
Removing the throttle/limiter from the policy monitor also greatly improves performance of policy dispatches.

# Affected component; a word indicating the component this changeset affects.
component:
Expand All @@ -29,8 +30,8 @@ component:
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
#pr: https:/owner/repo/1234
pr: 3255

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: 3008
issue: 3254
16 changes: 3 additions & 13 deletions fleet-server.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,14 @@ fleet:
# # If specified a set of other limits is automatically loaded.
# max_agents: 0
# # policy_throttle is the duration that the fleet-server will wait in between attempts to dispatch policy updates to polling agents
# # deprecated: replaced by policy_limit settings
# policy_throttle: 5ms # 1ms min is forced
# # deprecated: replaced by action_limit settings
# policy_throttle: 5ms
# # max_header_byte_size is the request header size limit
# max_header_byte_size: 8192 # 8Kib
# # max_connections is the maximum number of connnections per API endpoint
# max_connections: 0
#
# # action_limit is a limiter for the action dispatcher, it is added to control how fast the checkin endpoint writes responses when an action effecting multiple agents is detected.
# # action_limit is a limiter for checkin responses that contain actions
# # This is done in order to be able to reuse gzip writers if gzip is requested as allocating new writers is expensive (around 1.2MB for a new allocation).
# # If the interval is too high it may negativly effect assumtions around server write timeouts and poll poll durations, if used we expect the value to be around 5ms.
# # An interval value of 0 disables the limiter by using an infinite rate limit (default behavior).
Expand All @@ -161,16 +161,6 @@ fleet:
# interval: 0
# burst: 5
#
# # policy_limit is a limiter used to control how fast fleet-server sends POLICY_CHANGE actions to agents.
# # As with the action_limit settings this is done to avoid allocating a lot of gzip writers.
# # The default settings are to have the interval of 5ms with a burst of 1.
# # A min burst value of 1 is always enforced.
# # If no interval is specified, the policy_throttle may be used as the interval instead.
# # if both interval and policy_throttle are 0, a value of 1ns is used instead.
# policy_limit:
# interval: 5ms
# burst: 1
#
# # endpoint specific limits below
# checkin_limit:
# interval: 1ms
Expand Down
19 changes: 4 additions & 15 deletions internal/pkg/action/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/sqn"

"github.com/rs/zerolog"
"golang.org/x/time/rate"
)

// Sub is an action subscription that will give a single agent all of it's actions.
Expand All @@ -34,23 +33,17 @@ func (s Sub) Ch() chan []model.Action {

// Dispatcher tracks agent subscriptions and emits actions to the subscriptions.
type Dispatcher struct {
am monitor.SimpleMonitor
limit *rate.Limiter
am monitor.SimpleMonitor

mx sync.RWMutex
subs map[string]Sub
}

// NewDispatcher creates a Dispatcher using the provided monitor.
func NewDispatcher(am monitor.SimpleMonitor, throttle time.Duration, i int) *Dispatcher {
r := rate.Inf
if throttle > 0 {
r = rate.Every(throttle)
}
func NewDispatcher(am monitor.SimpleMonitor) *Dispatcher {
return &Dispatcher{
am: am,
limit: rate.NewLimiter(r, i),
subs: make(map[string]Sub),
am: am,
subs: make(map[string]Sub),
}
}

Expand Down Expand Up @@ -129,10 +122,6 @@ func (d *Dispatcher) process(ctx context.Context, hits []es.HitT) {
}

for agentID, actions := range agentActions {
if err := d.limit.Wait(ctx); err != nil {
zerolog.Ctx(ctx).Error().Err(err).Msg("action dispatcher rate limit error")
return
}
d.dispatch(ctx, agentID, actions)
}
}
Expand Down
45 changes: 2 additions & 43 deletions internal/pkg/action/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/sqn"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"golang.org/x/time/rate"
)

type mockMonitor struct {
Expand All @@ -39,7 +38,7 @@ func (m *mockMonitor) GetCheckpoint() sqn.SeqNo {

func TestNewDispatcher(t *testing.T) {
m := &mockMonitor{}
d := NewDispatcher(m, 0, 0)
d := NewDispatcher(m)

assert.NotNil(t, d.am)
assert.NotNil(t, d.subs)
Expand Down Expand Up @@ -128,41 +127,6 @@ func Test_Dispatcher_Run(t *testing.T) {
Type: "upgrade",
}},
},
}, {
name: "three agent action with limiter",
throttle: 1 * time.Second,
getMock: func() *mockMonitor {
m := &mockMonitor{}
ch := make(chan []es.HitT)
go func() {
ch <- []es.HitT{es.HitT{
Source: json.RawMessage(`{"action_id":"test-action","agents":["agent1","agent2","agent3"],"data":{"key":"value"},"type":"upgrade"}`),
}}
}()
var rch <-chan []es.HitT = ch
m.On("Output").Return(rch)
return m
},
expect: map[string][]model.Action{
"agent1": []model.Action{model.Action{
ActionID: "test-action",
Agents: nil,
Data: json.RawMessage(`{"key":"value"}`),
Type: "upgrade",
}},
"agent2": []model.Action{model.Action{
ActionID: "test-action",
Agents: nil,
Data: json.RawMessage(`{"key":"value"}`),
Type: "upgrade",
}},
"agent3": []model.Action{model.Action{
ActionID: "test-action",
Agents: nil,
Data: json.RawMessage(`{"key":"value"}`),
Type: "upgrade",
}},
},
}, {
name: "one agent action with scheduling",
getMock: func() *mockMonitor {
Expand Down Expand Up @@ -235,13 +199,8 @@ func Test_Dispatcher_Run(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := tt.getMock()
throttle := rate.Inf
if tt.throttle > 0 {
throttle = rate.Every(tt.throttle)
}
d := &Dispatcher{
am: m,
limit: rate.NewLimiter(throttle, 1),
am: m,
subs: map[string]Sub{
"agent1": Sub{
agentID: "agent1",
Expand Down
22 changes: 21 additions & 1 deletion internal/pkg/api/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/hashicorp/go-version"
"github.com/miolini/datacounter"
"github.com/rs/zerolog"
"golang.org/x/time/rate"

"go.elastic.co/apm/module/apmhttp/v2"
"go.elastic.co/apm/v2"
Expand Down Expand Up @@ -75,6 +76,7 @@ type CheckinT struct {
// gwPool is a gzip.Writer pool intended to lower the amount of writers created when responding to checkin requests.
// gzip.Writer allocations are expensive (~1.2MB each) and can exhaust an instance's memory if a lot of concurrent responses are sent (this occurs when a mass-action such as an upgrade is detected).
// effectiveness of the pool is controlled by rate limiter configured through the limit.action_limit attribute.
limit *rate.Limiter
gwPool sync.Pool
bulker bulk.Bulk
}
Expand All @@ -90,6 +92,13 @@ func NewCheckinT(
tr *action.TokenResolver,
bulker bulk.Bulk,
) *CheckinT {
rt := rate.Every(cfg.Limits.ActionLimit.Interval)
if cfg.Limits.ActionLimit.Interval == 0 && cfg.Limits.PolicyThrottle != 0 {
rt = rate.Every(cfg.Limits.PolicyThrottle)
} else if cfg.Limits.ActionLimit.Interval == 0 && cfg.Limits.PolicyThrottle == 0 {
rt = rate.Inf
}
zerolog.Ctx(context.TODO()).Debug().Any("event_rate", rt).Int("burst", cfg.Limits.ActionLimit.Burst).Msg("checkin response gzip limiter")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: shouldn't there be a ctx passed instead of context.TODO()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally yes, but we would need to make sure that the context is tied to the function instead of checkinT's lifecycle.
We also use context.TODO in a few other places similar to this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an existing issue to address this: #3087

ct := &CheckinT{
verCon: verCon,
cfg: cfg,
Expand All @@ -99,6 +108,7 @@ func NewCheckinT(
gcp: gcp,
ad: ad,
tr: tr,
limit: rate.NewLimiter(rt, cfg.Limits.ActionLimit.Burst),
Copy link
Contributor

@juliaElastic juliaElastic Feb 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no ActionLimit.Max setting, does it mean only the Burst is being limited?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interval and burst are used to configure the rate limiter (interval is the time it takes for 1 token to be added to the rate limit pool, burst is max pool size)
the max attributes was used to add limits to the total number of connections allowed on an endpoint (here we would use the checkin endpoint setting).

gwPool: sync.Pool{
New: func() any {
zipper, err := gzip.NewWriterLevel(io.Discard, cfg.CompressionLevel)
Expand Down Expand Up @@ -548,7 +558,17 @@ func (ct *CheckinT) writeResponse(zlog zerolog.Logger, w http.ResponseWriter, r
compressionLevel := ct.cfg.CompressionLevel
compressThreshold := ct.cfg.CompressionThresh

if len(payload) > compressThreshold && compressionLevel != flate.NoCompression && acceptsEncoding(r, kEncodingGzip) {
if (len(fromPtr(resp.Actions)) > 0 || len(payload) > compressThreshold) && compressionLevel != flate.NoCompression && acceptsEncoding(r, kEncodingGzip) {
// We compress the response if it would be over the threshold (default 1kb) or we are sending an action.
// gzip.Writer allocations are expensive, and we can easily hit an OOM issue if we try to write a large number of responses at once.
// This is likely to happen on bulk upgrades, or policy changes.
// In order to avoid a massive memory allocation we do two things:
// 1. re-use gzip.Writer instances across responses (through a pool)
// 2. rate-limit access to the writer pool
if err := ct.limit.Wait(ctx); err != nil {
return fmt.Errorf("checkin response limiter error: %w", err)
}

wrCounter := datacounter.NewWriterCounter(w)

zipper, _ := ct.gwPool.Get().(*gzip.Writer)
Expand Down
44 changes: 42 additions & 2 deletions internal/pkg/api/handleCheckin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,12 @@ func TestResolveSeqNo(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
verCon := mustBuildConstraints("8.0.0")
cfg := &config.Server{}
cfg := &config.Server{Limits: config.ServerLimits{ActionLimit: config.Limit{Interval: 5 * time.Millisecond, Burst: 1}}}
c, _ := cache.New(config.Cache{NumCounters: 100, MaxCost: 100000})
bc := checkin.NewBulk(nil)
bulker := ftesting.NewMockBulk()
pim := mockmonitor.NewMockMonitor()
pm := policy.NewMonitor(bulker, pim, config.ServerLimits{PolicyLimit: config.Limit{Interval: 5 * time.Millisecond, Burst: 1}})
pm := policy.NewMonitor(bulker, pim)
ct := NewCheckinT(verCon, cfg, c, bc, pm, nil, nil, nil, nil)

resp, _ := ct.resolveSeqNo(ctx, logger, tc.req, tc.agent)
Expand Down Expand Up @@ -623,6 +623,46 @@ func Test_CheckinT_writeResponse(t *testing.T) {
}
}

func Test_CheckinT_writeResponse_limiter(t *testing.T) {
verCon := mustBuildConstraints("8.0.0")
cfg := &config.Server{
CompressionLevel: flate.BestSpeed,
CompressionThresh: 1,
Limits: config.ServerLimits{
ActionLimit: config.Limit{
Interval: 50 * time.Millisecond,
Burst: 1,
},
},
}

ct := NewCheckinT(verCon, cfg, nil, nil, nil, nil, nil, nil, ftesting.NewMockBulk())
wr1 := httptest.NewRecorder()
wr2 := httptest.NewRecorder()
req := &http.Request{
Header: http.Header{
"Accept-Encoding": []string{"gzip"},
},
}
cresp := CheckinResponse{
Action: "checkin",
}

ts := time.Now()
err1 := ct.writeResponse(testlog.SetLogger(t), wr1, req, &model.Agent{}, cresp)
err2 := ct.writeResponse(testlog.SetLogger(t), wr2, req, &model.Agent{}, cresp)
dur := time.Since(ts)
require.NoError(t, err1)
require.NoError(t, err2)

res1 := wr1.Result()
assert.Equal(t, "gzip", res1.Header.Get("Content-Encoding"))
res2 := wr2.Result()
assert.Equal(t, "gzip", res2.Header.Get("Content-Encoding"))

assert.GreaterOrEqual(t, dur, 50*time.Millisecond)
}

func Benchmark_CheckinT_writeResponse(b *testing.B) {
verCon := mustBuildConstraints("8.0.0")
cfg := &config.Server{
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func Test_server_Run(t *testing.T) {
require.NoError(t, err)
bulker := ftesting.NewMockBulk()
pim := mock.NewMockMonitor()
pm := policy.NewMonitor(bulker, pim, config.ServerLimits{PolicyLimit: config.Limit{Interval: 5 * time.Millisecond, Burst: 1}})
pm := policy.NewMonitor(bulker, pim)
bc := checkin.NewBulk(nil)
ct := NewCheckinT(verCon, cfg, c, bc, pm, nil, nil, nil, nil)
et, err := NewEnrollerT(verCon, cfg, nil, c)
Expand Down
3 changes: 0 additions & 3 deletions internal/pkg/config/defaults/lte10000_limits.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ server_limits:
action_limit:
interval: 1ms
burst: 10
policy_interval:
interval: 5ms
burst: 1
checkin_limit:
interval: 1ms
burst: 2000
Expand Down
3 changes: 0 additions & 3 deletions internal/pkg/config/defaults/lte20000_limits.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ server_limits:
action_limit:
interval: 1ms
burst: 100
policy_limit:
interval: 5ms
burst: 1
checkin_limit:
interval: 0.5ms
burst: 4000
Expand Down
3 changes: 0 additions & 3 deletions internal/pkg/config/defaults/lte2500_limite.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ server_limits:
action_limit:
interval: 5ms
burst: 1
policy_limit:
interval: 5ms
burst: 1
checkin_limit:
interval: 5ms
burst: 500
Expand Down
3 changes: 0 additions & 3 deletions internal/pkg/config/defaults/lte40000_limits.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ server_limits:
action_limit:
interval: 0.5ms
burst: 100
policy_limit:
interval: 2ms
burst: 1
checkin_limit:
interval: 0.5ms
burst: 4000
Expand Down
3 changes: 0 additions & 3 deletions internal/pkg/config/defaults/lte5000_limits.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ server_limits:
action_limit:
interval: 5ms
burst: 5
policy_limit:
interval: 5ms
burst: 1
checkin_limit:
interval: 2ms
burst: 1000
Expand Down
3 changes: 0 additions & 3 deletions internal/pkg/config/defaults/max_limits.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ server_limits:
action_limit:
interval: 0.25ms
burst: 100
policy_limit:
interval: 0.25ms
burst: 1
checkin_limit:
interval: 0.25ms
burst: 8000
Expand Down
Loading
Loading