Skip to content

Commit

Permalink
Merge pull request #72 from steadybit/feat/string-interning
Browse files Browse the repository at this point in the history
feat: add option to intern discovery data strings
  • Loading branch information
joshiste authored Sep 30, 2024
2 parents 05eefcd + 61c3d0d commit 853d881
Show file tree
Hide file tree
Showing 11 changed files with 240 additions and 101 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ jobs:
- uses: actions/setup-go@v5
with:
cache-dependency-path: ./go/discovery_kit_sdk
go-version: '1.22'
go-version: '1.23'
- name: Audit
run: |
go mod download
Expand Down
5 changes: 5 additions & 0 deletions go/discovery_kit_sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## 1.2.0

- Update to go 1.23
- add `WithTargetStringInterning` / `WithEnrichmentDataStringInterning`

## 1.1.1

- add http request to context
Expand Down
71 changes: 66 additions & 5 deletions go/discovery_kit_sdk/caching_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@ import (
"runtime/debug"
"sync"
"time"
"unique"
)

// discoverFn is the signature shared by the discovery suppliers in this file:
// given a context it returns a slice of items of type T and an error.
type discoverFn[T any] func(ctx context.Context) ([]T, error)

type CachedDiscovery[T any] struct {
Discovery

mu sync.RWMutex
lastModified time.Time
supplier func(ctx context.Context) ([]T, error)
supplier discoverFn[T]
data []T
err error
}
Expand Down Expand Up @@ -50,7 +53,7 @@ func NewCachedTargetDiscovery(d TargetDiscovery, opts ...CachedDiscoveryOpt[disc
c := &CachedTargetDiscovery{
CachedDiscovery: CachedDiscovery[discovery_kit_api.Target]{
Discovery: d,
supplier: recoverable(d.DiscoverTargets),
supplier: recoverable(internStrings(internTargetStrings, d.DiscoverTargets)),
data: make([]discovery_kit_api.Target, 0),
},
}
Expand All @@ -69,7 +72,7 @@ func NewCachedEnrichmentDataDiscovery(d EnrichmentDataDiscovery, opts ...CachedD
c := &CachedDataEnrichmentDiscovery{
CachedDiscovery: CachedDiscovery[discovery_kit_api.EnrichmentData]{
Discovery: d,
supplier: recoverable(d.DiscoverEnrichmentData),
supplier: recoverable(internStrings(internEnrichmentDataItemStrings, d.DiscoverEnrichmentData)),
data: make([]discovery_kit_api.EnrichmentData, 0),
},
}
Expand All @@ -83,8 +86,8 @@ func (c *CachedDataEnrichmentDiscovery) DiscoverEnrichmentData(_ context.Context
return c.CachedDiscovery.Get()
}

func recoverable[T any](fn func(ctx context.Context) (T, error)) func(ctx context.Context) (T, error) {
return func(ctx context.Context) (d T, e error) {
func recoverable[T any](fn discoverFn[T]) discoverFn[T] {
return func(ctx context.Context) (d []T, e error) {
defer func() {
if err := recover(); err != nil {
log.Error().Msgf("discovery panic: %v\n %s", err, string(debug.Stack()))
Expand Down Expand Up @@ -297,3 +300,61 @@ func WithRefreshTimeout[T any](timeout time.Duration) CachedDiscoveryOpt[T] {
}
}
}

// makeHandleFunc maps a string to a string. internStrings supplies the
// implementation used by the per-item interning callbacks: it passes s through
// unique.Make and returns the handle's value.
type makeHandleFunc func(s string) string

// internStrings wraps fn so that every item it returns is passed through
// fnInternItem together with a makeHandleFunc backed by unique.Make.
//
// If fn fails or returns no data, its result is returned unchanged and
// fnInternItem is never called. Otherwise fnInternItem is invoked once per
// item (with a pointer into the returned slice, so it may modify the item in
// place). Errors from all items are collected and returned combined via
// errors.Join; the data slice is returned in either case.
func internStrings[T any](fnInternItem func(makeHandleFunc, *T) error, fn discoverFn[T]) discoverFn[T] {
	return func(ctx context.Context) ([]T, error) {
		data, err := fn(ctx)
		if err != nil || len(data) == 0 {
			return data, err
		}

		// Keep the handles until all items are processed so the GC won't
		// remove them and break the deduplication. Start with length 0: the
		// slice is only ever appended to, len(data) is merely a capacity hint.
		handles := make([]unique.Handle[string], 0, len(data))
		makeHandle := func(s string) string {
			h := unique.Make(s)
			handles = append(handles, h)
			return h.Value()
		}

		// Collect every item's error instead of keeping only the last one.
		var errs []error
		for i := range data {
			if itemErr := fnInternItem(makeHandle, &data[i]); itemErr != nil {
				errs = append(errs, itemErr)
			}
		}
		// errors.Join returns nil when there is nothing to join.
		return data, errors.Join(errs...)
	}
}

// internEnrichmentDataItemStrings replaces the Id, the EnrichmentDataType and
// every attribute key and value of data with the string returned by makeHandle.
// A nil data pointer is a no-op. The function always returns nil.
func internEnrichmentDataItemStrings(makeHandle makeHandleFunc, data *discovery_kit_api.EnrichmentData) error {
	if data != nil {
		data.Id, data.EnrichmentDataType = makeHandle(data.Id), makeHandle(data.EnrichmentDataType)

		for name, vals := range data.Attributes {
			for idx := range vals {
				vals[idx] = makeHandle(vals[idx])
			}
			// Store under the string returned by makeHandle for this key.
			data.Attributes[makeHandle(name)] = vals
		}
	}

	return nil
}

// internTargetStrings replaces the Id, the TargetType and every attribute key
// and value of data with the string returned by makeHandle. A nil data pointer
// is a no-op. The function always returns nil.
func internTargetStrings(makeHandle makeHandleFunc, data *discovery_kit_api.Target) error {
	if data != nil {
		data.Id, data.TargetType = makeHandle(data.Id), makeHandle(data.TargetType)

		for name, vals := range data.Attributes {
			for idx := range vals {
				vals[idx] = makeHandle(vals[idx])
			}
			// Store under the string returned by makeHandle for this key.
			data.Attributes[makeHandle(name)] = vals
		}
	}

	return nil
}
120 changes: 93 additions & 27 deletions go/discovery_kit_sdk/caching_discovery_ed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@ import (
"github.com/steadybit/discovery-kit/go/discovery_kit_api"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"sync"
"testing"
"time"
"unsafe"
)

func Test_enrichmentData_caching(t *testing.T) {
ctx := context.Background()

discovery := newMockEnrichmentDataDiscovery()
cached := NewCachedEnrichmentDataDiscovery(discovery, WithRefreshEnrichmentDataNow())
cached := NewCachedEnrichmentDataDiscovery(discovery)

discovery.WaitForNextDiscovery()
cached.Refresh(context.Background())
first, _ := cached.DiscoverEnrichmentData(ctx)
second, _ := cached.DiscoverEnrichmentData(ctx)

Expand Down Expand Up @@ -68,20 +68,21 @@ func Test_enrichmentData_caching_error(t *testing.T) {
discovery.On("DiscoverEnrichmentData", mock.Anything).Return([]discovery_kit_api.EnrichmentData{}, errors.New("test")).Once()
discovery.On("DiscoverEnrichmentData", mock.Anything).Return([]discovery_kit_api.EnrichmentData{{}}, nil).Once()

ch <- struct{}{}
discovery.WaitForNextDiscovery()
trigger := func() {
ch <- struct{}{}
}

triggerAndWaitForUpdate(t, &cached.CachedDiscovery, trigger)
data, err := cached.DiscoverEnrichmentData(ctx)
assert.NoError(t, err)
assert.Len(t, data, 1)

ch <- struct{}{}
discovery.WaitForNextDiscovery()
triggerAndWaitForUpdate(t, &cached.CachedDiscovery, trigger)
data, err = cached.DiscoverEnrichmentData(ctx)
assert.Error(t, err)
assert.Len(t, data, 0)

ch <- struct{}{}
discovery.WaitForNextDiscovery()
triggerAndWaitForUpdate(t, &cached.CachedDiscovery, trigger)
data, err = cached.DiscoverEnrichmentData(ctx)
assert.NoError(t, err)
assert.Len(t, data, 1)
Expand Down Expand Up @@ -117,17 +118,17 @@ func Test_enrichmentData_cache_interval(t *testing.T) {
cached := NewCachedEnrichmentDataDiscovery(discovery, WithRefreshEnrichmentDataInterval(ctx, 20*time.Millisecond))

//should cache
discovery.WaitForNextDiscovery()
waitForLastModifiedChanges(t, &cached.CachedDiscovery)
first, _ := cached.DiscoverEnrichmentData(ctx)
second, _ := cached.DiscoverEnrichmentData(ctx)
assert.Equal(t, first, second)

//should refresh cache
first, _ = cached.DiscoverEnrichmentData(ctx)
discovery.WaitForNextDiscovery()
waitForLastModifiedChanges(t, &cached.CachedDiscovery)
second, _ = cached.DiscoverEnrichmentData(ctx)
assert.NotEqual(t, first, second)
discovery.WaitForNextDiscovery()
waitForLastModifiedChanges(t, &cached.CachedDiscovery)
third, _ := cached.DiscoverEnrichmentData(ctx)
assert.NotEqual(t, second, third)

Expand All @@ -147,18 +148,18 @@ func Test_enrichmentData_cache_trigger(t *testing.T) {
cached := NewCachedEnrichmentDataDiscovery(discovery, WithRefreshEnrichmentDataTrigger(ctx, ch, 0))

//should cache
discovery.WaitForNextDiscovery(func() {
trigger := func() {
ch <- struct{}{}
})
}
triggerAndWaitForUpdate(t, &cached.CachedDiscovery, trigger)

first, _ := cached.DiscoverEnrichmentData(ctx)
second, _ := cached.DiscoverEnrichmentData(ctx)
assert.Equal(t, first, second)

//should refresh cache
first, _ = cached.DiscoverEnrichmentData(ctx)
discovery.WaitForNextDiscovery(func() {
ch <- struct{}{}
})
triggerAndWaitForUpdate(t, &cached.CachedDiscovery, trigger)
second, _ = cached.DiscoverEnrichmentData(ctx)
assert.NotEqual(t, first, second)

Expand All @@ -184,7 +185,7 @@ func Test_enrichmentData_cache_trigger_throttle(t *testing.T) {
ch <- struct{}{}
ch <- struct{}{}
ch <- struct{}{}
discovery.WaitForNextDiscovery(func() {
triggerAndWaitForUpdate(t, &cached.CachedDiscovery, func() {
ch <- struct{}{}
})
second, _ := cached.DiscoverEnrichmentData(ctx)
Expand All @@ -196,36 +197,101 @@ func Test_enrichmentData_cache_update(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

wg := sync.WaitGroup{}

discovery := newMockEnrichmentDataDiscovery()
ch := make(chan string)
updateFn := func(data []discovery_kit_api.EnrichmentData, update string) ([]discovery_kit_api.EnrichmentData, error) {
defer wg.Done()
if update == "clear" {
return []discovery_kit_api.EnrichmentData{}, nil
}
return data, nil
}
cached := NewCachedEnrichmentDataDiscovery(discovery,
WithRefreshEnrichmentDataNow(),
WithEnrichmentDataUpdate(ctx, ch, updateFn),
)

//should cache
discovery.WaitForNextDiscovery()
triggerAndWaitForUpdate(t, &cached.CachedDiscovery, func() {
cached.Refresh(context.Background())
})
first, _ := cached.DiscoverEnrichmentData(ctx)
second, _ := cached.DiscoverEnrichmentData(ctx)
assert.Equal(t, first, second)

//should update cache
first, _ = cached.DiscoverEnrichmentData(ctx)
wg.Add(1)
go func() {
triggerAndWaitForUpdate(t, &cached.CachedDiscovery, func() {
ch <- "clear"
}()
wg.Wait()
})
second, _ = cached.DiscoverEnrichmentData(ctx)
assert.NotEqual(t, first, second)
assert.Empty(t, second)
}

// Test_enrichment_data_string_interning checks that strings returned by the
// cached enrichment data discovery do not point into the backing memory of the
// string they were sliced from.
func Test_enrichment_data_string_interning(t *testing.T) {
	largeString := "ID: this is a very large string which should get unshared"
	ctx := context.Background()

	discovery := newMockEnrichmentDataDiscovery()
	cached := NewCachedEnrichmentDataDiscovery(discovery)

	// Replace the default mock expectation with one returning substrings of largeString.
	discovery.On("DiscoverEnrichmentData", mock.Anything).Unset()
	discovery.On("DiscoverEnrichmentData", ctx).Return([]discovery_kit_api.EnrichmentData{{
		Id: largeString[:2],
		Attributes: map[string][]string{
			largeString[:2]: {largeString[4:]},
		},
	}}, nil)
	cached.Refresh(ctx)

	data, err := cached.DiscoverEnrichmentData(ctx)
	assert.NoError(t, err)
	// Stop here on an unexpected result instead of panicking on data[0] below.
	if !assert.Len(t, data, 1) {
		return
	}

	assert.Equal(t, "ID", data[0].Id)
	assert.Equal(t, []string{"this is a very large string which should get unshared"}, data[0].Attributes["ID"])

	assertSliceNotShared(t, largeString, data[0].Id)
	for _, datum := range data {
		for key, values := range datum.Attributes {
			assertSliceNotShared(t, largeString, key)
			for _, value := range values {
				assertSliceNotShared(t, largeString, value)
			}
		}
	}
}

// waitForLastModifiedChanges waits until cached.LastModified() reports a new
// value, without triggering anything itself. It delegates to
// triggerAndWaitForUpdate with a nil trigger.
func waitForLastModifiedChanges(t *testing.T, cached p) {
	// Mark as helper so a timeout is reported at the calling test's line.
	t.Helper()

	triggerAndWaitForUpdate(t, cached, nil)
}

// triggerAndWaitForUpdate records cached.LastModified(), runs trigger (if it is
// non-nil) and then polls every 10ms until LastModified() returns a different
// value. If no change is seen before a one second timeout expires, the test is
// failed with t.Fatalf.
func triggerAndWaitForUpdate(t *testing.T, cached p, trigger func()) {
	t.Helper()

	before := cached.LastModified()
	if trigger != nil {
		trigger()
	}

	timeout, stop := context.WithTimeout(context.Background(), 1*time.Second)
	defer stop()

	// Poll for as long as the value is unchanged.
	for before == cached.LastModified() {
		if timeout.Err() != nil {
			t.Fatalf("timeout waiting for last modified changes")
		}
		time.Sleep(10 * time.Millisecond)
	}
}

// assertSliceNotShared reports a test error when the data pointer of needle
// lies inside the byte range [start, start+len(haystack)) occupied by
// haystack, i.e. when needle begins within haystack's memory.
func assertSliceNotShared(t *testing.T, haystack string, needle string) {
	t.Helper()

	start := uintptr(unsafe.Pointer(unsafe.StringData(haystack)))
	limit := start + uintptr(len(haystack))
	target := uintptr(unsafe.Pointer(unsafe.StringData(needle)))

	// Outside the half-open range: nothing to report.
	if target < start || target >= limit {
		return
	}
	t.Errorf("slice is shared")
}
Loading

0 comments on commit 853d881

Please sign in to comment.