From be1d04929fe384d9e7dc3e0a762f38347dc33351 Mon Sep 17 00:00:00 2001 From: Jakob Haahr Taankvist Date: Tue, 21 May 2024 10:11:22 +0200 Subject: [PATCH 1/3] Added a new metric to the workflow cache This is to track the max number of requests to a single workflow per domain --- common/metrics/client_mock.go | 278 ++++++++++++++++++ common/metrics/defs.go | 2 + common/metrics/interfaces.go | 2 + service/history/workflowcache/cache.go | 12 +- service/history/workflowcache/metrics.go | 89 ++++++ service/history/workflowcache/metrics_test.go | 101 +++++++ 6 files changed, 483 insertions(+), 1 deletion(-) create mode 100644 common/metrics/client_mock.go create mode 100644 service/history/workflowcache/metrics.go create mode 100644 service/history/workflowcache/metrics_test.go diff --git a/common/metrics/client_mock.go b/common/metrics/client_mock.go new file mode 100644 index 00000000000..ade3fdbf996 --- /dev/null +++ b/common/metrics/client_mock.go @@ -0,0 +1,278 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +// Code generated by MockGen. DO NOT EDIT. +// Source: interfaces.go + +// Package metrics is a generated GoMock package. +package metrics + +import ( + reflect "reflect" + time "time" + + gomock "github.com/golang/mock/gomock" + tally "github.com/uber-go/tally" +) + +// MockClient is a mock of Client interface. +type MockClient struct { + ctrl *gomock.Controller + recorder *MockClientMockRecorder +} + +// MockClientMockRecorder is the mock recorder for MockClient. +type MockClientMockRecorder struct { + mock *MockClient +} + +// NewMockClient creates a new mock instance. +func NewMockClient(ctrl *gomock.Controller) *MockClient { + mock := &MockClient{ctrl: ctrl} + mock.recorder = &MockClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockClient) EXPECT() *MockClientMockRecorder { + return m.recorder +} + +// AddCounter mocks base method. +func (m *MockClient) AddCounter(scope, counter int, delta int64) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddCounter", scope, counter, delta) +} + +// AddCounter indicates an expected call of AddCounter. +func (mr *MockClientMockRecorder) AddCounter(scope, counter, delta interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddCounter", reflect.TypeOf((*MockClient)(nil).AddCounter), scope, counter, delta) +} + +// IncCounter mocks base method. +func (m *MockClient) IncCounter(scope, counter int) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "IncCounter", scope, counter) +} + +// IncCounter indicates an expected call of IncCounter. +func (mr *MockClientMockRecorder) IncCounter(scope, counter interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IncCounter", reflect.TypeOf((*MockClient)(nil).IncCounter), scope, counter) +} + +// RecordHistogramDuration mocks base method. +func (m *MockClient) RecordHistogramDuration(scope, timer int, d time.Duration) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RecordHistogramDuration", scope, timer, d) +} + +// RecordHistogramDuration indicates an expected call of RecordHistogramDuration. +func (mr *MockClientMockRecorder) RecordHistogramDuration(scope, timer, d interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordHistogramDuration", reflect.TypeOf((*MockClient)(nil).RecordHistogramDuration), scope, timer, d) +} + +// RecordTimer mocks base method. +func (m *MockClient) RecordTimer(scope, timer int, d time.Duration) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RecordTimer", scope, timer, d) +} + +// RecordTimer indicates an expected call of RecordTimer. +func (mr *MockClientMockRecorder) RecordTimer(scope, timer, d interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordTimer", reflect.TypeOf((*MockClient)(nil).RecordTimer), scope, timer, d) +} + +// Scope mocks base method. +func (m *MockClient) Scope(scope int, tags ...Tag) Scope { + m.ctrl.T.Helper() + varargs := []interface{}{scope} + for _, a := range tags { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Scope", varargs...) + ret0, _ := ret[0].(Scope) + return ret0 +} + +// Scope indicates an expected call of Scope. +func (mr *MockClientMockRecorder) Scope(scope interface{}, tags ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{scope}, tags...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Scope", reflect.TypeOf((*MockClient)(nil).Scope), varargs...) +} + +// StartTimer mocks base method. +func (m *MockClient) StartTimer(scope, timer int) tally.Stopwatch { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StartTimer", scope, timer) + ret0, _ := ret[0].(tally.Stopwatch) + return ret0 +} + +// StartTimer indicates an expected call of StartTimer. +func (mr *MockClientMockRecorder) StartTimer(scope, timer interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartTimer", reflect.TypeOf((*MockClient)(nil).StartTimer), scope, timer) +} + +// UpdateGauge mocks base method. +func (m *MockClient) UpdateGauge(scope, gauge int, value float64) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "UpdateGauge", scope, gauge, value) +} + +// UpdateGauge indicates an expected call of UpdateGauge. +func (mr *MockClientMockRecorder) UpdateGauge(scope, gauge, value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateGauge", reflect.TypeOf((*MockClient)(nil).UpdateGauge), scope, gauge, value) +} + +// MockScope is a mock of Scope interface. +type MockScope struct { + ctrl *gomock.Controller + recorder *MockScopeMockRecorder +} + +// MockScopeMockRecorder is the mock recorder for MockScope. +type MockScopeMockRecorder struct { + mock *MockScope +} + +// NewMockScope creates a new mock instance. +func NewMockScope(ctrl *gomock.Controller) *MockScope { + mock := &MockScope{ctrl: ctrl} + mock.recorder = &MockScopeMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockScope) EXPECT() *MockScopeMockRecorder { + return m.recorder +} + +// AddCounter mocks base method. +func (m *MockScope) AddCounter(counter int, delta int64) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddCounter", counter, delta) +} + +// AddCounter indicates an expected call of AddCounter. +func (mr *MockScopeMockRecorder) AddCounter(counter, delta interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddCounter", reflect.TypeOf((*MockScope)(nil).AddCounter), counter, delta) +} + +// IncCounter mocks base method. +func (m *MockScope) IncCounter(counter int) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "IncCounter", counter) +} + +// IncCounter indicates an expected call of IncCounter. +func (mr *MockScopeMockRecorder) IncCounter(counter interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IncCounter", reflect.TypeOf((*MockScope)(nil).IncCounter), counter) +} + +// RecordHistogramDuration mocks base method. +func (m *MockScope) RecordHistogramDuration(timer int, d time.Duration) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RecordHistogramDuration", timer, d) +} + +// RecordHistogramDuration indicates an expected call of RecordHistogramDuration. +func (mr *MockScopeMockRecorder) RecordHistogramDuration(timer, d interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordHistogramDuration", reflect.TypeOf((*MockScope)(nil).RecordHistogramDuration), timer, d) +} + +// RecordHistogramValue mocks base method. +func (m *MockScope) RecordHistogramValue(timer int, value float64) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RecordHistogramValue", timer, value) +} + +// RecordHistogramValue indicates an expected call of RecordHistogramValue. +func (mr *MockScopeMockRecorder) RecordHistogramValue(timer, value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordHistogramValue", reflect.TypeOf((*MockScope)(nil).RecordHistogramValue), timer, value) +} + +// RecordTimer mocks base method. +func (m *MockScope) RecordTimer(timer int, d time.Duration) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RecordTimer", timer, d) +} + +// RecordTimer indicates an expected call of RecordTimer. +func (mr *MockScopeMockRecorder) RecordTimer(timer, d interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordTimer", reflect.TypeOf((*MockScope)(nil).RecordTimer), timer, d) +} + +// StartTimer mocks base method. +func (m *MockScope) StartTimer(timer int) Stopwatch { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StartTimer", timer) + ret0, _ := ret[0].(Stopwatch) + return ret0 +} + +// StartTimer indicates an expected call of StartTimer. +func (mr *MockScopeMockRecorder) StartTimer(timer interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartTimer", reflect.TypeOf((*MockScope)(nil).StartTimer), timer) +} + +// Tagged mocks base method. +func (m *MockScope) Tagged(tags ...Tag) Scope { + m.ctrl.T.Helper() + varargs := []interface{}{} + for _, a := range tags { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Tagged", varargs...) + ret0, _ := ret[0].(Scope) + return ret0 +} + +// Tagged indicates an expected call of Tagged. +func (mr *MockScopeMockRecorder) Tagged(tags ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Tagged", reflect.TypeOf((*MockScope)(nil).Tagged), tags...) +} + +// UpdateGauge mocks base method. +func (m *MockScope) UpdateGauge(gauge int, value float64) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "UpdateGauge", gauge, value) +} + +// UpdateGauge indicates an expected call of UpdateGauge. +func (mr *MockScopeMockRecorder) UpdateGauge(gauge, value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateGauge", reflect.TypeOf((*MockScope)(nil).UpdateGauge), gauge, value) +} diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 61ef2092597..de6d4419e52 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -2458,6 +2458,7 @@ const ( UpdateWorkflowExecutionCount WorkflowIDCacheSizeGauge WorkflowIDCacheRequestsExternalRatelimitedCounter + WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsGauge WorkflowIDCacheRequestsInternalRatelimitedCounter NumHistoryMetrics ) @@ -3093,6 +3094,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ UpdateWorkflowExecutionCount: {metricName: "update_workflow_execution_count", metricType: Counter}, WorkflowIDCacheSizeGauge: {metricName: "workflow_id_cache_size", metricType: Gauge}, WorkflowIDCacheRequestsExternalRatelimitedCounter: {metricName: "workflow_id_external_requests_ratelimited", metricType: Counter}, + WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsGauge: {metricName: "workflow_id_external_requests_max_requests_per_seconds", metricType: Gauge}, WorkflowIDCacheRequestsInternalRatelimitedCounter: {metricName: "workflow_id_internal_requests_ratelimited", metricType: Counter}, }, Matching: { diff --git a/common/metrics/interfaces.go b/common/metrics/interfaces.go index f36993e5f4a..3e017f79abc 100644 --- a/common/metrics/interfaces.go +++ b/common/metrics/interfaces.go @@ -18,6 +18,8 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination client_mock.go + package metrics import ( diff --git a/service/history/workflowcache/cache.go b/service/history/workflowcache/cache.go index 97dc590b401..6bab827925d 100644 --- a/service/history/workflowcache/cache.go +++ b/service/history/workflowcache/cache.go @@ -26,9 +26,11 @@ package workflowcache import ( "errors" + "sync" "time" "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" @@ -48,6 +50,7 @@ type WFCache interface { type wfCache struct { lru cache.Cache + domainMaxCount sync.Map // map[string]domainMaxCount externalLimiterFactory quotas.LimiterFactory internalLimiterFactory quotas.LimiterFactory workflowIDCacheExternalEnabled dynamicconfig.BoolPropertyFnWithDomainFilter @@ -55,7 +58,10 @@ type wfCache struct { domainCache cache.DomainCache metricsClient metrics.Client logger log.Logger - getCacheItemFn func(domainName string, workflowID string) (*cacheValue, error) + timeSource clock.TimeSource + + // we use functions to get cache items, and the current time, so we can mock it in unit tests + getCacheItemFn func(domainName string, workflowID string) (*cacheValue, error) } type cacheKey struct { @@ -66,6 +72,7 @@ type cacheKey struct { type cacheValue struct { externalRateLimiter quotas.Limiter internalRateLimiter quotas.Limiter + countMetric workflowIDCountMetric } // Params is the parameters for a new WFCache @@ -90,12 +97,14 @@ func New(params Params) WFCache { MaxCount: params.MaxCount, ActivelyEvict: true, }), + domainMaxCount: sync.Map{}, // map[string]domainMaxCount externalLimiterFactory: params.ExternalLimiterFactory, internalLimiterFactory: params.InternalLimiterFactory, workflowIDCacheExternalEnabled: params.WorkflowIDCacheExternalEnabled, workflowIDCacheInternalEnabled: params.WorkflowIDCacheInternalEnabled, domainCache: params.DomainCache, metricsClient: params.MetricsClient, + timeSource: clock.NewRealTimeSource(), logger: params.Logger, } // We set getCacheItemFn to cache.getCacheItem so that we can mock it in unit tests @@ -138,6 +147,7 @@ func (c *wfCache) allow(domainID string, workflowID string, rateLimitType rateLi switch rateLimitType { case external: + c.updatePerDomainMaxWFRequestCount(domainName, value) if !value.externalRateLimiter.Allow() { c.emitRateLimitMetrics(domainID, workflowID, domainName, "external", metrics.WorkflowIDCacheRequestsExternalRatelimitedCounter) return false diff --git a/service/history/workflowcache/metrics.go b/service/history/workflowcache/metrics.go new file mode 100644 index 00000000000..8592c7a74ed --- /dev/null +++ b/service/history/workflowcache/metrics.go @@ -0,0 +1,89 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package workflowcache + +import ( + "sync" + "time" + + "github.com/uber/cadence/common/metrics" +) + +type workflowIDCountMetric struct { + secondStart time.Time + count int +} + +func (w *workflowIDCountMetric) reset(now time.Time) { + w.secondStart = now + w.count = 0 +} + +type domainMaxCount struct { + mu sync.Mutex + maxCount int + secondStart time.Time +} + +func (d *domainMaxCount) reset(now time.Time) { + d.maxCount = 0 + d.secondStart = now +} + +func (c *wfCache) getOrCreateDomain(domainName string, now time.Time) *domainMaxCount { + newDomainCount := &domainMaxCount{ + mu: sync.Mutex{}, + maxCount: 0, + secondStart: now, + } + + domainCount, _ := c.domainMaxCount.LoadOrStore(domainName, newDomainCount) + + return domainCount.(*domainMaxCount) +} + +func (c *wfCache) updatePerDomainMaxWFRequestCount(domainName string, value *cacheValue) { + now := c.timeSource.Now() + domain := c.getOrCreateDomain(domainName, now) + + // It's enough to lock the domain, as the workflowIDCountMetric belongs to the domain + domain.mu.Lock() + defer domain.mu.Unlock() + + if c.timeSource.Since(domain.secondStart) > time.Second { + // Emit the max count for the previous second and reset the count + c.metricsClient.Scope(metrics.HistoryClientWfIDCacheScope, metrics.DomainTag(domainName)). + UpdateGauge(metrics.WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsGauge, float64(domain.maxCount)) + domain.reset(now) + } + + if c.timeSource.Since(value.countMetric.secondStart) > time.Second { + value.countMetric.reset(now) + } + + value.countMetric.count++ + + if value.countMetric.count > domain.maxCount { + domain.maxCount = value.countMetric.count + } +} diff --git a/service/history/workflowcache/metrics_test.go b/service/history/workflowcache/metrics_test.go new file mode 100644 index 00000000000..99a42e35186 --- /dev/null +++ b/service/history/workflowcache/metrics_test.go @@ -0,0 +1,101 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package workflowcache + +import ( + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + + "github.com/uber/cadence/common/clock" + "github.com/uber/cadence/common/metrics" +) + +func TestGetOrCreateDomain(t *testing.T) { + wfCache := &wfCache{} + + domainID := "some domain name" + otherDomainID := "some other domain name" + + domainMaxCount1 := wfCache.getOrCreateDomain(domainID, time.Unix(123, 0)) + domainMaxCount2 := wfCache.getOrCreateDomain(domainID, time.Unix(456, 0)) + otherDomainMaxCount := wfCache.getOrCreateDomain(otherDomainID, time.Unix(789, 0)) + + // None of them are nil + assert.NotNil(t, domainMaxCount1) + assert.NotNil(t, domainMaxCount2) + assert.NotNil(t, otherDomainMaxCount) + + // domainMaxCount1 and domainMaxCount2 are the same + assert.Equal(t, domainMaxCount1, domainMaxCount2) + + // domainMaxCount1/domainMaxCount2 and otherDomainMaxCount are different + assert.NotEqual(t, domainMaxCount1, otherDomainMaxCount) + assert.NotEqual(t, domainMaxCount2, otherDomainMaxCount) +} + +func TestUpdatePerDomainMaxWFRequestCount(t *testing.T) { + ctrl := gomock.NewController(t) + metricsClientMock := metrics.NewMockClient(ctrl) + metricsMockScope := metrics.NewMockScope(ctrl) + + now := time.Unix(123, 456) + timeSource := clock.NewMockedTimeSourceAt(now) + + wfc := &wfCache{ + metricsClient: metricsClientMock, + timeSource: timeSource, + } + + domainName := "some domain name" + value := &cacheValue{} + + for i := 0; i < 5; i++ { + wfc.updatePerDomainMaxWFRequestCount(domainName, value) + } + + // Test that the max count is recorded + domain := wfc.getOrCreateDomain(domainName, now) + assert.Equal(t, 5, domain.maxCount) + + // Test that different values are independent + value = &cacheValue{} + for i := 0; i < 8; i++ { + wfc.updatePerDomainMaxWFRequestCount(domainName, value) + } + domain = wfc.getOrCreateDomain(domainName, now) + assert.Equal(t, 8, domain.maxCount) + + // Test that the max count is emitted and reset after a second + metricsClientMock.EXPECT().Scope(metrics.HistoryClientWfIDCacheScope, metrics.DomainTag(domainName)). + Return(metricsMockScope) + metricsMockScope.EXPECT(). + UpdateGauge(metrics.WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsGauge, 8.0) + timeSource.Advance(1100 * time.Millisecond) + wfc.updatePerDomainMaxWFRequestCount(domainName, value) + + domain = wfc.getOrCreateDomain(domainName, now) + assert.Equal(t, 1, domain.maxCount) +} From 73afab2ab1522e05eacebaa4610b2d2b9c08d1e6 Mon Sep 17 00:00:00 2001 From: Jakob Haahr Taankvist Date: Thu, 30 May 2024 10:38:08 +0200 Subject: [PATCH 2/3] Updated based on review - Change from gauge to timer - Only track for workflow IDs the timer will do the bookkeeping for max value --- common/metrics/client_mock.go | 278 ------------------ common/metrics/defs.go | 4 +- common/metrics/interfaces.go | 2 - service/history/workflowcache/cache.go | 3 - service/history/workflowcache/metrics.go | 57 +--- service/history/workflowcache/metrics_test.go | 123 ++++---- 6 files changed, 80 insertions(+), 387 deletions(-) delete mode 100644 common/metrics/client_mock.go diff --git a/common/metrics/client_mock.go b/common/metrics/client_mock.go deleted file mode 100644 index ade3fdbf996..00000000000 --- a/common/metrics/client_mock.go +++ /dev/null @@ -1,278 +0,0 @@ -// The MIT License (MIT) - -// Copyright (c) 2017-2020 Uber Technologies Inc. - -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. - -// Code generated by MockGen. DO NOT EDIT. -// Source: interfaces.go - -// Package metrics is a generated GoMock package. -package metrics - -import ( - reflect "reflect" - time "time" - - gomock "github.com/golang/mock/gomock" - tally "github.com/uber-go/tally" -) - -// MockClient is a mock of Client interface. -type MockClient struct { - ctrl *gomock.Controller - recorder *MockClientMockRecorder -} - -// MockClientMockRecorder is the mock recorder for MockClient. -type MockClientMockRecorder struct { - mock *MockClient -} - -// NewMockClient creates a new mock instance. -func NewMockClient(ctrl *gomock.Controller) *MockClient { - mock := &MockClient{ctrl: ctrl} - mock.recorder = &MockClientMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockClient) EXPECT() *MockClientMockRecorder { - return m.recorder -} - -// AddCounter mocks base method. -func (m *MockClient) AddCounter(scope, counter int, delta int64) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "AddCounter", scope, counter, delta) -} - -// AddCounter indicates an expected call of AddCounter. -func (mr *MockClientMockRecorder) AddCounter(scope, counter, delta interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddCounter", reflect.TypeOf((*MockClient)(nil).AddCounter), scope, counter, delta) -} - -// IncCounter mocks base method. -func (m *MockClient) IncCounter(scope, counter int) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "IncCounter", scope, counter) -} - -// IncCounter indicates an expected call of IncCounter. -func (mr *MockClientMockRecorder) IncCounter(scope, counter interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IncCounter", reflect.TypeOf((*MockClient)(nil).IncCounter), scope, counter) -} - -// RecordHistogramDuration mocks base method. -func (m *MockClient) RecordHistogramDuration(scope, timer int, d time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "RecordHistogramDuration", scope, timer, d) -} - -// RecordHistogramDuration indicates an expected call of RecordHistogramDuration. -func (mr *MockClientMockRecorder) RecordHistogramDuration(scope, timer, d interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordHistogramDuration", reflect.TypeOf((*MockClient)(nil).RecordHistogramDuration), scope, timer, d) -} - -// RecordTimer mocks base method. -func (m *MockClient) RecordTimer(scope, timer int, d time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "RecordTimer", scope, timer, d) -} - -// RecordTimer indicates an expected call of RecordTimer. -func (mr *MockClientMockRecorder) RecordTimer(scope, timer, d interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordTimer", reflect.TypeOf((*MockClient)(nil).RecordTimer), scope, timer, d) -} - -// Scope mocks base method. -func (m *MockClient) Scope(scope int, tags ...Tag) Scope { - m.ctrl.T.Helper() - varargs := []interface{}{scope} - for _, a := range tags { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "Scope", varargs...) - ret0, _ := ret[0].(Scope) - return ret0 -} - -// Scope indicates an expected call of Scope. -func (mr *MockClientMockRecorder) Scope(scope interface{}, tags ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{scope}, tags...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Scope", reflect.TypeOf((*MockClient)(nil).Scope), varargs...) -} - -// StartTimer mocks base method. -func (m *MockClient) StartTimer(scope, timer int) tally.Stopwatch { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "StartTimer", scope, timer) - ret0, _ := ret[0].(tally.Stopwatch) - return ret0 -} - -// StartTimer indicates an expected call of StartTimer. -func (mr *MockClientMockRecorder) StartTimer(scope, timer interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartTimer", reflect.TypeOf((*MockClient)(nil).StartTimer), scope, timer) -} - -// UpdateGauge mocks base method. -func (m *MockClient) UpdateGauge(scope, gauge int, value float64) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "UpdateGauge", scope, gauge, value) -} - -// UpdateGauge indicates an expected call of UpdateGauge. -func (mr *MockClientMockRecorder) UpdateGauge(scope, gauge, value interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateGauge", reflect.TypeOf((*MockClient)(nil).UpdateGauge), scope, gauge, value) -} - -// MockScope is a mock of Scope interface. -type MockScope struct { - ctrl *gomock.Controller - recorder *MockScopeMockRecorder -} - -// MockScopeMockRecorder is the mock recorder for MockScope. -type MockScopeMockRecorder struct { - mock *MockScope -} - -// NewMockScope creates a new mock instance. -func NewMockScope(ctrl *gomock.Controller) *MockScope { - mock := &MockScope{ctrl: ctrl} - mock.recorder = &MockScopeMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockScope) EXPECT() *MockScopeMockRecorder { - return m.recorder -} - -// AddCounter mocks base method. -func (m *MockScope) AddCounter(counter int, delta int64) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "AddCounter", counter, delta) -} - -// AddCounter indicates an expected call of AddCounter. -func (mr *MockScopeMockRecorder) AddCounter(counter, delta interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddCounter", reflect.TypeOf((*MockScope)(nil).AddCounter), counter, delta) -} - -// IncCounter mocks base method. -func (m *MockScope) IncCounter(counter int) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "IncCounter", counter) -} - -// IncCounter indicates an expected call of IncCounter. -func (mr *MockScopeMockRecorder) IncCounter(counter interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IncCounter", reflect.TypeOf((*MockScope)(nil).IncCounter), counter) -} - -// RecordHistogramDuration mocks base method. -func (m *MockScope) RecordHistogramDuration(timer int, d time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "RecordHistogramDuration", timer, d) -} - -// RecordHistogramDuration indicates an expected call of RecordHistogramDuration. -func (mr *MockScopeMockRecorder) RecordHistogramDuration(timer, d interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordHistogramDuration", reflect.TypeOf((*MockScope)(nil).RecordHistogramDuration), timer, d) -} - -// RecordHistogramValue mocks base method. -func (m *MockScope) RecordHistogramValue(timer int, value float64) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "RecordHistogramValue", timer, value) -} - -// RecordHistogramValue indicates an expected call of RecordHistogramValue. -func (mr *MockScopeMockRecorder) RecordHistogramValue(timer, value interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordHistogramValue", reflect.TypeOf((*MockScope)(nil).RecordHistogramValue), timer, value) -} - -// RecordTimer mocks base method. -func (m *MockScope) RecordTimer(timer int, d time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "RecordTimer", timer, d) -} - -// RecordTimer indicates an expected call of RecordTimer. -func (mr *MockScopeMockRecorder) RecordTimer(timer, d interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordTimer", reflect.TypeOf((*MockScope)(nil).RecordTimer), timer, d) -} - -// StartTimer mocks base method. -func (m *MockScope) StartTimer(timer int) Stopwatch { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "StartTimer", timer) - ret0, _ := ret[0].(Stopwatch) - return ret0 -} - -// StartTimer indicates an expected call of StartTimer. -func (mr *MockScopeMockRecorder) StartTimer(timer interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartTimer", reflect.TypeOf((*MockScope)(nil).StartTimer), timer) -} - -// Tagged mocks base method. -func (m *MockScope) Tagged(tags ...Tag) Scope { - m.ctrl.T.Helper() - varargs := []interface{}{} - for _, a := range tags { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "Tagged", varargs...) - ret0, _ := ret[0].(Scope) - return ret0 -} - -// Tagged indicates an expected call of Tagged. -func (mr *MockScopeMockRecorder) Tagged(tags ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Tagged", reflect.TypeOf((*MockScope)(nil).Tagged), tags...) -} - -// UpdateGauge mocks base method. -func (m *MockScope) UpdateGauge(gauge int, value float64) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "UpdateGauge", gauge, value) -} - -// UpdateGauge indicates an expected call of UpdateGauge. -func (mr *MockScopeMockRecorder) UpdateGauge(gauge, value interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateGauge", reflect.TypeOf((*MockScope)(nil).UpdateGauge), gauge, value) -} diff --git a/common/metrics/defs.go b/common/metrics/defs.go index de6d4419e52..fe4cb77a1de 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -2458,7 +2458,7 @@ const ( UpdateWorkflowExecutionCount WorkflowIDCacheSizeGauge WorkflowIDCacheRequestsExternalRatelimitedCounter - WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsGauge + WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsTimer WorkflowIDCacheRequestsInternalRatelimitedCounter NumHistoryMetrics ) @@ -3094,7 +3094,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ UpdateWorkflowExecutionCount: {metricName: "update_workflow_execution_count", metricType: Counter}, WorkflowIDCacheSizeGauge: {metricName: "workflow_id_cache_size", metricType: Gauge}, WorkflowIDCacheRequestsExternalRatelimitedCounter: {metricName: "workflow_id_external_requests_ratelimited", metricType: Counter}, - WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsGauge: {metricName: "workflow_id_external_requests_max_requests_per_seconds", metricType: Gauge}, + WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsTimer: {metricName: "workflow_id_external_requests_max_requests_per_seconds", metricType: Timer}, WorkflowIDCacheRequestsInternalRatelimitedCounter: {metricName: "workflow_id_internal_requests_ratelimited", metricType: Counter}, }, Matching: { diff --git a/common/metrics/interfaces.go b/common/metrics/interfaces.go index 3e017f79abc..f36993e5f4a 100644 --- a/common/metrics/interfaces.go +++ b/common/metrics/interfaces.go @@ -18,8 +18,6 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination client_mock.go - package metrics import ( diff --git a/service/history/workflowcache/cache.go b/service/history/workflowcache/cache.go index 6bab827925d..4c308a830f5 100644 --- a/service/history/workflowcache/cache.go +++ b/service/history/workflowcache/cache.go @@ -26,7 +26,6 @@ package workflowcache import ( "errors" - "sync" "time" "github.com/uber/cadence/common/cache" @@ -50,7 +49,6 @@ type WFCache interface { type wfCache struct { lru cache.Cache - domainMaxCount sync.Map // map[string]domainMaxCount externalLimiterFactory quotas.LimiterFactory internalLimiterFactory quotas.LimiterFactory workflowIDCacheExternalEnabled dynamicconfig.BoolPropertyFnWithDomainFilter @@ -97,7 +95,6 @@ func New(params Params) WFCache { MaxCount: params.MaxCount, ActivelyEvict: true, }), - domainMaxCount: sync.Map{}, // map[string]domainMaxCount externalLimiterFactory: params.ExternalLimiterFactory, internalLimiterFactory: params.InternalLimiterFactory, workflowIDCacheExternalEnabled: params.WorkflowIDCacheExternalEnabled, diff --git a/service/history/workflowcache/metrics.go b/service/history/workflowcache/metrics.go index 8592c7a74ed..dcb0fa46ed1 100644 --- a/service/history/workflowcache/metrics.go +++ b/service/history/workflowcache/metrics.go @@ -30,60 +30,27 @@ import ( ) type workflowIDCountMetric struct { - secondStart time.Time - count int + sync.Mutex + + startingSecond time.Time + count int } func (w *workflowIDCountMetric) reset(now time.Time) { - w.secondStart = now + w.startingSecond = now w.count = 0 } -type domainMaxCount struct { - mu sync.Mutex - maxCount int - secondStart time.Time -} - -func (d *domainMaxCount) reset(now time.Time) { - d.maxCount = 0 - d.secondStart = now -} - -func (c *wfCache) getOrCreateDomain(domainName string, now time.Time) *domainMaxCount { - newDomainCount := &domainMaxCount{ - mu: sync.Mutex{}, - maxCount: 0, - secondStart: now, - } - - domainCount, _ := c.domainMaxCount.LoadOrStore(domainName, newDomainCount) - - return domainCount.(*domainMaxCount) -} - func (c *wfCache) updatePerDomainMaxWFRequestCount(domainName string, value *cacheValue) { - now := c.timeSource.Now() - domain := c.getOrCreateDomain(domainName, now) + value.countMetric.Lock() + defer value.countMetric.Unlock() - // It's enough to lock the domain, as the workflowIDCountMetric belongs to the domain - domain.mu.Lock() - defer domain.mu.Unlock() - - if c.timeSource.Since(domain.secondStart) > time.Second { - // Emit the max count for the previous second and reset the count - c.metricsClient.Scope(metrics.HistoryClientWfIDCacheScope, metrics.DomainTag(domainName)). - UpdateGauge(metrics.WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsGauge, float64(domain.maxCount)) - domain.reset(now) + if c.timeSource.Since(value.countMetric.startingSecond) > time.Second { + value.countMetric.reset(c.timeSource.Now().UTC()) } - - if c.timeSource.Since(value.countMetric.secondStart) > time.Second { - value.countMetric.reset(now) - } - value.countMetric.count++ - if value.countMetric.count > domain.maxCount { - domain.maxCount = value.countMetric.count - } + // We can just use the upper of the metric, so it is not an issue to emit all the counts + c.metricsClient.Scope(metrics.HistoryClientWfIDCacheScope, metrics.DomainTag(domainName)). + RecordTimer(metrics.WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsTimer, time.Duration(value.countMetric.count)) } diff --git a/service/history/workflowcache/metrics_test.go b/service/history/workflowcache/metrics_test.go index 99a42e35186..4a0d3a76d02 100644 --- a/service/history/workflowcache/metrics_test.go +++ b/service/history/workflowcache/metrics_test.go @@ -26,76 +26,85 @@ import ( "testing" "time" - "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "github.com/uber-go/tally" "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/metrics" ) -func TestGetOrCreateDomain(t *testing.T) { - wfCache := &wfCache{} - - domainID := "some domain name" - otherDomainID := "some other domain name" - - domainMaxCount1 := wfCache.getOrCreateDomain(domainID, time.Unix(123, 0)) - domainMaxCount2 := wfCache.getOrCreateDomain(domainID, time.Unix(456, 0)) - otherDomainMaxCount := wfCache.getOrCreateDomain(otherDomainID, time.Unix(789, 0)) - - // None of them are nil - assert.NotNil(t, domainMaxCount1) - assert.NotNil(t, domainMaxCount2) - assert.NotNil(t, otherDomainMaxCount) +func TestUpdatePerDomainMaxWFRequestCount(t *testing.T) { + domainName := "some domain name" - // domainMaxCount1 and domainMaxCount2 are the same - assert.Equal(t, domainMaxCount1, domainMaxCount2) + cases := []struct { + name string + updatePerDomainMaxWFRequestCount func(wfc *wfCache, source clock.MockedTimeSource) + expecetMetrics []time.Duration + }{ + { + name: "Single workflowID", + updatePerDomainMaxWFRequestCount: func(wfc *wfCache, timeSource clock.MockedTimeSource) { + workflowID1 := &cacheValue{} + wfc.updatePerDomainMaxWFRequestCount(domainName, workflowID1) // Emits 1 + wfc.updatePerDomainMaxWFRequestCount(domainName, workflowID1) // Emits 2 + }, + expecetMetrics: []time.Duration{1, 2}, + }, + { + name: "Separate workflowIDs", + updatePerDomainMaxWFRequestCount: func(wfc *wfCache, timeSource clock.MockedTimeSource) { + workflowID1 := &cacheValue{} + wfc.updatePerDomainMaxWFRequestCount(domainName, workflowID1) // Emits 1 + + workflowID2 := &cacheValue{} + wfc.updatePerDomainMaxWFRequestCount(domainName, workflowID2) // Emits 1 + wfc.updatePerDomainMaxWFRequestCount(domainName, workflowID2) // Emits 2 + wfc.updatePerDomainMaxWFRequestCount(domainName, workflowID2) // Emits 3 + + wfc.updatePerDomainMaxWFRequestCount(domainName, workflowID1) // Emits 2 + + }, + expecetMetrics: []time.Duration{1, 1, 2, 3, 2}, + }, + { + name: "Reset", + updatePerDomainMaxWFRequestCount: func(wfc *wfCache, timeSource clock.MockedTimeSource) { + workflowID1 := &cacheValue{} + wfc.updatePerDomainMaxWFRequestCount(domainName, workflowID1) // Emits 1 + wfc.updatePerDomainMaxWFRequestCount(domainName, workflowID1) // Emits 2 + + timeSource.Advance(1100 * time.Millisecond) + wfc.updatePerDomainMaxWFRequestCount(domainName, workflowID1) // Emits 1 + }, + expecetMetrics: []time.Duration{1, 2, 1}, + }, + } - // domainMaxCount1/domainMaxCount2 and otherDomainMaxCount are different - assert.NotEqual(t, domainMaxCount1, otherDomainMaxCount) - assert.NotEqual(t, domainMaxCount2, otherDomainMaxCount) -} + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + testScope := tally.NewTestScope("", make(map[string]string)) + timeSource := clock.NewMockedTimeSourceAt(time.Unix(123, 456)) -func TestUpdatePerDomainMaxWFRequestCount(t *testing.T) { - ctrl := gomock.NewController(t) - metricsClientMock := metrics.NewMockClient(ctrl) - metricsMockScope := metrics.NewMockScope(ctrl) + wfc := &wfCache{ + metricsClient: metrics.NewClient(testScope, metrics.History), + timeSource: timeSource, + } - now := time.Unix(123, 456) - timeSource := clock.NewMockedTimeSourceAt(now) + tc.updatePerDomainMaxWFRequestCount(wfc, timeSource) - wfc := &wfCache{ - metricsClient: metricsClientMock, - timeSource: timeSource, - } + // We expect the domain tag to be set to "all" and "some domain name", we don't know the order, so use a set + expectedDomainTags := map[string]struct{}{"all": {}, "some domain name": {}} + actualDomainTags := map[string]struct{}{} - domainName := "some domain name" - value := &cacheValue{} + timers := testScope.Snapshot().Timers() + assert.Equal(t, 2, len(timers)) + for _, v := range timers { + actualDomainTags[v.Tags()["domain"]] = struct{}{} + assert.Equal(t, tc.expecetMetrics, v.Values()) + } - for i := 0; i < 5; i++ { - wfc.updatePerDomainMaxWFRequestCount(domainName, value) + assert.Equal(t, expectedDomainTags, actualDomainTags) + }) } - // Test that the max count is recorded - domain := wfc.getOrCreateDomain(domainName, now) - assert.Equal(t, 5, domain.maxCount) - - // Test that different values are independent - value = &cacheValue{} - for i := 0; i < 8; i++ { - wfc.updatePerDomainMaxWFRequestCount(domainName, value) - } - domain = wfc.getOrCreateDomain(domainName, now) - assert.Equal(t, 8, domain.maxCount) - - // Test that the max count is emitted and reset after a second - metricsClientMock.EXPECT().Scope(metrics.HistoryClientWfIDCacheScope, metrics.DomainTag(domainName)). - Return(metricsMockScope) - metricsMockScope.EXPECT(). - UpdateGauge(metrics.WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsGauge, 8.0) - timeSource.Advance(1100 * time.Millisecond) - wfc.updatePerDomainMaxWFRequestCount(domainName, value) - - domain = wfc.getOrCreateDomain(domainName, now) - assert.Equal(t, 1, domain.maxCount) } From 557e21cd834f69c181e28faeff2d7fa14ab8470e Mon Sep 17 00:00:00 2001 From: Jakob Haahr Taankvist Date: Thu, 30 May 2024 13:34:33 +0200 Subject: [PATCH 3/3] Moved the method to workflowIDCountMetric as it modifies this only --- service/history/workflowcache/cache.go | 2 +- service/history/workflowcache/metrics.go | 31 ++++++++----- service/history/workflowcache/metrics_test.go | 44 +++++++++---------- 3 files changed, 41 insertions(+), 36 deletions(-) diff --git a/service/history/workflowcache/cache.go b/service/history/workflowcache/cache.go index 4c308a830f5..48591dbb61e 100644 --- a/service/history/workflowcache/cache.go +++ b/service/history/workflowcache/cache.go @@ -144,7 +144,7 @@ func (c *wfCache) allow(domainID string, workflowID string, rateLimitType rateLi switch rateLimitType { case external: - c.updatePerDomainMaxWFRequestCount(domainName, value) + value.countMetric.updatePerDomainMaxWFRequestCount(domainName, c.timeSource, c.metricsClient) if !value.externalRateLimiter.Allow() { c.emitRateLimitMetrics(domainID, workflowID, domainName, "external", metrics.WorkflowIDCacheRequestsExternalRatelimitedCounter) return false diff --git a/service/history/workflowcache/metrics.go b/service/history/workflowcache/metrics.go index dcb0fa46ed1..7678516437d 100644 --- a/service/history/workflowcache/metrics.go +++ b/service/history/workflowcache/metrics.go @@ -26,9 +26,14 @@ import ( "sync" "time" + "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/metrics" ) +// workflowIDCountMetric holds the count of requests for a given second, for a domain/workflowID pair +// This is used to emit the max count of requests for a given domain +// Ideally we would just emit the count of requests for a given domain/workflowID pair, but this is not +// possible, due to the high cardinality of workflowIDs type workflowIDCountMetric struct { sync.Mutex @@ -36,21 +41,25 @@ type workflowIDCountMetric struct { count int } -func (w *workflowIDCountMetric) reset(now time.Time) { - w.startingSecond = now - w.count = 0 +func (cm *workflowIDCountMetric) reset(now time.Time) { + cm.startingSecond = now + cm.count = 0 } -func (c *wfCache) updatePerDomainMaxWFRequestCount(domainName string, value *cacheValue) { - value.countMetric.Lock() - defer value.countMetric.Unlock() +func (cm *workflowIDCountMetric) updatePerDomainMaxWFRequestCount( + domainName string, + timeSource clock.TimeSource, + metricsClient metrics.Client, +) { + cm.Lock() + defer cm.Unlock() - if c.timeSource.Since(value.countMetric.startingSecond) > time.Second { - value.countMetric.reset(c.timeSource.Now().UTC()) + if timeSource.Since(cm.startingSecond) > time.Second { + cm.reset(timeSource.Now().UTC()) } - value.countMetric.count++ + cm.count++ // We can just use the upper of the metric, so it is not an issue to emit all the counts - c.metricsClient.Scope(metrics.HistoryClientWfIDCacheScope, metrics.DomainTag(domainName)). - RecordTimer(metrics.WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsTimer, time.Duration(value.countMetric.count)) + metricsClient.Scope(metrics.HistoryClientWfIDCacheScope, metrics.DomainTag(domainName)). + RecordTimer(metrics.WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsTimer, time.Duration(cm.count)) } diff --git a/service/history/workflowcache/metrics_test.go b/service/history/workflowcache/metrics_test.go index 4a0d3a76d02..830f95ee02d 100644 --- a/service/history/workflowcache/metrics_test.go +++ b/service/history/workflowcache/metrics_test.go @@ -38,43 +38,43 @@ func TestUpdatePerDomainMaxWFRequestCount(t *testing.T) { cases := []struct { name string - updatePerDomainMaxWFRequestCount func(wfc *wfCache, source clock.MockedTimeSource) + updatePerDomainMaxWFRequestCount func(metricsClient metrics.Client, source clock.MockedTimeSource) expecetMetrics []time.Duration }{ { name: "Single workflowID", - updatePerDomainMaxWFRequestCount: func(wfc *wfCache, timeSource clock.MockedTimeSource) { - workflowID1 := &cacheValue{} - wfc.updatePerDomainMaxWFRequestCount(domainName, workflowID1) // Emits 1 - wfc.updatePerDomainMaxWFRequestCount(domainName, workflowID1) // Emits 2 + updatePerDomainMaxWFRequestCount: func(metricsClient metrics.Client, timeSource clock.MockedTimeSource) { + workflowID1 := &workflowIDCountMetric{} + workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 1 + workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 2 }, expecetMetrics: []time.Duration{1, 2}, }, { name: "Separate workflowIDs", - updatePerDomainMaxWFRequestCount: func(wfc *wfCache, timeSource clock.MockedTimeSource) { - workflowID1 := &cacheValue{} - wfc.updatePerDomainMaxWFRequestCount(domainName, workflowID1) // Emits 1 + updatePerDomainMaxWFRequestCount: func(metricsClient metrics.Client, timeSource clock.MockedTimeSource) { + workflowID1 := &workflowIDCountMetric{} + workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 1 - workflowID2 := &cacheValue{} - wfc.updatePerDomainMaxWFRequestCount(domainName, workflowID2) // Emits 1 - wfc.updatePerDomainMaxWFRequestCount(domainName, workflowID2) // Emits 2 - wfc.updatePerDomainMaxWFRequestCount(domainName, workflowID2) // Emits 3 + workflowID2 := &workflowIDCountMetric{} + workflowID2.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 1 + workflowID2.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 2 + workflowID2.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 3 - wfc.updatePerDomainMaxWFRequestCount(domainName, workflowID1) // Emits 2 + workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 2 }, expecetMetrics: []time.Duration{1, 1, 2, 3, 2}, }, { name: "Reset", - updatePerDomainMaxWFRequestCount: func(wfc *wfCache, timeSource clock.MockedTimeSource) { - workflowID1 := &cacheValue{} - wfc.updatePerDomainMaxWFRequestCount(domainName, workflowID1) // Emits 1 - wfc.updatePerDomainMaxWFRequestCount(domainName, workflowID1) // Emits 2 + updatePerDomainMaxWFRequestCount: func(metricsClient metrics.Client, timeSource clock.MockedTimeSource) { + workflowID1 := &workflowIDCountMetric{} + workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 1 + workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 2 timeSource.Advance(1100 * time.Millisecond) - wfc.updatePerDomainMaxWFRequestCount(domainName, workflowID1) // Emits 1 + workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 1 }, expecetMetrics: []time.Duration{1, 2, 1}, }, @@ -84,13 +84,9 @@ func TestUpdatePerDomainMaxWFRequestCount(t *testing.T) { t.Run(tc.name, func(t *testing.T) { testScope := tally.NewTestScope("", make(map[string]string)) timeSource := clock.NewMockedTimeSourceAt(time.Unix(123, 456)) + metricsClient := metrics.NewClient(testScope, metrics.History) - wfc := &wfCache{ - metricsClient: metrics.NewClient(testScope, metrics.History), - timeSource: timeSource, - } - - tc.updatePerDomainMaxWFRequestCount(wfc, timeSource) + tc.updatePerDomainMaxWFRequestCount(metricsClient, timeSource) // We expect the domain tag to be set to "all" and "some domain name", we don't know the order, so use a set expectedDomainTags := map[string]struct{}{"all": {}, "some domain name": {}}