Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add metrics counter retrieval #65

Merged
merged 2 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions avgsamplerate.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ type AvgSampleRate struct {
done chan struct{}

lock sync.Mutex

// metrics
requestCount int64
eventCount int64
}

// Ensure we implement the sampler interface
Expand Down Expand Up @@ -151,6 +155,9 @@ func (a *AvgSampleRate) GetSampleRateMulti(key string, count int) int {
a.lock.Lock()
defer a.lock.Unlock()

a.requestCount++
a.eventCount += int64(count)

// Enforce MaxKeys limit on the size of the map
if a.MaxKeys > 0 {
// If a key already exists, increment it. If not, but we're under the limit, store a new key
Expand Down Expand Up @@ -205,3 +212,14 @@ func (a *AvgSampleRate) LoadState(state []byte) error {

return nil
}

func (a *AvgSampleRate) GetMetrics(prefix string) map[string]int64 {
a.lock.Lock()
defer a.lock.Unlock()
mets := map[string]int64{
prefix + "request_count": a.requestCount,
prefix + "event_count": a.eventCount,
prefix + "keyspace_size": int64(len(a.currentCounts)),
}
return mets
}
18 changes: 18 additions & 0 deletions avgsamplewithmin.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ type AvgSampleWithMin struct {
done chan struct{}

lock sync.Mutex

// metrics
requestCount int64
eventCount int64
}

// Ensure we implement the sampler interface
Expand Down Expand Up @@ -168,6 +172,9 @@ func (a *AvgSampleWithMin) GetSampleRateMulti(key string, count int) int {
a.lock.Lock()
defer a.lock.Unlock()

a.requestCount++
a.eventCount += int64(count)

// Enforce MaxKeys limit on the size of the map
if a.MaxKeys > 0 {
// If a key already exists, increment it. If not, but we're under the limit, store a new key
Expand Down Expand Up @@ -195,3 +202,14 @@ func (a *AvgSampleWithMin) SaveState() ([]byte, error) {
func (a *AvgSampleWithMin) LoadState(state []byte) error {
return nil
}

func (a *AvgSampleWithMin) GetMetrics(prefix string) map[string]int64 {
a.lock.Lock()
defer a.lock.Unlock()
mets := map[string]int64{
prefix + "request_count": a.requestCount,
prefix + "event_count": a.eventCount,
prefix + "keyspace_size": int64(len(a.currentCounts)),
}
return mets
}
6 changes: 6 additions & 0 deletions dynsampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,10 @@ type Sampler interface {
// LoadState accepts a byte array containing the serialized, previous state of the sampler
// implementation. It should be called before `Start`.
LoadState([]byte) error

// GetMetrics returns a map of metrics about the sampler's performance.
// All values are returned as int64; counters are cumulative and the names
// always end with "_count", while gauges are instantaneous with no particular naming convention.
// All names are prefixed with the given string.
GetMetrics(prefix string) map[string]int64
}
22 changes: 22 additions & 0 deletions emasamplerate.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ type EMASampleRate struct {

// used only in tests
testSignalMapsDone chan struct{}

// metrics
requestCount int64
eventCount int64
burstCount int64
}

// Ensure we implement the sampler interface
Expand Down Expand Up @@ -238,6 +243,9 @@ func (e *EMASampleRate) GetSampleRateMulti(key string, count int) int {
e.lock.Lock()
defer e.lock.Unlock()

e.requestCount++
e.eventCount += int64(count)

// Enforce MaxKeys limit on the size of the map
if e.MaxKeys > 0 {
// If a key already exists, increment it. If not, but we're under the limit, store a new key
Expand All @@ -254,6 +262,7 @@ func (e *EMASampleRate) GetSampleRateMulti(key string, count int) int {
if e.burstThreshold > 0 && e.currentBurstSum >= e.burstThreshold && e.intervalCount >= e.BurstDetectionDelay {
// reset the burst sum to prevent additional burst updates from occurring while updateMaps is running
e.currentBurstSum = 0
e.burstCount++
// send but don't block - consuming is blocked on updateMaps, which takes the same lock we're holding
select {
case e.burstSignal <- struct{}{}:
Expand Down Expand Up @@ -348,6 +357,19 @@ func (e *EMASampleRate) LoadState(state []byte) error {
return nil
}

func (e *EMASampleRate) GetMetrics(prefix string) map[string]int64 {
e.lock.Lock()
defer e.lock.Unlock()
mets := map[string]int64{
prefix + "request_count": e.requestCount,
prefix + "event_count": e.eventCount,
prefix + "burst_count": e.burstCount,
prefix + "interval_count": int64(e.intervalCount),
prefix + "keyspace_size": int64(len(e.currentCounts)),
}
return mets
}

func adjustAverage(oldAvg, value float64, alpha float64) float64 {
adjustedNewVal := value * alpha
adjustedOldAvg := (1.0 - alpha) * oldAvg
Expand Down
22 changes: 22 additions & 0 deletions emathroughput.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ type EMAThroughput struct {

// used only in tests
testSignalMapsDone chan struct{}

// metrics
requestCount int64
eventCount int64
burstCount int64
}

// Ensure we implement the sampler interface
Expand Down Expand Up @@ -249,6 +254,9 @@ func (e *EMAThroughput) GetSampleRateMulti(key string, count int) int {
e.lock.Lock()
defer e.lock.Unlock()

e.requestCount++
e.eventCount += int64(count)

// Enforce MaxKeys limit on the size of the map
if e.MaxKeys > 0 {
// If a key already exists, increment it. If not, but we're under the limit, store a new key
Expand All @@ -265,6 +273,7 @@ func (e *EMAThroughput) GetSampleRateMulti(key string, count int) int {
if e.burstThreshold > 0 && e.currentBurstSum >= e.burstThreshold && e.intervalCount >= e.BurstDetectionDelay {
// reset the burst sum to prevent additional burst updates from occurring while updateMaps is running
e.currentBurstSum = 0
e.burstCount++
// send but don't block - consuming is blocked on updateMaps, which takes the same lock we're holding
select {
case e.burstSignal <- struct{}{}:
Expand Down Expand Up @@ -358,3 +367,16 @@ func (e *EMAThroughput) LoadState(state []byte) error {

return nil
}

func (e *EMAThroughput) GetMetrics(prefix string) map[string]int64 {
e.lock.Lock()
defer e.lock.Unlock()
mets := map[string]int64{
prefix + "request_count": e.requestCount,
prefix + "event_count": e.eventCount,
prefix + "burst_count": e.burstCount,
prefix + "interval_count": int64(e.intervalCount),
prefix + "keyspace_size": int64(len(e.currentCounts)),
}
return mets
}
18 changes: 18 additions & 0 deletions onlyonce.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ type OnlyOnce struct {
seen map[string]bool
done chan struct{}

// metrics
requestCount int64
eventCount int64

lock sync.Mutex
}

Expand Down Expand Up @@ -99,6 +103,9 @@ func (o *OnlyOnce) GetSampleRate(key string) int {
func (o *OnlyOnce) GetSampleRateMulti(key string, count int) int {
o.lock.Lock()
defer o.lock.Unlock()
o.requestCount++
o.eventCount += int64(count)

if _, found := o.seen[key]; found {
return 1000000000
}
Expand All @@ -115,3 +122,14 @@ func (o *OnlyOnce) SaveState() ([]byte, error) {
func (o *OnlyOnce) LoadState(state []byte) error {
return nil
}

func (o *OnlyOnce) GetMetrics(prefix string) map[string]int64 {
o.lock.Lock()
defer o.lock.Unlock()
mets := map[string]int64{
prefix + "request_count": o.requestCount,
prefix + "event_count": o.eventCount,
prefix + "keyspace_size": int64(len(o.seen)),
}
return mets
}
19 changes: 19 additions & 0 deletions perkeythroughput.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ type PerKeyThroughput struct {
done chan struct{}

lock sync.Mutex

// metrics
requestCount int64
eventCount int64
}

// Ensure we implement the sampler interface
Expand Down Expand Up @@ -129,6 +133,10 @@ func (p *PerKeyThroughput) GetSampleRate(key string) int {
func (p *PerKeyThroughput) GetSampleRateMulti(key string, count int) int {
p.lock.Lock()
defer p.lock.Unlock()

p.requestCount++
p.eventCount += int64(count)

// Enforce MaxKeys limit on the size of the map
if p.MaxKeys > 0 {
// If a key already exists, add the count. If not, but we're under the limit, store a new key
Expand All @@ -153,3 +161,14 @@ func (p *PerKeyThroughput) SaveState() ([]byte, error) {
func (p *PerKeyThroughput) LoadState(state []byte) error {
return nil
}

func (p *PerKeyThroughput) GetMetrics(prefix string) map[string]int64 {
p.lock.Lock()
defer p.lock.Unlock()
mets := map[string]int64{
prefix + "request_count": p.requestCount,
prefix + "event_count": p.eventCount,
prefix + "keyspace_size": int64(len(p.currentCounts)),
}
return mets
}
24 changes: 24 additions & 0 deletions static.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package dynsampler

import "sync"

// Static implements Sampler with a static mapping for sample rates. This is
// useful if you have a known set of keys that you want to sample at specific
// rates and apply a default to everything else.
Expand All @@ -8,6 +10,12 @@ type Static struct {
Rates map[string]int
// Default is the value to use if the key is not whitelisted in Rates
Default int

lock sync.Mutex

// metrics
requestCount int64
eventCount int64
}

// Ensure we implement the sampler interface
Expand All @@ -34,6 +42,11 @@ func (s *Static) GetSampleRate(key string) int {
// GetSampleRateMulti takes a key representing count spans and returns the
// appropriate sample rate for that key.
func (s *Static) GetSampleRateMulti(key string, count int) int {
s.lock.Lock()
defer s.lock.Unlock()

s.requestCount++
s.eventCount += int64(count)
if rate, found := s.Rates[key]; found {
return rate
}
Expand All @@ -49,3 +62,14 @@ func (s *Static) SaveState() ([]byte, error) {
func (s *Static) LoadState(state []byte) error {
return nil
}

func (s *Static) GetMetrics(prefix string) map[string]int64 {
s.lock.Lock()
defer s.lock.Unlock()
mets := map[string]int64{
prefix + "request_count": s.requestCount,
prefix + "event_count": s.eventCount,
prefix + "keyspace_size": int64(len(s.Rates)),
}
return mets
}
19 changes: 19 additions & 0 deletions totalthroughput.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ type TotalThroughput struct {
done chan struct{}

lock sync.Mutex

// metrics
requestCount int64
eventCount int64
}

// Ensure we implement the sampler interface
Expand Down Expand Up @@ -138,6 +142,10 @@ func (t *TotalThroughput) GetSampleRate(key string) int {
func (t *TotalThroughput) GetSampleRateMulti(key string, count int) int {
t.lock.Lock()
defer t.lock.Unlock()

t.requestCount++
t.eventCount += int64(count)

// Enforce MaxKeys limit on the size of the map
if t.MaxKeys > 0 {
// If a key already exists, increment it. If not, but we're under the limit, store a new key
Expand All @@ -162,3 +170,14 @@ func (t *TotalThroughput) SaveState() ([]byte, error) {
func (t *TotalThroughput) LoadState(state []byte) error {
return nil
}

func (t *TotalThroughput) GetMetrics(prefix string) map[string]int64 {
t.lock.Lock()
defer t.lock.Unlock()
mets := map[string]int64{
prefix + "request_count": t.requestCount,
prefix + "event_count": t.eventCount,
prefix + "keyspace_size": int64(len(t.currentCounts)),
}
return mets
}
25 changes: 22 additions & 3 deletions windowedthroughput.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ type WindowedThroughput struct {
indexGenerator IndexGenerator

lock sync.Mutex

// metrics
requestCount int64
eventCount int64
numKeys int
}

// Ensure we implement the sampler interface
Expand Down Expand Up @@ -146,8 +151,8 @@ func (t *WindowedThroughput) updateMaps() {

// Apply the same aggregation algorithm as total throughput
// Short circuit if no traffic
numKeys := len(aggregateCounts)
if numKeys == 0 {
t.numKeys = len(aggregateCounts)
if t.numKeys == 0 {
// no traffic during the last period.
t.lock.Lock()
defer t.lock.Unlock()
Expand All @@ -157,7 +162,7 @@ func (t *WindowedThroughput) updateMaps() {
// figure out our target throughput per key over the lookback window.
totalGoalThroughput := t.GoalThroughputPerSec * t.LookbackFrequencyDuration.Seconds()
// floor the throughput but min should be 1 event per bucket per time period
throughputPerKey := math.Max(1, float64(totalGoalThroughput)/float64(numKeys))
throughputPerKey := math.Max(1, float64(totalGoalThroughput)/float64(t.numKeys))
// for each key, calculate sample rate by dividing counted events by the
// desired number of events
newSavedSampleRates := make(map[string]int)
Expand All @@ -180,6 +185,9 @@ func (t *WindowedThroughput) GetSampleRate(key string) int {
// GetSampleRateMulti takes a key representing count spans and returns the
// appropriate sample rate for that key.
func (t *WindowedThroughput) GetSampleRateMulti(key string, count int) int {
t.requestCount++
t.eventCount += int64(count)

// Insert the new key into the map.
current := t.indexGenerator.GetCurrentIndex()
err := t.countList.IncrementKey(key, current, count)
Expand All @@ -206,3 +214,14 @@ func (t *WindowedThroughput) SaveState() ([]byte, error) {
func (t *WindowedThroughput) LoadState(state []byte) error {
return nil
}

func (t *WindowedThroughput) GetMetrics(prefix string) map[string]int64 {
t.lock.Lock()
defer t.lock.Unlock()
mets := map[string]int64{
prefix + "request_count": t.requestCount,
prefix + "event_count": t.eventCount,
prefix + "keyspace_size": int64(t.numKeys),
}
return mets
}