Skip to content

Commit

Permalink
Merge branch 'master' into coverdatastore
Browse files Browse the repository at this point in the history
  • Loading branch information
Groxx authored Mar 14, 2024
2 parents 6e4b033 + 6c1ca00 commit 2907992
Showing 1 changed file with 26 additions and 13 deletions.
39 changes: 26 additions & 13 deletions common/persistence/pinotVisibilityTripleManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,11 +329,13 @@ type userParameters struct {

// For Pinot Migration uses. It will be a temporary usage
// logUserQueryParameters will log user queries' parameters so that a comparator workflow can consume
func (v *pinotVisibilityTripleManager) logUserQueryParameters(userParam userParameters, domain string) {
func (v *pinotVisibilityTripleManager) logUserQueryParameters(userParam userParameters, domain string, override bool) {
// Don't log if it is not enabled
if !v.logCustomerQueryParameter(domain) {
// don't log if it is a call from Pinot Response Comparator workflow
if !v.logCustomerQueryParameter(domain) || override {
return
}

randNum := rand.Intn(10)
if randNum != 5 { // Intentionally to have 1/10 chance to log custom query parameters
return
Expand Down Expand Up @@ -362,13 +364,14 @@ func (v *pinotVisibilityTripleManager) ListOpenWorkflowExecutions(
ctx context.Context,
request *ListWorkflowExecutionsRequest,
) (*ListWorkflowExecutionsResponse, error) {
override := ctx.Value(ContextKey)
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
closeStatus: -1, // is open. Will have --open flag in comparator workflow
earliestTime: request.EarliestTime,
latestTime: request.LatestTime,
}, request.Domain)
}, request.Domain, override != nil)

manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.ListOpenWorkflowExecutions(ctx, request)
Expand All @@ -378,13 +381,14 @@ func (v *pinotVisibilityTripleManager) ListClosedWorkflowExecutions(
ctx context.Context,
request *ListWorkflowExecutionsRequest,
) (*ListWorkflowExecutionsResponse, error) {
override := ctx.Value(ContextKey)
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
closeStatus: 6, // 6 means not set closeStatus.
earliestTime: request.EarliestTime,
latestTime: request.LatestTime,
}, request.Domain)
}, request.Domain, override != nil)
manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.ListClosedWorkflowExecutions(ctx, request)
}
Expand All @@ -393,14 +397,15 @@ func (v *pinotVisibilityTripleManager) ListOpenWorkflowExecutionsByType(
ctx context.Context,
request *ListWorkflowExecutionsByTypeRequest,
) (*ListWorkflowExecutionsResponse, error) {
override := ctx.Value(ContextKey)
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
workflowType: request.WorkflowTypeName,
closeStatus: -1, // is open. Will have --open flag in comparator workflow
earliestTime: request.EarliestTime,
latestTime: request.LatestTime,
}, request.Domain)
}, request.Domain, override != nil)
manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.ListOpenWorkflowExecutionsByType(ctx, request)
}
Expand All @@ -409,14 +414,15 @@ func (v *pinotVisibilityTripleManager) ListClosedWorkflowExecutionsByType(
ctx context.Context,
request *ListWorkflowExecutionsByTypeRequest,
) (*ListWorkflowExecutionsResponse, error) {
override := ctx.Value(ContextKey)
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
workflowType: request.WorkflowTypeName,
closeStatus: 6, // 6 means not set closeStatus.
earliestTime: request.EarliestTime,
latestTime: request.LatestTime,
}, request.Domain)
}, request.Domain, override != nil)
manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.ListClosedWorkflowExecutionsByType(ctx, request)
}
Expand All @@ -425,14 +431,15 @@ func (v *pinotVisibilityTripleManager) ListOpenWorkflowExecutionsByWorkflowID(
ctx context.Context,
request *ListWorkflowExecutionsByWorkflowIDRequest,
) (*ListWorkflowExecutionsResponse, error) {
override := ctx.Value(ContextKey)
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
workflowID: request.WorkflowID,
closeStatus: -1,
earliestTime: request.EarliestTime,
latestTime: request.LatestTime,
}, request.Domain)
}, request.Domain, override != nil)
manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.ListOpenWorkflowExecutionsByWorkflowID(ctx, request)
}
Expand All @@ -441,14 +448,15 @@ func (v *pinotVisibilityTripleManager) ListClosedWorkflowExecutionsByWorkflowID(
ctx context.Context,
request *ListWorkflowExecutionsByWorkflowIDRequest,
) (*ListWorkflowExecutionsResponse, error) {
override := ctx.Value(ContextKey)
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
workflowID: request.WorkflowID,
closeStatus: 6, // 6 means not set closeStatus.
earliestTime: request.EarliestTime,
latestTime: request.LatestTime,
}, request.Domain)
}, request.Domain, override != nil)
manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.ListClosedWorkflowExecutionsByWorkflowID(ctx, request)
}
Expand All @@ -457,13 +465,14 @@ func (v *pinotVisibilityTripleManager) ListClosedWorkflowExecutionsByStatus(
ctx context.Context,
request *ListClosedWorkflowExecutionsByStatusRequest,
) (*ListWorkflowExecutionsResponse, error) {
override := ctx.Value(ContextKey)
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
closeStatus: int(request.Status),
earliestTime: request.EarliestTime,
latestTime: request.LatestTime,
}, request.Domain)
}, request.Domain, override != nil)
manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.ListClosedWorkflowExecutionsByStatus(ctx, request)
}
Expand All @@ -475,13 +484,14 @@ func (v *pinotVisibilityTripleManager) GetClosedWorkflowExecution(
earlistTime := int64(0) // this is to get all closed workflow execution
latestTime := time.Now().UnixNano()

override := ctx.Value(ContextKey)
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
closeStatus: 6, // 6 means not set closeStatus.
earliestTime: earlistTime,
latestTime: latestTime,
}, request.Domain)
}, request.Domain, override != nil)
manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.GetClosedWorkflowExecution(ctx, request)
}
Expand All @@ -490,14 +500,15 @@ func (v *pinotVisibilityTripleManager) ListWorkflowExecutions(
ctx context.Context,
request *ListWorkflowExecutionsByQueryRequest,
) (*ListWorkflowExecutionsResponse, error) {
override := ctx.Value(ContextKey)
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
closeStatus: 6, // 6 means not set closeStatus.
customQuery: request.Query,
earliestTime: -1,
latestTime: -1,
}, request.Domain)
}, request.Domain, override != nil)
manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.ListWorkflowExecutions(ctx, request)
}
Expand All @@ -506,14 +517,15 @@ func (v *pinotVisibilityTripleManager) ScanWorkflowExecutions(
ctx context.Context,
request *ListWorkflowExecutionsByQueryRequest,
) (*ListWorkflowExecutionsResponse, error) {
override := ctx.Value(ContextKey)
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
closeStatus: 6, // 6 means not set closeStatus.
customQuery: request.Query,
earliestTime: -1,
latestTime: -1,
}, request.Domain)
}, request.Domain, override != nil)
manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.ScanWorkflowExecutions(ctx, request)
}
Expand All @@ -522,14 +534,15 @@ func (v *pinotVisibilityTripleManager) CountWorkflowExecutions(
ctx context.Context,
request *CountWorkflowExecutionsRequest,
) (*CountWorkflowExecutionsResponse, error) {
override := ctx.Value(ContextKey)
v.logUserQueryParameters(userParameters{
operation: string(Operation.COUNT),
domainName: request.Domain,
closeStatus: 6, // 6 means not set closeStatus.
customQuery: request.Query,
earliestTime: -1,
latestTime: -1,
}, request.Domain)
}, request.Domain, override != nil)
manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.CountWorkflowExecutions(ctx, request)
}
Expand Down

0 comments on commit 2907992

Please sign in to comment.