diff --git a/common/persistence/pinot/pinotVisibilityStore.go b/common/persistence/pinot/pinotVisibilityStore.go index bd31ed42d6e..8e49afb6428 100644 --- a/common/persistence/pinot/pinotVisibilityStore.go +++ b/common/persistence/pinot/pinotVisibilityStore.go @@ -282,7 +282,11 @@ func (v *pinotVisibilityStore) ListOpenWorkflowExecutions( isRecordValid := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool { return !request.EarliestTime.After(rec.StartTime) && !rec.StartTime.After(request.LatestTime) } - query := getListWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request, false) + query, err := getListWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request, false) + if err != nil { + v.logger.Error(fmt.Sprintf("failed to build list workflow executions query %v", err)) + return nil, err + } req := &pnt.SearchRequest{ Query: query, @@ -302,7 +306,11 @@ func (v *pinotVisibilityStore) ListClosedWorkflowExecutions( return !request.EarliestTime.After(rec.CloseTime) && !rec.CloseTime.After(request.LatestTime) } - query := getListWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request, true) + query, err := getListWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request, true) + if err != nil { + v.logger.Error(fmt.Sprintf("failed to build list workflow executions query %v", err)) + return nil, err + } req := &pnt.SearchRequest{ Query: query, @@ -320,7 +328,11 @@ func (v *pinotVisibilityStore) ListOpenWorkflowExecutionsByType(ctx context.Cont return !request.EarliestTime.After(rec.StartTime) && !rec.StartTime.After(request.LatestTime) } - query := getListWorkflowExecutionsByTypeQuery(v.pinotClient.GetTableName(), request, false) + query, err := getListWorkflowExecutionsByTypeQuery(v.pinotClient.GetTableName(), request, false) + if err != nil { + v.logger.Error(fmt.Sprintf("failed to build list workflow executions by type query %v", err)) + return nil, err + } req := &pnt.SearchRequest{ Query: query, @@ -338,7 +350,11 @@ func (v *pinotVisibilityStore) ListClosedWorkflowExecutionsByType(ctx context.Co return !request.EarliestTime.After(rec.CloseTime) && !rec.CloseTime.After(request.LatestTime) } - query := getListWorkflowExecutionsByTypeQuery(v.pinotClient.GetTableName(), request, true) + query, err := getListWorkflowExecutionsByTypeQuery(v.pinotClient.GetTableName(), request, true) + if err != nil { + v.logger.Error(fmt.Sprintf("failed to build list workflow executions by type query %v", err)) + return nil, err + } req := &pnt.SearchRequest{ Query: query, @@ -356,7 +372,11 @@ func (v *pinotVisibilityStore) ListOpenWorkflowExecutionsByWorkflowID(ctx contex return !request.EarliestTime.After(rec.StartTime) && !rec.StartTime.After(request.LatestTime) } - query := getListWorkflowExecutionsByWorkflowIDQuery(v.pinotClient.GetTableName(), request, false) + query, err := getListWorkflowExecutionsByWorkflowIDQuery(v.pinotClient.GetTableName(), request, false) + if err != nil { + v.logger.Error(fmt.Sprintf("failed to build list workflow executions by workflowID query %v", err)) + return nil, err + } req := &pnt.SearchRequest{ Query: query, @@ -374,7 +394,11 @@ func (v *pinotVisibilityStore) ListClosedWorkflowExecutionsByWorkflowID(ctx cont return !request.EarliestTime.After(rec.CloseTime) && !rec.CloseTime.After(request.LatestTime) } - query := getListWorkflowExecutionsByWorkflowIDQuery(v.pinotClient.GetTableName(), request, true) + query, err := getListWorkflowExecutionsByWorkflowIDQuery(v.pinotClient.GetTableName(), request, true) + if err != nil { + v.logger.Error(fmt.Sprintf("failed to build list workflow executions by workflowID query %v", err)) + return nil, err + } req := &pnt.SearchRequest{ Query: query, @@ -392,7 +416,11 @@ func (v *pinotVisibilityStore) ListClosedWorkflowExecutionsByStatus(ctx context. return !request.EarliestTime.After(rec.CloseTime) && !rec.CloseTime.After(request.LatestTime) } - query := getListWorkflowExecutionsByStatusQuery(v.pinotClient.GetTableName(), request) + query, err := getListWorkflowExecutionsByStatusQuery(v.pinotClient.GetTableName(), request) + if err != nil { + v.logger.Error(fmt.Sprintf("failed to build list workflow executions by status query %v", err)) + return nil, err + } req := &pnt.SearchRequest{ Query: query, @@ -439,7 +467,11 @@ func (v *pinotVisibilityStore) GetClosedWorkflowExecution(ctx context.Context, r func (v *pinotVisibilityStore) ListWorkflowExecutions(ctx context.Context, request *p.ListWorkflowExecutionsByQueryRequest) (*p.InternalListWorkflowExecutionsResponse, error) { checkPageSize(request) - query := v.getListWorkflowExecutionsByQueryQuery(v.pinotClient.GetTableName(), request) + query, err := v.getListWorkflowExecutionsByQueryQuery(v.pinotClient.GetTableName(), request) + if err != nil { + v.logger.Error(fmt.Sprintf("failed to build list workflow executions query %v", err)) + return nil, err + } req := &pnt.SearchRequest{ Query: query, @@ -462,7 +494,11 @@ func (v *pinotVisibilityStore) ListWorkflowExecutions(ctx context.Context, reque func (v *pinotVisibilityStore) ScanWorkflowExecutions(ctx context.Context, request *p.ListWorkflowExecutionsByQueryRequest) (*p.InternalListWorkflowExecutionsResponse, error) { checkPageSize(request) - query := v.getListWorkflowExecutionsByQueryQuery(v.pinotClient.GetTableName(), request) + query, err := v.getListWorkflowExecutionsByQueryQuery(v.pinotClient.GetTableName(), request) + if err != nil { + v.logger.Error(fmt.Sprintf("failed to build scan workflow executions query %v", err)) + return nil, err + } req := &pnt.SearchRequest{ Query: query, @@ -749,14 +785,14 @@ func (v *pinotVisibilityStore) getCountWorkflowExecutionsQuery(tableName string, return query.String() } -func (v *pinotVisibilityStore) getListWorkflowExecutionsByQueryQuery(tableName string, request *p.ListWorkflowExecutionsByQueryRequest) string { +func (v *pinotVisibilityStore) getListWorkflowExecutionsByQueryQuery(tableName string, request *p.ListWorkflowExecutionsByQueryRequest) (string, error) { if request == nil { - return "" + return "", nil } token, err := pnt.GetNextPageToken(request.NextPageToken) if err != nil { - panic(fmt.Sprintf("deserialize next page token error: %s", err)) + return "", fmt.Errorf("next page token: %w", err) } query := NewPinotQuery(tableName) @@ -770,20 +806,20 @@ func (v *pinotVisibilityStore) getListWorkflowExecutionsByQueryQuery(tableName s // if customized query is empty, directly return if requestQuery == "" { query.addOffsetAndLimits(token.From, request.PageSize) - return query.String() + return query.String(), nil } requestQuery = filterPrefix(requestQuery) if common.IsJustOrderByClause(requestQuery) { query.concatSorter(requestQuery) query.addOffsetAndLimits(token.From, request.PageSize) - return query.String() + return query.String(), nil } comparExpr, orderBy := parseOrderBy(requestQuery) comparExpr, err = v.pinotQueryValidator.ValidateQuery(comparExpr) if err != nil { - v.logger.Error(fmt.Sprintf("pinot query validator error: %s", err)) + return "", fmt.Errorf("pinot query validator error: %w, query: %s", err, request.Query) } comparExpr = filterPrefix(comparExpr) @@ -800,7 +836,7 @@ func (v *pinotVisibilityStore) getListWorkflowExecutionsByQueryQuery(tableName s } query.addOffsetAndLimits(token.From, request.PageSize) - return query.String() + return query.String(), nil } func filterPrefix(query string) string { @@ -907,14 +943,14 @@ func findLastOrderBy(list []string) int { return 0 } -func getListWorkflowExecutionsQuery(tableName string, request *p.InternalListWorkflowExecutionsRequest, isClosed bool) string { +func getListWorkflowExecutionsQuery(tableName string, request *p.InternalListWorkflowExecutionsRequest, isClosed bool) (string, error) { if request == nil { - return "" + return "", nil } token, err := pnt.GetNextPageToken(request.NextPageToken) if err != nil { - panic(fmt.Sprintf("deserialize next page token error: %s", err)) + return "", fmt.Errorf("next page token: %w", err) } from := token.From @@ -939,12 +975,12 @@ func getListWorkflowExecutionsQuery(tableName string, request *p.InternalListWor query.addPinotSorter(StartTime, DescendingOrder) query.addOffsetAndLimits(from, pageSize) - return query.String() + return query.String(), nil } -func getListWorkflowExecutionsByTypeQuery(tableName string, request *p.InternalListWorkflowExecutionsByTypeRequest, isClosed bool) string { +func getListWorkflowExecutionsByTypeQuery(tableName string, request *p.InternalListWorkflowExecutionsByTypeRequest, isClosed bool) (string, error) { if request == nil { - return "" + return "", nil } query := NewPinotQuery(tableName) @@ -968,19 +1004,19 @@ func getListWorkflowExecutionsByTypeQuery(tableName string, request *p.InternalL token, err := pnt.GetNextPageToken(request.NextPageToken) if err != nil { - panic(fmt.Sprintf("deserialize next page token error: %s", err)) + return "", fmt.Errorf("next page token: %w", err) } from := token.From pageSize := request.PageSize query.addOffsetAndLimits(from, pageSize) - return query.String() + return query.String(), nil } -func getListWorkflowExecutionsByWorkflowIDQuery(tableName string, request *p.InternalListWorkflowExecutionsByWorkflowIDRequest, isClosed bool) string { +func getListWorkflowExecutionsByWorkflowIDQuery(tableName string, request *p.InternalListWorkflowExecutionsByWorkflowIDRequest, isClosed bool) (string, error) { if request == nil { - return "" + return "", nil } query := NewPinotQuery(tableName) @@ -1004,19 +1040,19 @@ func getListWorkflowExecutionsByWorkflowIDQuery(tableName string, request *p.Int token, err := pnt.GetNextPageToken(request.NextPageToken) if err != nil { - panic(fmt.Sprintf("deserialize next page token error: %s", err)) + return "", fmt.Errorf("next page token: %w", err) } from := token.From pageSize := request.PageSize query.addOffsetAndLimits(from, pageSize) - return query.String() + return query.String(), nil } -func getListWorkflowExecutionsByStatusQuery(tableName string, request *p.InternalListClosedWorkflowExecutionsByStatusRequest) string { +func getListWorkflowExecutionsByStatusQuery(tableName string, request *p.InternalListClosedWorkflowExecutionsByStatusRequest) (string, error) { if request == nil { - return "" + return "", nil } query := NewPinotQuery(tableName) @@ -1047,14 +1083,14 @@ func getListWorkflowExecutionsByStatusQuery(tableName string, request *p.Interna token, err := pnt.GetNextPageToken(request.NextPageToken) if err != nil { - panic(fmt.Sprintf("deserialize next page token error: %s", err)) + return "", fmt.Errorf("next page token: %w", err) } from := token.From pageSize := request.PageSize query.addOffsetAndLimits(from, pageSize) - return query.String() + return query.String(), nil } func getGetClosedWorkflowExecutionQuery(tableName string, request *p.InternalGetClosedWorkflowExecutionRequest) string { diff --git a/common/persistence/pinot/pinotVisibilityStore_test.go b/common/persistence/pinot/pinotVisibilityStore_test.go index 76df48a9c1e..4c09aa6659d 100644 --- a/common/persistence/pinot/pinotVisibilityStore_test.go +++ b/common/persistence/pinot/pinotVisibilityStore_test.go @@ -311,10 +311,9 @@ LIMIT 0, 0 for name, test := range tests { t.Run(name, func(t *testing.T) { - assert.NotPanics(t, func() { - output := visibilityStore.getListWorkflowExecutionsByQueryQuery(testTableName, test.input) - assert.Equal(t, test.expectedOutput, output) - }) + output, err := visibilityStore.getListWorkflowExecutionsByQueryQuery(testTableName, test.input) + assert.Equal(t, test.expectedOutput, output) + assert.NoError(t, err) }) } } @@ -329,9 +328,9 @@ func TestGetListWorkflowExecutionsQuery(t *testing.T) { NextPageToken: nil, } - closeResult := getListWorkflowExecutionsQuery(testTableName, request, true) - openResult := getListWorkflowExecutionsQuery(testTableName, request, false) - nilResult := getListWorkflowExecutionsQuery(testTableName, nil, true) + closeResult, err1 := getListWorkflowExecutionsQuery(testTableName, request, true) + openResult, err2 := getListWorkflowExecutionsQuery(testTableName, request, false) + nilResult, err3 := getListWorkflowExecutionsQuery(testTableName, nil, true) expectCloseResult := fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' @@ -356,6 +355,9 @@ LIMIT 0, 10 assert.Equal(t, closeResult, expectCloseResult) assert.Equal(t, openResult, expectOpenResult) assert.Equal(t, nilResult, expectNilResult) + assert.NoError(t, err1) + assert.NoError(t, err2) + assert.NoError(t, err3) } func TestGetListWorkflowExecutionsByTypeQuery(t *testing.T) { @@ -371,9 +373,9 @@ func TestGetListWorkflowExecutionsByTypeQuery(t *testing.T) { WorkflowTypeName: testWorkflowType, } - closeResult := getListWorkflowExecutionsByTypeQuery(testTableName, request, true) - openResult := getListWorkflowExecutionsByTypeQuery(testTableName, request, false) - nilResult := getListWorkflowExecutionsByTypeQuery(testTableName, nil, true) + closeResult, err1 := getListWorkflowExecutionsByTypeQuery(testTableName, request, true) + openResult, err2 := getListWorkflowExecutionsByTypeQuery(testTableName, request, false) + nilResult, err3 := getListWorkflowExecutionsByTypeQuery(testTableName, nil, true) expectCloseResult := fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' @@ -400,6 +402,9 @@ LIMIT 0, 10 assert.Equal(t, closeResult, expectCloseResult) assert.Equal(t, openResult, expectOpenResult) assert.Equal(t, nilResult, expectNilResult) + assert.NoError(t, err1) + assert.NoError(t, err2) + assert.NoError(t, err3) } func TestGetListWorkflowExecutionsByWorkflowIDQuery(t *testing.T) { @@ -415,9 +420,9 @@ func TestGetListWorkflowExecutionsByWorkflowIDQuery(t *testing.T) { WorkflowID: testWorkflowID, } - closeResult := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, request, true) - openResult := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, request, false) - nilResult := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, nil, true) + closeResult, err1 := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, request, true) + openResult, err2 := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, request, false) + nilResult, err3 := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, nil, true) expectCloseResult := fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' @@ -444,6 +449,9 @@ LIMIT 0, 10 assert.Equal(t, closeResult, expectCloseResult) assert.Equal(t, openResult, expectOpenResult) assert.Equal(t, nilResult, expectNilResult) + assert.NoError(t, err1) + assert.NoError(t, err2) + assert.NoError(t, err3) } func TestGetListWorkflowExecutionsByStatusQuery(t *testing.T) { @@ -459,8 +467,8 @@ func TestGetListWorkflowExecutionsByStatusQuery(t *testing.T) { Status: types.WorkflowExecutionCloseStatus(0), } - closeResult := getListWorkflowExecutionsByStatusQuery(testTableName, request) - nilResult := getListWorkflowExecutionsByStatusQuery(testTableName, nil) + closeResult, err1 := getListWorkflowExecutionsByStatusQuery(testTableName, request) + nilResult, err2 := getListWorkflowExecutionsByStatusQuery(testTableName, nil) expectCloseResult := fmt.Sprintf(`SELECT * FROM %s WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' @@ -474,6 +482,8 @@ LIMIT 0, 10 assert.Equal(t, expectCloseResult, closeResult) assert.Equal(t, expectNilResult, nilResult) + assert.NoError(t, err1) + assert.NoError(t, err2) } func TestGetGetClosedWorkflowExecutionQuery(t *testing.T) { diff --git a/common/pinot/pinotQueryValidator.go b/common/pinot/pinotQueryValidator.go index 0405fa709bd..0a06833ab79 100644 --- a/common/pinot/pinotQueryValidator.go +++ b/common/pinot/pinotQueryValidator.go @@ -133,14 +133,14 @@ func (qv *VisibilityQueryValidator) validateRangeExpr(expr sqlparser.Expr) (stri if lowerBound, ok := rangeCond.From.(*sqlparser.SQLVal); ok { trimmed, err := trimTimeFieldValueFromNanoToMilliSeconds(lowerBound) if err != nil { - return "", err + return "", fmt.Errorf("trim time field %s got error: %w", colNameStr, err) } rangeCond.From = trimmed } if upperBound, ok := rangeCond.To.(*sqlparser.SQLVal); ok { trimmed, err := trimTimeFieldValueFromNanoToMilliSeconds(upperBound) if err != nil { - return "", err + return "", fmt.Errorf("trim time field %s got error: %w", colNameStr, err) } rangeCond.To = trimmed } @@ -237,7 +237,7 @@ func (qv *VisibilityQueryValidator) processSystemKey(expr sqlparser.Expr) (strin colName, ok := comparisonExpr.Left.(*sqlparser.ColName) if !ok { - return "", errors.New("invalid comparison expression, left") + return "", fmt.Errorf("left comparison is invalid: %v", comparisonExpr.Left) } colNameStr := colName.Name.String() @@ -245,11 +245,11 @@ func (qv *VisibilityQueryValidator) processSystemKey(expr sqlparser.Expr) (strin if _, ok := timeSystemKeys[colNameStr]; ok { sqlVal, ok := comparisonExpr.Right.(*sqlparser.SQLVal) if !ok { - return "", fmt.Errorf("error: Failed to convert val") + return "", fmt.Errorf("right comparison is invalid: %v", comparisonExpr.Right) } trimmed, err := trimTimeFieldValueFromNanoToMilliSeconds(sqlVal) if err != nil { - return "", err + return "", fmt.Errorf("trim time field %s got error: %w", colNameStr, err) } comparisonExpr.Right = trimmed } @@ -265,13 +265,13 @@ func (qv *VisibilityQueryValidator) processSystemKey(expr sqlparser.Expr) (strin if !ok { // this means, the value is a string, and not surrounded by single qoute, which means, val = missing colVal, ok := comparisonExpr.Right.(*sqlparser.ColName) if !ok { - return "", fmt.Errorf("error: Failed to convert val") + return "", fmt.Errorf("right comparison is invalid: %v", comparisonExpr.Right) } colValStr := colVal.Name.String() // double check if val is not missing if colValStr != "missing" { - return "", fmt.Errorf("error: failed to convert val") + return "", fmt.Errorf("right comparison is invalid string value: %s", colValStr) } var newColVal string @@ -288,11 +288,11 @@ func (qv *VisibilityQueryValidator) processSystemKey(expr sqlparser.Expr) (strin if _, ok := timeSystemKeys[colNameStr]; ok { sqlVal, ok := comparisonExpr.Right.(*sqlparser.SQLVal) if !ok { - return "", fmt.Errorf("error: Failed to convert val") + return "", fmt.Errorf("right comparison is invalid/missing. key %s, right expr %v", colNameStr, comparisonExpr.Right) } trimmed, err := trimTimeFieldValueFromNanoToMilliSeconds(sqlVal) if err != nil { - return "", err + return "", fmt.Errorf("trim time field %s got error: %w", colNameStr, err) } comparisonExpr.Right = trimmed } diff --git a/common/pinot/pinotQueryValidator_test.go b/common/pinot/pinotQueryValidator_test.go index 865634ace5a..2f98449e684 100644 --- a/common/pinot/pinotQueryValidator_test.go +++ b/common/pinot/pinotQueryValidator_test.go @@ -156,22 +156,22 @@ func TestValidateQuery(t *testing.T) { "Case15-8: invalid string for trim": { query: "CloseTime = abc", validated: "", - err: "error: failed to convert val", + err: "right comparison is invalid string value: abc", }, "Case15-9: invalid value for trim": { query: "CloseTime = 123.45", validated: "", - err: "error: failed to parse int from SQLVal 123.45", + err: "trim time field CloseTime got error: error: failed to parse int from SQLVal 123.45", }, "Case15-10: invalid from time for range query": { query: "StartTime BETWEEN 17.50 AND 1707319950935000128", validated: "", - err: "error: failed to parse int from SQLVal 17.50", + err: "trim time field StartTime got error: error: failed to parse int from SQLVal 17.50", }, "Case15-11: invalid to time for range query": { query: "StartTime BETWEEN 1707319950934000128 AND 1707319950935000128.1", validated: "", - err: "error: failed to parse int from SQLVal 1707319950935000128.1", + err: "trim time field StartTime got error: error: failed to parse int from SQLVal 1707319950935000128.1", }, "Case15-12: value already in milliseconds": { query: "StartTime = 170731995093",