Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Pinto query validator failed log, minor refactor pinot visibility store to remove panics #5664

Merged
merged 5 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 68 additions & 32 deletions common/persistence/pinot/pinotVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,11 @@ func (v *pinotVisibilityStore) ListOpenWorkflowExecutions(
isRecordValid := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool {
return !request.EarliestTime.After(rec.StartTime) && !rec.StartTime.After(request.LatestTime)
}
query := getListWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request, false)
query, err := getListWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request, false)
if err != nil {
v.logger.Error(fmt.Sprintf("failed to build list workflow executions query %v", err))
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand All @@ -302,7 +306,11 @@ func (v *pinotVisibilityStore) ListClosedWorkflowExecutions(
return !request.EarliestTime.After(rec.CloseTime) && !rec.CloseTime.After(request.LatestTime)
}

query := getListWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request, true)
query, err := getListWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request, true)
if err != nil {
v.logger.Error(fmt.Sprintf("failed to build list workflow executions query %v", err))
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand All @@ -320,7 +328,11 @@ func (v *pinotVisibilityStore) ListOpenWorkflowExecutionsByType(ctx context.Cont
return !request.EarliestTime.After(rec.StartTime) && !rec.StartTime.After(request.LatestTime)
}

query := getListWorkflowExecutionsByTypeQuery(v.pinotClient.GetTableName(), request, false)
query, err := getListWorkflowExecutionsByTypeQuery(v.pinotClient.GetTableName(), request, false)
if err != nil {
v.logger.Error(fmt.Sprintf("failed to build list workflow executions by type query %v", err))
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand All @@ -338,7 +350,11 @@ func (v *pinotVisibilityStore) ListClosedWorkflowExecutionsByType(ctx context.Co
return !request.EarliestTime.After(rec.CloseTime) && !rec.CloseTime.After(request.LatestTime)
}

query := getListWorkflowExecutionsByTypeQuery(v.pinotClient.GetTableName(), request, true)
query, err := getListWorkflowExecutionsByTypeQuery(v.pinotClient.GetTableName(), request, true)
if err != nil {
v.logger.Error(fmt.Sprintf("failed to build list workflow executions by type query %v", err))
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand All @@ -356,7 +372,11 @@ func (v *pinotVisibilityStore) ListOpenWorkflowExecutionsByWorkflowID(ctx contex
return !request.EarliestTime.After(rec.StartTime) && !rec.StartTime.After(request.LatestTime)
}

query := getListWorkflowExecutionsByWorkflowIDQuery(v.pinotClient.GetTableName(), request, false)
query, err := getListWorkflowExecutionsByWorkflowIDQuery(v.pinotClient.GetTableName(), request, false)
if err != nil {
v.logger.Error(fmt.Sprintf("failed to build list workflow executions by workflowID query %v", err))
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand All @@ -374,7 +394,11 @@ func (v *pinotVisibilityStore) ListClosedWorkflowExecutionsByWorkflowID(ctx cont
return !request.EarliestTime.After(rec.CloseTime) && !rec.CloseTime.After(request.LatestTime)
}

query := getListWorkflowExecutionsByWorkflowIDQuery(v.pinotClient.GetTableName(), request, true)
query, err := getListWorkflowExecutionsByWorkflowIDQuery(v.pinotClient.GetTableName(), request, true)
if err != nil {
v.logger.Error(fmt.Sprintf("failed to build list workflow executions by workflowID query %v", err))
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand All @@ -392,7 +416,11 @@ func (v *pinotVisibilityStore) ListClosedWorkflowExecutionsByStatus(ctx context.
return !request.EarliestTime.After(rec.CloseTime) && !rec.CloseTime.After(request.LatestTime)
}

query := getListWorkflowExecutionsByStatusQuery(v.pinotClient.GetTableName(), request)
query, err := getListWorkflowExecutionsByStatusQuery(v.pinotClient.GetTableName(), request)
if err != nil {
v.logger.Error(fmt.Sprintf("failed to build list workflow executions by status query %v", err))
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand Down Expand Up @@ -439,7 +467,11 @@ func (v *pinotVisibilityStore) GetClosedWorkflowExecution(ctx context.Context, r
func (v *pinotVisibilityStore) ListWorkflowExecutions(ctx context.Context, request *p.ListWorkflowExecutionsByQueryRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
checkPageSize(request)

query := v.getListWorkflowExecutionsByQueryQuery(v.pinotClient.GetTableName(), request)
query, err := v.getListWorkflowExecutionsByQueryQuery(v.pinotClient.GetTableName(), request)
if err != nil {
v.logger.Error(fmt.Sprintf("failed to build list workflow executions query %v", err))
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand All @@ -462,7 +494,11 @@ func (v *pinotVisibilityStore) ListWorkflowExecutions(ctx context.Context, reque
func (v *pinotVisibilityStore) ScanWorkflowExecutions(ctx context.Context, request *p.ListWorkflowExecutionsByQueryRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
checkPageSize(request)

query := v.getListWorkflowExecutionsByQueryQuery(v.pinotClient.GetTableName(), request)
query, err := v.getListWorkflowExecutionsByQueryQuery(v.pinotClient.GetTableName(), request)
if err != nil {
v.logger.Error(fmt.Sprintf("failed to build scan workflow executions query %v", err))
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand Down Expand Up @@ -749,14 +785,14 @@ func (v *pinotVisibilityStore) getCountWorkflowExecutionsQuery(tableName string,
return query.String()
}

func (v *pinotVisibilityStore) getListWorkflowExecutionsByQueryQuery(tableName string, request *p.ListWorkflowExecutionsByQueryRequest) string {
func (v *pinotVisibilityStore) getListWorkflowExecutionsByQueryQuery(tableName string, request *p.ListWorkflowExecutionsByQueryRequest) (string, error) {
if request == nil {
return ""
return "", nil
}

token, err := pnt.GetNextPageToken(request.NextPageToken)
if err != nil {
panic(fmt.Sprintf("deserialize next page token error: %s", err))
return "", fmt.Errorf("next page token: %w", err)
neil-xie marked this conversation as resolved.
Show resolved Hide resolved
}

query := NewPinotQuery(tableName)
Expand All @@ -770,20 +806,20 @@ func (v *pinotVisibilityStore) getListWorkflowExecutionsByQueryQuery(tableName s
// if customized query is empty, directly return
if requestQuery == "" {
query.addOffsetAndLimits(token.From, request.PageSize)
return query.String()
return query.String(), nil
}

requestQuery = filterPrefix(requestQuery)
if common.IsJustOrderByClause(requestQuery) {
query.concatSorter(requestQuery)
query.addOffsetAndLimits(token.From, request.PageSize)
return query.String()
return query.String(), nil
}

comparExpr, orderBy := parseOrderBy(requestQuery)
comparExpr, err = v.pinotQueryValidator.ValidateQuery(comparExpr)
if err != nil {
v.logger.Error(fmt.Sprintf("pinot query validator error: %s", err))
return "", fmt.Errorf("pinot query validator error: %w, query: %s", err, request.Query)
}

comparExpr = filterPrefix(comparExpr)
Expand All @@ -800,7 +836,7 @@ func (v *pinotVisibilityStore) getListWorkflowExecutionsByQueryQuery(tableName s
}

query.addOffsetAndLimits(token.From, request.PageSize)
return query.String()
return query.String(), nil
}

func filterPrefix(query string) string {
Expand Down Expand Up @@ -907,14 +943,14 @@ func findLastOrderBy(list []string) int {
return 0
}

func getListWorkflowExecutionsQuery(tableName string, request *p.InternalListWorkflowExecutionsRequest, isClosed bool) string {
func getListWorkflowExecutionsQuery(tableName string, request *p.InternalListWorkflowExecutionsRequest, isClosed bool) (string, error) {
if request == nil {
return ""
return "", nil
}

token, err := pnt.GetNextPageToken(request.NextPageToken)
if err != nil {
panic(fmt.Sprintf("deserialize next page token error: %s", err))
return "", fmt.Errorf("next page token: %w", err)
}

from := token.From
Expand All @@ -939,12 +975,12 @@ func getListWorkflowExecutionsQuery(tableName string, request *p.InternalListWor
query.addPinotSorter(StartTime, DescendingOrder)
query.addOffsetAndLimits(from, pageSize)

return query.String()
return query.String(), nil
}

func getListWorkflowExecutionsByTypeQuery(tableName string, request *p.InternalListWorkflowExecutionsByTypeRequest, isClosed bool) string {
func getListWorkflowExecutionsByTypeQuery(tableName string, request *p.InternalListWorkflowExecutionsByTypeRequest, isClosed bool) (string, error) {
if request == nil {
return ""
return "", nil
}

query := NewPinotQuery(tableName)
Expand All @@ -968,19 +1004,19 @@ func getListWorkflowExecutionsByTypeQuery(tableName string, request *p.InternalL

token, err := pnt.GetNextPageToken(request.NextPageToken)
if err != nil {
panic(fmt.Sprintf("deserialize next page token error: %s", err))
return "", fmt.Errorf("next page token: %w", err)
}

from := token.From
pageSize := request.PageSize
query.addOffsetAndLimits(from, pageSize)

return query.String()
return query.String(), nil
}

func getListWorkflowExecutionsByWorkflowIDQuery(tableName string, request *p.InternalListWorkflowExecutionsByWorkflowIDRequest, isClosed bool) string {
func getListWorkflowExecutionsByWorkflowIDQuery(tableName string, request *p.InternalListWorkflowExecutionsByWorkflowIDRequest, isClosed bool) (string, error) {
if request == nil {
return ""
return "", nil
}

query := NewPinotQuery(tableName)
Expand All @@ -1004,19 +1040,19 @@ func getListWorkflowExecutionsByWorkflowIDQuery(tableName string, request *p.Int

token, err := pnt.GetNextPageToken(request.NextPageToken)
if err != nil {
panic(fmt.Sprintf("deserialize next page token error: %s", err))
return "", fmt.Errorf("next page token: %w", err)
}

from := token.From
pageSize := request.PageSize
query.addOffsetAndLimits(from, pageSize)

return query.String()
return query.String(), nil
}

func getListWorkflowExecutionsByStatusQuery(tableName string, request *p.InternalListClosedWorkflowExecutionsByStatusRequest) string {
func getListWorkflowExecutionsByStatusQuery(tableName string, request *p.InternalListClosedWorkflowExecutionsByStatusRequest) (string, error) {
if request == nil {
return ""
return "", nil
}

query := NewPinotQuery(tableName)
Expand Down Expand Up @@ -1047,14 +1083,14 @@ func getListWorkflowExecutionsByStatusQuery(tableName string, request *p.Interna

token, err := pnt.GetNextPageToken(request.NextPageToken)
if err != nil {
panic(fmt.Sprintf("deserialize next page token error: %s", err))
return "", fmt.Errorf("next page token: %w", err)
}

from := token.From
pageSize := request.PageSize
query.addOffsetAndLimits(from, pageSize)

return query.String()
return query.String(), nil
}

func getGetClosedWorkflowExecutionQuery(tableName string, request *p.InternalGetClosedWorkflowExecutionRequest) string {
Expand Down
40 changes: 25 additions & 15 deletions common/persistence/pinot/pinotVisibilityStore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,9 @@ LIMIT 0, 0

for name, test := range tests {
t.Run(name, func(t *testing.T) {
assert.NotPanics(t, func() {
output := visibilityStore.getListWorkflowExecutionsByQueryQuery(testTableName, test.input)
assert.Equal(t, test.expectedOutput, output)
})
output, err := visibilityStore.getListWorkflowExecutionsByQueryQuery(testTableName, test.input)
assert.Equal(t, test.expectedOutput, output)
assert.NoError(t, err)
})
}
}
Expand All @@ -329,9 +328,9 @@ func TestGetListWorkflowExecutionsQuery(t *testing.T) {
NextPageToken: nil,
}

closeResult := getListWorkflowExecutionsQuery(testTableName, request, true)
openResult := getListWorkflowExecutionsQuery(testTableName, request, false)
nilResult := getListWorkflowExecutionsQuery(testTableName, nil, true)
closeResult, err1 := getListWorkflowExecutionsQuery(testTableName, request, true)
openResult, err2 := getListWorkflowExecutionsQuery(testTableName, request, false)
nilResult, err3 := getListWorkflowExecutionsQuery(testTableName, nil, true)
expectCloseResult := fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
Expand All @@ -356,6 +355,9 @@ LIMIT 0, 10
assert.Equal(t, closeResult, expectCloseResult)
assert.Equal(t, openResult, expectOpenResult)
assert.Equal(t, nilResult, expectNilResult)
assert.NoError(t, err1)
assert.NoError(t, err2)
assert.NoError(t, err3)
}

func TestGetListWorkflowExecutionsByTypeQuery(t *testing.T) {
Expand All @@ -371,9 +373,9 @@ func TestGetListWorkflowExecutionsByTypeQuery(t *testing.T) {
WorkflowTypeName: testWorkflowType,
}

closeResult := getListWorkflowExecutionsByTypeQuery(testTableName, request, true)
openResult := getListWorkflowExecutionsByTypeQuery(testTableName, request, false)
nilResult := getListWorkflowExecutionsByTypeQuery(testTableName, nil, true)
closeResult, err1 := getListWorkflowExecutionsByTypeQuery(testTableName, request, true)
openResult, err2 := getListWorkflowExecutionsByTypeQuery(testTableName, request, false)
nilResult, err3 := getListWorkflowExecutionsByTypeQuery(testTableName, nil, true)
expectCloseResult := fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
Expand All @@ -400,6 +402,9 @@ LIMIT 0, 10
assert.Equal(t, closeResult, expectCloseResult)
assert.Equal(t, openResult, expectOpenResult)
assert.Equal(t, nilResult, expectNilResult)
assert.NoError(t, err1)
assert.NoError(t, err2)
assert.NoError(t, err3)
}

func TestGetListWorkflowExecutionsByWorkflowIDQuery(t *testing.T) {
Expand All @@ -415,9 +420,9 @@ func TestGetListWorkflowExecutionsByWorkflowIDQuery(t *testing.T) {
WorkflowID: testWorkflowID,
}

closeResult := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, request, true)
openResult := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, request, false)
nilResult := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, nil, true)
closeResult, err1 := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, request, true)
openResult, err2 := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, request, false)
nilResult, err3 := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, nil, true)
expectCloseResult := fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
Expand All @@ -444,6 +449,9 @@ LIMIT 0, 10
assert.Equal(t, closeResult, expectCloseResult)
assert.Equal(t, openResult, expectOpenResult)
assert.Equal(t, nilResult, expectNilResult)
assert.NoError(t, err1)
assert.NoError(t, err2)
assert.NoError(t, err3)
}

func TestGetListWorkflowExecutionsByStatusQuery(t *testing.T) {
Expand All @@ -459,8 +467,8 @@ func TestGetListWorkflowExecutionsByStatusQuery(t *testing.T) {
Status: types.WorkflowExecutionCloseStatus(0),
}

closeResult := getListWorkflowExecutionsByStatusQuery(testTableName, request)
nilResult := getListWorkflowExecutionsByStatusQuery(testTableName, nil)
closeResult, err1 := getListWorkflowExecutionsByStatusQuery(testTableName, request)
nilResult, err2 := getListWorkflowExecutionsByStatusQuery(testTableName, nil)
expectCloseResult := fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
Expand All @@ -474,6 +482,8 @@ LIMIT 0, 10

assert.Equal(t, expectCloseResult, closeResult)
assert.Equal(t, expectNilResult, nilResult)
assert.NoError(t, err1)
assert.NoError(t, err2)
}

func TestGetGetClosedWorkflowExecutionQuery(t *testing.T) {
Expand Down
Loading