Skip to content

Commit

Permalink
Added a new metric to the workflow cache (uber#6064)
Browse files Browse the repository at this point in the history
* Added a new metric to the workflow cache

This is to track the max number of requests to a single workflow per
domain

* Updated based on review

- Change from gauge to timer
- Only track the count per workflow ID; the timer will do the bookkeeping for
  the max value

* Moved the method to workflowIDCountMetric as it modifies this only
  • Loading branch information
jakobht authored and timl3136 committed Jun 6, 2024
1 parent 6a2b4a2 commit 0f33bda
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 1 deletion.
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2458,6 +2458,7 @@ const (
UpdateWorkflowExecutionCount
WorkflowIDCacheSizeGauge
WorkflowIDCacheRequestsExternalRatelimitedCounter
WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsTimer
WorkflowIDCacheRequestsInternalRatelimitedCounter
NumHistoryMetrics
)
Expand Down Expand Up @@ -3093,6 +3094,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
UpdateWorkflowExecutionCount: {metricName: "update_workflow_execution_count", metricType: Counter},
WorkflowIDCacheSizeGauge: {metricName: "workflow_id_cache_size", metricType: Gauge},
WorkflowIDCacheRequestsExternalRatelimitedCounter: {metricName: "workflow_id_external_requests_ratelimited", metricType: Counter},
WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsTimer: {metricName: "workflow_id_external_requests_max_requests_per_seconds", metricType: Timer},
WorkflowIDCacheRequestsInternalRatelimitedCounter: {metricName: "workflow_id_internal_requests_ratelimited", metricType: Counter},
},
Matching: {
Expand Down
9 changes: 8 additions & 1 deletion service/history/workflowcache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"time"

"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
Expand All @@ -55,7 +56,10 @@ type wfCache struct {
domainCache cache.DomainCache
metricsClient metrics.Client
logger log.Logger
getCacheItemFn func(domainName string, workflowID string) (*cacheValue, error)
timeSource clock.TimeSource

// we use functions to get cache items, and the current time, so we can mock it in unit tests
getCacheItemFn func(domainName string, workflowID string) (*cacheValue, error)
}

type cacheKey struct {
Expand All @@ -66,6 +70,7 @@ type cacheKey struct {
// cacheValue is the entry stored in the cache for a domain/workflowID pair.
// getCacheItemFn hands it out by pointer; countMetric embeds a sync.Mutex, so a
// cacheValue should not be copied once it is in use.
type cacheValue struct {
// externalRateLimiter is asked via Allow() whether a request of the external rate limit type may proceed
externalRateLimiter quotas.Limiter
// internalRateLimiter is presumably the counterpart used for internal requests; that code path
// is not visible in this hunk - TODO confirm
internalRateLimiter quotas.Limiter
// countMetric counts the external requests to this workflow ID, and emits the count as a metric
countMetric workflowIDCountMetric
}

// Params is the parameters for a new WFCache
Expand Down Expand Up @@ -96,6 +101,7 @@ func New(params Params) WFCache {
workflowIDCacheInternalEnabled: params.WorkflowIDCacheInternalEnabled,
domainCache: params.DomainCache,
metricsClient: params.MetricsClient,
timeSource: clock.NewRealTimeSource(),
logger: params.Logger,
}
// We set getCacheItemFn to cache.getCacheItem so that we can mock it in unit tests
Expand Down Expand Up @@ -138,6 +144,7 @@ func (c *wfCache) allow(domainID string, workflowID string, rateLimitType rateLi

switch rateLimitType {
case external:
value.countMetric.updatePerDomainMaxWFRequestCount(domainName, c.timeSource, c.metricsClient)
if !value.externalRateLimiter.Allow() {
c.emitRateLimitMetrics(domainID, workflowID, domainName, "external", metrics.WorkflowIDCacheRequestsExternalRatelimitedCounter)
return false
Expand Down
65 changes: 65 additions & 0 deletions service/history/workflowcache/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package workflowcache

import (
"sync"
"time"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/metrics"
)

// workflowIDCountMetric tracks how many requests a single domain/workflowID pair
// has received since the start of the current counting window.
// The running count is emitted so that the max number of requests to a single
// workflow can be read per domain. Emitting the count tagged with the
// domain/workflowID pair itself is not an option, as the cardinality of
// workflowIDs is too high.
type workflowIDCountMetric struct {
	// The embedded mutex is taken by updatePerDomainMaxWFRequestCount before the
	// fields below are read or written
	sync.Mutex

	// startingSecond is the time at which the current counting window was started
	startingSecond time.Time
	// count is the number of requests counted since startingSecond
	count int
}

// reset begins a new counting window at now and clears the request count.
// It does not lock; updatePerDomainMaxWFRequestCount calls it with the mutex held.
func (cm *workflowIDCountMetric) reset(now time.Time) {
	cm.count, cm.startingSecond = 0, now
}

// updatePerDomainMaxWFRequestCount counts one request for this workflow ID and
// records the resulting count on a timer metric tagged with domainName.
//
// The count belongs to the current window: when more than one second has passed
// since the window started, the window is restarted at the current UTC time and
// the count starts over before this request is added.
//
// The whole update, including the metric emission, runs with the mutex held.
func (cm *workflowIDCountMetric) updatePerDomainMaxWFRequestCount(
	domainName string,
	timeSource clock.TimeSource,
	metricsClient metrics.Client,
) {
	cm.Lock()
	defer cm.Unlock()

	if windowAge := timeSource.Since(cm.startingSecond); windowAge > time.Second {
		cm.reset(timeSource.Now().UTC())
	}
	cm.count++

	// The timer carries a plain request count, not a real duration
	requestsInWindow := time.Duration(cm.count)

	// Every count is emitted; the max is read off the upper value of the timer,
	// so there is no need to keep track of the max here
	domainScope := metricsClient.Scope(metrics.HistoryClientWfIDCacheScope, metrics.DomainTag(domainName))
	domainScope.RecordTimer(metrics.WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsTimer, requestsInWindow)
}
106 changes: 106 additions & 0 deletions service/history/workflowcache/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package workflowcache

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/uber-go/tally"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/metrics"
)

// TestUpdatePerDomainMaxWFRequestCount checks the sequence of values recorded on
// the timer metric when requests are counted for one or more workflow IDs, and
// after the mocked clock has been advanced past the one second window.
func TestUpdatePerDomainMaxWFRequestCount(t *testing.T) {
	const domainName = "some domain name"

	testCases := []struct {
		name            string
		exercise        func(metricsClient metrics.Client, timeSource clock.MockedTimeSource)
		expectedMetrics []time.Duration
	}{
		{
			name:            "Single workflowID",
			expectedMetrics: []time.Duration{1, 2},
			exercise: func(metricsClient metrics.Client, timeSource clock.MockedTimeSource) {
				counter := &workflowIDCountMetric{}
				counter.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 1
				counter.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 2
			},
		},
		{
			name:            "Separate workflowIDs",
			expectedMetrics: []time.Duration{1, 1, 2, 3, 2},
			exercise: func(metricsClient metrics.Client, timeSource clock.MockedTimeSource) {
				// Each workflow ID has its own counter, so the counts are independent
				first := &workflowIDCountMetric{}
				first.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 1

				second := &workflowIDCountMetric{}
				second.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 1
				second.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 2
				second.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 3

				first.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 2
			},
		},
		{
			name:            "Reset",
			expectedMetrics: []time.Duration{1, 2, 1},
			exercise: func(metricsClient metrics.Client, timeSource clock.MockedTimeSource) {
				counter := &workflowIDCountMetric{}
				counter.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 1
				counter.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 2

				// Move past the one second window, so the count starts over
				timeSource.Advance(1100 * time.Millisecond)
				counter.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 1
			},
		},
	}

	for _, testCase := range testCases {
		t.Run(testCase.name, func(t *testing.T) {
			scope := tally.NewTestScope("", make(map[string]string))
			mockedTime := clock.NewMockedTimeSourceAt(time.Unix(123, 456))
			client := metrics.NewClient(scope, metrics.History)

			testCase.exercise(client, mockedTime)

			// Two timers are expected, one with the domain tag "all" and one with
			// the domain name. Their order is not known, so the tags are collected in a set
			wantDomainTags := map[string]struct{}{"all": {}, domainName: {}}
			gotDomainTags := map[string]struct{}{}

			recordedTimers := scope.Snapshot().Timers()
			assert.Equal(t, 2, len(recordedTimers))
			for _, timer := range recordedTimers {
				gotDomainTags[timer.Tags()["domain"]] = struct{}{}
				// Both timers must have seen the same sequence of counts
				assert.Equal(t, testCase.expectedMetrics, timer.Values())
			}

			assert.Equal(t, wantDomainTags, gotDomainTags)
		})
	}

}

0 comments on commit 0f33bda

Please sign in to comment.