Skip to content

Commit

Permalink
Added a new metric to the workflow cache
Browse files Browse the repository at this point in the history
This tracks the maximum number of requests per second made to a single
workflow, reported per domain
  • Loading branch information
jakobht committed May 27, 2024
1 parent 2f08de5 commit be1d049
Show file tree
Hide file tree
Showing 6 changed files with 483 additions and 1 deletion.
278 changes: 278 additions & 0 deletions common/metrics/client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2458,6 +2458,7 @@ const (
UpdateWorkflowExecutionCount
WorkflowIDCacheSizeGauge
WorkflowIDCacheRequestsExternalRatelimitedCounter
WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsGauge
WorkflowIDCacheRequestsInternalRatelimitedCounter
NumHistoryMetrics
)
Expand Down Expand Up @@ -3093,6 +3094,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
UpdateWorkflowExecutionCount: {metricName: "update_workflow_execution_count", metricType: Counter},
WorkflowIDCacheSizeGauge: {metricName: "workflow_id_cache_size", metricType: Gauge},
WorkflowIDCacheRequestsExternalRatelimitedCounter: {metricName: "workflow_id_external_requests_ratelimited", metricType: Counter},
WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsGauge: {metricName: "workflow_id_external_requests_max_requests_per_seconds", metricType: Gauge},
WorkflowIDCacheRequestsInternalRatelimitedCounter: {metricName: "workflow_id_internal_requests_ratelimited", metricType: Counter},
},
Matching: {
Expand Down
2 changes: 2 additions & 0 deletions common/metrics/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination client_mock.go

package metrics

import (
Expand Down
12 changes: 11 additions & 1 deletion service/history/workflowcache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ package workflowcache

import (
"errors"
"sync"
"time"

"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
Expand All @@ -48,14 +50,18 @@ type WFCache interface {

// wfCache holds the state behind the WFCache returned by New: an LRU cache of
// per-workflow entries plus the limiter factories, dynamic-config flags, and
// metrics/logging dependencies that the allow method works with.
type wfCache struct {
// lru is the underlying cache; New configures it with params.MaxCount and
// ActivelyEvict enabled.
// NOTE(review): presumably keyed by cacheKey with *cacheValue entries — confirm.
lru cache.Cache
domainMaxCount sync.Map // map[string]domainMaxCount
// externalLimiterFactory and internalLimiterFactory are taken from Params.
// NOTE(review): presumably used to build the limiters stored in cacheValue — confirm.
externalLimiterFactory quotas.LimiterFactory
internalLimiterFactory quotas.LimiterFactory
// Per-domain dynamic-config switches taken from Params.
// NOTE(review): presumably they enable external/internal rate limiting — confirm in allow.
workflowIDCacheExternalEnabled dynamicconfig.BoolPropertyFnWithDomainFilter
workflowIDCacheInternalEnabled dynamicconfig.BoolPropertyFnWithDomainFilter
// domainCache is taken from Params.
// NOTE(review): presumably resolves a domain ID to its name — confirm.
domainCache cache.DomainCache
metricsClient metrics.Client
logger log.Logger
getCacheItemFn func(domainName string, workflowID string) (*cacheValue, error)
// timeSource is set to clock.NewRealTimeSource() by New.
timeSource clock.TimeSource

// getCacheItemFn is held as a function field, and the current time is read
// through timeSource, so that both can be replaced with mocks in unit tests.
getCacheItemFn func(domainName string, workflowID string) (*cacheValue, error)
}

type cacheKey struct {
Expand All @@ -66,6 +72,7 @@ type cacheKey struct {
// cacheValue is the per-workflow entry handed back by getCacheItemFn for a
// (domainName, workflowID) pair.
type cacheValue struct {
// externalRateLimiter is consulted via Allow() for requests of the external
// rate-limit type.
externalRateLimiter quotas.Limiter
// NOTE(review): presumably the counterpart consulted for internal requests — confirm in allow.
internalRateLimiter quotas.Limiter
// NOTE(review): presumably the request-count state behind the per-domain
// max-requests gauge; the value is passed to updatePerDomainMaxWFRequestCount
// on external requests — confirm against workflowIDCountMetric.
countMetric workflowIDCountMetric
}

// Params is the parameters for a new WFCache
Expand All @@ -90,12 +97,14 @@ func New(params Params) WFCache {
MaxCount: params.MaxCount,
ActivelyEvict: true,
}),
domainMaxCount: sync.Map{}, // map[string]domainMaxCount
externalLimiterFactory: params.ExternalLimiterFactory,
internalLimiterFactory: params.InternalLimiterFactory,
workflowIDCacheExternalEnabled: params.WorkflowIDCacheExternalEnabled,
workflowIDCacheInternalEnabled: params.WorkflowIDCacheInternalEnabled,
domainCache: params.DomainCache,
metricsClient: params.MetricsClient,
timeSource: clock.NewRealTimeSource(),
logger: params.Logger,
}
// We set getCacheItemFn to cache.getCacheItem so that we can mock it in unit tests
Expand Down Expand Up @@ -138,6 +147,7 @@ func (c *wfCache) allow(domainID string, workflowID string, rateLimitType rateLi

switch rateLimitType {
case external:
c.updatePerDomainMaxWFRequestCount(domainName, value)
if !value.externalRateLimiter.Allow() {
c.emitRateLimitMetrics(domainID, workflowID, domainName, "external", metrics.WorkflowIDCacheRequestsExternalRatelimitedCounter)
return false
Expand Down
Loading

0 comments on commit be1d049

Please sign in to comment.