Skip to content

Commit

Permalink
reworked tests
Browse files Browse the repository at this point in the history
  • Loading branch information
3vilhamster committed Dec 29, 2023
1 parent 3be849a commit a9329e8
Show file tree
Hide file tree
Showing 2 changed files with 214 additions and 293 deletions.
56 changes: 28 additions & 28 deletions common/persistence/wrappers/sampled/visibility_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const (
// errPersistenceLimitExceededForList is the error indicating QPS limit reached for list visibility.
var errPersistenceLimitExceededForList = &types.ServiceBusyError{Message: "Persistence Max QPS Reached for List Operations."}

type sampledVisibilityManager struct {
type visibilityManager struct {
rateLimitersForOpen RateLimiterFactory
rateLimitersForClosed RateLimiterFactory
rateLimitersForList RateLimiterFactory
Expand Down Expand Up @@ -75,7 +75,7 @@ type Params struct {
// For read requests, it will do sampling which will return service busy errors.
// Note that this is different from NewVisibilityPersistenceRateLimitedClient which is overlapping with the read processing.
func NewVisibilityManager(persistence persistence.VisibilityManager, p Params) persistence.VisibilityManager {
return &sampledVisibilityManager{
return &visibilityManager{
persistence: persistence,
rateLimitersForOpen: p.RateLimiterFactoryFunc(p.TimeSource, numOfPriorityForOpen, p.Config.VisibilityOpenMaxQPS),
rateLimitersForClosed: p.RateLimiterFactoryFunc(p.TimeSource, numOfPriorityForClosed, p.Config.VisibilityClosedMaxQPS),
Expand All @@ -85,7 +85,7 @@ func NewVisibilityManager(persistence persistence.VisibilityManager, p Params) p
}
}

func (p *sampledVisibilityManager) RecordWorkflowExecutionStarted(
func (p *visibilityManager) RecordWorkflowExecutionStarted(
ctx context.Context,
request *persistence.RecordWorkflowExecutionStartedRequest,
) error {
Expand All @@ -108,7 +108,7 @@ func (p *sampledVisibilityManager) RecordWorkflowExecutionStarted(
return nil
}

func (p *sampledVisibilityManager) RecordWorkflowExecutionClosed(
func (p *visibilityManager) RecordWorkflowExecutionClosed(
ctx context.Context,
request *persistence.RecordWorkflowExecutionClosedRequest,
) error {
Expand All @@ -132,14 +132,7 @@ func (p *sampledVisibilityManager) RecordWorkflowExecutionClosed(
return nil
}

func (p *sampledVisibilityManager) RecordWorkflowExecutionUninitialized(
ctx context.Context,
request *persistence.RecordWorkflowExecutionUninitializedRequest,
) error {
return p.persistence.RecordWorkflowExecutionUninitialized(ctx, request)
}

func (p *sampledVisibilityManager) UpsertWorkflowExecution(
func (p *visibilityManager) UpsertWorkflowExecution(
ctx context.Context,
request *persistence.UpsertWorkflowExecutionRequest,
) error {
Expand All @@ -162,7 +155,7 @@ func (p *sampledVisibilityManager) UpsertWorkflowExecution(
return nil
}

func (p *sampledVisibilityManager) ListOpenWorkflowExecutions(
func (p *visibilityManager) ListOpenWorkflowExecutions(
ctx context.Context,
request *persistence.ListWorkflowExecutionsRequest,
) (*persistence.ListWorkflowExecutionsResponse, error) {
Expand All @@ -173,7 +166,7 @@ func (p *sampledVisibilityManager) ListOpenWorkflowExecutions(
return p.persistence.ListOpenWorkflowExecutions(ctx, request)
}

func (p *sampledVisibilityManager) ListClosedWorkflowExecutions(
func (p *visibilityManager) ListClosedWorkflowExecutions(
ctx context.Context,
request *persistence.ListWorkflowExecutionsRequest,
) (*persistence.ListWorkflowExecutionsResponse, error) {
Expand All @@ -184,7 +177,7 @@ func (p *sampledVisibilityManager) ListClosedWorkflowExecutions(
return p.persistence.ListClosedWorkflowExecutions(ctx, request)
}

func (p *sampledVisibilityManager) ListOpenWorkflowExecutionsByType(
func (p *visibilityManager) ListOpenWorkflowExecutionsByType(
ctx context.Context,
request *persistence.ListWorkflowExecutionsByTypeRequest,
) (*persistence.ListWorkflowExecutionsResponse, error) {
Expand All @@ -195,7 +188,7 @@ func (p *sampledVisibilityManager) ListOpenWorkflowExecutionsByType(
return p.persistence.ListOpenWorkflowExecutionsByType(ctx, request)
}

func (p *sampledVisibilityManager) ListClosedWorkflowExecutionsByType(
func (p *visibilityManager) ListClosedWorkflowExecutionsByType(
ctx context.Context,
request *persistence.ListWorkflowExecutionsByTypeRequest,
) (*persistence.ListWorkflowExecutionsResponse, error) {
Expand All @@ -206,7 +199,7 @@ func (p *sampledVisibilityManager) ListClosedWorkflowExecutionsByType(
return p.persistence.ListClosedWorkflowExecutionsByType(ctx, request)
}

func (p *sampledVisibilityManager) ListOpenWorkflowExecutionsByWorkflowID(
func (p *visibilityManager) ListOpenWorkflowExecutionsByWorkflowID(
ctx context.Context,
request *persistence.ListWorkflowExecutionsByWorkflowIDRequest,
) (*persistence.ListWorkflowExecutionsResponse, error) {
Expand All @@ -217,7 +210,7 @@ func (p *sampledVisibilityManager) ListOpenWorkflowExecutionsByWorkflowID(
return p.persistence.ListOpenWorkflowExecutionsByWorkflowID(ctx, request)
}

func (p *sampledVisibilityManager) ListClosedWorkflowExecutionsByWorkflowID(
func (p *visibilityManager) ListClosedWorkflowExecutionsByWorkflowID(
ctx context.Context,
request *persistence.ListWorkflowExecutionsByWorkflowIDRequest,
) (*persistence.ListWorkflowExecutionsResponse, error) {
Expand All @@ -228,7 +221,7 @@ func (p *sampledVisibilityManager) ListClosedWorkflowExecutionsByWorkflowID(
return p.persistence.ListClosedWorkflowExecutionsByWorkflowID(ctx, request)
}

func (p *sampledVisibilityManager) ListClosedWorkflowExecutionsByStatus(
func (p *visibilityManager) ListClosedWorkflowExecutionsByStatus(
ctx context.Context,
request *persistence.ListClosedWorkflowExecutionsByStatusRequest,
) (*persistence.ListWorkflowExecutionsResponse, error) {
Expand All @@ -239,53 +232,60 @@ func (p *sampledVisibilityManager) ListClosedWorkflowExecutionsByStatus(
return p.persistence.ListClosedWorkflowExecutionsByStatus(ctx, request)
}

func (p *sampledVisibilityManager) GetClosedWorkflowExecution(
func (p *visibilityManager) RecordWorkflowExecutionUninitialized(
ctx context.Context,
request *persistence.RecordWorkflowExecutionUninitializedRequest,
) error {
return p.persistence.RecordWorkflowExecutionUninitialized(ctx, request)
}

func (p *visibilityManager) GetClosedWorkflowExecution(
ctx context.Context,
request *persistence.GetClosedWorkflowExecutionRequest,
) (*persistence.GetClosedWorkflowExecutionResponse, error) {
return p.persistence.GetClosedWorkflowExecution(ctx, request)
}

func (p *sampledVisibilityManager) DeleteWorkflowExecution(
func (p *visibilityManager) DeleteWorkflowExecution(
ctx context.Context,
request *persistence.VisibilityDeleteWorkflowExecutionRequest,
) error {
return p.persistence.DeleteWorkflowExecution(ctx, request)
}

func (p *sampledVisibilityManager) DeleteUninitializedWorkflowExecution(
func (p *visibilityManager) DeleteUninitializedWorkflowExecution(
ctx context.Context,
request *persistence.VisibilityDeleteWorkflowExecutionRequest,
) error {
return p.persistence.DeleteUninitializedWorkflowExecution(ctx, request)
}

func (p *sampledVisibilityManager) ListWorkflowExecutions(
func (p *visibilityManager) ListWorkflowExecutions(
ctx context.Context,
request *persistence.ListWorkflowExecutionsByQueryRequest,
) (*persistence.ListWorkflowExecutionsResponse, error) {
return p.persistence.ListWorkflowExecutions(ctx, request)
}

func (p *sampledVisibilityManager) ScanWorkflowExecutions(
func (p *visibilityManager) ScanWorkflowExecutions(
ctx context.Context,
request *persistence.ListWorkflowExecutionsByQueryRequest,
) (*persistence.ListWorkflowExecutionsResponse, error) {
return p.persistence.ScanWorkflowExecutions(ctx, request)
}

func (p *sampledVisibilityManager) CountWorkflowExecutions(
func (p *visibilityManager) CountWorkflowExecutions(
ctx context.Context,
request *persistence.CountWorkflowExecutionsRequest,
) (*persistence.CountWorkflowExecutionsResponse, error) {
return p.persistence.CountWorkflowExecutions(ctx, request)
}

func (p *sampledVisibilityManager) Close() {
func (p *visibilityManager) Close() {
p.persistence.Close()
}

func (p *sampledVisibilityManager) GetName() string {
func (p *visibilityManager) GetName() string {
return p.persistence.GetName()
}

Expand All @@ -297,7 +297,7 @@ func getRequestPriority(request *persistence.RecordWorkflowExecutionClosedReques
return priority
}

func (p *sampledVisibilityManager) tryConsumeListToken(domain, method string) error {
func (p *visibilityManager) tryConsumeListToken(domain, method string) error {
rateLimiter := p.rateLimitersForList.GetRateLimiter(domain)
ok, _ := rateLimiter.GetToken(0, 1)
if ok {
Expand Down
Loading

0 comments on commit a9329e8

Please sign in to comment.