Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add GetSampleRateMulti #53

Merged
merged 7 commits into from
Mar 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions avgsamplerate.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,14 @@ type AvgSampleRate struct {
// gotten any samples of traffic, we should use the default goal
// sample rate for all events instead of sampling everything at 1
haveData bool
done chan struct{}

lock sync.Mutex
}

// Ensure we implement the sampler interface
var _ Sampler = (*AvgSampleRate)(nil)

func (a *AvgSampleRate) Start() error {
// apply defaults
if a.ClearFrequencySec == 0 {
Expand All @@ -59,17 +63,29 @@ func (a *AvgSampleRate) Start() error {
a.savedSampleRates = make(map[string]int)
}
a.currentCounts = make(map[string]int)
a.done = make(chan struct{})

// spin up calculator
go func() {
ticker := time.NewTicker(time.Second * time.Duration(a.ClearFrequencySec))
for range ticker.C {
a.updateMaps()
defer ticker.Stop()
for {
select {
case <-ticker.C:
a.updateMaps()
case <-a.done:
return
}
}
}()
return nil
}

// Stop terminates the background goroutine started by Start by closing the
// done channel that its select loop watches.
//
// NOTE(review): calling Stop twice, or before Start, panics (close of a
// closed or nil channel) — confirm callers invoke it exactly once after Start.
func (a *AvgSampleRate) Stop() error {
	close(a.done)
	return nil
}

// updateMaps calculates a new saved rate map based on the contents of the
// counter map
func (a *AvgSampleRate) updateMaps() {
Expand Down Expand Up @@ -155,19 +171,25 @@ func (a *AvgSampleRate) updateMaps() {
}

// GetSampleRate takes a key and returns the appropriate sample rate for that
// key. It treats the call as a single sampled item, simply delegating to
// GetSampleRateMulti with a count of 1.
func (a *AvgSampleRate) GetSampleRate(key string) int {
	return a.GetSampleRateMulti(key, 1)
}

// GetSampleRateMulti takes a key representing count spans and returns the
// appropriate sample rate for that key.
func (a *AvgSampleRate) GetSampleRateMulti(key string, count int) int {
a.lock.Lock()
defer a.lock.Unlock()

// Enforce MaxKeys limit on the size of the map
if a.MaxKeys > 0 {
// If a key already exists, increment it. If not, but we're under the limit, store a new key
if _, found := a.currentCounts[key]; found || len(a.currentCounts) < a.MaxKeys {
a.currentCounts[key]++
a.currentCounts[key] += count
}
} else {
a.currentCounts[key]++
a.currentCounts[key] += count
}
if !a.haveData {
return a.GoalSampleRate
Expand Down
3 changes: 1 addition & 2 deletions avgsamplerate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,7 @@ func TestAvgSampleRateSaveState(t *testing.T) {
state, err := sampler.SaveState()
assert.Nil(t, err)

var newSampler Sampler
newSampler = &AvgSampleRate{}
var newSampler Sampler = &AvgSampleRate{}

err = newSampler.LoadState(state)
assert.Nil(t, err)
Expand Down
32 changes: 27 additions & 5 deletions avgsamplewithmin.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,14 @@ type AvgSampleWithMin struct {
// gotten any samples of traffic, we should use the default goal
// sample rate for all events instead of sampling everything at 1
haveData bool
done chan struct{}

lock sync.Mutex
}

// Ensure we implement the sampler interface
var _ Sampler = (*AvgSampleWithMin)(nil)

func (a *AvgSampleWithMin) Start() error {
// apply defaults
if a.ClearFrequencySec == 0 {
Expand All @@ -63,17 +67,29 @@ func (a *AvgSampleWithMin) Start() error {
// initialize internal variables
a.savedSampleRates = make(map[string]int)
a.currentCounts = make(map[string]int)
a.done = make(chan struct{})

// spin up calculator
go func() {
ticker := time.NewTicker(time.Second * time.Duration(a.ClearFrequencySec))
for range ticker.C {
a.updateMaps()
defer ticker.Stop()
for {
select {
case <-ticker.C:
a.updateMaps()
case <-a.done:
return
}
}
}()
return nil
}

// Stop terminates the background goroutine started by Start by closing the
// done channel that its select loop watches.
//
// NOTE(review): calling Stop twice, or before Start, panics (close of a
// closed or nil channel) — confirm callers invoke it exactly once after Start.
func (a *AvgSampleWithMin) Stop() error {
	close(a.done)
	return nil
}

// updateMaps calculates a new saved rate map based on the contents of the
// counter map
func (a *AvgSampleWithMin) updateMaps() {
Expand Down Expand Up @@ -172,19 +188,25 @@ func (a *AvgSampleWithMin) updateMaps() {
}

// GetSampleRate takes a key and returns the appropriate sample rate for that
// key. It treats the call as a single sampled item, simply delegating to
// GetSampleRateMulti with a count of 1.
func (a *AvgSampleWithMin) GetSampleRate(key string) int {
	return a.GetSampleRateMulti(key, 1)
}

// GetSampleRateMulti takes a key representing count spans and returns the
// appropriate sample rate for that key.
func (a *AvgSampleWithMin) GetSampleRateMulti(key string, count int) int {
a.lock.Lock()
defer a.lock.Unlock()

// Enforce MaxKeys limit on the size of the map
if a.MaxKeys > 0 {
// If a key already exists, increment it. If not, but we're under the limit, store a new key
if _, found := a.currentCounts[key]; found || len(a.currentCounts) < a.MaxKeys {
a.currentCounts[key]++
a.currentCounts[key] += count
}
} else {
a.currentCounts[key]++
a.currentCounts[key] += count
}
if !a.haveData {
return a.GoalSampleRate
Expand Down
25 changes: 13 additions & 12 deletions blocklist.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
// BlockList is a data structure that keeps track of how often keys occur in a given time range in
// order to perform windowed lookback sampling. BlockList operates with monotonically increasing
// indexes, instead of timestamps.
// A BlockList is a single linked list of Blocks. Each Block has a frequency hashmap and an unique
// A BlockList is a single linked list of Blocks. Each Block has a frequency hashmap and a unique
// index.
type BlockList interface {
IncrementKey(key string, keyIndex int64) error
IncrementKey(key string, keyIndex int64, count int) error
AggregateCounts(currentIndex int64, lookbackIndex int64) map[string]int
}

Expand Down Expand Up @@ -42,19 +42,20 @@ func NewUnboundedBlockList() BlockList {
}
}

// IncrementKey is used when we've encounted a new key. The current keyIndex is also provided.
// This function will increment the key in the current block or create a new block, if needed.
// The happy path invocation is very fast, O(1).
func (b *UnboundedBlockList) IncrementKey(key string, keyIndex int64) error {
// IncrementKey is used when we've encountered a new key. The current keyIndex
// is also provided. This function will increment the key in the current block
// or create a new block, if needed. The happy path invocation is very fast,
// O(1). The count is the number of events that this call represents.
func (b *UnboundedBlockList) IncrementKey(key string, keyIndex int64, count int) error {
	b.lock.Lock()
	defer b.lock.Unlock()
	return b.doIncrement(key, keyIndex, count)
}

func (b *UnboundedBlockList) doIncrement(key string, keyIndex int64) error {
func (b *UnboundedBlockList) doIncrement(key string, keyIndex int64, count int) error {
// A block matching keyStamp exists. Just increment the key there.
if b.head.next != nil && b.head.next.index == keyIndex {
b.head.next.keyToCount[key] += 1
b.head.next.keyToCount[key] += count
return nil
}

Expand All @@ -65,7 +66,7 @@ func (b *UnboundedBlockList) doIncrement(key string, keyIndex int64) error {
keyToCount: make(map[string]int),
next: currentFront,
}
b.head.next.keyToCount[key] += 1
b.head.next.keyToCount[key] += count
return nil
}

Expand Down Expand Up @@ -145,7 +146,7 @@ func NewBoundedBlockList(maxKeys int) BlockList {

// IncrementKey will always increment an existing key. If the key is new, it will be rejected if
// there are maxKeys existing entries.
func (b *BoundedBlockList) IncrementKey(key string, keyIndex int64) error {
func (b *BoundedBlockList) IncrementKey(key string, keyIndex int64, count int) error {
b.baseList.lock.Lock()
defer b.baseList.lock.Unlock()

Expand All @@ -154,7 +155,7 @@ func (b *BoundedBlockList) IncrementKey(key string, keyIndex int64) error {
return MaxSizeError{key: key}
}

b.baseList.doIncrement(key, keyIndex)
b.baseList.doIncrement(key, keyIndex, count)
return nil
}

Expand Down
43 changes: 25 additions & 18 deletions blocklist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,34 @@ import (
// for our tests.
// This datastructure is designed to be completely linearizable, as it has a single lock that it
// acquires with every operation.

// pair records a single increment observation: the block index at which a key
// was incremented, and the count that was added at that index.
type pair struct {
	index int64
	count int
}

// AtomicRecord is a simple, fully linearizable reference implementation of
// the BlockList operations, used by the tests to cross-check the real
// implementations. A single lock is acquired for every operation.
//
// (The rendered diff contained both the pre-change `map[string][]int64` field
// and the post-change `map[string][]pair` field; this keeps the merged,
// post-change version.)
type AtomicRecord struct {
	// records holds, per key, the history of (index, count) observations,
	// newest first.
	records map[string][]pair
	// maxKeys caps the number of distinct keys before IncrementKey rejects.
	maxKeys int
	lock    sync.Mutex
}

// NewAtomicRecord returns an AtomicRecord that tracks at most maxKeys
// distinct keys. (The rendered diff duplicated the `records:` initializer in
// its old and new forms; this keeps the merged, post-change version.)
func NewAtomicRecord(maxKeys int) *AtomicRecord {
	return &AtomicRecord{
		records: make(map[string][]pair),
		maxKeys: maxKeys,
	}
}

func (r *AtomicRecord) IncrementKey(key string, keyIndex int64) error {
func (r *AtomicRecord) IncrementKey(key string, keyIndex int64, count int) error {

r.lock.Lock()
defer r.lock.Unlock()

if len(r.records) >= r.maxKeys {
return MaxSizeError{key: key}
}
r.records[key] = append([]int64{keyIndex}, r.records[key]...)
r.records[key] = append([]pair{{index: keyIndex, count: count}}, r.records[key]...)
return nil
}

Expand All @@ -55,10 +62,10 @@ func (r *AtomicRecord) AggregateCounts(
// Aggregate.
lastIndex := -1
for i, r := range record {
if r <= startIndex && r > finishIndex {
aggregateCounts[key] += 1
if r.index <= startIndex && r.index > finishIndex {
aggregateCounts[key] += r.count
}
if lastIndex == -1 && r <= finishIndex {
if lastIndex == -1 && r.index <= finishIndex {
lastIndex = i
}
}
Expand Down Expand Up @@ -88,8 +95,8 @@ func TestSanity(t *testing.T) {
currentIndex := int64(0)

for i := 0; i < 10; i++ {
blockList.IncrementKey(testKey, currentIndex)
atomicRecord.IncrementKey(testKey, currentIndex)
blockList.IncrementKey(testKey, currentIndex, 1)
atomicRecord.IncrementKey(testKey, currentIndex, 1)
currentIndex += 1
}

Expand All @@ -107,8 +114,8 @@ func TestBounded(t *testing.T) {
// Test basic dropping.
for i := 0; i < 15; i++ {
testKey := fmt.Sprintf("test_%d", i)
actualErr := blockList.IncrementKey(testKey, currentIndex)
expectedErr := atomicRecord.IncrementKey(testKey, currentIndex)
actualErr := blockList.IncrementKey(testKey, currentIndex, 1)
expectedErr := atomicRecord.IncrementKey(testKey, currentIndex, 1)
assert.Equal(t, expectedErr, actualErr)
}

Expand All @@ -120,8 +127,8 @@ func TestBounded(t *testing.T) {
// Consistent single insert per count.
for i := 0; i < 15; i++ {
testKey := fmt.Sprintf("test_%d", i)
actualErr := blockList.IncrementKey(testKey, currentIndex)
expectedErr := atomicRecord.IncrementKey(testKey, currentIndex)
actualErr := blockList.IncrementKey(testKey, currentIndex, 1)
expectedErr := atomicRecord.IncrementKey(testKey, currentIndex, 1)
assert.Equal(t, expectedErr, actualErr)
assert.Equal(t, atomicRecord.AggregateCounts(currentIndex, 10),
blockList.AggregateCounts(currentIndex, 10))
Expand All @@ -134,8 +141,8 @@ func TestBounded(t *testing.T) {
for j := 0; j < 10; j++ {
keySuffix := random.Intn(20)
testKey := fmt.Sprintf("test_%d", keySuffix)
actualErr := blockList.IncrementKey(testKey, currentIndex)
expectedErr := atomicRecord.IncrementKey(testKey, currentIndex)
actualErr := blockList.IncrementKey(testKey, currentIndex, 1)
expectedErr := atomicRecord.IncrementKey(testKey, currentIndex, 1)
assert.Equal(t, expectedErr, actualErr)
}

Expand Down Expand Up @@ -199,8 +206,8 @@ func compareConcurrency(t *testing.T, reference BlockList, actual BlockList) {
// These need to be performed atomically.
lock.Lock()
currentIndex := atomic.LoadInt64(&globalIndex)
referenceErr := reference.IncrementKey(testKey, currentIndex)
actualErr := actual.IncrementKey(testKey, currentIndex)
referenceErr := reference.IncrementKey(testKey, currentIndex, 1)
actualErr := actual.IncrementKey(testKey, currentIndex, 1)
assert.Equal(t, referenceErr, actualErr)

sleepTime := time.Duration(random.Intn(100)) * time.Millisecond
Expand All @@ -225,7 +232,7 @@ func concurrentUpdates(t *testing.T, blockList BlockList) {
for j := 0; j < 15; j++ {
currentIndex := atomic.LoadInt64(&globalIndex)
testKey := fmt.Sprintf("test_%d", j)
blockList.IncrementKey(testKey, currentIndex)
blockList.IncrementKey(testKey, currentIndex, 1)
}
}
}()
Expand Down
19 changes: 16 additions & 3 deletions dynsampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,24 @@ type Sampler interface {
// sampler.
Start() error

// GetSampleRate will return the sample rate to use for the string given. You
// should call it with whatever key you choose to use to partition traffic
// into different sample rates.
// Stop halts the sampler and any background goroutines it started.
Stop() error

// GetSampleRate will return the sample rate to use for the given key
// string. You should call it with whatever key you choose to use to
// partition traffic into different sample rates. It assumes that you're
// calling it for a single item to be sampled (typically a span from a
// trace), and simply calls GetSampleRateMulti with 1 for the second
// parameter.
GetSampleRate(string) int

// GetSampleRateMulti will return the sample rate to use for the given key
// string. You should call it with whatever key you choose to use to
// partition traffic into different sample rates. It assumes you're calling
// it for a group of samples. The second parameter is the number of samples
// this call represents.
GetSampleRateMulti(string, int) int
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved

// SaveState returns a byte array containing the state of the Sampler implementation.
// It can be used to persist state between process restarts.
SaveState() ([]byte, error)
Expand Down
Loading